91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Netty分布式pipeline管道傳播outBound事件的示例分析

發布時間:2022-03-28 13:56:23 來源:億速云 閱讀:166 作者:小新 欄目:開發技術

這篇文章將為大家詳細講解有關Netty分布式pipeline管道傳播outBound事件的示例分析,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

outbound事件傳輸流程

在我們業務代碼中, 有可能使用wirte方法往寫數據:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.channel().write("test data");
}

當然, 直接調用write方法是不能往對方channel中寫入數據的, 因為這種方式只能寫入到緩沖區, 還要調用flush方法才能將緩沖區數據刷到channel中, 或者直接調用writeAndFlush方法, 有關邏輯, 我們會在后面章節中詳細講解, 這里只是以wirte方法為例為了演示outbound事件的傳播的流程

這里我們同樣給出兩種寫法

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //寫法1
    ctx.channel().write("test data");
    //寫法2
    ctx.write("test data");
}

這兩種寫法有什么區別, 我們首先跟到第一種寫法中去:

ctx.channel().write("test data");

這里獲取ctx所綁定的channel

我們跟到AbstractChannel的write方法中:

public ChannelFuture write(Object msg) {
    return pipeline.write(msg);
}

這里pipeline是DefaultChannelPipeline

跟到其write方法中:
public final ChannelFuture write(Object msg) {
    //從tail節點開始(從最后的節點往前寫)
    return tail.write(msg);
}

這里調用tail節點write方法, 這里我們應該能分析到, outbound事件, 是通過tail節點開始往上傳播的, 帶著這點猜想, 我們繼往下看

其實tail節點并沒有重寫write方法, 最終會調用其父類AbstractChannelHandlerContext的write方法

AbstractChannelHandlerContext的write方法:

public ChannelFuture write(Object msg) { 
    return write(msg, newPromise());
}

我們看到這里有個newPromise()這個方法, 這里是創建一個Promise對象, 有關Promise的相關知識我們會在以后的章節剖析

我們繼續跟write:

public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    //代碼省略
    write(msg, false, promise);
    return promise;
}

繼續跟write:

private void write(Object msg, boolean flush, ChannelPromise promise) { 
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            //沒有調flush
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

這里跟我們上一小節剖析過channelRead方法有點類似, 但是事件傳輸的方向有所不同, 這里findContextOutbound()是獲取上一個標注outbound事件的HandlerContext

跟到findContextOutbound中
private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

這里的邏輯我們似曾相識, 跟我們上一小節的findContextInbound()方法有點像, 只是過程是反過來的

在這里, 會找到當前context的上一個節點, 如果標注的事件不是outbound事件, 則繼續往上找, 意思就是找到上一個標注outbound事件的節點

回到write方法:
AbstractChannelHandlerContext next = findContextOutbound();

這里將找到節點賦值到next屬性中

因為我們之前分析的write事件是從tail節點傳播的, 所以上一個節點就有可能是用戶自定的handler所屬的context

然后判斷是否為當前eventLoop線程, 如果是不是, 則封裝成task異步執行, 如果不是, 則繼續判斷是否調用了flush方法, 因為我們這里沒有調用, 所以會執行到next.invokeWrite(m, promise),

我們繼續跟invokeWrite

private void invokeWrite(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
    } else {
        write(msg, promise);
    }
}

這里會判斷當前handler的狀態是否是添加狀態, 這里返回的是true, 將會走到invokeWrite0(msg, promise)這一步

繼續跟invokeWrite0
private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        //調用當前handler的wirte()方法
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

這里的邏輯也似曾相識, 調用了當前節點包裝的handler的write方法, 如果用戶沒有重寫write方法, 則會交給其父類處理

我們跟到ChannelOutboundHandlerAdapter的write方法中看:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ctx.write(msg, promise);
}

這里調用了當前ctx的write方法, 這種寫法和我們小節開始的寫法是相同的, 我們回顧一下:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //寫法1
    ctx.channel().write("test data");
    //寫法2
    ctx.write("test data");
}

我們跟到其write方法中, 這里走到的是AbstractChannelHandlerContext類的write方法:

private void write(Object msg, boolean flush, ChannelPromise promise) { 
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            //沒有調flush
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

又是我們所熟悉邏輯, 找到當前節點的上一個標注事件為outbound事件的節點, 繼續執行invokeWrite方法, 根據之前的剖析, 我們知道最終會執行到上一個handler的write方法中

走到這里已經不難理解, ctx.channel().write("test data")其實是從tail節點開始傳播寫事件, 而ctx.write("test data")是從自身開始傳播寫事件

所以, 在handler中如果重寫了write方法要傳遞write事件, 一定采用ctx.write("test data")這種方式或者交給其父類處理處理, 而不能采用ctx.channel().write("test data")這種方式, 因為會造成每次事件傳輸到這里都會從tail節點重新傳輸, 導致不可預知的錯誤

如果用代碼中沒有重寫handler的write方法, 則事件會一直往上傳輸, 當傳輸完所有的outbound節點之后, 最后會走到head節點的wirte方法中

我們跟到HeadContext的write方法中
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

我們看到write事件最終會流向這里, 通過unsafe對象進行最終的寫操作

有關inbound事件和outbound事件的傳輸, 可通過下圖進行說明:

Netty分布式pipeline管道傳播outBound事件的示例分析

關于“Netty分布式pipeline管道傳播outBound事件的示例分析”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

神木县| 马关县| 清原| 赣榆县| 新兴县| 古蔺县| 衡南县| 霍邱县| 新乡县| 丰镇市| 水城县| 遂昌县| 松潘县| 台东市| 仪陇县| 闻喜县| 南召县| 遂川县| 师宗县| 曲麻莱县| 昆山市| 新泰市| 胶州市| 驻马店市| 台江县| 龙里县| 沙洋县| 崇义县| 鹤峰县| 七台河市| 尚志市| 和田市| 弥勒县| 浮山县| 景泰县| 泊头市| 新津县| 黎城县| 商城县| 金川县| 右玉县|