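Async Operations Inside a Spring Transaction Cannot Find Freshly Inserted Data: Analysis and Solutions
When using Spring's transaction management, a classic problem comes up: a transactional method inserts a row and then immediately calls an asynchronous (or new-transaction) method to query that row, and the query cannot find the data that was just inserted.
At first glance @Async or asynchronous execution seems to be at fault, but the real cause is the combined behavior of transaction propagation, the database isolation level, and the timing of the transaction commit.
Reference code: GitHub - kerrsixy/transaction-demo
1. Problem description
Consider a typical piece of code: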
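@Slf4j
@Service
@RequiredArgsConstructor
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {

    private final UserMapper userMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public Boolean registerUser(Long id) {
        int insert = userMapper.insert(new User().setId(id).setName("zjp"));
        // Call through the proxy so that @Async and @Transactional on sendMessage take effect.
        // AopContext.currentProxy() requires exposeProxy = true in the AOP configuration.
        IUserService userService = (IUserService) AopContext.currentProxy();
        userService.sendMessage(id);
        return insert == 1;
    }

    @Async
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void sendMessage(Long id) {
        // Returns null if the outer transaction has not committed yet,
        // so user.getName() below throws a NullPointerException.
        User user = getById(id);
        log.info("Sending SMS to user: {}", user.getName());
    }
}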
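Running this code shows that getById(id) inside sendMessage cannot find the user that was just inserted.
2. Root cause analysis
- The transaction opened by registerUser has not actually been committed to the database yet;
- The async thread has already issued its query, on a different database connection and in its own transaction;
- The insert is still uncommitted, and under the usual isolation levels (READ COMMITTED, REPEATABLE READ) another connection cannot see uncommitted changes, so the query finds nothing.
This is not a bug in Spring itself. It is a race condition between the transaction commit and the execution of the async task.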
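The visibility rule behind this race can be reproduced without Spring or a real database. The sketch below is a toy model only: ToyDatabase, Tx and CommitRaceDemo are hypothetical names invented for illustration, and the "database" simply hides a transaction's writes from other readers until commit() is called, which is roughly what READ COMMITTED does. The demo forces the ordering with join(); in the real application the ordering is left to thread scheduling.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Toy stand-in for a READ COMMITTED database (hypothetical, for illustration only). */
class ToyDatabase {
    private final Map<Long, String> committed = new ConcurrentHashMap<>();

    /** A transaction buffers its writes until commit() publishes them. */
    class Tx {
        private final Map<Long, String> pending = new HashMap<>();

        void insert(Long id, String name) {
            pending.put(id, name);
        }

        void commit() {
            committed.putAll(pending);
            pending.clear();
        }
    }

    Tx begin() {
        return new Tx();
    }

    /** A read from another "connection": uncommitted rows are invisible. */
    String selectName(Long id) {
        return committed.get(id);
    }
}

class CommitRaceDemo {
    /** Returns what the async reader saw: {before commit, after commit}. */
    static String[] run() {
        ToyDatabase db = new ToyDatabase();
        ToyDatabase.Tx tx = db.begin();
        tx.insert(1L, "zjp");

        // Async read issued while the transaction is still open, like sendMessage above.
        String before = CompletableFuture.supplyAsync(() -> db.selectName(1L)).join();

        tx.commit();

        // The same read after the commit finds the row.
        String after = CompletableFuture.supplyAsync(() -> db.selectName(1L)).join();
        return new String[] {before, after};
    }

    public static void main(String[] args) {
        String[] seen = run();
        System.out.println("before commit: " + seen[0]); // before commit: null
        System.out.println("after commit: " + seen[1]);  // after commit: zjp
    }
}
```

Every solution in the next section amounts to making sure the read happens on the "after commit" side of this picture.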
3. Solutions
3.1 Solution 1: use @TransactionalEventListener
1. Define the transaction event class
package com.zjp.transactiondemo.event;

import org.springframework.context.ApplicationEvent;

public class UserRegisteredEvent extends ApplicationEvent {

    private final Long userId;

    public UserRegisteredEvent(Object source, Long userId) {
        super(source);
        this.userId = userId;
    }

    public Long getUserId() {
        return userId;
    }
}
2. Modify the registerUser method: remove the AOP proxy call and publish an event instead
package com.zjp.transactiondemo.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.zjp.transactiondemo.entity.User;
import com.zjp.transactiondemo.event.UserRegisteredEvent;
import com.zjp.transactiondemo.mapper.UserMapper;
import com.zjp.transactiondemo.service.IUserService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Service
@RequiredArgsConstructor
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {

    private final UserMapper userMapper;
    private final ApplicationEventPublisher applicationEventPublisher;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public Boolean registerUser(Long id) {
        int insert = userMapper.insert(new User().setId(id).setName("zjp"));
        // Publish the event; note that the transaction has not been committed yet
        applicationEventPublisher.publishEvent(new UserRegisteredEvent(this, id));
        return insert == 1;
    }

    @Async
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void sendMessage(Long id) {
        User user = getById(id);
        log.info("Sending SMS to user: {}", user.getName());
    }
}
3. Create the listener class
package com.zjp.transactiondemo.listener;

import com.zjp.transactiondemo.event.UserRegisteredEvent;
import com.zjp.transactiondemo.service.IUserService;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
// Both types live in org.springframework.transaction.event
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

@Component
@RequiredArgsConstructor
public class UserRegisteredListener {

    private final IUserService userService;

    /**
     * Runs only after the transaction has committed successfully.
     */
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void handleUserRegistered(UserRegisteredEvent event) {
        // userService is the injected proxy, so @Async on sendMessage still applies
        userService.sendMessage(event.getUserId());
    }
}
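About TransactionPhase, its source and what each value means:
public enum TransactionPhase {
    // Run the target method before the transaction commits
    BEFORE_COMMIT,
    // Run the target method after the transaction commits
    AFTER_COMMIT,
    // Run the target method after the transaction rolls back
    AFTER_ROLLBACK,
    // Run the target method when the transaction completes,
    // whether it committed successfully or rolled back
    AFTER_COMPLETION
}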
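Advantages:
- Safe and reliable: the listener is guaranteed to run only after the transaction commits;
- Reusable across modules;
- Fits an event-driven design.
3.2 Solution 2: use TransactionSynchronizationManager
Register a transaction synchronization callback so that the async operation runs after the transaction commits.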
package com.zjp.transactiondemo.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.zjp.transactiondemo.entity.User;
import com.zjp.transactiondemo.mapper.UserMapper;
import com.zjp.transactiondemo.service.IUserService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.framework.AopContext;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;

@Slf4j
@Service
@RequiredArgsConstructor
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {

    private final UserMapper userMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public Boolean registerUser(Long id) {
        int insert = userMapper.insert(new User().setId(id).setName("zjp"));
        // Register a callback that fires only after this transaction has committed
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
            @Override
            public void afterCommit() {
                // Go through the proxy so that @Async on sendMessage takes effect
                IUserService proxy = (IUserService) AopContext.currentProxy();
                proxy.sendMessage(id);
            }
        });
        return insert == 1;
    }

    @Async
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void sendMessage(Long id) {
        User user = getById(id);
        log.info("Sending SMS to user: {}", user.getName());
    }
}
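Advantages:
- No event mechanism needs to be introduced;
- Finer-grained control.
Drawbacks:
- Highly intrusive, which hurts maintainability;
- Not recommended for frequent use in complex business logic.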
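Both this approach and @TransactionalEventListener rest on the same idea: work is queued while the transaction is open and released only once the commit has finished. The sketch below is a toy model of that ordering, not Spring's implementation; ToyTransaction, registerAfterCommit and AfterCommitHookDemo are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy transaction that mimics an after-commit hook (hypothetical, not Spring code). */
class ToyTransaction {
    private final List<Runnable> afterCommitCallbacks = new ArrayList<>();
    private final List<String> log;

    ToyTransaction(List<String> log) {
        this.log = log;
    }

    /** Queue work that must not run before the commit. */
    void registerAfterCommit(Runnable callback) {
        afterCommitCallbacks.add(callback);
    }

    void commit() {
        log.add("commit");
        // Callbacks run only once the commit has finished.
        afterCommitCallbacks.forEach(Runnable::run);
        afterCommitCallbacks.clear();
    }

    void rollback() {
        log.add("rollback");
        // Nothing was committed, so the queued work is dropped.
        afterCommitCallbacks.clear();
    }
}

class AfterCommitHookDemo {
    /** Returns the order of operations for a committed or rolled-back transaction. */
    static List<String> run(boolean succeed) {
        List<String> log = new ArrayList<>();
        ToyTransaction tx = new ToyTransaction(log);
        log.add("insert user");
        tx.registerAfterCommit(() -> log.add("sendMessage"));
        if (succeed) {
            tx.commit();
        } else {
            tx.rollback();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // [insert user, commit, sendMessage]
        System.out.println(run(false)); // [insert user, rollback]
    }
}
```

Note the second case: when the transaction rolls back, sendMessage never runs, which is usually the behavior you want for a notification tied to a successful registration.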
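3.3 Solution 3: use a message queue (MQ) to decouple the transaction from the async logic
Send the async task to message middleware (such as RabbitMQ or Kafka) and let an independent consumer handle it.
Example flow:
- The transactional method writes to the database;
- A message is sent to the MQ;
- A consumer consumes the message asynchronously and runs the business logic.
Advantages:
- Async tasks are persisted, so they are not lost;
- Systems are decoupled, which supports distributed architectures;
- High reliability, suitable for critical business scenarios.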
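The flow above can be sketched with JDK classes alone. This is a minimal illustration under stated assumptions, not a RabbitMQ or Kafka integration: a LinkedBlockingQueue stands in for the broker, a ConcurrentHashMap stands in for committed database rows, and MqDecouplingSketch is a hypothetical name. One point worth making explicit: the message is published only after the write is committed, because a message sent from inside the still-open transaction could reach the consumer before the commit and reproduce the original race.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class MqDecouplingSketch {
    /** Runs producer and consumer once and returns what the consumer did. */
    static String run() {
        // Stand-in for committed rows in the database (assumption for this sketch).
        Map<Long, String> committedUsers = new ConcurrentHashMap<>();
        // Stand-in for the message broker (assumption for this sketch).
        BlockingQueue<Long> broker = new LinkedBlockingQueue<>();
        ExecutorService consumerPool = Executors.newSingleThreadExecutor();
        try {
            // Step 3: an independent consumer waits for messages and handles them.
            Future<String> handled = consumerPool.submit(() -> {
                Long userId = broker.take();
                return "SMS sent to " + committedUsers.get(userId);
            });

            // Step 1: the transactional method writes the row and commits.
            committedUsers.put(1L, "zjp");
            // Step 2: the message is published only after the commit.
            broker.put(1L);

            return handled.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("sketch failed", e);
        } finally {
            consumerPool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // SMS sent to zjp
    }
}
```

In a real system the publish step would typically be triggered from an after-commit hook such as the ones in solutions 1 and 2.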
3.4 Solution 4: commit the transaction manually
Example code:
package com.zjp.transactiondemo.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.zjp.transactiondemo.entity.User;
import com.zjp.transactiondemo.mapper.UserMapper;
import com.zjp.transactiondemo.service.IUserService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.framework.AopContext;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.DefaultTransactionDefinition;

@Slf4j
@Service
@RequiredArgsConstructor
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {

    private final UserMapper userMapper;
    private final PlatformTransactionManager transactionManager;

    @Override
    public Boolean registerUser(Long id) {
        // No @Transactional here: the transaction is opened and committed by hand
        TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
        int insert;
        try {
            insert = userMapper.insert(new User().setId(id).setName("zjp"));
        } catch (RuntimeException e) {
            // Roll back explicitly so the transaction is not left open
            transactionManager.rollback(status);
            throw e;
        }
        transactionManager.commit(status);
        // The row is committed at this point, so the async query can see it
        IUserService userService = (IUserService) AopContext.currentProxy();
        userService.sendMessage(id);
        return insert == 1;
    }

    @Async
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void sendMessage(Long id) {
        User user = getById(id);
        log.info("Sending SMS to user: {}", user.getName());
    }
}
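Drawbacks:
- Intrusive: transaction handling leaks into the business code.
3.5 Solution 5: Thread.sleep() (not recommended)
@Async
@Override
@Transactional(rollbackFor = Exception.class)
public void sendMessage(Long id) {
    try {
        // Wait and hope that the outer transaction has committed by then
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag before propagating
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
    User user = getById(id);
    log.info("Sending SMS to user: {}", user.getName());
}
Drawbacks:
- It relies on an arbitrary time delay, cannot adapt to changes in load, and is unreliable.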
4. Misconceived solutions
4.1 Changing the propagation behavior of @Transactional
package com.zjp.transactiondemo.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.zjp.transactiondemo.entity.User;
import com.zjp.transactiondemo.mapper.UserMapper;
import com.zjp.transactiondemo.service.IUserService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.framework.AopContext;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Service
@RequiredArgsConstructor
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {

    private final UserMapper userMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public Boolean registerUser(Long id) {
        int insert = userMapper.insert(new User().setId(id).setName("zjp"));
        IUserService userService = (IUserService) AopContext.currentProxy();
        userService.sendMessage(id);
        return insert == 1;
    }

    // REQUIRES_NEW only controls how this method's own transaction is started.
    // On the async thread there is no existing transaction to join anyway,
    // and the caller's transaction is still uncommitted when this runs.
    @Async
    @Override
    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
    public void sendMessage(Long id) {
        User user = getById(id);
        log.info("Sending SMS to user: {}", user.getName());
    }
}
Execution result: