您好,登錄后才能下訂單哦!
本篇內容介紹了“C#中怎么使用CAS實現無鎖算法”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
CAS(Compare-and-Swap)是一種多線程并發編程中常用的原子操作,用于實現多線程間的同步和互斥訪問。 它操作通常包含三個參數:一個內存地址(通常是一個共享變量的地址)、期望的舊值和新值。
CompareAndSwap(內存地址,期望的舊值,新值)
CAS 操作會比較內存地址處的值與期望的舊值是否相等,如果相等,則將新值寫入該內存地址; 如果不相等,則不進行任何操作。這個比較和交換的操作是一個原子操作,不會被其他線程中斷。
CAS 通常是通過硬件層面的CPU指令實現的,其原子性是由硬件保證的。具體的實現方式根據環境會有所不同。
CAS 操作通常會有一個返回值,用于表示操作是否成功。返回結果可能是true或false,也可能是內存地址處的舊值。
相比于傳統的鎖機制,CAS 有一些優勢:
原子性:CAS 操作是原子的,不需要額外的鎖來保證多線程環境下的數據一致性,避免了鎖帶來的性能開銷和競爭條件。
無阻塞:CAS 操作是無阻塞的,不會因為資源被鎖定而導致線程的阻塞和上下文切換,提高了系統的并發性和可伸縮性。
適用性:CAS 操作可以應用于廣泛的數據結構和算法,如自旋鎖、計數器、隊列等,使得它在實際應用中具有較大的靈活性和適用性。
在 C# 中,我們可以使用 Interlocked 類來實現 CAS 操作。
Interlocked 類提供了一組 CompareExchange 的重載方法,用于實現不同類型的數據的 CAS 操作。
public static int CompareExchange(ref int location1, int value, int comparand); public static long CompareExchange(ref long location1, long value, long comparand); // ... 省略其他重載方法 public static object CompareExchange(ref object location1, object value, object comparand); public static T CompareExchange<T>(ref T location1, T value, T comparand) where T : class;
CompareExchange 方法將 location1 內存地址處的值與 comparand 比較,如果相等,則將 value 寫入 location1 內存地址處,否則不進行任何操作。
該方法返回 location1 內存地址處的值。
通過判斷方法返回值與 comparand 是否相等,我們就可以知道 CompareExchange 方法是否執行成功。
在使用 CAS 實現無鎖算法時,通常我們不光是為了比較和更新一個數據,還需要在更新成功后進行下一步的操作。結合 while(true) 循環,我們可以不斷地嘗試更新數據,直到更新成功為止。
偽代碼如下:
while (true) { // 讀取數據 oldValue = ...; // 計算新值 newValue = ...; // CAS 更新數據 result = CompareExchange(ref location, newValue, oldValue); // 判斷 CAS 是否成功 if (result == oldValue) { // CAS 成功,執行后續操作 break; } }
在復雜的無鎖算法中,因為每一步操作都是獨立的,連續的操作并非原子,所以我們不光要借助 CAS,每一步操作前都應判斷是否有其他線程已經修改了數據。
下面是一個簡單的計數器類,它使用 CAS 實現了一個線程安全的自增操作。
public class Counter { private int _value; public int Increment() { while (true) { int oldValue = _value; int newValue = oldValue + 1; int result = Interlocked.CompareExchange(ref _value, newValue, oldValue); if (result == oldValue) { return newValue; } } } }
CLR 底層源碼中,我們也會經常看到這樣的代碼,比如 ThreadPool 增加線程時的計數器。https://github.com/dotnet/runtime/blob/release/6.0/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs#L446
internal void EnsureThreadRequested() { // // If we have not yet requested #procs threads, then request a new thread. // // CoreCLR: Note that there is a separate count in the VM which has already been incremented // by the VM by the time we reach this point. // int count = _separated.numOutstandingThreadRequests; while (count < Environment.ProcessorCount) { int prev = Interlocked.CompareExchange(ref _separated.numOutstandingThreadRequests, count + 1, count); if (prev == count) { ThreadPool.RequestWorkerThread(); break; } count = prev; } }
下面是一個簡單的隊列類,它使用 CAS 實現了一個線程安全的入隊和出隊操作。相較于上面的計數器,這里的操作更加復雜,我們每一步都需要考慮是否有其他線程已經修改了數據。
這樣的算法有點像薛定諤的貓,你不知道它是死是活,只有當你試圖去觀察它的時候,它才可能會變成死或者活。
public class ConcurrentQueue<T> { // _head 和 _tail 是兩個偽節點,_head._next 指向隊列的第一個節點,_tail 指向隊列的最后一個節點。 // _head 和 _tail 會被多個線程修改和訪問,所以要用 volatile 修飾。 private volatile Node _head; private volatile Node _tail; public ConcurrentQueue() { _head = new Node(default); // _tail 指向 _head 時,隊列為空。 _tail = _head; } public void Enqueue(T item) { var node = new Node(item); while (true) { Node tail = _tail; Node next = tail._next; // 判斷給 next 賦值的這段時間,是否有其他線程修改過 _tail if (tail == _tail) { // 如果 next 為 null,則說明從給 tail 賦值到給 next 賦值這段時間,沒有其他線程修改過 tail._next, if (next == null) { // 如果 tail._next 為 null,則說明從給 tail 賦值到這里,沒有其他線程修改過 tail._next, // tail 依舊是隊列的最后一個節點,我們就可以直接將 node 賦值給 tail._next。 if (Interlocked.CompareExchange(ref tail._next, node, null) == null) { // 如果_tail == tail,則說明從上一步 CAS 操作到這里,沒有其他線程修改過 _tail,也就是沒有其他線程執行過 Enqueue 操作。 // 那么當前線程 Enqueue 的 node 就是隊列的最后一個節點,我們就可以直接將 node 賦值給 _tail。 Interlocked.CompareExchange(ref _tail, node, tail); break; } } // 如果 next 不為 null,則說明從給 tail 賦值到給 next 賦值這段時間,有其他線程修改過 tail._next, else { // 如果沒有其他線程修改過 _tail,那么 next 就是隊列的最后一個節點,我們就可以直接將 next 賦值給 _tail。 Interlocked.CompareExchange(ref _tail, next, tail); } } } } public bool TryDequeue(out T item) { while (true) { Node head = _head; Node tail = _tail; Node next = head._next; // 判斷 _head 是否被修改過 // 如果沒有被修改過,說明從給 head 賦值到給 next 賦值這段時間,沒有其他線程執行過 Dequeue 操作。 if (head == _head) { // 如果 head == tail,說明隊列為空 if (head == tail) { // 雖然上面已經判斷過隊列是否為空,但是在這里再判斷一次 // 是為了防止在給 tail 賦值到給 next 賦值這段時間,有其他線程執行過 Enqueue 操作。 if (next == null) { item = default; return false; } // 如果 next 不為 null,則說明從給 tail 賦值到給 next 賦值這段時間,有其他線程修改過 tail._next,也就是有其他線程執行過 Enqueue 操作。 // 那么 next 就可能是隊列的最后一個節點,我們嘗試將 next 賦值給 _tail。 Interlocked.CompareExchange(ref _tail, next, tail); } // 如果 head != tail,說明隊列不為空 else { item = next._item; if (Interlocked.CompareExchange(ref _head, next, head) == head) { // 如果 _head 沒有被修改過 // 說明從給 head 賦值到這里,沒有其他線程執行過 Dequeue 操作,上面的 item 就是隊列的第一個節點的值。 // 我們就可以直接返回。 break; } // 如果 _head 被修改過 // 說明從給 head 賦值到這里,有其他線程執行過 Dequeue 操作,上面的 item 就不是隊列的第一個節點的值。 // 我們就需要重新執行 Dequeue 操作。 } } } return true; } private class Node { public readonly T _item; public Node _next; public Node(T item) { _item = item; } } }
我們可以通過以下代碼來進行測試
using System.Collections.Concurrent; var queue = new ConcurrentQueue<int>(); var results = new ConcurrentBag<int>(); int dequeueRetryCount = 0; var enqueueTask = Task.Run(() => { // 確保 Enqueue 前 dequeueTask 已經開始運行 Thread.Sleep(10); Console.WriteLine("Enqueue start"); Parallel.For(0, 100000, i => queue.Enqueue(i)); Console.WriteLine("Enqueue done"); }); var dequeueTask = Task.Run(() => { Thread.Sleep(10); Console.WriteLine("Dequeue start"); Parallel.For(0, 100000, i => { while (true) { if (queue.TryDequeue(out int result)) { results.Add(result); break; } Interlocked.Increment(ref dequeueRetryCount); } }); Console.WriteLine("Dequeue done"); }); await Task.WhenAll(enqueueTask, dequeueTask); Console.WriteLine( $"Enqueue and dequeue done, total data count: {results.Count}, dequeue retry count: {dequeueRetryCount}"); var hashSet = results.ToHashSet(); for (int i = 0; i < 100000; i++) { if (!hashSet.Contains(i)) { Console.WriteLine("Error, missing " + i); break; } } Console.WriteLine("Done");
輸出結果:
Dequeue start
Enqueue start
Enqueue done
Dequeue done
Enqueue and dequeue done, total data count: 100000, dequeue retry count: 10586
Done
上述的 retry count 為 797,說明在 100000 次的 Dequeue 操作中,有 10586 次的 Dequeue 操作需要重試,那是因為在 Dequeue 操作中,可能暫時沒有數據可供 Dequeue,需要等待其他線程執行 Enqueue 操作。
當然這個 retry count 是不穩定的,因為在多線程環境下,每次執行的結果都可能不一樣。
“C#中怎么使用CAS實現無鎖算法”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。