91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么掌握AQS

發布時間:2021-10-26 13:45:10 來源:億速云 閱讀:131 作者:iii 欄目:開發技術

這篇文章主要講解了“怎么掌握AQS”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“怎么掌握AQS”吧!

鎖原理 - 信號量 vs  管程

在并發編程領域,有兩大核心問題:互斥與同步,互斥即同一時刻只允許一個線程訪問共享資源,同步,即線程之間如何通信、協作,一般這兩大問題可以通過信號量和管程來解決。

信號量

信號量(Semaphore)是操作系統提供的一種進程間常見的通信方式,主要用來協調并發程序對共享資源的訪問,操作系統可以保證對信號量操作的原子性。它是怎么實現的呢。

  • 信號量由一個共享整型變量 S 和兩個原子操作 PV 組成,S 只能通過 P 和 V 操作來改變

  • P 操作:即請求資源,意味著 S 要減 1,如果 S < 0, 則表示沒有資源了,此時線程要進入等待隊列(同步隊列)等待

  • V 操作: 即釋放資源,意味著 S 要加 1, 如果 S 小于等于 0,說明等待隊列里有線程,此時就需要喚醒線程。

示意圖如下

怎么掌握AQS

信號量機制的引入解決了進程同步和互斥問題,但信號量的大量同步操作分散在各個進程中不便于管理,還有可能導致系統死鎖。如:生產者消費者問題中將P、V顛倒可能死鎖(見文末參考鏈接),另外條件越多,需要的信號量就越多,需要更加謹慎地處理信號量之間的處理順序,否則很容易造成死鎖現象。

基于信號量給編程帶來的隱患,于是有了提出了對開發者更加友好的并發編程模型-管程

管程

Dijkstra 于 1971  年提出:把所有進程對某一種臨界資源的同步操作都集中起來,構成一個所謂的秘書進程。凡要訪問該臨界資源的進程,都需先報告秘書,由秘書來實現諸進程對同一臨界資源的互斥使用,這種機制就是管程。

管程是一種在信號量機制上進行改進的并發編程模型,解決了信號量在臨界區的 PV 操作上配對的麻煩,把配對的 PV  操作集中在一起而形成的并發編程方法理論,極大降低了使用和理解成本。

  1. 鴻蒙官方戰略合作共建——HarmonyOS技術社區

  2. 管程由四部分組成:

  3. 管程內部的共享變量。

  4. 管程內部的條件變量。

  5. 管程內部并行執行的進程。

對于局部與管程內部的共享數據設置初始值的語句。

由此可見,管程就是一個對象監視器。任何線程想要訪問該資源(共享變量),就要排隊進入監控范圍。進入之后,接受檢查,不符合條件,則要繼續等待,直到被通知,然后繼續進入監視器。

需要注意的事,信號量和管程兩者是等價的,信號量可以實現管程,管程也可以實現信號量,只是兩者的表現形式不同而已,管程對開發者更加友好。

兩者的區別如下

怎么掌握AQS

管程為了解決信號量在臨界區的 PV 操作上的配對的麻煩,把配對的 PV  操作集中在一起,并且加入了條件變量的概念,使得在多條件下線程間的同步實現變得更加簡單。

怎么理解管程中的入口等待隊列,共享變量,條件變量等概念,有時候技術上的概念較難理解,我們可以借助生活中的場景來幫助我們理解,就以我們的就醫場景為例來簡單說明一下,正常的就醫流程如下:

  1. 病人去掛號后,去侯診室等待叫號

  2. 叫到自己時,就可以進入就診室就診了

  3. 就診時,有兩種情況,一種是醫生很快就確定病人的病,并作出診斷,診斷完成后,就通知下一位病人進來就診,一種是醫生無法確定病因,需要病人去做個驗血 / CT  檢查才能確定病情,于是病人就先去驗個血 / CT

  4. 病人驗完血 / 做完 CT 后,重新取號,等待叫號(進入入口等待隊列)

  5. 病人等到自己的號,病人又重新拿著驗血 / CT 報告去找醫生就診

整個流程如下

怎么掌握AQS

那么管程是如何解決互斥和同步的呢

首先來看互斥,上文中醫生即共享資源(也即共享變量),就診室即為臨界區,病人即線程,任何病人如果想要訪問臨界區,必須首先獲取共享資源(即醫生),入口一次只允許一個線程經過,在共享資源被占有的情況下,如果再有線程想占有共享資源,就需要到等待隊列去等候,等到獲取共享資源的線程釋放資源后,等待隊列中的線程就可以去競爭共享資源了,這樣就解決了互斥問題,所以本質上管程是通過將共享資源及其對共享資源的操作(線程安全地獲取和釋放)封裝起來來保證互斥性的。

再來看同步,同步是通過文中的條件變量及其等待隊列實現的,同步的實現分兩種情況

  1. 病人進入就診室后,無需做驗血 / CT  等操作,于是醫生診斷完成后,就會釋放共享資源(解鎖)去通知(notify,notifyAll)入口等待隊列的下一個病人,下一個病人聽到叫號后就能看醫生了。

  2. 如果病人進入就診室后需要做驗血 / CT 等操作,會去驗血 / CT 隊列(條件隊列)排隊,  同時釋放共享變量(醫生),通知入口等待隊列的其他病人(線程)去獲取共享變量(醫生),獲得許可的線程執行完臨界區的邏輯后會喚醒條件變量等待隊列中的線程,將它放到入口等待隊列中  ,等到其獲取共享變量(醫生)時,即可進入入口(臨界區)處理。

在 Java 里,鎖大多是依賴于管程來實現的,以大家熟悉的內置鎖 synchronized 為例,它的實現原理如下。

怎么掌握AQS

可以看到 synchronized 鎖也是基于管程實現的,只不過它只有且只有一個條件變量(就是鎖對象本身)而已,這也是為什么JDK 要實現 Lock  鎖的原因之一,Lock 支持多個條件變量。

通過這樣的類比,相信大家對管程的工作機制有了比較清晰的認識,為啥要花這么大的力氣介紹管程呢,一來管程是解決并發問題的萬能鑰匙,二來 AQS 是基于  Java 并發包中管程的一種實現,所以理解管程對我們理解 AQS 會大有幫助,接下來我們就來看看 AQS 是如何工作的。

AQS 實現原理

AQS 全稱是 AbstractQueuedSynchronizer,是一個用來構建鎖和同步器的框架,它維護了一個共享資源 state  和一個 FIFO 的等待隊列(即上文中管程的入口等待隊列),底層利用了 CAS 機制來保證操作的原子性。

AQS 實現鎖的主要原理如下:

怎么掌握AQS

以實現獨占鎖為例(即當前資源只能被一個線程占有),其實現原理如下:state 初始化 0,在多線程條件下,線程要執行臨界區的代碼,必須首先獲取  state,某個線程獲取成功之后, state 加 1,其他線程再獲取的話由于共享資源已被占用,所以會到 FIFO 等待隊列去等待,等占有 state  的線程執行完臨界區的代碼釋放資源( state 減 1)后,會喚醒 FIFO 中的下一個等待線程(head 中的下一個結點)去獲取 state。

state 由于是多線程共享變量,所以必須定義成 volatile,以保證 state 的可見性, 同時雖然 volatile  能保證可見性,但不能保證原子性,所以 AQS 提供了對 state 的原子操作方法,保證了線程安全。

另外 AQS 中實現的 FIFO 隊列(CLH 隊列)其實是雙向鏈表實現的,由 head, tail 節點表示,head  結點代表當前占用的線程,其他節點由于暫時獲取不到鎖所以依次排隊等待鎖釋放。

所以我們不難明白 AQS 的如下定義:

public abstract class AbstractQueuedSynchronizer   extends AbstractOwnableSynchronizer     implements java.io.Serializable {     // 以下為雙向鏈表的首尾結點,代表入口等待隊列     private transient volatile Node head;     private transient volatile Node tail;     // 共享變量 state     private volatile int state;     // cas 獲取 / 釋放 state,保證線程安全地獲取鎖     protected final boolean compareAndSetState(int expect, int update) {         // See below for intrinsics setup to support this         return unsafe.compareAndSwapInt(this, stateOffset, expect, update);     }     // ...  }

AQS 源碼

剖析ReentrantLock 是我們比較常用的一種鎖,也是基于 AQS 實現的,所以接下來我們就來分析一下 ReentrantLock  鎖的實現來一探 AQS 究竟。本文將會采用圖文并茂的方式讓大家理解 AQS  的實現原理,大家在學習過程中,可以多類比一下上文中就診的例子,相信會有助于理解。

首先我們要知道 ReentrantLock 是獨占鎖,也有公平和非公平兩種鎖模式,什么是獨占與有共享模式,什么又是公平鎖與非公平鎖

與獨占鎖對應的是共享鎖,這兩者有什么區別呢

獨占鎖:即其他線程只有在占有鎖的線程釋放后才能競爭鎖,有且只有一個線程能競爭成功(醫生只有一個,一次只能看一個病人)

共享鎖:即共享資源可以被多個線程同時占有,直到共享資源被占用完畢(多個醫生,可以看多個病人),常見的有讀寫鎖 ReadWriteLock,  CountdownLatch,兩者的區別如下

怎么掌握AQS

什么是公平鎖與非公平鎖

還是以就醫為例,所謂公平鎖即大家取號后老老實實按照先來后到的順序在侯診室依次等待叫號,如果是非公平鎖呢,新來的病人(線程)很霸道,不取號排隊  ,直接去搶先看病,占有醫生(不一定成功)

怎么掌握AQS

公平鎖與非公平鎖

本文我們將會重點分析獨占,非公平模式的源碼實現,不分析共享模式與 Condition 的實現,因為剖析了獨占鎖的實現,由于原理都是相似的,再分析共享與  Condition 就不難了。

首先我們先來看下 ReentrantLock 的使用方法

// 1. 初始化可重入鎖 private ReentrantLock lock = new ReentrantLock(); public void run() {     // 加鎖     lock.lock();     try {         // 2. 執行臨界區代碼     } catch (InterruptedException e) {         e.printStackTrace();     } finally {         // 3. 解鎖         lock.unlock();     } }

第一步是初始化可重入鎖,可以看到默認使用的是非公平鎖機制

public ReentrantLock() {     sync = new NonfairSync(); }

當然你也可以用如下構造方法來指定使用公平鎖:

public ReentrantLock(boolean fair) {     sync = fair ? new FairSync() : new NonfairSync(); }

畫外音: FairSync 和 NonfairSync 是 ReentrantLock 實現的內部類,分別指公平和非公平模式,ReentrantLock  ReentrantLock 的加鎖(lock),解鎖(unlock)在內部具體都是調用的 FairSync,NonfairSync 的加鎖和解鎖方法。

幾個類的關系如下:

怎么掌握AQS

我們先來剖析下非公平鎖(NonfairSync)的實現方式,來看上述示例代碼的第二步:加鎖,由于默認的是非公平鎖的加鎖,所以我們來分析下非公平鎖是如何加鎖的

怎么掌握AQS

可以看到 lock 方法主要有兩步

  1. 使用 CAS 來獲取 state 資源,如果成功設置 1,代表 state 資源獲取鎖成功,此時記錄下當前占用 state 的線程  setExclusiveOwnerThread(Thread.currentThread());

  2. 如果 CAS 設置 state 為 1 失敗(代表獲取鎖失敗),則執行 acquire(1) 方法,這個方法是 AQS 提供的方法,如下

public final void acquire(int arg) {     if (!tryAcquire(arg) &&         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))         selfInterrupt(); }

tryAcquire 剖析

首先 調用 tryAcquire 嘗試著獲取 state,如果成功,則跳過后面的步驟。如果失敗,則執行 acquireQueued 將線程加入 CLH  等待隊列中。

先來看下 tryAcquire 方法,這個方法是 AQS 提供的一個模板方法,最終由其 AQS  具體的實現類(Sync)實現,由于執行的是非公平鎖邏輯,執行的代碼如下:

final boolean nonfairTryAcquire(int acquires) {     final Thread current = Thread.currentThread();     int c = getState();      if (c == 0) {         // 如果 c 等于0,表示此時資源是空閑的(即鎖是釋放的),再用 CAS 獲取鎖         if (compareAndSetState(0, acquires)) {             setExclusiveOwnerThread(current);             return true;         }     }     else if (current == getExclusiveOwnerThread()) {         // 此條件表示之前已有線程獲得鎖,且此線程再一次獲得了鎖,獲取資源次數再加 1,這也映證了 ReentrantLock 為可重入鎖         int nextc = c + acquires;         if (nextc < 0) // overflow             throw new Error("Maximum lock count exceeded");         setState(nextc);         return true;     }     return false; }

此段代碼可知鎖的獲取主要分兩種情況

  1. state 為 0 時,代表鎖已經被釋放,可以去獲取,于是使用 CAS 去重新獲取鎖資源,如果獲取成功,則代表競爭鎖成功,使用  setExclusiveOwnerThread(current) 記錄下此時占有鎖的線程,看到這里的  CAS,大家應該不難理解為啥當前實現是非公平鎖了,因為隊列中的線程與新線程都可以 CAS 獲取鎖啊,新來的線程不需要排隊

  2. 如果 state 不為 0,代表之前已有線程占有了鎖,如果此時的線程依然是之前占有鎖的線程(current ==  getExclusiveOwnerThread() 為 true),代表此線程再一次占有了鎖(可重入鎖),此時更新  state,記錄下鎖被占有的次數(鎖的重入次數),這里的 setState 方法不需要使用 CAS  更新,因為此時的鎖就是當前線程占有的,其他線程沒有機會進入這段代碼執行。所以此時更新 state 是線程安全的。

假設當前 state = 0,即鎖不被占用,現在有 T1, T2, T3 這三個線程要去競爭鎖

怎么掌握AQS

假設現在 T1 獲取鎖成功,則兩種情況分別為 1、 T1 首次獲取鎖成功

怎么掌握AQS

2、 T1 再次獲取鎖成功,state 再加 1,表示鎖被重入了兩次,當前如果 T1一直申請占用鎖成功,state 會一直累加

怎么掌握AQS

acquireQueued 剖析

如果 tryAcquire(arg) 執行失敗,代表獲取鎖失敗,則執行 acquireQueued 方法,將線程加入 FIFO 等待隊列

public final void acquire(int arg) {     if (!tryAcquire(arg) &&         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))         selfInterrupt(); }

所以接下來我們來看看 acquireQueued 的執行邏輯,首先會調用 addWaiter(Node.EXCLUSIVE) 將包含有當前線程的 Node  節點入隊, Node.EXCLUSIVE 代表此結點為獨占模式

再來看下 addWaiter 是如何實現的

private Node addWaiter(Node mode) {     Node node = new Node(Thread.currentThread(), mode);     Node pred = tail;     // 如果尾結點不為空,則用 CAS 將獲取鎖失敗的線程入隊     if (pred != null) {         node.prev = pred;         if (compareAndSetTail(pred, node)) {             pred.next = node;             return node;         }     }     // 如果結點為空,執行 enq 方法     enq(node);     return node; }

這段邏輯比較清楚,首先是獲取 FIFO 隊列的尾結點,如果尾結點存在,則采用 CAS 的方式將等待線程入隊,如果尾結點為空則執行 enq 方法

private Node enq(final Node node) {     for (;;) {         Node t = tail;         if (t == null) {             // 尾結點為空,說明 FIFO 隊列未初始化,所以先初始化其頭結點             if (compareAndSetHead(new Node()))                 tail = head;         } else {             // 尾結點不為空,則將等待線程入隊             node.prev = t;             if (compareAndSetTail(t, node)) {                 t.next = node;                 return t;             }         }     } }

首先判斷 tail 是否為空,如果為空說明 FIFO 隊列的 head,tail 還未構建,此時先構建頭結點,構建之后再用 CAS  的方式將此線程結點入隊

使用 CAS 創建 head 節點的時候只是簡單調用了 new Node() 方法,并不像其他節點那樣記錄 thread,這是為啥

因為 head 結點為虛結點,它只代表當前有線程占用了 state,至于占用 state 的是哪個線程,其實是調用了上文的  setExclusiveOwnerThread(current) ,即記錄在 exclusiveOwnerThread 屬性里。

執行完 addWaiter 后,線程入隊成功,現在就要看最后一個最關鍵的方法 acquireQueued  了,這個方法有點難以理解,先不急,我們先用三個線程來模擬一下之前的代碼對應的步驟

1、假設 T1 獲取鎖成功,由于此時 FIFO 未初始化,所以先創建 head 結點

怎么掌握AQS

2、此時 T2 或 T3 再去競爭 state 失敗,入隊,如下圖:

怎么掌握AQS

好了,現在問題來了, T2,T3 入隊后怎么處理,是馬上阻塞嗎,馬上阻塞意味著線程從運行態轉為阻塞態  ,涉及到用戶態向內核態的切換,而且喚醒后也要從內核態轉為用戶態,開銷相對比較大,所以 AQS 對這種入隊的線程采用的方式是讓它們自旋來競爭鎖,如下圖示

怎么掌握AQS

不過聰明的你可能發現了一個問題,如果當前鎖是獨占鎖,如果鎖一直被被 T1 占有, T2,T3 一直自旋沒太大意義,反而會占用  CPU,影響性能,所以更合適的方式是它們自旋一兩次競爭不到鎖后識趣地阻塞以等待前置節點釋放鎖后再來喚醒它。

另外如果鎖在自旋過程中被中斷了,或者自旋超時了,應該處于「取消」狀態。

基于每個 Node 可能所處的狀態,AQS 為其定義了一個變量  waitStatus,根據這個變量值對相應節點進行相關的操作,我們一起來看這看這個變量都有哪些值,是時候看一個 Node 結點的屬性定義了

static final class Node {     static final Node SHARED = new Node();//標識等待節點處于共享模式     static final Node EXCLUSIVE = null;//標識等待節點處于獨占模式      static final int CANCELLED = 1; //由于超時或中斷,節點已被取消     static final int SIGNAL = -1;  // 節點阻塞(park)必須在其前驅結點為 SIGNAL 的狀態下才能進行,如果結點為 SIGNAL,則其釋放鎖或取消后,可以通過 unpark 喚醒下一個節點,     static final int CONDITION = -2;//表示線程在等待條件變量(先獲取鎖,加入到條件等待隊列,然后釋放鎖,等待條件變量滿足條件;只有重新獲取鎖之后才能返回)     static final int PROPAGATE = -3;//表示后續結點會傳播喚醒的操作,共享模式下起作用      //等待狀態:對于condition節點,初始化為CONDITION;其它情況,默認為0,通過CAS操作原子更新     volatile int waitStatus;

通過狀態的定義,我們可以猜測一下 AQS 對線程自旋的處理:如果當前節點的上一個節點不為 head,且它的狀態為  SIGNAL,則結點進入阻塞狀態。來看下代碼以映證我們的猜測:

final boolean acquireQueued(final Node node, int arg) {     boolean failed = true;     try {         boolean interrupted = false;         for (;;) {             final Node p = node.predecessor();             // 如果前一個節點是 head,則嘗試自旋獲取鎖             if (p == head && tryAcquire(arg)) {                 //  將 head 結點指向當前節點,原 head 結點出隊                 setHead(node);                 p.next = null; // help GC                 failed = false;                 return interrupted;             }             // 如果前一個節點不是 head 或者競爭鎖失敗,則進入阻塞狀態             if (shouldParkAfterFailedAcquire(p, node) &&                 parkAndCheckInterrupt())                 interrupted = true;         }     } finally {         if (failed)             // 如果線程自旋中因為異常等原因獲取鎖最終失敗,則調用此方法             cancelAcquire(node);     } }

先來看第一種情況,如果當前結點的前一個節點是 head 結點,且獲取鎖(tryAcquire)成功的處理

怎么掌握AQS

可以看到主要的處理就是把 head 指向當前節點,并且讓原 head 結點出隊,這樣由于原 head 不可達, 會被垃圾回收。

注意其中 setHead 的處理

private void setHead(Node node) {     head = node;     node.thread = null;     node.prev = null; }

將 head 設置成當前結點后,要把節點的 thread, pre 設置成 null,因為之前分析過了,head 是虛節點,不保存除  waitStatus(結點狀態)的其他信息,所以這里把 thread ,pre 置為空,因為占有鎖的線程由 exclusiveThread 記錄了,如果  head 再記錄 thread 不僅多此一舉,反而在釋放鎖的時候要多操作一遍 head 的 thread 釋放。

如果前一個節點不是 head 或者競爭鎖失敗,則首先調用 shouldParkAfterFailedAcquire  方法判斷鎖是否應該停止自旋進入阻塞狀態:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {     int ws = pred.waitStatus;              if (ws == Node.SIGNAL)        // 1. 如果前置頂點的狀態為 SIGNAL,表示當前節點可以阻塞了         return true;     if (ws > 0) {         // 2. 移除取消狀態的結點         do {             node.prev = pred = pred.prev;         } while (pred.waitStatus > 0);         pred.next = node;     } else {         // 3. 如果前置節點的 ws 不為 0,則其設置為 SIGNAL,         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);     }     return false; }

這一段代碼有點繞,需要稍微動點腦子,按以上步驟一步步來看

1、 首先我們要明白,根據之前 Node 類的注釋,如果前驅節點為 SIGNAL,則當前節點可以進入阻塞狀態。

怎么掌握AQS

如圖示:T2,T3 的前驅節點的 waitStatus 都為 SIGNAL,所以 T2,T3  此時都可以阻塞。

2、如果前驅節點為取消狀態,則前驅節點需要移除,這些采用了一個更巧妙的方法,把所有當前節點之前所有 waitStatus  為取消狀態的節點全部移除,假設有四個線程如下,T2,T3 為取消狀態,則執行邏輯后如下圖所示,T2, T3 節點會被 GC。

怎么掌握AQS

3、如果前驅節點小于等于 0,則需要首先將其前驅節點置為 SIGNAL,因為前文我們分析過,當前節點進入阻塞的一個條件是前驅節點必須為  SIGNAL,這樣下一次自旋后發現前驅節點為 SIGNAL,就會返回 true(即步驟 1)

shouldParkAfterFailedAcquire 返回 true 代表線程可以進入阻塞中斷,那么下一步 parkAndCheckInterrupt  就該讓線程阻塞了

private final boolean parkAndCheckInterrupt() {     // 阻塞線程     LockSupport.park(this);     // 返回線程是否中斷過,并且清除中斷狀態(在獲得鎖后會補一次中斷)     return Thread.interrupted(); }

這里的阻塞線程很容易理解,但為啥要判斷線程是否中斷過呢,因為如果線程在阻塞期間收到了中斷,喚醒(轉為運行態)獲取鎖后(acquireQueued 為  true)需要補一個中斷,如下所示:

public final void acquire(int arg) {     if (!tryAcquire(arg) &&         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))         // 如果是因為中斷喚醒的線程,獲取鎖后需要補一下中斷         selfInterrupt(); }

至此,獲取鎖的流程已經分析完畢,不過還有一個疑惑我們還沒解開:前文不是說 Node 狀態為取消狀態會被取消嗎,那 Node  什么時候會被設置為取消狀態呢。

回頭看 acquireQueued

final boolean acquireQueued(final Node node, int arg) {     boolean failed = true;     try {         // 省略自旋獲取鎖代碼             } finally {         if (failed)             // 如果線程自旋中因為異常等原因獲取鎖最終失敗,則調用此方法             cancelAcquire(node);     } }

看最后一個 cancelAcquire 方法,如果線程自旋中因為異常等原因獲取鎖最終失敗,則會調用此方法

private void cancelAcquire(Node node) {     // 如果節點為空,直接返回     if (node == null)         return;     // 由于線程要被取消了,所以將 thread 線程清掉     node.thread = null;      // 下面這步表示將 node 的 pre 指向之前第一個非取消狀態的結點(即跳過所有取消狀態的結點),waitStatus > 0 表示當前結點狀態為取消狀態     Node pred = node.prev;     while (pred.waitStatus > 0)         node.prev = pred = pred.prev;      // 獲取經過過濾后的 pre 的 next 結點,這一步主要用在后面的 CAS 設置 pre 的 next 節點上     Node predNext = pred.next;     // 將當前結點設置為取消狀態     node.waitStatus = Node.CANCELLED;      // 如果當前取消結點為尾結點,使用 CAS 則將尾結點設置為其前驅節點,如果設置成功,則尾結點的 next 指針設置為空     if (node == tail && compareAndSetTail(node, pred)) {         compareAndSetNext(pred, predNext, null);     } else {     // 這一步看得有點繞,我們想想,如果當前節點取消了,那是不是要把當前節點的前驅節點指向當前節點的后繼節點,但是我們之前也說了,要喚醒或阻塞結點,須在其前驅節點的狀態為 SIGNAL 的條件才能操作,所以在設置 pre 的 next 節點時要保證 pre 結點的狀態為 SIGNAL,想通了這一點相信你不難理解以下代碼。         int ws;         if (pred != head &&             ((ws = pred.waitStatus) == Node.SIGNAL ||              (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&             pred.thread != null) {             Node next = node.next;             if (next != null && next.waitStatus <= 0)                 compareAndSetNext(pred, predNext, next);         } else {         // 如果 pre 為 head,或者  pre 的狀態設置 SIGNAL 失敗,則直接喚醒后繼結點去競爭鎖,之前我們說過, SIGNAL 的結點取消(或釋放鎖)后可以喚醒后繼結點             unparkSuccessor(node);         }         node.next = node; // help GC     } }

這一段代碼有點繞,我們一個個來看,首先考慮以下情況

1、首先第一步當前節點之前有取消結點時,則邏輯如下

怎么掌握AQS

2、如果當前結點既非頭結點的后繼結點,也非尾結點,即步驟 1 所示,則最終結果如下

怎么掌握AQS

這里肯定有人問,這種情況下當前節點與它的前驅結點無法被 GC 啊,還記得我們上文分析鎖自旋時的處理嗎,再看下以下代碼

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {     int ws = pred.waitStatus;     // 省略無關代碼     if (ws > 0) {         // 移除取消狀態的結點         do {             node.prev = pred = pred.prev;         } while (pred.waitStatus > 0);         pred.next = node;     }      return false; }

這段代碼會將 node 的 pre 指向之前 waitStatus 為非 CANCEL 的節點

所以當 T4 執行這段代碼時,會變成如下情況

怎么掌握AQS

可以看到此時中間的兩個 CANCEL 節點不可達了,會被 GC

3、如果當前結點為 tail 結點,則結果如下,這種情況下當前結點不可達,會被 GC

怎么掌握AQS

4、如果當前結點為 head 的后繼結點時,如下

怎么掌握AQS

結果中的 CANCEL 結點同樣會在 tail 結點自旋調用 shouldParkAfterFailedAcquire 后不可達,如下

怎么掌握AQS

自此我們終于分析完了鎖的獲取流程,接下來我們來看看鎖是如何釋放的。

鎖釋放

不管是公平鎖還是非公平鎖,最終都是調的 AQS 的如下模板方法來釋放鎖

// java.util.concurrent.locks.AbstractQueuedSynchronizer  public final boolean release(int arg) {     // 鎖釋放是否成功     if (tryRelease(arg)) {         Node h = head;         if (h != null && h.waitStatus != 0)             unparkSuccessor(h);         return true;     }     return false; }

tryRelease 方法定義在了 AQS 的子類 Sync 方法里

// java.util.concurrent.locks.ReentrantLock.Sync  protected final boolean tryRelease(int releases) {     int c = getState() - releases;     // 只有持有鎖的線程才能釋放鎖,所以如果當前鎖不是持有鎖的線程,則拋異常     if (Thread.currentThread() != getExclusiveOwnerThread())         throw new IllegalMonitorStateException();     boolean free = false;     // 說明線程持有的鎖全部釋放了,需要釋放 exclusiveOwnerThread 的持有線程     if (c == 0) {         free = true;         setExclusiveOwnerThread(null);     }     setState(c);     return free; }

鎖釋放成功后該干嘛,顯然是喚醒之后 head 之后節點,讓它來競爭鎖

// java.util.concurrent.locks.AbstractQueuedSynchronizer  public final boolean release(int arg) {     // 鎖釋放是否成功     if (tryRelease(arg)) {         Node h = head;         if (h != null && h.waitStatus != 0)             // 鎖釋放成功后,喚醒 head 之后的節點,讓它來競爭鎖             unparkSuccessor(h);         return true;     }     return false; }

這里釋放鎖的條件為啥是 h != null && h.waitStatus != 0 呢。

如果 h == null,  這有兩種可能,一種是一個線程在競爭鎖,現在它釋放了,當然沒有所謂的喚醒后繼節點,一種是其他線程正在運行競爭鎖,只是還未初始化頭節點,既然其他線程正在運行,也就無需執行喚醒操作

如果 h != null 且 h.waitStatus == 0,說明 head 的后繼節點正在自旋競爭鎖,也就是說線程是運行狀態的,無需喚醒。

如果 h != null 且 h.waitStatus < 0, 此時 waitStatus 值可能為 SIGNAL,或  PROPAGATE,這兩種情況說明后繼結點阻塞需要被喚醒

來看一下喚醒方法 unparkSuccessor:

private void unparkSuccessor(Node node) {     // 獲取 head 的 waitStatus(假設其為 SIGNAL),并用 CAS 將其置為 0,為啥要做這一步呢,之前我們分析過多次,其實 waitStatus = SIGNAL(< -1)或 PROPAGATE(-&middot;3) 只是一個標志,代表在此狀態下,后繼節點可以喚醒,既然正在喚醒后繼節點,自然可以將其重置為 0,當然如果失敗了也不影響其喚醒后繼結點     int ws = node.waitStatus;     if (ws < 0)         compareAndSetWaitStatus(node, ws, 0);      // 以下操作為獲取隊列第一個非取消狀態的結點,并將其喚醒     Node s = node.next;     // s 狀態為非空,或者其為取消狀態,說明 s 是無效節點,此時需要執行 if 里的邏輯     if (s == null || s.waitStatus > 0) {         s = null;         // 以下操作為從尾向前獲取最后一個非取消狀態的結點         for (Node t = tail; t != null && t != node; t = t.prev)             if (t.waitStatus <= 0)                 s = t;     }     if (s != null)         LockSupport.unpark(s.thread); }

這里的尋找隊列的第一個非取消狀態的節點為啥要從后往前找呢,因為節點入隊并不是原子操作,如下

怎么掌握AQS

線程自旋時時是先執行 node.pre = pred, 然后再執行 pred.next = node,如果 unparkSuccessor  剛好在這兩者之間執行,此時是找不到 head 的后繼節點的,如下

怎么掌握AQS

如何利用 AQS 自定義一個互斥鎖

AQS 通過提供 state 及 FIFO  隊列的管理,為我們提供了一套通用的實現鎖的底層方法,基本上定義一個鎖,都是轉為在其內部定義 AQS 的子類,調用 AQS 的底層方法來實現的,由于 AQS  在底層已經為了定義好了這些獲取 state 及 FIFO 隊列的管理工作,我們要實現一個鎖就比較簡單了,我們可以基于 AQS  來實現一個非可重入的互斥鎖,如下所示

public class Mutex  {      private Sync sync = new Sync();          public void lock () {         sync.acquire(1);     }          public void unlock () {         sync.release(1);     }      private static class Sync extends AbstractQueuedSynchronizer {         @Override         protected boolean tryAcquire (int arg) {             return compareAndSetState(0, 1);         }          @Override         protected boolean tryRelease (int arg) {             setState(0);             return true;         }          @Override         protected boolean isHeldExclusively () {             return getState() == 1;         }     } }

可以看到區區幾行代碼就實現了,確實很方便。

感謝各位的閱讀,以上就是“怎么掌握AQS”的內容了,經過本文的學習后,相信大家對怎么掌握AQS這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

aqs
AI

延长县| 广平县| 稻城县| 冕宁县| 商水县| 新邵县| 公主岭市| 商洛市| 肇庆市| 牡丹江市| 阿克陶县| 凭祥市| 丹凤县| 肇东市| 罗平县| 怀远县| 汝城县| 镇康县| 图木舒克市| 太谷县| 滕州市| 苍南县| 保康县| 洛阳市| 内乡县| 济南市| 宜兴市| 铁力市| 屏东市| 灵山县| 安吉县| 陇西县| 东阿县| 乃东县| 灵宝市| 馆陶县| 仪陇县| 深州市| 商水县| 张家界市| 阿鲁科尔沁旗|