Reliable Message Delivery with RabbitMQ

Source code (since switched to the delayed message plugin): https://github.com/chenkaixin12121/rabbit-reliability

Introduction


Steps for reliable delivery

This article breaks reliable delivery with RabbitMQ into the following steps:

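  1. Write a message log record to the database with the status "in delivery", then publish the message to the queue
  2. Use the publisher confirm results (ConfirmCallback and ReturnCallback) to decide whether delivery succeeded. On success, set the status to "delivered". Failed messages are picked up by a scheduled task that periodically scans for them and republishes them
  3. The scheduled task republishes messages that failed to send. If delivery still fails after the number of attempts reaches the threshold, set the status to "delivery failed" for manual inspection
  4. When the consumer receives a message, it extracts the unique ID and first checks whether the message has already been consumed. If it has, the consumer acks immediately (avoiding duplicate consumption)
  5. After successful processing, set the status to "consumed" and ack
  6. On an exception, catch it and check whether the retry count has reached the threshold. If it has, set the status to "consumption failed" for manual handling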
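The six steps amount to a small state machine over the message log's status column. The following is a hypothetical, framework-free model of that state machine. The enum codes 1 to 4 match the constants in RabbitConfig. Three things are assumptions: `DELIVERING = 0` for the initial "in delivery" state, the class names, and the transition table. The confirm callback and the consumer run independently, so the sketch also assumes a message may go straight from "in delivery" to a consumed state. It rejects moves backwards, such as a late confirm overwriting "consumed".

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical status enum: codes 1-4 match RabbitConfig, DELIVERING = 0 is an assumption
enum MsgStatus {
    DELIVERING(0), DELIVER_SUCCESS(1), DELIVER_FAIL(2), CONSUMED_SUCCESS(3), CONSUMED_FAIL(4);

    final int code;

    MsgStatus(int code) {
        this.code = code;
    }
}

final class MsgLogLifecycle {

    // For each status, the statuses it may move to (assumed transition table)
    private static final Map<MsgStatus, Set<MsgStatus>> ALLOWED = Map.of(
            // steps 2-3: confirmed, or given up by the resend task; consumption may also finish first
            MsgStatus.DELIVERING, EnumSet.of(MsgStatus.DELIVER_SUCCESS, MsgStatus.DELIVER_FAIL,
                    MsgStatus.CONSUMED_SUCCESS, MsgStatus.CONSUMED_FAIL),
            // steps 5-6: the consumer records the final outcome
            MsgStatus.DELIVER_SUCCESS, EnumSet.of(MsgStatus.CONSUMED_SUCCESS, MsgStatus.CONSUMED_FAIL),
            // terminal states: only manual intervention changes them
            MsgStatus.DELIVER_FAIL, EnumSet.noneOf(MsgStatus.class),
            MsgStatus.CONSUMED_SUCCESS, EnumSet.noneOf(MsgStatus.class),
            MsgStatus.CONSUMED_FAIL, EnumSet.noneOf(MsgStatus.class));

    private MsgLogLifecycle() {
    }

    static boolean canTransition(MsgStatus from, MsgStatus to) {
        return ALLOWED.get(from).contains(to);
    }
}
```

You could guard each status update with a check like `canTransition`, or an equivalent `WHERE status IN (...)` condition in SQL. That keeps out-of-order updates from moving a record backwards. The source does not show whether its `updateStatus` does this.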
Message delivery path
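  1. Producer -> RabbitMQ Broker Cluster -> Exchange -> Queue -> Consumer
  2. When a message travels from the Producer to the RabbitMQ Broker Cluster, the broker's confirm triggers a ConfirmCallback (ack = true if the broker accepted the message)
  3. If the Exchange fails to route the message to any Queue, a ReturnCallback is triggered (this requires mandatory = true). We use these two callbacks to achieve eventual consistency and some ability to detect and correct errors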
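There is one subtlety: ack = true only means the broker took responsibility for the message, not that any queue received it. Suppose a mandatory message matches no queue. RabbitMQ first sends basic.return and then still sends basic.ack, so the ConfirmCallback alone cannot detect routing failures. The following plain-Java sketch (the names are hypothetical) combines both callback results for one message:

```java
// Hypothetical result of combining the two publisher callbacks for one message
enum PublishOutcome { ROUTED, UNROUTABLE, NOT_CONFIRMED }

final class PublishOutcomes {

    private PublishOutcomes() {
    }

    /**
     * @param ack      the ack flag passed to ConfirmCallback
     * @param returned whether ReturnsCallback fired for the same message (needs mandatory = true)
     */
    static PublishOutcome classify(boolean ack, boolean returned) {
        if (!ack) {
            // not confirmed: leave the log "in delivery" so the resend task retries it
            return PublishOutcome.NOT_CONFIRMED;
        }
        // an unroutable mandatory message is returned first and then still acked
        return returned ? PublishOutcome.UNROUTABLE : PublishOutcome.ROUTED;
    }
}
```

Treating UNROUTABLE differently from ROUTED matters because republishing an unroutable message to the same exchange and routing key will keep failing. It usually points to a missing binding rather than a transient error.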
Dead letter queues

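A message that cannot be consumed normally becomes a dead letter. RabbitMQ republishes it to the dead letter exchange (DLX) configured on its queue, and a queue bound to that exchange is called a dead letter queue. A message is dead-lettered when: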

  1. It is rejected (basic.reject / basic.nack) and not requeued (requeue = false)
  2. Its TTL (Time-To-Live) expires before it is consumed
  3. The queue exceeds its maximum length
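When RabbitMQ dead-letters a message, it records why in the `x-death` header. The three causes above appear as the `reason` values `rejected`, `expired` and `maxlen`, and the most recent event is the first list entry. The parser below is a hypothetical helper, not part of any library. Newer RabbitMQ versions can also report other reasons (such as `delivery_limit` for quorum queues), which this sketch treats as unknown.

```java
import java.util.List;
import java.util.Map;

// The three dead-letter causes, keyed by the "reason" value RabbitMQ writes into x-death
enum DeadLetterReason {
    REJECTED("rejected"), EXPIRED("expired"), MAXLEN("maxlen");

    final String headerValue;

    DeadLetterReason(String headerValue) {
        this.headerValue = headerValue;
    }

    static DeadLetterReason fromHeader(String value) {
        for (DeadLetterReason reason : values()) {
            if (reason.headerValue.equals(value)) {
                return reason;
            }
        }
        throw new IllegalArgumentException("unknown x-death reason: " + value);
    }
}

final class XDeathHeaders {

    private XDeathHeaders() {
    }

    /** Reason for the most recent dead-lettering; the first x-death entry is the latest event. */
    @SuppressWarnings("unchecked")
    static DeadLetterReason latestReason(Map<String, Object> headers) {
        Object raw = headers.get("x-death");
        if (!(raw instanceof List) || ((List<?>) raw).isEmpty()) {
            throw new IllegalArgumentException("message has not been dead-lettered");
        }
        Map<String, Object> latest = ((List<Map<String, Object>>) raw).get(0);
        // String.valueOf avoids a ClassCastException if the value is not a java.lang.String
        return DeadLetterReason.fromHeader(String.valueOf(latest.get("reason")));
    }
}
```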

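To implement delayed messages with a dead letter queue, add arguments such as x-message-ttl and x-dead-letter-exchange to a normal business queue or to a queue reserved for delayed processing. When a message meets a dead-letter condition (here, its TTL expires), it is forwarded to the dead letter exchange. The program listening on the dead letter queue (dlx.queue) then performs the business processing. In this project the retry queue dead-letters back into the mail exchange, so the mail consumer itself receives the message again.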
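In code, the delay-queue setup amounts to three queue arguments. The sketch builds them as a plain map; the helper name is hypothetical. The argument keys and the integer-millisecond TTL are RabbitMQ's own. The map could be passed as the `arguments` parameter of the RabbitMQ Java client's `Channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments)`. The retryQueue bean in the configuration below sets the same three arguments through `withArgument`.

```java
import java.util.HashMap;
import java.util.Map;

final class DelayQueueArgs {

    private DelayQueueArgs() {
    }

    /**
     * Arguments for a queue whose messages wait ttlMillis and are then dead-lettered
     * to deadLetterExchange with deadLetterRoutingKey (hypothetical helper).
     */
    static Map<String, Object> of(String deadLetterExchange, String deadLetterRoutingKey, int ttlMillis) {
        if (ttlMillis < 0) {
            throw new IllegalArgumentException("x-message-ttl must be non-negative");
        }
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", deadLetterExchange);
        args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        args.put("x-message-ttl", ttlMillis); // milliseconds, stored as an Integer
        return args;
    }
}
```

For example, `DelayQueueArgs.of("ckx.mail.exchange", "ckx.mail.routing.key", 5000)` reproduces the retry queue's arguments. Because the TTL is set per queue, every message waits the same time and expires in queue order. With per-message TTLs, RabbitMQ only expires a message once it reaches the head of the queue, so a long delay can hold up shorter ones queued behind it. This is one commonly cited reason to use the delayed message exchange plugin instead.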

Configuration

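```yaml
spring:
  rabbitmq:
    username: guest
    password: guest
    host: localhost
    port: 5672
    # Enable publisher confirms on the producer side
    publisher-confirm-type: correlated
    # Enable publisher returns; must be used together with mandatory
    publisher-returns: true
    template:
      # Applies to the auto-configured RabbitTemplate; a custom RabbitTemplate bean must call setMandatory(true) itself
      mandatory: true
    listener:
      simple:
        # Consumers must ack manually (the default is auto)
        acknowledge-mode: manual
        # Maximum number of unacknowledged messages delivered to each consumer
        prefetch: 2
        # Maximum number of concurrent consumers
        max-concurrency: 10
```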
```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
// IMsgLogService is a service interface from the project repository

@Slf4j
@Configuration
public class RabbitConfig {

    public static final Integer MSG_TIMEOUT = 1; // minutes before an unconfirmed message is resent
    public static final Integer MAX_TRY_COUNT = 3; // maximum number of retries
    public static final String ORDER_MSG_LOG = "order_msg_log"; // Redis set of consumed msgIds

    public static final String MAIL_QUEUE_NAME = "ckx.mail.queue";
    public static final String MAIL_EXCHANGE_NAME = "ckx.mail.exchange";
    public static final String MAIL_ROUTING_KEY_NAME = "ckx.mail.routing.key";

    public static final String RETRY_QUEUE_NAME = "ckx.retry.queue";
    public static final String RETRY_EXCHANGE_NAME = "ckx.retry.exchange";
    public static final String RETRY_ROUTING_KEY_NAME = "ckx.retry.routing.key";
    public static final Integer RETRY_TTL = 5000; // how long a message waits in the delay queue (ms)

    public static final Integer MSG_DELIVER_SUCCESS = 1; // delivered
    public static final Integer MSG_DELIVER_FAIL = 2; // delivery failed
    public static final Integer MSG_CONSUMED_SUCCESS = 3; // consumed
    public static final Integer MSG_CONSUMED_FAIL = 4; // consumption failed

    private final CachingConnectionFactory cachingConnectionFactory;

    private final IMsgLogService msgLogService;

    public RabbitConfig(CachingConnectionFactory cachingConnectionFactory, IMsgLogService msgLogService) {
        this.cachingConnectionFactory = cachingConnectionFactory;
        this.msgLogService = msgLogService;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
        rabbitTemplate.setMessageConverter(converter());
        // Defining our own RabbitTemplate bean bypasses spring.rabbitmq.template.mandatory,
        // so set it here; without it unroutable messages are dropped and ReturnsCallback never fires
        rabbitTemplate.setMandatory(true);

        /*
         * correlationData: carries the unique message ID
         * ack: whether the broker accepted the message
         * cause: failure reason when ack is false
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (correlationData == null) {
                return; // sent without CorrelationData, nothing to track
            }
            String msgId = correlationData.getId();
            if (ack) {
                // ack is also true for an unroutable mandatory message (it is returned first)
                log.info("Message reached the exchange, msgId: {}", msgId);
                msgLogService.updateStatus(msgId, MSG_DELIVER_SUCCESS);
            } else {
                // status stays "in delivery", so ResendMsgTask will pick it up
                log.warn("Failed to send message to the exchange, correlationData: {}, cause: {}", correlationData, cause);
            }
        });

        rabbitTemplate.setReturnsCallback(returnedMessage ->
                log.warn("Failed to route message from exchange to queue: exchange: {}, routingKey: {}, replyCode: {}, replyText: {}, message: {}",
                        returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getReplyCode(),
                        returnedMessage.getReplyText(), returnedMessage.getMessage()));

        return rabbitTemplate;
    }

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public DirectExchange mailExchange() {
        return ExchangeBuilder
                .directExchange(MAIL_EXCHANGE_NAME)
                .durable(true)
                .build();
    }

    @Bean
    public DirectExchange retryExchange() {
        return ExchangeBuilder
                .directExchange(RETRY_EXCHANGE_NAME)
                .durable(true)
                .build();
    }

    @Bean
    public Queue mailQueue() {
        return new Queue(MAIL_QUEUE_NAME, true);
    }

    /**
     * Delay queue: a message waits here RETRY_TTL ms and is then dead-lettered
     * back to the mail exchange with the mail routing key
     */
    @Bean
    public Queue retryQueue() {
        return QueueBuilder
                .durable(RETRY_QUEUE_NAME)
                .withArgument("x-dead-letter-routing-key", MAIL_ROUTING_KEY_NAME)
                .withArgument("x-dead-letter-exchange", MAIL_EXCHANGE_NAME)
                .withArgument("x-message-ttl", RETRY_TTL)
                .build();
    }

    @Bean
    public Binding mailBinding() {
        return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);
    }

    @Bean
    public Binding retryBinding() {
        return BindingBuilder.bind(retryQueue()).to(retryExchange()).with(RETRY_ROUTING_KEY_NAME);
    }
}
```

Producer

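```java
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.UUID;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
// Mail, MsgLog, MsgLogMapper, JsonUtil and the RabbitConfig constants (static import) come from the project

@Component
public class MailProducer {

    private final MsgLogMapper msgLogMapper;

    private final RabbitTemplate rabbitTemplate;

    public MailProducer(MsgLogMapper msgLogMapper, RabbitTemplate rabbitTemplate) {
        this.msgLogMapper = msgLogMapper;
        this.rabbitTemplate = rabbitTemplate;
    }

    public void send(Mail mail) {
        String msgId = UUID.randomUUID().toString().replace("-", "");
        String msg = Objects.requireNonNull(JsonUtil.toJson(mail), "mail could not be serialized");
        MsgLog msgLog = MsgLog.builder()
                .msgId(msgId)
                .msg(msg)
                .exchange(MAIL_EXCHANGE_NAME)
                .routingKey(MAIL_ROUTING_KEY_NAME)
                .nextTryTime(LocalDateTime.now().plusMinutes(MSG_TIMEOUT)) // resend if still unconfirmed after this
                .createTime(LocalDateTime.now())
                .build();
        msgLogMapper.insert(msgLog); // persist the message log before publishing

        Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)).build();
        MessageProperties messageProperties = message.getMessageProperties();
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // persistent message
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON); // JSON body
        // The Message is already built, so send() it as-is; msgId travels as the correlation ID
        rabbitTemplate.send(MAIL_EXCHANGE_NAME, MAIL_ROUTING_KEY_NAME, message, new CorrelationData(msgId));
    }
}
```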

Consumer

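```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
// Mail, MailService, IMsgLogService, JsonUtil and the RabbitConfig constants (static import) come from the project

@Slf4j
@Component
public class MailConsumer {

    private final StringRedisTemplate redisTemplate;

    private final MailService mailService;

    private final IMsgLogService msgLogService;

    private final RabbitTemplate rabbitTemplate;

    public MailConsumer(StringRedisTemplate redisTemplate, MailService mailService, IMsgLogService msgLogService, RabbitTemplate rabbitTemplate) {
        this.redisTemplate = redisTemplate;
        this.mailService = mailService;
        this.msgLogService = msgLogService;
        this.rabbitTemplate = rabbitTemplate;
    }

    @SuppressWarnings("unchecked")
    @RabbitListener(queues = MAIL_QUEUE_NAME)
    public void handler(Message message, Channel channel) throws IOException {
        Mail mail = JsonUtil.fromJson(new String(message.getBody(), StandardCharsets.UTF_8), new TypeReference<Mail>() {
        });
        MessageProperties messageProperties = message.getMessageProperties();
        Map<String, Object> headers = messageProperties.getHeaders();
        long tag = messageProperties.getDeliveryTag();
        // RabbitTemplate fills this header from the CorrelationData ID used when publishing
        String msgId = (String) headers.get("spring_returned_message_correlation");
        // The broker adds x-death when the message expires out of the retry queue;
        // "count" of the latest entry is the number of retries so far
        long retryCount = 0;
        if (headers.containsKey("x-death")) {
            List<Map<String, Object>> deathList = (List<Map<String, Object>>) headers.get("x-death");
            retryCount = (long) deathList.get(0).get("count");
        }

        if (Boolean.TRUE.equals(redisTemplate.opsForSet().isMember(ORDER_MSG_LOG, msgId))) {
            // msgId is already in Redis, so this message has been consumed before
            log.info("Message already consumed, msgId: {}", msgId);
            channel.basicAck(tag, false);
            return;
        }

        try {
            mailService.send(mail);
            msgLogService.updateStatus(msgId, MSG_CONSUMED_SUCCESS);
            redisTemplate.opsForSet().add(ORDER_MSG_LOG, msgId);
            channel.basicAck(tag, false);
            log.info("Mail sent, msgId: {}", msgId);
        } catch (Exception e) {
            if (retryCount < MAX_TRY_COUNT) {
                // Republish to the delay queue before settling the original,
                // so a failed publish leaves the original unacked instead of losing it
                rabbitTemplate.send(RETRY_EXCHANGE_NAME, RETRY_ROUTING_KEY_NAME, message, new CorrelationData(msgId));
                log.info("Failed to send mail, scheduling retry {}, msgId: {}", retryCount + 1, msgId);
            } else {
                msgLogService.updateStatus(msgId, MSG_CONSUMED_FAIL);
                log.info("Failed to send mail, max retries reached, msgId: {}", msgId);
            }
            // requeue = false: never put the message straight back on the mail queue
            channel.basicNack(tag, false, false);
        }
    }
}
```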
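The consumer reads `deathList.get(0).get("count")`. This works as long as the retry queue is the only place the message is ever dead-lettered, because the most recent x-death entry comes first. The message might also be dead-lettered elsewhere, for example rejected from another queue that has a DLX. In that case, selecting the entry by queue name and reason is more robust. The helper below is hypothetical. It assumes the retry queue name from RabbitConfig and returns 0 when the header is missing.

```java
import java.util.List;
import java.util.Map;

final class RetryCounter {

    private RetryCounter() {
    }

    /** How many times the message has expired out of retryQueue; 0 if it never has. */
    @SuppressWarnings("unchecked")
    static long retryCount(Map<String, Object> headers, String retryQueue) {
        Object raw = headers.get("x-death");
        if (!(raw instanceof List)) {
            return 0L; // never dead-lettered
        }
        for (Map<String, Object> death : (List<Map<String, Object>>) raw) {
            // x-death keeps one entry per (queue, reason) pair with a running count
            if (retryQueue.equals(String.valueOf(death.get("queue")))
                    && "expired".equals(String.valueOf(death.get("reason")))) {
                Object count = death.get("count");
                return count instanceof Number ? ((Number) count).longValue() : 0L;
            }
        }
        return 0L;
    }
}
```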
How to make consumption idempotent

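Duplicate consumption happens when a message that has already been processed is delivered again. In one scenario, the consumer finishes processing, but its ack never reaches the broker because the connection drops, so the broker redelivers the message. In another, the producer never receives the confirm, so the scheduled task republishes a message the broker had already accepted. Either way, the consumer receives a message it has already handled.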

Solutions:

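  1. Use a globally unique ID. Once a message has been consumed, store its ID in Redis. Before consuming, check Redis for a consumption record
  2. Rely on Redis atomicity with the SETNX command. If the command succeeds, the message has not been processed yet and can be consumed. This also closes the race in option 1, where two consumers can both pass the check before either one writes the ID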
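Option 2 can be sketched without Redis by using an atomic put-if-absent as a stand-in for SETNX. This is an in-memory illustration only, and the class and method names are hypothetical. With Spring Data Redis, the analogous call would be `redisTemplate.opsForValue().setIfAbsent(key, value, timeout)`, which sets the key only if it is absent and gives it an expiry.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory stand-in for Redis SETNX: putIfAbsent is atomic on ConcurrentHashMap
final class IdempotencyGuard {

    private final ConcurrentMap<String, Boolean> claimed = new ConcurrentHashMap<>();

    /** Returns true for exactly one caller per msgId; only that caller may process the message. */
    boolean tryClaim(String msgId) {
        return claimed.putIfAbsent(msgId, Boolean.TRUE) == null;
    }

    /** Drops a claim after a processing failure so a later retry can claim it again. */
    void release(String msgId) {
        claimed.remove(msgId);
    }
}
```

In the consumer, `tryClaim` would take the place of the `isMember` check. A message whose claim fails is acked and skipped. `release` would run in the catch block before the message goes to the delay queue. In Redis, an expiry on the key plays the same role if the consumer crashes between claiming and acking.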
Manual acknowledgement

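In manual ack mode, a message stays in the unacked state until the consumer explicitly acks it. Even if the consumer crashes, the message is not lost. When the consumer's channel or connection closes, the broker moves the message back to the ready state and delivers it to the next consumer that connects.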

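  • basicAck: acknowledges one received message, or, with multiple = true, every message up to the given delivery tag
  • basicNack: rejects one or more received messages; the requeue flag controls whether they go back to the queue
  • basicReject: rejects a single message; otherwise the same as basicNack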
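To make the ready and unacked states concrete, here is a toy in-memory model of a single queue consumed through one channel in manual-ack mode. It is not RabbitMQ code, and the names are hypothetical. It simplifies requeueing to "back at the head in the original order". It also reports an unknown delivery tag with an exception, whereas the real broker closes the channel.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy model of one queue consumed through one channel with manual acks (not RabbitMQ code)
final class ManualAckModel {

    private final Deque<String> ready = new ArrayDeque<>();            // waiting to be delivered
    private final NavigableMap<Long, String> unacked = new TreeMap<>(); // delivered, not yet settled
    private long nextTag = 1;

    void publish(String body) {
        ready.addLast(body);
    }

    /** Delivers the head message; it stays unacked until settled. Returns its delivery tag. */
    long deliver() {
        String body = ready.pollFirst();
        if (body == null) {
            throw new IllegalStateException("no ready messages");
        }
        long tag = nextTag++;
        unacked.put(tag, body);
        return tag;
    }

    /** basicAck: multiple = true also acks every earlier unacked tag. */
    void ack(long tag, boolean multiple) {
        settled(tag, multiple).clear();
    }

    /** basicNack: like ack, but requeue = true makes the messages ready again at the head. */
    void nack(long tag, boolean multiple, boolean requeue) {
        NavigableMap<Long, String> settled = settled(tag, multiple);
        if (requeue) {
            requeueAtHead(settled);
        }
        settled.clear();
    }

    /** basicReject: a nack for exactly one message. */
    void reject(long tag, boolean requeue) {
        nack(tag, false, requeue);
    }

    /** Consumer crash or channel close: nothing is lost, every unacked message becomes ready. */
    void channelClosed() {
        requeueAtHead(unacked);
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }

    private NavigableMap<Long, String> settled(long tag, boolean multiple) {
        if (!unacked.containsKey(tag)) {
            // the real broker closes the channel with PRECONDITION_FAILED here
            throw new IllegalArgumentException("unknown delivery tag " + tag);
        }
        return multiple ? unacked.headMap(tag, true) : unacked.subMap(tag, true, tag, true);
    }

    private void requeueAtHead(NavigableMap<Long, String> messages) {
        // newest first, so the original order is restored at the head of the queue
        for (String body : messages.descendingMap().values()) {
            ready.addFirst(body);
        }
    }
}
```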

Republishing failed messages

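```java
import java.nio.charset.StandardCharsets;
import java.util.List;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
// MsgLog, IMsgLogService and the RabbitConfig constants come from the project; scheduling must be enabled (@EnableScheduling)

@RequiredArgsConstructor
@Slf4j
@Component
public class ResendMsgTask {

    private final IMsgLogService msgLogService;

    private final RabbitTemplate rabbitTemplate;

    // Runs every 20 seconds
    @Scheduled(cron = "0/20 * * * * ?")
    public void resend() {
        log.info("Scheduled task -> resend messages -> start");

        List<MsgLog> msgLogList = msgLogService.selectTimeoutMsg();
        msgLogList.forEach(msgLog -> {
            String msgId = msgLog.getMsgId();
            if (msgLog.getTryCount() >= MAX_TRY_COUNT) {
                // Give up: mark the message as delivery failed for manual inspection
                msgLogService.updateStatus(msgId, MSG_DELIVER_FAIL);
                log.info("Max retries exceeded, delivery failed, msgId: {}", msgId);
            } else {
                msgLogService.updateTryCount(msgId, msgLog.getNextTryTime());
                // msgLog.getMsg() is already JSON, so send raw bytes as MailProducer does;
                // convertAndSend(String) would pass it through Jackson2JsonMessageConverter
                // and encode it a second time as a quoted JSON string
                Message message = MessageBuilder.withBody(msgLog.getMsg().getBytes(StandardCharsets.UTF_8))
                        .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                        .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                        .build();
                rabbitTemplate.send(msgLog.getExchange(), msgLog.getRoutingKey(), message, new CorrelationData(msgId));
            }
        });

        log.info("Scheduled task <- resend messages <- end");
    }
}
```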
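The decision each `resend()` pass makes can be isolated into a pure function, which is easier to test than the scheduled method. In this sketch the names are hypothetical. The definition of "due" is an assumption about what `selectTimeoutMsg` queries, since the source does not show it: the status is still "in delivery" and `nextTryTime` is not in the future.

```java
import java.time.Duration;
import java.time.LocalDateTime;

enum ResendAction { SKIP, RESEND, GIVE_UP }

final class ResendPolicy {

    private ResendPolicy() {
    }

    /**
     * Decision for one message log row in a resend() pass.
     *
     * @param stillDelivering status is still "in delivery" (no successful confirm yet)
     * @param tryCount        resends already made
     * @param nextTryTime     earliest time the row may be resent
     */
    static ResendAction decide(boolean stillDelivering, int tryCount, LocalDateTime nextTryTime,
                               LocalDateTime now, int maxTryCount) {
        if (!stillDelivering || now.isBefore(nextTryTime)) {
            return ResendAction.SKIP; // already confirmed, or not due yet
        }
        return tryCount >= maxTryCount ? ResendAction.GIVE_UP : ResendAction.RESEND;
    }

    /** Next check time after a resend: a fixed interval, like MSG_TIMEOUT = 1 minute. */
    static LocalDateTime nextTryTime(LocalDateTime now, Duration interval) {
        return now.plus(interval);
    }
}
```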

Troubleshooting

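Suppose the consumer throws an exception while processing a message and calls basicNack with requeue = true. The requeued message does not go to the tail of the queue. It returns to its original position, which is typically the head. The consumer immediately receives the same message again, throws again and requeues it again, creating an endless loop.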

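Solution: always settle the message on the channel, whether processing succeeds or not. When nacking, set requeue to false. After an exception, send the message through the delay queue for retries. Once the retry count reaches the threshold, set the status to "consumption failed" and stop retrying.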
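Here is a toy simulation of the two policies for a handler that always throws. It mirrors the consumer's `retryCount < MAX_TRY_COUNT` check but is otherwise hypothetical. The safety cap exists only so that the requeue = true case terminates in the demo.

```java
final class FailureLoopDemo {

    private FailureLoopDemo() {
    }

    /** Handler invocations for one message whose handler always throws. */
    static int invocations(boolean requeueOnNack, long maxTryCount, int safetyCap) {
        int invocations = 0;
        long retryCount = 0; // what x-death "count" would report on the next delivery
        while (invocations < safetyCap) {
            invocations++; // the handler runs and throws
            if (requeueOnNack) {
                continue; // basicNack(tag, false, true): straight back to the queue, delivered again at once
            }
            if (retryCount >= maxTryCount) {
                return invocations; // give up: status becomes MSG_CONSUMED_FAIL
            }
            retryCount++; // basicNack(tag, false, false) plus a republish to the delay queue
        }
        return invocations; // reached only when the safety cap stops the endless loop
    }
}
```

With MAX_TRY_COUNT = 3, the delayed-retry policy runs the handler four times (the original delivery plus three retries) and then gives up. With requeue = true, the handler keeps running until the cap.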