RabbitMQ-的工作模式
RabbitMQ 的工作模式
目录
RabbitMQ 共提供了 7 种工作模式进行消息传递:
在本篇文章中,我们就来学习 RabbitMQ 的工作模式 ,我们首先来了解这 7 种工作模式分别是怎样的
工作模式
Simple(简单模式)
P 表示 生产者 ,是消息的发送方
C 表示 消费者 ,是消息的接收者
Queue :表示 消息队列 ,用于 缓存消息 ,生产者生产的消息发送到队列中,消费者从队列中取出消息
简单模式下,只有 一个生产者和一个消费者 ,生产者生产的消息存储到队列中后,都由这个消费者消费
特点:一个生产者 P,一个消费者 C,消息只能被消费一次 ,也称为 **点对点(Point-to-Point)**模式
适用场景 :消息只能被单个消费者处理
在 RabbitMQ 入门中的入门代码的工作模式就是简单模式
Work Queue(工作队列)
此时有 一个生产者和多个消费者
当生产者向队列中发送多条消息后,Work Queue 会将消息 分配给不同的消费者 ,每个消费者接收到的消息不同,由 多个消费者共同消费生产者生产的消息
例如:
由 A (生产者)发送不同消息,消息存储到 RabbitMQ 中,接着,由 B(消费者1) 和 C(消费者2) 共同消息A 发送的消息,此时,RabbitMQ 选择将第一条消息分配给 B,B 消费第一条消息,RabbitMQ 将第二条消息分配给 C,C 消费第二条消息…
B 和 C 接收到的消息是不同的,这两个消费者共同消费 A 发送的所有消息
特点 :消息不会重复,分配给不同的消费者
适用场景 :集群环境中实现异步处理
Publish/Subscribe(发布/订阅)
其中,X 表示的是 交换机 ,在 发布/订阅 模式中,多了 Exchange 角色,因此,我们先来学习交换机相关知识
Exchange(交换机)
Exchange(交换机) 的作用是 接收生产者发送的消息 ,并将消息按照一定的规则 路由到一个或多个队列中
生产者的消息都会先发送到交换机 ,然后再由交换机将消息路由到队列中
在前面 简单模式 和 工作队列模式 下,图中都没有出现交换机,但实际上, 生产者生产的消息都是先发送到交换机 ,然后再路由到队列中的。在前两种模式下,直接使用 RabbitMQ 提供的内置交换机就可以实现,因此,并没有突出交换机的存在,但实际上 生产者生产的消息不会直接投递到队列中
在 RabbitMQ 中,交换机有 4 种类型: Fanout 、 Direct 、 Topic 和 Headers ,不同的类型有着不同的路由策略
而 AMQP 协议中还有两种类型, System 和 自定义 ,在这里,我们并不重点关注
Fanout :广播,将消息交给所有绑定到交换机的队列( Publish / Subscribe 模式 )
Direct :定向,将消息交给符合指定 routing key 的队列( Routing 模式 )
Topic :通配符,将消息交给符合 routing patterm (路由模式)的队列( Topics 模式 )
Headers :Headers 类型的交换机通过 消息头部的属性 来路由消息,而 不依赖路由键的匹配规则来路由消息 。根据发送的消息内容中的 headers 属性进行匹配,headers 类型的交换机性能会较差,因此也不太实用,基本上也不会进行使用
Exchange(交换机) 只负责转发消息,并 不具备存储消息的能力 ,因此,若是没有任何队列与 Exchange 绑定,或是没有符合路由规则的队列,消息就会丢失
接下来,我们来看 RoutingKey 和 BindingKey
RoutingKey :路由键,当生产者将消息发送给交换机时,会指定一个字符串,用于 告诉交换机如何处理这个消息
BindingKey :绑定,RabbitMQ 中通过 Binding(绑定) 将交换机与队列关联起来 ,在绑定时会指定一个 Binding Key,这样 RabbitMQ 就知道 如何正确地将消息路由到队列 了
即, 绑定时,需要的路由键是 BindingKey;发送消息时,需要的路由键是 RoutingKey
例如:
使用 BindingKey1 将交换机与 队列1 进行绑定,使用 BindingKey2 将交换机与 队列2 进行绑定
若在发送消息时,若设置 Routing Key 设置为 BindingKey1 ,交换机就会将消息路由到 队列1
即, 当消息的 RoutingKey 与队列绑定的 BindingKey 相匹配时,消息才会被路由到这个队列中
其实, BindingKey 也属于路由键的一种 ,即, 在绑定时使用的路由键 ,有时,也会使用 RoutingKey 表示 BindingKey,即使用 RoutingKey 表示 BindingKey 和 RoutingKey,因此,我们需要根据其使用场景进行区分
在了解了相关概念之后,我们继续看 Publish/ Subscribe 模式
上述有一个生产者 P,多个消费者 C1、C2,X 表示交换机,交换机 将消息复制多份 , 每个消费者接收到相同的消息
也就是说,生产者发送一条消息,经过交换机转发到不同的队列,不同的消费者从不同的队列中取出消息进行消费
特点 :不同的消费者接收到的消息是相同的
适用场景 :消息需要被多个消费者同时接收,如:实时通信或广播消息
Publish/Subscribe(发布/订阅)模式 与Work Queue(工作队列)模式 最大的区别就是: 发布/订阅 模式下, 不同消费者接收到的消息是相同的 ;而 工作队列 模式下, 不同消费者接收到的消息是不同的
Routing(路由模式)
路由模式可以看做是 发布订阅模式 的变种,其在发布订阅模式的基础上,增加了 路由 key
发布订阅模式会无条件的将所有消息发送给所有消费者 ,而路由模式下,交换机会 根据 RoutingKey 的规则 ,将数据筛选后发送给对应的消费者队列
也就是说, 只有满足条件的队列才会收到消息
如上图所示,Q1 通过 a 与交换机进行绑定,Q2 通过 a、b 和 c 与交换机进行绑定
当 P (生产者)在发送消息时,若设置 Routing Key 设置为 a,则此时 Q1 和 Q2 的 BindingKey 都与其相匹配,消息就会被路由到 Q1 和 Q2 中
而当 P 发送消息时,设置 Routing Key 设置为 b,此时,只有 Q2 的 BindingKey 与其相匹配,消息也就只会被路由到 Q2 中
适用场景 :需要根据特定规则分发消息
Topics(通配符模式)
通配符模式 ,则是 路由模式 的变种,在 RoutingKey 的基础上,增加了 通配符 的功能,使得匹配更加灵活
Topics 和 Routing 的基本原理相同,即:生产者将消息发送给交换机,交换机根据 RoutingKey 将消息转发给与 RoutingKey 匹配的队列
而不同的是, Routing 模式下, 需要RoutingKey 和 BingingKey 完全匹配 ;而 Topics 模式下,则是 通配符匹配
在 BindingKey 中,存在两种特殊的字符串,用于 模糊匹配
:表示能够匹配任意一个单词
# :表示能够匹配任意多个单词(可以为 0 个)
Q1 通过 .a. 与交换机进行绑定,Q2 通过 ..b 和 c.# 与交换机进行绑定
当 P (生产者)在发送消息时,若设置 Routing Key 设置为 work.a.b,则此时 Q1 和 Q2 的BindingKey 都能够与其相匹配,消息就会被路由到 Q1 和 Q2 中
而当 P 发送消息时,设置Routing Key 设置为 a.a.a,此时,只有 Q1的 BindingKey 与其相匹配,消息也就只会被路由到 Q1中
适用场景 :需要灵活匹配和过滤消息的场景
RPC(RPC通信)
在 RPC 通信过程中, 没有生产者和消费者 ,而是 通过两个队列实现了一个可回调的过程
例如:
客户端(Client)发送请求消息到指定队列(rpc_queue),并在消息属性中设置 reply_to 字段,这个字段指定了一个 回调队列 (amq.gen-Xa2…),这个回调队列 用于接收服务端的响应消息
服务器(Server)从队列 rpc_queue 中取出请求消息,处理请求后,将响应消息发送到 reply_to 指定的回调队列(amq.gen-Xa2…)
客户端(Client)在回调队列上等待响应消息,一旦接收到响应, 客户端就会检查消息的 correlation_id 属性,确保其是所期望的响应
简而言之,客户端将请求消息发送到 队列Q1 中,服务器从 Q1 中取出请求消息进行处理,然后将响应消息发送到 队列Q2 中,客户端从 Q2 中读取响应消息
从而实现了 客户端向服务器发送请求,服务器返回对应的响应 的功能
Publisher Confirms(发布确认)
Publisher Confirms 模式是 RabbitMQ 提供的一种 确保消息可靠发送到 RabbitMQ 服务器 的机制,在这种模式下,生产者可以 等待 RabbitMQ 服务器的确认,以确保消息已经被服务器接收并处理
其过程为:
(1)生产者将 Channel 设置为 confirm 模式(通过调用 channel.confirmSelect() 完成),发布的每一条消息都会获得一个 唯一的 ID ,生产者可以将这些 序列号与消息关联 起来,以便追踪消息的状态
(2)当消息被 RabbitMQ 服务器接收并处理后,服务器会 异步 地向生产者发送一个 确认(ACK) ,其中包含消息的唯一 ID,表示消息已经送达
通过 Publisher Confirms 模式,生产者可以确保消息被 RabbitMQ 服务器成功接收,从而避免消息丢失
**适用场景:**对数据安全性要求较高,如金融交易,订单处理等
在基本了解了 RabbitMQ 的 7 种工作模式后,我们就来通过代码简单实现一下这 7 种工作模式
代码实现
Simple(简单模式)
在 简单模式 下,只有 一个生产者和一个消费者 ,生产者生产的消息存储到队列后,都由这个消费者消费
在 中的入门代码的工作模式就是简单模式,因此,在这里就不再进行过多解释了
首先引入依赖:
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
生产者代码
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost("49.232.238.62"); // ip 的默认值为 localhost
factory.setPort(5672); // 默认值为 5672
factory.setVirtualHost("test01"); // 虚拟主机,默认值为 /
// 账号
factory.setUsername("admin"); // 用户名,默认为 guest
factory.setPassword("123456"); // 密码,默认为 guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 声明队列
channel.queueDeclare("simple.test", true, false, false, null);
// 6. 通过 channel 发送消息到队列中
String message = "test...";
channel.basicPublish("", "simple.test", null, message.getBytes());
System.out.println("消息:" + message + " 发送成功");
// 7. 释放资源
channel.close();
connection.close();
}
}
消费者代码
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost("49.232.238.62"); // ip 的默认值为 localhost
factory.setPort(5672); // 默认值为 5672
factory.setVirtualHost("test01"); // 虚拟主机,默认值为 /
// 账号
factory.setUsername("admin"); // 用户名,默认为 guest
factory.setPassword("123456"); // 密码,默认为 guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 声明队列
channel.queueDeclare("simple.test", true, false, false, null);
// 6. 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 回调方法,当接收到消息后,自动执行该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("成功接收到消息: " + new String(body));
}
};
channel.basicConsume("simple.test", true, consumer);
// 7. 释放资源
channel.close();
connection.close();
}
}
Work Queues(工作队列)
生产者代码
工作队列模式下,由 一个生产者生产消息,多个消费者共同接收消息 ,消费者之间是竞争关系,每个消息只能被一个消费者接收
由于我们每次连接时都要使用 IP、端口号、虚拟主机名等,因此,我们可以将它们提取出来,放到 Constants 类中:
public class Constants {
public static final String HOST = "49.232.238.62";
public static final int PORT = 5672;
public static final String VIRTUAL_HOST = "test01";
public static final String USER_NAME = "admin";
public static final String USER_PASSWORD = "123456";
}
声明 工作队列 模式下使用的队列:
// 工作模式
public static final String WORK_QUEUE = "work.queue";
接下来,我们就来实现生产者的代码:
工作队列模式与简单模式的区别在于工作模式下有多个消费者,因此生产者的消费代码与简单模式下差别不大,但在发送消息时,我们一次发送 20 条消息:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost(Constants.HOST); // ip 的默认值为 localhost
factory.setPort(Constants.PORT); // 默认值为 5672
factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
// 账号
factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 声明队列
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
// 6. 通过 channel 发送消息到队列中
for (int i = 0; i < 20; i++) {
String message = "work test... " + i;
channel.basicPublish("", Constants.WORK_QUEUE, null, message.getBytes());
}
System.out.println("消息发送成功!");
// 7. 释放资源
channel.close();
connection.close();
}
}
运行代码,可以看到 work.queue 队列被创建,且存储了 20 条消息
接下来,我们继续编写消费者代码
消费者代码
消费者的代码也与简单模式下的代码差别不大,但在最后,我们并不进行资源的释放:
Consumer1:
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost(Constants.HOST); // ip 的默认值为 localhost
factory.setPort(Constants.PORT); // 默认值为 5672
factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
// 账号
factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 声明队列
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
// 6. 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 回调方法,当接收到消息后,自动执行该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("成功接收到消息: " + new String(body));
}
};
channel.basicConsume(Constants.WORK_QUEUE, true, consumer);
}
}
Consumer2:
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost(Constants.HOST); // ip 的默认值为 localhost
factory.setPort(Constants.PORT); // 默认值为 5672
factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
// 账号
factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 声明队列
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
// 6. 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 回调方法,当接收到消息后,自动执行该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("成功接收到消息: " + new String(body));
}
};
channel.basicConsume(Constants.WORK_QUEUE, true, consumer);
}
}
启动 Consumer1 和 Consumer2:
由于我们之前先启动了生产者,此时再启动消费者,由于消息较少,因此,先启动的 Consumer1 会瞬间将 20 条消息消费掉
因此,再次启动 Producer,观察结果:
可以看到两个消费者分别消费了 10 条消息
Publish/Subscribe(发布/订阅)
生产者代码
在 发布/订阅 模式中,多了 Exchange 角色
Exchange 常见有三种类型,分别代表不同的路由规则:
Fanout :广播,将消息交给所有绑定到交换机的队列(Publish/Subscribe 模式)
Direct :定向,将消息交给符合指定 RoutingKey 的队列(Routing 模式)
Topics :通配符,将消息交给符合 Routing Pattern(路由模式)的队列(Topics 模式)
此时,在 发布/订阅 模式下,我们就需要 声明交换机 ,并 绑定队列和交换机
我们首先来看 声明交换机 :
使用 channel.exchangeDeclare 方法来创建交换机,我们来看 exchangeDeclare 方法:
Exchange.DeclareOk exchangeDeclare(String exchange,
BuiltinExchangeType type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
参数:
exchange :交换机名称
type :交换机类型
durable :是否持久化,当为 true 时,会将交换机存盘,在服务器重启时不会丢失相关信息
autoDelete :是否自动删除,自动删除的前提是至少有一个队列或交换机与这个交换机绑定,之后与这个交换机绑定的队列或交换机都会与此解绑
internal :是否为内部使用,若设置为 true,则表示内部使用,客户端无法直接发送消息到这个交换机中,只能通过交换机路由到交换机这种方式
arguments :相关参数
其中, type 表示 交换机类型 ,其类型为 BuiltinExchangeType ,也可以为 String :
我们来看 BuiltinExchangeType ,它是一个 枚举类型 :
DIRECT(“direct”) :定向,直连,将消息交给符合指定 RoutingKey 的队列(Routing 模式)
FANOUT(“fanout”) :扇形,广播,将消息交给所有绑定到交换机的队列(Publish/Subscribe 模式)
TOPIC(“topic”) :通配符,将消息交给符合 Routing Pattern(路由模式)的队列(Topics 模式)
**HEADERS(“headers”):**参数模式(较少使用)
返回值:
Exchange.DeclareOk:声明确认方法,用于指示已成功声明交换
在 Constants 类中定义 发布/订阅 模式下使用的交换机和两个队列:
// 广播模式
public static final String PUBLISH_CHANGE = "fanout";
public static final String PUBLISH_QUEUE_1 = "publish.queue.1";
public static final String PUBLISH_QUEUE_2 = "publish.queue.2";
建立连接,并声明交换机和两个队列:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost(Constants.HOST); // ip 的默认值为 localhost
factory.setPort(Constants.PORT); // 默认值为 5672
factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
// 账号
factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 声明交换机
channel.exchangeDeclare(Constants.PUBLISH_CHANGE, BuiltinExchangeType.FANOUT, true, false, false, null);
// 6. 声明队列
channel.queueDeclare(Constants.PUBLISH_QUEUE_1, true, false, false, null);
channel.queueDeclare(Constants.PUBLISH_QUEUE_2, true, false, false, null);
}
}
交换机的类型为 BuiltinExchangeType.FANOUT 广播模式
接着,我们使用 channel.queueBind 方法将队列和交换机进行绑定:
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
queue :要绑定的队列名称
exchange :要绑定的交换机名称
routingKey :路由 key,路由规则
arguments :相关参数
在这里的 routingKey ,其实就是 BindingKey , 将交换机与队列关联起来 ,从而让 RabbitMQ 知道如何正确地将消息路由到队列
在发布/订阅模式下,交换机类型为 fanout , routingKey 设置为 “” ,表示 每个消费者都可以收到全部信息
// 7. 绑定交换机和队列
channel.queueBind(Constants.PUBLISH_QUEUE_1, Constants.PUBLISH_CHANGE, "", null);
channel.queueBind(Constants.PUBLISH_QUEUE_2, Constants.PUBLISH_CHANGE, "", null);
接下来,就可以发送消息了:
// 8. 发送消息
for (int i = 0; i < 20; i++) {
String message = "work test... " + i;
channel.basicPublish(Constants.PUBLISH_CHANGE, "", null, message.getBytes());
}
System.out.println("消息发送成功!");
// 9. 释放资源
channel.close();
connection.close();
完整代码:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost(Constants.HOST); // ip 的默认值为 localhost
factory.setPort(Constants.PORT); // 默认值为 5672
factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
// 账号
factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 声明交换机
channel.exchangeDeclare(Constants.PUBLISH_CHANGE, BuiltinExchangeType.FANOUT, true, false, false, null);
// 6. 声明队列
channel.queueDeclare(Constants.PUBLISH_QUEUE_1, true, false, false, null);
channel.queueDeclare(Constants.PUBLISH_QUEUE_2, true, false, false, null);
// 7. 绑定交换机和队列
channel.queueBind(Constants.PUBLISH_QUEUE_1, Constants.PUBLISH_CHANGE, "", null);
channel.queueBind(Constants.PUBLISH_QUEUE_2, Constants.PUBLISH_CHANGE, "", null);
// 8. 发送消息
for (int i = 0; i < 20; i++) {
String message = "work test... " + i;
channel.basicPublish(Constants.PUBLISH_CHANGE, "", null, message.getBytes());
}
System.out.println("消息发送成功!");
// 9. 释放资源
channel.close();
connection.close();
}
}
运行代码,并观察结果:
可以看到,publish.queue.1 和 publish.queue.2 中都已经存储了 20 条消息
查看 fanout 的绑定关系:
成功绑定publish.queue.1 和 publish.queue.2:
接下来,我们继续编写消费者代码
消费者代码
交换机和队列的绑定关系已经在生产者中实现了,因此,消费者代码中可以不必再写
其实现与 工作队列模式 下是基本相同的,只需要修改读取的队列即可
Consumer1:
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost(Constants.HOST); // ip 的默认值为 localhost
factory.setPort(Constants.PORT); // 默认值为 5672
factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
// 账号
factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 声明队列
channel.queueDeclare(Constants.PUBLISH_QUEUE_1, true, false, false, null);
// 6. 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 回调方法,当接收到消息后,自动执行该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("成功接收到消息: " + new String(body));
}
};
channel.basicConsume(Constants.PUBLISH_QUEUE_1, true, consumer);
}
}
Consumer2:
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost(Constants.HOST); // ip 的默认值为 localhost
factory.setPort(Constants.PORT); // 默认值为 5672
factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
// 账号
factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 声明队列
channel.queueDeclare(Constants.PUBLISH_QUEUE_2, true, false, false, null);
// 6. 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 回调方法,当接收到消息后,自动执行该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("成功接收到消息: " + new String(body));
}
};
channel.basicConsume(Constants.PUBLISH_QUEUE_2, true, consumer);
}
}
运行 Consumer1 和 Consumer2:
Consumer1 和 Consumer2 都接收到了这 20 条消息
Routing(路由模式)
生产者代码
Routing 模式下,队列与交换机之间的绑定,不再是任意的绑定了,而是需要指定一个 BindingKey
生产者在向 交换机 发送消息时,也需要指定消息的 RoutingKey
交换机不会将消息发送给每一个绑定的 key,而是会根据消息的 RoutingKey 进行判断,只有 队列绑定时的 BindingKey 和发送消息的 RoutingKey 完全一致时 ,才会接收消息
先在 Constants 类中定义 路由模式下使用的交换机和队列:
// 路由模式
public static final String ROUTING_CHANGE = "routing";
public static final String ROUTINT_QUEUE_1 = "routing.queue.1";
public static final String ROUTINT_QUEUE_2 = "routing.queue.2";
路由模式下,生产者的代码与 发布/订阅模式 下的区别在于: 交换机的类型不同 以及 绑定队列的 BindKey 不同
(1)交换机类型不同
在声明交换机时,交换机的类型为 BuiltinExchangeType.DIRECT
// 5. 声明交换机
channel.exchangeDeclare(Constants.ROUTING_CHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);
(2)声明队列
// 6. 声明队列
channel.queueDeclare(Constants.ROUTINT_QUEUE_1, true, false, false, null);
channel.queueDeclare(Constants.ROUTINT_QUEUE_2, true, false, false, null);
(3)交换机与队列的绑定方式不同
// 7. 绑定交换机和队列
channel.queueBind(Constants.ROUTINT_QUEUE_1, Constants.ROUTING_CHANGE, "a", null);
channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "a", null);
channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "b", null);
channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "c", null);
此时,我们就可以发送消息了
在发送消息时,需要指定 RoutingKey :
// 8. 发送消息
String messageA = "test a...";
channel.basicPublish(Constants.ROUTING_CHANGE, "a", null, messageA.getBytes());
String messageB = "test b...";
channel.basicPublish(Constants.ROUTING_CHANGE, "b", null, messageB.getBytes());
String messageC = "test c... ";
channel.basicPublish(Constants.ROUTING_CHANGE, "c", null, messageB.getBytes());
System.out.println("消息发送成功!");
完整代码:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost(Constants.HOST); // ip 的默认值为 localhost
factory.setPort(Constants.PORT); // 默认值为 5672
factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
// 账号
factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 声明交换机
channel.exchangeDeclare(Constants.ROUTING_CHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);
// 6. 声明队列
channel.queueDeclare(Constants.ROUTINT_QUEUE_1, true, false, false, null);
channel.queueDeclare(Constants.ROUTINT_QUEUE_2, true, false, false, null);
// 7. 绑定交换机和队列
channel.queueBind(Constants.ROUTINT_QUEUE_1, Constants.ROUTING_CHANGE, "a", null);
channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "a", null);
channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "b", null);
channel.queueBind(Constants.ROUTINT_QUEUE_2, Constants.ROUTING_CHANGE, "c", null);
// 8. 发送消息
String messageA = "test a...";
channel.basicPublish(Constants.ROUTING_CHANGE, "a", null, messageA.getBytes());
String messageB = "test b...";
channel.basicPublish(Constants.ROUTING_CHANGE, "b", null, messageB.getBytes());
String messageC = "test c... ";
channel.basicPublish(Constants.ROUTING_CHANGE, "c", null, messageB.getBytes());
System.out.println("消息发送成功!");
// 9. 释放资源
channel.close();
connection.close();
}
}
运行代码,并观察结果:
routing.queue.1 中有 1 条消息,routing.queue.2 中有两条消息
查看 routing 交换机与 队列的绑定关系:
接下来,我们继续编写消费者的代码
消费者代码
消费者代码与 发布/订阅 模式下基本相同,只需要修改队列名称即可:
Consumer1:
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost(Constants.HOST); // ip 的默认值为 localhost
factory.setPort(Constants.PORT); // 默认值为 5672
factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
// 账号
factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 声明队列
channel.queueDeclare(Constants.ROUTINT_QUEUE_1, true, false, false, null);
// 6. 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 回调方法,当接收到消息后,自动执行该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer1 成功接收到消息: " + new String(body));
}
};
channel.basicConsume(Constants.ROUTINT_QUEUE_1, true, consumer);
}
}
Consumer2:
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost(Constants.HOST); // ip 的默认值为 localhost
factory.setPort(Constants.PORT); // 默认值为 5672
factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
// 账号
factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 声明队列
channel.queueDeclare(Constants.ROUTINT_QUEUE_2, true, false, false, null);
// 6. 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 回调方法,当接收到消息后,自动执行该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer2 成功接收到消息: " + new String(body));
}
};
channel.basicConsume(Constants.ROUTINT_QUEUE_2, true, consumer);
}
}
运行结果:
Topics(通配符模式)
生产者代码
相比于 routing 模式,topics 类型的交换机在匹配规则上进行了扩展,BindingKey 支持 通配符匹配
其中,RoutingKey 是一系列由 . 分割的单词,如 user.name、work.abc等
BindingKey 也和 RoutingKey 一样,由 . 分割的字符串
在 BindingKey 中可以存在两种特殊的字符串,用于模糊匹配:
:表示能够匹配任意一个单词
# :表示能够匹配任意多个单词(可以为 0 个)
例如:
交换机 与 队列1(Q1)的 BindingKey 为 .a.
交换机 与 队列2(Q2)的 BindingKey 为 ..b
交换机 与 队列2(Q2)的 BindingKey 为 c.#
则:
若生产者的 RoutingKey 为 work.a.b,则消息会被路由到 Q1 和 Q2
若生产者的 RoutingKey 为 a.a.a,则消息会被路由到 Q1
若生产者的 RoutingKey 为 c.work.a,则消息会被路由到 Q2
若生产者的 RoutingKey 为 b.c.g,则消息会被丢弃,或是返回给生产者(需要设置 mandatory 参数)
接下来,我们就来实现 通配符模式下 的生产者:
先在 Constants 类中定义通配符模式下使用的交换机和队列:
// 通配符模式
public static final String TOPICS_CHANGE = "topics";
public static final String TOPICS_QUEUE_1 = "topics.queue.1";
public static final String TOPICS_QUEUE_2 = "topics.queue.2";
与 路由模式相比,发布订阅模式与其区别为: 交换机类型不同 以及 绑定队列的 RoutingKey 不同
(1)交换机类型不同
交换机的类型为 BuiltinExchangeType.TOPIC
// 5. 声明交换机
channel.exchangeDeclare(Constants.TOPICS_CHANGE, BuiltinExchangeType.TOPIC, true, false, false, null);
(2)声明队列
// 6. 声明队列
channel.queueDeclare(Constants.TOPICS_QUEUE_1, true, false, false, null);
channel.queueDeclare(Constants.TOPICS_QUEUE_2, true, false, false, null);
(3)交换机与队列的绑定方式不同
// 7. 绑定交换机和队列
channel.queueBind(Constants.TOPICS_QUEUE_1, Constants.TOPICS_CHANGE, "*.a.*", null);
channel.queueBind(Constants.TOPICS_QUEUE_2, Constants.TOPICS_CHANGE, "*.*.b", null);
channel.queueBind(Constants.TOPICS_QUEUE_2, Constants.TOPICS_CHANGE, "c.#", null);
此时,我们就可以发送消息了
在发送消息时,需要指定 RoutingKey :
// 8. 发送消息
String message1 = "test work.a.b";
channel.basicPublish(Constants.TOPICS_CHANGE, "work.a.b", null, message1.getBytes());
String message2 = "test a.a.a";
channel.basicPublish(Constants.TOPICS_CHANGE, "a.a.a", null, message2.getBytes());
String message3 = "test c.work.a";
channel.basicPublish(Constants.TOPICS_CHANGE, "c.work.a", null, message3.getBytes());
System.out.println("消息发送成功!");
完整代码:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost(Constants.HOST); // ip 的默认值为 localhost
factory.setPort(Constants.PORT); // 默认值为 5672
factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
// 账号
factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 声明交换机
channel.exchangeDeclare(Constants.TOPICS_CHANGE, BuiltinExchangeType.TOPIC, true, false, false, null);
// 6. 声明队列
channel.queueDeclare(Constants.TOPICS_QUEUE_1, true, false, false, null);
channel.queueDeclare(Constants.TOPICS_QUEUE_2, true, false, false, null);
// 7. 绑定交换机和队列
channel.queueBind(Constants.TOPICS_QUEUE_1, Constants.TOPICS_CHANGE, "*.a.*", null);
channel.queueBind(Constants.TOPICS_QUEUE_2, Constants.TOPICS_CHANGE, "*.*.b", null);
channel.queueBind(Constants.TOPICS_QUEUE_2, Constants.TOPICS_CHANGE, "c.#", null);
// 8. 发送消息
String message1 = "test work.a.b";
channel.basicPublish(Constants.TOPICS_CHANGE, "work.a.b", null, message1.getBytes());
String message2 = "test a.a.a";
channel.basicPublish(Constants.TOPICS_CHANGE, "a.a.a", null, message2.getBytes());
String message3 = "test c.work.a";
channel.basicPublish(Constants.TOPICS_CHANGE, "c.work.a", null, message3.getBytes());
System.out.println("消息发送成功!");
// 9. 释放资源
channel.close();
connection.close();
}
}
运行并观察结果:
topics.queue.1 和 topics.queue.2 中都已经存储了两条消息
我们来看topics.queue.1 中的消息:
我们继续实现消费者代码
消费者代码
Topics 模式的消费者代码与 Routing 模式下相同,只需要修改消费的队列名称即可:
Consumer1:
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost(Constants.HOST); // ip 的默认值为 localhost
factory.setPort(Constants.PORT); // 默认值为 5672
factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
// 账号
factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 声明队列
channel.queueDeclare(Constants.TOPICS_QUEUE_1, true, false, false, null);
// 6. 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 回调方法,当接收到消息后,自动执行该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer1 成功接收到消息: " + new String(body));
}
};
channel.basicConsume(Constants.TOPICS_QUEUE_1, true, consumer);
}
}
Consumer2:
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost(Constants.HOST); // ip 的默认值为 localhost
factory.setPort(Constants.PORT); // 默认值为 5672
factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
// 账号
factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 5. 声明队列
channel.queueDeclare(Constants.TOPICS_QUEUE_2, true, false, false, null);
// 6. 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 回调方法,当接收到消息后,自动执行该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer1 成功接收到消息: " + new String(body));
}
};
channel.basicConsume(Constants.TOPICS_QUEUE_2, true, consumer);
}
}
运行 Consumer1 和 Consumer2,并观察结果:
RPC(RPC通信)
RPC(Remote Procedure Call) ,远程过程调用,是一种通过网络从远程计算机上请求服务,不需要了解底层网络的技术,类似于 Http 远程调用
RabbitMQ 实现 RPC 通信,是通过 两个队列 实现一个 可回调 的过程:
其过程为:
客户端(Client)发送请求消息到指定队列(rpc_queue),并在消息属性中设置 reply_to 字段,这个字段指定了一个 回调队列 (amq.gen-Xa2…),这个回调队列 用于接收服务端的响应消息
服务器(Server)从队列 rpc_queue 中取出请求消息,处理请求后,将响应消息发送到 reply_to 指定的回调队列(amq.gen-Xa2…)
客户端(Client)在回调队列上等待响应消息,一旦接收到响应,客户端就会检查消息的 correlation_id 属性,确保其是否是所期望的响应
接下来,我们就来实现 RPC 的客户端:
客户端代码
客户端主要实现的功能有:
发送请求消息到队列中
从回调队列中读取响应消息
我们先来看发送请求消息到队列的过程:
(1)声明两个队列:消息发送到的队列(Queue) 和 回调队列(replayQueue),并声明本次请求的唯一标志 corrId
(2)将 replayQueue 和 corrId 配置到 Queue 中
接下来,需要从回调队列中读取响应消息,若我们直接从回调队列中读取响应消息,此时,可能服务端还没有处理完请求,也就未将响应消息发送到回调队列中,就读取不到响应
因此,我们可以使用 阻塞队列 来监听回调队列中的消息
使用阻塞队列阻塞当前进程,监听回调队列中的消息,回调队列中有消息时,将响应消息放到阻塞队列中
阻塞队列中有消息后,主线程被唤醒,处理返回内容
先在 Constants 类中声明 RPC 模式下使用的两个队列:
// RPC 模式
public static final String RPC_QUEUE_1 = "rpc.queue1";
public static final String RPC_QUEUE_2 = "rpc.queue2";
在这里,我们就不再声明交换机了,直接使用默认的交换机
声明 消息发送的队列 和 回调队列:
// 声明队列
channel.queueDeclare(Constants.RPC_QUEUE_1, true, false, false, null);
channel.queueDeclare(Constants.RPC_QUEUE_2, true, false, false, null);
使用 UUID 生成本次请求的唯一标志,并配置消息属性:
// 本次请求的唯一标识
String corrId = UUID.randomUUID().toString();
消息相关配置的类型为 BasicProperties ,位于 com.rabbitmq.client.AMQP 下:
AMQP.BasicProperties 提供了一个 构造器 ,可以通过 builder() 来设置一些属性:
使用 correlationId 方法设置唯一标识, replyTo 方法设置回调队列:
// 本次请求的唯一标识
String corrId = UUID.randomUUID().toString();
// 消息属性
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties()
.builder()
.correlationId(corrId) // 唯一ID
.replyTo(Constants.RPC_QUEUE_2) // 回调队列
.build();
最后调用 build 方法创建实例
使用内置交换机发送消息:
// 7. 发送消息
String message = "test rpc...";
channel.basicPublish("", Constants.RPC_QUEUE_1, basicProperties, message.getBytes());
接着,使用阻塞队列存储回调结果:
// 阻塞队列,存放回调结果,一次获取一条消息
BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
从回调队列中接收响应消息:
// 8. 接收服务器的响应
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到回调消息: " + new String(body));
// 判断标识是否正确
if(corrId.equals(properties.getCorrelationId())) {
queue.offer(new String(body, "UTF-8"));
}
}
};
channel.basicConsume(Constants.RPC_QUEUE_2, true, consumer);
最后,从阻塞队列中获取响应消息:
// 9. 获取响应消息
String result = queue.take();
System.out.println("result: " + result);
完整代码:
public class Client {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException, IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost(Constants.HOST); // ip 的默认值为 localhost
factory.setPort(Constants.PORT); // 默认值为 5672
factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
// 账号
factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 使用默认的交换机
// 5. 声明队列
channel.queueDeclare(Constants.RPC_QUEUE_1, true, false, false, null);
channel.queueDeclare(Constants.RPC_QUEUE_2, true, false, false, null);
// 6. 设置消息属性
// 本次请求的唯一标识
String corrId = UUID.randomUUID().toString();
// 消息属性
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties()
.builder()
.correlationId(corrId) // 唯一ID
.replyTo(Constants.RPC_QUEUE_2) // 回调队列
.build();
// 7. 发送消息
String message = "test rpc...";
channel.basicPublish("", Constants.RPC_QUEUE_1, basicProperties, message.getBytes());
// 阻塞队列,存放回调结果,一次获取一条消息
BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
// 8. 接收服务器的响应
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到回调消息: " + new String(body));
// 判断标识是否正确
if(corrId.equals(properties.getCorrelationId())) {
queue.offer(new String(body, "UTF-8"));
}
}
};
channel.basicConsume(Constants.RPC_QUEUE_2, true, consumer);
// 9. 获取响应消息
String result = queue.take();
System.out.println("result: " + result);
}
}
我们继续编写服务端代码
服务端代码
服务端要实现的功能为:
从队列中接收请求消息
根据消息内容处理请求消息,并将响应消息返回到回调队列中
我们先来实现接收消息:
建立连接、声明队列等过程都与 客户端代码相同
但需要注意的是,我们需要 设置服务端同时最多只能获取一条消息 :
// 6. 设置同时最多只能获取一条消息
channel.basicQos(1);
若不设置 basicQos ,RabbitMQ 会使用默认的 Qos 设置,其 prefetchCount 默认值为 0,当prefetchCount 为 0 时,RabbitMQ 会根据内部实现和当前网络状况等因素,可能同时发送多条消息给消费者。这也就意味着,在默认情况下,消费者可能会同时接收到多条消息,但具体数量不是严格保证的,可能会有波动
而在 RPC 模式下,通常希望是 一对一的消息处理 ,即,一个请求对应一个响应。服务端在处理完一个消息并确认后,才会接收到下一条消息
接收消息后,就可以对请求消息进行处理并返回响应结果了:
// 7. 接收消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 将消息发到队列2中
AMQP.BasicProperties replayProperties = new AMQP.BasicProperties().
builder().
correlationId(properties.getCorrelationId()).
build();
// 返回
String message = new String(body);
System.out.println("接收到消息: " + message);
// 响应消息
String response = "request: " + message + " 接收成功";
channel.basicPublish("", properties.getReplyTo(), replayProperties, response.getBytes());
// 对消息进行应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(Constants.RPC_QUEUE_1, false, consumer);
需要注意的是,在这里我们需要 手动对消息进行应答 ,而不是自动确认:
在 RabbitMQ 中, basicConsume 方法的 autoAck 参数用于指定消费者是否会自动向消息队列确认消息
当设置为 true 时,消息队列会在将消息发送给消费者后,认为消息已经被成功消费,立即删除该条消息,这也就意味着,若消费者处理消息失败,消息就会丢失
当设置为 false 时,消息队列在将消息发送给消费者后,需要消费者显示地调用 basicAck 方式来确认消息,手动确认提供了更高的可靠性,保证消息不会被意外丢失,适用于消息处理重要且需要确保每个消息被正确处理的场景
完整代码:
public class Service {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost(Constants.HOST); // ip 的默认值为 localhost
factory.setPort(Constants.PORT); // 默认值为 5672
factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
// 账号
factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
// 4. 创建 Channel
Channel channel = connection.createChannel();
// 使用默认的交换机
// 5. 声明队列
channel.queueDeclare(Constants.RPC_QUEUE_1, true, false, false, null);
channel.queueDeclare(Constants.RPC_QUEUE_2, true, false, false, null);
// 6. 设置同时最多只能获取一条消息
channel.basicQos(1);
// 7. 接收消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 将消息发到队列2中
AMQP.BasicProperties replayProperties = new AMQP.BasicProperties().
builder().
correlationId(properties.getCorrelationId()).
build();
// 返回
String message = new String(body);
System.out.println("接收到消息: " + message);
// 响应消息
String response = "request: " + message + " 接收成功";
channel.basicPublish("", properties.getReplyTo(), replayProperties, response.getBytes());
// 对消息进行应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(Constants.RPC_QUEUE_1, false, consumer);
}
}
运行代码,观察结果:
Publisher Confirms(发布确认)
消息中间件,都会面临 消息丢失 的问题
消息丢失大概分为三种情况:
生产者的问题 :由于应用程序故障、网络抖动等各种原因,生产者没有成功向 broker 发送消息
消息中间件的问题 :生产者成功将消息发送给了 broker,但 broker 未能将消息保存好,导致消息丢失
消费者的问题 :broker 将消息发送给了消费者,消费者在消费消息时,未处理好,导致 broker 将消费失败的消息从队列中删除了
Rabbit 针对上述问题给出了相应的解决方案:
针对问题1,可以采用**发布确认(Publisher Confirms)**机制实现
针对问题2,可以通过 持久化 机制
针对问题3,可以采用 消息应答 机制
接下来,我们就来进一步学习 发布确认机制
发布确认的过程:
(1)生产者将 Channel 设置为 confirm 模式(通过调用 channel.confirmSelect() 完成),发布的每一条消息都会获得一个唯一的 ID,生产者可以将这些序列号与消息关联起来,以便追踪消息的状态
(2)当消息被 RabbitMQ 服务器接收并处理后,服务器会向生产者发送一个 确认(ACK) ,其中包含消息的唯一 ID,表示消息已经送达
其中, deliveryTag 包含了确认消息的序号,此外,broker 也可以设置 channel.basicAck 方法中的 multiple 参数,表示 这个序号之前的所有消息都已经被处理
发送确认机制最大的好处在于它是 异步 的,生产者可以 同时发布消息和等待信道返回确认消息
当消息最终得到确认之后,生产者可以通过 回调方法 来处理该确认消息
若 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 **nack(Basic.Nack)**命令,生产者同样可以在回调方法中处理该 nack 命令
使用发布确认机制,需要将信道设置为 **confirm(确认)**模式:
// 开启信道
Channel channel = connection.createChannel();
// 开启信道确认模式
channel.confirmSelect();
发布模式有 3 种确认策略 ,我们分别来进行学习
由于使用每种策略时都需要建立连接,因此,我们将建立连接抽取出来:
public static Connection createConnection() throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost(Constants.HOST); // ip 的默认值为 localhost
factory.setPort(Constants.PORT); // 默认值为 5672
factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
// 账号
factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
return connection;
}
Publishing Messages Individually(单独确认)
单独确认模式下,每发送一条消息,RabbitMQ 会在消息被成功持久化到队列或者被消费者成功接收后,发回一个确认(acknowledgment)。 生产者可以收到关于每条消息的确认信息
也就是说, 生产者发送消息后会等待每条消息的确认信号 。如果消息发送成功,RabbitMQ 会返回一个确认信号;如果消息失败,RabbitMQ 会返回一个负确认信号(nack)
我们先在 Constans 类中声明会使用的队列:
// 发布确认模式
public static final String PUBLISH_CONFIRMS_QUEUE_1 = "publish.confims.queue1";
public static final String PUBLISH_CONFIRMS_QUEUE_2 = "publish.confims.queue2";
public static final String PUBLISH_CONFIRMS_QUEUE_3 = "publish.confims.queue3";
我们仍使用默认的交换机进行路由
每次都发送 200 条消息:
public class Producer {
public static int MESSAGE_COUNT = 200;
}
每当发送一条消息,就使用 channel.waitForConfirms() 方法等待确认消息
void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;
等待确认消息,当消息被确认,方法就会返回,若消息超时,就会抛出 TimeoutException 异常,若消息丢失,就会抛出 IOException
此外,我们记录 单独确认策略 发送消息的耗时:
public class Producer {
public static int MESSAGE_COUNT = 200;
public static int WAIT_TIME = 5000;
public static void publishMessageIndividually() {
try (Connection connection = createConnection()){
// 开启信道
Channel channel = connection.createChannel();
// 开启信道确认模式
channel.confirmSelect();
// 声明队列
channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_1, true, false, false, null);
// 记录开始时间
long startTime = System.currentTimeMillis();
// 发送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "message " + i;
channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_1, null, message.getBytes());
// 等待确认
channel.waitForConfirms(WAIT_TIME);
}
// 记录结束时间
long endTime = System.currentTimeMillis();
System.out.printf("publish %d messages individually in %d ms
", MESSAGE_COUNT, endTime - startTime);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
完整代码:
public class Producer {
public static int MESSAGE_COUNT = 200;
public static int WAIT_TIME = 5000;
// 建立连接
public static Connection createConnection() throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost(Constants.HOST); // ip 的默认值为 localhost
factory.setPort(Constants.PORT); // 默认值为 5672
factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
// 账号
factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
return connection;
}
// 单独确认模式
public static void publishMessageIndividually() {
try (Connection connection = createConnection()){
// 开启信道
Channel channel = connection.createChannel();
// 开启信道确认模式
channel.confirmSelect();
// 声明队列
channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_1, true, false, false, null);
// 记录开始时间
long startTime = System.currentTimeMillis();
// 发送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "message " + i;
channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_1, null, message.getBytes());
// 等待确认
channel.waitForConfirmsOrDie(WAIT_TIME);
}
// 记录结束时间
long endTime = System.currentTimeMillis();
System.out.printf("publish %d messages individually in %d ms
", MESSAGE_COUNT, endTime - startTime);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) {
// 单独确认模式
publishMessageIndividually();
}
}
运行结果:
可以看到,发送 200 条消息的耗时较长
且,单独确认策略是每发送一条消息后,就调用 channel.waitForConfirmsOrDie 方法,等待服务端的确认,也就是一种 串行同步等待 的方式,尤其是对于持久化的消息而言,需要等待消息确认存储在磁盘之后才会返回
但发布确认机制支持 异步确认 ,即,可以 一边发送消息,一边等待消息确认
我们接着看另外两种策略
Publishing Messages in Batches(批量确认)
批量确认会在 每发送一批消息后 ,调用 waitForConfirms 方法,等待服务器的确认返回
我们每发送 50 条消息,就调用 waitForConfirms 方法进行确认:
public static int BATCH_SIZE = 50;
// 批量确认模式
public static void publishMessageInBatches() {
try (Connection connection = createConnection()){
// 开启信道
Channel channel = connection.createChannel();
// 设置为 confirm 模式
channel.confirmSelect();
// 声明队列
channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_2, true, false, false, null);
// 发送消息
int messageCount = 0;
long startTime = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "message " + i;
channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_2, null, message.getBytes());
messageCount++;
// 批量确认
if(messageCount == BATCH_SIZE) {
channel.waitForConfirms(WAIT_TIME);
messageCount = 0;
}
}
// 消息发送完,若还有未确认消息,则进行最后的确认
if (messageCount > 0) {
channel.waitForConfirms(WAIT_TIME);
}
long endTime = System.currentTimeMillis();
System.out.printf("publish %d messages in batch in %d ms
", MESSAGE_COUNT, endTime - startTime);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
需要注意的是,若我们发送的消息为 210 条,此时最后的十条消息未被确认,因此,我们 在消息发送完成后,进行最后的确认
调用 publishMessageInBatches 方法,并观察结果:
我们可以看到,相比于单独确认策略,批量确认极大的提高了 confirm 的效率,但当出现了 Basic.Nack 或超时时,我们 无法确定是哪一条消息出现了问题 ,客户端需要将这一批消息都进行重发,这也就重复发送了很多消息, 当消息经常丢失时,批量确认的性能会不升反降
最后,我们来看 异步确认
Handling Publisher Confirms Asynchronously(异步确认)
异步确认提供了一个 回调方法 ,服务端确认了一条或多条消息后,客户端会调用这个方法进行处理
Channel 接口提供了 addConfirmListener 方法,可以添加 ConfirmListener 回调接口
在 ConfirmListener 接口中包含两个重要方法:
handleAck 和 handleNack ,分别对应处理 RabbitMQ 发送给生产者的 ack 和 nack
deliveryTag 表示发送消息的序号
multiple 表示是否批量确认,开启批量确认后,若 RabbitMQ 返回的消息序号为 20,则表明 前 20 条消息都已经接收成功;当不开启批量确认时,若 RabbitMQ 返回的消息序号为 20 ,则表明 第 20 条消息被成功接收
在使用异步确认策略时,我们需要为每个 Channel 维护一个已发送消息的序号集合 ,当收到 RabbitMQ 的 confirm 回调时,从集合中删除掉对应消息
当 Channel 开启 confirm 模式后, channel 上发送消息都会附带一个从 1 开始递增的 deliveryTag 序号。我们可以使用 SortedSet 的有序性来维护这个已发送消息的集合
实现步骤:
- 使用有序集合存储未确认的消息序号
// 使用有序集合来存储未确认的消息
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());
- 当收到 ack 时,从集合中删除消息序号,若为批量确认,则删除小于等于当前消息序号的所有序号
// 进行确认
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// 若为批量确认,则删除确认序号前所有元素
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});
- 当接收到 nack 时,需要根据具体情况进行消息重发等操作
在这里,我们就不对其进行处理了,直接将消息清除:
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
// 处理失败,消息重发...
// 若为批量确认,则删除确认序号前所有元素
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
接着,我们发送消息,每当发送一条消息,就将其序号存储到有序集合中:
// 发送消息
long startTime = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "message " + i;
// 获取序号
long nextPublishSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_3, null, message.getBytes());
// 存储序号
confirmSet.add(nextPublishSeqNo);
}
当有序集合为空时,消息确认完,因此,我们使用 while 循环等待消息确认完毕:
// 消息确认完毕
while (!confirmSet.isEmpty()) {
Thread.sleep(10);
}
若循环体中什么也不写,while 循环执行的速度会非常快,因此,每当判断一次,我们让其等待 10 ms
完整代码:
// 异步确认模式
public static void handlePublishConfirmAsynchronously() {
try (Connection connection = createConnection()){
// 开启信道
Channel channel = connection.createChannel();
// 设置 confirm 模式
channel.confirmSelect();
// 声明队列
channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_3, false, false, true, null);
// 使用有序集合来存储未确认的消息
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());
// 进行确认
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// 若为批量确认,则删除确认序号前所有元素
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
// 处理失败,消息重发...
// 若为批量确认,则删除确认序号前所有元素
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});
// 记录开始时间
long startTime = System.currentTimeMillis();
// 发送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "message " + i;
// 获取序号
long nextPublishSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_3, null, message.getBytes());
// 存储序号
confirmSet.add(nextPublishSeqNo);
}
// 消息确认完毕
while (!confirmSet.isEmpty()) {
Thread.sleep(10);
}
// 记录结束时间
long endTime = System.currentTimeMillis();
System.out.printf("publish %d messages and handled confirms asynchronously in %d ms
", MESSAGE_COUNT, endTime - startTime);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
运行结果:
可以看到,三种策略中,异步确认的表现更好
完整代码:
public class Producer {
public static int MESSAGE_COUNT = 200;
public static int WAIT_TIME = 5000;
public static int BATCH_SIZE = 50;
// 建立连接
public static Connection createConnection() throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost(Constants.HOST); // ip 的默认值为 localhost
factory.setPort(Constants.PORT); // 默认值为 5672
factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机,默认值为 /
// 账号
factory.setUsername(Constants.USER_NAME); // 用户名,默认为 guest
factory.setPassword(Constants.USER_PASSWORD); // 密码,默认为 guest
// 3. 创建连接 Connection
Connection connection = factory.newConnection(); // 需要处理异常,在此处直接抛出,并不进行处理
return connection;
}
// 单独确认模式
public static void publishMessageIndividually() {
try (Connection connection = createConnection()){
// 开启信道
Channel channel = connection.createChannel();
// 开启信道确认模式
channel.confirmSelect();
// 声明队列
channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_1, true, false, false, null);
// 记录开始时间
long startTime = System.currentTimeMillis();
// 发送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "message " + i;
channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_1, null, message.getBytes());
// 等待确认
channel.waitForConfirmsOrDie(WAIT_TIME);
}
// 记录结束时间
long endTime = System.currentTimeMillis();
System.out.printf("publish %d messages individually in %d ms
", MESSAGE_COUNT, endTime - startTime);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
// 批量确认模式
public static void publishMessageInBatches() {
try (Connection connection = createConnection()){
// 开启信道
Channel channel = connection.createChannel();
// 设置为 confirm 模式
channel.confirmSelect();
// 声明队列
channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_2, true, false, false, null);
// 发送消息
int messageCount = 0;
long startTime = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "message " + i;
channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_2, null, message.getBytes());
messageCount++;
// 批量确认
if(messageCount == BATCH_SIZE) {
channel.waitForConfirms(WAIT_TIME);
messageCount = 0;
}
}
// 消息发送完,若还有未确认消息,则进行最后的确认
if (messageCount > 0) {
channel.waitForConfirms(WAIT_TIME);
}
long endTime = System.currentTimeMillis();
System.out.printf("publish %d messages in batch in %d ms
", MESSAGE_COUNT, endTime - startTime);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
// 异步确认模式
public static void handlePublishConfirmAsynchronously() {
try (Connection connection = createConnection()){
// 开启信道
Channel channel = connection.createChannel();
// 设置 confirm 模式
channel.confirmSelect();
// 声明队列
channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE_3, false, false, true, null);
// 使用有序集合来存储未确认的消息
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());
// 进行确认
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// 若为批量确认,则删除确认序号前所有元素
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
// 处理失败,消息重发...
// 若为批量确认,则删除确认序号前所有元素
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});
// 记录开始时间
long startTime = System.currentTimeMillis();
// 发送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "message " + i;
// 获取序号
long nextPublishSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE_3, null, message.getBytes());
// 存储序号
confirmSet.add(nextPublishSeqNo);
}
// 消息确认完毕
while (!confirmSet.isEmpty()) {
Thread.sleep(10);
}
// 记录结束时间
long endTime = System.currentTimeMillis();
System.out.printf("publish %d messages and handled confirms asynchronously in %d ms
", MESSAGE_COUNT, endTime - startTime);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) {
// 单独确认模式
publishMessageIndividually();
// 批量确认模式
publishMessageInBatches();
// 异步确认
handlePublishConfirmAsynchronously();
}
}