您好,登錄后才能下訂單哦!
這篇文章主要講解了“定時線程池是怎么實現延遲執行和周期執行的”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“定時線程池是怎么實現延遲執行和周期執行的”吧!
ScheduledThreadPoolExecutor即定時線程池,是用來執行延遲任務或周期性任務的。相比于Timer的單線程,定時線程池在遇到任務拋出異常的時候不會關閉整個線程池,更加健壯(需要提一下的是:ScheduledThreadPoolExecutor和ThreadPoolExecutor一樣,如果執行任務的過程中拋異常的話,這個任務是會被丟棄的。所以在任務的執行過程中需要對異常做捕獲處理,有必要的話需要做補償措施)。
傳進來的任務會被包裝為ScheduledFutureTask,其繼承于FutureTask,提供異步執行的能力,并且可以返回執行結果。同時實現了Delayed接口,可以通過getDelay方法來獲取延遲時間。
相比于ThreadPoolExecutor,ScheduledThreadPoolExecutor中使用的隊列是DelayedWorkQueue,是一個無界的隊列。所以在定時線程池中,最大線程數是沒有意義的(最大線程數會固定為int的最大值,且不會作為定時線程池的參數)。在ThreadPoolExecutor中,如果當前線程數小于核心線程數就直接創建核心線程來執行任務,大于等于核心線程數的話才往阻塞隊列中放入任務;而在ScheduledThreadPoolExecutor中卻不是這種邏輯。ScheduledThreadPoolExecutor中上來就會把任務放進延遲隊列中,然后再去等待執行。
DelayedWorkQueue的實現有些特殊,是基于小頂堆構建的(與DelayQueue和PriorityQueue類似)。因為要保證每次從延遲隊列中拿取到的任務是距現在最近的一個,所以使用小頂堆結構來構建是再適合不過了(堆結構也常常用來解決前N小和前N大的問題)。小頂堆保證每個節點的值不小于其父節點的值,而不大于其孩子節點的值,而對于同級節點來說則沒有什么限制。這樣在小頂堆中值最小的點永遠保證是在根節點處。如果用數組來構建小頂堆的話,值最小的點就在數組中的第一個位置處。
圖中紅色的數字代表節點在數組中的索引位置,由此可以看出堆的另一條性質是:假設當前節點的索引是k,那么其父節點的索引是:(k-1)/2;左孩子節點的索引是:k* 2+1;而右孩子節點的索引是k*2+2。
構建堆的兩個核心方法是 siftUp和 siftDown,siftUp方法用于添加節點時的上溯過程;而siftDown方法用于刪除節點時的下溯過程。具體的實現源碼會在下面進行分析,這里就畫圖來理解一下(下面只會分析經典的小頂堆添加和刪除節點的實現,而在源碼中的實現略有不同,但核心都是一樣的):
如果在上面的siftUp過程中,發現某一次當前節點的值就已經大于了父節點的值,siftUp過程也就會提前終止了。同時可以看出:在上面的siftUp以及下面將要講的siftDown操作過程中,每次都只會比較并交換當前節點和其父子節點的值,而不是整個堆都發生變動,降低了時間復雜度。
刪除節點分為三種情況,首先來看一下 刪除根節點的情況:
然后是 刪除最后一個節點的情況。刪除最后一個節點是最簡單的,只需要進行刪除就行了,因為這并不影響小頂堆的結構,不需要進行調整。這里就不再展示了(注意:刪除除了最后一個節點的其他葉子節點并不屬于當前這種情況,而是屬于下面第三種情況。也就是說刪除這些葉子節點并不能簡單地刪除它們就完了的,因為堆結構首先得保證是一顆完全二叉樹)。
最后是 刪除既不是根節點又不是最后一個節點的情況:
在刪除既不是根節點又不是最后一個節點的時候,可以看到執行了一次siftDown并伴隨了一次siftUp的過程。但是這個siftUp過程并不是會一定觸發的,只有滿足最后一個節點的值比要刪除節點的父節點的值還要小的時候才會觸發siftUp操作(這個很好推理:在小頂堆中如果最后一個節點值比要刪除節點的父節點值要小的話,那么要刪除節點的左右孩子節點值也必然是都大于最后一個節點值的(不考慮值相等的情況),那么此時就不會發生siftDown操作;而如果發生了siftDown操作,就說明最后一個節點值至少要比要刪除節點的左右孩子節點中的一個要大(如果有左右孩子節點的話)。而孫子節點值是肯定要大于爺爺節點值的(不考慮值相等的情況),所以也就是說發生了siftDown操作的時候,最后一個節點值是比要刪除節點的父節點值大的。這個時候孫子節點和最后一個節點siftDown交換后,依然是滿足小頂堆性質的,所以就不需要附加的siftUp操作;還有一種情況是最后一個節點值是介于要刪除節點的父節點值和要刪除節點的左右孩子節點值中的較小者,那么這個時候既不會發生siftDown,也不會發生siftUp)。
而源碼中的實現和上面的經典實現最大的不同就是不會有節點彼此交換的操作。在siftUp和siftDown的經典實現中,如果需要變動節點時,都會來一次父子節點的互相交換操作(包括刪除節點時首先做的要刪除節點和最后一個節點之間的交換操作也是如此)。如果仔細思考的話,就會發現這其實是多余的。在需要交換節點的時候,只需要siftUp操作時的父節點或siftDown時的孩子節點重新移到當前需要比較的節點位置上,而比較節點是不需要移動到它們的位置上的。此時直接進入到下一次的判斷中,重復siftUp或siftDown過程,直到最后找到了比較節點的插入位置后,才會將其插入進去。這樣做的好處是可以省去一半的節點賦值的操作,提高了執行的效率。同時這也就意味著,需要將要比較的節點作為參數保存起來,而源碼中也正是這么實現的。
ScheduledThreadPoolExecutor中使用了Leader-Follower模式。這是一種設計思想,假如說現在有一堆等待執行的任務(一般是存放在一個隊列中排好序),而所有的工作線程中只會有一個是leader線程,其他的線程都是follower線程。只有leader線程能執行任務,而剩下的follower線程則不會執行任務,它們會處在休眠中的狀態。當leader線程拿到任務后執行任務前,自己會變成follower線程,同時會選出一個新的leader線程,然后才去執行任務。如果此時有下一個任務,就是這個新的leader線程來執行了,并以此往復這個過程。當之前那個執行任務的線程執行完畢再回來時,會判斷如果此時已經沒任務了,又或者有任務但是有其他的線程作為leader線程,那么自己就休眠了;如果此時有任務但是沒有leader線程,那么自己就會重新成為leader線程來執行任務。
不像ThreadPoolExecutor是需要立即執行任務的,ScheduledThreadPoolExecutor中的任務是延遲執行的,而拿取任務也是延遲拿取的。所以并不需要所有的線程都處于運行狀態延時等待獲取任務。而如果這么做的話,最后也只會有一個線程能執行當前任務,其他的線程還是會被再次休眠的(這里只是在說單任務多線程的情況,但對于多任務來說也是一樣的,總結來說就是 Leader-Follower模式只會喚醒真正需要“干事”的線程)。這是很沒有必要的,而且浪費資源。所以使用Leader-Follower模式的好處是:避免沒必要的喚醒和阻塞的操作,這樣會更加有效,且節省資源。
1 /** 2 * ScheduledThreadPoolExecutor: 3 */ 4 public ScheduledThreadPoolExecutor(int corePoolSize) { 5 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 6 new DelayedWorkQueue()); 7 } 8 9 /** 10 * ThreadPoolExecutor: 11 */12 public ThreadPoolExecutor(int corePoolSize,13 int maximumPoolSize,14 long keepAliveTime,15 TimeUnit unit,16 BlockingQueue<Runnable> workQueue) {17 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,18 Executors.defaultThreadFactory(), defaultHandler);19 }
可以看到:ScheduledThreadPoolExecutor的構造器是調用了父類ThreadPoolExecutor的構造器來實現的,而父類的構造器以及之中的所有參數我在之前分析ThreadPoolExecutor的源碼文章中講過,這里就不再贅述了。
execute方法和submit方法內部都是調用的schedule方法,所以來看一下其實現:
1 /** 2 * ScheduledThreadPoolExecutor: 3 */ 4 public ScheduledFuture<?> schedule(Runnable command, 5 long delay, 6 TimeUnit unit) { 7 //非空校驗 8 if (command == null || unit == null) 9 throw new NullPointerException();10 //包裝任務11 RunnableScheduledFuture<?> t = decorateTask(command,12 new ScheduledFutureTask<Void>(command, null,13 triggerTime(delay, unit)));14 //延遲執行15 delayedExecute(t);16 return t;17 }1819 /** 20 * 第13行代碼處: 21 * 延遲操作的觸發時間 22 */23 private long triggerTime(long delay, TimeUnit unit) {24 //delay非負處理25 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));26 }2728 long triggerTime(long delay) {29 /* 30 now方法內部就一句話:“System.nanoTime();”,也就是獲取當前時間。這里也就是獲取 31 當前時間加上延遲時間后的結果。如果延遲時間超過了上限,會在overflowFree方法中處理 32 */33 return now() +34 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));35 }3637 private long overflowFree(long delay) {38 //獲取隊頭節點(不移除)39 Delayed head = (Delayed) super.getQueue().peek();40 if (head != null) {41 //獲取隊頭的剩余延遲時間42 long headDelay = head.getDelay(NANOSECONDS);43 /* 44 能走進本方法中,就說明delay是一個接近long最大值的數。此時判斷如果headDelay小于0 45 就說明延遲時間已經到了或過期了但是還沒有執行,并且delay和headDelay的差值小于0,說明headDelay 46 和delay的差值已經超過了long的范圍 47 */48 if (headDelay < 0 && (delay - headDelay < 0))49 //此時更新一下delay的值,確保其和headDelay的差值在long的范圍內,同時delay也會重新變成一個正數50 delay = Long.MAX_VALUE + headDelay;51 }52 return delay;53 }5455 /** 56 * 第39行代碼處: 57 * 調用DelayedWorkQueue中覆寫的peek方法來獲取隊頭節點 58 */59 public RunnableScheduledFuture<?> peek() {60 final ReentrantLock lock = this.lock;61 lock.lock();62 try {63 return queue[0];64 } finally {65 lock.unlock();66 }67 }6869 /** 70 * 第42行代碼處: 71 * 可以看到本方法就是獲取延遲時間和當前時間的差值 72 */73 public long getDelay(TimeUnit unit) {74 return unit.convert(time - now(), NANOSECONDS);75 }
上面第11行和第12行代碼處會進行任務的包裝:
1 /** 2 * ScheduledThreadPoolExecutor: 3 */ 4 ScheduledFutureTask(Runnable r, V result, long ns) { 5 //調用父類FutureTask的構造器 6 super(r, result); 7 //這里會將延遲時間賦值給this.time 8 this.time = ns; 9 //period用來表示任務的類型,為0表示延遲任務,否則表示周期性任務10 this.period = 0;11 //這里會給每一個任務賦值一個唯一的序列號。當延遲時間相同時,會以該序列號來進行判斷。序列號小的會出隊12 this.sequenceNumber = sequencer.getAndIncrement();13 }1415 /** 16 * schedule方法第11行代碼處: 17 * 包裝任務,這里只是返回task而已,子類可以覆寫本方法中的邏輯 18 */19 protected <V> RunnableScheduledFuture<V> decorateTask(20 Runnable runnable, RunnableScheduledFuture<V> task) {21 return task;22 }
在schedule方法的第15行代碼處會執行延遲任務,添加任務和補充工作線程:
1 /** 2 * ScheduledThreadPoolExecutor: 3 */ 4 private void delayedExecute(RunnableScheduledFuture<?> task) { 5 if (isShutdown()) 6 /* 7 這里會調用父類ThreadPoolExecutor的isShutdown方法來判斷當前線程池是否處于關閉或正在關閉的狀態, 8 如果是的話就執行具體的拒絕策略 9 */ 10 reject(task); 11 else { 12 //否則就往延遲隊列中添加當前任務 13 super.getQueue().add(task); 14 /* 15 添加后繼續判斷當前線程池是否處于關閉或正在關閉的狀態,如果是的話就判斷此時是否還能繼續執行任務, 16 如果不能的話就刪除上面添加的任務 17 */ 18 if (isShutdown() && 19 !canRunInCurrentRunState(task.isPeriodic()) && 20 remove(task)) 21 //同時會取消此任務的執行 22 task.cancel(false); 23 else 24 //否則,說明線程池是可以繼續執行任務的,就去判斷此時是否需要補充工作線程 25 ensurePrestart(); 26 } 27 } 28 29 /** 30 * 第19行代碼處: 31 * 傳進來的periodic表示任務是否是周期性任務,如果是的話就是true(通過“period != 0”進行判斷) 32 */ 33 boolean canRunInCurrentRunState(boolean periodic) { 34 return isRunningOrShutdown(periodic ? 35 //關閉線程池時判斷是否需要繼續執行周期性任務 36 continueExistingPeriodicTasksAfterShutdown : 37 //關閉線程池時判斷是否需要繼續執行延遲任務 38 executeExistingDelayedTasksAfterShutdown); 39 } 40 41 /** 42 * ThreadPoolExecutor: 43 */ 44 final boolean isRunningOrShutdown(boolean shutdownOK) { 45 //獲取當前線程池的運行狀態 46 int rs = runStateOf(ctl.get()); 47 //如果是RUNNING狀態的,或者是SHUTDOWN狀態并且是能繼續執行任務的,就返回true 48 return rs == RUNNING || (rs == SHUTDOWN && shutdownOK); 49 } 50 51 /** 52 * ScheduledThreadPoolExecutor: 53 * 上面第20行代碼處的remove方法會調用ThreadPoolExecutor的remove方法,而該方法我在之前的 54 * ThreadPoolExecutor的源碼分析文章中已經分析過了。但是其中會調用延遲隊列覆寫的remove邏輯, 55 * 也就是本方法(同時第130行代碼處也會調用到這里) 56 */ 57 public boolean remove(Object x) { 58 final ReentrantLock lock = this.lock; 59 //加鎖 60 lock.lock(); 61 try { 62 //獲取當前節點的堆索引位 63 int i = indexOf(x); 64 if (i < 0) 65 //如果找不到的話,就直接返回false 66 return false; 67 68 //將當前節點的索引位設置為-1,因為下面要進行刪除了 69 setIndex(queue[i], -1); 70 //size-1 71 int s = --size; 72 //獲取小頂堆的最后一個節點,用于替換 73 RunnableScheduledFuture<?> replacement = queue[s]; 74 //將最后一個節點置為null 75 queue[s] = null; 76 //如果要刪除的節點本身就是最后一個節點的話,就可以直接返回true了,因為不影響小頂堆的結構 77 if (s != i) { 78 /* 79 否則執行一次siftDown下溯過程,將最后一個節點的值重新插入到小頂堆中 80 這其中會刪除i位置處的節點(siftDown方法后面會再次調用,到時候再來詳細分析該方法的實現) 81 */ 82 siftDown(i, replacement); 83 /* 84 經過上面的siftDown的操作后,如果最后一個節點的延遲時間本身就比要刪除的節點的小的話, 85 那么就會直接將最后一個節點放在要刪除節點的位置上。此時從刪除節點到其下面的節點都是滿足 86 小頂堆結構的,但是不能保證replacement也就是當前刪除后的替換節點和其父節點之間滿足小頂堆 87 結構,也就是說可能出現replacement節點的延遲時間比其父節點的還小的情況 88 */ 89 if (queue[i] == replacement) 90 //那么此時就調用一次siftUp上溯操作,再次調整replacement節點其上的小頂堆的結構即可 91 siftUp(i, replacement); 92 } 93 return true; 94 } finally { 95 //釋放鎖 96 lock.unlock(); 97 } 98 } 99100 /** 101 * 第63行代碼處: 102 */103 private int indexOf(Object x) {104 if (x != null) {105 if (x instanceof ScheduledFutureTask) {106 //如果當前節點是ScheduledFutureTask類型的,就獲取它的堆索引位107 int i = ((ScheduledFutureTask) x).heapIndex;108 //大于等于0和小于size說明當前節點還在小頂堆中,并且當前節點還在延遲隊列中的話,就直接返回該索引位109 if (i >= 0 && i < size && queue[i] == x)110 return i;111 } else {112 //否則就按照普通遍歷的方式查找是否有相等的節點,如果有的話就返回索引位113 for (int i = 0; i < size; i++)114 if (x.equals(queue[i]))115 return i;116 }117 }118 //找不到的話就返回-1119 return -1;120 }121122 /** 123 * 第22行代碼處: 124 */125 public boolean cancel(boolean mayInterruptIfRunning) {126 //調用FutureTask的cancel方法來嘗試取消此任務的執行127 boolean cancelled = super.cancel(mayInterruptIfRunning);128 //如果取消成功了,并且允許刪除節點,并且當前節點存在于小頂堆中的話,就刪除它129 if (cancelled && removeOnCancel && heapIndex >= 0)130 remove(this);131 return cancelled;132 }133134 /** 135 * ThreadPoolExecutor: 136 * 第25行代碼處: 137 */138 void ensurePrestart() {139 //獲取當前線程池的工作線程數140 int wc = workerCountOf(ctl.get());141 if (wc < corePoolSize)142 /* 143 如果小于核心線程數,就添加一個核心線程,之前我在分析ThreadPoolExecutor的源碼文章中講過, 144 addWorker方法的執行中會同時啟動運行線程。這里傳入的firstTask參數為null,因為不需要立即執行任務, 145 而是從延遲隊列中拿取任務 146 */147 addWorker(null, true);148 else if (wc == 0)149 //如果當前沒有工作線程,就去添加一個非核心線程,然后運行它。保證至少要有一個線程150 addWorker(null, false);151 /* 152 從這里可以看出,如果當前的工作線程數已經達到了核心線程數后,就不會再創建工作線程了 153 定時線程池最多只有“核心線程數”個線程,也就是通過構造器傳進來的參數大小 154 */155 }
因為延遲隊列是用小頂堆構建的,所以添加的時候會涉及到 小頂堆的調整:
1 /** 2 * ScheduledThreadPoolExecutor: 3 * 這里會調用DelayedWorkQueue的add方法 4 */ 5 public boolean add(Runnable e) { 6 return offer(e); 7 } 8 9 public boolean offer(Runnable x) { 10 //非空校驗 11 if (x == null) 12 throw new NullPointerException(); 13 //強轉類型 14 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>) x; 15 final ReentrantLock lock = this.lock; 16 //加鎖 17 lock.lock(); 18 try { 19 //獲取當前的任務數量 20 int i = size; 21 //判斷是否需要擴容(初始容量為16) 22 if (i >= queue.length) 23 grow(); 24 //size+1 25 size = i + 1; 26 if (i == 0) { 27 //如果當前是第一個任務的話,就直接放在小頂堆的根節點位置處就行了(隊列第一個位置) 28 queue[0] = e; 29 //同時設置一下當前節點的堆索引位為0 30 setIndex(e, 0); 31 } else { 32 //否則就用siftUp的方式來插入到應該插入的位置 33 siftUp(i, e); 34 } 35 //經過上面的插入過程之后,如果小頂堆的根節點還是當前新添加節點的話,說明新添加節點的延遲時間是最短的 36 if (queue[0] == e) { 37 //那么此時不管有沒有leader線程,都得將其置為null 38 leader = null; 39 /* 40 并且重新將條件隊列上的一個節點轉移到CLH隊列中(如果當前只有一個節點的時候也會進入到signal方法中 41 但無妨,因為此時條件隊列中還沒有節點,所以并不會做什么)需要提一點的是:如果真的看過signal方法內部實現 42 的話就會知道,signal方法在常規情況下并不是在做喚醒線程的工作,喚醒是在下面的unlock方法中實現的 43 */ 44 available.signal(); 45 } 46 } finally { 47 /* 48 釋放鎖(注意,這里只會喚醒CLH隊列中的head節點的下一個節點,可能是上面被鎖住的添加任務的其他線程、 49 也可能是上次執行完任務后準備再次拿取任務的線程,還有可能是等待被喚醒的follower線程,又或者有其他的 50 情況。但不管是哪個,只要能保證喚醒動作是一直能被傳播下去的就行。ReentrantLock和阻塞隊列的執行細節 51 詳見我之前對AQS源碼進行分析的文章) 52 */ 53 lock.unlock(); 54 } 55 return true; 56 } 57 58 /** 59 * 第23行代碼處: 60 */ 61 private void grow() { 62 int oldCapacity = queue.length; 63 //可以看到這里的擴容策略是*1.5的方式 64 int newCapacity = oldCapacity + (oldCapacity >> 1); 65 //如果擴容后的新容量溢出了,就將其恢復為int的最大值 66 if (newCapacity < 0) 67 newCapacity = Integer.MAX_VALUE; 68 //使用Arrays.copyOf(System.arraycopy)的方式來進行數組的拷貝 69 queue = Arrays.copyOf(queue, newCapacity); 70 } 71 72 /** 73 * 第30行、第99行和第109行代碼處: 74 * 設置f節點在小頂堆中的索引位為idx,這樣在最后的刪除節點時可以通過index是否大于0來判斷當前節點是否仍在小頂堆中 75 */ 76 private void setIndex(RunnableScheduledFuture<?> f, int idx) { 77 if (f instanceof ScheduledFutureTask) 78 ((ScheduledFutureTask) f).heapIndex = idx; 79 } 80 81 /** 82 * 第33行代碼處: 83 * 堆排序的精髓就在于siftUp和siftDown方法,但本實現與常規的實現略有不同,多了一個入參key 84 * key代表當前要插入節點中的任務 85 */ 86 private void siftUp(int k, RunnableScheduledFuture<?> key) { 87 //當k<=0的時候說明已經上溯到根節點了 88 while (k > 0) { 89 //獲取父節點的索引((當前節點索引位-1)/2的方式) 90 int parent = (k - 1) >>> 1; 91 //獲取父節點的任務 92 RunnableScheduledFuture<?> e = queue[parent]; 93 //如果當前要插入節點中的任務延遲時間大于父節點的延遲時間的話,就停止上溯過程,說明找到了插入的位置 94 if (key.compareTo(e) >= 0) 95 break; 96 //否則就需要將父節點的內容賦值給當前節點 97 queue[k] = e; 98 //同時設置一下父節點的堆索引位為當前節點處 99 setIndex(e, k);100 //然后將父節點賦值給當前節點,繼續下一次的上溯過程101 k = parent;102 }103 /* 104 走到這里說明有兩種情況:<1>已經結束了上溯的過程,但最后一次的父節點還沒有賦值,這里就是進行賦值的操作; 105 <2>如果本方法進來的時候要添加的最后一個節點本身就滿足小頂堆條件的話,那么該處就是在給最后一個節點進行賦值 106 */107 queue[k] = key;108 //同時設置一下要插入節點的堆索引位109 setIndex(key, k);110 }111112 /** 113 * 第94行代碼處: 114 */115 public int compareTo(Delayed other) {116 //如果比較的就是當前對象,就直接返回0相等117 if (other == this)118 return 0;119 if (other instanceof ScheduledFutureTask) {120 //如果需要比較的任務也是ScheduledFutureTask類型的話,就首先強轉一下類型121 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;122 //計算當前任務和需要比較的任務之間的延遲時間差123 long diff = time - x.time;124 if (diff < 0)125 //小于0說明當前任務的延遲時間更短,就返回-1126 return -1;127 else if (diff > 0)128 //大于0說明需要比較的任務的延遲時間更短,就返回1129 return 1;130 //如果兩者相等的話,就比較序列號,誰的序列號更小(序列號是唯一的),就應該先被執行131 else if (sequenceNumber < x.sequenceNumber)132 return -1;133 else134 return 1;135 }136 //如果需要比較的任務不是ScheduledFutureTask類型的話,就通過getDelay的方式來進行比較137 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);138 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;139 }
在上面的ensurePrestart方法中會調用到addWorker方法,以此來補充工作線程。之前我對ThreadPoolExecutor源碼進行分析的文章中說到過 ,addWorker方法會調用到getTask方法來從隊列中拿取任務:
1 /** 2 * ThreadPoolExecutor: 3 */ 4 private Runnable getTask() { 5 //... 6 /* 7 這里的allowCoreThreadTimeOut默認為false(為true表示空閑的核心線程也是要超時銷毀的), 8 而上面說過定時線程池最多只有“核心線程數”個線程,所以timed為false 9 */ 10 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 11 //... 12 //因為timed為false,所以這里會走take方法中的邏輯 13 Runnable r = timed ? 14 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 15 workQueue.take(); 16 //... 17 } 18 19 /** 20 * ScheduledThreadPoolExecutor: 21 * 第15行代碼處: 22 * 上面的take方法會調用到DelayedWorkQueue的take方法,而該方法也就是用來實現延遲拿取任務的 23 */ 24 public RunnableScheduledFuture<?> take() throws InterruptedException { 25 final ReentrantLock lock = this.lock; 26 //加鎖(響應中斷模式) 27 lock.lockInterruptibly(); 28 try { 29 for (; ; ) { 30 //獲取隊頭節點 31 RunnableScheduledFuture<?> first = queue[0]; 32 if (first == null) 33 /* 34 如果當前延遲隊列中沒有延遲任務,就在這里阻塞當前線程(通過AQS中條件隊列的方式),等待有任務時被喚醒 35 另外,當線程執行完任務后也會再次走到getTask方法中的本方法中。如果此時沒任務了,就會在此被阻塞休眠住 36 (我在之前AQS源碼分析的文章中說過:await方法中會釋放掉所有的ReentrantLock鎖資源,然后才會被阻塞住) 37 */ 38 available.await(); 39 else { 40 //否則就獲取隊頭的剩余延遲時間 41 long delay = first.getDelay(NANOSECONDS); 42 //如果延遲時間已經到了的話,就刪除并返回隊頭,表示拿取到了任務 43 if (delay <= 0) 44 return finishPoll(first); 45 /* 46 這里將隊頭節點的引用置為null,如果不置為null的話,可能有多個等待著的線程同時持有著隊頭節點的 47 first引用,這樣如果要刪除隊頭節點的話,因為其還有其他線程的引用,所以不能被及時回收,造成內存泄漏 48 */ 49 first = null; 50 /* 51 如果leader不為null,說明有其他的線程已經成為了leader線程,正在延遲等待著 52 同時此時沒有新的延遲時間最短的節點進入到延遲隊列中 53 */ 54 if (leader != null) 55 /* 56 那么當前線程就變成了follower線程,需要被阻塞住,等待被喚醒(同上,其中會釋放掉所有的鎖資源) 57 線程執行完任務后也會再次走到本方法中拿取任務,如果走到這里發現已經有別的leader線程了, 58 那么當前線程也會被阻塞休眠住;否則就會在下面的else分支中再次成為leader線程 59 */ 60 available.await(); 61 else { 62 /* 63 leader為null,可能是上一個leader線程拿取到任務后喚醒的下一個線程,也有可能 64 是一個新的延遲時間最短的節點進入到延遲隊列中,從而將leader置為null 65 66 此時獲取當前線程 67 */ 68 Thread thisThread = Thread.currentThread(); 69 //并將leader置為當前線程,也就是當前線程成為了leader線程 70 leader = thisThread; 71 try { 72 /* 73 這里也就是在做具體的延時等待delay納秒的操作了,具體涉及到AQS中條件隊列的相關操作 74 如果被喚醒的話可能是因為到達了延遲時間從而醒來;也有可能是被別的線程signal喚醒了; 75 還有可能是中斷被喚醒。正常情況下是等到達了延遲時間后,這里會醒來并進入到下一次循環中的 76 finishPoll方法中,剔除隊頭節點并最終返回(awaitNanos方法和await方法類似,其中會釋放掉 77 所有的鎖資源;不一樣的是在被喚醒時會把當前節點從條件隊列中“轉移”到CLH隊列中。這里可以認為 78 是轉移,因為在條件隊列中的該節點狀態已經改為了0,相當于是個垃圾節點,后續會進行刪除) 79 */ 80 available.awaitNanos(delay); 81 } finally { 82 /* 83 不管awaitNanos是如何被喚醒的,此時會判斷當前的leader線程是否還是當前線程 84 如果是的話就將leader置為null,也就是當前線程不再是leader線程了 85 */ 86 if (leader == thisThread) 87 leader = null; 88 } 89 } 90 } 91 } 92 } finally { 93 //在退出本方法之前,判斷如果leader線程為null并且刪除隊頭后的延遲隊列仍然不為空的話(說明此時有其他的延遲任務) 94 if (leader == null && queue[0] != null) 95 //就將條件隊列上的一個節點轉移到CLH隊列中(同時會剔除上面的垃圾條件節點) 96 available.signal(); 97 /* 98 釋放鎖(同offer方法中的邏輯,這里只會喚醒CLH隊列中的head節點的下一個節點。這里就體現了 99 Leader-Follower模式:當leader線程拿取到任務后準備要執行時,會首先喚醒剩下線程中的一個, 100 它將會成為新的leader線程,并以此往復。保證在任何時間都只有一個leader線程,避免不必要的喚醒與睡眠) 101 */102 lock.unlock();103 }104 }105106 /** 107 * 第44行代碼處: 108 */109 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {110 //size-1111 int s = --size;112 //獲取隊列中的最后一個節點113 RunnableScheduledFuture<?> x = queue[s];114 //并置空它,便于GC,這里也就是在刪除最后一個節點115 queue[s] = null;116 //如果刪除前延遲隊列中有不止一個節點的話,就進入到siftDown方法中,將小頂堆中的根節點刪除,并且重新維護小頂堆117 if (s != 0)118 siftDown(0, x);119 //同時設置一下刪除前的根節點的堆索引位為-1,表示其不存在于小頂堆中了120 setIndex(f, -1);121 //最后將其返回出去122 return f;123 }124125 /** 126 * 第118行代碼處: 127 * 方法參數中的key代表刪除的最后一個節點中的任務 128 */129 private void siftDown(int k, RunnableScheduledFuture<?> key) {130 /* 131 這里會取數組長度的一半half(注意這里的size是已經刪除最后一個節點后的size), 132 而half也就是在指向最后一個非葉子節點的下一個節點 133 */134 int half = size >>> 1;135 //從這里可以看出下溯的終止條件是k大于等于half,也就是此時遍歷到已經沒有了非葉子節點,自然不需要進行調整136 while (k < half) {137 //獲取左孩子節點的索引位138 int child = (k << 1) + 1;139 //獲取左孩子節點的任務140 RunnableScheduledFuture<?> c = queue[child];141 //獲取右孩子節點的索引位142 int right = child + 1;143 //如果右孩子節點的索引位小于size,也就是在說當前節點含有右子樹。并且左孩子節點的任務延遲時間大于右孩子節點的話144 if (right < size && c.compareTo(queue[right]) > 0)145 //就將c重新指向為右孩子節點146 c = queue[child = right];147 /* 148 走到這里說明c指向的是左右子節點中、任務延遲時間較小的那個節點。此時判斷如果最后一個節點的 149 任務延遲時間小于等于這個較小節點的話,就可以停止下溯了,說明找到了插入的位置 150 */151 if (key.compareTo(c) <= 0)152 break;153 //否則就把較小的那個節點賦值給當前節點處154 queue[k] = c;155 //同時設置一下延遲時間較小的那個節點的堆索引位為當前節點處156 setIndex(c, k);157 //然后將當前節點指向那個較小的節點,繼續下一次循環158 k = child;159 }160 /* 161 同siftUp方法一樣,走到這里說明有兩種情況:<1>已經結束了下溯的過程,但最后一次的子節點還沒有賦值, 162 這里會把其賦值為之前刪除的最后一個節點; 163 <2>如果根節點的左右子節點中、任務延遲時間較小的那個節點本身的延遲時間就比之前刪除節點大的話, 164 就會把根節點替換為之前刪除的最后一個節點 165 所以本方法加上finishPoll方法,實際上并沒有將最后一個節點刪除,最后一個節點中的任務一直都是保留著的 166 (也就是key),而是變相地將堆的根節點刪除了(在第一種情況中根節點在第一次賦值為左右子節點中、 167 任務延遲時間較小的那個節點時,就已經被覆蓋了) 168 */169 queue[k] = key;170 //同時設置一下最后一個節點現在新的堆索引位171 setIndex(key, k);172 }
拿取到任務之后,就是具體的執行任務了。addWorker方法具體的執行邏輯我在之前ThreadPoolExecutor的源碼分析文章中已經講過了,其中執行任務的時候會調用task的run方法,也就是這里包裝為ScheduledFutureTask的run方法:
1 /** 2 * ScheduledThreadPoolExecutor: 3 */ 4 public void run() { 5 //判斷是否是周期性任務 6 boolean periodic = isPeriodic(); 7 if (!canRunInCurrentRunState(periodic)) { 8 //如果此時不能繼續執行任務的話,就嘗試取消此任務的執行 9 cancel(false);10 } else if (!periodic)11 /* 12 如果是延遲任務,就調用ScheduledFutureTask父類FutureTask的run方法, 13 其中會通過call方法來最終調用到使用者具體寫的任務 14 */15 ScheduledFutureTask.super.run();16 else if (ScheduledFutureTask.super.runAndReset()) {17 //周期性任務的執行放在下一節中進行分析18 setNextRunTime();19 reExecutePeriodic(outerTask);20 }21 }
scheduleAtFixedRate方法是以上次的延遲時間點開始,延遲指定時間后再次執行當前任務;而scheduleWithFixedDelay方法是以上個周期任務執行完畢后的時間點開始,延遲指定時間后再次執行當前任務。因為這兩個方法的實現絕大部分都是一樣的,所以合在一起來進行分析:
1 /** 2 * ScheduledThreadPoolExecutor: 3 * scheduleAtFixedRate方法 4 */ 5 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, 6 long initialDelay, 7 long period, 8 TimeUnit unit) { 9 //非空校驗 10 if (command == null || unit == null) 11 throw new NullPointerException(); 12 //非負校驗 13 if (period <= 0) 14 throw new IllegalArgumentException(); 15 //包裝任務 16 ScheduledFutureTask<Void> sft = 17 new ScheduledFutureTask<Void>(command, 18 null, 19 triggerTime(initialDelay, unit), 20 unit.toNanos(period)); 21 RunnableScheduledFuture<Void> t = decorateTask(command, sft); 22 //把任務賦值給ScheduledFutureTask的outerTask屬性 23 sft.outerTask = t; 24 //延遲執行 25 delayedExecute(t); 26 return t; 27 } 28 29 /** 30 * scheduleWithFixedDelay方法 31 */ 32 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 33 long initialDelay, 34 long delay, 35 TimeUnit unit) { 36 //非空校驗 37 if (command == null || unit == null) 38 throw new NullPointerException(); 39 //非負校驗 40 if (delay <= 0) 41 throw new IllegalArgumentException(); 42 //包裝任務 43 ScheduledFutureTask<Void> sft = 44 new ScheduledFutureTask<Void>(command, 45 null, 46 triggerTime(initialDelay, unit), 47 unit.toNanos(-delay)); 48 RunnableScheduledFuture<Void> t = decorateTask(command, sft); 49 //把任務賦值給ScheduledFutureTask的outerTask屬性 50 sft.outerTask = t; 51 //延遲執行 52 delayedExecute(t); 53 return t; 54 } 55 56 /** 57 * 第17行和第44行代碼處: 58 */ 59 ScheduledFutureTask(Runnable r, V result, long ns, long period) { 60 super(r, result); 61 this.time = ns; 62 /* 63 可以看到這里與schedule方法中調用ScheduledFutureTask構造器的區別是多了一個period入參 64 在schedule方法中this.period賦值為0,而這里會賦值為周期時間。其他的代碼都是一樣的 65 如果細心的話可以看出:在上面scheduleAtFixedRate方法傳入的period是一個大于0的數,而 66 scheduleWithFixedDelay方法傳入的period是一個小于0的數,以此來進行區分 67 */ 68 this.period = period; 69 this.sequenceNumber = sequencer.getAndIncrement(); 70 }
周期性任務和延遲任務的拿取任務邏輯都是一樣的,而在下面具體運行任務時有所不同,下面就來看一下其實現的差異:
1 /** 2 * ScheduledThreadPoolExecutor: 3 */ 4 public void run() { 5 boolean periodic = isPeriodic(); 6 if (!canRunInCurrentRunState(periodic)) 7 cancel(false); 8 else if (!periodic) 9 ScheduledFutureTask.super.run();10 /* 11 前面都是之前分析過的,而周期性任務會走下面的分支中 12 13 FutureTask的runAndReset方法相比于run方法來說,區別在于可以重復計算(run方法不能復用) 14 因為runAndReset方法在計算完成后不會修改狀態,狀態一直都是NEW 15 */16 else if (ScheduledFutureTask.super.runAndReset()) {17 //設置下次的運行時間點18 setNextRunTime();19 //重新添加任務20 reExecutePeriodic(outerTask);21 }22 }2324 /** 25 * 第18行代碼處: 26 */27 private void setNextRunTime() {28 /* 29 這里會獲取period,也就是之前設置的周期時間。上面說過,通過period的正負就可以區分出到底調用的是 30 scheduleAtFixedRate方法還是scheduleWithFixedDelay方法 31 */32 long p = period;33 if (p > 0)34 /* 35 如果調用的是scheduleAtFixedRate方法,下一次的周期任務時間點就是起始的延遲時間加上周期時間,需要注意的是: 36 如果任務執行的時間大于周期時間period的話,那么定時線程池就不會按照原先設計的延遲時間進行執行,而是會按照近似于 37 任務執行的時間來作為延遲的間隔(不管核心線程有多少個都是如此,因為任務是放在延遲隊列中的、是線性執行的) 38 */39 time += p;40 else41 /* 42 triggerTime方法之前分析過是獲取當前時間+延遲時間后的結果,而此時是在執行完任務后,也就是說: 43 如果調用的是scheduleWithFixedDelay方法,下一次的周期任務時間點就是執行完上次任務后的時間點加上周期時間 44 由此可以看出,scheduleAtFixedRate方法和scheduleWithFixedDelay方法的區別就在于下一次time設置的不同而已 45 */46 time = triggerTime(-p);47 //time屬性會記錄到節點中,在小頂堆中通過compareTo方法來進行排序48 }4950 /** 51 * 第20行代碼處: 52 */53 void reExecutePeriodic(RunnableScheduledFuture<?> task) {54 //判斷此時是否還能繼續執行任務55 if (canRunInCurrentRunState(true)) {56 /* 57 這里也就是重新往延遲隊列中添加任務,以此達到周期執行的效果。添加之后在getTask方法中的take方法中 58 就又可以拿到這個任務。設置下次的執行時間,然后再添加任務...周而復始 59 */60 super.getQueue().add(task);61 //添加后繼續判斷此時是否還能繼續執行任務,如果不能的話就刪除上面添加的任務62 if (!canRunInCurrentRunState(true) && remove(task))63 //同時會取消此任務的執行64 task.cancel(false);65 else66 //否則,說明線程池是可以繼續執行任務的,就去判斷此時是否需要補充工作線程67 ensurePrestart();68 }69 }
注意:網上的一種說法是: scheduleAtFixedRate方法是以上一個任務開始的時間計時,period時間過去后,檢測上一個任務是否執行完畢。如果上一個任務執行完畢,則當前任務立即執行;如果上一個任務沒有執行完畢,則需要等上一個任務執行完畢后立即執行。實際上這種說法是錯誤的,盡管它的表象是對的。正確的說法是: 如果任務的執行時間小于周期時間的話,則會以上次任務執行開始時間加上周期時間后,再去執行下一次任務;而如果任務的執行時間大于周期時間的話,則會等到上次任務執行完畢后立即(近似于)執行下次任務。這兩種說法的區別就在于任務的執行時間大于周期時間的時候,檢測上一個任務是否完畢的時機不同。實際上在period時間過去后,根本不會有任何的檢測機制。因為只有等上次任務執行完畢后才會往延遲隊列中添加下一次任務,從而觸發各種后續的動作。所以在period時間點時,當前線程還在執行任務中,而其他的線程因為延遲隊列中為空會處于休眠的狀態(假如就只有一個周期任務的話)。所以根本不會有所謂的“檢測”的說法,這種說法也只能說是想當然了。還是那句話:“Talk is cheap. Show me the code.”
既然都說到這里了,那么現在就想來嘗試分析一下如果任務的執行時間大于周期時間的話,具體是怎樣的一個執行流程?
為了便于分析,假設現在是只有一個周期任務的場景,那么延遲隊列中的任務數量最多就只會有1個:拿取到任務,延遲隊列中就變為空。執行完任務的時候,就又會往隊列中放一個任務。這樣其他搶不到任務的線程就會被休眠住。而添加任務的時候因為每次重新添加的任務都是小頂堆的根節點(從無到有),即添加的這個任務就是此時延遲時間最短的任務,所以同時會觸發嘗試喚醒線程的動作。
同時在添加下一個任務前會修改下一次的時間點。在setNextRunTime方法中,scheduleAtFixedRate方法是以上一次的延遲時間點加上周期時間來作為下一次的延遲時間點的,并不是scheduleWithFixedDelay方法獲取當前時間加上周期時間的方式。在當前這種情況下周期時間是要小于任務的執行時間的,也就是說會造成下一次的延遲時間點會賦值為一個已經過期的時間。且隨著周期的增加,下一次的延遲時間點會離當前時間點越來越遠。既然下一次的延遲時間點已經過期了,那么就會去立馬執行任務。
所以總結一下:需要被喚醒的線程和上次執行完任務的線程就會去爭搶鎖資源(喚醒線程會把當前節點放進CLH隊列中,上次執行完任務的線程也會再次走到lockInterruptibly方法中(在它重新放任務的時候也會經歷一次lock),同時因為是ReentrantLock非公平鎖,這樣在調用unlock解鎖時就會出現在CLH隊列上的搶資源現象了),搶到的就會立馬去執行下一次的周期任務,而不會有任何的延時,造成的表象就是會以一個近似于任務執行時間為間隔的周期來執行任務。
1 /** 2 * ScheduledThreadPoolExecutor: 3 * 可以看到,定時線程池的shutdown方法是使用的父類ThreadPoolExecutor的shutdown方法, 4 * 而該方法我在之前的ThreadPoolExecutor的源碼分析文章中已經分析過了。但是其中會調用 5 * onShutdown的鉤子方法,也就是在ScheduledThreadPoolExecutor中的實現 6 */ 7 public void shutdown() { 8 super.shutdown(); 9 }1011 @Override12 void onShutdown() {13 //獲取延遲隊列14 BlockingQueue<Runnable> q = super.getQueue();15 //關閉線程池時判斷是否需要繼續執行延遲任務16 boolean keepDelayed =17 getExecuteExistingDelayedTasksAfterShutdownPolicy();18 //關閉線程池時判斷是否需要繼續執行周期性任務19 boolean keepPeriodic =20 getContinueExistingPeriodicTasksAfterShutdownPolicy();21 if (!keepDelayed && !keepPeriodic) {22 //如果都不需要的話,就將延遲隊列中的任務逐個取消(并刪除)23 for (Object e : q.toArray())24 if (e instanceof RunnableScheduledFuture<?>)25 ((RunnableScheduledFuture<?>) e).cancel(false);26 //最后做清理工作27 q.clear();28 } else {29 for (Object e : q.toArray()) {30 if (e instanceof RunnableScheduledFuture) {31 //否則就判斷如果任務是RunnableScheduledFuture類型的,就強轉一下類型32 RunnableScheduledFuture<?> t =33 (RunnableScheduledFuture<?>) e;34 //如果關閉線程池時不需要繼續執行任務,又或者需要繼續執行但是任務已經取消了35 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||36 t.isCancelled()) {37 //就刪除當前節點38 if (q.remove(t))39 //同時取消任務40 t.cancel(false);41 }42 }43 }44 }45 //根據線程池狀態來判斷是否應該結束線程池46 tryTerminate();47 }4849 /** 50 * 第27行代碼處: 51 */52 public void clear() {53 final ReentrantLock lock = this.lock;54 //加鎖55 lock.lock();56 try {57 for (int i = 0; i < size; i++) {58 //遍歷獲得延遲隊列中的每一個節點59 RunnableScheduledFuture<?> t = queue[i];60 if (t != null) {61 //將節點置為null62 queue[i] = null;63 //同時將索引位置為-1(recheck)64 setIndex(t, -1);65 }66 }67 //size賦為初始值068 size = 0;69 } finally {70 //釋放鎖71 lock.unlock();72 }73 }
感謝各位的閱讀,以上就是“定時線程池是怎么實現延遲執行和周期執行的”的內容了,經過本文的學習后,相信大家對定時線程池是怎么實現延遲執行和周期執行的這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。