您好,登錄后才能下訂單哦!
這篇文章主要講解了“Java怎么通過AQS實現數據組織”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Java怎么通過AQS實現數據組織”吧!
通過前面的介紹,大家一定看出來了,上述的各種類型的鎖和一些線程控制接口(CountDownLatch 等),最終都是通過 AQS 來實現的,不同之處只在于 tryAcquire 等抽象函數如何實現。從這個角度來看,AQS(AbstractQueuedSynchronizer) 這個基類設計的真的很不錯,能夠包容各種同步控制方案,并提供了必須的下層依賴:比如阻塞,隊列等。接下來我們就來揭開它神秘的面紗。
AQS 顧名思義,就是通過隊列來組織修改互斥資源的請求。當這個資源空閑時間,那么修改請求可以直接進行,而當這個資源處于鎖定狀態時,就需要等待,AQS 會將所有等待的請求維護在一個類似于 CLH 的隊列中。CLH:Craig、Landin and Hagersten隊列,是單向鏈表,AQS中的隊列是CLH變體的虛擬雙向隊列(FIFO),AQS是通過將每條請求共享資源的線程封裝成一個節點來實現鎖的分配。主要原理圖如下:
圖中的 state 是一個用 volatile 修飾的 int 變量,它的使用都是通過 CAS 來進行的,而 FIFO 隊列完成請求排隊的工作,隊列的操作也是通過 CAS 來進行的,正因如此該隊列的操作才能達到理想的性能要求。
通過 CAS 修改 state 比較容易,大家應該都能理解,但是如果要通過 CAS 維護一個雙向隊列要怎么做呢?這里我們看一下 AQS 中 CLH 隊列的實現。在 AQS 中有兩個指針一個指針指向了隊列頭,一個指向了隊列尾。它們都是懶初始化的,也就是說最初都為null。
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
隊列中的每個節點,都是一個 Node 實例,該實例的第一個關鍵字段是 waitState,它表述了當前節點所處的狀態,通過 CAS 進行修改:
SIGNAL:表示當前節點承擔喚醒后繼節點的責任
CANCELLED:表示當前節點已經超時或者被打斷
CONDITION:表示當前節點正在 Condition 上等待(通過鎖可以創建 Condition 對象)
PROPAGATE:只會設置在 head 節點上,用于表明釋放共享鎖時,需要將這個行為傳播到其他節點上,這個我們稍后詳細介紹。
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
//...
}
因為是雙向隊列,所以 Node 實例中勢必有 prev 和 next 指針,此外 Node 中還會保存與其對應的線程。最后是 nextWaiter,當一個節點對應了共享請求時,nextWaiter 指向了 Node. SHARED 而當一個節點是排他請求時,nextWaiter 默認指向了 Node. EXCLUSIVE 也就是 null。我們知道 AQS 也提供了 Condition 功能,該功能就是通過 nextWaiter 來維護在 Condition 上等待的線程。也就是說這里的 nextWaiter 在鎖的實現部分中,扮演者共享鎖和排它鎖的標志位,而在條件等待隊列中,充當鏈表的 next 指針。
接下來,我們由最常見的入隊操作出發,介紹 AQS 框架的實現與使用。從下面的代碼中可以看到入隊操作支持兩種模式,一種是排他模式,一種是共享模式,分別對應了排它鎖場景和共享鎖場景。
當任意一種請求,要入隊時,先會構建一個 Node 實例,然后獲取當前 AQS 隊列的尾結點,如果尾結點為空,就是說隊列還沒初始化,初始化過程在后面 enq 函數中實現
這里我們先看初始化之后的情況,即 tail != null,先將當前 Node 的前向指針 prev 更新,然后通過 CAS 將尾結點修改為當前 Node,修改成功時,再更新前一個節點的后向指針 next,因為只有修改尾指針過程是原子的,所以這里會出現新插入一個節點時,之前的尾節點 previousTail 的 next 指針為null的情況,也就是說會存在短暫的正向指針和反向指針不同步的情況,不過在后面的介紹中,你會發現 AQS 很完備地避開了這種不同步帶來的風險(通過從后往前遍歷)
如果上述操作成功,則當前線程已經進入同步隊列,否則,可能存在多個線程的競爭,其他線程設置尾結點成功了,而當前線程失敗了,這時候會和尾結點未初始化一樣進入 enq 函數中。
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
// 已經進行了初始化
node.prev = pred;
// CAS 修改尾節點
if (compareAndSetTail(pred, node)) {
// 成功之后再修改后向指針
pred.next = node;
return node;
}
}
// 循環 CAS 過程和初始化過程
enq(node);
return node;
}
正常通過 CAS 修改數據都會在一個循環中進行,而這里的 addWaiter 只是在一個 if 中進行,這是為什么呢?實際上,大家看到的 addWaiter 的這部分 CAS 過程是一個快速執行線,在沒有競爭時,這種方式能省略不少判斷過程。當發生競爭時,會進入 enq 函數中,那里才是循環 CAS 的地方。
整個 enq 的工作在一個循環中進行
先會檢查是否未進行初始化,是的話,就設置一個虛擬節點 Node 作為 head 和 tail,也就是說同步隊列的第一個節點并不保存實際數據,只是一個保存指針的地方
初始化完成后,通過 CAS 修改尾節點,直到修改成功為止,最后修復后向指針
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {// 在一個循環中進行 CAS 操作
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// CAS 修改尾節點
if (compareAndSetTail(t, node)) {
// 成功之后再修改后向指針
t.next = node;
return t;
}
}
感謝各位的閱讀,以上就是“Java怎么通過AQS實現數據組織”的內容了,經過本文的學習后,相信大家對Java怎么通過AQS實現數據組織這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。