您好,登錄后才能下訂單哦!
這篇文章主要介紹“java9新特性Reactive Stream響應式編程API怎么用”,在日常操作中,相信很多人在java9新特性Reactive Stream響應式編程API怎么用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”java9新特性Reactive Stream響應式編程API怎么用”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
Java 9提供了一組定義響應式流編程的接口。所有這些接口都作為靜態內部接口定義在java.util.concurrent.Flow
類里面。
下面是Java 響應式編程中的一些重要角色和概念,先簡單理解一下
發布者(Publisher)是潛在的無限數量的有序數據元素的生產者。 它根據收到的需求(subscription)向當前訂閱者發布一定數量的數據元素。
訂閱者(Subscriber)從發布者那里訂閱并接收數據元素。與發布者建立訂閱關系后,發布者向訂閱者發送訂閱令牌(subscription),訂閱者可以根據自己的處理能力請求發布者發布數據元素的數量。
訂閱令牌(subscription)表示訂閱者與發布者之間建立的訂閱關系。 當建立訂閱關系后,發布者將其傳遞給訂閱者。 訂閱者使用訂閱令牌與發布者進行交互,例如請求數據元素的數量或取消訂閱。
public static interface Subscriber<T> { public void onSubscribe(Subscription subscription); public void onNext(T item); public void onError(Throwable throwable); public void onComplete(); }
onSubscribe
:在發布者接受訂閱者的訂閱動作之后,發布任何的訂閱消息之前被調用。新創建的Subscription
訂閱令牌對象通過此方法傳遞給訂閱者。
onNext
:下一個待處理的數據項的處理函數
onError
:在發布者或訂閱遇到不可恢復的錯誤時調用
onComplete
:當沒有訂閱者調用(包括onNext()方法)發生時調用。
訂閱令牌對象通過Subscriber.onSubscribe()
方法傳遞
public static interface Subscription { public void request(long n); public void cancel();}
request(long n)
是無阻塞背壓概念背后的關鍵方法。訂閱者使用它來請求n個以上的消費項目。這樣,訂閱者控制了它當前能夠接收多少個數據。cancel()
由訂閱者主動來取消其訂閱,取消后將不會在接收到任何數據消息。
@FunctionalInterface public static interface Publisher<T> { public void subscribe(Subscriber<? super T> subscriber); }
調用該方法,建立訂閱者Subscriber與發布者Publisher之間的消息訂閱關系。
處理者Processor 可以同時充當訂閱者和發布者,起到轉換發布者——訂閱者管道中的元素的作用。用于將發布者T類型的數據元素,接收并轉換為類型R的數據并發布。
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { }
現在我們要去實現上面的四個接口來完成響應式編程
Subscription Interface
訂閱令牌接口通常不需要我們自己編程去實現,我們只需要在知道request()方法和cancle()方法含義即可。
Publisher Interface
發布者接口,Java 9 已經默認為我們提供了實現SubmissionPublisher,該實現類除了實現Publisher接口的方法外,提供了一個方法叫做submit()來完成消息數據的發送。
Subscriber Interface
訂閱者接口,通常需要我們自己去實現。因為在數據訂閱接收之后,不同的業務有不同的處理邏輯。
Processor
實際上是 Publisher Interface和Subscriber Interface的集合體,有需要數據類型轉換及數據處理的需求才去實現這個接口
下面的例子實現的式字符串的數據消息訂閱處理
import java.util.concurrent.Flow; public class MySubscriber implements Flow.Subscriber<String> { private Flow.Subscription subscription; //訂閱令牌 @Override public void onSubscribe(Flow.Subscription subscription) { System.out.println("訂閱關系建立onSubscribe: " + subscription); this.subscription = subscription; subscription.request(2); } @Override public void onNext(String item) { System.out.println("item: " + item); // 一個消息處理完成之后,可以繼續調用subscription.request(n);向發布者要求數據發送 //subscription.request(n); } @Override public void onError(Throwable throwable) { System.out.println("onError: " + throwable); } @Override public void onComplete() { System.out.println("onComplete"); } }
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; public class SubmissionPublisherExample { public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(1); SubmissionPublisher<String> sb = new SubmissionPublisher<>(executor, Flow.defaultBufferSize()); sb.subscribe(new MySubscriber()); //建立訂閱關系,可以有多個訂閱者 sb.submit("數據 1"); //發送消息1 sb.submit("數據 2"); //發送消息2 sb.submit("數據 3"); //發送消息3 executor.shutdown(); } }
控制臺打印輸出結果
訂閱關系建立
onSubscribe: java.util.concurrent.SubmissionPublisher$BufferedSubscription@27e81a39
item: 數據 1
item: 數據 2
請注意:即使發布者submit了3條數據,MySubscriber也僅收到了2條數據進行了處理。是因為我們在MySubscriber#onSubscribe()
方法中使用了subscription.request(2);
。這就是“背壓”的響應式編程效果,我有能力處理多少數據,就會通知消息發布者給多少數據。
到此,關于“java9新特性Reactive Stream響應式編程API怎么用”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。