您好,登錄后才能下訂單哦!
這篇文章主要介紹“Java NIO就緒模式怎么實現”,在日常操作中,相信很多人在Java NIO就緒模式怎么實現問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Java NIO就緒模式怎么實現”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
Java NIO非堵塞應用通常適用用在I/O讀寫等方面,我們知道,系統運行的性能瓶頸通常在I/O讀寫,包括對端口和文件的操作上,過去,在打開一個I/O通道后,read()將一直等待在端口一邊讀取字節內容,如果沒有內容進來,read()也是傻傻的等,這會影響我們程序繼續做其他事情,那么改進做法就是開設線程,讓線程去等待,但是這樣做也是相當耗費資源的。
Java NIO非堵塞技術實際是采取Reactor模式,或者說是Observer模式為我們監察I/O端口,如果有內容進來,會自動通知我們,這樣,我們就不必開啟多個線程死等,從外界看,實現了流暢的I/O讀寫,不堵塞了。
Java NIO出現不只是一個技術性能的提高,你會發現網絡上到處在介紹它,因為它具有里程碑意義,從JDK1.4開始,Java開始提高性能相關的功能,從而使得Java在底層或者并行分布式計算等操作上已經可以和C或Perl等語言并駕齊驅。
圖 1 類結構圖
package cn.chenkangxian.nioconcurrent; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.LinkedList; import java.util.List; /** * @Project: testNio * * @Author: chenkangxian * * @Annotation: 使用線程池來處理大量channel并發 * * @Date:2011-7-5 * * @Copyright: 2011 chenkangxian, All rights reserved. * */ public class SelectSocketsThreadPool extends SelectSockets { private static final int MAX_THREADS = 5; private ThreadPool pool = new ThreadPool(MAX_THREADS); /** * 從socket中讀數據 */ protected void readDataFromSocket(SelectionKey key) throws Exception { WorkerThread worker = pool.getWorker(); if (worker == null) { return; worker.serviceChannel(key); } /** * * @Project: concurrentnio * * @Author: chenkangxian * * @Annotation:線程池 * * @Date:2011-7-20 * * @Copyright: 2011 chenkangxian, All rights reserved. * */ private class ThreadPool { List idle = new LinkedList(); /** * 線程池初始化 * * @param poolSize 線程池大小 */ ThreadPool(int poolSize) { for (int i = 0; i < poolSize; i++) { WorkerThread thread = new WorkerThread(this); thread.setName("Worker" + (i + 1)); thread.start(); idle.add(thread); } } /** * 獲得工作線程 * * Author: chenkangxian * * Last Modification Time: 2011-7-20 * * @return */ WorkerThread getWorker() { WorkerThread worker = null; synchronized (idle) { if (idle.size() > 0) { worker = (WorkerThread) idle.remove(0); } } return (worker); } /** * 送回工作線程 * * Author: chenkangxian * * Last Modification Time: 2011-7-20 * * @param worker */ void returnWorker(WorkerThread worker) { synchronized (idle) { idle.add(worker); } } } private class WorkerThread extends Thread { private ByteBuffer buffer = ByteBuffer.allocate(1024); private ThreadPool pool; private SelectionKey key; WorkerThread(ThreadPool pool) { this.pool = pool; } public synchronized void run() { System.out.println(this.getName() + " is ready"); while (true) { try { this.wait();//等待被notify } catch (InterruptedException e) { e.printStackTrace(); this.interrupt(); } if (key == null) {//直到有key continue; } System.out.println(this.getName() + " has been awakened"); try { drainChannel(key); } catch (Exception e) { System.out.println("Caught '" + e + "' closing channel"); try { key.channel().close(); } catch (IOException ex) { ex.printStackTrace(); } key.selector().wakeup(); } key = null; this.pool.returnWorker(this); } } synchronized void serviceChannel(SelectionKey key) { this.key = key; //消除讀的關注 key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); this.notify(); } void drainChannel(SelectionKey key) throws Exception { SocketChannel channel = (SocketChannel) key.channel(); int count; buffer.clear(); while ((count = channel.read(buffer)) > 0) { buffer.flip(); while (buffer.hasRemaining()) { channel.write(buffer); } buffer.clear(); } if (count < 0) { channel.close(); return; } //重新開始關注讀事件 key.interestOps(key.interestOps() | SelectionKey.OP_READ); key.selector().wakeup(); } } public static void main(String[] args) throws Exception { new SelectSocketsThreadPool().go(args); } }
package cn.chenkangxian.nioconcurrent; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; /** * * @Project: concurrentnio * * @Author: chenkangxian * * @Annotation: * * @Date:2011-7-11 * * @Copyright: 2011 chenkangxian, All rights reserved. * */ public class SelectSockets { public static int PORT_NUMBER = 1234; private ByteBuffer buffer = ByteBuffer.allocate(1024); public static void main(String[] args) throws Exception { new SelectSockets().go(args); } public void go(String[] args) throws Exception{ int port = PORT_NUMBER; // if(args.length > 0){ // port = Integer.parseInt(args[0]); // } // System.out.println("Listening on port " + port); ServerSocketChannel serverChannel = ServerSocketChannel.open(); ServerSocket serverSocket = serverChannel.socket(); Selector selector = Selector.open(); serverSocket.bind(new InetSocketAddress(port)); serverChannel.configureBlocking(false); serverChannel.register(selector, SelectionKey.OP_ACCEPT); while(true){ int n = selector.select(); //沒有輪詢,單個selector if(n == 0){ continue; } Iterator it = selector.selectedKeys().iterator(); while(it.hasNext()){ SelectionKey key = (SelectionKey)it.next(); if(key.isAcceptable()){ ServerSocketChannel server = (ServerSocketChannel)key.channel(); SocketChannel channel = server.accept(); registerChannel(selector,channel,SelectionKey.OP_READ); sayHello(channel); } if(key.isReadable()){ readDataFromSocket(key); } it.remove(); } } } /** * 在selector上注冊channel,并設置interest * * Author: chenkangxian * * Last Modification Time: 2011-7-11 * * @param selector 選擇器 * * @param channel 通道 * * @param ops interest * * @throws Exception */ protected void registerChannel(Selector selector, SelectableChannel channel, int ops) throws Exception{ if(channel == null){ return ; } channel.configureBlocking(false); channel.register(selector, ops); } /** * 處理有可用數據的通道 * * Author: chenkangxian * * Last Modification Time: 2011-7-11 * * @param key 可用通道對應的key * * @throws Exception */ protected void readDataFromSocket(SelectionKey key) throws Exception{ SocketChannel socketChannel = (SocketChannel)key.channel(); int count; buffer.clear(); //Empty buffer while((count = socketChannel.read(buffer)) > 0){ buffer.flip(); while(buffer.hasRemaining()){ socketChannel.write(buffer); } buffer.clear(); } if(count < 0){ socketChannel.close(); } } /** * 打招呼 * * Author: chenkangxian * * Last Modification Time: 2011-7-11 * * @param channel 客戶端channel * * @throws Exception */ private void sayHello(SocketChannel channel) throws Exception{ buffer.clear(); buffer.put("Hello 哈羅! \r\n".getBytes()); buffer.flip(); channel.write(buffer); } }
到此,關于“Java NIO就緒模式怎么實現”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。