您好,登錄后才能下訂單哦!
本篇內容介紹了“JUC的DelayQueue怎么使用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
延遲隊列 DelayQueue 用于存放具有過期屬性的元素,被添加到 DelayQueue 中的元素只有在到達過期時間之后才會出隊列,常用于延遲任務調度。DelayQueue 本質上是一個無界的阻塞隊列,底層依賴于優先級隊列 PriorityQueue 作為存儲結構,并使用 ReentrantLock 鎖保證線程安全。
DelayQueue 的字段定義如下:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { /** 保證線程安全的可重入獨占鎖 */ private final transient ReentrantLock lock = new ReentrantLock(); /** 底層存儲結構,優先級隊列 */ private final PriorityQueue<E> q = new PriorityQueue<E>(); /** Leader-Follower 模式,用于記錄角色為 leader 的線程對象 */ private Thread leader = null; /** 因為隊列為空,或元素未到期而阻塞的線程 */ private final Condition available = lock.newCondition(); // ... 省略方法定義 }
重點分析一下 DelayQueue#leader
字段設置的意圖,該字段用于記錄當前角色為 leader 的線程對象。當執行出隊列操作(DelayQueue#take
和 DelayQueue#poll(long, TimeUnit)
方法)時,如果 DelayQueue#leader
字段為 null,即不存在 leader 線程,且有未到期的延遲元素,則會將當前線程設置成 leader 角色,并等待該元素到期,以減少不必要的等待時間,保證延遲元素能夠在到期時及時被響應。
設想如果不這樣設計,那么線程在遇到隊列中沒有到期的延遲元素時應該怎么辦呢?可以采取以下 3 種策略:
進入忙循環輪詢。
進入條件隊列等待,并稍后由其它線程喚醒。
先退出當前方法,等待后續再次執行出隊列操作。
可以看出,這些策略要么是消耗 CPU 資源,要么就是無法對隊列中的元素在到期時及時出隊列,而引入 leader 角色正好能夠避免了這些問題。在某個線程以 leader 的身份等待優先級最高的延遲元素到期時,其它線程在發現隊列中沒有到期的元素時會以 follower 角色無限期等待。而在 leader 線程從等待狀態退出時,它會主動放棄自己的 leader 角色,并喚醒一個正在處于等待狀態的 follower 線程,該線程將有機會晉升成為新的 leader。
在 leader 線程等待期間,有可能會插入優先級更高的元素,這個時候就需要剝奪該線程的 leader 角色,以提供其它線程成為 leader 的機會,繼而保證剛剛新插入的元素能夠在到期時及時被響應。否則就需要等到當前 leader 線程退出等待狀態之后或者有新的線程請求獲取元素時才有機會出隊列,這樣可能存在較大的延遲。
DelayQueue 實現自 BlockingQueue 接口,下面針對核心方法的實現逐一進行分析。不過在開始分析之前,我們先來介紹一下 java.util.concurrent.Delayed
接口,DelayQueue 要求添加到其中的元素必須實現該接口。Delayed 接口定義如下:
public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit); }
該接口繼承自 Comparable 接口,所以添加到 DelayQueue 中的元素都是可比較的。方法 Delayed#getDelay
接收一個 TimeUnit 類型的單位值,用于返回當前延遲元素的剩余到期時間,如果小于等于 0 則說明該元素已經到期。
針對添加元素的操作,DelayQueue 實現了 DelayQueue#offer
、DelayQueue#add
和 DelayQueue#put
方法,不過后兩者都是直接調用了 DelayQueue#offer
方法。
此外,該方法的超時版本 DelayQueue#offer(E, long, TimeUnit)
也是直接委托給 DelayQueue#offer
方法執行,并沒有真正實現超時等待機制。這主要是因為 DelayQueue 是無界的,所有的添加操作都能夠被立即響應,而不會阻塞。
下面展開分析一下 DelayQueue#offer
方法的實現,如下:
public boolean offer(E e) { final ReentrantLock lock = this.lock; // 加鎖 lock.lock(); try { // 往隊列中添加元素 q.offer(e); // 當前添加的元素是隊列中最先過期的 if (q.peek() == e) { // 清空 leader,保證該元素能夠及時出隊列 leader = null; // 喚醒因元素均為到期或隊列為空而等待的線程 available.signal(); } return true; } finally { // 釋放鎖 lock.unlock(); } }
DelayQueue 不允許向其中添加值為 null 的元素,這主要由優先級隊列 PriorityQueue 保證。如果待添加的元素值合法,則執行:
加鎖,保證同一時間只有一個線程在操作隊列;
將待添加元素插入到 DelayQueue 中;
檢查 DelayQueue 中優先級最高的的元素是否是剛剛新加入的元素;
如果是則剝奪當前 leader 線程的 leader 角色,并喚醒一個之前因為隊列中的元素均未到期或隊列為空而等待的線程;
釋放鎖并返回。
為什么當隊列中插入了一個優先級最高的元素時需要剝奪當前 leader 線程的 leader 角色,并喚醒一個處于等待的 follower 線程呢?
其實在前面也有所提及,假設現在 DelayQueue 中最先過期的元素還有 10 秒到期,則 leader 線程會等待 10 秒后再次嘗試出隊列,其它 follower 線程因為檢測到當前已有線程成為 leader,所以在發現沒有已到期的元素時會等待。假設現在有一個還有 5 秒到期的元素插入了進來,如果在該元素到期之后沒有新的線程來請求出隊列,則該元素將不能及時被響應,直到 leader 線程退出等待,即使此時有相當數量的 follower 線程在等待元素到期。
針對獲取元素的操作,DelayQueue 實現了 DelayQueue#poll
、DelayQueue#peek
和 DelayQueue#take
方法。其中 DelayQueue#peek
方法在獲取到鎖的基礎上直接調用了 PriorityQueue#peek
方法,僅獲取 DelayQueue 中最先到期的元素(獲取時可能還未到期),而不移除該元素,實現上比較簡單。
方法 DelayQueue#take
相對于 DelayQueue#poll
的區別在于,當隊列為空或沒有到期的元素時該方法會無限期阻塞,直到有元素到期或該線程被中斷,而 DelayQueue#poll
方法在相同場景下則會立即返回 null。
下面分別展開分析這兩個方法的實現,首先來看一下 DelayQueue#poll
方法,實現如下:
public E poll() { final ReentrantLock lock = this.lock; // 獲取鎖 lock.lock(); try { // 獲取隊列中最先過期的元素 E first = q.peek(); // 隊列為空,或者元素還未到期,立即返回 null if (first == null || first.getDelay(NANOSECONDS) > 0) { return null; } // 當前元素已過期,移除并返回 else { return q.poll(); } } finally { // 釋放鎖 lock.unlock(); } }
上述方法會檢查 DelayQueue 中是否有已經到期的元素,如果有則將該元素出隊列并返回,否則,如果隊列為空或沒有已經到期的元素,則立即返回 null。針對 DelayQueue#poll
方法,DelayQueue 還提供了超時版本 DelayQueue#poll(long, TimeUnit)
,當隊列為空或沒有已經到期的元素時等待指定時間。
再來看一下 DelayQueue#take
方法的實現,如下:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 獲取鎖,支持響應中斷 lock.lockInterruptibly(); try { for (; ; ) { // 獲取最先過期的元素 E first = q.peek(); // 隊列為空,則等待 if (first == null) { available.await(); } // 隊列非空 else { // 如果當前元素已經過期,則出隊列 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) { return q.poll(); } /* 當前元素還未過期 */ first = null; // don't retain ref while waiting // 已經有其它線程成為 leader,則等待 if (leader != null) { available.await(); } // 沒有 leader 線程,將自己設置為 leader 角色 else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 等待 delay 納秒 available.awaitNanos(delay); } finally { // 如果等待期間自己的 leader 角色未被剝奪,則在等待完成之后主動放棄 if (leader == thisThread) { leader = null; } } } } } } finally { // 如果隊列不為空,則喚醒一個之前因為隊列為空而等待的線程 if (leader == null && q.peek() != null) { available.signal(); } // 釋放鎖 lock.unlock(); } }
方法 DelayQueue#take
在獲取到鎖之后會先檢查隊列是否為空,如果為空則等待,否則執行:
如果 DelayQueue 中優先級最高的元素已經到期,則出隊列并返回該元素;
否則,如果當前已經有 leader 線程,則等待;
如果當前沒有 leader 線程,則將自己設置為 leader 角色,并等待隊列中優先級最高的元素到期。
如果 DelayQueue 中優先級最高的元素到期,或者等待期間被中斷,則當前 leader 線程會主動放棄自己的 leader 角色,以給其它 follower 線程機會。當然在等待期間,當前線程的 leader 角色也可能會被剝奪,前面我們在分析 DelayQueue#offer
方法時已經介紹過,當等待期間有其它優先級更高的元素插入進來時,執行插入的線程會剝奪當前 leader 線程的 leader 角色,以便讓剛剛插入的優先級更高的元素能夠在到期時及時出隊列。
在整個 DelayQueue#take
方法執行的最后,如果 DelayQueue 非空,且當前沒有線程成為 leader,則會喚醒一個之前因為隊列為空而阻塞的 follower 線程。這里限制 leader == null
主要是防止在有 leader 存在的前提下,被喚醒的線程會因為隊列中沒有到期的元素而再次等待。
針對移除元素的操作,DelayQueue 實現了 DelayQueue#remove
方法,并提供了有參和無參的版本,其中無參版本實際上是委托給 DelayQueue#poll
方法執行的。下面來分析一下有參版本的實現,如下:
public boolean remove(Object o) { final ReentrantLock lock = this.lock; // 加鎖 lock.lock(); try { // 從優先級隊列中移除元素 return q.remove(o); } finally { // 釋放鎖 lock.unlock(); } }
DelayQueue 移除指定元素的操作在獲取到鎖的前提下,交由 PriorityQueue 執行,實現上比較簡單。
DelayQueue 的 DelayQueue#size
方法在實現上與 DelayQueue#remove
方法思路相同,不再展開。但需要注意的一點是,方法 DelayQueue#size
所返回的值并不僅僅包含那些未到期的元素,也可能包含一些已經到期而未被從隊列中移除的元素,一種可能是這些元素未被及時響應,另外一種可能就是線程僅獲取了元素值,而沒有移除對應的結點,例如調用了 DelayQueue#peek
方法。
“JUC的DelayQueue怎么使用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。