您好,登錄后才能下訂單哦!
這篇文章主要介紹了Java8 CompletableFuture異步多線程怎么實現的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇Java8 CompletableFuture異步多線程怎么實現文章都會有所收獲,下面我們一起來看看吧。
一些業務場景我們需要使用多線程異步執行任務,加快任務執行速度。
JDK5新增了Future接口,用于描述一個異步計算的結果。
雖然 Future 以及相關使用方法提供了異步執行任務的能力,但是對于結果的獲取卻是很不方便,我們必須使用Future.get()的方式阻塞調用線程,或者使用輪詢方式判斷 Future.isDone 任務是否結束,再獲取結果。
這兩種處理方式都不是很優雅,相關代碼如下:
@Test public void testFuture() throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(5); Future<String> future = executorService.submit(() -> { Thread.sleep(2000); return "hello"; }); System.out.println(future.get()); System.out.println("end"); }
與此同時,Future無法解決多個異步任務需要相互依賴的場景,簡單點說就是,主線程需要等待子線程任務執行完畢之后在進行執行,這個時候你可能想到了「CountDownLatch」,沒錯確實可以解決,代碼如下。
這里定義兩個Future,第一個通過用戶id獲取用戶信息,第二個通過商品id獲取商品信息。
@Test public void testCountDownLatch() throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newFixedThreadPool(5); CountDownLatch downLatch = new CountDownLatch(2); long startTime = System.currentTimeMillis(); Future<String> userFuture = executorService.submit(() -> { //模擬查詢商品耗時500毫秒 Thread.sleep(500); downLatch.countDown(); return "用戶A"; }); Future<String> goodsFuture = executorService.submit(() -> { //模擬查詢商品耗時500毫秒 Thread.sleep(400); downLatch.countDown(); return "商品A"; }); downLatch.await(); //模擬主程序耗時時間 Thread.sleep(600); System.out.println("獲取用戶信息:" + userFuture.get()); System.out.println("獲取商品信息:" + goodsFuture.get()); System.out.println("總共用時" + (System.currentTimeMillis() - startTime) + "ms"); }
「運行結果」
獲取用戶信息:用戶A
獲取商品信息:商品A
總共用時1110ms
從運行結果可以看出結果都已經獲取,而且如果我們不用異步操作,執行時間應該是:500+400+600 = 1500,用異步操作后實際只用1110。
但是Java8以后我不在認為這是一種優雅的解決方式,接下來來了解下CompletableFuture的使用。
@Test public void testCompletableInfo() throws InterruptedException, ExecutionException { long startTime = System.currentTimeMillis(); //調用用戶服務獲取用戶基本信息 CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> //模擬查詢商品耗時500毫秒 { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return "用戶A"; }); //調用商品服務獲取商品基本信息 CompletableFuture<String> goodsFuture = CompletableFuture.supplyAsync(() -> //模擬查詢商品耗時500毫秒 { try { Thread.sleep(400); } catch (InterruptedException e) { e.printStackTrace(); } return "商品A"; }); System.out.println("獲取用戶信息:" + userFuture.get()); System.out.println("獲取商品信息:" + goodsFuture.get()); //模擬主程序耗時時間 Thread.sleep(600); System.out.println("總共用時" + (System.currentTimeMillis() - startTime) + "ms"); }
運行結果
獲取用戶信息:用戶A
獲取商品信息:商品A
總共用時1112ms
通過CompletableFuture可以很輕松的實現CountDownLatch的功能,你以為這就結束了,遠遠不止,CompletableFuture比這要強多了。
比如可以實現:任務1執行完了再執行任務2,甚至任務1執行的結果,作為任務2的入參數等等強大功能,下面就來學學CompletableFuture的API。
CompletableFuture源碼中有四個靜態方法用來執行異步任務
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..} public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..} public static CompletableFuture<Void> runAsync(Runnable runnable){..} public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor){..}
一般我們用上面的靜態方法來創建CompletableFuture,這里也解釋下他們的區別:
「supplyAsync」執行任務,支持返回值。
「runAsync」執行任務,沒有返回值。
3.1.1、「supplyAsync方法」
//使用默認內置線程池ForkJoinPool.commonPool(),根據supplier構建執行任務 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) //自定義線程,根據supplier構建執行任務 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
3.1.2、「runAsync方法」
//使用默認內置線程池ForkJoinPool.commonPool(),根據runnable構建執行任務 public static CompletableFuture<Void> runAsync(Runnable runnable) //自定義線程,根據runnable構建執行任務 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
對于結果的獲取CompltableFuture類提供了四種方式
//方式一 public T get() //方式二 public T get(long timeout, TimeUnit unit) //方式三 public T getNow(T valueIfAbsent) //方式四 public T join()
說明:
「get()和get(long timeout, TimeUnit unit)」 => 在Future中就已經提供了,后者提供超時處理,如果在指定時間內未獲取結果將拋出超時異常
「getNow」 => 立即獲取結果不阻塞,結果計算已完成將返回結果或計算過程中的異常,如果未計算完成將返回設定的valueIfAbsent值
「join」 => 方法里不會拋出異常
示例
:
@Test public void testCompletableGet() throws InterruptedException, ExecutionException { CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "商品A"; }); // getNow方法測試 System.out.println(cp1.getNow("商品B")); //join方法測試 CompletableFuture<Integer> cp2 = CompletableFuture.supplyAsync((() -> 1 / 0)); System.out.println(cp2.join()); System.out.println("-----------------------------------------------------"); //get方法測試 CompletableFuture<Integer> cp3 = CompletableFuture.supplyAsync((() -> 1 / 0)); System.out.println(cp3.get()); }
「運行結果」:
第一個執行結果為 「商品B」,因為要先睡上1秒結果不能立即獲取
join方法獲取結果方法里不會拋異常,但是執行結果會拋異常,拋出的異常為CompletionException
get方法獲取結果方法里將拋出異常,執行結果拋出的異常為ExecutionException
通俗點講就是,「做完第一個任務后,再做第二個任務,第二個任務也沒有返回值」。
示例
@Test public void testCompletableThenRunAsync() throws InterruptedException, ExecutionException { long startTime = System.currentTimeMillis(); CompletableFuture<Void> cp1 = CompletableFuture.runAsync(() -> { try { //執行任務A Thread.sleep(600); } catch (InterruptedException e) { e.printStackTrace(); } }); CompletableFuture<Void> cp2 = cp1.thenRun(() -> { try { //執行任務B Thread.sleep(400); } catch (InterruptedException e) { e.printStackTrace(); } }); // get方法測試 System.out.println(cp2.get()); //模擬主程序耗時時間 Thread.sleep(600); System.out.println("總共用時" + (System.currentTimeMillis() - startTime) + "ms"); } //運行結果 /** * null * 總共用時1610ms */
「thenRun 和thenRunAsync有什么區別呢?」
如果你執行第一個任務的時候,傳入了一個自定義線程池:
調用thenRun方法執行第二個任務時,則第二個任務和第一個任務是共用同一個線程池。
調用thenRunAsync執行第二個任務時,則第一個任務使用的是你自己傳入的線程池,第二個任務使用的是ForkJoin線程池。
說明
: 后面介紹的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它們之間的區別也是這個。
第一個任務執行完成后,執行第二個回調方法任務,會將該任務的執行結果,作為入參,傳遞到回調方法中,但是回調方法是沒有返回值的。
示例
@Test public void testCompletableThenAccept() throws ExecutionException, InterruptedException { long startTime = System.currentTimeMillis(); CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> { return "dev"; }); CompletableFuture<Void> cp2 = cp1.thenAccept((a) -> { System.out.println("上一個任務的返回結果為: " + a); }); cp2.get(); }
表示第一個任務執行完成后,執行第二個回調方法任務,會將該任務的執行結果,作為入參,傳遞到回調方法中,并且回調方法是有返回值的。
示例
@Test public void testCompletableThenApply() throws ExecutionException, InterruptedException { CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> { return "dev"; }).thenApply((a) -> { if (Objects.equals(a, "dev")) { return "dev"; } return "prod"; }); System.out.println("當前環境為:" + cp1.get()); //輸出: 當前環境為:dev }
當CompletableFuture的任務不論是正常完成還是出現異常它都會調用「whenComplete」這回調函數。
「正常完成」:whenComplete返回結果和上級任務一致,異常為null;
「出現異常」:whenComplete返回結果為null,異常為上級任務的異常;
即調用get()時,正常完成時就獲取到結果,出現異常時就會拋出異常,需要你處理該異常。
下面來看看示例
@Test public void testCompletableWhenComplete() throws ExecutionException, InterruptedException { CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("出錯了"); } System.out.println("正常結束"); return 0.11; }).whenComplete((aDouble, throwable) -> { if (aDouble == null) { System.out.println("whenComplete aDouble is null"); } else { System.out.println("whenComplete aDouble is " + aDouble); } if (throwable == null) { System.out.println("whenComplete throwable is null"); } else { System.out.println("whenComplete throwable is " + throwable.getMessage()); } }); System.out.println("最終返回的結果 = " + future.get()); }
正常完成,沒有異常時:
正常結束
whenComplete aDouble is 0.11
whenComplete throwable is null
最終返回的結果 = 0.11
出現異常時:get()會拋出異常
whenComplete aDouble is null
whenComplete throwable is java.lang.RuntimeException: 出錯了
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 出錯了
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
@Test public void testWhenCompleteExceptionally() throws ExecutionException, InterruptedException { CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("出錯了"); } System.out.println("正常結束"); return 0.11; }).whenComplete((aDouble, throwable) -> { if (aDouble == null) { System.out.println("whenComplete aDouble is null"); } else { System.out.println("whenComplete aDouble is " + aDouble); } if (throwable == null) { System.out.println("whenComplete throwable is null"); } else { System.out.println("whenComplete throwable is " + throwable.getMessage()); } }).exceptionally((throwable) -> { System.out.println("exceptionally中異常:" + throwable.getMessage()); return 0.0; }); System.out.println("最終返回的結果 = " + future.get()); }
當出現異常時,exceptionally中會捕獲該異常,給出默認返回值0.0。
whenComplete aDouble is null
whenComplete throwable is java.lang.RuntimeException: 出錯了
exceptionally中異常:java.lang.RuntimeException: 出錯了
最終返回的結果 = 0.0
thenCombine / thenAcceptBoth / runAfterBoth都表示:「當任務一和任務二都完成再執行任務三」。
區別在于:
「runAfterBoth」 不會把執行結果當做方法入參,且沒有返回值
「thenAcceptBoth」: 會將兩個任務的執行結果作為方法入參,傳遞到指定方法中,且無返回值
「thenCombine」:會將兩個任務的執行結果作為方法入參,傳遞到指定方法中,且有返回值
示例
@Test public void testCompletableThenCombine() throws ExecutionException, InterruptedException { //創建線程池 ExecutorService executorService = Executors.newFixedThreadPool(10); //開啟異步任務1 CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> { System.out.println("異步任務1,當前線程是:" + Thread.currentThread().getId()); int result = 1 + 1; System.out.println("異步任務1結束"); return result; }, executorService); //開啟異步任務2 CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { System.out.println("異步任務2,當前線程是:" + Thread.currentThread().getId()); int result = 1 + 1; System.out.println("異步任務2結束"); return result; }, executorService); //任務組合 CompletableFuture<Integer> task3 = task.thenCombineAsync(task2, (f1, f2) -> { System.out.println("執行任務3,當前線程是:" + Thread.currentThread().getId()); System.out.println("任務1返回值:" + f1); System.out.println("任務2返回值:" + f2); return f1 + f2; }, executorService); Integer res = task3.get(); System.out.println("最終結果:" + res); }
「運行結果」
異步任務1,當前線程是:17
異步任務1結束
異步任務2,當前線程是:18
異步任務2結束
執行任務3,當前線程是:19
任務1返回值:2
任務2返回值:2
最終結果:4
applyToEither / acceptEither / runAfterEither 都表示:「兩個任務,只要有一個任務完成,就執行任務三」。
區別在于:
「runAfterEither」:不會把執行結果當做方法入參,且沒有返回值
「acceptEither」: 會將已經執行完成的任務,作為方法入參,傳遞到指定方法中,且無返回值
「applyToEither」:會將已經執行完成的任務,作為方法入參,傳遞到指定方法中,且有返回值
示例
@Test public void testCompletableEitherAsync() { //創建線程池 ExecutorService executorService = Executors.newFixedThreadPool(10); //開啟異步任務1 CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> { System.out.println("異步任務1,當前線程是:" + Thread.currentThread().getId()); int result = 1 + 1; System.out.println("異步任務1結束"); return result; }, executorService); //開啟異步任務2 CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { System.out.println("異步任務2,當前線程是:" + Thread.currentThread().getId()); int result = 1 + 2; try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("異步任務2結束"); return result; }, executorService); //任務組合 task.acceptEitherAsync(task2, (res) -> { System.out.println("執行任務3,當前線程是:" + Thread.currentThread().getId()); System.out.println("上一個任務的結果為:" + res); }, executorService); }
運行結果
//通過結果可以看出,異步任務2都沒有執行結束,任務3獲取的也是1的執行結果
異步任務1,當前線程是:17
異步任務1結束
異步任務2,當前線程是:18
執行任務3,當前線程是:19
上一個任務的結果為:2
注意
如果把上面的核心線程數改為1也就是
ExecutorService executorService = Executors.newFixedThreadPool(1);
運行結果就是下面的了,會發現根本沒有執行任務3,顯然是任務3直接被丟棄了。
異步任務1,當前線程是:17
異步任務1結束
異步任務2,當前線程是:17
「allOf」:等待所有任務完成
「anyOf」:只要有一個任務完成
示例
allOf:等待所有任務完成
@Test public void testCompletableAallOf() throws ExecutionException, InterruptedException { //創建線程池 ExecutorService executorService = Executors.newFixedThreadPool(10); //開啟異步任務1 CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> { System.out.println("異步任務1,當前線程是:" + Thread.currentThread().getId()); int result = 1 + 1; System.out.println("異步任務1結束"); return result; }, executorService); //開啟異步任務2 CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { System.out.println("異步任務2,當前線程是:" + Thread.currentThread().getId()); int result = 1 + 2; try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("異步任務2結束"); return result; }, executorService); //開啟異步任務3 CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> { System.out.println("異步任務3,當前線程是:" + Thread.currentThread().getId()); int result = 1 + 3; try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("異步任務3結束"); return result; }, executorService); //任務組合 CompletableFuture<Void> allOf = CompletableFuture.allOf(task, task2, task3); //等待所有任務完成 allOf.get(); //獲取任務的返回結果 System.out.println("task結果為:" + task.get()); System.out.println("task2結果為:" + task2.get()); System.out.println("task3結果為:" + task3.get()); }
anyOf: 只要有一個任務完成
@Test public void testCompletableAnyOf() throws ExecutionException, InterruptedException { //創建線程池 ExecutorService executorService = Executors.newFixedThreadPool(10); //開啟異步任務1 CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> { int result = 1 + 1; return result; }, executorService); //開啟異步任務2 CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { int result = 1 + 2; return result; }, executorService); //開啟異步任務3 CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> { int result = 1 + 3; return result; }, executorService); //任務組合 CompletableFuture<Object> anyOf = CompletableFuture.anyOf(task, task2, task3); //只要有一個有任務完成 Object o = anyOf.get(); System.out.println("完成的任務的結果:" + o); }
CompletableFuture 使我們的異步編程更加便利的、代碼更加優雅的同時,我們也要關注下它,使用的一些注意點。
@Test public void testWhenCompleteExceptionally() { CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> { if (1 == 1) { throw new RuntimeException("出錯了"); } return 0.11; }); //如果不加 get()方法這一行,看不到異常信息 //future.get(); }
Future需要獲取返回值,才能獲取到異常信息。如果不加 get()/join()方法,看不到異常信息。
小伙伴們使用的時候,注意一下哈,考慮是否加try...catch...或者使用exceptionally方法。
CompletableFuture的get()方法是阻塞的,如果使用它來獲取異步調用的返回值,需要添加超時時間。
//反例 CompletableFuture.get(); //正例 CompletableFuture.get(5, TimeUnit.SECONDS);
CompletableFuture代碼中又使用了默認的「ForkJoin線程池」,處理的線程個數是電腦「CPU核數-1」。在大量請求過來的時候,處理邏輯復雜的話,響應會很慢。一般建議使用自定義線程池,優化線程池配置參數。
CompletableFuture的get()方法是阻塞的,我們一般建議使用future.get(5, TimeUnit.SECONDS)。并且一般建議使用自定義線程池。
但是如果線程池拒絕策略是DiscardPolicy或者DiscardOldestPolicy,當線程池飽和時,會直接丟棄任務,不會拋棄異常。因此建議,CompletableFuture線程池策略最好使用AbortPolicy,然后耗時的異步線程,做好線程池隔離哈。
關于“Java8 CompletableFuture異步多線程怎么實現”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“Java8 CompletableFuture異步多線程怎么實現”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。