您好,登錄后才能下訂單哦!
一、什么是同步器
同步器是用來構建鎖或者其他同步組件的基礎框架,它使用一個int成員變量表示同步狀態,通過內置的FIFO隊列來完成資源獲取線程的排隊工作,它能實現大部分的同步需求。
同步器是實現鎖的關鍵,在鎖的實現中聚合同步器,利用同步器實現鎖的語義。可以這樣理解二者的關系:鎖是面向使用者的,它定義了使用者與鎖交互的接口(比如可以允許兩個線程的并行訪問),隱藏了實現細節;同步器面向的是鎖的實現者,它簡化了鎖的實現方式,屏蔽了狀態管理、線程的排隊、等待與喚醒等底層操作。鎖和同步容器很好地隔離了使用者和實現者所需要關注的領域。
二、同步器的基本成員(介紹常用的類好方法)
Node 是AQS的內部類構成AQS隊列的一種數據結構。
成員變量 | 作用 |
---|---|
waitStatus | 記錄節點的等待狀態。包括如下狀態:① CANCELLED,值為1,由于同步隊列中等待線程超時或者被中斷,需要從同步隊列中取消等待,節點進入該狀態將不會變化。② SIGNAL值為-1,后繼節點的線程處于等待狀態,而當前線程如果釋放了同步狀態或者取消,將會通知后繼節點,使得后繼節點得以運行。③ CONDITION值為-2,節點在等待隊列中,節點等待在Condtion上,當其他線程對Condtion調用了signal方法后,該節點將會從等待隊列中轉移到同步隊列中,加入到對同步狀態的獲取中。④ PROPAGATE值為-3,表示下一次共享式同步狀態將會無條件地被傳播下去。⑤ INITAL,值為0,初始狀態 |
SHARED = new Node() | 表示共享式的node |
EXCLUSIVE = null | 獨占式的node |
Node prev | Node的前節點 |
Node next | Node的后節點 |
nextWaitert | 等待隊列的中node的下一個節點 |
ConditionObject是AQS的內部類構成類似Object的等待/通知機制。
成員/方法 | 作用 |
---|---|
Node firstWaiter | 等待隊列的頭節點 |
Node lastWaiter | 等待隊列的尾節點 |
await() | 當前線程進入等待狀態知道被通知或中斷,當前線程進入運行狀態且從await()返回的情況如下,包括:① 其它線程調用Interrupt()方法中斷當前線程。② 如果當前線程從await()方法返回,那么表明該線程已經獲取了Condtion對象鎖對應的鎖 |
awaitUninterruptibly() | 當前線程進入等待直到被通知,該方法對中斷不敏感 |
awaitNanos(long nanosTimeout)) | 當前線程進入等待狀態直到被通知、中斷或者超時。返回值表示剩余的時間,如果在nanosTimeout納秒之前被喚醒,那么返回就是(nanosTimeout-實際耗時);如果返回是0或者負數,那么可以認定已經超時了 |
awaitUntil(Date deadline) | 當前線程進入等待狀態直到被通知、中斷或者某個時間。如果沒有到指定時間就被通知,方法返回true,否則,表示超時,方法返回false |
signal() | 喚醒一個等待在Condition上的線程,該線程從等待方法返回前必須獲得與Condition相關的鎖 |
signalAll | 喚醒所有等待在Condition上的線程,能夠從等待方法返回的線程必須獲得與Condition相關聯的鎖 |
AQS主要成員
成員變量 | 作用 |
---|---|
state | 維護鎖的一個變量(同步狀態,很重要)① setState 。② getState。 ③ compareAndSetState。 |
Node head | FIFO同步隊列的頭結點 。 |
Node tail | FIFO同步隊列的尾結點 。 |
AQS主要方法
方法名 | 作用 |
---|---|
acquire() | 獨占式獲取同步狀態,如果當前線程獲取同步狀態成功,則由該方法返回,否則,將會進入同步隊列等待,該方法會調用重寫的tryAcquire(arg)方法(需要鎖自己實現) |
release(int arg) | 獨占式釋放狀態,如果釋放狀態成功,則會去喚醒頭結點;釋放狀態調用tryRelease(arg)方法(需要自己實現) |
acquireShared(int arg) | 共享式獲取同步狀態,也就是說可以幾個線程同時獲取同步狀態,如果當前線程未獲取同步狀態,將會進入同步隊列。 |
releaseShared() | 共享式釋放狀態,釋放之后會喚醒頭結點 |
acquireInterruptibly() | 響應中斷的獨占式獲取同步狀態,當前線程未獲取同步狀態而進入同步隊列中,如果當前線程被中斷,則該方法會拋出中斷異常,并返回 |
tryAcquireNanos() | 在acquireInterruptibly()基礎上增加超時限制,如果當前線程在超時時間內沒有獲取同步狀態,那么將返回false,如果獲取到了返回true |
acquireSharedInterruptibly() | 響應中斷的共享式獲取同步狀態 |
tryAcquireSharedNanos() | 在acquireSharedInterruptibly()的基礎上增加超時限制 |
以上就是AQS的一些基本成員和方法,下面主要從現實的角度分析這些方法,理解這些方法的實現,能剛好的幫助我們去理解鎖。
三、AQS的方法實現分析
1)、獨占系列的方法
①、acquire()獨占式獲取同步狀態,表示只會有一個線程獲取,其它線程進入同步隊列。
源代碼如下:
// 獲取鎖的方法(獨占模式)
public final void acquire(int arg) {
// tryAcquire(arg) 這個方法需要我們自己去實現,如果獲取失敗,
// 調用addWaiter構造節點
// acquireQueued
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我們可以看見acquire方法內部調用了tryAcquire(arg)方法,這個方法需要構造同步組件的類自己去實現,不過返回值已經被AQS定義好了,返回true代表獲取同步狀態成功,返回false代表失敗,需要將線程構造節點加入同步隊列,就是調用acquireQueued這個方法。
acquireQueued這個方法實際是先去調用了addWaiter方法。
addWaiter(),這個方法其實就是把當前節點加入到同步隊列,加入成功才返回,其實隊列初始化時會制造一個空的節點,然后在空的節點后面設置同步節點(可以理解為每次獲取獲取鎖的那個線程其實就是頭結點,它是一個空的節點,結合acquireQueued方法,每次獲取鎖之后,該節點就會升級為頭節點,并且變成一個空節點。)
private Node addWaiter(Node mode) {
// 構造一個節點
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 獲取尾節點
Node pred = tail;
// 判斷尾節點是否為空
if (pred != null) {
// 不為空,設置node節點的上一個節點為pred(也就是尾節點)
node.prev = pred;
// cas設置尾節點
if (compareAndSetTail(pred, node)) {
// 成功后,設置pred節點的next節點為node,返回
pred.next = node;
return node;
}
}
enq(node);
return node;
}
acquireQueued()方法,通過上面的addWaiter方法我們已經把這個節點加入同步隊列,接下來需要處理這個節點。首先判斷自己的前節點是否是頭結點,自己是否獲取到同步狀態,如果滿足,把自己設置尾頭結點,返回,如果不是,進入shouldParkAfterFailedAcquire(詳情見后面方法分析)方法主要作用是判斷自己的前置節點是否是SIGNAL狀態,是的話自己就可以阻塞自己了,調用parkAndCheckInterrupt(詳情見后面方法分析)方法,直到被喚醒或者中斷。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 獲取前一個節點
final Node p = node.predecessor();
// p是頭結點,自己獲取鎖成功
if (p == head && tryAcquire(arg)) {
// 設置自己為頭節點,變成一個空節點
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 找到前節點為signal,然后阻塞自己
// 清理等待超時或者中斷的節點
// 嘗試設置線程的狀態為signal
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 出現異常
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire方法,這個方法主要做三件事,判斷自己的前置節點是否是SIGNAL,返回true,就可以阻塞了,不是如果狀態大于0,證明前面的節點被中斷或者超時了,需要從隊列清理了,不是大于0,就利用cas設置前置節點為SIGNAL,返回false。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取node前節點的狀態
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
// 如果pred節點釋放了狀態,會通知自己
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
// 大于0證明,前面的線程等待超時或者已經被中斷,需要從節點中移除
// 需要找到不大于0的那個節點
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 找到小于等于0的前節點,設置為SIGNAL
// 這個地方ws值只會為PROPAGATE或者0
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt方法,這個方法需要前面的方法返回true才會執行,它會阻塞node的這個線程,返回線程的中斷中斷狀態并清理Thread.interrupted(),所以獨占式獲取同步狀態對中斷不響應的。
private final boolean parkAndCheckInterrupt() {
// 阻塞線程
LockSupport.park(this);
return Thread.interrupted();
}
cancelAcquire方法,在finally塊里面,出現異常就會執行這個方法,做一些處理當前node的操作。
// 異常后,finally里面執行的方法
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
// node是為節點,設置尾節點是pred
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
// 不是頭結點和尾節點,前節點是SIGNAL
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
附上一張acquire獨占式獲取同步狀態的流程圖:
②、release()獨占式釋放同步狀態,釋放線程后喚醒節點。
源代碼:
其中tryRelease也需要同步組件自己去實現,語義也被AQS所定義,true代表釋放成功,false代表失敗,如果為true,就需要決定是否去喚醒節點,首先獲取同步隊列的頭節點,判斷頭結點不是空,證明有同步對別有節點才需要喚醒,判斷頭結點不是剛剛初始化,如果是剛剛初始化,就還沒有阻塞,請參考acquire的acquireQueued處理節點的邏輯,都為true執行unparkSuccessor方法,false返回。
public final boolean release(int arg) {
// 釋放鎖
if (tryRelease(arg)) {
// 獲取頭結點
Node h = head;
// 頭結點不為空,證明初始化了
// 證明頭結點不是剛剛創建
// 那就可以去喚醒頭結點或者它的后繼節點
// 為0就證明沒有其他節點了,不需要喚醒
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
unparkSuccessor()方法,喚醒節點,喚醒的node的后置節點,因為在獲取同步狀態是我們阻塞的也是后置節點,喚醒后置節點后,會去找到前節點,也就是當前的結點去獲取同步狀態,然后再把自己變成頭結點。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
// 沒有這個節點或者超時或者被中斷了,查找一個可以用的節點
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 證明有這個節點
if (s != null)
LockSupport.unpark(s.thread);
}
獨占式釋放同步狀態的流程圖:
③、acquireInterruptibly()響應中斷的獨占式獲取同步狀態
可以看出如果線程中斷立馬返回異常,然后再去執行tryAcquire()獲取同步狀態,獲取失敗執行doAcquireInterruptibly方法。
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
doAcquireInterruptibly()方法,和acquire的acquireQueued的方法差不多,區別就是在parkAndCheckInterrupt這個方法如果返回true,就會拋異常InterruptedException,說明這個方法響應異常。
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
④、tryAcquireNanos()帶時間的獲取同步狀態,在時間內獲取到,返回true,超時返回false,首先判斷線程中斷狀態,為true就拋異常,為false就嘗試獲取同步狀態tryAcquire,獲取失敗執行doAcquireNanos方法。
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
doAcquireNanos()方法,其實這個都是在acquire方法上的改進,我們看看這個方法,首先算下時間也就是deadline,然后加入同步隊列addWaiter方法,然后判斷node的前節點是否為頭結點,是就嘗試獲取同步狀態,都為true就返回,為false就接著算下時間,判斷node前節點是否為SIGNAL,也就是shouldParkAfterFailedAcquire這個方法,為true,線程阻塞計算的時間,然后true(等待阻塞時間到)和false都判斷線程中斷狀態,中斷就拋出異常,執行異常方法,不為true,繼續循環,直到獲取鎖或者超時。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2)、共享系列的方法
①、acquireShared共享式獲取同步狀態,獲取失敗就加入同步隊列
AQS也把語義指定好了,返貨負數證明沒有了,就執行doAcquireShared方法
public final void acquireShared(int arg) {
// 返回負數就證明沒有鎖了,加入同步隊列
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
doAcquireShared()方法,首先構造節點加入隊列addWaiter,然后獲取node的前節點,判斷node的前節點是否為頭結點,如果是,獲取資源的個數,如果資源大于等于0,調用setHeadAndPropagate方法,然后返回,不滿足,調用shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法和獨占式一樣。
private void doAcquireShared(int arg) {
// 構建共享節點
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 獲取node的前節點
final Node p = node.predecessor();
// 前節點是否是頭節點
if (p == head) {
// 獲取鎖的個數
int r = tryAcquireShared(arg);
// 大于等于0,獲取鎖成功
if (r >= 0) {
setHeadAndPropagate(node, r); // 設置頭結點,如果有多余資源接著喚醒
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate()方法,設置頭結點,設置waitStatus為Propagate,為如果還有資源,喚醒后面的節點,調用doReleaseShared方法(這個方法會在共享式釋放同步狀態詳解)
private void setHeadAndPropagate(Node node, int propagate) {
// 頭結點
Node h = head; // Record old head for check below
// 設置頭結點為node
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// 還有資源
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 當前節點的next節點是共享或者沒有next節點
if (s == null || s.isShared())
// 喚醒后置節點
doReleaseShared();
}
}
②、releaseShared()共享式釋放狀態
tryReleaseShared是需要同步組件自己去實現,釋放成功調用doReleaseShared喚醒節點
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
doReleaseShared()方法,方法有些復雜,不好理解,我們主要來分析三個if的含義,第一個 if (ws == Node.SIGNAL) 表示當前node需要被喚醒,然后后面利用cas設置waitStatus為0,因為是共享模式可能有多個線程同時來釋放同步狀態,所以只能有一個釋放成功,另外一個重試;第二個else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)),其實也是用來處理并發的,當第一次并發失敗的線程第二次進入時,可能會看到ws等于0(因為成功的線程設置的),所以利用cas設置為PROPAGATE,表示傳遞,這里補充一下不管是0或者PROPAGATE,都會被喚醒的線程利用cas設置為SIGNAL(參考shouldParkAfterFailedAcquire方法);第三個(h == head)
被喚醒的線程B會首先執行setHead
因此如果最后h!=head,說明新一輪的喚醒競爭已經開始,當前線程c已經覺察到,因此繼續參與競爭,加快喚醒
因此如果最后h==head,說明新一輪的喚醒競爭尚未開始,而被喚醒的線程B必然會開啟新一輪的喚醒競爭,而當前線程c可以安心退出喚醒競選
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
// 頭結點
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 表示后面節點需要喚醒
if (ws == Node.SIGNAL) {
// 多線程控制并發 ,可能存在多個線程同時來修改
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 如果ws等于0,嘗試把cas設置waitStatus為PROPAGATE,傳遞下去
// 請聯系shouldParkAfterFailedAcquire方法一起看
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果頭結點沒有發生變化,表示設置完成,退出循環
// 如果發生變化,加入喚醒的過程(加速喚醒,可能存在多個線程在喚醒這些node,速度比一個接一個要快)
if (h == head) // loop if head changed
break;
}
}
③、acquireSharedInterruptibly()和tryAcquireSharedNanos()一個響應中斷,一個響應中斷支持添加獲取的超時時間(參考獨占模式的這些方法)
3)、ConditionObject系列方法
①、await()方法,類似Object的await方法,阻塞線程釋放鎖。
我們可以看見await的第一步是調用addConditionWaiter方法,它的作用是構建等待節點加入隊列的尾部,使用的也是AQS的Node,隊列里面順便也會清理清除Node不為CONDITION的節點;第二步需要釋放線程獲取的同步狀態fullyRelease方法;第三步:阻塞線程,找到線程中斷時機,也就是調用signal方法的前后順序;第四步:調用acquireQueued方法處理節點(阻塞還是其它);第五步:清理節點unlinkCancelledWaiters方法(清除Node不為CONDITION的節點);第六步:響應await語義,await阻塞線程時調用interrupt方法會拋異常reportInterruptAfterWait方法。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 加入等待隊列,清除節點
Node node = addConditionWaiter();
// 釋放狀態
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 阻塞線程
LockSupport.park(this);
// 線程是被中斷喚醒的
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 加入同步隊列
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
// 清理節點
unlinkCancelledWaiters();
if (interruptMode != 0)
// 響應中斷 await語義
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter方法每次都是加入隊列的尾部
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
private void unlinkCancelledWaiters() {
// 獲取第一個
Node t = firstWaiter;
Node trail = null;
while (t != null) {
// 獲取第一個的下一個
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
// t需要斷開連接
t.nextWaiter = null;
// 第一次trail = null
// firstWaiter = next
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
isOnSyncQueue方法判斷node是否在同步隊列中
final boolean isOnSyncQueue(Node node) {
// 節點狀態為CONDITION ,或者node.prev == null 等待節點沒有前置節點
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 等待節點沒有next節點
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
// 循環查找
return findNodeFromTail(node);
checkInterruptWhileWaiting 方法,判斷是否是中斷喚醒,這方法就是為了確認中斷的時機是在signal的前面還是后面signal,因為需要響應中斷
private int checkInterruptWhileWaiting(Node node) {
// 判斷是否是線程中斷喚醒
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
// 設置成功表示在signal 執行之前
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
// 設置成功表示在signal 執行之后
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
acquireQueued和unlinkCancelledWaiters方法前面都介紹過了,一個是加入同步隊列,一個是清理節點,介紹下reportInterruptAfterWait方法,它是我為了響應線程Interrupt方法,interruptMode == THROW_IE只在在signal方法后調用Interrupt方法才滿足,線程阻塞時調用Interrupt方法會拋異常,這是Object.await里面滿足的,請參考checkInterruptWhileWaiting方法里面的transferAfterCancelledWait方法理解其實現。
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
②、awaitNanos(long nanosTimeout)和awaitUntil(Date deadline)都是提供了超時時間,和await方法類似,只是加入了時間機制。
③、awaitUninterruptibly不響應中斷方法,發現里面都沒有判斷是都發生中斷的標記,只有調用signal喚醒node,循環才會結束,然后調用acquireQueued處理這個節點(阻塞還是其它)
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
④、signal方法,喚醒第一個等待隊列的node。
public final void signal() {
// 判斷是否獲取鎖
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 初始化
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
doSignal方法,首先把這個first節點和等待隊列斷開連接,然后把調用transferForSignal方法把節點從等待隊列加入同步隊列,喚醒節點的線程,然后被喚醒的線程就會在await方法里面執行acquireQueued這個方法。
private void doSignal(Node first) {
do {
// 隊列里面即將沒有節點,所以首尾都要為null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 把first 斷開連接
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
⑤、signalAll喚醒所有等待隊列的節點加入同步隊列,并且清空等待隊列
public final void signalAll() {
// 判斷是否獲取鎖
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 獲取第一個節點
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
// 隊列設置為null
lastWaiter = firstWaiter = null;
// 從首節點開始加入同步隊列,知道隊列為空
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
四、總結
我們學習AQS其實我覺得主要從三個方面,也就是本文的第三部分,從獨占式獲取和釋放同步狀態、共享式獲取和釋放同步狀態和ConditionObject里面的等待/通知機制;這里在說一下獨占式釋放鎖和共享式釋放鎖,獨占式因為只會有一個線程獲取同步狀態,所以釋放時也只會有一個,但是在共享這一塊,我們在釋放同步同步狀態時可能會有多個線程同時來釋放,可能出現并發的情況,理解doReleaseShared是理解共享式釋放的重點;學習獲取和釋放同步狀態,理解同步隊列節點的變化是重點;學習等待/通知理解等待隊列和同步隊列的關系和節點的轉換;只有學習好了AQS才能更好的學習后面JUC的那些鎖。
最后感慨下AQS里面的邏輯是真心有些繞,本人有些理解的可能有些不夠。
參考《Java 并發編程的藝術》
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。