MQ消息发送不在MySQL事务中,该如何保证一致性
目录
MQ消息发送不在MySQL事务中,该如何保证一致性?
在 MQ 消息发送与 MySQL 事务分离的场景下,可通过以下方案保障数据一致性:
一、核心原则
确保 业务操作成功 与 消息发送成功 的最终一致性,避免因网络抖动、服务宕机等异常导致以下问题:
- 场景1 :业务操作成功但消息未发送(下游无感知)
- 场景2 :业务操作失败但消息已发送(下游误处理)
二、具体方案
1. 事务消息(RocketMQ 专有)
适用场景 :强依赖 MQ 且支持事务消息的系统(如 RocketMQ)。
核心流程 :
- 发送半消息 :业务操作前发送半消息(暂不可消费)。
- 执行本地事务 :执行业务逻辑(如 MySQL 更新)。
- 提交/回滚
:
- 若事务成功 → 提交半消息为可消费状态。
- 若事务失败 → 回滚半消息(不投递)。
- 事务回查 :若未收到二次确认,MQ 主动询问事务状态(兜底机制)。
代码示例 (RocketMQ):
// 发送半消息
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
// 实现 LocalTransactionChecker 处理回查
@Override
public LocalTransactionState check(MessageExt msg) {
return checkDBTransactionStatus(msg.getTransactionId()) ? COMMIT : ROLLBACK;
}
优点 :原生支持,流程标准化。
缺点 :依赖特定 MQ(如 RocketMQ),技术栈受限。
2. 本地消息表(通用方案)
适用场景 :任何 MQ(如 Kafka、RabbitMQ),需业务系统额外开发。
实现步骤 :
同库事务 :业务操作与消息记录写入同一数据库事务。
BEGIN; UPDATE order SET status = 'paid' WHERE id = 1; -- 业务操作 INSERT INTO mq_message (id, content, status) VALUES (1001, 'order_paid', 'pending'); -- 消息记录 COMMIT;
异步发送 :定时任务轮询
mq_message
表,发送状态为pending
的消息。确认与重试 :
- 发送成功 → 更新消息状态为
sent
。 - 发送失败 → 记录重试次数,超过阈值则标记为
dead_letter
(人工介入)。
- 发送成功 → 更新消息状态为
优点 :与 MQ 无关,兼容性强。
缺点 :需维护消息表,增加数据库压力。
3. 事务同步监听(Spring 生态)
适用场景 :基于 Spring 事务管理的系统。
实现方式 :
@Transactional
public void processOrder(Order order) {
orderDao.update(order); // 业务操作
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronization() {
@Override
public void afterCommit() {
// 事务提交后发送消息
mqService.send(order);
}
}
);
}
注意事项 :
- 发送失败处理
:需配合重试机制(如
@Retryable
)或记录日志人工补偿。 - 避免事务未提交提前发送
:确保消息发送在
afterCommit
回调中触发。
4. 基于 Binlog 的变更捕获(CDC 方案)
适用场景 :高吞吐、解耦业务与消息发送的场景(如数据同步)。
实现流程 :
- 监听数据库日志 :通过工具(如 Canal、Debezium)解析 MySQL Binlog。
- 过滤变更事件 :捕获业务表变更(如订单状态更新)。
- 发送 MQ 消息 :将变更事件投递到 MQ。
优点 :完全解耦业务代码,适合大数据量场景。
缺点 :实时性略低(依赖 Binlog 解析延迟),需维护 CDC 服务。
三、最终一致性保障
无论采用何种方案,需补充以下措施:
- 消费者幂等 :通过唯一 ID(如业务主键)去重,避免重复消费。
- 死信队列(DLQ) :处理多次重试失败的消息,人工介入修复。
- 对账补偿 :定时比对业务数据与消息消费状态,修复不一致。
四、方案对比
方案 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
事务消息 | 原生支持,流程标准化 | 依赖 RocketMQ | 强事务需求且使用 RocketMQ |
本地消息表 | 通用性强,兼容所有 MQ | 需维护消息表 | 技术栈受限或需高可靠性 |
事务同步监听 | 代码侵入小,Spring 生态友好 | 需处理发送失败补偿 | 中小规模 Spring 项目 |
Binlog 变更捕获 | 完全解耦业务 | 实时性低,维护成本高 | 大数据量、异步数据同步场景 |
总结 :优先选择 本地消息表 (通用性强)或 事务消息 (RocketMQ 环境),结合幂等设计与重试机制,可有效解决 MQ 与数据库事务分离场景下的数据一致性问题。