您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關基于netty的websocket在channelActive觸發時發送數據異常問題分析是怎樣的,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
####事情起因,用netty實現了websocket,在鏈接創建成功后發送一個消息給客戶端,我們選擇在channelActive中發送消息。 可想而知肯定是不行的了 代碼如下
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap sb = new ServerBootstrap(); sb.option(ChannelOption.SO_BACKLOG, 1024); // 綁定線程池 sb.group(group, bossGroup) // 指定使用的channel .channel(NioServerSocketChannel.class) // 綁定監聽端口 .localAddress(this.port) .option(ChannelOption.SO_KEEPALIVE, true) // 綁定客戶端連接時候觸發操作 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { log.info("收到新連接"); ch.pipeline().addLast(new LoggingHandler("DEBUG")); ch.pipeline().addLast(new IdleStateHandler(60, 0, 0)); //websocket協議本身是基于http協議的,所以這邊也要使用http解編碼器 ch.pipeline().addLast(new HttpServerCodec()); //以塊的方式來寫的處理器 ch.pipeline().addLast(new ChunkedWriteHandler()); ch.pipeline().addLast(new HttpObjectAggregator(8192)); ch.pipeline().addLast(new WebSocketServerProtocolHandler("/socket", null, true, 65536 * 10)); ch.pipeline().addLast(new MyWebSocketHandler()); } }); // 服務器異步創建綁定 ChannelFuture cf = sb.bind().sync(); log.info(NettyServer.class + " 啟動正在監聽: " + cf.channel().localAddress()); // 關閉服務器通道 cf.channel().closeFuture().sync();
#####排查1:因為沒有任何異常,客戶端沒有收到消息,故先采用wireshark抓包,發現網卡上沒有對應的想發送的消息。 為什么網卡沒有對應的包呢,經過debug發現如果發送的數據類型是WebSocketFrame在最終發送時候異常了具體代碼在HeadContext的write方法中,headcontext是netty的channelpipeline的頭部,最終寫出時都會從pipeline的尾部鏈接到頭部來執行(pipeline為雙向鏈表) 為什么在channelread中能寫信息而在channelActive無法寫信息呢,經過分析發現,channelActive的觸發是在socketchannel第一次注冊的時候發生的具體代碼如下:abstrcatchannel中
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
而抓包時發現觸發此channelactive時,服務端還未返回websocket的協議握手包(websocket協議是在在http協議上衍生的,會先發一個http get請求然后服務端返回一個為websocket協議的包給客戶端)至此問題就真相大白了,在我們添加的WebSocketServerProtocolHandler這個handller中有如下代碼
public void handlerAdded(ChannelHandlerContext ctx) { ChannelPipeline cp = ctx.pipeline(); if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) { // Add the WebSocketHandshakeHandler before this one. ctx.pipeline().addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(), new WebSocketServerProtocolHandshakeHandler(websocketPath, subprotocols, allowExtensions, maxFramePayloadLength, allowMaskMismatch, checkStartsWith)); } if (cp.get(Utf8FrameValidator.class) == null) { // Add the UFT8 checking before this one. ctx.pipeline().addBefore(ctx.name(), Utf8FrameValidator.class.getName(), new Utf8FrameValidator()); } }
添加的WebSocketServerProtocolHandshakeHandler中有如下代碼
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { final FullHttpRequest req = (FullHttpRequest) msg; if (isNotWebSocketPath(req)) { ctx.fireChannelRead(msg); return; } try { if (!GET.equals(req.method())) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN)); return; } final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( getWebSocketLocation(ctx.pipeline(), req, websocketPath), subprotocols, allowExtensions, maxFramePayloadSize, allowMaskMismatch); final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req); handshakeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { ctx.fireExceptionCaught(future.cause()); } else { // Kept for compatibility ctx.fireUserEventTriggered( WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE); ctx.fireUserEventTriggered( new WebSocketServerProtocolHandler.HandshakeComplete( req.uri(), req.headers(), handshaker.selectedSubprotocol())); } } }); WebSocketServerProtocolHandler.setHandshaker(ctx.channel(), handshaker); ctx.pipeline().replace(this, "WS403Responder", WebSocketServerProtocolHandler.forbiddenHttpRequestResponder()); } } finally { req.release(); } }
接受握手信息時候會添加兩個handler 為websocket協議信息的編碼和解碼handler
public final ChannelFuture handshake(Channel channel, FullHttpRequest req, HttpHeaders responseHeaders, final ChannelPromise promise) { if (logger.isDebugEnabled()) { logger.debug("{} WebSocket version {} server handshake", channel, version()); } FullHttpResponse response = newHandshakeResponse(req, responseHeaders); ChannelPipeline p = channel.pipeline(); if (p.get(HttpObjectAggregator.class) != null) { p.remove(HttpObjectAggregator.class); } if (p.get(HttpContentCompressor.class) != null) { p.remove(HttpContentCompressor.class); } ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class); final String encoderName; if (ctx == null) { // this means the user use a HttpServerCodec ctx = p.context(HttpServerCodec.class); if (ctx == null) { promise.setFailure( new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline")); return promise; } p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder()); p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder()); encoderName = ctx.name(); } else { p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder()); encoderName = p.context(HttpResponseEncoder.class).name(); p.addBefore(encoderName, "wsencoder", newWebSocketEncoder()); } channel.writeAndFlush(response).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { ChannelPipeline p = future.channel().pipeline(); p.remove(encoderName); promise.setSuccess(); } else { promise.setFailure(future.cause()); } } }); return promise; }
如果想要實現在websocket協議連接成功后發送一個消息給客戶端,我們發現在發送握手成功后觸發了fireUserEventTriggered,去實現userEventTriggered然后判斷evt類型做處理吧
以上就是基于netty的websocket在channelActive觸發時發送數據異常問題分析是怎樣的,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。