Data Synchronization with Canal

This article demonstrates how to use Canal for both full and incremental data synchronization from MySQL to Elasticsearch.

Technology    Version
canal         1.1.5
MySQL         8.0.12
ELK           7.12.0

Configure MySQL

  1. Enable the MySQL binlog
[mysqld]
log-bin=mysql-bin
server-id=1
binlog-format=ROW
  2. After restarting the service, check that the settings took effect
SHOW VARIABLES LIKE '%log_bin%';
SHOW VARIABLES LIKE '%binlog_format%';
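If the binlog is configured correctly, the relevant rows returned by the two queries above should show the following values (additional log_bin-related variables may also be listed):

Variable_name    Value
log_bin          ON
binlog_format    ROW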
  3. Create a user and grant the required privileges
CREATE USER canal IDENTIFIED BY 'canalpwd';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
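Optionally, verify the grants with a plain MySQL statement (nothing Canal-specific here):

SHOW GRANTS FOR 'canal'@'%';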
  4. Create the database and table, then generate test data
CREATE DATABASE canal_sync_db;
USE canal_sync_db;

CREATE TABLE `canal_sync_order` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`order_no` varchar(255) DEFAULT NULL,
`money` decimal(16,2) DEFAULT '0.00',
`receiver_name` varchar(255) DEFAULT NULL,
`receiver_phone` varchar(255) DEFAULT NULL,
`create_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

-- Switch the statement delimiter so the semicolons inside the procedure body are not executed immediately
DELIMITER $$
CREATE PROCEDURE insert_order ( IN count INT )
BEGIN
-- Pools of surname and given-name characters used to build random receiver names
DECLARE xing VARCHAR ( 2056 ) DEFAULT '赵钱孙李周郑王冯陈楮卫蒋沈韩杨朱秦尤许何吕施张孔曹严华金魏陶姜戚谢喻柏水窦章云苏潘葛奚范彭郎鲁韦昌马苗凤花方俞任袁柳酆鲍史唐费廉岑薛雷贺倪汤滕殷罗毕郝邬安常乐于时傅皮齐康伍余元卜顾孟平黄和穆萧尹姚邵湛汪祁毛禹狄米贝明臧计伏成戴谈宋茅庞熊纪舒屈项祝董梁杜阮蓝闽席季麻强贾路娄危江童颜郭梅盛林刁锺徐丘骆高夏蔡田樊胡凌霍虞万支柯昝管卢莫经裘缪干解应宗丁宣贲邓郁单杭洪包诸左石崔吉钮龚程嵇邢滑裴陆荣翁';
DECLARE ming VARCHAR ( 2056 ) DEFAULT '嘉懿煜城懿轩烨伟苑博伟泽熠彤鸿煊博涛烨霖烨华煜祺智宸正豪昊然明杰诚立轩立辉峻熙弘文熠彤鸿煊烨霖哲瀚鑫鹏致远俊驰雨泽烨磊晟睿天佑文昊修洁黎昕远航旭尧鸿涛伟祺轩越泽浩宇瑾瑜皓轩擎苍擎宇志泽睿渊楷瑞轩弘文哲瀚雨泽鑫磊梦琪忆之桃慕青问兰尔岚元香初夏沛菡傲珊曼文乐菱痴珊恨玉惜文香寒新柔语蓉海安夜蓉涵柏水桃醉蓝春儿语琴从彤傲晴语兰又菱碧彤元霜怜梦紫寒妙彤曼易南莲紫翠雨寒易烟如萱若南寻真晓亦向珊慕灵以蕊寻雁映易雪柳孤岚笑霜海云凝天沛珊寒云冰旋宛儿绿真盼儿晓霜碧凡夏菡曼香若烟半梦雅绿冰蓝灵槐平安书翠翠风香巧代云梦曼幼翠友巧听寒梦柏醉易访旋亦玉凌萱访卉怀亦笑蓝春翠靖柏夜蕾冰夏梦松书雪乐枫念薇靖雁寻春恨山从寒忆香觅波静曼凡旋以亦念露芷蕾千兰新波代真新蕾雁玉冷卉紫山千琴恨天傲芙盼山怀蝶冰兰山柏翠萱乐丹翠柔谷山之瑶冰露尔珍谷雪乐萱涵菡海莲傲蕾青槐冬儿易梦惜雪宛海之柔夏青亦瑶妙菡春竹修杰伟诚建辉晋鹏天磊绍辉泽洋明轩健柏煊昊强伟宸博超君浩子骞明辉鹏涛炎彬鹤轩越彬风华靖琪明诚高格光华国源宇晗昱涵润翰飞翰海昊乾浩博和安弘博鸿朗华奥华灿嘉慕坚秉建明金鑫锦程瑾瑜鹏经赋景同靖琪君昊俊明季同开济凯安康成乐语力勤良哲理群茂彦敏博明达朋义彭泽鹏举濮存溥心璞瑜浦泽奇邃祥荣轩';
-- Character counts of the two pools (utf8mb3 stores each Chinese character in 3 bytes)
DECLARE I_xing INT DEFAULT LENGTH( xing ) / 3;
DECLARE I_ming INT DEFAULT LENGTH( ming ) / 3;
DECLARE receiver_name_str VARCHAR ( 2056 ) DEFAULT '';

DECLARE head VARCHAR(100) DEFAULT '000,156,136,176';
DECLARE content CHAR(10) DEFAULT '0123456789';
DECLARE phone CHAR(11) DEFAULT '';
DECLARE ctime DATETIME DEFAULT now();

DECLARE i INT DEFAULT 0;

-- Disable autocommit and commit once at the end so the bulk insert stays fast
SET autocommit = 0;
WHILE i < count DO
SET receiver_name_str = '';
SET receiver_name_str = CONCAT( receiver_name_str, substring( xing, floor( 1 + RAND() * I_xing ), 1 ));
SET receiver_name_str = CONCAT( receiver_name_str, substring( ming, floor( 1 + RAND() * I_ming ), 1 ));
IF RAND() > 0.400 THEN
SET receiver_name_str = CONCAT( receiver_name_str, substring( ming, floor( 1 + RAND() * I_ming ), 1 ));
END IF;
-- Random 11-digit phone number starting with 1
SET phone = CONCAT( '1', CEILING( RAND()* 9000000000+1000000000 ));
SET ctime = DATE_ADD( now(), INTERVAL -( SELECT ROUND(( RAND() * 240 ))) HOUR );

INSERT INTO canal_sync_order ( order_no, money, receiver_name, receiver_phone, create_time ) VALUES ( MD5( RAND()), FLOOR( RAND() * 10001 ), receiver_name_str, phone, ctime );
SET i = i + 1;
END WHILE;
COMMIT;
END $$
DELIMITER ;

call insert_order(100000);
  5. Verify that the table now contains 100,000 rows
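A simple row count confirms that the procedure finished:

SELECT COUNT(*) FROM canal_sync_order;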

Configure Elasticsearch

  1. After starting Elasticsearch and Kibana, create the index
PUT /canal_sync_order
{
  "mappings": {
    "properties": {
      "order_no": {
        "type": "text"
      },
      "money": {
        "type": "double"
      },
      "receiver_name": {
        "type": "text"
      },
      "receiver_phone": {
        "type": "text"
      },
      "create_time": {
        "type": "date"
      }
    }
  }
}

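To confirm the index and its mapping were created, you can query it back from Kibana Dev Tools (standard Elasticsearch API):

GET /canal_sync_order/_mapping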

  2. In Kibana, create an index pattern named canal_sync_*


Configure Canal

  1. Download: https://github.com/alibaba/canal/releases/tag/canal-1.1.5 (both canal.deployer and canal.adapter are needed)
  2. Edit canal.deployer-1.1.5/conf/example/instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canalpwd
canal.instance.filter.regex=.*\\..*
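The filter above subscribes to binlog events from every schema and table. If you only want to capture the demo database, Canal's filter syntax (comma-separated schema\\.table regular expressions) lets you narrow it; for example, the following would be a reasonable alternative for this setup:

canal.instance.filter.regex=canal_sync_db\\..*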
  3. Edit canal.adapter-1.1.5/conf/application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/canal_sync_db?useUnicode=true
      username: canal
      password: canalpwd
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
        properties:
          mode: transport # or rest
          # security.auth: test:123456 # only used for rest mode
          cluster.name: elasticsearch
  4. Create sync_order.yml in the canal.adapter-1.1.5/conf/es7 directory
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: canal_sync_order
  _id: _id
  sql: "SELECT
          a.id AS _id,
          a.order_no,
          a.money,
          a.receiver_name,
          a.receiver_phone,
          a.create_time
        FROM
          canal_sync_order a"
  etlCondition: "where a.create_time >= {}"
  commitBatch: 3000
  5. Start canal.deployer and canal.adapter (startup commands are sketched below)
  6. Check the canal.adapter log; you should see data being synchronized
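A minimal sketch of the startup commands, assuming the default directory layout of the 1.1.5 release packages (both ship a bin/startup.sh script; the adapter log path below is the default one, adjust if you changed the logging config):

# Start the Canal server (deployer) first
cd canal.deployer-1.1.5
sh bin/startup.sh

# Then start the Canal adapter
cd ../canal.adapter-1.1.5
sh bin/startup.sh

# Follow the adapter log to watch the synchronization activity
tail -f logs/adapter/adapter.log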


Testing

  1. Insert a row of data; to view the synchronized document, open Kibana -> Analytics -> Discover
INSERT INTO `canal_sync_order` ( `order_no`, `money`, `receiver_name`, `receiver_phone` ) VALUES ( 'test213', 5017.00, '陈开开', '18312345678' );

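Besides Discover, you can also confirm the document arrived by querying Elasticsearch directly from Kibana Dev Tools (a plain search request, nothing Canal-specific):

GET /canal_sync_order/_search
{
  "query": {
    "match": { "order_no": "test213" }
  }
}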

  2. Update and delete data
UPDATE `canal_sync_order` SET `receiver_name` = '陈一', `create_time` = now() WHERE `id` = 100003;
DELETE FROM `canal_sync_order` WHERE `id` = 100003;
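Because sync_order.yml maps a.id to _id, each Elasticsearch document id equals the MySQL primary key, so the delete can be verified with a direct document GET; once the delete has been synced it should return "found": false:

GET /canal_sync_order/_doc/100003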
  3. Full data synchronization (ETL) test
curl -X POST http://localhost:8081/etl/es7/sync_order.yml

{"succeeded":true,"resultMessage":"导入ES 数据:100000 条"}

The resultMessage confirms that 100,000 rows were imported into Elasticsearch.

Troubleshooting

  • c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 failed java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
    Cause: a jar conflict between the escore module and the common module; the same DruidDataSource class gets loaded by two different class loaders, which is why the seemingly impossible ClassCastException appears.
    Solution:
    1. Download the source code: https://github.com/alibaba/canal
    2. In client-adapter -> escore -> pom.xml, change the scope of the druid dependency to provided
      <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>druid</artifactId>
          <scope>provided</scope>
      </dependency>
    3. Rebuild the project and copy the es7x module's client-adapter.es7x-1.1.5-jar-with-dependencies.jar into the canal.adapter-1.1.5/plugin directory, overwriting the original file (a sketch of the commands follows)
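A rough sketch of the rebuild and copy steps, assuming Maven is installed, the repository is checked out at the canal-1.1.5 tag, and the adapter install path shown here is a placeholder to adjust for your environment:

# Build the client-adapter modules (including es7x), skipping tests
cd canal/client-adapter
mvn clean package -DskipTests

# Overwrite the plugin jar shipped with the adapter
cp es7x/target/client-adapter.es7x-1.1.5-jar-with-dependencies.jar \
   /path/to/canal.adapter-1.1.5/plugin/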