当前位置: 首页 > java >正文

5.3 分布式事务

目录

1. 分布式事务核心挑战

1.1 CAP理论在事务中的权衡 1.2 典型业务场景分析(跨服务扣库存案例)

2. Seata事务模式详解

2.1 AT模式(自动补偿型) • 工作流程四阶段(TM/RM/TC协作) • Undo Log实现原理与全局锁机制 2.2 TCC模式(柔性事务) • Try/Confirm/Cancel接口设计规范 • 空回滚与幂等性保障方案 2.3 Saga模式(长事务解决方案) • 状态机配置与异常补偿策略

3. 生产环境最佳实践

3.1 高可用部署方案 • TC Server集群配置(DB存储模式) • 事务分组与集群隔离策略 3.2 性能优化技巧 • 批量操作减少全局锁竞争 • 异步提交事务实现

4. 源码级原理剖析

4.1 全局事务ID(XID)传播机制 4.2 分支事务注册流程源码跟踪 4.3 事务恢复补偿机制实现

5. 面试高频问题攻坚

5.1 AT模式如何解决脏写问题? 5.2 TCC模式网络抖动如何处理? 5.3 Seata与MQ事务消息的优劣对比

6. 行业实战案例

6.1 电商订单-库存-积分事务案例 6.2 金融行业跨行转账解决方案


1. 分布式事务核心挑战

1.1 CAP理论在事务中的权衡

分布式事务三难选择
​
```mermaid
graph TDA[一致性] -->|强一致性| B[可用性下降]C[分区容错] -->|必须保证| D[CP或AP选型]

CP模式选择

// Seata的AT模式默认CP(通过全局锁保证)
@GlobalTransactional
public void transfer() {accountService.debit();storageService.deduct(); 
}

AP模式补偿

// Saga模式最终一致性
@SagaStart
public void createOrder() {orderService.create();// 异步调用inventoryService.asyncDeduct(); 
}

1.2 跨服务扣库存案例

典型异常场景
故障点风险现象
库存服务超时订单已创建但库存未扣减超卖
订单服务宕机库存已扣减但订单未生成少卖
事务边界划分
// 错误示范:跨服务事务不统一
public void createOrder() {orderDao.insert(); // 本地事务restTemplate.post("http://inventory/deduct"); // 非事务
}
​
// 正确方案:全局事务控制
@GlobalTransactional // Seata注解
public void createOrder() {orderService.create();inventoryService.deduct();
}

2. Seata事务模式详解

2.1 AT模式(Auto Transaction)

四阶段工作流程
 

TMRMTCDB开启全局事务(XID)注册分支事务执行业务SQL(生成undo_log)提交/回滚异步补偿(根据undo_log)TMRMTCDB

Undo Log关键结构
CREATE TABLE `undo_log` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`branch_id` bigint(20) NOT NULL,`xid` varchar(100) NOT NULL,`context` varchar(128) NOT NULL,`rollback_info` longblob NOT NULL, -- 前后镜像数据PRIMARY KEY (`id`),KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
全局锁机制
// 获取全局锁源码片段
public boolean acquireLock() {// 执行SELECT FOR UPDATEif (tryLockWithDB()) return true;// 锁冲突时重试while (timeoutNotExceeded()) {if (tryLockWithDB()) return true;Thread.sleep(50); // 退避等待}throw new LockConflictException();
}

2.2 TCC模式(Try-Confirm-Cancel)

三阶段接口规范
@LocalTCC
public interface InventoryTccAction {@TwoPhaseBusinessAction(name = "InventoryTccAction",commitMethod = "confirm",rollbackMethod = "cancel")boolean try(BusinessActionContext ctx, @BusinessActionContextParameter("sku") String sku,@BusinessActionContextParameter("count") int count);boolean confirm(BusinessActionContext ctx);boolean cancel(BusinessActionContext ctx);
}
空回滚防御方案
public boolean cancel(BusinessActionContext ctx) {// 检查try阶段是否执行if (!triedRecords.exists(ctx.getXid())) {log.warn("空回滚防御: {}", ctx.getXid());return true;}// 正常回滚逻辑return doCancel(ctx);
}
幂等性保障
@Transactional
public boolean confirm(BusinessActionContext ctx) {// 检查是否已处理if (processedRecords.contains(ctx.getXid())) {return true; // 幂等返回}// 业务处理boolean result = doConfirm(ctx);// 记录处理状态processedRecords.save(ctx.getXid());return result;
}

2.3 Saga模式

状态机配置示例
{"name": "orderSaga","steps": [{"name": "createOrder","compensate": "cancelOrder","retryPolicy": {"maxAttempts": 3,"backoffPeriod": 1000}},{"name": "deductInventory","compensate": "compensateInventory"}]
}
异常补偿策略
public void compensateOrder(String orderId) {// 1. 查询订单状态OrderStatus status = orderDao.getStatus(orderId);// 2. 条件补偿if (status == UNPAID) {orderDao.cancel(orderId);} else {log.warn("订单已支付,不可撤销");}
}

3. 生产环境最佳实践

3.1 高可用部署方案

TC Server集群配置(DB存储模式)
​
```yaml
registry.conf
store {mode = "db" # 必须使用数据库模式db {datasource = "druid"url = "jdbc:mysql://mysql-cluster:3306/seata?useSSL=false"user = "seata"password = "加密密码"min-conn = 5max-conn = 100global.table = "global_table"branch.table = "branch_table"}
}
​
集群节点配置(每个TC Server)
service.vgroup_mapping.default_tx_group = "cluster1"
service.cluster-name = "cluster1"
事务分组与隔离策略
事务分组映射(客户端配置)
spring.cloud.alibaba.seata.tx-service-group=order-service-group
seata.service.order-service-group.grouplist=192.168.1.101:8091,192.168.1.102:8091
​
集群隔离策略
transport.enable-client-batch-send-request=true
transport.thread-factory.boss-thread-prefix=NettyBoss
transport.thread-factory.worker-thread-prefix=NettyServerNIOWorker

3.2 性能优化技巧

批量操作优化
@GlobalTransactional
public void batchProcess(List<Order> orders) {// 1. 批量插入本地事务表orderMapper.batchInsert(orders);// 2. 合并全局锁申请Set<String> lockKeys = orders.stream().map(o -> "order:" + o.getId()).collect(Collectors.toSet());GlobalLockHelper.lockBatch(lockKeys);// 3. 异步提交CompletableFuture.runAsync(() -> {GlobalTransactionContext.reload(RootContext.getXID()).commit();});
}
异步提交实现
public class AsyncTransactionManager {@Async("transactionExecutor")public void asyncCommit(String xid) {try {GlobalTransaction tx = GlobalTransactionContext.reload(xid);tx.commit();} catch (TransactionException e) {log.error("异步提交失败", e);}}
}

4. 源码级原理剖析

4.1 XID传播机制

全链路透传实现
// RootContext核心逻辑
public class RootContext {private static ThreadLocal<String> CONTEXT_HOLDER = new InheritableThreadLocal<>();public static void bind(String xid) {CONTEXT_HOLDER.set(xid);// 跨线程传递(线程池场景)if (TransactionContextParser.hasContext()) {TransactionContextParser.setContext(xid);}}
}
​
// Feign拦截器自动传播
public class SeataFeignInterceptor implements RequestInterceptor {@Overridepublic void apply(RequestTemplate template) {String xid = RootContext.getXID();if (StringUtils.isNotBlank(xid)) {template.header("TX_XID", xid);}}
}

4.2 分支事务注册流程

核心源码路径
// DefaultGlobalManager.registerBranch
public Long registerBranch(...) {// 1. 生成branchIdlong branchId = UUIDGenerator.generateUUID();// 2. 写入branch_tablebranchSessionLock.lock();try {BranchSession branchSession = new BranchSession(...);branchSession.setStatus(BranchStatus.Registered);branchSessionManager.addBranch(branchSession);} finally {branchSessionLock.unlock();}// 3. 返回branchIdreturn branchId;
}
分支注册时序图
 

RMTCDBBranchRegisterRequest获取全局锁插入branch_table返回branchId抛出LockConflictExceptionalt[获取成功][获取失败]RMTCDB

4.3 事务恢复机制

补偿任务调度
// DefaultCoordinator.start
public void start() {// 每5秒扫描异常事务scheduledExecutor.scheduleAtFixedRate(() -> {List<GlobalSession> sessions = SessionHolder.findSessions(SessionStatus.Begin);sessions.forEach(session -> {if (session.isTimeout()) {// 触发异步补偿asyncCommit(session.getXid());}});}, 0, 5, TimeUnit.SECONDS);
}
补偿执行逻辑
public void doGlobalCommit(GlobalSession session) {try {// 1. 重试提交if (session.canBeCommitted()) {session.commit();} // 2. 超过最大重试次数转人工else if (session.getRetriedCount() > MAX_RETRY) {alertService.notifyAdmin(session);}} catch (Exception ex) {log.error("补偿执行失败", ex);session.incrRetriedCount();}
}

5. 面试高频问题攻坚

5.1 AT模式如何解决脏写问题?

解决方案三步走
​
```mermaid
graph TDA[获取本地锁] --> B[获取全局锁]B --> C[执行业务操作]C --> D[释放锁]
核心代码实现
// AbstractConnectionProxy.checkLock
protected boolean checkLock(String tableName, String pk) {// 1. 检查全局锁表if (SELECT * FROM lock_table WHERE table_name=? AND pk=? FOR UPDATE) {throw new LockConflictException();}// 2. 本地事务提交前注册全局锁INSERT INTO lock_table VALUES (tableName, pk, xid);
}
面试回答要点
  1. 双重锁机制:本地事务锁+全局锁双重保障

  2. 锁冲突检测:先检查全局锁表再操作数据

  3. 自动回滚:获取锁失败时立即回滚本地事务

5.2 TCC模式网络抖动处理

重试机制设计
@Slf4j
public class TccRetryPolicy {private static final int MAX_RETRY = 3;public boolean executeWithRetry(TccAction action) {int retryCount = 0;while (retryCount < MAX_RETRY) {try {return action.execute();} catch (NetworkException e) {log.warn("网络异常重试: {}", retryCount);Thread.sleep(1000 * (retryCount + 1));retryCount++;}}throw new TccFailException("超过最大重试次数");}
}
事务恢复方案
异常阶段恢复策略实现方式
Try定时重试Try操作状态检查+幂等控制
Confirm最终一致性重试消息队列+死信队列
Cancel本地事务记录+人工干预补偿任务平台

5.3 Seata vs MQ事务消息

对比矩阵
维度SeataRocketMQ事务消息
一致性强一致性(AT)/最终一致(TCC)最终一致性
性能全局锁有性能损耗更高吞吐量
侵入性需要改造数据源需要实现回调接口
场景跨库事务业务解耦场景
选型决策树
 

需要强一致性?

Seata AT模式

需要业务解耦?

RocketMQ事务消息

Seata TCC模式


6. 行业实战案例

6.1 电商订单-库存-积分案例

AT模式实现
@GlobalTransactional
public void createOrder(OrderDTO order) {// 1. 扣减库存inventoryService.deduct(order.getSku(), order.getCount());// 2. 创建订单orderService.create(order);// 3. 增加积分pointsService.add(order.getUserId(), order.getAmount());
}
异常处理方案
@Transactional(rollbackFor = Exception.class)
public void deduct(String sku, int count) {// 检查库存int remain = inventoryDao.query(sku);if (remain < count) {throw new InventoryException("库存不足");}// 生成undo_logInventory old = inventoryDao.selectForUpdate(sku);inventoryDao.update(sku, remain - count);undoLogDao.insert(new UndoLog(old));
}

6.2 金融跨行转账案例

TCC模式实现
@LocalTCC
public interface TransferTccAction {@TwoPhaseBusinessAction(name = "prepareTransfer")boolean try(BusinessActionContext ctx, @BusinessActionContextParameter("from") String fromAccount,@BusinessActionContextParameter("to") String toAccount,@BusinessActionContextParameter("amount") BigDecimal amount);boolean confirm(BusinessActionContext ctx);boolean cancel(BusinessActionContext ctx);
}
资金冻结方案
-- Try阶段操作
UPDATE accounts 
SET balance = balance - 100,frozen = frozen + 100 
WHERE account_no = 'A';
​
-- Confirm阶段
UPDATE accounts 
SET frozen = frozen - 100 
WHERE account_no = 'A';
​
-- Cancel阶段
UPDATE accounts 
SET balance = balance + 100,frozen = frozen - 100 
WHERE account_no = 'A';
对账补偿机制
public class TransferCompensator {@Scheduled(cron = "0 0 3 * * ?")public void dailyCheck() {List<Transaction> unfinished = txnDao.queryUnfinished();unfinished.forEach(txn -> {if (txn.isTimeout(24)) {alertService.notify(txn);}});}
}
http://www.xdnf.cn/news/1399.html

相关文章:

  • git lfs下载大文件限额
  • 查询Hologres或postgresql中的数据
  • php基础
  • 算法训练营第一天|704.二分查找、27.移除元素、977.有序数组的平方
  • 集结号海螺捕鱼组件搭建教程与源码结构详解(第四篇)
  • crictl 拉取镜像报错 Unimplemented desc = unknown service runtime.v1.ImageService
  • redis 使用 Docker 部署 简单的Redis 集群(包括哨兵机制)
  • 修电脑之电脑没有声音
  • 武装Burp Suite工具:xia SQL自动化测试_插件
  • date-picker组件的shortcuts为什么不能配置在vue的data的return中
  • 小红书文字配图平替工具
  • Vue3-原始值的响应式方案ref
  • 实时数仓体系概览与架构演进
  • python实战项目64:selenium采集软科中国大学排名数据
  • Django DRF实现用户数据权限控制
  • 服务器数据恢复—双循环RAID5数据恢复揭秘
  • 2025.04.23华为机考第二题-200分
  • 第七节:进阶特性高频题-Vue3的ref与reactive选择策略
  • 数据结构初阶:二叉树(四)
  • CSS3 基础(边框效果)
  • 从 Vue 到 React:React.memo + useCallback 组合技
  • PCB规则
  • 【android bluetooth 协议分析 11】【AVDTP详解 2】【avdtp 初始化阶段主要回调关系梳理】
  • 基于FPGA 和DSP 的高性能6U VPX 采集处理板
  • 深入解析C++ STL Queue:先进先出的数据结构
  • Android Gradle Plugin (AGP) 和 Gradle 的關係
  • 【Qwen2.5-VL 踩坑记录】本地 + 海外账号和国内账号的 API 调用区别(阿里云百炼平台)
  • 学习记录:DAY16
  • 2.RabbitMQ - 入门
  • 从入门到精通:CMakeLists.txt 完全指南