您好,登錄后才能下訂單哦!
前言
socket是軟件之間通訊最常用的一種方式。c#實現socket通訊有很多中方法,其中效率最高就是異步通訊。
異步通訊實際是利用windows完成端口(IOCP)來處理的,關于完成端口實現原理,大家可以參考網上文章。
我這里想強調的是采用完成端口機制的異步通訊是windows下效率最高的通訊方式,沒有之一!
異步通訊比同步通訊處理要難很多,代碼編寫中會遇到許多“坑“。如果沒有經驗,很難完成。
我搜集了大量資料,完成了對異步socket的封裝。此庫已用穩定高效的運行幾個月。
縱觀網上的資料,我還沒有遇到一個滿意的封裝庫。許多文章把數據收發和協議處理雜糅在一塊,代碼非常難懂,也無法擴展。
在編寫該庫時,避免以上缺陷。將邏輯處理層次化,模塊化!同時實現了高可用性與高性能。
為了使大家對通訊效率有初步了解,先看測試圖。
主機配置情況
百兆帶寬基本占滿,cpu占用40%,我的電腦在空閑時,cpu占用大概20%,也就是說程序占用cpu 20%左右。
這個庫是可擴展的,就是說即使10萬個連接,收發同樣的數據,cpu占用基本相同。
庫的結構圖
目標
即可作為服務端(監聽)也可以作為客戶端(主動連接)使用。
可以適應任何網絡協議。收發的數據針對字節流或一個完整的包。對協議內容不做處理。
高可用性。將復雜的底層處理封裝,對外接口非常友好。
高性能。最大限度優化處理。單機可支持數萬連接,收發速度可達幾百兆bit。
實現思路
網絡處理邏輯可以分為以下幾個部分:
網絡監聽 可以在多個端口實現監聽。負責生成socket,生成的socket供后續處理。監聽模塊功能比較單一,如有必要,可對監聽模塊做進一步優化。
主動連接 可以異步或同步的連接對方。連接成功后,對socket的后續處理,與監聽得到的socket完全一樣。注:無論是監聽得到的socket,還是連接得到的socket,后續處理完全一樣。
Socket收發處理 每個socket對應一個收發實例,socket收發只針對字節流處理。收發時,做了優化。比如發送時,對數據做了沾包,提高發送性能;接收時,一次投遞1K的數據。
組包處理 一般數據包都有包長度指示;比如 報頭的前倆個字節表示長度,根據這個值就可以組成一個完整的包。
NetListener 監聽
using System; using System.Net; using System.Net.Sockets; using System.Threading; namespace IocpCore { class NetListener { private Socket listenSocket; public ListenParam _listenParam { get; set; } public event Action<ListenParam, AsyncSocketClient> OnAcceptSocket; bool start; NetServer _netServer; public NetListener(NetServer netServer) { _netServer = netServer; } public int _acceptAsyncCount = 0; public bool StartListen() { try { start = true; IPEndPoint listenPoint = new IPEndPoint(IPAddress.Parse("0.0.0.0"), _listenParam._port); listenSocket = new Socket(listenPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); listenSocket.Bind(listenPoint); listenSocket.Listen(200); Thread thread1 = new Thread(new ThreadStart(NetProcess)); thread1.Start(); StartAccept(); return true; } catch (Exception ex) { NetLogger.Log(string.Format("**監聽異常!{0}", ex.Message)); return false; } } AutoResetEvent _acceptEvent = new AutoResetEvent(false); private void NetProcess() { while (start) { DealNewAccept(); _acceptEvent.WaitOne(1000 * 10); } } private void DealNewAccept() { try { if(_acceptAsyncCount <= 10) { StartAccept(); } while (true) { AsyncSocketClient client = _newSocketClientList.GetObj(); if (client == null) break; DealNewAccept(client); } } catch (Exception ex) { NetLogger.Log(string.Format("DealNewAccept 異常 {0}***{1}", ex.Message, ex.StackTrace)); } } private void DealNewAccept(AsyncSocketClient client) { client.SendBufferByteCount = _netServer.SendBufferBytePerClient; OnAcceptSocket?.Invoke(_listenParam, client); } private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs acceptEventArgs) { try { Interlocked.Decrement(ref _acceptAsyncCount); _acceptEvent.Set(); acceptEventArgs.Completed -= AcceptEventArg_Completed; ProcessAccept(acceptEventArgs); } catch (Exception ex) { NetLogger.Log(string.Format("AcceptEventArg_Completed {0}***{1}", ex.Message, ex.StackTrace)); } } public bool StartAccept() { SocketAsyncEventArgs acceptEventArgs = new SocketAsyncEventArgs(); acceptEventArgs.Completed += AcceptEventArg_Completed; bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArgs); Interlocked.Increment(ref _acceptAsyncCount); if (!willRaiseEvent) { Interlocked.Decrement(ref _acceptAsyncCount); _acceptEvent.Set(); acceptEventArgs.Completed -= AcceptEventArg_Completed; ProcessAccept(acceptEventArgs); } return true; } ObjectPool<AsyncSocketClient> _newSocketClientList = new ObjectPool<AsyncSocketClient>(); private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs) { try { using (acceptEventArgs) { if (acceptEventArgs.AcceptSocket != null) { AsyncSocketClient client = new AsyncSocketClient(acceptEventArgs.AcceptSocket); client.CreateClientInfo(this); _newSocketClientList.PutObj(client); _acceptEvent.Set(); } } } catch (Exception ex) { NetLogger.Log(string.Format("ProcessAccept {0}***{1}", ex.Message, ex.StackTrace)); } } } }
NetConnectManage連接處理
using System; using System.Net; using System.Net.Sockets; namespace IocpCore { class NetConnectManage { public event Action<SocketEventParam, AsyncSocketClient> OnSocketConnectEvent; public bool ConnectAsyn(string peerIp, int peerPort, object tag) { try { Socket socket = new Socket(SocketType.Stream, ProtocolType.Tcp); SocketAsyncEventArgs socketEventArgs = new SocketAsyncEventArgs(); socketEventArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(peerIp), peerPort); socketEventArgs.Completed += SocketConnect_Completed; SocketClientInfo clientInfo = new SocketClientInfo(); socketEventArgs.UserToken = clientInfo; clientInfo.PeerIp = peerIp; clientInfo.PeerPort = peerPort; clientInfo.Tag = tag; bool willRaiseEvent = socket.ConnectAsync(socketEventArgs); if (!willRaiseEvent) { ProcessConnect(socketEventArgs); socketEventArgs.Completed -= SocketConnect_Completed; socketEventArgs.Dispose(); } return true; } catch (Exception ex) { NetLogger.Log("ConnectAsyn",ex); return false; } } private void SocketConnect_Completed(object sender, SocketAsyncEventArgs socketEventArgs) { ProcessConnect(socketEventArgs); socketEventArgs.Completed -= SocketConnect_Completed; socketEventArgs.Dispose(); } private void ProcessConnect(SocketAsyncEventArgs socketEventArgs) { SocketClientInfo clientInfo = socketEventArgs.UserToken as SocketClientInfo; if (socketEventArgs.SocketError == SocketError.Success) { DealConnectSocket(socketEventArgs.ConnectSocket, clientInfo); } else { SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, null); socketParam.ClientInfo = clientInfo; OnSocketConnectEvent?.Invoke(socketParam, null); } } void DealConnectSocket(Socket socket, SocketClientInfo clientInfo) { clientInfo.SetClientInfo(socket); AsyncSocketClient client = new AsyncSocketClient(socket); client.SetClientInfo(clientInfo); //觸發事件 SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, socket); socketParam.ClientInfo = clientInfo; OnSocketConnectEvent?.Invoke(socketParam, client); } public bool Connect(string peerIp, int peerPort, object tag, out Socket socket) { socket = null; try { Socket socketTmp = new Socket(SocketType.Stream, ProtocolType.Tcp); SocketClientInfo clientInfo = new SocketClientInfo(); clientInfo.PeerIp = peerIp; clientInfo.PeerPort = peerPort; clientInfo.Tag = tag; EndPoint remoteEP = new IPEndPoint(IPAddress.Parse(peerIp), peerPort); socketTmp.Connect(remoteEP); if (!socketTmp.Connected) return false; DealConnectSocket(socketTmp, clientInfo); socket = socketTmp; return true; } catch (Exception ex) { NetLogger.Log(string.Format("連接對方:({0}:{1})出錯!", peerIp, peerPort), ex); return false; } } } }
AsyncSocketClient socket收發處理
using System; using System.Collections.Generic; using System.Diagnostics; using System.Net; using System.Net.Sockets; namespace IocpCore { public class AsyncSocketClient { public static int IocpReadLen = 1024; public readonly Socket ConnectSocket; protected SocketAsyncEventArgs m_receiveEventArgs; public SocketAsyncEventArgs ReceiveEventArgs { get { return m_receiveEventArgs; } set { m_receiveEventArgs = value; } } protected byte[] m_asyncReceiveBuffer; protected SocketAsyncEventArgs m_sendEventArgs; public SocketAsyncEventArgs SendEventArgs { get { return m_sendEventArgs; } set { m_sendEventArgs = value; } } protected byte[] m_asyncSendBuffer; public event Action<AsyncSocketClient, byte[]> OnReadData; public event Action<AsyncSocketClient, int> OnSendData; public event Action<AsyncSocketClient> OnSocketClose; static object releaseLock = new object(); public static int createCount = 0; public static int releaseCount = 0; ~AsyncSocketClient() { lock (releaseLock) { releaseCount++; } } public AsyncSocketClient(Socket socket) { lock (releaseLock) { createCount++; } ConnectSocket = socket; m_receiveEventArgs = new SocketAsyncEventArgs(); m_asyncReceiveBuffer = new byte[IocpReadLen]; m_receiveEventArgs.AcceptSocket = ConnectSocket; m_receiveEventArgs.Completed += ReceiveEventArgs_Completed; m_sendEventArgs = new SocketAsyncEventArgs(); m_asyncSendBuffer = new byte[IocpReadLen * 2]; m_sendEventArgs.AcceptSocket = ConnectSocket; m_sendEventArgs.Completed += SendEventArgs_Completed; } SocketClientInfo _clientInfo; public SocketClientInfo ClientInfo { get { return _clientInfo; } } internal void CreateClientInfo(NetListener netListener) { _clientInfo = new SocketClientInfo(); try { _clientInfo.Tag = netListener._listenParam._tag; IPEndPoint ip = ConnectSocket.LocalEndPoint as IPEndPoint; Debug.Assert(netListener._listenParam._port == ip.Port); _clientInfo.LocalIp = ip.Address.ToString(); _clientInfo.LocalPort = netListener._listenParam._port; ip = ConnectSocket.RemoteEndPoint as IPEndPoint; _clientInfo.PeerIp = ip.Address.ToString(); _clientInfo.PeerPort = ip.Port; } catch (Exception ex) { NetLogger.Log("CreateClientInfo", ex); } } internal void SetClientInfo(SocketClientInfo clientInfo) { _clientInfo = clientInfo; } #region read process bool _inReadPending = false; public EN_SocketReadResult ReadNextData() { lock (this) { if (_socketError) return EN_SocketReadResult.ReadError; if (_inReadPending) return EN_SocketReadResult.InAsyn; if(!ConnectSocket.Connected) { OnReadError(); return EN_SocketReadResult.ReadError; } try { m_receiveEventArgs.SetBuffer(m_asyncReceiveBuffer, 0, m_asyncReceiveBuffer.Length); _inReadPending = true; bool willRaiseEvent = ConnectSocket.ReceiveAsync(ReceiveEventArgs); //投遞接收請求 if (!willRaiseEvent) { _inReadPending = false; ProcessReceive(); if (_socketError) { OnReadError(); return EN_SocketReadResult.ReadError; } return EN_SocketReadResult.HaveRead; } else { return EN_SocketReadResult.InAsyn; } } catch (Exception ex) { NetLogger.Log("ReadNextData", ex); _inReadPending = false; OnReadError(); return EN_SocketReadResult.ReadError; } } } private void ProcessReceive() { if (ReceiveEventArgs.BytesTransferred > 0 && ReceiveEventArgs.SocketError == SocketError.Success) { int offset = ReceiveEventArgs.Offset; int count = ReceiveEventArgs.BytesTransferred; byte[] readData = new byte[count]; Array.Copy(m_asyncReceiveBuffer, offset, readData, 0, count); _inReadPending = false; if (!_socketError) OnReadData?.Invoke(this, readData); } else { _inReadPending = false; OnReadError(); } } private void ReceiveEventArgs_Completed(object sender, SocketAsyncEventArgs e) { lock (this) { _inReadPending = false; ProcessReceive(); if (_socketError) { OnReadError(); } } } bool _socketError = false; private void OnReadError() { lock (this) { if (_socketError == false) { _socketError = true; OnSocketClose?.Invoke(this); } CloseClient(); } } #endregion #region send process int _sendBufferByteCount = 102400; public int SendBufferByteCount { get { return _sendBufferByteCount; } set { if (value < 1024) { _sendBufferByteCount = 1024; } else { _sendBufferByteCount = value; } } } SendBufferPool _sendDataPool = new SendBufferPool(); internal EN_SendDataResult PutSendData(byte[] data) { if (_socketError) return EN_SendDataResult.no_client; if (_sendDataPool._bufferByteCount >= _sendBufferByteCount) { return EN_SendDataResult.buffer_overflow; } if (data.Length <= IocpReadLen) { _sendDataPool.PutObj(data); } else { List<byte[]> dataItems = SplitData(data, IocpReadLen); foreach (byte[] item in dataItems) { _sendDataPool.PutObj(item); } } return EN_SendDataResult.ok; } bool _inSendPending = false; public EN_SocketSendResult SendNextData() { lock (this) { if (_socketError) { return EN_SocketSendResult.SendError; } if (_inSendPending) { return EN_SocketSendResult.InAsyn; } int sendByteCount = GetSendData(); if (sendByteCount == 0) { return EN_SocketSendResult.NoSendData; } //防止拋出異常,否則影響性能 if (!ConnectSocket.Connected) { OnSendError(); return EN_SocketSendResult.SendError; } try { m_sendEventArgs.SetBuffer(m_asyncSendBuffer, 0, sendByteCount); _inSendPending = true; bool willRaiseEvent = ConnectSocket.SendAsync(m_sendEventArgs); if (!willRaiseEvent) { _inSendPending = false; ProcessSend(m_sendEventArgs); if (_socketError) { OnSendError(); return EN_SocketSendResult.SendError; } else { OnSendData?.Invoke(this, sendByteCount); //繼續發下一條 return EN_SocketSendResult.HaveSend; } } else { return EN_SocketSendResult.InAsyn; } } catch (Exception ex) { NetLogger.Log("SendNextData", ex); _inSendPending = false; OnSendError(); return EN_SocketSendResult.SendError; } } } private void SendEventArgs_Completed(object sender, SocketAsyncEventArgs sendEventArgs) { lock (this) { try { _inSendPending = false; ProcessSend(m_sendEventArgs); int sendCount = 0; if (sendEventArgs.SocketError == SocketError.Success) { sendCount = sendEventArgs.BytesTransferred; } OnSendData?.Invoke(this, sendCount); if (_socketError) { OnSendError(); } } catch (Exception ex) { NetLogger.Log("SendEventArgs_Completed", ex); } } } private bool ProcessSend(SocketAsyncEventArgs sendEventArgs) { if (sendEventArgs.SocketError == SocketError.Success) { return true; } else { OnSendError(); return false; } } private int GetSendData() { int dataLen = 0; while (true) { byte[] data = _sendDataPool.GetObj(); if (data == null) return dataLen; Array.Copy(data, 0, m_asyncSendBuffer, dataLen, data.Length); dataLen += data.Length; if (dataLen > IocpReadLen) break; } return dataLen; } private void OnSendError() { lock (this) { if (_socketError == false) { _socketError = true; OnSocketClose?.Invoke(this); } CloseClient(); } } #endregion internal void CloseSocket() { try { ConnectSocket.Close(); } catch (Exception ex) { NetLogger.Log("CloseSocket", ex); } } static object socketCloseLock = new object(); public static int closeSendCount = 0; public static int closeReadCount = 0; bool _disposeSend = false; void CloseSend() { if (!_disposeSend && !_inSendPending) { lock (socketCloseLock) closeSendCount++; _disposeSend = true; m_sendEventArgs.SetBuffer(null, 0, 0); m_sendEventArgs.Completed -= SendEventArgs_Completed; m_sendEventArgs.Dispose(); } } bool _disposeRead = false; void CloseRead() { if (!_disposeRead && !_inReadPending) { lock (socketCloseLock) closeReadCount++; _disposeRead = true; m_receiveEventArgs.SetBuffer(null, 0, 0); m_receiveEventArgs.Completed -= ReceiveEventArgs_Completed; m_receiveEventArgs.Dispose(); } } private void CloseClient() { try { CloseSend(); CloseRead(); ConnectSocket.Close(); } catch (Exception ex) { NetLogger.Log("CloseClient", ex); } } //發送緩沖大小 private List<byte[]> SplitData(byte[] data, int maxLen) { List<byte[]> items = new List<byte[]>(); int start = 0; while (true) { int itemLen = Math.Min(maxLen, data.Length - start); if (itemLen == 0) break; byte[] item = new byte[itemLen]; Array.Copy(data, start, item, 0, itemLen); items.Add(item); start += itemLen; } return items; } } public enum EN_SocketReadResult { InAsyn, HaveRead, ReadError } public enum EN_SocketSendResult { InAsyn, HaveSend, NoSendData, SendError } class SendBufferPool { ObjectPool<byte[]> _bufferPool = new ObjectPool<byte[]>(); public Int64 _bufferByteCount = 0; public bool PutObj(byte[] obj) { if (_bufferPool.PutObj(obj)) { lock (this) { _bufferByteCount += obj.Length; } return true; } else { return false; } } public byte[] GetObj() { byte[] result = _bufferPool.GetObj(); if (result != null) { lock (this) { _bufferByteCount -= result.Length; } } return result; } } }
NetServer 聚合其他類
using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net.Sockets; using System.Threading; namespace IocpCore { public class NetServer { public Action<SocketEventParam> OnSocketPacketEvent; //每個連接發送緩沖大小 public int SendBufferBytePerClient { get; set; } = 1024 * 100; bool _serverStart = false; List<NetListener> _listListener = new List<NetListener>(); //負責對收到的字節流 組成完成的包 ClientPacketManage _clientPacketManage; public Int64 SendByteCount { get; set; } public Int64 ReadByteCount { get; set; } List<ListenParam> _listListenPort = new List<ListenParam>(); public void AddListenPort(int port, object tag) { _listListenPort.Add(new ListenParam(port, tag)); } /// <summary> /// /// </summary> /// <param name="listenFault">監聽失敗的端口</param> /// <returns></returns> public bool StartListen(out List<int> listenFault) { _serverStart = true; _clientPacketManage = new ClientPacketManage(this); _clientPacketManage.OnSocketPacketEvent += PutClientPacket; _netConnectManage.OnSocketConnectEvent += SocketConnectEvent; _listListener.Clear(); Thread thread1 = new Thread(new ThreadStart(NetPacketProcess)); thread1.Start(); Thread thread2 = new Thread(new ThreadStart(NetSendProcess)); thread2.Start(); Thread thread3 = new Thread(new ThreadStart(NetReadProcess)); thread3.Start(); listenFault = new List<int>(); foreach (ListenParam param in _listListenPort) { NetListener listener = new NetListener(this); listener._listenParam = param; listener.OnAcceptSocket += Listener_OnAcceptSocket; if (!listener.StartListen()) { listenFault.Add(param._port); } else { _listListener.Add(listener); NetLogger.Log(string.Format("監聽成功!端口:{0}", param._port)); } } return listenFault.Count == 0; } public void PutClientPacket(SocketEventParam param) { OnSocketPacketEvent?.Invoke(param); } //獲取包的最小長度 int _packetMinLen; int _packetMaxLen; public int PacketMinLen { get { return _packetMinLen; } } public int PacketMaxLen { get { return _packetMaxLen; } } /// <summary> /// 設置包的最小和最大長度 /// 當minLen=0時,認為是接收字節流 /// </summary> /// <param name="minLen"></param> /// <param name="maxLen"></param> public void SetPacketParam(int minLen, int maxLen) { Debug.Assert(minLen >= 0); Debug.Assert(maxLen > minLen); _packetMinLen = minLen; _packetMaxLen = maxLen; } //獲取包的總長度 public delegate int delegate_GetPacketTotalLen(byte[] data, int offset); public delegate_GetPacketTotalLen GetPacketTotalLen_Callback; ObjectPoolWithEvent<SocketEventParam> _socketEventPool = new ObjectPoolWithEvent<SocketEventParam>(); private void NetPacketProcess() { while (_serverStart) { try { DealEventPool(); } catch (Exception ex) { NetLogger.Log(string.Format("DealEventPool 異常 {0}***{1}", ex.Message, ex.StackTrace)); } _socketEventPool.WaitOne(1000); } } Dictionary<Socket, AsyncSocketClient> _clientGroup = new Dictionary<Socket, AsyncSocketClient>(); public int ClientCount { get { lock (_clientGroup) { return _clientGroup.Count; } } } public List<Socket> ClientList { get { lock (_clientGroup) { return _clientGroup.Keys.ToList(); } } } private void DealEventPool() { while (true) { SocketEventParam param = _socketEventPool.GetObj(); if (param == null) return; if (param.SocketEvent == EN_SocketEvent.close) { lock (_clientGroup) { _clientGroup.Remove(param.Socket); } } if (_packetMinLen == 0)//字節流處理 { OnSocketPacketEvent?.Invoke(param); } else { //組成一個完整的包 邏輯 _clientPacketManage.PutSocketParam(param); } } } private void SocketConnectEvent(SocketEventParam param, AsyncSocketClient client) { try { if (param.Socket == null || client == null) //連接失敗 { } else { lock (_clientGroup) { bool remove = _clientGroup.Remove(client.ConnectSocket); Debug.Assert(!remove); _clientGroup.Add(client.ConnectSocket, client); } client.OnSocketClose += Client_OnSocketClose; client.OnReadData += Client_OnReadData; client.OnSendData += Client_OnSendData; _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read)); } _socketEventPool.PutObj(param); } catch (Exception ex) { NetLogger.Log(string.Format("SocketConnectEvent 異常 {0}***{1}", ex.Message, ex.StackTrace)); } } internal void OnRcvPacketLenError(Socket socket, byte[] buffer, int offset, int packetLen) { try { lock (_clientGroup) { if (!_clientGroup.ContainsKey(socket)) { Debug.Assert(false); return; } NetLogger.Log(string.Format("報長度異常!包長:{0}", packetLen)); AsyncSocketClient client = _clientGroup[socket]; client.CloseSocket(); } } catch (Exception ex) { NetLogger.Log(string.Format("OnRcvPacketLenError 異常 {0}***{1}", ex.Message, ex.StackTrace)); } } #region listen port private void Listener_OnAcceptSocket(ListenParam listenPatam, AsyncSocketClient client) { try { lock (_clientGroup) { bool remove = _clientGroup.Remove(client.ConnectSocket); Debug.Assert(!remove); _clientGroup.Add(client.ConnectSocket, client); } client.OnSocketClose += Client_OnSocketClose; client.OnReadData += Client_OnReadData; client.OnSendData += Client_OnSendData; _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read)); SocketEventParam param = new SocketEventParam(EN_SocketEvent.accept, client.ConnectSocket); param.ClientInfo = client.ClientInfo; _socketEventPool.PutObj(param); } catch (Exception ex) { NetLogger.Log(string.Format("Listener_OnAcceptSocket 異常 {0}***{1}", ex.Message, ex.StackTrace)); } } ObjectPoolWithEvent<SocketEventDeal> _listSendEvent = new ObjectPoolWithEvent<SocketEventDeal>(); private void NetSendProcess() { while (true) { DealSendEvent(); _listSendEvent.WaitOne(1000); } } ObjectPoolWithEvent<SocketEventDeal> _listReadEvent = new ObjectPoolWithEvent<SocketEventDeal>(); private void NetReadProcess() { while (true) { DealReadEvent(); _listReadEvent.WaitOne(1000); } } private void DealSendEvent() { while (true) { SocketEventDeal item = _listSendEvent.GetObj(); if (item == null) break; switch (item.SocketEvent) { case EN_SocketDealEvent.send: { while (true) { EN_SocketSendResult result = item.Client.SendNextData(); if (result == EN_SocketSendResult.HaveSend) continue; else break; } } break; case EN_SocketDealEvent.read: { Debug.Assert(false); } break; } } } private void DealReadEvent() { while (true) { SocketEventDeal item = _listReadEvent.GetObj(); if (item == null) break; switch (item.SocketEvent) { case EN_SocketDealEvent.read: { while (true) { EN_SocketReadResult result = item.Client.ReadNextData(); if (result == EN_SocketReadResult.HaveRead) continue; else break; } } break; case EN_SocketDealEvent.send: { Debug.Assert(false); } break; } } } private void Client_OnReadData(AsyncSocketClient client, byte[] readData) { //讀下一條 _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read)); try { SocketEventParam param = new SocketEventParam(EN_SocketEvent.read, client.ConnectSocket); param.ClientInfo = client.ClientInfo; param.Data = readData; _socketEventPool.PutObj(param); lock (this) { ReadByteCount += readData.Length; } } catch (Exception ex) { NetLogger.Log(string.Format("Client_OnReadData 異常 {0}***{1}", ex.Message, ex.StackTrace)); } } #endregion private void Client_OnSendData(AsyncSocketClient client, int sendCount) { //發送下一條 _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send)); lock (this) { SendByteCount += sendCount; } } private void Client_OnSocketClose(AsyncSocketClient client) { try { SocketEventParam param = new SocketEventParam(EN_SocketEvent.close, client.ConnectSocket); param.ClientInfo = client.ClientInfo; _socketEventPool.PutObj(param); } catch (Exception ex) { NetLogger.Log(string.Format("Client_OnSocketClose 異常 {0}***{1}", ex.Message, ex.StackTrace)); } } /// <summary> /// 放到發送緩沖 /// </summary> /// <param name="socket"></param> /// <param name="data"></param> /// <returns></returns> public EN_SendDataResult SendData(Socket socket, byte[] data) { if (socket == null) return EN_SendDataResult.no_client; lock (_clientGroup) { if (!_clientGroup.ContainsKey(socket)) return EN_SendDataResult.no_client; AsyncSocketClient client = _clientGroup[socket]; EN_SendDataResult result = client.PutSendData(data); if (result == EN_SendDataResult.ok) { //發送下一條 _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send)); } return result; } } /// <summary> /// 設置某個連接的發送緩沖大小 /// </summary> /// <param name="socket"></param> /// <param name="byteCount"></param> /// <returns></returns> public bool SetClientSendBuffer(Socket socket, int byteCount) { lock (_clientGroup) { if (!_clientGroup.ContainsKey(socket)) return false; AsyncSocketClient client = _clientGroup[socket]; client.SendBufferByteCount = byteCount; return true; } } #region connect process NetConnectManage _netConnectManage = new NetConnectManage(); /// <summary> /// 異步連接一個客戶端 /// </summary> /// <param name="peerIp"></param> /// <param name="peerPort"></param> /// <param name="tag"></param> /// <returns></returns> public bool ConnectAsyn(string peerIp, int peerPort, object tag) { return _netConnectManage.ConnectAsyn(peerIp, peerPort, tag); } /// <summary> /// 同步連接一個客戶端 /// </summary> /// <param name="peerIp"></param> /// <param name="peerPort"></param> /// <param name="tag"></param> /// <param name="socket"></param> /// <returns></returns> public bool Connect(string peerIp, int peerPort, object tag, out Socket socket) { return _netConnectManage.Connect(peerIp, peerPort, tag, out socket); } #endregion } enum EN_SocketDealEvent { read, send, } class SocketEventDeal { public AsyncSocketClient Client { get; set; } public EN_SocketDealEvent SocketEvent { get; set; } public SocketEventDeal(AsyncSocketClient client, EN_SocketDealEvent socketEvent) { Client = client; SocketEvent = socketEvent; } } }
庫的使用
使用起來非常簡單,示例如下
using IocpCore; using System; using System.Collections.Generic; using System.Linq; using System.Net.Sockets; using System.Text; using System.Threading.Tasks; using System.Windows; namespace WarningClient { public class SocketServer { public Action<SocketEventParam> OnSocketEvent; public Int64 SendByteCount { get { if (_netServer == null) return 0; return _netServer.SendByteCount; } } public Int64 ReadByteCount { get { if (_netServer == null) return 0; return _netServer.ReadByteCount; } } NetServer _netServer; EN_PacketType _packetType = EN_PacketType.byteStream; public void SetPacktType(EN_PacketType packetType) { _packetType = packetType; if (_netServer == null) return; if (packetType == EN_PacketType.byteStream) { _netServer.SetPacketParam(0, 1024); } else { _netServer.SetPacketParam(9, 1024); } } public bool Init(List<int> listenPort) { NetLogger.OnLogEvent += NetLogger_OnLogEvent; _netServer = new NetServer(); SetPacktType(_packetType); _netServer.GetPacketTotalLen_Callback += GetPacketTotalLen; _netServer.OnSocketPacketEvent += SocketPacketDeal; foreach (int n in listenPort) { _netServer.AddListenPort(n, n); } List<int> listenFault; bool start = _netServer.StartListen(out listenFault); return start; } int GetPacketTotalLen(byte[] data, int offset) { if (MainWindow._packetType == EN_PacketType.znss) return GetPacketZnss(data, offset); else return GetPacketAnzhiyuan(data, offset); } int GetPacketAnzhiyuan(byte[] data, int offset) { int n = data[offset + 5] + 6; return n; } int GetPacketZnss(byte[] data, int offset) { int packetLen = (int)(data[4]) + 5; return packetLen; } public bool ConnectAsyn(string peerIp, int peerPort, object tag) { return _netServer.ConnectAsyn(peerIp, peerPort, tag); } public bool Connect(string peerIp, int peerPort, object tag, out Socket socket) { return _netServer.Connect(peerIp, peerPort, tag, out socket); } private void NetLogger_OnLogEvent(string message) { AppLog.Log(message); } Dictionary<Socket, SocketEventParam> _clientGroup = new Dictionary<Socket, SocketEventParam>(); public int ClientCount { get { lock (_clientGroup) { return _clientGroup.Count; } } } public List<Socket> ClientList { get { if (_netServer != null) return _netServer.ClientList; return new List<Socket>(); } } void AddClient(SocketEventParam socketParam) { lock (_clientGroup) { _clientGroup.Remove(socketParam.Socket); _clientGroup.Add(socketParam.Socket, socketParam); } } void RemoveClient(SocketEventParam socketParam) { lock (_clientGroup) { _clientGroup.Remove(socketParam.Socket); } } ObjectPool<SocketEventParam> _readDataPool = new ObjectPool<SocketEventParam>(); public ObjectPool<SocketEventParam> ReadDataPool { get { return _readDataPool; } } private void SocketPacketDeal(SocketEventParam socketParam) { OnSocketEvent?.Invoke(socketParam); if (socketParam.SocketEvent == EN_SocketEvent.read) { if (MainWindow._isShowReadPacket) _readDataPool.PutObj(socketParam); } else if (socketParam.SocketEvent == EN_SocketEvent.accept) { AddClient(socketParam); string peerIp = socketParam.ClientInfo.PeerIpPort; AppLog.Log(string.Format("客戶端鏈接!本地端口:{0},對端:{1}", socketParam.ClientInfo.LocalPort, peerIp)); } else if (socketParam.SocketEvent == EN_SocketEvent.connect) { string peerIp = socketParam.ClientInfo.PeerIpPort; if (socketParam.Socket != null) { AddClient(socketParam); AppLog.Log(string.Format("連接對端成功!本地端口:{0},對端:{1}", socketParam.ClientInfo.LocalPort, peerIp)); } else { AppLog.Log(string.Format("連接對端失敗!本地端口:{0},對端:{1}", socketParam.ClientInfo.LocalPort, peerIp)); } } else if (socketParam.SocketEvent == EN_SocketEvent.close) { MainWindow.MainWnd.OnSocketDisconnect(socketParam.Socket); RemoveClient(socketParam); string peerIp = socketParam.ClientInfo.PeerIpPort; AppLog.Log(string.Format("客戶端斷開!本地端口:{0},對端:{1},", socketParam.ClientInfo.LocalPort, peerIp)); } } public EN_SendDataResult SendData(Socket socket, byte[] data) { if(socket == null) { MessageBox.Show("還沒連接!"); return EN_SendDataResult.no_client; } return _netServer.SendData(socket, data); } internal void SendToAll(byte[] data) { lock (_clientGroup) { foreach (Socket socket in _clientGroup.Keys) { SendData(socket, data); } } } } }
以上這篇C#中一個高性能異步socket封裝庫的實現思路分享就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持億速云。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。