您好,登錄后才能下訂單哦!
本篇內容主要講解“Java中的Future接口怎么使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Java中的Future接口怎么使用”吧!
在系統中,異步執行任務,是很常見的功能邏輯,但是在不同的場景中,又存在很多細節差異;
有的任務只強調「執行過程」,并不需要追溯任務自身的「執行結果」,這里并不是指對系統和業務產生的效果,比如定時任務、消息隊列等場景;
但是有些任務即強調「執行過程」,又需要追溯任務自身的「執行結果」,在流程中依賴某個異步結果,判斷流程是否中斷,比如「并行」處理;
【串行處理】整個流程按照邏輯逐步推進,如果出現異常會導致流程中斷;
【并行處理】主流程按照邏輯逐步推進,其他「異步」交互的流程執行完畢后,將結果返回到主流程,如果「異步」流程異常,會影響部分結果;
此前在《「訂單」業務》的內容中,聊過關于「串行」和「并行」的應用對比,即在訂單詳情的加載過程中,通過「并行」的方式讀取:商品、商戶、訂單、用戶等信息,提升接口的響應時間;
異步是對流程的解耦,但是有的流程中又依賴異步執行的最終結果,此時就可以使用「Future」接口來達到該目的,先來看一個簡單的入門案例;
public class ServerTask implements Callable<Integer> { @Override public Integer call() throws Exception { Thread.sleep(2000); return 3; } } public class FutureBase01 { public static void main(String[] args) throws Exception { TimeInterval timer = DateUtil.timer(); // 線程池 ExecutorService executor = Executors.newFixedThreadPool(3); // 批量任務 List<ServerTask> serverTasks = new ArrayList<>() ; for (int i=0;i<3;i++){ serverTasks.add(new ServerTask()); } List<Future<Integer>> taskResList = executor.invokeAll(serverTasks) ; // 結果輸出 for (Future<Integer> intFuture:taskResList){ System.out.println(intFuture.get()); } // 耗時統計 System.out.println("timer...interval = "+timer.interval()); } }
這里模擬一個場景,以線程池批量執行異步任務,在任務內線程休眠2秒,以并行的方式最終獲取全部結果,只耗時2秒多一點,如果串行的話耗時肯定超過6秒;
Future表示異步計算的結果,提供了用于檢查計算是否完成、等待計算完成、以及檢索計算結果的方法。
【核心方法】
get()
:等待任務完成,獲取執行結果,如果任務取消會拋出異常;
get(long timeout, TimeUnit unit)
:指定等待任務完成的時間,等待超時會拋出異常;
isDone()
:判斷任務是否完成;
isCancelled()
:判斷任務是否被取消;
cancel(boolean mayInterruptIfRunning)
:嘗試取消此任務的執行,如果任務已經完成、已經取消或由于其他原因無法取消,則此嘗試將失敗;
【基礎用法】
public class FutureBase02 { public static void main(String[] args) throws Exception { // 線程池執行任務 ExecutorService executor = Executors.newFixedThreadPool(3); FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(3000); return "task...OK"; } }) ; executor.execute(futureTask); // 任務信息獲取 System.out.println("是否完成:"+futureTask.isDone()); System.out.println("是否取消:"+futureTask.isCancelled()); System.out.println("獲取結果:"+futureTask.get()); System.out.println("嘗試取消:"+futureTask.cancel(Boolean.TRUE)); } }
【FutureTask】
Future接口的基本實現類,提供了計算的啟動和取消、查詢計算是否完成以及檢索計算結果的方法;
在「FutureTask」類中,可以看到線程異步執行任務時,其中的核心狀態轉換,以及最終結果寫出的方式;
雖然「Future」從設計上,實現了異步計算的結果獲取,但是通過上面的案例也可以發現,流程的主線程在執行get()
方法時會阻塞,直到最終獲取結果,顯然對于程序來說并不友好;
在JDK1.8
提供「CompletableFuture」類,對「Future」進行優化和擴展;
「CompletableFuture」類提供函數編程的能力,可以通過回調的方式處理計算結果,并且支持組合操作,提供很多方法來實現異步編排,降低異步編程的復雜度;
「CompletableFuture」實現「Future」和「CompletionStage」兩個接口;
Future:表示異步計算的結果;
CompletionStage:表示異步計算的一個步驟,當一個階段計算完成時,可能會觸發其他階段,即步驟可能由其他CompletionStage觸發;
【入門案例】
public class CompletableBase01 { public static void main(String[] args) throws Exception { // 線程池 ExecutorService executor = Executors.newFixedThreadPool(3); // 任務執行 CompletableFuture<String> cft = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "Res...OK"; }, executor); // 結果輸出 System.out.println(cft.get()); } }
public class Completable01 { public static void main(String[] args) throws Exception { // 線程池 ExecutorService executor = Executors.newFixedThreadPool(3); // 1、創建未完成的CompletableFuture,通過complete()方法完成 CompletableFuture<Integer> cft01 = new CompletableFuture<>() ; cft01.complete(99) ; // 2、創建已經完成CompletableFuture,并且給定結果 CompletableFuture<String> cft02 = CompletableFuture.completedFuture("given...value"); // 3、有返回值,默認ForkJoinPool線程池 CompletableFuture<String> cft03 = CompletableFuture.supplyAsync(() -> {return "OK-3";}); // 4、有返回值,采用Executor自定義線程池 CompletableFuture<String> cft04 = CompletableFuture.supplyAsync(() -> {return "OK-4";},executor); // 5、無返回值,默認ForkJoinPool線程池 CompletableFuture<Void> cft05 = CompletableFuture.runAsync(() -> {}); // 6、無返回值,采用Executor自定義線程池 CompletableFuture<Void> cft06 = CompletableFuture.runAsync(()-> {}, executor); } }
public class Completable02 { public static void main(String[] args) throws Exception { // 線程池 ExecutorService executor = Executors.newFixedThreadPool(3); CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "OK"; },executor); // 1、計算完成后,執行后續處理 // cft01.whenComplete((res, ex) -> System.out.println("Result:"+res+";Exe:"+ex)); // 2、觸發計算,如果沒有完成,則get設定的值,如果已完成,則get任務返回值 // boolean completeFlag = cft01.complete("given...value"); // if (completeFlag){ // System.out.println(cft01.get()); // } else { // System.out.println(cft01.get()); // } // 3、開啟新CompletionStage,重新獲取線程執行任務 cft01.whenCompleteAsync((res, ex) -> System.out.println("Result:"+res+";Exe:"+ex),executor); } }
public class Completable03 { public static void main(String[] args) throws Exception { // 線程池 ExecutorService executor = Executors.newFixedThreadPool(3); CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "Res...OK"; },executor); // 1、阻塞直到獲取結果 // System.out.println(cft01.get()); // 2、設定超時的阻塞獲取結果 // System.out.println(cft01.get(4, TimeUnit.SECONDS)); // 3、非阻塞獲取結果,如果任務已經完成,則返回結果,如果任務未完成,返回給定的值 // System.out.println(cft01.getNow("given...value")); // 4、get獲取拋檢查異常,join獲取非檢查異常 System.out.println(cft01.join()); } }
public class Completable04 { public static void main(String[] args) throws Exception { // 線程池 ExecutorService executor = Executors.newFixedThreadPool(3); CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("OK-1"); return "OK"; },executor); // 1、cft01任務執行完成后,執行之后的任務,此處不關注cft01的結果 // cft01.thenRun(() -> System.out.println("task...run")) ; // 2、cft01任務執行完成后,執行之后的任務,可以獲取cft01的結果 // cft01.thenAccept((res) -> { // System.out.println("cft01:"+res); // System.out.println("task...run"); // }); // 3、cft01任務執行完成后,執行之后的任務,獲取cft01的結果,并且具有返回值 // CompletableFuture<Integer> cft02 = cft01.thenApply((res) -> { // System.out.println("cft01:"+res); // return 99 ; // }); // System.out.println(cft02.get()); // 4、順序執行cft01、cft02 // CompletableFuture<String> cft02 = cft01.thenCompose((res) -> CompletableFuture.supplyAsync(() -> { // System.out.println("cft01:"+res); // return "OK-2"; // })); // cft02.whenComplete((res,ex) -> System.out.println("Result:"+res+";Exe:"+ex)); // 5、對比任務的執行效率,由于cft02先完成,所以取cft02的結果 // CompletableFuture<String> cft02 = cft01.applyToEither(CompletableFuture.supplyAsync(() -> { // System.out.println("run...cft02"); // try { // Thread.sleep(3000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // return "OK-2"; // }),(res) -> { // System.out.println("either...result:" + res); // return res; // }); // System.out.println("finally...result:" + cft02.get()); // 6、兩組任務執行完成后,對結果進行合并 // CompletableFuture<String> cft02 = CompletableFuture.supplyAsync(() -> "OK-2") ; // String finallyRes = cft01.thenCombine(cft02,(res1,res2) -> { // System.out.println("res1:"+res1+";res2:"+res2); // return res1+";"+res2 ; // }).get(); // System.out.println(finallyRes); CompletableFuture<String> cft02 = CompletableFuture.supplyAsync(() -> { System.out.println("OK-2"); return "OK-2"; }) ; CompletableFuture<String> cft03 = CompletableFuture.supplyAsync(() -> { System.out.println("OK-3"); return "OK-3"; }) ; // 7、等待批量任務執行完返回 // CompletableFuture.allOf(cft01,cft02,cft03).get(); // 8、任意一個任務執行完即返回 System.out.println("Sign:"+CompletableFuture.anyOf(cft01,cft02,cft03).get()); } }
public class Completable05 { public static void main(String[] args) throws Exception { // 線程池 ExecutorService executor = Executors.newFixedThreadPool(3); CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> { if (1 > 0){ throw new RuntimeException("task...exception"); } return "OK"; },executor); // 1、捕獲cft01的異常信息,并提供返回值 String finallyRes = cft01.thenApply((res) -> { System.out.println("cft01-res:" + res); return res; }).exceptionally((ex) -> { System.out.println("cft01-exe:" + ex.getMessage()); return "error" ; }).get(); System.out.println("finallyRes="+finallyRes); CompletableFuture<String> cft02 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "OK-2"; },executor); // 2、如果cft02未完成,則get時拋出指定異常信息 boolean exeFlag = cft02.completeExceptionally(new RuntimeException("given...exception")); if (exeFlag){ System.out.println(cft02.get()); } else { System.out.println(cft02.get()); } } }
在實踐中,通常不使用ForkJoinPool#commonPool()
公共線程池,會出現線程競爭問題,從而形成系統瓶頸;
在任務編排中,如果出現依賴情況或者父子任務,盡量使用多個線程池,從而避免任務請求同一個線程池,規避死鎖情況發生;
在分析「CompletableFuture」其原理之前,首先看一下涉及的核心結構;
【CompletableFuture】
在該類中有兩個關鍵的字段:「result」存儲當前CF的結果,「stack」代表棧頂元素,即當前CF計算完成后會觸發的依賴動作;從上面案例中可知,依賴動作可以沒有或者有多個;
【Completion】
依賴動作的封裝類;
【UniCompletion】
繼承Completion類,一元依賴的基礎類,「executor」指線程池,「dep」指依賴的計算,「src」指源動作;
【BiCompletion】
繼承UniCompletion類,二元或者多元依賴的基礎類,「snd」指第二個源動作;
顧名思義,即各個CF之間不產生依賴關系;
public class DepZero { public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newFixedThreadPool(3); CompletableFuture<String> cft1 = CompletableFuture.supplyAsync(()-> "OK-1",executor); CompletableFuture<String> cft2 = CompletableFuture.supplyAsync(()-> "OK-2",executor); System.out.println(cft1.get()+";"+cft2.get()); } }
即CF之間的單個依賴關系;這里使用「thenApply」方法演示,為了看到效果,使「cft1」長時間休眠,斷點查看「stack」結構;
public class DepOne { public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newFixedThreadPool(3); CompletableFuture<String> cft1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } return "OK-1"; },executor); CompletableFuture<String> cft2 = cft1.thenApply(res -> { System.out.println("cft01-res"+res); return "OK-2" ; }); System.out.println("cft02-res"+cft2.get()); } }
斷點截圖:
原理分析:
觀察者Completion注冊到「cft1」,注冊時會檢查計算是否完成,未完成則觀察者入棧,當「cft1」計算完成會彈棧;已完成則直接觸發觀察者;
可以調整斷點代碼,讓「cft1」先處于完成狀態,再查看其運行時結構,從而分析完整的邏輯;
即一個CF同時依賴兩個CF;這里使用「thenCombine」方法演示;為了看到效果,使「cft1、cft2」長時間休眠,斷點查看「stack」結構;
public class DepTwo { public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newFixedThreadPool(3); CompletableFuture<String> cft1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } return "OK-1"; },executor); CompletableFuture<String> cft2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } return "OK-2"; },executor); // cft3 依賴 cft1和cft2 的計算結果 CompletableFuture<String> cft3 = cft1.thenCombine(cft2,(res1,res2) -> { System.out.println("cft01-res:"+res1); System.out.println("cft02-res:"+res2); return "OK-3" ; }); System.out.println("cft03-res:"+cft3.get()); } }
斷點截圖:
原理分析:
在「cft1」和「cft2」未完成的狀態下,嘗試將BiApply壓入「cft1」和「cft2」兩個棧中,任意CF完成時,會嘗試觸發觀察者,觀察者檢查「cft1」和「cft2」是否都完成,如果完成則執行;
即一個CF同時依賴多個CF;這里使用「allOf」方法演示;為了看到效果,使「cft1、cft2、cft3」長時間休眠,斷點查看「stack」結構;
public class DepMore { public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newFixedThreadPool(3); CompletableFuture<String> cft1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } return "OK-1"; },executor); CompletableFuture<String> cft2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } return "OK-2"; },executor); CompletableFuture<String> cft3 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } return "OK-3"; },executor); // cft4 依賴 cft1和cft2和cft3 的計算結果 CompletableFuture<Void> cft4 = CompletableFuture.allOf(cft1,cft2,cft3); CompletableFuture<String> finallyRes = cft4.thenApply(tm -> { System.out.println("cft01-res:"+cft1.join()); System.out.println("cft02-res:"+cft2.join()); System.out.println("cft03-res:"+cft3.join()); return "OK-4"; }); System.out.println("finally-res:"+finallyRes.get()); } }
斷點截圖:
原理分析:
多元依賴的回調方法除了「allOf」還有「anyOf」,其實現原理都是將依賴的多個CF補全為平衡二叉樹,從斷點圖可知會按照樹的層級處理,核心結構參考二元依賴即可。
到此,相信大家對“Java中的Future接口怎么使用”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。