当前位置: 首页 > news >正文

(3)Seata AT 模式的事务一致性保证机制

您说得非常对!方案一(Seata AT模式)我只展示了正向逻辑,确实需要补充完整的反向逻辑和一致性保证机制。让我详细解释并补充完整代码。

Seata AT 模式的事务一致性保证机制

核心原理:二阶段提交 + 回滚日志

  1. 一阶段:执行业务SQL,生成回滚日志(undo_log)
  2. 二阶段提交:删除回滚日志
  3. 二阶段回滚:用回滚日志进行数据恢复

完整的事务流程(包含反向逻辑)

graph TDA[全局事务开始] --> B[一阶段: 执行业务SQL]B --> C[生成undo_log回滚日志]C --> D{所有分支成功?}D -- 是 --> E[二阶段提交]E --> F[删除undo_log]F --> G[事务成功]D -- 否 --> H[二阶段回滚]H --> I[使用undo_log恢复数据]I --> J[删除undo_log]J --> K[事务回滚]

完整的代码实现(包含异常处理)

1. 全局事务入口(OrderService)

@Service
@Slf4j
public class OrderServiceImpl implements OrderService {@Autowiredprivate OrderMapper orderMapper;@Autowiredprivate AccountFeignClient accountFeignClient;@Autowiredprivate StorageFeignClient storageFeignClient;@Override@GlobalTransactional(name = "create-order-tx", rollbackFor = Exception.class, timeoutMills = 300000)public void createOrder(OrderDTO orderDTO) {try {log.info("=== 开始创建订单全局事务 ===");// 1. 创建订单(状态为创建中)Order order = createOrderRecord(orderDTO);// 2. 扣减库存(可能会抛出异常)decreaseStorage(orderDTO.getProductId(), orderDTO.getCount());// 3. 扣减余额(可能会抛出异常)decreaseAccount(orderDTO.getUserId(), orderDTO.getMoney());// 4. 更新订单状态为成功updateOrderStatus(order.getId(), 1); // 1-成功log.info("=== 订单创建成功 ===");} catch (Exception e) {log.error("=== 订单创建失败,全局事务回滚 ===", e);// 这里不需要手动回滚,@GlobalTransactional 会自动触发回滚// Seata 会通过undo_log自动回滚已经执行的SQLthrow new RuntimeException("订单创建失败: " + e.getMessage(), e);}}private Order createOrderRecord(OrderDTO orderDTO) {Order order = new Order();order.setUserId(orderDTO.getUserId());order.setProductId(orderDTO.getProductId());order.setCount(orderDTO.getCount());order.setMoney(orderDTO.getMoney());order.setStatus(0); // 0-创建中orderMapper.insert(order);log.info("创建订单记录: {}", order);return order;}private void decreaseStorage(Long productId, Integer count) {try {ResponseEntity<String> result = storageFeignClient.decrease(productId, count);if (!result.getStatusCode().is2xxSuccessful()) {throw new RuntimeException("库存服务调用失败: " + result.getBody());}log.info("库存扣减成功: productId={}, count={}", productId, count);} catch (Exception e) {log.error("库存扣减失败", e);throw new RuntimeException("库存不足或服务异常", e);}}private void decreaseAccount(Long userId, BigDecimal money) {try {ResponseEntity<String> result = accountFeignClient.decrease(userId, money);if (!result.getStatusCode().is2xxSuccessful()) {throw new RuntimeException("账户服务调用失败: " + result.getBody());}log.info("账户扣减成功: userId={}, money={}", userId, money);} catch (Exception e) {log.error("账户扣减失败", e);throw new RuntimeException("余额不足或服务异常", e);}}private void updateOrderStatus(Long orderId, Integer status) {Order order = new Order();order.setId(orderId);order.setStatus(status);orderMapper.updateById(order);}
}

2. 库存服务异常处理增强

@Service
@Slf4j
public class StorageServiceImpl implements StorageService {@Autowiredprivate StorageMapper storageMapper;@Override@Transactionalpublic void decrease(Long productId, Integer count) {try {Storage storage = storageMapper.selectByProductId(productId);if (storage == null) {throw new RuntimeException("商品不存在: " + productId);}if (storage.getResidue() < count) {// 这里抛出异常会触发全局回滚throw new RuntimeException("库存不足: 当前库存=" + storage.getResidue() + ", 需要数量=" + count);}// 执行业务SQL - Seata代理会拦截并生成undo_logstorage.setResidue(storage.getResidue() - count);storage.setUsed(storage.getUsed() + count);storageMapper.updateById(storage);log.info("库存扣减完成: productId={}, 扣减数量={}, 剩余库存={}", productId, count, storage.getResidue());} catch (Exception e) {log.error("库存扣减业务异常", e);throw e; // 异常继续抛出,触发全局回滚}}/*** 查询库存(用于前端校验)*/public Storage getStorage(Long productId) {return storageMapper.selectByProductId(productId);}
}

3. 账户服务异常处理增强

@Service
@Slf4j
public class AccountServiceImpl implements AccountService {@Autowiredprivate AccountMapper accountMapper;@Override@Transactionalpublic void decrease(Long userId, BigDecimal money) {try {Account account = accountMapper.selectByUserId(userId);if (account == null) {throw new RuntimeException("账户不存在: " + userId);}if (account.getBalance().compareTo(money) < 0) {// 余额不足,抛出异常触发回滚throw new RuntimeException("余额不足: 当前余额=" + account.getBalance() + ", 需要金额=" + money);}// 执行业务SQL - Seata代理会拦截并生成undo_logaccount.setBalance(account.getBalance().subtract(money));accountMapper.updateById(account);log.info("账户扣减完成: userId={}, 扣减金额={}, 剩余余额={}", userId, money, account.getBalance());} catch (Exception e) {log.error("账户扣减业务异常", e);throw e; // 异常继续抛出,触发全局回滚}}
}

4. 异常场景测试用例

@SpringBootTest
@Slf4j
public class DistributedTransactionTest {@Autowiredprivate OrderService orderService;@Testpublic void testNormalScenario() {// 正常场景OrderDTO orderDTO = new OrderDTO(1L, 1L, 2, new BigDecimal("100.00"));orderService.createOrder(orderDTO);// 应该成功,所有数据一致}@Testpublic void testInsufficientInventory() {// 库存不足场景OrderDTO orderDTO = new OrderDTO(1L, 1L, 1000, new BigDecimal("100.00")); // 数量太大try {orderService.createOrder(orderDTO);Assert.fail("应该抛出异常");} catch (Exception e) {log.info("预期中的异常: {}", e.getMessage());// 应该所有数据都回滚:订单不存在或状态为失败,库存和余额不变}}@Testpublic void testInsufficientBalance() {// 余额不足场景OrderDTO orderDTO = new OrderDTO(1L, 1L, 2, new BigDecimal("1000000.00")); // 金额太大try {orderService.createOrder(orderDTO);Assert.fail("应该抛出异常");} catch (Exception e) {log.info("预期中的异常: {}", e.getMessage());// 应该所有数据都回滚}}@Testpublic void testServiceTimeout() {// 服务超时场景OrderDTO orderDTO = new OrderDTO(1L, 1L, 2, new BigDecimal("100.00"));// 模拟账户服务超时(需要在AccountService中模拟sleep)try {orderService.createOrder(orderDTO);Assert.fail("应该超时异常");} catch (Exception e) {log.info("超时异常: {}", e.getMessage());// Seata会检测超时并触发回滚}}
}

5. 事务状态查询和补偿接口

@RestController
@RequestMapping("/transaction")
@Slf4j
public class TransactionMonitorController {@Autowiredprivate DataSource dataSource;/*** 查询全局事务状态*/@GetMapping("/status/{xid}")public String getTransactionStatus(@PathVariable String xid) {try (Connection conn = dataSource.getConnection();PreparedStatement ps = conn.prepareStatement("SELECT * FROM undo_log WHERE xid = ? ORDER BY log_created DESC")) {ps.setString(1, xid);ResultSet rs = ps.executeQuery();if (rs.next()) {return "事务存在,状态: " + rs.getInt("log_status");} else {return "事务已完成或不存在";}} catch (Exception e) {return "查询失败: " + e.getMessage();}}/*** 手动补偿接口(用于异常情况)*/@PostMapping("/compensate/{orderId}")public String manualCompensate(@PathVariable Long orderId) {log.warn("手动触发事务补偿: orderId={}", orderId);// 这里可以实现手动补偿逻辑// 1. 检查订单状态// 2. 如果处于中间状态,进行补偿操作// 3. 发送通知等return "补偿操作已触发";}
}

一致性保证的关键点

  1. undo_log 机制:Seata 在执行业务SQL时自动生成回滚日志
  2. 全局锁:防止脏写,确保数据隔离性
  3. 超时控制@GlobalTransactional(timeoutMills = 300000) 设置事务超时
  4. 异常传播:任何服务的异常都会触发全局回滚
  5. 幂等性设计:服务需要支持重试和幂等调用

监控和运维建议

  1. 监控undo_log表:定期清理已完成的事务日志
  2. 设置事务超时:避免长时间占用资源
  3. 日志记录:详细记录事务执行过程
  4. 告警机制:对失败的事务进行告警
  5. 人工补偿接口:提供手动干预的能力

这样完整的实现确保了分布式事务的一致性,无论是正向流程还是异常情况都能正确处理。

http://www.xdnf.cn/news/1472527.html

相关文章:

  • MySQL慢查询优化策略
  • 洛谷 P2392 kkksc03考前临时抱佛脚-普及-
  • 【C++题解】贪心和模拟
  • Linux设备down机,如何识别是 断电还是软件复位
  • Java笔记20240726
  • 【Day 22】94.二叉树的中序遍历 104.二叉树的最大深度 226.翻转二叉树 101.对称二叉树
  • linux上nexus安装教程
  • 从“下山”到AI引擎:全面理解梯度下降(下)
  • 学习心得分享
  • 【OJ】C++ vector类OJ题
  • 使用国内镜像源解决 Electron 安装卡在 postinstall 的问题
  • 【Python - 类库 - BeautifulSoup】(01)“BeautifulSoup“使用示例
  • ESP-idf注册双服务器配置
  • SemiSAM+:在基础模型时代重新思考半监督医学图像分割|文献速递-深度学习人工智能医疗图像
  • 笔记:现代操作系统:原理与实现(2)
  • CLIP学习
  • 【C++】Vector完全指南:动态数组高效使用
  • Transformer核心—自注意力机制
  • 大批项目经理被迫上前线,酸爽
  • 图片在vue2中引用的方式和优缺点
  • 【数字孪生核心技术】什么是倾斜摄影?
  • 遇到 Git 提示大文件无法上传确实让人头疼
  • SVT-AV1编码器中实现WPP依赖管理核心调度
  • 门控MLP(Qwen3MLP)与稀疏混合专家(Qwen3MoeSparseMoeBlock)模块解析
  • 【开题答辩全过程】以 基于JSP的宠物医院管理系统设计为例,包含答辩的问题和答案
  • LTV-1008-TP1-G 电子元器件 LiteOn光宝 发光二极管 核心解析
  • 字符串(2)
  • 一文读懂 RAG 与 KAG:原理、工程落地与开源实战
  • scrypt 密钥派生算法(RFC7914)技术解析及源码示例
  • 流固耦合|08-1外部数据导入