SpringBoot整合ELK实现日志采集

SpringBoot整合ELK实现日志采集
kibana 配置中文,kibana.yml 中添加 i18n.locale: “zh-CN”

SpringBoot 配置

添加依赖
1
2
3
4
5
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>6.5</version>
</dependency>
logback 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration>
<configuration>
<!--引用默认日志配置-->
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>
<!--使用默认的控制台日志输出实现-->
<include resource="org/springframework/boot/logging/logback/console-appender.xml"/>
<!--应用名称-->
<springProperty scope="context" name="APP_NAME" source="spring.application.name" defaultValue="doublemintDefault"/>
<!--日志文件保存路径-->
<property name="LOG_FILE_PATH" value="${catalina.home:-.}/logs}"/>
<!--LogStash 访问 host-->
<springProperty name="LOG_STASH_HOST" scope="context" source="logstash.host" defaultValue="localhost"/>

<!--DEBUG 日志输出到 LogStash-->
<appender name="LOG_STASH_DEBUG" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>DEBUG</level>
</filter>
<destination>${LOG_STASH_HOST}:4560</destination>
<encoder charset="UTF-8" class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp>
<timeZone>Asia/Shanghai</timeZone>
</timestamp>
<!--自定义日志输出格式-->
<pattern>
<pattern>
{
"project": "doublemint",
"level": "%level",
"service": "${APP_NAME:-}",
"pid": "${PID:-}",
"thread": "%thread",
"class": "%logger",
"message": "%message",
"stack_trace": "%exception{20}"
}
</pattern>
</pattern>
</providers>
</encoder>
<!--当有多个 LogStash 服务时,设置访问策略为轮询-->
<connectionStrategy>
<roundRobin>
<connectionTTL>5 minutes</connectionTTL>
</roundRobin>
</connectionStrategy>
</appender>

<!--ERROR 日志输出到 LogStash-->
<appender name="LOG_STASH_ERROR" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>ERROR</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
<destination>${LOG_STASH_HOST}:4561</destination>
<encoder charset="UTF-8" class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp>
<timeZone>Asia/Shanghai</timeZone>
</timestamp>
<!--自定义日志输出格式-->
<pattern>
<pattern>
{
"project": "doublemint",
"level": "%level",
"service": "${APP_NAME:-}",
"pid": "${PID:-}",
"thread": "%thread",
"class": "%logger",
"message": "%message",
"stack_trace": "%exception{20}"
}
</pattern>
</pattern>
</providers>
</encoder>
<!--当有多个 LogStash 服务时,设置访问策略为轮询-->
<connectionStrategy>
<roundRobin>
<connectionTTL>5 minutes</connectionTTL>
</roundRobin>
</connectionStrategy>
</appender>

<!--业务日志输出到 LogStash-->
<appender name="LOG_STASH_BUSINESS" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>${LOG_STASH_HOST}:4562</destination>
<encoder charset="UTF-8" class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp>
<timeZone>Asia/Shanghai</timeZone>
</timestamp>
<!--自定义日志输出格式-->
<pattern>
<pattern>
{
"project": "doublemint",
"level": "%level",
"service": "${APP_NAME:-}",
"pid": "${PID:-}",
"thread": "%thread",
"class": "%logger",
"message": "%message",
"stack_trace": "%exception{20}"
}
</pattern>
</pattern>
</providers>
</encoder>
<!--当有多个 LogStash 服务时,设置访问策略为轮询-->
<connectionStrategy>
<roundRobin>
<connectionTTL>5 minutes</connectionTTL>
</roundRobin>
</connectionStrategy>
</appender>

<!--接口访问记录日志输出到 LogStash-->
<appender name="LOG_STASH_RECORD" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>${LOG_STASH_HOST}:4563</destination>
<encoder charset="UTF-8" class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp>
<timeZone>Asia/Shanghai</timeZone>
</timestamp>
<!--自定义日志输出格式-->
<pattern>
<pattern>
{
"project": "doublemint",
"level": "%level",
"service": "${APP_NAME:-}",
"class": "%logger",
"message": "%message"
}
</pattern>
</pattern>
</providers>
</encoder>
<!--当有多个 LogStash 服务时,设置访问策略为轮询-->
<connectionStrategy>
<roundRobin>
<connectionTTL>5 minutes</connectionTTL>
</roundRobin>
</connectionStrategy>
</appender>

<!--控制框架输出日志-->
<logger name="org.slf4j" level="INFO"/>
<logger name="springfox" level="INFO"/>
<logger name="io.swagger" level="INFO"/>
<logger name="org.springframework" level="INFO"/>
<logger name="org.hibernate.validator" level="INFO"/>
<logger name="com.alibaba.nacos.client.naming" level="INFO"/>

<root>
<appender-ref ref="CONSOLE"/>
<appender-ref ref="LOG_STASH_DEBUG"/>
<appender-ref ref="LOG_STASH_ERROR"/>
</root>

<logger name="ink.ckx.doublemint.common.aspect.WebLogAspect" level="DEBUG">
<appender-ref ref="LOG_STASH_RECORD"/>
</logger>

<logger name="ink.ckx.doublemint" level="DEBUG">
<appender-ref ref="LOG_STASH_BUSINESS"/>
</logger>
</configuration>
请求响应日志记录
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
@Slf4j
@Aspect
@Component
@Order(1)
public class WebLogAspect {

@Pointcut("execution(public * ink.ckx.doublemint.*.controller.*.*(..))")
public void webLog() {
}

@Before("webLog()")
public void doBefore(JoinPoint joinPoint) {
}

@AfterReturning(value = "webLog()", returning = "ret")
public void doAfterReturning(Object ret) {
}

@Around("webLog()")
public Object doAround(ProceedingJoinPoint joinPoint) throws Throwable {
long startTime = System.currentTimeMillis();
// 获取当前请求对象
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = attributes.getRequest();
// 记录请求信息(通过 Logstash 传入 Elasticsearch)
WebLog webLog = new WebLog();
Object result = joinPoint.proceed();
Signature signature = joinPoint.getSignature();
MethodSignature methodSignature = (MethodSignature) signature;
Method method = methodSignature.getMethod();
if (method.isAnnotationPresent(ApiOperation.class)) {
ApiOperation log = method.getAnnotation(ApiOperation.class);
webLog.setDescription(log.value());
}
long endTime = System.currentTimeMillis();
String urlStr = request.getRequestURL().toString();
webLog.setBasePath(StrUtil.removeSuffix(urlStr, URLUtil.url(urlStr).getPath()));
webLog.setIp(request.getRemoteUser());
webLog.setMethod(request.getMethod());
webLog.setParameter(getParameter(method, joinPoint.getArgs()));
webLog.setResult(result);
webLog.setSpendTime((int) (endTime - startTime));
webLog.setStartTime(startTime);
webLog.setUri(request.getRequestURI());
webLog.setUrl(request.getRequestURL().toString());
Map<String, Object> logMap = new HashMap<>();
logMap.put("url", webLog.getUrl());
logMap.put("method", webLog.getMethod());
logMap.put("parameter", webLog.getParameter());
logMap.put("spendTime", webLog.getSpendTime());
logMap.put("description", webLog.getDescription());
log.info(Markers.appendEntries(logMap), JSONUtil.parse(webLog).toString());
return result;
}

/**
* 根据方法和传入的参数获取请求参数
*/
private Object getParameter(Method method, Object[] args) {
List<Object> argList = new ArrayList<>();
Parameter[] parameters = method.getParameters();
for (int i = 0; i < parameters.length; i++) {
// 将 RequestBody 注解修饰的参数作为请求参数
RequestBody requestBody = parameters[i].getAnnotation(RequestBody.class);
if (requestBody != null) {
argList.add(args[i]);
}
// 将 RequestParam 注解修饰的参数作为请求参数
RequestParam requestParam = parameters[i].getAnnotation(RequestParam.class);
if (requestParam != null) {
Map<String, Object> map = new HashMap<>();
String key = parameters[i].getName();
if (!StrUtil.isEmpty(requestParam.value())) {
key = requestParam.value();
}
map.put(key, args[i]);
argList.add(map);
}
}
if (argList.size() == 0) {
return null;
} else if (argList.size() == 1) {
return argList.get(0);
} else {
return argList;
}
}
}

Logstash 配置(logstash.conf)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
input {
tcp {
mode => "server"
host => "0.0.0.0"
port => 4560
codec => json_lines
type => "debug"
}
tcp {
mode => "server"
host => "0.0.0.0"
port => 4561
codec => json_lines
type => "error"
}
tcp {
mode => "server"
host => "0.0.0.0"
port => 4562
codec => json_lines
type => "business"
}
tcp {
mode => "server"
host => "0.0.0.0"
port => 4563
codec => json_lines
type => "record"
}
}
filter{
if [type] == "record" {
mutate {
remove_field => "port"
remove_field => "host"
remove_field => "@version"
}
json {
source => "message"
remove_field => ["message"]
}
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
action => "index"
codec => json
index => "doublemint-%{type}-%{+YYYY.MM.dd}"
template_name => "doublemint"
}
}

测试

  1. 启动 ELK 环境并打开 kibana 界面
  2. 启动你的项目,随意访问几个接口,然后打开 kibana 界面的索引管理,会出现这几个索引

image.png

  1. 新创建几个索引模式

image.png

  1. 打开 Analytics -> Discover,显示如下界面后就可以愉快的玩耍了

image.png