分布式事务解决方案-RocketMQ

项目地址:https://github.com/chenkaixin12121/study/tree/master/rocketmq

image.png

依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>

订单服务发送事务消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@RequiredArgsConstructor
@RequestMapping("/order")
@RestController
public class OrderController {

private final RocketMQTemplate rocketMQTemplate;

@PostMapping("/save")
public String sendTransaction() {
Order order = Order.builder()
.userId("10")
.commodityCode("product-1")
.count(1)
.money(60)
.build();
Map<String, Object> map = new HashMap<>();
map.put("commodityCode", order.getCommodityCode());
map.put("count", order.getCount());
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(
"txTopic:txTag",
MessageBuilder.withPayload(map)
.setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
.build(),
order);
return "发送事务消息成功:" + transactionSendResult;
}
}

订单服务接收事务消息

接收消息后,执行本地事务,本地事务成功,则提交事务,否则回滚事务,如长时间无法收到事务消息状态的变更,则调用事务状态回查进行提交或回滚,所以我们在本地事务中需要记录事务执行日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Slf4j
@Component
@RocketMQTransactionListener
@RequiredArgsConstructor
public class OrderTxListener implements RocketMQLocalTransactionListener {

private final OrderRepository orderRepository;

private final TransactionLogRepository transactionLogRepository;

@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
log.info("收到创建订单事务消息,msg:{},arg:{}", msg, arg);
Order order = (Order) arg;

MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
try {
TransactionLog transactionLog = TransactionLog.builder()
.transactionId(transactionId)
.log("创建订单事务消息")
.build();
this.saveOrder(order, transactionLog);
log.info("创建订单事务提交:{}", transactionId);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.info("创建订单事务回滚:{}", transactionId);
return RocketMQLocalTransactionState.ROLLBACK;
}
}

@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

int resultCount = transactionLogRepository.countByTransactionId(transactionId);
if (resultCount == 0) {
log.info("创建订单回查事务回滚:{}", transactionId);
return RocketMQLocalTransactionState.ROLLBACK;
}
log.info("创建订单回查事务提交:{}", transactionId);
return RocketMQLocalTransactionState.COMMIT;
}

@Transactional
public void saveOrder(Order order, TransactionLog transactionLog) {
orderRepository.save(order);
transactionLogRepository.save(transactionLog);
}
}

库存服务收到扣减库存消息

库存服务收到扣减库存消息,如果本地事务执行失败,则进行重试(因为追求的是最终一致性),重试达一定次数后失败消息入库,由人工进行处理,这里也需要判断消息是否重复消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Slf4j
@RequiredArgsConstructor
@RocketMQMessageListener(topic = "txTopic", selectorExpression = "txTag", consumerGroup = "tx-consumer-group")
@Component
public class DeductionStorageListener implements RocketMQListener<MessageExt> {

private final StorageRepository storageRepository;

private final StringRedisTemplate stringRedisTemplate;

@Override
public void onMessage(MessageExt message) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("收到扣减库存消息:{}", msg);

String msgId = message.getMsgId();
if (stringRedisTemplate.opsForSet().isMember("msgId", msgId)) {
log.info("msgId:{},消息不能重复消费", msgId);
return;
}
try {
Storage storage = JSONUtil.toBean(msg, Storage.class);
// int i = 1/0;

Storage selectStorage = storageRepository.findByCommodityCode(storage.getCommodityCode());
selectStorage.setCount(selectStorage.getCount() - storage.getCount());
storageRepository.save(selectStorage);

stringRedisTemplate.opsForSet().add("msgId", msgId);
} catch (Exception e) {
if (message.getReconsumeTimes() < 3) {
log.info("扣减库存消息消费失败,进行第 {} 次重试", message.getReconsumeTimes() + 1);
throw new RuntimeException(e);
} else {
log.info("扣减库存消息已达最大重试次数");
// TODO 失败消息入库
}
}
}
}

测试

  1. 测试成功执行,http://localhost:8088/order/save
  2. 测试事务状态回查,启动两个服务,在事务状态提交时打断点,并强制杀死订单服务的进程(不要使用IDEA进行关闭服务),重启订单服务一分钟后,打印日志“创建订单回查事务提交”
1
2
netstat -ano | findstr 8088
taskkill /f /pid 11240

image.png

  1. 测试扣减库存消息的重试,去除 DeductionStorageListener 的注释即可
  2. 测试消息重复消费,打开 rocketmq-dashboard:https://github.com/apache/rocketmq-dashboard ,根据消息主题找到消息后重新发送