您好,登錄后才能下訂單哦!
1、線程池簡介:
多線程技術主要解決處理器單元內多個線程執行的問題,它可以顯著減少處理器單元的閑置時間,增加處理器單元的吞吐能力。
假設一個服務器完成一項任務所需時間為:T1 創建線程時間,T2 在線程中執行任務的時間,T3 銷毀線程時間。
如果:T1 + T3 遠大于 T2,則可以采用線程池,以提高服務器性能。
一個線程池包括以下四個基本組成部分:
1、線程池管理器(ThreadPool):用于創建并管理線程池,包括 創建線程池,銷毀線程池,添加新任務;
2、工作線程(PoolWorker):線程池中線程,在沒有任務時處于等待狀態,可以循環的執行任務;
3、任務接口(Task):每個任務必須實現的接口,以供工作線程調度任務的執行,它主要規定了任務的入口,任務執行完后的收尾工作,任務的執行狀態等;
4、任務隊列(taskQueue):用于存放沒有處理的任務。提供一種緩沖機制。
線程池技術正是關注如何縮短或調整T1,T3時間的技術,從而提高服務器程序性能的。它把T1,T3分別安排在服務器程序的啟動和結束的時間段或者一些空閑的時間段,這樣在服務器程序處理客戶請求時,不會有T1,T3的開銷了。
線程池不僅調整T1,T3產生的時間段,而且它還顯著減少了創建線程的數目,看一個例子:
假設一個服務器一天要處理50000個請求,并且每個請求需要一個單獨的線程完成。在線程池中,線程數一般是固定的,所以產生線程總數不會超過線程池中線程的數目,而如果服務器不利用線程池來處理這些請求則線程總數為50000。一般線程池大小是遠小于50000。所以利用線程池的服務器程序不會為了創建50000而在處理請求時浪費時間,從而提高效率。
代碼實現中并沒有實現任務接口,而是把Runnable對象加入到線程池管理器(ThreadPool),然后剩下的事情就由線程池管理器(ThreadPool)來完成了
復制代碼
復制代碼
package mine.util.thread;
import java.util.LinkedList;
import java.util.List;
/**
* 線程池類,線程管理器:創建線程,執行任務,銷毀線程,獲取線程基本信息
*/
public final class ThreadPool {
// 線程池中默認線程的個數為5
private static int worker_num = 5;
// 工作線程
private WorkThread[] workThrads;
// 未處理的任務
private static volatile int finished_task = 0;
// 任務隊列,作為一個緩沖,List線程不安全
private List<Runnable> taskQueue = new LinkedList<Runnable>();
private static ThreadPool threadPool;
// 創建具有默認線程個數的線程池
private ThreadPool() {
this(5);
}
// 創建線程池,worker_num為線程池中工作線程的個數
private ThreadPool(int worker_num) {
ThreadPool.worker_num = worker_num;
workThrads = new WorkThread[worker_num];
for (int i = 0; i < worker_num; i++) {
workThrads[i] = new WorkThread();
workThrads[i].start();// 開啟線程池中的線程
}
}
// 單態模式,獲得一個默認線程個數的線程池
public static ThreadPool getThreadPool() {
return getThreadPool(ThreadPool.worker_num);
}
// 單態模式,獲得一個指定線程個數的線程池,worker_num(>0)為線程池中工作線程的個數
// worker_num<=0創建默認的工作線程個數
public static ThreadPool getThreadPool(int worker_num1) {
if (worker_num1 <= 0)
worker_num1 = ThreadPool.worker_num;
if (threadPool == null)
threadPool = new ThreadPool(worker_num1);
return threadPool;
}
// 執行任務,其實只是把任務加入任務隊列,什么時候執行有線程池管理器覺定
public void execute(Runnable task) {
synchronized (taskQueue) {
taskQueue.add(task);
taskQueue.notify();
}
}
// 批量執行任務,其實只是把任務加入任務隊列,什么時候執行有線程池管理器覺定
public void execute(Runnable[] task) {
synchronized (taskQueue) {
for (Runnable t : task)
taskQueue.add(t);
taskQueue.notify();
}
}
// 批量執行任務,其實只是把任務加入任務隊列,什么時候執行有線程池管理器覺定
public void execute(List<Runnable> task) {
synchronized (taskQueue) {
for (Runnable t : task)
taskQueue.add(t);
taskQueue.notify();
}
}
// 銷毀線程池,該方法保證在所有任務都完成的情況下才銷毀所有線程,否則等待任務完成才銷毀
public void destroy() {
while (!taskQueue.isEmpty()) {// 如果還有任務沒執行完成,就先睡會吧
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 工作線程停止工作,且置為null
for (int i = 0; i < worker_num; i++) {
workThrads[i].stopWorker();
workThrads[i] = null;
}
threadPool=null;
taskQueue.clear();// 清空任務隊列
}
// 返回工作線程的個數
public int getWorkThreadNumber() {
return worker_num;
}
// 返回已完成任務的個數,這里的已完成是只出了任務隊列的任務個數,可能該任務并沒有實際執行完成
public int getFinishedTasknumber() {
return finished_task;
}
// 返回任務隊列的長度,即還沒處理的任務個數
public int getWaitTasknumber() {
return taskQueue.size();
}
// 覆蓋toString方法,返回線程池信息:工作線程個數和已完成任務個數
@Override
public String toString() {
return "WorkThread number:" + worker_num + " finished task number:"
+ finished_task + " wait task number:" + getWaitTasknumber();
}
/**
* 內部類,工作線程
*/
private class WorkThread extends Thread {
// 該工作線程是否有效,用于結束該工作線程
private boolean isRunning = true;
/*
* 關鍵所在啊,如果任務隊列不空,則取出任務執行,若任務隊列空,則等待
*/
@Override
public void run() {
Runnable r = null;
while (isRunning) {// 注意,若線程無效則自然結束run方法,該線程就沒用了
synchronized (taskQueue) {
while (isRunning && taskQueue.isEmpty()) {// 隊列為空
try {
taskQueue.wait(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (!taskQueue.isEmpty())
r = taskQueue.remove(0);// 取出任務
}
if (r != null) {
r.run();// 執行任務
}
finished_task++;
r = null;
}
}
// 停止工作,讓該線程自然執行完run方法,自然結束
public void stopWorker() {
isRunning = false;
}
}
}
復制代碼
復制代碼
復制代碼
復制代碼
package mine.util.thread;
//測試線程池
public class TestThreadPool {
public static void main(String[] args) {
// 創建3個線程的線程池
ThreadPool t = ThreadPool.getThreadPool(3);
t.execute(new Runnable[] { new Task(), new Task(), new Task() });
t.execute(new Runnable[] { new Task(), new Task(), new Task() });
System.out.println(t);
t.destroy();// 所有線程都執行完成才destory
System.out.println(t);
}
// 任務類
static class Task implements Runnable {
private static volatile int i = 1;
@Override
public void run() {// 執行任務
System.out.println("任務 " + (i++) + " 完成");
}
}
}
復制代碼
復制代碼
運行結果:
WorkThread number:3 finished task number:0 wait task number:6
任務 1 完成
任務 2 完成
任務 3 完成
任務 4 完成
任務 5 完成
任務 6 完成
WorkThread number:3 finished task number:6 wait task number:0
分析:由于并沒有任務接口,傳入的可以是自定義的任何任務,所以線程池并不能準確的判斷該任務是否真正的已經完成(真正完成該任務是這個任務的run方法執行完畢),只能知道該任務已經出了任務隊列,正在執行或者已經完成。
2、Java類庫中提供的線程池簡介:
java提供的線程池更加強大,相信理解線程池的工作原理,看類庫中的線程池就不會感到陌生了。
文章2:
Java線程池使用說明
一簡介
線程的使用在java中占有極其重要的地位,在jdk1.4極其之前的jdk版本中,關于線程池的使用是極其簡陋的。在jdk1.5之后這一情況有了很大的改觀。Jdk1.5之后加入了java.util.concurrent包,這個包中主要介紹java中線程以及線程池的使用。為我們在開發中處理線程的問題提供了非常大的幫助。
二:線程池
線程池的作用:
線程池作用就是限制系統中執行線程的數量。
根據系統的環境情況,可以自動或手動設置線程數量,達到運行的最佳效果;少了浪費了系統資源,多了造成系統擁擠效率不高。用線程池控制線程數量,其他線程排隊等候。一個任務執行完畢,再從隊列的中取最前面的任務開始執行。若隊列中沒有等待進程,線程池的這一資源處于等待。當一個新任務需要運行時,如果線程池中有等待的工作線程,就可以開始運行了;否則進入等待隊列。
為什么要用線程池:
1.減少了創建和銷毀線程的次數,每個工作線程都可以被重復利用,可執行多個任務。
2.可以根據系統的承受能力,調整線程池中工作線線程的數目,防止因為消耗過多的內存,而把服務器累趴下(每個線程需要大約1MB內存,線程開的越多,消耗的內存也就越大,最后死機)。
Java里面線程池的頂級接口是Executor,但是嚴格意義上講Executor并不是一個線程池,而只是一個執行線程的工具。真正的線程池接口是ExecutorService。
比較重要的幾個類:
ExecutorService
真正的線程池接口。
ScheduledExecutorService
能和Timer/TimerTask類似,解決那些需要任務重復執行的問題。
ThreadPoolExecutor
ExecutorService的默認實現。
ScheduledThreadPoolExecutor
繼承ThreadPoolExecutor的ScheduledExecutorService接口實現,周期性任務調度的類實現。
要配置一個線程池是比較復雜的,尤其是對于線程池的原理不是很清楚的情況下,很有可能配置的線程池不是較優的,因此在Executors類里面提供了一些靜態工廠,生成一些常用的線程池。
創建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當于單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的提交順序執行。
2.newFixedThreadPool
創建固定大小的線程池。每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。
創建一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,
那么就會回收部分空閑(60秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴于操作系統(或者說JVM)能夠創建的最大線程大小。
4.newScheduledThreadPool
創建一個大小無限的線程池。此線程池支持定時以及周期性執行任務的需求。
實例
1:newSingleThreadExecutor
MyThread.java
publicclassMyThread extends Thread {
@Override
publicvoid run() {
System.out.println(Thread.currentThread().getName() + "正在執行。。。");
}
}
TestSingleThreadExecutor.java
publicclassTestSingleThreadExecutor {
publicstaticvoid main(String[] args) {
//創建一個可重用固定線程數的線程池
ExecutorService pool = Executors. newSingleThreadExecutor();
//創建實現了Runnable接口對象,Thread對象當然也實現了Runnable接口
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
//將線程放入池中進行執行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
//關閉線程池
pool.shutdown();
}
}
輸出結果
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
2newFixedThreadPool
TestFixedThreadPool.Java
publicclass TestFixedThreadPool {
publicstaticvoid main(String[] args) {
//創建一個可重用固定線程數的線程池
ExecutorService pool = Executors.newFixedThreadPool(2);
//創建實現了Runnable接口對象,Thread對象當然也實現了Runnable接口
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
//將線程放入池中進行執行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
//關閉線程池
pool.shutdown();
}
}
輸出結果
pool-1-thread-1正在執行。。。
pool-1-thread-2正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-2正在執行。。。
pool-1-thread-1正在執行。。。
3 newCachedThreadPool
TestCachedThreadPool.java
publicclass TestCachedThreadPool {
publicstaticvoid main(String[] args) {
//創建一個可重用固定線程數的線程池
ExecutorService pool = Executors.newCachedThreadPool();
//創建實現了Runnable接口對象,Thread對象當然也實現了Runnable接口
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
//將線程放入池中進行執行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
//關閉線程池
pool.shutdown();
}
}
輸出結果:
pool-1-thread-2正在執行。。。
pool-1-thread-4正在執行。。。
pool-1-thread-3正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-5正在執行。。。
4newScheduledThreadPool
TestScheduledThreadPoolExecutor.java
publicclass TestScheduledThreadPoolExecutor {
publicstaticvoid main(String[] args) {
ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1);
exec.scheduleAtFixedRate(new Runnable() {//每隔一段時間就觸發異常
@Override
publicvoid run() {
//throw new RuntimeException();
System.out.println("================");
}
}, 1000, 5000, TimeUnit.MILLISECONDS);
exec.scheduleAtFixedRate(new Runnable() {//每隔一段時間打印系統時間,證明兩者是互不影響的
@Override
publicvoid run() {
System.out.println(System.nanoTime());
}
}, 1000, 2000, TimeUnit.MILLISECONDS);
}
}
輸出結果
================
8384644549516
8386643829034
8388643830710
================
8390643851383
8392643879319
8400643939383
三:ThreadPoolExecutor詳解
ThreadPoolExecutor的完整構造方法的簽名是:ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) .
corePoolSize - 池中所保存的線程數,包括空閑線程。
maximumPoolSize-池中允許的最大線程數。
keepAliveTime - 當線程數大于核心時,此為終止前多余的空閑線程等待新任務的最長時間。
unit - keepAliveTime 參數的時間單位。
workQueue - 執行前用于保持任務的隊列。此隊列僅保持由 execute方法提交的 Runnable任務。
threadFactory - 執行程序創建新線程時使用的工廠。
handler - 由于超出線程范圍和隊列容量而使執行被阻塞時所使用的處理程序。
ThreadPoolExecutor是Executors類的底層實現。
在JDK幫助文檔中,有如此一段話:
“強烈建議程序員使用較為方便的Executors工廠方法Executors.newCachedThreadPool()(***線程池,可以進行自動線程回收)、Executors.newFixedThreadPool(int)(固定大小線程池)Executors.newSingleThreadExecutor()(單個后臺線程)
它們均為大多數使用場景預定義了設置。”
下面介紹一下幾個類的源碼:
ExecutorService newFixedThreadPool (int nThreads):固定大小線程池。
可以看到,corePoolSize和maximumPoolSize的大小是一樣的(實際上,后面會介紹,如果使用***queue的話maximumPoolSize參數是沒有意義的),keepAliveTime和unit的設值表名什么?-就是該實現不想keep alive!最后的BlockingQueue選擇了LinkedBlockingQueue,該queue有一個特點,他是***的。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
ExecutorService newSingleThreadExecutor():單線程
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
ExecutorService newCachedThreadPool():***線程池,可以進行自動線程回收
這個實現就有意思了。首先是***的線程池,所以我們可以發現maximumPoolSize為big big。其次BlockingQueue的選擇上使用SynchronousQueue。可能對于該BlockingQueue有些陌生,簡單說:該QUEUE中,每個插入操作必須等待另一個線程的對應移除操作。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
先從BlockingQueue<Runnable> workQueue這個入參開始說起。在JDK中,其實已經說得很清楚了,一共有三種類型的queue。
所有BlockingQueue 都可用于傳輸和保持提交的任務。可以使用此隊列與池大小進行交互:
如果運行的線程少于 corePoolSize,則 Executor始終首選添加新的線程,而不進行排隊。(如果當前運行的線程小于corePoolSize,則任務根本不會存放,添加到queue中,而是直接抄家伙(thread)開始運行)
如果運行的線程等于或多于 corePoolSize,則 Executor始終首選將請求加入隊列,而不添加新的線程。
如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。
queue上的三種類型。
排隊有三種通用策略:
直接提交。工作隊列的默認選項是 SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,如果不存在可用于立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求*** maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略允許***線程具有增長的可能性。
***隊列。使用***隊列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有corePoolSize 線程都忙時新任務在隊列中等待。這樣,創建的線程就不會超過 corePoolSize。(因此,maximumPoolSize的值也就無效了。)當每個任務完全獨立于其他任務,即任務執行互不影響時,適合于使用***隊列;例如,在 Web頁服務器中。這種排隊可用于處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略允許***線程具有增長的可能性。
有界隊列。當使用有限的 maximumPoolSizes時,有界隊列(如 ArrayBlockingQueue)有助于防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O邊界),則系統可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量。
BlockingQueue的選擇。
例子一:使用直接提交策略,也即SynchronousQueue。
首先SynchronousQueue是***的,也就是說他存數任務的能力是沒有限制的,但是由于該Queue本身的特性,在某次添加元素后必須等待其他線程取走后才能繼續添加。在這里不是核心線程便是新創建的線程,但是我們試想一樣下,下面的場景。
我們使用一下參數構造ThreadPoolExecutor:
new ThreadPoolExecutor(
2, 3, 30, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new RecorderThreadFactory("CookieRecorderPool"),
new ThreadPoolExecutor.CallerRunsPolicy());
new ThreadPoolExecutor(
2, 3, 30, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new RecorderThreadFactory("CookieRecorderPool"),
new ThreadPoolExecutor.CallerRunsPolicy());
當核心線程已經有2個正在運行.
此時繼續來了一個任務(A),根據前面介紹的“如果運行的線程等于或多于 corePoolSize,則Executor始終首選將請求加入隊列,而不添加新的線程。”,所以A被添加到queue中。
又來了一個任務(B),且核心2個線程還沒有忙完,OK,接下來首先嘗試1中描述,但是由于使用的SynchronousQueue,所以一定無法加入進去。
此時便滿足了上面提到的“如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出maximumPoolSize,在這種情況下,任務將被拒絕。”,所以必然會新建一個線程來運行這個任務。
暫時還可以,但是如果這三個任務都還沒完成,連續來了兩個任務,第一個添加入queue中,后一個呢?queue中無法插入,而線程數達到了maximumPoolSize,所以只好執行異常策略了。
所以在使用SynchronousQueue通常要求maximumPoolSize是***的,這樣就可以避免上述情況發生(如果希望限制就直接使用有界隊列)。對于使用SynchronousQueue的作用jdk中寫的很清楚:此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。
什么意思?如果你的任務A1,A2有內部關聯,A1需要先運行,那么先提交A1,再提交A2,當使用SynchronousQueue我們可以保證,A1必定先被執行,在A1么有被執行前,A2不可能添加入queue中。
例子二:使用***隊列策略,即LinkedBlockingQueue
這個就拿newFixedThreadPool來說,根據前文提到的規則:
如果運行的線程少于 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。那么當任務繼續增加,會發生什么呢?
如果運行的線程等于或多于 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。OK,此時任務變加入隊列之中了,那什么時候才會添加新線程呢?
如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。這里就很有意思了,可能會出現無法加入隊列嗎?不像SynchronousQueue那樣有其自身的特點,對于***隊列來說,總是可以加入的(資源耗盡,當然另當別論)。換句說,永遠也不會觸發產生新的線程!corePoolSize大小的線程數會一直運行,忙完當前的,就從隊列中拿任務開始運行。所以要防止任務瘋長,比如任務運行的實行比較長,而添加任務的速度遠遠超過處理任務的時間,而且還不斷增加,不一會兒就爆了。
例子三:有界隊列,使用ArrayBlockingQueue。
這個是最為復雜的使用,所以JDK不推薦使用也有些道理。與上面的相比,最大的特點便是可以防止資源耗盡的情況發生。
舉例來說,請看如下構造方法:
new ThreadPoolExecutor(
2, 4, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2),
new RecorderThreadFactory("CookieRecorderPool"),
new ThreadPoolExecutor(
2, 4, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2),
new RecorderThreadFactory("CookieRecorderPool"),
new ThreadPoolExecutor.CallerRunsPolicy());
假設,所有的任務都永遠無法執行完。
對于首先來的A,B來說直接運行,接下來,如果來了C,D,他們會被放到queue中,如果接下來再來E,F,則增加線程運行E,F。但是如果再來任務,隊列無法再接受了,線程數也到達最大的限制了,所以就會使用拒絕策略來處理。
keepAliveTime
jdk中的解釋是:當線程數大于核心時,此為終止前多余的空閑線程等待新任務的最長時間。
有點拗口,其實這個不難理解,在使用了“池”的應用中,大多都有類似的參數需要配置。比如數據庫連接池,DBCP中的maxIdle,minIdle參數。
什么意思?接著上面的解釋,后來向老板派來的工人始終是“借來的”,俗話說“有借就有還”,但這里的問題就是什么時候還了,如果借來的工人剛完成一個任務就還回去,后來發現任務還有,那豈不是又要去借?這一來一往,老板肯定頭也大死了。
合理的策略:既然借了,那就多借一會兒。直到“某一段”時間后,發現再也用不到這些工人時,便可以還回去了。這里的某一段時間便是keepAliveTime的含義,TimeUnit為keepAliveTime值的度量。
RejectedExecutionHandler
另一種情況便是,即使向老板借了工人,但是任務還是繼續過來,還是忙不過來,這時整個隊伍只好拒絕接受了。
RejectedExecutionHandler接口提供了對于拒絕任務的處理的自定方法的機會。在ThreadPoolExecutor中已經默認包含了4中策略,因為源碼非常簡單,這里直接貼出來。
CallerRunsPolicy:線程調用運行該任務的 execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
這個策略顯然不想放棄執行任務。但是由于池中已經沒有任何資源了,那么就直接使用調用該execute的線程本身來執行。
AbortPolicy:處理程序遭到拒絕將拋出運行時RejectedExecutionException
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException();
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException();
}
這種策略直接拋出異常,丟棄任務。
DiscardPolicy:不能執行的任務將被刪除
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
這種策略和AbortPolicy幾乎一樣,也是丟棄任務,只不過他不拋出異常。
DiscardOldestPolicy:如果執行程序尚未關閉,則位于工作隊列頭部的任務將被刪除,然后重試執行程序(如果再次失敗,則重復此過程)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
該策略就稍微復雜一些,在pool沒有關閉的前提下首先丟掉緩存在隊列中的最早的任務,然后重新嘗試運行該任務。這個策略需要適當小心。
設想:如果其他線程都還在運行,那么新來任務踢掉舊任務,緩存在queue中,再來一個任務又會踢掉queue中最老任務。
總結:
keepAliveTime和maximumPoolSize及BlockingQueue的類型均有關系。如果BlockingQueue是***的,那么永遠不會觸發maximumPoolSize,自然keepAliveTime也就沒有了意義。
反之,如果核心數較小,有界BlockingQueue數值又較小,同時keepAliveTime又設的很小,如果任務頻繁,那么系統就會頻繁的申請回收線程。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。