您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關如何徹底搞懂jdk8線程池,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
頂層設計,定義執行接口
Interface Executor(){ void execute(Runnable command); }
ExecutorService,定義控制接口
interface ExecutorService extends Executor{ }
抽象實現ExecutorService中的大部分方法
abstract class AbstractExecutorService implements ExecutorService{ //此處把ExecutorService中的提交方法都實現了 }
我們看下提交中的核心
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // ① //核心線程數沒有滿就繼續添加核心線程 if (addWorker(command, true)) // ② return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { // ③ int recheck = ctl.get(); if (! isRunning(recheck) && remove(command))// ④ reject(command); //⑦ else if (workerCountOf(recheck) == 0) // ⑤ //如果worker為0,則添加一個非核心worker,所以線程池里至少有一個線程 addWorker(null, false);// ⑥ } //隊列滿了以后,添加非核心線程 else if (!addWorker(command, false))// ⑧ reject(command);//⑦ }
1,什么時候用核心線程,什么時候啟用非核心線程?
添加任務時優先使用核心線程,核心線程滿了以后,任務放入隊列中。只要隊列不填滿,就一直使用核心線程執行任務(代碼①②)。
當隊列滿了以后開始使用增加非核心線程來執行隊列中的任務(代碼⑧)。
2,0個核心線程,2個非核心線程,隊列100,添加99個任務是否會執行?
會執行,添加隊列成功后,如果worker的數量為0,會添加非核心線程執行任務(見代碼⑤⑥)
3,隊列滿了會怎么樣?
隊列滿了,會優先啟用非核心線程執行任務,如果非核心線程也滿了,那就執行拒絕策略。
4,submit 和execute的區別是?
submit將執行任務包裝成了RunnableFuture,最終返回了Future,executor 方法執行無返回值。
addworker實現
ThreadPoolExecutor extends AbstractExecutorService{ //保存所有的執行線程(worker) HashSet<Worker> workers = new HashSet<Worker>(); //存放待執行的任務,這塊具體由指定的隊列實現 BlockingQueue<Runnable> workQueue; public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler){ } //添加執行worker private boolean addWorker(Runnable firstTask, boolean core) { //這里每次都會基礎校驗和cas校驗,防止并發無法創建線程, retry: for(;;){ for(;;){ if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } } try{ //創建一個worker w = new Worker(firstTask); final Thread t = w.thread; try{ //加鎖校驗,添加到workers集合中 workers.add(w); } //添加成功,將對應的線程啟動,執行任務 t.start(); }finally{ //失敗執行進行釋放資源 addWorkerFailed(Worker w) } } //Worker 是對任務和線程的封裝 private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ //線程啟動后會循環執行任務 public void run() { runWorker(this); } } //循環執行 final void runWorker(Worker w) { try{ while (task != null || (task = getTask()) != null) { //執行前的可擴展點 beforeExecute(wt, task); try{ //執行任務 task.run(); }finally{ //執行后的可擴展點,這塊也把異常給吃了 afterExecute(task, thrown); } } //這里會對執行的任務進行統計 }finally{ //異常或者是循環退出都會走這里 processWorkerExit(w, completedAbruptly); } } //獲取執行任務,此處決定runWorker的狀態 private Runnable getTask() { //worker的淘汰策略:允許超時或者工作線程>核心線程 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //滿足淘汰策略且...,就返回null,交由processWorkerExit去處理線程 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } // 滿足淘汰策略,就等一定的時間poll(),不滿足,就一直等待take() Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take(); } //處理任務退出(循環獲取不到任務的時候) private void processWorkerExit(Worker w, boolean completedAbruptly) { //異常退出的,不能調整線程數的 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); //不管成功或失敗,都執行以下邏輯 //1,計數,2,減去一個線程 completedTaskCount += w.completedTasks; //將線程移除,并不關心是否非核心 workers.remove(w); //如果是還是運行狀態 if (!completedAbruptly) { //正常終止的,處理邏輯 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //核心線程為0 ,最小值也是1 if (min == 0 && ! workQueue.isEmpty()) min = 1; //總線程數大于min就不再添加 if (workerCountOf(c) >= min) return; // replacement not needed } //異常退出一定還會添加worker,正常退出一般不會再添加線程,除非核心線程數為0 addWorker(null, false); } }
這里涉及到幾個點:
1,任務異常以后雖然有throw異常,但是外面有好幾個finally代碼;
2,在finally中,進行了任務的統計以及worker移除;
3,如果還有等待處理的任務,最少添加一個worker(不管核心線程數是否為0)
1, 線程池中核心線程數如何設置?
cpu密集型:一般為核心線程數+1,盡可能減少cpu的并行;
IO密集型:可以設置核心線程數稍微多些,將IO等待期間的空閑cpu充分利用起來。
2,線程池使用隊列的意義?
a)線程的資源是有限的,且線程的創建成本比較高;
b) 要保證cpu資源的合理利用(不能直接給cpu提一堆任務,cpu處理不過來,大家都慢了)
c) 利用了削峰填谷的思想(保證任務執行的可用性);
d) 隊列過大也會把內存撐爆。
3,為什么要用阻塞隊列?而不是非阻塞隊列?
a) 利用阻塞的特性,在沒有任務時阻塞一定的時間,防止資源被釋放(getTask和processWorkExit);
b) 阻塞隊列在阻塞時,CPU狀態是wait,等有任務時,會被喚醒,不會占用太多的資源;
線程池有兩個地方:
1,在execute方法中(提交任務時),只要工作線程為0,就至少添加一個Worker;
2,在processWorkerExit中(正常或異常結束時),只要有待處理的任務,就會增加Worker
所以正常情況下線程池一定會保證所有任務的執行。
我們在看下ThreadPoolExecutor中以下幾個方法
public boolean prestartCoreThread() { return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true); } void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); } public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n; }
確保了核心線程數必須是滿的,這些方法特別是在批處理的時候,或者動態調整核心線程數的大小時很有用。
一、newFixedThreadPool 與newSingleThreadExecutor
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
特點:
1,核心線程數和最大線程數大小一樣(唯一不同的是,一個是1,一個是自定義);
2,隊列用的是LinkedBlockingQueue(長度是Integer.Max_VALUE)
當任務的生產速度大于消費速度后,很容易將系統內存撐爆。
二、 newCachedThreadPool 和
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
特點:最大線程數為Integer.MAX_VALUE
當任務提交過多時,線程創建過多容易導致無法創建
三、 newWorkStealingPool
public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
這個主要是并行度,默認為cpu的核數。
四、newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
封裝起來的要么最大線程數不可控,要么隊列長度不可控,所以阿里規約里也不建議使用Executors方法創建線程池。
ps:
生產上使用線程池,最好是將關鍵任務和非關鍵任務分開設立線程池,非關鍵業務影響關鍵業務的執行。
關于如何徹底搞懂jdk8線程池就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。