本文主要演示如何使用 canal 将 MySQL 中的数据全量和增量同步到 Elasticsearch

技术版本
canal1.1.5
MySQL8.0.12
ELK7.12.0

1. 相关 bug

c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 failed java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource

这是因为 escore 模块 与 common 模块的 jar 包冲突所导致的

解决方案:

  • 修改 canal 源码中 escore 模块的 pom.xml(client-adapter/escore/pom.xml),将 druid 依赖的 scope 设置为 provided
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <scope>provided</scope>
</dependency>
  • 重新打包,将 es7x 模块的 client-adapter.es7x-1.1.5-jar-with-dependencies.jar 放到我们的 canal.adapter-1.1.5/plugin 目录下,覆盖原文件

2. 配置 MySQL

  • 设置 MySQL 的 binlog(修改 my.cnf / my.ini)
[mysqld]
log-bin=mysql-bin
server-id=1
binlog-format=ROW
  • 重启服务后查询是否已设置成功
SHOW VARIABLES LIKE '%log_bin%';
SHOW VARIABLES LIKE '%binlog_format%';
  • 创建用户并授权
-- Account that canal connects with (see canal.instance.dbUsername / dbPassword).
-- The host part is spelled out; a bare user name resolves to '%' anyway.
CREATE USER 'canal'@'%' IDENTIFIED BY 'canalpwd';

-- Privileges needed to read table data and to follow the binlog as a replica.
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT
    ON *.*
    TO 'canal'@'%';

FLUSH PRIVILEGES;
  • 新建数据库和表,生成数据
-- Demo schema that canal replicates into the Elasticsearch index `canal_sync_order`.
-- IF NOT EXISTS keeps the script re-runnable.
CREATE DATABASE IF NOT EXISTS canal_sync_db;
USE canal_sync_db;

-- One row per order. utf8mb4 replaces the deprecated 3-byte utf8mb3 charset so
-- that any Unicode character (including 4-byte ones) can be stored in the
-- text columns.
CREATE TABLE IF NOT EXISTS `canal_sync_order` (
  `id` int unsigned NOT NULL AUTO_INCREMENT,        -- surrogate key, used as the ES document _id
  `order_no` varchar(255) DEFAULT NULL,             -- order number
  `money` decimal(16,2) DEFAULT 0.00,               -- order amount (numeric default instead of a quoted string)
  `receiver_name` varchar(255) DEFAULT NULL,        -- receiver's name
  `receiver_phone` varchar(255) DEFAULT NULL,       -- receiver's phone number
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP, -- row creation time, filled in by MySQL
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- The procedure body contains ';', so the client delimiter is switched while
-- the procedure is being defined and restored afterwards.
DELIMITER $$

-- insert_order: fill canal_sync_order with `count` rows of random test data.
--
-- Parameters:
--   count  number of rows to insert; nothing is inserted when count <= 0 or NULL.
--
-- Each generated row gets:
--   order_no        MD5 of a random number
--   money           random whole amount in 0..10000
--   receiver_name   one surname character + one or two given-name characters
--   receiver_phone  11 digits: a 3-digit prefix (156/136/176) + 8 random digits
--   create_time     left to the column default
--
-- All rows are written inside a single transaction so the bulk load is fast.
CREATE PROCEDURE insert_order ( IN count INT )
BEGIN
    -- Character pools: surnames (xing) and given-name characters (ming).
    DECLARE xing VARCHAR ( 2056 ) DEFAULT '赵钱孙李周郑王冯陈楮卫蒋沈韩杨朱秦尤许何吕施张孔曹严华金魏陶姜戚谢喻柏水窦章云苏潘葛奚范彭郎鲁韦昌马苗凤花方俞任袁柳酆鲍史唐费廉岑薛雷贺倪汤滕殷罗毕郝邬安常乐于时傅皮齐康伍余元卜顾孟平黄和穆萧尹姚邵湛汪祁毛禹狄米贝明臧计伏成戴谈宋茅庞熊纪舒屈项祝董梁杜阮蓝闽席季麻强贾路娄危江童颜郭梅盛林刁锺徐丘骆高夏蔡田樊胡凌霍虞万支柯昝管卢莫经裘缪干解应宗丁宣贲邓郁单杭洪包诸左石崔吉钮龚程嵇邢滑裴陆荣翁';
    DECLARE ming VARCHAR ( 2056 ) DEFAULT '嘉懿煜城懿轩烨伟苑博伟泽熠彤鸿煊博涛烨霖烨华煜祺智宸正豪昊然明杰诚立轩立辉峻熙弘文熠彤鸿煊烨霖哲瀚鑫鹏致远俊驰雨泽烨磊晟睿天佑文昊修洁黎昕远航旭尧鸿涛伟祺轩越泽浩宇瑾瑜皓轩擎苍擎宇志泽睿渊楷瑞轩弘文哲瀚雨泽鑫磊梦琪忆之桃慕青问兰尔岚元香初夏沛菡傲珊曼文乐菱痴珊恨玉惜文香寒新柔语蓉海安夜蓉涵柏水桃醉蓝春儿语琴从彤傲晴语兰又菱碧彤元霜怜梦紫寒妙彤曼易南莲紫翠雨寒易烟如萱若南寻真晓亦向珊慕灵以蕊寻雁映易雪柳孤岚笑霜海云凝天沛珊寒云冰旋宛儿绿真盼儿晓霜碧凡夏菡曼香若烟半梦雅绿冰蓝灵槐平安书翠翠风香巧代云梦曼幼翠友巧听寒梦柏醉易访旋亦玉凌萱访卉怀亦笑蓝春翠靖柏夜蕾冰夏梦松书雪乐枫念薇靖雁寻春恨山从寒忆香觅波静曼凡旋以亦念露芷蕾千兰新波代真新蕾雁玉冷卉紫山千琴恨天傲芙盼山怀蝶冰兰山柏翠萱乐丹翠柔谷山之瑶冰露尔珍谷雪乐萱涵菡海莲傲蕾青槐冬儿易梦惜雪宛海之柔夏青亦瑶妙菡春竹修杰伟诚建辉晋鹏天磊绍辉泽洋明轩健柏煊昊强伟宸博超君浩子骞明辉鹏涛炎彬鹤轩越彬风华靖琪明诚高格光华国源宇晗昱涵润翰飞翰海昊乾浩博和安弘博鸿朗华奥华灿嘉慕坚秉建明金鑫锦程瑾瑜鹏经赋景同靖琪君昊俊明季同开济凯安康成乐语力勤良哲理群茂彦敏博明达朋义彭泽鹏举濮存溥心璞瑜浦泽奇邃祥荣轩';
    -- Pool sizes in characters. CHAR_LENGTH counts characters directly instead
    -- of assuming every character occupies exactly 3 bytes (LENGTH / 3).
    DECLARE I_xing INT DEFAULT CHAR_LENGTH( xing );
    DECLARE I_ming INT DEFAULT CHAR_LENGTH( ming );
    DECLARE receiver_name_str VARCHAR ( 2056 ) DEFAULT '';

    -- Comma-separated 3-digit phone prefixes; each entry occupies 4 characters.
    -- The first entry ('000') is never picked by the offset formula below.
    DECLARE head VARCHAR(100) DEFAULT '000,156,136,176';
    DECLARE phone CHAR(11) DEFAULT '';

    DECLARE i INT DEFAULT 0;

    -- An explicit transaction batches all inserts into one commit without
    -- changing the caller's session-level autocommit setting.
    START TRANSACTION;
    WHILE i < count DO
        -- Name = 1 surname character + 1 given-name character,
        -- plus a second given-name character about 60% of the time.
        SET receiver_name_str = CONCAT(
            SUBSTRING( xing, FLOOR( 1 + RAND() * I_xing ), 1 ),
            SUBSTRING( ming, FLOOR( 1 + RAND() * I_ming ), 1 )
        );
        IF RAND() > 0.400 THEN
            SET receiver_name_str = CONCAT( receiver_name_str, SUBSTRING( ming, FLOOR( 1 + RAND() * I_ming ), 1 ));
        END IF;

        -- Phone = random prefix (offset 5, 9 or 13 within head)
        -- followed by 8 random digits, zero-padded on the left.
        SET phone = CONCAT(
            SUBSTRING( head, 1 + FLOOR( 1 + RAND() * 3 ) * 4, 3 ),
            LPAD( FLOOR( RAND() * 100000000 ), 8, '0' )
        );

        INSERT INTO canal_sync_order ( order_no, money, receiver_name, receiver_phone )
        VALUES ( MD5( RAND()), FLOOR( RAND() * 10001 ), receiver_name_str, phone );

        SET i = i + 1;
    END WHILE;
    COMMIT;
END$$

DELIMITER ;

call insert_order(100000);
  • 查看表中是否已经有 10 万条数据,例如执行 SELECT COUNT(*) FROM canal_sync_order;

3. 配置 es

  • 启动 es 和 kibana 后创建索引
PUT /canal_sync_order
{
  "mappings": {
    "properties": {
      "order_no": {
        "type": "text"
      },
      "money": {
        "type": "double"
      },
      "receiver_name": {
        "type": "text"
      },
      "receiver_phone": {
        "type": "text"
      },
      "create_time": {
        "type": "date"
      }
    }
  }
}

image.png

  • 创建索引模式 canal_sync_*

image.png

4. 配置 canal

  • 修改 canal.deployer-1.1.5/conf/example/instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canalpwd
canal.instance.filter.regex=.*\\..*
  • 修改 canal.adapter-1.1.5/conf/application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/canal_sync_db?useUnicode=true
      username: canal
      password: canalpwd
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
        properties:
          mode: transport # or rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: elasticsearch
  • 在 canal.adapter-1.1.5/conf/es7 目录下新建 sync_order.yml
# Mapping of the MySQL table canal_sync_order onto the ES index canal_sync_order.

# Must match a key under canal.conf.srcDataSources in application.yml.
dataSourceKey: defaultDS
# Must match the `instance` configured under canalAdapters in application.yml.
destination: example
# Must match the `groupId` configured under canalAdapters in application.yml.
groupId: g1
esMapping:
  # Target index (created earlier with PUT /canal_sync_order).
  _index: canal_sync_order
  # Name of the column alias in `sql` that is used as the document _id.
  _id: _id
  # Query that defines the synced columns; presumably each selected column
  # (other than _id) is written to the index field of the same name -- confirm
  # against the canal adapter documentation.
  sql: "SELECT
          a.id AS _id,
          a.order_no,
          a.money,
          a.receiver_name,
          a.receiver_phone,
          a.create_time 
        FROM
          canal_sync_order a"
  # Presumably the filter appended to `sql` when a full import is triggered
  # through the /etl endpoint, with {} replaced by the passed parameter -- confirm.
  etlCondition: "where a.create_time >= {}"
  # Presumably the number of documents per bulk commit -- confirm.
  commitBatch: 3000
  • 启动 canal.deployer 和 canal.adapter
  • 查询 canal.adapter 的日志,可以看到已经有数据同步了

image.png

5. 测试

  • 插入数据,查看数据可以打开 kibana -> Analytics -> Discover
INSERT INTO `canal_sync_order` ( `order_no`, `money`, `receiver_name`, `receiver_phone` ) VALUES ( 'test213', 5017.00, '陈开开', '18312345678' );

image.png
image.png

  • 修改及删除数据
-- NOTE: id 100003 is presumably the id of the test row inserted above in the
-- author's environment; replace it with the id of your own test row.
-- Change a row so that an UPDATE event is synced to ES.
UPDATE `canal_sync_order` SET `receiver_name` = '陈一', `create_time` = now() WHERE `id` = 100003;
-- Remove the same row so that a DELETE event is synced to ES.
DELETE FROM `canal_sync_order` WHERE `id` = 100003;
  • 全量数据同步测试
curl -X POST  http://localhost:8081/etl/es7/sync_order.yml

{"succeeded":true,"resultMessage":"导入ES 数据:100000 条"}

Q.E.D.


盛年不重来,一日难再晨。