前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >TransactionalEventListener使用场景与原理分析

TransactionalEventListener使用场景与原理分析

作者头像
叔牙
修改2021-12-07 23:14:16
5.4K0
修改2021-12-07 23:14:16
举报
文章被收录于专栏:一个执拗的后端搬砖工

一、背景

开发中有这样一个场景,客服业务需要接入在线能力,基于其他团队的IM底层能力构建业务层能力,也就是需要先调二方restful服务创建群聊,然后调用本地服务创建会话,并且创建会话依赖于二方服务返回的群聊信息,那么就会出现本地服务异常回滚,但是二方服务已经调用成功的情况,如果不做处理那么下次再尝试创建群聊,用户id已经存在,创建不成功,考虑到异构服务(二方服务可能是java、C++或者其他)或者异构数据(mysql、TiDB等), 分布式事务并不是一个很好的选择,这个时候我们就可以考虑在产生异常时候手动回滚二方服务的方式。

二、案例分析

今天我们要描述的是使用TransactionalEventListener来做业务补偿,TransactionalEventListener本质上是一个EventListener,依赖于Spring事件体系的支撑,我们要做的就是优先调用二方服务并返回结果,如果二方服务异常,流程终止不执行本地业务,如果二方服务正常,执行本地业务,如果本地执行成功,整个流程执行成功,如果本地执行异常,本地数据回滚,然后发出异常事件,由TransactionalEventListener执行二方数据的手动回滚或者订正。大致流程如下:

业务代码:

代码语言:javascript
复制
@Transactional
public void createSession() {
    //1.创建群
    String groupId = restful api.createGroup
    //2.发布补偿事件
    this.applicationEventPublisher.publishEvent();
    //3.创建会话
    this.createSession(groupId);   
}

首先调用二方服务,然后发送补偿事件,最后调用本地服务,2和3的顺序不能颠倒,否则会导致异常终止事件发送不出去。

TransactionalEventListener事件补偿:

代码语言:javascript
复制
@Slf4j
@Component
public class CreateSessionFailedListener {

    @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
    public void onRollbackEvent(CreateSessionFailedEvent event) {
        try {
            restful api补偿
        } catch (Exception e) {
            log.error("onRollbackEvent occur error;event={}",event,e);
        }
    }
}

使用TransactionalEventListener注解写事务监听器,并且监听的时机是异常回滚,也就是本地事务出现异常回滚后触发该事件监听。

这样就能实现二方服务执行成功后,本地事务回滚,然后补偿订正二方服务数据了。当然也可以在事务的上层调用方捕获并识别异常,然后根据需要决定是否需要补偿。

三、源码&原理解析

1.监听器初始化

@TransactionalEventListener注解肯定和事务相关,那么我们就从springboot开启事务注解的地方开始分析,先看EnableTransactionManagement:

代码语言:javascript
复制
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {
    ...
}

TransactionalEventListener导入了选择器TransactionManagementConfigurationSelector:

代码语言:javascript
复制
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
  @Override
  protected String[] selectImports(AdviceMode adviceMode) {
    switch (adviceMode) {
      case PROXY:
        return new String[] {AutoProxyRegistrar.class.getName(),
            ProxyTransactionManagementConfiguration.class.getName()};
      case ASPECTJ:
        return new String[] {determineTransactionAspectClass()};
      default:
        return null;
    }
  }
}

默认是Proxy代理,会引入AutoProxyRegistrar和ProxyTransactionManagementConfiguration,前者我们在spring cache原理解析中已经分析过,直接看ProxyTransactionManagementConfiguration:

代码语言:javascript
复制
@Configuration
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {

  @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {
    BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
    advisor.setTransactionAttributeSource(transactionAttributeSource());
    advisor.setAdvice(transactionInterceptor());
    if (this.enableTx != null) {
      advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
    }
    return advisor;
  }

  @Bean
  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  public TransactionAttributeSource transactionAttributeSource() {
    return new AnnotationTransactionAttributeSource();
  }

  @Bean
  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  public TransactionInterceptor transactionInterceptor() {
    TransactionInterceptor interceptor = new TransactionInterceptor();
    interceptor.setTransactionAttributeSource(transactionAttributeSource());
    if (this.txManager != null) {
      interceptor.setTransactionManager(this.txManager);
    }
    return interceptor;
  }

}

ProxyTransactionManagementConfiguration定义了几个基础设施类,来实现事务逻辑织入,在之前的篇幅中已经不止一次分析过,此处不在赘述,我们看一下期继承关系:

ProxyTransactionManagementConfiguration继承了AbstractTransactionManagementConfiguration

类,该类定义了一个bean:

代码语言:javascript
复制
@Bean(name = TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public static TransactionalEventListenerFactory transactionalEventListenerFactory() {
  return new TransactionalEventListenerFactory();
}

该bean所在类定义了创建事件监听器的方法:

代码语言:javascript
复制
public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {

  @Override
  public boolean supportsMethod(Method method) {
    return AnnotatedElementUtils.hasAnnotation(method, TransactionalEventListener.class);
  }
  @Override
  public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
    return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
  }
}

该方法根据用户使用@TransactionalEventListener注解的方法创建时间监听器代理,在应用启动的时候EventListenerMethodProcessor中调用,其原理在另外一片文章《事件驱动编程》中也分析过,然后我们看一下创建的监听器代理的实现ApplicationListenerMethodTransactionalAdapter:

代码语言:javascript
复制
class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {

  private final TransactionalEventListener annotation;

  public ApplicationListenerMethodTransactionalAdapter(String beanName, Class<?> targetClass, Method method) {
    super(beanName, targetClass, method);
    TransactionalEventListener ann = AnnotatedElementUtils.findMergedAnnotation(method, TransactionalEventListener.class);
    if (ann == null) {
      throw new IllegalStateException("No TransactionalEventListener annotation found on method: " + method);
    }
    this.annotation = ann;
  }

  @Override
  public void onApplicationEvent(ApplicationEvent event) {
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
      TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
      TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
    }
    else if (this.annotation.fallbackExecution()) {
      if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
        logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
      }
      processEvent(event);
    }
    else {
      // No transactional event execution at all
      if (logger.isDebugEnabled()) {
        logger.debug("No transaction is active - skipping " + event);
      }
    }
  }
}

监听器逻辑调用会调用onApplicationEvent方法,这一段逻辑比较巧妙,首先检查当前上下文是否在事务中,如果是则把监听器逻辑注册到事务同步器中,等待后续事务执行过程指定节点触发,如果没有在事务中则立即触发事件监听逻辑。

事件工厂初始化:

事务事件监听器初始化:

2.事务事件调用与监听器触发

在spring体系中我们可以直接注入事件发布器来发布事件:

代码语言:javascript
复制
@Autowired
protected ApplicationEventPublisher applicationEventPublisher;

看一下ApplicationEventPublisher定义:

代码语言:javascript
复制
@FunctionalInterface
public interface ApplicationEventPublisher {
  /**
   * Notify all <strong>matching</strong> listeners registered with this
   * application of an application event. Events may be framework events
   * (such as RequestHandledEvent) or application-specific events.
   * @param event the event to publish
   * @see org.springframework.web.context.support.RequestHandledEvent
   */
  default void publishEvent(ApplicationEvent event) {
    publishEvent((Object) event);
  }
  /**
   * Notify all <strong>matching</strong> listeners registered with this
   * application of an event.
   * <p>If the specified {@code event} is not an {@link ApplicationEvent},
   * it is wrapped in a {@link PayloadApplicationEvent}.
   * @param event the event to publish
   * @since 4.2
   * @see PayloadApplicationEvent
   */
  void publishEvent(Object event);
}

调用ApplicationEventPublisher#publishEvent会调用AbstractApplicationContext#publishEvent:

代码语言:javascript
复制
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
  Assert.notNull(event, "Event must not be null");
  // Decorate event as an ApplicationEvent if necessary
  ApplicationEvent applicationEvent;
  if (event instanceof ApplicationEvent) {
    applicationEvent = (ApplicationEvent) event;
  }
  else {
    applicationEvent = new PayloadApplicationEvent<>(this, event);
    if (eventType == null) {
      eventType = ((PayloadApplicationEvent) applicationEvent).getResolvableType();
    }
  }
  // Multicast right now if possible - or lazily once the multicaster is initialized
  if (this.earlyApplicationEvents != null) {
    this.earlyApplicationEvents.add(applicationEvent);
  }
  else {
    getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
  }

  // Publish event via parent context as well...
  if (this.parent != null) {
    if (this.parent instanceof AbstractApplicationContext) {
      ((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
    }
    else {
      this.parent.publishEvent(event);
    }
  }
}

这里是一个递归调用,当前上下文先发布事件,然后递归找父上下文发布事件,最终会调用SimpleApplicationEventMulticaster#multicastEvent来发布事件:

代码语言:javascript
复制
@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
  ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
  for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
    Executor executor = getTaskExecutor();
    if (executor != null) {
      executor.execute(() -> invokeListener(listener, event));
    }
    else {
      invokeListener(listener, event);
    }
  }
}

这里是从上下文中先获取监听器集合,然后如果有任务执行器就调用任务执行器执行监听器逻辑(多线程),否则当前线程调用监听器逻辑,然后看invokeListener实现:

代码语言:javascript
复制
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
  ErrorHandler errorHandler = getErrorHandler();
  if (errorHandler != null) {
    try {
      doInvokeListener(listener, event);
    }
    catch (Throwable err) {
      errorHandler.handleError(err);
    }
  }
  else {
    doInvokeListener(listener, event);
  }
}

继续看doInvokeListener:

代码语言:javascript
复制
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
  try {
    listener.onApplicationEvent(event);
  }
  catch (ClassCastException ex) {
    String msg = ex.getMessage();
    if (msg == null || matchesClassCastMessage(msg, event.getClass())) {
      // Possibly a lambda-defined listener which we could not resolve the generic event type for
      // -> let's suppress the exception and just log a debug message.
      Log logger = LogFactory.getLog(getClass());
      if (logger.isDebugEnabled()) {
        logger.debug("Non-matching event type for listener: " + listener, ex);
      }
    }
    else {
      throw ex;
    }
  }
}

到这里就比较清晰了,我们自定义事件监听器都实现了ApplicationListener接口,此处会调用监听器的onApplicationEvent方法执行自定义逻辑。

然后我们回顾一下事务事件监听器适配器实现:

代码语言:javascript
复制
@Override
public void onApplicationEvent(ApplicationEvent event) {
  if (TransactionSynchronizationManager.isSynchronizationActive()) {
    TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
    TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
  }
  else if (this.annotation.fallbackExecution()) {
    if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
      logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
    }
    processEvent(event);
  }
  else {
    // No transactional event execution at all
    if (logger.isDebugEnabled()) {
      logger.debug("No transaction is active - skipping " + event);
    }
  }
}

当上一步doInvokeListener调用到ApplicationListenerMethodTransactionalAdapter#onApplicationEvent的时候,如果检测到当前上下文有活跃的事务,那么就把监听器逻辑注册到事务中,等到事务执行到指定的节点触发监听器逻辑,否则如果检测到TransactionalEventListener.fallbackExecution属性为true(如果没有事务,是否处理事件),则直接调用处理事件逻辑,否则返回调用。

我们暂且理解为当前逻辑在事务中,先创建事务同步逻辑:

代码语言:javascript
复制
public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter listener,
    ApplicationEvent event, TransactionPhase phase) {

  this.listener = listener;
  this.event = event;
  this.phase = phase;
}

包含了事件监听器,事件类型和事务事件触发阶段。然后调用事件同步管理器把事件同步逻辑注册到事务中:

代码语言:javascript
复制
public static void registerSynchronization(TransactionSynchronization synchronization)
    throws IllegalStateException {

  Assert.notNull(synchronization, "TransactionSynchronization must not be null");
  if (!isSynchronizationActive()) {
    throw new IllegalStateException("Transaction synchronization is not active");
  }
  synchronizations.get().add(synchronization);
}

我们把事务事件监听器执行的此阶段叫做注册阶段,用时序图更清晰的分析一下其逻辑:

那事件事务监听器逻辑注册到事务生命周期成功了,什么时候触发呢?那就要回到ProxyTransactionManagementConfiguration的TransactionInterceptor了,加事务注解的逻辑执行的时候会被TransactionInterceptor拦截到,然后执行invoke逻辑:

代码语言:javascript
复制
public Object invoke(MethodInvocation invocation) throws Throwable {
  // Work out the target class: may be {@code null}.
  // The TransactionAttributeSource should be passed the target class
  // as well as the method, which may be from an interface.
  Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

  // Adapt to TransactionAspectSupport's invokeWithinTransaction...
  return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}

在invokeWithinTransaction逻辑中会调用commitTransactionAfterReturning方法:

代码语言:javascript
复制
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
  if (txInfo != null && txInfo.getTransactionStatus() != null) {
    if (logger.isTraceEnabled()) {
      logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
    }
    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
  }
}

然后会调用事务管理器执行事务状态的提交逻辑:

代码语言:javascript
复制
@Override
public final void commit(TransactionStatus status) throws TransactionException {
  if (status.isCompleted()) {
    throw new IllegalTransactionStateException(
        "Transaction is already completed - do not call commit or rollback more than once per transaction");
  }
  DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
  if (defStatus.isLocalRollbackOnly()) {
    if (defStatus.isDebug()) {
      logger.debug("Transactional code has requested rollback");
    }
    processRollback(defStatus, false);
    return;
  }
  if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
    if (defStatus.isDebug()) {
      logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
    }
    processRollback(defStatus, true);
    return;
  }
  processCommit(defStatus);
}

如果事务已结束,异常终止,如果事务需要回滚则执行processRollback,否则执行processCommit提交,我们继续看processRollback回滚逻辑:

代码语言:javascript
复制
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
  try {
    boolean unexpectedRollback = unexpected;
    try {
            //1.处理事务提交前事件监听逻辑
      triggerBeforeCompletion(status);
            //回滚逻辑
            ...      
    }
    catch (RuntimeException | Error ex) {
      triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
      throw ex;
    }
    //2触发事务提交后监听逻辑
    triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
    // Raise UnexpectedRollbackException if we had a global rollback-only marker
    if (unexpectedRollback) {
      throw new UnexpectedRollbackException(
          "Transaction rolled back because it has been marked as rollback-only");
    }
  }
  finally {
    cleanupAfterCompletion(status);
  }
}

看一下triggerAfterCompletion实现:

代码语言:javascript
复制
private void triggerAfterCompletion(DefaultTransactionStatus status, int completionStatus) {
  if (status.isNewSynchronization()) {
    List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
    TransactionSynchronizationManager.clearSynchronization();
    if (!status.hasTransaction() || status.isNewTransaction()) {
      if (status.isDebug()) {
        logger.trace("Triggering afterCompletion synchronization");
      }
      // No transaction or new transaction for the current scope ->
      // invoke the afterCompletion callbacks immediately
      invokeAfterCompletion(synchronizations, completionStatus);
    }
    else if (!synchronizations.isEmpty()) {
      // Existing transaction that we participate in, controlled outside
      // of the scope of this Spring transaction manager -> try to register
      // an afterCompletion callback with the existing (JTA) transaction.
      registerAfterCompletionWithExistingTransaction(status.getTransaction(), synchronizations);
    }
  }
}

如果当前阶段没有事务或者新事务则执行后置回调逻辑invokeAfterCompletion:

代码语言:javascript
复制
protected final void invokeAfterCompletion(List<TransactionSynchronization> synchronizations, int completionStatus) {
  TransactionSynchronizationUtils.invokeAfterCompletion(synchronizations, completionStatus);
}

然后调用TransactionSynchronizationUtils#invokeAfterCompletion方法:

代码语言:javascript
复制
public static void invokeAfterCompletion(@Nullable List<TransactionSynchronization> synchronizations,
    int completionStatus) {

  if (synchronizations != null) {
    for (TransactionSynchronization synchronization : synchronizations) {
      try {
        synchronization.afterCompletion(completionStatus);
      }
      catch (Throwable tsex) {
        logger.error("TransactionSynchronization.afterCompletion threw exception", tsex);
      }
    }
  }
}

获取到注册到当前事务的事件列表并执行,前边我们注册的是TransactionSynchronizationEventAdapter,直接看其afterCompletion实现:

代码语言:javascript
复制
@Override
public void afterCompletion(int status) {
  if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
    processEvent();
  }
  else if (this.phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
    processEvent();
  }
  else if (this.phase == TransactionPhase.AFTER_COMPLETION) {
    processEvent();
  }
}

从@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)可以看出我们注册事件监听的截断是TransactionPhase.AFTER_ROLLBACK,逻辑会进入第二个分支调用processEvent方法:

代码语言:javascript
复制
protected void processEvent() {
  this.listener.processEvent(this.event);
}

到这里就执行到我们自定义监听器的逻辑了,也用时序图来清晰的描述事务事件的触发时机和逻辑:

总结

我们本篇从使用和源码角度分别分析了TransactionalEventListener使用方式和实现原理,可以得出以下几个结论:

  • TransactionalEventListener本质上是EventListener,依托于spring事件体系支持
  • TransactionalEventListener从注册到触发依赖于事务管理器和事务的生命周期
  • TransactionalEventListener适用于在事务的生命周期中特定节点做一些前置逻辑和后置补偿

对于一个事务涉及到本地和二方服务调用的场景,并且本地业务的执行依赖二方服务的结果,在本地服务出现异常发生回滚的时候,可以使用事务事件监听来做逻辑解耦和数据补偿,并且这种方式更优雅和简单。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-08-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 PersistentCoder 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、背景
  • 二、案例分析
  • 三、源码&原理解析
  • 总结
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档