您好,登錄后才能下訂單哦!
這篇文章主要介紹了netty pipeline中的inbound和outbound事件怎么傳播的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇netty pipeline中的inbound和outbound事件怎么傳播文章都會有所收獲,下面我們一起來看看吧。
有關于inbound
事件, 在概述中做過簡單的介紹, 就是以自己為基準, 流向自己的事件, 比如最常見的channelRead
事件, 就是對方發來數據流的所觸發的事件, 己方要對這些數據進行處理, 這一小節, 以激活channelRead
為例講解有關inbound
事件的處理流程。
在業務代碼中, 我們自己的handler
往往會通過重寫channelRead
方法來處理對方發來的數據, 那么對方發來的數據是如何走到channelRead
方法中了呢, 也是我們這一小節要剖析的內容。
在業務代碼中, 傳遞channelRead
事件方式是通過fireChannelRead
方法進行傳播的。
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //寫法1: ctx.fireChannelRead(msg); //寫法2 ctx.pipeline().fireChannelRead(msg); }
這里重寫了channelRead
方法, 并且方法體內繼續通過fireChannelRead
方法進行傳播channelRead
事件, 那么這兩種寫法有什么異同?
我們先以寫法2為例, 將這種寫法進行剖析。
這里首先獲取當前context
的pipeline
對象, 然后通過pipeline
對象調用自身的fireChannelRead
方法進行傳播, 因為默認創建的DefaultChannelpipeline
。
public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; }
這里首先調用的是AbstractChannelHandlerContext
類的靜態方法invokeChannelRead
, 參數傳入head
節點和事件的消息
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
這里的m
通常就是我們傳入的msg
, 而next
, 目前是head
節點, 然后再判斷是否為當前eventLoop
線程, 如果不是則將方法包裝成task
交給eventLoop
線程處理
private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } }
首先通過invokeHandler()
判斷當前handler
是否已添加, 如果添加, 則執行當前handler
的chanelRead
方法, 其實這里就明白了, 通過fireChannelRead
方法傳遞事件的過程中, 其實就是找到相關handler
執行其channelRead
方法, 由于我們在這里的handler
就是head
節點, 所以我們跟到HeadContext
的channelRead
方法中
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //向下傳遞channelRead事件 ctx.fireChannelRead(msg); }
在這里我們看到, 這里通過fireChannelRead
方法繼續往下傳遞channelRead
事件, 而這種調用方式, 就是我們剛才分析用戶代碼的第一種調用方式
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //寫法1: ctx.fireChannelRead(msg); //寫法2 ctx.pipeline().fireChannelRead(msg); }
這里直接通過context
對象調用fireChannelRead
方法, 那么和使用pipeline
調用有什么區別的, 我會回到HeadConetx
的channelRead
方法, 我們來剖析ctx.fireChannelRead(msg)
這句, 大家就會對這個問題有答案了, 跟到ctx
的fireChannelRead
方法中, 這里會走到AbstractChannelHandlerContext
類中的fireChannelRead
方法中
public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; }
這里我們看到, invokeChannelRead
方法中傳入了一個findContextInbound()
參數, 而這findContextInbound
方法其實就是找到當前Context
的下一個節點
private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
這里的邏輯也比較簡單, 是通過一個doWhile
循環, 找到當前handlerContext
的下一個節點, 這里要注意循環的終止條件, while (!ctx.inbound)
表示下一個context
標志的事件不是inbound
的事件, 則循環繼續往下找, 言外之意就是要找到下一個標注inbound
事件的節點
有關事件的標注, 之前已經進行了分析, 如果是用戶定義的handler
, 是通過handler
繼承的接口而定的, 如果tail
或者head
, 那么是在初始化的時候就已經定義好, 這里不再贅述
回到AbstractChannelHandlerContext.fireChannelRead(msg)
public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; }
找到下一個節點后, 繼續調用invokeChannelRead
方法, 傳入下一個和消息對象
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
這里的邏輯我們又不陌生了, 因為我們傳入的是當前context
的下一個節點, 所以這里會調用下一個節點invokeChannelRead
方法, 因我們剛才剖析的是head
節點, 所以下一個節點有可能是用戶添加的handler
的包裝類HandlerConext
的對象
private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { //發生異常的時候在這里捕獲異常 notifyHandlerException(t); } } else { fireChannelRead(msg); } }
又是我們熟悉的邏輯, 調用了自身handler
的channelRead
方法, 如果是用戶自定義的handler
, 則會走到用戶定義的channelRead()
方法中去, 所以這里就解釋了為什么通過傳遞channelRead
事件, 最終會走到用戶重寫的channelRead
方法中去
同樣, 也解釋了該小節最初提到過的兩種寫法的區別
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //寫法1: ctx.fireChannelRead(msg); //寫法2 ctx.pipeline().fireChannelRead(msg); }
寫法1是通過當前節點往下傳播事件
寫法2是通過頭節點往下傳遞事件
所以, 在handler
中如果要在channelRead
方法中傳遞channelRead
事件, 一定要采用寫法1的方式向下傳遞, 或者交給其父類處理, 如果采用2的寫法則每次事件傳輸到這里都會繼續從head
節點傳輸, 從而陷入死循環或者發生異常
還有一點需要注意, 如果用戶代碼中channelRead
方法, 如果沒有顯示的調用ctx.fireChannelRead(msg)
那么事件則不會再往下傳播, 則事件會在這里終止, 所以如果我們寫業務代碼的時候要考慮有關資源釋放的相關操作
如果ctx.fireChannelRead(msg)
則事件會繼續往下傳播, 如果每一個handler
都向下傳播事件, 當然, 根據我們之前的分析channelRead
事件只會在標識為inbound
事件的HandlerConetext
中傳播, 傳播到最后, 則最終會調用到tail
節點的channelRead
方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { onUnhandledInboundMessage(msg); }
protected void onUnhandledInboundMessage(Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration.", msg); } finally { //釋放資源 ReferenceCountUtil.release(msg); } }
這里做了釋放資源的相關的操作
到這里,對于inbound
事件的傳輸流程以及channelRead
方法的執行流程已經分析完畢。
有關于outBound
事件, 和inbound
正好相反,以自己為基準, 流向對方的事件, 比如最常見的wirte
事件
在業務代碼中, , 有可能使用wirte方法往寫數據
public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.channel().write("test data"); }
當然, 直接調用write
方法是不能往對方channel
中寫入數據的, 因為這種方式只能寫入到緩沖區, 還要調用flush
方法才能將緩沖區數據刷到channel
中, 或者直接調用writeAndFlush
方法, 有關邏輯, 我們會在后面章節中詳細講解, 這里只是以wirte
方法為例為了演示outbound
事件的傳播的流程
public void channelActive(ChannelHandlerContext ctx) throws Exception { //寫法1 ctx.channel().write("test data"); //寫法2 ctx.write("test data"); }
這兩種寫法有什么區別, 首先分析第一種寫法
//這里獲取ctx所綁定的channel ctx.channel().write("test data");
public ChannelFuture write(Object msg) { //這里pipeline是DefaultChannelPipeline return pipeline.write(msg); }
繼續跟蹤DefaultChannelPipeline.write(msg)
public final ChannelFuture write(Object msg) { //從tail節點開始(從最后的節點往前寫) return tail.write(msg); }
這里調用tail
節點write
方法, 這里我們應該能分析到, outbound
事件, 是通過tail
節點開始往上傳播的。
其實tail
節點并沒有重寫write
方法, 最終會調用其父類AbstractChannelHandlerContext.write方法
public ChannelFuture write(Object msg) { return write(msg, newPromise()); }
這里有個newPromise()
這個方法, 這里是創建一個Promise
對象, 有關Promise
的相關知識會在以后章節進行分析,繼續分析write
public ChannelFuture write(final Object msg, final ChannelPromise promise) { /** * 省略 * */ write(msg, false, promise); return promise; }
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { //沒有調flush next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
這里跟我們之前分析過channelRead
方法有點類似, 但是事件傳輸的方向有所不同, 這里findContextOutbound()
是獲取上一個標注outbound
事件的HandlerContext
private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
這里的邏輯跟之前的findContextInbound()
方法有點像, 只是過程是反過來的
在這里, 會找到當前context
的上一個節點, 如果標注的事件不是outbound
事件, 則繼續往上找, 意思就是找到上一個標注outbound
事件的節點
回到AbstractChannelHandlerContext.write方法
AbstractChannelHandlerContext next = findContextOutbound();
這里將找到節點賦值到next
屬性中,因為我們之前分析的write
事件是從tail
節點傳播的, 所以上一個節點就有可能是用戶自定的handler
所屬的context
然后判斷是否為當前eventLoop
線程, 如果是不是, 則封裝成task
異步執行, 如果不是, 則繼續判斷是否調用了flush
方法, 因為我們這里沒有調用, 所以會執行到next.invokeWrite(m, promise)
private void invokeWrite(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); } else { write(msg, promise); } }
這里會判斷當前handler
的狀態是否是添加狀態, 這里返回的是true
, 將會走到invokeWrite0(msg, promise)
這一步
private void invokeWrite0(Object msg, ChannelPromise promise) { try { //調用當前handler的wirte()方法 ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
這里的邏輯也似曾相識, 調用了當前節點包裝的handler
的write
方法, 如果用戶沒有重寫write
方法, 則會交給其父類處理
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); }
這里調用了當前ctx
的write
方法, 這種寫法和我們小節開始的寫法是相同的, 我們回顧一下
public void channelActive(ChannelHandlerContext ctx) throws Exception { //寫法1 ctx.channel().write("test data"); //寫法2 ctx.write("test data"); }
我們跟到其write
方法中, 這里走到的是AbstractChannelHandlerContext
類的write
方法
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { //沒有調flush next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
又是我們所熟悉邏輯, 找到當前節點的上一個標注事件為outbound
事件的節點, 繼續執行invokeWrite
方法, 根據之前的剖析, 我們知道最終會執行到上一個handler
的write
方法中。
走到這里已經不難理解, ctx.channel().write("test data")
其實是從tail
節點開始傳播寫事件, 而ctx.write("test data")
是從自身開始傳播寫事件。
所以, 在handler
中如果重寫了write
方法要傳遞write
事件, 一定采用ctx.write("test data")
這種方式或者交給其父類處理處理, 而不能采用ctx.channel().write("test data")
這種方式, 因為會造成每次事件傳輸到這里都會從tail
節點重新傳輸, 導致不可預知的錯誤。
如果用代碼中沒有重寫handler
的write
方法, 則事件會一直往上傳輸, 當傳輸完所有的outbound
節點之后, 最后會走到head
節點的wirte
方法中。
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }
我們看到write
事件最終會流向這里, 通過unsafe
對象進行最終的寫操作
關于“netty pipeline中的inbound和outbound事件怎么傳播”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“netty pipeline中的inbound和outbound事件怎么傳播”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。