RabbitMQ Reliable Message Delivery Solution

开心 528 2020-09-25

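This article makes RabbitMQ delivery reliable in the following steps:

  • Write a message-log row to the database with the status "delivering", then publish the message
  • Use the publisher-confirm results (ConfirmCallback and ReturnCallback) to decide whether the message was delivered: on success, update the status to "delivered"; messages that failed are found by a scheduled task that periodically scans the log and republishes them
  • The scheduled task republishes failed messages; if a message still fails after the retry threshold is reached, its status is set to "delivery failed" for manual inspection
  • When the consumer receives a message, it extracts the unique ID and first checks whether the message has already been consumed; if it has, it simply acks it (to avoid duplicate consumption)
  • After successful processing, update the status to "consumed" and ack
  • On an exception, catch it and check whether the message's retry count has reached the threshold; if it has, update the status to "consumption failed" for manual handling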
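
The steps above amount to a small status machine on the message-log row. A minimal sketch, assuming a hypothetical `MsgStatus` enum with `DELIVERING = 0` for the initial "delivering" state (the configuration class below only defines codes 1 to 4); it only models which status changes the steps allow, not how they are persisted:

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class MsgStatusDemo {

    // Codes 1-4 mirror the constants in RabbitConfig; DELIVERING = 0 is an assumption.
    public enum MsgStatus {
        DELIVERING(0), DELIVER_SUCCESS(1), DELIVER_FAIL(2), CONSUMED_SUCCESS(3), CONSUMED_FAIL(4);

        public final int code;

        MsgStatus(int code) {
            this.code = code;
        }
    }

    // Allowed transitions. DELIVERING may jump straight to a consumed state because the
    // consumer can finish before the confirm callback updates the row.
    private static final Map<MsgStatus, Set<MsgStatus>> TRANSITIONS = Map.of(
            MsgStatus.DELIVERING, EnumSet.of(MsgStatus.DELIVER_SUCCESS, MsgStatus.DELIVER_FAIL,
                    MsgStatus.CONSUMED_SUCCESS, MsgStatus.CONSUMED_FAIL),
            MsgStatus.DELIVER_SUCCESS, EnumSet.of(MsgStatus.CONSUMED_SUCCESS, MsgStatus.CONSUMED_FAIL),
            MsgStatus.DELIVER_FAIL, EnumSet.noneOf(MsgStatus.class),     // left for manual inspection
            MsgStatus.CONSUMED_SUCCESS, EnumSet.noneOf(MsgStatus.class),
            MsgStatus.CONSUMED_FAIL, EnumSet.noneOf(MsgStatus.class));  // left for manual handling

    public static boolean canTransition(MsgStatus from, MsgStatus to) {
        return TRANSITIONS.get(from).contains(to);
    }

    public static void main(String[] args) {
        System.out.println(canTransition(MsgStatus.DELIVERING, MsgStatus.DELIVER_SUCCESS));      // true
        // a late confirm must not overwrite a message that was already consumed
        System.out.println(canTransition(MsgStatus.CONSUMED_SUCCESS, MsgStatus.DELIVER_SUCCESS)); // false
    }
}
```

A check like `canTransition` could back a conditional update (only change the status if the current one allows it), so that a confirm callback arriving after the consumer has finished cannot overwrite `CONSUMED_SUCCESS` with `DELIVER_SUCCESS`.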

Project repository: https://github.com/chenkaixin12121/rabbit-reliability (since switched to the delayed-message plugin)

1. Configuration

spring:
  rabbitmq:
    username: guest
    password: guest
    host: localhost
    port: 5672
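    # enable publisher confirms on the producer side
    publisher-confirm-type: correlated
    # enable publisher returns on the producer side; only takes effect together with mandatory
    publisher-returns: true
    template:
      mandatory: true
    listener:
      simple:
        # require manual acks on the consumer side (the default is auto)
        acknowledge-mode: manual
        # maximum number of unacknowledged messages the broker pushes to each consumer
        prefetch: 2
        # maximum number of concurrent consumers
        max-concurrency: 10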
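
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Objects;

// IMsgLogService is a project-specific service; its import is omitted here
@Slf4j
@Configuration
public class RabbitConfig {

    public static final Integer MSG_TIMEOUT = 1; // message timeout, in minutes
    public static final Integer MAX_TRY_COUNT = 3; // maximum number of retries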
    public static final String ORDER_MSG_LOG = "order_msg_log";

    public static final String MAIL_QUEUE_NAME = "ckx.mail.queue";
    public static final String MAIL_EXCHANGE_NAME = "ckx.mail.exchange";
    public static final String MAIL_ROUTING_KEY_NAME = "ckx.mail.routing.key";

    public static final String RETRY_QUEUE_NAME = "ckx.retry.queue";
    public static final String RETRY_EXCHANGE_NAME = "ckx.retry.exchange";
    public static final String RETRY_ROUTING_KEY_NAME = "ckx.retry.routing.key";
    public static final Integer RETRY_TTL = 5000; // time a message waits in the delay queue, in milliseconds

    public static final Integer MSG_DELIVER_SUCCESS = 1; // delivered
    public static final Integer MSG_DELIVER_FAIL = 2; // delivery failed
    public static final Integer MSG_CONSUMED_SUCCESS = 3; // consumed
    public static final Integer MSG_CONSUMED_FAIL = 4; // consumption failed

    private final CachingConnectionFactory cachingConnectionFactory;

    private final IMsgLogService msgLogService;

    public RabbitConfig(CachingConnectionFactory cachingConnectionFactory, IMsgLogService msgLogService) {
        this.cachingConnectionFactory = cachingConnectionFactory;
        this.msgLogService = msgLogService;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {

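        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
        rabbitTemplate.setMessageConverter(converter());
        // A custom RabbitTemplate bean is not configured from spring.rabbitmq.template.*,
        // so mandatory must be set here, otherwise the ReturnsCallback never fires
        rabbitTemplate.setMandatory(true);

        /**
         * correlationData: unique identifier of the message
         * ack: whether the broker received the message
         * cause: reason for the failure
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            String msgId = Objects.requireNonNull(correlationData).getId();
            if (ack) {
                log.info("Message reached the exchange, msgId: {}", msgId);
                msgLogService.updateStatus(msgId, MSG_DELIVER_SUCCESS);
            } else {
                // status stays "delivering"; the scheduled task will resend the message
                log.info("Failed to send message to the exchange, correlationData: {}, cause: {}", correlationData, cause);
            }
        });

        // note: RabbitMQ still acks a returned message, so the ConfirmCallback also sees ack = true for it
        rabbitTemplate.setReturnsCallback(
                returnedMessage ->
                        log.info("Message could not be routed from the exchange to a queue: exchange: {}, routingKey: {}, replyCode: {}, replyText: {}, message: {}",
                                returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getReplyCode(),
                                returnedMessage.getReplyText(), returnedMessage.getMessage()));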

        return rabbitTemplate;
    }

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public DirectExchange mailExchange() {
        return ExchangeBuilder
                .directExchange(MAIL_EXCHANGE_NAME)
                .durable(true)
                .build();
    }

    @Bean
    public DirectExchange retryExchange() {
        return ExchangeBuilder
                .directExchange(RETRY_EXCHANGE_NAME)
                .durable(true)
                .build();
    }

    @Bean
    public Queue mailQueue() {
        return new Queue(MAIL_QUEUE_NAME, true);
    }

    /**
     * Once the delay (x-message-ttl) expires, the message is dead-lettered back to the mail exchange
     */
    @Bean
    public Queue retryQueue() {
        return QueueBuilder
                .durable(RETRY_QUEUE_NAME)
                .withArgument("x-dead-letter-routing-key", MAIL_ROUTING_KEY_NAME)
                .withArgument("x-dead-letter-exchange", MAIL_EXCHANGE_NAME)
                .withArgument("x-message-ttl", RETRY_TTL)
                .build();
    }

    @Bean
    public Binding mailBinding() {
        return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);
    }

    @Bean
    public Binding retryBinding() {
        return BindingBuilder.bind(retryQueue()).to(retryExchange()).with(RETRY_ROUTING_KEY_NAME);
    }
}

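Delivery path: Producer -> RabbitMQ Broker Cluster -> Exchange -> Queue -> Consumer
The ConfirmCallback reports whether a message sent by the producer was accepted by the broker (i.e. reached the exchange).
The ReturnCallback fires when a message reached the exchange but could not be routed to any queue (only for mandatory messages). Together, the two callbacks give us eventual consistency and the ability to recover from some delivery errors.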
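
One detail matters when combining the two callbacks: for an unroutable message published with mandatory set, RabbitMQ sends the basic.return before the basic.ack, so the ConfirmCallback still reports `ack = true`. A minimal sketch of folding both signals into one outcome per msgId, using a hypothetical in-memory `DeliveryTracker`; in a real ReturnsCallback the msgId could be read from the `spring_returned_message_correlation` header of the returned message, which is an assumption about how you would wire it:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class DeliveryTracker {

    public enum Outcome { DELIVERED, UNROUTABLE, BROKER_NACK }

    // msgIds for which a return has been seen; the callbacks may run on different threads.
    private final Set<String> returned = ConcurrentHashMap.newKeySet();

    // Feed this from the ReturnsCallback (basic.return arrives before basic.ack).
    public void onReturn(String msgId) {
        returned.add(msgId);
    }

    // Feed this from the ConfirmCallback; combines both signals into one outcome.
    public Outcome onConfirm(String msgId, boolean ack) {
        boolean wasReturned = returned.remove(msgId);
        if (!ack) {
            return Outcome.BROKER_NACK; // broker did not take the message: leave it for the resend task
        }
        return wasReturned ? Outcome.UNROUTABLE : Outcome.DELIVERED;
    }

    public static void main(String[] args) {
        DeliveryTracker tracker = new DeliveryTracker();
        System.out.println(tracker.onConfirm("a", true));  // DELIVERED
        tracker.onReturn("b");
        System.out.println(tracker.onConfirm("b", true));  // UNROUTABLE
        System.out.println(tracker.onConfirm("c", false)); // BROKER_NACK
    }
}
```

With this split, only `DELIVERED` would move the log to `MSG_DELIVER_SUCCESS`; `UNROUTABLE` points to a binding or routing-key problem that resending will not fix.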

1.1 Dead-letter queue

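A message becomes a dead letter in the following cases. If its queue has a dead-letter exchange configured, RabbitMQ republishes the message to that exchange, and a queue bound to the dead-letter exchange is called a dead-letter queue:

  1. The message is rejected (basic.reject / basic.nack) and not requeued (requeue = false)
  2. The message's TTL (Time-To-Live) expires before it is consumed
  3. The queue reaches its maximum length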

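Using a dead-letter queue for delayed messages: add arguments such as x-message-ttl and x-dead-letter-exchange to the normal business queue, or to a queue whose messages need delayed processing. When a dead-letter condition is triggered, the message is forwarded to the dead-letter exchange, and the program listening on the dead-letter queue (e.g. dlx.queue) handles it. In this project, ckx.retry.queue has no consumer: its messages expire after RETRY_TTL and are dead-lettered back to ckx.mail.exchange, so MailConsumer receives them again.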
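
The TTL + DLX delay can be modelled without a broker. A minimal in-memory sketch with a hypothetical `TtlQueueSim` class, assuming a single queue-wide TTL as `x-message-ttl` sets it: expired messages leave from the head, move to the dead-letter target, and get an x-death style counter bumped, which is the value the consumer later reads as its retry count.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class TtlQueueSim {

    // Message body plus an x-death style counter.
    public static final class Msg {
        public final String body;
        public long enqueuedAt;
        public long deathCount;

        public Msg(String body) {
            this.body = body;
        }
    }

    private final long ttlMillis;                        // role of x-message-ttl
    private final Deque<Msg> queue = new ArrayDeque<>(); // FIFO like a classic queue
    private final List<Msg> deadLetterTarget;            // role of x-dead-letter-exchange + routing key

    public TtlQueueSim(long ttlMillis, List<Msg> deadLetterTarget) {
        this.ttlMillis = ttlMillis;
        this.deadLetterTarget = deadLetterTarget;
    }

    public void publish(Msg msg, long now) {
        msg.enqueuedAt = now;
        queue.addLast(msg);
    }

    // Expires messages from the head; with one queue-wide TTL the head always expires first.
    public void tick(long now) {
        while (!queue.isEmpty() && now - queue.peekFirst().enqueuedAt >= ttlMillis) {
            Msg expired = queue.pollFirst();
            expired.deathCount++;          // the broker bumps x-death count for (queue, reason = expired)
            deadLetterTarget.add(expired); // re-routed via the dead-letter exchange
        }
    }

    public int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        List<Msg> mailQueue = new ArrayList<>();                    // stands in for ckx.mail.queue
        TtlQueueSim retryQueue = new TtlQueueSim(5000, mailQueue);  // stands in for ckx.retry.queue
        Msg msg = new Msg("mail-1");
        retryQueue.publish(msg, 0);
        retryQueue.tick(4_999);
        System.out.println(mailQueue.size());                       // 0: still delayed
        retryQueue.tick(5_000);
        System.out.println(mailQueue.size() + " " + msg.deathCount); // 1 1
    }
}
```

Because expiry is only checked at the head, this trick works cleanly only when every message in the queue has the same TTL; per-message TTLs of different lengths can block each other at the head of the queue.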

2. Producer

@Component
public class MailProducer {

    private final MsgLogMapper msgLogMapper;

    private final RabbitTemplate rabbitTemplate;

    public MailProducer(MsgLogMapper msgLogMapper, RabbitTemplate rabbitTemplate) {
        this.msgLogMapper = msgLogMapper;
        this.rabbitTemplate = rabbitTemplate;
    }

    public void send(Mail mail) {

        String msgId = UUID.randomUUID().toString().replace("-", "");
        String msg = JsonUtil.toJson(mail);
        MsgLog msgLog = MsgLog.builder()
                .msgId(msgId)
                .msg(msg)
                .exchange(MAIL_EXCHANGE_NAME)
                .routingKey(MAIL_ROUTING_KEY_NAME)
                .nextTryTime(LocalDateTime.now().plusMinutes(MSG_TIMEOUT))
                .createTime(LocalDateTime.now())
                .build();
        msgLogMapper.insert(msgLog); // persist the message log before publishing

        Message message = MessageBuilder.withBody(Objects.requireNonNull(msg).getBytes(StandardCharsets.UTF_8)).build();
        MessageProperties messageProperties = message.getMessageProperties();
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // persistent message
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON); // JSON body
        // the body is already JSON, so send the Message as is (send the message)
        rabbitTemplate.send(MAIL_EXCHANGE_NAME, MAIL_ROUTING_KEY_NAME, message, new CorrelationData(msgId));
    }
}
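
The producer's bookkeeping can be checked in isolation: a 32-character msgId, a first `nextTryTime` one `MSG_TIMEOUT` after creation, and the JSON body encoded as UTF-8 bytes. A minimal sketch with a hypothetical `MsgLogDraft` record standing in for the project's `MsgLog` entity; the current time is passed in so the result is deterministic:

```java
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.UUID;

public class ProducerBookkeeping {

    static final int MSG_TIMEOUT_MINUTES = 1; // same value as RabbitConfig.MSG_TIMEOUT

    // Hypothetical stand-in for the MsgLog row written before publishing.
    public record MsgLogDraft(String msgId, String msg, int tryCount,
                              LocalDateTime createTime, LocalDateTime nextTryTime) {
    }

    public static MsgLogDraft draft(String json, LocalDateTime now) {
        // A UUID string is 36 characters including 4 dashes, so this yields 32 hex characters.
        String msgId = UUID.randomUUID().toString().replace("-", "");
        return new MsgLogDraft(msgId, json, 0, now, now.plusMinutes(MSG_TIMEOUT_MINUTES));
    }

    // Message body: encode explicitly as UTF-8 rather than the platform default charset.
    public static byte[] body(MsgLogDraft log) {
        return log.msg().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2020, 9, 25, 12, 0);
        MsgLogDraft d = draft("{\"subject\":\"\u4f60\u597d\"}", now);
        System.out.println(d.msgId().length()); // 32
        System.out.println(d.nextTryTime());    // 2020-09-25T12:01
        System.out.println(body(d).length);     // 20: 16 chars, the two CJK characters take 3 bytes each
    }
}
```

Writing the row before publishing is what makes the scheme safe: if the process dies between the insert and the publish, the row is still in the delivering state and the scheduled task will pick it up once `nextTryTime` has passed.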

3. Consumer

@Slf4j
@Component
public class MailConsumer {

    private final StringRedisTemplate redisTemplate;

    private final MailService mailService;

    private final IMsgLogService msgLogService;

    private final RabbitTemplate rabbitTemplate;

    public MailConsumer(StringRedisTemplate redisTemplate, MailService mailService, IMsgLogService msgLogService, RabbitTemplate rabbitTemplate) {
        this.redisTemplate = redisTemplate;
        this.mailService = mailService;
        this.msgLogService = msgLogService;
        this.rabbitTemplate = rabbitTemplate;
    }

    @RabbitListener(queues = MAIL_QUEUE_NAME)
    @SuppressWarnings("unchecked")
    public void handler(Message message, Channel channel) throws IOException {
        Mail mail = JsonUtil.fromJson(new String(message.getBody(), StandardCharsets.UTF_8), new TypeReference<Mail>() {
        });
        MessageProperties messageProperties = message.getMessageProperties();
        Map<String, Object> headers = messageProperties.getHeaders();
        long tag = messageProperties.getDeliveryTag();
        // Spring AMQP puts the CorrelationData id into this header on publish
        String msgId = (String) headers.get("spring_returned_message_correlation");
        long retryCount = 0;
        if (headers.containsKey("x-death")) {
            // x-death is maintained by the broker; the first entry is the most recent dead-lettering
            List<Map<String, Object>> deathList = (List<Map<String, Object>>) headers.get("x-death");
            retryCount = (long) deathList.get(0).get("count");
        }

        if (Boolean.TRUE.equals(redisTemplate.opsForSet().isMember(ORDER_MSG_LOG, msgId))) {
            // Redis already contains this msgId, so the message has been consumed before
            log.info("Message already consumed, msgId: {}", msgId);
            // ack it so the broker drops it
            channel.basicAck(tag, false);
            return;
        }

        try {
            // send the email
            mailService.send(mail);
            msgLogService.updateStatus(msgId, MSG_CONSUMED_SUCCESS);
            redisTemplate.opsForSet().add(ORDER_MSG_LOG, msgId);
            channel.basicAck(tag, false);
            log.info("Email sent, msgId: {}", msgId);
        } catch (Exception e) {
            if (retryCount < MAX_TRY_COUNT) {
                // republish to the delay queue before settling the original delivery,
                // so the message is not lost if the republish fails
                rabbitTemplate.send(RETRY_EXCHANGE_NAME, RETRY_ROUTING_KEY_NAME, message, new CorrelationData(msgId));
                log.info("Failed to send email, scheduling retry #{}, msgId: {}", retryCount + 1, msgId);
            } else {
                msgLogService.updateStatus(msgId, MSG_CONSUMED_FAIL);
                log.info("Failed to send email, maximum retries reached, msgId: {}", msgId);
            }
            // reject without requeue; any retry goes through the delay queue
            channel.basicNack(tag, false, false);
        }
    }
}
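
The retry decision in the consumer depends only on the `x-death` header, so it can be tested without a broker. A minimal sketch with a hypothetical `RetryDecision` class, assuming the header has the shape RabbitMQ uses (a list of maps, most recent entry first, each with a numeric `count`):

```java
import java.util.List;
import java.util.Map;

public class RetryDecision {

    static final int MAX_TRY_COUNT = 3; // same value as RabbitConfig.MAX_TRY_COUNT

    public enum Action { RETRY, GIVE_UP }

    // Reads x-death[0].count; 0 means the message has never been dead-lettered.
    public static long retryCount(Map<String, Object> headers) {
        Object xDeath = headers.get("x-death");
        if (!(xDeath instanceof List) || ((List<?>) xDeath).isEmpty()) {
            return 0;
        }
        Object latest = ((List<?>) xDeath).get(0);
        if (!(latest instanceof Map)) {
            return 0;
        }
        Object count = ((Map<?, ?>) latest).get("count");
        return (count instanceof Number) ? ((Number) count).longValue() : 0;
    }

    // Same rule as the consumer: retry while fewer than MAX_TRY_COUNT retries have happened.
    public static Action decide(long retryCount) {
        return retryCount < MAX_TRY_COUNT ? Action.RETRY : Action.GIVE_UP;
    }

    public static void main(String[] args) {
        Map<String, Object> firstDelivery = Map.of();
        Map<String, Object> afterThreeRetries = Map.of("x-death",
                List.of(Map.of("queue", "ckx.retry.queue", "reason", "expired", "count", 3L)));
        System.out.println(decide(retryCount(firstDelivery)));     // RETRY
        System.out.println(decide(retryCount(afterThreeRetries))); // GIVE_UP
    }
}
```

With `MAX_TRY_COUNT = 3`, a mail that keeps failing is attempted at most four times: the original delivery plus three retries, each after waiting `RETRY_TTL` in the retry queue.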
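
3.1 Ensuring idempotent consumption

Duplicate consumption happens when a consumer has already processed a message but its ack never reaches RabbitMQ, for example because the network drops: the broker then redelivers the message and the consumer processes it a second time, even though it was already consumed successfully. The same can happen on the producer side: if a publisher confirm is lost, the scheduled task resends a message that was in fact delivered.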

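Solutions:

  1. Use a globally unique ID: once a message has been consumed, store its ID in Redis; before consuming, check Redis for a consumption record
  2. Rely on Redis atomicity with the setnx command: if it succeeds, the message has not been processed yet and can be consumed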
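
Option 2 turns the check and the write into one atomic step, which closes the gap in option 1 where two deliveries of the same message can both see "not consumed yet". A minimal sketch using a `ConcurrentHashMap` as an in-memory stand-in for Redis `SET key value NX` (with Spring Data Redis the equivalent would be `opsForValue().setIfAbsent(...)`); the class name and the release-on-failure step are assumptions, not the project's code:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class IdempotencyGuard {

    // In-memory stand-in for Redis keys: msgId -> marker.
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    // Atomic "SET msgId CONSUMING NX": true only for the first caller.
    public boolean tryClaim(String msgId) {
        return store.putIfAbsent(msgId, "CONSUMING") == null;
    }

    // On failure, release the claim so a later retry can process the message again.
    // Only removes the key if it is still CONSUMING, never a DONE marker.
    public void release(String msgId) {
        store.remove(msgId, "CONSUMING");
    }

    // On success, keep the key and record that the message was consumed.
    public void markDone(String msgId) {
        store.put(msgId, "DONE");
    }

    public static void main(String[] args) {
        IdempotencyGuard guard = new IdempotencyGuard();
        System.out.println(guard.tryClaim("m1")); // true: first delivery processes it
        System.out.println(guard.tryClaim("m1")); // false: duplicate delivery is skipped
    }
}
```

In Redis the key would also need an expiry, so that a consumer that crashes after claiming a message does not block its redelivery forever.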
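
3.2 Manual acknowledgement

In manual ack mode, a message stays unacked until the consumer explicitly acks it. Even if the consumer goes down, the message is not lost: when its channel closes, the message returns to the ready state and is delivered to the next consumer that connects.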

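basicAck: acknowledges one or more received messages
basicNack: rejects one or more received messages; the requeue flag controls whether they go back to the queue
basicReject: rejects a single message; otherwise the same as basicNack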
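
The `multiple` flag is the part that is easy to misread: acking delivery tag N with `multiple = true` settles every outstanding delivery on that channel whose tag is at most N. A minimal in-memory sketch with a hypothetical `UnackedTracker` whose methods mirror the parameters of `basicAck(deliveryTag, multiple)`, `basicNack(deliveryTag, multiple, requeue)` and `basicReject(deliveryTag, requeue)`; it does not talk to a broker:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class UnackedTracker {

    private final TreeSet<Long> unacked = new TreeSet<>(); // outstanding delivery tags on one channel
    private final List<Long> requeued = new ArrayList<>();  // tags that would be redelivered
    private long nextTag = 1;                                // tags start at 1 and grow per channel

    public long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    public void basicAck(long deliveryTag, boolean multiple) {
        settle(deliveryTag, multiple);
    }

    public void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
        List<Long> settled = settle(deliveryTag, multiple);
        if (requeue) {
            requeued.addAll(settled);
        }
    }

    // basicReject is basicNack for exactly one message.
    public void basicReject(long deliveryTag, boolean requeue) {
        basicNack(deliveryTag, false, requeue);
    }

    // A real broker closes the channel (PRECONDITION_FAILED) for an unknown tag; this sketch ignores it.
    private List<Long> settle(long deliveryTag, boolean multiple) {
        List<Long> settled = new ArrayList<>();
        if (multiple) {
            settled.addAll(unacked.headSet(deliveryTag, true));
        } else if (unacked.contains(deliveryTag)) {
            settled.add(deliveryTag);
        }
        unacked.removeAll(settled);
        return settled;
    }

    public int unackedCount() {
        return unacked.size();
    }

    public List<Long> requeued() {
        return requeued;
    }

    public static void main(String[] args) {
        UnackedTracker channel = new UnackedTracker();
        channel.deliver();                          // tag 1
        long second = channel.deliver();            // tag 2
        long third = channel.deliver();             // tag 3
        channel.basicAck(second, true);             // settles tags 1 and 2
        System.out.println(channel.unackedCount()); // 1
        channel.basicReject(third, true);           // tag 3 goes back to the queue
        System.out.println(channel.requeued());     // [3]
    }
}
```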

4. Resending failed messages

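// requires @EnableScheduling on a configuration class
@Scheduled(cron = "0/20 * * * * ?")
public void resend() {
    log.info("Scheduled task -> resend messages -> start");
    List<MsgLog> msgLogList = msgLogService.selectTimeoutMsg();
    msgLogList.forEach(
            msgLog -> {
                String msgId = msgLog.getMsgId();
                if (msgLog.getTryCount() >= MAX_TRY_COUNT) {
                    // mark this message as failed to deliver
                    msgLogService.updateStatus(msgId, MSG_DELIVER_FAIL);
                    log.info("Maximum retries exceeded, message delivery failed, msgId: {}", msgId);
                } else {
                    msgLogService.updateTryCount(msgId, msgLog.getNextTryTime());
                    // msg is already a JSON string; send its bytes as is so the JSON converter does not encode it twice
                    Message message = MessageBuilder.withBody(msgLog.getMsg().getBytes(StandardCharsets.UTF_8))
                            .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                            .build();
                    rabbitTemplate.send(msgLog.getExchange(), msgLog.getRoutingKey(), message, new CorrelationData(msgId));
                }
            }
    );
    log.info("Scheduled task <- resend messages <- end");
}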
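
The per-row decision in the scheduled task can be separated from scheduling and publishing. A minimal sketch with a hypothetical `ResendPlanner`, assuming `selectTimeoutMsg()` returns rows still in the delivering state whose `nextTryTime` has passed, and that `updateTryCount` increments `tryCount` and pushes `nextTryTime` forward by `MSG_TIMEOUT` minutes (the source does not show either method, so both are assumptions):

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

public class ResendPlanner {

    static final int DELIVERING = 0;       // assumed code for "delivering"
    static final int MSG_DELIVER_FAIL = 2; // same as RabbitConfig.MSG_DELIVER_FAIL
    static final int MAX_TRY_COUNT = 3;
    static final int MSG_TIMEOUT_MINUTES = 1;

    // Mutable stand-in for a MsgLog row.
    public static final class Row {
        public final String msgId;
        public int status;
        public int tryCount;
        public LocalDateTime nextTryTime;

        public Row(String msgId, int status, int tryCount, LocalDateTime nextTryTime) {
            this.msgId = msgId;
            this.status = status;
            this.tryCount = tryCount;
            this.nextTryTime = nextTryTime;
        }
    }

    // Returns the msgIds to republish now and marks exhausted rows as failed.
    public static List<String> plan(List<Row> rows, LocalDateTime now) {
        List<String> toResend = new ArrayList<>();
        for (Row row : rows) {
            boolean due = row.status == DELIVERING && !row.nextTryTime.isAfter(now);
            if (!due) {
                continue;
            }
            if (row.tryCount >= MAX_TRY_COUNT) {
                row.status = MSG_DELIVER_FAIL; // leave it for manual inspection
            } else {
                row.tryCount++;
                row.nextTryTime = now.plusMinutes(MSG_TIMEOUT_MINUTES);
                toResend.add(row.msgId);
            }
        }
        return toResend;
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2020, 9, 25, 12, 0);
        List<Row> rows = List.of(
                new Row("a", DELIVERING, 0, now.minusSeconds(1)),  // due, first resend
                new Row("b", DELIVERING, 3, now.minusSeconds(1)),  // due, retries exhausted
                new Row("c", DELIVERING, 0, now.plusMinutes(1)));  // not due yet
        System.out.println(plan(rows, now));    // [a]
        System.out.println(rows.get(1).status); // 2
    }
}
```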

5. Pitfalls

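If the consumer throws while processing a message and the code calls basicNack with requeue set to true, the requeued message does not go to the tail of the queue: RabbitMQ puts it back at (or near) its original position at the head. The consumer immediately receives it again, throws again, and requeues it again, producing an infinite loop.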

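Solution: always settle the delivery on the channel, whether consumption succeeds or not. When nacking, set requeue to false; after an exception, send the message through the delay queue for a retry, and once the threshold is reached, set the status to "consumption failed" and stop retrying.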
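
The difference between the two strategies is easiest to see by counting handler invocations for a message that keeps failing. A minimal simulation with a hypothetical `RetryStrategyDemo`: with `requeue = true` nothing stops the loop, so it is cut off at an artificial `SAFETY_CAP`, while nack-plus-delay-queue stops after the original delivery plus `MAX_TRY_COUNT` retries:

```java
public class RetryStrategyDemo {

    static final int MAX_TRY_COUNT = 3;  // same value as RabbitConfig.MAX_TRY_COUNT
    static final int SAFETY_CAP = 1_000; // only exists so the requeue = true loop ends in this demo

    public record Result(int attempts, boolean succeeded) {
    }

    // The handler fails for the first `failures` attempts, then succeeds.
    private static boolean handle(int attempt, int failures) {
        return attempt > failures;
    }

    // basicNack(tag, false, true): immediate redelivery with no attempt limit.
    public static Result requeueImmediately(int failures) {
        for (int attempt = 1; attempt <= SAFETY_CAP; attempt++) {
            if (handle(attempt, failures)) {
                return new Result(attempt, true);
            }
        }
        return new Result(SAFETY_CAP, false);
    }

    // basicNack(tag, false, false) + delay queue: at most 1 + MAX_TRY_COUNT attempts.
    public static Result nackAndDelay(int failures) {
        for (long retryCount = 0; ; retryCount++) {
            int attempt = (int) retryCount + 1;
            if (handle(attempt, failures)) {
                return new Result(attempt, true);
            }
            if (retryCount >= MAX_TRY_COUNT) {
                return new Result(attempt, false); // status becomes MSG_CONSUMED_FAIL
            }
        }
    }

    public static void main(String[] args) {
        int alwaysFails = Integer.MAX_VALUE;
        System.out.println(requeueImmediately(alwaysFails)); // Result[attempts=1000, succeeded=false]
        System.out.println(nackAndDelay(alwaysFails));       // Result[attempts=4, succeeded=false]
        System.out.println(nackAndDelay(1));                 // Result[attempts=2, succeeded=true]
    }
}
```

The simulation ignores timing; in the real setup each retry also waits `RETRY_TTL` in the retry queue, which gives a transient failure (for example a mail server outage) time to clear.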

