您好,登錄后才能下訂單哦!
本篇內容主要講解“怎么使用Java多線程Future獲取異步任務”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“怎么使用Java多線程Future獲取異步任務”吧!
在前文中我們談到,通過編碼實現Runnable接口,將獲得具有邊界性的 "任務",在指定的線程(或者線程池)中運行。
重新觀察該接口,不難發現它并沒有方法返回值:
public interface Runnable { void run(); }
在JDK1.5之前,想利用任務的執行結果,需要小心的操作線程訪問臨界區資源。使用 回調
進行解耦是非常不錯的選擇。
注意,為了減少篇幅使用了lambda,但jdk1.5之前并不支持lambda
將計算任務分離到其他線程執行,再回到主線程消費結果
我們將計算、IO等耗時任務丟到其他線程,讓主線程專注于自身業務,假想它在接受用戶輸入以及處理反饋,但我們略去這一部分
我們可以設計出類似下面的代碼:
雖然它還有很多不合理之處值得優化,但也足以用于演示
class Demo { static final Object queueLock = new Object(); static List<Runnable> mainQueue = new ArrayList<>(); static boolean running = true; static final Runnable FINISH = () -> running = false; public static void main(String[] args) { synchronized (queueLock) { mainQueue.add(Demo::onStart); } while (running) { Runnable runnable = null; synchronized (queueLock) { if (!mainQueue.isEmpty()) runnable = mainQueue.remove(0); } if (runnable != null) { runnable.run(); } Thread.yield(); } } public static void onStart() { //... } public static void finish() { synchronized (queueLock) { mainQueue.clear(); mainQueue.add(FINISH); } } }
再模擬一個計算的線程和任務回調:
interface Callback { void onResultCalculated(int result); } class CalcThread extends Thread { private final Callback callback; private final int a; private final int b; public CalcThread(Callback callback, int a, int b) { this.callback = callback; this.a = a; this.b = b; } @Override public void run() { super.run(); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = a + b; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); synchronized (queueLock) { mainQueue.add(() -> callback.onResultCalculated(result)); } } }
填充一下onStart業務:
class Demo { public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); new CalcThread(result -> { System.out.println("threadId" + Thread.currentThread().getId() + ",onResultCalculated:" + result + ";" + System.currentTimeMillis()); finish(); }, 200, 300).start(); } }
在前文我們提到,如果業務僅關注任務的執行,并不過于關心線程本身,則可以利用Runnable:
class Demo { static class CalcRunnable implements Runnable { private final Callback callback; private final int a; private final int b; public CalcRunnable(Callback callback, int a, int b) { this.callback = callback; this.a = a; this.b = b; } @Override public void run() { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = a + b; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); synchronized (queueLock) { mainQueue.add(() -> callback.onResultCalculated(result)); } } } public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); new Thread(new CalcRunnable(result -> { System.out.println("threadId" + Thread.currentThread().getId() + ",onResultCalculated:" + result + ";" + System.currentTimeMillis()); finish(); }, 200, 300)).start(); } }
不難想象出:我們非常需要
讓特定線程、特定類型的線程方便地接收任務,回顧本系列文章中的 線程池篇 ,線程池是應運而生
擁有比Synchronize更輕量的機制
擁有更方便的數據結構
至此,我們可以體會到:JDK1.5之前,因為JDK的功能不足,Java程序對于線程的使用 較為粗糙。
終于在JDK1.5中,迎來了新特性: Future
以及先前文章中提到的線程池, 時光荏苒,一晃將近20年了。
/** * 略 * @since 1.5 * @author Doug Lea * @param <V> The result type returned by this Future's {@code get} method */ public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
盡管已經移除了API注釋,但仍然能夠理解每個API的含義,不多做贅述。
顯而易見,為了增加返回值,沒有必要用如此復雜的 接口來替代 Runnable
。簡單思考后可以對返回值的情況進行歸納:
返回Runnable中業務的結果,例如計算、讀取資源等
單純的在Runnable執行完畢后返回一個結果
從業務層上看,僅需要如下接口即可,它增加了返回值、并可以更友好地讓使用者處理異常:
作者按:拋開底層實現,僅看業務方編碼需要
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
顯然,JDK需要提供后向兼容能力:
Runnable 不能夠丟棄,也不應當丟棄
不能要求使用者完全的重構代碼
所以一并提供了適配器,讓使用者進行簡單的局部重構即可用上新特性
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
而Future恰如其名,它代表了在 "未來" 的一個結果和狀態,為了更方便地處理異步而生。
并且內置了 FutureTask
,在 FutureTask詳解 章節中再行展開。
在JDK1.8的基礎上,看一下精簡的類圖結構:
public class FutureTask { public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } }
public class FutureTask { //新建 private static final int NEW = 0; //處理中 private static final int COMPLETING = 1; //正常 private static final int NORMAL = 2; //異常 private static final int EXCEPTIONAL = 3; //已取消 private static final int CANCELLED = 4; //中斷中 private static final int INTERRUPTING = 5; //已中斷 private static final int INTERRUPTED = 6; }
可能的生命周期轉換如下:
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
JDK中原汁原味的解釋如下:
The run state of this task, initially NEW. The run state transitions to a terminal state only in methods set, setException, and cancel. During completion, state may take on transient values of COMPLETING (while outcome is being set) or INTERRUPTING (only while interrupting the runner to satisfy a cancel(true)). Transitions from these intermediate to final states use cheaper ordered/lazy writes because values are unique and cannot be further modified.
本節從以下三塊入手閱讀源碼
狀態判斷
取消
獲取結果
狀態判斷API的實現非常簡單
public class FutureTask { public boolean isCancelled() { return state >= CANCELLED; } public boolean isDone() { return state != NEW; } }
取消:
當前狀態為 NEW
且 CAS修改 state 成功,否則返回取消失敗
如果 mayInterruptIfRunning
則中斷在執行的線程并CAS修改state為INTERRUPTED
調用 finishCompletion
刪除并通知所有等待的線程
調用done()
設置callable為null
public class FutureTask { public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) { return false; } try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; } private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null; ) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (; ; ) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint } }
獲取結果: 先判斷狀態,如果未進入到 COMPLETING
(即為NEW狀態),則阻塞等待狀態改變,返回結果或拋出異常
public class FutureTask { public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); } private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V) x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable) x); } }
而使用則非常簡單,也非常的樸素。
我們以文中的的例子進行改造:
沿用原Runnable邏輯
移除回調,增加 CalcResult
將 CalcResult
對象作為既定返回結果,Runnable中設置其屬性
class Demo { static class CalcResult { public int result; } public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); final CalcResult calcResult = new CalcResult(); Future<CalcResult> resultFuture = Executors.newSingleThreadExecutor().submit(() -> { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = 200 + 300; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); calcResult.result = result; }, calcResult); System.out.println("threadId" + Thread.currentThread().getId() + "反正干點什么," + System.currentTimeMillis()); if (resultFuture.isDone()) { try { final int ret = resultFuture.get().result; System.out.println("threadId" + Thread.currentThread().getId() + ",get result:" + ret + ";" + System.currentTimeMillis()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } finish(); } }
如果直接使用新特性Callback,則如下:
直接返回結果,當然也可以直接返回Integer,不再包裹一層
class Demo { public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); ExecutorService executor = Executors.newSingleThreadExecutor(); Future<CalcResult> resultFuture = executor.submit(() -> { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = 200 + 300; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); final CalcResult calcResult = new CalcResult(); calcResult.result = result; return calcResult; }); System.out.println("threadId" + Thread.currentThread().getId() + "反正干點什么," + System.currentTimeMillis()); if (resultFuture.isDone()) { try { final int ret = resultFuture.get().result; System.out.println("threadId" + Thread.currentThread().getId() + ",get result:" + ret + ";" + System.currentTimeMillis()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } executor.shutdown(); finish(); } }
相信讀者諸君會有這樣的疑惑:
為何使用Future比原先的回調看起來粗糙?
首先要明確一點:文中前段的回調Demo,雖然達成了既定目標,但效率并不高!!在當時計算很昂貴的背景下,并不會如此莽撞地使用!
而在JDK1.5開始,提供了大量內容支持多線程開發。考慮到篇幅,會在系列文章中逐步展開。
另外,FutureTask中的CAS與Happens-Before本篇中亦不做展開。
接下來,再做一些引申,簡單看一看多線程業務模式。
常用的多線程設計模式包括:
Future模式
Master-Worker模式
Guarded Suspension模式
不變模式
生產者-消費
文中對于Future的使用方式遵循了Future模式。
業務方在使用時,已經明確了任務被分離到其他線程執行時有等待期,在此期間,可以干點別的事情,不必浪費系統資源。
在程序系統中設計兩類線程,并相互協作:
Master線程(單個)
Worker線程
Master線程負責接受任務、分配任務、接收(必要時進一步組合)結果并返回;
Worker線程負責處理子任務,當子任務處理完成后,向Master線程返回結果;
作者按:此時可再次回想一下文章開頭的Demo
使用緩存隊列,使得 服務線程/服務進程 在未就緒、忙碌時能夠延遲處理請求。
使用等待-通知機制,將消費 服務的返回結果
的方式規范化
在并行開發過程中,為確保數據的一致性和正確性,有必要對對象進行同步,而同步操作會對程序系統的性能產生相當的損耗。
因此,使用狀態不可改變的對象,依靠其不變性來確保 并行操作 在 沒有同步機制 的情況下,保持一致性和正確性。
對象創建后,其內部狀態和數據不再發生改變
對象被共享、被多個線程訪問
設計兩類線程:若干個生產者線程和若干個消費者線程。
生產者線程負責提交用戶請求,消費者線程負責處理用戶請求。生產者和消費者之間通過共享內存緩沖區進行通信。
內存緩沖區的意義:
解決是數據在多線程間的共享問題
緩解生產者和消費者之間的性能差
這幾種模式從不同角度出發解決特定問題,但亦有一定的相似之處,不再展開。
到此,相信大家對“怎么使用Java多線程Future獲取異步任務”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。