当前位置: 首页 > java >正文

分布式事务的炼狱:Spring Cloud 微服务架构下的数据一致性保障战

概述

在单体应用时代,我们依赖数据库自身的 ACID 特性来保证事务的完整性。然而,当系统演进为微服务架构,一个业务操作往往需要跨越多个独立部署的服务和数据库时,传统的本地事务机制就显得力不从心。分布式事务由此成为微服务架构下保证数据一致性的核心挑战。本文将深入探讨分布式事务面临的困境,剖析主流的解决方案,并结合 Spring Cloud 生态进行实践探讨,助您在微服务世界中驯服数据一致性这头猛兽。

1. 微服务架构下的数据一致性困境

跨服务调用的事务边界

在微服务架构中,一个业务流程通常会拆分为多个独立的服务协同完成。例如,一个电商下单流程可能涉及订单服务创建订单、库存服务扣减库存、支付服务完成支付、积分服务增加积分等。每个服务拥有独立的数据库,服务之间的通信通常采用基于网络的 API 调用(如 RESTful、gRPC)。

当这些跨服务的操作需要保证原子性时,传统的数据库本地事务无法跨越这些服务的边界。如果其中一个服务操作失败,如何回滚其他服务的操作,保持整体数据的一致性,就成为了分布式事务需要解决的核心问题。

CAP 理论的权衡

在分布式系统中,CAP 理论(一致性 Consistency、可用性 Availability、分区容错性 Partition Tolerance)指出,任何分布式系统只能同时满足其中两个特性,而必须在另外一个特性上做出妥协。在涉及到分布式事务时,我们往往需要在一致性和可用性之间进行权衡。

  • 强一致性的分布式事务方案(如 2PC)在执行过程中可能会牺牲一定的可用性,因为在事务协调期间,参与者可能会处于阻塞状态。
  • 最终一致性的分布式事务方案(如 Saga 模式、消息队列)则允许数据在一段时间内存在不一致,但最终会达到一致状态,这种方案通常具有更高的可用性。
常见的数据不一致场景

在微服务架构下,如果没有妥善处理分布式事务,很容易出现以下数据不一致的场景:

  • 订单创建成功,但库存扣减失败。
  • 库存扣减成功,但订单创建失败。
  • 支付成功,但订单状态更新失败。
  • 重复支付或重复扣减库存。
  • 多个关联服务的数据状态最终不一致。

这些场景会严重影响业务的正确性和用户体验。


2. 分布式事务的主流解决方案

针对上述困境,业界提出了多种分布式事务的解决方案,各有优劣和适用场景:

2.1 2PC(两阶段提交)

原理: 引入一个协调者(Coordinator)来协调所有参与者(Participant)共同完成事务。

  • 第一阶段(准备阶段): 协调者向所有参与者发送事务执行请求,参与者执行本地事务但不提交,并记录 undo log 等信息,然后向协调者汇报“准备”状态。
  • 第二阶段(提交/回滚阶段):
    • 如果所有参与者都汇报“准备”成功,协调者向所有参与者发送“提交”指令,参与者提交本地事务并释放资源。
    • 如果任何一个参与者汇报“准备”失败,或者协调者在超时后未收到所有参与者的响应,协调者向所有参与者发送“回滚”指令,参与者根据 undo log 回滚本地事务并释放资源。

优缺点:

  • 优点: 原理简单,理论上可以保证强一致性。
  • 缺点: 同步阻塞,参与者在准备阶段需要等待协调者的最终指令,会长时间占用资源,影响系统并发性能;单点故障,协调者宕机会导致整个事务无法完成;数据不一致风险,在第二阶段如果发生网络分区,可能导致部分参与者提交,部分参与者回滚,出现数据不一致。

常见实现: XA 协议(数据库层面的 2PC 实现)。

2.2 TCC(Try-Confirm-Cancel)

原理: 针对每个参与者业务,实现三个阶段的操作:

  • Try 阶段: 尝试执行业务,完成所有业务检查(约束验证、资源预留等),但不要提交最终结果。Try 操作需要具备幂等性。
  • Confirm 阶段: 如果所有参与者的 Try 操作都成功,则执行 Confirm 操作,提交最终的业务结果。Confirm 操作也需要具备幂等性。
  • Cancel 阶段: 如果任何一个参与者的 Try 操作失败,或者协调者在超时后未收到所有 Try 成功的响应,则执行 Cancel 操作,释放 Try 阶段预留的资源,撤销之前的操作。Cancel 操作同样需要具备幂等性。

优缺点:

  • 优点: 相比 2PC,TCC 将资源锁定和业务逻辑结合,减少了资源锁定的范围和时间,提高了并发性能;没有全局的阻塞等待。
  • 缺点: 实现复杂度高,每个业务都需要开发 Try、Confirm、Cancel 三个操作,且需要考虑各种异常情况(如网络抖动、服务宕机等)导致的幂等性和空回滚、悬挂等问题。
2.3 Saga 模式

原理: 将一个分布式事务拆分成一系列本地事务(称为 Saga),每个本地事务更新其所属服务的数据。Saga 中的每个本地事务都有一个补偿事务(称为补偿 Saga),用于在某个 Saga 事务失败时进行回滚。Saga 模式通过异步消息或事件驱动的方式协调各个本地事务和补偿事务。

协调方式:

  • 编排型 Saga(Orchestration): 由一个中心化的协调器(Orchestrator)负责管理整个 Saga 的执行流程,它知道每个步骤需要调用哪个服务,以及在失败时需要调用哪个补偿事务。协调器通过消息与各个参与者服务进行交互。
  • 事务型 Saga(Choreography): 没有中心协调器,每个参与者服务在完成本地事务后,会发布一个事件,其他参与者服务监听这些事件,并根据事件触发后续的本地事务或补偿事务。

优缺点:

  • 优点: 高可用性,参与者服务之间解耦,没有中心协调器的单点故障问题;最终一致性,允许数据在事务处理过程中短暂不一致,但最终会通过补偿机制达到一致。
  • 缺点: 数据一致性较弱,在 Saga 事务未完成时,其他事务可能读取到中间状态的数据,可能导致业务逻辑复杂;补偿事务的实现复杂,需要保证补偿事务的可靠性和幂等性。
2.4 基于消息的最终一致性

原理: 依赖消息队列(如 Kafka、RabbitMQ)来实现最终一致性。当一个服务需要更新数据并影响到其他服务时,它会先执行本地事务,然后向消息队列发送一个包含相关业务信息的事件或消息。其他服务订阅该消息队列,接收到消息后执行相应的本地事务。如果后续操作失败,可以通过补偿消息或人工干预等方式保证最终数据的一致性。

优缺点:

  • 优点: 高吞吐、低耦合,服务之间通过消息队列异步通信,提高了系统的可伸缩性和性能;实现相对简单。
  • 缺点: 最终一致性,存在数据短暂不一致的风险;需要考虑消息的可靠性投递(至少一次、至多一次、恰好一次)以及消息的幂等消费。
2.5 分布式方案对比表

在这里插入图片描述

3. Spring Cloud 生态下的分布式事务实践

在 Spring Cloud 微服务架构中,我们可以借助一些优秀的框架来实现分布式事务,其中 Seata(Simple Extensible Autonomous Transaction Architecture)是阿里巴巴开源的一款非常流行的分布式事务解决方案。

3.1 Seata 简介与核心概念

在这里插入图片描述

Seata 提供了一站式的分布式事务解决方案,致力于在微服务架构下提供高性能和高可用的分布式事务服务。其核心组件包括:

  • TC (Transaction Coordinator): 事务协调者,负责全局事务的生命周期管理,维护全局事务与分支事务的状态。
  • TM (Transaction Manager): 事务管理器,负责开启、提交或回滚全局事务。通常集成在业务服务中。
  • RM (Resource Manager): 资源管理器,负责分支事务的管理,与 TC 通信汇报分支事务的状态,并执行本地事务的提交或回滚。通常以 Agent 的形式集成在数据源代理中。

Seata 支持多种事务模式,其中最常用的是 AT 模式(Automatic Transaction)。

3.2 基于 Seata AT 模式的实践案例

AT 模式是一种无侵入的分布式事务解决方案,它通过对业务 SQL 进行解析,在本地事务提交前生成 undo log,并在需要回滚时根据 undo log 进行数据补偿,从而实现最终一致性。
正常流程
在这里插入图片描述
异常流程-回滚
在这里插入图片描述

3.2.1 环境搭建与依赖引入

假设我们有两个微服务:order-service(订单服务)和 account-service(账户服务)。当用户下单时,order-service 需要创建订单,account-service 需要扣减用户账户余额。我们需要保证这两个操作要么都成功,要么都回滚。

首先,我们需要部署 Seata Server 作为事务协调者。具体部署步骤请参考 Seata 官方文档。

其次,在数据库中创建 Seata 所需的 undo_log 表(Seata 官网有建表脚本)。

然后在 order-serviceaccount-servicepom.xml 文件中引入 Seata 相关依赖:

order-serviceaccount-servicepom.xml 共同部分:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.18</version> <relativePath/> </parent><groupId>com.example</groupId><artifactId>your-service-name</artifactId> <version>0.0.1-SNAPSHOT</version><name>your-service-name</name><properties><java.version>1.8</java><spring-cloud.version>2021.0.5</spring-cloud.version> <seata.version>1.5.2</seata.version> </properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId> <scope>runtime</scope></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId><version>${seata.version}</version></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> <version>${spring-cloud.version}.RC1</version> </dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId> </dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build>
</project>

同时,还需要配置 Seata Server 的地址等信息到每个微服务的 application.ymlapplication.properties 文件中。

order-serviceapplication.yml 示例:

spring:application:name: order-servicecloud:nacos: # 如果使用Nacos作为注册中心discovery:server-addr: 127.0.0.1:8848datasource: # 本地数据库配置driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/db_order?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghaiusername: rootpassword: rootserver:port: 8080feign: # Feign配置,用于服务间调用okhttp:enabled: true# Seata 配置
seata:enabled: trueapplication-id: ${spring.application.name} # 应用ID,必须与spring.application.name一致tx-service-group: my_test_tx_group # 事务分组,需要和Seata Server配置的保持一致service:vgroup-mapping:my_test_tx_group: default # 映射到Seata Server的集群名grouplist:default: 127.0.0.1:8091 # Seata Server的地址和端口client:log:exceptionRate: 100 # 日志异常率

account-serviceapplication.yml 示例:

spring:application:name: account-servicecloud:nacos: # 如果使用Nacos作为注册中心discovery:server-addr: 127.0.0.1:8848datasource: # 本地数据库配置driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/db_account?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghaiusername: rootpassword: rootserver:port: 8081# Seata 配置
seata:enabled: trueapplication-id: ${spring.application.name} # 应用ID,必须与spring.application.name一致tx-service-group: my_test_tx_group # 事务分组,需要和Seata Server配置的保持一致service:vgroup-mapping:my_test_tx_group: default # 映射到Seata Server的集群名grouplist:default: 127.0.0.1:8091 # Seata Server的地址和端口client:log:exceptionRate: 100 # 日志异常率
3.2.2 核心代码实现

order-service 中,我们创建一个创建订单的接口,并在该接口的方法上添加 @GlobalTransactional 注解,声明这是一个全局事务的入口。我们还需要通过 Feign 客户端调用 account-service
order-serviceOrderService.java

package com.example.orderservice.service;import com.example.orderservice.feign.AccountClient;
import com.example.orderservice.mapper.OrderMapper;
import com.example.orderservice.model.Order;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;@Service
@Slf4j
public class OrderService {@Autowiredprivate OrderMapper orderMapper;@Autowiredprivate AccountClient accountClient; // Feign客户端/*** 创建订单并扣减账户余额* 使用 @GlobalTransactional 开启全局事务* name 建议与事务分组一致* rollbackFor 遇到任何异常都回滚*/@GlobalTransactional(name = "create-order-tx", rollbackFor = Exception.class)public void createOrder(Long userId, String productId, Integer amount, Double price) {log.info("开始创建订单,用户ID: {}, 产品ID: {}, 数量: {}", userId, productId, amount);// 1. 创建订单Order order = new Order();order.setUserId(userId);order.setProductId(productId);order.setAmount(amount);order.setTotalPrice(price * amount);order.setStatus(0); // 待支付状态orderMapper.insertOrder(order); // 假设这里会插入到 order_tbl 表log.info("订单创建成功,订单ID: {}", order.getId());// 2. 调用账户服务,扣减账户余额log.info("调用账户服务扣减余额...");// 假设每个商品扣减 10 块钱,这里模拟价格*数量accountClient.decreaseAccount(userId, price * amount);// 模拟业务异常,用于测试事务回滚// if (amount > 1) {//     throw new RuntimeException("模拟业务异常,订单数量不能大于1");// }log.info("订单创建和账户扣减完成。");}
}

order-servicefeign.AccountClient.java (Feign 客户端接口):

package com.example.orderservice.feign;import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;/*** 声明式调用 account-service* name: 被调用的服务在Nacos中的服务名*/
@FeignClient(name = "account-service")
public interface AccountClient {@GetMapping("/account/decrease")String decreaseAccount(@RequestParam("userId") Long userId, @RequestParam("money") Double money);
}

account-serviceAccountService.java

package com.example.accountservice.service;import com.example.accountservice.mapper.AccountMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;@Service
@Slf4j
public class AccountService {@Autowiredprivate AccountMapper accountMapper;/*** 扣减账户余额* @Transactional 注解确保本地事务的ACID特性,同时被Seata AT模式代理*/@Transactionalpublic void decreaseAccount(Long userId, Double money) {log.info("账户服务开始扣减余额,用户ID: {}, 金额: {}", userId, money);// 假设这里是更新 account_tbl 表的余额字段int updatedRows = accountMapper.decreaseAccount(userId, money);if (updatedRows == 0) {log.error("扣减账户余额失败,用户ID: {},可能余额不足或用户不存在。", userId);throw new RuntimeException("扣减账户余额失败,用户余额不足或用户不存在。");}log.info("账户余额扣减成功,用户ID: {}, 金额: {}", userId, money);}
}

实体类和 Mapper 接口(示例,具体根据您的数据库表结构调整):

order-serviceOrder.java (实体类):

package com.example.orderservice.model;import lombok.Data;import java.io.Serializable;@Data
public class Order implements Serializable {private Long id;private Long userId;private String productId;private Integer amount;private Double totalPrice;private Integer status; // 0:待支付, 1:已支付, -1:已取消
}

order-serviceOrderMapper.java

package com.example.orderservice.mapper;import com.example.orderservice.model.Order;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Options;@Mapper
public interface OrderMapper {@Insert("INSERT INTO order_tbl(user_id, product_id, amount, total_price, status) VALUES(#{userId}, #{productId}, #{amount}, #{totalPrice}, #{status})")@Options(useGeneratedKeys = true, keyProperty = "id")int insertOrder(Order order);
}

account-serviceAccountMapper.java

package com.example.accountservice.mapper;import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Update;@Mapper
public interface AccountMapper {@Update("UPDATE account_tbl SET money = money - #{money} WHERE user_id = #{userId} AND money >= #{money}")int decreaseAccount(Long userId, Double money);
}
3.2.3 AT 模式原理简析

order-service 调用 account-service 的扣减余额接口时,由于 @GlobalTransactional 注解的存在,Seata 会拦截这些数据库操作,并在本地事务提交前执行以下步骤:

  1. 记录 undo log: Seata 会分析 SQL 语句,记录修改前的数据快照到 undo log 表中。这些 undo log 包含了数据修改前后的信息,用于在事务回滚时进行数据恢复。
  2. 锁定全局锁: Seata 会尝试获取当前操作记录的全局锁。这个全局锁是一种行锁,用于防止其他全局事务修改同一条数据,确保事务的隔离性。
  3. 提交本地事务: order-serviceaccount-service 各自提交本地数据库事务。此时,本地事务已经提交,数据对外部可见,但全局事务尚未提交,数据可能处于一个中间状态。

如果在全局事务执行过程中发生任何异常(比如 account-service 余额不足抛出异常),或者 TM 收到参与者 RM 上报的失败信息,TM 会发起全局回滚。TC 通知所有参与者进行回滚操作,RM 收到回滚指令后,会根据之前记录的 undo log 执行反向 SQL,恢复数据到修改前的状态,并释放全局锁。

AT 模式的优点在于对业务代码的侵入性很小,开发者只需要关注正常的业务逻辑,Seata 会自动处理分布式事务的协调和回滚,大大降低了开发复杂度。

3.3 其他 Spring Cloud 分布式事务方案简述

除了 Seata,Spring Cloud 生态中还有一些其他的分布式事务方案,例如:

  • Atomikos: 一款老牌的开源事务管理器,支持 JTA 规范,可以用于协调跨多个数据源和消息队列的事务。它通常通过 2PC 协议实现强一致性。
  • Spring Cloud Stream Transaction: 基于消息驱动的最终一致性方案,通过消息中间件(如 Kafka、RabbitMQ)的事务消息机制来保证消息的可靠投递和消费,从而实现服务之间的数据一致性。它通常涉及事务消息(RocketMQ)、两阶段消息提交(Kafka)等机制。

选择哪种方案取决于具体的业务场景、对一致性和可用性的要求、以及团队的技术栈和熟悉程度。


4. 总结

分布式事务是构建可靠微服务架构的关键挑战之一。理解不同分布式事务解决方案的原理、优缺点以及适用场景至关重要。2PC 模式理论上提供强一致性但存在性能和可靠性瓶颈;TCC 模式具有较好的性能但实现复杂度高;Saga 模式和基于消息的最终一致性方案提供了更高的可用性,但牺牲了一定的一致性。

在 Spring Cloud 生态中,Seata 以其 AT 模式的无侵入性和相对完善的功能,成为实现分布式事务的热门选择。通过合理选择和应用分布式事务方案,我们可以有效地保障微服务架构下的数据一致性,构建更加健壮和可靠的分布式系统。


5. FAQ

  1. AT 模式如何解决脏写问题?

    • AT 模式通过在本地事务提交前获取全局锁来防止脏写。当一个全局事务在修改某条记录时,该记录会被全局锁锁定,其他全局事务无法修改该记录。只有当前全局事务提交或回滚后,全局锁才会被释放。
  2. AT 模式的 undo log 会一直存在吗?

    • 不会。Seata 通常会定期清理已经提交的全局事务对应的 undo log,以释放存储空间。具体的清理策略可以通过 Seata Server 的配置进行调整。
  3. Saga 模式如何保证业务的最终一致性?

    • Saga 模式通过定义每个本地事务的补偿事务,当某个本地事务失败时,会触发相应的补偿事务来撤销之前的操作,从而保证业务数据的最终一致性。但需要注意的是,Saga 模式无法保证强一致性,在补偿完成之前,数据可能处于中间不一致的状态。
  4. 在选择分布式事务方案时,应该考虑哪些因素?

    • 一致性要求: 业务对数据一致性的要求是强一致性还是最终一致性?
    • 可用性要求: 系统对服务可用性的要求有多高?是否能容忍短暂的数据不一致?
    • 性能要求: 分布式事务对系统性能的影响是否在可接受的范围内?
    • 技术栈: 团队熟悉的技术栈和已有的基础设施。
    • 实现复杂度: 不同方案的实现复杂度和维护成本不同。
  5. CAP 理论在选择分布式事务方案时有哪些指导意义?

    • CAP 理论提醒我们在设计分布式系统时需要在一致性(C)、可用性(A)、分区容错性(P)之间进行权衡。对于分布式事务而言,强一致性的方案往往会牺牲一定的可用性,而追求高可用性的方案则通常会选择最终一致性。我们需要根据具体的业务场景和需求做出合适的选择。

http://www.xdnf.cn/news/13430.html

相关文章:

  • 时序数据库Influxdb3 core安装
  • 中兴B860AV1.1_晨星MSO9280芯片_4G和8G闪存_TTL-BIN包刷机固件包
  • Android 实现可拖动的ImageView
  • RTX4060安装cuda12.3 cudnn8.9
  • Neo4j批量数据导入完全指南:高效处理大规模数据
  • MyBatis-Plus 混合使用 XML 和注解
  • 50天50个小项目 (Vue3 + Tailwindcss V4) ✨ | FAQ Collapse(问题解答折叠面板)
  • Oracle ADG 日常巡检指南
  • 由编译osgEarth源码引发的一系列问题(三)利用vcpkg安装osg与OSGEarth
  • JavaScript跨域全面指南:从原理到最佳实践
  • RV1126+OPENCV在视频中添加LOGO图像
  • JVM面试基础篇
  • `dispatch_source_t` 计时器 vs `NSTimer`:核心差异一览
  • 【实习总结】C++ 通过pugi::xml库对xml文件进行操作
  • 如何正确的配置eureka server集群
  • 【QT】窗口详解
  • Linux进程管理:创建,终止,等待
  • 智能机器人从零构建陪跑计划
  • PyTorch:让深度学习像搭积木一样简单有趣!
  • Vue实现图像对比组件:打造交互式图片比较工具
  • 深度学习:PyTorch简介
  • 【python】基于pycharm的海康相机SDK二次开发
  • 计算机网络:认证和授权 DNS 域名解析过程(如何转换为ip地址) http无状态 5**服务端错误相关的响应状态码 tcp某次握手丢失会有什么现象?
  • 浅谈Linux中一次系统调用的执行过程
  • 网络中基础的三张表(mac、arp、route)
  • 开源、免费、美观的 Vue 后台管理系统模板
  • 《Ansys SIPI仿真技术笔记》 E-desk IBIS模型导入
  • 架空线路智能监控系统的应用与优势剖析
  • ESP32 004 Thonny 配置简单的轻量级MicroPython开发IDE
  • 深入解析MySQL Join算法原理与性能优化实战指南