数据同步-Logstash

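物理删除(DELETE)的数据不会同步到 es 中;如需同步删除,设置软删除即可:增加一个删除标记字段(例如 is_deleted),用 UPDATE 代替 DELETE,这样该行的 modification_time 会被刷新,下一次轮询时就会同步到 es。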

sql 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- Demo schema for the MySQL table that Logstash syncs into Elasticsearch.
-- IF NOT EXISTS keeps the script re-runnable, matching DROP TABLE IF EXISTS below.
CREATE DATABASE IF NOT EXISTS es_db;

USE es_db;

-- NOTE: discards any existing rows; intended for a fresh demo setup only.
DROP TABLE IF EXISTS es_table;

CREATE TABLE `es_table` (
    `id` bigint unsigned NOT NULL,
    `client_name` varchar(32) NOT NULL,
    -- Set on insert and refreshed automatically on every UPDATE of the row.
    `modification_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    -- Set once when the row is inserted.
    `insertion_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
    -- The primary key already enforces uniqueness of id; a separate
    -- UNIQUE KEY on the same column would only add a redundant index.
    PRIMARY KEY (`id`),
    -- The sync query filters on modification_time at every poll.
    KEY `es_table_modification_time_idx` (`modification_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

在 Logstash 安装目录的 config 目录下添加 sync_mysql.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# Logstash pipeline: polls es_db.es_table over JDBC and indexes changed rows
# into the Elasticsearch index "es_db_index", using the row id as document id.
input {
  jdbc {
    jdbc_driver_library => "D:\repository\mysql\mysql-connector-java\8.0.13\mysql-connector-java-8.0.13.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/es_db?serverTimezone=Asia/Shanghai"
    # NOTE(review): demo credentials in plain text; do not use as-is outside a local test.
    jdbc_user => "root"
    jdbc_password => "root"
    jdbc_paging_enabled => true
    jdbc_page_size => 10000
    # Persist the modification_time of the last row read and expose it to the
    # query as :sql_last_value.
    tracking_column => "modification_time"
    use_column_value => true
    tracking_column_type => "timestamp"
    # Six-field cron expression: run every 5 seconds.
    schedule => "*/5 * * * * *"
    # Strict ">" avoids re-reading the newest row on every poll. "< NOW()" leaves
    # rows written in the current second for the next poll, so rows sharing the
    # last-read timestamp are not skipped. ORDER BY keeps paging deterministic and
    # makes the stored tracking value the highest timestamp read.
    statement => "SELECT * FROM es_table WHERE modification_time > :sql_last_value AND modification_time < NOW() ORDER BY modification_time ASC"
    last_run_metadata_path => "D:\logstash-7.12.0\config\sync_mysql_last_time.txt"
  }
}
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "es_db_index"
    # document_type is deprecated in the 7.x elasticsearch output and is
    # omitted here; Elasticsearch 7 uses the "_doc" type by default.
    # Using the row id as document id turns repeated syncs into overwrites.
    document_id => "%{id}"
  }
  # Echo every event to the console as one JSON object per line.
  stdout {
    codec => json_lines
  }
}

启动服务并测试

1
logstash --path.config ../config/sync_mysql.conf
1
2
-- Insert a test row; modification_time defaults to the current timestamp,
-- so the row matches the sync query on a following poll.
INSERT INTO es_table (id, client_name)
VALUES (10, '陈三');

-- Change the same row; ON UPDATE CURRENT_TIMESTAMP refreshes modification_time,
-- so the row is selected again and the document with id 10 is overwritten.
UPDATE es_table
SET client_name = '陈四'
WHERE id = 10;