您好,登錄后才能下訂單哦!
本篇內容主要講解“Netty Client啟動流程是怎樣的”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Netty Client啟動流程是怎樣的”吧!
這里用netty-exmaple
中的EchoClient
來作為例子:
public final class EchoClient { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new EchoClientHandler()); } }); ChannelFuture f = b.connect(HOST, PORT).sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }
代碼沒有什么獨特的地方,我們上一篇文章時也梳理過Netty
網絡編程的一些套路,這里就不再贅述了。 (忘記的小朋友可以查看Netty
系列文章中查找~)
上面的客戶端代碼雖然簡單, 但是卻展示了Netty
客戶端初始化時所需的所有內容:
EventLoopGroup
:Netty
服務端或者客戶端,都必須指定EventLoopGroup
,客戶端指定的是NioEventLoopGroup
Bootstrap
: Netty
客戶端啟動類,負責客戶端的啟動和初始化過程
channel()
類型:指定Channel
的類型,因為這里是客戶端,所以使用的是NioSocketChannel
,服務端會使用NioServerSocketChannel
Handler
:設置數據的處理器
bootstrap.connect()
: 客戶端連接netty
服務的方法
我們先從NioEventLoopGroup
開始,一行行代碼解析,先看看其類結構:
上面是大致的類結構,而 EventLoop
又繼承自EventLoopGroup
,所以類的大致結構我們可想而知。這里一些核心邏輯會在MultithreadEventExecutorGroup
中,包含EventLoopGroup
的創建和初始化操作等。
接著從NioEventLoopGroup
構造方法開始看起,一步步往下跟(代碼都只展示重點的部分,省去很多暫時不需要關心的代碼,以下代碼都遵循這個原則):
EventLoopGroup group = new NioEventLoopGroup(); public NioEventLoopGroup() { this(0); } public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) { this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); } protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }
這里通過調用this()
和super()
方法一路往下傳遞,期間會構造一些默認屬性,一直傳遞到MultithreadEventExecutorGroup
類中,接著往西看。
上面構造函數有一個重要的參數傳遞:DEFAULT_EVENT_LOOP_THREADS
,這個值默認是CPU核數 * 2
。
為什么要傳遞這個參數呢?我們之前說過EventLoopGroup
可以理解成一個線程池,MultithreadEventExecutorGroup
有一個線程數組EventExecutor[] children
屬性,而傳遞過來的DEFAULT_EVENT_LOOP_THREADS
就是數組的長度。
先看下MultithreadEventExecutorGroup
中的構造方法:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { children[i] = newChild(executor, args); } // ... 省略 }
這段代碼執行邏輯可以理解為:
通過ThreadPerTaskExecutor
構造一個Executor
執行器,后面會細說,里面包含了線程執行的execute()
方法
接著創建一個EventExecutor
數組對象,大小為傳遞進來的threads
數量,這個所謂的EventExecutor
可以理解為我們的EventLoop
,在這個demo中就是NioEventLoop
對象
最后調用 newChild
方法逐個初始化EventLoopGroup
中的EventLoop
對象
上面只是大概說了下MultithreadEventExecutorGroup
中的構造方法做的事情,后面還會一個個詳細展開,先不用著急,我們先有個整體的認知就好。
再回到MultithreadEventExecutorGroup
中的構造方法入參中,有個EventExecutorChooserFactory
對象,這里面是有個很亮眼的細節設計,通過它我們來洞悉Netty
的良苦用心。
EventExecutorChooserFactory
這個類的作用是用來選擇EventLoop
執行器的,我們知道EventLoopGroup
是一個包含了CPU * 2
個數量的EventLoop
數組對象,那每次選擇EventLoop
來執行任務是選擇數組中的哪一個呢?
我們看一下這個類的具體實現,紅框中
都是需要重點查看的地方:
DefaultEventExecutorChooserFactory
是一個選擇器工廠類,調用里面的next()
方法達到一個輪詢選擇的目的。
數組的長度是length,執行第n次,取數組中的哪個元素就是對length取余
繼續回到代碼的實現,這里的優化就是在于先通過isPowerOfTwo()
方法判斷數組的長度是否為2的n次冪,判斷的方式很巧妙,使用val & -val == val
,這里我不做過多的解釋,網上還有很多判斷2的n次冪的優秀解法,我就不班門弄斧了。(可參考:https://leetcode-cn.com/problems/power-of-two/solution/2de-mi-by-leetcode/)
當然我認為這里還有更容易理解的一個算法:x & (x - 1) == 0
大家可以看下面的圖就懂了,這里就不延展了:
BUT!!! 這里為什么要去煞費苦心的判斷數組的長度是2的n次冪?
不知道小伙伴們是否還記得大明湖畔的HashMap
?一般我們要求HashMap
數組的長度需要是2的n次冪,因為在key
值尋找數組位置的方法:(n - 1) & hash
n是數組長度,這里如果數組長度是2的n次冪就可以通過位運算來提升性能,當length
為2的n次冪時下面公式是等價的:
n & (length - 1) <=> n % length
還記得上面說過,數組的長度默認都是CPU * 2
,而一般服務器CPU核心數都是2、4、8、16等等,所以這一個小優化就很實用了,再仔細想想,原來數組長度的初始化也是很講究的。
這里位運算的好處就是效率遠遠高于與運算,Netty
針對于這個小細節都做了優化,真是太棒了。
接著看下ThreadPerTaskExecutor
線程執行器,每次執行任務都會通過它來創建一個線程實體。
public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; public ThreadPerTaskExecutor(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.threadFactory = threadFactory; } @Override public void execute(Runnable command) { threadFactory.newThread(command).start(); } }
傳遞進來的threadFactory
為DefaultThreadFactory
,這里面會構造NioEventLoop
線程命名規則為nioEventLoop-1-xxx
,我們就不細看這個了。當線程執行的時候會調用execute()
方法,這里會創建一個FastThreadLocalThread
線程,具體看代碼:
public class DefaultThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable r) { Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet()); return t; } protected Thread newThread(Runnable r, String name) { return new FastThreadLocalThread(threadGroup, r, name); } }
這里通過newThread()
來創建一個線程,然后初始化線程對象數據,最終會調用到Thread.init()
中。
接著繼續看MultithreadEventExecutorGroup
構造方法:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { children[i] = newChild(executor, args); // .... 省略部分代碼 } }
上面代碼的最后一部分是 newChild
方法, 這個是一個抽象方法, 它的任務是實例化 EventLoop
對象. 我們跟蹤一下它的代碼, 可以發現, 這個方法在 NioEventLoopGroup
類中實現了, 其內容很簡單:
@Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); } NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; }
其實就是實例化一個 NioEventLoop
對象, 然后返回。NioEventLoop
構造函數中會保存provider
和事件輪詢器selector
,在其父類中還會創建一個MpscQueue隊列
,然后保存線程執行器executor
。
再回過頭來想一想,MultithreadEventExecutorGroup
內部維護了一個 EventExecutor[] children
數組, Netty
的 EventLoopGroup
的實現機制其實就建立在 MultithreadEventExecutorGroup
之上。
每當 Netty
需要一個 EventLoop
時, 會調用 next()
方法從EventLoopGroup
數組中獲取一個可用的 EventLoop
對象。其中next
方法的實現是通過NioEventLoopGroup.next()
來完成的,就是用的上面有過講解的通過輪詢算法來計算得出的。
最后總結一下整個 EventLoopGroup
的初始化過程:
EventLoopGroup
(其實是MultithreadEventExecutorGroup
) 內部維護一個類型為 EventExecutor children
數組,數組長度是nThreads
如果我們在實例化 NioEventLoopGroup
時, 如果指定線程池大小, 則 nThreads
就是指定的值, 反之是處理器核心數 * 2
MultithreadEventExecutorGroup
中會調用 newChild
抽象方法來初始化 children
數組
抽象方法 newChild
是在 NioEventLoopGroup
中實現的, 它返回一個 NioEventLoop
實例.
NioEventLoop
屬性:
SelectorProvider provider
屬性: NioEventLoopGroup
構造器中通過 SelectorProvider.provider()
獲取一個 SelectorProvider
Selector selector
屬性: NioEventLoop
構造器中通過調用通過 selector = provider.openSelector()
獲取一個 selector
對象.
在Netty
中,Channel
是對Socket
的抽象,每當Netty
建立一個連接后,都會有一個與其對應的Channel
實例。
我們在開頭的Demo
中,設置了channel(NioSocketChannel.class)
,NioSocketChannel
的類結構如下:
接著分析代碼,當我們調用b.channel()
時實際上會進入AbstractBootstrap.channel()
邏輯,接著看AbstractBootstrap
中代碼:
public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); } public ReflectiveChannelFactory(Class<? extends T> clazz) { if (clazz == null) { throw new NullPointerException("clazz"); } this.clazz = clazz; } public B channelFactory(ChannelFactory<? extends C> channelFactory) { if (channelFactory == null) { throw new NullPointerException("channelFactory"); } if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory; return self(); }
可以看到,這里ReflectiveChannelFactory
其實就是返回我們指定的channelClass:NioSocketChannel
, 然后指定AbstractBootstrap
中的channelFactory = new ReflectiveChannelFactory()
。
到了這一步,我們已經知道NioEventLoopGroup
和channel()
的流程,接著來看看Channel
的 初始化流程,這也是Netty
客戶端啟動的的核心流程之一:
ChannelFuture f = b.connect(HOST, PORT).sync();
接著就開始從b.connect()
為入口一步步往后跟,先看下NioSocketChannel
構造的整體流程:
從connet
往后梳理下整體流程:
Bootstrap.connect -> Bootstrap.doResolveAndConnect -> AbstractBootstrap.initAndRegister
final ChannelFuture initAndRegister() { Channel channel = channelFactory.newChannel(); init(channel); ChannelFuture regFuture = config().group().register(channel); return regFuture; }
為了更易讀,這里代碼都做了簡化,只保留了一些重要的代碼。
緊接著我們看看channelFactory.newChannel()
做了什么,這里channelFactory
是ReflectiveChannelFactory
,我們在上面的章節分析過:
@Override public T newChannel() { try { return clazz.getConstructor().newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } }
這里的clazz
是NioSocketChannel
,同樣是在上面章節講到過,這里是調用NioSocketChannel
的構造函數然后初始化一個Channel
實例。
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel { public NioSocketChannel() { this(DEFAULT_SELECTOR_PROVIDER); } public NioSocketChannel(SelectorProvider provider) { this(newSocket(provider)); } private static SocketChannel newSocket(SelectorProvider provider) { try { return provider.openSocketChannel(); } catch (IOException e) { throw new ChannelException("Failed to open a socket.", e); } } }
這里其實也很簡單,就是創建一個Java NIO SocketChannel
而已,接著看看NioSocketChannel
的父類還做了哪些事情,這里梳理下類的關系:
NioSocketChannel -> extends AbstractNioByteChannel -> exntends AbstractNioChannel
public abstract class AbstractNioChannel extends AbstractChannel { protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { super(parent, ch, SelectionKey.OP_READ); } protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); ch.configureBlocking(false); } }
這里會調用父類的構造參數,并且傳遞readInterestOp = SelectionKey.OP_READ:
,這里還有一個很重要的點,配置 Java NIO SocketChannel
為非阻塞的,我們之前在NIO
章節的時候講解過,這里也不再贅述。
接著繼續看AbstractChannel
的構造函數:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } }
這里創建一個ChannelId
,創建一個Unsafe
對象,這里的Unsafe
并不是Java中的Unsafe,后面也會講到。然后創建一個ChannelPipeline
,后面也會講到,到了這里,一個完整的NioSocketChannel
就初始化完成了,我們再來總結一下:
Netty
的 SocketChannel
會與 Java
原生的 SocketChannel
綁定在一起;
會注冊 Read
事件;
會為每一個 Channel
分配一個 channelId
;
會為每一個 Channel
創建一個Unsafe
對象;
會為每一個 Channel
分配一個 ChannelPipeline
;
還是回到最上面initAndRegister
方法,我們上面都是在分析里面newChannel
的操作,這個方法是NioSocketChannel
創建的一個流程,接著我們在繼續跟init()
和register()
的過程:
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable { final ChannelFuture initAndRegister() { Channel channel = channelFactory.newChannel(); init(channel); ChannelFuture regFuture = config().group().register(channel); } }
init()
就是將一些參數options
和attrs
設置到channel
中,我們重點需要看的是register
方法,其調用鏈為:
AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register
這里最后到了unsafe
的register()
方法,最終調用到AbstractNioChannel.doRegister()
:
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } }
javaChannel()
就是Java NIO
中的SocketChannel
,這里是將SocketChannel
注冊到與eventLoop
相關聯的selector
上。
最后我們整理一下服務啟動的整體流程:
initAndRegister()
初始化并注冊什么呢?
channelFactory.newChannel()
通過反射創建一個 NioSocketChannel
將 Java
原生 Channel
綁定到 NettyChannel
中
注冊 Read
事件
為 Channel
分配 id
為 Channel
創建 unsafe
對象
為 Channel
創建 ChannelPipeline
(默認是 head<=>tail
的雙向鏈表)
`init(channel)``
把 Bootstrap
中的配置設置到 Channel
中
register(channel)
把 Channel
綁定到一個 EventLoop
上
把 Java
原生 Channel、Netty
的 Channel、Selector
綁定到 SelectionKey
中
觸發 Register
相關的事件
上面有提到過在初始化Channel
的過程中會創建一個Unsafe
的對象,然后綁定到Channel
上:
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
newUnsafe
直接調用到了NioSocketChannel
中的方法:
@Override protected AbstractNioUnsafe newUnsafe() { return new NioSocketChannelUnsafe(); }
NioSocketChannelUnsafe
是NioSocketChannel
中的一個內部類,然后向上還有幾個父類繼承,這里主要是對應到相關Java
底層的Socket
操作。
我們還是回到pipeline
初始化的過程,來看一下newChannelPipeline()
的具體實現:
protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); } protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
我們調用 DefaultChannelPipeline
的構造器, 傳入了一個 channel
, 而這個 channel
其實就是我們實例化的 NioSocketChannel
。
DefaultChannelPipeline
會將這個 NioSocketChannel
對象保存在channel
字段中. DefaultChannelPipeline
中, 還有兩個特殊的字段, 即 head
和 tail
, 而這兩個字段是一個雙向鏈表的頭和尾. 其實在 DefaultChannelPipeline
中, 維護了一個以 AbstractChannelHandlerContext
為節點的雙向鏈表, 這個鏈表是 Netty
實現 Pipeline
機制的關鍵.
關于 DefaultChannelPipeline
中的雙向鏈表以及它所起的作用, 我們會在后續章節詳細講解。這里只是對pipeline
做個初步的認識。
HeadContext
的繼承層次結構如下所示:
TailContext
的繼承層次結構如下所示:
我們可以看到, 鏈表中 head
是一個 ChannelOutboundHandler
, 而 tail
則是一個 ChannelInboundHandler
.
客戶端連接的入口方法還是在Bootstrap.connect()
中,上面也分析過一部分內容,請求的具體流程是:
Bootstrap.connect() -> AbstractChannel.coonnect() -> NioSocketChannel.doConnect()
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress) throws IOException { try { return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() { @Override public Boolean run() throws IOException { return socketChannel.connect(remoteAddress); } }); } catch (PrivilegedActionException e) { throw (IOException) e.getCause(); } }
看到這里,還是用Java NIO SocketChannel
發送的connect
請求進行客戶端連接請求。
到此,相信大家對“Netty Client啟動流程是怎樣的”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。