侧边栏壁纸
博主头像
Leokoの小破站博主等级

行动起来,活在当下

  • 累计撰写 18 篇文章
  • 累计创建 10 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

Spring 定时任务

Leoko
2024-07-06 / 0 评论 / 0 点赞 / 89 阅读 / 12230 字

自带的定时任务实现功能

1.开启对定时任务的支持

在启动类上添加 @EnableScheduling 注解。

注意:Springcron表达式的支持,是由 CronSequenceGenerator 来实现的。源码中文文档注释如下:

image-20210910145008224

也就是说, Spirng 自带的定时任务只支持6个参数的 cron 表达式,即不支持年。所以使用 Spring 自带定时任务的需要注意。判断 cron 表达式是否正确,可以通过 CronSequenceGenerator.isValidExpression()方法。

public static boolean isValidExpression(@Nullable String expression) {
	  if (expression == null) {
		  return false;
    }	
    String[] fields = StringUtils.tokenizeToStringArray(expression, " ");
    // 判断参数
    if (!areValidCronFields(fields)) {
        return false;
    }
    try {
        new CronSequenceGenerator(expression, fields);
        return true;
    } catch (IllegalArgumentException ex) {
        return false;
    }
}
​
private static boolean areValidCronFields(@Nullable String[] fields) {
    return (fields != null && fields.length == 6);
}

2.线程问题

2.1 为什么默认是单线程

@Scheduled 执行任务的时候是在一个单线程中,如果有多个任务,其中一个任务执行时间过长,则有可能会导致其他后续任务被阻塞直到该任务执行完成,这是非常不友好的。为什么是单线程呢?还是从源码入手:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
​
}

@EnableScheduling 导入了一个类 SchedulingConfiguration

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {
​
    @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
        return new ScheduledAnnotationBeanPostProcessor();
    }
​
}

再进入 ScheduledAnnotationBeanPostProcessor

// 简化,只展示部分代码
public class ScheduledAnnotationBeanPostProcessor {
    private final ScheduledTaskRegistrar registrar;
​
    @Nullable
    private Object scheduler;
    
    private void finishRegistration() {
        if (this.scheduler != null) {
            this.registrar.setScheduler(this.scheduler);
        }
        ...
        this.registrar.afterPropertiesSet();    
    }    
}
public class ScheduledTaskRegistrar {
    
    @Override
    public void afterPropertiesSet() {
        scheduleTasks();
    }
    
    protected void scheduleTasks() {
        if (this.taskScheduler == null) {
            this.localExecutor = Executors.newSingleThreadScheduledExecutor();
            this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
        }
        ...
    }        
}    

看到这一行你应该就明白了 this.localExecutor = Executors.newSingleThreadScheduledExecutor(),如果 taskScheduler 未被初始化,那么就给定时任务做了一个单线程的线程池。

2.2 解决办法

2.2.1 扩大核心线程数
@Configuration
public class SchedulingConfig implements SchedulingConfigurer {
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        // 设置核心线程数为50
        taskRegistrar.setScheduler(Executors.newScheduledThreadPool(50));
    }
}

配置后,程序会启动50个线程放在线程池中,每个定时任务占用1个线程。但是相同的定时任务,执行的时候,还是在同一个线程。

比如:任务A开始执行,任务B也开始执行,但是任务A因为某些原因卡死,任务B正常执行完成,那么下个周期,任务A还是卡死状态,任务B可以继续执行。即某个任务自身卡死,不会影响其他任务,但是等到下一次执行时间,任务本身会一直等待上一次任务执行完成

2.2.2 自定义 TaskScheduler
@Configuration
public class SchedulerConfig {
​
    @Bean()
    public TaskScheduler modifiedScheduler(){
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(50);
        taskScheduler.setThreadNamePrefix("ScheduleTask");
        //taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
        //taskScheduler.setAwaitTerminationSeconds(60);
        return taskScheduler;
    }
}

里面有两行注释代码,第一行的注释代码是 如果停止项目时线程池中还是任务没有执行完成,那就等待任务执行完成后停止,但是也不能无限等待下去,所以第二行注释代码设置了线程池中任务的等待时间,如果超过这个时候还没有完成就强制销毁,以确保应用最后能够被关闭,而不是阻塞住。

这两行代码实现的效果就是:点击重启按钮,项目不会立即停止,而是等待60s后才会重启;如果点击重启按钮 想要达到立即重启的效果,需要点击 stop 按钮,项目会立即重启。

这种方法,每次定时任务启动的时候,都会创建一个单独的线程来处理。也就是说同一个定时任务也会启动多个线程处理。

比如:任务A开始执行,任务B也开始执行,但是任务A因为某些原因卡死,任务B正常执行完成,那么下个周期,任务A还是可以正常执行,不会因为上一次卡死而影响。但是任务A中的卡死线程越来越多,导致线程池占满,还是会影响到定时任务,这时重启项目就行。

3. 实现方式

3.1 使用 @Scheduled 注解

@Configuration      
public class SchedulingTask {
    
    @Scheduled(cron = "0/5 * * * * ?")
    //或直接指定时间间隔,例如:5秒
    //@Scheduled(fixedRate=5000)
    private void configureTasks() {
        // 定时任务所执行的具体代码
    }
}

使用 @Scheduled 注解很方便,但如果 cron 表达式是需要从数据库获取或可以动态改变的,注解就达不到要求了,这时可以使用第二种实现方式。

3.2 基于 SchedulingConfigurer 接口

@Configuration
public class SchedulerTask implements SchedulingConfigurer {

    @Autowired    
    private  XXXMapper mapper;

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.addTriggerTask(
                //1.添加任务内容(Runnable)
                () -> System.out.println("执行动态定时任务: " + new Date(System.currentTimeMillis())),
                //2.设置执行周期(Trigger)
                triggerContext -> {
                    //2.1 从数据库获取执行周期
                    String cron = mapper.getCron();
                    //2.2 合法性校验.
                    if (CronSequenceGenerator.isValidExpression(cron)) {
                        // Omitted Code ..
                    }
                    //2.3 返回执行周期(Date)
                    return new CronTrigger(cron).nextExecutionTime(triggerContext);
                }
        );
    }

}

3.3 动态添加、更新、停止、删除、固定时间间隔(多线程)

如果在项目启动后需要添加定时任务,且在不重启的情况的下,第二种方式达不到要求了。

3.3.1 首先配置 TaskScheduler
@Configuration
public class SchedulerConfig {

    @Bean()
    public TaskScheduler modifiedScheduler(){
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(50);
        taskScheduler.setThreadNamePrefix("ScheduleTask");
        taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
        taskScheduler.setAwaitTerminationSeconds(60);
        return taskScheduler;
    }
}
3.3.2 统一接口
public interface ScheduleProcessor {
    /**
     * 初始化任务
     */
    void initTask();

    /**
     * 添加cron任务
     * @param taskId 任务id
     * @param cron cron表达式
     */
    default void addCronTask(String taskId, String cron){}

    /**
     * 添加固定时间间隔任务
     * @param taskId 任务id
     * @param interval 间隔 单位ms
     */
    default void addFixedRateTask(String taskId, Long interval){}

    /**
     * 取消任务
     * @param taskIds 任务id列表
     */
    void cancelTask(Object... taskIds);

    /**
     * 修改cron任务
     * @param taskId 任务id
     * @param cron cron表达式
     */
    default void updateCronTask(String taskId, String cron){}

    /**
     * 添加固定时间间隔任务
     * @param taskId 任务id
     * @param interval 间隔 单位ms
     */
    default void updateFixedTateTask(String taskId, Long interval){}
}

其中 taskId 可以认为是数据库中 cron 表达式所在行的主键。接口中的方法声明为 default 可以避免该接口的所有实现类都必须实现此方法。

3.3.3 具体实现
@Configuration
public class SchedulingTask implements ScheduleProcessor {

    @Autowired
    private ScheduleTaskMapper scheduleTaskMapper;

    @Autowired
    private TaskScheduler taskScheduler;

    private static final String PREFIX = "TIMER_";

    private static final 
        Map<String, ScheduledFuture<?>> scheduledFutureMap = new ConcurrentHashMap<>();

    @PostConstruct
    public void init(){
        initTask();
    }

    @Override
    public void initTask() {
        // 初始化前取消项目中的定时任务
        cancelTask();
        // 从数据库读取需要执行的的定时任务(ScheduleTask 为数据库表对应的实体类)
        List<ScheduleTask> scheduleTasks = scheduleTaskMapper.selectAll();
        // 添加定时任务
        for (ScheduleTask scheduleTask : scheduleTasks){
            singleScheduling(scheduleTask);
        }
    }
}

使用线程安全的 ConcurrentHashMap 存储定时任务,将任务的初始化逻辑放在 @PostConstruct 注解修饰的方法中,达到项目启动时完成对定时任务的读取。

    @Override
    public void cancelTask(Object... taskIds) {
        if (taskIds.length == 0){
            // 全部取消
            for (String key:scheduledFutureMap.keySet()){
                if (StringUtils.isNotEmpty(scheduledFutureMap.get(key))){
                    scheduledFutureMap.get(key).cancel(false);
                }
            }
            scheduledFutureMap.clear();
        }else {
            // 选定取消
            for (Object o:taskIds){
                String key = PREFIX + o;
                if (StringUtils.isNotEmpty(scheduledFutureMap.get(key))){
                    scheduledFutureMap.get(key).cancel(false);
                    scheduledFutureMap.remove(key);
                }
            }
        }
    }

项目重启时需要取消之前所存在的定时任务。

0

评论区