您好,登錄后才能下訂單哦!
本篇內容介紹了“Java線程池是怎么工作的”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
首先我們看下當一個新的任務提交到線程池之后,線程池是如何處理的
1、線程池判斷核心線程池里的線程是否都在執行任務。如果不是,則創建一個新的工作線程來執行任務。如果核心線程池里的線程都在執行任務,則執行第二步。
2、線程池判斷工作隊列是否已經滿。如果工作隊列沒有滿,則將新提交的任務存儲在這個工作隊列里進行等待。如果工作隊列滿了,則執行第三步
3、線程池判斷線程池的線程是否都處于工作狀態。如果沒有,則創建一個新的工作線程來執行任務。如果已經滿了,則交給飽和策略來處理這個任務
這里提到了線程池的飽和策略,那我們就簡單介紹下有哪些飽和策略:
AbortPolicy
為Java線程池默認的阻塞策略,不執行此任務,而且直接拋出一個運行時異常,切記ThreadPoolExecutor.execute需要try catch,否則程序會直接退出。
DiscardPolicy
直接拋棄,任務不執行,空方法
DiscardOldestPolicy
從隊列里面拋棄head的一個任務,并再次execute 此task。
CallerRunsPolicy
在調用execute的線程里面執行此command,會阻塞入口
實現RejectedExecutionHandler,并自己定義策略模式
下我們以ThreadPoolExecutor為例展示下線程池的工作流程圖
1、如果當前運行的線程少于corePoolSize,則創建新線程來執行任務(注意,執行這一步驟需要獲取全局鎖)。
2、如果運行的線程等于或多于corePoolSize,則將任務加入BlockingQueue。
3、如果無法將任務加入BlockingQueue(隊列已滿),則在非corePool中創建新的線程來處理任務(注意,執行這一步驟需要獲取全局鎖)。
4、如果創建新線程將使當前運行的線程超出maximumPoolSize,任務將被拒絕,并調用RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor采取上述步驟的總體設計思路,是為了在執行execute()方法時,盡可能地避免獲取全局鎖(那將會是一個嚴重的可伸縮瓶頸)。在ThreadPoolExecutor完成預熱之后(當前運行的線程數大于等于corePoolSize),幾乎所有的execute()方法調用都是執行步驟2,而步驟2不需要獲取全局鎖。
我們看看核心方法添加到線程池方法execute的源碼如下:
// //Executes the given task sometime in the future. The task //may execute in a new thread or in an existing pooled thread. // // If the task cannot be submitted for execution, either because this // executor has been shutdown or because its capacity has been reached, // the task is handled by the current {@code RejectedExecutionHandler}. // // @param command the task to execute // @throws RejectedExecutionException at discretion of // {@code RejectedExecutionHandler}, if the task // cannot be accepted for execution // @throws NullPointerException if {@code command} is null // public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // // Proceed in 3 steps: // // 1. If fewer than corePoolSize threads are running, try to // start a new thread with the given command as its first // task. The call to addWorker atomically checks runState and // workerCount, and so prevents false alarms that would add // threads when it shouldn't, by returning false. // 翻譯如下: // 判斷當前的線程數是否小于corePoolSize如果是,使用入參任務通過addWord方法創建一個新的線程, // 如果能完成新線程創建exexute方法結束,成功提交任務 // 2. If a task can be successfully queued, then we still need // to double-check whether we should have added a thread // (because existing ones died since last checking) or that // the pool shut down since entry into this method. So we // recheck state and if necessary roll back the enqueuing if // stopped, or start a new thread if there are none. // 翻譯如下: // 在第一步沒有完成任務提交;狀態為運行并且能否成功加入任務到工作隊列后,再進行一次check,如果狀態 // 在任務加入隊列后變為了非運行(有可能是在執行到這里線程池shutdown了),非運行狀態下當然是需要 // reject;然后再判斷當前線程數是否為0(有可能這個時候線程數變為了0),如是,新增一個線程; // 3. If we cannot queue task, then we try to add a new // thread. If it fails, we know we are shut down or saturated // and so reject the task. // 翻譯如下: // 如果不能加入任務到工作隊列,將嘗試使用任務新增一個線程,如果失敗,則是線程池已經shutdown或者線程池 // 已經達到飽和狀態,所以reject這個他任務 // int c = ctl.get(); // 工作線程數小于核心線程數 if (workerCountOf(c)
下面我們繼續看看addWorker是如何實現的:
private boolean addWorker(Runnable firstTask, boolean core) { // java標簽 retry: // 死循環 for (;;) { int c = ctl.get(); // 獲取當前線程狀態 int rs = runStateOf(c); // Check if queue empty only if necessary. // 這個邏輯判斷有點繞可以改成 // rs >= shutdown && (rs != shutdown || firstTask != null || workQueue.isEmpty()) // 邏輯判斷成立可以分為以下幾種情況均不接受新任務 // 1、rs > shutdown:--不接受新任務 // 2、rs >= shutdown && firstTask != null:--不接受新任務 // 3、rs >= shutdown && workQueue.isEmppty:--不接受新任務 // 邏輯判斷不成立 // 1、rs==shutdown&&firstTask != null:此時不接受新任務,但是仍會執行隊列中的任務 // 2、rs==shotdown&&firstTask == null:會執行addWork(null,false) // 防止了SHUTDOWN狀態下沒有活動線程了,但是隊列里還有任務沒執行這種特殊情況。 // 添加一個null任務是因為SHUTDOWN狀態下,線程池不再接受新任務 if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty())) return false; // 死循環 // 如果線程池狀態為RUNNING并且隊列中還有需要執行的任務 for (;;) { // 獲取線程池中線程數量 int wc = workerCountOf(c); // 如果超出容量或者最大線程池容量不在接受新任務 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 線程安全增加工作線程數 if (compareAndIncrementWorkerCount(c)) // 跳出retry break retry; c = ctl.get(); // Re-read ctl // 如果線程池狀態發生變化,重新循環 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 走到這里說明工作線程數增加成功 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 加鎖 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); // RUNNING狀態 || SHUTDONW狀態下清理隊列中剩余的任務 if (rs if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 將新啟動的線程添加到線程池中 workers.add(w); // 更新線程池線程數且不超過最大值 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 啟動新添加的線程,這個線程首先執行firstTask,然后不停的從隊列中取任務執行 if (workerAdded) { //執行ThreadPoolExecutor的runWoker方法 t.start(); workerStarted = true; } } } finally { // 線程啟動失敗,則從wokers中移除w并遞減wokerCount if (! workerStarted) // 遞減wokerCount會觸發tryTerminate方法 addWorkerFailed(w); } return workerStarted; }
addWorker之后是runWorker,第一次啟動會執行初始化傳進來的任務firstTask;然后會從workQueue中取任務執行,如果隊列為空則等待keepAliveTime這么長時間
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 允許中斷 w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 如果getTask返回null那么getTask中會將workerCount遞減,如果異常了這個遞減操作會在processWorkerExit中處理 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
我們看下getTask是如何執行的
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? // 死循環 retry: for (;;) { // 獲取線程池狀態 int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 1.rs > SHUTDOWN 所以rs至少等于STOP,這時不再處理隊列中的任務 // 2.rs = SHUTDOWN 所以rs>=STOP肯定不成立,這時還需要處理隊列中的任務除非隊列為空 // 這兩種情況都會返回null讓runWoker退出while循環也就是當前線程結束了,所以必須要decrement if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 遞減workerCount值 decrementWorkerCount(); return null; } // 標記從隊列中取任務時是否設置超時時間 boolean timed; // Are workers subject to culling? // 1.RUNING狀態 // 2.SHUTDOWN狀態,但隊列中還有任務需要執行 for (;;) { int wc = workerCountOf(c); // 1.core thread允許被超時,那么超過corePoolSize的的線程必定有超時 // 2.allowCoreThreadTimeOut == false && wc > // corePoolSize時,一般都是這種情況,core thread即使空閑也不會被回收,只要超過的線程才會 timed = allowCoreThreadTimeOut || wc > corePoolSize; // 從addWorker可以看到一般wc不會大于maximumPoolSize,所以更關心后面半句的情形: // 1. timedOut == false 第一次執行循環, 從隊列中取出任務不為null方法返回 或者 // poll出異常了重試 // 2.timeOut == true && timed == // false:看后面的代碼workerQueue.poll超時時timeOut才為true, // 并且timed要為false,這兩個條件相悖不可能同時成立(既然有超時那么timed肯定為true) // 所以超時不會繼續執行而是return null結束線程。 if (wc break; // workerCount遞減,結束當前thread if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl // 需要重新檢查線程池狀態,因為上述操作過程中線程池可能被SHUTDOWN if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } try { // 1.以指定的超時時間從隊列中取任務 // 2.core thread沒有超時 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true;// 超時 } catch (InterruptedException retry) { timedOut = false;// 線程被中斷重試 } } }
下面我們看下processWorkerExit是如何工作的
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 正常的話再runWorker的getTask方法workerCount已經被減一了 if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 累加線程的completedTasks completedTaskCount += w.completedTasks; // 從線程池中移除超時或者出現異常的線程 workers.remove(w); } finally { mainLock.unlock(); } // 嘗試停止線程池 tryTerminate(); int c = ctl.get(); // runState為RUNNING或SHUTDOWN if (runStateLessThan(c, STOP)) { // 線程不是異常結束 if (!completedAbruptly) { // 線程池最小空閑數,允許core thread超時就是0,否則就是corePoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果min == 0但是隊列不為空要保證有1個線程來執行隊列中的任務 if (min == 0 && !workQueue.isEmpty()) min = 1; // 線程池還不為空那就不用擔心了 if (workerCountOf(c) >= min) return; // replacement not needed } // 1.線程異常退出 // 2.線程池為空,但是隊列中還有任務沒執行,看addWoker方法對這種情況的處理 addWorker(null, false); } }
tryTerminate
processWorkerExit方法中會嘗試調用tryTerminate來終止線程池。這個方法在任何可能導致線程池終止的動作后執行:比如減少wokerCount或SHUTDOWN狀態下從隊列中移除任務。
final void tryTerminate() { for (;;) { int c = ctl.get(); // 以下狀態直接返回: // 1.線程池還處于RUNNING狀態 // 2.SHUTDOWN狀態但是任務隊列非空 // 3.runState >= TIDYING 線程池已經停止了或在停止了 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) return; // 只能是以下情形會繼續下面的邏輯:結束線程池。 // 1.SHUTDOWN狀態,這時不再接受新任務而且任務隊列也空了 // 2.STOP狀態,當調用了shutdownNow方法 // workerCount不為0則還不能停止線程池,而且這時線程都處于空閑等待的狀態 // 需要中斷讓線程“醒”過來,醒過來的線程才能繼續處理shutdown的信號。 if (workerCountOf(c) != 0) { // Eligible to terminate // runWoker方法中w.unlock就是為了可以被中斷,getTask方法也處理了中斷。 // ONLY_ONE:這里只需要中斷1個線程去處理shutdown信號就可以了。 interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 進入TIDYING狀態 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 子類重載:一些資源清理工作 terminated(); } finally { // TERMINATED狀態 ctl.set(ctlOf(TERMINATED, 0)); // 繼續awaitTermination termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
shutdown這個方法會將runState置為SHUTDOWN,會終止所有空閑的線程。shutdownNow方法將runState置為STOP。和shutdown方法的區別,這個方法會終止所有的線程。主要區別在于shutdown調用的是interruptIdleWorkers這個方法,而shutdownNow實際調用的是Worker類的interruptIfStarted方法:
他們的實現如下:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 線程池狀態設為SHUTDOWN,如果已經至少是這個狀態那么則直接返回 advanceRunState(SHUTDOWN); // 注意這里是中斷所有空閑的線程:runWorker中等待的線程被中斷 → 進入processWorkerExit → // tryTerminate方法中會保證隊列中剩余的任務得到執行。 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } public List shutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // STOP狀態:不再接受新任務且不再執行隊列中的任務。 advanceRunState(STOP); // 中斷所有線程 interruptWorkers(); // 返回隊列中還沒有被執行的任務。 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // w.tryLock能獲取到鎖,說明該線程沒有在運行,因為runWorker中執行任務會先lock, // 因此保證了中斷的肯定是空閑的線程。 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } } void interruptIfStarted() { Thread t; // 初始化時state == -1 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }
我們可以通過ThreadPoolExecutor來創建一個線程池
/** * @param corePoolSize 線程池基本大小,核心線程池大小,活動線程小于corePoolSize則直接創建,大于等于則先加到workQueue中, * 隊列滿了才創建新的線程。當提交一個任務到線程池時,線程池會創建一個線程來執行任務,即使其他空閑的基本線程能夠執行新任務也會創建線程, * 等到需要執行的任務數大于線程池基本大小時就不再創建。如果調用了線程池的prestartAllCoreThreads()方法, * 線程池會提前創建并啟動所有基本線程。 * @param maximumPoolSize 最大線程數,超過就reject;線程池允許創建的最大線程數。如果隊列滿了, * 并且已創建的線程數小于最大線程數,則線程池會再創建新的線程執行任務 * @param keepAliveTime * 線程池的工作線程空閑后,保持存活的時間。所以,如果任務很多,并且每個任務執行的時間比較短,可以調大時間,提高線程的利用率 * @param unit 線程活動保持時間的單位):可選的單位有天(DAYS)、小時(HOURS)、分鐘(MINUTES)、 * 毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微秒) * @param workQueue 工作隊列,線程池中的工作線程都是從這個工作隊列源源不斷的獲取任務進行執行 */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { // threadFactory用于設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
可以使用兩個方法向線程池提交任務,分別為execute()和submit()方法。execute()方法用于提交不需要返回值的任務,所以無法判斷任務是否被線程池執行成功。通過以下代碼可知execute()方法輸入的任務是一個Runnable類的實例。
threadsPool.execute(new Runnable() { @Override public void run() { } });
submit()方法用于提交需要返回值的任務。線程池會返回一個future類型的對象,通過這個future對象可以判斷任務是否執行成功,并且可以通過future的get()方法來獲取返回值,get()方法會阻塞當前線程直到任務完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當前線程一段時間后立即返回,這時候有可能任務沒有執行完。
Future future = executor.submit(harReturnValuetask); try { Object s = future.get(); }catch( InterruptedException e) { // 處理中斷異常 }catch( ExecutionException e) { // 處理無法執行任務異常 }finally { // 關閉線程池 executor.shutdown(); }
可以通過調用線程池的shutdown或shutdownNow方法來關閉線程池。它們的原理是遍歷線程池中的工作線程,然后逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。但是它們存在一定的區別,shutdownNow首先將線程池的狀態設置成STOP,然后嘗試停止所有的正在執行或暫停任務的線程,并返回等待執行任務的列表,而shutdown只是將線程池的狀態設置成SHUTDOWN狀態,然后中斷所有沒有正在執行任務的線程。
只要調用了這兩個關閉方法中的任意一個,isShutdown方法就會返回true。當所有的任務都已關閉后,才表示線程池關閉成功,這時調用isTerminaed方法會返回true。至于應該調用哪一種方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調用shutdown方法來關閉線程池,如果任務不一定要執行完,則可以調用shutdownNow方法。
要想合理地配置線程池,就必須首先分析任務特性,可以從以下幾個角度來分析。
1、任務的性質:CPU密集型任務、IO密集型任務和混合型任務。
2、任務的優先級:高、中和低。
3、任務的執行時間:長、中和短。
4、任務的依賴性:是否依賴其他系統資源,如數據庫連接。
性質不同的任務可以用不同規模的線程池分開處理。CPU密集型任務應配置盡可能小的線程,如配置Ncpu+1個線程的線程池。由于IO密集型任務線程并不是一直在執行任務,則應配置盡可能多的線程,如2*Ncpu。混合型的任務,如果可以拆分,將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那么分解后執行的吞吐量將高于串行執行的吞吐量。如果這兩個任務執行時間相差太大,則沒必要進行分解。可以通過Runtime.getRuntime().availableProcessors()方法獲得當前設備的CPU個數。優先級不同的任務可以使用優先級隊列PriorityBlockingQueue來處理。它可以讓優先級高的任務先執行
如果一直有優先級高的任務提交到隊列里,那么優先級低的任務可能永遠不能執行。執行時間不同的任務可以交給不同規模的線程池來處理,或者可以使用優先級隊列,讓執行時間短的任務先執行。依賴數據庫連接池的任務,因為線程提交SQL后需要等待數據庫返回結果,等待的時間越長,則CPU空閑時間就越長,那么線程數應該設置得越大,這樣才能更好地利用CPU。
建議使用有界隊列。有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一點兒,比如幾千。有時候我們系統里后臺任務線程池的隊列和線程池全滿了,不斷拋出拋棄任務的異常,通過排查發現是數據庫出現了問題,導致執行SQL變得非常緩慢,因為后臺任務線程池里的任務全是需要向數據庫查詢和插入數據的,所以導致線程池里的工作線程全部阻塞,任務積壓在線程池里。如果當時我們設置成無界隊列,那么線程池的隊列就會越來越多,有可能會撐滿內存,導致整個系統不可用,而不只是后臺任務出現問題。當然,我們的系統所有的任務是用單獨的服務器部署的,我們使用不同規模的線程池完成不同類型的任務,但是出現這樣問題時也會影響到其他任務。
如果在系統中大量使用線程池,則有必要對線程池進行監控,方便在出現問題時,可以根據線程池的使用狀況快速定位問題。可以通過線程池提供的參數進行監控,在監控線程池的時候可以使用以下屬性
通過擴展線程池進行監控。可以通過繼承線程池來自定義線程池,重寫線程池的beforeExecute、afterExecute和terminated方法,也可以在任務執行前、執行后和線程池關閉前執行一些代碼來進行監控。例如,監控任務的平均執行時間、最大執行時間和最小執行時間等。
“Java線程池是怎么工作的”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。