驽马十驾 驽马十驾

驽马十驾,功在不舍

目录
关于SpringBoot 定时任务背后的秘密以及集群环境独占式定时任务的思路和实现
/    

关于SpringBoot 定时任务背后的秘密以及集群环境独占式定时任务的思路和实现

开篇

通常项目中,都会有一些在某时某刻做某些事情的操作,比如

  • 凌晨1点开始进行昨日数据报表生成
  • 每隔15分钟扫描一次某个资源池中资源占用情况
  • 凌晨2点清理本地的某些本地缓存文件
  • ....

在集群环境下,上述的部分任务应该是互斥的,比如每天凌晨生成报表,只需要1个应用来做就可以了,其他的应用不需要同时做。针对这个需求,SpringBoot内置的定时调度是无法完成的,需要自己实现或者引入第三方来完成。

注解背后的原理

传统的定时任务可以分为2类:

  • 某个时间点进行
  • 间隔多少时间点进行

不管那一类,我们都可以通过CRON表达式生成器来生成相关的规则,但是在具体使用之前,我们需要告诉SpringBoot开启定时调度:在某个配置类@Configuration修饰的配置类上加上注解@EnableSchedule

你是否思考过,为什么通过一个简单的注解就能过开启定时调度了?接下来我们就来分析下。

@EnableScheduling的代码非常简单,核心就是@Import,引入了该类,下面是代码

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {

}

那么在类SchedulingConfiguration中,又有什么玄机,我们来看看源码

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {
  
	//特定的名称:SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME 目的是搭配后面引入类
	@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
		return new ScheduledAnnotationBeanPostProcessor();
	}
}

通过@Bean注解为Spring 容器注入了一个对象,相关信息如下:

  • NameTaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME的对象
  • ClassScheduledAnnotationBeanPostProcessor

我们先看看这个 Class 中最关键的代码是这一句:

public class ScheduledAnnotationBeanPostProcessor{
  private void finishRegistration() {
      //... 
     this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
      //.....
    }
}

其目的是从Spring容器中获取到类为TaskScheduler的对象,这个类从名字上来看,应该是定时调度的核心实现了。

此时是否好奇过:这么类是什么时候注入Spring容器的了?

其中Class我们了解了,但是源码中为这个ScheduledAnnotationBeanPostProcessor 赋予了一个很特殊的Bean名称了?你应该要猜到:如果没有必要,写代码的人不会这么干,所以我们由此入手,来看看哪里有对这个名称的引用!

经过排查,我们发现了下述代码中有对该名称的引用!

//省略...
@Configuration(proxyBeanMethods = false)
public class TaskSchedulingAutoConfiguration {

	@Bean
  	//条件1:当 Spring 容器中存在名为 Xxx 的 Bean的时候,创建这个 Bean
	@ConditionalOnBean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
  	//条件2:当不存在下述 3 个 Bean 的时候
	@ConditionalOnMissingBean({ SchedulingConfigurer.class, TaskScheduler.class, ScheduledExecutorService.class })
	public ThreadPoolTaskScheduler taskScheduler(TaskSchedulerBuilder builder) {
		return builder.build();
	}

	@Bean
	@ConditionalOnMissingBean
	public TaskSchedulerBuilder taskSchedulerBuilder(TaskSchedulingProperties properties,
			ObjectProvider<TaskSchedulerCustomizer> taskSchedulerCustomizers) {
		TaskSchedulerBuilder builder = new TaskSchedulerBuilder();
    builder = builder.poolSize(properties.getPool().getSize());
		//builder 一些参数
		return builder;
	}
}

我们简答的来梳理下上部分代码的核心内容:

  • 要自动构建这个类需要满足 2 个条件
    • Spring 上下文中有名为TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME的对象
    • Spring上下文中不可以有SchedulingConfigurer.class, TaskScheduler.class, ScheduledExecutorService.class 这 3 各类的对象。
    • 其实这个就是 SpringBoot自动构建的奥妙了:如果你不自定义,我就给你默认构建一个配置相关的类。
  • 接下来我们看看这 2 个Bean
    • 第一个 Bean,类型是ThreadPoolTaskScheduler,它是接口TaskScheduler的子类(同前面对象上了)
    • 第二个 Bean,是一个Builder,就是一个构建器,其中有一个可以调整的参数,目的就是为了构建第一个 Bean

整个自动注入的流程,大体上就是这样的,我们总结下:

  1. 通过 @EnableScheduling 引入类 SchedulingConfiguration
  2. 在配置类中创建一个带有特殊名称的Bean
  3. 根据@Condition条件,又会根据这个特殊的名称,构建核心的ThreadPoolTaskScheduler,这个才是任务调度的核心。

自定义核心调度器

默认的任务调度器只有一个核心,可以从上述代码taskSchedulerBuilder的构建代码中发现!

一个核心就意味着只有一个线程在进行任务调度,当某 2 次任务调度时间挨得近,而先开始的任务非常耗时的时候,后开始的任务就有可能被阻塞直到上一个任务完成。

我们重写TaskSchedulerBuilder来构造自己想要的定时任务调度核心

@Configuration
class TaskScheduleConfig {

    @Bean
    fun taskSchedulerBuilder(): TaskSchedulerBuilder {
        var builder = TaskSchedulerBuilder()
        builder = builder.poolSize(4)
        builder = builder.awaitTermination(true)
        builder = builder.awaitTerminationPeriod(Duration.ofSeconds(3))
        builder = builder.threadNamePrefix("hicode-")
        //其他的自定义的配置
        //builder = builder.customizers(TaskSchedulerCustomizer {
        //
        //})
        return builder
    }
}

通过该方法可以很好的自定义定时调度的核心

需要注意的是,不建议将定时调度任务的核心线程设置的非常大,因为其核心线程会一直存活,毕竟默认情况下每一个线程就是1M的资源!

分布式定时调度

默认的定时调度在本机上执行的是没有问题的,但是假如某个任务希望在服务集群中同一段时间只被调用一次。那么改如何实现了?

其实处理这个问题不难,按照当前需求,集群下各个服务是抢占式的执行任务,这个同分布式锁很像,但是有细微的不同之处在于分布式锁的场景下,没有抢到锁的线程会等待当前占有锁的线程释放锁后,继续抢占后执行。但是定时调度任务却是只需要执行一次。

为了解决这个问题,我们需要给分布式任务设定一个独占的执行时间:当任务开始后,在这段时间以为,只允许该线程执行,其他线程没有获取到锁就直接跳过本次任务。有了这个思路,解决问题就很简单了!

Github上一个开源的框架ShedLock,刚好帮大家解决此类问题,不愿意动手的小伙伴,可以看看这个开源框架!

总结

本文主要讲解了如下内容:

  1. 如何引入任务调度
  2. 任务调度的注解背后的密码
  3. 如何自定义核心调度器
  4. 分布式集群环境下如何进行独占式的任务执行
不积跬步,无以至千里。不积小流,无以成江海。