BIO, NIO, AIO以Java的角度理解:
public class PlainEchoServer { public void serve(int port) throws IOException { final ServerSocket socket = new ServerSocket(port); //Bind server to port try { while (true) { //Block until new client connection is accepted final Socket clientSocket = socket.accept(); System.out.println("Accepted connection from " + clientSocket); //Create new thread to handle client connection new Thread(new Runnable() { @Override public void run() { try { BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true); //Read data from client and write it back while (true) { writer.println(reader.readLine()); writer.flush(); } } catch (IOException e) { e.printStackTrace(); try { clientSocket.close(); } catch (IOException ex) { // ignore on close } } } }).start(); //Start thread } } catch (IOException e) { e.printStackTrace(); } } }
在Java里的由來,在JDK1.4及以后版本中提供了一套API來專門操作非阻塞I/O,我們可以在java.nio包及其子包中找到相關的類和接口。由于這套API是JDK新提供的I/O API,因此,也叫New I/O,這就是包名nio的由來。這套API由三個主要的部分組成:緩沖區(Buffers)、通道(Channels)和非阻塞I/O的核心類組成。在理解NIO的時候,需要區分,說的是New I/O還是非阻塞IO,New I/O是Java的包,NIO是非阻塞IO概念。這里講的是后面一種。
NIO本身是基于事件驅動思想來完成的,其主要想解決的是BIO的大并發問題:在使用同步I/O的網絡應用中,如果要同時處理多個客戶端請求,或是在客戶端要同時和多個服務器進行通訊,就必須使用多線程來處理。也就是說,將每一個客戶端請求分配給一個線程來單獨處理。這樣做雖然可以達到我們的要求,但同時又會帶來另外一個問題。由于每創建一個線程,就要為這個線程分配一定的內存空間(也叫工作存儲器),而且操作系統本身也對線程的總數有一定的限制。如果客戶端的請求過多,服務端程序可能會因為不堪重負而拒絕客戶端的請求,甚至服務器可能會因此而癱瘓。 NIO基于Selector,當socket有流可讀或可寫入socket時,操作系統會相應的通知引用程序進行處理,應用再將流讀取到緩沖區或寫入操作系統。也就是說,這個時候,已經不是一個連接就要對應一個處理線程了,而是有效的請求,對應一個線程,當連接沒有數據時,是沒有工作線程來處理的。
public class PlainNioEchoServer { public void serve(int port) throws IOException { System.out.println("Listening for connections on port " + port); ServerSocketChannel serverChannel = ServerSocketChannel.open(); ServerSocket ss = serverChannel.socket(); InetSocketAddress address = new InetSocketAddress(port); //Bind server to port ss.bind(address); serverChannel.configureBlocking(false); Selector selector = Selector.open(); //Register the channel with the selector to be interested in new Client connections that get accepted serverChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { try { //Block until something is selected selector.select(); } catch (IOException ex) { ex.printStackTrace(); //handle in a proper way break; } //Get all SelectedKey instances Set<SelectionKey> readyKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = readyKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = (SelectionKey) iterator.next(); //Remove the SelectedKey from the iterator iterator.remove(); try { if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); //Accept the client connection SocketChannel client = server.accept(); System.out.println("Accepted connection from " + client); client.configureBlocking(false); //Register connection to selector and set ByteBuffer client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, ByteBuffer.allocate(100)); } //Check for SelectedKey for read if (key.isReadable()) { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer output = (ByteBuffer) key.attachment(); //Read data to ByteBuffer client.read(output); } //Check for SelectedKey for write if (key.isWritable()) { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer output = (ByteBuffer) key.attachment(); output.flip(); //Write data from ByteBuffer to channel client.write(output); output.compact(); } } catch (IOException ex) { key.cancel(); try { key.channel().close(); } catch (IOException cex) { } } } } } }
public class PlainNio2EchoServer { public void serve(int port) throws IOException { System.out.println("Listening for connections on port " + port); final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open(); InetSocketAddress address = new InetSocketAddress(port); // Bind Server to port serverChannel.bind(address); final CountDownLatch latch = new CountDownLatch(1); // Start to accept new Client connections. Once one is accepted the CompletionHandler will get called. serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() { @Override public void completed(final AsynchronousSocketChannel channel, Object attachment) { // Again accept new Client connections serverChannel.accept(null, this); ByteBuffer buffer = ByteBuffer.allocate(100); // Trigger a read operation on the Channel, the given CompletionHandler will be notified once something was read channel.read(buffer, buffer, new EchoCompletionHandler(channel)); } @Override public void failed(Throwable throwable, Object attachment) { try { // Close the socket on error serverChannel.close(); } catch (IOException e) { // ingnore on close } finally { latch.countDown(); } } }); try { latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private final class EchoCompletionHandler implements CompletionHandler<Integer, ByteBuffer> { private final AsynchronousSocketChannel channel; EchoCompletionHandler(AsynchronousSocketChannel channel) { this.channel = channel; } @Override public void completed(Integer result, ByteBuffer buffer) { buffer.flip(); // Trigger a write operation on the Channel, the given CompletionHandler will be notified once something was written channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { if (buffer.hasRemaining()) { // Trigger again a write operation if something is left in the ByteBuffer channel.write(buffer, buffer, this); } else { buffer.compact(); // Trigger a read operation on the Channel, the given CompletionHandler will be notified once something was read channel.read(buffer, buffer, EchoCompletionHandler.this); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { // ingnore on close } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { // ingnore on close } } } }
說道實現原理,還要從操作系統的IO模型上了解 按照《Unix網絡編程》的劃分,IO模型可以分為:阻塞IO、非阻塞IO、IO復用、信號驅動IO和異步IO,按照POSIX標準來劃分只分為兩類:同步IO和異步IO。 如何區分呢?首先一個IO操作其實分成了兩個步驟:發起IO請求和實際的IO操作,同步IO和異步IO的區別就在于第二個步驟是否阻塞,如果實際的IO讀寫阻塞請求進程,那么就是同步IO,因此阻塞IO、非阻塞IO、IO復用、信號驅動IO都是同步IO,如果不阻塞,而是操作系統幫你做完IO操作再將結果返回給你,那么就是異步IO。阻塞IO和非阻塞IO的區別在于第一步,發起IO請求是否會被阻塞,如果阻塞直到完成那么就是傳統的阻塞IO,如果不阻塞,那么就是非阻塞IO。
收到操作系統的IO模型,又不得不提select/poll/epoll/iocp。 可以理解的說明是:在Linux 2.6以后,java NIO的實現,是通過epoll來實現的,這點可以通過jdk的源代碼發現。而AIO,在windows上是通過IOCP實現的,在linux上還是通過epoll來實現的。 這里強調一點:AIO,這是I/O處理模式,而epoll等都是實現AIO的一種編程模型;換句話說,AIO是一種接口標準,各家操作系統可以實現也可以不實現。在不同操作系統上在高并發情況下最好都采用操作系統推薦的方式。Linux上還沒有真正實現網絡方式的AIO。
implements Iocp.OverlappedChannel
implements Port.PollableChannel
