91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

死磕 java同步系列之ReentrantReadWriteLock源碼解析

發布時間:2020-07-16 17:00:53 來源:網絡 閱讀:341 作者:彤哥讀源碼 欄目:編程語言

問題

(1)讀寫鎖是什么?

(2)讀寫鎖具有哪些特性?

(3)ReentrantReadWriteLock是怎么實現讀寫鎖的?

(4)如何使用ReentrantReadWriteLock實現高效安全的TreeMap?

簡介

讀寫鎖是一種特殊的鎖,它把對共享資源的訪問分為讀訪問和寫訪問,多個線程可以同時對共享資源進行讀訪問,但是同一時間只能有一個線程對共享資源進行寫訪問,使用讀寫鎖可以極大地提高并發量。

特性

讀寫鎖具有以下特性:

是否互斥

可以看到,讀寫鎖除了讀讀不互斥,讀寫、寫讀、寫寫都是互斥的。

那么,ReentrantReadWriteLock是怎么實現讀寫鎖的呢?

類結構

在看源碼之前,我們還是先來看一下ReentrantReadWriteLock這個類的主要結構。

死磕 java同步系列之ReentrantReadWriteLock源碼解析

ReentrantReadWriteLock中的類分成三個部分:

(1)ReentrantReadWriteLock本身實現了ReadWriteLock接口,這個接口只提供了兩個方法readLock()writeLock()

(2)同步器,包含一個繼承了AQS的Sync內部類,以及其兩個子類FairSync和NonfairSync;

(3)ReadLock和WriteLock兩個內部類實現了Lock接口,它們具有鎖的一些特性。

源碼分析

主要屬性

// 讀鎖
private final ReentrantReadWriteLock.ReadLock readerLock;
// 寫鎖
private final ReentrantReadWriteLock.WriteLock writerLock;
// 同步器
final Sync sync;

維護了讀鎖、寫鎖和同步器。

主要構造方法

// 默認構造方法
public ReentrantReadWriteLock() {
    this(false);
}
// 是否使用公平鎖的構造方法
public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

它提供了兩個構造方法,默認構造方法使用的是非公平鎖模式,在構造方法中初始化了讀鎖和寫鎖。

獲取讀鎖和寫鎖的方法

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

屬性中的讀鎖和寫鎖是私有屬性,通過這兩個方法暴露出去。

下面我們主要分析讀鎖和寫鎖的加鎖、解鎖方法,且都是基于非公平模式的。

ReadLock.lock()

// ReentrantReadWriteLock.ReadLock.lock()
public void lock() {
    sync.acquireShared(1);
}
// AbstractQueuedSynchronizer.acquireShared()
public final void acquireShared(int arg) {
    // 嘗試獲取共享鎖(返回1表示成功,返回-1表示失敗)
    if (tryAcquireShared(arg) < 0)
        // 失敗了就可能要排隊
        doAcquireShared(arg);
}
// ReentrantReadWriteLock.Sync.tryAcquireShared()
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    // 狀態變量的值
    // 在讀寫鎖模式下,高16位存儲的是共享鎖(讀鎖)被獲取的次數,低16位存儲的是互斥鎖(寫鎖)被獲取的次數
    int c = getState();
    // 互斥鎖的次數
    // 如果其它線程獲得了寫鎖,直接返回-1
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // 讀鎖被獲取的次數
    int r = sharedCount(c);

    // 下面說明此時還沒有寫鎖,嘗試去更新state的值獲取讀鎖
    // 讀者是否需要排隊(是否是公平模式)
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        // 獲取讀鎖成功
        if (r == 0) {
            // 如果之前還沒有線程獲取讀鎖
            // 記錄第一個讀者為當前線程
            firstReader = current;
            // 第一個讀者重入的次數為1
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // 如果有線程獲取了讀鎖且是當前線程是第一個讀者
            // 則把其重入次數加1
            firstReaderHoldCount++;
        } else {
            // 如果有線程獲取了讀鎖且當前線程不是第一個讀者
            // 則從緩存中獲取重入次數保存器
            HoldCounter rh = cachedHoldCounter;
            // 如果緩存不屬性當前線程
            // 再從ThreadLocal中獲取
            // readHolds本身是一個ThreadLocal,里面存儲的是HoldCounter
            if (rh == null || rh.tid != getThreadId(current))
                // get()的時候會初始化rh
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                // 如果rh的次數為0,把它放到ThreadLocal中去
                readHolds.set(rh);
            // 重入的次數加1(初始次數為0)
            rh.count++;
        }
        // 獲取讀鎖成功,返回1
        return 1;
    }
    // 通過這個方法再去嘗試獲取讀鎖(如果之前其它線程獲取了寫鎖,一樣返回-1表示失敗)
    return fullTryAcquireShared(current);
}
// AbstractQueuedSynchronizer.doAcquireShared()
private void doAcquireShared(int arg) {
    // 進入AQS的隊列中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 當前節點的前一個節點
            final Node p = node.predecessor();
            // 如果前一個節點是頭節點(說明是第一個排隊的節點)
            if (p == head) {
                // 再次嘗試獲取讀鎖
                int r = tryAcquireShared(arg);
                // 如果成功了
                if (r >= 0) {
                    // 頭節點后移并傳播
                    // 傳播即喚醒后面連續的讀節點
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 沒獲取到讀鎖,阻塞并等待被喚醒
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
// AbstractQueuedSynchronizer.setHeadAndPropagate()
private void setHeadAndPropagate(Node node, int propagate) {
    // h為舊的頭節點
    Node h = head;
    // 設置當前節點為新頭節點
    setHead(node);

    // 如果舊的頭節點或新的頭節點為空或者其等待狀態小于0(表示狀態為SIGNAL/PROPAGATE)
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        // 需要傳播
        // 取下一個節點
        Node s = node.next;
        // 如果下一個節點為空,或者是需要獲取讀鎖的節點
        if (s == null || s.isShared())
            // 喚醒下一個節點
            doReleaseShared();
    }
}
// AbstractQueuedSynchronizer.doReleaseShared()
// 這個方法只會喚醒一個節點
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 如果頭節點狀態為SIGNAL,說明要喚醒下一個節點
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 喚醒下一個節點
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     // 把頭節點的狀態改為PROPAGATE成功才會跳到下面的if
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 如果喚醒后head沒變,則跳出循環
        if (h == head)                   // loop if head changed
            break;
    }
}

看完【死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖】的分析再看這章的內容應該會比較簡單,中間一樣的方法我們這里直接跳過了。

我們來看看大致的邏輯:

(1)先嘗試獲取讀鎖;

(2)如果成功了直接結束;

(3)如果失敗了,進入doAcquireShared()方法;

(4)doAcquireShared()方法中首先會生成一個新節點并進入AQS隊列中;

(5)如果頭節點正好是當前節點的上一個節點,再次嘗試獲取鎖;

(6)如果成功了,則設置頭節點為新節點,并傳播;

(7)傳播即喚醒下一個讀節點(如果下一個節點是讀節點的話);

(8)如果頭節點不是當前節點的上一個節點或者(5)失敗,則阻塞當前線程等待被喚醒;

(9)喚醒之后繼續走(5)的邏輯;

在整個邏輯中是在哪里連續喚醒讀節點的呢?

答案是在doAcquireShared()方法中,在這里一個節點A獲取了讀鎖后,會喚醒下一個讀節點B,這時候B也會獲取讀鎖,然后B繼續喚醒C,依次往復,也就是說這里的節點是一個喚醒一個這樣的形式,而不是一個節點獲取了讀鎖后一次性喚醒后面所有的讀節點。

死磕 java同步系列之ReentrantReadWriteLock源碼解析

ReadLock.unlock()

// java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock.unlock
public void unlock() {
    sync.releaseShared(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.releaseShared
public final boolean releaseShared(int arg) {
    // 如果嘗試釋放成功了,就喚醒下一個節點
    if (tryReleaseShared(arg)) {
        // 這個方法實際是喚醒下一個節點
        doReleaseShared();
        return true;
    }
    return false;
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryReleaseShared
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // 如果第一個讀者(讀線程)是當前線程
        // 就把它重入的次數減1
        // 如果減到0了就把第一個讀者置為空
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        // 如果第一個讀者不是當前線程
        // 一樣地,把它重入的次數減1
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        // 共享鎖獲取的次數減1
        // 如果減為0了說明完全釋放了,才返回true
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.doReleaseShared
// 行為跟方法名有點不符,實際是喚醒下一個節點
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 如果頭節點狀態為SIGNAL,說明要喚醒下一個節點
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 喚醒下一個節點
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     // 把頭節點的狀態改為PROPAGATE成功才會跳到下面的if
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 如果喚醒后head沒變,則跳出循環
        if (h == head)                   // loop if head changed
            break;
    }
}

解鎖的大致流程如下:

(1)將當前線程重入的次數減1;

(2)將共享鎖總共被獲取的次數減1;

(3)如果共享鎖獲取的次數減為0了,說明共享鎖完全釋放了,那就喚醒下一個節點;

如下圖,ABC三個節點各獲取了一次共享鎖,三者釋放的順序分別為ACB,那么最后B釋放共享鎖的時候tryReleaseShared()才會返回true,進而才會喚醒下一個節點D。

死磕 java同步系列之ReentrantReadWriteLock源碼解析

WriteLock.lock()

// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock.lock()
public void lock() {
    sync.acquire(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire()
public final void acquire(int arg) {
    // 先嘗試獲取鎖
    // 如果失敗,則會進入隊列中排隊,后面的邏輯跟ReentrantLock一模一樣了
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    // 狀態變量state的值
    int c = getState();
    // 互斥鎖被獲取的次數
    int w = exclusiveCount(c);
    if (c != 0) {
        // 如果c!=0且w==0,說明共享鎖被獲取的次數不為0
        // 這句話整個的意思就是
        // 如果共享鎖被獲取的次數不為0,或者被其它線程獲取了互斥鎖(寫鎖)
        // 那么就返回false,獲取寫鎖失敗
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 溢出檢測
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 到這里說明當前線程已經獲取過寫鎖,這里是重入了,直接把state加1即可
        setState(c + acquires);
        // 獲取寫鎖成功
        return true;
    }
    // 如果c等于0,就嘗試更新state的值(非公平模式writerShouldBlock()返回false)
    // 如果失敗了,說明獲取寫鎖失敗,返回false
    // 如果成功了,說明獲取寫鎖成功,把自己設置為占有者,并返回true
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}
// 獲取寫鎖失敗了后面的邏輯跟ReentrantLock是一致的,進入隊列排隊,這里就不列源碼了

寫鎖獲取的過程大致如下:

(1)嘗試獲取鎖;

(2)如果有讀者占有著讀鎖,嘗試獲取寫鎖失敗;

(3)如果有其它線程占有著寫鎖,嘗試獲取寫鎖失敗;

(4)如果是當前線程占有著寫鎖,嘗試獲取寫鎖成功,state值加1;

(5)如果沒有線程占有著鎖(state==0),當前線程嘗試更新state的值,成功了表示嘗試獲取鎖成功,否則失敗;

(6)嘗試獲取鎖失敗以后,進入隊列排隊,等待被喚醒;

(7)后續邏輯跟ReentrantLock是一致;

WriteLock.unlock()

// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock.unlock()
public void unlock() {
    sync.release(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer.release()
public final boolean release(int arg) {
    // 如果嘗試釋放鎖成功(完全釋放鎖)
    // 就嘗試喚醒下一個節點
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryRelease()
protected final boolean tryRelease(int releases) {
    // 如果寫鎖不是當前線程占有著,拋出異常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 狀態變量的值減1
    int nextc = getState() - releases;
    // 是否完全釋放鎖
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    // 設置狀態變量的值
    setState(nextc);
    // 如果完全釋放了寫鎖,返回true
    return free;
}

寫鎖釋放的過程大致為:

(1)先嘗試釋放鎖,即狀態變量state的值減1;

(2)如果減為0了,說明完全釋放了鎖;

(3)完全釋放了鎖才喚醒下一個等待的節點;

總結

(1)ReentrantReadWriteLock采用讀寫鎖的思想,能提高并發的吞吐量;

(2)讀鎖使用的是共享鎖,多個讀鎖可以一起獲取鎖,互相不會影響,即讀讀不互斥;

(3)讀寫、寫讀和寫寫是會互斥的,前者占有著鎖,后者需要進入AQS隊列中排隊;

(4)多個連續的讀線程是一個接著一個被喚醒的,而不是一次性喚醒所有讀線程;

(5)只有多個讀鎖都完全釋放了才會喚醒下一個寫線程;

(6)只有寫鎖完全釋放了才會喚醒下一個等待者,這個等待者有可能是讀線程,也可能是寫線程;

彩蛋

(1)如果同一個線程先獲取讀鎖,再獲取寫鎖會怎樣?

死磕 java同步系列之ReentrantReadWriteLock源碼解析

分析上圖中的代碼,在tryAcquire()方法中,如果讀鎖被獲取的次數不為0(c != 0 && w == 0),返回false,返回之后外層方法會讓當前線程阻塞。

可以通過下面的方法驗證:

readLock.lock();
writeLock.lock();
writeLock.unlock();
readLock.unlock();

運行程序后會發現代碼停止在writeLock.lock();,當然,你也可以打個斷點跟蹤進去看看。

(2)如果同一個線程先獲取寫鎖,再獲取讀鎖會怎樣?

死磕 java同步系列之ReentrantReadWriteLock源碼解析

分析上面的代碼,在tryAcquireShared()方法中,第一個紅框處并不會返回,因為不滿足getExclusiveOwnerThread() != current;第二個紅框處如果原子更新成功就說明獲取了讀鎖,然后就會執行第三個紅框處的代碼把其重入次數更改為1。

可以通過下面的方法驗證:

writeLock.lock();
readLock.lock();
readLock.unlock();
writeLock.unlock();

你可以打個斷點跟蹤一下看看。

(3)死鎖了么?

通過上面的兩個例子,我們可以感受到同一個線程先讀后寫和先寫后讀是完全不一樣的,為什么不一樣呢?

先讀后寫,一個線程占有讀鎖后,其它線程還是可以占有讀鎖的,這時候如果在其它線程占有讀鎖之前讓自己占有了寫鎖,其它線程又不能占有讀鎖了,這段程序會非常難實現,邏輯也很奇怪,所以,設計成只要一個線程占有了讀鎖,其它線程包括它自己都不能再獲取寫鎖。

先寫后讀,一個線程占有寫鎖后,其它線程是不能占有任何鎖的,這時候,即使自己占有一個讀鎖,對程序的邏輯也不會有任何影響,所以,一個線程占有寫鎖后是可以再占有讀鎖的,只是這個時候其它線程依然無法獲取讀鎖。

如果你仔細思考上面的邏輯,你會發現一個線程先占有讀鎖后占有寫鎖,會有一個很大的問題——鎖無法被釋放也無法被獲取了。這個線程先占有了讀鎖,然后自己再占有寫鎖的時候會阻塞,然后它就自己把自己搞死了,進而把其它線程也搞死了,它無法釋放鎖,其它線程也無法獲得鎖了。

這是死鎖嗎?似乎不是,死鎖的定義是線程A占有著線程B需要的資源,線程B占有著線程A需要的資源,兩個線程相互等待對方釋放資源,經典的死鎖例子如下:

Object a = new Object();
Object b = new Object();

new Thread(()->{
    synchronized (a) {
        LockSupport.parkNanos(1000000);
        synchronized (b) {

        }
    }
}).start();

new Thread(()->{
    synchronized (b) {
        synchronized (a) {

        }
    }
}).start();

簡單的死鎖用jstack是可以看到的:

"Thread-1":
        at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest.lambda$main$1(ReentrantReadWriteLockTest.java:40)
        - waiting to lock <0x000000076baa9068> (a java.lang.Object)
        - locked <0x000000076baa9078> (a java.lang.Object)
        at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest$$Lambda$2/1831932724.run(Unknown Source)
        at java.lang.Thread.run(Thread.java:748)
"Thread-0":
        at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest.lambda$main$0(ReentrantReadWriteLockTest.java:32)
        - waiting to lock <0x000000076baa9078> (a java.lang.Object)
        - locked <0x000000076baa9068> (a java.lang.Object)
        at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest$$Lambda$1/1096979270.run(Unknown Source)
        at java.lang.Thread.run(Thread.java:748)

Found 1 deadlock.

(4)如何使用ReentrantReadWriteLock實現一個高效安全的TreeMap?

class SafeTreeMap {
    private final Map<String, Object> m = new TreeMap<String, Object>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock = lock.readLock();
    private final Lock writeLock = lock.writeLock();

    public Object get(String key) {
        readLock.lock();
        try {
            return m.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public Object put(String key, Object value) {
        writeLock.lock();
        try {
            return m.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }
}

推薦閱讀

  1. 死磕 java同步系列之ReentrantLock VS synchronized

  2. 死磕 java同步系列之ReentrantLock源碼解析(二)——條件鎖

  3. 死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖

  4. 死磕 java同步系列之AQS起篇

  5. 死磕 java同步系列之自己動手寫一個鎖Lock

  6. 死磕 java魔法類之Unsafe解析

  7. 死磕 java同步系列之JMM(Java Memory Model)

  8. 死磕 java同步系列之volatile解析

  9. 死磕 java同步系列之synchronized解析

歡迎關注我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。

死磕 java同步系列之ReentrantReadWriteLock源碼解析

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

措美县| 苏尼特右旗| 阳高县| 德清县| 济阳县| 阿拉善盟| 达孜县| 富源县| 襄城县| 高平市| 赤水市| 黑龙江省| 开原市| 永春县| 新平| 古田县| 武穴市| 图木舒克市| 新兴县| 曲沃县| 响水县| 固镇县| 昌都县| 武功县| 长宁县| 曲水县| 叙永县| 赤壁市| 鄄城县| 巴彦县| 瑞丽市| 南康市| 寿阳县| 普宁市| 孙吴县| 贺州市| 大同市| 汶上县| 白朗县| 历史| 商洛市|