您好,登錄后才能下訂單哦!
本篇內容主要講解“java Nio使用NioSocket客戶端與服務端交互的方法”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“java Nio使用NioSocket客戶端與服務端交互的方法”吧!
java Nio是jdk1.4新增的io方式—–nio(new IO),這種方式在目前來說算不算new,更合適的解釋應該是non-block IO。
non-block是相對于傳統的io方式來講的。傳統的Io方式是阻塞的,我們拿網絡io來舉例,傳統的io模型如下:
服務端主線程負責不斷地server.accept(),如果沒有客戶端請求主線程就會阻塞,當客戶端請求時,主線程會通過線程池創建一個新的線程執行。
簡單解釋就是一個線程負責一個客戶端的socket,當客戶端因網絡等原因傳遞速度慢的時候,服務端對應的客戶端的線程就會等待,很浪費資源。
同時線程過少的話會影響服務的吞吐量,而線程過多的話由于上下文切換等原因會導致效率十分低下,傳統的io方式并不適合如今的網絡流量。
Nio的模型如下:
nio相比傳統的io模型,最大的特點是優化了線程的使用。
nio通過selector可以使用一個線程去管理多個socket句柄,說是管理也不太合適,nio是采用的事件驅動模型,selector負責的是監控各個連接句柄的狀態,不是去輪詢每個句柄,而是在數據就緒后,將消息通知給selector,而具體的socket句柄管理則是采用多路復用的模型,交由操作系統來完成。
selector充當的是一個消息的監聽者,負責監聽channel在其注冊的事件,這樣就可以通過一個線程完成了大量連接的管理,當注冊的事件發生后,再調用相應線程進行處理。
這樣就不需要為每個連接都使用一個線程去維持長連接,減少了長連接的開銷,同時減少了上下文的切換提高了系統的吞吐量。
java Nio主要由三個核心部分組成:
- Buffer - Channel - Selector
所有的io的Nio都是從一個channel開始的,Channel有點類似于流,但是和流不同的是,channel是可以雙向讀寫的。Channel有幾種類型,主要包含文件io操作和網絡io:
- FileChannel (文件io) - DatagramChannel (udp數據報) - SocketChannel (tcp客戶端) - ServerSocketChannel (tcp服務端)
Buffer是一個中間緩存區,數據可以從channel讀取到buffer,也可以從buffer寫到channel中,在java中,傳統方式與io的交互,需要將數據從堆內存讀取到直接內存中,然后交由c語言來調用系統服務完成io的交互。
而使用Buffer可以直接在直接內存中開辟內存區域,減少了io復制的操作,從而提高了io操作的效率。
#基本數據類型的buffer - ByteBuffer - CharBuffer - DoubleBuffer - FloatBuffer - IntBuffer - LongBuffer - ShortBuffer #文件內存映射buffer - MappedByteBuffer #直接內存區buffer - DirectBuffer
Selector允許單個線程處理多個channel,可以將多個channel教給selector管理,并注冊相應的事件,而selector則采用事件驅動的方式,當注冊的事件就緒后,調用相應的相應的線程處理該時間,不用使用線程去維持長連接,減少了線程的開銷。
Selector通過靜態工廠的open方法建立,然后通過channel的register注冊到Channel上。
注冊后通過select方法等待請求,select請求有long類型參數,代表等待時間,如果等待時間內接受到操作請求,則返回可以操作請求的數量,否則超時往下走。
傳入參數為零或者無參方法,則會采用阻塞模式知道有相應請求。
收到請求后調用selectedKeys返回SelectionKey的集合。
SelectionKey保存了處理當前請求的Channel和Selector,并且提供了不同的操作類型。
SelectionKey的操作有四種:
- SelectionKey.OP_CONNECT - SelectionKey.OP_ACCEPT - SelectionKey.OP_READ - SelectionKey.OP_WRITE
下面為一個客戶端與服務端實用NioSocket交互的簡單例子:
//對selectionKey事件的處理 /** * description: * * @author wkGui */ interface ServerHandlerBs { void handleAccept(SelectionKey selectionKey) throws IOException; String handleRead(SelectionKey selectionKey) throws IOException; } /** * description: * * @author wkGui */ public class ServerHandlerImpl implements ServerHandlerBs { private int bufferSize = 1024; private String localCharset = "UTF-8"; public ServerHandlerImpl() { } public ServerHandlerImpl(int bufferSize) { this(bufferSize, null); } public ServerHandlerImpl(String localCharset) { this(-1, localCharset); } public ServerHandlerImpl(int bufferSize, String localCharset) { this.bufferSize = bufferSize > 0 ? bufferSize : this.bufferSize; this.localCharset = localCharset == null ? this.localCharset : localCharset; } @Override public void handleAccept(SelectionKey selectionKey) throws IOException { //獲取channel SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept(); //非阻塞 socketChannel.configureBlocking(false); //注冊selector socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufferSize)); System.out.println("建立請求......"); } @Override public String handleRead(SelectionKey selectionKey) throws IOException { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); ByteBuffer buffer = (ByteBuffer) selectionKey.attachment(); String receivedStr = ""; if (socketChannel.read(buffer) == -1) { //沒讀到內容關閉 socketChannel.shutdownOutput(); socketChannel.shutdownInput(); socketChannel.close(); System.out.println("連接斷開......"); } else { //將channel改為讀取狀態 buffer.flip(); //按照編碼讀取數據 receivedStr = Charset.forName(localCharset).newDecoder().decode(buffer).toString(); buffer.clear(); //返回數據給客戶端 buffer = buffer.put(("received string : " + receivedStr).getBytes(localCharset)); //讀取模式 buffer.flip(); socketChannel.write(buffer); //注冊selector 繼續讀取數據 socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufferSize)); } return receivedStr; } }
//服務端server類 /** * description: * * @author wkGui */ public class NioSocketServer { private volatile byte flag = 1; public void setFlag(byte flag) { this.flag = flag; } public void start() { //創建serverSocketChannel,監聽8888端口 try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) { serverSocketChannel.socket().bind(new InetSocketAddress(8888)); //設置為非阻塞模式 serverSocketChannel.configureBlocking(false); //為serverChannel注冊selector Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("服務端開始工作:"); //創建消息處理器 ServerHandlerBs handler = new ServerHandlerImpl(1024); while (flag == 1) { selector.select(); System.out.println("開始處理請求 : "); //獲取selectionKeys并處理 Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); try { //連接請求 if (key.isAcceptable()) { handler.handleAccept(key); } //讀請求 if (key.isReadable()) { System.out.println(handler.handleRead(key)); } } catch (IOException e) { e.printStackTrace(); } //處理完后移除當前使用的key keyIterator.remove(); } System.out.println("完成請求處理。"); } } catch (IOException e) { e.printStackTrace(); } } } //server端啟動類 /** * description: * * @author wkGui */ public class ServerMain { public static void main(String[] args) { NioSocketServer server = new NioSocketServer(); new Thread(() -> { try { Thread.sleep(10*60*1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { server.setFlag((byte) 0); } }).start(); server.start(); } }
//客戶端client類 /** * description: * * @author wkGui */ public class NioSocketClient { public void start() { try (SocketChannel socketChannel = SocketChannel.open()) { //連接服務端socket SocketAddress socketAddress = new InetSocketAddress("localhost", 8888); socketChannel.connect(socketAddress); int sendCount = 0; ByteBuffer buffer = ByteBuffer.allocate(1024); //這里最好使用selector處理 這里只是為了寫的簡單 while (sendCount < 10) { buffer.clear(); //向服務端發送消息 buffer.put(("current time : " + System.currentTimeMillis()).getBytes()); //讀取模式 buffer.flip(); socketChannel.write(buffer); buffer.clear(); //從服務端讀取消息 int readLenth = socketChannel.read(buffer); //讀取模式 buffer.flip(); byte[] bytes = new byte[readLenth]; buffer.get(bytes); System.out.println(new String(bytes, "UTF-8")); buffer.clear(); sendCount++; try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } catch (IOException e) { e.printStackTrace(); } } } //client啟動類 /** * description: * * @author wkGui */ public class ClientMain { public static void main(String[] args) { new NioSocketClient().start(); } }
WebSocket是一種在單個TCP連接上進行全雙工通信的協議。 WebSocket使得客戶端和服務器之間的數據交換變得更加簡單,允許服務端主動向客戶端推送數據。在WebSocket API中,瀏覽器和服務器只需要完成一次握手,兩者之間就直接可以創建持久性的連接,并進行雙向數據傳輸。
WebSocket協議相比于Http協議來說,最大的特點就是可以實現服務端主動向客戶端發送消息。在WebSocket出現之前,如果客戶端想實時獲取服務端的消息,就需要使用AJAX輪詢,查詢是否有消息,這樣就很消耗服務器資源和帶寬。但是用WebSocket就可以實現服務端主動向客戶端發送數據,并且只需要占用一個TCP連接,節省了資源和帶寬。
為了建立一個WebSocket連接,客戶端瀏覽器首先要向服務器發起一個HTTP請求,這個請求和通常的HTTP請求不同,包含了一些附加的頭信息,其中附加頭信息“Upgrade: WebSocket” 表明這是一個申請協議升級的HTTP請求。服務器端解析這些附加的信息頭,然后生成應答消息返回給客戶端,客戶端和服務端的WebSocket連接就建立了。之后就可以使用WebSocket協議的格式來雙向發送消息。
建立連接時發送的HTTP請求頭:
返回的HTTP響應頭:
在響應頭中的 Sec-WebSocket-Accept 時通過Sec-WebSocket-Key構造出來的。首先在Sec-WebSocket-Key后接上一個258EAFA5-E914-47DA-95CA-C5AB0DC85B11,然后再進行SHA1摘要得到160位數據在,在使用BASE64進行編碼,最后得到的就是Sec-WebSocket-Accept。
WebSocket數據發送的幀格式如下所示:
FIN - 1bit
在數據發送的過程中,可能會分片發送,FIN表示是否為最后一個分片。如果發生了分片,則1表示時最后一個分片;不能再分片的情況下,這個標志總是為1。
RSV1 RSV2 RSV3 - 1bit each
用于擴展,不使用擴展時需要為全0;非零時通信雙方必須協商好擴展。這里我們用不上。
OPCODE - 4bits
用于表示所傳送數據的類型,也就是payload中的數據。
數值 | 含義 |
---|---|
0x0 | 附加數據幀 |
0x1 | 文本數據幀 |
0x2 | 二進制數據幀 |
0x3-0x7 | 保留 |
0x8 | 關閉連接幀 |
0x9 | ping幀 |
0xA | pong幀 |
0xB-0xF | 保留 |
MASK - 1bit
用于表示payload是否被進行了掩碼運算,1表示使用掩碼,0表示不使用掩碼。從客戶端發送向服務端的數據幀必須使用掩碼。
Payload length 7 bits,7+16 bits or 7+64 bits
用于表示payload的長度,有以下三種情況:
Payload length 表示的大小 | payload的長度 |
---|---|
0 - 125 | Payload length 大小 |
126 | 之后的2個字節表示的無符號整數 |
127 | 之后的8個字節表示的無符號整數 |
Masking-key - 0 or 4 bytes
32 bit長的掩碼,如果MASK為1,則幀中就存在這一個字段,在解析payload時,需要進行使用32長掩碼進行異或操作,之后才能得到正確結果。
利用Java NIO 來實現一個聊天室。部分代碼如下。
NIO的常規代碼:
selector.select(1000); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); if (key.isAcceptable()) { handleAccept(key); } if (key.isReadable()) { handleRead(key); } }
接受連接:
public void handleAccept(SelectionKey key) { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc; try { sc = ssc.accept(); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); System.out.println(String.format("[server] -- client %s connected.", sc.getRemoteAddress().toString())); } catch (IOException e) { System.out.println(String.format("[server] -- error occur when accept: %s.", e.getMessage())); key.cancel(); } }
讀取通道中的數據:
public void handleRead(SelectionKey key) { SocketChannel sc = (SocketChannel) key.channel(); Client client = (Client) key.attachment(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 如果是第一次連接進來,就需要創建一個客戶端對象,存儲起來 if (client == null) { client = new Client(sc); clients.add(client); key.attach(client); byteBuffer.clear(); // 如果連接還沒有建立,就是要HTTP建立連接 try { sc.read(byteBuffer); byteBuffer.flip(); String response = WebSocketHandler.getResponse(new String(byteBuffer.array())); byteBuffer.clear(); byteBuffer.put(response.getBytes()); byteBuffer.flip(); while (byteBuffer.hasRemaining()) { sc.write(byteBuffer); } } catch (IOException e) { System.out.println(String.format("[server] -- error occur when read: %s.", e.getMessage())); } String message = "[系統消息] " + client.toString() + " 加入了群聊"; broadcast(message.getBytes(), client); } byteBuffer.clear(); int read = 0; try { read = sc.read(byteBuffer); if (read > 0) { byteBuffer.flip(); int opcode = byteBuffer.get() & 0x0f; // 8表示客戶端關閉了連接 if (opcode == 8) { System.out.println(String.format("[server] -- client %s connection close.", sc.getRemoteAddress())); clients.remove(client); String message = "[系統消息] " + client.toString() + " 退出了群聊"; broadcast(message.getBytes(), client); sc.close(); key.cancel(); return; } // 只考慮了最簡單的payload長度情況。 int len = byteBuffer.get(); len &= 0x7f; byte[] mask = new byte[4]; byteBuffer.get(mask); byte[] payload = new byte[len]; byteBuffer.get(payload); for (int i = 0; i < payload.length; i++) { payload[i] ^= mask[i % 4]; } System.out.println(String .format("[server] -- client: [%s], send: [%s].", client.toString(), new String(payload))); String message = String.format("[%s]: %s", client.toString(), new String(payload)); broadcast(message.getBytes(), client); } else if (read == -1) { System.out.println(String.format("[server] -- client %s connection close.", sc.getRemoteAddress())); clients.remove(client); String message = "[系統消息] " + client.toString() + " 退出了群聊"; broadcast(message.getBytes(), client); sc.close(); key.cancel(); } } catch (IOException e) { System.out.println(String.format("[server] -- error occur when read: %s.", e.getMessage())); } }
使用HTTP建立WebSocket連接。
public class WebSocketHandler { private static String APPEND_STRING = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; static class Header { private Map<String, String> properties = new HashMap<>(); public String get(String key) { return properties.get(key); } } private WebSocketHandler() {} private static Header phrase(String request) { Header header = new Header(); String[] pros = request.split("\r\n"); for (String pro : pros) { if (pro.contains(":")) { int index = pro.indexOf(":"); String key = pro.substring(0, index).trim(); String value = pro.substring(index + 1).trim(); header.properties.put(key, value); } } return header; } public static String getResponse(String request) { Header header = phrase(request); String acceptKey = header.get("Sec-WebSocket-Key") + APPEND_STRING; MessageDigest sha1; try { sha1 = MessageDigest.getInstance("sha1"); sha1.update(acceptKey.getBytes()); acceptKey = new String(Base64.getEncoder().encode(sha1.digest())); } catch (NoSuchAlgorithmException e) { System.out.println("fail to encode " + e.getMessage()); return null; } StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append("HTTP/1.1 101 Switching Protocols\r\n").append("Upgrade: websocket\r\n") .append("Connection: Upgrade\r\n").append("Sec-WebSocket-Accept: " + acceptKey + "\r\n") .append("\r\n"); return stringBuilder.toString(); } }
客戶端對象
/** * @author XinHui Chen * @date 2020/2/8 19:20 */ public class Client { private SocketChannel socketChannel = null; private String id = null; public SocketChannel getSocketChannel() { return socketChannel; } public String getId() { return id; } Client(SocketChannel socketChannel) { this.socketChannel = socketChannel; this.id = UUID.randomUUID().toString(); } @Override public String toString() { try { return id + " " + socketChannel.getRemoteAddress().toString(); } catch (IOException e) { System.out.println(e.getMessage()); return null; } } }
使用網頁和控制臺與服務端建立WebSocket連接,發送數據。兩個都能成功顯示。
到此,相信大家對“java Nio使用NioSocket客戶端與服務端交互的方法”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。