我们看看 Spring 中的事务处理的代码,使用 Spring 管理事务有声明式和编程式两种方式,声明式事务处理通过 AOP 的实现把事物管理代码作为方面封装来横向插入到业务代码中,使得事务管理代码和业务代码解藕。在这种方式我们结合 IoC 容器和 Spirng 已有的FactoryBean 来对事务管理进行属性配置,比如传播行为,隔离级别等。其中最简单的方式就是通过配置 TransactionProxyFactoryBean来实现声明式事物;在整个源代码分析中,我们可以大致可以看到 Spring 实现声明式事物管理有这么几个部分:
我们下面看看具体的实现,在 TransactionFactoryBean 中:
1public class TransactionProxyFactoryBean extends AbstractSingletonProxyFactoryBean implements FactoryBean, BeanFactoryAware {
2 //这里是 Spring 事务处理而使用的 AOP 拦截器,中间封装了 Spring 对事务处理的代码来支持声明式事务处理的实现
3 private final TransactionInterceptor transactionInterceptor = new TransactionInterceptor();
4
5 private Pointcut pointcut;
6
7 //这里 Spring 把 TransactionManager 注入到 TransactionInterceptor 中去
8 public void setTransactionManager(PlatformTransactionManager transactionManager) {
9 this.transactionInterceptor.setTransactionManager(transactionManager);
10 }
11
12 //这里把在 bean 配置文件中读到的事务管理的属性信息注入到 TransactionInterceptor 中去
13 public void setTransactionAttributes(Properties transactionAttributes) {
14 this.transactionInterceptor.setTransactionAttributes(transactionAttributes);
15 }
16
17 ...中间省略了其他一些方法...
18
19 //这里创建 Spring AOP 对事务处理的 Advisor
20 protected Object createMainInterceptor() {
21 this.transactionInterceptor.afterPropertiesSet();
22 if (this.pointcut != null) {
23 //这里使用默认的通知器
24 return new DefaultPointcutAdvisor(this.pointcut, this.transactionInterceptor);
25 }else {
26 // 使用上面定义好的 TransactionInterceptor 作为拦截器,同时使用 TransactionAttributeSourceAdvisor
27 return new TransactionAttributeSourceAdvisor(this.transactionInterceptor);
28 }
29 }
30}
那什么时候 Spring 的 TransactionInterceptor 被注入到 Spring AOP 中成为 Advisor 中的一部分呢?我们看到在TransactionProxyFactoryBean 中,这个方法在 IOC 初始化 bean 的时候被执行:
1public void afterPropertiesSet() {
2 ...
3 //TransactionProxyFactoryBean 实际上使用 ProxyFactory 完成 AOP 的基本功能。
4 ProxyFactory proxyFactory = new ProxyFactory();
5
6 if (this.preInterceptors != null) {
7 for (int i = 0; i < this.preInterceptors.length; i++) {
8 proxyFactory.addAdvisor(this.advisorAdapterRegistry.wrap(this.preInterceptors[i]));
9 }
10 }
11
12 //这里是 Spring 加入通知器的地方
13 //有两种通知器可以被加入 DefaultPointcutAdvisor 或者 TransactionAttributeSourceAdvisor
14 //这里把 Spring 处理声明式事务处理的 AOP 代码都放到 ProxyFactory 中去,怎样加入 advisor 我们可以参考 ProxyFactory 的父类 AdvisedSupport()
15 //由它来维护一个 advice 的链表,通过这个链表的增删改来抽象我们对整个通知器配置的增删改操作。
16 proxyFactory.addAdvisor(this.advisorAdapterRegistry.wrap(createMainInterceptor()));
17
18 if (this.postInterceptors != null) {
19 for (int i = 0; i < this.postInterceptors.length; i++) {
20 proxyFactory.addAdvisor(this.advisorAdapterRegistry.wrap(this.postInterceptors[i]));
21 }
22 }
23
24 proxyFactory.copyFrom(this);
25
26 //这里创建 AOP 的目标源
27 TargetSource targetSource = createTargetSource(this.target);
28 proxyFactory.setTargetSource(targetSource);
29
30 if (this.proxyInterfaces != null) {
31 proxyFactory.setInterfaces(this.proxyInterfaces);
32 }else if (!isProxyTargetClass()) {
33 proxyFactory.setInterfaces(ClassUtils.getAllInterfacesForClass(targetSource.getTargetClass()));
34 }
35
36 this.proxy = getProxy(proxyFactory);
37}
Spring 已经定义了一个 transctionInterceptor 作为拦截器或者 AOP advice 的实现,在 IOC 容器中定义的其他属性比如transactionManager 和事务管理的属性都会传到已经定义好的 TransactionInterceptor 那里去进行处理。以上反映了基本的 Spring AOP的定义过程,其中 pointcut 和 advice 都已经定义好,同时也通过通知器配置到 ProxyFactory 中去了。
下面让我们回到 TransactionProxyFactoryBean 中看看 TransactionAttributeSourceAdvisor 是怎样定义的,这样我们可以理解具体的属性是怎样起作用,这里我们分析一下类 TransactionAttributeSourceAdvisor:
1public class TransactionAttributeSourceAdvisor extends AbstractPointcutAdvisor {
2 //和其他 Advisor 一样,同样需要定义 AOP 中的用到的 Interceptor 和 Pointcut
3 //Interceptor 使用传进来的 TransactionInterceptor
4 //而对于 pointcut,这里定义了一个内部类,参见下面的代码
5 private TransactionInterceptor transactionInterceptor;
6
7 private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut();
8
9 ...
10
11//定义的 PointCut 内部类
12private class TransactionAttributeSourcePointcut extends StaticMethodMatcherPointcut implements Serializable {
13 ...
14 //方法匹配的实现,使用了 TransactionAttributeSource 类
15 public boolean matches(Method method, Class targetClass) {
16 TransactionAttributeSource tas = getTransactionAttributeSource();
17 //这里使用 TransactionAttributeSource 来对配置属性进行处理
18 return (tas != null && tas.getTransactionAttribute(method, targetClass) != null);
19 }
20 ...省略了 equal,hashcode,tostring 的代码
21}
这里我们看看属性值是怎样被读入的:AbstractFallbackTransactionAttributeSource 负责具体的属性读入任务,我们可以有两种读入方式,比如 annotation 和直接配置.我们下面看看直接配置的读入方式,在 Spring 中同时对读入的属性值进行了缓存处理,这是一个decorator 模式:
1public final TransactionAttribute getTransactionAttribute(Method method, Class targetClass) {
2 //这里先查一下缓存里有没有事务管理的属性配置,如果有从缓存中取得 TransactionAttribute
3 Object cacheKey = getCacheKey(method, targetClass);
4 Object cached = this.cache.get(cacheKey);
5 if (cached != null) {
6 if (cached == NULL_TRANSACTION_ATTRIBUTE) {
7 return null;
8 }else {
9 return (TransactionAttribute) cached;
10 }
11 }else {
12 // 这里通过对方法和目标对象的信息来计算事务缓存属性
13 TransactionAttribute txAtt = computeTransactionAttribute(method, targetClass);
14 //把得到的事务缓存属性存到缓存中,下次可以直接从缓存中取得。
15 if (txAtt == null) {
16 this.cache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
17 }else {
18 ...
19 this.cache.put(cacheKey, txAtt);
20 }
21 return txAtt;
22 }
23}
别急,基本的处理在 computeTransactionAttribute()中:
1private TransactionAttribute computeTransactionAttribute(Method method, Class targetClass) {
2 //这里检测是不是 public 方法
3 if(allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
4 return null;
5 }
6
7 Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
8
9 // First try is the method in the target class.
10 TransactionAttribute txAtt = findTransactionAttribute(findAllAttributes(specificMethod));
11 if (txAtt != null) {
12 return txAtt;
13 }
14
15 // Second try is the transaction attribute on the target class.
16 txAtt = findTransactionAttribute(findAllAttributes(specificMethod.getDeclaringClass()));
17 if (txAtt != null) {
18 return txAtt;
19 }
20
21 if (specificMethod != method) {
22 // Fallback is to look at the original method.
23 txAtt = findTransactionAttribute(findAllAttributes(method));
24 if (txAtt != null) {
25 return txAtt;
26 }
27 // Last fallback is the class of the original method.
28 return findTransactionAttribute(findAllAttributes(method.getDeclaringClass()));
29 }
30 return null;
31}
经过一系列的尝试我们可以通过 findTransactionAttribute()通过调用 findAllAttribute()得到 TransactionAttribute 的对象,如果返回的是 null,这说明该方法不是我们需要事务处理的方法。
在完成把需要的通知器加到 ProxyFactory 中去的基础上,我们看看具体的看事务处理代码怎样起作用,在TransactionInterceptor 中:
1public Object invoke(final MethodInvocation invocation) throws Throwable {
2 //这里得到目标对象
3 Class targetClass = (invocation.getThis() != null ? invocation.getThis().getClass() : null);
4
5 //这里同样的通过判断是否能够得到 TransactionAttribute 来决定是否对当前方法进行事务处理,有可能该属性已经被缓存,
6 //具体可以参考上面对 getTransactionAttribute 的分析,同样是通过 TransactionAttributeSource
7 final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(invocation.getMethod(), targetClass);
8 final String joinpointIdentification = methodIdentification(invocation.getMethod());
9
10 //这里判断我们使用了什么 TransactionManager
11 if (txAttr == null || !(getTransactionManager() instanceof CallbackPreferringPlatformTransactionManager)) {
12 // 这里创建事务,同时把创建事务过程中得到的信息放到 TransactionInfo 中去
13 TransactionInfo txInfo = createTransactionIfNecessary(txAttr, joinpointIdentification);
14 Object retVal = null;
15 try {
16 retVal = invocation.proceed();
17 }catch (Throwable ex) {
18 // target invocation exception
19 completeTransactionAfterThrowing(txInfo, ex);
20 throw ex;
21 }finally {
22 cleanupTransactionInfo(txInfo);
23 }
24 commitTransactionAfterReturning(txInfo);
25 return retVal;
26 }else {
27 // 使用的是 Spring 定义的 PlatformTransactionManager 同时实现了回调接口,我们通过其回调函数完成事务处理,就像我们使用编程式事务处理一样。
28 try {
29 Object result = ((CallbackPreferringPlatformTransactionManager) getTransactionManager()).execute(txAttr, new TransactionCallback() {
30 public Object doInTransaction(TransactionStatus status) {
31 //同样的需要一个 TransactonInfo
32 TransactionInfo txInfo = prepareTransactionInfo(txAttr, joinpointIdentification, status);
33 try {
34 return invocation.proceed();
35 }
36 ...这里省去了异常处理和事务信息的清理代码
37 });
38 ...
39 }
40 }
这里面涉及到事务的创建,我们可以在 TransactionAspectSupport 实现的事务管理代码:
1protected TransactionInfo createTransactionIfNecessary(TransactionAttribute txAttr, final String joinpointIdentification) {
2 // If no name specified, apply method identification as transaction name.
3 if (txAttr != null && txAttr.getName() == null) {
4 txAttr = new DelegatingTransactionAttribute(txAttr) {
5 public String getName() {
6 return joinpointIdentification;
7 }
8 };
9 }
10
11 TransactionStatus status = null;
12 if (txAttr != null) {
13 //这里使用了我们定义好的事务配置信息,有事务管理器来创建事务,同时返回 TransactionInfo
14 status = getTransactionManager().getTransaction(txAttr);
15 }
16 return prepareTransactionInfo(txAttr, joinpointIdentification, status);
17}
首先通过 TransactionManager 得到需要的事务,事务的创建根据我们定义的事务配置决定,在AbstractTransactionManager 中给出一个标准的创建过程,当然创建什么样的事务还是需要具体的PlatformTransactionManager 来决定,但这里给出了创建事务的模板:
1public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
2 Object transaction = doGetTransaction();
3 ...
4
5 if (definition == null) {
6 //如果事务信息没有被配置,我们使用 Spring 默认的配置方式
7 definition = new DefaultTransactionDefinition();
8 }
9
10 if (isExistingTransaction(transaction)) {
11 // Existing transaction found -> check propagation behavior to find out how to behave.
12 return handleExistingTransaction(definition, transaction, debugEnabled);
13 }
14
15 // Check definition settings for new transaction.
16 //下面就是使用配置信息来创建我们需要的事务;比如传播属性和同步属性等
17 //最后把创建过程中的信息收集起来放到 TransactionStatus 中返回;
18 if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
19 throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
20 }
21
22 // No existing transaction found -> check propagation behavior to find out how to behave.
23 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
24 throw new IllegalTransactionStateException(
25 "Transaction propagation 'mandatory' but no existing transaction found");
26 }else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
27 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
28 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
29 //这里是事务管理器创建事务的地方,并将创建过程中得到的信息放到 TransactionStatus 中去,包括创建出来的事务
30 doBegin(transaction, definition);
31 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
32 return newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, null);
33 }else {
34 boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
35 return newTransactionStatus(definition, null, false, newSynchronization, debugEnabled, null);
36 }
37}
接着通过调用 prepareTransactionInfo 完成事务创建的准备,创建过程中得到的信息存储在 TransactionInfo 对象中进行传递同时把信息和当前线程绑定;
1protected TransactionInfo prepareTransactionInfo(TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) {
2 TransactionInfo txInfo = new TransactionInfo(txAttr, joinpointIdentification);
3 if (txAttr != null) {
4 ...
5 // 同样的需要把在 getTransaction 中得到的 TransactionStatus 放到 TransactionInfo 中来。
6 txInfo.newTransactionStatus(status);
7 }else {
8 ...
9 }
10
11 // 绑定事务创建信息到当前线程
12 txInfo.bindToThread();
13 return txInfo;
14}
将创建事务的信息返回,然后看到其他的事务管理代码:
1protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
2 if (txInfo != null && txInfo.hasTransaction()) {
3 if (logger.isDebugEnabled()) {
4 logger.debug("Invoking commit for transaction on " + txInfo.getJoinpointIdentification());
5 }
6 this.transactionManager.commit(txInfo.getTransactionStatus());
7 }
8}
通过 transactionManager 对事务进行处理,包括异常抛出和正常的提交事务,具体的事务管理器由用户程序设定。
1protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
2 if (txInfo != null && txInfo.hasTransaction()) {
3 if (txInfo.transactionAttribute.rollbackOn(ex)) {
4 ...
5 try {
6 this.transactionManager.rollback(txInfo.getTransactionStatus());
7 }
8 ...
9 } else {
10 ...
11 try {
12 this.transactionManager.commit(txInfo.getTransactionStatus());
13 }
14 ...
15 }
16
17 protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
18 if (txInfo != null && txInfo.hasTransaction()) {
19 ...
20 this.transactionManager.commit(txInfo.getTransactionStatus());
21 }
22 }
Spring 通过以上代码对 transactionManager 进行事务处理的过程进行了 AOP 包装,到这里我们看到为了方便客户实现声明式的事务处理,Spring 还是做了许多工作的。如果说使用编程式事务处理,过程其实比较清楚,我们可以参考书中的例子:
1TransactionDefinition td = new DefaultTransactionDefinition();
2TransactionStatus status = transactionManager.getTransaction(td);
3try{
4 ...//这里是我们的业务方法
5}catch (ApplicationException e) {
6 transactionManager.rollback(status);
7 throw e
8}
9transactionManager.commit(status);
10...
我们看到这里选取了默认的事务配置 DefaultTransactionDefinition,同时在创建事物的过程中得到 TransactionStatus,然后通过直接调用事务管理器的相关方法就能完成事务处理。
声明式事务处理也同样实现了类似的过程,只是因为采用了声明的方法,需要增加对属性的读取处理,并且需要把整个过程整合到 Spring AOP 框架中和 IoC 容器中去的过程。
下面我们选取一个具体的 transactionManager - DataSourceTransactionManager 来看看其中事务处理的实现: 同样的通过使用 AbstractPlatformTransactionManager 使用模板方法,这些都体现了对具体平台相关的事务管理器操作的封装,比如commit:
1public final void commit(TransactionStatus status) throws TransactionException {
2 ...
3 DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
4 if (defStatus.isLocalRollbackOnly()) {
5 ...
6 processRollback(defStatus);
7 return;
8 }
9 ...
10 processRollback(defStatus);
11 ...
12}
13
14processCommit(defStatus);
15}
通过对 TransactionStatus 的具体状态的判断,来决定具体的事务处理:
1private void processCommit(DefaultTransactionStatus status) throws TransactionException {
2 try {
3 boolean beforeCompletionInvoked = false;
4 try {
5 triggerBeforeCommit(status);
6 triggerBeforeCompletion(status);
7 beforeCompletionInvoked = true;
8 boolean globalRollbackOnly = false;
9 if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
10 globalRollbackOnly = status.isGlobalRollbackOnly();
11 }
12 if (status.hasSavepoint()) {
13 ...
14 status.releaseHeldSavepoint();
15 } else if (status.isNewTransaction()) {
16 ...
17 doCommit(status);
18 }
19 ...
20 }
这些模板方法的实现由具体的 transactionManager 来实现,比如在 DataSourceTransactionManager:
1protected void doCommit(DefaultTransactionStatus status) {
2 //这里得到存在 TransactionInfo 中已经创建好的事务
3 DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
4
5 //这里得到和事务绑定的数据库连接
6 Connection con = txObject.getConnectionHolder().getConnection();
7 ...
8 try {
9 //这里通过数据库连接来提交事务
10 con.commit();
11 }
12 ...
13}
14
15protected void doRollback(DefaultTransactionStatus status) {
16 DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
17 Connection con = txObject.getConnectionHolder().getConnection();
18 if (status.isDebug()) {
19 logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
20 }
21 try {
22 //这里通过数据库连接来回滚事务
23 con.rollback();
24 } catch (SQLException ex) {
25 throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
26 }
27}
我们看到在 DataSourceTransactionManager 中最后还是交给 connection 来实现事务的提交和 rollback。整个声明式事务处理是事务处理在 Spring AOP 中的应用,我们看到了一个很好的使用 Spring AOP 的例子,在 Spring 声明式事务处理的源代码中我们可以看到:
如果能够结合前面的 Spring AOP 的源代码来学习,理解可能会更深刻些。