您好,登錄后才能下訂單哦!
本篇內容介紹了“Scheduled時間調度是什么意思”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
Schedule,可以將它看成一個輕量級的Quartz,而且使用起來比Quartz簡單許多,它是spring團隊開發的任務調度插件,它可以按照我們設計的時間周期執行既定的任務,首先來一個串行的 Schedule設計:
第一步: 在自定義類中添加注解@EnableScheduling,啟動scheduling,具體代碼如下
@SpringBootApplication @MapperScan("com.xash.quartzDemo.mapper") @EnableSwagger2 @EnableScheduling public class SpringbootStartApplication { public static void main(String[] args) { SpringApplication.run(SpringbootStartApplication.class, args); } }
第二步:創建一個類,并注入到spring中,讓該類實現SchedulingConfigurer,并重寫configureTasks方法,這樣可以實現基于多任務下的,多線程任務定都執行方案:
具體代碼如下
package com.xash.quartzDemo.config; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.Executors; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.stereotype.Component; import com.serotonin.modbus4j.exception.ErrorResponseException; import com.serotonin.modbus4j.exception.ModbusInitException; import com.serotonin.modbus4j.exception.ModbusTransportException; import com.xash.quartzDemo.collection.utils.Modbus4jUtil; @Component public class TaskConfiguration implements SchedulingConfigurer { @Autowired private Modbus4jUtil Modbus4jUtil; private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Scheduled(fixedRate=100,initialDelay=10000) public void myTask() throws InterruptedException, ModbusTransportException, ErrorResponseException, ModbusInitException { System.out.println("當前系統時間0:"+sdf.format(new Date())); String name=Thread.currentThread().getName(); System.out.println("當前執行線程"+name); } @Scheduled(fixedDelayString="${com.test.scheduled}") public void myTask1() throws InterruptedException { System.out.println("當前系統時間1:"+sdf.format(new Date())); String name=Thread.currentThread().getName(); System.out.println("當前執行線程"+name); } @Scheduled(cron = "0/10 * * * * ?") // 每2秒鐘執行一次 public void myTask2() throws InterruptedException { System.out.println("當前系統時間2:"+sdf.format(new Date())); String name=Thread.currentThread().getName(); System.out.println("當前執行線程"+name); /* throw new RuntimeException("運行時異常");*/ } @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { taskRegistrar.setScheduler(Executors.newScheduledThreadPool(5)); } /* @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { taskRegistrar.setScheduler(taskExecutor()); } @Bean(destroyMethod="shutdown") public Executor taskExecutor() { return Executors.newScheduledThreadPool(15); //指定線程池大小 }*/ }
Executors該接口時Java線程池的頂級接口,具體Java線程池的原理分析如下:
JDK1.8中的ThreadPoolExecutor分析線程池對象的依賴關系:
Executor:執行提交的線程任務的對象。這個接口提供了一種將任務提交與每個任務將如何運行實現了分離,包括線程使用、調度等細節。該接口只定義了一個execute()方法。
提供用于管理終止的方法如 shutDown()和shutDownNow()用于關閉線程池的方法以及判斷線程池是否關閉的方法如,isShutdown(),isTerminated()的方法
提供了可以生成用于跟蹤一個或多個異步任務進度的方法如,invokeAll(),submit()。這些方法的返回值都是Future類型,可以獲取線程的執行結果。
ctl是對線程池的運行狀態和線程池中有效線程的數量進行控制的一個字段,ctl是一個Integer, 它包含兩部分的信息: 高三位表示線程池的運行狀態 (runState) 和低29位表示線程池內有效線程的數量 (workerCount),
線程池的生命周期,總共有五種狀態
RUNNING :能接受新提交的任務,并且也能處理阻塞隊列中的任務;
SHUTDOWN:關閉狀態,不再接受新提交的任務,但卻可以繼續處理阻塞隊列中已保存的任務。在線程池處于 RUNNING 狀態時,調用 shutdown()方法會使線程池進入到該狀態。(finalize() 方法在執行過程中也會調用shutdown()方法進入該狀態);
STOP:不能接受新任務,也不處理隊列中的任務,會中斷正在處理任務的線程。在線程池處于 RUNNING 或 SHUTDOWN 狀態時,調用 shutdownNow() 方法會使線程池進入到該狀態;
TIDYING:如果所有的任務都已終止了,workerCount (有效線程數) 為0,線程池進入該狀態后會調用 terminated() 方法進入TERMINATED 狀態。
TERMINATED:在terminated() 方法執行完后進入該狀態,默認terminated()方法中什么也沒有做。
進入TERMINATED的條件如下:
線程池不是RUNNING狀態;
線程池狀態不是TIDYING狀態或TERMINATED狀態;
如果線程池狀態是SHUTDOWN并且workerQueue為空;
workerCount為0;
設置TIDYING狀態成功。
還有三個關于ctl的方法
runStateOf:獲取運行狀態;
workerCountOf:獲取活動線程數;
ctlOf:獲取運行狀態和活動線程數的值
下面解釋構造函數的參數含義
corePoolSize:核心線程數量,當有新任務在execute()方法提交時,會執行以下判斷:
a):如果運行的線程少于 corePoolSize,則創建新線程來處理任務,即使線程池中的其他線程是空閑的;
b):如果線程池中的線程數量大于等于 corePoolSize 且小于 maximumPoolSize,當workQueue未滿的時候任務添加到workQueue中,當workQueue滿時才創建新的線程去處理任務;
c):如果設置的corePoolSize 和 maximumPoolSize相同,則創建的線程池的大小是固定的,這時如果有新任務提交,若workQueue未滿,則將請求放入workQueue中,等待有空閑的線程去從workQueue中取任務并處理;
d):如果運行的線程數量大于等于maximumPoolSize,這時如果workQueue已經滿了,則通過handler所指定的策略來處理任務;
所以,任務提交時,判斷的順序為 corePoolSize –> workQueue –> maximumPoolSize。
maximumPoolSize:最大線程數量;
workQueue:等待隊列,當任務提交時,如果線程池中的線程數量大于等于corePoolSize的時候,把該任務封裝成一個Worker對象放入等待隊列;
workQueue:保存等待執行的任務的阻塞隊列,當提交一個新的任務到線程池以后, 線程池會根據當前線程池中正在運行著的線程的數量來決定對該任務的處理方式,主要有以下幾種處理方式:
直接切換:這種方式常用的隊列是SynchronousQueue。
使用無界隊列:一般使用基于鏈表的阻塞隊列LinkedBlockingQueue。如果使用這種方式,那么線程池中能夠創建的最大線程數就是corePoolSize,而maximumPoolSize就不會起作用了。當線程池中所有的核心線程都是RUNNING狀態時,這時一個新的任務提交就會放入等待隊列中。
使用有界隊列:一般使用ArrayBlockingQueue。使用該方式可以將線程池的最大線程數量限制為maximumPoolSize,這樣能夠降低資源的消耗,但同時這種方式也使得線程池對線程的調度變得更困難,因為線程池和隊列的容量都是有限的值,所以要想使線程池處理任務的吞吐率達到一個相對合理的范圍,又想使線程調度相對簡單,并且還要盡可能的降低線程池對資源的消耗,就需要合理的設置這兩個數量。
如果要想降低系統資源的消耗(包括CPU的使用率,操作系統資源的消耗,上下文環境切換的開銷等), 可以設置較大的隊列容量和較小的線程池容量, 但這樣也會降低線程處理任務的吞吐量。
如果提交的任務經常發生阻塞,那么可以考慮通過調用 setMaximumPoolSize() 方法來重新設定線程池的容量。
如果隊列的容量設置的較小,通常需要將線程池的容量設置大一點,這樣CPU的使用率會相對的高一些。但如果線程池的容量設置的過大,則在提交的任務數量太多的情況下,并發量會增加,那么線程之間的調度就是一個要考慮的問題,因為這樣反而有可能降低處理任務的吞吐量。
keepAliveTime:線程池維護線程所允許的空閑時間。當線程池中的線程數量大于corePoolSize的時候,如果這時沒有新的任務提交,核心線程外的線程不會立即銷毀,而是會等待,直到等待的時間超過了keepAliveTime;
threadFactory:它是ThreadFactory類型的變量,用來創建新線程。默認使用Executors.defaultThreadFactory() 來創建線程。使用默認的ThreadFactory來創建線程時,會使新創建的線程具有相同的NORM_PRIORITY優先級并且是非守護線程,同時也設置了線程的名稱。
handler:它是RejectedExecutionHandler類型的變量,表示線程池的飽和策略。如果阻塞隊列滿了并且沒有空閑的線程,這時如果繼續提交任務,就需要采取一種策略處理該任務。線程池提供了4種策略:
AbortPolicy:直接拋出異常,這是默認策略;
CallerRunsPolicy:用調用者所在的線程來執行任務;
DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,并執行當前任務;
DiscardPolicy:直接丟棄任務;
線程池中的核心線程和非核心線程,沒有什么區別,都是線程,只不過人為的規則線程池中的一部分線程叫核心線程
ThreadPoolExecutor執行execute方法分下面4種情況。
1)如果當前運行的線程少于corePoolSize,則創建新線程來執行任務(注意,執行這一步驟需要獲取全局鎖)。
2)如果運行的線程等于或多于corePoolSize,則將任務加入BlockingQueue。
3)如果無法將任務加入BlockingQueue(隊列已滿),則創建新的線程來處理任務(注意,執行這一步驟需要獲取全局鎖)。
4)如果創建新線程將使當前運行的線程超出maximumPoolSize,任務將被拒絕,并調用
RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor采取上述步驟的總體設計思路,是為了在執行execute()方法時,盡可能地避免獲取全局鎖(那將會是一個嚴重的可伸縮瓶頸)。在ThreadPoolExecutor完成預熱之后當前運行的線程數大于等于corePoolSize),幾乎所有的execute()方法調用都是執行步驟2,而步驟2不需要獲取全局鎖。
我們知道線程在執行完run()方法里面的邏輯后就會被GC回收,那么線程池是怎樣保持線程的存活,并且重復利用線程。
在線程池中使用Worker類來包裝向線程池中添加的Runnable線程任務。首先來分析一下addWorker()方法
在execute()方法中使用addWorker()方法的地方只有在添加核心線程和非核心線程的時候調用。
從上面可以看出這是一個添加線程的過程,并沒有看到,線程池是如何維護線程不被銷毀,從而達到重復利用的
從addWorker()中我們可以看到,向線程池中添加的Runnable被包裝成Worker對象,下面就來查看Worker對象,從中尋找為什么線程池中的線程可以重復利用的答案。
Worker類繼承Runnable,和AbstractQueuedSynchronizer(這個類奠定了Java并發包的基礎,很重要,可是我還沒有深入研究)
查看run()方法,調用runWorker,并將自身作為參數
查看runWorker(),在前面的addWorker()方法在最后是執行了start()方法,也就是Worker的run()方法,進而執行了runWorker()方法。
這個while就是線程池中線程不被銷毀的原因所在,在Worker的run方法中,如果while一直執行下去,那么Worker這個繼承了Runnable接口的線程就會一直執行下去,而我們知道線程池中任務的載體是Worker,如果Worker一直執行下去,就表示該載體可以一直存在,換的只是載體上我們通過execute()方法添加的Runnable任務而已。
那么如何保證while方法一直執行下去
在第一次執行完while后task設置為null,那么就要保證task=getTask()!=null
查看getTask(),從名字可以看出這是獲取一個任務。
通過代碼中的注釋,我們這就弄明白線程池的工作原理 以及線程池中如何保證線程(Worker)重復利用,不被銷毀。
1.固定數量線程池(newFixedThreadPool)
創建使用固定線程數的FixedThreadPool,適用于為了滿足資源管理的需求,而需要限制當前線程數量的應用場景,它適用于負載比較重的服務器。
Executors構造newFixedThreadPool方式
查看源碼
corePoolSize = maximumPoolSize =初始化的參數
workQueue:使用無界隊列LinkedBlockingQueue鏈表阻塞隊列
keepAliveTime = 0 由于使用無界隊列LinkedBlockingQueue作為緩存隊列,所以當corePoolSize滿后,后面添加的線程任務都會添加到LinkedBlockingQueue中去,所以maximumPoolSize 就失去了意義,這樣也就沒有必要設置空閑時間
使用無界隊列的影響,這也是為什么使用Eexcutors來創建線程池存在一定風險的原因
1)當線程池中的線程數達到corePoolSize后,新任務將在無界隊列中等待,因此線程池中的線程數不會超過corePoolSize。
2)使用無界隊列時maximumPoolSize將是一個無效參數。
3)使用無界隊列時keepAliveTime將是一個無效參數。
4)由于使用無界隊列,運行中的FixedThreadPool(未執行方法shutdown()或shutdownNow())不會拒絕任務(不會調用RejectedExecutionHandler.rejectedExecution方法)。
代碼實例
結果:
為什么線程名稱會重復:這正是線程池的原理,因為線程池會重復利用已創建的線程,當一個任務Runnable被掛載到線程池中的一個線程,這個任務執行完畢后,會有另一個任務繼續掛載到這個線程上面,所以會出現線程名稱重復。
適用于需要保證順序地執行各個任務;并且在任意時間點,不會有多個線程是活動的應用場景。
Executors構造newSingleThreadExecutor方式
源代碼
corePoolSize = maximumPoolSize =1 由于是單例線程池,所以線程池中是有一個重用的線程
workQueue:使用無界隊列LinkedBlockingQueue鏈表阻塞隊列
keepAliveTime:0 原因上面已經闡述
代碼實例
結果
只有一個可重用的線程,任務的執行順序和添加順序一致
創建一個會根據需要創建新線程的,適用于執行很多的短期異步任務的小程序,或者是負載較輕的服務器。
Executors構造newCachedThreadPool方式
源代碼
corePoolSize:0 表示線程池中沒有核心線程,都是非核心線程
maximumPoolSize :線程池容量Integer最大值
keepAliveTime:60秒 由于沒有核心線程的存在,線程池中創建的線程都是非核心線程,所以設置空閑時間60秒,當非核心線程60秒后沒有被重用,將會被銷毀,如果沒有線程提交給該線程池,超過空閑時間,該線程池就沒有非空閑線程,那么該線程池也就不會消耗過多的資源,
workQueue:SynchronousQueue是一個不存儲元素的阻塞隊列。每一個put操作必須等待一個take操作,否則不能繼續添加元素。
代碼實例
結果
它主要用來在給定的延遲之后運行任務,或者定期執行任務,例如定時輪詢數據庫中的表的數據
Executors構造newScheduledThreadPool方式
workQeueu:delayWorkQueue,使用延遲隊列作為緩存隊列
任務提交方式
schedule(Callable<E> callable, long delay, TimeUnit unit);
callable:提交Callable或者Runnable任務
delay:延遲時間
unit:時間級別
該方法表示在給定的delay延遲時間后執行一次,有返回結果
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
command:提交Runnable任務
initialDelay:初始延遲時間
period:表示連個任務連續執行的時間周期,第一個任務開始到第二個任務的開始,包含了任務的執行時間
unit:時間級別
該方法在initialDelay時間后開始周期性的按period時間間隔執行任務
scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
command:提交Runnable任務
initialDelay:初始延遲時間
delay:表示延遲時間 第一個任務結束到第二個任務開始的時間間隔
unit:時間級別
單例延遲線程池(newSingleThreadScheduledExecutor)
Executors構造newSingleThreadScheduledExecutor方式
corePoolSize :1由于是單例線程池,所以核心線程為1,線程池中只有一個重用線程
總結:利用sping的線程調度結合java的線程池可以實現多線程的任務調度
“Scheduled時間調度是什么意思”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。