您好,登錄后才能下訂單哦!
本篇內容介紹了“Java多線程之鎖怎么使用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
首先強調一點:Java多線程的鎖都是基于對象的,Java中的每一個對象都可以作為一個鎖。同時,類鎖也是對象鎖,類是Class對象
核心思想
關鍵字在實例方法上,鎖為當前實例
關鍵字在靜態方法上,鎖為當前Class對象
關鍵字在代碼塊上,鎖為括號里面的對象
在進行線程執行順序的時候,如果添加了線程睡眠,那么就要看鎖的對象是誰,同一把鎖 / 非同一把鎖是不一樣的
synchronized 是Java提供的關鍵字,用來保證原子性的
synchronized的作用域如下
作用在普通方法上,此方法為原子方法:也就是說同一個時刻只有一個線程可以進入,其他線程必須在方法外等待,此時鎖是對象
作用在靜態方法上,此方法為原子方法:也就是說同一個時刻只有一個線程可以進入,其他線程必須在方法外等待,此時鎖是當前的Class對象
作用在代碼塊上,此代碼塊是原子操作:也就是說同一個時刻只有線程可以進入,其他線程必須在方法外等待,鎖是 synchronized(XXX) 里面的 XXX
先看一段簡單的代碼
public class SynchronizedTest { public static void main(String[] args) { test1(); test2(); } // 使用synchronized修飾的方法 public synchronized static void test1() { System.out.println("SynchronizedTest.test1"); } // 使用synchronized修飾的代碼塊 public static void test2() { synchronized (SynchronizedTest.class) { System.out.println("SynchronizedTest.test2"); } } }
執行之后,對其進行執行javap -v命令反編譯
// 省略啰嗦的代碼 public class cn.zq.sync.SynchronizedTest minor version: 0 major version: 52 flags: ACC_PUBLIC, ACC_SUPER { // 源碼 public cn.zq.sync.SynchronizedTest(); descriptor: ()V flags: ACC_PUBLIC // main 方法 public static void main(java.lang.String[]); descriptor: ([Ljava/lang/String;)V flags: ACC_PUBLIC, ACC_STATIC // synchronized 修飾的靜態方法 test1() public static synchronized void test1(); descriptor: ()V // 在這里我們可以看到 flags 中有一個 ACC_SYNCHRONIZED // 這個就是一個標記符這是 保證原子性的關鍵 // 當方法調用的時候,調用指令將會檢查方法的 ACC_SYNCHRONIZED 訪問標記符是否被設置 // 如果設置了,線程將先獲取 monitor,獲取成功之后才會執行方法體,方法執行之后,釋放monitor // 在方法執行期間,其他任何線程都無法在獲得一個 monitor 對象,本質上沒區別。 flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED Code: stack=2, locals=0, args_size=0 0: getstatic #4 // Field java/lang/System.out:Ljava/io/PrintStream; 3: ldc #5 // String SynchronizedTest.test1 5: invokevirtual #6 // Method java/io/PrintStream.println:(Ljava/lang/String;)V 8: return LineNumberTable: line 17: 0 line 18: 8 // 代碼塊使用的 synchronized public static void test2(); descriptor: ()V flags: ACC_PUBLIC, ACC_STATIC Code: stack=2, locals=2, args_size=0 0: ldc #7 // class cn/zq/sync/SynchronizedTest 2: dup 3: astore_0 // 這個 monitorenter 是一個指令 // 每個對象都有一個監視器鎖(monitor),當monitor被占用的時候就會處于鎖定狀態 // 線程執行monitorenter的時候,嘗試獲取monitor的鎖。過程如下 // 1.任何monitor進入數為0,則線程進入并設置為1,此線程就是monitor的擁有者 // 2.如果線程已經占用,當前線程再次進入的時候,會將monitor的次數+1 // 3.如何其他的線程已經占用了monitor,則線程進阻塞狀態,直到monitor的進入數為0 // 4.此時其他線程才能獲取當前代碼塊的執行權 4: monitorenter 5: getstatic #4 // Field java/lang/System.out:Ljava/io/PrintStream; 8: ldc #8 // String SynchronizedTest.test2 10: invokevirtual #6 // Method java/io/PrintStream.println:(Ljava/lang/String;)V 13: aload_0 // 執行monitorexit這條指令的線程必須是擁有monitor的 // 執行的之后,monitor的進入數-1.如果為0,那么線程就退出 monitor,不再是此代碼塊的執行者 // 此時再由其他的線程獲得所有權 // 其實 wait/notify 等方法也依賴于monitor對象, // 所以只有在同步方法或者同步代碼塊中才可以使用,否則會報錯 java.lang.IllegalMonitorstateException 異常 14: monitorexit 15: goto 23 18: astore_1 19: aload_0 20: monitorexit 21: aload_1 22: athrow 23: return Exception table: from to target type 5 15 18 any 18 21 18 any LineNumberTable: line 21: 0 line 22: 5 line 23: 13 line 24: 23 StackMapTable: number_of_entries = 2 frame_type = 255 /* full_frame */ offset_delta = 18 locals = [ class java/lang/Object ] stack = [ class java/lang/Throwable ] frame_type = 250 /* chop */ offset_delta = 4 } SourceFile: "SynchronizedTest.java"
總結:
使用synchronized修飾的同步方法
通過反編譯我們可以看到,被synchronized修飾的方法,其中的 flags中有一個標記:ACC_SYNCHRONIZED
當線程執行方法的時候,會先去檢查是否有這樣的一個標記,如果有的話,說明就是一個同步方法,此時會為當前線程設置 monitor ,獲取成功之后才會去執行方法體,執行完畢之后釋放monitor
使用synchronized修飾的代碼塊
通過反編譯我們看到,在代碼塊的兩側有JVM指令,在進入代碼塊之前指令是 monitorenter
當線程執行到代碼塊的時候,會先拿到monitor(初始值為0),然后線程將其設置為1,此時當前線程獨占monitor
如果當前持有monitor的線程再次進入monitor,則monitor的值+1,當其退出的時候,monitor的次數-1
當線程線程退出一次monitor的時候,會執行monitorexit指令,但是只有持有monitor的線程才能獲取并執行monitorexit指令,當當前線程monitor為0的時候,當前線程退出持有鎖
此時其他線程再來爭搶
但是為什么要有兩個 monitorexit呢?
這個時候我們會發現synchronized是可重入鎖,其實現原理就是monitor的個數增加和減少
同時wait / notify方法的執行也會依賴 monitor,所以wait和notify方法必須放在同步代碼塊中,否則會報錯 java.lang.IllegalMonitorstateException
因為方法區域很大,所以設置一個標記,現在執行完判斷之后,就全部鎖起來,而代碼塊不確定大小,就需要細化monitor的范圍
ReentrantLock是Lock接口的一個實現類
在ReentrantLock內部有一個抽象靜態內部類Sync
其中一個是 NonfairSync(非公平鎖),另外一個是 FairSync (公平鎖),二者都實現了此抽象內部類Sync,ReentrantLock默認使用的是 非公平鎖 ,我們看一下源碼:
public class ReentrantLock implements Lock, java.io.Serializable { // 鎖的類型 private final Sync sync; // 抽象靜態類Sync繼承了AbstractQueueSynchroniser [這個在下面進行解釋] abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; // 抽象加鎖方法 abstract void lock(); // 不公平的 tryLock 也就是不公平的嘗試獲取 final boolean nonfairTryAcquire(int acquires) { // 獲取當前線程對象 final Thread current = Thread.currentThread(); // 獲取線程的狀態 int c = getState(); // 根據線程的不同狀態執行不同的邏輯 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 獲取獨占模式的線程的當前鎖的狀態 else if (current == getExclusiveOwnerThread()) { // 獲取新的層級大小 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 設置鎖的狀態 setState(nextc); return true; } return false; } // 嘗試釋放方法 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } // 返回當前線程是不是獨占的 protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } // 返回 ConditionObject 對象 final ConditionObject newCondition() { return new ConditionObject(); } // 獲得獨占的線程 final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } // 獲得獨占線程的狀態 final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } // 判斷是否是加鎖的 final boolean isLocked() { return getState() != 0; } // 序列化 private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); } } // 非公平鎖繼承了Sync static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; // 加鎖操作 final void lock() { // 判斷是不是第一次加鎖 底層調用 Unsafe的compareAndSwapInt()方法 if (compareAndSetState(0, 1)) // 設置為獨占鎖 setExclusiveOwnerThread(Thread.currentThread()); // 如果不是第一次加鎖,則調用 acquire 方法在加一層鎖 else acquire(1); } // 返回嘗試加鎖是否成功 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } // 公平鎖 static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; // 加鎖操作,直接設置為1 final void lock() { acquire(1); } // 嘗試加鎖 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } }
Lock接口
public interface Lock { // 加鎖 void lock(); // 不斷加鎖 void lockInterruptibly() throws InterruptedException; // 嘗試加鎖 boolean tryLock(); // 嘗試加鎖,具有超時時間 boolean tryLock(long time, TimeUnit unit) throws InterruptedException; // 釋放鎖 void unlock(); // Condition 對象 Condition newCondition(); }
Condition接口
public interface Condition { // 等待 void await() throws InterruptedException; // 超時等待 boolean await(long time, TimeUnit unit) throws InterruptedException; // 超時納秒等待 long awaitNanos(long nanosTimeout) throws InterruptedException; // 可中斷等待 void awaitUninterruptibly(); // 等待死亡 boolean awaitUntil(Date deadline) throws InterruptedException; // 指定喚醒 void signal(); // 喚醒所有 void signalAll(); }
為什么官方提供的是非公平鎖,因為如果是公平鎖,假如一個線程需要執行很久,那執行效率會大大降低
ReentrantLock的其他方法
public class ReentrantLock implements Lock, java.io.Serializable { // 鎖的類型 private final Sync sync; // 默認是非公平鎖 public ReentrantLock() { sync = new NonfairSync(); } // 有參構造,可以設置鎖的類型 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } // 加鎖 public void lock() { sync.lock(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock() { return sync.nonfairTryAcquire(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } // 解鎖 調用release() 因為是重入鎖,所以需要減少重入的層數 public void unlock() { sync.release(1); } // 返回Condition對象 ,用來執行線程的喚醒等待等操作 public Condition newCondition() { return sync.newCondition(); } // 獲取鎖的層數 public int getHoldCount() { return sync.getHoldCount(); } public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); } // 是否加鎖 public boolean isLocked() { return sync.isLocked(); } // 是否是公平鎖 public final boolean isFair() { return sync instanceof FairSync; } // 獲取獨占鎖 protected Thread getOwner() { return sync.getOwner(); } // 查詢是否有任何線程正在等待獲取此鎖 public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } // 查詢給定線程是否正在等待獲取此鎖 public final boolean hasQueuedThread(Thread thread) { return sync.isQueued(thread); } // 獲取隊列的長度 public final int getQueueLength() { return sync.getQueueLength(); } // 返回一個包含可能正在等待獲取該鎖的線程的集合 protected Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); } // 判斷是否等待 public boolean hasWaiters(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition); } // 獲得等待隊列的長度 public int getWaitQueueLength(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition); } // 獲取正在等待的線程集合 protected Collection<Thread> getWaitingThreads(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition); } // toString() public String toString() { Thread o = sync.getOwner(); return super.toString() + ((o == null) ? "[Unlocked]" : "[Locked by thread " + o.getName() + "]"); } }
總結:
1.ReentrantLock是獨占鎖
2.ReentrantLock是可重入鎖
3.底層使用AbstractQueuedSynchronizer實現
4.synchronized 和 ReentrantLock的區別
synchronized是是關鍵字,可以作用在靜態方法、普通方法、靜態代碼塊,底層使用monitor實現,synchronized是內置鎖,是悲觀鎖,其發生異常會中斷鎖,所以不會發生死鎖。是非中斷鎖
ReentrantLock是類,作用在方法中,其比synchronized更加靈活,但是必須手動加鎖釋放鎖,是樂觀鎖,發生異常不會中斷鎖,必須在finally中釋放鎖,是可中斷的,使用Lock的讀鎖可以提供效率
AQS:AbstractQueueSynchronizer => 抽象隊列同步器
AQS定義了一套多線程訪問共享資源的同步器框架,很多同步器的實現都依賴AQS。如ReentrantLock、Semaphore、CountDownLatch …
首先看一下AQS隊列的框架
它維護了一個volatile int state (代表共享資源)和一個FIFO線程等待隊列(多線程爭搶資源被阻塞的時候會先進進入此隊列),這里的volatile是核心。在下個部分進行講解~
state的訪問方式有三種
getState()
setState()
compareAndSetState()
AQS定義了兩種資源共享方式:Exclusive(獨占,只有一個線程可以執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore、CountdownLatch)
不同的自定義同步器爭用共享資源的方式也不同。自定義的同步器在實現的時候只需要實現共享資源的獲取和釋放方式即可,至于具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊)AQS在頂層已經實現好了。
自定義同步器時需要實現以下方法即可
isHeldExclusively():該線程是否正在獨占資源。只有用的Condition才需要去實現它
tryAcquire(int):獨占方式。嘗試獲取資源,成功返回true,否則返回false
tryRelease(int):獨占方式。嘗試釋放資源,成功返回true,否則返回false
tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗,0表示成功但沒有剩余可用資源,正數表示成功,且還有剩余資源
tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續等待節點返回true,否則返回fasle
以ReentrantLock為例,state初始化為0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨占該鎖,然后將state+1,此后其他線程在調用tryAcquire()就會失敗,直到A線程unlock()到state為0為止,其他線程才有機會獲取該鎖。當前在A釋放鎖之前,A線程是可以重復獲取此鎖的(state)會累加。這就是可重入,但是獲取多少次,就要釋放多少次。
再和CountdownLock為例,任務分為N個子線程去執行,state也初始化為N(注意N要與線程的個數一致)。這N個子線程是并行執行的,每個子線程執行完之后countDown一次。state會CAS-1。等到所有的子線程都執行完后(即state=0),會upark()主調用線程,然后主調用線程就會從await()函數返回,繼續剩余動作
一般來說,自定義同步器要么是獨占方法,要么是共享方式,也只需要實現tryAcquire - tryRelease,tryAcquireShared - tryReleaseShared 中的一組即可,但是AQS也支持自定義同步器同時實現獨占鎖和共享鎖兩種方式,如:ReentrantReadWriteLock
AQS的源碼
AbstractQueueSynchronizer 繼承了 AbstractOwnableSynchronizer
AbstractOwnableSynchronizer類
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 3737899427754241961L; protected AbstractOwnableSynchronizer() { } // 獨占模式當前的擁有者 private transient Thread exclusiveOwnerThread; // 設置獨占模式當前的擁有者 protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } // 得到獨占模式當前的擁有者 protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
AbstractQueueSynchronizer類
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 7373984972572414691L; protected AbstractQueuedSynchronizer() { } // AbstractQueueSynchronizer 中的靜態內部類 Node 節點 static final class Node { // 指示節點正在以共享模式等待的標記 static final Node SHARED = new Node(); // 指示節點正在以獨占模式等待的標記 static final Node EXCLUSIVE = null; // 表示線程已經取消 static final int CANCELLED = 1; // 表示線程之后需要釋放 static final int SIGNAL = -1; // 表示線程正在等待條件 static final int CONDITION = -2; // 指示下一個 acquireShared 應該無條件傳播 static final int PROPAGATE = -3; // 狀態標記 volatile int waitStatus; // 隊列的前一個節點 volatile Node prev; // 隊列的后一個節點 volatile Node next; // 線程 volatile Thread thread; // 下一個正在等待的節點 Node nextWaiter; // 判斷是否時共享的 final boolean isShared() { return nextWaiter == SHARED; } // 返回上一個節點,不能為null,為null拋出空指針異常 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } // 構造 Node() { // Used to establish initial head or SHARED marker } // 有參構造,用來添加線程的隊列 Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } // 有參構造,根據等待條件使用 Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } // 頭節點 private transient volatile Node head; // 尾節點 private transient volatile Node tail; // 狀態 private volatile int state; // 獲取當前的狀態 protected final int getState() { return state; } //設置當前的狀態 protected final void setState(int newState) { state = newState; } // 比較設置當前的狀態 protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } // 納秒數,使之更快的旋轉 static final long spinForTimeoutThreshold = 1000L; // 將節點插入隊列 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } // 加一個等待節點 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } // 設置頭節點 private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } // 如果存在后繼節點,就喚醒 private void unparkSuccessor(Node node) { // 獲得節點的狀態 int ws = node.waitStatus; // 如果為負數,就執行比較并設置方法設置狀態 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 喚醒后面的節點 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } // 共享模式的釋放動作,并且向后繼節點發出信號 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } // 設置隊列的頭,并檢查后繼者能否在共享模式下等待,如果可以,就是否傳播設置為>0或者propagate狀態 private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } // 取消正在進行的嘗試 private void cancelAcquire(Node node) { // 節點為null,直接返回 if (node == null) return; node.thread = null; // 跳過已經取消的前一個節點 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } } // 還有好多方法... 其實本質就是基于 隊列的判斷和操作,AQS提供了獨占鎖和共享鎖的設計 // 在AQS中,使用到了Unsafe類,所以AQS其實就是基于CAS算法的, // AQS的一些方法就是直接調用 Unsafe 的方法 如下所示 private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } // 比較并設置頭 private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } // 比較并設置尾 private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } // 比較并設置狀態 private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); } // 比較并設置下一個節點 private static final boolean compareAndSetNext(Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect, update); } // 除此之外 AQS 還有一個實現了Condition的類 如下 public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; // 條件隊列的第一個節點 private transient Node firstWaiter; // 條件隊列的最后一個節點 private transient Node lastWaiter; public ConditionObject() { } // 在等待隊列中添加一個新的節點 private Node addConditionWaiter() { // 獲取最后一個節點 Node t = lastWaiter; // 如果最后一個節點被取消了,就清除它 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } // 刪除并轉移節點直到它沒有取消或者不為null private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } // 刪除所有的節點 private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); } // 取消節點的連接 private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } } // 將等待最長的線程,喚醒 public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } // 喚醒所有的等待線程 public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } // 實現不間斷的條件等待 public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } // 模式意味著在退出等待時重新中斷 private static final int REINTERRUPT = 1; // 模式的含義是在退出等待時拋出InterruptedException異常 private static final int THROW_IE = -1; // 檢查中斷,如果在信號通知之前被中斷,則返回THROW_IE; // 如果在信號通知之后,則返回REINTERRUPT;如果未被中斷,則返回 0 private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } // 拋出InterruptedException,重新中斷當前線程, // 或不執行任何操作,具體取決于模式。 private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } // 實現不可中斷的條件等待 public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } // 納秒級別的等待 public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); } // 絕對定時等待 public final boolean awaitUntil(Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } // 超時等待 public final boolean await(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } // 判斷是不是獨占的 final boolean isOwnedBy(AbstractQueuedSynchronizer sync) { return sync == AbstractQueuedSynchronizer.this; } // 返回是否有正在等待的 protected final boolean hasWaiters() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) return true; } return false; } // 獲得等待隊列的長度 protected final int getWaitQueueLength() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int n = 0; for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) ++n; } return n; } // 獲取所有正在等待的線程集合 protected final Collection<Thread> getWaitingThreads() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList<Thread> list = new ArrayList<Thread>(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) { Thread t = w.thread; if (t != null) list.add(t); } } return list; } } }
總結:
1.AQS為我們提供了很多實現。AQS內部有兩個內部類,ConditionObject和Node節點
2.和開頭說的一樣,其維護了一個state和一個隊列,也提供了獨占和共享的實現
3.總結一下流程
調用自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功就直接返回
沒成功,則addWaiter()將該線程加入等待隊列的尾部,并標記為獨占模式
acquireQueued()使得線程在隊列中休息,有機會(輪到自己,會被unpark())會去嘗試獲取資源。獲取到資源之后才會返回。如果在整個等待過程中被中斷過,就返回true,否則返回false
如果線程在等待過程中被中斷過,它不是響應的。只是獲取資源之后才再進行自我中斷selfInterrupt(),將中斷補上
4.release() 是獨占模式下線程共享資源的底層入口,它會釋放指定量的資源,如果徹底釋放了(state = 0)
5.如果獲取鎖的線程在release時異常了,沒有unpark隊列中的其他結點,這時隊列中的其他結點會怎么辦?是不是沒法再被喚醒了?
這時,隊列中等待鎖的線程將永遠處于park狀態,無法再被喚醒!
6.獲取鎖的線程在什么情形下會release拋出異常呢 ?
線程突然死掉了?可以通過thread.stop來停止線程的執行,但該函數的執行條件要嚴苛的多,而且函數注明是非線程安全的,已經標明Deprecated;
線程被interupt了?線程在運行態是不響應中斷的,所以也不會拋出異常;
7.acquireShared()的流程
tryAcquireShared()嘗試獲取資源,成功則直接返回;
失敗則通過doAcquireShared()進入等待隊列park(),直到被unpark()/interrupt()并成功獲取到資源才返回。整個等待過程也是忽略中斷的。
8.releaseShared()
釋放掉資源之后,喚醒和后繼
7.不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現時只需要實現共享資源state的獲取與釋放方式即可,至于具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現以下幾種方法:
isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現它。
tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源。
tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續等待結點返回true,否則返回false。
volatile是Java提供的關鍵字,是輕量級的同步機制 JSR133提出,Java5增強了語義
volatile關鍵字有三個重要的特點
保證內存可見性
不保證原子性
禁止指令重排序
提到volatile,就要提到JMM - 什么是JMM
JMM:Java Memory Model
本身就是一種抽象的概念,并不真實存在,它描述的是一組規范和規則,通過這種規則定義了程序的各個變量(包括實例字段、靜態字段、和構造數組對象的元素)的訪問方式
JMM關于同步的規定
線程解鎖前,必須把共享變量的值刷新到主內存
線程加鎖前,必須讀取主內存的最新的值到自己的工作內存
加鎖和解鎖必須是同一把鎖
happens-before 規則
前一個操作對下一個操作是完全可見的,如果下一個操作對下下一個操作完全可見,那么前一個操作也對下下個操作可見
重排序
JVM對指令的執行,會進行優化重新排序,可以發生在編譯重排序、CPU重排序
什么是內存屏障?
內存屏障分為2種
讀屏障(LoadBarrier)
寫屏障(Store Barrier)
內存屏障的作用
阻止屏障兩側的指令重排序
強制把緩沖區 / 高速緩存中的臟數據寫回主內存,或者讓緩存中相應的的數據失效
編譯器生成字節碼的時候,會在指令序列中插入內存屏障來禁止特定類型的處理器重排序。編譯器選擇了一個比較保守的JMM內存屏障插入策略,這樣就可以保證在任何處理器平臺,任何程序中都有正確的volatile語義
在每個volatile寫操作之前插入一個StoreStore屏障
在每個volatile寫操作之后入一個StoreLoad屏障
在每個volatile讀操作之前插入一個LoadLoad屏障
在每個volatile讀操作之前插入一個LoadStore屏障
原子性
問:i++為什么不是線程安全的?
因為 i++ 不是原子操作,i++有三個操作
如何解決?
使用 synchronized
使用AtomicInteger [JUC下的原子類]
有序性
1.計算機在執行程序的時候,為了提高性能,編譯器和處理器通常會對指令重排序,一般分為3種-
源代碼 -> 編譯器優化的重排 -> 指令并行的重排 -> 內存系統的重排 -> 最終執行的指令
單線程環境里面確保程序最終執行結果和代碼順序執行的結果一致
處理器在執行重排序之前必須考慮指令之間的數據依賴性
多線程環境種線程交替執行,由于編譯器優化重排序的存在,兩個線程中使用的變量能否保證一致性是無法確定的,結果無法預測
2.指令重排序
多線程環境種線程交替執行,由于編譯器優化重排序的存在,兩個線程中使用的變量能否保證一致性是無法確定的,結果無法預測此時使用volatile禁用指令重排序,就可以解決這個問題
volatile的使用
單例設計模式中的 安全的雙重檢查鎖
volatile的底層實現
根據JMM,所有線程拿到的都是主內存的副本,然后存儲到各自線程的空間,當某一線程修改之后,立即修改主內存,然后主內存通知其他線程修改
Java代碼 instance = new Singleton();//instance 是 volatile 變量 匯編代碼:0x01a3de1d: movb $0x0,0x1104800(%esi);0x01a3de24: lock addl $0x0,(%esp); 有 volatile 變量修飾的共享變量進行寫操作的時候會多第二行匯編代碼,通過查 IA-32 架構軟件開發者手冊可知,lock 前綴的指令在多核處理器下會引發了兩件事情。將當前處理器緩存行的數據會寫回到系統內存。這個寫回內存的操作會引起在其他 CPU 里緩存了該內存地址的數據無效。
如果對聲明了volatile變量進行寫操作,JVM就會向處理器發送一條Lock前綴的指令,將這個變量所在緩存行的數據寫回到系統內存。但是就算寫回到內存,如果其他處理器緩存的值還是舊的,再執行計算操作就會有問題,所以在多處理器下,為了保證各個處理器的緩存是一致的,就會實現緩存一致性協議,每個處理器通過嗅探在總線上傳播的數據來檢查自己緩存的值是不是過期了,當處理器發現自己緩存行對應的內存地址被修改,就會將當前處理器的緩存行設置成無效狀態,當處理器要對這個數據進行修改操作的時候,會強制重新從系統內存里把數據讀到處理器緩存里。
CAS(Compare And Swap)比較并替換,是線程并發運行時用到的一種技術
CAS是原子操作,保證并發安全,而不能保證并發同步
CAS是CPU的一個指令(需要JNI調用Native方法,才能調用CPU的指令)
CAS是非阻塞的、輕量級的樂觀鎖
我們可以實現通過手寫代碼完成CAS自旋鎖
CAS包括三個操作數
內存位置 - V
期望值- A
新值 - B
如果內存位置的值與期望值匹配,那么處理器會自動將該位置的值設置為新值,否則不做改變。無論是哪種情況,都會在CAS指令之前返回該位置的值。
public class Demo { volatile static int count = 0; public static void request() throws Exception { TimeUnit.MILLISECONDS.sleep(5); // 表示期望值 int expectedCount; while (!compareAndSwap(expectedCount = getCount(), expectedCount + 1)) { } } public static synchronized boolean compareAndSwap(int expectedCount, int newValue) { if (expectedCount == getCount()) { count = newValue; return true; } return false; } public static int getCount() { return count; } public static void main(String[] args) throws Exception { long start = System.currentTimeMillis(); int threadSize = 100; CountDownLatch countDownLatch = new CountDownLatch(threadSize); for (int i = 0; i < threadSize; i++) { new Thread(() -> { try { for (int j = 0; j < 10; j++) { request(); } } catch (Exception e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }).start(); } countDownLatch.await(); long end = System.currentTimeMillis(); System.out.println("count :" + count + " 耗時:" + (end - start)); } }
上述是我們自己書寫的CAS自旋鎖,但是JDK已經提供了響應的方法
Java提供了 CAS 的支持,在 sun.misc.Unsafe 類中,如下
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5); public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
參數說明
var1:表示要操作的對象
var2:表示要操作對象中屬性地址的偏移量
var4:表示需要修改數據的期望的值
var5:表示需要修改為的新值
CAS通過調用JNI的代碼實現,JNI:Java Native Interface ,允許Java調用其他語言
而CompareAndSwapXxx系列的方法就是借助“C語言”CPU底層指令實現的
以常用的 Inter x86來說,最后映射到CPU的指令為“cmpxchg”,這個是一個原子指令,CPU執行此命令的時候,實現比較并替換的操作
cmpxchg 如何保證多核心下的線程安全
系統底層進行CAS操作的時候,會判斷當前操作系統是否為多核心,如果是,就給“總線”加鎖,只有一個線程對總線加鎖,保證只有一個線程進行操作,加鎖之后會執行CAS操作,也就是說CAS的原子性是平臺級別的
CAS這么強,有沒有什么問題?
高并發情況下,CAS會一直重試,會損耗性能
CAS的ABA問題
CAS需要在操作值得時候檢查下值有沒有變化,如果沒有發生變化就更新,但是如果原來一個值為A,經過一輪的操作之后,變成了B,然后又是一輪的操作,又變成了A,此時這個位置有沒有發生改變?改變了的,因為不是一直是A,這就是ABA問題
如何解決ABA問題?
解決ABA問題就是給值增加一個修改版本號,每次值的變化,都會修改它的版本號,CAS在操作的時候都會去對比此版本號。
下面給出一個ABA的案例
public class CasAbaDemo { public static AtomicInteger a = new AtomicInteger(1); public static void main(String[] args) { Thread main = new Thread(() -> { System.out.println("CasAbaDemo.main " + Thread.currentThread().getName() + ",初始值 " + a.get()); try { int executedNum = a.get(); int newNum = executedNum + 1; TimeUnit.SECONDS.sleep(3); boolean isCasSuccess = a.compareAndSet(executedNum, newNum); System.out.println(Thread.currentThread().getName() + ",CAS 操作:" + isCasSuccess); } catch (InterruptedException e) { e.printStackTrace(); } }, "主線程"); Thread thread = new Thread(() -> { try { TimeUnit.SECONDS.sleep(2); a.incrementAndGet(); System.out.println(Thread.currentThread().getName() + ",incrementAndGet,之后" + a.get()); a.decrementAndGet(); System.out.println(Thread.currentThread().getName() + ",decrementAndGet,之后" + a.get()); } catch (Exception e) { e.printStackTrace(); } }, "干擾線程"); main.start(); thread.start(); } }
Java中ABA解決辦法(AtomicStampedReference)
AtomicStampedReference 主要包含一個引用對象以及一個自動更新的整數 “stamp”的pair對象來解決ABA問題
public class AtomicStampedReference<V> { private static class Pair<T> { // 數據引用 final T reference; // 版本號 final int stamp; private Pair(T reference, int stamp) { this.reference = reference; this.stamp = stamp; } static <T> Pair<T> of(T reference, int stamp) { return new Pair<T>(reference, stamp); } } private volatile Pair<V> pair; /** * 期望引用 * @param expectedReference the expected value of the reference * 新值引用 * @param newReference the new value for the reference * 期望引用的版本號 * @param expectedStamp the expected value of the stamp * 新值的版本號 * @param newStamp the new value for the stamp * @return {@code true} if successful */ public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) { Pair<V> current = pair; return // 期望引用與當前引用一致 expectedReference == current.reference && // 期望版本與當前版本一致 expectedStamp == current.stamp && // 數據一致 ((newReference == current.reference && newStamp == current.stamp) || // 數據不一致 casPair(current, Pair.of(newReference, newStamp))); } }
修改之后完成ABA問題
public class CasAbaDemo02 { public static AtomicStampedReference<Integer> a = new AtomicStampedReference(new Integer(1), 1); public static void main(String[] args) { Thread main = new Thread(() -> { System.out.println("CasAbaDemo.main " + Thread.currentThread().getName() + ",初始值 " + a.getReference()); try { Integer executedReference = a.getReference(); Integer newReference = executedReference + 1; Integer expectStamp = a.getStamp(); Integer newStamp = expectStamp + 1; TimeUnit.SECONDS.sleep(3); boolean isCasSuccess = a.compareAndSet(executedReference, newReference, expectStamp, newStamp); System.out.println(Thread.currentThread().getName() + ",CAS 操作:" + isCasSuccess); } catch (InterruptedException e) { e.printStackTrace(); } }, "主線程"); Thread thread = new Thread(() -> { try { TimeUnit.SECONDS.sleep(2); a.compareAndSet(a.getReference(), a.getReference() + 1, a.getStamp(), a.getStamp() + 1); System.out.println(Thread.currentThread().getName() + ",incrementAndGet,之后" + a.getReference()); a.compareAndSet(a.getReference(), a.getReference() - 1, a.getStamp(), a.getStamp() - 1); System.out.println(Thread.currentThread().getName() + ",decrementAndGet,之后" + a.getReference()); } catch (Exception e) { e.printStackTrace(); } }, "干擾線程"); main.start(); thread.start(); } }
“Java多線程之鎖怎么使用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。