日志管理
微服务一般处于分布式环境下,在该架构中,每个服务都是独立的,分布在不同的节点或容器中,通过日志管理,可以帮助开发者和运维人员收集分散在不同微服务中的日志数据,通过集中化的工具进行汇总和分析,由此提高整个系统的可观测性。
常用的日志管理方案有ELK Stack,或可通过influxDB类时序数据库对日志进行管理。
ELK解决方案
Elasticsearch
Elasticsearch是一个分布式搜索和分析引擎,可以用于存储、搜索、分析日志数据。它具有高可用性、可扩展性和易用性等特点,可以轻松地与各种日志收集工具集成,如Logstash、Fluentd等。
Logstash
Logstash是一个开源的数据收集引擎,可以用于收集、处理和转发日志数据。它可以与各种数据源集成,如文件、网络、数据库等,并将数据发送到Elasticsearch或其他存储系统。
Kibana
Kibana是一个开源的数据可视化工具,可以用于分析和可视化Elasticsearch中的日志数据。它提供了丰富的图表、仪表板和搜索功能,可以帮助开发者和运维人员更好地理解日志数据。
ELK Stack的部署
ELK Stack的部署可以通过Docker容器化部署,也可以通过Kubernetes进行部署。具体部署方式可以参考官方文档或相关教程。
此处以容器化部署ELK 7.14.0版本为例。
首先,需要下载对应版本的容器镜像:
1 2 3
| docker pull elastic/elasticsearch:7.14.0 docker pull elastic/kibana:7.14.0 docker pull elastic/logstash:7.14.0
|
对于es来说,首先,创建挂载目录并赋予权限:
1 2
| mkdir -p /data/elk/es/{config,data,logs,plugins} sudo chown -R 1000:1000 /data/elk/es
|
在此基础上,创建配置文件:
1 2
| cd /data/elk/es/config sudo touch elasticsearch.yml
|
配置内容为:
1 2 3 4 5
| cluster.name: "my-es" network.host: 0.0.0.0 http.port: 9200 http.cors.enabled: true http.cors.allow-origin: "*"
|
随后运行:
1 2 3 4 5 6 7 8 9 10 11 12
| docker run -it -d \ -p 9200:9200 \ -p 9300:9300 \ --name es01 \ -e ES_JAVA_OPTS="-Xms4g -Xmx4g" \ -e "discovery.type=single-node" \ --restart=always \ -v /data/elk/es/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \ -v /data/elk/es/data:/usr/share/elasticsearch/data \ -v /data/elk/es/logs:/usr/share/elasticsearch/logs \ -v /data/elk/es/plugins:/usr/share/elasticsearch/plugins \ elasticsearch:7.14.0
|
对于kibana来说,同理,首先创建目录及配置文件用于挂在:
1 2
| mkdir -p /data/elk/kibana/ vim /data/elk/kibana/kibana.yml
|
配置文件配置如下:
1 2 3 4 5
| #Default Kibana configuration for docker target server.name: kibana server.host: "0" elasticsearch.hosts: ["http://172.17.0.2:9200"] xpack.monitoring.ui.container.elasticsearch.enabled: true
|
随后运行:
1 2 3 4 5 6
| docker run -it -d \ -p 5601:5601 \ --name kibana \ --restart=always \ -v /data/elk/kibana/kibana.yml:/usr/share/kibana/config/kibana.yml \ kibana:7.14.0
|
至此,elasticsearch和kibana的docker容器已经启动,可以通过浏览器访问:
在此基础下,进一步安装logstash用于收集日志:
1
| docker run -d --name=logstash logstash:7.14.0
|
先运行一个logstash的容器。
把配置文件从容器拷贝出来:
1 2 3 4
| sudo docker cp logstash:/usr/share/logstash /data/elk
cd /data/elk/ sudo chmod 777 logstash -R
|
接着配置logstash的配置文件:
1 2 3 4 5 6
| sudo mkdir -p /data/elk/logstash/conf.d cd /data/elk/logstash/conf.d sudo vim logstash.conf
cd /data/elk/logstash/config sudo vim logstash.yml
|
logstash.conf文件配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| input { tcp { mode => "server" host => "0.0.0.0" port => 5044 codec => json_lines } }
filter {
grok { match => [ "message", "(?<logTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3})\s+\[(?<thread>.*)\]\s+(?<level>\w*)\s{1,2}+(?<class>\S*)\s+-\s+(?<content>.*)\s*"] match => [ "message", "(?<logTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3})\s{1,2}+(?<level>\w*)\s{1,2}+.\s---+\s\[(?<thread>.*)\]+\s(?<class>\S*)\s*:+\s(?<content>.*)\s*"] match => [ "source", "/home/passjava/logs/(?<logName>\w+)/.*.log" ] overwrite => [ "source"] break_on_match => false } mutate { convert => { "bytes" => "integer" } remove_field => ["agent","message","@version", "tags", "ecs", "_score", "input", "[log][offset]"] }
useragent { source => "user_agent" target => "useragent" }
date { match => ["logTime", "MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601"] timezone => "Asia/Shanghai" } }
output { stdout { }
elasticsearch { hosts => ["192.168.186.1:9200"] index => "gagaduck_log" } }
|
logstash.yml文件配置如下:
1 2 3 4 5 6
| path.config: /usr/share/logstash/conf.d/*.conf
path.logs: /var/log/logstash http.host: "0.0.0.0" xpack.monitoring.elasticsearch.hosts: [ "http://192.168.186.1:9200" ]
|
删除logstash容器并重启启动如下:
1 2 3 4 5 6 7 8
| docker rm -f logstash
docker run -it -d \ -p 5044:5044 \ --name logstash \ --restart=always \ -v /data/elk/logstash:/usr/share/logstash \ logstash:7.14.0
|
由此,一个简单的ELK组件便部署完成了。
ELK Stack的使用
在部署完成后,便可以使用ELK Stack进行日志管理了,在应用中,只需要将日志输出到标准输出即可,logstash会自动将日志收集到Elasticsearch中,并使用Kibana进行可视化展示。
在应用中,只需要将日志输出到标准输出即可,logstash会自动将日志收集到Elasticsearch中,并使用Kibana进行可视化展示。
以一个spring boot的应用为例子:
1 2 3 4 5
| <dependency> <groupId>net.logstash.logback</groupId> <artifactId>logstash-logback-encoder</artifactId> <version>7.3</version> </dependency>
|
首先,加入logstash的依赖,并在resource目录下配置合适的logback.xml:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| <?xml version="1.0" encoding="UTF-8"?> <configuration> <include resource="org/springframework/boot/logging/logback/base.xml"/> <property name="springAppName" value="system-service" /> <appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender"> <destination>192.168.186.1:5044</destination> <encoder charset="UTF-8" class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder"> <providers> <timestamp> <timeZone>UTC</timeZone> </timestamp> <pattern> <pattern> { "logLevel": "%level", "serviceName": "${springAppName:-}", "pid": "${PID:-}", "thread": "%thread", "class": "%logger{40}", "rest": "%message" } </pattern> </pattern> </providers> </encoder> </appender>
<root level="INFO"> <appender-ref ref="LOGSTASH"/> <appender-ref ref="CONSOLE"/> </root>
</configuration>
|
在ELK的基础上,实际还可以增加filebeat,
1 2 3
| docker pull elastic/filebeat:7.14.0 docker run -d --name=filebeat elastic/filebeat:7.14.0 ……
|
通过filebeat实现对于日志文件收集的需要。
当logstash把日志收集同步到es之后,就会自动在es中创建这样的一个搜索,那么,到kibana可视化控制台中创建一个对应的索引来创建并查看日志即可。
首先进入kibana控制台(Management),找到index patterns这个设置,创建索引。
填写名字,其实该索引会自动出来,若未出来,那么需要回去看看logstash的日志等内容排查一下故障情况。
随后便可以在kibana的discard处对日志进行查看分析等处理操作。参见https://github.com/gagaducko/learning_demos/tree/main/system-service
基于时序数据库实现的日志管理
influxdb
InfluxDB是一个开源的时序数据库,使用Go语言编写,由InfluxData公司开发和维护。它被设计用来处理高写入和查询负载,并且可以很好地扩展到多个节点。InfluxDB非常适合用于存储和查询时间序列数据,如服务器指标、应用程序性能监控数据、物联网设备数据等。
通过docker可以对influxdb做简单的安装:
1 2
| docker pull influxdb docker run -d -p 8086:8086 --name my_influxdb influxdb
|
配置好后记录下API Token如图:

随后便可进行编码,参见https://github.com/gagaducko/learning_demos/tree/main/influxdb-demo
首先,要添加influxdb依赖:
1 2 3 4 5
| <dependency> <groupId>org.influxdb</groupId> <artifactId>influxdb-java</artifactId> <version>2.22</version> </dependency>
|
随后在配置文件中,对influxdb的连接进行配置如下:
1 2 3 4 5
| spring.application.name=influxdb-demo influxdb.url=http://localhost:8086 influxdb.username=gagaduck influxdb.password=gagaduck influxdb.database=log_database
|
创建一个配置类InfluxDBConfig:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| @Configuration public class InfluxDBConfig {
@Value("${influxdb.url}") private String influxDbUrl;
@Value("${influxdb.username}") private String username;
@Value("${influxdb.password}") private String password;
@Value("${influxdb.database}") private String databaseName;
@Bean public InfluxDB influxDB() {
return InfluxDBFactory.connect(influxDbUrl, username, password); }
@Bean public void createDatabase() {
InfluxDB influxDB = influxDB(); influxDB.query(new Query("CREATE DATABASE " + databaseName)); influxDB.setDatabase(databaseName); } }
|
需要注意的是,上面的是influxdb1.X的配置,下面是influxdb2.X的配置:
1 2 3 4 5
| <dependency> <groupId>com.influxdb</groupId> <artifactId>influxdb-client-java</artifactId> <version>6.7.0</version> </dependency>
|
1 2 3 4
| influxdb.url=http://localhost:8086 influxdb.token=yourAPIToken influxdb.organization=yourOrganization influxdb.bucket=yourBucket
|
在此基础上,进行进一步的开发:
1 2 3 4 5 6 7 8 9 10
| // 增加日志记录 public void addLog(String logMessage, String level) { WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking(); Point point = Point .measurement("logs") .addTag("level", level) .addField("logMessage", logMessage) .time(Instant.now(), WritePrecision.NS); writeApi.writePoint(bucket, org, point); }
|
例如新增数据。
1 2 3 4 5 6 7 8 9
| public void addLog(String logMessage, String level) { WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking(); Point point = Point .measurement("logs") .addTag("level", level) .addField("logMessage", logMessage) .time(Instant.now(), WritePrecision.NS); writeApi.writePoint(bucket, org, point); }
|

再比如查找数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public List<LogEntry> getAllLogs() { List<LogEntry> logEntries = new ArrayList<>(); String fluxQuery = "from(bucket: \"" + bucket + "\")" + " |> range(start: -1h)" + " |> filter(fn: (r) => r[\"_measurement\"] == \"logs\")" + " |> filter(fn: (r) => r[\"_field\"] == \"logMessage\")"; QueryApi queryApi = influxDBClient.getQueryApi(); List<FluxTable> tables = queryApi.query(fluxQuery, org); for (FluxTable table : tables) { for (FluxRecord record : table.getRecords()) { LogEntry logEntry = new LogEntry(); String message = (String) record.getValueByKey("_value"); String level = (String) record.getValueByKey("level"); String timestamp = record.getTime() != null ? record.getTime().toString() : null; if (message != null && level != null) { logEntry.setMessage(message); logEntry.setLevel(level); logEntry.setTimestamp(timestamp); logEntries.add(logEntry); } } } return logEntries; }
|

凡此种种,暂按不表。