源码(已改用延迟队列插件):https://github.com/chenkaixin12121/rabbit-reliability
介绍
消息可靠性投递步骤
如何保证 RabbitMQ 可靠性投递,本文主要分为以下几个步骤进行:
- 生成数据库消息日志,状态为投递中,再把消息放入队列
- 根据 confirm(ConfirmCallback 和 ReturnCallback)的结果来确定消息是否投递成功,投递成功的,修改投递状态为发送成功,投递失败的消息由定时任务定期扫描并重新投递
- 定时任务重新投递发送失败的消息,如果投递次数达过阈值后还是发送失败,设置投递状态为发送失败,由人工进行检查
- 消费者取到消息后,从消息中取出唯一标识,先判断此消息有没有被消费过,若已消费过,则直接 ack(避免重复消费)
- 正常处理成功后,修改投递状态为消费成功,并 ack
- 遇到异常时,捕获异常,验证消息重试次数是否达到阈值,超过则修改投递状态为消费失败,由人工进行处理
消息投递的顺序
- Producer -> RabbitMQ Broker Cluster -> Exchange -> Queue -> Consumer
- Message 从 Producer 到 RabbitMQ Broker Cluster 会返回一个 ConfirmCallback
- Message 从 Exchange 到 Queue 投递失败则会返回一个 ReturnCallback,我们使用这两个 Callback 来控制消息的最终一致性和部分纠错能力
死信队列
没有被及时消费的消息存放的队列被称为死信队列,消息没有被及时消费有以下几点原因:
- 消息被拒绝(basic.reject / basic.nack)并且不再重新投递(requeue = false)
- TTL(Time-To-Live)消息超时未消费
- 达到最大队列长度
使用死信队列实现延迟消息:将正常的业务队列或者是需要延迟处理的队列添加 x-message-ttl 和 x-dead-letter-exchange 等参数,这样当触发死信的条件后,就会转发到死信交换器,就由监听 dlx.queue 死信队列的程序进行相应业务处理了
配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| spring: rabbitmq: username: guest password: guest host: localhost port: 5672 publisher-confirm-type: correlated publisher-returns: true template: mandatory: true listener: simple: acknowledge-mode: manual prefetch: 2 max-concurrency: 10
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
| @Slf4j @Configuration public class RabbitConfig {
public static final Integer MSG_TIMEOUT = 1; public static final Integer MAX_TRY_COUNT = 3; public static final String ORDER_MSG_LOG = "order_msg_log";
public static final String MAIL_QUEUE_NAME = "ckx.mail.queue"; public static final String MAIL_EXCHANGE_NAME = "ckx.mail.exchange"; public static final String MAIL_ROUTING_KEY_NAME = "ckx.mail.routing.key";
public static final String RETRY_QUEUE_NAME = "ckx.retry.queue"; public static final String RETRY_EXCHANGE_NAME = "ckx.retry.exchange"; public static final String RETRY_ROUTING_KEY_NAME = "ckx.retry.routing.key"; public static final Integer RETRY_TTL = 5000;
public static final Integer MSG_DELIVER_SUCCESS = 1; public static final Integer MSG_DELIVER_FAIL = 2; public static final Integer MSG_CONSUMED_SUCCESS = 3; public static final Integer MSG_CONSUMED_FAIL = 4;
private final CachingConnectionFactory cachingConnectionFactory;
private final IMsgLogService msgLogService;
public RabbitConfig(CachingConnectionFactory cachingConnectionFactory, IMsgLogService msgLogService) { this.cachingConnectionFactory = cachingConnectionFactory; this.msgLogService = msgLogService; }
@Bean public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory); rabbitTemplate.setMessageConverter(converter());
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { String msgId = Objects.requireNonNull(correlationData).getId(); if (ack) { log.info("消息成功发送到Exchange, msgId: {}", msgId); msgLogService.updateStatus(msgId, MSG_DELIVER_SUCCESS); } else { log.info("消息发送到Exchange失败, correlationData : {}, cause: {}", correlationData, cause); } });
rabbitTemplate.setReturnsCallback( returnedMessage -> log.info("消息从Exchange路由到Queue失败: exchange: {}, routingKey: {}, replyCode: {}, replyText: {}, message: {}", returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getMessage()));
return rabbitTemplate; }
@Bean public Jackson2JsonMessageConverter converter() { return new Jackson2JsonMessageConverter(); }
@Bean public DirectExchange mailExchange() { return ExchangeBuilder .directExchange(MAIL_EXCHANGE_NAME) .durable(true) .build(); }
@Bean public DirectExchange retryExchange() { return ExchangeBuilder .directExchange(RETRY_EXCHANGE_NAME) .durable(true) .build(); }
@Bean public Queue mailQueue() { return new Queue(MAIL_QUEUE_NAME, true); }
@Bean public Queue retryQueue() { return QueueBuilder .durable(RETRY_QUEUE_NAME) .withArgument("x-dead-letter-routing-key", MAIL_ROUTING_KEY_NAME) .withArgument("x-dead-letter-exchange", MAIL_EXCHANGE_NAME) .withArgument("x-message-ttl", RETRY_TTL) .build(); }
@Bean public Binding mailBinding() { return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME); }
@Bean public Binding retryBinding() { return BindingBuilder.bind(retryQueue()).to(retryExchange()).with(RETRY_ROUTING_KEY_NAME); } }
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @Component public class MailProducer {
private final MsgLogMapper msgLogMapper;
private final RabbitTemplate rabbitTemplate;
public MailProducer(MsgLogMapper msgLogMapper, RabbitTemplate rabbitTemplate) { this.msgLogMapper = msgLogMapper; this.rabbitTemplate = rabbitTemplate; }
public void send(Mail mail) {
String msgId = UUID.randomUUID().toString().replace("-", ""); String msg = JsonUtil.toJson(mail); MsgLog msgLog = MsgLog.builder() .msgId(msgId) .msg(msg) .exchange(MAIL_EXCHANGE_NAME) .routingKey(MAIL_ROUTING_KEY_NAME) .nextTryTime(LocalDateTime.now().plusMinutes(MSG_TIMEOUT)) .createTime(LocalDateTime.now()) .build(); msgLogMapper.insert(msgLog);
Message message = MessageBuilder.withBody(Objects.requireNonNull(msg).getBytes()).build(); MessageProperties messageProperties = message.getMessageProperties(); messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON); rabbitTemplate.convertAndSend(MAIL_EXCHANGE_NAME, MAIL_ROUTING_KEY_NAME, message, new CorrelationData(msgId)); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| @Slf4j @Component public class MailConsumer {
private final StringRedisTemplate redisTemplate;
private final MailService mailService;
private final IMsgLogService msgLogService;
private final RabbitTemplate rabbitTemplate;
public MailConsumer(StringRedisTemplate redisTemplate, MailService mailService, IMsgLogService msgLogService, RabbitTemplate rabbitTemplate) { this.redisTemplate = redisTemplate; this.mailService = mailService; this.msgLogService = msgLogService; this.rabbitTemplate = rabbitTemplate; }
@RabbitListener(queues = MAIL_QUEUE_NAME) public void handler(Message message, Channel channel) throws IOException { Mail mail = JsonUtil.fromJson(new String(message.getBody()), new TypeReference<Mail>() { }); MessageProperties messageProperties = message.getMessageProperties(); Map<String, Object> headers = messageProperties.getHeaders(); long tag = messageProperties.getDeliveryTag(); String msgId = (String) headers.get("spring_returned_message_correlation"); long retryCount = 0; if (headers.containsKey("x-death")) { List<Map<String, Object>> deathList = (List<Map<String, Object>>) headers.get("x-death"); retryCount = (long) deathList.get(0).get("count"); }
if (redisTemplate.opsForSet().isMember(ORDER_MSG_LOG, msgId)) { log.info("消息已经被消费, msgId: {}", msgId); channel.basicAck(tag, false); return; }
try { mailService.send(mail); msgLogService.updateStatus(msgId, MSG_CONSUMED_SUCCESS); redisTemplate.opsForSet().add(ORDER_MSG_LOG, msgId); channel.basicAck(tag, false); log.info("发送邮件成功,msgId: {}", msgId); } catch (Exception e) { channel.basicNack(tag, false, false); if (retryCount < MAX_TRY_COUNT) { rabbitTemplate.send(RETRY_EXCHANGE_NAME, RETRY_ROUTING_KEY_NAME, message, new CorrelationData(msgId)); log.info("发送邮件失败,进入第{}次重试,msgId: {}", retryCount + 1, msgId); } else { msgLogService.updateStatus(msgId, MSG_CONSUMED_FAIL); log.info("发送邮件失败,达到最大重试次数,msgId: {}", msgId); } } } }
|
如何保证消息的幂等性
当消费者消费完消息时,在给生产端返回 ack 时由于网络中断,导致生产端未收到确认信息,该条消息会重新发送并被消费者消费,但实际上该消费者已成功消费了该条消息,这就是重复消费问题
解决方案:
- 使用全局唯一 ID,只要消费过该消息就存储过 Redis 中,开始消费前,先去 Redis 中查询有没消费记录即可
- 利用 Redis 的原子性去实现,可以使用 setNX 命令,执行成功就表示没有处理过这条消息,可以进行消费
手动应答机制
手动 ack 机制下,只要没有明确告诉 RabbitMQ 消息被消费,没有 ack,消息就一直是 unacked 状态,即使 consumer 宕机,消息不会丢失,会变为 ready 状态,下次一有新的 consumer 连接进来就会发给它
- basicAck:确认一个或多个接收的消息
- basicNack:拒绝一个或多个接收的消息,可以设置 requeue 是否重回队列
- basicReject:拒绝一个消息,其他同上
失败消息重新投递
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @RequiredArgsConstructor @Slf4j @Component public class ResendMsgTask {
private final IMsgLogService msgLogService;
private final RabbitTemplate rabbitTemplate;
@Scheduled(cron = "0/20 * * * * ?") public void resend() { log.info("定时任务 -> 重新投递消息 -> 开始"); List<MsgLog> msgLogList = msgLogService.selectTimeoutMsg(); msgLogList.forEach( msgLog -> { String msgId = msgLog.getMsgId(); if (msgLog.getTryCount() >= MAX_TRY_COUNT) { msgLogService.updateStatus(msgId, MSG_DELIVER_FAIL); log.info("超过最大重试次数,消息投递失败,msgId: {}", msgId); } else { msgLogService.updateTryCount(msgId, msgLog.getNextTryTime()); rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), msgLog.getMsg(), new CorrelationData(msgId)); } } ); log.info("任务结束 <- 重新投递消息 <- 结束"); } }
|
问题处理
消费者消费消息的时候抛出异常,由于 设置 basicNack(重回队列)为 true,消息回滚到消息队列的时候不会回到队列尾部,而是仍是在队列头部,这时消费者又立即接收到这条消息进行处理,接着抛出异常,进行回滚,造成死循环
解决方案:无论消费是否成功,都必须对 channel 进行应答,使用 nack 时设置 requeue 为 false,抛出异常后进入延迟队列进行重试机制,达到阈值时修改投递状态为消费失败,不再重试