您好,登錄后才能下訂單哦!
本篇內容介紹了“基于NIO的網絡編程框架Netty有哪些組件”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
Netty是一個基于異步與事件驅動的網絡應用程序框架,它支持快速與簡單地開發可維護的高性能的服務器與客戶端。
所謂事件驅動就是由通過各種事件響應來決定程序的流程,在Netty中到處都充滿了異步與事件驅動,這種特點使得應用程序可以以任意的順序響應在任意的時間點產生的事件,它帶來了非常高的可伸縮性,讓你的應用可以在需要處理的工作不斷增長時,通過某種可行的方式或者擴大它的處理能力來適應這種增長。
Netty提供了高性能與易用性,它具有以下特點:
擁有設計良好且統一的API,支持NIO與OIO(阻塞IO)等多種傳輸類型,支持真正的無連接UDP Socket。
簡單而強大的線程模型,可高度定制線程(池)。(定制化的Reactor模型)
良好的模塊化與解耦,支持可擴展和靈活的事件模型,可以很輕松地分離關注點以復用邏輯組件(可插拔的)。
性能高效,擁有比Java核心API更高的吞吐量,通過zero-copy功能以實現最少的內存復制消耗。
內置了許多常用的協議編解碼器,如HTTP、SSL、WebScoket等常見協議可以通過Netty做到開箱即用。用戶也可以利用Netty簡單方便地實現自己的應用層協議。
大多數人使用Netty主要還是為了提高應用的性能,而高性能則離不開非阻塞IO。Netty的非阻塞IO是基于Java NIO的,并且對其進行了封裝(直接使用Java NIO API在高復雜度下的應用中是一項非常繁瑣且容易出錯的操作,而Netty幫你封裝了這些復雜操作)。
讀完這一章,我們基本上可以了解到Netty所有重要的組件,對Netty有一個全面的認識,這對下一步深入學習Netty是十分重要的,而學完這一章,我們其實已經可以用Netty解決一些常規的問題了。
為了更好的理解和進一步深入Netty,我們先總體認識一下Netty用到的組件及它們在整個Netty架構中是怎么協調工作的。Netty應用中必不可少的組件:
Bootstrap or ServerBootstrap
EventLoop
EventLoopGroup
ChannelPipeline
Channel
Future or ChannelFuture
ChannelInitializer
ChannelHandler
Bootstrap,一個Netty應用通常由一個Bootstrap開始,它主要作用是配置整個Netty程序,串聯起各個組件。
Handler,為了支持各種協議和處理數據的方式,便誕生了Handler組件。Handler主要用來處理各種事件,這里的事件很廣泛,比如可以是連接、數據接收、異常、數據轉換等。
ChannelInboundHandler,一個最常用的Handler。這個Handler的作用就是處理接收到數據時的事件,也就是說,我們的業務邏輯一般就是寫在這個Handler里面的,ChannelInboundHandler就是用來處理我們的核心業務邏輯。
ChannelInitializer,當一個鏈接建立時,我們需要知道怎么來接收或者發送數據,當然,我們有各種各樣的Handler實現來處理它,那么ChannelInitializer便是用來配置這些Handler,它會提供一個ChannelPipeline,并把Handler加入到ChannelPipeline。
ChannelPipeline,一個Netty應用基于ChannelPipeline機制,這種機制需要依賴于EventLoop和EventLoopGroup,因為它們三個都和事件或者事件處理相關。
EventLoops的目的是為Channel處理IO操作,一個EventLoop可以為多個Channel服務。
EventLoopGroup會包含多個EventLoop。
Channel代表了一個Socket鏈接,或者其它和IO操作相關的組件,它和EventLoop一起用來參與IO處理。
Future,在Netty中所有的IO操作都是異步的,因此,你不能立刻得知消息是否被正確處理,但是我們可以過一會等它執行完成或者直接注冊一個監聽,具體的實現就是通過Future和ChannelFutures,他們可以注冊一個監聽,當操作執行成功或失敗時監聽會自動觸發。總之,所有的操作都會返回一個ChannelFuture。
Channels、Events 和 IO
Netty是一個非阻塞的、事件驅動的、網絡編程框架。當然,我們很容易理解Netty會用線程來處理IO事件,對于熟悉多線程編程的人來說,你或許會想到如何同步你的代碼,但是Netty不需要我們考慮這些,具體是這樣:
一個Channel會對應一個EventLoop,而一個EventLoop會對應著一個線程,也就是說,僅有一個線程在負責一個Channel的IO操作。
關于這些名詞之間的關系,可以見下圖:
如圖所示:當一個連接到達,Netty會注冊一個channel,然后EventLoopGroup會分配一個EventLoop綁定到這個channel,在這個channel的整個生命周期過程中,都會由綁定的這個EventLoop來為它服務,而這個EventLoop就是一個線程。
說到這里,那么EventLoops和EventLoopGroups關系是如何的呢?我們前面說過一個EventLoopGroup包含多個Eventloop,但是我們看一下下面這幅圖,這幅圖是一個繼承樹,從這幅圖中我們可以看出,EventLoop其實繼承自EventloopGroup,也就是說,在某些情況下,我們可以把一個EventLoopGroup當做一個EventLoop來用。
BootsStrapping
我們利用BootsStrap來配置netty 應用,它有兩種類型,一種用于Client端:BootsStrap,另一種用于Server端:ServerBootstrap,要想區別如何使用它們,你僅需要記住一個用在Client端,一個用在Server端。下面我們來詳細介紹一下這兩種類型的區別:
1.第一個最明顯的區別是,ServerBootstrap用于Server端,通過調用bind()方法來綁定到一個端口監聽連接;Bootstrap用于Client端,需要調用connect()方法來連接服務器端,但我們也可以通過調用bind()方法返回的ChannelFuture中獲取Channel從而去connect服務器端。
2.客戶端的Bootstrap一般用一個EventLoopGroup,而服務器端的ServerBootstrap會用到兩個(這兩個也可以是同一個實例)。為何服務器端要用到兩個EventLoopGroup呢?這么設計有明顯的好處,如果一個ServerBootstrap有兩個EventLoopGroup,那么就可以把第一個EventLoopGroup用來專門負責綁定到端口監聽連接事件,而把第二個EventLoopGroup用來處理每個接收到的連接,下面我們用一幅圖來展現一下這種模式:
PS: 如果僅由一個EventLoopGroup處理所有請求和連接的話,在并發量很大的情況下,這個EventLoopGroup有可能會忙于處理已經接收到的連接而不能及時處理新的連接請求,用兩個的話,會有專門的線程來處理連接請求,不會導致請求超時的情況,大大提高了并發處理能力。
我們知道一個Channel需要由一個EventLoop來綁定,而且兩者一旦綁定就不會再改變。一般情況下一個EventLoopGroup中的EventLoop數量會少于Channel數量,那么就很有可能出現一個多個Channel公用一個EventLoop的情況,這就意味著如果一個Channel中的EventLoop很忙的話,會影響到這個Eventloop對其它Channel的處理,這也就是為什么我們不能阻塞EventLoop的原因。
當然,我們的Server也可以只用一個EventLoopGroup,由一個實例來處理連接請求和IO事件,請看下面這幅圖:
Netty核心ChannelHandler
下面我們來看一下netty中是怎樣處理數據的,回想一下我們前面講到的Handler,對了,就是它。說到Handler我們就不得不提ChannelPipeline,ChannelPipeline負責安排Handler的順序及其執行,下面我們就來詳細介紹一下他們:
ChannelPipeline and handlers
我們的應用程序中用到的最多的應該就是ChannelHandler,我們可以這么想象,數據在一個ChannelPipeline中流動,而ChannelHandler便是其中的一個個的小閥門,這些數據都會經過每一個ChannelHandler并且被它處理。這里有一個公共接口ChannelHandler:
從上圖中我們可以看到,ChannelHandler有兩個子類ChannelInboundHandler和ChannelOutboundHandler,這兩個類對應了兩個數據流向,如果數據是從外部流入我們的應用程序,我們就看做是inbound,相反便是outbound。其實ChannelHandler和Servlet有些類似,一個ChannelHandler處理完接收到的數據會傳給下一個Handler,或者什么不處理,直接傳遞給下一個。下面我們看一下ChannelPipeline是如何安排ChannelHandler的:
從上圖中我們可以看到,一個ChannelPipeline可以把兩種Handler(ChannelInboundHandler和ChannelOutboundHandler)混合在一起,當一個數據流進入ChannelPipeline時,它會從ChannelPipeline頭部開始傳給第一個ChannelInboundHandler,當第一個處理完后再傳給下一個,一直傳遞到管道的尾部。與之相對應的是,當數據被寫出時,它會從管道的尾部開始,先經過管道尾部的“最后”一個ChannelOutboundHandler,當它處理完成后會傳遞給前一個ChannelOutboundHandler。
數據在各個Handler之間傳遞,這需要調用方法中傳遞的ChanneHandlerContext來操作, 在netty的API中提供了兩個基類分ChannelOutboundHandlerAdapter和ChannelInboundHandlerAdapter,他們僅僅實現了調用ChanneHandlerContext來把消息傳遞給下一個Handler,因為我們只關心處理數據,因此我們的程序中可以繼承這兩個基類來幫助我們做這些,而我們僅需實現處理數據的部分即可。
我們知道InboundHandler和OutboundHandler在ChannelPipeline中是混合在一起的,那么它們如何區分彼此呢?其實很容易,因為它們各自實現的是不同的接口,對于inbound event,Netty會自動跳過OutboundHandler,相反若是outbound event,ChannelInboundHandler會被忽略掉。
當一個ChannelHandler被加入到ChannelPipeline中時,它便會獲得一個ChannelHandlerContext的引用,而ChannelHandlerContext可以用來讀寫Netty中的數據流。因此,現在可以有兩種方式來發送數據,一種是把數據直接寫入Channel,一種是把數據寫入ChannelHandlerContext,它們的區別是寫入Channel的話,數據流會從Channel的頭開始傳遞,而如果寫入ChannelHandlerContext的話,數據流會流入管道中的下一個Handler。
Encoders, Decoders and Domain Logic
Netty中會有很多Handler,具體是哪種Handler還要看它們繼承的是InboundAdapter還是OutboundAdapter。當然,Netty中還提供了一些列的Adapter來幫助我們簡化開發,我們知道在Channelpipeline中每一個Handler都負責把Event傳遞給下一個Handler,如果有了這些輔助Adapter,這些額外的工作都可自動完成,我們只需覆蓋實現我們真正關心的部分即可。此外,還有一些Adapter會提供一些額外的功能,比如編碼和解碼。那么下面我們就來看一下其中的三種常用的ChannelHandler:
Encoders和Decoders
因為我們在網絡傳輸時只能傳輸字節流,因此,在發送數據之前,我們必須把我們的message型轉換為bytes,與之對應,我們在接收數據后,必須把接收到的bytes再轉換成message。我們把bytes to message這個過程稱作Decode(解碼成我們可以理解的),把message to bytes這個過程成為Encode。
Netty中提供了很多現成的編碼/解碼器,我們一般從他們的名字中便可知道他們的用途,如ByteToMessageDecoder、MessageToByteEncoder,如專門用來處理Google Protobuf協議的ProtobufEncoder、 ProtobufDecoder。
我們前面說過,具體是哪種Handler就要看它們繼承的是InboundAdapter還是OutboundAdapter,對于Decoders,很容易便可以知道它是繼承自ChannelInboundHandlerAdapter或 ChannelInboundHandler,因為解碼的意思是把ChannelPipeline傳入的bytes解碼成我們可以理解的message(即Java Object),而ChannelInboundHandler正是處理Inbound Event,而Inbound Event中傳入的正是字節流。Decoder會覆蓋其中的“ChannelRead()”方法,在這個方法中來調用具體的decode方法解碼傳遞過來的字節流,然后通過調用ChannelHandlerContext.fireChannelRead(decodedMessage)方法把編碼好的Message傳遞給下一個Handler。與之類似,Encoder就不必多少了。
Domain Logic
其實我們最最關心的事情就是如何處理接收到的解碼后的數據,我們真正的業務邏輯便是處理接收到的數據。Netty提供了一個最常用的基類SimpleChannelInboundHandler
Netty從某方面來說就是一套NIO框架,在Java NIO基礎上做了封裝,所以要想學好Netty我建議先理解好Java NIO,
NIO可以稱為New IO也可以稱為Non-blocking IO,它比Java舊的阻塞IO在性能上要高效許多(如果讓每一個連接中的IO操作都單獨創建一個線程,那么阻塞IO并不會比NIO在性能上落后,但不可能創建無限多的線程,在連接數非常多的情況下會很糟糕)。
ByteBuffer:NIO的數據傳輸是基于緩沖區的,ByteBuffer正是NIO數據傳輸中所使用的緩沖區抽象。ByteBuffer支持在堆外分配內存,并且嘗試避免在執行I/O操作中的多余復制。一般的I/O操作都需要進行系統調用,這樣會先切換到內核態,內核態要先從文件讀取數據到它的緩沖區,只有等數據準備完畢后,才會從內核態把數據寫到用戶態,所謂的阻塞IO其實就是說的在等待數據準備好的這段時間內進行阻塞。如果想要避免這個額外的內核操作,可以通過使用mmap(虛擬內存映射)的方式來讓用戶態直接操作文件。
Channel:它類似于(fd)文件描述符,簡單地來說它代表了一個實體(如一個硬件設備、文件、Socket或者一個能夠執行一個或多個不同的I/O操作的程序組件)。你可以從一個Channel中讀取數據到緩沖區,也可以將一個緩沖區中的數據寫入到Channel。
Selector:選擇器是NIO實現的關鍵,NIO采用的是I/O多路復用的方式來實現非阻塞,Selector通過在一個線程中監聽每個Channel的IO事件來確定有哪些已經準備好進行IO操作的Channel,因此可以在任何時間檢查任意的讀操作或寫操作的完成狀態。這種方式避免了等待IO操作準備數據時的阻塞,使用較少的線程便可以處理許多連接,減少了線程切換與維護的開銷。
了解了NIO的實現思想之后,我覺得還很有必要了解一下Unix中的I/O模型,Unix中擁有以下5種I/O模型:
阻塞I/O(Blocking I/O)
非阻塞I/O(Non-blocking I/O)
I/O多路復用(I/O multiplexing (select and poll))
信號驅動I/O(signal driven I/O (SIGIO))
異步I/O(asynchronous I/O (the POSIX aio_functions))
阻塞I/O模型是最常見的I/O模型,通常我們使用的InputStream/OutputStream都是基于阻塞I/O模型。在上圖中,我們使用UDP作為例子,recvfrom()函數是UDP協議用于接收數據的函數,它需要使用系統調用并一直阻塞到內核將數據準備好,之后再由內核緩沖區復制數據到用戶態(即是recvfrom()接收到數據),所謂阻塞就是在等待內核準備數據的這段時間內什么也不干。
舉個生活中的例子,阻塞I/O就像是你去餐廳吃飯,在等待飯做好的時間段中,你只能在餐廳中坐著干等(如果你在玩手機那么這就是非阻塞I/O了)。
在非阻塞I/O模型中,內核在數據尚未準備好的情況下回返回一個錯誤碼
EWOULDBLOCK
,而recvfrom并沒有在失敗的情況下選擇阻塞休眠,而是不斷地向內核詢問是否已經準備完畢,在上圖中,前三次內核都返回了EWOULDBLOCK
,直到第四次詢問時,內核數據準備完畢,然后開始將內核中緩存的數據復制到用戶態。這種不斷詢問內核以查看某種狀態是否完成的方式被稱為polling(輪詢)。
非阻塞I/O就像是你在點外賣,只不過你非常心急,每隔一段時間就要打電話問外賣小哥有沒有到。
I/O多路復用的思想跟非阻塞I/O是一樣的,只不過在非阻塞I/O中,是在recvfrom的用戶態(或一個線程)中去輪詢內核,這種方式會消耗大量的CPU時間。而I/O多路復用則是通過select()或poll()系統調用來負責進行輪詢,以實現監聽I/O讀寫事件的狀態。如上圖中,select監聽到一個datagram可讀時,就交由recvfrom去發送系統調用將內核中的數據復制到用戶態。
這種方式的優點很明顯,通過I/O多路復用可以監聽多個文件描述符,且在內核中完成監控的任務。但缺點是至少需要兩個系統調用(select()與recvfrom())。
I/O多路復用同樣適用于點外賣這個例子,只不過你在等外賣的期間完全可以做自己的事情,當外賣到的時候會通過外賣APP或者由外賣小哥打電話來通知你(因為內核會幫你輪詢)。
Unix中提供了兩種I/O多路復用函數,select()和poll()。select()的兼容性更好,但它在單個進程中所能監控的文件描述符是有限的,這個值與FD_SETSIZE
相關,32位系統中默認為1024,64位系統中為2048。select()還有一個缺點就是他輪詢的方式,它采取了線性掃描的輪詢方式,每次都要遍歷FD_SETSIZE個文件描述符,不管它們是否活不活躍的。poll()本質上與select()的實現沒有區別,不過在數據結構上區別很大,用戶必須分配一個pollfd結構數組,該數組維護在內核態中,正因如此,poll()并不像select()那樣擁有大小上限的限制,但缺點同樣也很明顯,大量的fd數組會在用戶態與內核態之間不斷復制,不管這樣的復制是否有意義。
還有一種比select()與poll()更加高效的實現叫做epoll(),它是由Linux內核2.6推出的可伸縮的I/O多路復用實現,目的是為了替代select()與poll()。epoll()同樣沒有文件描述符上限的限制,它使用一個文件描述符來管理多個文件描述符,并使用一個紅黑樹來作為存儲結構。同時它還支持邊緣觸發(edge-triggered)與水平觸發(level-triggered)兩種模式(poll()只支持水平觸發),在邊緣觸發模式下,epoll_wait
僅會在新的事件對象首次被加入到epoll時返回,而在水平觸發模式下,epoll_wait
會在事件狀態未變更前不斷地觸發。也就是說,邊緣觸發模式只會在文件描述符變為就緒狀態時通知一次,水平觸發模式會不斷地通知該文件描述符直到被處理。
關于epoll_wait
請參考如下epoll API。
// 創建一個epoll對象并返回它的文件描述符。 // 參數flags允許修改epoll的行為,它只有一個有效值EPOLL_CLOEXEC。 int epoll_create1(int flags); // 配置對象,該對象負責描述監控哪些文件描述符和哪些事件。 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); // 等待與epoll_ctl注冊的任何事件,直至事件發生一次或超時。 // 返回在events中發生的事件,最多同時返回maxevents個。 int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
epoll另一亮點是采用了事件驅動的方式而不是輪詢,在epoll_ctl中注冊的文件描述符在事件觸發的時候會通過一個回調機制來激活該文件描述符,epoll_wait
便可以收到通知。這樣效率就不會與文件描述符的數量成正比
在Java NIO2(從JDK1.7開始引入)中,只要Linux內核版本在2.6以上,就會采用epoll,如下源碼所示(DefaultSelectorProvider.java)。
public static SelectorProvider create() { String osname = AccessController.doPrivileged( new GetPropertyAction("os.name")); if ("SunOS".equals(osname)) { return new sun.nio.ch.DevPollSelectorProvider(); } // use EPollSelectorProvider for Linux kernels >= 2.6 if ("Linux".equals(osname)) { String osversion = AccessController.doPrivileged( new GetPropertyAction("os.version")); String[] vers = osversion.split("\\.", 0); if (vers.length >= 2) { try { int major = Integer.parseInt(vers[0]); int minor = Integer.parseInt(vers[1]); if (major > 2 || (major == 2 && minor >= 6)) { return new sun.nio.ch.EPollSelectorProvider(); } } catch (NumberFormatException x) { // format not recognized } } } return new sun.nio.ch.PollSelectorProvider(); }
信號驅動I/O模型使用到了信號,內核在數據準備就緒時會通過信號來進行通知。我們首先開啟了一個信號驅動I/O套接字,并使用sigaction系統調用來安裝信號處理程序,內核直接返回,不會阻塞用戶態。當datagram準備好時,內核會發送SIGIN信號,recvfrom接收到信號后會發送系統調用開始進行I/O操作。
這種模型的優點是主進程(線程)不會被阻塞,當數據準備就緒時,通過信號處理程序來通知主進程(線程)準備進行I/O操作與對數據的處理。
我們之前討論的各種I/O模型無論是阻塞還是非阻塞,它們所說的阻塞都是指的數據準備階段。異步I/O模型同樣依賴于信號處理程序來進行通知,但與以上I/O模型都不相同的是,異步I/O模型通知的是I/O操作已經完成,而不是數據準備完成。
可以說異步I/O模型才是真正的非阻塞,主進程只管做自己的事情,然后在I/O操作完成時調用回調函數來完成一些對數據的處理操作即可。
閑扯了這么多,想必大家已經對I/O模型有了一個深刻的認識。之后,我們將會結合部分源碼(Netty4.X)來探討Netty中的各大核心組件,以及如何使用Netty,你會發現實現一個Netty程序是多么簡單(而且還伴隨了高性能與可維護性)。
網絡傳輸的基本單位是字節,在Java NIO中提供了ByteBuffer作為字節緩沖區容器,但該類的API使用起來不太方便,所以Netty實現了ByteBuf作為其替代品,下面是使用ByteBuf的優點:
相比ByteBuffer使用起來更加簡單。
通過內置的復合緩沖區類型實現了透明的zero-copy。
容量可以按需增長。
讀和寫使用了不同的索引指針。
支持鏈式調用。
支持引用計數與池化。
可以被用戶自定義的緩沖區類型擴展。
在討論ByteBuf之前,我們先需要了解一下ByteBuffer的實現,這樣才能比較深刻地明白它們之間的區別。
ByteBuffer繼承于abstract class Buffer
(所以還有LongBuffer、IntBuffer等其他類型的實現),本質上它只是一個有限的線性的元素序列,包含了三個重要的屬性。
Capacity:緩沖區中元素的容量大小,你只能將capacity個數量的元素寫入緩沖區,一旦緩沖區已滿就需要清理緩沖區才能繼續寫數據。
Position:指向下一個寫入數據位置的索引指針,初始位置為0,最大為capacity-1。當寫模式轉換為讀模式時,position需要被重置為0。
Limit:在寫模式中,limit是可以寫入緩沖區的最大索引,也就是說它在寫模式中等價于緩沖區的容量。在讀模式中,limit表示可以讀取數據的最大索引。
由于Buffer中只維護了position一個索引指針,所以它在讀寫模式之間的切換需要調用一個flip()方法來重置指針。使用Buffer的流程一般如下:
寫入數據到緩沖區。
調用flip()方法。
從緩沖區中讀取數據
調用buffer.clear()或者buffer.compact()清理緩沖區,以便下次寫入數據。
RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw"); FileChannel inChannel = aFile.getChannel(); // 分配一個48字節大小的緩沖區 ByteBuffer buf = ByteBuffer.allocate(48); int bytesRead = inChannel.read(buf); // 讀取數據到緩沖區 while (bytesRead != -1) { buf.flip(); // 將position重置為0 while(buf.hasRemaining()){ System.out.print((char) buf.get()); // 讀取數據并輸出到控制臺 } buf.clear(); // 清理緩沖區 bytesRead = inChannel.read(buf); } aFile.close(); Buffer中核心方法的實現也非常簡單,主要就是在操作指針position。
Buffer中核心方法的實現也非常簡單,主要就是在操作指針position。
/** * Sets this buffer's mark at its position. * * @return This buffer */ public final Buffer mark() { mark = position; // mark屬性是用來標記當前索引位置的 return this; } // 將當前索引位置重置為mark所標記的位置 public final Buffer reset() { int m = mark; if (m < 0) throw new InvalidMarkException(); position = m; return this; } // 翻轉這個Buffer,將limit設置為當前索引位置,然后再把position重置為0 public final Buffer flip() { limit = position; position = 0; mark = -1; return this; } // 清理緩沖區 // 說是清理,也只是把postion與limit進行重置,之后再寫入數據就會覆蓋之前的數據了 public final Buffer clear() { position = 0; limit = capacity; mark = -1; return this; } // 返回剩余空間 public final int remaining() { return limit - position; }
Java NIO中的Buffer API操作的麻煩之處就在于讀寫轉換需要手動重置指針。而ByteBuf沒有這種繁瑣性,它維護了兩個不同的索引,一個用于讀取,一個用于寫入。當你從ByteBuf讀取數據時,它的readerIndex將會被遞增已經被讀取的字節數,同樣的,當你寫入數據時,writerIndex則會遞增。readerIndex的最大范圍在writerIndex的所在位置,如果試圖移動readerIndex超過該值則會觸發異常。
ByteBuf中名稱以read或write開頭的方法將會遞增它們其對應的索引,而名稱以get或set開頭的方法則不會。ByteBuf同樣可以指定一個最大容量,試圖移動writerIndex超過該值則會觸發異常。
public byte readByte() { this.checkReadableBytes0(1); // 檢查readerIndex是否已越界 int i = this.readerIndex; byte b = this._getByte(i); this.readerIndex = i + 1; // 遞增readerIndex return b; } private void checkReadableBytes0(int minimumReadableBytes) { this.ensureAccessible(); if(this.readerIndex > this.writerIndex - minimumReadableBytes) { throw new IndexOutOfBoundsException(String.format("readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s", new Object[]{Integer.valueOf(this.readerIndex), Integer.valueOf(minimumReadableBytes), Integer.valueOf(this.writerIndex), this})); } } public ByteBuf writeByte(int value) { this.ensureAccessible(); this.ensureWritable0(1); // 檢查writerIndex是否會越過capacity this._setByte(this.writerIndex++, value); return this; } private void ensureWritable0(int minWritableBytes) { if(minWritableBytes > this.writableBytes()) { if(minWritableBytes > this.maxCapacity - this.writerIndex) { throw new IndexOutOfBoundsException(String.format("writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", new Object[]{Integer.valueOf(this.writerIndex), Integer.valueOf(minWritableBytes), Integer.valueOf(this.maxCapacity), this})); } else { int newCapacity = this.alloc().calculateNewCapacity(this.writerIndex + minWritableBytes, this.maxCapacity); this.capacity(newCapacity); } } } // get與set只對傳入的索引進行了檢查,然后對其位置進行get或set public byte getByte(int index) { this.checkIndex(index); return this._getByte(index); } public ByteBuf setByte(int index, int value) { this.checkIndex(index); this._setByte(index, value); return this; }
ByteBuf同樣支持在堆內和堆外進行分配。在堆內分配也被稱為支撐數組模式,它能在沒有使用池化的情況下提供快速的分配和釋放。
ByteBuf heapBuf = Unpooled.copiedBuffer(bytes); if (heapBuf.hasArray()) { // 判斷是否有一個支撐數組 byte[] array = heapBuf.array(); // 計算第一個字節的偏移量 int offset = heapBuf.arrayOffset() + heapBuf.readerIndex(); int length = heapBuf.readableBytes(); // 獲得可讀字節 handleArray(array,offset,length); // 調用你的處理方法 }
另一種模式為堆外分配,Java NIO ByteBuffer類在JDK1.4時就已經允許JVM實現通過JNI調用來在堆外分配內存(調用malloc()函數在JVM堆外分配內存),這主要是為了避免額外的緩沖區復制操作。
ByteBuf directBuf = Unpooled.directBuffer(capacity); if (!directBuf.hasArray()) { int length = directBuf.readableBytes(); byte[] array = new byte[length]; // 將字節復制到數組中 directBuf.getBytes(directBuf.readerIndex(),array); handleArray(array,0,length); }
ByteBuf還支持第三種模式,它被稱為復合緩沖區,為多個ByteBuf提供了一個聚合視圖。在這個視圖中,你可以根據需要添加或者刪除ByteBuf實例,ByteBuf的子類CompositeByteBuf實現了該模式。
一個適合使用復合緩沖區的場景是HTTP協議,通過HTTP協議傳輸的消息都會被分成兩部分——頭部和主體,如果這兩部分由應用程序的不同模塊產生,將在消息發送時進行組裝,并且該應用程序還會為多個消息復用相同的消息主體,這樣對于每個消息都將會創建一個新的頭部,產生了很多不必要的內存操作。使用CompositeByteBuf是一個很好的選擇,它消除了這些額外的復制,以幫助你復用這些消息。
CompositeByteBuf messageBuf = Unpooled.compositeBuffer(); ByteBuf headerBuf = ....; ByteBuf bodyBuf = ....; messageBuf.addComponents(headerBuf,bodyBuf); for (ByteBuf buf : messageBuf) { System.out.println(buf.toString()); }
CompositeByteBuf透明的實現了zero-copy,zero-copy其實就是避免數據在兩個內存區域中來回的復制。從操作系統層面上來講,zero-copy指的是避免在內核態與用戶態之間的數據緩沖區復制(通過mmap避免),而Netty中的zero-copy更偏向于在用戶態中的數據操作的優化,就像使用CompositeByteBuf來復用多個ByteBuf以避免額外的復制,也可以使用wrap()方法來將一個字節數組包裝成ByteBuf,又或者使用ByteBuf的slice()方法把它分割為多個共享同一內存區域的ByteBuf,這些都是為了優化內存的使用率。
那么如何創建ByteBuf呢?在上面的代碼中使用到了Unpooled,它是Netty提供的一個用于創建與分配ByteBuf的工具類,建議都使用這個工具類來創建你的緩沖區,不要自己去調用構造函數。經常使用的是wrappedBuffer()與copiedBuffer(),它們一個是用于將一個字節數組或ByteBuffer包裝為一個ByteBuf,一個是根據傳入的字節數組與ByteBuffer/ByteBuf來復制出一個新的ByteBuf。
// 通過array.clone()來復制一個數組進行包裝 public static ByteBuf copiedBuffer(byte[] array) { return array.length == 0?EMPTY_BUFFER:wrappedBuffer((byte[])array.clone()); } // 默認是堆內分配 public static ByteBuf wrappedBuffer(byte[] array) { return (ByteBuf)(array.length == 0?EMPTY_BUFFER:new UnpooledHeapByteBuf(ALLOC, array, array.length)); } // 也提供了堆外分配的方法 private static final ByteBufAllocator ALLOC; public static ByteBuf directBuffer(int initialCapacity) { return ALLOC.directBuffer(initialCapacity); }
Channel channel = ...; ByteBufAllocator allocator = channel.alloc(); ByteBuf buffer = allocator.directBuffer(); do something.......
為了優化內存使用率,Netty提供了一套手動的方式來追蹤不活躍對象,像UnpooledHeapByteBuf這種分配在堆內的對象得益于JVM的GC管理,無需額外操心,而UnpooledDirectByteBuf是在堆外分配的,它的內部基于DirectByteBuffer,DirectByteBuffer會先向Bits類申請一個額度(Bits還擁有一個全局變量totalCapacity,記錄了所有DirectByteBuffer總大小),每次申請前都會查看是否已經超過-XX:MaxDirectMemorySize所設置的上限,如果超限就會嘗試調用System.gc(),以試圖回收一部分內存,然后休眠100毫秒,如果內存還是不足,則只能拋出OOM異常。堆外內存的回收雖然有了這么一層保障,但為了提高性能與使用率,主動回收也是很有必要的。由于Netty還實現了ByteBuf的池化,像PooledHeapByteBuf和PooledDirectByteBuf就必須依賴于手動的方式來進行回收(放回池中)。
Netty使用了引用計數器的方式來追蹤那些不活躍的對象。引用計數的接口為ReferenceCounted,它的思想很簡單,只要ByteBuf對象的引用計數大于0,就保證該對象不會被釋放回收,可以通過手動調用release()與retain()方法來操作該對象的引用計數值遞減或遞增。用戶也可以通過自定義一個ReferenceCounted的實現類,以滿足自定義的規則。
package io.netty.buffer; public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf { // 由于ByteBuf的實例對象會非常多,所以這里沒有將refCnt包裝為AtomicInteger // 而是使用一個全局的AtomicIntegerFieldUpdater來負責操作refCnt private static final AtomicIntegerFieldUpdater refCntUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt"); // 每個ByteBuf的初始引用值都為1 private volatile int refCnt = 1; public int refCnt() { return this.refCnt; } protected final void setRefCnt(int refCnt) { this.refCnt = refCnt; } public ByteBuf retain() { return this.retain0(1); } // 引用計數值遞增increment,increment必須大于0 public ByteBuf retain(int increment) { return this.retain0(ObjectUtil.checkPositive(increment, "increment")); } public static int checkPositive(int i, String name) { if(i <= 0) { throw new IllegalArgumentException(name + ": " + i + " (expected: > 0)"); } else { return i; } } // 使用CAS操作不斷嘗試更新值 private ByteBuf retain0(int increment) { int refCnt; int nextCnt; do { refCnt = this.refCnt; nextCnt = refCnt + increment; if(nextCnt <= increment) { throw new IllegalReferenceCountException(refCnt, increment); } } while(!refCntUpdater.compareAndSet(this, refCnt, nextCnt)); return this; } public boolean release() { return this.release0(1); } public boolean release(int decrement) { return this.release0(ObjectUtil.checkPositive(decrement, "decrement")); } private boolean release0(int decrement) { int refCnt; do { refCnt = this.refCnt; if(refCnt < decrement) { throw new IllegalReferenceCountException(refCnt, -decrement); } } while(!refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)); if(refCnt == decrement) { this.deallocate(); return true; } else { return false; } } protected abstract void deallocate(); }
Netty中的Channel與Java NIO的概念一樣,都是對一個實體或連接的抽象,但Netty提供了一套更加通用的API。就以網絡套接字為例,在Java中OIO與NIO是截然不同的兩套API,假設你之前使用的是OIO而又想更改為NIO實現,那么幾乎需要重寫所有代碼。而在Netty中,只需要更改短短幾行代碼(更改Channel與EventLoop的實現類,如把OioServerSocketChannel替換為NioServerSocketChannel),就可以完成OIO與NIO(或其他)之間的轉換。
每個Channel最終都會被分配一個ChannelPipeline和ChannelConfig,前者持有所有負責處理入站與出站數據以及事件的ChannelHandler,后者包含了該Channel的所有配置設置,并且支持熱更新,由于不同的傳輸類型可能具有其特別的配置,所以該類可能會實現為ChannelConfig的不同子類。
Channel是線程安全的(與之后要講的線程模型有關),因此你完全可以在多個線程中復用同一個Channel,就像如下代碼所示。
final Channel channel = ... final ByteBuf buffer = Unpooled.copiedBuffer("Hello,World!", CharsetUtil.UTF_8).retain(); Runnable writer = new Runnable() { @Override public void run() { channel.writeAndFlush(buffer.duplicate()); } }; Executor executor = Executors.newCachedThreadPool(); executor.execute(writer); executor.execute(writer); .......
Netty除了支持常見的NIO與OIO,還內置了其他的傳輸類型。
Nmae | Package | Description |
---|---|---|
NIO | io.netty.channel.socket.nio | 以Java NIO為基礎實現 |
OIO | io.netty.channel.socket.oio | 以java.net為基礎實現,使用阻塞I/O模型 |
Epoll | io.netty.channel.epoll | 由JNI驅動epoll()實現的更高性能的非阻塞I/O,它只能使用在Linux |
Local | io.netty.channel.local | 本地傳輸,在JVM內部通過管道進行通信 |
Embedded | io.netty.channel.embedded | 允許在不需要真實網絡傳輸的環境下使用ChannelHandler,主要用于對ChannelHandler進行測試 |
NIO、OIO、Epoll我們應該已經很熟悉了,下面主要說說Local與Embedded。
Local傳輸用于在同一個JVM中運行的客戶端和服務器程序之間的異步通信,與服務器Channel相關聯的SocketAddress并沒有綁定真正的物理網絡地址,它會被存儲在注冊表中,并在Channel關閉時注銷。因此Local傳輸不會接受真正的網絡流量,也就是說它不能與其他傳輸實現進行互操作。
Embedded傳輸主要用于對ChannelHandler進行單元測試,ChannelHandler是用于處理消息的邏輯組件,Netty通過將入站消息與出站消息都寫入到EmbeddedChannel中的方式(提供了write/readInbound()與write/readOutbound()來讀寫入站與出站消息)來實現對ChannelHandler的單元測試。
ChannelHandler充當了處理入站和出站數據的應用程序邏輯的容器,該類是基于事件驅動的,它會響應相關的事件然后去調用其關聯的回調函數,例如當一個新的連接被建立時,ChannelHandler的channelActive()方法將會被調用。
關于入站消息和出站消息的數據流向定義,如果以客戶端為主視角來說的話,那么從客戶端流向服務器的數據被稱為出站,反之為入站。
入站事件是可能被入站數據或者相關的狀態更改而觸發的事件,包括:連接已被激活、連接失活、讀取入站數據、用戶事件、發生異常等。
出站事件是未來將會觸發的某個動作的結果的事件,這些動作包括:打開或關閉遠程節點的連接、將數據寫(或沖刷)到套接字。
ChannelHandler的主要用途包括:
對入站與出站數據的業務邏輯處理
記錄日志
將數據從一種格式轉換為另一種格式,實現編解碼器。以一次HTTP協議(或者其他應用層協議)的流程為例,數據在網絡傳輸時的單位為字節,當客戶端發送請求到服務器時,服務器需要通過解碼器(處理入站消息)將字節解碼為協議的消息內容,服務器在發送響應的時候(處理出站消息),還需要通過編碼器將消息內容編碼為字節。
捕獲異常
提供Channel生命周期內的通知,如Channel活動時與非活動時
Netty中到處都充滿了異步與事件驅動,而回調函數正是用于響應事件之后的操作。由于異步會直接返回一個結果,所以Netty提供了ChannelFuture(實現了java.util.concurrent.Future)來作為異步調用返回的占位符,真正的結果會在未來的某個時刻完成,到時候就可以通過ChannelFuture對其進行訪問,每個Netty的出站I/O操作都將會返回一個ChannelFuture。
Netty還提供了ChannelFutureListener接口來監聽ChannelFuture是否成功,并采取對應的操作。
Channel channel = ... ChannelFuture future = channel.connect(new InetSocketAddress("192.168.0.1",6666)); // 注冊一個監聽器 future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { if (future.isSuccess()) { // do something.... } else { // 輸出錯誤信息 Throwable cause = future.cause(); cause.printStackTrace(); // do something.... } } });
ChannelFutureListener接口中還提供了幾個簡單的默認實現,方便我們使用。
package io.netty.channel; import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.GenericFutureListener; public interface ChannelFutureListener extends GenericFutureListener{ // 在Future完成時關閉 ChannelFutureListener CLOSE = new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { future.channel().close(); } }; // 如果失敗則關閉 ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { if(!future.isSuccess()) { future.channel().close(); } } }; // 將異常信息傳遞給下一個ChannelHandler ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { if(!future.isSuccess()) { future.channel().pipeline().fireExceptionCaught(future.cause()); } } }; }ChannelHandler接口定義了對它生命周期進行監聽的回調函數,在ChannelHandler被添加到ChannelPipeline或者被移除時都會調用這些函數。package io.netty.channel; public interface ChannelHandler { void handlerAdded(ChannelHandlerContext var1) throws Exception; void handlerRemoved(ChannelHandlerContext var1) throws Exception; /** @deprecated */ @Deprecated void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception; // 該注解表明這個ChannelHandler可被其他線程復用 @Inherited @Documented @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface Sharable { } }入站消息與出站消息由其對應的接口ChannelInboundHandler與ChannelOutboundHandler負責,這兩個接口定義了監聽Channel的生命周期的狀態改變事件的回調函數。package io.netty.channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; public interface ChannelInboundHandler extends ChannelHandler { // 當channel被注冊到EventLoop時被調用 void channelRegistered(ChannelHandlerContext var1) throws Exception; // 當channel已經被創建,但還未注冊到EventLoop(或者從EventLoop中注銷)被調用 void channelUnregistered(ChannelHandlerContext var1) throws Exception; // 當channel處于活動狀態(連接到遠程節點)被調用 void channelActive(ChannelHandlerContext var1) throws Exception; // 當channel處于非活動狀態(沒有連接到遠程節點)被調用 void channelInactive(ChannelHandlerContext var1) throws Exception; // 當從channel讀取數據時被調用 void channelRead(ChannelHandlerContext var1, Object var2) throws Exception; // 當channel的上一個讀操作完成時被調用 void channelReadComplete(ChannelHandlerContext var1) throws Exception; // 當ChannelInboundHandler.fireUserEventTriggered()方法被調用時被調用 void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception; // 當channel的可寫狀態發生改變時被調用 void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception; // 當處理過程中發生異常時被調用 void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception; } package io.netty.channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import java.net.SocketAddress; public interface ChannelOutboundHandler extends ChannelHandler { // 當請求將Channel綁定到一個地址時被調用 // ChannelPromise是ChannelFuture的一個子接口,定義了如setSuccess(),setFailure()等方法 void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception; // 當請求將Channel連接到遠程節點時被調用 void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception; // 當請求將Channel從遠程節點斷開時被調用 void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception; // 當請求關閉Channel時被調用 void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception; // 當請求將Channel從它的EventLoop中注銷時被調用 void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception; // 當請求從Channel讀取數據時被調用 void read(ChannelHandlerContext var1) throws Exception; // 當請求通過Channel將數據寫到遠程節點時被調用 void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception; // 當請求通過Channel將緩沖中的數據沖刷到遠程節點時被調用 void flush(ChannelHandlerContext var1) throws Exception; }通過實現ChannelInboundHandler或者ChannelOutboundHandler就可以完成用戶自定義的應用邏輯處理程序,不過Netty已經幫你實現了一些基本操作,用戶只需要繼承并擴展ChannelInboundHandlerAdapter或ChannelOutboundHandlerAdapter來作為自定義實現的起始點。ChannelInboundHandlerAdapter與ChannelOutboundHandlerAdapter都繼承于ChannelHandlerAdapter,該抽象類簡單實現了ChannelHandler接口。public abstract class ChannelHandlerAdapter implements ChannelHandler { boolean added; public ChannelHandlerAdapter() { } // 該方法不允許將此ChannelHandler共享復用 protected void ensureNotSharable() { if(this.isSharable()) { throw new IllegalStateException("ChannelHandler " + this.getClass().getName() + " is not allowed to be shared"); } } // 使用反射判斷實現類有沒有@Sharable注解,以確認該類是否為可共享復用的 public boolean isSharable() { Class clazz = this.getClass(); Map cache = InternalThreadLocalMap.get().handlerSharableCache(); Boolean sharable = (Boolean)cache.get(clazz); if(sharable == null) { sharable = Boolean.valueOf(clazz.isAnnotationPresent(Sharable.class)); cache.put(clazz, sharable); } return sharable.booleanValue(); } public void handlerAdded(ChannelHandlerContext ctx) throws Exception { } public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } }ChannelInboundHandlerAdapter與ChannelOutboundHandlerAdapter默認只是簡單地將請求傳遞給ChannelPipeline中的下一個ChannelHandler,源碼如下:public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler { public ChannelInboundHandlerAdapter() { } public void channelRegistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelRegistered(); } public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); } public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); } public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); } public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); } public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); } public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ctx.fireUserEventTriggered(evt); } public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelWritabilityChanged(); } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } } public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler { public ChannelOutboundHandlerAdapter() { } public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { ctx.bind(localAddress, promise); } public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { ctx.connect(remoteAddress, localAddress, promise); } public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.disconnect(promise); } public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.close(promise); } public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ctx.deregister(promise); } public void read(ChannelHandlerContext ctx) throws Exception { ctx.read(); } public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); } public void flush(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }對于處理入站消息,另外一種選擇是繼承SimpleChannelInboundHandler,它是Netty的一個繼承于ChannelInboundHandlerAdapter的抽象類,并在其之上實現了自動釋放資源的功能。我們在了解ByteBuf時就已經知道了Netty使用了一套自己實現的引用計數算法來主動釋放資源,假設你的ChannelHandler繼承于ChannelInboundHandlerAdapter或ChannelOutboundHandlerAdapter,那么你就有責任去管理你所分配的ByteBuf,一般來說,一個消息對象(ByteBuf)已經被消費(或丟棄)了,并不會傳遞給ChannelHandler鏈中的下一個處理器(如果該消息到達了實際的傳輸層,那么當它被寫入或Channel關閉時,都會被自動釋放),所以你就需要去手動釋放它。通過一個簡單的工具類ReferenceCountUtil的release方法,就可以做到這一點。// 這個泛型為消息對象的類型 public abstract class SimpleChannelInboundHandler extends ChannelInboundHandlerAdapter { private final TypeParameterMatcher matcher; private final boolean autoRelease; protected SimpleChannelInboundHandler() { this(true); } protected SimpleChannelInboundHandler(boolean autoRelease) { this.matcher = TypeParameterMatcher.find(this, SimpleChannelInboundHandler.class, "I"); this.autoRelease = autoRelease; } protected SimpleChannelInboundHandler(Class inboundMessageType) { this(inboundMessageType, true); } protected SimpleChannelInboundHandler(Class inboundMessageType, boolean autoRelease) { this.matcher = TypeParameterMatcher.get(inboundMessageType); this.autoRelease = autoRelease; } public boolean acceptInboundMessage(Object msg) throws Exception { return this.matcher.match(msg); } // SimpleChannelInboundHandler只是替你做了ReferenceCountUtil.release() public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if(this.acceptInboundMessage(msg)) { this.channelRead0(ctx, msg); } else { release = false; ctx.fireChannelRead(msg); } } finally { if(this.autoRelease && release) { //ByteBuf的釋放 ReferenceCountUtil.release(msg); } } } // 這個方法才是我們需要實現的方法 protected abstract void channelRead0(ChannelHandlerContext var1, I var2) throws Exception; } // ReferenceCountUtil中的源碼,release方法對消息對象的類型進行判斷然后調用它的release()方法 public static boolean release(Object msg) { return msg instanceof ReferenceCounted?((ReferenceCounted)msg).release():false; }ChannelPipeline為了模塊化與解耦合,不可能由一個ChannelHandler來完成所有應用邏輯,所以Netty采用了攔截器鏈的設計。ChannelPipeline就是用來管理ChannelHandler實例鏈的容器,它的職責就是保證實例鏈的流動。每一個新創建的Channel都將會被分配一個新的ChannelPipeline,這種關聯關系是永久性的,一個Channel一生只能對應一個ChannelPipeline。一個入站事件被觸發時,它會先從ChannelPipeline的最左端(頭部)開始一直傳播到ChannelPipeline的最右端(尾部),而出站事件正好與入站事件順序相反(從最右端一直傳播到最左端)。這個順序是定死的,Netty總是將ChannelPipeline的入站口作為頭部,而將出站口作為尾部。在事件傳播的過程中,ChannelPipeline會判斷下一個ChannelHandler的類型是否和事件的運動方向相匹配,如果不匹配,就跳過該ChannelHandler并繼續檢查下一個(保證入站事件只會被ChannelInboundHandler處理),一個ChannelHandler也可以同時實現ChannelInboundHandler與ChannelOutboundHandler,它在入站事件與出站事件中都會被調用。在閱讀ChannelHandler的源碼時,發現很多方法需要一個ChannelHandlerContext類型的參數,該接口是ChannelPipeline與ChannelHandler之間相關聯的關鍵。ChannelHandlerContext可以通知ChannelPipeline中的當前ChannelHandler的下一個ChannelHandler,還可以動態地改變當前ChannelHandler在ChannelPipeline中的位置(通過調用ChannelPipeline中的各種方法來修改)。ChannelHandlerContext負責了在同一個ChannelPipeline中的ChannelHandler與其他ChannelHandler之間的交互,每個ChannelHandlerContext都對應了一個ChannelHandler。在DefaultChannelPipeline的源碼中,已經表現的很明顯了。public class DefaultChannelPipeline implements ChannelPipeline { ......... // 頭部節點和尾部節點的引用變量 // ChannelHandlerContext在ChannelPipeline中是以鏈表的形式組織的 final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; ......... // 添加一個ChannelHandler到鏈表尾部 public final ChannelPipeline addLast(String name, ChannelHandler handler) { return this.addLast((EventExecutorGroup)null, name, handler); } public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized(this) { // 檢查ChannelHandler是否為一個共享對象(@Sharable) // 如果該ChannelHandler沒有@Sharable注解,并且是已被添加過的那么就拋出異常 checkMultiplicity(handler); // 返回一個DefaultChannelHandlerContext,注意該對象持有了傳入的ChannelHandler newCtx = this.newContext(group, this.filterName(name, handler), handler); this.addLast0(newCtx); // 如果當前ChannelPipeline沒有被注冊,那么就先加到未決鏈表中 if(!this.registered) { newCtx.setAddPending(); this.callHandlerCallbackLater(newCtx, true); return this; } // 否則就調用ChannelHandler中的handlerAdded() EventExecutor executor = newCtx.executor(); if(!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { public void run() { DefaultChannelPipeline.this.callHandlerAdded0(newCtx); } }); return this; } } this.callHandlerAdded0(newCtx); return this; } // 將新的ChannelHandlerContext插入到尾部與尾部之前的節點之間 private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = this.tail.prev; newCtx.prev = prev; newCtx.next = this.tail; prev.next = newCtx; this.tail.prev = newCtx; } ..... }ChannelHandlerContext還定義了許多與Channel和ChannelPipeline重合的方法(像read()、write()、connect()這些用于出站的方法或者如fireChannelXXXX()這樣用于入站的方法),不同之處在于調用Channel或者ChannelPipeline上的這些方法,它們將會從頭沿著整個ChannelHandler實例鏈進行傳播,而調用位于ChannelHandlerContext上的相同方法,則會從當前所關聯的ChannelHandler開始,且只會傳播給實例鏈中的下一個ChannelHandler。而且,事件之間的移動(從一個ChannelHandler到下一個ChannelHandler)也是通過ChannelHandlerContext中的方法調用完成的。public class DefaultChannelPipeline implements ChannelPipeline { public final ChannelPipeline fireChannelRead(Object msg) { // 注意這里將頭節點傳入了進去 AbstractChannelHandlerContext.invokeChannelRead(this.head, msg); return this; } } --------------------------------------------------------------- abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if(executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { public void run() { next.invokeChannelRead(m); } }); } } private void invokeChannelRead(Object msg) { if(this.invokeHandler()) { try { ((ChannelInboundHandler)this.handler()).channelRead(this, msg); } catch (Throwable var3) { this.notifyHandlerException(var3); } } else { // 尋找下一個ChannelHandler this.fireChannelRead(msg); } } public ChannelHandlerContext fireChannelRead(Object msg) { invokeChannelRead(this.findContextInbound(), msg); return this; } private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while(!ctx.inbound); // 直到找到一個ChannelInboundHandler return ctx; } }EventLoop為了最大限度地提供高性能和可維護性,Netty設計了一套強大又易用的線程模型。在一個網絡框架中,最重要的能力是能夠快速高效地處理在連接的生命周期內發生的各種事件,與之相匹配的程序構造被稱為事件循環,Netty定義了接口EventLoop來負責這項工作。如果是經常用Java進行多線程開發的童鞋想必經常會使用到線程池,也就是Executor這套API。Netty就是從Executor(java.util.concurrent)之上擴展了自己的EventExecutorGroup(io.netty.util.concurrent),同時為了與Channel的事件進行交互,還擴展了EventLoopGroup接口(io.netty.channel)。在io.netty.util.concurrent包下的EventExecutorXXX負責實現線程并發相關的工作,而在io.netty.channel包下的EventLoopXXX負責實現網絡編程相關的工作(處理Channel中的事件)。在Netty的線程模型中,一個EventLoop將由一個永遠不會改變的Thread驅動,而一個Channel一生只會使用一個EventLoop(但是一個EventLoop可能會被指派用于服務多個Channel),在Channel中的所有I/O操作和事件都由EventLoop中的線程處理,也就是說一個Channel的一生之中都只會使用到一個線程。不過在Netty3,只有入站事件會被EventLoop處理,所有出站事件都會由調用線程處理,這種設計導致了ChannelHandler的線程安全問題。Netty4簡化了線程模型,通過在同一個線程處理所有事件,既解決了這個問題,還提供了一個更加簡單的架構。package io.netty.channel; public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", 2147483647)); //內部隊列 private final QueuetailTasks; protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject()); } protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) { this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject()); } protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler); this.tailTasks = this.newTaskQueue(maxPendingTasks); } protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler); this.tailTasks = this.newTaskQueue(maxPendingTasks); } // 返回它所在的EventLoopGroup public EventLoopGroup parent() { return (EventLoopGroup)super.parent(); } public EventLoop next() { return (EventLoop)super.next(); } // 注冊Channel,這里ChannelPromise和Channel關聯到了一起 public ChannelFuture register(Channel channel) { return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this))); } public ChannelFuture register(ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; } // 剩下這些函數都是用于調度任務 public final void executeAfterEventLoopIteration(Runnable task) { ObjectUtil.checkNotNull(task, "task"); if(this.isShutdown()) { reject(); } if(!this.tailTasks.offer(task)) { this.reject(task); } if(this.wakesUpForTask(task)) { this.wakeup(this.inEventLoop()); } } final boolean removeAfterEventLoopIterationTask(Runnable task) { return this.tailTasks.remove(ObjectUtil.checkNotNull(task, "task")); } protected boolean wakesUpForTask(Runnable task) { return !(task instanceof SingleThreadEventLoop.NonWakeupRunnable); } protected void afterRunningAllTasks() { this.runAllTasksFrom(this.tailTasks); } protected boolean hasTasks() { return super.hasTasks() || !this.tailTasks.isEmpty(); } public int pendingTasks() { return super.pendingTasks() + this.tailTasks.size(); } interface NonWakeupRunnable extends Runnable { } }為了確保一個Channel的整個生命周期中的I/O事件會被一個EventLoop負責,Netty通過inEventLoop()方法來判斷當前執行的線程的身份,確定它是否是分配給當前Channel以及它的EventLoop的那一個線程。如果當前(調用)線程正是EventLoop中的線程,那么所提交的任務將會被(true)直接執行,否則,EventLoop將調度該任務以便(false)稍后執行,并將它放入內部的任務隊列(每個EventLoop都有它自己的任務隊列,SingleThreadEventLoop的源碼就能發現很多用于調度內部任務隊列的方法),在下次處理它的事件時,將會執行隊列中的那些任務。這種設計可以讓任何線程與Channel直接交互,而無需在ChannelHandler中進行額外的同步。從性能上來考慮,千萬不要將一個需要長時間來運行的任務放入到任務隊列中,它會影響到該隊列中的其他任務的執行。解決方案是使用一個專門的EventExecutor來執行它(ChannelPipeline提供了帶有EventExecutorGroup參數的addXXX()方法,該方法可以將傳入的ChannelHandler綁定到你傳入的EventExecutor之中),這樣它就會在另一條線程中執行,與其他任務隔離。public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { ..... public void execute(Runnable task) { if(task == null) { throw new NullPointerException("task"); } else { boolean inEventLoop = this.inEventLoop(); if(inEventLoop) { this.addTask(task); } else { this.startThread(); this.addTask(task); if(this.isShutdown() && this.removeTask(task)) { reject(); } } if(!this.addTaskWakesUp && this.wakesUpForTask(task)) { this.wakeup(inEventLoop); } } } public boolean inEventLoop(Thread thread) { return thread == this.thread; } ..... }Bootstrap在深入了解地Netty的核心組件之后,發現它們的設計都很模塊化,如果想要實現你自己的應用程序,就需要將這些組件組裝到一起。Netty通過Bootstrap類,以對一個Netty應用程序進行配置(組裝各個組件),并最終使它運行起來。對于客戶端程序和服務器程序所使用到的Bootstrap類是不同的,后者需要使用ServerBootstrap,這樣設計是因為,在如TCP這樣有連接的協議中,服務器程序往往需要一個以上的Channel,通過父Channel來接受來自客戶端的連接,然后創建子Channel用于它們之間的通信,而像UDP這樣無連接的協議,它不需要每個連接都創建子Channel,只需要一個Channel即可。一個比較明顯的差異就是Bootstrap與ServerBootstrap的group()方法,后者提供了一個接收2個EventLoopGroup的版本。// 該方法在Bootstrap的父類AbstractBootstrap中,泛型B為它當前子類的類型(為了鏈式調用) public B group(EventLoopGroup group) { if(group == null) { throw new NullPointerException("group"); } else if(this.group != null) { throw new IllegalStateException("group set already"); } else { this.group = group; return this; } } // ServerBootstrap中的實現,它也支持只用一個EventLoopGroup public ServerBootstrap group(EventLoopGroup group) { return this.group(group, group); } public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if(childGroup == null) { throw new NullPointerException("childGroup"); } else if(this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } else { this.childGroup = childGroup; return this; } }Bootstrap其實沒有什么可以好說的,它就只是一個裝配工,將各個組件拼裝組合到一起,然后進行一些配置,有關它的詳細API請參考 Netty JavaDoc。Echo示例下面我們將通過一個經典的Echo客戶端與服務器的例子,來梳理一遍創建Netty應用的流程。首先實現的是服務器,我們先實現一個EchoServerInboundHandler,處理入站消息。public class EchoServerInboundHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; System.out.printf("Server received: %s \n", in.toString(CharsetUtil.UTF_8)); // 由于讀事件不是一次性就能把完整消息發送過來的,這里并沒有調用writeAndFlush ctx.write(in); // 直接把消息寫回給客戶端(會被出站消息處理器處理,不過我們的應用沒有實現任何出站消息處理器) } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // 等讀事件已經完成時,沖刷之前寫數據的緩沖區 // 然后添加了一個監聽器,它會在Future完成時進行關閉該Channel. ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) .addListener(ChannelFutureListener.CLOSE); } // 處理異常,輸出異常信息,然后關閉Channel @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }服務器的應用邏輯只有這么多,剩下就是用ServerBootstrap進行配置了。public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public void start() throws Exception { final EchoServerInboundHandler serverHandler = new EchoServerInboundHandler(); EventLoopGroup group = new NioEventLoopGroup(); // 傳輸類型使用NIO try { ServerBootstrap b = new ServerBootstrap(); b.group(group) // 配置EventLoopGroup .channel(NioServerSocketChannel.class) // 配置Channel的類型 .localAddress(new InetSocketAddress(port)) // 配置端口號 .childHandler(new ChannelInitializer() { // 實現一個ChannelInitializer,它可以方便地添加多個ChannelHandler @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(serverHandler); } }); // 綁定地址,同步等待它完成 ChannelFuture f = b.bind().sync(); // 關閉這個Future f.channel().closeFuture().sync(); } finally { // 關閉應用程序,一般來說Netty應用只需要調用這個方法就夠了 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { if (args.length != 1) { System.err.printf( "Usage: %s\n", EchoServer.class.getSimpleName() ); return; } int port = Integer.parseInt(args[0]); new EchoServer(port).start(); } }接下來實現客戶端,同樣需要先實現一個入站消息處理器。public class EchoClientInboundHandler extends SimpleChannelInboundHandler{ /** * 我們在Channel連接到遠程節點直接發送一條消息給服務器 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Netty!", CharsetUtil.UTF_8)); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { // 輸出從服務器Echo的消息 System.out.printf("Client received: %s \n", byteBuf.toString(CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }然后配置客戶端。public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(host, port)) // 服務器的地址 .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new EchoClientInboundHandler()); } }); ChannelFuture f = b.connect().sync(); // 連接到服務器 f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.printf("Usage: %s\n", EchoClient.class.getSimpleName()); return; } String host = args[0]; int port = Integer.parseInt(args[1]); new EchoClient(host, port).start(); } }實現一個Netty應用程序就是如此簡單,用戶大多數都是在編寫各種應用邏輯的ChannelHandler(或者使用Netty內置的各種實用ChannelHandler),然后只需要將它們全部添加到ChannelPipeline即可。參考文獻Netty: HomeChapter 6. I/O Multiplexing: The select and poll Functions - Shichao’s Notesepoll(7) - Linux manual pageJava NIONetty: HomeChapter 6. I/O Multiplexing: The select and poll Functions - Shichao’s Notesepoll(7) - Linux manual pageJava NIO
“基于NIO的網絡編程框架Netty有哪些組件”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。