@Override @Nullable public Object invoke(MethodInvocation invocation)throws Throwable { // Work out the target class: may be {@code null}. // The TransactionAttributeSource should be passed the target class // as well as the method, which may be from an interface. // 获取我们的代理对象的class属性 Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// 声明式事务处理 if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. // 创建TransactionInfo TransactionInfotxInfo= createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal; try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. // 执行被增强方法,调用具体的处理逻辑 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // target invocation exception // 异常回滚 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { //清除事务信息,恢复线程私有的老的事务信息 cleanupTransactionInfo(txInfo); }
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) { // Set rollback-only in case of Vavr failure matching our rollback rules... TransactionStatusstatus= txInfo.getTransactionStatus();
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in. try { result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> { TransactionInfotxInfo= prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status); try { ObjectretVal= invocation.proceedWithInvocation(); if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) { // Set rollback-only in case of Vavr failure matching our rollback rules... retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status); } return retVal; } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { // A RuntimeException: will lead to a rollback. if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { thrownewThrowableHolderException(ex); } } else { // A normal return value: will lead to a commit. throwableHolder.throwable = ex; returnnull; } } finally { cleanupTransactionInfo(txInfo); } }); } catch (ThrowableHolderException ex) { throw ex.getCause(); } catch (TransactionSystemException ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); ex2.initApplicationException(throwableHolder.throwable); } throw ex2; } catch (Throwable ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); } throw ex2; }
// Check result state: It might indicate a Throwable to rethrow. if (throwableHolder.throwable != null) { throw throwableHolder.throwable; } return result; } }
// 创建事务信息 TransactionInfotxInfo=newTransactionInfo(tm, txAttr, joinpointIdentification); if (txAttr != null) { // We need a transaction for this method... if (logger.isTraceEnabled()) { logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]"); } // The transaction manager will flag an error if an incompatible tx already exists. // 设置新事务状态 txInfo.newTransactionStatus(status); } else { // The TransactionInfo.hasTransaction() method will return false. We created it only // to preserve the integrity of the ThreadLocal stack maintained in this class. if (logger.isTraceEnabled()) { logger.trace("No need to create transaction for [" + joinpointIdentification + "]: This method is not transactional."); } }
// We always bind the TransactionInfo to the thread, even if we didn't create // a new transaction here. This guarantees that the TransactionInfo stack // will be managed correctly even if no transaction was created by this aspect. // 事务信息绑定到当前线程 txInfo.bindToThread(); return txInfo; }
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers, // so we don't want to do it unnecessarily (for example if we've explicitly // configured the connection pool to set it already). // 关闭自动提交 if (con.getAutoCommit()) { //设置需要恢复自动提交 txObject.setMustRestoreAutoCommit(true); if (logger.isDebugEnabled()) { logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); } // 关闭自动提交 con.setAutoCommit(false); }
// Bind the connection holder to the thread. // 绑定我们的数据源和连接到我们的同步管理器上,把数据源作为key,数据库连接作为value 设置到线程变量中 if (txObject.isNewConnectionHolder()) { // 将当前获取到的连接绑定到当前线程 TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder()); } }
catch (Throwable ex) { if (txObject.isNewConnectionHolder()) { // 释放数据库连接 DataSourceUtils.releaseConnection(con, obtainDataSource()); txObject.setConnectionHolder(null, false); } thrownewCannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); } } protectedvoidprepareTransactionalConnection(Connection con, TransactionDefinition definition) throws SQLException { if (isEnforceReadOnly() && definition.isReadOnly()) { try (Statementstmt= con.createStatement()) { // 设置当前会话事务只读 stmt.executeUpdate("SET TRANSACTION READ ONLY"); } } }
// Remove the connection holder from the thread, if exposed. if (txObject.isNewConnectionHolder()) { // 将数据库连接从当前线程中解除绑定 TransactionSynchronizationManager.unbindResource(obtainDataSource()); }
// 判断当前线程是否存在事务,判断依据为当前线程记录的连接不为空且连接中的 transactionActive 属性不为空 if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. // 当前线程已经存在事务 return handleExistingTransaction(def, transaction, debugEnabled); }
// Check definition settings for new transaction. // 事务超时设置验证 if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { thrownewInvalidTimeoutException("Invalid transaction timeout", def.getTimeout()); }
// No existing transaction found -> check propagation behavior to find out how to proceed. // 如果当前线程不存在事务,但是 PropagationBehavior 却被声明为 PROPAGATION_MANDATORY 抛出异常 if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { thrownewIllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } // PROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED 都需要新建事务 elseif (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { // 没有当前事务的话,REQUIRED,REQUIRES_NEW,NESTED 挂起的是空事务,然后创建一个新事务 SuspendedResourcesHoldersuspendedResources= suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def); } try { return startTransaction(def, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error ex) { // 恢复挂起的事务 resume(null, suspendedResources); throw ex; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. // 创建一个空的事务 if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + def); } booleannewSynchronization= (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null); } } // 开启新事务 private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
/** * 当前的事务属性状态是PROPAGATION_REQUIRES_NEW表示需要新开启一个事务状态 */ if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } // 挂起当前事务并返回挂起的资源持有器 SuspendedResourcesHoldersuspendedResources= suspend(transaction); try { // 创建一个新的非事务状态(保存了上一个存在事务状态的属性) return startTransaction(definition, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } }
// 嵌套事务 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { // 不允许就报异常 if (!isNestedTransactionAllowed()) { thrownewNestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } // 嵌套事务的处理 if (useSavepointForNestedTransaction()) { // Create savepoint within existing Spring-managed transaction, // through the SavepointManager API implemented by TransactionStatus. // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization. // 如果没有可以使用保存点的方式控制事务回滚,那么在嵌入式事务的建立初始简历保存点 DefaultTransactionStatusstatus= prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); // 为事务设置一个回退点 status.createAndHoldSavepoint(); return status; } else { // Nested transaction through nested begin and commit/rollback calls. // Usually only for JTA: Spring synchronization might get activated here // in case of a pre-existing JTA transaction. // 有些情况是不能使用保存点操作 return startTransaction(definition, transaction, debugEnabled, null); } }
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED. if (debugEnabled) { logger.debug("Participating in existing transaction"); } if (isValidateExistingTransaction()) { if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { IntegercurrentIsolationLevel= TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { ConstantsisoConstants= DefaultTransactionDefinition.constants; thrownewIllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)")); } } if (!definition.isReadOnly()) { if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { thrownewIllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is"); } } } booleannewSynchronization= (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); }
publicfinalvoidrollback(TransactionStatus status)throws TransactionException { if (status.isCompleted()) { thrownewIllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); }
// Raise UnexpectedRollbackException if we had a global rollback-only marker if (unexpectedRollback) { thrownewUnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only"); } } finally { // 根据事务状态信息,完成后数据清除,和线程的私有资源解绑,重置连接自动提交,隔离级别,是否只读,释放连接,恢复挂起事务等 cleanupAfterCompletion(status); } }
publicfinalvoidcommit(TransactionStatus status)throws TransactionException { if (status.isCompleted()) { thrownewIllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatusdefStatus= (DefaultTransactionStatus) status; // 如果在事务链中已经被标记回滚,那么不会尝试提交事务,直接回滚 if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } // 不可预期的回滚 processRollback(defStatus, false); return; }
// 设置了全局回滚 if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); } // 可预期的回滚,可能会报异常 processRollback(defStatus, true); return; }
// Throw UnexpectedRollbackException if we have a global rollback-only // marker but still didn't get a corresponding exception from commit. // 有全局回滚标记就报异常 if (unexpectedRollback) { thrownewUnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { // can only be caused by doCommit triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { // can only be caused by doCommit if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } else { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException | Error ex) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } // 提交过程中出现异常则回滚 doRollbackOnCommitException(status, ex); throw ex; }
// Trigger afterCommit callbacks, with an exception thrown there // propagated to callers but the transaction still considered as committed. try { // 提交后回调 triggerAfterCommit(status); } finally { // 提交后清除线程私有同步状态 triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); }