您好,登錄后才能下訂單哦!
這篇文章主要介紹“Java多線程并發AbstractQueuedSynchronizer怎么使用”,在日常操作中,相信很多人在Java多線程并發AbstractQueuedSynchronizer怎么使用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Java多線程并發AbstractQueuedSynchronizer怎么使用”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
AbstractQueuedSynchronizer 簡稱 AQS ,抽象隊列同步器,用來實現依賴于先進先出(FIFO)等待隊列的阻塞鎖和相關同步器的框架。這個類旨在為大多數依賴單個原子 int 值來表示同步狀態的同步器提供基礎的能力封裝。 例如 ReentrantLock、Semaphore 和 FutureTask 等等都是基于 AQS 實現的,我們也可以繼承 AQS 實現自定義同步器。
網絡上常見的解釋是:
如果被請求的共享資源空閑,則將當前請求資源的線程設置為有效的工作線程,并且將共享資源設置為鎖定狀態。如果被請求的共享資源被占用,那么就需要一套線程阻塞等待以及被喚醒時鎖分配的機制,這個機制AQS是用CLH隊列鎖實現的,即將暫時獲取不到鎖的線程加入到隊列中。
個人理解,可以把 AQS 當成一把鎖,它內部通過一個隊列記錄了所有要使用鎖的請求線程,并且管理鎖自己當前的狀態(鎖定、空閑等狀態)。相當于 AQS 就是共享資源本身,當有線程請求這個資源是,AQS 將請求資源的線程記錄當前工作線程,并將自身設置為鎖定狀態。后續其他線程請求這個 AQS 時,將請求線程記錄到等待隊列中,其他線程此時未獲取到鎖,進入阻塞等待狀態。
在深入 AQS 前,我們應該持有一個疑問是為什么需要 AQS ?synchronized 關鍵字和 CAS 原子類都提供了豐富的同步方案了。
但在實際的需求中,對同步的需求是各式各樣的,比如,我們需要對一個鎖加上超時時間,那么光憑 synchronized 關鍵字或是 CAS 就無法實現了,需要對其進行二次封裝。而 JDK 中提供了豐富的同步方案,比如 ReentrantLock ,而 ReentrantLock 是就是基于 AQS 實現的。
這部分內容來自 JDK 的注釋
要將此類用作同步器的基礎,請在適用時重新定義以下方法,方法是使用 getState、setState 和/或 compareAndSetState 檢查和/或修改同步狀態:
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
isHeldExclusively
默認情況下,這些方法中的每一個都會引發 UnsupportedOperationException。 這些方法的實現必須是內部線程安全的,并且通常應該是短暫的而不是阻塞的。 定義這些方法是使用此類的唯一受支持的方法。 所有其他方法都被聲明為最終方法,因為它們不能獨立變化。
您可能還會發現從 AbstractOwnableSynchronizer 繼承的方法對于跟蹤擁有獨占同步器的線程很有用。 鼓勵您使用它們——這使監視和診斷工具能夠幫助用戶確定哪些線程持有鎖。
即使此類基于內部 FIFO 隊列,它也不會自動執行 FIFO 采集策略。
獨占同步的核心形式為:
Acquire: while (!tryAcquire(arg)) { enqueue thread if it is not already queued; possibly block current thread; } Release: if (tryRelease(arg)) unblock the first queued thread;
(共享模式類似,但可能涉及級聯信號。)
因為在入隊之前調用了獲取中的檢查,所以新獲取的線程可能會搶在其他被阻塞和排隊的線程之前。 但是,如果需要,您可以定義 tryAcquire 和/或 tryAcquireShared 以通過內部調用一個或多個檢查方法來禁用插入,從而提供公平的 FIFO 獲取順序。 特別是,如果 hasQueuedPredecessors(一種專門為公平同步器使用的方法)返回 true,大多數公平同步器可以定義 tryAcquire 返回 false。 其他變化是可能的。
默認插入(也稱為貪婪、放棄和避免護送)策略的吞吐量和可擴展性通常最高。 雖然這不能保證公平或無饑餓,但允許較早排隊的線程在較晚的排隊線程之前重新競爭,并且每次重新競爭都有無偏見的機會成功對抗傳入線程。 此外,雖然獲取不是通常意義上的“旋轉”,但它們可能會在阻塞之前執行多次調用 tryAcquire 并穿插其他計算。 當獨占同步只是短暫地保持時,這提供了自旋的大部分好處,而沒有大部分責任。 如果需要,您可以通過預先調用獲取具有“快速路徑”檢查的方法來增加這一點,可能會預先檢查 hasContended 和/或 hasQueuedThreads 以僅在同步器可能不會被爭用時才這樣做。
此類通過將其使用范圍專門用于可以依賴 int 狀態、獲取和釋放參數以及內部 FIFO 等待隊列的同步器,部分地為同步提供了高效且可擴展的基礎。 如果這還不夠,您可以使用原子類、您自己的自定義 java.util.Queue 類和 LockSupport 阻塞支持從較低級別構建同步器。
這是一個不可重入互斥鎖類,它使用值 0 表示未鎖定狀態,使用值 1 表示鎖定狀態。 雖然不可重入鎖并不嚴格要求記錄當前所有者線程,但無論如何,此類都會這樣做以使使用情況更易于監控。
它還支持條件并公開一些檢測方法:
class Mutex implements Lock, java.io.Serializable { // Our internal helper class private static class Sync extends AbstractQueuedSynchronizer { // Acquires the lock if state is zero public boolean tryAcquire(int acquires) { assert acquires == 1; // Otherwise unused if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // Releases the lock by setting state to zero protected boolean tryRelease(int releases) { assert releases == 1; // Otherwise unused if (!isHeldExclusively()) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); return true; } // Reports whether in locked state public boolean isLocked() { return getState() != 0; } public boolean isHeldExclusively() { // a data race, but safe due to out-of-thin-air guarantees return getExclusiveOwnerThread() == Thread.currentThread(); } // Provides a Condition public Condition newCondition() { return new ConditionObject(); } // Deserializes properly private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } } // The sync object does all the hard work. We just forward to it. private final Sync sync = new Sync(); public void lock() { sync.acquire(1); } public boolean tryLock() { return sync.tryAcquire(1); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } public boolean isLocked() { return sync.isLocked(); } public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }
這是一個類似于 CountDownLatch 的鎖存器類,只是它只需要一個信號即可觸發。 因為鎖存器是非獨占的,所以它使用共享的獲取和釋放方法。
class BooleanLatch { private static class Sync extends AbstractQueuedSynchronizer { boolean isSignalled() { return getState() != 0; } protected int tryAcquireShared(int ignore) { return isSignalled() ? 1 : -1; } protected boolean tryReleaseShared(int ignore) { setState(1); return true; } } private final Sync sync = new Sync(); public boolean isSignalled() { return sync.isSignalled(); } public void signal() { sync.releaseShared(1); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } }
AbstractQueuedSynchronizer 繼承自 AbstractOwnableSynchronizer ,后者邏輯十分簡單:
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 3737899427754241961L; protected AbstractOwnableSynchronizer() { } private transient Thread exclusiveOwnerThread; // 設置當前持有鎖的線程 protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
AbstractOwnableSynchronizer 只是定義了設置持有鎖的線程的能力。
AQS 的等待隊列是 CLH (Craig , Landin , and Hagersten) 鎖定隊列的變體,CLH 鎖通常用于自旋鎖。AQS 將每個請求共享資源的線程封裝程一個 CLH 節點來實現的,這個節點的定義是:
/** CLH Nodes */ abstract static class Node { volatile Node prev; // initially attached via casTail volatile Node next; // visibly nonnull when signallable Thread waiter; // visibly nonnull when enqueued volatile int status; // written by owner, atomic bit ops by others // methods for atomic operations final boolean casPrev(Node c, Node v) { // for cleanQueue return U.weakCompareAndSetReference(this, PREV, c, v); // 通過 CAS 確保同步設置 prev 的值 } final boolean casNext(Node c, Node v) { // for cleanQueue return U.weakCompareAndSetReference(this, NEXT, c, v); } final int getAndUnsetStatus(int v) { // for signalling return U.getAndBitwiseAndInt(this, STATUS, ~v); } final void setPrevRelaxed(Node p) { // for off-queue assignment U.putReference(this, PREV, p); } final void setStatusRelaxed(int s) { // for off-queue assignment U.putInt(this, STATUS, s); } final void clearStatus() { // for reducing unneeded signals U.putIntOpaque(this, STATUS, 0); } private static final long STATUS = U.objectFieldOffset(Node.class, "status"); private static final long NEXT = U.objectFieldOffset(Node.class, "next"); private static final long PREV = U.objectFieldOffset(Node.class, "prev"); }
CLH 的節點的數據結構是一個雙向鏈表的節點,只不過每個操作都是經過 CAS 確保線程安全的。要加入 CLH 鎖隊列,您可以將其自動拼接為新的尾部;要出隊,需要設置 head 字段,以便下一個符合條件的等待節點成為新的頭節點:
+------+ prev +-------+ prev +------+ | | <---- | | <---- | | | head | next | first | next | tail | | | ----> | | ----> | | +------+ +-------+ +------+
Node 中的 status 字段表示當前節點代表的線程的狀態。
status 存在三種狀態:
static final int WAITING = 1; // must be 1 static final int CANCELLED = 0x80000000; // must be negative static final int COND = 2; // in a condition wait
WAITING:表示等待狀態,值為 1。
CANCELLED:表示當前線程被取消,為 0x80000000。
COND:表示當前節點在等待條件,也就是在條件等待隊列中,值為 2。
在上面的 COND 中,提到了一個條件等待隊列的概念。
首先,Node 是一個靜態抽象類,它在 AQS 中存在三種實現類:
ExclusiveNode
SharedNode
ConditionNode
前兩者都是空實現:
static final class ExclusiveNode extends Node { } static final class SharedNode extends Node { }
而最后的 ConditionNode 多了些內容:
static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker { ConditionNode nextWaiter; // 檢查線程是否中斷或當前線程的狀態已取消等待。 public final boolean isReleasable() { return status <= 1 || Thread.currentThread().isInterrupted(); } public final boolean block() { while (!isReleasable()) LockSupport.park(); return true; } }
ConditionNode 拓展了兩個方法:
檢查線程狀態是否處于等待。
阻塞當前線程:當前線程正在等待執行,通過 LockSupport.park()
阻塞當前線程。這里通過 while 循環持續重試,嘗試阻塞線程。
而到這一步,所有的信息都指向了一個相關的類 Condition 。
AQS 中的 Condition 的實現是內部類 ConditionObject :
public class ConditionObject implements Condition, java.io.Serializable
ConditionObject 實現了 Condition 接口和序列化接口,后者說明了該類型的對象可以進行序列化。而前者 Condition 接口,定義了一些行為能力:
public interface Condition { void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }
Condition 中定義的能力與 Java 的 Object 類中提供的同步相關方法(wait、notify 和 notifyAll) 代表的能力極為相似。前者提供了更豐富的等待方法。類比的角度來看,如果 Object 是配合 synchronized 關鍵字使用的,那么 Condition 就是用來配合基于 AQS 實現的鎖來使用的接口。
可以將 Condition 的方法分為兩組:等待和喚醒。
// 等待,當前線程在接到信號或被中斷之前一直處于等待狀態 void await() throws InterruptedException; // 等待,當前線程在接到信號之前一直處于等待狀態,不響應中斷 void awaitUninterruptibly(); //等待,當前線程在接到信號、被中斷或到達指定等待時間之前一直處于等待狀態 long awaitNanos(long nanosTimeout) throws InterruptedException; // 等待,當前線程在接到信號、被中斷或到達指定等待時間之前一直處于等待狀態。 // 此方法在行為上等效于: awaitNanos(unit.toNanos(time)) > 0 boolean await(long time, TimeUnit unit) throws InterruptedException; // 等待,當前線程在接到信號、被中斷或到達指定最后期限之前一直處于等待狀態 boolean awaitUntil(Date deadline) throws InterruptedException;
// 喚醒一個等待線程。如果所有的線程都在等待此條件,則選擇其中的一個喚醒。在從 await 返回之前,該線程必須重新獲取鎖。 void signal(); // 喚醒所有等待線程。如果所有的線程都在等待此條件,則喚醒所有線程。在從 await 返回之前,每個線程都必須重新獲取鎖。 void signalAll();
分析完 Condition ,繼續來理解 ConditionObject。 ConditionObject 是 Condition 在 AQS 中的實現:
public class ConditionObject implements Condition, java.io.Serializable { /** condition 隊列頭節點 */ private transient ConditionNode firstWaiter; /** condition 隊列尾節點 */ private transient ConditionNode lastWaiter; // ---- Signalling methods ---- // 移除一個或所有等待者并將其轉移到同步隊列。 private void doSignal(ConditionNode first, boolean all) public final void signal() public final void signalAll() // ---- Waiting methods ---- // 將節點添加到條件列表并釋放鎖定。 private int enableWait(ConditionNode node) // 如果最初放置在條件隊列中的節點現在準備好重新獲取同步隊列,則返回 true。 private boolean canReacquire(ConditionNode node) // 從條件隊列中取消鏈接給定節點和其他非等待節點,除非已經取消鏈接。 private void unlinkCancelledWaiters(ConditionNode node) // 實現不可中斷的條件等待 public final void awaitUninterruptibly() public final void await() public final long awaitNanos(long nanosTimeout) public final boolean awaitUntil(Date deadline) public final boolean await(long time, TimeUnit unit) // ---- support for instrumentation ---- // 如果此條件是由給定的同步對象創建的,則返回 true。 final boolean isOwnedBy(AbstractQueuedSynchronizer sync) // 查詢是否有線程在此條件下等待。 protected final boolean hasWaiters() // 返回在此條件下等待的線程數的估計值。 protected final int getWaitQueueLength() // 返回一個集合,其中包含可能正在等待此 Condition 的那些線程。 protected final Collection<Thread> getWaitingThreads() }
ConditionObject 實現了 Condition 能力的基礎上,拓展了對 ConditionNode 相關的操作,方法通過其用途可以劃分為三組:
Signalling
Waiting
其他方法
public final void signal() { ConditionNode first = firstWaiter; if (!isHeldExclusively()) throw new IllegalMonitorStateException(); if (first != null) doSignal(first, false); } public final void signalAll() { ConditionNode first = firstWaiter; if (!isHeldExclusively()) throw new IllegalMonitorStateException(); if (first != null) doSignal(first, true); }
喚醒方法主要邏輯是通過 doSignal(ConditionNode first, boolean all)
實現的。doSignal
方法根據參數,進行一個 while 循環,
兩個方法傳遞進來的都是頭節點,也就是從 ConditionNode 雙向鏈表的頭節點開始遍歷,如果第二個參數 all 設置為 false ,只執行一次遍歷中邏輯。循環中的邏輯是:
// 最終都調用了這個方法 private void doSignal(ConditionNode first, boolean all) { while (first != null) { // 取出 first 的下一個節點,設置為 next ConditionNode next = first.nextWaiter; // 如果 first 是鏈表中唯一的一個節點,設置 lastWaiter 為 null if ((firstWaiter = next) == null) // lastWaiter = null; // 讀取 first 的 status ,檢查是否是 COND if ((first.getAndUnsetStatus(COND) & COND) != 0) { // first 處于 COND 狀態,出隊 enqueue(first); // 通過 all 來判斷是否將等待的線程都進行喚醒邏輯。 if (!all) break; } first = next; // 循環指向下一個 } }
關鍵方法 enqueue(ConditionNode)
是 AQS 中的方法:
final void enqueue(Node node) { if (node != null) { for (;;) { // 獲取尾節點 Node t = tail; // 避免不必要的內存屏障 node.setPrevRelaxed(t); if (t == null) // 空隊列首先初始化一個頭節點 tryInitializeHead(); else if (casTail(t, node)) { // 更新 tail 指針為 node (這里不是將 t = node) t.next = node; // 為節點 t 的 next 指針指向 node if (t.status < 0) // t 的狀態 < 0 一般代表后續節點需要運行了 LockSupport.unpark(node.waiter); break; } } } }
可以看出 enqueue(ConditionNode)
中本質上是通過調用 LockSupport.unpark(node.waiter);
來喚醒線程的。
對外提供的等待能力的方法包括:
// 實現不可中斷的條件等待 public final void awaitUninterruptibly() public final void await() public final long awaitNanos(long nanosTimeout) public final boolean awaitUntil(Date deadline) public final boolean await(long time, TimeUnit unit)
它們內部都用到了公共的邏輯:
// 添加節點到 condition 列表并釋放鎖 private int enableWait(ConditionNode node) private boolean canReacquire(ConditionNode node) private void unlinkCancelledWaiters(ConditionNode node)
private int enableWait(ConditionNode node) { if (isHeldExclusively()) { // 如果是當前線程持有鎖資源 node.waiter = Thread.currentThread(); // 將節點的綁定的線程設置為當前線程 node.setStatusRelaxed(COND | WAITING); // 設置節點狀態 ConditionNode last = lastWaiter; // 獲取 尾節點 if (last == null) firstWaiter = node; // 如果列表為空, node 就是頭節點 else last.nextWaiter = node; // 否則,將尾節點的下一個節點設置為 node lastWaiter = node; // 更新 lastWaiter 指針 int savedState = getState(); // 獲取當前線程的同步狀態 if (release(savedState)) // 在當前持有鎖資源的線程嘗試釋放鎖 return savedState; } node.status = CANCELLED; // 當前線程未持有鎖資源,更新 node 的狀態為 CANCELLED throw new IllegalMonitorStateException(); // 并拋出 IllegalMonitorStateException }
這個方法對傳入的節點插入到等待隊列的隊尾,并根據當前線程的狀態進行了檢查。關鍵方法的 release(int)
:
public final boolean release(int arg) { if (tryRelease(arg)) { // 嘗試釋放鎖資源 signalNext(head); // 釋放成功,喚醒下一個等待中的線程 return true; } return false; }
喚醒給定節點的下一個節點(如果存在),通過調用 LockSupport.unpark(s.waiter)
喚醒節點對應的線程。
private static void signalNext(Node h) { Node s; if (h != null && (s = h.next) != null && s.status != 0) { s.getAndUnsetStatus(WAITING); LockSupport.unpark(s.waiter); } }
檢查傳入的 node 是否在鏈表中,且不為頭節點:
// 如果最初放置在條件隊列中的節點現在準備好重新獲取同步隊列,則返回 true。 private boolean canReacquire(ConditionNode node) { // 檢查傳入的 node 是否在鏈表中,且不為頭節點 return node != null && node.prev != null && isEnqueued(node); }
// in AQS final boolean isEnqueued(Node node) { // 從 Node 雙向鏈表尾部開始遍歷,是否存在 node for (Node t = tail; t != null; t = t.prev) if (t == node) return true; return false; }
private void unlinkCancelledWaiters(ConditionNode node) { // node 為空 / node 不是隊尾 / node 是最后一個節點 if (node == null || node.nextWaiter != null || node == lastWaiter) { ConditionNode w = firstWaiter, trail = null; // w = first , trail = null // /從鏈表頭節點開始遍歷 while (w != null) { ConditionNode next = w.nextWaiter; // 取出下一個節點 if ((w.status & COND) == 0) { // 當前節點的狀態包含 COND w.nextWaiter = null; // 當前節點的 next 設置為 null if (trail == null) // 如果 trail 指針為空 firstWaiter = next; // firstWaiter 指向 next else trail.nextWaiter = next; // trail 指針不為空,尾指針的 next 指向當前節點的下一個節點 if (next == null) lastWaiter = trail; // 最后將 lastWaiter 設置為 trail (過濾后的 trail 鏈表插入到隊尾) } else trail = w; // 頭節點狀態不是 COND,當前節點設置為 trail 指針。 w = next; // 下一個循環 } } }
這個方法遍歷 ConditionNode 隊列,過濾掉狀態不包含 COND 的節點。
上面三個方法是內部處理邏輯。而對外暴露的是以下五個方法:
public final void awaitUninterruptibly() public final void await() public final long awaitNanos(long nanosTimeout) public final boolean awaitUntil(Date deadline) public final boolean await(long time, TimeUnit unit)
除了awaitUninterruptibly()
,其他方法所代表的能力和 Condition 接口中定義的所代表的能力基本一致。
awaitUninterruptibly()
是用于實現不可中斷的條件等待:
public final void awaitUninterruptibly() { ConditionNode node = new ConditionNode(); // 創建一個新的 node int savedState = enableWait(node); // 將這個新 node 插入,并返回 node 的狀態 LockSupport.setCurrentBlocker(this); // 設置 blocker boolean interrupted = false, rejected = false; // flag:中斷和拒絕 while (!canReacquire(node)) { // 當前線程關聯的 node 不再等待隊列 if (Thread.interrupted()) // 嘗試中斷線程 interrupted = true; else if ((node.status & COND) != 0) { // 中斷線程不成功的情況下,如果 node 狀態包含 COND // 嘗試阻塞線程 try { if (rejected) node.block(); // 實際上也是 LockSupport.park else ForkJoinPool.managedBlock(node); } catch (RejectedExecutionException ex) { rejected = true; // 拒絕執行 } catch (InterruptedException ie) { interrupted = true; // 中斷 } } else Thread.onSpinWait(); // 當前線程無法繼續執行 } // 不是隊列中的唯一節點時執行下面邏輯 LockSupport.setCurrentBlocker(null); node.clearStatus(); // 清除 node 的 status acquire(node, savedState, false, false, false, 0L); // 【*】重點方法 if (interrupted) Thread.currentThread().interrupt(); }
在這個方法中,首先講解兩個方法:
Thread.onSpinWait()
表示調用者暫時無法繼續,直到其他活動發生一個或多個動作。 通過在自旋等待循環構造的每次迭代中調用此方法,調用線程向運行時指示它正忙于等待。 運行時可能會采取措施來提高調用自旋等待循環構造的性能。
ForkJoinPool.managedBlock(node)
則是通過 Blocker 來檢查線程的運行狀態,然后嘗試阻塞線程。
最后是最關鍵的方法 acquire
,它的詳細邏輯放到最后講解, 這個方法的作用就是,當前線程進入等待后,需要將關聯的線程開啟一個自旋,掛起后能夠持續去嘗試獲取鎖資源。
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); ConditionNode node = new ConditionNode(); int savedState = enableWait(node); LockSupport.setCurrentBlocker(this); // for back-compatibility boolean interrupted = false, cancelled = false, rejected = false; while (!canReacquire(node)) { if (interrupted |= Thread.interrupted()) { if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) break; // else interrupted after signal } else if ((node.status & COND) != 0) { try { if (rejected) node.block(); else ForkJoinPool.managedBlock(node); } catch (RejectedExecutionException ex) { rejected = true; } catch (InterruptedException ie) { interrupted = true; } } else Thread.onSpinWait(); // awoke while enqueuing } LockSupport.setCurrentBlocker(null); node.clearStatus(); acquire(node, savedState, false, false, false, 0L); if (interrupted) { if (cancelled) { unlinkCancelledWaiters(node); throw new InterruptedException(); } Thread.currentThread().interrupt(); } }
await()
方法相較于 awaitUninterruptibly()
,while 邏輯基本一致,最后多了一步 cancelled 狀態檢查,如果 cancelled = true ,調用 unlinkCancelledWaiters(node)
,去清理等待隊列。
awaitNanos(long)
在 await()
之上多了對超時時間的計算和處理邏輯:
public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); ConditionNode node = new ConditionNode(); int savedState = enableWait(node); long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; long deadline = System.nanoTime() + nanos; boolean cancelled = false, interrupted = false; while (!canReacquire(node)) { if ((interrupted |= Thread.interrupted()) || (nanos = deadline - System.nanoTime()) <= 0L) { // 多了一個超時條件 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) break; } else LockSupport.parkNanos(this, nanos); } node.clearStatus(); acquire(node, savedState, false, false, false, 0L); if (cancelled) { unlinkCancelledWaiters(node); if (interrupted) throw new InterruptedException(); } else if (interrupted) Thread.currentThread().interrupt(); long remaining = deadline - System.nanoTime(); // avoid overflow return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE; }
awaitUntil(Date)
和 awaitNanos(long)
同理,只是將超時計算改成了日期計算:
long abstime = deadline.getTime(); // ... boolean cancelled = false, interrupted = false; while (!canReacquire(node)) { if ((interrupted |= Thread.interrupted()) || System.currentTimeMillis() >= abstime) { // 時間檢查 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) break; } else LockSupport.parkUntil(this, abstime); }
await(long, TimeUnit)
則是邏輯更加與 awaitNanos(long)
相似了, 只是多了一步計算 awaitNanos(long nanosTimeout)
中的參數 nanosTimeout 的操作:
long nanosTimeout = unit.toNanos(time);
在 wait 方法組中,最終都會調用到這個邏輯:
final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) { Thread current = Thread.currentThread(); byte spins = 0, postSpins = 0; // 在取消第一個線程時重試 boolean interrupted = false, first = false; Node pred = null; // 入隊時節點的前一個指針 /* * 反復執行: * 檢查當前節點是否是 first * 若是, 確保 head 穩定,否則確保有效的 prev * 如果節點是第一個或尚未入隊,嘗試獲取 * 否則,如果節點尚未創建,則創建這個它 * 否則,如果節點尚未入隊,嘗試入隊一次 * 否則,如果通過 park 喚醒,重試,最多 postSpins 次 * 否則,如果 WAITING 狀態未設置,設置并重試 * 否則,park 并且清除 WAITING 狀態, 檢查取消邏輯 */ for (;;) { if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) { if (pred.status < 0) { cleanQueue(); // predecessor cancelled continue; } else if (pred.prev == null) { Thread.onSpinWait(); // ensure serialization continue; } } if (first || pred == null) { boolean acquired; try { if (shared) acquired = (tryAcquireShared(arg) >= 0); else acquired = tryAcquire(arg); } catch (Throwable ex) { cancelAcquire(node, interrupted, false); throw ex; } if (acquired) { if (first) { node.prev = null; head = node; pred.next = null; node.waiter = null; if (shared) signalNextIfShared(node); if (interrupted) current.interrupt(); } return 1; } } if (node == null) { // allocate; retry before enqueue if (shared) node = new SharedNode(); else node = new ExclusiveNode(); } else if (pred == null) { // try to enqueue node.waiter = current; Node t = tail; node.setPrevRelaxed(t); // avoid unnecessary fence if (t == null) tryInitializeHead(); else if (!casTail(t, node)) node.setPrevRelaxed(null); // back out else t.next = node; } else if (first && spins != 0) { --spins; // reduce unfairness on rewaits Thread.onSpinWait(); } else if (node.status == 0) { node.status = WAITING; // enable signal and recheck } else { long nanos; spins = postSpins = (byte)((postSpins << 1) | 1); if (!timed) LockSupport.park(this); else if ((nanos = time - System.nanoTime()) > 0L) LockSupport.parkNanos(this, nanos); else break; node.clearStatus(); if ((interrupted |= Thread.interrupted()) && interruptible) break; } } return cancelAcquire(node, interrupted, interruptible); }
這個方法會在 Node 關聯的線程讓出鎖資源后,開啟一個死循環嘗試通過 tryAcquire
嘗試獲取鎖資源,最后如果超時或嘗試次數超出限制,會通過 LockSupport.park
阻塞自身。
到此,關于“Java多線程并發AbstractQueuedSynchronizer怎么使用”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。