您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關怎么在C#中實現請求唯一性校驗支持高并發,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
使用場景描述:
網絡請求中經常會遇到發送的請求,服務端響應是成功的,但是返回的時候出現網絡故障,導致客戶端無法接收到請求結果,那么客戶端程序可能判斷為網絡故障,而重復發送同一個請求。當然如果接口中定義了請求結果查詢接口,那么這種重復會相對少一些。特別是交易類的數據,這種操作更是需要避免重復發送請求。另外一種情況是用戶過于快速的點擊界面按鈕,產生連續的相同內容請求,那么后端也需要進行過濾,這種一般出現在系統對接上,無法去控制第三方系統的業務邏輯,需要從自身業務邏輯里面去限定。
其他需求描述:
這類請求一般存在時間范圍和高并發的特點,就是短時間內會出現重復的請求,因此對模塊需要支持高并發性。
技術實現:
對請求的業務內容進行MD5摘要,并且將MD5摘要存儲到緩存中,每個請求數據都通過這個一個公共的調用的方法進行判斷。
代碼實現:
公共調用代碼 UniqueCheck 采用單例模式創建唯一對象,便于在多線程調用的時候,只訪問一個統一的緩存庫
/* * volatile就像大家更熟悉的const一樣,volatile是一個類型修飾符(type specifier)。 * 它是被設計用來修飾被不同線程訪問和修改的變量。 * 如果沒有volatile,基本上會導致這樣的結果:要么無法編寫多線程程序,要么編譯器失去大量優化的機會。 */ private static readonly object lockHelper = new object(); private volatile static UniqueCheck _instance; /// <summary> /// 獲取單一實例 /// </summary> /// <returns></returns> public static UniqueCheck GetInstance() { if (_instance == null) { lock (lockHelper) { if (_instance == null) _instance = new UniqueCheck(); } } return _instance; }
這里需要注意volatile的修飾符,在實際測試過程中,如果沒有此修飾符,在高并發的情況下會出現報錯。
自定義一個可以進行并發處理隊列,代碼如下:ConcurrentLinkedQueue
using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace PackgeUniqueCheck { /// <summary> /// 非加鎖并發隊列,處理100個并發數以內 /// </summary> /// <typeparam name="T"></typeparam> public class ConcurrentLinkedQueue<T> { private class Node<K> { internal K Item; internal Node<K> Next; public Node(K item, Node<K> next) { this.Item = item; this.Next = next; } } private Node<T> _head; private Node<T> _tail; public ConcurrentLinkedQueue() { _head = new Node<T>(default(T), null); _tail = _head; } public bool IsEmpty { get { return (_head.Next == null); } } /// <summary> /// 進入隊列 /// </summary> /// <param name="item"></param> public void Enqueue(T item) { Node<T> newNode = new Node<T>(item, null); while (true) { Node<T> curTail = _tail; Node<T> residue = curTail.Next; //判斷_tail是否被其他process改變 if (curTail == _tail) { //A 有其他process執行C成功,_tail應該指向新的節點 if (residue == null) { //C 其他process改變了tail節點,需要重新取tail節點 if (Interlocked.CompareExchange<Node<T>>( ref curTail.Next, newNode, residue) == residue) { //D 嘗試修改tail Interlocked.CompareExchange<Node<T>>(ref _tail, newNode, curTail); return; } } else { //B 幫助其他線程完成D操作 Interlocked.CompareExchange<Node<T>>(ref _tail, residue, curTail); } } } } /// <summary> /// 隊列取數據 /// </summary> /// <param name="result"></param> /// <returns></returns> public bool TryDequeue(out T result) { Node<T> curHead; Node<T> curTail; Node<T> next; while (true) { curHead = _head; curTail = _tail; next = curHead.Next; if (curHead == _head) { if (next == null) //Queue為空 { result = default(T); return false; } if (curHead == curTail) //Queue處于Enqueue第一個node的過程中 { //嘗試幫助其他Process完成操作 Interlocked.CompareExchange<Node<T>>(ref _tail, next, curTail); } else { //取next.Item必須放到CAS之前 result = next.Item; //如果_head沒有發生改變,則將_head指向next并退出 if (Interlocked.CompareExchange<Node<T>>(ref _head, next, curHead) == curHead) break; } } } return true; } /// <summary> /// 嘗試獲取最后一個對象 /// </summary> /// <param name="result"></param> /// <returns></returns> public bool TryGetTail(out T result) { result = default(T); if (_tail == null) { return false; } result = _tail.Item; return true; } } }
雖然是一個非常簡單的唯一性校驗邏輯,但是要做到高效率,高并發支持,高可靠性,以及低內存占用,需要實現這樣的需求,需要做細致的模擬測試。
using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Collections; namespace PackgeUniqueCheck { public class UniqueCheck { /* * volatile就像大家更熟悉的const一樣,volatile是一個類型修飾符(type specifier)。 * 它是被設計用來修飾被不同線程訪問和修改的變量。 * 如果沒有volatile,基本上會導致這樣的結果:要么無法編寫多線程程序,要么編譯器失去大量優化的機會。 */ private static readonly object lockHelper = new object(); private volatile static UniqueCheck _instance; /// <summary> /// 獲取單一實例 /// </summary> /// <returns></returns> public static UniqueCheck GetInstance() { if (_instance == null) { lock (lockHelper) { if (_instance == null) _instance = new UniqueCheck(); } } return _instance; } private UniqueCheck() { //創建一個線程安全的哈希表,作為字典緩存 _DataKey = Hashtable.Synchronized(new Hashtable()); Queue myqueue = new Queue(); _DataQueue = Queue.Synchronized(myqueue); _Myqueue = new ConcurrentLinkedQueue<string>(); _Timer = new Thread(DoTicket); _Timer.Start(); } #region 公共屬性設置 /// <summary> /// 設定定時線程的休眠時間長度:默認為1分鐘 /// 時間范圍:1-7200000,值為1毫秒到2小時 /// </summary> /// <param name="value"></param> public void SetTimeSpan(int value) { if (value > 0&& value <=7200000) { _TimeSpan = value; } } /// <summary> /// 設定緩存Cache中的最大記錄條數 /// 值范圍:1-5000000,1到500萬 /// </summary> /// <param name="value"></param> public void SetCacheMaxNum(int value) { if (value > 0 && value <= 5000000) { _CacheMaxNum = value; } } /// <summary> /// 設置是否在控制臺中顯示日志 /// </summary> /// <param name="value"></param> public void SetIsShowMsg(bool value) { Helper.IsShowMsg = value; } /// <summary> /// 線程請求阻塞增量 /// 值范圍:1-CacheMaxNum,建議設置為緩存最大值的10%-20% /// </summary> /// <param name="value"></param> public void SetBlockNumExt(int value) { if (value > 0 && value <= _CacheMaxNum) { _BlockNumExt = value; } } /// <summary> /// 請求阻塞時間 /// 值范圍:1-max,根據阻塞增量設置請求阻塞時間 /// 阻塞時間越長,阻塞增量可以設置越大,但是請求實時響應就越差 /// </summary> /// <param name="value"></param> public void SetBlockSpanTime(int value) { if (value > 0) { _BlockSpanTime = value; } } #endregion #region 私有變量 /// <summary> /// 內部運行線程 /// </summary> private Thread _runner = null; /// <summary> /// 可處理高并發的隊列 /// </summary> private ConcurrentLinkedQueue<string> _Myqueue = null; /// <summary> /// 唯一內容的時間健值對 /// </summary> private Hashtable _DataKey = null; /// <summary> /// 內容時間隊列 /// </summary> private Queue _DataQueue = null; /// <summary> /// 定時線程的休眠時間長度:默認為1分鐘 /// </summary> private int _TimeSpan = 3000; /// <summary> /// 定時計時器線程 /// </summary> private Thread _Timer = null; /// <summary> /// 緩存Cache中的最大記錄條數 /// </summary> private int _CacheMaxNum = 500000; /// <summary> /// 線程請求阻塞增量 /// </summary> private int _BlockNumExt = 10000; /// <summary> /// 請求阻塞時間 /// </summary> private int _BlockSpanTime = 100; #endregion #region 私有方法 private void StartRun() { _runner = new Thread(DoAction); _runner.Start(); Helper.ShowMsg("內部線程啟動成功!"); } private string GetItem() { string tp = string.Empty; bool result = _Myqueue.TryDequeue(out tp); return tp; } /// <summary> /// 執行循環操作 /// </summary> private void DoAction() { while (true) { while (!_Myqueue.IsEmpty) { string item = GetItem(); _DataQueue.Enqueue(item); if (!_DataKey.ContainsKey(item)) { _DataKey.Add(item, DateTime.Now); } } //Helper.ShowMsg("當前數組已經為空,處理線程進入休眠狀態..."); Thread.Sleep(2); } } /// <summary> /// 執行定時器的動作 /// </summary> private void DoTicket() { while (true) { Helper.ShowMsg("當前數據隊列個數:" + _DataQueue.Count.ToString()); if (_DataQueue.Count > _CacheMaxNum) { while (true) { Helper.ShowMsg(string.Format("當前隊列數:{0},已經超出最大長度:{1},開始進行清理操作...", _DataQueue.Count, _CacheMaxNum.ToString())); string item = _DataQueue.Dequeue().ToString(); if (!string.IsNullOrEmpty(item)) { if (_DataKey.ContainsKey(item)) { _DataKey.Remove(item); } if (_DataQueue.Count <= _CacheMaxNum) { Helper.ShowMsg("清理完成,開始休眠清理線程..."); break; } } } } Thread.Sleep(_TimeSpan); } } /// <summary> /// 線程進行睡眠等待 /// 如果當前負載壓力大大超出了線程的處理能力 /// 那么需要進行延時調用 /// </summary> private void BlockThread() { if (_DataQueue.Count > _CacheMaxNum + _BlockNumExt) { Thread.Sleep(_BlockSpanTime); } } #endregion #region 公共方法 /// <summary> /// 開啟服務線程 /// </summary> public void Start() { if (_runner == null) { StartRun(); } else { if (_runner.IsAlive == false) { StartRun(); } } } /// <summary> /// 關閉服務線程 /// </summary> public void Stop() { if (_runner != null) { _runner.Abort(); _runner = null; } } /// <summary> /// 添加內容信息 /// </summary> /// <param name="item">內容信息</param> /// <returns>true:緩存中不包含此值,隊列添加成功,false:緩存中包含此值,隊列添加失敗</returns> public bool AddItem(string item) { BlockThread(); item = Helper.MakeMd5(item); if (_DataKey.ContainsKey(item)) { return false; } else { _Myqueue.Enqueue(item); return true; } } /// <summary> /// 判斷內容信息是否已經存在 /// </summary> /// <param name="item">內容信息</param> /// <returns>true:信息已經存在于緩存中,false:信息不存在于緩存中</returns> public bool CheckItem(string item) { item = Helper.MakeMd5(item); return _DataKey.ContainsKey(item); } #endregion } }
模擬測試代碼:
private static string _example = Guid.NewGuid().ToString(); private static UniqueCheck _uck = null; static void Main(string[] args) { _uck = UniqueCheck.GetInstance(); _uck.Start(); _uck.SetIsShowMsg(false); _uck.SetCacheMaxNum(20000000); _uck.SetBlockNumExt(1000000); _uck.SetTimeSpan(6000); _uck.AddItem(_example); Thread[] threads = new Thread[20]; for (int i = 0; i < 20; i++) { threads[i] = new Thread(AddInfo); threads[i].Start(); } Thread checkthread = new Thread(CheckInfo); checkthread.Start(); string value = Console.ReadLine(); checkthread.Abort(); for (int i = 0; i < 50; i++) { threads[i].Abort(); } _uck.Stop(); } static void AddInfo() { while (true) { _uck.AddItem(Guid.NewGuid().ToString()); } } static void CheckInfo() { while (true) { Console.WriteLine("開始時間:{0}...", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff")); Console.WriteLine("插入結果:{0}", _uck.AddItem(_example)); Console.WriteLine("結束時間:{0}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff")); //調整進程休眠時間,可以測試高并發的情況 //Thread.Sleep(1000); } }
測試截圖:
關于怎么在C#中實現請求唯一性校驗支持高并發就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。