日常工作,MQ的7种常用使用场景
目录
日常工作,MQ的7种常用使用场景
以下为你详细介绍 MQ(消息队列)在日常工作中的 8 种常用使用场景:
1. 异步处理
详细解释
在一些业务流程中,存在部分操作耗时较长且不影响主流程的立即响应。通过将这些耗时操作放入消息队列,主流程可以继续执行,而耗时操作会在后台异步处理,从而提高系统的整体响应速度和吞吐量。
运用场景
- 电商系统中,用户下单后,订单创建是主流程,而发送订单确认邮件、短信通知等操作可以异步处理。
- 社交平台中,用户发布动态后,动态的点赞、评论计数更新等操作可以异步完成。
代码示例
// 生产者代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class AsyncProducer {
private static final String QUEUE_NAME = "async_queue";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "耗时操作的任务信息";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
// 消费者代码
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class AsyncConsumer {
private static final String QUEUE_NAME = "async_queue";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
// 模拟耗时操作
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
2. 流量削峰
详细解释
在业务高峰期,系统会收到大量的请求,若直接处理这些请求,可能导致系统负载过高甚至崩溃。消息队列可以作为缓冲,将大量请求先存入队列,系统再按照自身处理能力从队列中逐步获取请求进行处理,实现流量的削峰填谷,保护系统稳定性。
运用场景
- 电商大促活动,如双 11、618 等,大量用户同时下单,订单请求先进入消息队列,系统按一定速率处理订单。
- 在线直播活动,大量观众同时发送弹幕、点赞等请求,消息队列可缓冲这些请求。
代码示例
// 生产者代码(模拟高并发请求)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TrafficPeakProducer {
private static final String QUEUE_NAME = "traffic_peak_queue";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 1000; i++) {
String message = "Request " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
// 消费者代码(按一定速率处理请求)
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TrafficPeakConsumer {
private static final String QUEUE_NAME = "traffic_peak_queue";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
// 模拟处理时间,控制处理速率
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
3. 日志处理
详细解释
系统运行过程中会产生大量日志,如业务日志、操作日志等。将日志信息发送到消息队列,由专门的日志处理系统从队列中获取日志进行存储、分析和展示等操作,实现日志处理与业务系统的解耦,方便对日志进行统一管理。
运用场景
- 大型互联网应用中,各个微服务产生的日志可通过消息队列汇总到日志中心进行处理。
- 企业级系统中,将用户操作日志发送到消息队列,用于后续的审计和安全分析。
代码示例
// 生产者代码(日志发送)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class LogProducer {
private static final String QUEUE_NAME = "log_queue";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String logMessage = "This is a log message";
channel.basicPublish("", QUEUE_NAME, null, logMessage.getBytes("UTF-8"));
System.out.println(" [x] Sent log: '" + logMessage + "'");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
// 消费者代码(日志处理)
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class LogConsumer {
private static final String QUEUE_NAME = "log_queue";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for log messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String logMessage = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received log: '" + logMessage + "'");
// 模拟日志存储操作
// 这里可以将日志存储到文件、数据库等
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
4. 数据同步
详细解释
在分布式系统中,不同数据库、不同系统之间需要进行数据同步。消息队列可作为数据同步的桥梁,当数据发生变化时,将变化的消息发送到消息队列,接收方从队列中获取消息并更新相应数据,实现数据的异步同步。
运用场景
- 电商系统中,订单数据在主数据库更新后,将订单变更消息发送到消息队列,库存系统从队列中获取消息更新库存数据。
- 企业的数据仓库与业务系统之间的数据同步,通过消息队列保证数据的一致性。
代码示例
// 生产者代码(数据变更通知)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DataSyncProducer {
private static final String QUEUE_NAME = "data_sync_queue";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String dataChangeMessage = "Order data has been updated";
channel.basicPublish("", QUEUE_NAME, null, dataChangeMessage.getBytes("UTF-8"));
System.out.println(" [x] Sent data change message: '" + dataChangeMessage + "'");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
// 消费者代码(数据同步更新)
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DataSyncConsumer {
private static final String QUEUE_NAME = "data_sync_queue";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for data change messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String dataChangeMessage = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received data change message: '" + dataChangeMessage + "'");
// 模拟数据同步更新操作
// 这里可以更新库存、数据仓库等
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
5. 任务调度
详细解释
对于定时任务或按一定顺序执行的任务,可将任务信息放入消息队列,由任务调度系统从队列中获取任务并按规则调度执行,提高任务调度的灵活性和可扩展性。
运用场景
- 数据处理系统中,每天凌晨执行数据备份、数据统计等任务,可将任务信息提前放入消息队列。
- 定时提醒系统,如会议提醒、任务到期提醒等,将提醒任务放入消息队列进行调度。
代码示例
// 生产者代码(任务发布)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TaskSchedulerProducer {
private static final String QUEUE_NAME = "task_scheduler_queue";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String taskMessage = "Data backup task";
channel.basicPublish("", QUEUE_NAME, null, taskMessage.getBytes("UTF-8"));
System.out.println(" [x] Sent task: '" + taskMessage + "'");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
// 消费者代码(任务执行)
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TaskSchedulerConsumer {
private static final String QUEUE_NAME = "task_scheduler_queue";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for tasks. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String taskMessage = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received task: '" + taskMessage + "'");
// 模拟任务执行
// 这里可以执行数据备份、统计等任务
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
6. 分布式事务
详细解释
在分布式系统中,涉及多个服务之间的事务操作时,消息队列可用于实现最终一致性。通过将事务操作相关的消息发送到消息队列,各个服务根据消息进行相应操作,在一定时间内达到数据的一致性。
运用场景
- 跨银行转账系统中,转出银行将转账消息发送到消息队列,转入银行从队列中获取消息并执行入账操作。
- 电商系统中,订单服务和库存服务之间的事务处理,通过消息队列保证订单和库存数据的一致性。
代码示例
// 生产者代码(转出银行发送转账消息)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DistributedTransactionProducer {
private static final String QUEUE_NAME = "distributed_transaction_queue";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String transferMessage = "Transfer 1000 yuan from account A to account B";
channel.basicPublish("", QUEUE_NAME, null, transferMessage.getBytes("UTF-8"));
System.out.println(" [x] Sent transfer message: '" + transferMessage + "'");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
// 消费者代码(转入银行处理转账消息)
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DistributedTransactionConsumer {
private static final String QUEUE_NAME = "distributed_transaction_queue";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for transfer messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String transferMessage = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received transfer message: '" + transferMessage + "'");
// 模拟转入银行入账操作
// 这里可以更新账户余额等
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
7. 系统集成
详细解释
多个不同系统集成时,消息队列可作为系统间通信的桥梁。各系统将需交互的数据或操作指令以消息形式发送到消息队列,其他系统从队列中获取消息并处理,实现系统间的解耦和异步通信。
运用场景
- 企业的 ERP 系统与 CRM 系统集成,ERP 系统创建新客户订单后,将订单消息发送到消息队列,CRM 系统从队列获取消息更新客户订单信息和销售数据。
- 电商平台与物流系统集成,订单生成后将订单信息发送到消息队列,物流系统从队列获取信息进行发货处理。
代码示例
// 生产者代码(ERP系统发送订单消息)
import com.rabbitmq.client.Channel;