您好,登錄后才能下訂單哦!
[TOC]
LinkedTransferQueue 是一個由鏈表結構組成的wujie阻塞傳輸隊列,它是一個很多隊列的結合體(ConcurrentLinkedQueue,LinkedBlockingQueue,SynchronousQueue),在除了有基本阻塞隊列的功能(但是這個阻塞隊列沒有使用鎖)之外;隊列實現了TransferQueue接口重寫了tryTransfer和transfer方法,這組方法和SynchronousQueue公平模式的隊列類似,具有匹配的功能。
// 是否是多核
private static final boolean MP =
Runtime.getRuntime().availableProcessors() > 1;
// 自旋次數
private static final int FRONT_SPINS = 1 << 7;
// 前驅節點正在處理,當前節點需要自旋的次數
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
// 容忍清除節點失敗次數的閾值
static final int SWEEP_THRESHOLD = 32;
static final class Node {
// 表示存放數據還是獲取數據
final boolean isData; // false if this is a request node
// 存放數據是item有值
volatile Object item; // initially non-null if isData; CASed to match
// next節點
volatile Node next;
// 等待線程
volatile Thread waiter;
// 構造
Node(Object item, boolean isData) {
UNSAFE.putObject(this, itemOffset, item); // relaxed write
this.isData = isData;
}
}
// 頭結點
transient volatile Node head;
// 尾節點
private transient volatile Node tail;
// xfer方法的入參, 不同類型的方法內部調用xfer方法時入參不同
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer
注意:xfer
者幾個參數很重要。
NOW
: 表示的是立即,不需要等待的意思,用于poll和tryTransfer方法,poll 隊列為空返回,tryTransfer隊列沒有消費者,直接返回,都是不等待的。
ASYNC
:異步,offer, put, add等入隊方法,由于是×××隊列,所以不會阻塞。
SYNC
:同步表示會阻塞,take一個元素,沒有就會阻塞,transfer傳輸,必須等待消費者來消費。
TIMED
: 帶超時時間的now,會等待一定的時間后返回。
public LinkedTransferQueue() {
}
// 隊尾彈出一個元素,沒有就返回null
public E poll() {
return xfer(null, false, NOW, 0);
}
// 立即轉交一個元素給消費者,如果此時隊列沒有消費者,那就false
public boolean tryTransfer(E e) {
return xfer(e, true, NOW, 0) == null;
}
public boolean offer(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
public void put(E e) {
xfer(e, true, ASYNC, 0);
}
public boolean add(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
// 轉交一個元素給消費者,如果此時隊列沒有消費者,那就阻塞
public void transfer(E e) throws InterruptedException {
if (xfer(e, true, SYNC, 0) != null) {
// 清除方法
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
public E take() throws InterruptedException {
E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E e = xfer(null, false, TIMED, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
}
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
我們可以看見上面所有的方法都是調用的xfer方法,下面我們來詳解下這個方法。
private E xfer(E e, boolean haveData, int how, long nanos) {
// 插入元素,
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) { // 死循環 // restart on append race
// 從頭結點開始匹配
for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData; // 獲取節點的類型
Object item = p.item; // item 的值
// 兩種情況 1.put節點 item != null isData 為true 2.take item = null false isData false
// 或者節點已經被匹配了
if (item != p && (item != null) == isData) { // unmatched // 節點沒有被匹配過
if (isData == haveData) // can't match // 類型一致,只能執行入隊操作
break;
if (p.casItem(item, e)) { // match 匹配,可能存在多線程競爭匹配
for (Node q = p; q != h;) { // 不是頭節點了,頭結點發生了改變,被匹配了,自己也匹配了,
// 下一個節點
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
// 自關聯 節點不要了
h.forgetNext();
break;
} // advance and retry
// head 已經被更新過,或者更新head失敗,需要重新判斷
// h = head == null,隊列為空
// (q = h.next) == null 最后一個節點
// 頭接單的下一個節點有沒有被匹配
// 說明值有頭結點匹配了,頭結點的next節點也匹配了,才要更新頭結點,優化手段
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
// 匹配成功
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.<E>cast(item);
}
}
// 已經匹配就往下走
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
/* // xfer方法的入參, 不同類型的方法內部調用xfer方法時入參不同
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer*/
// 模式不同只能入隊啦
if (how != NOW) { // No matches available
if (s == null)
// 創建一個新節點
s = new Node(e, haveData);
// tryAppend 給tail追加節點
Node pred = tryAppend(s, haveData);
// 不能添加到這個節點 ,重新循環
if (pred == null)
continue retry;
// lost race vs opposite mode
// ASYNC 添加成功返回了
// SYNC TIMED 需要阻塞線程
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
// now 是立即返回
return e; // not waiting
}
}
分析:
casItem
設置item,完成數據的傳遞,然后判斷q != h
,q發生變化說明頭結點被別的線程匹配了,這里可能多個線程來匹配,所以頭節點是可能發生變化的,我們不是每一次都更新頭節點,而是當頭節點被匹配,頭結點的下一個節點也被匹配才會更新頭節點,這是一種優化手段;當我們匹配成功了,喚醒匹配的節點LockSupport.unpark(p.waiter)
,然后返回。NOW
,NOW
對應的方法是poll
和tryTransfer
,是不會等待的,也不會入隊的,所以直接返回;接下來的幾種狀態都是要入隊的,所以創建一個s = new Node(e, haveData)
,然后調用tryAppend
方法入隊追加到隊尾,返回前置節點;此時在判斷how是ASYNC
還是SYNC
和TIMED
,ASYNC
不要等待所以直接返回,SYNC
和`TIMED
是需要等待的,所以調用awaitMatch
方法等待,直到匹配成功或者超時時間到了。入隊尾
private Node tryAppend(Node s, boolean haveData) {
for (Node t = tail, p = t;;) { // move p to last node and append 遍歷
Node n, u; // temps for reads of next & tail
if (p == null && (p = head) == null) { // 還沒有節點
if (casHead(null, s))
return s; // initialize
}
// 是否符合入隊要求
else if (p.cannotPrecede(haveData))
return null; // lost race vs opposite mode
// p.next 不為null,說明p真正的尾節點,p需要向后推進
else if ((n = p.next) != null) // not last; keep traversing
p = p != t && t != (u = tail) ? (t = u) : // stale tail
(p != n) ? n : null; // restart if off list
// p.next = null,說明找到最后一個節點了,可以入隊了
// 可能存在競爭,失敗,就繼續下一個節點
else if (!p.casNext(null, s))
p = p.next; // re-read on CAS failure
else {
// 入隊成功了
if (p != t) { // 說明此時的入隊節點的前節點p和尾節點有距離 是否需要更新尾節點
// update if slack now >= 2
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
return p;
}
}
}
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = -1; // initialized after first item and cancel checks
ThreadLocalRandom randomYields = null; // bound if needed
for (;;) {
Object item = s.item;
// 被匹配過了
if (item != e) { // matched
// assert item != s;
s.forgetContents(); // avoid garbage
return LinkedTransferQueue.<E>cast(item);
}
// 被中斷 超時時間到了
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
s.casItem(e, s)) { // cancel
unsplice(pred, s);//
return e;
}
// 初始化自旋
if (spins < 0) {
// establish spins at/near front
//初始化自旋次數,即計算自旋次數
if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();
}
// 自旋遞減
else if (spins > 0) { // spin
--spins;
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); // occasionally yield
}
// 自旋次數到了 就會阻塞
// 設置阻塞線程
else if (s.waiter == null) {
s.waiter = w; // request unpark then recheck
}
// 超時阻塞
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
}
// 阻塞
else {
LockSupport.park(this);
}
}
}
LinkedTransferQueue 是很多隊列的集合體,雖然方法基本一樣,但是實現卻是大大的不同,我們以前的阻塞隊列幾乎都是使用鎖來控制入隊和出隊的,LinkedTransferQueue 沒有使用鎖,入隊和出隊都是使用自旋加cas實現的,比鎖的消耗更低,使用了很多的優化(控制自旋次數等),性能更高;隊列是wujie的,所以使用時一定要注意內存的問題。
參考《Java 并發編程的藝術》
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。