您好,登錄后才能下訂單哦!
這篇文章主要講解了“Java多線程編程基石ThreadPoolExecutor怎么使用”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Java多線程編程基石ThreadPoolExecutor怎么使用”吧!
線程創建和銷毀的開銷較大,每個線程都需要占用一定的內存和系統資源。如果頻繁地創建和銷毀線程,會導致系統的性能下降。
手動管理線程容易出現線程安全和資源競爭的問題,例如,多個線程同時訪問共享變量可能導致數據不一致或者死鎖等問題。
如果并發訪問的線程數量很大,可能會導致系統資源不足,例如,內存不足或者CPU過度使用等問題。
corePoolSize:核心線程池大小,即線程池中始終存在的線程數量,除非設置了allowCoreThreadTimeOut參數,默認情況下,即使空閑,核心線程也不會被回收。
maximumPoolSize:線程池的最大線程數,即可以同時執行的最大線程數量。
keepAliveTime:非核心線程的空閑存活時間,當非核心線程空閑時間超過這個時間,就會被回收。
unit:keepAliveTime的時間單位。
workQueue:任務隊列,用于存儲等待執行的任務,有多種實現方式,例如ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue等。
threadFactory:用于創建新線程的工廠類,可以自定義線程名稱、線程優先級等屬性。
handler:線程池的拒絕策略,當線程池已經達到最大線程數,并且任務隊列已經滿了,新的任務將被拒絕執行,可以設置拒絕策略來處理這種情況。
CPU密集型任務:CPU密集型任務的特點是線程在執行任務時會一直利用CPU,對于這種情況要盡可能的避免發生線程上下文的切換。一般來說對于CPU密集型任務設置線程數為CPU核心數+1。
IO密集型任務:線程在執行IO密集型任務時,可能大部分時間都浪費在阻塞IO上了,所以對于IO密集型任務來說我們通常會設置線程數為CPU核心數*2。不過這樣子也不一定是最佳的,我們可以通過公式來進行計算:線程數 = CPU 核心數 *(1+平均等待時間/平均工作時間),盡可能的還要根據壓縮來進行調整。
public class CustomThreadPoolDemo { public static void main(String[] args) { // 創建線程池,大小為3,最大線程數為6,空閑線程存活時間為5秒,使用自定義線程工廠和拒絕策略 ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), new CustomThreadFactory(), new CustomRejectedExecutionHandler()); // 提交10個任務 for (int i = 0; i < 10; i++) { executor.submit(new Task(i)); } // 關閉線程池 executor.shutdown(); } static class Task implements Runnable { private int taskId; public Task(int taskId) { this.taskId = taskId; } @Override public void run() { System.out.println("Task " + taskId + " is running in thread " + Thread.currentThread().getName()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task " + taskId + " is done."); } } static class CustomThreadFactory implements java.util.concurrent.ThreadFactory { private int count = 1; @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("CustomThreadPool-" + count++); return t; } } static class CustomRejectedExecutionHandler implements java.util.concurrent.RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("Task " + ((Task) r).taskId + " is rejected."); } } }
該示例代碼使用ThreadPoolExecutor
類創建了一個大小為3,最大線程數為6,空閑線程存活時間為5秒的線程池,任務隊列的大小為10,使用了自定義的線程工廠和拒絕策略。然后提交了10個任務,每個任務輸出了當前線程的名稱,并休眠了3秒鐘。當程序執行時,可能會出現任務被拒絕執行的情況,拒絕策略會輸出任務被拒絕的信息。
ThreadPoolExecutor提供了兩種執行任務的方法:
Future<?> submit(Runnable task) void execute(Runnable command)
實際上submit中也是調用了execute方法
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
private final AtomicInteger ctl
線程池源碼中使用ctl通過高低位的方式來記錄線程池的狀態和當前線程池中的工作線程數量。
Integer占用4個字節也就是32位,線程池有5種狀態,要標識5種狀態需要3位
前三位 private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
Integer.SIZE為32,所以COUNT_BITS為29,最終各個狀態對應的二級制為:
RUNNING:11100000 00000000 00000000 00000000
SHUTDOWN:00000000 00000000 00000000 00000000
STOP:00100000 00000000 00000000 00000000
TIDYING:01000000 00000000 00000000 00000000
TERMINATED:01100000 00000000 00000000 00000000
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //ctl初始值是ctlOf(RUNNING, 0),表示線程池處于運行中,工作線程數為0 int c = ctl.get(); //判斷工作線程是否小于核心線程數 if (workerCountOf(c) < corePoolSize) { //小于核心線程要新增工作線程 if (addWorker(command, true)) return; //新增失敗重新獲取一次ctl c = ctl.get(); } //線程池是否處于Running狀態 && 入隊是否成功 if (isRunning(c) && workQueue.offer(command)) {//入隊成功 //重新獲取ctl int recheck = ctl.get(); //如果線程池不是Running狀態就需要移除掉這個任務 if (! isRunning(recheck) && remove(command)) //觸發拒絕策略 reject(command); //工作線程為0時要去創建新的工作線程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果線程池狀態不是RUNNING,或者線程池狀態是RUNNING但是隊列滿了,則去添加一個非核心工作線程。false表示非核心線程 else if (!addWorker(command, false)) reject(command); }
//core:true核心線程 false非核心線程 private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { //獲取ctl值 int c = ctl.get(); //獲取高3位 int rs = runStateOf(c); // 線程池如果是SHUTDOWN狀態并且隊列非空則創建線程,如果隊列為空則不創建線程 // 線程池如果是STOP狀態則直接不創建線程 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //獲取工作線程數 int wc = workerCountOf(c); //工作線程數超過規定數量則不創建線程 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //修改工作線程 if (compareAndIncrementWorkerCount(c)) //成功則退出 retry這個循環 break retry; //CAS失敗說明有其他線程也在增加工作線程數量,此時重新獲取ctl值 c = ctl.get(); // Re-read ctl //如果發現線程池的狀態發生了變化,則繼續回到retry,重新判斷線程池的狀態是不是SHUTDOWN或STOP // 如果狀態沒有變化,則繼續利用cas來增加工作線程數,直到cas成功 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //到了這里說明ctl新增成功 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //Worker實現了Runnable接口 在構造一個Worker對象時,就會利用ThreadFactory新建一個線程 w = new Worker(firstTask); //拿出線程對象此時線程還沒有start啟動 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 獲取高三位 int rs = runStateOf(ctl.get()); // 如果線程池的狀態是RUNNING // 或者線程池的狀態變成了SHUTDOWN,但是當前線程沒有自己的第一個任務,那就表示當前調用addWorker方法是為了從隊列中獲取任務來執行 // 正常情況下線程池的狀態如果是SHUTDOWN,是不能創建新的工作線程的,但是隊列中如果有任務,那就是上面說的特例情況 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 如果Worker對象對應的線程已經在運行了,那就有問題,直接拋異常 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // workers用來記錄當前線程池中工作線程,調用線程池的shutdown方法時會遍歷worker對象中斷對應線程 workers.add(w); int s = workers.size(); // largestPoolSize用來跟蹤線程池在運行過程中工作線程數的峰值 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //啟動線程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 在上述過程中如果拋了異常,需要從works中移除所添加的work,并且還要修改ctl,工作線程數-1,表示新建工作線程失敗 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
addWorker核心邏輯:
先判斷工作線程數是否超過了限制
修改ctl,使得工作線程數+1
構造Work對象,并把它添加到workers集合中
啟動Work對象對應的工作線程
剛剛有說到Worker實現了Runnable接口,看看他重寫的Run方法中執行過什么
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); }
final void runWorker(Worker w) { //獲取當前工作線程 Thread wt = Thread.currentThread(); //獲取第一個任務 Runnable task = w.firstTask; //置空 w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //判斷當前第一個任務是否為空,為空的話從阻塞隊列獲取一個任務,阻塞隊列也為空就會阻塞在getTask()方法中 //也不會一直阻塞下去,keepAliveTime超時后還沒有獲取到任務就會返回null,退出循環,這個線程也就是中止了 while (task != null || (task = getTask()) != null) { w.lock(); //線程池狀態為STOP,則要中斷自己,但是如果發現中斷標記為true,那是不對的,因為線程池狀態不是STOP,工作線程仍然是要正常工作的,不能中斷掉,算是SHUTDOWN,也要等任務都執行完之后,線程才結束,而目前線程還在執行任務的過程中,不能中斷 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //空方法給自定義線程池實現 beforeExecute(wt, task); Throwable thrown = null; try { //執行任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //空方法給自定義線程池實現 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } //正常退出了while循環 // completedAbruptly=false,表示線程正常退出 completedAbruptly = false; } finally { //如果線程正常退出這個線程會自然死亡 //但是如果是由于執行任務的時候拋了異常,那么這個線程不應該直接結束,而應該繼續從隊列中獲取下一個任務 processWorkerExit(w, completedAbruptly); } }
private void processWorkerExit(Worker w, boolean completedAbruptly) { //如果completedAbruptly為true,表示是執行任務的時候拋了異常,那就修改ctl,工作線程數-1 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; // 將當前Work對象從workers中移除 workers.remove(w); } finally { mainLock.unlock(); } // 因為當前是處理線程退出流程中,所以要嘗試去修改線程池的狀態為TINDYING tryTerminate(); //獲取當前ctl值 int c = ctl.get(); // 如果線程池的狀態為RUNNING或者SHUTDOWN,則可能要替補一個線程 if (runStateLessThan(c, STOP)) { // completedAbruptly為false,表示線程是正常要退出了,則看是否需要保留線程 if (!completedAbruptly) { // 如果allowCoreThreadTimeOut為true,但是阻塞隊列中還有任務,那就至少得保留一個工作線程來處理阻塞隊列中的任務 // 如果allowCoreThreadTimeOut為false,那min就是corePoolSize,表示至少得保留corePoolSize個工作線程活著 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; // 如果當前工作線程數大于等于min,則表示符合所需要保留的最小線程數,那就直接return,不會調用下面的addWorker方法新開一個工作線程了 if (workerCountOf(c) >= min) return; // replacement not needed } //新開工作線程 addWorker(null, false); } }
某個工作線程正常情況下會不停的循環從阻塞隊列中獲取任務來執行,正常情況下就是通過阻塞來保證線程永遠活著,但是會有一些特殊情況:
如果線程被中斷了,那就會退出循環,然后做一些善后處理,比如ctl中的工作線程數-1,然后自己運行結束
如果線程阻塞超時了,那也會退出循環,此時就需要判斷線程池中的當前工作線程夠不夠,比如是否有corePoolSize個工作線程,如果不夠就需要新開一個線程,然后當前線程自己運行結束,這種看上去效率比較低,但是也沒辦法,當然如果當前工作線程數足夠,那就正常,自己正常的運行結束即可
如果線程是在執行任務的時候拋了移除,從而退出循環,那就直接新開一個線程作為替補,當然前提是線程池的狀態是RUNNING
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 如果線程池狀態是STOP,表示當前線程不需要處理任務了,那就修改ctl工作線程數-1 // 如果線程池狀態是SHUTDOWN,但是阻塞隊列中為空,表示當前任務沒有任務要處理了,那就修改ctl工作線程數-1 // return null表示當前線程無需處理任務,線程退出 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } //當前工作線程數 int wc = workerCountOf(c); // 用來判斷當前線程是無限阻塞還是超時阻塞,如果一個線程超時阻塞,那么一旦超時了,那么這個線程最終就會退出 // 如果是無限阻塞,那除非被中斷了,不然這個線程就一直等著獲取隊列中的任務 // allowCoreThreadTimeOut為true,表示線程池中的所有線程都可以被回收掉,則當前線程應該直接使用超時阻塞,一旦超時就回收 // allowCoreThreadTimeOut為false,則要看當前工作線程數是否超過了corePoolSize,如果超過了,則表示超過部分的線程要用超時阻塞,一旦超時就回收 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 如果工作線程數超過了工作線程的最大限制或者線程超時了,則要修改ctl,工作線程數減1,并且return null // return null就會導致外層的while循環退出,從而導致線程直接運行結束 // 直播課程里會細講timed && timedOut if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 要么超時阻塞,要么無限阻塞 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // 表示沒有超時,在阻塞期間獲取到了任務 if (r != null) return r; // 超時了,重新進入循環,上面的代碼會判斷出來當前線程阻塞超時了,最后return null,線程會運行結束 timedOut = true; } catch (InterruptedException retry) { // 如果線程池的狀態變成了STOP或者SHUTDOWN,最終也會return null,線程會運行結束 // 但是如果線程池的狀態仍然是RUNNING,那當前線程會繼續從隊列中去獲取任務,表示忽略了本次中斷 // 只有通過調用線程池的shutdown方法或shutdownNow方法才能真正中斷線程池中的線程 timedOut = false; } } }
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 修改ctl,將線程池狀態改為SHUTDOWN advanceRunState(SHUTDOWN); // 中斷工作線程 interruptIdleWorkers(); // 空方法,給子類擴展使用 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 遍歷所有正在工作的線程,要么在執行任務,要么在阻塞等待任務 for (Worker w : workers) { Thread t = w.thread; // 如果線程沒有被中斷,并且能夠拿到鎖,就中斷線程 // Worker在執行任務時會先加鎖,執行完任務之后會釋放鎖 // 所以只要這里拿到了鎖,就表示線程空出來了,可以中斷了 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
感謝各位的閱讀,以上就是“Java多線程編程基石ThreadPoolExecutor怎么使用”的內容了,經過本文的學習后,相信大家對Java多線程編程基石ThreadPoolExecutor怎么使用這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。