您好,登錄后才能下訂單哦!
今天小編給大家分享一下java開發非公平鎖不可打斷的源碼怎么寫的相關知識點,內容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。
package test; import java.util.concurrent.locks.ReentrantLock; public class TestReenTrantLock { public static void main(String[] args) throws InterruptedException { ReentrantLock lock = new ReentrantLock(); new Thread(() -> { System.out.println("1 start"); lock.lock(); System.out.println("1 entry"); try { Thread.sleep(1000 * 60 * 10); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } },"t1").start(); Thread.sleep(2000 ); new Thread(() -> { System.out.println("2 start"); lock.lock(); System.out.println("2 entry"); try { } finally { lock.unlock(); } },"t2").start(); } }
保證線程1先獲取到鎖,睡眠10分鐘,因為需要打斷點,線程2再去獲取鎖。
public void lock() { sync.lock(); }
final void lock() { //首先用線程1使用 cas 嘗試將 state 從 0 改為 1,如果成功表示獲得了鎖 //因為線程1獲取到了鎖state現在等于1,所以此時線程2獲取鎖失敗。 //線程2執行acquire(1); //非公平的體現:上來就加鎖 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
//arg = 1 public final void acquire(int arg) { //線程2執行tryAcquire(arg),返回false代表鎖獲取失敗,!tryAcquire(arg) ==true //由于是&&判斷 //所以線程2調用addWaiter做尾部入隊操作 //線程2接著調用acquireQueued進入park阻塞 if (!tryAcquire(arg) && //addWaiter(Node.EXCLUSIVE) 返回的是 線程2的所在的Node節點 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){ //acquireQueued方法返回的是打斷標志 如果阻塞狀態或者運行狀態被打斷 //返回true 那么會執行selfInterrupt自我打斷 //selfInterrupt方法只有1句代碼:Thread.currentThread().interrupt(); selfInterrupt(); } }
//acquires=1 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
//此時 線程2累計嘗試2次加鎖 final boolean nonfairTryAcquire(int acquires) { //acquires=1 final Thread current = Thread.currentThread(); int c = getState(); //如果線程1已經釋放鎖 此時c==0滿足 會再次使用cas嘗試加鎖 //這里線程1仍然持有鎖 條件不滿足 if (c == 0) { // 嘗試用 cas 獲得, 這里體現了非公平性: 不去檢查 AQS 隊列 // 非公平鎖可以提高并發度,但是會導致饑餓,可以使用超時時間解決饑餓 // 線程切換的開銷,其實就是非公平鎖效率高于公平鎖的原因 // 因為非公平鎖減少了線程掛起的幾率,后來的線程有一定幾率節省被掛起的開銷 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); //代表加鎖成功 return true; } } // 判斷是否鎖重入 else if (current == getExclusiveOwnerThread()) { //使用原來的state + acquires,這里acquires = 1 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //返回false代表線程2獲取鎖失敗 return false; }
addWaiter方法的第一個參數是mode,Node.EXCLUSIVE是個null。
static final Node EXCLUSIVE = null;
acquireQueued方法的第一個參數是node,其實就是線程2所在的Node節點。第二個參數是1:代表了本次state加鎖成功累加的數量。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
由于acquireQueued方法的參數是addWaiter方法的返回值,因此先看addWaiter方法
//node是Node.EXCLUSIVE 默認是null //enq方法創建的隊列如下:頭結點->尾結點(線程2所在節點) //后續的節點都在addWaiter方法中入隊,不再進入enq:頭結點->尾結點(線程2所在節點)->尾結點(線程3所在節點) private Node addWaiter(Node mode) { //static final Node EXCLUSIVE = null; //node為持有當前線程的node //mode為null 可以看到賦值給了 nextWaiter //也就是線程2所在節點的next指針指向了null //注意:nextWaiter是等待隊列中的指針 /*** Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } ***/ Node node = new Node(Thread.currentThread(), mode); //獲取同步隊列的尾部節點 Node pred = tail; //此時同步隊列的tail是null,因為到目前為止并沒有執行過enq方法 //如果tail不為null:使用cas嘗試將Node對象插入隊列尾部,作為新的尾結點 if (pred != null) { //將當前node節點的前一個節點指向原tail節點 node.prev = pred; //將當前node節點作為新的尾節點 if (compareAndSetTail(pred, node)) { //原來的尾節點作為當前節點的下一個節點 pred.next = node; return node; } } //因為tail節點是null 嘗試將Node加入隊列 enq(node); //返回線程2節點 return node; }
//下面解釋中的當前節點指的是Thread-2所在的節點 //enq相當于是初始化頭尾結點和第一個入隊的節點 //只有第1個入隊的節點才會進入該方法 //后續的線程都會直接執行enq(node)之前的代碼加入尾節點 //enq方法構造了1個雙向隊列:頭結點->尾結點(線程2所在節點) private Node enq(final Node node) { for (;;) { Node t = tail; //第一次進入循環 tail是尾節點為null if (t == null) { //第一次進入循環:設置頭結點為哨兵節點也叫啞結點:因為沒有對應的線程與之關聯 // head節點的屬性:thread=null if (compareAndSetHead(new Node())) //第一次進入循環:將頭結點賦值給尾節點 此時頭和尾是同一個節點 這點很重要 tail = head; } else { //第二次進入循環:此處的t就是head,將當前節點的前置指針指向頭節點 node.prev = t; //第二次進入循環:使用cas將尾節點設置為當前節點 //第二次進入循環:此時頭結點是哨兵節點(啞結點),尾節點即Thread-2所在的線程的節點 if (compareAndSetTail(t, node)) { //第二次進入循環:將head.next指向當前節點那么這個鏈表是雙向鏈表 t.next = node; //循環結束 return t; } } } }
//node是 線程2的節點 //arg = 1 //node.predecessor():獲取當前節點的上一個節點 //node.predecessor()和node.prev不同的是: //node.prev如果是null不會拋出異常 //node.predecessor()中如果 node.prev是 null 會拋出異常 //acquireQueued方法返回的是打斷狀態 final boolean acquireQueued(final Node node, int arg) { //node即Thread-2所在的線程的節點 boolean failed = true; try { boolean interrupted = false; //死循環開始 for (;;) { //p是Thread-2所在的線程的節點的前置節點即頭結點 final Node p = node.predecessor(); //p == head 即Thread-2所在的線程的節點的前置節點是頭結點 //tryAcquire(arg) 使用cas再次嘗試獲取鎖 獲取鎖失敗 代碼不進入if向下執行 //此時累計嘗試3次 if (p == head && tryAcquire(arg)) { //如果獲取鎖成功將當前節點設置為頭結點并將當前節點的thread屬性和prev屬性設置為null //也就是當前節點的prev和原來的頭節點斷開 //因為當前節點獲取鎖成功,意味著線程1已經釋放鎖,此時需要和代表線程1的原來的頭結點斷開。 setHead(node); //將原來的頭節點斷開和當前節點的連接 相當于原來的節點出隊 p.next = null; // help GC failed = false; //注意這是在死循環里 //如果interrupted返回的是true 將會執行 selfInterrupt(); 自我中斷 // if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){selfInterrupt();} //即:獲取鎖阻塞的過程中被打斷,也要重新進入死循環一直等到獲取鎖才能執行打斷,這就是不可打斷。 //可打斷是指在等待鎖的過程中,其它線程可以用interrupt方法終止等待,synchronized鎖是不可打斷的。 //我們要想在等鎖的過程中被打斷,就要使用lockInterruptibly()方法對lock對象加鎖,而不是lock()方法。 return interrupted; } //第一次進入shouldParkAfterFailedAcquire //將當前節點的前置節點即頭結點改為-1 返回false (累計嘗試4次) //如果當前節點的前置節點以及更前面的節點有取消的節點 //要斷開這些節點 包括當前節點的前置節點 //第二次進入shouldParkAfterFailedAcquire //如果當前節點的前置節點是-1 返回true //shouldParkAfterFailedAcquire 返回true時 //會進入parkAndCheckInterrupt()方法中,然后會park當前線程 //Thread-2所在node被阻塞,然后等待喚醒,此時node的waitStatus=0 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){ //在不可打斷模式中 //線程park在parkAndCheckInterrupt方法里 //如果線程被打斷,parkAndCheckInterrupt方法返回true //執行以下代碼 //Interrupted = true //Interrupted = true 代表被阻塞期間打斷過 //然后繼續進入死循環直到獲取鎖 //獲取鎖后返回Interrupted = true //最后返回到acquire方法 //進入selfInterrupt();執行Thread.currentThread().interrupt(); //在可打斷模式中 //線程park在parkAndCheckInterrupt方法里 //如果線程被打斷,parkAndCheckInterrupt方法返回true //執行以下代碼 //throw new InterruptedException(); interrupted = true; } }//死循環結束 } finally { /** 這里的failed 什么時候變成true的? 默認的failed=true 在死循環一直都是true!!!因為一直沒有獲取鎖成功!! 除非是獲取到了鎖才被賦值為false 1.try代碼塊拋出異常 ***/ if (failed) cancelAcquire(node); } }
//node是當前節點 private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
//p是Thread-2所在節點的前置節點即頭結點 //node是 Thread-2所在節點 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //p是Thread-2所在節點的前置節點即頭結點 //頭結點的waitStatus=0 int ws = pred.waitStatus; //第一次進入 ws=0 修改waitStatus為-1 //第二次進入ws=-1 Node.SIGNAL=-1 代表等待喚醒 返回true if (ws == Node.SIGNAL){ // 上一個節點=-1 都在阻塞, 那么自己也阻塞好了 //返回true代表要park return true; } //如果當前節點的前置節點的waitStatus>0 //說明當前節點D的前置節點C被取消,那么要把當前節點D的前置節點重新設置為[當前節點的前置節點C的前置節點B] //B<---C<---D //假如B節點被取消,此時需要斷開C那么直接將D指向B即可 //A<---B<---C<---D //假如BC節點被取消,此時需要斷開BC那么直接將D指向A即可 if (ws > 0) { do { //首先做 do //獲取當前節點的前置節點的前置節點pred.prev //因為當前節點的前置節點pred的status大于0 說明當前節點是被取消的 需要斷開 //繼續往前找當前節點的前置節點的前置節點pred.prev //如果當前節點的前置節點的前置節點pred.prev的status還是大于0 說明也是被取消的 //那么繼續往前找 //一直到將當前節點的前置節點以及當前節點的前置節點之前被取消的節點都斷開 //看看代碼是怎么做的 //獲取當前節點的前置節點的前置節點作為當前節點的前置節點 pred = pred.prev; //然后將當前節點的前置指針指向當前節點的前置節點的前置節點 node.prev = pred; } while (pred.waitStatus > 0); //斷開的是當前節點的前置節點 以及 當前節點的前置節點之前被取消的節點 //從后往前斷開的 pred.next = node; } else { //將當前節點的前置節點即頭結點改為-1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } //第一次進入返回false 因為是死循環 等第二次進入的時候 // 符合 ws == Node.SIGNAL 會返回true return false; }
//代碼塊10 private final boolean parkAndCheckInterrupt() { //此處park LockSupport.park(this); //當前線程被unpark喚醒時,當前方法返回true或者false都要重新進入死循環然后陷入阻塞,一直等獲取到鎖才能被打斷 //不同的是 //parkAndCheckInterrupt:返回true //會執行interrupted = true; //再次進入死循環,再次執行shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()然后阻塞 //parkAndCheckInterrupt:返回false //會直接進入死循環再次執行shouldParkAfterFailedAcquire(p, node)&&parkAndCheckInterrupt()然后阻塞 //這里為什么要調用interrupted() 而不是isInterrupted() ? //interrupted會重置打斷標記為false 而isInterrupted只是返回打斷標記 //當park的線程在被調用interrupt方法時,會把中斷狀態設置為true。 //然后park方法會去判斷中斷狀態,如果為true,就直接返回,然后往下繼續執行,如果為false繼續阻塞 return Thread.interrupted(); }
注意 是否需要進入park阻塞是由當前節點的前驅節點的waitStatus == Node.SIGNAL 來決定,而不是本節點的waitStatus 決定。
目前頭結點的waitStatus==Node.SIGNAL==-1,線程2所在節點的waitStatus==0。
判斷前置節點waitstatus是否是SIGNAL即-1阻塞等待喚醒,如果前置節點是-1那么自己也進入阻塞 如果前置節點的waitstatus是大于0,說明節點已經被取消,遞歸斷開這些節點返回false。 繼續進入死循環判斷前置節點狀態,此時前置節點的waitstatus是0,將當前節點的前置節點即頭結點改為-1,返回false。 繼續進入死循環判斷前置節點狀態,此時前置節點的waitstatus是-1,那么自己也進入阻塞返回true。
加鎖是從當前節點往前找,如果前置節點已經被取消,那么繼續往前找,找到一個沒有被取消的節點為止。
解鎖是從當前節點往后找,如果后置節點已經被取消,那么繼續從后往前找,找到一個沒有被取消的節點為止。
出隊是有條件的:必須拋出異常。只有在打斷模式下才會拋出異常進入finally調用cancelAcquire方法出隊。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { //省略代碼 }finally { /** 這里的failed 什么時候變成true的? 默認的failed=true 在死循環一直都是true!!!因為一直沒有獲取鎖成功!! 除非是獲取到了鎖才被賦值為false 1.try代碼塊拋出異常 ***/ if (failed) cancelAcquire(node); }
private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } }
獲取鎖或者阻塞過程中,線程宕掉(系統異常或手動kill線程) 。
則會進入到acquireQueued的finally代碼里,并判斷failed是否為true,若為true則執行cancelAcquire方法放棄獲取鎖。
我們一般都說這個方法是用來中斷線程的,那么這個中斷應該怎么理解呢? 就是說把當前正在執行的線程中斷掉,不讓它繼續往下執行嗎?
其實,不然。 此處,說的中斷僅僅是給線程設置一個中斷的標識(設置為true),線程還是會繼續往下執行的。而線程怎么停止,則需要由我們自己去處理。 一會兒會用代碼來說明這個。
下面的示例代碼說明當1個線程在park狀態下被interrupt()方法打斷或者被stop,會從之前阻塞的代碼處喚醒并繼續往下執行代碼,而不是我們想象的直接跳出代碼。
//示例代碼1 public static void main(String[] args) { Thread thread = new Thread(() -> { try { while(true){ System.out.println("start"); LockSupport.park(); System.out.println("park"); } } finally { System.out.println("end"); } }, "t2"); thread.start(); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } //此處可以用 thread.stop(); 不推薦 thread.interrupt(); }
但是有1個問題,為什么跳不出來循環呢?
原來當調用interrupt方法時,會把中斷狀態設置為true,然后park方法會去判斷中斷狀態,如果為true,就直接返回,然后往下繼續執行,并不會拋出異常。
注意,這里并不會清除中斷標志。
此時我們想到使用Thread.interrupted();方法重置打斷標記為false
//示例代碼2 static volatile Boolean flag = false; public static void main(String[] args) { Thread thread = new Thread(() -> { try { while(true){ System.out.println("start"); LockSupport.park(); System.out.println("park"); if (flag){ Thread.interrupted(); } } } finally { System.out.println("end"); } }, "t2"); thread.start(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } //此處可以用 thread.stop(); 不推薦 thread.interrupt(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } flag=true; }
發現上面的代碼還是跳不出循環,而是被park阻塞。這個時候我們嘗試使用拋出異常。
//示例代碼3 static volatile Boolean flag = false; public static void main(String[] args) { Thread thread = new Thread(() -> { try { while(true){ System.out.println("start"); LockSupport.park(); System.out.println("park"); if (flag){ throw new RuntimeException(); } } } finally { System.out.println("end"); } }, "t2"); thread.start(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } //此處可以用 thread.stop(); 不推薦 thread.interrupt(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } flag=true; }
拋出異常成功終止循環并執行了finally。
其實上面的示例2就是不可打斷模式的原理,示例2是可打斷模式的原理。
// 解鎖實現 public void unlock() { sync.release(1); }
// AQS 繼承過來的方法, 方便閱讀, 放在此處 public final boolean release(int arg) { // 如果所有的鎖釋放成功即state=0 if (tryRelease(arg)) { // 隊列頭節點 Node h = head; // 頭結點不為null 且 waitStatus不等于0 才需要喚醒頭結點的后置節點 // h != null 說明有等待隊列 // h.waitStatus != 0 說明頭結點后面有節點在等待鎖 // 假設頭結點的下一個節點還沒來得及修改h.waitStatus= -1 會有問題嗎? // 不會 因為如果h.waitStatus=0,此時頭結點的下一個節點還會再嘗試一次獲取鎖 // 因為鎖在這里已經被釋放 所以頭結點的下一個節點必定能獲取到鎖 if (h != null && h.waitStatus != 0) { // h是隊列頭節點 // unpark AQS 中等待的線程, 進入 ㈡ unparkSuccessor(h); } return true; } return false; }
// ㈠ Sync 繼承過來的方法, 方便閱讀, 放在此處 protected final boolean tryRelease(int releases) { // state-- int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 支持鎖重入, state 減為 0, 表明所有的鎖都釋放成功 if (c == 0) { free = true; setExclusiveOwnerThread(null); } //設置state為c,c不一定等于0 setState(c); //返回鎖標志位 return free; }
// ㈡ AQS 繼承過來的方法, 方便閱讀, 放在此處 //node是頭結點 private void unparkSuccessor(Node node) { // 此處的node節點為頭結點 // 如果頭節點的狀態小于0 嘗試重置頭節點的狀態為0 //改為0的意義在于:在下面的代碼中:頭結點的下一個節點被喚醒時會再次嘗試加鎖 //在shouldParkAfterFailedAcquire 方法中有1個判斷 //if (ws == Node.SIGNAL) { return true; } //返回true代表獲取鎖失敗進入parkAndCheckInterrupt方法阻塞 //這里改為0以后 那么頭結點的下一個節點會在被unpark的時候再一次嘗試加鎖 //如果不改為0 那么頭結點的下一個節點會直接進入死循環被park 陷入了死循環無解了。 int ws = node.waitStatus; if (ws < 0) { //配合喚醒線程 再一次嘗試加鎖 //配合喚醒線程 再一次嘗試加鎖 //配合喚醒線程 再一次嘗試加鎖 compareAndSetWaitStatus(node, ws, 0); } //獲取頭結點的下一個節點 Node s = node.next; //node是頭節點 //如果頭結點的后置節點為空或被取消 //那么從隊列的末尾從后往前找,找到最前面一個需要unpark的節點 //如果頭結點的后置節點不為空且沒被取消 //那么就喚醒頭節點的下一個節點 //這里也是非公平的體現 if (s == null || s.waitStatus > 0) { s = null; //循環遍歷從 AQS 隊列從隊列的末尾從后往前找,找到最前面一個需要unpark的節點 //注意這里做了判斷t不等于null且t不等于頭結點且t.waitStatus <= 0 //也就是找到的節點必定是有效的 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0){ s = t; } } //喚醒頭結點的下一個節點 或者 從后往前找到的第1個t.waitStatus<= 0的節點 if (s != null) //喚醒線程 配合 compareAndSetWaitStatus(node, ws, 0); 再一次嘗試加鎖 //喚醒線程 配合 compareAndSetWaitStatus(node, ws, 0); 再一次嘗試加鎖 //喚醒線程 配合 compareAndSetWaitStatus(node, ws, 0); 再一次嘗試加鎖 LockSupport.unpark(s.thread); } }
static final class NonfairSync extends Sync { // Sync 繼承過來的方法, 方便閱讀, 放在此處 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果已經獲得了鎖, 線程還是當前線程, 表示發生了鎖重入 else if (current == getExclusiveOwnerThread()) { // state++ int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // Sync 繼承過來的方法, 方便閱讀, 放在此處 protected final boolean tryRelease(int releases) { // state-- int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 支持鎖重入, 只有 state 減為 0, 才釋放成功 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } }
以上就是“java開發非公平鎖不可打斷的源碼怎么寫”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。