您好,登錄后才能下訂單哦!
本篇文章為大家展示了WebFlux定點推送以及全推送靈活websocket運用是什么,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
WebFlux 本身提供了對 WebSocket 協議的支持,處理 WebSocket 請求需要對應的 handler 實現 WebSocketHandler 接口,每一個 WebSocket 都有一個關聯的 WebSocketSession,包含了建立請求時的握手信息 HandshakeInfo
,以及其它相關的信息。可以通過 session 的 receive()
方法來接收客戶端的數據,通過 session 的 send()
方法向客戶端發送數據。
下面是一個簡單的 WebSocketHandler 示例:
@Component public class EchoHandler implements WebSocketHandler { public Mono<Void> handle(WebSocketSession session) { return session.send( session.receive().map( msg -> session.textMessage("ECHO -> " + msg.getPayloadAsText()))); } }
有了 handler 之后,還需要讓 WebFlux 知道哪些請求需要交給這個 handler 進行處理,因此要創建相應的 HandlerMapping。
在處理 HTTP 請求時,我們經常使用 WebFlux 中最簡單的 handler 定義方式,即通過注解 @RequestMapping
將某個方法定義為處理特定路徑請求的 handler。 但是這個注解是用于處理 HTTP 請求的,對于 WebSocket 請求而言,收到請求后還需要協議升級的過程,之后才是 handler 的執行,所以我們不能直接通過該注解定義請求映射,不過可以使用 SimpleUrlHandlerMapping 來添加映射。
@Configuration public class WebSocketConfiguration { @Bean public HandlerMapping webSocketMapping(EchoHandler echoHandler) { final Map<String, WebSocketHandler> map = new HashMap<>(1); map.put("/echo", echoHandler); final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping(); mapping.setOrder(Ordered.HIGHEST_PRECEDENCE); mapping.setUrlMap(map); return mapping; } @Bean public WebSocketHandlerAdapter handlerAdapter() { return new WebSocketHandlerAdapter(); } }
這樣就能夠將發往 /echo
的 WebSocket 請求交給 EchoHandler 處理。
我們還要為 WebSocket 類型的 handler 創建對應的 WebSocketHandlerAdapter,以便讓 DispatcherHandler 能夠調用我們的 WebSocketHandler。
完成這三個步驟后,當一個 WebSocket 請求到達 WebFlux 時,首先由 DispatcherHandler 進行處理,它會根據已有的 HandlerMapping 找到這個 WebSocket 請求對應的 handler,接著發現該 handler 實現了 WebSocketHandler 接口,于是會通過 WebSocketHandlerAdapter 來完成該 handler 的調用。
從上面的例子不難看出,沒接收一個請求后,就得在里面里面返回消息,后面就不能再給他發消息了。其次是我每次新添加或者刪除一個消息的處理類Handler,就得每次去修改配置文件中的SimpleUrlHandlerMapping的UrlMap的內容,感覺不是很友好。于是針對這2點進行修改和調整如下:
我們能否像注冊 HTTP 請求的 Handler 那樣,也通過類似 RequestMapping 的注解來注冊 Handler 呢?
雖然官方沒有相關實現,但我們可以自己實現一個類似的注解,不妨叫作 WebSocketMapping
:
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) public @interface WebSocketMapping { String value() default ""; }
@Retention(RetentionPolicy.RUNTIME)
表明該注解工作在運行期間,@Target(ElementType.TYPE)
表明該注解作用在類上。
我們先看下該注解最終的使用方式。下面是一個 TimeHandler 的示例,它會每秒鐘會向客戶端發送一次時間。我們通過注解 @WebSocketMapping("/time")
完成了 TimeHandler 的注冊,告訴 WebFlux 當有 WebSocket 請求發往 /echo
路徑時,就交給 EchoHandler 處理:
@Component @WebSocketMapping("/echo") public class EchoHandler implements WebSocketHandler { @Override public Mono<Void> handle(final WebSocketSession session) { return session.send( session.receive() .map(msg -> session.textMessage( "服務端返回:小明, -> " + msg.getPayloadAsText()))); } }
是不是和 RequestMapping 一樣方便?
到目前為止,這個注解還沒有實際的功能,還不能自動注冊 handler。回顧我們上面注冊路由的方式,我們創建了一個 SimpleUrlHandlerMapping,并手動添加了 EchoHandler 的映射規則,然后將其作為 HandlerMapping 的 Bean 返回。
現在我們要創建一個專門的 HandlerMapping 類來處理 WebSocketMapping 注解,自動完成 handler 的注冊:
public class WebSocketMappingHandlerMapping extends SimpleUrlHandlerMapping{ private Map<String, WebSocketHandler> handlerMap = new LinkedHashMap<>(); /** * Register WebSocket handlers annotated by @WebSocketMapping * @throws BeansException */ @Override public void initApplicationContext() throws BeansException { Map<String, Object> beanMap = obtainApplicationContext() .getBeansWithAnnotation(WebSocketMapping.class); beanMap.values().forEach(bean -> { if (!(bean instanceof WebSocketHandler)) { throw new RuntimeException( String.format("Controller [%s] doesn't implement WebSocketHandler interface.", bean.getClass().getName())); } WebSocketMapping annotation = AnnotationUtils.getAnnotation( bean.getClass(), WebSocketMapping.class); //webSocketMapping 映射到管理中 handlerMap.put(Objects.requireNonNull(annotation).value(),(WebSocketHandler) bean); }); super.setOrder(Ordered.HIGHEST_PRECEDENCE); super.setUrlMap(handlerMap); super.initApplicationContext(); } }
我們的 WebSocketMappingHandlerMapping 類,實際上就是 SimpleUrlHandlerMapping,只不過增加了一些初始化的操作。
initApplicationContext()
方法是 Spring 中 ApplicationObjectSupport 類的方法,用于自定義類的初始化行為,在我們的 WebSocketMappingHandlerMapping 中,初始化工作主要是收集使用了 @WebSocketMapping
注解并且實現來 WebSocketHandler
接口的 Component,然后將它們注冊到內部的 SimpleUrlHandlerMapping 中。之后的路由工作都是由父類 SimpleUrlHandlerMapping 已實現的功能來完成。
現在,我們只需要返回 WebSocketMappingHandlerMapping 的 Bean,就能自動處理 @WebSocketMapping
注解了:
@Configuration public class WebSocketConfiguration { @Bean public HandlerMapping webSocketMapping() { return new WebSocketMappingHandlerMapping(); } @Bean public WebSocketHandlerAdapter handlerAdapter() { return new WebSocketHandlerAdapter(); } }
我們來看下基于 Reactor Netty 的 WebFlux 具體是如何處理 WebSocket 請求的。
前面說過,WebSocket 請求進入 WebFlux 后,首先會從 HandlerMapping 中找到對應的 WebSocketHandler,再由 WebSocketHandlerAdapter 進行實際的調用。這就不再多做闡述,有興趣的朋友可以去看看WebSocketHandler,WebSocketHandlerAdapter。
我們知道 HTTP 協議是半雙工通信,雖然客戶端和服務器都能給對方發數據,但是同一時間內只會由一方向另一方發送數據,并且在順序上是客戶端先發送請求,然后才由服務器返回響應數據。所以服務器處理 HTTP 的邏輯很簡單,就是每接收到一個客戶端請求,就返回一個響應。
而 WebSocket 是全雙工通信,客戶端和服務器可以隨時向另一方發送數據,所以不再是"發送請求、返回響應"的通信方式了。我們上面的 EchoHandler 示例用的仍舊是這一方式,即收到數據后再針對性地返回一條數據,我們下面就來看看如何充分利用 WebSocket 的雙向通信。
WebSocket 的處理,主要是通過 session 完成對兩個數據流的操作,一個是客戶端發給服務器的數據流,一個是服務器發給客戶端的數據流:
WebSocketSession 方法 | 描述 |
---|---|
Flux<WebSocketMessage> receive() | 接收來自客戶端的數據流,當連接關閉時數據流結束。 |
Mono<Void> send(Publisher<WebSocketMessage>) | 向客戶端發送數據流,當數據流結束時,往客戶端的寫操作也會隨之結束,此時返回的 Mono<Void> 會發出一個完成信號。 |
在 WebSocketHandler 中,最后應該將兩個數據流的處理結果整合成一個信號流,并返回一個 Mono<Void>
用于表明處理是否結束。
我們分別為兩個流定義處理的邏輯:
對于輸出流:服務器每秒向客戶端發送一個數字;
對于輸入流:每當收到客戶端消息時,就打印到標準輸出
Mono<Void> input = session.receive() .map(WebSocketMessage::getPayloadAsText) .map(msg -> id + ": " + msg) .doOnNext(System.out::println).then(); Mono<Void> output = session.send(Flux.create(sink -> senderMap.put(id, new WebSocketSender(session, sink))));
這兩個處理邏輯互相獨立,它們之間沒有先后關系,操作執行完之后都是返回一個 Mono<Void>
,但是如何將這兩個操作的結果整合成一個信號流返回給 WebFlux 呢?我們可以使用 WebFlux 中的 Mono.zip()
方法:
@Component @WebSocketMapping("/echo") public class EchoHandler implements WebSocketHandler { @Autowired private ConcurrentHashMap<String, WebSocketSender> senderMap; @Override public Mono<Void> handle(WebSocketSession session) { Mono<Void> input = session.receive() .map(WebSocketMessage::getPayloadAsText).map(msg -> id + ": " + msg) .doOnNext(System.out::println).then(); Mono<Void> output = session.send(Flux.create(sink -> senderMap.put(id, new WebSocketSender(session, sink)))); /** * Mono.zip() 會將多個 Mono 合并為一個新的 Mono, * 任何一個 Mono 產生 error 或 complete 都會導致合并后的 Mono * 也隨之產生 error 或 complete,此時其它的 Mono 則會被執行取消操作。 */ return Mono.zip(input, output).then(); } }
這里所說的從外部發送數據,指的是需要在 WebSocketHandler 的代碼范圍之外,在其它地方通過代碼調用的方式向 WebSocket 連接發送數據。
思路:在定義 session 的 send()
操作時,通過編程的方式創建 Flux,即使用 Flux.create()
方法創建,將發布 Flux 數據的 FluxSink
暴露出來,并進行保存,然后在需要發送數據的地方,調用 FluxSink<T>
的 next(T data)
方法,向 Flux 的訂閱者發布數據。
create 方法是以編程方式創建 Flux 的高級形式,它允許每次產生多個數據,并且可以由多個線程產生。
create 方法將內部的 FluxSink 暴露出來,FluxSink 提供了 next、error、complete 方法。通過 create 方法,可以將響應式堆棧中的 API 與其它 API 進行連接。
考慮這么一個場景:服務器與客戶端 A 建立 WebSocket 連接后,允許客戶端 B 通過 HTTP 向客戶端 A 發送數據。
不考慮安全性、魯棒性等問題,我們給出一個簡單的示例。
首先是 WebSocketHandler 的實現,客戶端發送 WebSocket 建立請求時,需要在 query 參數中為當前連接指定一個 id,服務器會以該 id 為鍵,以對應的 WebSocketSender 為值存放到 senderMap 中:
@Component @WebSocketMapping("/echo") public class EchoHandler implements WebSocketHandler { @Autowired private ConcurrentHashMap<String, WebSocketSender> senderMap; @Override public Mono<Void> handle(WebSocketSession session) { // TODO Auto-generated method stub HandshakeInfo handshakeInfo = session.getHandshakeInfo(); Map<String, String> queryMap = getQueryMap(handshakeInfo.getUri().getQuery()); String id = queryMap.getOrDefault("id", "defaultId"); Mono<Void> input = session.receive().map(WebSocketMessage::getPayloadAsText).map(msg -> id + ": " + msg) .doOnNext(System.out::println).then(); Mono<Void> output = session.send(Flux.create(sink -> senderMap.put(id, new WebSocketSender(session, sink)))); /** * Mono.zip() 會將多個 Mono 合并為一個新的 Mono,任何一個 Mono 產生 error 或 complete 都會導致合并后的 Mono * 也隨之產生 error 或 complete,此時其它的 Mono 則會被執行取消操作。 */ return Mono.zip(input, output).then(); } //用于獲取url參數 private Map<String, String> getQueryMap(String queryStr) { Map<String, String> queryMap = new HashMap<>(); if (!StringUtils.isEmpty(queryStr)) { String[] queryParam = queryStr.split("&"); Arrays.stream(queryParam).forEach(s -> { String[] kv = s.split("=", 2); String value = kv.length == 2 ? kv[1] : ""; queryMap.put(kv[0], value); }); } return queryMap; } }
其中,senderMap
是我們自己定義的 Bean,在配置文件中定義:
@Configuration public class WebSocketConfiguration { @Bean public HandlerMapping webSocketMapping() { return new WebSocketMappingHandlerMapping(); } @Bean public ConcurrentHashMap<String, WebSocketSender> senderMap() { return new ConcurrentHashMap<String, WebSocketSender>(); } @Bean public WebSocketHandlerAdapter handlerAdapter() { return new WebSocketHandlerAdapter(); } }
WebSocketSender
是我們自己創建的類,目的是保存 WebSocket 連接的 session 以及對應的 FluxSink,以便在 WebSocketHandler 代碼范圍外發送數據:
public class WebSocketSender { private WebSocketSession session; private FluxSink<WebSocketMessage> sink; public WebSocketSender(WebSocketSession session, FluxSink<WebSocketMessage> sink) { this.session = session; this.sink = sink; } public void sendData(String data) { sink.next(session.textMessage(data)); } }
接著我們來實現 HTTP Controller,用戶在發起 HTTP 請求時,通過 query 參數指定要通信的 WebSocket 連接 id,以及要發送的數據,然后從 senderMap 中取出對應的 WebSocketSender,調用其 send()
方法向客戶端發送數據:
@RestController @RequestMapping("/msg") public class MsgController { @Autowired private ConcurrentHashMap<String, WebSocketSender> senderMap; @RequestMapping("/send") public String sendMessage(@RequestParam String id, @RequestParam String data) { WebSocketSender sender = senderMap.get(id); if (sender != null) { sender.sendData(data); return String.format("Message '%s' sent to connection: %s.", data, id); } else { return String.format("Connection of id '%s' doesn't exist", id); } } }
我這就不再寫頁面了,直接就用https://www.websocket.org/echo.html進行測試了,結果如下:
這樣就算完成了定點推送了,全推送,和部分推送就不再寫了,只要從ConcurrentHashMap中取出來去發送就是了。
上述內容就是WebFlux定點推送以及全推送靈活websocket運用是什么,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。