目录

日常工作,MQ的7种常用使用场景

日常工作,MQ的7种常用使用场景


以下为你详细介绍 MQ(消息队列)在日常工作中的 8 种常用使用场景:

1. 异步处理

详细解释

在一些业务流程中,存在部分操作耗时较长且不影响主流程的立即响应。通过将这些耗时操作放入消息队列,主流程可以继续执行,而耗时操作会在后台异步处理,从而提高系统的整体响应速度和吞吐量。

运用场景
  • 电商系统中,用户下单后,订单创建是主流程,而发送订单确认邮件、短信通知等操作可以异步处理。
  • 社交平台中,用户发布动态后,动态的点赞、评论计数更新等操作可以异步完成。
代码示例
// 生产者代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class AsyncProducer {
    private static final String QUEUE_NAME = "async_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "耗时操作的任务信息";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

// 消费者代码
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class AsyncConsumer {
    private static final String QUEUE_NAME = "async_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                // 模拟耗时操作
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

2. 流量削峰

详细解释

在业务高峰期,系统会收到大量的请求,若直接处理这些请求,可能导致系统负载过高甚至崩溃。消息队列可以作为缓冲,将大量请求先存入队列,系统再按照自身处理能力从队列中逐步获取请求进行处理,实现流量的削峰填谷,保护系统稳定性。

运用场景
  • 电商大促活动,如双 11、618 等,大量用户同时下单,订单请求先进入消息队列,系统按一定速率处理订单。
  • 在线直播活动,大量观众同时发送弹幕、点赞等请求,消息队列可缓冲这些请求。
代码示例
// 生产者代码(模拟高并发请求)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TrafficPeakProducer {
    private static final String QUEUE_NAME = "traffic_peak_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i = 0; i < 1000; i++) {
                String message = "Request " + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

// 消费者代码(按一定速率处理请求)
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TrafficPeakConsumer {
    private static final String QUEUE_NAME = "traffic_peak_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                try {
                    // 模拟处理时间,控制处理速率
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

3. 日志处理

详细解释

系统运行过程中会产生大量日志,如业务日志、操作日志等。将日志信息发送到消息队列,由专门的日志处理系统从队列中获取日志进行存储、分析和展示等操作,实现日志处理与业务系统的解耦,方便对日志进行统一管理。

运用场景
  • 大型互联网应用中,各个微服务产生的日志可通过消息队列汇总到日志中心进行处理。
  • 企业级系统中,将用户操作日志发送到消息队列,用于后续的审计和安全分析。
代码示例
// 生产者代码(日志发送)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class LogProducer {
    private static final String QUEUE_NAME = "log_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String logMessage = "This is a log message";
            channel.basicPublish("", QUEUE_NAME, null, logMessage.getBytes("UTF-8"));
            System.out.println(" [x] Sent log: '" + logMessage + "'");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

// 消费者代码(日志处理)
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class LogConsumer {
    private static final String QUEUE_NAME = "log_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for log messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String logMessage = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received log: '" + logMessage + "'");
                // 模拟日志存储操作
                // 这里可以将日志存储到文件、数据库等
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

4. 数据同步

详细解释

在分布式系统中,不同数据库、不同系统之间需要进行数据同步。消息队列可作为数据同步的桥梁,当数据发生变化时,将变化的消息发送到消息队列,接收方从队列中获取消息并更新相应数据,实现数据的异步同步。

运用场景
  • 电商系统中,订单数据在主数据库更新后,将订单变更消息发送到消息队列,库存系统从队列中获取消息更新库存数据。
  • 企业的数据仓库与业务系统之间的数据同步,通过消息队列保证数据的一致性。
代码示例
// 生产者代码(数据变更通知)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DataSyncProducer {
    private static final String QUEUE_NAME = "data_sync_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String dataChangeMessage = "Order data has been updated";
            channel.basicPublish("", QUEUE_NAME, null, dataChangeMessage.getBytes("UTF-8"));
            System.out.println(" [x] Sent data change message: '" + dataChangeMessage + "'");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

// 消费者代码(数据同步更新)
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DataSyncConsumer {
    private static final String QUEUE_NAME = "data_sync_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for data change messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String dataChangeMessage = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received data change message: '" + dataChangeMessage + "'");
                // 模拟数据同步更新操作
                // 这里可以更新库存、数据仓库等
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

5. 任务调度

详细解释

对于定时任务或按一定顺序执行的任务,可将任务信息放入消息队列,由任务调度系统从队列中获取任务并按规则调度执行,提高任务调度的灵活性和可扩展性。

运用场景
  • 数据处理系统中,每天凌晨执行数据备份、数据统计等任务,可将任务信息提前放入消息队列。
  • 定时提醒系统,如会议提醒、任务到期提醒等,将提醒任务放入消息队列进行调度。
代码示例
// 生产者代码(任务发布)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TaskSchedulerProducer {
    private static final String QUEUE_NAME = "task_scheduler_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String taskMessage = "Data backup task";
            channel.basicPublish("", QUEUE_NAME, null, taskMessage.getBytes("UTF-8"));
            System.out.println(" [x] Sent task: '" + taskMessage + "'");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

// 消费者代码(任务执行)
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TaskSchedulerConsumer {
    private static final String QUEUE_NAME = "task_scheduler_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for tasks. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String taskMessage = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received task: '" + taskMessage + "'");
                // 模拟任务执行
                // 这里可以执行数据备份、统计等任务
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

6. 分布式事务

详细解释

在分布式系统中,涉及多个服务之间的事务操作时,消息队列可用于实现最终一致性。通过将事务操作相关的消息发送到消息队列,各个服务根据消息进行相应操作,在一定时间内达到数据的一致性。

运用场景
  • 跨银行转账系统中,转出银行将转账消息发送到消息队列,转入银行从队列中获取消息并执行入账操作。
  • 电商系统中,订单服务和库存服务之间的事务处理,通过消息队列保证订单和库存数据的一致性。
代码示例
// 生产者代码(转出银行发送转账消息)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DistributedTransactionProducer {
    private static final String QUEUE_NAME = "distributed_transaction_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String transferMessage = "Transfer 1000 yuan from account A to account B";
            channel.basicPublish("", QUEUE_NAME, null, transferMessage.getBytes("UTF-8"));
            System.out.println(" [x] Sent transfer message: '" + transferMessage + "'");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

// 消费者代码(转入银行处理转账消息)
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DistributedTransactionConsumer {
    private static final String QUEUE_NAME = "distributed_transaction_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for transfer messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String transferMessage = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received transfer message: '" + transferMessage + "'");
                // 模拟转入银行入账操作
                // 这里可以更新账户余额等
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

7. 系统集成

详细解释

多个不同系统集成时,消息队列可作为系统间通信的桥梁。各系统将需交互的数据或操作指令以消息形式发送到消息队列,其他系统从队列中获取消息并处理,实现系统间的解耦和异步通信。

运用场景
  • 企业的 ERP 系统与 CRM 系统集成,ERP 系统创建新客户订单后,将订单消息发送到消息队列,CRM 系统从队列获取消息更新客户订单信息和销售数据。
  • 电商平台与物流系统集成,订单生成后将订单信息发送到消息队列,物流系统从队列获取信息进行发货处理。
代码示例
// 生产者代码(ERP系统发送订单消息)
import com.rabbitmq.client.Channel;