自带的定时任务实现功能
1.开启对定时任务的支持
在启动类上添加 @EnableScheduling
注解。
注意:Spring
对 cron
表达式的支持,是由 CronSequenceGenerator
来实现的。源码中文文档注释如下:
也就是说, Spirng
自带的定时任务只支持6个参数的 cron
表达式,即不支持年。所以使用 Spring
自带定时任务的需要注意。判断 cron
表达式是否正确,可以通过 CronSequenceGenerator.isValidExpression()
方法。
public static boolean isValidExpression(@Nullable String expression) {
if (expression == null) {
return false;
}
String[] fields = StringUtils.tokenizeToStringArray(expression, " ");
// 判断参数
if (!areValidCronFields(fields)) {
return false;
}
try {
new CronSequenceGenerator(expression, fields);
return true;
} catch (IllegalArgumentException ex) {
return false;
}
}
private static boolean areValidCronFields(@Nullable String[] fields) {
return (fields != null && fields.length == 6);
}
2.线程问题
2.1 为什么默认是单线程
@Scheduled
执行任务的时候是在一个单线程中,如果有多个任务,其中一个任务执行时间过长,则有可能会导致其他后续任务被阻塞直到该任务执行完成,这是非常不友好的。为什么是单线程呢?还是从源码入手:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}
@EnableScheduling
导入了一个类 SchedulingConfiguration
,
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {
@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}
}
再进入 ScheduledAnnotationBeanPostProcessor
// 简化,只展示部分代码
public class ScheduledAnnotationBeanPostProcessor {
private final ScheduledTaskRegistrar registrar;
@Nullable
private Object scheduler;
private void finishRegistration() {
if (this.scheduler != null) {
this.registrar.setScheduler(this.scheduler);
}
...
this.registrar.afterPropertiesSet();
}
}
public class ScheduledTaskRegistrar {
@Override
public void afterPropertiesSet() {
scheduleTasks();
}
protected void scheduleTasks() {
if (this.taskScheduler == null) {
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
...
}
}
看到这一行你应该就明白了 this.localExecutor = Executors.newSingleThreadScheduledExecutor()
,如果 taskScheduler
未被初始化,那么就给定时任务做了一个单线程的线程池。
2.2 解决办法
2.2.1 扩大核心线程数
@Configuration
public class SchedulingConfig implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
// 设置核心线程数为50
taskRegistrar.setScheduler(Executors.newScheduledThreadPool(50));
}
}
配置后,程序会启动50个线程放在线程池中,每个定时任务占用1个线程。但是相同的定时任务,执行的时候,还是在同一个线程。
比如:任务A开始执行,任务B也开始执行,但是任务A因为某些原因卡死,任务B正常执行完成,那么下个周期,任务A还是卡死状态,任务B可以继续执行。即某个任务自身卡死,不会影响其他任务,但是等到下一次执行时间,任务本身会一直等待上一次任务执行完成。
2.2.2 自定义 TaskScheduler
@Configuration
public class SchedulerConfig {
@Bean()
public TaskScheduler modifiedScheduler(){
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(50);
taskScheduler.setThreadNamePrefix("ScheduleTask");
//taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
//taskScheduler.setAwaitTerminationSeconds(60);
return taskScheduler;
}
}
里面有两行注释代码,第一行的注释代码是 如果停止项目时线程池中还是任务没有执行完成,那就等待任务执行完成后停止,但是也不能无限等待下去,所以第二行注释代码设置了线程池中任务的等待时间,如果超过这个时候还没有完成就强制销毁,以确保应用最后能够被关闭,而不是阻塞住。
这两行代码实现的效果就是:点击重启按钮,项目不会立即停止,而是等待60s后才会重启;如果点击重启按钮 想要达到立即重启的效果,需要点击 stop 按钮,项目会立即重启。
这种方法,每次定时任务启动的时候,都会创建一个单独的线程来处理。也就是说同一个定时任务也会启动多个线程处理。
比如:任务A开始执行,任务B也开始执行,但是任务A因为某些原因卡死,任务B正常执行完成,那么下个周期,任务A还是可以正常执行,不会因为上一次卡死而影响。但是任务A中的卡死线程越来越多,导致线程池占满,还是会影响到定时任务,这时重启项目就行。
3. 实现方式
3.1 使用 @Scheduled 注解
@Configuration
public class SchedulingTask {
@Scheduled(cron = "0/5 * * * * ?")
//或直接指定时间间隔,例如:5秒
//@Scheduled(fixedRate=5000)
private void configureTasks() {
// 定时任务所执行的具体代码
}
}
使用 @Scheduled
注解很方便,但如果 cron
表达式是需要从数据库获取或可以动态改变的,注解就达不到要求了,这时可以使用第二种实现方式。
3.2 基于 SchedulingConfigurer 接口
@Configuration
public class SchedulerTask implements SchedulingConfigurer {
@Autowired
private XXXMapper mapper;
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.addTriggerTask(
//1.添加任务内容(Runnable)
() -> System.out.println("执行动态定时任务: " + new Date(System.currentTimeMillis())),
//2.设置执行周期(Trigger)
triggerContext -> {
//2.1 从数据库获取执行周期
String cron = mapper.getCron();
//2.2 合法性校验.
if (CronSequenceGenerator.isValidExpression(cron)) {
// Omitted Code ..
}
//2.3 返回执行周期(Date)
return new CronTrigger(cron).nextExecutionTime(triggerContext);
}
);
}
}
3.3 动态添加、更新、停止、删除、固定时间间隔(多线程)
如果在项目启动后需要添加定时任务,且在不重启的情况的下,第二种方式达不到要求了。
3.3.1 首先配置 TaskScheduler
@Configuration
public class SchedulerConfig {
@Bean()
public TaskScheduler modifiedScheduler(){
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(50);
taskScheduler.setThreadNamePrefix("ScheduleTask");
taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
taskScheduler.setAwaitTerminationSeconds(60);
return taskScheduler;
}
}
3.3.2 统一接口
public interface ScheduleProcessor {
/**
* 初始化任务
*/
void initTask();
/**
* 添加cron任务
* @param taskId 任务id
* @param cron cron表达式
*/
default void addCronTask(String taskId, String cron){}
/**
* 添加固定时间间隔任务
* @param taskId 任务id
* @param interval 间隔 单位ms
*/
default void addFixedRateTask(String taskId, Long interval){}
/**
* 取消任务
* @param taskIds 任务id列表
*/
void cancelTask(Object... taskIds);
/**
* 修改cron任务
* @param taskId 任务id
* @param cron cron表达式
*/
default void updateCronTask(String taskId, String cron){}
/**
* 添加固定时间间隔任务
* @param taskId 任务id
* @param interval 间隔 单位ms
*/
default void updateFixedTateTask(String taskId, Long interval){}
}
其中 taskId
可以认为是数据库中 cron
表达式所在行的主键。接口中的方法声明为 default
可以避免该接口的所有实现类都必须实现此方法。
3.3.3 具体实现
@Configuration
public class SchedulingTask implements ScheduleProcessor {
@Autowired
private ScheduleTaskMapper scheduleTaskMapper;
@Autowired
private TaskScheduler taskScheduler;
private static final String PREFIX = "TIMER_";
private static final
Map<String, ScheduledFuture<?>> scheduledFutureMap = new ConcurrentHashMap<>();
@PostConstruct
public void init(){
initTask();
}
@Override
public void initTask() {
// 初始化前取消项目中的定时任务
cancelTask();
// 从数据库读取需要执行的的定时任务(ScheduleTask 为数据库表对应的实体类)
List<ScheduleTask> scheduleTasks = scheduleTaskMapper.selectAll();
// 添加定时任务
for (ScheduleTask scheduleTask : scheduleTasks){
singleScheduling(scheduleTask);
}
}
}
使用线程安全的 ConcurrentHashMap
存储定时任务,将任务的初始化逻辑放在 @PostConstruct
注解修饰的方法中,达到项目启动时完成对定时任务的读取。
@Override
public void cancelTask(Object... taskIds) {
if (taskIds.length == 0){
// 全部取消
for (String key:scheduledFutureMap.keySet()){
if (StringUtils.isNotEmpty(scheduledFutureMap.get(key))){
scheduledFutureMap.get(key).cancel(false);
}
}
scheduledFutureMap.clear();
}else {
// 选定取消
for (Object o:taskIds){
String key = PREFIX + o;
if (StringUtils.isNotEmpty(scheduledFutureMap.get(key))){
scheduledFutureMap.get(key).cancel(false);
scheduledFutureMap.remove(key);
}
}
}
}
项目重启时需要取消之前所存在的定时任务。
评论区