Spring 异步执行(@Async)、任务调度(@Schedule)

摘要

在实际项目开发中,常常遇到异步执行、任务调度,这两个概念有点相似,异步执行是指在业务主逻辑里面调用某方法,不需要等待执行结果,让该方法异步执行并处理。任务调度则跟业务逻辑关联不多,只是定时处理。

本文介绍注解:

  • @Async 和 @EnableAsync
  • @Schedule 和 @EnableScheduling

本文介绍接口:

  • AsyncConfigurer
  • SchedulingConfigurer

Spring线程池

Spring异步线程池,本质都是java.util.concurrent.Executor的实现

Spring 已经实现的线程池:

  • SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。
  • SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方
  • ConcurrentTaskExecutor:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类
  • ThreadPoolTaskScheduler:可以使用cron表达式
  • ThreadPoolTaskExecutor最常使用,推荐。 其实质是对java.util.concurrent.ThreadPoolExecutor的包装

ThreadPoolTaskExecutor 和ThreadPoolTaskScheduler都是ThreadPoolExecutor的包装

区别是ThreadPoolTaskScheduler实现了TaskScheduler接口,仅仅多实现这个接口

ThreadPoolTaskExecutor:

ThreadPoolTaskExecutor

ThreadPoolTaskScheduler:

spring通过接口TaskExecutorTaskScheduler这两个接口的方式为异步定时任务提供了一种抽象。

TaskExecutor 接口扩展自java.util.concurrent.Executor 接口。TaskExecutor 被创建来为其他组件提供线程池调用的抽象。

TaskExecutor是spring task的第一个抽象,实际上TaskExecutor就是为区别于Executor才引入的,而引入TaskExecutor的目的就是为定时任务的执行提供线程池的支持

TaskScheduler是spring task的第二个抽象,那么从字面的意义看,TaskScheduler就是为了提供定时任务的支持,好处是让需要执行定时任务的代码不需要指定特定的定时框架(比如Timer和Quartz)

异步执行@Async

源码分析
1
2
3
4
5
6
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {
String value() default "";
}
  • @Async可以标注一个方法为异步执行,也可以放到类上面,也就是当前类下所有的方法都是异步方法,但是方法上面Async#value会覆盖类上面的Async#value(value是定义线程池Bean的名字)
  • 方法放回值可以是void,也可以是Future
  • 也可以用更具体ListenableFuture、CompletableFuture的返回值
  • 用Future#get可以用来获取异步执行的结果,但是,怎么让方法把结果返回呢?可以返回AsyncResult或者CompletableFuture#completedFuture(Object),(不直接使用ListenableFuture,AsyncResult实现了ListenableFuture接口)上面两个类都可以设置返回值
  • 异步方法可以传入参数
代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* @author Liu Hailin
* @create 2017-12-21 下午12:12
**/
@Configuration
public class ThreadPoolConfiguration {


@Bean
public ThreadPoolTaskExecutor executor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize( 1 );
executor.setMaxPoolSize( 2 );
executor.setQueueCapacity( 10 );
executor.setThreadNamePrefix( "executor-" );
executor.setWaitForTasksToCompleteOnShutdown( true );
executor.initialize();
return executor;

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* @author Liu Hailin
* @create 2017-12-21 下午2:04
**/
@Component
@Slf4j
@Async
public class AsyncDemo {

public ListenableFuture<String> sayhello() {

return new AsyncResult<String>( "hello" );
}

public CompletableFuture<String> sayhello2() {
return CompletableFuture.completedFuture( "hello" );
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
**
* @author Liu Hailin
* @create 2017-12-21 上午11:21
**/
@SpringBootApplication
@EnableAsync
@EnableScheduling
public class ApplicationMain {

public static void main(String[] args) {
SpringApplication.run( ApplicationMain.class,args );
}

@Bean
public CommandLineRunner runner(AsyncDemo demo){
return new CommandLineRunner() {
@Override
public void run(String... args) throws Exception {
ListenableFuture<String> result = demo.sayhello();
result.addCallback( new ListenableFutureCallback<String>() {
@Override
public void onFailure(Throwable ex) {

}

@Override
public void onSuccess(String result) {
System.out.println("demo return");
}
} );
}
};
}
}

@EnableAsync 注解启用异步执行。

@EnableScheduling 注解启用定时任务。

定时任务@Scheduled

使用方式
  • 如果需要以固定速率执行,只要将注解中指定的属性名称改成fixedRate即可,以下方法将以一个固定速率5s来调用一次执行,这个周期是以上一个任务开始时间为基准,从上一任务开始执行后5s再次调用:
  • 使用cron表达式,可以实现定时调用如:每天凌晨调用,详细的cron相关参数见后面介绍
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* @author Liu Hailin
* @create 2017-12-22 下午12:04
**/
@Component
public class JobDemo {

@Scheduled(fixedDelay = 1000)
public void runDemo1(){
System.out.println("1");
}

@Scheduled(cron = "0/1 * * * * ?")
public void runDemo2(){
System.out.println("2");
}
}
  • cron相关参数意义

一个cron表达式有至少6个(也可能7个)有空格分隔的时间元素。

按顺序依次为

  • 秒(0~59)
  • 分钟(0~59)
  • 小时(0~23)
  • 天(月)(0~31,但是你需要考虑你月的天数)
  • 月(0~11)
  • 天(星期)(1~7 1=SUN 或 SUN,MON,TUE,WED,THU,FRI,SAT)
  • 年份(1970-2099)——@Scheduled是不支持的,spring quartz支持

基于AsyncConfigurer接口

可以自定义Executor 的类型,注册异常处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* @author Liu Hailin
* @create 2017-12-22 下午12:16
**/
@Configuration
public class DemoAsyncConfigurer implements AsyncConfigurer{

@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize( 1 );
executor.setMaxPoolSize( 2 );
executor.setQueueCapacity( 10 );
executor.setThreadNamePrefix( "executor-async-B-" );
executor.setWaitForTasksToCompleteOnShutdown( true );
executor.initialize();
return executor;
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncUncaughtExceptionHandler() {
@Override
public void handleUncaughtException(Throwable ex,
Method method, Object... params) {
System.out.println(ex.getMessage());
}
};
}

}

基于SchedulingConfigurer接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* @author Liu Hailin
* @create 2017-12-22 下午4:08
**/
@Configuration
public class DemoSchedulerConfigurer implements SchedulingConfigurer{
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {

taskRegistrar.setScheduler( executor() );
}

private ThreadPoolTaskScheduler executor(){
ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
executor.setPoolSize( 1 );
executor.setAwaitTerminationSeconds( 600 );
executor.setThreadNamePrefix( "executor-scheduler-B-" );
executor.setWaitForTasksToCompleteOnShutdown( true );
executor.setErrorHandler( new ErrorHandler() {
@Override
public void handleError(Throwable t) {
System.out.println(t.getMessage());
}
} );
executor.initialize();
return executor;

}
}

异常处理

  • 异步任务,通过实现AsyncConfigurer接口设置AsyncUncaughtExceptionHandler
  • 定时任务,通过在ThreadPoolTaskScheduler中设置ErrorHandler。

总结

  • 如果没有自定义任何TaskSchedulerTaskScheduler,Spring会创建一个默认线程池,是单线程(single-threaded),ScheduledAnnotationBeanPostProcessor 类和ScheduledTaskRegistrar#scheduleTasks(),所以不管你是执行异步任务还是定时任务,尽量自定义线程池,否则都是单线程执行。

  • 如果项目中异步执行和定时任务同时存在,要注意一下几点

    • 如果只定义ThreadPoolTaskExecutor类型线程池,异步任务可以使用ThreadPoolTaskExecutor线程池执行,但是定时任务采用了默认的单线程池。

      ScheduledAnnotationBeanPostProcessor#finishRegistration先判断是否存在TaskScheduler接口的Bean,不存在的话,判断是否有ScheduledExecutorService,最后没有,调用this.registrar.afterPropertiesSet();在ScheduledTaskRegistrar中初始化一个单线程线程池。

    • 如果只定义ThreadPoolTaskScheduler类型线程池,当然全部采用ThreadPoolTaskScheduler类型线程池

    • 如果两者都配置,虽然两种类型线程池都初始化了,定时任务采用ThreadPoolTaskScheduler类型线程池,但是异步任务并没有,由于两种类型都是TaskExecutor接口实现类

      2017-12-22 15:14:28.362 INFO 7171 — [main] .s.a.AnnotationAsyncExecutionInterceptor : More than one TaskExecutor bean found within the context, and none is named ‘taskExecutor’. Mark one of them as primary or name it ‘taskExecutor’ (possibly as an alias) in order to use it for async processing: [executor_2, executor],建议我们用BeanName=”taskExecutor”或者@Primary,来确定哪个是执行异步任务的线程池,当然如果BeanName不是『taskExecutor』可以定义,但是@Async的value需要指定设置的名字。

      但是,上面没办法决定采用哪个后,异步任务默认采用的是SimpleAsyncTaskExecutor这个线程池,

      AsyncExecutionInterceptor#getDefaultExecutor中调用AsyncExecutionAspectSuppor#getDefaultExecutor中的方法,发现有两个TaskExecutor,返回Null后,会自己new SimpleAsyncTaskExecutor(),自行翻阅源码。

  • 基于AsyncConfigurer和基于SchedulingConfigurer实现的线程池优先级要高,会覆盖上面@Bean声明的的。

    对于异步任务来说,首先EnableAsync中import了AsyncConfigurationSelector.class,在selectImports中返回ProxyAsyncConfiguration,在生成AsyncAnnotationBeanPostProcessor的时候设置父类中this.executor执行器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Autowired(required = false)
    void setConfigurers(Collection<AsyncConfigurer> configurers) {
    if (CollectionUtils.isEmpty(configurers)) {
    return;
    }
    //配置多个就挂了
    if (configurers.size() > 1) {
    throw new IllegalStateException("Only one AsyncConfigurer may exist");
    }
    AsyncConfigurer configurer = configurers.iterator().next();
    this.executor = configurer.getAsyncExecutor();
    this.exceptionHandler = configurer.getAsyncUncaughtExceptionHandler();
    }

    这里面通过注入了Collection configurers 来获取执行器和异常处理器。(加入配置多个,就挂了),这里获取了执行器。并设置到了AsyncAnnotationBeanPostProcessor,AsyncAnnotationBeanPostProcessor初始化,调用setBeanFactory方法,把执行器传入AsyncAnnotationAdvisor类,一路跟踪执行器的传递,回到AsyncExecutionAspectSupport类中设置了defaultExecutor,在调用getDefaultExecutor方法是,发现有默认值,就不会再从容器中获取Bean了

    对于定时任务,通过ScheduledAnnotationBeanPostProcessor#finishRegistration

    1
    2
    3
    4
    5
    6
    7
    if (this.beanFactory instanceof ListableBeanFactory) {
    Map<String, SchedulingConfigurer> configurers =
    ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
    for (SchedulingConfigurer configurer : configurers.values()) {
    configurer.configureTasks(this.registrar);
    }
    }

    调用SchedulingConfigurer实现类中的方法注册了执行器,后面不会从容器中拿。

坚持技术分享,您的支持将鼓励我继续创作!