在Spring中我们经常会用到异步操作,注解中使用 @EnableAsync
和 @Async
就可以使用它了。但是最近发现在异步中线程号使用的是我们项目中自定义的线程池 ThreadPoolTaskExecutor
而不是之前熟悉的 SimpleAsyncTaskExecutor
那么来看一下他的执行过程吧。
@EnableAsync
那么就点开它看看。它会使用 @Import
注入一个 AsyncConfigurationSelector
类,启动是通过父类可以决定它使用的是配置类 ProxyAsyncConfiguration
。public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
public AsyncConfigurationSelector() {
}
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch(adviceMode) {
case PROXY:
return new String[]{ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[]{"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"};
default:
return null;
}
}
}
AsyncAnnotationBeanPostProcessor
。它实现了 BeanPostProcessor
接口,因此它是一个后处理器,用于将 Spring AOP
的 Advisor
应用于给定的 bean
。从而该bean
上通过异步注解所定义的方法在调用时会被真正地异步调用起来。public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
public ProxyAsyncConfiguration() {
}
@Bean(
name = {"org.springframework.context.annotation.internalAsyncAnnotationProcessor"}
)
@Role(2)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder((Integer)this.enableAsync.getNumber("order"));
return bpp;
}
}
AsyncAnnotationBeanPostProcessor
的父类实现了 BeanFactoryAware
,那么会在 AsyncAnnotationBeanPostProcessor
实例化之后回调 setBeanFactory()
来实例化切面 AsyncAnnotationAdvisor
。public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
//定义一个切面
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
AsyncAnnotationAdvisor
构造和声明切入的目标(切点)和代码增强(通知)。 public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
//通知
this.advice = buildAdvice(executor, exceptionHandler);
//切入点
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
buildAdvice
用于构建通知,主要是创建一个 AnnotationAsyncExecutionInterceptor
类型的拦截器,并且配置好使用的执行器和异常处理器。真正的异步执行的代码在 AsyncExecutionAspectSupport
中!protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
//配置拦截器
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
public void configure(@Nullable Supplier<Executor> defaultExecutor,
@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
//默认执行器
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}
getDefaultExecutor()
方法,用来查找默认的执行器,父类 AsyncExecutionAspectSupport
首先寻找唯一一个类型为 TaskExecutor
的执行器并返回,若存在多个则寻找默认的执行器 taskExecutor
,若无法找到则直接返回null。子类 AsyncExecutionInterceptor
重写 getDefaultExecutor
方法,首先调用父类逻辑,返回null则配置一个名为 SimpleAsyncTaskExecutor
的执行器/**
* 父类
* 获取或构建此通知实例的默认执行器
* 这里返回的执行器将被缓存以供后续使用
* 默认实现搜索唯一的TaskExecutor的bean
* 在上下文中,用于名为“taskExecutor”的Executor bean。
* 如果两者都不是可解析的,这个实现将返回 null
*/
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
try {
// 搜索唯一的一个TaskExecutor类型的bean并返回
return beanFactory.getBean(TaskExecutor.class);
}
catch (NoUniqueBeanDefinitionException ex) {
//找不到唯一一个bean异常后,搜索一个TaskExecutor类型的“taskExecutor”的bean并返回
logger.debug("Could not find unique TaskExecutor bean", ex);
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isInfoEnabled()) {
logger.info("More than one TaskExecutor bean found within the context, and none is named " +
"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex) {
//未找到异常时搜索一个TaskExecutor类型的“taskExecutor”的bean并返回
logger.debug("Could not find default TaskExecutor bean", ex);
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
logger.info("No task executor bean found for async processing: " +
"no bean of type TaskExecutor and no bean named 'taskExecutor' either");
}
// Giving up -> either using local default executor or none at all...
}
}
return null;
}
/**
* 子类
* 如父类为null则重新实例化一个名为SimpleAsyncTaskExecutor的执行器
*/
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
所以,到了这一步就可以理解为什么异步线程名默认叫 SimpleAsyncTaskExecutor-xx
,为什么有了自己的线程池有可能异步用到了自己的线程池配置。
我们有这个切入点之后,每次请求接口执行异步方法前都会执行 AsyncExecutionInterceptor#invoke()
, determineAsyncExecutor
用来决策使用哪个执行器
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
//在缓存的执行器中选择一个对应方法的执行器
AsyncTaskExecutor executor = (AsyncTaskExecutor)this.executors.get(method);
if (executor == null) {
//获取@Async注解中的value(指定的执行器)
String qualifier = this.getExecutorQualifier(method);
Executor targetExecutor;
if (StringUtils.hasLength(qualifier)) {
//获取指定执行器的bean
targetExecutor = this.findQualifiedExecutor(this.beanFactory, qualifier);
} else {
//选择默认的执行器
targetExecutor = (Executor)this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor)targetExecutor : new TaskExecutorAdapter(targetExecutor);
//将执行器进行缓存
this.executors.put(method, executor);
}
return (AsyncTaskExecutor)executor;
}
当有了执行器调用 doSubmit
方法将任务加入到执行器中。
异步任务,默认将采用SimpleAsyncTaskExecutor作为执行器!它有如下特点:
不复用线程,也就是说为每个任务新起一个线程。但是可以通过
concurrencyLimit
属性来控制并发线程数量,但是默认情况下不做限制(concurrencyLimit
取值为-1)。因此,如果我们使用异步任务,一定不能采用默认执行器的配置,以防OOM异常!最好的方式是指定执行器!
本文主要以看源码的方式来了解异步注解 @Async
是如何在项目中选择线程以及使用线程的,尽量给异步任务指定一个独有线程池,这样会的避免不与其他业务共用线程池而造成影响。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。