您好,登錄后才能下訂單哦!
詳解Java線程池和Executor原理的分析
線程池作用與基本知識
在開始之前,我們先來討論下“線程池”這個概念。“線程池”,顧名思義就是一個線程緩存。它是一個或者多個線程的集合,用戶可以把需要執行的任務簡單地扔給線程池,而不用過多的糾結與執行的細節。那么線程池有哪些作用?或者說與直接用Thread相比,有什么優勢?我簡單總結了以下幾點:
減小線程創建和銷毀帶來的消耗
對于Java Thread的實現,我在前面的一篇blog中進行了分析。Java Thread與內核線程是1:1(Linux)的,再加上Thread在Java層與C++層都有不少成員數據,所以Java Thread其實是比較重的。創建和銷毀一個Java Thread需要OS和JVM都做不少工作,因此如果將Java Thread緩存起來,可以實現一定的效率提升。
更加方便和透明的實現計算資源控制
討論這一條,可能需要舉一些例子。以非常聞名的web服務器Nginx為例,Nginx以強大的并發能力和低資源消耗而著稱。Nginx為了實現這些嚴格的要求,它嚴格地限定了工作線程的數目(worker線程一般等于CPU數目)。這種設計的著眼點就是降低線程切換帶來的性能損失,這條優化方式對Java同樣適用。倘若,每來一個任務就新建一個Thread來運算,那最終的結果就是程序資源難以控制(某個功能把CPU跑滿了),而且整體的執行速度也比較慢。 而Java線程池提供了FixedThreadPool,你可以使用它實現線程最大數目的控制。
上面說了這么多的“廢話”,還是來結合Java線程池的實現來分析一下吧!Java的線程池有一下幾種實現:
cached ThreadPool
緩存線程池的特點是它會緩存之前的線程,新提交的任務可以運行在緩存的線程中,即實現了前文所述的第一個優勢。
fixed ThreadPool
cachedThreadPool的一個特點是——新提交的任務沒有空閑線程可以執行了,就會創建一個新的線程。而fixedThreadPool不會這樣,它會將任務保存起來,等到有空閑線程再執行。即實現了前文所述的第二個優勢。
scheduled ThreadPool
scheduled ThreadPool的特點是可以實現任務的調度,比如任務的延遲執行和周期執行。
出了上面三種,Java還實現了newWorkStealingPool,這個是基于Fork/Join框架的。目前我還沒研究這個,所以就先不管它了。Java的并發支持中,使用了Executor來包裝各種線程池,“執行器”這個名稱其實挺貼切的,線程池可不就是個執行器嘛!
1.cached ThreadPool、fixed ThreadPool的實現
從前文的描述就可以看出,這兩種線程池非常類似。的確是這樣,事實上它們是同時實現的,不行我們來看實際例子:
ThreadPoolExecutor executor1 = (ThreadPoolExecutor)Executors.newCachedThreadPool();
ThreadPoolExecutor executor2 = (ThreadPoolExecutor)Executors.newFixedThreadPool(4);
這是兩種線程池的新建方法,看起來很像吧!如果你不這么認為,我只能讓你看看真相了。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
是的,它們調用了同一個構造函數,只是參數略有不同。那么我們來看看這些參數的含義,以及兩組參數的區別。首先還是需要貼一下ThreadPoolExecutor的構造函數了。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
為了看起來清爽,再一層的構造函數我就不貼了,而且那個構造函數也只是簡單的賦值而已。這里的函數原型已經能給我們很多很多信息了,不得不說JDK的代碼命名確實好,簡直就像注釋一樣。
maximumPoolSize就是線程池的最大線程數;對于cached ThreadPool來說,這個值是Integer.MAX_VALUE,基本相當于無窮大了,什么樣的機器能跑幾十億線程!!對于fixed ThreadPool來講,這個值就是用戶設定的線程池的數目。
keepAliveTime和unit決定了線程的緩存過期時間;對于cached ThreadPool來講,線程的緩存過期時間是一分鐘,換言之,一個工作線程如果一分鐘都無事可干,就把它撤銷掉以節省資源。fixed ThreadPool傳入的時間是0,這里的含義是fixed ThreadPool中的工作線程是永遠不過期的。
corePoolSize是線程池的最小線程數;對于cached ThreadPool,這個值為0,因為在完全沒有任務的情況下,cached ThreadPool的確會成為“光桿司令”。至于fixed ThreadPool,這個fixed已經表明corePoolSize是等于線程總數的。
接下來,我們根據一個簡單的使用例子,來看看一下cached ThreadPool的流程。
public class Task implements Callable<String> { private String name; public Task(String name) { this.name = name; } @Override public String call() throws Exception { System.out.printf("%s: Starting at : %s\n", this.name, new Date()); return "hello, world"; } public static void main(String[] args) { ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newCachedThreadPool(); Task task = new Task("test"); Future<String> result = executor.submit(task); try { System.out.printf("%s\n", result.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } executor.shutdown(); System.out.printf("Main ends at : %s\n", new Date()); } }
首先,來看看executor.submit(task),這其實調用了ThreadPoolExecutor.execute(Runnable command)方法,這個方法的代碼如下,整段代碼的邏輯是這樣的。首先檢查線程池的線程數是否不夠corePoolSize,如果不夠就直接新建線程并把command添加進去;如果線程數已經夠了或者添加失敗(多個線程增加添加的情況),就嘗試把command添加到隊列中(workQueue.offer(command)),如果添加失敗了,就reject掉cmd。大體的邏輯是這樣的,這段代碼有很多基于線程安全的設計,這里為了不跑題,就先忽略細節了。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
到這里,看起來線程池實現的整體思路其實也沒多么復雜。但是還有一個問題——一個普通的Thread在執行完自己的run方法后會自動退出。那么線程池是如何實現Worker線程不斷的干活,甚至在沒有任務的時候。其實答案很簡單,就是Worker其實在跑大循環,Worker實際運行方法如下:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); /***/ try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); /***/ } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
關鍵就在這個while的判斷條件,對于需要cached線程的情況下,getTask()會阻塞起來,如果緩存的時間過期,就會返回一個null,然后Worker就退出了,也就結束了它的服役周期。而在有任務的情況下,Woker會把task拿出來,然后調用task.run()執行任務,并通過Future通知客戶線程(即future.get()返回)。這樣一個簡單的線程池使用過程就完了。。。
當然,線程池的很多精髓知識——基于線程安全的設計,我都沒有分析。有興趣可以自己分析一下,也可以和我討論。此外Scheduled ThreadPool這里也沒有分析,它的要點其實是調度,主要是根據時間最小堆來驅動的。
感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持,如有疑問請留言,或者到本站社區交流,大家共同進步!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。