RabbitMQ-高级特性解析RabbitMQ-消息可靠性保障-上
RabbitMQ 高级特性解析:RabbitMQ 消息可靠性保障 (上)
RabbitMQ 核心功能
前言
最近再看 RabbitMQ,看了看自己之前写的博客,诶,一言难尽,当时学的懵懵懂懂的。这里重新整理 RabbitMQ 的核心功能。
在分布式系统中,消息队列是实现异步通信、解耦服务的关键组件。RabbitMQ 作为一款功能强大的消息队列,其消息可靠性是确保系统稳定运行的重要因素。这里将深入探讨 RabbitMQ 的消息确认机制、持久化策略、发送方确认机制以及重试机制 !!
一、消息确认机制
1.1 消息确认机制概述
生产者发送消息到消费端后,可能出现消息处理成功或异常的情况。如果 RabbitMQ 在发送消息后就将其删除,当消息处理异常时,就会造成消息丢失。为了确保消费端成功接收并正确处理消息,RabbitMQ 提供了消息确认机制(message acknowledgement)。
消费者在订阅队列时,可以指定
autoAck
参数,根据该参数设置,消息确认机制分为自动确认和手动确认两种:
- 自动确认(autoAck=true) :RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正消费到了这些消息。这种模式适合对消息可靠性要求不高的场景。
- 手动确认(autoAck=false) :RabbitMQ 会等待消费者显式地调用
Basic.Ack
命令,回复确认信号后才从内存(或者磁盘)中移去消息。这种模式适合对消息可靠性要求比较高的场景。
以下是basicConsume
方法的定义:
/**
- Start a non-nolocal, non-exclusive consumer, with
- a server-generated consumerTag.
- @param queue the name of the queue
- @param autoAck true if the server should consider messages
- acknowledged once delivered; false if the server should expect
- explicit acknowledgements
- @param callback an interface to the consumer object
- @return the consumerTag generated by the server
- @throws java.io.IOException if an error is encountered
- @see com.rabbitmq.client.AMQP.Basic.Consume
- @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
- @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) */ String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
1.2 手动确认方法
消费者在收到消息之后,可以选择确认、直接拒绝或者跳过,RabbitMQ 提供了三种不同的确认应答方式:
- 肯定确认:
Channel.basicAck(long deliveryTag, boolean multiple)
:RabbitMQ 已知道该消息并且成功处理消息,可以将其丢弃。deliveryTag
:消息的唯一标识,是一个单调递增的 64 位长整型值,每个通道(Channel)独立维护,所以在每个通道上都是唯一的。当消费者确认(ack)一条消息时,必须使用对应的通道进行确认。multiple
:是否批量确认。值为true
则会一次性 ack 所有小于或等于指定deliveryTag
的消息;值为false
,则只确认当前指定deliveryTag
的消息。- 否定确认:
Channel.basicReject(long deliveryTag, boolean requeue)
:消费者客户端可以调用channel.basicReject
方法来告诉 RabbitMQ 拒绝这个消息。deliveryTag
:参考channel.basicAck
。requeue
:表示拒绝后,这条消息如何处理。如果requeue
参数设置为true
,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;如果requeue
参数设置为false
,则 RabbitMQ 会把消息从队列中移除,而不会把它发送给新的消费者。- 否定确认:
Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
:如果想要批量拒绝消息,可以使用Basic.Nack
命令。参数介绍参考前面两个方法。
multiple
参数设置为true
则表示拒绝deliveryTag
编号之前所有未被当前消费者确认的消息。
1.3 代码示例
我们基于 Spring Boot 来演示消息的确认机制,Spring - AMQP 对消息确认机制提供了三种策略: public enum AcknowledgeMode { NONE, MANUAL, AUTO; }
1.3.1 AcknowledgeMode.NONE
- 配置确认机制 : spring: rabbitmq: addresses: amqp://study:study@110.41.51.65:15673/bite listener: simple: acknowledge-mode: none
- 发送消息 : public class Constant { public static final String ACK_EXCHANGE_NAME = “ack_exchange”; public static final String ACK_QUEUE = “ack_queue”; } @Configuration public class RabbitmqConfig { @Bean(“ackExchange”) public Exchange ackExchange(){ return ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build(); } @Bean(“ackQueue”) public Queue ackQueue() { return QueueBuilder.durable(Constant.ACK_QUEUE).build(); } @Bean(“ackBinding”) public Binding ackBinding(@Qualifier(“ackExchange”) Exchange exchange, @Qualifier(“ackQueue”) Queue queue) { return BindingBuilder.bind(queue).to(exchange).with(“ack”).noargs(); } } @RestController @RequestMapping("/producer") public class ProductController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/ack") public String ack(){ rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, “ack”, “consumer ack test…”); return “发送成功!”; } }
- 消费端逻辑 :
@Component
public class AckQueueListener {
@RabbitListener(queues = Constant.ACK_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws Exception {
System.out.printf(“接收到消息: %s, deliveryTag: %d%n”, new String(message.getBody(),“UTF-8”),
message.getMessageProperties().getDeliveryTag());
//模拟处理失败
int num = 3/0;
System.out.println(“处理完成”);
}
}
运行结果:消息处理失败,但消息已从 RabbitMQ 中移除,因为
NONE
模式下消息一旦投递就会被自动确认。
1.3.2 AcknowledgeMode.AUTO
- 配置确认机制 :
spring:
rabbitmq:
addresses: amqp://study:study@110.41.51.65:15673/bite
listener:
simple:
acknowledge-mode: auto
重新运行程序,当消费者出现异常时,RabbitMQ 会不断重发消息,由于异常多次重试还是失败,消息没被确认,也无法
nack,就一直是
unacked
状态,导致消息积压。
1.3.3 AcknowledgeMode.MANUAL
- 配置确认机制 : spring: rabbitmq: addresses: amqp://study:study@110.41.51.65:15673/bite listener: simple: acknowledge-mode: manual
- 消费端手动确认逻辑 : @Component public class AckQueueListener { @RabbitListener(queues = Constant.ACK_QUEUE) public void ListenerQueue(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //1. 接收消息 System.out.printf(“接收到消息: %s, deliveryTag: %d%n”, new String(message.getBody(),“UTF-8”), message.getMessageProperties().getDeliveryTag()); //2. 处理业务逻辑 System.out.println(“处理业务逻辑”); //手动设置一个异常, 来测试异常拒绝机制 // int num = 3/0; //3. 手动签收 channel.basicAck(deliveryTag, true); } catch (Exception e) { //4. 异常了就拒绝签收 //第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送, 若为false, 则直接丢弃 channel.basicNack(deliveryTag, true, true); } } } 运行结果:消息正常处理时会被签收;异常时会不断重试。
二、持久性
2.1 交换机持久化
交换器的持久化是通过在声明交换机时将durable
参数置为true
实现的。这样当 MQ 的服务器发生意外或关闭之后,重启 RabbitMQ
时不需要重新去建立交换机,交换机会自动建立。如果交换器不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换机元数据会丢失。
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();
2.2 队列持久化
队列的持久化是通过在声明队列时将durable
参数置为true
实现的。如果队列不设置持久化,那么在 RabbitMQ
服务重启之后,该队列就会被删掉,数据也会丢失。但队列持久化不能保证内部所存储的消息不丢失,要确保消息不丢失,需要将消息设置为持久化。
QueueBuilder.durable(Constant.ACK_QUEUE).build();
创建非持久化队列:
QueueBuilder.nonDurable(Constant.ACK_QUEUE).build();
2.3 消息持久化
消息实现持久化,需要把消息的投递模式(MessageProperties
中的deliveryMode
)设置为
2,也就是MessageDeliveryMode.PERSISTENT
。
// 要发送的消息内容
String message = “This is a persistent message”;
// 创建一个Message对象,设置为持久化
Message messageObject = new Message(message.getBytes(), new MessageProperties());
messageObject.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 使用RabbitTemplate发送消息
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, “ack”, messageObject);
需要注意的是,将所有的消息都设置为持久化,会严重影响 RabbitMQ 的性能,因为写入磁盘的速度比写入内存的速度慢很多。在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做一个权衡。
即使将交换器、队列、消息都设置了持久化,也不能百分之百保证数据不丢失。例如,消费者订阅队列时
autoAck
参数设置为true
,消费者接收到消息后还没来得及处理就宕机,会导致数据丢失;持久化的消息存入 RabbitMQ 后,还需要一段时间才能存入磁盘,如果在这段时间内 RabbitMQ 服务节点发生异常,消息可能会丢失。可以通过引入 RabbitMQ 的仲裁队列或在发送端引入事务机制、发送方确认机制来提高可靠性。(后续都会讲到)
三、发送方确认
3.1 confirm 确认模式
Producer
在发送消息时,对发送端设置一个ConfirmCallback
的监听,无论消息是否到达Exchange
,这个监听都会被执行。如果Exchange
成功收到消息,ACK
为true
;如果没收到消息,ACK
为false
。
配置 RabbitMQ :
spring:
rabbitmq:
addresses: amqp://study:study@110.41.51.65:15673/bite
listener:
simple:
acknowledge-mode: manual #消息接收确认
publisher-confirm-type: correlated #消息发送确认
设置确认回调逻辑并发送消息 :
@Configuration
public class RabbitTemplateConfig {
@Bean(“confirmRabbitTemplate”)
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack){
System.out.printf(“消息接收成功, id:%s \n”, correlationData.getId());
}else {
System.out.printf(“消息接收失败, id:%s, cause: %s”, correlationData.getId(), cause);
}
}
});
return rabbitTemplate;
}
}
@RestController
@RequestMapping("/product")
public class ProductController {
@Resource(name = “confirmRabbitTemplate”)
private RabbitTemplate confirmRabbitTemplate;
@RequestMapping("/confirm")
public String confirm() throws InterruptedException {
CorrelationData correlationData1 = new CorrelationData(“1”);
confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME, “confirm”, “confirm test…”, correlationData1);
return “确认成功”;
}
}
- 测试 :
运行程序,调用接口
http://127.0.0.1:8080/product/confirm
,观察控制台,消息确认成功。修改交换机名称,重新运行,会触发消息发送失败的结果。
3.2 return 退回模式
消息到达Exchange
之后,会根据路由规则匹配,把消息放入Queue
中。如果一条消息无法被任何队列消费,可以选择把消息退还给发送者。
配置 RabbitMQ :
spring:
rabbitmq:
addresses: amqp://study:study@110.41.51.65:15673/bite
listener:
simple:
acknowledge-mode: manual #消息接收确认
publisher-confirm-type: correlated #消息发送确认
- 设置返回回调逻辑并发送消息 :
@Configuration
public class RabbitTemplateConfig {
@Bean(“confirmRabbitTemplate”)
public RabbitTemplate confirmRabbitTemplate(CachingConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.printf(“消息被退回: %s”, returned);
}
});
return rabbitTemplate;
}
}
@RestController
@RequestMapping("/product")
public class ProductController {
@Resource(name = “confirmRabbitTemplate”)
private RabbitTemplate confirmRabbitTemplate;
@RequestMapping("/msgReturn")
public String msgReturn(){
CorrelationData correlationData = new CorrelationData(“2”);
confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, “confirm11”, “message return test…”, correlationData);
return “消息发送成功”;
}
}
测试 :
运行程序,调用接口
http://127.0.0.1:8080/product/msgReturn
,观察控制台,消息被退回。
四、重试机制
在消息传递过程中,可能会遇到各种问题,如网络故障、服务不可用、资源不足等,这些问题可能导致消息处理失败。为了解决这些问题,RabbitMQ 提供了重试机制,允许消息在处理失败后重新发送。但如果是程序逻辑引起的错误,那么多次重试也是没有用的,可以设置重试次数。
4.1 重试配置
spring: rabbitmq: addresses: amqp://study:study@110.41.51.65:15673/bite listener: simple: acknowledge-mode: auto #消息接收确认 retry: enabled: true # 开启消费者失败重试 initial-interval: 5000ms # 初始失败等待时长为5秒 max-attempts: 5 # 最大重试次数(包括自身消费的一次)
4.2 配置交换机 & 队列
//重试机制 public static final String RETRY_QUEUE = “retry_queue”; public static final String RETRY_EXCHANGE_NAME = “retry_exchange”; //重试机制 发布订阅模式 //1. 交换机 @Bean(“retryExchange”) public Exchange retryExchange() { return ExchangeBuilder.fanoutExchange(Constant.RETRY_EXCHANGE_NAME).durable(true).build(); } //2. 队列 @Bean(“retryQueue”) public Queue retryQueue() { return QueueBuilder.durable(Constant.RETRY_QUEUE).build();
五:如何保证 RabbitMQ 消息的可靠传输?
消息可能丢失的场景以及解决方案如下:
生产者将消息发送到 RabbitMQ 失败 :
- 可能原因是网络问题等,
- 解决办法是参考
发送方确认 **- confirm确认模式**
。消息在交换机中无法路由到指定队列 :
- 可能原因是代码或者配置层面错误,导致消息路由失败,
- 解决办法是参考
发送方确认**- return模式**
。消息队列自身数据丢失 :
- 可能原因是消息到达 RabbitMQ 之后,RabbitMQ Server 宕机导致消息丢失,
- 解决办法是参考持久化 ,开启 RabbitMQ 持久化 ,也可以通过集群的方式提高可靠性。
消费者异常,导致消息丢失 :
- 可能原因是消息到达消费者,还没来得及消费,消费者宕机或消费者逻辑有问题,
- 解决办法是参考**
消息确认
,开启手动确认,配置重试机制**。
以上就是四个RabbitMQ保证消息可靠性的四个机制,后续有更多核心机制的更新,感谢阅览!!