当前位置: 首页 > backend >正文

分布式事务

分布式事务 | 使用 DTM 的 Saga 模式详解

目录

分布式事务 | 使用 DTM 的 Saga 模式详解

1. 背景与概述

2. Saga 模式的核心原理

2.1 Saga 模式的定义

2.2 Saga 模式的工作流程

2.3 Saga 模式的关键特性

3. 案例一:银行转账的 Saga 实现

3.1 业务场景

3.2 Saga 模式的分解

3.3 代码实现(Go 语言)

3.4 子事务的实现

3.5 失败回滚的流程

4. 案例二:电商订单处理的 Saga 实现

4.1 业务场景

4.2 Saga 模式的分解

4.3 代码实现(Go 语言)

4.4 子事务的实现

4.5 失败回滚的流程

5. DTM Saga 模式的高级特性

5.1 并发执行子事务

5.2 失败重试与指数退避

5.3 自定义事务状态码

5.4 子事务屏障(Barrier)

6. 部署与调试

6.1 安装 DTM 服务

6.2 部署数据库

6.3 调试工具

7. Saga 模式的优缺点分析

7.1 优点

7.2 缺点

8. 最佳实践建议

9. 总结

1. 背景与概述

在现代微服务架构中,分布式事务是一个不可避免的挑战。当多个独立的服务需要协同完成一个业务逻辑时,如何保证这些服务的事务一致性成为核心问题。DTM(Distributed Transaction Manager) 是一个开源的分布式事务管理框架,支持多种事务模式,其中 Saga 模式 是其核心特性之一。

Saga 模式的核心思想是将一个长事务拆分为多个短事务,每个短事务由独立的微服务执行。如果所有短事务成功,则全局事务提交;如果某一步骤失败,则按照相反顺序调用补偿操作回滚已执行的步骤。这种模式通过牺牲隔离性来提升性能,适用于对一致性要求不极端严格的场景。

本文将通过 转账案例电商订单处理案例 详细讲解如何使用 DTM 的 Saga 模式,并提供完整的代码实现。

2. Saga 模式的核心原理
2.1 Saga 模式的定义

Saga 模式是一种长事务的分解策略。它将一个全局事务拆分为多个本地事务(称为 子事务),每个子事务对应一个业务操作(如转账、扣库存、支付等)。每个子事务需要定义 正向操作(执行业务逻辑)和 补偿操作(回滚业务逻辑)。

2.2 Saga 模式的工作流程
  1. 正向执行:DTM 按顺序执行所有子事务的正向操作。
  2. 成功提交:如果所有子事务成功,则全局事务提交。
  3. 失败回滚:如果某个子事务失败,DTM 会按相反顺序调用已执行子事务的补偿操作,回滚整个事务。
2.3 Saga 模式的关键特性
  • 最终一致性:通过补偿操作保证数据的最终一致性。
  • 高可用性:子事务独立提交,避免长时间锁定资源。
  • 灵活性:支持并发执行子事务(默认顺序执行)。
  • 容错性:失败重试、指数退避算法等机制确保事务可靠性。
3. 案例一:银行转账的 Saga 实现
3.1 业务场景

假设需要将用户 A 的账户金额转出,并存入用户 B 的账户。涉及的微服务包括:

  • 转出服务:扣除用户 A 的余额。
  • 转入服务:增加用户 B 的余额。
  • 日志服务:记录转账记录(可选)。
3.2 Saga 模式的分解

将转账操作分解为以下子事务:

  1. 转出操作(TransOut):扣除用户 A 的余额。
  2. 转入操作(TransIn):增加用户 B 的余额。
  3. 补偿操作
    • TransOutCompensate:回滚转出操作(增加用户 A 的余额)。
    • TransInCompensate:回滚转入操作(扣除用户 B 的余额)。
3.3 代码实现(Go 语言)

以下是使用 DTM 的 Go SDK 实现的代码示例:

package mainimport ("fmt""github.com/dtm-labs/dtmcli""github.com/gin-gonic/gin""github.com/lithammer/shortuuid/v3""log"
)// 定义转账请求结构体
type TransferRequest struct {FromUserID  int     `json:"from_user_id"`ToUserID    int     `json:"to_user_id"`Amount      float64 `json:"amount"`
}func main() {// 初始化 Gin 服务app := gin.Default()app.GET("/transfer", func(c *gin.Context) {// 创建并提交 Saga 事务gid := executeTransfer()log.Printf("Transfer completed with GID: %s", gid)c.JSON(200, fmt.Sprintf("Transfer completed with GID: %s", gid))})app.Run(":8080")
}// 执行转账逻辑
func executeTransfer() string {// 定义转账请求参数req := &TransferRequest{FromUserID: 1,ToUserID:   2,Amount:     100.0,}// 创建 Saga 事务saga := dtmcli.NewSaga("http://localhost:36789/api/dtmsvr", shortuuid.New())// 添加子事务saga.Add("http://trans-out-service/transOut",               // 正向操作 URL"http://trans-out-service/transOutCompensate",      // 补偿操作 URLreq,).Add("http://trans-in-service/transIn",                  // 正向操作 URL"http://trans-in-service/transInCompensate",        // 补偿操作 URLreq,)// 设置重试间隔(单位:秒)saga.RetryInterval = 1// 提交 Saga 事务if err := saga.Submit(); err != nil {panic(err)}return saga.Gid
}
3.4 子事务的实现
  • 转出服务(TransOut):

    func transOut(c *gin.Context) {var req TransferRequestc.BindJSON(&req)// 执行扣除用户 A 余额的逻辑// ...c.JSON(200, "SUCCESS")
    }func transOutCompensate(c *gin.Context) {var req TransferRequestc.BindJSON(&req)// 回滚扣除用户 A 余额的逻辑// ...c.JSON(200, "SUCCESS")
    }
  • 转入服务(TransIn):

    func transIn(c *gin.Context) {var req TransferRequestc.BindJSON(&req)// 执行增加用户 B 余额的逻辑// ...c.JSON(200, "SUCCESS")
    }func transInCompensate(c *gin.Context) {var req TransferRequestc.BindJSON(&req)// 回滚增加用户 B 余额的逻辑// ...c.JSON(200, "SUCCESS")
    }
3.5 失败回滚的流程

假设在 转出操作 成功后,转入操作 因账户冻结失败。DTM 会按以下步骤回滚:

  1. 调用 transInCompensate:减少用户 B 的余额。
  2. 调用 transOutCompensate:增加用户 A 的余额。

最终,用户 A 的余额恢复,用户 B 的余额回滚,事务失败。

4. 案例二:电商订单处理的 Saga 实现
4.1 业务场景

在电商系统中,下单流程通常包括以下步骤:

  1. 扣减库存:减少商品库存。
  2. 支付订单:用户支付订单金额。
  3. 分配物流:生成物流单号。
  4. 更新订单状态:标记订单为“已支付”。
4.2 Saga 模式的分解

将上述流程分解为以下子事务:

  1. 扣减库存(DecreaseStock):减少商品库存。
  2. 支付订单(Charge):用户支付订单金额。
  3. 分配物流(AllocateWaybill):生成物流单号。
  4. 补偿操作
    • DecreaseStockCompensate:回滚库存扣减。
    • ChargeCompensate:回滚支付。
    • AllocateWaybillCompensate:回滚物流分配。
4.3 代码实现(Go 语言)

以下是使用 DTM 的 Go SDK 实现的代码示例:

package mainimport ("fmt""github.com/dtm-labs/dtmcli""github.com/gin-gonic/gin""github.com/lithammer/shortuuid/v3""log"
)// 定义订单请求结构体
type OrderRequest struct {OrderID     string  `json:"order_id"`ProductID   int     `json:"product_id"`Quantity    int     `json:"quantity"`Amount      float64 `json:"amount"`
}func main() {// 初始化 Gin 服务app := gin.Default()app.GET("/create-order", func(c *gin.Context) {// 创建并提交 Saga 事务gid := createOrder()log.Printf("Order created with GID: %s", gid)c.JSON(200, fmt.Sprintf("Order created with GID: %s", gid))})app.Run(":8080")
}// 创建订单逻辑
func createOrder() string {// 定义订单请求参数req := &OrderRequest{OrderID:    "O123456",ProductID:  1001,Quantity:   2,Amount:     99.99,}// 创建 Saga 事务saga := dtmcli.NewSaga("http://localhost:36789/api/dtmsvr", shortuuid.New())// 添加子事务saga.Add("http://inventory-service/decreaseStock",           // 正向操作 URL"http://inventory-service/decreaseStockCompensate", // 补偿操作 URLreq,).Add("http://payment-service/charge",                    // 正向操作 URL"http://payment-service/chargeCompensate",          // 补偿操作 URLreq,).Add("http://logistics-service/allocateWaybill",         // 正向操作 URL"http://logistics-service/allocateWaybillCompensate", // 补偿操作 URLreq,)// 设置重试间隔(单位:秒)saga.RetryInterval = 1// 提交 Saga 事务if err := saga.Submit(); err != nil {panic(err)}return saga.Gid
}
4.4 子事务的实现
  • 库存服务(DecreaseStock):

    func decreaseStock(c *gin.Context) {var req OrderRequestc.BindJSON(&req)// 扣减库存逻辑// ...c.JSON(200, "SUCCESS")
    }func decreaseStockCompensate(c *gin.Context) {var req OrderRequestc.BindJSON(&req)// 回滚库存扣减逻辑// ...c.JSON(200, "SUCCESS")
    }
  • 支付服务(Charge):

    func charge(c *gin.Context) {var req OrderRequestc.BindJSON(&req)// 支付逻辑// ...c.JSON(200, "SUCCESS")
    }func chargeCompensate(c *gin.Context) {var req OrderRequestc.BindJSON(&req)// 回滚支付逻辑// ...c.JSON(200, "SUCCESS")
    }
  • 物流服务(AllocateWaybill):

    func allocateWaybill(c *gin.Context) {var req OrderRequestc.BindJSON(&req)// 生成物流单号逻辑// ...c.JSON(200, "SUCCESS")
    }func allocateWaybillCompensate(c *gin.Context) {var req OrderRequestc.BindJSON(&req)// 回滚物流单号逻辑// ...c.JSON(200, "SUCCESS")
    }
4.5 失败回滚的流程

假设在 支付订单 成功后,分配物流 因系统故障失败。DTM 会按以下步骤回滚:

  1. 调用 allocateWaybillCompensate:删除物流单号。
  2. 调用 chargeCompensate:退款给用户。
  3. 调用 decreaseStockCompensate:恢复库存。

最终,订单状态回滚到初始状态,用户未支付,库存未扣减。

5. DTM Saga 模式的高级特性
5.1 并发执行子事务

默认情况下,DTM 按顺序执行子事务。如果需要提高性能,可以启用并发执行:

saga.EnableConcurrent()

这允许子事务并行执行,但需确保子事务之间无依赖关系。

5.2 失败重试与指数退避

DTM 默认使用指数退避算法重试失败的子事务。可以通过以下方式配置:

saga.RetryInterval = 1  // 初始重试间隔(秒)
saga.MaxRetryCount = 3  // 最大重试次数
5.3 自定义事务状态码

DTM 通过 HTTP 状态码判断子事务结果:

  • 200 OK:SUCCESS(成功)
  • 409 Conflict:FAILURE(业务失败,需回滚)
  • 202 Accepted:ONGOING(进行中,需重试)

例如,如果库存不足,子事务应返回 409

func decreaseStock(c *gin.Context) {// 业务逻辑if inventory < required {c.AbortWithStatus(409) // 返回 409 表示业务失败}c.JSON(200, "SUCCESS")
}
5.4 子事务屏障(Barrier)

子事务屏障是 DTM 保证事务原子性的核心机制。每个子事务需通过屏障表(dtm_barrier.barrier)记录执行状态。以下是屏障表的 SQL 定义:

CREATE TABLE IF NOT EXISTS dtm_barrier.barrier (id BIGINT PRIMARY KEY AUTO_INCREMENT,trans_type VARCHAR(45) DEFAULT '',gid VARCHAR(128) DEFAULT '',branch_id VARCHAR(128) DEFAULT '',op VARCHAR(45) DEFAULT '',barrier_id VARCHAR(45) DEFAULT '',reason VARCHAR(45) DEFAULT '',create_time DATETIME DEFAULT NOW(),update_time DATETIME DEFAULT NOW(),UNIQUE KEY (gid, branch_id, op, barrier_id)
);

子事务需在屏障表中插入记录,确保事务的幂等性和一致性。

6. 部署与调试
6.1 安装 DTM 服务

通过 Docker 快速部署 DTM 服务:

docker run -itd \--name dtm \-p 36789:36789 \-p 36790:36790 \yedf/dtm:latest

启动后,访问 http://localhost:36789 查看 DTM 的 Web UI。

6.2 部署数据库

DTM 需要 MySQL 数据库支持,部署命令如下:

docker run -d \--name mysql \-e MYSQL_ROOT_PASSWORD=sa123456 \-e MYSQL_USER=sa \-e MYSQL_PASSWORD=sa123456 \-e MYSQL_DATABASE=test \-p 3306:3306 \mysql:latest
6.3 调试工具
  • Web UI:通过 http://localhost:36789 查看事务状态。
  • 日志分析:检查子事务服务的日志,定位失败原因。
  • 重试机制:观察 DTM 是否自动重试失败的子事务。
7. Saga 模式的优缺点分析
7.1 优点
  1. 高性能:通过短事务提交避免长时间锁定资源。
  2. 适用遗留系统:对业务代码侵入性低,适合改造现有系统。
  3. 易于实现:无需复杂的 Confirm/Cancel 接口(如 TCC 模式)。
7.2 缺点
  1. 不保证隔离性:子事务可能读取到中间状态。
  2. 补偿逻辑复杂:需设计可靠的补偿操作。
  3. 最终一致性:可能需要等待一段时间才能保证数据一致性。
8. 最佳实践建议
  1. 设计幂等的补偿操作:确保补偿操作可重复执行。
  2. 监控事务状态:通过 DTM Web UI 或日志实时监控事务进度。
  3. 测试失败场景:模拟子事务失败,验证补偿逻辑的正确性。
  4. 合理设置重试策略:根据业务需求调整重试间隔和次数。
  5. 记录事务日志:通过数据库记录事务状态,便于排查问题。
9. 总结

DTM 的 Saga 模式通过拆分长事务为短事务,解决了分布式系统中的事务一致性问题。通过两个实际案例(银行转账和电商订单处理),我们展示了如何利用 DTM 的 Go SDK 快速实现 Saga 模式。此外,DTM 提供了丰富的高级特性(如并发执行、失败重试、子事务屏障),进一步提升了事务管理的灵活性和可靠性。

在实际开发中,开发者需权衡 Saga 模式的优缺点,并结合业务需求选择合适的事务模式。通过合理设计补偿逻辑和监控机制,可以确保系统的稳定性和数据的一致性。

http://www.xdnf.cn/news/7270.html

相关文章:

  • SID 2025上的天马,用“好屏”技术重构产业叙事
  • 【NLP】36. 从指令微调到人类偏好:构建更有用的大语言模型
  • [Spring Boot]整合Java Mail实现Outlook发送邮件
  • 《AI高效运维体系建设创新》技术连载(四)
  • 数据库连接问题排查全攻略:从服务状态到网络配置的深度解析
  • PCL点云库点云数据处理入门系列教材目录(2025年5月更新....)
  • HttpMessageConverter 的作用是什么? 它是如何实现请求体到对象、对象到响应体的自动转换的(特别是 JSON/XML)?
  • Qwen3 - 0.6B与Bert文本分类实验:深度见解与性能剖析
  • 遨游科普:三防平板是什么?应用在什么场景?
  • Perl数据库测试实战:从基础到高级的完整解决方案
  • 视觉-和-语言导航的综述:任务、方法和未来方向
  • Python编程从入门到实践 PDF 高清版
  • 【深度学习基础】损失函数与优化算法详解:从理论到实践
  • A3B和AWQ 是什么;Safetensors 是什么?
  • 解决 Linux Bash 脚本因换行符问题导致的 “bash^M: No such file or directory“ 错误
  • 在CentOS系统上部署GitLabRunner并配置CICD自动项目集成!
  • ubuntu下配置vscode生成c_cpp_properties.json
  • 【大数据】MapReduce 编程-- PageRank--网页排名算法,用于衡量网页“重要性”-排序网页
  • 展锐Android14及更新版本split_build编译方法
  • 百度OCR:证件识别
  • Python将Excel单元格某一范围生成—截图(进阶版—带样式+批量+多级表头)
  • 《黑马前端ajax+node.js+webpack+git教程》(笔记)——ajax教程(axios教程)
  • 确保高质量的音视频通话,如何最大化利用视频带宽
  • win10 上删除文件夹失败的一个原因:sqlYog 备份/导出关联了该文件夹
  • 【QT】一个界面中嵌入其它界面(二)
  • 星云智控v1.0.0产品发布会圆满举行:以创新技术重构物联网监控新生态
  • 线程池模式与C#中用法
  • 解决服务器重装之后vscode Remote-SSH无法连接的问题
  • Vue百日学习计划Day33-35天详细计划-Gemini版
  • 基于tar包安装,创建两个tomcat实例