您好,登錄后才能下訂單哦!
這篇文章主要講解了“CompletableFuture怎么使用”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“CompletableFuture怎么使用”吧!
通常情況下,我們希望代碼的執行順序和代碼的組織順序一致,即代碼表述了同步執行的程序,這樣可以減少很多思考。
而 閱讀異步的程序代碼,需要在腦海中建立事件流,當程序業務復雜時,將挑戰人的記憶力和空間想象力,并非所有人都擅長在腦海中構建并分析異步事件流模型。
所以,我們期望擁有一個非常友好的框架,能夠讓我們方便地進行異步編程,并且在框架內部設計有線程同步、異常處理機制。
并且,基于該框架編寫的代碼具有很高的可讀、可理解性。
而Future基本無法滿足這一期望。
在先前的系列文章中,我們已經回顧了Future類的設計,在絕大多數場景下,我們選擇使用多線程,是為了 充分利用機器性能 以及 避免用戶交互線程出現長時間阻塞 以致影響體驗。
所以我們將耗時的、會引起長時間阻塞的任務分離到其他線程執行,并在 合適時機 進行線程同步,于主線程(一般負責用戶交互處理、界面渲染)中處理結果。
詳見拙作 掌握Future,輕松獲取異步任務結果
Future
于 Java 1.5版本引入,它類似于 異步處理的結果占位符 , 提供了兩個方法獲取結果:
get()
, 調用線程進入阻塞直至得到結果或者異常。
get(long timeout, TimeUnit unit)
, 調用線程將僅在指定時間 timeout 內等待結果或者異常,如果超時未獲得結果就會拋出 TimeoutException 異常。
Future
可以實現 Runnable
或 Callable
接口來定義任務,一定程度上滿足 使用框架進行異步編程 的期望,但通過整體源碼可知它存在如下 3個問題 :
調用 get()
方法會一直阻塞直到獲取結果、異常,無法在任務完成時獲得 "通知" ,無法附加回調函數
不具備鏈式調用和結果聚合處理能力,當我們想鏈接多個 Future
共同完成一件任務時,沒有框架級的處理,只能編寫業務級邏輯,合并結果,并小心的處理同步
需要單獨編寫異常處理代碼
使用 get(long timeout, TimeUnit unit)
和 isDone()
判斷,確實可以緩解問題1,但這需要結合業務單獨設計(調優),存在大量的不確定性。不再展開
Java 8中引入 CompletableFuture
來解決 Future
的不足。
CompletableFuture
的設計靈感來自于 Google Guava
庫的 ListenableFuture
類,它實現了 Future接口
和 CompletionStage接口
, 并且新增一系列API,支持Java 8的 lambda特性
,通過回調利用非阻塞方法,提升了異步編程模型。
它解決了Future的不足,允許我們在非主線程中運行任務,并向啟動線程 (一般是主線程) 通知 任務完成
或 任務失敗
,編寫異步的、非阻塞的程序。
使用 CompletableFuture.completedFuture(U value)
可以獲取一個 執行狀態已經完成
的 CompletableFuture
對象。
這可以用于快速改造舊程序,并進行逐步過渡
class Demo { @Test public void testSimpleCompletableFuture() { CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("testSimpleCompletableFuture"); assertTrue(completableFuture.isDone()); try { assertEquals("testSimpleCompletableFuture", completableFuture.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
部分老舊程序已經建立了多線程業務模型,我們可以使用 CompletableFuture
改造其中的線程同步部分,但暫不改造數據傳遞。
使用 runAsync()
方法,該方法接收一個 Runnable
類型的參數返回 CompletableFuture<Void>
:
//并不改變原項目中數據傳遞的部分、或者不關心結果數據,僅進行同步 class Demo { @Test public void testCompletableFutureRunAsync() { AtomicInteger variable = new AtomicInteger(0); CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> process(variable)); runAsync.join(); assertEquals(1, variable.get()); } public void process(AtomicInteger variable) { System.out.println(Thread.currentThread() + " Process..."); variable.set(1); } }
當我們關心異步任務的結果數據、或者改造原 多線程業務模型 的 數據傳遞方式 時,可以使用 supplyAsync()
方法,該方法接收一個 Supplier<T>
接口類型的參數,它實現了任務的邏輯,方法返回 CompletableFuture<T>
實例。
class Demo { @Test public void testCompletableFutureSupplyAsync() { CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(this::process); try { // Blocking assertEquals("testCompletableFutureSupplyAsync", supplyAsync.get()); } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); } } public String process() { return "testCompletableFutureSupplyAsync"; } }
"獲取用于執行任務的線程" 類似 Java 8 中的 parallelStream
, CompletableFuture
默認從全局 ForkJoinPool.commonPool()
獲取線程,用于執行任務。同時也提供了指定線程池的方式用于獲取線程執行任務,您可以使用API中具有 Executor
參數的重載方法。
class Demo { @Test public void testCompletableFutureSupplyAsyncWithExecutor() { ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2); CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(this::process, newFixedThreadPool); try { // Blocking assertEquals("testCompletableFutureSupplyAsyncWithExecutor", supplyAsync.get()); } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); } } public String process() { return "testCompletableFutureSupplyAsyncWithExecutor"; } }
CompletableFuture
中有眾多API,方法命名中含有 Async
的API可使用線程池。
截至此處,以上使用方式均與 Future
類似,接下來演示 CompletableFuture
的不同
CompletableFuture
的 get()
API是阻塞式獲取結果,CompletableFuture
提供了
thenApply
thenAccept
thenRun
等API來避免阻塞式獲取,并且可添加 任務完成
后的回調。這幾個方法的使用場景如下:
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
收到結果后,可以進行轉化
CompletableFuture<Void> thenAccept(Consumer<? super T> action)
收到結果后,對其進行消費
CompletableFuture<Void> thenRun(Runnable action)
收到結果后,執行回調,無法消費結果只能消費 這一事件
API較為簡單,不再代碼演示
顯然,通過鏈式調用可以組裝多個執行過程。
有讀者可能會疑惑:Function
和 Consumer
也可以進行鏈式組裝,是否存在冗余呢?
兩種的鏈式調用特性確實存在重疊,您可以自行選擇用法,但 thenRun
只能采用 CompletableFuture
的鏈式調用。
另外,前面提到,我們可以指定線程池執行任務,對于這三組API,同樣有相同的特性,通過 thenXXXXAsync
指定線程池,這是 Function
和 Consumer
的鏈式組裝所無法完成的。
class Demo { @Test public void testCompletableFutureApplyAsync() { ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2); ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor(); // 從線程池 newFixedThreadPool 獲取線程執行任務 CompletableFuture<Double> completableFuture = CompletableFuture.supplyAsync(() -> 1D, newFixedThreadPool) .thenApplyAsync(d -> d + 1D, newSingleThreadScheduledExecutor) .thenApplyAsync(d -> d + 2D); Double result = completableFuture.join(); assertEquals(4D, result); } }
通過 聚合
多個 CompletableFuture
,可以組成更 復雜
的業務流,可以達到精細地控制粒度、聚焦單個節點的業務。
注意:操作符并不能完全的控制 CompletableFuture
任務執行的時機,您需要謹慎的選擇 CompletableFuture
的創建時機
compose
原意為 組成
, 通過多個 CompletableFuture
構建異步流。
在操作的 CompletableFuture
獲得結果時,將另一個 CompletableFuture
compose
到異步流中,compose的過程中,可以根據操作的 CompletableFuture
的結果編寫邏輯。
與 thenApply
相比,thenCompose
返回邏輯中提供的 CompletableFuture
而 thenApply
返回框架內處理的新實例。
注意,這一特性在使用 FP編程范式
進行編碼時,會顯得非常靈活,一定程度上提升了函數的復用性
API含義直觀,不再進行代碼演示
thenCombine
可以用于合并多個 獨立任務 的處理結果。
注意: thenCompose
進行聚合時,下游可以使用上游的結果,在業務需求上一般表現為依賴上一步結果,而非兩者相互獨立。
例如,產品希望在博客詳情頁同時展示 "博客的詳情" 和 "作者主要信息" ,以避免內容區抖動或割裂的骨架占位。這兩者 可以獨立獲取時 ,則可以使用 thenCombine
系列API,分別獲取,并合并結果。
combine
的特點是 被合并的兩個 CompletableFuture
可以并發,等兩者都獲得結果后進行合并。
但它依舊存在使用上的不便捷,合并超過2個 CompletableFuture
時,顯得不夠靈活。可以使用 static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
API。
allOf
創建了 CompletableFuture<Void>
,并不會幫助我們合并結果,所以需要自行編寫業務代碼合并,故存在 Side Effects
。
runAfterBoth
系列API在兩個 CompletableFuture
都獲得結果后執行回調
runAfterEither
系列API在兩個 CompletableFuture
任意一個獲得結果后執行回調
通過API,不難理解它們需要使用者自行處理結果
CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
;
CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
同樣可以增加編碼靈活性,不再贅述。
acceptEither、acceptEitherAsync;thenAcceptBoth、thenAcceptBothAsync
applyToEither
系列API表現如 thenApply
和 Either
的組合,兩個同類型的 CompletableFuture
任意一個獲得結果后,可消費該結果并進行改變,類似 thenApply
acceptEither
系列API表現如 thenAccept
和 Either
的組合,兩個同類型的 CompletableFuture
任意一個獲得結果后,可消費該結果,類似 thenAccept
thenAcceptBoth
系列API表現如 thenCombine
,但返回 CompletableFuture<Void>
同樣可以增加編碼靈活性,不再贅述
使用回調處理結果有兩種API,注意,除了正常獲得結果外還可能獲得異常,而這兩組API簇差異體現在對 異常
的處理中。
<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
handle
使用 BiFunction
,無論是正常結果還是異常情況,均視作可被邏輯接受,消費后轉化
而 whenComplete
使用 BiConsumer
,僅可消費但不能轉化,異常情況被視作不可被邏輯接受,仍會拋出。
舉個例子,進行網絡編程時會遇到 Exception
, 如果業務設計中使用的模型實體包含了 正常結果
、異常
兩種情況:
open class Result<T>(val t: T?) { open val isThr: Boolean = false } class FailResult<T>(val tr: Throwable) : Result<T>(null) { override val isThr: Boolean = true }
則適合使用 handle
API在底層處理。否則需要額外的異常處理,可依據項目的設計選擇處理方式,一般在依據FP范式設計的程序中,傾向于使用handle,避免增加side effect。
在多線程背景下,異常處理并不容易。它不僅僅是使用 try-catch
捕獲異常,還包含程序異步流中,節點出現異常時流的業務走向。
在 CompletableFuture
中,節點出現異常將跳過后續節點,進入異常處理。
_如果您不希望某個節點拋出異常導致后續流程中斷,則可在節點的處理中捕獲并包裝為結果、或者對子 CompletableFuture 節點采用 handle
、exceptionally
API轉換異常 _
除前文提到的 handle
whenComplete
,CompletableFuture
中還提供了 exceptionally
API用于處理異常
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
從表現結果看,它類似于 handle
API中對異常的處理,將異常轉換為目標結果的一種特定情形。
感謝各位的閱讀,以上就是“CompletableFuture怎么使用”的內容了,經過本文的學習后,相信大家對CompletableFuture怎么使用這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。