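I. Preface
Scheduled tasks are needed in most projects to run particular jobs at set times. This article explains how scheduled tasks work in Spring Boot and how to create them dynamically.
Last week at work I received a requirement: synchronize account-cancellation data from multiple provinces and unbind the corresponding WeChat followers. Each province periodically places its account-cancellation data on an SFTP server, and I had to develop scheduled tasks that parse those files. Because several provinces are involved, the servers, file-naming rules, and data rules are not necessarily the same, so making everything configurable is somewhat difficult. The data rules have to be unified, and that must be insisted on; the servers and file-naming rules can be read from the configuration center. Whenever the configuration for a new province is added, the backend detects it and creates a scheduled task dynamically.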
II. Core configuration Spring Boot uses to enable scheduled tasks
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Import(SchedulingConfiguration.class)
    @Documented
    public @interface EnableScheduling {
    }

    @Configuration
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public class SchedulingConfiguration {

        @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
        public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
            return new ScheduledAnnotationBeanPostProcessor();
        }
    }
Next, let's look at the core post-processor, ScheduledAnnotationBeanPostProcessor.
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
                bean instanceof ScheduledExecutorService) {
            // Ignore AOP infrastructure such as scoped proxies.
            return bean;
        }

        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
        if (!this.nonAnnotatedClasses.contains(targetClass)) {
            Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                        Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                                method, Scheduled.class, Schedules.class);
                        return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                    });
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(targetClass);
                if (logger.isTraceEnabled()) {
                    logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
                }
            }
            else {
                // Non-empty set of methods
                annotatedMethods.forEach((method, scheduledMethods) ->
                        scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
                if (logger.isTraceEnabled()) {
                    logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                            "': " + annotatedMethods);
                }
            }
        }
        return bean;
    }
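1. It processes the @Scheduled annotations found on each bean's methods and registers the corresponding scheduled tasks through ScheduledTaskRegistrar.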
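The discovery step can be illustrated without Spring. The following is a minimal JDK-only sketch of annotation-driven task discovery, not the Spring implementation: the `@Every` annotation, the `ReportJobs` class, and `findPeriodicMethods` are hypothetical names invented for this illustration, standing in for `@Scheduled`, a bean, and the method scan respectively.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** JDK-only sketch of annotation-driven task discovery; every name here is hypothetical. */
class AnnotationScanSketch {

    /** Hypothetical stand-in for a fixed-rate @Scheduled annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Every {
        long millis();
    }

    /** A plain object playing the role of a bean with annotated methods. */
    static class ReportJobs {
        @Every(millis = 1000)
        public void syncProvinceA() { }

        @Every(millis = 5000)
        public void syncProvinceB() { }

        public void notScheduled() { }
    }

    /** Returns "methodName@periodMillis" for every annotated method, sorted by name. */
    static List<String> findPeriodicMethods(Object bean) {
        List<String> found = new ArrayList<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            Every every = method.getAnnotation(Every.class);
            if (every != null) {
                // A real post-processor would register a task here instead of a string.
                found.add(method.getName() + "@" + every.millis());
            }
        }
        found.sort(Comparator.naturalOrder());
        return found;
    }

    public static void main(String[] args) {
        System.out.println(findPeriodicMethods(new ReportJobs()));
    }
}
```

Spring's post-processor additionally resolves proxies to their target class, supports repeatable annotations, and caches classes that have no annotated methods, as the source above shows.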
    private void finishRegistration() {
        if (this.scheduler != null) {
            this.registrar.setScheduler(this.scheduler);
        }

        if (this.beanFactory instanceof ListableBeanFactory) {
            Map<String, SchedulingConfigurer> beans =
                    ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
            List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
            AnnotationAwareOrderComparator.sort(configurers);
            for (SchedulingConfigurer configurer : configurers) {
                configurer.configureTasks(this.registrar);
            }
        }

        if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
            try {
                // Search for TaskScheduler bean...
                this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
            }
            catch (NoUniqueBeanDefinitionException ex) {
                logger.trace("Could not find unique TaskScheduler bean", ex);
                try {
                    this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    if (logger.isInfoEnabled()) {
                        logger.info("More than one TaskScheduler bean exists within the context, and " +
                                "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                                "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                                "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                                ex.getBeanNamesFound());
                    }
                }
            }
            catch (NoSuchBeanDefinitionException ex) {
                logger.trace("Could not find default TaskScheduler bean", ex);
                // Search for ScheduledExecutorService bean next...
                try {
                    this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
                }
                catch (NoUniqueBeanDefinitionException ex2) {
                    logger.trace("Could not find unique ScheduledExecutorService bean", ex2);
                    try {
                        this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
                    }
                    catch (NoSuchBeanDefinitionException ex3) {
                        if (logger.isInfoEnabled()) {
                            logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
                                    "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                                    "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                                    "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                                    ex2.getBeanNamesFound());
                        }
                    }
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    logger.trace("Could not find default ScheduledExecutorService bean", ex2);
                    // Giving up -> falling back to default scheduler within the registrar...
                    logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
                }
            }
        }

        this.registrar.afterPropertiesSet();
    }
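1. Every SchedulingConfigurer bean gets a chance, in order, to configure the ScheduledTaskRegistrar dynamically.
2. A TaskScheduler is registered with the ScheduledTaskRegistrar. It schedules Runnable tasks and supports several kinds of trigger rules. If no TaskScheduler bean is found, a ScheduledExecutorService bean is looked up instead.
3. registrar.afterPropertiesSet() is the point where all registered scheduled tasks are actually scheduled and start running.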
    protected void scheduleTasks() {
        if (this.taskScheduler == null) {
            this.localExecutor = Executors.newSingleThreadScheduledExecutor();
            this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
        }
        if (this.triggerTasks != null) {
            for (TriggerTask task : this.triggerTasks) {
                addScheduledTask(scheduleTriggerTask(task));
            }
        }
        if (this.cronTasks != null) {
            for (CronTask task : this.cronTasks) {
                addScheduledTask(scheduleCronTask(task));
            }
        }
        if (this.fixedRateTasks != null) {
            for (IntervalTask task : this.fixedRateTasks) {
                addScheduledTask(scheduleFixedRateTask(task));
            }
        }
        if (this.fixedDelayTasks != null) {
            for (IntervalTask task : this.fixedDelayTasks) {
                addScheduledTask(scheduleFixedDelayTask(task));
            }
        }
    }
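1. TriggerTask: a dynamic scheduled task. The next execution time is determined by Trigger#nextExecutionTime from the given trigger context.
2. CronTask: a dynamic scheduled task and a subclass of TriggerTask. The next execution time is determined by a cron expression.
3. IntervalTask: a task that runs periodically (at a fixed rate or with a fixed delay) after an optional initial delay.
4. If taskScheduler is null, it defaults to a ConcurrentTaskScheduler backed by a single-threaded ScheduledExecutorService, so by default all scheduled tasks share one thread.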
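The practical effect of the single-threaded default in item 4 can be seen with plain JDK executors. This is a small illustrative sketch, not code from the article or from Spring; the class and method names are made up, and the 200 ms sleep merely simulates slow work such as parsing a file.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Sketch: how many threads end up running two slow tasks that are due at the same time. */
class SchedulerThreadsSketch {

    static int distinctThreads(ScheduledExecutorService scheduler) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch finished = new CountDownLatch(2);
        Runnable slowTask = () -> {
            threadNames.add(Thread.currentThread().getName());
            try {
                TimeUnit.MILLISECONDS.sleep(200); // simulated slow work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                finished.countDown();
            }
        };
        try {
            // Both tasks are due immediately.
            scheduler.schedule(slowTask, 0, TimeUnit.MILLISECONDS);
            scheduler.schedule(slowTask, 0, TimeUnit.MILLISECONDS);
            if (!finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
            return threadNames.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("single thread: "
                + distinctThreads(Executors.newSingleThreadScheduledExecutor()));
        System.out.println("pool of two:   "
                + distinctThreads(Executors.newScheduledThreadPool(2)));
    }
}
```

With the single-threaded executor the second task has to wait for the first one to finish, which is why one slow job can delay every other scheduled task unless a larger pool is configured.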
III. How CronTask works
ScheduledTaskRegistrar.java

    @Nullable
    public ScheduledTask scheduleCronTask(CronTask task) {
        ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
        boolean newTask = false;
        if (scheduledTask == null) {
            scheduledTask = new ScheduledTask(task);
            newTask = true;
        }
        if (this.taskScheduler != null) {
            scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
        }
        else {
            addCronTask(task);
            this.unresolvedTasks.put(task, scheduledTask);
        }
        return (newTask ? scheduledTask : null);
    }

ConcurrentTaskScheduler.java

    @Override
    @Nullable
    public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
        try {
            if (this.enterpriseConcurrentScheduler) {
                return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
            }
            else {
                ErrorHandler errorHandler =
                        (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
                return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();
            }
        }
        catch (RejectedExecutionException ex) {
            throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
        }
    }

ReschedulingRunnable.java

    @Nullable
    public ScheduledFuture<?> schedule() {
        synchronized (this.triggerContextMonitor) {
            this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
            if (this.scheduledExecutionTime == null) {
                return null;
            }
            long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
            this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
            return this;
        }
    }

    private ScheduledFuture<?> obtainCurrentFuture() {
        Assert.state(this.currentFuture != null, "No scheduled future");
        return this.currentFuture;
    }

    @Override
    public void run() {
        Date actualExecutionTime = new Date();
        super.run();
        Date completionTime = new Date();
        synchronized (this.triggerContextMonitor) {
            Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
            this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
            if (!obtainCurrentFuture().isCancelled()) {
                schedule();
            }
        }
    }
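1. In the end, both the task and the trigger are wrapped in a ReschedulingRunnable.
2. ReschedulingRunnable is what makes the task repeat: schedule() hands the object itself to the executor, the executor later calls run(), and run() calls schedule() again.
3. The body of ReschedulingRunnable#schedule is synchronized, so only one thread at a time can compute the next execution time and submit the task to the executor.
4. Different ReschedulingRunnable objects do not affect each other as long as the thread pool has enough threads. In other words, given sufficient pool capacity, tasks created by multiple calls to TaskScheduler#schedule can run interleaved.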
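The self-rescheduling pattern described in the points above can be sketched with JDK classes alone. This is a simplified illustration under stated assumptions, not Spring's ReschedulingRunnable: the class name is invented, a `LongSupplier` returning a delay in milliseconds stands in for `Trigger#nextExecutionTime`, and a negative delay is this sketch's own convention for "no further executions".

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

/** Sketch of a runnable that re-submits itself to the executor after every run. */
class SelfReschedulingSketch implements Runnable {

    private final Runnable delegate;
    private final LongSupplier nextDelayMillis; // hypothetical stand-in for a trigger
    private final ScheduledExecutorService executor;
    private final Object monitor = new Object();
    private ScheduledFuture<?> currentFuture;
    private boolean cancelled;

    SelfReschedulingSketch(Runnable delegate, LongSupplier nextDelayMillis,
                           ScheduledExecutorService executor) {
        this.delegate = delegate;
        this.nextDelayMillis = nextDelayMillis;
        this.executor = executor;
    }

    /** Asks the trigger for the next delay and hands this object to the executor. */
    void schedule() {
        synchronized (monitor) {
            long delay = nextDelayMillis.getAsLong();
            if (cancelled || delay < 0) {
                return; // cancelled, or the trigger has no further executions
            }
            currentFuture = executor.schedule(this, delay, TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public void run() {
        delegate.run();
        schedule(); // re-arm after each execution
    }

    void cancel() {
        synchronized (monitor) {
            cancelled = true;
            if (currentFuture != null) {
                currentFuture.cancel(false);
            }
        }
    }

    /** Runs a task whose trigger stops after three executions and returns the run count. */
    static int runUntilTriggerStops() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch threeRuns = new CountDownLatch(3);
        SelfReschedulingSketch task = new SelfReschedulingSketch(
                () -> { runs.incrementAndGet(); threeRuns.countDown(); },
                () -> runs.get() < 3 ? 20L : -1L,
                executor);
        try {
            task.schedule();
            if (!threeRuns.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for three runs");
            }
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
            return runs.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("runs: " + runUntilTriggerStops());
    }
}
```

Because the next delay is computed only after the previous run completes, executions of one task never overlap, which matches the behavior of trigger-based tasks described above.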
ScheduledThreadPoolExecutor.java

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }
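How ScheduledFutureTask works is shown in the figure below (I was too lazy to draw one myself, so the figure is borrowed).
1. Each ScheduledFutureTask is placed into a blocking priority queue, ScheduledThreadPoolExecutor.DelayedWorkQueue, which is implemented as a binary min-heap ordered by trigger time.
2. The Thread object in the figure above corresponds to ThreadPoolExecutor.Worker, which implements the Runnable interface.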
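The ordering effect of the delay queue can be observed from the outside. In this small sketch (class and method names are invented for illustration), tasks are submitted out of order to a one-thread `ScheduledThreadPoolExecutor`, and the order in which they actually run is recorded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Sketch: tasks leave the executor's delay queue in trigger-time order, not submission order. */
class DelayOrderSketch {

    /** Submits one task per delay, in the given order, and returns the delays in run order. */
    static List<Long> executionOrder(long... delaysMillis) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        List<Long> order = new CopyOnWriteArrayList<>();
        try {
            for (long delay : delaysMillis) {
                // Block body keeps the lambda a Runnable rather than a Callable.
                executor.schedule(() -> { order.add(delay); }, delay, TimeUnit.MILLISECONDS);
            }
            // By default, delayed tasks that are already queued still run after shutdown().
            executor.shutdown();
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
            return new ArrayList<>(order);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(executionOrder(300, 100, 200));
    }
}
```

The task with the smallest trigger time sits at the head of the heap, so the worker thread always takes the task that is due next regardless of when it was submitted.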
    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker */
    public void run() {
        runWorker(this);
    }
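1. Each Worker holds a Thread object, and the Runnable passed to that Thread is the Worker itself.
2. ThreadPoolExecutor#addWorker creates a Worker, takes the thread instance from it, and starts it. This is how a thread in the pool comes into existence.
3. Worker#run calls ThreadPoolExecutor#runWorker, which is where tasks are finally executed. The method works as follows: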
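(1) The worker first runs the task it was created with. If that task is null, then as long as the pool is running, the worker fetches tasks from workQueue through getTask. ThreadPoolExecutor#execute offers a task to workQueue when it cannot start a new core thread for it.
When taking a task from the queue, getTask decides from the pool's configuration whether to block and for how long. If getTask returns null, the runWorker loop ends and processWorkerExit is executed.
At that point the thread has done its job and "disappears" from the pool.
(2) Before a task starts, the Worker's lock method is called so that the pool does not interrupt the worker while the task is running. The interrupt status is then cleared unless the pool is stopping (in the JDK version I read this is done by the clearInterruptsForTaskRun method, which is worth a look; JDK 8 and later perform the check inline in runWorker), so the thread does not start the task with its interrupt flag set.
(3) beforeExecute and afterExecute allow custom logic to run before and after each task. Both methods are empty and are meant to be filled in by subclasses.
We can throw an exception in beforeExecute. The task is then not executed, and when the loop is left, completedAbruptly is true, meaning "the worker died due to user exception"; decrementWorkerCount is used to adjust the worker count.
(4) Because Runnable#run cannot rethrow an arbitrary Throwable, the exception is wrapped and rethrown here. The rethrown exception causes the current thread to die; afterExecute can be used to handle the exception.
(5) afterExecute may also throw an exception, which can likewise cause the current thread to die.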
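The hooks from points (3) to (5) can be tried out with a small subclass. This is an illustrative sketch with invented names; it counts hook invocations and records the exception that reaches `afterExecute`. Tasks are submitted with `execute`, because with `submit` the exception would be captured inside the returned Future and `afterExecute` would receive `null`.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Sketch: a pool that counts hook calls and remembers the last task failure. */
class HookedPoolSketch extends ThreadPoolExecutor {

    final AtomicInteger before = new AtomicInteger();
    final AtomicInteger after = new AtomicInteger();
    final AtomicReference<Throwable> lastFailure = new AtomicReference<>();

    HookedPoolSketch() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), runnable -> {
            Thread thread = new Thread(runnable, "hooked-worker");
            // Keep the demo output quiet when a failing task kills its worker thread.
            thread.setUncaughtExceptionHandler((t, e) -> { });
            return thread;
        });
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        before.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        after.incrementAndGet();
        if (t != null) {
            lastFailure.set(t); // the exception thrown by the task itself
        }
    }

    /** Runs three tasks, the second of which fails, and returns the finished pool. */
    static HookedPoolSketch runDemo() {
        HookedPoolSketch pool = new HookedPoolSketch();
        try {
            pool.execute(() -> { });
            pool.execute(() -> { throw new IllegalStateException("boom"); });
            pool.execute(() -> { }); // picked up by a replacement worker
            pool.shutdown();
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
            return pool;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        HookedPoolSketch pool = runDemo();
        System.out.println(pool.before.get() + " " + pool.after.get() + " " + pool.lastFailure.get());
    }
}
```

The third task still runs even though the second one killed its worker, because the pool replaces a worker that exits abruptly while work remains in the queue.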
IV. Creating scheduled tasks dynamically
TaskConfiguration: the configuration class
    @Configuration
    @EnableScheduling
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public class TaskConfiguration {

        @Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)
        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
        public ScheduledExecutorService scheduledAnnotationProcessor() {
            return Executors.newScheduledThreadPool(5, new DefaultThreadFactory());
        }

        private static class DefaultThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;

            DefaultThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null) ? s.getThreadGroup() :
                        Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                        poolNumber.getAndIncrement() +
                        "-schedule-";
            }

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                        namePrefix + threadNumber.getAndIncrement(),
                        0);
                if (t.isDaemon()) {
                    t.setDaemon(false);
                }
                if (t.getPriority() != Thread.NORM_PRIORITY) {
                    t.setPriority(Thread.NORM_PRIORITY);
                }
                return t;
            }
        }
    }
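1. This ensures that ConcurrentTaskScheduler uses a thread pool with corePoolSize=5 instead of the default single-threaded ScheduledExecutorService.
2. A custom thread factory gives the scheduler threads recognizable names.
DynamicTask: the dynamic scheduled task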
    @Configuration
    public class DynamicTask implements SchedulingConfigurer {

        private static final Logger LOGGER = LoggerFactory.getLogger(DynamicTask.class);

        // Thread pool that consumes the data each TimingTask produces
        private static final ExecutorService es = new ThreadPoolExecutor(10, 20,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(10),
                new DynamicTaskConsumeThreadFactory());

        private volatile ScheduledTaskRegistrar registrar;
        private final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
        private final ConcurrentHashMap<String, CronTask> cronTasks = new ConcurrentHashMap<>();

        // Modified by callers while the scheduler thread iterates over it, so it must be thread-safe
        private volatile List<TaskConstant> taskConstants = new CopyOnWriteArrayList<>();

        @Override
        public void configureTasks(ScheduledTaskRegistrar registrar) {
            this.registrar = registrar;
            // Every 5 seconds, compare the configured task list with the running tasks
            this.registrar.addTriggerTask(() -> {
                        if (!CollectionUtils.isEmpty(taskConstants)) {
                            LOGGER.info("Checking the dynamic task list...");
                            List<TimingTask> tts = new ArrayList<>();
                            taskConstants
                                    .forEach(taskConstant -> {
                                        TimingTask tt = new TimingTask();
                                        tt.setExpression(taskConstant.getCron());
                                        tt.setTaskId("dynamic-task-" + taskConstant.getTaskId());
                                        tts.add(tt);
                                    });
                            this.refreshTasks(tts);
                        }
                    }
                    , triggerContext -> new PeriodicTrigger(5L, TimeUnit.SECONDS).nextExecutionTime(triggerContext));
        }

        public List<TaskConstant> getTaskConstants() {
            return taskConstants;
        }

        private void refreshTasks(List<TimingTask> tasks) {
            // Cancel tasks that have been removed from the configuration
            Set<String> taskIds = scheduledFutures.keySet();
            for (String taskId : taskIds) {
                if (!exists(tasks, taskId)) {
                    // Drop the cached entries too, so the task is re-created if it is configured again
                    scheduledFutures.remove(taskId).cancel(false);
                    cronTasks.remove(taskId);
                }
            }
            for (TimingTask tt : tasks) {
                String expression = tt.getExpression();
                if (StringUtils.isBlank(expression) || !CronSequenceGenerator.isValidExpression(expression)) {
                    LOGGER.error("DynamicTask: invalid cron expression: " + expression);
                    continue;
                }
                // Nothing to do if the configuration is unchanged
                if (scheduledFutures.containsKey(tt.getTaskId())
                        && cronTasks.get(tt.getTaskId()).getExpression().equals(expression)) {
                    continue;
                }
                // If the schedule changed, cancel the currently running task first
                if (scheduledFutures.containsKey(tt.getTaskId())) {
                    scheduledFutures.remove(tt.getTaskId()).cancel(false);
                    cronTasks.remove(tt.getTaskId());
                }
                CronTask task = new CronTask(tt, expression);
                ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
                cronTasks.put(tt.getTaskId(), task);
                scheduledFutures.put(tt.getTaskId(), future);
            }
        }

        private boolean exists(List<TimingTask> tasks, String taskId) {
            for (TimingTask task : tasks) {
                if (task.getTaskId().equals(taskId)) {
                    return true;
                }
            }
            return false;
        }

        @PreDestroy
        public void destroy() {
            this.registrar.destroy();
        }

        public static class TaskConstant {
            private String cron;
            private String taskId;

            public String getCron() {
                return cron;
            }

            public void setCron(String cron) {
                this.cron = cron;
            }

            public String getTaskId() {
                return taskId;
            }

            public void setTaskId(String taskId) {
                this.taskId = taskId;
            }
        }

        private class TimingTask implements Runnable {
            private String expression;
            private String taskId;

            public String getTaskId() {
                return taskId;
            }

            public void setTaskId(String taskId) {
                this.taskId = taskId;
            }

            @Override
            public void run() {
                LOGGER.info("Current CronTask: " + this);
                // Bounded queue with capacity 3
                DynamicBlockingQueue queue = new DynamicBlockingQueue(3);
                es.submit(() -> {
                    while (!queue.isDone() || !queue.isEmpty()) {
                        try {
                            String content = queue.poll(500, TimeUnit.MILLISECONDS);
                            if (StringUtils.isBlank(content)) {
                                return;
                            }
                            LOGGER.info("DynamicBlockingQueue consumed: " + content);
                            TimeUnit.MILLISECONDS.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });

                // Put data into the queue
                for (int i = 0; i < 5; ++i) {
                    try {
                        queue.put(String.valueOf(i));
                        LOGGER.info("DynamicBlockingQueue produced: " + i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                queue.setDone(true);
            }

            public String getExpression() {
                return expression;
            }

            public void setExpression(String expression) {
                this.expression = expression;
            }

            @Override
            public String toString() {
                return ReflectionToStringBuilder.toString(this
                        , ToStringStyle.JSON_STYLE
                        , false
                        , false
                        , TimingTask.class);
            }
        }

        /**
         * Thread factory for the queue-consumer threads
         */
        private static class DynamicTaskConsumeThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;

            DynamicTaskConsumeThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null) ? s.getThreadGroup() :
                        Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                        poolNumber.getAndIncrement() +
                        "-dynamic-task-";
            }

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                        namePrefix + threadNumber.getAndIncrement(),
                        0);
                if (t.isDaemon()) {
                    t.setDaemon(false);
                }
                if (t.getPriority() != Thread.NORM_PRIORITY) {
                    t.setPriority(Thread.NORM_PRIORITY);
                }
                return t;
            }
        }

        private static class DynamicBlockingQueue extends LinkedBlockingQueue<String> {
            DynamicBlockingQueue(int capacity) {
                super(capacity);
            }

            private volatile boolean done = false;

            public boolean isDone() {
                return done;
            }

            public void setDone(boolean done) {
                this.done = done;
            }
        }
    }
1. taskConstants is the list of dynamic tasks.
2. ScheduledTaskRegistrar#addTriggerTask adds a periodic trigger task that watches the dynamic task list for changes.
    CronTask task = new CronTask(tt, expression);
    ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
    cronTasks.put(tt.getTaskId(), task);
    scheduledFutures.put(tt.getTaskId(), future);
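3. A cron task is created dynamically, and the resulting ScheduledFuture instance is cached.
4. When the task list is refreshed, the cached ScheduledFuture and CronTask instances are used to decide whether a dynamic task that is no longer valid should be cancelled and removed.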
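The refresh logic in points 3 and 4 boils down to reconciling a desired configuration with the set of running tasks. The following JDK-only sketch illustrates that idea under simplifying assumptions: a fixed-rate period in milliseconds replaces the cron expression, the scheduled work is an empty placeholder, and all class and method names are invented for the illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Sketch: keep running tasks in line with a desired taskId -> period map. */
class TaskReconcilerSketch {

    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    private final Map<String, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();
    private final Map<String, Long> periods = new ConcurrentHashMap<>();

    synchronized void refresh(Map<String, Long> desired) {
        // 1. Cancel and forget tasks that are no longer configured.
        for (String taskId : new TreeSet<>(futures.keySet())) {
            if (!desired.containsKey(taskId)) {
                futures.remove(taskId).cancel(false);
                periods.remove(taskId);
            }
        }
        // 2. Create new tasks and replace those whose period changed.
        desired.forEach((taskId, period) -> {
            if (period.equals(periods.get(taskId))) {
                return; // unchanged: keep the running task
            }
            ScheduledFuture<?> old = futures.remove(taskId);
            if (old != null) {
                old.cancel(false);
            }
            futures.put(taskId, scheduler.scheduleAtFixedRate(
                    () -> { /* placeholder for the real work */ },
                    period, period, TimeUnit.MILLISECONDS));
            periods.put(taskId, period);
        });
    }

    Set<String> activeTaskIds() {
        return new TreeSet<>(futures.keySet());
    }

    ScheduledFuture<?> futureOf(String taskId) {
        return futures.get(taskId);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    /** Configures a and b, then swaps b for c, and summarizes the resulting state. */
    static String demo() {
        TaskReconcilerSketch reconciler = new TaskReconcilerSketch();
        try {
            Map<String, Long> desired = new HashMap<>();
            desired.put("a", 1000L);
            desired.put("b", 2000L);
            reconciler.refresh(desired);
            ScheduledFuture<?> a = reconciler.futureOf("a");
            ScheduledFuture<?> b = reconciler.futureOf("b");

            desired.remove("b");
            desired.put("c", 500L);
            reconciler.refresh(desired);
            return reconciler.activeTaskIds()
                    + " bCancelled=" + b.isCancelled()
                    + " aKept=" + (a == reconciler.futureOf("a"));
        } finally {
            reconciler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Removing the cached entries when a task is cancelled matters: otherwise a task that is deleted and later configured again with the same schedule would look unchanged and would never be restarted.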
DynamicTaskTest: test class for the dynamic scheduled task
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class DynamicTaskTest {

        @Autowired
        private DynamicTask dynamicTask;

        @Test
        public void test() throws InterruptedException {
            List<DynamicTask.TaskConstant> taskConstants = dynamicTask.getTaskConstants();

            DynamicTask.TaskConstant taskConstant = new DynamicTask.TaskConstant();
            taskConstant.setCron("0/5 * * * * ?");
            taskConstant.setTaskId("test1");
            taskConstants.add(taskConstant);

            DynamicTask.TaskConstant taskConstant1 = new DynamicTask.TaskConstant();
            taskConstant1.setCron("0/5 * * * * ?");
            taskConstant1.setTaskId("test2");
            taskConstants.add(taskConstant1);

            DynamicTask.TaskConstant taskConstant2 = new DynamicTask.TaskConstant();
            taskConstant2.setCron("0/5 * * * * ?");
            taskConstant2.setTaskId("test3");
            taskConstants.add(taskConstant2);

            TimeUnit.SECONDS.sleep(40);

            // Remove the last configuration and add a new one
            taskConstants.remove(taskConstants.size() - 1);
            DynamicTask.TaskConstant taskConstant3 = new DynamicTask.TaskConstant();
            taskConstant3.setCron("0/5 * * * * ?");
            taskConstant3.setTaskId("test4");
            taskConstants.add(taskConstant3);

            // Uncomment to keep the test alive long enough to observe the refreshed tasks
            // TimeUnit.MINUTES.sleep(50);
        }
    }