您好,登錄后才能下訂單哦!
????AbstractQueuedSynchronizer (AQS)類如其名,抽象的隊列式同步容器,AQS定義類一套多線程訪問共享資源的同步器,許多同步類的實現都依賴于它,比如之前學習的ReentrantLock/Semaphore/CountDownLatch。
AQS阻塞隊列.png
1。自定義同步器在實現時只需要實現共享資源state的獲取于釋放方式即可,至于具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了,自定義同步容器實現時主要實現以下幾種方法:
2.isHeldExclusively():該線程是否正在獨占資源,只有用到condition才需要取實現它。
3.tryAcquire(int):獨占方式,嘗試獲取資源,成功則返回true,失敗則返回false。
4.tryRelease(int):獨占方式,嘗試釋放資源,成功則返回true,失敗則返回false。
5.tryAcquireShared(int):共享方式,嘗試獲取資源,附屬表示獲取失敗,0表示獲取成功,但是沒有剩余資源可用,其他線程不能在獲取,正數表示獲取成功,并且有剩余資源,也就是自己可以重復獲取(可重入)。
6.tryReleaseShare(int):共享方式。嘗試釋放資源,成功返回true,失敗返回false。
7.以ReentrantLock為例,state初始化為0,表示未鎖定狀態。A線程lock()的時候,會調用tryAcquire()獨占該鎖并將state+1。此后其他線程在tryAcquire()獨占該鎖并將state+1。此后表示其他線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(幾鎖釋放)為止,其他線程才有機會獲取該鎖。當然,釋放鎖之前,A線程自己是可以重復獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多么次,這樣才能保證state是能回到零態的。
2.1 Reentrantlock類結構
public class ReentrantLock implements Lock, java.io.Serializable
可以看出Reentrantlock 實現類lock接口,首先看下這個lock接口。
public interface Lock {
//普通的獲取鎖的方法,lock會阻塞直到成功
void lock();
//與lock不同的時,它可以相應中斷,
void lockInterruptibly() throws InterruptedException;
//嘗試獲取鎖,立即返回,不阻塞,成功返回true,失敗返回false
boolean tryLock();
//先嘗試獲取鎖,如果成功則返回true,失敗則阻塞等待,等待時間由參數決定,在等待的同時相應中斷,拋出中斷異常,如果在等待的過程中獲得類鎖則返回true,否則直到時間超時,則返回false。
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//普通釋放鎖的方法
void unlock();
//新建一個條件,lock可以關聯多個條件,關于條件在以后篇幅中記錄
Condition newCondition();
}
????可以看出,顯示鎖,與synchronied相比,支持非阻塞的方式獲取鎖,可以響應中斷,可以限時,這使得它非常的靈活。
?????現在回到Reentrantlock類,可以看到其內部基本上定義類三個內部類:
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
.....
abstract static class Sync extends AbstractQueuedSynchronizer {
...
}
static final class NonfairSync extends Sync {
....
}
....
static final class FairSync extends Sync {
.....
}
.....
}
??????很顯然從上面的類結構,我們可以得出jdk利用CAS算法和AQS實現類ReentrantLock。
2.2. 構造函數分析
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
???ReentrantLock出于性能考慮,默認采用非公平鎖。
2.3. 獲取鎖
2.3.1. 非公平模式下的加鎖
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
//直接通過CAS算法,通過改變共享資源-state值,成功則設置當前線程為持有鎖的線程(獨占),失敗則調用頂層AQS的acquire方法,再次嘗試,失敗則將線程放到阻塞隊列
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
...
}
2.3.2. 公平模式下的加鎖
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
//獲取鎖
final void lock() {
acquire(1); //在這里調用的時AQS頂層方法,獲取失敗則將線程放到阻塞隊列末尾
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//當前資源還沒有被其他線程鎖定
if (c == 0) {
//沒有其他線程正在競爭當前的臨界資源,并且通過CAS加鎖成功,則設置當前的線程為獨占此資源的線程
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果此時臨界資源已經被鎖定,則先判斷持有者是否時當前線程
else if (current == getExclusiveOwnerThread()) {
//如果是持有者,則可重復獲取鎖,同時更新狀態值,注意加幾次鎖就要釋放幾次,直到為0
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
2.3.3、acquireInterruptibly解析
public void lockInterruptibly() throws InterruptedException {
//調用AQS頂層方法
sync.acquireInterruptibly(1);
}
//AQS頂層方法
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
//中斷響應
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
大致過程如下:
(1)如果線程已經中斷則直接拋出異常
(2)如果線程還沒中斷,則通過自旋方式,申請鎖直至到當前線程獲得鎖,或失敗,然后響應中斷。
2.4 釋放鎖
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
其釋放獨占鎖的過程大致如下:
(1)先獲取當前資源狀態,減去釋放release,一般是1;
(2)判斷當前線程是否鎖定資源的線程,如果不是則拋出異常;
(3)假如資源狀態值為0,則說明鎖已經釋放,將獨占線程至空,更新狀態值,返回設置成功或失敗
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。