多节点数据同步

自定义多数据源并通过mybatis的拦截器来实现
源码:https://github.com/chenkaixin12121/study/tree/master/dynamic_datasource/many

加载多节点数据源

数据源配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
spring:
# 主节点
datasource:
url: jdbc:mysql://127.0.0.1:3306/db1?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
sync:
# 从节点
datasource:
mysql:
db2:
url: jdbc:mysql://127.0.0.1:3306/db2?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
db3:
url: jdbc:mysql://127.0.0.1:3306/db3?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver

加载数据源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Slf4j
@Setter
@Component
@ConfigurationProperties(prefix = "sync.datasource")
public class DataSourceConfig {

private Map<String, MysqlDataSource> mysql;

public static final Map<String, HikariDataSource> MYSQL_SOURCE_MAP = new ConcurrentHashMap<>();

@PostConstruct
public void loadDatasourceConfig() {
if (CollUtil.isEmpty(mysql)) {
log.error("同步数据源不存在");
return;
}
mysql.forEach((key, properties) -> {
HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setUsername(properties.getUsername());
hikariConfig.setPassword(properties.getPassword());
hikariConfig.setJdbcUrl(properties.getUrl());
hikariConfig.setDriverClassName(properties.getDriverClassName());
MYSQL_SOURCE_MAP.put(key, new HikariDataSource(hikariConfig));
});
}
}

@Data
class MysqlDataSource {

private String driverClassName;

private String url;

private String username;

private String password;
}

拦截主节点Sql,从节点执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Slf4j
@Intercepts({
@Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class}),
@Signature(type = StatementHandler.class, method = "getBoundSql", args = {}),
@Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}),
})
@Component
@RequiredArgsConstructor
public class EventPublish implements Interceptor {

private final ApplicationEventPublisher applicationEventPublisher;

@Override
public Object intercept(Invocation invocation) throws Throwable {
MappedStatement mappedStatement = (MappedStatement) invocation.getArgs()[0];
Object parameter = invocation.getArgs()[1];
Object returnValue = invocation.proceed();
// 同步数据
BoundSql boundSql = mappedStatement.getBoundSql(parameter);
String originalSql = boundSql.getSql();
DataSourceConfig.MYSQL_SOURCE_MAP.forEach((key, source) -> {
try (Connection connection = source.getConnection()) {
PreparedStatement prepareStatement = connection.prepareStatement(originalSql);
DefaultParameterHandler handler = new DefaultParameterHandler(mappedStatement, boundSql.getParameterObject(), boundSql);
handler.setParameters(prepareStatement);
prepareStatement.executeUpdate();
} catch (SQLException e) {
log.error("同步数据异常,数据源:{},错误信息", key, e);
}
});
return returnValue;
}

@Override
public Object plugin(Object target) {
if (target instanceof Executor) {
return Plugin.wrap(target, this);
}
return target;
}

@Override
public void setProperties(Properties properties) {
Interceptor.super.setProperties(properties);
}
}

以上已经可以做到从节点的同步数据了。

事务回滚问题

以上代码虽然可以实现从节点同步数据,但是会有一个问题,就是主节点回滚,从节点不会回滚。这个问题可以使用 Spring 事件发布订阅来解决,即拦截器里发布Sql事件,事件订阅者监听此事件,在事务执行完成后执行Sql即可。

事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class DataEvent extends ApplicationEvent {

private static final long serialVersionUID = 4217685139752436445L;

private MappedStatement ms;

private Object parameter;

public DataEvent(Object source) {
super(source);
}

public MappedStatement getMs() {
return ms;
}

public void setMs(MappedStatement ms) {
this.ms = ms;
}

public Object getParameter() {
return parameter;
}

public void setParameter(Object parameter) {
this.parameter = parameter;
}
}

事件发布

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Slf4j
@Intercepts({
@Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class}),
@Signature(type = StatementHandler.class, method = "getBoundSql", args = {}),
@Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}),
})
@Component
@RequiredArgsConstructor
public class EventPublish implements Interceptor {

private final ApplicationEventPublisher applicationEventPublisher;

@Override
public Object intercept(Invocation invocation) throws Throwable {
MappedStatement mappedStatement = (MappedStatement) invocation.getArgs()[0];
Object parameter = invocation.getArgs()[1];
Object returnValue = invocation.proceed();
// 发布同步事件
DataEvent event = new DataEvent(this);
event.setMs(mappedStatement);
event.setParameter(parameter);
applicationEventPublisher.publishEvent(event);
return returnValue;
}

@Override
public Object plugin(Object target) {
if (target instanceof Executor) {
return Plugin.wrap(target, this);
}
return target;
}

@Override
public void setProperties(Properties properties) {
Interceptor.super.setProperties(properties);
}
}

事件监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Slf4j
@Component
public class EventListener {

@TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMMIT)
public void eventListener(DataEvent event) {
MappedStatement ms = event.getMs();
Object parameter = event.getParameter();

BoundSql boundSql = ms.getBoundSql(parameter);
String originalSql = boundSql.getSql();

DataSourceConfig.MYSQL_SOURCE_MAP.forEach((key, source) -> {
try (Connection connection = source.getConnection()) {
log.info("{}", SqlUtils.showSql(key, ms.getConfiguration(), boundSql, ms.getId()));
PreparedStatement prepareStatement = connection.prepareStatement(originalSql);
DefaultParameterHandler handler = new DefaultParameterHandler(ms, boundSql.getParameterObject(), boundSql);
handler.setParameters(prepareStatement);
prepareStatement.executeUpdate();
} catch (SQLException e) {
log.error("同步数据异常,数据源:{},错误信息", key, e);
}
});
}
}

测试

测试正常情况下从节点同步及异常情况下从节点回滚

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Service
public class SysUserServiceImpl extends ServiceImpl<SysUserMapper, SysUser>
implements SysUserService {

@Override
public void insert() {
SysUser sysUser = new SysUser();
sysUser.setUsername("abc");
sysUser.setPassword("123");
this.save(sysUser);
}

@Transactional(rollbackFor = Exception.class)
@Override
public void insertAndError() {
SysUser sysUser = new SysUser();
sysUser.setUsername("abc");
sysUser.setPassword("123");
this.save(sysUser);

int i = 1 / 0;

SysUser sysUser2 = new SysUser();
sysUser2.setUsername("abc");
sysUser2.setPassword("123");
this.save(sysUser2);
}
}