您好,登錄后才能下訂單哦!
本篇內容主要講解“AIO與NIO的實際區別是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“AIO與NIO的實際區別是什么”吧!
1. 從某種程度上來說,NIO依然是同步阻塞的
雖然NIO中Channel(網絡Channel)和Buffer可以實現非阻塞的read/write操作,而且Selector提供了多路復用的功能,使得可以在一個線程中管理使用多個IO通道,避免了傳統IO的存在的問題。但是,NIO中在Selector進行調用select()方法進行通道選擇時,其依舊是同步阻塞的,而且由于多個Channel注冊于Selector上,這個方法會同時阻塞多個IO請求操作,盡管select()方法可以設置超時返回,但依舊是不利的。
換句話說,雖然NIO在網絡操作中,提供了非阻塞的read/write方法,但是NIO的IO行為還是同步的。對于NIO來說,我們的業務線程是在IO操作準備好時(調用Selector中的select()方法),得到通知(select()方法返回,表示有準備好的Channel),接著就由這個線程自行進行IO操作(通過Channel進行read/write操作),在第一代NIO中,每個線程可以持有多個IO通道并選擇使用,但實際上一個線程還是只能選擇操作一個IO,IO操作本身是同步的。
2. NIO改進
為了真正實現異步非阻塞的IO操作,在NIO的基礎上進行改進,升級為2代NIO——即AIO機制。
AIO相比于NIO,則更加進了一步,它不是在IO準備好時再通知線程,而是在IO操作已經完成后,再給線程發出通知。因此AIO是不會阻塞的,此時我們的業務邏輯將變成一個回調函數,等待IO操作完成后,由系統自動觸發。也就是相當于
在AIO中,當進行讀寫操作時,只須直接調用API的read或write方法即可。這兩種方法均為異步的,對于讀操作而言,當有流可讀取時,操作系統會將可讀的流傳入read方法的緩沖區,并通知應用程序;對于寫操作而言,當操作系統將write方法傳遞的流寫入完畢時,操作系統主動通知應用程序。 即可以理解為,read/write方法都是異步的,完成后會主動調用回調函數。
主要在Java.nio.channels包下增加了下面四個異步通道:
AsynchronousSocketChannel
AsynchronousServerSocketChannel
AsynchronousFileChannel
AsynchronousDatagramChannel
在AIO socket編程中,服務端通道是AsynchronousServerSocketChannel,這個類提供了一個open()靜態工廠,一個bind()方法用于綁定服務端IP地址(還有端口號),另外還提供了accept()用于接收用戶連接請求。在客戶端使用的通道是AsynchronousSocketChannel,這個通道處理提供open靜態工廠方法外,還提供了read和write方法。
在AIO編程中,發出一個事件(accept read write等)之后要指定事件處理類(回調函數),AIO中的事件處理類是CompletionHandler<V,A>,這個接口定義了如下兩個方法,分別在異步操作成功和失敗時被回調。
void completed(V result, A attachment);
void failed(Throwable exc, A attachment);
3. AIO與NIO的實際區別
在JAVA NIO框架中,我們說到了一個重要概念“selector”(選擇器)。它負責代替應用查詢中所有已注冊的通道到操作系統中進行IO事件輪詢、管理當前注冊的通道集合,定位發生事件的通道等操操作。
但是在JAVA AIO框架中,由于應用程序不是“輪詢”方式,而是訂閱-通知方式,所以不再需要“selector”(選擇器)了,改由channel通道直接到操作系統注冊監聽。異步IO則采用“訂閱-通知”模式:即應用程序向操作系統注冊IO監聽,然后繼續做自己的事情。當操作系統發生IO事件,并且準備好數據后,在主動通知應用程序,觸發相應的函數。這就使得AIO真正意義上實現了異步阻塞模式。(AIO是依賴于操作系統的實現的)
和同步IO一樣,異步IO也是由操作系統進行支持的。微軟的windows系統提供了一種異步IO技術:IOCP(I/O CompletionPort,I/O完成端口);
Linux下由于沒有這種異步IO技術,所以使用的是epoll(類似于Selector的一種多路復用IO技術的實現)對異步IO進行模擬。
1. java.nio.channels.AsynchronousChannel:這是一個接口,用來標記一個channel支持異步IO操作。有主要的三個子類AsynchronousFileChannel、AsynchronousSocketChannel和AsynchronousServerSocketChannel,分別對應FileChannel、SocketChannel以及ServerSocketChannel。(很奇怪為什么沒有AsynchronousDatagramChannel)
2. AsynchronousChannelGroup:異步channel的分組管理,目的是為了資源共享。一個AsynchronousChannelGroup綁定一個線程池,這個線程池執行三個任務:等待IO事件、處理IO數據和派發CompletionHandler。AsynchronousServerSocketChannel創建的時候可以傳入一個AsynchronousChannelGroup,那么通過AsynchronousServerSocketChannel創建的 AsynchronousSocketChannel將同屬于一個組,共享資源,(可以理解為相當于Selector)。AsynchronousChannelGroup需要綁定線程池來創建,通過三個靜態方法來創建,可以需要根據具體應用相應調整。
public abstract class AsynchronousChannelGroup { public static AsynchronousChannelGroup withFixedThreadPool(int nThreads, ThreadFactory threadFactory); public static AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor,int initialSize); public static AsynchronousChannelGroup withThreadPool(ExecutorService executor); }
3. CompletionHandler:異步IO操作結果的回調接口,用于定義在IO操作完成后所作的回調工作。AIO的API允許兩種方式來處理異步操作的結果,返回的Future模式或者注冊CompletionHandler,常用CompletionHandler的方式,這些handler的調用是由AsynchronousChannelGroup的線程池派發的。顯然,線程池的大小是性能的關鍵因素。
CompletionHandler接口有兩個個方法,分別對應于處理成功、失敗、被取消(通過返回的Future)情況下的回調處理:
public interface CompletionHandler<V,A> { void completed(V result, A attachment); void failed(Throwable exc, A attachment); }
4. ByteBuffer:負責承載通信過程中需要讀、寫的消息。
使用方式主要為三步:打開通道、綁定監聽端口、接收客戶端連接請求。
1. 打開(創建)通道
可以通過調用AsynchronousServerSocketChannel的靜態方法open()來創建AsynchronousServerSocketChannel實例
try { AsynchronousServerSocketChannel serverSocket = AsynchronousServerSocketChannel.open(); }catch (IOException e) { e.printStackTrace(); }
或者在open()方法傳入AsynchronousChannelGroup參數,設置通道分組,以實現組內通道資源共享。如果通道打開失敗,就會拋出IOException
try { ExecutorService pool = Executors.newCachedThreadPool(); AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(pool, 1024); AsynchronousServerSocketChannel serverSocket = AsynchronousServerSocketChannel.open(group); }catch (IOException e) { e.printStackTrace(); }
AsynchronousChannelGroup封裝了處理由綁定到組的異步通道所觸發的I/O操作完成所需的機制。每個AsynchronousChannelGroup關聯了一個被用于提交處理I/O事件和分發消費在組內通道上執行的異步操作結果的completion-handlers的線程池。除了處理I/O事件,該線程池還有可能處理其他一些用于支持完成異步I/O操作的任務。從上面例子可以看到,通過指定AsynchronousChannelGroup的方式打開AsynchronousServerSocketChannel,可以定制server channel執行的線程池。如果不指定AsynchronousChannelGroup,則AsynchronousServerSocketChannel會歸類到一個默認的分組中。
2. 綁定監聽端口和地址
通過調用bind()方法來綁定要監聽的端口。
try { ExecutorService pool = Executors.newCachedThreadPool(); AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(pool, 1024); AsynchronousServerSocketChannel serverSocket = AsynchronousServerSocketChannel.open(group); int port = 8888; serverSocket.bind(new InetSocketAddress(port)); }catch (IOException e) { e.printStackTrace(); }
3. 監聽和接收客戶端連接請求
監聽客戶端連接請求,主要通過調用accept()方法完成。accept()有兩個重載方法:
public abstract <A> void accept(A,CompletionHandler<AsynchronousSocketChannel,? super A>); public abstract Future<AsynchronousSocketChannel> accept();
這兩個重載方法的行為方式完全相同,提供CompletionHandler回調參數或者返回一個Future<T>類型變量。
Future版本的accept方法通過Future接口可以調用Future.get()方法阻塞等待調用結果,返回一個AsynchronousSocketChannel對象。
try { ExecutorService pool = Executors.newCachedThreadPool(); AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(pool, 1024); AsynchronousServerSocketChannel serverSocket = AsynchronousServerSocketChannel.open(group); int port = 8888; serverSocket.bind(new InetSocketAddress(port)); while(true) { Future<AsynchronousSocketChannel> accept = serverSocket.accept(); AsynchronousSocketChannel socket = accept.get();//阻塞方法,獲取AsynchronousSocketChannel //通過獲取的Socket來進行網絡IO操作 //但一般不這樣使用,因為這樣就會導致變得和第一代NIO一樣了,所以基本都是使用另一種CompletionHandler的重載方法 } }catch (IOException e1) { e1.printStackTrace(); }catch (InterruptedException e2) { e2.printStackTrace(); } catch (ExecutionException e3) { e3.printStackTrace(); }
而CompletionHandler回調參數版本則相反,真正的數據IO處理并不會放在當前線程中,而是通過一個回調方法處理,處理邏輯代碼就寫在CompletionHandler中的completed方法中,因為該方法會在AsynchronousServerSocketChannel成功接收到一個AsynchronousSocketChannel,回調執行,而如果AsynchronousServerSocketChannel接受AsynchronousSocketChannel失敗,就會回調failed方法。
serverSocketChannel .accept(serverSocketChannel, new CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel>() { @Override public void completed(final AsynchronousSocketChannel result, final AsynchronousServerSocketChannel attachment) { // 接收到新的客戶端連接,此時本次accept已經完成 // 繼續監聽下一個客戶端連接到來 serverSocketChannel.accept(serverSocketChannel,this); // result即和該客戶端的連接會話 // 此時可以通過result與客戶端進行交互 } ... });
為什么會在completed方法中調用accept方法:因為當一個新的客戶端建立連接之后,就會回調completed方法,一個AsynchronousServerSocketChannel會與多個客戶端建立連接,此時就需要繼續調用accept方法來接受更多的客戶端連接。
4. 設置TCP連接屬性:通過一個AsynchronousServerSocketChannel建立的連接肯定是TCP連接了,所以通過該對象我們可以設置TCP連接的一些屬性。
// 設置socket選項,比如設置保持TCP連接,也就是TCP長連接 serverSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true); // 獲取socket選項設置 boolean keepAlive = serverSocketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
獲取本地IP地址
InetSocketAddress address = (InetSocketAddress) serverSocketChannel.getLocalAddress();
1. 創建連接
首先需要調用open方法創建一個AsynchronousSocketChannel對象,然后通過connect方法與服務端建立連接。connect方法也有兩個重載版本
一個版本是返回Future對象,另一種是傳入CompletionHandler參數對象
AsynchronousSocketChannel socket = AsynchronousSocketChannel.open(); // Future future = socket.connect(new InetSocketAddress("localhost",8888)); // future.get(); socket.connect(new InetSocketAddress("localhost", 8888), socket, new CompletionHandler<Void, AsynchronousSocketChannel>() { @Override public void completed(Void result, AsynchronousSocketChannel attachment) { } @Override public void failed(Throwable exc, AsynchronousSocketChannel attachment) { } });
2. 寫數據
構建一個ByteBuffer
對象并調用socketChannel.write(ByteBuffer)
方法異步發送消息,并通過CompletionHandler
回調接收處理發送結果:
ByteBuffer writeBuf = ByteBuffer.wrap("From socketChannel:Hello i am socketChannel".getBytes()); socketChannel.write(writeBuf, null, new CompletionHandler<Integer, Object>() { @Override public void completed(final Integer result, final Object attachment) { // 發送完成,result:總共寫入的字節數 } @Override public void failed(final Throwable exc, final Object attachment) { // 發送失敗 } });
3. 讀數據
構建一個指定接收長度的ByteBuffer
用于接收數據,調用socketChannel.read()
方法讀取消息并通過CompletionHandler
處理讀取結果:
ByteBuffer readBuffer = ByteBuffer.allocate(128); socketChannel.read(readBuffer, null, new CompletionHandler<Integer, Object>() { @Override public void completed(final Integer result, final Object attachment) { // 讀取完成,result:實際讀取的字節數。如果通道中沒有數據可讀則result=-1。 } @Override public void failed(final Throwable exc, final Object attachment) { // 讀取失敗 } });
4. 通過AsynchronousSocketChannel也可以設置設置/獲取socket選項(TCP連接屬性)
// 設置socket選項 socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true); // 獲取socket選項設置 boolean keepAlive = socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
1. AIO中定義的異步通道允許指定一個CompletionHandler處理器消費一個異步操作的結果(也就是當準備好IO數據通道后,就回調CompletionHandler中的方法,使用IO數據通道進行IO處理,這也就導致了異步操作,不在等候IO通道的就緒,也不用將IO操作在當前線程中執行,而是采用回調的方式)。從上文中也可以看到,AIO中大部分的異步I/O操作接口都封裝了一個帶CompletionHandler類型參數的重載方法,使用CompletionHandler可以很方便地處理AIO中的異步I/O操作結果。CompletionHandler是一個具有兩個泛型類型參數的接口,聲明了兩個接口方法:
public interface CompletionHandler<V,A> { void completed(V result, A attachment); void failed(Throwable exc, A attachment); }
NIO以及AIOU雖然實現了異步非阻塞網絡IO操作,但是,其依舊具有一些缺點:
雖然JAVA NIO 和 JAVA AIO框架提供了多路復用IO/異步IO的支持,但是并沒有提供上層“信息格式”的良好封裝。例如前兩者并沒有提供針對 ProtocolBuffer、JSON這些信息格式的封裝,但是Netty框架提供了這些數據格式封裝(基于責任鏈模式的編碼和解碼功能)
要編寫一個可靠的、易維護的、高性能的(注意它們的排序)NIO/AIO服務器應用。除了框架本身要兼容實現各類操作系統的實現外。更重要的是它應該還要處理很多上層特有服務,例如:客戶端的權限、還有上面提到的信息格式封裝、簡單的數據讀取。這些Netty框架都提供了響應的支持。
JAVA NIO框架存在一個poll/epoll bug:Selector doesn’t block on Selector.select(timeout),不能block意味著CPU的使用率會變成100%(這是底層JNI的問題,上層要處理這個異常實際上也好辦)。當然這個bug只有在Linux內核上才能重現。這個問題在JDK 1.7版本中還沒有被完全解決:http://bugs.java.com/bugdatabase/view_bug.do?bug_id=2147719。雖然Netty 4.0中也是基于JAVA NIO框架進行封裝的(上文中已經給出了Netty中NioServerSocketChannel類的介紹),但是Netty已經將這個bug進行了處理。
到此,相信大家對“AIO與NIO的實際區別是什么”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。