当前位置: 首页 > java >正文

SpringBoot与Eventuate Tram整合 - 实现转账最终一致性系统

Eventuate Tram 是一个用于构建微服务架构的开源框架,提供事件驱动的消息传递和最终一致性保证,帮助企业高效地管理和协调分布式系统中的复杂业务逻辑。

为什么选择Eventuate Tram?

解耦和服务独立性:转账系统通常涉及多个服务(如账户服务、转账服务等)。Eventuate Tram 提供了一种事件驱动的方式来解耦这些服务,使得每个服务可以独立开发、部署和扩展。

灵活性:随着业务的发展,新的服务可能会被引入或现有服务需要重构。Eventuate Tram 的事件驱动模型允许这种灵活的变化而不需要大规模的重构。

分布式事务管理:传统的两阶段提交(2PC)在高并发环境下性能较差且复杂度高。Eventuate Tram 通过事件溯源和补偿机制实现了最终一致性,确保即使在分布式环境中也能保持数据的一致性。

幂等性和重试机制:Eventuate Tram 支持幂等处理和自动重试,确保消息传递的可靠性,防止重复处理导致的数据不一致问题。

异步通信:Eventuate Tram 使用事件总线进行异步通信,提高了系统的吞吐量和响应速度。这对于实时性强的应用场景尤为重要。

事件存储:Eventuate Tram 提供了内置的事件存储机制,记录所有发生的业务事件。这不仅有助于审计和调试,还能在系统故障后快速恢复状态。

多种消息代理支持:Eventuate Tram 支持多种消息代理(如 RabbitMQ、Kafka 等),可以根据现有的基础设施进行选择和集成。

代码生成工具:Eventuate 提供了一些代码生成工具和模板,帮助开发者快速搭建项目结构,减少了样板代码的数量。

代码实操

创建account-service

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>account-service</artifactId><version>0.0.1-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.5.4</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>io.eventuate.tram</groupId><artifactId>eventuate-tram-spring-jdbc</artifactId><version>0.26.0.RELEASE</version><!-- Eventuate Tram JDBC支持 --></dependency><dependency><groupId>io.eventuate.tram</groupId><artifactId>eventuate-tram-messaging-rabbitmq</artifactId><version>0.26.0.RELEASE</version><!-- Eventuate Tram RabbitMQ消息代理支持 --></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId><!-- JPA数据访问支持 --></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

application.properties

server.port=8081 # 应用监听端口# 数据库配置
spring.datasource.url=jdbc:mysql://localhost:3306/accounts?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=password
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver# Hibernate自动建表策略
spring.jpa.hibernate.ddl-auto=update# RabbitMQ配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

Account.java

package com.example.accountservice;import io.eventuate.tram.events.publisher.DomainEventPublisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.transaction.Transactional;
import java.util.Collections;@Service
public class AccountService {@Autowiredprivate AccountRepository accountRepository; // 账户仓库,用于数据库操作@Autowiredprivate DomainEventPublisher domainEventPublisher; // 域事件发布器,用于发布事件/*** 减少账户余额* @param accountId 账户ID* @param amount 需要减少的金额*/@Transactionalpublic void debit(String accountId, double amount) {Account account = accountRepository.findById(accountId).orElseThrow(() -> new RuntimeException("Account not found")); // 根据账户ID查找账户,如果找不到则抛出异常if (account.getBalance() < amount) { // 检查账户余额是否足够thrownew RuntimeException("Insufficient balance"); // 如果余额不足,则抛出异常}account.setBalance(account.getBalance() - amount); // 减少账户余额accountRepository.save(account); // 保存账户信息到数据库domainEventPublisher.publish(Account.class, account.getId(), Collections.singletonList(new AccountDebitedEvent(amount))); // 发布账户被借记的事件}/*** 增加账户余额* @param accountId 账户ID* @param amount 需要增加的金额*/@Transactionalpublic void credit(String accountId, double amount) {Account account = accountRepository.findById(accountId).orElseThrow(() -> new RuntimeException("Account not found")); // 根据账户ID查找账户,如果找不到则抛出异常account.setBalance(account.getBalance() + amount); // 增加账户余额accountRepository.save(account); // 保存账户信息到数据库domainEventPublisher.publish(Account.class, account.getId(), Collections.singletonList(new AccountCreditedEvent(amount))); // 发布账户被贷记的事件}
}

AccountRepository.java

package com.example.accountservice;import org.springframework.data.jpa.repository.JpaRepository;/*** 账户仓库接口,继承自JpaRepository,用于对Account实体进行CRUD操作*/
public interface AccountRepository extends JpaRepository<Account, String> {}

AccountServiceApplication.java

package com.example.accountservice;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** Spring Boot应用启动类*/
@SpringBootApplication
public class AccountServiceApplication {public static void main(String[] args) {SpringApplication.run(AccountServiceApplication.class, args); // 启动Spring Boot应用}
}

AccountController.java

package com.example.accountservice.controller;import com.example.accountservice.AccountService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;/*** 控制器类,处理HTTP请求*/
@RestController
@RequestMapping("/accounts")
public class AccountController {@Autowiredprivate AccountService accountService; // 注入AccountService/*** 处理借记账户的HTTP POST请求* @param accountId 账户ID* @param amount 借记金额*/@PostMapping("/{accountId}/debit/{amount}")public void debit(@PathVariable String accountId, @PathVariable double amount) {accountService.debit(accountId, amount); // 调用AccountService的debit方法}/*** 处理贷记账户的HTTP POST请求* @param accountId 账户ID* @param amount 贷记金额*/@PostMapping("/{accountId}/credit/{amount}")public void credit(@PathVariable String accountId, @PathVariable double amount) {accountService.credit(accountId, amount); // 调用AccountService的credit方法}
}

AccountDebitedEvent.java

package com.example.accountservice.event;/*** 账户借记事件类*/
public class AccountDebitedEvent {private double amount; // 借记金额public AccountDebitedEvent(double amount) {this.amount = amount; // 构造函数初始化借记金额}// 获取借记金额的方法public double getAmount() {return amount;}// 设置借记金额的方法public void setAmount(double amount) {this.amount = amount;}
}

AccountCreditedEvent.java

/*** 账户贷记事件类*/
public class AccountCreditedEvent {private double amount; // 贷记金额public AccountCreditedEvent(double amount) {this.amount = amount; // 构造函数初始化贷记金额}// 获取贷记金额的方法public double getAmount() {return amount;}// 设置贷记金额的方法public void setAmount(double amount) {this.amount = amount;}
}

TransferEventHandler.java

package com.example.accountservice.handler;import com.example.accountservice.event.TransferMadeEvent;
import com.example.accountservice.AccountService;
import io.eventuate.tram.events.subscriber.DomainEventEnvelope;
import io.eventuate.tram.events.subscriber.EventHandlerMethod;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** 事件处理器类,处理转账完成事件*/
@Component
public class TransferEventHandler {@Autowiredprivate AccountService accountService; // 注入AccountService/*** 处理转账完成事件的方法* @param event 包含转账完成事件的对象*/@EventHandlerMethodpublic void handle(DomainEventEnvelope<TransferMadeEvent> event) {TransferMadeEvent transferMadeEvent = event.getEvent(); // 获取转账完成事件对象accountService.credit(transferMadeEvent.getCreditAccountId(), transferMadeEvent.getAmount()); // 调用AccountService的credit方法,增加目标账户的余额}
}

创建transfer-service

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>transfer-service</artifactId><version>0.0.1-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.5.4</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>io.eventuate.tram</groupId><artifactId>eventuate-tram-spring-jdbc</artifactId><version>0.26.0.RELEASE</version><!-- Eventuate Tram JDBC支持 --></dependency><dependency><groupId>io.eventuate.tram</groupId><artifactId>eventuate-tram-messaging-rabbitmq</artifactId><version>0.26.0.RELEASE</version><!-- Eventuate Tram RabbitMQ消息代理支持 --></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

application.properties

server.port=8082# 数据库配置
spring.datasource.url=jdbc:mysql://localhost:3306/transfers?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=password
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver# Hibernate自动建表策略
spring.jpa.hibernate.ddl-auto=update# RabbitMQ配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

Transfer.java

package com.example.transferservice;import io.eventuate.tram.events.publisher.DomainEventPublisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.transaction.Transactional;
import java.util.Collections;/*** 转账服务类*/
@Service
public class TransferService {@Autowiredprivate TransferRepository transferRepository; // 转账仓库,用于数据库操作@Autowiredprivate DomainEventPublisher domainEventPublisher; // 域事件发布器,用于发布事件/*** 执行转账操作* @param cmd 包含转账命令的对象*/@Transactionalpublic void makeTransfer(MakeTransferCommand cmd) {Transfer transfer = new Transfer(cmd.getSourceAccountId(), cmd.getTargetAccountId(), cmd.getAmount()); // 创建转账记录transferRepository.save(transfer); // 保存转账记录到数据库domainEventPublisher.publish(Transfer.class, transfer.getId(),Collections.singletonList(new TransferMadeEvent(cmd.getSourceAccountId(), cmd.getTargetAccountId(), cmd.getAmount()))); // 发布转账完成事件}
}

TransferRepository.java

package com.example.transferservice;import org.springframework.data.jpa.repository.JpaRepository;/*** 转账仓库接口,继承自JpaRepository,用于对Transfer实体进行CRUD操作*/
public interface TransferRepository extends JpaRepository<Transfer, String> {}

TransferServiceApplication.java

package com.example.transferservice;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** Spring Boot应用启动类*/
@SpringBootApplication
public class TransferServiceApplication {public static void main(String[] args) {SpringApplication.run(TransferServiceApplication.class, args); // 启动Spring Boot应用}
}

TransferController.java

package com.example.transferservice.controller;import com.example.transferservice.MakeTransferCommand;
import com.example.transferservice.TransferService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** 控制器类,处理HTTP请求*/
@RestController
@RequestMapping("/transfers")
public class TransferController {@Autowiredprivate TransferService transferService; // 注入TransferService/*** 处理转账的HTTP POST请求* @param cmd 包含转账命令的对象*/@PostMappingpublic void makeTransfer(@RequestBody MakeTransferCommand cmd) {transferService.makeTransfer(cmd); // 调用TransferService的makeTransfer方法执行转账操作}
}

MakeTransferCommand.java

package com.example.transferservice.command;/*** 转账命令类,包含转账所需的信息*/
public class MakeTransferCommand {private String sourceAccountId; // 源账户IDprivate String targetAccountId; // 目标账户IDprivate double amount; // 转账金额public MakeTransferCommand(String sourceAccountId, String targetAccountId, double amount) {this.sourceAccountId = sourceAccountId; // 初始化源账户IDthis.targetAccountId = targetAccountId; // 初始化目标账户IDthis.amount = amount; // 初始化转账金额}// 获取源账户ID的方法public String getSourceAccountId() {return sourceAccountId;}// 设置源账户ID的方法public void setSourceAccountId(String sourceAccountId) {this.sourceAccountId = sourceAccountId;}// 获取目标账户ID的方法public String getTargetAccountId() {return targetAccountId;}// 设置目标账户ID的方法public void setTargetAccountId(String targetAccountId) {this.targetAccountId = targetAccountId;}// 获取转账金额的方法public double getAmount() {return amount;}// 设置转账金额的方法public void setAmount(double amount) {this.amount = amount;}
}

TransferMadeEvent.java

package com.example.transferservice.event;/*** 转账完成事件类*/
public class TransferMadeEvent {private String debitAccountId; // 借记账户IDprivate String creditAccountId; // 贷记账户IDprivate double amount; // 转账金额public TransferMadeEvent(String debitAccountId, String creditAccountId, double amount) {this.debitAccountId = debitAccountId; // 初始化借记账户IDthis.creditAccountId = creditAccountId; // 初始化贷记账户IDthis.amount = amount; // 初始化转账金额}// 获取借记账户ID的方法public String getDebitAccountId() {return debitAccountId;}// 设置借记账户ID的方法public void setDebitAccountId(String debitAccountId) {this.debitAccountId = debitAccountId;}// 获取贷记账户ID的方法public String getCreditAccountId() {return creditAccountId;}// 设置贷记账户ID的方法public void setCreditAccountId(String creditAccountId) {this.creditAccountId = creditAccountId;}// 获取转账金额的方法public double getAmount() {return amount;}// 设置转账金额的方法public void setAmount(double amount) {this.amount = amount;}
}

AccountEventHandler.java

package com.example.transferservice.handler;import com.example.transferservice.event.AccountDebitedEvent;
import com.example.transferservice.TransferService;
import io.eventuate.tram.events.subscriber.DomainEventEnvelope;
import io.eventuate.tram.events.subscriber.EventHandlerMethod;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** 事件处理器类,处理账户借记事件*/
@Component
public class AccountEventHandler {@Autowiredprivate TransferService transferService; // 注入TransferService/*** 处理账户借记事件的方法* @param event 包含账户借记事件的对象*/@EventHandlerMethodpublic void handle(DomainEventEnvelope<AccountDebitedEvent> event) {AccountDebitedEvent accountDebitedEvent = event.getEvent(); // 获取账户借记事件对象// 这里可以添加额外的逻辑,当账户被借记时执行的操作}
}

测试

curl -X POST http://localhost:8082/transfers -H "Content-Type: application/json" -d '{"sourceAccountId": "account1", "targetAccountId": "account2", "amount": 150}'

测试结果
无返回内容,说明操作成功。

http://www.xdnf.cn/news/5802.html

相关文章:

  • 解锁生命周期评价密码:OpenLCA、GREET 与 R 语言的融合应用
  • 基于 Amazon Bedrock 和 Amazon Connect 打造智能客服自助服务 – 设计篇
  • 【阿里云】阿里云 Ubuntu 服务器无法更新 systemd(Operation not permitted)的解决方法
  • Java Solon v3.3.0 发布(国产优秀应用开发基座)
  • Spring Boot Swagger 安全防护全解析:从旧版实践到官方规范
  • Spring Boot 跨域问题全解:原理、解决方案与最佳实践
  • Tomcat和Nginx的主要区别
  • 【MySQL】第三弹——表的CRUD进阶(一)数据库约束
  • 地址簿模块-01.需求分析
  • D-Pointer(Pimpl)设计模式(指向实现的指针)
  • 在VSCode中接入DeepSeek的指南
  • 【时时三省】(C语言基础)使用字符串处理函数
  • 基于Spring Boot+Layui构建企业级电子招投标系统实战指南
  • 人脸识别系统中的隐私与数据权利保障
  • ‌OPE.AI开放平台:一站式企业AI应用引擎
  • 前端学习(2)—— CSS详解与使用
  • centos7.x下,使用宝塔进行主从复制的原理和实践
  • 博客系统技术需求文档(基于 Flask)
  • R语言绘图 | 渐变火山图
  • Leetcode 3548. Equal Sum Grid Partition II
  • Andorid之TabLayout+ViewPager
  • 通过POI实现对word基于书签的内容替换、删除、插入
  • 网络协议与系统架构分析实战:工具与方法全解
  • 【应用密码学】实验五 公钥密码2——ECC
  • 深入 MySQL 查询优化器:Optimizer Trace 分析
  • 初入OpenCV
  • 【Qt】qss语法详解
  • [250512] Node.js 24 发布:ClangCL 构建,升级 V8 引擎、集成 npm 11
  • MapReduce 模型
  • AI 模型训练轻量化技术在军事领域的实战应用与技术解析