当前位置: 首页 > news >正文

Spring事务中异步操作导致数据查询失败问题分析与解决方案

在使用 Spring 的事务管理时,我们常常会遇到一个经典问题:在一个事务方法中插入数据后,立即调用另一个异步或新事务方法去查询该数据,却查不到刚刚插入的数据。
这个问题看似是 @Async 或异步执行导致的,但实际上它的根源在于 事务传播机制 + 数据库隔离级别 + 事务提交时机 的组合行为。

参考代码:GitHub - kerrsixy/transaction-demo

1. 问题描述

来看一段典型的代码:

@Slf4j
@Service
@RequiredArgsConstructor
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {private final UserMapper userMapper;@Override@Transactional(rollbackFor = Exception.class)public Boolean registerUser(Long id) {int insert = userMapper.insert(new User().setId(id).setName("zjp"));IUserService userService = (IUserService) AopContext.currentProxy();userService.sendMessage(id);return insert == 1;}@Async@Override@Transactional(rollbackFor = Exception.class)public void sendMessage(Long id) {User user = getById(id);log.info("发送短信给用户:{}", user.getName());}
}

这段代码运行后发现,在 sendMessage 方法中调用 getById(id) 时,查不到刚刚插入的用户信息。

2. 问题本质分析

  • 事务尚未真正提交到数据库
  • 异步线程已经发起查询
  • 数据库连接池或事务管理器还未将变更刷入数据库,导致查询不到。

这并不是 Spring 本身的 bug,而是事务提交与异步任务执行之间的并发竞争条件(race condition)

3. 解决方案

3.1 方案一:使用 @TransactionalEventListener

1. 定义事务事件类

package com.zjp.transactiondemo.event;import org.springframework.context.ApplicationEvent;public class UserRegisteredEvent extends ApplicationEvent {private final Long userId;public UserRegisteredEvent(Object source, Long userId) {super(source);this.userId = userId;}public Long getUserId() {return userId;}
}

2. 修改 registerUser 方法,移除 AOP 代理调用,发布事件

package com.zjp.transactiondemo.service.impl;import com.zjp.transactiondemo.entity.User;
import com.zjp.transactiondemo.event.UserRegisteredEvent;
import com.zjp.transactiondemo.mapper.UserMapper;
import com.zjp.transactiondemo.service.IUserService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.framework.AopContext;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;@Slf4j
@Service
@RequiredArgsConstructor
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {private final UserMapper userMapper;private final ApplicationEventPublisher applicationEventPublisher;@Override@Transactional(rollbackFor = Exception.class)public Boolean registerUser(Long id) {int insert = userMapper.insert(new User().setId(id).setName("zjp"));// 发布事件,注意事务尚未提交applicationEventPublisher.publishEvent(new UserRegisteredEvent(this, id));return insert == 1;}@Async@Override@Transactional(rollbackFor = Exception.class)public void sendMessage(Long id) {User user = getById(id);log.info("发送短信给用户:{}", user.getName());}
}

3. 创建监听器类

package com.zjp.transactiondemo.listener;import com.zjp.transactiondemo.event.UserRegisteredEvent;
import com.zjp.transactiondemo.service.IUserService;
import lombok.RequiredArgsConstructor;
import org.springframework.context.event.TransactionalEventListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.TransactionPhase;@Component
@RequiredArgsConstructor
public class UserRegisteredListener {private final IUserService userService;/*** 只有在事务成功提交后才会执行*/@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)public void handleUserRegistered(UserRegisteredEvent event) {userService.sendMessage(event.getUserId());}
}

关于 TransactionPhase 源码及说明:

public enum TransactionPhase {// 指定目标方法在事务commit之前执行BEFORE_COMMIT,// 指定目标方法在事务commit之后执行AFTER_COMMIT,// 指定目标方法在事务rollback之后执行AFTER_ROLLBACK,// 指定目标方法在事务完成时执行,这里的完成是指无论事务是成功提交还是事务回滚了AFTER_COMPLETION
}

优点:

  • 安全可靠,确保在事务提交后执行;
  • 可以跨模块复用;
  • 符合事件驱动设计思想。

 3.2 方案二:使用 TransactionSynchronizationManager

通过注册事务同步回调,在事务提交后执行异步操作。

package com.zjp.transactiondemo.service.impl;import com.zjp.transactiondemo.entity.User;
import com.zjp.transactiondemo.mapper.UserMapper;
import com.zjp.transactiondemo.service.IUserService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.framework.AopContext;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;@Slf4j
@Service
@RequiredArgsConstructor
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {private final UserMapper userMapper;@Override@Transactional(rollbackFor = Exception.class)public Boolean registerUser(Long id) {int insert = userMapper.insert(new User().setId(id).setName("zjp"));TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {@Overridepublic void afterCommit() {IUserService proxy = (IUserService) AopContext.currentProxy();proxy.sendMessage(id);}});return insert == 1;}@Async@Override@Transactional(rollbackFor = Exception.class)public void sendMessage(Long id) {User user = getById(id);log.info("发送短信给用户:{}", user.getName());}
}

优点:

  • 无需引入事件机制;
  • 控制粒度更细。

缺点:

  • 侵入性强,不利于维护;
  • 不建议在复杂业务中频繁使用。

3.3 方案三:使用消息队列(MQ)解耦事务与异步逻辑

将异步任务发送到消息中间件(如 RabbitMQ、Kafka),由独立消费者处理。

示例流程:

  1. 事务方法中写入数据库;
  2. 向 MQ 发送消息;
  3. 消费者异步消费消息并处理业务。

优点:

  • 异步任务持久化,避免丢失;
  • 系统解耦,支持分布式架构;
  • 可靠性高,适用于关键业务场景。

3.4 方案四:通过手动提交事务处理

示例代码:

package com.zjp.transactiondemo.service.impl;import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.zjp.transactiondemo.entity.User;
import com.zjp.transactiondemo.mapper.UserMapper;
import com.zjp.transactiondemo.service.IUserService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.framework.AopContext;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.DefaultTransactionDefinition;@Slf4j
@Service
@RequiredArgsConstructor
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {private final UserMapper userMapper;private final PlatformTransactionManager transactionManager;@Overridepublic Boolean registerUser(Long id) {TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());int insert = userMapper.insert(new User().setId(id).setName("zjp"));transactionManager.commit(status);IUserService userService = (IUserService) AopContext.currentProxy();userService.sendMessage(id);return insert == 1;}@Async@Override@Transactional(rollbackFor = Exception.class)public void sendMessage(Long id) {User user = getById(id);log.info("发送短信给用户:{}", user.getName());}
}

缺点:

  • 代码有入侵 

3.5 方案五:Thread.sleep()(不推荐)

    @Async@Override@Transactional(rollbackFor = Exception.class)public void sendMessage(Long id) {try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}User user = getById(id);log.info("发送短信给用户:{}", user.getName());}

缺点:

依赖不确定时间延迟,无法适应负载变化,不可靠

4. 误区方案

4.1 通过修改 @Transactional 的传播机制

package com.zjp.transactiondemo.service.impl;import com.zjp.transactiondemo.entity.User;
import com.zjp.transactiondemo.mapper.UserMapper;
import com.zjp.transactiondemo.service.IUserService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.framework.AopContext;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;@Slf4j
@Service
@RequiredArgsConstructor
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {private final UserMapper userMapper;@Override@Transactional(rollbackFor = Exception.class)public Boolean registerUser(Long id) {int insert = userMapper.insert(new User().setId(id).setName("zjp"));IUserService userService = (IUserService) AopContext.currentProxy();userService.sendMessage(id);return insert == 1;}@Async@Override@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)public void sendMessage(Long id) {User user = getById(id);log.info("发送短信给用户:{}", user.getName());}
}

执行结果:

http://www.xdnf.cn/news/415117.html

相关文章:

  • SHA-256 哈希算法详解
  • DNS工作原理与报文解析
  • Docker快速入门与应用
  • 基于Arduino的贪吃蛇游戏机
  • 位运算题目:黑板异或游戏
  • 火山云网站搭建
  • AES-128 加密与解密详解
  • 分享AI时代数据智能人才定向就业班(暑期班)
  • 【Linux 系统调试】syslog:Linux 系统日志工具详解
  • DAY22kaggle泰坦尼克号
  • 手写 vue 源码 === watch 实现
  • 学习黑客5分钟深入浅出理解系列之Windows compmgmt
  • 配置Hadoop集群-免密登录
  • dfs第二次加训 详细题解 下
  • STM32G474VET6-CAN FD使用经典模式+过滤报文ID
  • ESOP系统如何帮助玩具工厂实现生产数据实时展示
  • rufus+Ubuntu 18.04 镜像
  • Promise/A+ 规范中文解读
  • Matlab基于PSO-MVMD粒子群算法优化多元变分模态分解
  • 【C语言指针超详解(五)】--回调函数,qsort函数的理解和使用,qsort函数的模拟实现
  • 类神经网络训练失败怎么办?
  • 中央处理器(CPU)(概述、指令周期)
  • 阿里云服务器核心用途解析:从基础应用到行业创新​
  • c++刷题便捷函数(类似于stoi的小函数)
  • 超越合并速度(merge speed):AI如何重塑开发者协作
  • Hadoop集群的常用命令
  • axi uart 16550 ip core使用流程
  • 一、HAL库的设计理念详解:从架构到实践
  • 274、H指数
  • StringBuilder,StringJoiner,StringBuffer字符串处理类深度解析