您好,登錄后才能下訂單哦!
Java中的Future和Future通常和線程池搭配使用,用來獲取線程池返回執行后的返回值。我們假設通過Executors工廠方法構建一個線程池es ,es要執行某個任務有兩種方式,一種是執行 es.execute(runnable) ,這種情況是沒有返回值的; 另外一種情況是執行 es.submit(runnale)或者 es.submit(callable) ,這種情況會返回一個Future的對象,然后調用Future的get()來獲取返回值。
Future
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Future是一個接口,他提供給了我們方法來檢測當前的任務是否已經結束,還可以等待任務結束并且拿到一個結果,通過調用Future的get()方法可以當任務結束后返回一個結果值,如果工作沒有結束,則會阻塞當前線程,直到任務執行完畢,我們可以通過調用cancel()方法來停止一個任務,如果任務已經停止,則cancel()方法會返回true;如果任務已經完成或者已經停止了或者這個任務無法停止,則cancel()會返回一個false。當一個任務被成功停止后,他無法再次執行。isDone()和isCancel()方法可以判斷當前工作是否完成和是否取消。
線程池中有以下幾個方法:
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; }
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
都返回一個Future對象,仔細查看發現,所有的方法最終都將runnable或者callable轉變成一個RunnableFuture的對象,這個RunnableFutre的對象是一個同時繼承了Runnable和Future的接口
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
然后調用executor(runnable)方法,關于executor(runnable)的具體實現我們后面再講。最后返回一個RunnableFuture對象。RunnableFuture這個接口直有一個具體的實現類,那就時我們接下來要講的FutureTask。
FutureTask
public class FutureTask<V> implements RunnableFuture<V>
FutureTask實現了RunnableFuture的接口,既然我們知道最終返回的是一個FutureTask對象ftask,而且我們可以通過ftask.get()可以的來得到execute(task)的返回值,這個過程具體事怎么實現的了??這個也是本篇文章的所要講的。
我們可以先來猜測一下它的實現過程,首先Runnable的run()是沒有返回值的,所以當es.submit()的參數只有一個Runnable對象的時候,通過ftask.get()得到的也是一個null值,當參數還有一個result的時候,就返回這個result;如果參數是一個Callable的對象的時候,Callable的call()是有返回值的,同時這個call()方法會在轉換的Runable對象ftask的run()方法中被調用,然后將它的返回值賦值給一個全局變量,最后在ftask的get()方法中得到這個值。猜想對不對了? 我們直接看源碼。
將Runnable對象轉為RunnableFuture的方法:
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
Executors::callable()
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); }
Executors的內部類RunnableAdapter
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
將Callable對象轉為RunnableFuture的方法:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
接下來我們來看execute(runnbale)的執行過程:
execute(runnable)最終的實現是在ThreadPoolExecutor,基本上所有的線程池都是通過ThreadPoolExecutor的構造方法傳入不同的參數來構造的。
ThreadPoolExecutor::executor(runnable) :
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
如上所示,這個過程分為三步:
Step 1:
如果當前線程池的的線程小于核心線程的數量的時候,就會調用addWorker檢查運行狀態和正在運行的線程數量,通過返回false來防止錯誤地添加線程,然后執行當前任務。
Step 2:
否則當前線程池的的線程大于核心線程的數量的時候,我們仍然需要先判斷是否需要添加一個新的線程來執行這個任務,因為可能已經存在的線程此刻任務執行完畢處于空閑狀態,這個時候可以直接復用。否則創建一個新的線程來執行此任務。
Step 3:
如果不能再添加新的任務,就拒絕。
執行execute(runnable)最終會回調runnable的run()方法,也就是FutureTask的對象ftask的run()方法,源碼如下:
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
通過執行result = c.call()拿到返回值,然后set(result) ,因此get()方法獲得的值正是這個result。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持億速云。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。