学习笔记—微服务—技术栈实践(9)—日志管理

日志管理

  微服务一般处于分布式环境下,在该架构中,每个服务都是独立的,分布在不同的节点或容器中,通过日志管理,可以帮助开发者和运维人员收集分散在不同微服务中的日志数据,通过集中化的工具进行汇总和分析,由此提高整个系统的可观测性。

  常用的日志管理方案有ELK Stack,或可通过influxDB类时序数据库对日志进行管理。

ELK解决方案

Elasticsearch

  Elasticsearch是一个分布式搜索和分析引擎,可以用于存储、搜索、分析日志数据。它具有高可用性、可扩展性和易用性等特点,可以轻松地与各种日志收集工具集成,如Logstash、Fluentd等。

Logstash

  Logstash是一个开源的数据收集引擎,可以用于收集、处理和转发日志数据。它可以与各种数据源集成,如文件、网络、数据库等,并将数据发送到Elasticsearch或其他存储系统。

Kibana

  Kibana是一个开源的数据可视化工具,可以用于分析和可视化Elasticsearch中的日志数据。它提供了丰富的图表、仪表板和搜索功能,可以帮助开发者和运维人员更好地理解日志数据。

ELK Stack的部署

  ELK Stack的部署可以通过Docker容器化部署,也可以通过Kubernetes进行部署。具体部署方式可以参考官方文档或相关教程。

  此处以容器化部署ELK 7.14.0版本为例。

  首先,需要下载对应版本的容器镜像:

1
2
3
docker pull elastic/elasticsearch:7.14.0
docker pull elastic/kibana:7.14.0
docker pull elastic/logstash:7.14.0

  对于es来说,首先,创建挂载目录并赋予权限:

1
2
mkdir -p /data/elk/es/{config,data,logs,plugins}
sudo chown -R 1000:1000 /data/elk/es

  在此基础上,创建配置文件:

1
2
cd /data/elk/es/config
sudo touch elasticsearch.yml

  配置内容为:

1
2
3
4
5
cluster.name: "my-es"
network.host: 0.0.0.0
http.port: 9200
http.cors.enabled: true
http.cors.allow-origin: "*"

  随后运行:

1
2
3
4
5
6
7
8
9
10
11
12
docker run -it  -d \
-p 9200:9200 \
-p 9300:9300 \
--name es01 \
-e ES_JAVA_OPTS="-Xms4g -Xmx4g" \
-e "discovery.type=single-node" \
--restart=always \
-v /data/elk/es/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /data/elk/es/data:/usr/share/elasticsearch/data \
-v /data/elk/es/logs:/usr/share/elasticsearch/logs \
-v /data/elk/es/plugins:/usr/share/elasticsearch/plugins \
elasticsearch:7.14.0

  对于kibana来说,同理,首先创建目录及配置文件用于挂在:

1
2
mkdir -p /data/elk/kibana/
vim /data/elk/kibana/kibana.yml

  配置文件配置如下:

1
2
3
4
5
#Default Kibana configuration for docker target
server.name: kibana
server.host: "0"
elasticsearch.hosts: ["http://172.17.0.2:9200"]
xpack.monitoring.ui.container.elasticsearch.enabled: true

  随后运行:

1
2
3
4
5
6
docker run -it -d \
-p 5601:5601 \
--name kibana \
--restart=always \
-v /data/elk/kibana/kibana.yml:/usr/share/kibana/config/kibana.yml \
kibana:7.14.0

  至此,elasticsearch和kibana的docker容器已经启动,可以通过浏览器访问:

1
http://localhost:5601

  在此基础下,进一步安装logstash用于收集日志:

1
docker run -d --name=logstash logstash:7.14.0

  先运行一个logstash的容器。

  把配置文件从容器拷贝出来:

1
2
3
4
sudo docker cp logstash:/usr/share/logstash /data/elk

cd /data/elk/
sudo chmod 777 logstash -R

  接着配置logstash的配置文件:

1
2
3
4
5
6
sudo mkdir -p /data/elk/logstash/conf.d
cd /data/elk/logstash/conf.d
sudo vim logstash.conf

cd /data/elk/logstash/config
sudo vim logstash.yml

  logstash.conf文件配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
input {
tcp {
mode => "server"
host => "0.0.0.0"
port => 5044
codec => json_lines
}
}

filter {



grok {
match => [ "message", "(?<logTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3})\s+\[(?<thread>.*)\]\s+(?<level>\w*)\s{1,2}+(?<class>\S*)\s+-\s+(?<content>.*)\s*"]
match => [ "message", "(?<logTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3})\s{1,2}+(?<level>\w*)\s{1,2}+.\s---+\s\[(?<thread>.*)\]+\s(?<class>\S*)\s*:+\s(?<content>.*)\s*"]
match => [
"source", "/home/passjava/logs/(?<logName>\w+)/.*.log"
]
overwrite => [ "source"]
break_on_match => false
}
mutate {
convert => {
"bytes" => "integer"
}
remove_field => ["agent","message","@version", "tags", "ecs", "_score", "input", "[log][offset]"]
}

useragent {
source => "user_agent"
target => "useragent"
}

date {
match => ["logTime", "MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601"]
timezone => "Asia/Shanghai"
}
}

output {
stdout { }

elasticsearch {
hosts => ["192.168.186.1:9200"]
index => "gagaduck_log"
}
}

  logstash.yml文件配置如下:

1
2
3
4
5
6
# 指定管道配置的目录,在此目录下的所有管道配置文件都将被 logstash 读取,除管道配置外,不要放任何文件
path.config: /usr/share/logstash/conf.d/*.conf
# logstash 日志目录位置,默认为 logstash 路径下的 logs
path.logs: /var/log/logstash
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://192.168.186.1:9200" ]

  删除logstash容器并重启启动如下:

1
2
3
4
5
6
7
8
docker rm -f logstash
# 启动 logstash 容器
docker run -it -d \
-p 5044:5044 \
--name logstash \
--restart=always \
-v /data/elk/logstash:/usr/share/logstash \
logstash:7.14.0

  由此,一个简单的ELK组件便部署完成了。

ELK Stack的使用

  在部署完成后,便可以使用ELK Stack进行日志管理了,在应用中,只需要将日志输出到标准输出即可,logstash会自动将日志收集到Elasticsearch中,并使用Kibana进行可视化展示。

  在应用中,只需要将日志输出到标准输出即可,logstash会自动将日志收集到Elasticsearch中,并使用Kibana进行可视化展示。

  以一个spring boot的应用为例子:

1
2
3
4
5
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>7.3</version>
</dependency>

  首先,加入logstash的依赖,并在resource目录下配置合适的logback.xml:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/base.xml"/>
<property name="springAppName" value="system-service" />
<appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>192.168.186.1:5044</destination>
<encoder charset="UTF-8"
class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp>
<timeZone>UTC</timeZone>
</timestamp>
<pattern>
<pattern>
{
"logLevel": "%level",
"serviceName": "${springAppName:-}",
"pid": "${PID:-}",
"thread": "%thread",
"class": "%logger{40}",
"rest": "%message"
}
</pattern>
</pattern>
</providers>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="LOGSTASH"/>
<appender-ref ref="CONSOLE"/>
</root>

</configuration>

  在ELK的基础上,实际还可以增加filebeat,

1
2
3
docker pull elastic/filebeat:7.14.0
docker run -d --name=filebeat elastic/filebeat:7.14.0
……

  通过filebeat实现对于日志文件收集的需要。

  当logstash把日志收集同步到es之后,就会自动在es中创建这样的一个搜索,那么,到kibana可视化控制台中创建一个对应的索引来创建并查看日志即可。

  首先进入kibana控制台(Management),找到index patterns这个设置,创建索引。

  填写名字,其实该索引会自动出来,若未出来,那么需要回去看看logstash的日志等内容排查一下故障情况。

  随后便可以在kibana的discard处对日志进行查看分析等处理操作。参见https://github.com/gagaducko/learning_demos/tree/main/system-service

基于时序数据库实现的日志管理

influxdb

  InfluxDB是一个开源的时序数据库,使用Go语言编写,由InfluxData公司开发和维护。它被设计用来处理高写入和查询负载,并且可以很好地扩展到多个节点。InfluxDB非常适合用于存储和查询时间序列数据,如服务器指标、应用程序性能监控数据、物联网设备数据等。

  通过docker可以对influxdb做简单的安装:

1
2
docker pull influxdb
docker run -d -p 8086:8086 --name my_influxdb influxdb

  配置好后记录下API Token如图:

influx初始化
  随后便可进行编码,参见https://github.com/gagaducko/learning_demos/tree/main/influxdb-demo

  首先,要添加influxdb依赖:

1
2
3
4
5
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.22</version>
</dependency>

  随后在配置文件中,对influxdb的连接进行配置如下:

1
2
3
4
5
spring.application.name=influxdb-demo
influxdb.url=http://localhost:8086
influxdb.username=gagaduck
influxdb.password=gagaduck
influxdb.database=log_database

  创建一个配置类InfluxDBConfig:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Configuration
public class InfluxDBConfig {


@Value("${influxdb.url}")
private String influxDbUrl;

@Value("${influxdb.username}")
private String username;

@Value("${influxdb.password}")
private String password;

@Value("${influxdb.database}")
private String databaseName;

@Bean
public InfluxDB influxDB() {

return InfluxDBFactory.connect(influxDbUrl, username, password);
}

@Bean
public void createDatabase() {

InfluxDB influxDB = influxDB();
influxDB.query(new Query("CREATE DATABASE " + databaseName));
influxDB.setDatabase(databaseName);
}
}

  需要注意的是,上面的是influxdb1.X的配置,下面是influxdb2.X的配置:

1
2
3
4
5
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>6.7.0</version>
</dependency>
1
2
3
4
influxdb.url=http://localhost:8086
influxdb.token=yourAPIToken
influxdb.organization=yourOrganization
influxdb.bucket=yourBucket

  在此基础上,进行进一步的开发:

1
2
3
4
5
6
7
8
9
10
// 增加日志记录
public void addLog(String logMessage, String level) {
WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
Point point = Point
.measurement("logs")
.addTag("level", level)
.addField("logMessage", logMessage)
.time(Instant.now(), WritePrecision.NS);
writeApi.writePoint(bucket, org, point);
}

  例如新增数据。

1
2
3
4
5
6
7
8
9
public void addLog(String logMessage, String level) {
WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
Point point = Point
.measurement("logs")
.addTag("level", level)
.addField("logMessage", logMessage)
.time(Instant.now(), WritePrecision.NS);
writeApi.writePoint(bucket, org, point);
}

influx添加数据后
  再比如查找数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 查询所有日志
public List<LogEntry> getAllLogs() {
List<LogEntry> logEntries = new ArrayList<>();
// InfluxDB 查询语句,查询1h内的日志数据
// 构建 Flux 查询,加入对 _measurement 和 _field 的过滤
String fluxQuery = "from(bucket: \"" + bucket + "\")" +
" |> range(start: -1h)" + // 查询过去 1 小时的数据
" |> filter(fn: (r) => r[\"_measurement\"] == \"logs\")" + // 过滤 measurement 为 logs
" |> filter(fn: (r) => r[\"_field\"] == \"logMessage\")"; // 过滤 field 为 logMessage// 查询过去1小时的日志,可根据需求调整
QueryApi queryApi = influxDBClient.getQueryApi();
List<FluxTable> tables = queryApi.query(fluxQuery, org);
// 处理查询返回的数据
for (FluxTable table : tables) {
for (FluxRecord record : table.getRecords()) {
LogEntry logEntry = new LogEntry();
String message = (String) record.getValueByKey("_value"); // 从InfluxDB获取日志消息
String level = (String) record.getValueByKey("level"); // 获取日志级别
String timestamp = record.getTime() != null ? record.getTime().toString() : null; // 获取时间戳
// 如果message和level不为空,则将其添加到logEntry
if (message != null && level != null) {
logEntry.setMessage(message);
logEntry.setLevel(level);
logEntry.setTimestamp(timestamp);
logEntries.add(logEntry);
}
}
}
return logEntries;
}

influx查找
  凡此种种,暂按不表。


学习笔记—微服务—技术栈实践(9)—日志管理
https://gagaducko.github.io/2024/09/16/学习笔记—微服务—技术栈实践-9-—日志管理/
作者
gagaduck
发布于
2024年9月16日
许可协议