分布式事务解决方案-RocketMQ

分布式事务解决方案-RocketMQ

开心 345 2021-09-03

image.png

项目地址:https://github.com/chenkaixin12121/study/tree/master/rocketmq

1. 添加依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

2. 订单服务发送事务消息

@RequiredArgsConstructor
@RequestMapping("/order")
@RestController
public class OrderController {

    private final RocketMQTemplate rocketMQTemplate;

    @PostMapping("/save")
    public String sendTransaction() {
        Order order = Order.builder()
                .userId("10")
                .commodityCode("product-1")
                .count(1)
                .money(60)
                .build();
        Map<String, Object> map = new HashMap<>();
        map.put("commodityCode", order.getCommodityCode());
        map.put("count", order.getCount());
        TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(
                "txTopic:txTag",
                MessageBuilder.withPayload(map)
                        .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
                        .build(),
                order);
        return "发送事务消息成功:" + transactionSendResult;
    }
}

3. 订单服务接收事务消息

接收消息后,执行本地事务,本地事务成功,则提交事务,否则回滚事务,如长时间无法收到事务消息状态的变更,则调用事务状态回查进行提交或回滚,所以我们在本地事务中需要记录事务执行日志

@Slf4j
@Component
@RocketMQTransactionListener
@RequiredArgsConstructor
public class OrderTxListener implements RocketMQLocalTransactionListener {

    private final OrderRepository orderRepository;

    private final TransactionLogRepository transactionLogRepository;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info("收到创建订单事务消息,msg:{},arg:{}", msg, arg);
        Order order = (Order) arg;

        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        try {
            TransactionLog transactionLog = TransactionLog.builder()
                    .transactionId(transactionId)
                    .log("创建订单事务消息")
                    .build();
            this.saveOrder(order, transactionLog);
            log.info("创建订单事务提交:{}", transactionId);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.info("创建订单事务回滚:{}", transactionId);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

        int resultCount = transactionLogRepository.countByTransactionId(transactionId);
        if (resultCount == 0) {
            log.info("创建订单回查事务回滚:{}", transactionId);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        log.info("创建订单回查事务提交:{}", transactionId);
        return RocketMQLocalTransactionState.COMMIT;
    }

    @Transactional
    public void saveOrder(Order order, TransactionLog transactionLog) {
        orderRepository.save(order);
        transactionLogRepository.save(transactionLog);
    }
}

4. 库存服务收到扣减库存消息

库存服务收到扣减库存消息,如果本地事务执行失败,则进行重试(因为追求的是最终一致性),重试达一定次数后失败消息入库,由人工进行处理,这里也需要判断消息是否重复消费

@Slf4j
@RequiredArgsConstructor
@RocketMQMessageListener(topic = "txTopic", selectorExpression = "txTag", consumerGroup = "tx-consumer-group")
@Component
public class DeductionStorageListener implements RocketMQListener<MessageExt> {

    private final StorageRepository storageRepository;

    private final StringRedisTemplate stringRedisTemplate;

    @Override
    public void onMessage(MessageExt message) {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("收到扣减库存消息:{}", msg);

        String msgId = message.getMsgId();
        if (stringRedisTemplate.opsForSet().isMember("msgId", msgId)) {
            log.info("msgId:{},消息不能重复消费", msgId);
            return;
        }
        try {
            Storage storage = JSONUtil.toBean(msg, Storage.class);
//            int i = 1/0;

            Storage selectStorage = storageRepository.findByCommodityCode(storage.getCommodityCode());
            selectStorage.setCount(selectStorage.getCount() - storage.getCount());
            storageRepository.save(selectStorage);

            stringRedisTemplate.opsForSet().add("msgId", msgId);
        } catch (Exception e) {
            if (message.getReconsumeTimes() < 3) {
                log.info("扣减库存消息消费失败,进行第 {} 次重试", message.getReconsumeTimes() + 1);
                throw new RuntimeException(e);
            } else {
                log.info("扣减库存消息已达最大重试次数");
                // TODO 失败消息入库
            }
        }
    }
}

5. 测试

  • 测试成功执行,http://localhost:8088/order/save
  • 测试事务状态回查,启动两个服务,在事务状态提交时打断点,并强制杀死订单服务的进程(不要使用IDEA进行关闭服务),重启订单服务一分钟后,打印日志“创建订单回查事务提交”
netstat -ano | findstr 8088
taskkill /f /pid 11240

image.png

  • 测试扣减库存消息的重试,去除 DeductionStorageListener 的注释即可
  • 测试消息重复消费,打开 rocketmq-dashboard:https://github.com/apache/rocketmq-dashboard ,根据消息主题找到消息后重新发送

# rocketmq