您好,登錄后才能下訂單哦!
參照《Netty系列之Netty 服務端創建》,研究了netty的服務端創建過程。至于netty的優勢,可以參照網絡其他文章。《Netty系列之Netty 服務端創建》是 李林鋒撰寫的netty源碼分析的一篇好文,絕對是技術干貨。但拋開技術來說,也存在一些瑕疵。
缺點如下
代碼銜接不連貫,上下不連貫。
代碼片段是截圖,對閱讀代理不便(可能和閱讀習慣有關)
本篇主要內容,參照《Netty系列之Netty 服務端創建》,梳理出自己喜歡的閱讀風格。
1.整體邏輯圖
整體將服務端創建分為2部分:(1)綁定端口,提供服務過程;(2)輪詢網絡請求
1.1 綁定端口序列圖
1.2 類圖
類圖僅僅涵蓋了綁定過程中比較重要的幾個組件
1.3 代碼分析
step 2 doBind 綁定本地端口,啟動服務
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister();//1 final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } final ChannelPromise promise; if (regFuture.isDone()) { promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise);//2 } else { // Registration future is almost always fulfilled already, but just in case it's not. promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { doBind0(regFuture, channel, localAddress, promise);//2 } }); } return promise; }
主要分為2個處理單元
step3 initAndRegister
final ChannelFuture initAndRegister() { Channel channel; try { channel = createChannel(); } catch (Throwable t) { return VoidChannel.INSTANCE.newFailedFuture(t); } try { init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); return channel.newFailedFuture(t); } //注冊NioServerSocketChannel到Reactor線程的多路復用器上 ChannelPromise regFuture = channel.newPromise(); channel.unsafe().register(regFuture); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
createChannel由子類ServerBootstrap實現,創建新的NioServerSocketChannel,并完成Channel初始化,以及注冊。
4.ServerBootstrap.createChannel
Channel createChannel() { EventLoop eventLoop = group().next(); return channelFactory().newChannel(eventLoop, childGroup); }
它有兩個參數,參數1是從父類的NIO線程池中順序獲取一個NioEventLoop,它就是服務端用于監聽和接收客戶端連接的Reactor線程。第二個參數就是所謂的workerGroup線程池,它就是處理IO讀寫的Reactor線程組。
5.ServerBootstrap.init
void init(Channel channel) throws Exception { //設置Socket參數和NioServerSocketChannel的附加屬性 final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { channel.config().setOptions(options); } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //將AbstractBootstrap的Handler添加到NioServerSocketChannel的ChannelPipeline中 ChannelPipeline p = channel.pipeline(); if (handler() != null) { p.addLast(handler()); } final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } //將用于服務端注冊的Handler ServerBootstrapAcceptor添加到ChannelPipeline中 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
到此處,Netty服務端監聽的相關資源已經初始化完畢。
6.AbstractChannel.AbstractUnsafe.register
public final void register(final ChannelPromise promise) { //首先判斷是否是NioEventLoop自身發起的操作,如果是,則不存在并發操作,直接執行Channel注冊; if (eventLoop.inEventLoop()) { register0(promise); } else {//如果由其它線程發起,則封裝成一個Task放入消息隊列中異步執行。 try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); promise.setFailure(t); } } }
7.register0
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!ensureOpen(promise)) { return; } doRegister(); registered = true; promise.setSuccess(); pipeline.fireChannelRegistered(); if (isActive()) {//完成綁定時,不會調用該代碼段 pipeline.fireChannelActive(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); if (!promise.tryFailure(t)) { logger.warn( "Tried to fail the registration promise, but it is complete already. " + "Swallowing the cause of the registration failure:", t); } } }
觸發事件
8.doRegister
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { //將NioServerSocketChannel注冊到NioEventLoop的Selector上 selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
大伙兒可能會很詫異,應該注冊OP_ACCEPT(16)到多路復用器上,怎么注冊0呢?0表示只注冊,不監聽任何網絡操作。這樣做的原因如下:
注冊方法是多態的,它既可以被NioServerSocketChannel用來監聽客戶端的連接接入,也可以用來注冊SocketChannel,用來監聽網絡讀或者寫操作;
通過SelectionKey的interestOps(int ops)方法可以方便的修改監聽操作位。所以,此處注冊需要獲取SelectionKey并給AbstractNioChannel的成員變量selectionKey賦值。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。