您好,登錄后才能下訂單哦!
Reactive-MongoDB如何異步Java Driver,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
一、關于 異步驅動
從3.0 版本開始,MongoDB 開始提供異步方式的驅動(Java Async Driver),這為應用提供了一種更高性能的選擇。
但實質上,使用同步驅動(Java Sync Driver)的項目也不在少數,或許是因為先入為主的原因(同步Driver的文檔說明更加的完善),又或者是為了兼容舊的 MongoDB 版本。
無論如何,由于 Reactive 的發展,未來使用異步驅動應該是一個趨勢。
在使用 Async Driver 之前,需要對 Reactive 的概念有一些熟悉。
二、理解 Reactive (響應式)
響應式(Reactive)是一種異步的、面向數據流的開發方式,最早是來自于.NET 平臺上的 Reactive Extensions 庫,隨后被擴展為各種編程語言的實現。
在著名的 Reactive Manifesto(響應式宣言) 中,對 Reactive 定義了四個特征:
及時響應(Responsive):系統能及時的響應請求。
有韌性(Resilient):系統在出現異常時仍然可以響應,即支持容錯。
有彈性(Elastic):在不同的負載下,系統可彈性伸縮來保證運行。
消息驅動(Message Driven):不同組件之間使用異步消息傳遞來進行交互,并確保松耦合及相互隔離。
在響應式宣言的所定義的這些系統特征中,無一不與響應式的流有若干的關系,于是乎就有了 2013年發起的 響應式流規范(Reactive Stream Specification)。
https://www.reactive-streams.org/
其中,對于響應式流的處理環節又做了如下定義:
具有處理無限數量的元素的能力,即允許流永不結束
按序處理
異步地傳遞元素
實現非阻塞的負壓(back-pressure)
Java 平臺則是在 JDK 9 版本上發布了對 Reactive Streams 的支持。
下面介紹響應式流的幾個關鍵接口:
Publisher
Publisher 是數據的發布者。Publisher 接口只有一個方法 subscribe,用于添加數據的訂閱者,也就是 Subscriber。
Subscriber
Subscriber 是數據的訂閱者。Subscriber 接口有4個方法,都是作為不同事件的處理器。在訂閱者成功訂閱到發布者之后,其 onSubscribe(Subscription s) 方法會被調用。
Subscription 表示的是當前的訂閱關系。
當訂閱成功后,可以使用 Subscription 的 request(long n) 方法來請求發布者發布 n 條數據。發布者可能產生3種不同的消息通知,分別對應 Subscriber 的另外3個回調方法。
數據通知:對應 onNext 方法,表示發布者產生的數據。
錯誤通知:對應 onError 方法,表示發布者產生了錯誤。
結束通知:對應 onComplete 方法,表示發布者已經完成了所有數據的發布。
在上述3種通知中,錯誤通知和結束通知都是終結通知,也就是在終結通知之后,不會再有其他通知產生。
Subscription
Subscription 表示的是一個訂閱關系。除了之前提到的 request 方法之外,還有 cancel 方法用來取消訂閱。需要注意的是,在 cancel 方法調用之后,發布者仍然有可能繼續發布通知。但訂閱最終會被取消。
這幾個接口的關系如下圖所示:
圖片出處:http://wiki.jikexueyuan.com/index.php/project/reactor-2.0/05.html
MongoDB 的異步驅動為 mongo-java-driver-reactivestreams 組件,其實現了 Reactive Stream 的上述接口。
> 除了 reactivestream 之外,MongoDB 的異步驅動還包含 RxJava 等風格的版本,有興趣的讀者可以進一步了解
http://mongodb.github.io/mongo-java-driver-reactivestreams/1.11/getting-started/quick-tour-primer/
三、使用示例
接下來,通過一個簡單的例子來演示一下 Reactive 方式的代碼風格:
A. 引入依賴
org.mongodb mongodb-driver-reactivestreams 1.11.0
> 引入mongodb-driver-reactivestreams 將會自動添加 reactive-streams, bson, mongodb-driver-async組件
B. 連接數據庫
//服務器實例表List servers =newArrayList(); servers.add(newServerAddress("localhost",27018));//配置構建器MongoClientSettings.Builder settingsBuilder =MongoClientSettings.builder();//傳入服務器實例 settingsBuilder.applyToClusterSettings( builder -> builder.hosts(servers));//構建 Client 實例MongoClient mongoClient =MongoClients.create(settingsBuilder.build());
C. 實現文檔查詢
//獲得數據庫對象MongoDatabase database = client.getDatabase(databaseName);//獲得集合MongoCollection collection = database.getCollection(collectionName);//異步返回PublisherFindPublisher publisher = collection.find();//訂閱實現 publisher.subscribe(newSubscriber(){ @Override publicvoid onSubscribe(Subscription s){ System.out.println("start..."); //執行請求 s.request(Integer.MAX_VALUE); } @Override publicvoid onNext(Document document){ //獲得文檔 System.out.println("Document:"+ document.toJson()); } @Override publicvoid onError(Throwable t){ System.out.println("error occurs."); } @Override publicvoid onComplete(){ System.out.println("finished."); }});
注意到,與使用同步驅動不同的是,collection.find()方法返回的不是 Cursor,而是一個 FindPublisher對象,這是Publisher接口的一層擴展。
而且,在返回 Publisher 對象時,此時并沒有產生真正的數據庫IO請求。真正發起請求需要通過調用 Subscription.request()方法。
在上面的代碼中,為了讀取由 Publisher 產生的結果,通過自定義一個Subscriber,在onSubscribe 事件觸發時就執行 數據庫的請求,之后分別對 onNext、onError、onComplete進行處理。
盡管這種實現方式是純異步的,但在使用上比較繁瑣。試想如果對于每個數據庫操作都要完成一個Subscriber 邏輯,那么開發的工作量是巨大的。
為了盡可能復用重復的邏輯,可以對Subscriber的邏輯做一層封裝,包含如下功能:
使用 List 容器對請求結果進行緩存
實現阻塞等待結果的方法,可指定超時時間
捕獲異常,在等待結果時拋出
代碼如下:
publicclassObservableSubscriberimplementsSubscriber{ //響應數據 privatefinalList received; //錯誤信息 privatefinalList errors; //等待對象 privatefinalCountDownLatch latch; //訂閱器 privatevolatileSubscription subscription; //是否完成 privatevolatileboolean completed; publicObservableSubscriber(){ this.received =newArrayList(); this.errors =newArrayList(); this.latch =newCountDownLatch(1); } @Override publicvoid onSubscribe(finalSubscription s){ subscription = s; } @Override publicvoid onNext(final T t){ received.add(t); } @Override publicvoid onError(finalThrowable t){ errors.add(t); onComplete(); } @Override publicvoid onComplete(){ completed =true; latch.countDown(); } publicSubscription getSubscription(){ return subscription; } publicList getReceived(){ return received; } publicThrowable getError(){ if(errors.size()>0){ return errors.get(0); } returnnull; } publicboolean isCompleted(){ return completed; } /** * 阻塞一定時間等待結果 * * @param timeout * @param unit * @return * @throws Throwable */ publicListget(finallong timeout,finalTimeUnit unit)throwsThrowable{ return await(timeout, unit).getReceived(); } /** * 一直阻塞等待請求完成 * * @return * @throws Throwable */ publicObservableSubscriber await()throwsThrowable{ return await(Long.MAX_VALUE,TimeUnit.MILLISECONDS); } /** * 阻塞一定時間等待完成 * * @param timeout * @param unit * @return * @throws Throwable */ publicObservableSubscriber await(finallong timeout,finalTimeUnit unit)throwsThrowable{ subscription.request(Integer.MAX_VALUE); if(!latch.await(timeout, unit)){ thrownewMongoTimeoutException("Publisher onComplete timed out"); } if(!errors.isEmpty()){ throw errors.get(0); } returnthis; }}
借助這個基礎的工具類,我們對于文檔的異步操作就變得簡單多了。
比如對于文檔查詢的操作可以改造如下:
ObservableSubscriber subscriber =newObservableSubscriber(); collection.find().subscribe(subscriber);//結果處理 subscriber.get(15,TimeUnit.SECONDS).forEach( d ->{ System.out.println("Document:"+ d.toJson());});
當然,這個例子還有可以繼續完善,比如使用 List 作為緩存,則要考慮數據量的問題,避免將全部(或超量) 的文檔一次性轉入內存。
看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。