在 MySQL 中物理删除(DELETE)的数据不会同步到 ES 中;如需同步删除,请改用软删除(例如增加 is_deleted 标记字段,并通过 UPDATE 将其置位)。

参考:https://www.elastic.co/cn/blog/how-to-keep-elasticsearch-synchronized-with-a-relational-database-using-logstash

1. SQL 脚本(建库、建表)

-- Setup script for the MySQL side of the Logstash -> Elasticsearch sync demo.
-- Safe to re-run: the database is created only when missing, and the table is
-- dropped and recreated (any existing rows in es_table are discarded).
CREATE DATABASE IF NOT EXISTS es_db;

USE es_db;

DROP TABLE IF EXISTS es_table;

CREATE TABLE `es_table` (
  `id` bigint unsigned NOT NULL,
  `client_name` varchar(32) NOT NULL,
  -- Set on insert and refreshed automatically on every UPDATE.
  `modification_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  -- Set once when the row is inserted.
  `insertion_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  -- The primary key already enforces uniqueness of id, so no separate
  -- UNIQUE KEY on the same column is declared.
  PRIMARY KEY (`id`),
  -- Supports range filters on modification_time (used for incremental reads).
  KEY `es_table_modification_time_idx` (`modification_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

2. 在 Logstash 的 config 目录下新建 sync_mysql.conf,内容如下:

# Logstash pipeline: periodically reads new or changed rows of es_table from
# MySQL over JDBC and indexes them into Elasticsearch, using the row id as the
# document id so that an updated row overwrites its existing document.
input {
  jdbc {
    jdbc_driver_library => "D:\repository\mysql\mysql-connector-java\8.0.13\mysql-connector-java-8.0.13.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/es_db?serverTimezone=Asia/Shanghai"
    jdbc_user => "root"
    jdbc_password => "root"
    jdbc_paging_enabled => true
    jdbc_page_size => 10000
    # Track the modification_time of the last row read; it is persisted in
    # last_run_metadata_path and substituted for :sql_last_value on the next run.
    tracking_column => "modification_time"
    use_column_value => true
    tracking_column_type => "timestamp"
    # Six-field cron expression (leading field is seconds): poll every 5 seconds.
    schedule => "*/5 * * * * *"
    # - "> :sql_last_value" avoids re-reading rows that were already indexed.
    # - "< NOW()" leaves rows written in the current second for the next poll,
    #   so using ">" cannot skip rows that share the tracked timestamp.
    # - ORDER BY makes the last row returned carry the highest modification_time,
    #   which is the value recorded as :sql_last_value.
    statement => "SELECT * FROM es_table WHERE modification_time > :sql_last_value AND modification_time < NOW() ORDER BY modification_time ASC"
    last_run_metadata_path => "D:\logstash-7.12.0\config\sync_mysql_last_time.txt"
  }
}
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "es_db_index"
    # document_type is deprecated in the elasticsearch output plugin and is
    # therefore not set; Elasticsearch 7 uses the "_doc" type by default.
    document_id => "%{id}"
  }
  # Also print every event to the console so the sync can be observed.
  stdout {
    codec => json_lines
  }
}

3. 启动 Logstash 并测试(在 Logstash 的 bin 目录下执行启动命令,然后在 MySQL 中依次执行下面的 SQL,观察控制台输出的同步结果)

logstash -f ../config/sync_mysql.conf
-- Insert a new row, then change it; only id and client_name are supplied,
-- the two timestamp columns are filled in by their column defaults.
INSERT INTO es_table (id, client_name) VALUES (10, '陈三');
UPDATE es_table SET client_name = '陈四' WHERE id = 10;

Q.E.D.


盛年不重来,一日难再晨。