您好,登錄后才能下訂單哦!
這篇文章主要介紹“Java AQS的實現原理是什么”,在日常操作中,相信很多人在Java AQS的實現原理是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Java AQS的實現原理是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
我們這里借助ReentrantLock
來搞清楚AQS的實現原理。
這個方法就是開始獲取鎖運行的入口,在這個方法的實現中,交給了sync
對象來獲取鎖。
public void lock() { sync.acquire(1); } private final Sync sync; // Sync對象是一個ReentrantLock實現的內部抽象類,具體的實現又分為了公平版本與非公平兩種 abstract static class Sync extends AbstractQueuedSynchronizer {} // 在ReentrantLock的無參構造器中,默認使用的實現就是非公平鎖的實現 public ReentrantLock() { sync = new NonfairSync(); } // 也可以通過帶參數的構造器來使用公平鎖 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
由于公平鎖FairSync
和NonfairSync
的差別主要在tryAcquire
方法上,別的邏輯都是相同的,因此我們就直接看Sync
和AQS中的實現。
方法實現如下,來自AQS的實現:
// 首先會調用 tryAcquire 和 acquireQueued 方法,如果2個方法都返回true的話, // 那么才會調用自行中斷的邏輯 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();
tryAcquire
方法就會因為公平鎖和非公平鎖的差異,有2種不同的實現,首先來看看非公平鎖的實現,也就是ReentrantLock
的默認策略。
這個方法會直接調用并返回 Sync
實現的 nonfairTryAcquire(acquires)
方法。
Sync類中的實現
// 這里的參數 acquires = 1 final boolean nonfairTryAcquire(int acquires) { // 獲取當前調用者的線程對象 final Thread current = Thread.currentThread(); // 獲取AQS中定義的state值,這個state值是AQS的核心之一 int c = getState(); // 在ReentrantLock的實現中,state就表示當前是否有線程持有鎖,0代表沒有線程持有鎖, // 當前訪問的線程就可以繼續執行代碼,如果大于0則表示當前持有鎖的線程的數量。 // 由于ReentrantLock屬于可重入鎖,因此,這個值會>=1 if (c == 0) { // 能進來就表示當前沒有線程持有鎖,那么嘗試用CAS獲取鎖 if (compareAndSetState(0, acquires)) { // 獲取鎖成功,那么將當前線程設置到AQS中的當前線程中 setExclusiveOwnerThread(current); return true; } } // 如果當前持有鎖的就是自己,那么就代表是鎖的重入 else if (current == getExclusiveOwnerThread()) { // 累計持有鎖的次數 int nextc = c + acquires; // 這里就說明了,state能夠設置的最大值就是Int.MAX_VALUE, // 當處于MAX_VALUE的時候再加1,那么Int數字的最高位就會變成1,符號位為負 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 更新新的state值 setState(nextc); return true; } return false; }
AQS中的實現
private volatile int state; // 當前持有鎖的線程對象 private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }
小結一下,非公平鎖tryAcquire
方法就是先看看有沒有線程持有鎖,沒有的話自己就通過CAS的方式嘗試獲取一下鎖,如果獲取鎖成功或者是自己重入,那么tryAcquire
方法就會返回true,acquire
方法中的條件判斷就會直接返回false,lock方法結束,線程繼續支持下面的代碼。
下面來看看公平鎖的實現,大體的邏輯跟非公平的是相同的。
FairSync中的實現
// 這里的參數 acquire = 1 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 判斷當前是不是有線程持有鎖 if (c == 0) { // 當前沒有線程持有鎖就進來 // 由于是公平鎖,那么就要保證只有在當前等待隊列為空或者隊列中等待的線程 // 都沒有到運行的條件的時候,才嘗試通過CAS來獲取鎖。否則就去乖乖排隊 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 同非公平鎖,鎖重入 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
AQS中的實現
// 檢查當前AQS等待隊列中是否有正在等待的有效線程節點 public final boolean hasQueuedPredecessors() { Node h, s; // 首先讓參數h指向當前隊列的頭部 if ((h = head) != null) { // 隊列不為空 // 將臨時變量賦值為當前第二個節點 // 這里需要簡單說明一下AQS的等待隊列的構成,第一個節點是沒有業務含義的, // 只是用作喚醒下一個待執行的線程節點 if ((s = h.next) == null || s.waitStatus > 0) { // 能進來就表示當前隊列只有一個頭結點,或者第二個節點的狀態是已取消 // - > 參考說明1 // 如果第二個節點不為空,那么就釋放這個引用 s = null; // traverse in case of concurrent cancellation // 從后往前遍歷,找到距離隊列頭最近的有效節點 for (Node p = tail; p != h && p != null; p = p.prev) { if (p.waitStatus <= 0) s = p; } } // 如果找到了正在隊列中的排隊的有效節點并且不是當前訪問的線程,那么就返回true if (s != null && s.thread != Thread.currentThread()) return true; } // 頭結點指向NULL,那么說明隊列是空的,直接返回false return false; }
說明1:這里就引入了隊列節點中的等待狀態這個重要的概念,在ReentrantLock
中,我們只需要關注CANCELLED
和SIGNAL
即可。只有CANCELLED
是大于0的,新節點的默認值為0。因此只要等待狀態大于0就代表該節點被取消了。
// 等待隊列中節點的等待狀態 volatile int waitStatus; // 當前節點因為等待超時或者被中斷了被取消 static final int CANCELLED = 1; // 接下來有資格被喚醒獲得鎖的標記,只有獲得了這個標記的節點才能被執行完的線程喚醒 static final int SIGNAL = -1;
小結一下,公平鎖對比非公平鎖,在最開始有機會獲取鎖的時候,會先檢查一下當前隊列中是否已經有線程在排隊等待執行了,如果等待隊列中是空的或者沒有有效的排隊節點,才會獲取鎖。如果獲取鎖成功,或者鎖重入成功,那么同樣會結束AQS的邏輯,繼續執行業務代碼。
上面分析完在tryAcquire
方法中如果成功獲得鎖的情況,就會結束AQS的邏輯,接下來就來分析未能成功獲得鎖的邏輯,即:acquire
方法中條件判斷的第二個條件判斷。
在AQS中實現
// 這個方法就會將當前線程添加到等待隊列中,并且返回操作是否成功,arg就是傳入的acquire值,為1 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
首先通過addWaiter
方法構建一個包含線程對象的節點并且添加到隊列中
// 這里的參數為 Node.EXCLUSIVE,表示這是一個排它鎖的實現,這里的值為NULL private Node addWaiter(Node mode) { // 構建一個線程節點 Node node = new Node(mode); // 這里就是AQS的核心理念了,通過不斷的自旋,將線程節點插入到隊列中 for (;;) { // 獲取原來的隊列的隊尾,因為AQS才去的尾插方式 Node oldTail = tail; if (oldTail != null) { // 將新插入的節點指向原來的尾結點 node.setPrevRelaxed(oldTail); // 通過CAS的方式將當前節點設置到線程共享隊列的尾部去,這里要注意, // 凡是涉及到多線程操作的屬性,都需要通過CAS保證操作的原子性 if (compareAndSetTail(oldTail, node)) { oldTail.next = node; // 設置成功,就返回插入的節點對象;如若不成功, // 就表示有別的線程也修改了尾結點,那么就要等下一次循環重試 return node; } } else { // 沒有尾結點說明隊列不存在,那么就進行初始化 initializeSyncQueue(); } } } // 初始化等待隊列 private final void initializeSyncQueue() { Node h; // 還是通過CAS的方式,給隊列初始化一個默認的Node節點,幾個重要的屬性的初始值如下 // waitStatus = 0; thread = null if (HEAD.compareAndSet(this, null, (h = new Node()))) tail = h; }
小結一下,addWaiter
方法通過尾插的方式將沒有搶到鎖的線程封裝為Node
節點,插入到AQS的等待隊列中。如果隊列還未初始化,那么就先初始化隊列,隊列自帶一個無實際含義的頭結點。
在AQS中實現
// arg就是傳入的acquire參數,為1;該方法返回值的含義為,線程在等待過程中是否被中斷 final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { // 還是不斷的自旋,等待機會進行操作 for (;;) { // 獲取新插入節點的上一個節點 final Node p = node.predecessor(); // 如果上一個節點已經是頭結點,那么說明當前就已經輪到當前線程獲取鎖執行業務了 // 那么就再次嘗試搶一次鎖 if (p == head && tryAcquire(arg)) { // 能進來就說明獲取鎖成功了 // 那么就將當前線程的節點設置為頭結點 // 在這個方法中,會去除節點中的信息,做一個純粹的頭結點 setHead(node); // 將已經沒有指向的原頭結點的next指為空,等待回收 p.next = null; // help GC // 這里返回的值為false,因為當前線程并未被阻塞就獲得了鎖 return interrupted; } // 走到這里說明當前線程并沒有獲取到鎖,那么就要考慮是否要將線程阻塞了 if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); throw t; } } private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } // 線程獲取鎖失敗之后,是否需要將線程阻塞,這里2個參數, // pred是新插入節點的上一個節點,node是新插入的節點 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 獲取上一個節點的等待狀態 int ws = pred.waitStatus; // 判斷上一個節點的狀態是不是SIGNAL,只有狀態為SIGNAL的才是有效的可指向節點 if (ws == Node.SIGNAL) // 如果上一個節點是SIGNAL狀態,那就說明當前線程可以連接上該節點,然后被掛起了 return true; // 上面我們提到過,只有節點被取消了,等待狀態才會>0 if (ws > 0) { // 一直往前找,直到找到等待狀態<=0的,數據規范的話即找到最近的一個等待狀態 // 為SIGNAL的節點,跳過全部被取消的節點 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 此時pred指向的即第一個合法的線程節點,指向當前新插入的節點 pred.next = node; } else { // 這里的意思就是上一個節點就是有效節點,那么就將上一個節點的等待狀態強制 // 更新為SIGNAL,即-1。毫無意義的那個頭結點也會被設置為SIGNAL狀態 pred.compareAndSetWaitStatus(ws, Node.SIGNAL); } return false; }
小結一下,這個方法的含義就是,由于當前線程沒有獲取到鎖資源,因此就需要被阻塞。同時在這個方法中,也會整理等待隊列,將那些已經被取消的節點從隊列中移除。接著就是調用條件判斷中的執行方法,將線程掛起阻塞起來。
private final boolean parkAndCheckInterrupt() { // 調用了park方法,底層是調用UnSafe類的park方法來實現 LockSupport.park(this); return Thread.interrupted(); }
至此,lock
方法的全部情況都清楚了,如果線程能拿到鎖,那就直接結束lock階段,要是搶不到鎖,那么就進入等待隊列中,在進入隊列之前,如果發現還有機會獲取鎖,那么會再次嘗試獲取一次,如若還是獲取不到,那么就以尾插的方式進入等待隊列,通過調用LockSupport.park方法,將線程阻塞,等待被喚醒。
主動調用這個方法以后,就代表對于鎖的占用結束。
在ReentrantLock中實現
public void unlock() { sync.release(1); }
在AQS中實現
public final boolean release(int arg) { // 調用tryRelease方法真正實現解除鎖 // 只有state加的所有鎖被解除了,那么才會喚醒下一個線程 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
在Sync中實現
// 這里的releases就是傳入的參數1,即如果是重入鎖,那么這里需要解鎖多次 protected final boolean tryRelease(int releases) { // 計算state的值,這里的含義就是state-1 int c = getState() - releases; // 如果當前線程不是持有鎖的線程,那么就報錯 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 鎖是否完全釋放完的標記 boolean free = false; // state減到0說明鎖已經完全釋放完了 if (c == 0) { free = true; setExclusiveOwnerThread(null); } // 更新state的值 setState(c); // 只有鎖被完全釋放完,才返回true return free; }
在AQS中實現
// 喚醒下一個需要鎖的線程,這里的node是頭結點 private void unparkSuccessor(Node node) { // 獲得頭結點的等待狀態 int ws = node.waitStatus; // 如果頭結點是SIGNAL,那么重置為0,因為這個節點已經沒有意義了,會被移除 if (ws < 0) node.compareAndSetWaitStatus(ws, 0); // 獲得頭結點后面的待喚醒的節點 Node s = node.next; // 如果這個待喚醒的節點為空或者等待狀態不正確,在這里就是不等于SIGNAL if (s == null || s.waitStatus > 0) { s = null; // 從尾部開始查詢,找到合法的待喚醒節點 for (Node p = tail; p != node && p != null; p = p.prev) if (p.waitStatus <= 0) s = p; } if (s != null) // 喚醒線程 LockSupport.unpark(s.thread); }
這個方法提供了取消線程等待獲取鎖的功能
在AQS中被實現
private void cancelAcquire(Node node) { // 健壯性判斷,節點非空才可以被取消 if (node == null) return; // 將節點中的線程指向去掉 node.thread = null; // 獲取到當前節點的上一個節點 Node pred = node.prev; // 通過循環,跳過所有當前被取消節點之前的也已經被標記取消的節點 while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 通過上面的循環以后pred的值就為等待隊列中隊尾的一個合法的未被取消的節點 // 獲取到該合法節點下一個指向的節點 Node predNext = pred.next; // 標記當前被取消的節點的等待狀態為被取消 node.waitStatus = Node.CANCELLED; // 如果被取消的節點就是隊尾的節點,那么就通過CAS將pred設置為尾結點 // 就可以拋棄中間那些同樣被標記為被取消的節點,如果有的是 if (node == tail && compareAndSetTail(node, pred)) { // 將pred的下一個節點指向NULL,因為pred現在就是隊尾 pred.compareAndSetNext(predNext, null); } else { // 這說明取消的節點不是尾結點,而是中間的節點 // 這個值會在下面的條件判斷被賦值為上一個節點的等待狀態 int ws; // 如果找到的上一個合法節點不是頭結點 // 并且上一個節點的等待狀態是SIGNAL,并且將不是SIGNAL狀態的負數狀態轉換為SIGNAL // 并且線程不為空 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && pred.thread != null) { // 能進來就說明被取消的節點處于中間,那么就要將這個node從隊列中跳過 Node next = node.next; // 如果被取消的節點是一個有效的節點,不為空并且狀態也是對的 if (next != null && next.waitStatus <= 0) // 那么就將上一個節點指向被取消節點的下一個節點 pred.compareAndSetNext(predNext, next); } else { // 能進入這里說明上一個合法節點已經是頭結點了, // 那么就說明被取消的這個節點已經是原本除了頭結點以外的最靠前面的節點, // 那么被取消的這個節點其實就等價于頭結點了,應該喚醒后面還在等待的線程節點 // 喚醒下一個被掛起的線程,具體已經分析過了,這里就省略了 unparkSuccessor(node); } node.next = node; } }
到此,關于“Java AQS的實現原理是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。