分布式事务的两种解决方案
说明:在微服务架构中,一个方法内存在多个数据库操作,且这些操作有跨服务调用,在这种情况下的事务叫做分布式事务。阿里巴巴开源的分布式组件 seata
,通过引入独立于微服务之外的事务协调者,协调事务,统一执行或者回滚事务,是目前很完善的解决方案。
本文介绍在不引入分布式事务组件的情况下,两种解决分布式事务的方案。Seata 的使用参考下面这两篇文章。
-
Seata安装和使用
-
使用Seata解决分布式事务问题
场景
假设有一个根据用户 ID 删除用户的接口,删除用户记录的同时,也删除用户所拥有的角色记录,如下:
@Overridepublic void delete(Long id) {// 1.删除用户userMapper.deleteById(id);// 2.删除用户对应的角色roleApi.deleteRoleByUserId(id);}
为了演示,我将删除用户对应的角色放到其他服务中,使用 rpc-api 调用其他服务的方法实现
(InfraRoleApi 接口)
import cn.iocoder.yudao.module.infra.enums.ApiConstants;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.RequestParam;@FeignClient(name = ApiConstants.NAME)
@Tag(name = "RPC 服务 - 角色")
public interface InfraRoleApi {String PREFIX = ApiConstants.PREFIX + "/role";@DeleteMapping(PREFIX)@Operation(summary = "根据参数键查询参数值")void deleteRoleByUserId(@RequestParam Long id);
}
(InfraRoleApi 接口实现类)
import cn.iocoder.yudao.module.infra.dal.dataobject.role.RoleMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RestController;@RestController
public class InfraRoleApiImpl implements InfraRoleApi {@Autowiredprivate RoleMapper roleMapper;@Overridepublic void deleteRoleByUserId(Long id) {roleMapper.deleteRoleByUserId(id);}
}
显然,这个接口需要保证事务,就是说,删除用户和删除用户对应的角色要么都成功,要么都失败。加个事务注解。
@Transactional(rollbackFor = Exception.class)@Overridepublic void delete(Long id) {// 1.删除用户userMapper.deleteById(id);// 2.删除用户对应的角色roleApi.deleteRoleByUserId(id);}
手动在 InfraRoleApi 这边写一个异常
@Overridepublic void deleteRoleByUserId(Long id) {// 模拟异常int i = 1 / 0;roleMapper.deleteRoleByUserId(id);}
启动项目,调用接口,删除用户 ID=142 的记录
用户表逻辑删除成功
用户角色表,对应用户 ID 的记录没有删除成功
控制台有报错,显然,前面的声明式注解没有生效。这没办法生效,不同服务运行在不同的 JVM 上,数据库连的都可能不是同一个。
针对这个场景,我想到下面两种解决方案。
方案一:手写TCC
TCC,是 Try(尝试)-Confirm(确认)-Cancel(取消)
三个单词的首字母,具体实现是,针对业务中的数据库操作,对应写一套回滚操作,把正常的业务操作分为两阶段执行——尝试提交和确认提交,当两阶段发生异常时,执行对应的回滚操作,来保证整体事务的一致性。
针对上面场景的代码,开始改造。
写一个 TCC 类,将两个删除数据库操作封装进来,根据操作的返回结果和 try-catch
判断操作是否成功,看是否需要回滚
import cn.iocoder.yudao.module.infra.api.role.InfraRoleApi;
import cn.iocoder.yudao.module.system.controller.admin.demo.tcc.UserTccService;
import cn.iocoder.yudao.module.system.dal.mysql.user.AdminUserMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class UserTccServiceImpl implements UserTccService {@Autowiredprivate InfraRoleApi roleApi;@Autowiredprivate AdminUserMapper userMapper;@Overridepublic void deleteUserByUserId(Long id) {boolean tryResult = tryCommit(id);if (tryResult) {if (!commit(id)) {rollback(id);}} else {boolean rollback = rollback(id);// 如果回滚失败,重试三次if (!rollback) {for (int i = 0; i < 3; i++) {rollback(id);}}}}/*** 尝试提交*/private boolean tryCommit(Long id) {try {// 1.删除用户int i = userMapper.deleteById(id);if (i == 0) {return false;}// 2.删除用户对应的角色boolean result = roleApi.deleteRoleByUserId(id);if (!result) {return false;}} catch (Exception e) {return false;}return true;}/*** 确认提交*/private boolean commit(Long id) {return true;}/*** 回滚*/private boolean rollback(Long id) {try {// 1.删除用户(回滚)userMapper.undeleteById(id);// 2.删除用户对应的角色(回滚)boolean result2 = roleApi.undeleteRoleByUserId(id);if (!result2) {return false;}} catch (Exception e) {return false;}return true;}
}
写两个回滚操作,根据用户 ID 回滚删除用户、回滚删除用户角色
@Update("update system_users set deleted = 0 where id = #{userId}")void undeleteById(Long userId);
@Update("update system_user_role set deleted = 0 where user_id = #{id}")void undeleteRoleByUserId(Long id);
好的,重启项目,调用删除接口,报错依旧
哎,这回用户就没有被删除,事务控制住了。
这是一种参考了 TCC 设计思想的实现,并非是完整意义上的 TCC,这种方式需要注意以下两点:
-
(1)回滚方法需要幂等,即回滚操作执行一次和多次效果相同;
-
(2)尝试提交和回滚操作的方法顺序要一致,即尝试提交是先删除用户,再删除用户角色,回滚也要按这个顺序回滚;
方案二:消息队列
第二种方案是使用消息队列,当本地服务需要调用其他服务的更新操作时,发布一个消息,让这个消息去完成其他服务的更新操作。
首先,创建一张消息表,里面包括消息对象的全限定名和消息的内容(即消息对象实例化后的字符串)
记录如下
改造代码,更新其他服务的操作,换成生成一个消息。这里并不直接发消息,而是把消息先存数据库里。这里可以加声明式注解。
@Transactional(rollbackFor = Exception.class)@Overridepublic void delete2(Long id) {// 1.删除用户int i = userMapper.deleteById(id);// 2.发一个MQ消息DeleteUserRoleMessage deleteUserRoleMessage = new DeleteUserRoleMessage();deleteUserRoleMessage.setUserId(id);MessageDO messageDO = new MessageDO();messageDO.setClassName("cn.iocoder.yudao.module.system.dal.dataobject.mq.DeleteUserRoleMessage");messageDO.setPayload(JSON.toJSONString(deleteUserRoleMessage));messageMapper.insert(messageDO);}
写个定时器,每隔 10 秒把消息取出来实例化,发送到消息队列里
/*** 消息定时器*/
@Component
public class MessageScheduled {@Autowiredprivate MessageMapper messageMapper;@Autowiredprivate RedisMQTemplate redisMQTemplate;@Scheduled(fixedRate = 10000)public void send() throws ClassNotFoundException {List<MessageDO> messageDOS = messageMapper.selectList();for (MessageDO messageDO : messageDOS) {// 1. 加载类Class<DeleteUserRoleMessage> clazz = (Class<DeleteUserRoleMessage>) Class.forName(messageDO.getClassName());// 2. 使用 Fastjson 反序列化DeleteUserRoleMessage deleteUserRoleMessage = JSON.parseObject(messageDO.getPayload(), clazz);// 3.发送消息redisMQTemplate.send(deleteUserRoleMessage);// 4.删除消息messageMapper.deleteById(messageDO.getId());}}
}
消息对象类,因为只需用到用户 ID,所以只定义一个用户 ID属性
import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;/*** 删除用户角色ID*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class DeleteUserRoleMessage extends AbstractRedisChannelMessage {private Long userId;
}
这里是用 Redis 作为消息队列,关于 Redis 如何实现消息队列,可参考下面这篇文章:
- 如何用Redis作为消息队列
回到 infra 服务里,写一个消息消费者,这里接收来自消息队列的消息,获取消息后调用本服务方法,执行删除用户角色操作
import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
import cn.iocoder.yudao.module.infra.api.role.InfraRoleApiImpl;
import cn.iocoder.yudao.module.infra.mq.message.DeleteUserRoleMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;/*** 删除用户角色消息消费者*/
@Service
public class DeleteUserRoleMessageConsumer extends AbstractRedisChannelMessageListener<DeleteUserRoleMessage> {@Autowiredprivate InfraRoleApiImpl infraRoleApi;@Overridepublic void onMessage(DeleteUserRoleMessage message) {infraRoleApi.deleteRoleByUserId(message.getUserId());}
}
重启项目,试一下,调用接口
消息表写入一条记录
消息定时器,读取消息,并发送到消息队列
另一个服务的消息消费者监听到消息,执行本服务方法
这套流程解决分布式事务的思路是:不回滚本服务方法,而是向前推进事务,通过将其他服务的操作用 MQ 消息封装,让其他服务监听,执行本服务的数据库操作,完成全部事务。
而其他服务消费消息失败、事务的问题则依靠消息组件保证消息不丢失,和本服务的本地事务实现。
也就是说,如果消费消息失败,则不让消息组件移除消息,而是进行重试,或者在消费者这边判断重试次数,超出重试次数存入到数据库中或者什么地方,等程序员排查消费失败的原因后,再写定时任务来消费消息,保证最终一致性。
当然,这种方法需要考虑重复消费消息的问题,所以也需考虑消费者操作的幂等性。
总结
本文介绍了分布式事务的两种解决方案,方案一实现逻辑简单,但代码侵入高,每个事务方法都要写一套,方案二借助了消息组件,技术门槛较高。