目录

RabbitMQ之旅1

RabbitMQ之旅(1)

相信自 己,终会成功

https://i-blog.csdnimg.cn/direct/c089ed3a0b0a4cc5a7e42e9f401a7235.jpeg


主流MQ产品

1.kafaka

  1. 特点 :高吞吐量、分布式、持久化、支持分区和副本。
  2. 适用场景 :日志收集、流处理、实时数据分析等大数据场景。
  3. 优势 :高吞吐量和低延迟,适合处理大量数据。
  4. 缺点 :配置复杂,对小型项目可能过于重量级。

2.RocketMQ

  1. 特点 :分布式、高吞吐量、低延迟、支持事务消息。
  2. 适用场景 :电商、金融等需要高可靠性和事务支持的场景。
  3. 优势 :支持事务消息,适合金融等高可靠性要求的场景。
  4. 缺点 :社区相对较小,文档和资源不如Kafka丰富。

3.RabbitMQ

  1. 特点 :轻量级、支持多种消息协议、易于使用和部署。
  2. 适用场景 :中小型项目、需要快速上手的场景。
  3. 优势 :易于使用,支持多种消息协议,社区活跃。
  4. 缺点 :在大规模高并发场景下性能不如Kafka和RocketMQ。

在xshell上安装RabbitMQ

1.更新xshell中最新的软件包列表

sudo apt-get update

2.安装erlang

sudo apt-get install erlang

输入erl,出现下图内容表示安装成功  输入halt().退出即可

https://i-blog.csdnimg.cn/direct/c52e09c2a8ba47c4b739d81806f0c662.png

3.安装rabbitmq

sudo apt-get install rabbitmq-server

4.确认安装结果

systemctl status rabbitmq-server

显示running即可

https://i-blog.csdnimg.cn/direct/9b6fafa95dfc4e22ab51a4577a96e313.png

5.安装RabbitMQ管理界面

rabbitmq-plugins enable rabbiting_management

6.启动服务

sudo service rabbitmq-server start

在浏览器中输入自己云服务器的端口号+15672即可访问页面

https://i-blog.csdnimg.cn/direct/9c382bfb836140229cc11a72197b21e9.png

添加用户名和密码

rabbitmqctl add_user 用户名 密码

给用户权限

rabbitmqctl set_user_tags 用户名 权限等级

RabbitMQ七种工作模式

P:生产者        C:消费者

queue:队列   X:交换机

在使用绑定的时候为 BindingKey

在发送消息的时候为RoutingKey

官方网站:

建立连接需要的信息:

1.IP   2.端口号   3.账号   4.密码   5. 虚拟主机

消费者代码:

1.创建连接  2.创建Channel  3.声明一个队列Queue  4.消费信息  5.释放资源

1.简单模式

一个生产者,一个消费者,点到点模式

2.工作队列模式

一个生产者,多个消费者

假如有十条队列消息,C1和C2是共同消费这10条消息,消息不会重复消费

https://i-blog.csdnimg.cn/direct/cbe7e05e170c4e79b3f32beb0e433c5e.png

3.发布/订阅模式

https://i-blog.csdnimg.cn/direct/36f05727bd564190923ea7f1e498539c.png

https://i-blog.csdnimg.cn/direct/9f803155505a4a2d9dedbfd1389484b3.png

4.路由模式

订阅模式的变化形式

https://i-blog.csdnimg.cn/direct/43592cee467d4df39bc093cc1b8489b8.png

5.通配符模式

https://i-blog.csdnimg.cn/direct/69c6eaec470a4e83a61d1d7db548c2ed.png

https://i-blog.csdnimg.cn/direct/8c5fe7a86b86446383cb4f26810a0a47.png

6.RPC模式

https://i-blog.csdnimg.cn/direct/8e5698b751994020b9116aca157fd730.png

https://i-blog.csdnimg.cn/direct/c27e37624c254478ad34bac788dab93c.png

AMQP.BasicProperties 设置消息属性的类
属性名类型说明
contentTypeString消息内容的 MIME 类型(如 text/plainapplication/json )。
contentEncodingString消息内容的编码方式(如 UTF-8 )。
headersMap<String, Object>自定义消息头,用于传递额外信息。
deliveryModeInteger消息的持久化模式: 1 (非持久化)或 2 (持久化)。
priorityInteger消息的优先级(0 到 9,数值越大优先级越高)。
correlationIdString关联 ID,通常用于 RPC 模式,匹配请求和响应。
replyToString响应队列的名称,通常用于 RPC 模式。
expirationString消息的过期时间(以毫秒为单位的字符串)。
messageIdString消息的唯一标识符。
timestampDate消息的时间戳。
typeString消息的类型标识符。
userIdString用户 ID,用于验证消息的发送者。
appIdString应用程序 ID,用于标识发送消息的应用程序。
//        AMQP.BasicProperties 是一个不可变类,因此需要通过其内部类 Builder 来创建对象。
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .correlationId(correlationID)
                .replyTo(Constants.RPC_RESPONSE_QUEUE)
                .build();
  1. 客户端

    • 生成唯一的 correlationId
    • 设置 replyTo 为响应队列的名称。
    • 发送消息到请求队列( Constants.RPC_REQUEST_QUEUE )。
    • 监听响应队列( Constants.RPC_RESPONSE_QUEUE ),等待服务器返回结果。
  2. 服务器

    • 监听请求队列( Constants.RPC_REQUEST_QUEUE )。
    • 处理请求后,将结果发送到客户端指定的响应队列( replyTo )。
    • 在响应消息中设置与请求相同的 correlationId
  3. 客户端匹配响应

    • 收到响应后,根据 correlationId 匹配对应的请求。

7.发布确认模式

是RabbitMQ消息可靠性保证的关键


代码展示(生产者)

下图的代码Constants是自己写的 Java 常量

常量命名规范

1.常量名使用 全大写字母 ,并用下划线 _ 分隔单词(如 VIRTUALHOSTWORK_QUEUE )。

2.这是 Java 中的命名约定,便于区分常量和变量。

//1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost(Constants.HOST); //云服务器的IP地址
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUALHOST); //虚拟主机


        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明交换机   使用内置的交换机

连接工厂 ( ConnectionFactory )

是一个设计模式中的“工厂类”,它的目的是 隐藏创建连接的复杂细节 (比如网络配置、认证信息等)。你可以通过这个工厂对象预先设置连接参数(如服务器地址、端口、用户名、密码等),然后通过它来生成具体的连接对象

Channel(通道)

通道 是建立在连接( Connection )之上的一个轻量级逻辑连接。一个连接( Connection )可以创建多个通道,每个通道可以独立地发送和接收消息。通道的设计是为了复用连接,避免频繁创建和销毁连接的开销。创建通道后,通常会用它来声明队列、发送消息或消费消息

channel.queueDeclare 声明队列
//4.声明队列
        
channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);

         /**
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
         *                                  Map<String, Object> arguments)
         *  参数说明:
         *  queue: 队列名称
         *  durable: 可持久化
         *  exclusive: 是否独占
         *  autoDelete: 是否自动删除
         *  arguments: 参数
         */

https://i-blog.csdnimg.cn/direct/e5cafa42fcaf495da1e2e386ff06ca80.png

channel.basicPublish 发送消息
        //5. 发送消息
        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * 参数说明:
         * exchange: 交换机名称
         * routingKey: 内置交换机, routingkey和队列名称保持一致
         * props: 属性配置
         * body: 消息
         */
        for (int i = 0; i < 10; i++) {
            String msg = "hello rabbitmq~"+i;
            channel.basicPublish("","hello", null, msg.getBytes());
        }

https://i-blog.csdnimg.cn/direct/ad4ccfd89df64d319ce718b15f03f82c.png

https://i-blog.csdnimg.cn/direct/e1cf205e6c4745a9b27112e02656e6fd.png

 //6. 资源释放
        channel.close();
        connection.close();
channel.exchangeDeclare 声明创建交换机
Exchange.DeclareOk exchangeDeclare(
    String exchange,              // 交换机名称
    String type,                  // 交换机类型(direct、fanout、topic、headers)
    boolean durable,              // 是否持久化
    boolean autoDelete,           // 是否自动删除
    boolean internal,            // 是否内部交换机
    Map<String, Object> arguments // 额外参数
) throws IOException;

https://i-blog.csdnimg.cn/direct/842e122c288946be9978c4acb2064194.png

https://i-blog.csdnimg.cn/direct/5f9c8ff715324a7fa462fa48a284c9ac.png

channel.queueBind() 将队列绑定到交换机
  • queue :队列名称(如 Constants.PUBLISH_QUEUE1 )。
  • exchange :交换机名称(如 Constants.PUBLISH_EXCHANGE )。
  • routingKey :路由键(如 """key1"
channel.queueBind(Constants.PUBLISH_QUEUE1, Constants.PUBLISH_EXCHANGE, "");
channel.queueBind(Constants.PUBLISH_QUEUE2, Constants.PUBLISH_EXCHANGE, "");
//作用:将 Constants.PUBLISH_QUEUE1 和 Constants.PUBLISH_QUEUE2 绑定到 //Constants.PUBLISH_EXCHANGE。
//路由键为空字符串(""),表示所有消息都会被路由到这两个队列(前提是交换机类型支持)。
channel.basicQos设置消费者预取限制
参数名类型说明
prefetchSizeint预取消息的总大小(以字节为单位),通常设置为 0 表示不限制大小。
prefetchCountint预取消息的数量限制(即未确认消息的最大数量)。
globalboolean是否全局生效: true 表示对整个 Channel 生效, false 表示对每个消费者生效。
// 设置每个消费者最多预取 10 条未确认消息
channel.basicQos(10);

// 设置整个 Channel 最多预取 100 条未确认消息
channel.basicQos(100, true);

// 设置预取消息的总大小不超过 1MB,数量不超过 10 条
channel.basicQos(1024 * 1024, 10, false);
channel.basicAck 手动确认消息
参数名类型说明
deliveryTaglong消息的唯一标识符,由 RabbitMQ 分配。
multipleboolean是否批量确认: true 表示确认所有比 deliveryTag 小的消息; false 表示仅确认当前消息。

使用场景

  • 手动确认模式 :当消费者从队列中拉取消息时,如果启用了手动确认模式( autoAck=false ),则必须调用 basicAck 来确认消息。
  • 确保消息可靠性 :通过手动确认,可以确保消息在处理成功后才会从队列中删除,避免消息丢失。
  • 批量确认 :通过设置 multiple=true ,可以一次性确认多条消息,提高效率。

代码展示(消费者)

public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列   使用内置的交换机
        //如果队列不存在, 则创建, 如果队列存在, 则不创建
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
        //4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };
        channel.basicConsume(Constants.WORK_QUEUE, true, consumer);

        6. 资源释放
        channel.close();
        connection.close();
    }

DefaultConsumer

RabbitMQ 提供的默认消费者类。

channel :消费者绑定的通道(Channel)。

匿名内部类 :通过 new DefaultConsumer(channel) { ... } 创建一个匿名内部类,并写 handleDelivery 方法。

handleDelivery 方法

当队列中有消息时,RabbitMQ 会调用此方法将消息传递给消费者

https://i-blog.csdnimg.cn/direct/c5c396233bcb4fdba7129981340b7685.png

RabbitMQ 支持两种消息确认模式:

  1. 自动确认

    • basicConsume 方法中将第二个参数设置为 true

    • 消费者接收到消息后,RabbitMQ 会自动将消息标记为已处理。

    • 示例:

      channel.basicConsume(QUEUE_NAME, true, consumer);
  2. 手动确认

    • basicConsume 方法中将第二个参数设置为 false

    • 需要在 handleDelivery 方法中手动调用 channel.basicAck() 确认消息。

    • 示例:

      channel.basicAck(envelope.getDeliveryTag(), false);