您好,登錄后才能下訂單哦!
這篇文章主要講解了“DelayQueue使用方式是什么”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“DelayQueue使用方式是什么”吧!
假設我們生產者提交一個任務,消費者5秒鐘之后才可以執行,那么我們可以把任務定義為如下格式,并實現Delayed接口,其中data是任務存儲的信息。
/** * 具體的任務 * @author wangshixiang */ public class Task implements Delayed { /** * 數據 */ private final String data; /** * 任務執行時間 */ private final long time; public Task(String data,TimeUnit timeUnit,long time){ this.data=data; this.time=System.currentTimeMillis()+timeUnit.toMillis(time); } @Override public long getDelay(TimeUnit unit) { long res= time-System.currentTimeMillis(); return unit.convert(res,TimeUnit.MILLISECONDS); } public String getData() { return data; } @Override public int compareTo(Delayed o) { if (o instanceof Task ){ Task task= (Task) o; return (int) (this.time-task.time); } return 0; } }
定義好任務后,我們需要定義一個任務隊列 QUEUE_TASK,來存儲消息,實現效果為程序運行后 五秒鐘后輸出Hello...
private static final DelayQueue<Task> QUEUE_TASK =new DelayQueue<>(); public static void main(String[] args) throws InterruptedException { QUEUE_TASK .add(new Task("Hello ... ", TimeUnit.SECONDS,5)); System.out.println(QUEUE_TASK .take().getData()); }
Delayed 接口定義:
public interface DeDlayed extends Comparable<Delayed> { long getDelay(TimeUnit unit); }
我們發現Delayed接口繼承了Comparable接口,并且有一個getDelay方法,在程序運行的過程中,會調用頭部任務的這個方法,來返回該任務具體還有多長時間可以執行。當我們任務實現這個接口時 可以存儲任務的執行時間,通過執行時間-當前時間 計算出距離執行時間的差值,因此我們Task定義了一個任務的變量,在創建對象時設置任務的執行時間。
2. DelayQueue 延時隊列
首先我們看一下DelayQueue類繼承實現結構圖
可以理解為 DelayQueue 是一個帶延遲執行功能的阻塞隊列
為什么Delayed接口繼承了Comparable接口 ?
DelayQueue是怎么實現只有到預定時間才能取出任務 ?
向隊列里放入一個任務時 發生了什么事情 ?
帶著這幾個問題,我們來看一下DelayQueeu的源碼 首先看一下主要的參數:
//鎖 private final transient ReentrantLock lock = new ReentrantLock(); //優先級隊列 執行時間最早的排在第一個 private final PriorityQueue<E> q = new PriorityQueue<E>(); //是否有線程在等待任務到執行時間 private Thread leader; //條件喚醒 private final Condition available = lock.newCondition();
那么我們先看add(E e)方法 ,任務入隊列時做了哪些操作
public boolean add(E e) { return offer(e); } public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
入隊列時做了一下步驟:
獲取鎖
放入元素 (放入優先級隊列)
如果自己排在第一個 則原來標記的leader線程已經失效 直接設置為null,并喚醒消費者
釋放鎖
接下來在看出隊列時take()方法做了哪些操作
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0L) return q.poll(); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) //我拿到元素了 喚醒其他的線程 available.signal(); lock.unlock(); } }
出隊列做了如下步驟:
獲取鎖(可中斷的鎖 獲取這種鎖允許其他線程中斷此線程)
取出第一個元素 如果第一個元素為空 則直接 await(),等待被喚醒(如放隊列時的喚醒)
如果第一個元素不為空,查看是否到執行時間,如果沒有到執行時間 查看是否有leader已經注意到這個任務 如果他注意到這個任務 我直接await()。如果沒人注意,那么我就把自己設置為leader然后設置帶時間的await()。
睡眠到執行時間后 醒來后查看leader是否還是自己 如果是的話 取消自己的leader身份。然后在嘗試獲取任務。
如果我獲取到了符合要求的元素,那么我應該喚醒大家 來一塊競爭獲取下一個元素。
帶時間的出隊列方法 E poll(long timeout, TimeUnit unit) 的實現邏輯與take()方法的唯一區別就是。只有當自己剩余等待時間大于第一個元素剩余執行時間時 才允許把自己設置為leader
public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { if (nanos <= 0L) return null; else //睡眠等待時間 有可能提前返回 那么返回的是剩余等待時間 nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0L) return q.poll(); if (nanos <= 0L) return null; first = null; // don't retain ref while waiting if (nanos < delay || leader != null) //如果剩余等待時間比第一個元素剩余執行時間還短 那么應該睡剩余等待時間 nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); //計算剩余等待時間 nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
在大多數業務場景中,我們會利用中間件提供的延時消息的功能。比如利用redis zset實現 ,kafka rabbit mq 的延時隊列。我們需要根據我們的業務場景,來選擇合適的中間件。
訂單超時未支付取消.
調用其他系統時失敗間隔重試.
調用第三方接口時,過段時間異步獲取結果。
感謝各位的閱讀,以上就是“DelayQueue使用方式是什么”的內容了,經過本文的學習后,相信大家對DelayQueue使用方式是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。