当前位置: 首页 > news >正文

Spring事务管理机制深度解析:从JDBC基础到Spring高级实现

一、事务的本质?

什么是事务?

数据库事务(Database Transaction) ,是指作为单个逻辑工作单元执行的一系列操作,要么完全地执行,要么完全地不执行。

  事务处理可以确保除非事务性单元内的所有操作都成功完成,否则不会永久更新面向数据的资源。通过将一组相关操作组合为一个要么全部成功要么全部失败的单元,可以简化错误恢复并使应用程序更加可靠。

  一个逻辑工作单元要成为事务,必须满足所谓的 ACID(原子性、一致性、隔离性和持久性)属性。事务是数据库运行中的逻辑工作单位,由DBMS中的事务管理子系统负责事务的处理。

JDBC中是怎么处理事务的?

JDBC(Java Database Connectivity)提供了对数据库事务的支持,主要通过Connection对象来控制事务。

核心事务控制方法

JDBC通过Connection接口提供了以下关键事务控制方法:setAutoCommit(boolean autoCommit)- 设置自动提交模式

commit()- 提交事务rollback()- 回滚事务

        public static void main(String[] args) {Connection conn = null;Statement stmt = null;try {// 注册 JDBC 驱动// Class.forName("com.mysql.cj.jdbc.Driver");// 打开连接conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&serverTimezone=UTC", "root", "123456");// 执行查询stmt = conn.createStatement();conn.setAutoCommit(false); // 关闭自动提交// 添加用户信息String sql = "INSERT INTO T_USER(id,user_name)values(1,'管理员')";stmt.executeUpdate(sql);// 添加日志问题sql = "INSET INTO t_log(id,log)values(1,'添加了用户:管理员')";stmt.executeUpdate(sql);conn.commit(); // 上面两个操作都没有问题就提交} catch (Exception e) {e.printStackTrace();// 出现问题就回滚try {conn.rollback();} catch (SQLException throwables) {throwables.printStackTrace();}} finally {try {if (stmt != null) stmt.close();} catch (SQLException se2) {}try {if (conn != null) conn.close();} catch (SQLException se) {se.printStackTrace();}}}

关键操作:关闭自动提交,都成功就commit,有一个失败就rollback

在Spring中的事务管理?

我们在Service中是可能调用多个Dao的方法来操作数据库中的数据的,我们要做的就是要保证UserService中的 addUser()方法中的相关操作满足事务的要求。在Spring中支持两种事务的使用方式

一种是基于XML的文件配置方式,一种是基于注解的方式。

在Spring的底层:PlatformTransactionManager

TransactionManager顶级接口,但是内容为空的

public interface TransactionManager {}

PlatformTransactionManager:平台事务管理器

ReactiveTransactionManager:响应式编程的事务管理器关注的重点是PlatformTransactionManager

public interface PlatformTransactionManager extends TransactionManager {/**获取事务*/TransactionStatus getTransaction(@Nullable TransactionDefinition definition)throws TransactionException;/**提交数据*/void commit(TransactionStatus status) throws TransactionException;/**回滚数据*/void rollback(TransactionStatus status) throws TransactionException;}

PlatformTransactionManager也是个接口,在他下面的实现有两个比较重要实现

JtaTransactionManager:支持分布式事务【本身服务中的多数据源】

DataSourceTransactionManager:数据源事务管理器。在但数据源中的事务管理,这个是我们分析的重点。

在DataSourceTransactionManager中提供了与事务相关的操作方法。

事务的定义

在上面的 PlatformTransactoinManager中看到了 TransactionDefinition 这个对象,通过字面含义是 事务定义

TransactionDefinition中定义了事务的 传播属性隔离级别

DefaultTransactionDefinition:是事务定义的默认实现

DefaultTransactionAttribute:扩展了TransactionAttribute中的属性的实现

@Transactional:该组件就会被解析加载为对应的 TransactionDefinition对象。

开启事务

 然后在 PlatformTransactionManager中获取事务的时候返回的是 TransactionStatus对象。

核心方法

getTransaction

/*** This implementation handles propagation behavior. Delegates to* {@code doGetTransaction}, {@code isExistingTransaction}* and {@code doBegin}.* @see #doGetTransaction* @see #isExistingTransaction* @see #doBegin*/
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)throws TransactionException {// Use defaults if no transaction definition given.// 如果没有事务定义信息则使用默认的事务管理器定义信息TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());// 获取事务Object transaction = doGetTransaction();boolean debugEnabled = logger.isDebugEnabled();// 判断当前线程是否存在事务,判断依据为当前线程记录的连接不为空且连接中的transactionActive属性不为空if (isExistingTransaction(transaction)) {// Existing transaction found -> check propagation behavior to find out how to behave.// 当前线程已经存在事务return handleExistingTransaction(def, transaction, debugEnabled);}// Check definition settings for new transaction.// 事务超时设置验证if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());}// No existing transaction found -> check propagation behavior to find out how to proceed.// 如果当前线程不存在事务,但是PropagationBehavior却被声明为PROPAGATION_MANDATORY抛出异常if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");}// PROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED都需要新建事务else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {//没有当前事务的话,REQUIRED,REQUIRES_NEW,NESTED挂起的是空事务,然后创建一个新事务SuspendedResourcesHolder suspendedResources = suspend(null);if (debugEnabled) {logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);}try {return startTransaction(def, transaction, debugEnabled, suspendedResources);}catch (RuntimeException | Error ex) {// 恢复挂起的事务resume(null, suspendedResources);throw ex;}}else {// Create "empty" transaction: no actual transaction, but potentially synchronization.// 创建一个空的事务if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {logger.warn("Custom isolation level specified but no actual transaction initiated; " +"isolation level will effectively be ignored: " + def);}boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);}
}

doGetTransaction()

/*** 创建一个DataSourceTransactionObject当作事务,设置是否允许保存点,然后获取连接持有器ConnectionHolder* 里面会存放JDBC的连接,设置给DataSourceTransactionObject,当然第一次是空的** @return*/
@Override
protected Object doGetTransaction() {// 创建一个数据源事务对象DataSourceTransactionObject txObject = new DataSourceTransactionObject();// 是否允许当前事务设置保持点txObject.setSavepointAllowed(isNestedTransactionAllowed());/*** TransactionSynchronizationManager 事务同步管理器对象(该类中都是局部线程变量)* 用来保存当前事务的信息,我们第一次从这里去线程变量中获取 事务连接持有器对象 通过数据源为key去获取* 由于第一次进来开始事务 我们的事务同步管理器中没有被存放.所以此时获取出来的conHolder为null*/ConnectionHolder conHolder =(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());// 非新创建连接则写falsetxObject.setConnectionHolder(conHolder, false);// 返回事务对象return txObject;
}
/*** Create a TransactionStatus for an existing transaction.*/
private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled)throws TransactionException {/*** 判断当前的事务行为是不是PROPAGATION_NEVER的* 表示为不支持事务,但是当前又存在一个事务,所以抛出异常*/if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");}/*** 判断当前的事务属性不支持事务,PROPAGATION_NOT_SUPPORTED,所以需要先挂起已经存在的事务*/if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {if (debugEnabled) {logger.debug("Suspending current transaction");}// 挂起当前事务Object suspendedResources = suspend(transaction);boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);// 创建一个新的非事务状态(保存了上一个存在事务状态的属性)return prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources);}/*** 当前的事务属性状态是PROPAGATION_REQUIRES_NEW表示需要新开启一个事务状态*/if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {if (debugEnabled) {logger.debug("Suspending current transaction, creating new transaction with name [" +definition.getName() + "]");}// 挂起当前事务并返回挂起的资源持有器SuspendedResourcesHolder suspendedResources = suspend(transaction);try {// 创建一个新的非事务状态(保存了上一个存在事务状态的属性)return startTransaction(definition, transaction, debugEnabled, suspendedResources);}catch (RuntimeException | Error beginEx) {resumeAfterBeginException(transaction, suspendedResources, beginEx);throw beginEx;}}// 嵌套事务if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {// 不允许就报异常if (!isNestedTransactionAllowed()) {throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions by default - " +"specify 'nestedTransactionAllowed' property with value 'true'");}if (debugEnabled) {logger.debug("Creating nested transaction with name [" + definition.getName() + "]");}// 嵌套事务的处理if (useSavepointForNestedTransaction()) {// Create savepoint within existing Spring-managed transaction,// through the SavepointManager API implemented by TransactionStatus.// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.// 如果没有可以使用保存点的方式控制事务回滚,那么在嵌入式事务的建立初始简历保存点DefaultTransactionStatus status =prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);// 为事务设置一个回退点status.createAndHoldSavepoint();return status;}else {// Nested transaction through nested begin and commit/rollback calls.// Usually only for JTA: Spring synchronization might get activated here// in case of a pre-existing JTA transaction.// 有些情况是不能使用保存点操作return startTransaction(definition, transaction, debugEnabled, null);}}// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.if (debugEnabled) {logger.debug("Participating in existing transaction");}if (isValidateExistingTransaction()) {if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {Constants isoConstants = DefaultTransactionDefinition.constants;throw new IllegalTransactionStateException("Participating transaction with definition [" +definition + "] specifies isolation level which is incompatible with existing transaction: " +(currentIsolationLevel != null ?isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :"(unknown)"));}}if (!definition.isReadOnly()) {if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {throw new IllegalTransactionStateException("Participating transaction with definition [" +definition + "] is not marked as read-only but existing transaction is");}}}boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

startTransaction()

/*** Start a new transaction.*/
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {// 是否需要新同步boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);// 创建新的事务DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);// 开启事务和连接doBegin(transaction, definition);// 新同步事务的设置,针对于当前线程的设置prepareSynchronization(status, definition);return status;
}

doBegin方法开启和连接事务

@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {// 强制转化事务对象DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;Connection con = null;try {// 判断事务对象没有数据库连接持有器if (!txObject.hasConnectionHolder() ||txObject.getConnectionHolder().isSynchronizedWithTransaction()) {// 通过数据源获取一个数据库连接对象Connection newCon = obtainDataSource().getConnection();if (logger.isDebugEnabled()) {logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");}// 把我们的数据库连接包装成一个ConnectionHolder对象 然后设置到我们的txObject对象中去txObject.setConnectionHolder(new ConnectionHolder(newCon), true);}// 标记当前的连接是一个同步事务txObject.getConnectionHolder().setSynchronizedWithTransaction(true);con = txObject.getConnectionHolder().getConnection();// 为当前的事务设置隔离级别Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);// 设置先前隔离级别txObject.setPreviousIsolationLevel(previousIsolationLevel);// 设置是否只读txObject.setReadOnly(definition.isReadOnly());// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,// so we don't want to do it unnecessarily (for example if we've explicitly// configured the connection pool to set it already).// 关闭自动提交if (con.getAutoCommit()) {//设置需要恢复自动提交txObject.setMustRestoreAutoCommit(true);if (logger.isDebugEnabled()) {logger.debug("Switching JDBC Connection [" + con + "] to manual commit");}// 关闭自动提交con.setAutoCommit(false);}// 判断事务是否需要设置为只读事务prepareTransactionalConnection(con, definition);// 标记激活事务txObject.getConnectionHolder().setTransactionActive(true);// 设置事务超时时间int timeout = determineTimeout(definition);if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {txObject.getConnectionHolder().setTimeoutInSeconds(timeout);}// Bind the connection holder to the thread.// 绑定我们的数据源和连接到我们的同步管理器上,把数据源作为key,数据库连接作为value 设置到线程变量中if (txObject.isNewConnectionHolder()) {// 将当前获取到的连接绑定到当前线程TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());}}catch (Throwable ex) {if (txObject.isNewConnectionHolder()) {// 释放数据库连接DataSourceUtils.releaseConnection(con, obtainDataSource());txObject.setConnectionHolder(null, false);}throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);}
}

doBegin方法中核心的关闭了自动提交

同时把连接绑定到本地线程中bindResource方法

Spring事务源码串联

编程式事务、AOP事务

@Autowired
private UserDao userDao;@Autowired
private PlatformTransactionManager txManager;@Autowired
private LogService logService;@Transactional
public void insertUser(User u) {// 1、创建事务定义DefaultTransactionDefinition definition = new DefaultTransactionDefinition();// 2、根据定义开启事务TransactionStatus status = txManager.getTransaction(definition);try {this.userDao.insert(u);Log log = new Log(System.currentTimeMillis() + "", System.currentTimeMillis() + "-" + u.getUserName());// this.doAddUser(u);this.logService.insertLog(log);// 3、提交事务txManager.commit(status);} catch (Exception e) {// 4、异常了,回滚事务txManager.rollback(status);throw e;}
}

在Service中通过事务处理的代码实现了事务管理,同时结合AOP的内容,可以把事务的代码抽取出来,然后我们来看看Spring中这块是如何处理的。

通过Debug的方式看到处理的关键流程 TransactionInterceptor 就是事务处理的 advice


@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {// Work out the target class: may be {@code null}.// The TransactionAttributeSource should be passed the target class// as well as the method, which may be from an interface.Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);// Adapt to TransactionAspectSupport's invokeWithinTransaction...return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}

进入到invokeWithinTransaction方法中

@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,final InvocationCallback invocation) throws Throwable {// If the transaction attribute is null, the method is non-transactional.// 获取我们的事务属性源对象TransactionAttributeSource tas = getTransactionAttributeSource();// 通过事务属性源对象获取到当前方法的事务属性信息final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);// 获取我们配置的事务管理器对象final TransactionManager tm = determineTransactionManager(txAttr);if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {throw new TransactionUsageException("Unsupported annotated transaction on suspending function detected: " + method +". Use TransactionalOperator.transactional extensions instead.");}ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());if (adapter == null) {throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +method.getReturnType());}return new ReactiveTransactionSupport(adapter);});return txSupport.invokeWithinTransaction(method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);}PlatformTransactionManager ptm = asPlatformTransactionManager(tm);// 获取连接点的唯一标识  类名+方法名final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);// 声明式事务处理if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {// Standard transaction demarcation with getTransaction and commit/rollback calls.// 创建TransactionInfoTransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);Object retVal;try {// This is an around advice: Invoke the next interceptor in the chain.// This will normally result in a target object being invoked.// 执行被增强方法,调用具体的处理逻辑retVal = invocation.proceedWithInvocation();}catch (Throwable ex) {// target invocation exception// 异常回滚completeTransactionAfterThrowing(txInfo, ex);throw ex;}finally {//清除事务信息,恢复线程私有的老的事务信息cleanupTransactionInfo(txInfo);}if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {// Set rollback-only in case of Vavr failure matching our rollback rules...TransactionStatus status = txInfo.getTransactionStatus();if (status != null && txAttr != null) {retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);}}//成功后提交,会进行资源储量,连接释放,恢复挂起事务等操作commitTransactionAfterReturning(txInfo);return retVal;}else {// 编程式事务处理Object result;final ThrowableHolder throwableHolder = new ThrowableHolder();// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.try {result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);try {Object retVal = invocation.proceedWithInvocation();if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {// Set rollback-only in case of Vavr failure matching our rollback rules...retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);}return retVal;}catch (Throwable ex) {if (txAttr.rollbackOn(ex)) {// A RuntimeException: will lead to a rollback.if (ex instanceof RuntimeException) {throw (RuntimeException) ex;}else {throw new ThrowableHolderException(ex);}}else {// A normal return value: will lead to a commit.throwableHolder.throwable = ex;return null;}}finally {cleanupTransactionInfo(txInfo);}});}catch (ThrowableHolderException ex) {throw ex.getCause();}catch (TransactionSystemException ex2) {if (throwableHolder.throwable != null) {logger.error("Application exception overridden by commit exception", throwableHolder.throwable);ex2.initApplicationException(throwableHolder.throwable);}throw ex2;}catch (Throwable ex2) {if (throwableHolder.throwable != null) {logger.error("Application exception overridden by commit exception", throwableHolder.throwable);}throw ex2;}// Check result state: It might indicate a Throwable to rethrow.if (throwableHolder.throwable != null) {throw throwableHolder.throwable;}return result;}
}

然后进入到createTransactionIfNecessary方法中

进入 getTransaction

核心的是doBegin方法。完成 自动提交的关闭和 本地线程 对象的存储

TransactionInterceptor

TransactionInterceptor是如何注入到容器中的?

首先来看看事务的开启@EnableTransactionManagement

可以看到拦截器关联到了Advisor中

http://www.xdnf.cn/news/1382491.html

相关文章:

  • 力扣(LeetCode) ——965. 单值二叉树(C语言)
  • C#写的一键自动测灯带的应用 AI帮写的。
  • [灵动微电子 MM32BIN560CN MM32SPIN0280]读懂电机MCU之串口DMA
  • list 手动实现 1
  • 学习日志40 python
  • 微服务即时通信系统(十三)--- 项目部署
  • 【后端】微服务后端鉴权方案
  • 虚函数指针和虚函数表的创建时机和存放位置
  • 【Linux知识】Linux 设置账号密码永不过期
  • 完整代码注释:实现 Qt 的 TCP 客户端,实现和服务器通信
  • 【LINUX网络】TCP原理
  • WEEX唯客上线C2C交易平台:打造安全便捷的用户交易体验
  • 现在购买PCIe 5.0 SSD是否是最好的时机?
  • 前端实现Linux查询平台:打造高效运维工作流
  • [光学原理与应用-320]:光学产品不同阶段使用的工具软件、对应的输出文件
  • 华为S5720S重置密码
  • c语言动态数组扩容
  • MCU平台化实践方案
  • STL库——list(类函数学习)
  • 财务数据报销画像技术实现:从数据采集到智能决策的全流程解析
  • 【AI自动化】VSCode+Playwright+codegen+nodejs自动化脚本生成
  • 当new一块内存时,操作系统做了哪些事情
  • 软考 系统架构设计师系列知识点之杂项集萃(134)
  • leetcode算法刷题的第二十天
  • 鸿蒙OS与Rust整合开发流程
  • 面试tips--JVM(3)--类加载过程
  • 动态加载和异步调用tasklet/workqueue day63 ay64
  • 中国剩余定理(以及扩展..)
  • .Net Core Web 架构(管道机制)的底层实现
  • [光学原理与应用-321]:皮秒深紫外激光器产品不同阶段使用的工具软件、对应的输出文件