您好,登錄后才能下訂單哦!
本篇內容介紹了“Java8 ConcurrentHashMap源碼中隱藏的兩個Bug是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
Java 7的ConcurrenHashMap的源碼我建議大家都看看,那個版本的源碼就是Java多線程編程的教科書。在Java 7的源碼中,作者對悲觀鎖的使用非常謹慎,大多都轉換為自旋鎖加volatile獲得相同的語義,即使最后迫不得已要用,作者也會通過各種技巧減少鎖的臨界區。在上一篇文章中我們也有講到,自旋鎖在臨界區比較小的時候是一個較優的選擇是因為它避免了線程由于阻塞而切換上下文,但本質上它也是個鎖,在自旋等待期間只有一個線程能進入臨界區,其他線程只會自旋消耗CPU的時間片。Java 8中ConcurrentHashMap的實現通過一些巧妙的設計和技巧,避開了自旋鎖的局限,提供了更高的并發性能。如果說Java 7版本的源碼是在教我們如何將悲觀鎖轉換為自旋鎖,那么在Java 8中我們甚至可以看到如何將自旋鎖轉換為無鎖的方法和技巧。
把書讀薄
image
圖片來源:https://www.zhenchao.org/2019/01/31/java/cas-based-concurrent-hashmap/
在開始本文之前,大家首先在心里還是要有這樣的一張圖,如果有同學對HashMap比較熟悉,那這張圖也應該不會陌生。事實上在整體的數據結構的設計上Java 8的ConcurrentHashMap和HashMap基本上是一致的。
Java 7中ConcurrentHashMap為了提升性能使用了很多的編程技巧,但是引入Segment的設計還是有很大的改進空間的,Java 7中ConcurrrentHashMap的設計有下面這幾個可以改進的點:
1. Segment在擴容的時候非擴容線程對本Segment的寫操作時都要掛起等待的
2. 對ConcurrentHashMap的讀操作需要做兩次哈希尋址,在讀多寫少的情況下其實是有額外的性能損失的
3. 盡管size()方法的實現中先嘗試無鎖讀,但是如果在這個過程中有別的線程做寫入操作,那調用size()的這個線程就會給整個ConcurrentHashMap加鎖,這是整個ConcurrrentHashMap唯一一個全局鎖,這點對底層的組件來說還是有性能隱患的
4. 極端情況下(比如客戶端實現了一個性能很差的哈希函數)get()方法的復雜度會退化到O(n)。
針對1和2,在Java 8的設計是廢棄了Segment的使用,將悲觀鎖的粒度降低至桶維度,因此調用get的時候也不需要再做兩次哈希了。size()的設計是Java 8版本中最大的亮點,我們在后面的文章中會詳細說明。至于紅黑樹,這篇文章仍然不做過多闡述。接下來的篇幅會深挖細節,把書讀厚,涉及到的模塊有:初始化,put方法, 擴容方法transfer以及size()方法,而其他模塊,比如hash函數等改變較小,故不再深究。
ForwardingNode
static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { // MOVED = -1,ForwardingNode的哈希值為-1 super(MOVED, null, null, null); this.nextTable = tab; } }
除了普通的Node和TreeNode之外,ConcurrentHashMap還引入了一個新的數據類型ForwardingNode,我們這里只展示他的構造方法,ForwardingNode的作用有兩個:
在動態擴容的過程中標志某個桶已經被復制到了新的桶數組中
如果在動態擴容的時候有get方法的調用,則ForwardingNode將會把請求轉發到新的桶數組中,以避免阻塞get方法的調用,ForwardingNode在構造的時候會將擴容后的桶數組nextTable保存下來。
這是在Java 8版本的ConcurrentHashMap實現CAS的工具,以int類型為例其方法定義如下:
/** * Atomically update Java variable to <tt>x</tt> if it is currently * holding <tt>expected</tt>. * @return <tt>true</tt> if successful */ public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
相應的語義為:
如果對象o起始地址偏移量為offset的值等于expected,則將該值設為x,并返回true表明更新成功,否則返回false,表明CAS失敗
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) // 檢查參數 throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) initialCapacity = concurrencyLevel; long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); // tableSizeFor,求不小于size的 2^n的算法,jdk1.8的HashMap中說過 this.sizeCtl = cap; }
即使是最復雜的一個初始化方法代碼也是比較簡單的,這里我們只需要注意兩個點:
concurrencyLevel在Java 7中是Segment數組的長度,由于在Java 8中已經廢棄了Segment,因此concurrencyLevel只是一個保留字段,無實際意義
sizeCtl這個值第一次出現,這個值如果等于-1則表明系統正在初始化,如果是其他負數則表明系統正在擴容,在擴容時sizeCtl二進制的低十六位等于擴容的線程數加一,高十六位(除符號位之外)包含桶數組的大小信息
public V put(K key, V value) { return putVal(key, value, false); }
put方法將調用轉發到putVal方法:
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 【A】延遲初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); // 【B】當前桶是空的,直接更新 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 【C】如果當前的桶的第一個元素是一個ForwardingNode節點,則該線程嘗試加入擴容 else if ((ffh = f.hash) == MOVED) tab = helpTransfer(tab, f); // 【D】否則遍歷桶內的鏈表或樹,并插入 else { // 暫時折疊起來,后面詳細看 } } // 【F】流程走到此處,說明已經put成功,map的記錄總數加一 addCount(1L, binCount); return null; }
從整個代碼結構上來看流程還是比較清楚的,我用括號加字母的方式標注了幾個非常重要的步驟,put方法依然牽扯出很多的知識點
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) // 說明已經有線程在初始化了,本線程開始自旋 Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // CAS保證只有一個線程能走到這個分支 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; tabtable = tab = nt; // sc = n - n/4 = 0.75n sc = n - (n >>> 2); } } finally { // 恢復sizeCtl > 0相當于釋放鎖 sizeCtl = sc; } break; } } return tab; }
在初始化桶數組的過程中,系統如何保證不會出現并發問題呢,關鍵點在于自旋鎖的使用,當有多個線程都執行initTable方法的時候,CAS可以保證只有一個線程能夠進入到真正的初始化分支,其他線程都是自旋等待。這段代碼中我們關注三點即可:
依照前文所述,當有線程開始初始化桶數組時,會通過CAS將sizeCtl置為-1,其他線程以此為標志開始自旋等待
當桶數組初始化結束后將sizeCtl的值恢復為正數,其值等于0.75倍的桶數組長度,這個值的含義和之前HashMap中的THRESHOLD一致,是系統觸發擴容的臨界點
在finally語句中對sizeCtl的操作并沒有使用CAS是因為CAS保證只有一個線程能夠執行到這個地方
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); }
put方法的第二個分支會用tabAt判斷當前桶是否是空的,如果是則會通過CAS寫入,tabAt通過UNSAFE接口會拿到桶中的最新元素,casTabAt通過CAS保證不會有并發問題,如果CAS失敗,則通過循環再進入其他分支
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { // RESIZE_STAMP_SHIFT = 16 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; // 這里將sizeCtl的值自增1,表明參與擴容的線程數量+1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }
在這個地方我們就要詳細說下sizeCtl這個標志位了,臨時變量rs由resizeStamp這個方法返回
static final int resizeStamp(int n) { // RESIZE_STAMP_BITS = 16 return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); }
因為入參n是一個int類型的值,所有Integer.numberOfLeadingZeros(n)的返回值介于0到32之間,如果轉換成二進制
Integer.numberOfLeadingZeros(n)的最大值是:00000000 00000000 00000000 00100000
Integer.numberOfLeadingZeros(n)的最小值是:00000000 00000000 00000000 00000000
因此resizeStampd的返回值也就介于00000000 00000000 10000000 00000000到00000000 00000000 10000000 00100000之間,從這個返回值的范圍可以看出來resizeStamp的返回值高16位全都是0,是不包含任何信息的。因此在ConcurrrentHashMap中,會把resizeStamp的返回值左移16位拼到sizeCtl中,這就是為什么sizeCtl的高16位包含整個Map大小的原理。有了這個分析,這段代碼中比較長的if判斷也就能看懂了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; (sc >>> RESIZE_STAMP_SHIFT) != rs保證所有線程要基于同一個舊的桶數組擴容 transferIndex <= 0已經有線程完成擴容任務了
至于sc == rs + 1 || sc == rs + MAX_RESIZERS這兩個判斷條件如果是細心的同學一定會覺得難以理解,這個地方確實是JDK的一個BUG,這個BUG已經在JDK 12中修復,詳細情況可以參考一下Oracle的官網:https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427,這兩個判斷條件應該寫成這樣:sc == (rs << RESIZE_STAMP_SHIFT) + 1 || sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS,因為直接比較rs和sc是沒有意義的,必須要有移位操作。它表達的含義是
sc == (rs << RESIZE_STAMP_SHIFT) + 1當前擴容的線程數為0,即已經擴容完成了,就不需要再新增線程擴容
sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS參與擴容的線程數已經到了最大,就不需要再新增線程擴容
真正擴容的邏輯在transfer方法中,我們后面會詳細看,不過有個小細節可以提前注意,如果nextTable已經初始化了,transfer會返回nextTable的的引用,后續可以直接操作新的桶數組。
如果桶數組已經初始化好了,該擴容的也擴容了,并且根據哈希定位到的桶中已經有元素了,那流程就跟普通的HashMap一樣了,唯一一點不同的就是,這時候要給當前的桶加鎖,且看代碼:
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0)// 折疊 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 折疊} else if ((ffh = f.hash) == MOVED)// 折疊 else { V oldVal = null; synchronized (f) { // 要注意這里這個不起眼的判斷條件 if (tabAt(tab, i) == f) { if (fh >= 0) { // fh>=0的節點是鏈表,否則是樹節點或者ForwardingNode binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((eek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; // 如果鏈表中有值了,直接更新 if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((ee = e.next) == null) { // 如果流程走到這里,則說明鏈表中還沒值,直接連接到鏈表尾部 pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 紅黑樹的操作先略過 } } } } // put成功,map的元素個數+1 addCount(1L, binCount); return null; }
這段代碼中要特備注意一個不起眼的判斷條件(上下文在源碼上已經標注出來了):tabAt(tab, i) == f,這個判斷的目的是為了處理調用put方法的線程和擴容線程的競爭。因為synchronized是阻塞鎖,如果調用put方法的線程恰好和擴容線程同時操作同一個桶,且調用put方法的線程競爭鎖失敗,等到該線程重新獲取到鎖的時候,當前桶中的元素就會變成一個ForwardingNode,那就會出現tabAt(tab, i) != f的情況。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // 初始化新的桶數組 try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTabnextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 判斷是會否是最后一個擴容線程 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); else if ((ffh = f.hash) == MOVED) // 只有最后一個擴容線程才有機會執行這個分支 advance = true; // already processed else { // 復制過程與HashMap類似,這里不再贅述 synchronized (f) { // 折疊 } } } }
在深入到源碼細節之前我們先根據下圖看一下在Java 8中ConcurrentHashMap擴容的幾個特點:
新的桶數組nextTable是原先桶數組長度的2倍,這與之前HashMap一致
參與擴容的線程也是分段將table中的元素復制到新的桶數組nextTable中
桶一個桶數組中的元素在新的桶數組中均勻的分布在兩個桶中,桶下標相差n(舊的桶數組的長度),這一點依然與HashMap保持一致
image-20210424202636495
先看一個關鍵的變量transferIndex,這是一個被volatile修飾的變量,這一點可以保證所有線程讀到的一定是最新的值。
private transient volatile int transferIndex;
這個值會被第一個參與擴容的線程初始化,因為只有第一個參與擴容的線程才滿足條件nextTab == null
if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTabnextTable = nextTab; transferIndex = n; }
在了解了transferIndex屬性的基礎上,上面的這個循環就好理解了
while (advance) { int nextIndex, nextBound; // 當bound <= i <= transferIndex的時候i自減跳出這個循環繼續干活 if (--i >= bound || finishing) advance = false; // 擴容的所有任務已經被認領完畢,本線程結束干活 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } // 否則認領新的一段復制任務,并通過`CAS`更新transferIndex的值 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } }
transferIndex就像是一個游標,每個線程認領一段復制任務的時候都會通過CAS將其更新為transferIndex - stride, CAS可以保證transferIndex可以按照stride這個步長降到0。
對于每一個擴容線程,for循環的變量i代表要復制的桶的在桶數組中的下標,這個值的上限和下限通過游標transferIndex和步長stride計算得來,當i減小為負數,則說明當前擴容線程完成了擴容任務,這時候流程會走到這個分支:
// i >= n || i + n >= nextn現在看來取不到 if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { // 【A】完成整個擴容過程 nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } // 【B】判斷是否是最后一個擴容線程,如果是,則需要重新掃描一遍桶數組,做二次確認 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT 說明是最后一個擴容線程 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; // 重新掃描一遍桶數組,做二次確認 finishing = advance = true; i = n; // recheck before commit } }
因為變量finishing被初始化為false,所以當線程第一次進入這個if分支的話,會先執行注釋為【B】的這個分支,同時因為sizeCtl的低16位被初始化為參與擴容的線程數加一,因此,當條件(sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT滿足時,就能證明當前線程就是最后一個擴容線程了,這這時候將i置為n重新掃描一遍桶數組,并且將finishing置為true保證當桶數組被掃描結束后能夠進入注釋為【A】的分支結束擴容。
這里就有一個問題,按照我們前面的分析,擴容線程能夠通力協作,保證各自負責的桶數組的分段不重不漏,這里為什么還需要做二次確認么?有一個開發者在concurrency-interest這個郵件列表中也關于這件事咨詢了Doug Lea(地址:http://cs.oswego.edu/pipermail/concurrency-interest/2020-July/017171.html),他給出的回復是:
Yes, this is a valid point; thanks. The post-scan was needed in a previous version, and could be removed. It does not trigger often enough to matter though, so is for now another minor tweak that might be included next time CHM is updated.
雖然Doug在郵件中的措辭用了could be, not often enough等,但也確認了最后一個擴容線程的二次檢查是沒有必要的。具體的復制過程與HashMap類似,感興趣的讀者可以翻一下高端的面試從來不會在HashMap的紅黑樹上糾纏太多這篇文章。
// 記錄map元素總數的成員變量 private transient volatile long baseCount;
在put方法的最后,有一個addCount方法,因為putVal執行到此處說明已經成功新增了一個元素,所以addCount方法的作用就是維護當前ConcurrentHashMap的元素總數,在ConcurrentHashMap中有一個變量baseCount用來記錄map中元素的個數,如下圖所示,如果同一時刻有n個線程通過CAS同時操作baseCount變量,有且僅有一個線程會成功,其他線程都會陷入無休止的自旋當中,那一定會帶來性能瓶頸。
image-20210420221407349
為了避免大量線程都在自旋等待寫入baseCount,ConcurrentHashMap引入了一個輔助隊列,如下圖所示,現在操作baseCount的線程可以分散到這個輔助隊列中去了,調用size()的時候只需要將baseCount和輔助隊列中的數值相加即可,這樣就實現了調用size()無需加鎖。
image-20210420222306734
輔助隊列是一個類型為CounterCell的數組:
@sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } }
可以簡單理解為只是包裝了一個long型的變量value,還需要解決一個問題是,對于某個具體的線程它是如何知道操作輔助隊列中的哪個值呢?答案是下面的這個方法:
static final int getProbe() { return UNSAFE.getInt(Thread.currentThread(), PROBE); }
getProbe方法會返回當前線程的一個唯一身份碼,這個值是不會變的,因此可以將getProbe的返回值與輔助隊列的長度作求余運算得到具體的下標,它的返回值可能是0,如果返回0則需要調用ThreadLocalRandom.localInit()初始化。addCount方法中有兩個細節需要注意
private final void addCount(long x, int check) { CounterCell[] as; long b, s; // 注意這里的判斷條件,是有技巧的 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || // 變量uncontended記錄著這個CAS操作是否成功 !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } if (check >= 0) { // 檢查是否需要擴容,后面再詳細看 } }
細節一:
首先我們要注意方法中剛進來的if判斷條件:
if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { }
作者在這里巧妙的運用了邏輯短路,如果(as = counterCells) != null則后面的CAS是不會執行的,為什么要這么設置呢?作者有兩點考慮:
1. 原因在于如果(as = counterCells) != null,則說明輔助隊列已經初始化好了,相比于所有的線程都自旋等待baseCount這一個變量,讓線程通過CAS去操作隊列中的值有更大的可能性成功,因為輔助隊列的最大長度為大于當前處理器個數的2的正整數冪,可以支持更大的并發
2. 如果輔助隊列還沒有初始化好,直到有必要的時候再去創建隊列,如何判斷“必要性”呢?就看對baseCount的CAS操作能否成功,如果失敗,就說明當前系統的并發已經比較高了,需要隊列的輔助,否則直接操作baseCount
細節二:
只有當輔助隊列已存在,且由ThreadLocalRandom.getProbe()在輔助隊列中確定的位置不為null時,才對其做CAS操作,這本來是一個正常的防御性判斷,但是uncontended記錄了CAS是否成功,如果失敗,則會在fullAddCount中調用ThreadLocalRandom.advanceProbe換一個身份碼調整下當前線程在輔助隊列的位置,避免所有線程都在輔助隊列的同一個坑位自旋等待。
// See LongAdder version for explanation // wasUncontended 記錄著調用方CAS是否成功,如果失敗則換一個輔助隊列的元素繼續CAS private final void fullAddCount(long x, boolean wasUncontended) { int h; if ((h = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); // force initialization h = ThreadLocalRandom.getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) { CounterCell[] as; CounterCell a; int n; long v; // 【A】如果輔助隊列已經創建,則直接操作輔助隊列 if ((as = counterCells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { // Try to attach new Cell CounterCell r = new CounterCell(x); // Optimistic create if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean created = false; try { // Recheck under lock CounterCell[] rs; int m, j; if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } else if (!wasUncontended) // 如果調用方CAS失敗了,本輪空跑,下一個循環換下標繼續操作 wasUncontended = true; // Continue after rehash else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break; else if (counterCells != as || n >= NCPU) // 如果輔助隊列長度已經超過了CPU個數,本輪空跑,下一個循環換下標繼續操作 collide = false; // At max size or stale else if (!collide) // 如果上一次操作失敗了(CAS失敗或者新建CounterCell失敗),本輪空跑,下一個循環換下標繼續操作 collide = true; else if (cellsBusy == 0 && // 如果連續兩次操作輔助隊列失敗,則考慮擴容 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { if (counterCells == as) {// Expand table unless stale CounterCell[] rs = new CounterCell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; counterCells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } // 如果上一次操作失敗或者調用方CAS失敗,都會走到這里,變換要操作的輔助隊列下標 h = ThreadLocalRandom.advanceProbe(h); } // 【B】如果輔助隊列還未創建,則加鎖創建 else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { // Initialize table if (counterCells == as) { CounterCell[] rs = new CounterCell[2]; rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } // 【C】如果輔助隊列創建失敗(拿鎖失敗),則嘗試直接操作`baseCount` else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break; // Fall back on using base } }
因為counterCells是一個普通的數組,因此對其的寫操作,包括初始化,擴容以及元素的寫都需要加鎖,加鎖的方式是對全局變量cellsBusy的自旋鎖。先看最外層的三個分支:
【B】如果輔助隊列還沒有創建,則加鎖創建
【C】如果因為拿鎖失敗導致輔助隊列創建失敗,則嘗試自旋寫入變量baseCount,萬一真的成功了呢
【A】如果輔助隊列已經創建了,則直接去操作輔助隊列相應的元素
注釋中標注【A】的這個分支代碼較多,其主要思路是如果通過CAS或者加鎖操作輔助隊列中的某個元素失敗,則首先通過調用ThreadLocalRandom.advanceProbe(h)換一個隊列中的元素繼續操作,這次操作是否成功會記錄在臨時變量collide中。如果下一次操作還是失敗,則說明此時的并發量比較大需要擴容了。如果輔助隊列的長度已經超過了CPU的個數,那就不再擴容,繼續換一個元素操作,因為同一時間能運行的線程數最大不會超過計算機的CPU個數。
在這個過程中有四個細節仍然需要注意:
細節一:
counterCells只是一個普通的數組,因此并不是線程安全的,所以對其寫操作需要加鎖保證并發安全
細節二:
加鎖的時候,作者做了一個double-check的動作,我看有的文章將其解讀為“類似于單例模式的double-check”,這個是不對的,作者這樣做的原因我們在上一篇文章中有講過,首先第一個檢查cellsBusy == 0是流程往下走的基礎,如果cellsBusy == 1則直接拿鎖失敗退出,調用h = ThreadLocalRandom.advanceProbe(h);更新h后重試,如果cellsBusy == 0校驗通過,則調用CounterCell r = new CounterCell(x);初始化一個CounterCell,這樣做是為了減少自旋鎖的臨界區的大小,以此來提升并發性能
細節三:
在加鎖的時候先判斷下cellsBusy是否為0,如果為1那直接宣告拿鎖失敗,為什么這么做呢?因為相比于調用UNSAFE的CAS操作,直接讀取volatile的消耗更少,如果直接讀取cellsBusy已經能判斷出拿鎖失敗,那就沒必要再調用耗時更多的CAS了
細節四:
對cellsBusy從0到1的更改調用了CAS但是從1置為0卻只用了賦值操作,這是因為CAS可以保證能走到這條語句的只有一個線程,因此可以用賦值操作來更改cellsBusy的值。
前面兩個方法主要是把ConcurrentHashMap中的元素個數分散的記錄到baseCount和輔助隊列中,調用size()方法的時候只需要把這些值相加即可。
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
“Java8 ConcurrentHashMap源碼中隱藏的兩個Bug是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。