您好,登錄后才能下訂單哦!
本篇內容介紹了“什么是ThreadPoolExecutor”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
ThreadPoolExecutor是一個通過使用可能幾個池線程之一來執行每個提交任務的ExecutorService,這些線程池通常通過Executors工廠方法進行配置。
ThreadPoolExecutor中的線程池處理了兩個不同的問題:
1、由于減少了每個任務調用的開銷,在執行大量的異步任務時它們通常提供改進的性能;
2、它們提供了邊界和管理資源的一種手段,包括多線程,在執行任務集合時的消耗。
每個ThreadPoolExecutor還維護一些基本的統計數據,例如完成任務的數量。
AtomicInteger類型的ctl代表了ThreadPoolExecutor中的控制狀態,它是一個復核類型的成員變量,是一個原子整數,借助高低位包裝了兩個概念:
(1)workerCount:線程池中當前活動的線程數量,占據ctl的低29位;
(2)runState:線程池運行狀態,占據ctl的高3位,有RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五種狀態。
//COUNT_BITS分割32位二進制偏移量,Integer.SIZE即Integer類型長度(32),COUNT_BITS=29,高3位保存線程池的狀態,低29位用來計量對象池中工作線程數 private static final int COUNT_BITS = Integer.SIZE - 3; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
AtomicInteger ctl的定義如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //異或運算符 100100|111=100111 private static int ctlOf(int rs, int wc) { return rs | wc; }
線程池理想的最大工作線程數(上限):
//(1<<COUNT_BITS)-1=0x20000000-1=0x1fffffff private static final int CAPACITY = (1 << COUNT_BITS) - 1;
獲取線程池當前的工作線程數:
//通過(與)運算符,示例 11001&1111=1001,CAPACITY=0x1fffffff,所以就是ctl的值(&)CAPACITY就是只獲取ctl低29位的值就是當前線程池的工作線程數 private static int workerCountOf(int c) { return c & CAPACITY; }
//用以下文中workers集合操作的鎖 private final ReentrantLock mainLock = new ReentrantLock(); //用于保存任務并傳遞給工作線程的隊列 private final BlockingQueue<Runnable> workQueue; /** * Set containing all worker threads in pool. Accessed only when * holding mainLock. * 保存線程池中所有工作線程的集合,僅在獲取mainLock鎖權限時可操作 */ private final HashSet<Worker> workers = new HashSet<Worker>(); /** * Wait condition to support awaitTermination 創建線程池的線程通過調用線程池引用.awaitTermination方法中通過termination實現持有鎖后釋放鎖掛起等待工作線程tryTerminate操作成功喚醒,或者超時自動喚醒中斷失敗。 */ private final Condition termination = mainLock.newCondition(); /** * Tracks largest attained pool size. Accessed only under * mainLock. 獲取線程池工作集合歷史最大容量,需獲得鎖 */ private int largestPoolSize; /** * Counter for completed tasks. Updated only on termination of * worker threads. Accessed only under mainLock. 池完成任務數,在processWorkerExit函數持鎖增量更新 */ private long completedTaskCount; //用以持鎖任務創建worker時創建線程的工廠類 private volatile ThreadFactory threadFactory; /** * Handler called when saturated or shutdown in execute. * 在線程非RUNNING狀態或者池容量和隊列容器容量滿載時拒絕處理對象 */ private volatile RejectedExecutionHandler handler; /** * Timeout in nanoseconds for idle threads waiting for work. * Threads use this timeout when there are more than corePoolSize * present or if allowCoreThreadTimeOut. Otherwise they wait * forever for new work. * 空閑線程等待工作的超時時間(以納秒為單位)。 * 當超過corePoolSize工作線程書或allowCoreThreadTimeOut為true時,線程將使用此超時。 * 否則,他們將永遠等待新的工作 */ private volatile long keepAliveTime; /** * If false (default), core threads stay alive even when idle. * If true, core threads use keepAliveTime to time out waiting * for work. * 如果為false(默認值為false),則即使處于空閑狀態,核心線程也會保持活動狀態。 * 如果為true,則活躍線程使用keepAliveTime來超時等待工作,達到閾值就會釋放線程 */ private volatile boolean allowCoreThreadTimeOut; /** * Core pool size is the minimum number of workers to keep alive * (and not allow to time out etc) unless allowCoreThreadTimeOut * is set, in which case the minimum is zero. * 除非設置allowCoreThreadTimeOut,否則核心池大小是保持活動狀態(不允許超時等)的最低數量, * 在這種情況下,最小值為零 */ private volatile int corePoolSize; /** * Maximum pool size. Note that the actual maximum is internally * bounded by CAPACITY. * 線程池最大工作線程書數,受CAPACITY約束,最大不會超過CAPACITY */ private volatile int maximumPoolSize; /** * The default rejected execution handler. * 默認拒絕策略處理器 */ private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
RUNNING: Accept new tasks and process queued tasks(運行狀態,接受新任務,并處理隊列任務)
// 二進制位 -1=0x8000001(取反碼后+1得補碼)=0xfffffffe+1=0xffffffff // 右移29位后=0xe0000000即 1110 0000 0000 0000 0000 0000 0000 0000 // -536870912 private static final int RUNNING = -1 << COUNT_BITS;
SHUTDOWN: Don't accept new tasks, but process queued tasks(停止運行狀態,不接受新任務,但處理隊列中任務)
//SHUTDOWN=0 private static final int SHUTDOWN = 0 << COUNT_BITS;
STOP: Don't accept new tasks, don't process queued tasks,and interrupt in-progress tasks(中斷線程池工作狀態,不接受新任務,不處理隊列中準備彈出的任務,但是會執行完現有的工作任務(前提是在修改為STOP前,彈出隊列的任務已經走過線程狀態判斷,執行業務方法,若正好彈出準備判斷線程狀態,STOP扭轉成功,當前任務也會被攔截))
// STOP=0x20000000=0010 0000 0000 0000 0000 0000 0000 0000 // 536870912 private static final int STOP = 1 << COUNT_BITS;
TIDYING:All tasks have terminated, workerCount is zero,the thread transitioning to state TIDYING.will run the terminated() hook method(任務處理結束狀態,workcount=0,線程池運行狀態修改為TIDYING,并且會執行==terminated()==鉤子函數)
// TIDYING=0x40000000=0100 0000 0000 0000 0000 0000 0000 0000 // 1073741824 private static final int TIDYING = 2 << COUNT_BITS;
TERMINATED: terminated() has completed() (terminated()執行完成后,會修改成TERMINATED狀態)
// TERMINATED=0x60000000=0110 0000 0000 0000 0000 0000 0000 0000 // 1610612736 private static final int TERMINATED = 3 << COUNT_BITS;
/** The numerical order among these values matters, to allow * ordered comparisons. The runState monotonically increases over * time, but need not hit each state. The transitions are: * * RUNNING -> SHUTDOWN * On invocation of shutdown(), perhaps implicitly in finalize() 顯式調用線程池showdown()方法,或者線程池對象不被引用,被GC回收時調用finalize()函數,finalize()函數中調用shutdown() * (RUNNING or SHUTDOWN) -> STOP * On invocation of shutdownNow() 顯式調用shutdownNow()扭轉狀態,修改線程池中workers所有工作線程為中斷狀態,讓接下來隊列彈出的任務都跳過執行任務 * SHUTDOWN -> TIDYING * When both queue and pool are empty 工作線程全部執行完成且隊列也是空,則扭轉狀態 * STOP -> TIDYING * When pool is empty 當沒有任務時,狀態扭轉 * TIDYING -> TERMINATED * When the terminated() hook method has completed 執行terminated(),try{terminated()}finally{扭轉狀態} */
// Packing and unpacking ctl //~CAPACITY 連同符號位反轉(即相反數-1,若不理解百度反碼和補碼) 得 0xe0000000 ,就是取高三位做位與計算 private static int runStateOf(int c) { return c & ~CAPACITY; }
public void execute(Runnable command) { //任務非空校驗 if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ //獲取當前線程池計數器值 int c = ctl.get(); //判斷當前工作線程池的活動線程數是否<核心線程數 if (workerCountOf(c) < corePoolSize) { //進入addWorker函數,參數true(標識創建核心線程數工作線程,該函數中會對該標識識別是當前工作數量數比較核心線程數還是最大線程數),檢查是否可創建worker任務線程 if (addWorker(command, true)) return; //執行此步,意味著進入addWorker函數,資源被其他線程爭奪,導致該任務沒有搶到創建核心工作線程的資源,二次獲取最新活動線程數 c = ctl.get(); } //檢查線程池狀態是否為RUNNING狀態,并且任務隊列是否可追加該任務 if (isRunning(c) && workQueue.offer(command)) { //重新獲取線程池ctl值 int recheck = ctl.get(); //檢查當前線程池狀態為非RUNNING狀態,且從隊列容器中回滾該任務 if (! isRunning(recheck) && remove(command)) //拒絕加入任務,實則調用上文中handlder的rejectedExecution()拋出異常(默認AbortPolicy中止策略) reject(command); //獲取最后一次獲取的計量值,判斷是否工作線程均已完成任務,因為很有可能之前在36行操作之前工作線程數已達最大線程數閾值,但是正好剛加入到隊列中后,線程已全部執行完成,且釋放了,所以需要創建一個空任務的worker線程用以調用runWorker中從隊列中彈出任務去執行(具體查看getTask()) else if (workerCountOf(recheck) == 0) addWorker(null, false); } //第三步則意味著第二步可能池狀態非RUNNING,當然如果是非RUNNING狀態,在addWorker判斷池狀態是否可接受新非核心任務。 //也有可能是隊列滿載,該任務會插隊嘗試創建非核心工作線程,如果創建失敗,會觸發拒絕策略異常 else if (!addWorker(command, false)) reject(command); } //拒絕任務 final void reject(Runnable command) { //默認拒絕策略Handler-AbortPolicy handler.rejectedExecution(command, this); }
流程圖:
2.創建任務線程(addWorker)(流程圖)
/** * firstTask:創建任務工作線程RunWorker-執行的第一個任務,也有可能是空任務(喚醒任務) * core:true(核心工作線程),false(非核心工作線程) 對應的是比較corePoolSize和maxPoolSize條件。 **/ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); //STOP、TIDYING、TERMINATED狀態不接受新任務,所以直接拒絕,創建任務失敗 //SHUTDOWN狀態下,僅允許創建task為null的喚醒任務(前提隊列中存在任務),因為隊列中有任務,否則喚醒任務創建線程無意義不允許創建工作線程 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); //根據core標識,對應比較閾值,首先保證不能>=(1>>29)-1,否則不允許創建工作線程 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //CAS當前值+1替換當前值,根絕替換返回值判斷是否替換成功,成功,直接跳出循環 if (compareAndIncrementWorkerCount(c)) break retry; //意味著CAS替換失敗,重新取值,判斷最新池狀態是否還是RUNNING,RUNNNING狀態則繼續執行該循環體,嘗試ctl+1操作 //否則直接跳入外循環,進行狀態判斷是否允許創建任務線程 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //此次任務工作的創建標記以及對應的線程啟動標記 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //創建任務工作線程,查看下文代碼的Worker源碼 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; //嘗試持有worker集合的權限獨占鎖 mainLock.lock(); try { //如果獲得鎖時,線程池狀態非RUNNING或SHUTDOWN狀態TASK不為空,則不允許該任務工作對象加入集合,也不允許線程啟動 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //檢查線程狀態是否可啟動 if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); //記錄worker集合在某一刻的長度最大數,按照配置來說,也就是同時存活存貨線程數最大也頂多就是MaxPoolSize if (s > largestPoolSize) largestPoolSize = s; //開啟任務工作對象加入集合成功標記 workerAdded = true; } } finally { //釋放worker集合的權限獨占鎖,因為可能同一時刻有N個任務需要創建對象加入workers集合 mainLock.unlock(); } if (workerAdded) { //加入工作集合成功,則需要啟動本次工作對象的內置線程 t.start(); //工作對象線程啟動成功標記 workerStarted = true; } } } finally { if (! workerStarted) //添加任務工作對象失敗,看下文源碼 addWorkerFailed(w); } return workerStarted; } //持MainLock鎖,讓workers集合移除添加失敗的任務,以及上文中ctl的cas自增操作回滾,嘗試中止線程,最終釋放鎖 private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); //遞歸循環,直至降一操作成功 decrementWorkerCount(); //嘗試停止線程池,該處不詳做介紹,在runWorker中會有介紹 tryTerminate(); } finally { mainLock.unlock(); } } //ThreadPoolExectutor內部類 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { //該任務工作對象內置線程(用來處理該工作對象中的任務,以及隊列中的任務),如果是ThreadFactory是異常的,則thread一定是null final Thread thread; //該任務工作對象的初始化任務,有可能是NULL(喚醒任務) Runnable firstTask; //該工作任務執行完成的任務次數 volatile long completedTasks; /** * 構造New實例的內部變量 * state默認為-1,在runWorker中執行到持鎖修改state=1,才可以觸發線程中斷信號,查看下文interruptIfStarted */ Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** 將處理邏輯交給ThrearunWorker */ public void run() { runWorker(this); } //判斷該任務工作對象是否有線程持有鎖 protected boolean isHeldExclusively() { return getState() != 0; } //線程池修改為STOP狀態時,會對worker集合中的工作對象內置線程發送中斷信號 void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } } //上面的任務工作創建失敗后的回滾工作線程數自增操作 private void decrementWorkerCount() { //可以看到是遞歸降一操作,循環降一操作,直至成功才退出循環 do {} while (! compareAndDecrementWorkerCount(ctl.get())); }
流程圖:
3.任務工作線程執行解析(runWorker)
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); //獲取工作初始化對象時的任務 Runnable task = w.firstTask; //然后置空,防止重復執行 w.firstTask = null; //本處并不是釋放鎖,只是把默認state(-1)修改為0,允許中斷。 w.unlock(); boolean completedAbruptly = true; try { //如果內置任務是NULL,就會去從隊列中彈出任務處理,空隊列就會阻塞或者超時阻塞。 while (task != null || (task = getTask()) != null) { w.lock(); /** * 兩次檢查 * 第一次檢查 如果是>=STOP狀態 * 第二次檢查 獲取當前線程中斷信號(該靜態方法會清除中斷信號)且判斷是否>=STOP狀態 * 前兩次檢查任一滿足,則繼續檢查該線程是否中斷,未中斷將中斷該線程 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //子實現類執行任務前的鉤子函數 beforeExecute(wt, task); Throwable thrown = null; try { //執行execute傳入的任務,或者execute加入到隊列中的任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //子實現類執行任務后的鉤子函數 afterExecute(task, thrown); } } finally { task = null; //每個工作對象內部會記錄worker對應的線程處理了多少個任務(無論任務內部工作是否有異常),但是如果遇到某個任務拋出異常后,該線程就會釋放 //比如隊列容量80個,池工作最大工作線程數是20個,然后隊列滿載的情況下,極有可能每個線程在執行初始化的內置任務zhi w.completedTasks++; w.unlock(); } } //如果任務執行過程中出現異常,不會執行此步 completedAbruptly = false; } finally { //線程釋放后的退出處理工作,會把此次執行任務的結果和工作對象傳遞給該函數。 processWorkerExit(w, completedAbruptly); } } //用于子類實現類的runWorker的任務執行前置鉤子函數 protected void beforeExecute(Thread t, Runnable r) { } //用于子類實現類的runWorker的任務執行后置鉤子函數 protected void afterExecute(Thread t, Runnable r) { } private void processWorkerExit(Worker w, boolean completedAbruptly) { //未完成標記,就先回滾數量,比如工作任務執行異常,或者開啟核心線程超時配置,指定時間未收到隊列喚醒 if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //持鎖,統計工作對象中完成的次數,累加到線程池的累計變量 completedTaskCount += w.completedTasks; //集合移除該工作對象 workers.remove(w); } finally { mainLock.unlock(); } //嘗試中斷,RUNNING狀態或者SHUTDOWN狀態下隊列中有任務,該操作無需理會,下文有該方法詳細介紹 tryTerminate(); int c = ctl.get(); /** * 1.如果是STOP狀態即值以上的狀態,該操作跳過 * 2.該步操作主要是該線程處理任務結果來判斷,如果是異常退出,直接創建一個空任務的處理處理線程 * 3.如果正常線程處理完成釋放的線程,判斷是allowCoreThreadTimeOut是否是true,如果是且隊列是空,則有可能是線程超時未取到任務而釋放線程的,則所有線程return返回直接釋放,無需創建線程,否則則查看隊列中是否有任務未處理完(如果有任務則需要最少一個線程,如果是最后一個線程,需要再創建一個空任務處理線程,由隊列彈出任務來自旋處理),如果是allowCoreThreadTimeOut為默認值false,判斷是否超過核心線程數如果超過就直接釋放線程,否則需要再創建一個空任務的處理線程 **/ if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; } //創建一個非核心的空任務線程用來處理隊列中的任務 addWorker(null, false); } } final void tryTerminate() { //自旋嘗試停止線程池,前提是非RUNNING狀態或非(SHUTDOWN狀態下隊列不為空)的情況之一,否則直接跳出循環 for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; //如果是活躍線程數>0,就會從工作者列表中從第一個開始取,直到沒有中斷的工作線程,然后對該線程發送中斷信號 if (workerCountOf(c) != 0) { interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //在持有鎖之后,則把shutdown或者stopz狀態嘗試扭轉為tidying狀態 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); //喚醒嘗試tryTerminate過程中阻塞在condition隊列中的線程 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // 繼續自旋嘗試該次操作 } }
“什么是ThreadPoolExecutor”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。