当前位置: 首页 > web >正文

可靠消息最终一致性分布式事务解决方案

之前文章写过主流的一些 分布式事务的解决方案,但其实工作中很少有一些高并发的业务中去使用这些方案,因为对于高并发的场景来说,引入这些方案的性能损耗太大,且对系统事务侵入性太强影响系统稳定性。

所以在高并发的业务中,如果对实时性可以容忍秒级的延迟,那么使用最终一致性事务方案是最合适的选择。

可靠消息最终一致性事务

可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致。

可靠消息最终一致性方案要解决以下几个问题:

  1. 本地事务与消息发送的原子性问题:即实现本地事务和消息发送的原子性,要么都成功,要么都失败。这是实现可靠消息最终一致性方案的关键问题。
  2. 事务参与方接收消息的可靠性:事务参与方必须能够从消息队列接收到消息,如果接收消费消息失败需要重复尝试消费,即实现最终消费成功。
  3. 消息重复消费的问题:由于步骤2的存在,若某一个消费节点出现消费超时但是处理逻辑执行成功了,此时由于消息中间件会重复投递就导致了消息的重复消费。要解决消息重复消费的问题就要实现事务参与方的方法幂等性。
     

1.RocketMQ事务消息实现

RocketMQ独有的事务回调扩展可以比较轻松的实现最终一致性事务。

假设有两个本地事务组成当前的全局事务,实现流程如下:

  1. 先发送half消息到MQ,MQ服务端收到后保存消息,但是half是对消费端不可见状态。
  2. MQ回调发送者的事务事件回调接口,这时候在这个接口中我们执行本地事务。
  3. 如果本地事务执行成功,就提交MQ的事务,此时MQ会把消息设置为可消费状态,否则执行事务回滚,本地事务失败且消息也会被删除。
  4. 如果长时间未响应事务提交,MQ服务端会回查发送者的事务状态,可以做补偿提交。

上述流程保证了第一个本地事务与消息发送的一致性,即本地事务发送成功后消息才可消费。基于MQ的分布式事务实现的是最终一致性并不保证实时性,所以对于消费者而言只要确保收到消息完成第二个本地事务的提交就可以了。 

在这里插入图片描述 

 1.发送事务消息

@RestController
@Slf4j
public class AccountInfoController {@Autowiredprivate AccountInfoService accountInfoService;@GetMapping(value = "/transfer")public String transfer(@RequestParam("accountNo")String accountNo, @RequestParam("amount") Double amount){//创建一个事务id,作为消息内容发到mqString tx_no = UUID.randomUUID().toString();AccountChangeEvent accountChangeEvent = new AccountChangeEvent(accountNo,amount,tx_no);//发送消息accountInfoService.sendUpdateAccountBalance(accountChangeEvent);return "转账成功";}
}

2.RocketMQLocalTransactionListener 接口(本地事务执行和消息事务提交,消息事务回查补偿提交)


@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "producer_group_txmsg_bank1")
public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {@AutowiredAccountInfoService accountInfoService;@AutowiredAccountInfoDao accountInfoDao;//事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调@Override@Transactionalpublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {try {//解析message,转成AccountChangeEventString messageString = new String((byte[]) message.getPayload());JSONObject jsonObject = JSONObject.parseObject(messageString);String accountChangeString = jsonObject.getString("accountChange");//将accountChange(json)转成AccountChangeEventAccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);//执行本地事务,扣减金额accountInfoService.doUpdateAccountBalance(accountChangeEvent);//当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {e.printStackTrace();return RocketMQLocalTransactionState.ROLLBACK;}}//MQ回调事务状态回查接口,查询是否扣减金额@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {//解析message,转成AccountChangeEventString messageString = new String((byte[]) message.getPayload());JSONObject jsonObject = JSONObject.parseObject(messageString);String accountChangeString = jsonObject.getString("accountChange");//将accountChange(json)转成AccountChangeEventAccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);//事务idString txNo = accountChangeEvent.getTxNo();log.info("事务状态回查");int existTx = accountInfoDao.isExistTx(txNo);if(existTx>0){return RocketMQLocalTransactionState.COMMIT;}else{return RocketMQLocalTransactionState.UNKNOWN;}}
}

3.本地事务类

@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {@AutowiredAccountInfoDao accountInfoDao;@Autowired(required = false)RocketMQTemplate rocketMQTemplate;//向mq发送转账消息@Overridepublic void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {//将accountChangeEvent转成jsonJSONObject jsonObject =new JSONObject();jsonObject.put("accountChange",accountChangeEvent);String jsonString = jsonObject.toJSONString();log.info(jsonString);//生成message类型Message<String> message = MessageBuilder.withPayload(jsonString).build();//发送一条事务消息/*** String txProducerGroup 生产组* String destination topic,* Message<?> message, 消息内容* Object arg 参数*/rocketMQTemplate.sendMessageInTransaction("producer_group_txmsg_bank1","topic_txmsg",message,null);}//更新账户,扣减金额@Override@Transactionalpublic void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {//幂等判断if(accountInfoDao.isExistTx(accountChangeEvent.getTxNo())>0){return ;}//扣减金额accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount() * -1);//添加事务日志accountInfoDao.addTx(accountChangeEvent.getTxNo());if(accountChangeEvent.getAmount() == 3){throw new RuntimeException("人为制造异常");}}
}

2.本地消息表实现

由于有些项目或者公司架构中不使用RocketMQ,无法通过事务消息机制来实现。那么使用 本地消息表+普通MQ中间件 也可以实现可靠消息最终一致性事务。

本地消息表实现方案的核心是新建一个本地消息数据库表,通过本地数据库事务把业务操作和消息数据绑定在一起,然后通过异步(定时任务重试)将消息发送至消息中间件,最终待确认消息发送给消费方被成功消费。

CREATE TABLE transaction_messages (id BIGINT PRIMARY KEY AUTO_INCREMENT,message_id VARCHAR(64) NOT NULL UNIQUE,topic VARCHAR(128) NOT NULL,body TEXT NOT NULL,status TINYINT NOT NULL COMMENT '0-待发送 1-已发送 2-发送失败',retry_count INT DEFAULT 0,created_at DATETIME NOT NULL,updated_at DATETIME NOT NULL,INDEX idx_status_retry (status, retry_count)
);

该方案的流程其实就是模拟RocketMQ事务消息的流程,通过本地消息数据的多个状态来实现本地事务与消息发送的原子性。具体如下:

  1. 在本地事务中完成业务操作后,插入一条状态为 待发送的 的消息记录
  2. 异步或定时任务拿到 待发送的消息(能拿到说明本地事务提交成功),处理消息发送,发送完成后更新状态为已发送(这里无论是发送失败还是更新失败都会重试,最终保证成功)
  3. 事务参与方接收到消息并完成消费,保证幂等和最终消费成功

方案瑕疵

我觉得这个方案是跟RocketMQ方案相比,无法保证发送成功后的消息一定能投放给消费者。

因为高并发系统建设中,出于性能考虑大部分场景在使用消息中间件时都是设置异步复制和刷盘,这就意味着如果出现MQ服务宕机的情况,就可能会出现未复制或未落盘的数据丢失的情况。


如果要解决这个问题则需要基于上面的流程增加核对流程,事务参与方消费完成后记录消费记录,定期核对发送和消费记录,对发送未消费的消息进行补偿发送处理。

 

http://www.xdnf.cn/news/15813.html

相关文章:

  • 基础密码协议
  • Xilinx Zynq:一款适用于软件定义无线电的现代片上系统
  • 代理模式及优化
  • 手撕Spring底层系列之:Bean的生命周期
  • C++进阶-红黑树(难度较高)
  • Docker报错:No address associated with hostname
  • 广东省省考备考(第四十九天7.18)——判断推理:位置规律(听课后强化训练)
  • 深度学习×第10卷:她用一块小滤镜,在图像中找到你
  • 基于 WinForm 与虹软实现人脸识别功能:从理论到实践
  • 洛谷 P1395 会议
  • 周志华《机器学习导论》第9章 聚类
  • Linux基本操作
  • Linux内核设计与实现 - 第3章:Linux的进程
  • 使用python读取json数据,简单的处理成元组数组
  • 2026python实战——如何利用海外代理ip爬取海外数据
  • 【机器学习】AdamW可调参数介绍及使用说明
  • Ubuntu查看Docker容器
  • 双向广搜算法详解
  • 数据结构——单调栈
  • 服务管理智能化:R²AIN SUITE 升级带来的两大功能更新哪些值得关注?
  • SQLite / LiteDB 单文件数据库为何“清空表后仍占几 GB”?——原理解析与空间回收实战
  • 告别宕机!Ubuntu自动重启定时任务设置(一键脚本/手动操作)
  • 怎么自己搭建云手机
  • 数据库防止数组字符串序列化
  • 知识管理中的人工智能:概述、主要功能和管理工具
  • #vscode# #SSH远程# #Ubuntu 16.04# 远程ubuntu旧版Linux
  • 【Nginx】nginx+lua+redis实现限流
  • ARCS系统机器视觉实战(直播回放)
  • 医疗人工智能的心电图分析:创新技术与临床应用
  • Java面试宝典:Maven