您好,登錄后才能下訂單哦!
(1)DelayQueue是阻塞隊列嗎?
(2)DelayQueue的實現方式?
(3)DelayQueue主要用于什么場景?
DelayQueue是java并發包下的延時阻塞隊列,常用于實現定時任務。
從繼承體系可以看到,DelayQueue實現了BlockingQueue,所以它是一個阻塞隊列。
另外,DelayQueue還組合了一個叫做Delayed的接口,DelayQueue中存儲的所有元素必須實現Delayed接口。
那么,Delayed是什么呢?
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
Delayed是一個繼承自Comparable的接口,并且定義了一個getDelay()方法,用于表示還有多少時間到期,到期了應返回小于等于0的數值。
// 用于控制并發的鎖
private final transient ReentrantLock lock = new ReentrantLock();
// 優先級隊列
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于標記當前是否有線程在排隊(僅用于取元素時)
private Thread leader = null;
// 條件,用于表示現在是否有可取的元素
private final Condition available = lock.newCondition();
從屬性我們可以知道,延時隊列主要使用優先級隊列來實現,并輔以重入鎖和條件來控制并發安全。
因為優先級隊列是×××的,所以這里只需要一個條件就可以了。
還記得優先級隊列嗎?點擊鏈接直達【死磕 java集合之PriorityQueue源碼分析】
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
構造方法比較簡單,一個默認構造方法,一個初始化添加集合c中所有元素的構造方法。
因為DelayQueue是阻塞隊列,且優先級隊列是×××的,所以入隊不會阻塞不會超時,因此它的四個入隊方法是一樣的。
public boolean add(E e) {
return offer(e);
}
public void put(E e) {
offer(e);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
入隊方法比較簡單:
(1)加鎖;
(2)添加元素到優先級隊列中;
(3)如果添加的元素是堆頂元素,就把leader置為空,并喚醒等待在條件available上的線程;
(4)解鎖;
因為DelayQueue是阻塞隊列,所以它的出隊有四個不同的方法,有拋出異常的,有阻塞的,有不阻塞的,有超時的。
我們這里主要分析兩個,poll()和take()方法。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
poll()方法比較簡單:
(1)加鎖;
(2)檢查第一個元素,如果為空或者還沒到期,就返回null;
(3)如果第一個元素到期了就調用優先級隊列的poll()彈出第一個元素;
(4)解鎖。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 堆頂元素
E first = q.peek();
// 如果堆頂元素為空,說明隊列中還沒有元素,直接阻塞等待
if (first == null)
available.await();
else {
// 堆頂元素的到期時間
long delay = first.getDelay(NANOSECONDS);
// 如果小于0說明已到期,直接調用poll()方法彈出堆頂元素
if (delay <= 0)
return q.poll();
// 如果delay大于0 ,則下面要阻塞了
// 將first置為空方便gc,因為有可能其它元素彈出了這個元素
// 這里還持有著引用不會被清理
first = null; // don't retain ref while waiting
// 如果前面有其它線程在等待,直接進入等待
if (leader != null)
available.await();
else {
// 如果leader為null,把當前線程賦值給它
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待delay時間后自動醒過來
// 醒過來后把leader置空并重新進入循環判斷堆頂元素是否到期
// 這里即使醒過來后也不一定能獲取到元素
// 因為有可能其它線程先一步獲取了鎖并彈出了堆頂元素
// 條件鎖的喚醒分成兩步,先從Condition的隊列里出隊
// 再入隊到AQS的隊列中,當其它線程調用LockSupport.unpark(t)的時候才會真正喚醒
// 關于AQS我們后面會講的^^
available.awaitNanos(delay);
} finally {
// 如果leader還是當前線程就把它置為空,讓其它線程有機會獲取元素
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 成功出隊后,如果leader為空且堆頂還有元素,就喚醒下一個等待的線程
if (leader == null && q.peek() != null)
// signal()只是把等待的線程放到AQS的隊列里面,并不是真正的喚醒
available.signal();
// 解鎖,這才是真正的喚醒
lock.unlock();
}
}
take()方法稍微要復雜一些:
(1)加鎖;
(2)判斷堆頂元素是否為空,為空的話直接阻塞等待;
(3)判斷堆頂元素是否到期,到期了直接調用優先級隊列的poll()彈出元素;
(4)沒到期,再判斷前面是否有其它線程在等待,有則直接等待;
(5)前面沒有其它線程在等待,則把自己當作第一個線程等待delay時間后喚醒,再嘗試獲取元素;
(6)獲取到元素之后再喚醒下一個等待的線程;
(7)解鎖;
說了那么多,是不是還是不知道怎么用呢?那怎么能行,請看下面的案例:
public class DelayQueueTest {
public static void main(String[] args) {
DelayQueue<Message> queue = new DelayQueue<>();
long now = System.currentTimeMillis();
// 啟動一個線程從隊列中取元素
new Thread(()->{
while (true) {
try {
// 將依次打印1000,2000,5000,7000,8000
System.out.println(queue.take().deadline - now);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// 添加5個元素到隊列中
queue.add(new Message(now + 5000));
queue.add(new Message(now + 8000));
queue.add(new Message(now + 2000));
queue.add(new Message(now + 1000));
queue.add(new Message(now + 7000));
}
}
class Message implements Delayed {
long deadline;
public Message(long deadline) {
this.deadline = deadline;
}
@Override
public long getDelay(TimeUnit unit) {
return deadline - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
return (int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return String.valueOf(deadline);
}
}
是不是很簡單,越早到期的元素越先出隊。
(1)DelayQueue是阻塞隊列;
(2)DelayQueue內部存儲結構使用優先級隊列;
(3)DelayQueue使用重入鎖和條件來控制并發安全;
(4)DelayQueue常用于定時任務;
java中的線程池實現定時任務是直接用的DelayQueue嗎?
當然不是,ScheduledThreadPoolExecutor中使用的是它自己定義的內部類DelayedWorkQueue,其實里面的實現邏輯基本都是一樣的,只不過DelayedWorkQueue里面沒有使用現成的PriorityQueue,而是使用數組又實現了一遍優先級隊列,本質上沒有什么區別。
歡迎關注我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。