您好,登錄后才能下訂單哦!
小編給大家分享一下如何實現Condition的await和signal等待/通知機制,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!
任何一個java對象都天然繼承于Object類,在線程間實現通信的往往會應用到Object的幾個方法,比如wait(),wait(long timeout),wait(long timeout, int nanos)與notify(),notifyAll()幾個方法實現等待/通知機制,同樣的, 在java Lock體系下依然會有同樣的方法實現等待/通知機制。從整體上來看Object的wait和notify/notify是與對象監視器配合完成線程間的等待/通知機制,而Condition與Lock配合完成等待通知機制,前者是java底層級別的,后者是語言級別的,具有更高的可控制性和擴展性。兩者除了在使用方式上不同外,在功能特性上還是有很多的不同:
Condition能夠支持不響應中斷,而通過使用Object方式不支持;
Condition能夠支持多個等待隊列(new 多個Condition對象),而Object方式只能支持一個;
Condition能夠支持超時時間的設置,而Object不支持
參照Object的wait和notify/notifyAll方法,Condition也提供了同樣的方法:
針對Object的wait方法
void await() throws InterruptedException:當前線程進入等待狀態,如果其他線程調用condition的signal或者signalAll方法并且當前線程獲取Lock從await方法返回,如果在等待狀態中被中斷會拋出被中斷異常;
long awaitNanos(long nanosTimeout):當前線程進入等待狀態直到被通知,中斷或者超時;
boolean await(long time, TimeUnit unit)throws InterruptedException:同第二種,支持自定義時間單位
boolean awaitUntil(Date deadline) throws InterruptedException:當前線程進入等待狀態直到被通知,中斷或者到了某個時間
針對Object的notify/notifyAll方法
void signal():喚醒一個等待在condition上的線程,將該線程從等待隊列中轉移到同步隊列中,如果在同步隊列中能夠競爭到Lock則可以從等待方法中返回。
void signalAll():與1的區別在于能夠喚醒所有等待在condition上的線程
要想能夠深入的掌握condition還是應該知道它的實現原理,現在我們一起來看看condiiton的源碼。創建一個condition對象是通過lock.newCondition()
,而這個方法實際上是會new出一個ConditionObject對象,該類是AQS的一個內部類,有興趣可以去看看。前面我們說過,condition是要和lock配合使用的也就是condition和Lock是綁定在一起的,而lock的實現原理又依賴于AQS,自然而然ConditionObject作為AQS的一個內部類無可厚非。
我們知道在鎖機制的實現上,AQS內部維護了一個同步隊列,如果是獨占式鎖的話,所有獲取鎖失敗的線程的尾插入到同步隊列,同樣的,condition內部也是使用同樣的方式,內部維護了一個 等待隊列,所有調用condition.await方法的線程會加入到等待隊列中,并且線程狀態轉換為等待狀態。另外注意到ConditionObject中有兩個成員變量:
/** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter;
這樣我們就可以看出來ConditionObject通過持有等待隊列的頭尾指針來管理等待隊列。主要注意的是Node類復用了在AQS中的Node類,其節點狀態和相關屬性可以去看,如果您仔細看完這篇文章對condition的理解易如反掌,對lock體系的實現也會有一個質的提升。Node類有這樣一個屬性:
//后繼節點 Node nextWaiter;
進一步說明,等待隊列是一個單向隊列,而在之前說AQS時知道同步隊列是一個雙向隊列。接下來我們用一個demo,通過debug進去看是不是符合我們的猜想:
public static void main(String[] args) { for (int i = 0; i < 10; i++) { Thread thread = new Thread(() -> { lock.lock(); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } }); thread.start(); } }
這段代碼沒有任何實際意義,甚至很臭,只是想說明下我們剛才所想的。新建了10個線程,沒有線程先獲取鎖,然后調用condition.await方法釋放鎖將當前線程加入到等待隊列中,通過debug控制當走到第10個線程的時候查看firstWaiter
即等待隊列中的頭結點,debug模式下情景圖如下:
從這個圖我們可以很清楚的看到這樣幾點:
1. 調用condition.await方法后線程依次尾插入到等待隊列中,如圖隊列中的線程引用依次為Thread-0,Thread-1,Thread-2....Thread-8;
2. 等待隊列是一個單向隊列。通過我們的猜想然后進行實驗驗證,我們可以得出等待隊列的示意圖如下圖所示:
同時還有一點需要注意的是:我們可以多次調用lock.newCondition()方法創建多個condition對象,也就是一個lock可以持有多個等待隊列。而在之前利用Object的方式實際上是指在對象Object對象監視器上只能擁有一個同步隊列和一個等待隊列,而并發包中的Lock擁有一個同步隊列和多個等待隊列。示意圖如下:
如圖所示,ConditionObject是AQS的內部類,因此每個ConditionObject能夠訪問到AQS提供的方法,相當于每個Condition都擁有所屬同步器的引用。
當調用condition.await()方法后會使得當前獲取lock的線程進入到等待隊列,如果該線程能夠從await()方法返回的話一定是該線程獲取了與condition相關聯的lock。接下來,我們還是從源碼的角度去看,只有熟悉了源碼的邏輯我們的理解才是最深的。await()方法源碼為:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 1\. 將當前線程包裝成Node,尾插入到等待隊列中 Node node = addConditionWaiter(); // 2\. 釋放當前線程所占用的lock,在釋放的過程中會喚醒同步隊列中的下一個節點 int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { // 3\. 當前線程進入到等待狀態 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 4\. 自旋等待獲取到同步狀態(即獲取到lock) if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // 5\. 處理被中斷的情況 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
代碼的主要邏輯請看注釋,我們都知道當當前線程調用condition.await()方法后,會使得當前線程釋放lock然后加入到等待隊列中,直至被signal/signalAll后會使得當前線程從等待隊列中移至到同步隊列中去,直到獲得了lock后才會從await方法返回,或者在等待時被中斷會做中斷處理。那么關于這個實現過程我們會有這樣幾個問題:1. 是怎樣將當前線程添加到等待隊列中去的?2.釋放鎖的過程?3.怎樣才能從await方法退出?而這段代碼的邏輯就是告訴我們這三個問題的答案。具體請看注釋,在第1步中調用addConditionWaiter將當前線程添加到等待隊列中,該方法源碼為:
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //將當前線程包裝成Node Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else //尾插入 t.nextWaiter = node; //更新lastWaiter lastWaiter = node; return node; }
這段代碼就很容易理解了,將當前節點包裝成Node,如果等待隊列的firstWaiter為null的話(等待隊列為空隊列),則將firstWaiter指向當前的Node,否則,更新lastWaiter(尾節點)即可。就是通過尾插入的方式將當前線程封裝的Node插入到等待隊列中即可,同時可以看出等待隊列是一個不帶頭結點的鏈式隊列,之前我們學習AQS時知道同步隊列是一個帶頭結點的鏈式隊列,這是兩者的一個區別。將當前節點插入到等待對列之后,會使當前線程釋放lock,由fullyRelease方法實現,fullyRelease源碼為:
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { //成功釋放同步狀態 failed = false; return savedState; } else { //不成功釋放同步狀態拋出異常 throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
這段代碼就很容易理解了,調用AQS的模板方法release方法釋放AQS的同步狀態并且喚醒在同步隊列中頭結點的后繼節點引用的線程,如果釋放成功則正常返回,若失敗的話就拋出異常。到目前為止,這兩段代碼已經解決了前面的兩個問題的答案了,還剩下第三個問題,怎樣從await方法退出?現在回過頭再來看await方法有這樣一段邏輯:
while (!isOnSyncQueue(node)) { // 3\. 當前線程進入到等待狀態 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
很顯然,當線程第一次調用condition.await()方法時,會進入到這個while()循環中,然后通過LockSupport.park(this)方法使得當前線程進入等待狀態,那么要想退出這個await方法第一個前提條件自然而然的是要先退出這個while循環,出口就只剩下兩個地方:1. 邏輯走到break退出while循環;2. while循環中的邏輯判斷為false。
再看代碼出現第1種情況的條件是當前等待的線程被中斷后代碼會走到break退出,第二種情況是當前節點被移動到了同步隊列中(即另外線程調用的condition的signal或者signalAll方法),while中邏輯判斷為false后結束while循環。總結下,就是當前線程被中斷或者調用condition.signal/condition.signalAll方法當前節點移動到了同步隊列后 ,這是當前線程退出await方法的前提條件。
當退出while循環后就會調用acquireQueued(node, savedState)
,這個方法在介紹AQS的底層實現時說過了,若感興趣的話可以去,該方法的作用是在自旋過程中線程不斷嘗試獲取同步狀態,直至成功(線程獲取到lock)。這樣也說明了退出await方法必須是已經獲得了condition引用(關聯)的lock。到目前為止,開頭的三個問題我們通過閱讀源碼的方式已經完全找到了答案,也對await方法的理解加深。await方法示意圖如下圖:
如圖,調用condition.await方法的線程必須是已經獲得了lock,也就是當前線程是同步隊列中的頭結點。調用該方法后會使得當前線程所封裝的Node尾插入到等待隊列中。
超時機制的支持
condition還額外支持了超時機制,使用者可調用方法awaitNanos,awaitUtil。這兩個方法的實現原理,基本上與AQS中的tryAcquire方法如出一轍,關于tryAcquire可以仔細閱讀。
不響應中斷的支持
要想不響應中斷可以調用condition.awaitUninterruptibly()方法,該方法的源碼為:
public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
這段方法與上面的await方法基本一致,只不過減少了對中斷的處理,并省略了reportInterruptAfterWait方法拋被中斷的異常。
調用condition的signal或者signalAll方法可以將等待隊列中等待時間最長的節點移動到同步隊列中,使得該節點能夠有機會獲得lock。按照等待隊列是先進先出(FIFO)的,所以等待隊列的頭節點必然會是等待時間最長的節點,也就是每次調用condition的signal方法是將頭節點移動到同步隊列中。我們來通過看源碼的方式來看這樣的猜想是不是對的,signal方法源碼為:
public final void signal() { //1\. 先檢測當前線程是否已經獲取lock if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //2\. 獲取等待隊列中第一個節點,之后的操作都是針對這個節點 Node first = firstWaiter; if (first != null) doSignal(first); }
signal方法首先會檢測當前線程是否已經獲取lock,如果沒有獲取lock會直接拋出異常,如果獲取的話再得到等待隊列的頭指針引用的節點,之后的操作的doSignal方法也是基于該節點。下面我們來看看doSignal方法做了些什么事情,doSignal方法源碼為:
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; //1\. 將頭結點從等待隊列中移除 first.nextWaiter = null; //2\. while中transferForSignal方法對頭結點做真正的處理 } while (!transferForSignal(first) && (first = firstWaiter) != null); }
具體邏輯請看注釋,真正對頭節點做處理的邏輯在transferForSignal放,該方法源碼為:
final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ //1\. 更新狀態為0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ //2.將該節點移入到同步隊列中去 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
關鍵邏輯請看注釋,這段代碼主要做了兩件事情1.將頭結點的狀態更改為CONDITION;2.調用enq方法,將該節點尾插入到同步隊列中,關于enq方法請看AQS的底層實現這篇文章。現在我們可以得出結論:調用condition的signal的前提條件是當前線程已經獲取了lock,該方法會使得等待隊列中的頭節點即等待時間最長的那個節點移入到同步隊列,而移入到同步隊列后才有機會使得等待線程被喚醒,即從await方法中的LockSupport.park(this)方法中返回,從而才有機會使得調用await方法的線程成功退出。signal執行示意圖如下圖:
signalAll
sigllAll與sigal方法的區別體現在doSignalAll方法上,前面我們已經知道doSignal方法只會對等待隊列的頭節點進行操作,,而doSignalAll的源碼為:
private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
該方法只不過時間等待隊列中的每一個節點都移入到同步隊列中,即“通知”當前調用condition.await()方法的每一個線程。
文章開篇提到等待/通知機制,通過使用condition提供的await和signal/signalAll方法就可以實現這種機制,而這種機制能夠解決最經典的問題就是“生產者與消費者問題”,關于“生產者消費者問題”之后會用單獨的一篇文章進行講解,這也是面試的高頻考點。await和signal和signalAll方法就像一個開關控制著線程A(等待方)和線程B(通知方)。它們之間的關系可以用下面一個圖來表現得更加貼切:
如圖,線程awaitThread先通過lock.lock()方法獲取鎖成功后調用了condition.await方法進入等待隊列,而另一個線程signalThread通過lock.lock()方法獲取鎖成功后調用了condition.signal或者signalAll方法,使得線程awaitThread能夠有機會移入到同步隊列中,當其他線程釋放lock后使得線程awaitThread能夠有機會獲取lock,從而使得線程awaitThread能夠從await方法中退出執行后續操作。如果awaitThread獲取lock失敗會直接進入到同步隊列。
我們用一個很簡單的例子說說condition的用法:
public class AwaitSignal { private static ReentrantLock lock = new ReentrantLock(); private static Condition condition = lock.newCondition(); private static volatile boolean flag = false; public static void main(String[] args) { Thread waiter = new Thread(new waiter()); waiter.start(); Thread signaler = new Thread(new signaler()); signaler.start(); } static class waiter implements Runnable { @Override public void run() { lock.lock(); try { while (!flag) { System.out.println(Thread.currentThread().getName() + "當前條件不滿足等待"); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + "接收到通知條件滿足"); } finally { lock.unlock(); } } } static class signaler implements Runnable { @Override public void run() { lock.lock(); try { flag = true; condition.signalAll(); } finally { lock.unlock(); } } } }
輸出結果為:
Thread-0當前條件不滿足等待 Thread-0接收到通知,條件滿足
開啟了兩個線程waiter和signaler,waiter線程開始執行的時候由于條件不滿足,執行condition.await方法使該線程進入等待狀態同時釋放鎖,signaler線程獲取到鎖之后更改條件,并通知所有的等待線程后釋放鎖。這時,waiter線程獲取到鎖,并由于signaler線程更改了條件此時相對于waiter來說條件滿足,繼續執行。
看完了這篇文章,相信你對“如何實現Condition的await和signal等待/通知機制”有了一定的了解,如果想了解更多相關知識,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。