分布式事务
分布式事务 | 使用 DTM 的 Saga 模式详解
目录
分布式事务 | 使用 DTM 的 Saga 模式详解
1. 背景与概述
2. Saga 模式的核心原理
2.1 Saga 模式的定义
2.2 Saga 模式的工作流程
2.3 Saga 模式的关键特性
3. 案例一:银行转账的 Saga 实现
3.1 业务场景
3.2 Saga 模式的分解
3.3 代码实现(Go 语言)
3.4 子事务的实现
3.5 失败回滚的流程
4. 案例二:电商订单处理的 Saga 实现
4.1 业务场景
4.2 Saga 模式的分解
4.3 代码实现(Go 语言)
4.4 子事务的实现
4.5 失败回滚的流程
5. DTM Saga 模式的高级特性
5.1 并发执行子事务
5.2 失败重试与指数退避
5.3 自定义事务状态码
5.4 子事务屏障(Barrier)
6. 部署与调试
6.1 安装 DTM 服务
6.2 部署数据库
6.3 调试工具
7. Saga 模式的优缺点分析
7.1 优点
7.2 缺点
8. 最佳实践建议
9. 总结
1. 背景与概述
在现代微服务架构中,分布式事务是一个不可避免的挑战。当多个独立的服务需要协同完成一个业务逻辑时,如何保证这些服务的事务一致性成为核心问题。DTM(Distributed Transaction Manager) 是一个开源的分布式事务管理框架,支持多种事务模式,其中 Saga 模式 是其核心特性之一。
Saga 模式的核心思想是将一个长事务拆分为多个短事务,每个短事务由独立的微服务执行。如果所有短事务成功,则全局事务提交;如果某一步骤失败,则按照相反顺序调用补偿操作回滚已执行的步骤。这种模式通过牺牲隔离性来提升性能,适用于对一致性要求不极端严格的场景。
本文将通过 转账案例 和 电商订单处理案例 详细讲解如何使用 DTM 的 Saga 模式,并提供完整的代码实现。
2. Saga 模式的核心原理
2.1 Saga 模式的定义
Saga 模式是一种长事务的分解策略。它将一个全局事务拆分为多个本地事务(称为 子事务),每个子事务对应一个业务操作(如转账、扣库存、支付等)。每个子事务需要定义 正向操作(执行业务逻辑)和 补偿操作(回滚业务逻辑)。
2.2 Saga 模式的工作流程
- 正向执行:DTM 按顺序执行所有子事务的正向操作。
- 成功提交:如果所有子事务成功,则全局事务提交。
- 失败回滚:如果某个子事务失败,DTM 会按相反顺序调用已执行子事务的补偿操作,回滚整个事务。
2.3 Saga 模式的关键特性
- 最终一致性:通过补偿操作保证数据的最终一致性。
- 高可用性:子事务独立提交,避免长时间锁定资源。
- 灵活性:支持并发执行子事务(默认顺序执行)。
- 容错性:失败重试、指数退避算法等机制确保事务可靠性。
3. 案例一:银行转账的 Saga 实现
3.1 业务场景
假设需要将用户 A 的账户金额转出,并存入用户 B 的账户。涉及的微服务包括:
- 转出服务:扣除用户 A 的余额。
- 转入服务:增加用户 B 的余额。
- 日志服务:记录转账记录(可选)。
3.2 Saga 模式的分解
将转账操作分解为以下子事务:
- 转出操作(TransOut):扣除用户 A 的余额。
- 转入操作(TransIn):增加用户 B 的余额。
- 补偿操作:
- TransOutCompensate:回滚转出操作(增加用户 A 的余额)。
- TransInCompensate:回滚转入操作(扣除用户 B 的余额)。
3.3 代码实现(Go 语言)
以下是使用 DTM 的 Go SDK 实现的代码示例:
package mainimport ("fmt""github.com/dtm-labs/dtmcli""github.com/gin-gonic/gin""github.com/lithammer/shortuuid/v3""log"
)// 定义转账请求结构体
type TransferRequest struct {FromUserID int `json:"from_user_id"`ToUserID int `json:"to_user_id"`Amount float64 `json:"amount"`
}func main() {// 初始化 Gin 服务app := gin.Default()app.GET("/transfer", func(c *gin.Context) {// 创建并提交 Saga 事务gid := executeTransfer()log.Printf("Transfer completed with GID: %s", gid)c.JSON(200, fmt.Sprintf("Transfer completed with GID: %s", gid))})app.Run(":8080")
}// 执行转账逻辑
func executeTransfer() string {// 定义转账请求参数req := &TransferRequest{FromUserID: 1,ToUserID: 2,Amount: 100.0,}// 创建 Saga 事务saga := dtmcli.NewSaga("http://localhost:36789/api/dtmsvr", shortuuid.New())// 添加子事务saga.Add("http://trans-out-service/transOut", // 正向操作 URL"http://trans-out-service/transOutCompensate", // 补偿操作 URLreq,).Add("http://trans-in-service/transIn", // 正向操作 URL"http://trans-in-service/transInCompensate", // 补偿操作 URLreq,)// 设置重试间隔(单位:秒)saga.RetryInterval = 1// 提交 Saga 事务if err := saga.Submit(); err != nil {panic(err)}return saga.Gid
}
3.4 子事务的实现
-
转出服务(TransOut):
func transOut(c *gin.Context) {var req TransferRequestc.BindJSON(&req)// 执行扣除用户 A 余额的逻辑// ...c.JSON(200, "SUCCESS") }func transOutCompensate(c *gin.Context) {var req TransferRequestc.BindJSON(&req)// 回滚扣除用户 A 余额的逻辑// ...c.JSON(200, "SUCCESS") }
-
转入服务(TransIn):
func transIn(c *gin.Context) {var req TransferRequestc.BindJSON(&req)// 执行增加用户 B 余额的逻辑// ...c.JSON(200, "SUCCESS") }func transInCompensate(c *gin.Context) {var req TransferRequestc.BindJSON(&req)// 回滚增加用户 B 余额的逻辑// ...c.JSON(200, "SUCCESS") }
3.5 失败回滚的流程
假设在 转出操作 成功后,转入操作 因账户冻结失败。DTM 会按以下步骤回滚:
- 调用 transInCompensate:减少用户 B 的余额。
- 调用 transOutCompensate:增加用户 A 的余额。
最终,用户 A 的余额恢复,用户 B 的余额回滚,事务失败。
4. 案例二:电商订单处理的 Saga 实现
4.1 业务场景
在电商系统中,下单流程通常包括以下步骤:
- 扣减库存:减少商品库存。
- 支付订单:用户支付订单金额。
- 分配物流:生成物流单号。
- 更新订单状态:标记订单为“已支付”。
4.2 Saga 模式的分解
将上述流程分解为以下子事务:
- 扣减库存(DecreaseStock):减少商品库存。
- 支付订单(Charge):用户支付订单金额。
- 分配物流(AllocateWaybill):生成物流单号。
- 补偿操作:
- DecreaseStockCompensate:回滚库存扣减。
- ChargeCompensate:回滚支付。
- AllocateWaybillCompensate:回滚物流分配。
4.3 代码实现(Go 语言)
以下是使用 DTM 的 Go SDK 实现的代码示例:
package mainimport ("fmt""github.com/dtm-labs/dtmcli""github.com/gin-gonic/gin""github.com/lithammer/shortuuid/v3""log"
)// 定义订单请求结构体
type OrderRequest struct {OrderID string `json:"order_id"`ProductID int `json:"product_id"`Quantity int `json:"quantity"`Amount float64 `json:"amount"`
}func main() {// 初始化 Gin 服务app := gin.Default()app.GET("/create-order", func(c *gin.Context) {// 创建并提交 Saga 事务gid := createOrder()log.Printf("Order created with GID: %s", gid)c.JSON(200, fmt.Sprintf("Order created with GID: %s", gid))})app.Run(":8080")
}// 创建订单逻辑
func createOrder() string {// 定义订单请求参数req := &OrderRequest{OrderID: "O123456",ProductID: 1001,Quantity: 2,Amount: 99.99,}// 创建 Saga 事务saga := dtmcli.NewSaga("http://localhost:36789/api/dtmsvr", shortuuid.New())// 添加子事务saga.Add("http://inventory-service/decreaseStock", // 正向操作 URL"http://inventory-service/decreaseStockCompensate", // 补偿操作 URLreq,).Add("http://payment-service/charge", // 正向操作 URL"http://payment-service/chargeCompensate", // 补偿操作 URLreq,).Add("http://logistics-service/allocateWaybill", // 正向操作 URL"http://logistics-service/allocateWaybillCompensate", // 补偿操作 URLreq,)// 设置重试间隔(单位:秒)saga.RetryInterval = 1// 提交 Saga 事务if err := saga.Submit(); err != nil {panic(err)}return saga.Gid
}
4.4 子事务的实现
-
库存服务(DecreaseStock):
func decreaseStock(c *gin.Context) {var req OrderRequestc.BindJSON(&req)// 扣减库存逻辑// ...c.JSON(200, "SUCCESS") }func decreaseStockCompensate(c *gin.Context) {var req OrderRequestc.BindJSON(&req)// 回滚库存扣减逻辑// ...c.JSON(200, "SUCCESS") }
-
支付服务(Charge):
func charge(c *gin.Context) {var req OrderRequestc.BindJSON(&req)// 支付逻辑// ...c.JSON(200, "SUCCESS") }func chargeCompensate(c *gin.Context) {var req OrderRequestc.BindJSON(&req)// 回滚支付逻辑// ...c.JSON(200, "SUCCESS") }
-
物流服务(AllocateWaybill):
func allocateWaybill(c *gin.Context) {var req OrderRequestc.BindJSON(&req)// 生成物流单号逻辑// ...c.JSON(200, "SUCCESS") }func allocateWaybillCompensate(c *gin.Context) {var req OrderRequestc.BindJSON(&req)// 回滚物流单号逻辑// ...c.JSON(200, "SUCCESS") }
4.5 失败回滚的流程
假设在 支付订单 成功后,分配物流 因系统故障失败。DTM 会按以下步骤回滚:
- 调用 allocateWaybillCompensate:删除物流单号。
- 调用 chargeCompensate:退款给用户。
- 调用 decreaseStockCompensate:恢复库存。
最终,订单状态回滚到初始状态,用户未支付,库存未扣减。
5. DTM Saga 模式的高级特性
5.1 并发执行子事务
默认情况下,DTM 按顺序执行子事务。如果需要提高性能,可以启用并发执行:
saga.EnableConcurrent()
这允许子事务并行执行,但需确保子事务之间无依赖关系。
5.2 失败重试与指数退避
DTM 默认使用指数退避算法重试失败的子事务。可以通过以下方式配置:
saga.RetryInterval = 1 // 初始重试间隔(秒)
saga.MaxRetryCount = 3 // 最大重试次数
5.3 自定义事务状态码
DTM 通过 HTTP 状态码判断子事务结果:
200 OK
:SUCCESS(成功)409 Conflict
:FAILURE(业务失败,需回滚)202 Accepted
:ONGOING(进行中,需重试)
例如,如果库存不足,子事务应返回 409
:
func decreaseStock(c *gin.Context) {// 业务逻辑if inventory < required {c.AbortWithStatus(409) // 返回 409 表示业务失败}c.JSON(200, "SUCCESS")
}
5.4 子事务屏障(Barrier)
子事务屏障是 DTM 保证事务原子性的核心机制。每个子事务需通过屏障表(dtm_barrier.barrier
)记录执行状态。以下是屏障表的 SQL 定义:
CREATE TABLE IF NOT EXISTS dtm_barrier.barrier (id BIGINT PRIMARY KEY AUTO_INCREMENT,trans_type VARCHAR(45) DEFAULT '',gid VARCHAR(128) DEFAULT '',branch_id VARCHAR(128) DEFAULT '',op VARCHAR(45) DEFAULT '',barrier_id VARCHAR(45) DEFAULT '',reason VARCHAR(45) DEFAULT '',create_time DATETIME DEFAULT NOW(),update_time DATETIME DEFAULT NOW(),UNIQUE KEY (gid, branch_id, op, barrier_id)
);
子事务需在屏障表中插入记录,确保事务的幂等性和一致性。
6. 部署与调试
6.1 安装 DTM 服务
通过 Docker 快速部署 DTM 服务:
docker run -itd \--name dtm \-p 36789:36789 \-p 36790:36790 \yedf/dtm:latest
启动后,访问 http://localhost:36789
查看 DTM 的 Web UI。
6.2 部署数据库
DTM 需要 MySQL 数据库支持,部署命令如下:
docker run -d \--name mysql \-e MYSQL_ROOT_PASSWORD=sa123456 \-e MYSQL_USER=sa \-e MYSQL_PASSWORD=sa123456 \-e MYSQL_DATABASE=test \-p 3306:3306 \mysql:latest
6.3 调试工具
- Web UI:通过
http://localhost:36789
查看事务状态。 - 日志分析:检查子事务服务的日志,定位失败原因。
- 重试机制:观察 DTM 是否自动重试失败的子事务。
7. Saga 模式的优缺点分析
7.1 优点
- 高性能:通过短事务提交避免长时间锁定资源。
- 适用遗留系统:对业务代码侵入性低,适合改造现有系统。
- 易于实现:无需复杂的 Confirm/Cancel 接口(如 TCC 模式)。
7.2 缺点
- 不保证隔离性:子事务可能读取到中间状态。
- 补偿逻辑复杂:需设计可靠的补偿操作。
- 最终一致性:可能需要等待一段时间才能保证数据一致性。
8. 最佳实践建议
- 设计幂等的补偿操作:确保补偿操作可重复执行。
- 监控事务状态:通过 DTM Web UI 或日志实时监控事务进度。
- 测试失败场景:模拟子事务失败,验证补偿逻辑的正确性。
- 合理设置重试策略:根据业务需求调整重试间隔和次数。
- 记录事务日志:通过数据库记录事务状态,便于排查问题。
9. 总结
DTM 的 Saga 模式通过拆分长事务为短事务,解决了分布式系统中的事务一致性问题。通过两个实际案例(银行转账和电商订单处理),我们展示了如何利用 DTM 的 Go SDK 快速实现 Saga 模式。此外,DTM 提供了丰富的高级特性(如并发执行、失败重试、子事务屏障),进一步提升了事务管理的灵活性和可靠性。
在实际开发中,开发者需权衡 Saga 模式的优缺点,并结合业务需求选择合适的事务模式。通过合理设计补偿逻辑和监控机制,可以确保系统的稳定性和数据的一致性。