项目地址:https://github.com/chenkaixin12121/study/tree/master/rocketmq
依赖
1 2 3 4 5
| <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version> </dependency>
|
订单服务发送事务消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @RequiredArgsConstructor @RequestMapping("/order") @RestController public class OrderController {
private final RocketMQTemplate rocketMQTemplate;
@PostMapping("/save") public String sendTransaction() { Order order = Order.builder() .userId("10") .commodityCode("product-1") .count(1) .money(60) .build(); Map<String, Object> map = new HashMap<>(); map.put("commodityCode", order.getCommodityCode()); map.put("count", order.getCount()); TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction( "txTopic:txTag", MessageBuilder.withPayload(map) .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString()) .build(), order); return "发送事务消息成功:" + transactionSendResult; } }
|
订单服务接收事务消息
接收消息后,执行本地事务,本地事务成功,则提交事务,否则回滚事务,如长时间无法收到事务消息状态的变更,则调用事务状态回查进行提交或回滚,所以我们在本地事务中需要记录事务执行日志
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| @Slf4j @Component @RocketMQTransactionListener @RequiredArgsConstructor public class OrderTxListener implements RocketMQLocalTransactionListener {
private final OrderRepository orderRepository;
private final TransactionLogRepository transactionLogRepository;
@Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { log.info("收到创建订单事务消息,msg:{},arg:{}", msg, arg); Order order = (Order) arg;
MessageHeaders headers = msg.getHeaders(); String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID); try { TransactionLog transactionLog = TransactionLog.builder() .transactionId(transactionId) .log("创建订单事务消息") .build(); this.saveOrder(order, transactionLog); log.info("创建订单事务提交:{}", transactionId); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.info("创建订单事务回滚:{}", transactionId); return RocketMQLocalTransactionState.ROLLBACK; } }
@Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { MessageHeaders headers = msg.getHeaders(); String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
int resultCount = transactionLogRepository.countByTransactionId(transactionId); if (resultCount == 0) { log.info("创建订单回查事务回滚:{}", transactionId); return RocketMQLocalTransactionState.ROLLBACK; } log.info("创建订单回查事务提交:{}", transactionId); return RocketMQLocalTransactionState.COMMIT; }
@Transactional public void saveOrder(Order order, TransactionLog transactionLog) { orderRepository.save(order); transactionLogRepository.save(transactionLog); } }
|
库存服务收到扣减库存消息
库存服务收到扣减库存消息,如果本地事务执行失败,则进行重试(因为追求的是最终一致性),重试达一定次数后失败消息入库,由人工进行处理,这里也需要判断消息是否重复消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| @Slf4j @RequiredArgsConstructor @RocketMQMessageListener(topic = "txTopic", selectorExpression = "txTag", consumerGroup = "tx-consumer-group") @Component public class DeductionStorageListener implements RocketMQListener<MessageExt> {
private final StorageRepository storageRepository;
private final StringRedisTemplate stringRedisTemplate;
@Override public void onMessage(MessageExt message) { String msg = new String(message.getBody(), StandardCharsets.UTF_8); log.info("收到扣减库存消息:{}", msg);
String msgId = message.getMsgId(); if (stringRedisTemplate.opsForSet().isMember("msgId", msgId)) { log.info("msgId:{},消息不能重复消费", msgId); return; } try { Storage storage = JSONUtil.toBean(msg, Storage.class);
Storage selectStorage = storageRepository.findByCommodityCode(storage.getCommodityCode()); selectStorage.setCount(selectStorage.getCount() - storage.getCount()); storageRepository.save(selectStorage);
stringRedisTemplate.opsForSet().add("msgId", msgId); } catch (Exception e) { if (message.getReconsumeTimes() < 3) { log.info("扣减库存消息消费失败,进行第 {} 次重试", message.getReconsumeTimes() + 1); throw new RuntimeException(e); } else { log.info("扣减库存消息已达最大重试次数"); } } } }
|
测试
- 测试成功执行,http://localhost:8088/order/save
- 测试事务状态回查,启动两个服务,在事务状态提交时打断点,并强制杀死订单服务的进程(不要使用IDEA进行关闭服务),重启订单服务一分钟后,打印日志“创建订单回查事务提交”
1 2
| netstat -ano | findstr 8088 taskkill /f /pid 11240
|
- 测试扣减库存消息的重试,去除 DeductionStorageListener 的注释即可
- 测试消息重复消费,打开 rocketmq-dashboard:https://github.com/apache/rocketmq-dashboard ,根据消息主题找到消息后重新发送