您好,登錄后才能下訂單哦!
本篇文章為大家展示了Java J.U.C中executors框架的設計理念是什么,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
juc-executors
框架是整個J.U.C包中類/接口關系最復雜的框架,真正理解executors框架的前提是理清楚各個模塊之間的關系,高屋建瓴,從整體到局部才能透徹理解其中各個模塊的功能和背后的設計思路。
網上有太多文章講executors框架,要么泛泛而談,要么一葉障目不見泰山,缺乏整體視角,很多根本沒有理解整個框架的設計思想和模塊關系。本文將對整個executors框架做綜述,介紹各個模塊的功能和聯系,后續再深入探討每個模塊,包括模塊中的各個工具類。
Executor
是JDK1.5時,隨著J.U.C引入的一個接口,引入該接口的主要目的是解耦任務本身和任務的執行。我們之前通過線程執行一個任務時,往往需要先創建一個線程,然后調用線程的start
方法來執行任務:
new Thread(new(RunnableTask())).start();
上述RunnableTask是實現了Runnable接口的任務類。而Executor接口解耦了任務和任務的執行,該接口只有一個方法,入參為待執行的任務:
public interface Executor { /** * 執行給定的Runnable任務. * 根據Executor的實現不同, 具體執行方式也不相同. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be accepted for execution * @throws NullPointerException if command is null */ void execute(Runnable command); }
我們可以像下面這樣執行任務,而不必關心線程的創建:
Executor executor = someExecutor; // 創建具體的Executor對象 executor.execute(new RunnableTask1()); executor.execute(new RunnableTask2()); ...
由于Executor僅僅是一個接口,所以根據其實現的不同,執行任務的具體方式也不盡相同,比如:
①同步執行任務
class DirectExecutor implements Executor { public void execute(Runnable r) { r.run(); } }
DirectExecutor是一個同步任務執行器,對于傳入的任務,只有執行完成后execute才會返回。
②異步執行任務
/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ static final class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); } }
ThreadPerTaskExecutor是一個異步任務執行器,對于每個任務,執行器都會創建一個新的線程去執行任務。
注意:Java線程與本地操作系統的線程是一一映射的。Java線程啟動時會創建一個本地操作系統線程;當該Java線程終止時,對應操作系統線程會被回收。由于CPU資源是有限的,所以線程數量有上限,所以一般由線程池來管理線程的創建/回收,而上面這種方式其實是線程池的雛形。
③對任務進行排隊執行
class SerialExecutor implements Executor { final Queue<Runnable> tasks = new ArrayDeque<Runnable>(); final Executor executor; Runnable active; SerialExecutor(Executor executor) { this.executor = executor; } public synchronized void execute(final Runnable r) { tasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (active == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((active = tasks.poll()) != null) { executor.execute(active); } } }
SerialExecutor 會對傳入的任務進行排隊(FIFO順序),然后從隊首取出一個任務執行。
以上這些示例僅僅是給出了一些可能的Executor實現,J.U.C包中提供了很多Executor的具體實現類,我們以后會具體講到,這里關鍵是理解Executor的設計思想——對任務和任務的執行解耦。
Executor接口提供的功能很簡單,為了對它進行增強,J.U.C又提供了一個名為ExecutorService
接口,ExecutorService也是在JDK1.5時,隨著J.U.C引入的:
public interface ExecutorService extends Executor
可以看到,ExecutorService繼承了Executor,它在Executor的基礎上增強了對任務的控制,同時包括對自身生命周期的管理,主要有四類:
關閉執行器,禁止任務的提交;
監視執行器的狀態;
提供對異步任務的支持;
提供對批處理任務的支持。
public interface ExecutorService extends Executor { /** * 關閉執行器, 主要有以下特點: * 1. 已經提交給該執行器的任務將會繼續執行, 但是不再接受新任務的提交; * 2. 如果執行器已經關閉了, 則再次調用沒有副作用. */ void shutdown(); /** * 立即關閉執行器, 主要有以下特點: * 1. 嘗試停止所有正在執行的任務, 無法保證能夠停止成功, 但會盡力嘗試(例如, 通過 Thread.interrupt中斷任務, 但是不響應中斷的任務可能無法終止); * 2. 暫停處理已經提交但未執行的任務; * * @return 返回已經提交但未執行的任務列表 */ List<Runnable> shutdownNow(); /** * 如果該執行器已經關閉, 則返回true. */ boolean isShutdown(); /** * 判斷執行器是否已經【終止】. * <p> * 僅當執行器已關閉且所有任務都已經執行完成, 才返回true. * 注意: 除非首先調用 shutdown 或 shutdownNow, 否則該方法永遠返回false. */ boolean isTerminated(); /** * 阻塞調用線程, 等待執行器到達【終止】狀態. * * @return {@code true} 如果執行器最終到達終止狀態, 則返回true; 否則返回false * @throws InterruptedException if interrupted while waiting */ boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; /** * 提交一個具有返回值的任務用于執行. * 注意: Future的get方法在成功完成時將會返回task的返回值. * * @param task 待提交的任務 * @param <T> 任務的返回值類型 * @return 返回該任務的Future對象 * @throws RejectedExecutionException 如果任務無法安排執行 * @throws NullPointerException if the task is null */ <T> Future<T> submit(Callable<T> task); /** * 提交一個 Runnable 任務用于執行. * 注意: Future的get方法在成功完成時將會返回給定的結果(入參時指定). * * @param task 待提交的任務 * @param result 返回的結果 * @param <T> 返回的結果類型 * @return 返回該任務的Future對象 * @throws RejectedExecutionException 如果任務無法安排執行 * @throws NullPointerException if the task is null */ <T> Future<T> submit(Runnable task, T result); /** * 提交一個 Runnable 任務用于執行. * 注意: Future的get方法在成功完成時將會返回null. * * @param task 待提交的任務 * @return 返回該任務的Future對象 * @throws RejectedExecutionException 如果任務無法安排執行 * @throws NullPointerException if the task is null */ Future<?> submit(Runnable task); /** * 執行給定集合中的所有任務, 當所有任務都執行完成后, 返回保持任務狀態和結果的 Future 列表. * <p> * 注意: 該方法為同步方法. 返回列表中的所有元素的Future.isDone() 為 true. * * @param tasks 任務集合 * @param <T> 任務的返回結果類型 * @return 任務的Future對象列表,列表順序與集合中的迭代器所生成的順序相同, * @throws InterruptedException 如果等待時發生中斷, 會將所有未完成的任務取消. * @throws NullPointerException 任一任務為 null * @throws RejectedExecutionException 如果任一任務無法安排執行 */ <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; /** * 執行給定集合中的所有任務, 當所有任務都執行完成后或超時期滿時(無論哪個首先發生), 返回保持任務狀態和結果的 Future 列表. */ <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; /** * 執行給定集合中的任務, 只有其中某個任務率先成功完成(未拋出異常), 則返回其結果. * 一旦正常或異常返回后, 則取消尚未完成的任務. */ <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; /** * 執行給定集合中的任務, 如果在給定的超時期滿前, 某個任務已成功完成(未拋出異常), 則返回其結果. * 一旦正常或異常返回后, 則取消尚未完成的任務. */ <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
關于Future,其實就是Java多線程設計模式中 Future模式,后面我們會專門講解J.U.C中的Future框架。
Future
對象提供了對任務異步執行的支持,也就是說調用線程無需等待任務執行完成,提交待執行的任務后,就會立即返回往下執行。然后,可以在需要時檢查Future是否有結果了,如果任務已執行完畢,通過Future.get()
方法可以獲取到執行結果——Future.get()是阻塞方法。
在工業環境中,我們可能希望提交給執行器的某些任務能夠定時執行或周期性地執行,這時我們可以自己實現Executor接口來創建符合我們需要的類,Doug Lea已經考慮到了這類需求,所以在ExecutorService的基礎上,又提供了一個接口——ScheduledExecutorService
,該接口也是在JDK1.5時,隨著J.U.C引入的:
public interface ScheduledExecutorService extends ExecutorService
ScheduledExecutorService提供了一系列schedule方法,可以在給定的延遲后執行提交的任務,或者每個指定的周期執行一次提交的任務,我們來看下面這個示例:
public class TestSche { public static void main(String[] args) { //創建了一個ScheduledExecutorService 實例 ScheduledExecutorService executorService=new ScheduledThreadPoolExecutor(1); final ScheduledFuture<?>scheduledFuture=executorService.scheduleAtFixedRate(new BeepTask(),10,10 ,TimeUnit.SECONDS); executorService.schedule(new Runnable() { @Override public void run() { scheduledFuture.cancel(true); } },1,TimeUnit.HOURS); //1小時取消任務 } private static class BeepTask implements Runnable{ @Override public void run() { System.out.println("beep!..."); } } }
上述示例先創建一個ScheduledExecutorService類型的執行器,然后利用scheduleAtFixedRate方法提交了一個“蜂鳴”任務,每隔10s該任務會執行一次。
注意:scheduleAtFixedRate
方法返回一個ScheduledFuture對象,ScheduledFuture其實就是在Future的基礎上增加了延遲的功能。通過ScheduledFuture,可以取消一個任務的執行,本例中我們利用schedule方法,設定在1小時后,執行任務的取消。
ScheduledExecutorService完整的接口聲明如下:
public interface ScheduledExecutorService extends ExecutorService { /** * 提交一個待執行的任務, 并在給定的延遲后執行該任務. * * @param command 待執行的任務 * @param delay 延遲時間 * @param unit 延遲時間的單位 */ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); /** * 提交一個待執行的任務(具有返回值), 并在給定的延遲后執行該任務. * * @param command 待執行的任務 * @param delay 延遲時間 * @param unit 延遲時間的單位 * @param <V> 返回值類型 */ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); /** * 提交一個待執行的任務. * 該任務在 initialDelay 后開始執行, 然后在 initialDelay+period 后執行, 接著在 initialDelay + 2 * period 后執行, 依此類推. * * @param command 待執行的任務 * @param initialDelay 首次執行的延遲時間 * @param period 連續執行之間的周期 * @param unit 延遲時間的單位 */ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); /** * 提交一個待執行的任務. * 該任務在 initialDelay 后開始執行, 隨后在每一次執行終止和下一次執行開始之間都存在給定的延遲. * 如果任務的任一執行遇到異常, 就會取消后續執行. 否則, 只能通過執行程序的取消或終止方法來終止該任務. * * @param command 待執行的任務 * @param initialDelay 首次執行的延遲時間 * @param delay 一次執行終止和下一次執行開始之間的延遲 * @param unit 延遲時間的單位 */ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); }
至此,Executors框架中的三個最核心的接口介紹完畢,這三個接口的關系如下圖:
通過第一部分的學習,讀者應該對Executors框架有了一個初步的認識,Executors框架就是用來解耦任務本身與任務的執行,并提供了三個核心接口來滿足使用者的需求:
Executor:提交普通的可執行任務
ExecutorService:提供對線程池生命周期的管理、異步任務的支持
ScheduledExecutorService:提供對任務的周期性執行支持
既然上面三種執行器只是接口,那么就一定存在具體的實現類,J.U.C提供了許多默認的接口實現,如果要用戶自己去創建這些類的實例,就需要了解這些類的細節,有沒有一種直接的方式,僅僅根據一些需要的特性(參數)就創建這些實例呢?因為對于用戶來說,其實使用的只是這三個接口。
JDK1.5時,J.U.C中還提供了一個Executors
類,專門用于創建上述接口的實現類對象。Executors其實就是一個簡單工廠,它的所有方法都是static的,用戶可以根據需要,選擇需要創建的執行器實例,Executors一共提供了五類可供創建的Executor執行器實例。
Executors提供了兩種創建具有固定線程數的Executor的方法,固定線程池在初始化時確定其中的線程總數,運行過程中會始終維持線程數量不變。
可以看到下面的兩種創建方法其實都返回了一個ThreadPoolExecutor
實例。ThreadPoolExecutor是一個ExecutorService接口的實現類,我們會在后面用專門章節講解,現在只需要了解這是一種Executor,用來調度其中的線程的執行即可。
/** * 創建一個具有固定線程數的Executor. */ public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } /** * 創建一個具有固定線程數的Executor. * 在需要時使用提供的 ThreadFactory 創建新線程. */ public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
上面需要注意的是ThreadFactory
這個接口:
public interface ThreadFactory { Thread newThread(Runnable r); }
既然返回的是一個線程池,那么就涉及線程的創建,一般我們需要通過 new Thread ()
這種方法創建一個新線程,但是我們可能希望設置一些線程屬性,比如
名稱、守護程序狀態、ThreadGroup 等等,線程池中的線程非常多,如果每個線程都這樣手動配置勢必非常繁瑣,而ThreadFactory 作為一個線程工廠可以讓我們從這些繁瑣的線程狀態設置的工作中解放出來,還可以由外部指定ThreadFactory實例,以決定線程的具體創建方式。
Executors提供了靜態內部類,實現了ThreadFactory接口,最簡單且常用的就是下面這個DefaultThreadFactory :
/** * 默認的線程工廠. */ static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
可以看到,DefaultThreadFactory 初始化的時候定義了線程組、線程名稱等信息,每創建一個線程,都給線程統一分配這些信息,避免了一個個手工通過new的方式創建線程,又可進行工廠的復用。
除了固定線程數的線程池,Executors還提供了兩種創建只有單個線程Executor的方法:
/** * 創建一個使用單個 worker 線程的 Executor. */ public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } /** * 創建一個使用單個 worker 線程的 Executor. * 在需要時使用提供的 ThreadFactory 創建新線程. */ public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
可以看到,只有單個線程的線程池其實就是指定線程數為1的固定線程池,主要區別就是,返回的Executor實例用了一個FinalizableDelegatedExecutorService
對象進行包裝。
我們來看下FinalizableDelegatedExecutorService,該類 只定義了一個finalize方法:
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } protected void finalize() { super.shutdown(); } }
核心是其繼承的DelegatedExecutorService ,這是一個包裝類,實現了ExecutorService的所有方法,但是內部實現其實都委托給了傳入的ExecutorService 實例:
/** * ExecutorService實現類的包裝類. */ /** * A wrapper class that exposes only the ExecutorService methods * of an ExecutorService implementation. */ static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List<Runnable> shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } public Future<?> submit(Runnable task) { return e.submit(task); } public <T> Future<T> submit(Callable<T> task) { return e.submit(task); } public <T> Future<T> submit(Runnable task, T result) { return e.submit(task, result); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { return e.invokeAll(tasks); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { return e.invokeAll(tasks, timeout, unit); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { return e.invokeAny(tasks); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return e.invokeAny(tasks, timeout, unit); } }
為什么要多此一舉,加上這樣一個委托層?因為返回的ThreadPoolExecutor包含一些設置線程池大小的方法——比如setCorePoolSize,對于只有單個線程的線程池來說,我們是不希望用戶通過強轉的方式使用這些方法的,所以需要一個包裝類,只暴露ExecutorService本身的方法。
有些情況下,我們雖然創建了具有一定線程數的線程池,但出于資源利用率的考慮,可能希望在特定的時候對線程進行回收(比如線程超過指定時間沒有被使用),Executors就提供了這種類型的線程池:
/** * 創建一個可緩存線程的Execotor. * 如果線程池中沒有線程可用, 則創建一個新線程并添加到池中; * 如果有線程長時間未被使用(默認60s, 可通過threadFactory配置), 則從緩存中移除. */ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } /** * 創建一個可緩存線程的Execotor. * 如果線程池中沒有線程可用, 則創建一個新線程并添加到池中; * 如果有線程長時間未被使用(默認60s, 可通過threadFactory配置), 則從緩存中移除. * 在需要時使用提供的 ThreadFactory 創建新線程. */ public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
可以看到,返回的還是ThreadPoolExecutor對象,只是指定了超時時間,另外線程池中線程的數量在[0, Integer.MAX_VALUE]
之間。
如果有任務需要延遲/周期調用,就需要返回ScheduledExecutorService接口的實例,ScheduledThreadPoolExecutor
就是實現了ScheduledExecutorService接口的一種Executor,和ThreadPoolExecutor一樣,這個我們后面會專門講解。
/** * 創建一個具有固定線程數的 可調度Executor. * 它可安排任務在指定延遲后或周期性地執行. */ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } /** * 創建一個具有固定線程數的 可調度Executor. * 它可安排任務在指定延遲后或周期性地執行. * 在需要時使用提供的 ThreadFactory 創建新線程. */ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }
Fork/Join線程池是比較特殊的一類線程池,在JDK1.7時才引入,其核心實現就是ForkJoinPool
類。關于Fork/Join框架,我們后面會專題講解,現在只需要知道,Executors框架提供了一種創建該類線程池的便捷方法。
/** * 創建具有指定并行級別的ForkJoin線程池. */ public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } /** * 創建并行級別等于CPU核心數的ForkJoin線程池. */ public static ExecutorService newWorkStealingPool() { return new ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
至此,Executors框架的整體結構基本就講解完了,此時我們的腦海中應有大致如下的一幅類繼承圖:
下面來回顧一下,上面的各個接口/類的關系和作用:
Executor
執行器接口,也是最頂層的抽象核心接口, 分離了任務和任務的執行。ExecutorService
在Executor的基礎上提供了執行器生命周期管理,任務異步執行等功能。ScheduledExecutorService
在ExecutorService基礎上提供了任務的延遲執行/周期執行的功能。Executors
生產具體的執行器的靜態工廠ThreadFactory
線程工廠,用于創建單個線程,減少手工創建線程的繁瑣工作,同時能夠復用工廠的特性。AbstractExecutorService
ExecutorService的抽象實現,為各類執行器類的實現提供基礎。ThreadPoolExecutor
線程池Executor,也是最常用的Executor,可以以線程池的方式管理線程。ScheduledThreadPoolExecutor
在ThreadPoolExecutor基礎上,增加了對周期任務調度的支持。ForkJoinPool
Fork/Join線程池,在JDK1.7時引入,時實現Fork/Join框架的核心類。
上述內容就是Java J.U.C中executors框架的設計理念是什么,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。