您好,登錄后才能下訂單哦!
本篇內容主要講解“Java高并發編程基礎之如何使用AQS”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Java高并發編程基礎之如何使用AQS”吧!
引言
曾經有一道比較比較經典的面試題“你能夠說說java的并發包下面有哪些常見的類?”大多數人應該都可以說出 CountDownLatch、CyclicBarrier、Sempahore多線程并發三大利器。這三大利器都是通過AbstractQueuedSynchronizer抽象類(下面簡寫AQS)來實現的,所以學習三大利器之前我們有必要先來學習下AQS。
AQS是一種提供了原子式管理同步狀態、阻塞和喚醒線程功能以及隊列模型的簡單框架”
AQS結構
說到同步我們如何來保證同步?大家第一印象肯定是加鎖了,說到鎖的話大家肯定首先會想到的是Synchronized。Synchronized大家應該基本上都會使用,加鎖和釋放鎖都是jvm 來幫我們實現的,我們只需要簡單的加個 Synchronized關鍵字就可以了。用起來超級方便。但是有沒有一種情況我們設置一個鎖的超時時間Synchronized就有點實現不了,這時候我們就可以用ReentrantLock來實現,ReentrantLock是通過aqs來實現的,今天我們就通過ReentrantLock來學習一下aqs。
CAS && 公平鎖和非公平鎖
AQS里面用到了大量的CAS學習AQS之前我們還是有必要簡單的先了解下CAS、公平鎖和非公平鎖。
CAS
CAS 全稱是 compare and swap,是一種用于在多線程環境下實現同步功能的機制。CAS 操作包含三個操作數 :內存位置、預期數值和新值。CAS 的實現邏輯是將內存位置處的數值與預期數值相比較,若相等,則將內存位置處的值替換為新值。若不相等,則不做任何操作,這個操作是個原子性操作,java里面的AtomicInteger等類都是通過cas來實現的。
公平鎖和非公平鎖
公平鎖:多個線程按照申請鎖的順序去獲得鎖,線程會直接進入隊列去排隊,隊列中第一個才能獲得到鎖。優點:等待鎖的線程不會餓死,每個線程都可以獲取到鎖。缺點:整體吞吐效率相對非公平鎖要低,等待隊列中除第一個線程以外的所有線程都會阻塞,CPU喚醒阻塞線程的開銷比非公平鎖大。
非公平鎖:多個線程去獲取鎖的時候,會直接去嘗試獲取,獲取不到,再去進入等待隊列,如果能獲取到,就直接獲取到鎖。優點:可以減少CPU喚醒線程的開銷,整體的吞吐效率會高點,CPU也不必喚醒所有線程,會減少喚起線程的數量。缺點:處于等待隊列中的線程可能會餓死,或者等很久才會獲得鎖。文字有點拗口,我們來個實際的例子說明下。比如我們去食堂就餐的時候都要排隊,大家都按照先來后到的順序排隊打飯,這就是公平鎖。如果等到你準備拿盤子打飯的時候 直接蹦出了一個五大三粗的胖子插隊到你前面,你看打不贏他只能忍氣吞聲讓他插隊,等胖子打完飯了又來個小個子也來插你隊,這時候你沒法忍了,直接大吼一聲讓他滾,這個 小個子只能屁顛屁顛到隊尾去排隊了這就是非公平鎖。我們先來看看AQS有哪些屬性
// 頭節點 private transient volatile Node head; // 阻塞的尾節點,每個新的節點進來,都插入到最后,也就形成了一個鏈表 private transient volatile Node tail; // 這個是最重要的,代表當前鎖的狀態,0代表沒有被占用,大于 0 代表有線程持有當前鎖 // 這個值可以大于 1,是因為鎖可以重入,每次重入都加上 1 private volatile int state; // 代表當前持有獨占鎖的線程,舉個最重要的使用例子,因為鎖可以重入 // reentrantLock.lock()可以嵌套調用多次,所以每次用這個來判斷當前線程是否已經擁有了鎖 // if (currentThread == getExclusiveOwnerThread()) {state++} private transient Thread exclusiveOwnerThread; //繼承自AbstractOwnableSynchronizer
下面我們來寫一個demo分析下lock 加鎖和釋放鎖的過程
final void lock() { // 上來先試試直接把狀態置位1,如果此時沒人獲取鎖就直接 if (compareAndSetState(0, 1)) // 爭搶成功則修改獲得鎖狀態的線程 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
cas嘗試失敗,說明已經有人再持有鎖,所以進入acquire方法
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire方法,看名字大概能猜出什么意思,就是試一試。tryAcquire實際上是調用了父類Sync的nonfairTryAcquire方法
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 獲取下當前鎖的狀態 int c = getState(); // 這個if 邏輯跟前面一進來就獲取鎖的邏輯一樣都是通過cas嘗試獲取下鎖 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 進入這個判斷說明 鎖重入了 狀態需要進行+1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; // 如果鎖的重入次數大于int的最大值,直接就拋出異常了,正常情況應該不存在這種情況,不過jdk還是嚴謹的 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // 返回false 說明嘗試獲取鎖失敗了,失敗了就要進行acquireQueued方法了 return false; }
tryAcquire方法如果獲取鎖失敗了,那么肯定就要排隊等待獲取鎖。排隊的線程需要待在哪里等待獲取鎖?這個就跟我們線程池執行任務一樣,線程池把任務都封裝成一個work,然后當線程處理任務不過來的時候,就把任務放到隊列里面。AQS同樣也是類似的,把排隊等待獲取鎖的線程封裝成一個NODE。然后再把NODE放入到一個隊列里面。隊列如下所示,不過需要注意一點head是不存NODE的。
接下來我們繼續分析源碼,看下獲取鎖失敗是如何被加入隊列的。就要執行acquireQueued方法,執行acquireQueued方法之前需要先執行addWaiter方法
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; // cas 加入隊列隊尾 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 尾結點不為空 || cas 加入尾結點失敗 enq(node); return node; }
enq
接下來再看看enq方法
// 通過自旋和CAS一定要當前node加入隊尾 private Node enq(final Node node) { for (;;) { Node t = tail; // 尾結點為空說明隊列還是空的,還沒有被初始化,所以初始化頭結點,可以看到頭結點的node 是沒有綁定線程的也就是不存數據的 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
通過addWaiter方法已經把獲取鎖的線程通過封裝成一個NODE加入對列。上述方法的一個執行流程圖如下:
接下來就是繼續執行acquireQueued方法
acquireQueued
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 通過自旋去獲取鎖 前驅節點==head的時候去嘗試獲取鎖,這個方法在前面已經分析過了。 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 進入這個if說明node的前驅節點不等于head 或者嘗試獲取鎖失敗了 // 判斷是否需要掛起當前線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 異常情況進入cancelAcquire,在jdk11的時候這個源碼直接是catch (Throwable e){ cancelAcquire(node);} 簡單明了 if (failed) cancelAcquire(node); } }
setHead
這個方法每當有一個node獲取到鎖了,就把當前node節點設置為頭節點,可以簡單的看做當前節點獲取到鎖了就把當前節點”移除“(變為頭結點)隊列。
shouldParkAfterFailedAcquire
說到這個方法我們就要先看下NODE可能會有哪些狀態在源碼里面我們可以看到總共會有四種狀態
CANCELLED:值為1,在同步隊列中等待的線程等待超時或被中斷,需要從同步隊列中取消該Node的結點,其結點的waitStatus為CANCELLED,即結束狀態,進入該狀態后的結點將不會再變化。
SIGNAL:值為-1,被標識為該等待喚醒狀態的后繼結點,當其前繼結點的線程釋放了同步鎖或被取消,將會通知該后繼結點的線程執行。說白了,就是處于喚醒狀態,只要前繼結點釋放鎖,就會通知標識為SIGNAL狀態的后繼結點的線程執行。
CONDITION:值為-2,與Condition相關,該標識的結點處于等待隊列中,結點的線程等待在Condition上,當其他線程調用了Condition的signal()方法后,CONDITION狀態的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。
PROPAGATE:值為-3,與共享模式相關,在共享模式中,該狀態標識結點的線程處于可運行狀態。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 前驅節點狀態 如果這個狀態為-1 則返回true,把當前線程掛起 if (ws == Node.SIGNAL) return true; // 大于0,說明狀態為CANCELLED if (ws > 0) { do { // 刪除被取消的node(讓被取消的node成為一個沒有引用的node等著下次GC被回收) node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 進入這里只能是 0,-2,-3。NODE節點初始化的時候waitStatus默認值是0,所以只有這里才有修改waitStatus的地方 // 通過cas 把前驅節點的狀態設置為-1,然后返回false ,外面調用這個方法的是個循環,又會調用一次這個方法 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
parkAndCheckInterrupt
掛起當前線程,并且阻塞
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 掛起當前線程,阻塞 return Thread.interrupted(); }
在這里插入圖片描述
解鎖
加鎖成功了,那鎖用完了就應該釋放鎖了,釋放鎖重點看下unparkSuccessor這個方法就好了
private void unparkSuccessor(Node node) { // 頭結點狀態 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; // s==null head的successor節點獲取鎖成功后,執行了head.next=null的操作后,解鎖線程讀取了head.next,因此s==null // head的successor節點被取消(cancelAcquire)時,執行了如下操作:successor.waitStatus=1 ; successor.next = successor; if (s == null || s.waitStatus > 0) { s = null; // 從尾節點開始往前找,找到最前面的非取消的節點 這里沒有break 哦 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 喚醒線程 ,喚醒的線程會從acquireQueued去獲取鎖 LockSupport.unpark(s.thread); }
釋放鎖代碼比較簡單,基本都寫在代碼注釋里面了,流程如下:
這段代碼里面有一個比較經典的面試題:如果頭結點的下一個節點為空或者頭結點的下一個節點的狀態為取消的時候為什么要從后往前找,找到最前面非取消的節點?
node.prev = pred; compareAndSetTail(pred, node) 這兩個地方可以看作Tail入隊的原子操作,但是此時pred.next = node;還沒執行,如果這個時候執行了unparkSuccessor方法,就沒辦法從前往后找了,所以需要從后往前找。
在產生CANCELLED狀態節點的時候,先斷開的是Next指針,Prev指針并未斷開,因此也是必須要從后往前遍歷才能夠遍歷完全部的Node
到此,相信大家對“Java高并發編程基礎之如何使用AQS”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。