您好,登錄后才能下訂單哦!
前言
對于線程安全,我們有說不盡的話題。大多數保證線程安全的方法是添加各種類型鎖,使用各種同步機制,用限制對共享的、可變的類變量并發訪問的方式來保證線程安全。文本從另一個角度,使用“比較交換算法”(CompareAndSwap)實現同樣的需求。我們實現一個簡單的“棧”,并逐步重構代碼來進行講解。
本文通俗易懂,不會涉及到過多的底層知識,適合初學者閱讀(言外之意是各位大神可以繞道了)。
旅程開始
1.先定個小目標,實現一個“棧”
“棧”(stack)是大家經常使用的抽象數據類型(啥?!不知道,請自行百度)。“棧”滿足“后進先出”特性。我們用鏈表數據結構完成一個簡單的實現:
public class Stack<E> { //鏈表結構頭部節點 private Node<E> head; /** * 入棧 * @param item */ public void push(E item) { //為新插入item創建一個新node Node<E> newHead = new Node<>(item); if(head!=null){ //將新節點的下一個節點指向原來的頭部 newHead.next = head; } //將頭部指向新的節點 head=newHead; } /** * 出棧 * @return */ public E pop() { if(head==null){ //當前鏈表為空 return null; } //暫存當前節點。 Node<E> oldHead=head; //將當前節點指向當前節點的下一個節點 head=head.next; //從暫存的當前節點記錄返回數據 return oldHead.item; } /** * 鏈表中的節點 * @param <E> */ private static class Node<E> { //節點保存的數據 public final E item; //指向下一個鏈表中下一個節點 public Node<E> next; public Node(E item) { this.item = item; } } }
代碼使用鏈表數據結構實現“棧”,在Stack中維護一個鏈表的“頭部節點”,通過對頭部節點的操作完成入棧和出棧操作。
我們運行代碼測試一下:
public static void main(String[] args) { Stack<Integer> stack=new Stack<>(); for (int i = 0; i < 3; i++) { //入棧1、2、3 stack.push(i+1); } for (int i = 0; i < 3; i++) { //出棧3、2、1 System.out.println(stack.pop()); } }
結果為:
3 2 1
我們使用入棧方法向Stack插入1、2、3,使用出棧方法打印為3、2、1,符合預期。
2.讓多線程搗搗亂
前面我們已經測試過我們的方法,符合我們對Stack功能的預期,那是不是任何情況先我們的“棧”都能正常工作呢?
我們運行如下代碼:
public static void main(String[] args) { Stack<Integer> stack=new Stack<>(); int max=3; Thread[] threads=new Thread[max]; for (int i = 0; i < max; i++) { int temp=i; //入棧1、2、3 Thread thread=new Thread(new Runnable() { @Override public void run() { stack.push(temp+1); } }); thread.start(); threads[temp]=thread; } //等待所有線程完成。 for (int i = 0; i < max; i++) { try { threads[i].join(); } catch (InterruptedException e) { } } for (int i = 0; i < max; i++) { //出棧3、2、1 System.out.println(stack.pop()); } }
你可能運行了很多次,每次運行時除了打印順序(3、2、1或2、3、1或1、2、3)有變化之外也沒有發現其他異常,你可能會說打印順序變化很正常呀,因為我們的將入棧操作放到異步線程中操作,三個線程的執行過程由系統調度,所以入棧操作的內容自然每次都有可能不同。
好吧,你說的沒錯,至少從大量運行的結果上看是這樣的,但是這就是多線程編程的奇(tao)幻(yan)之處,也許你運行一次沒有問題,兩次沒有問題,一萬次也沒有問題,但是終有一次你會得到那個意想不到的結果(你也不想得到,因為那是bug)。這就像一個“黑天鵝事件”,小概率但是一定會發生,且發生后對你的系統影響不堪設想。
下面讓我帶你看看如何得到意料之外的結果:
我們使用調試模式運行上面的程序在Stack中push()方法第一行打一個斷點,然后按照表格中的順序切換不同的線程以單步調試(step over)方式運行run方法中的每一步,直到遇到Resume。
執行順序 | thread-0 | thread-1 | thread-2 |
---|---|---|---|
1 | Node<E> newHead = new Node<>(item); | -- | -- |
2 | head=newHead; | -- | -- |
3 | (Resume) | -- | -- |
4 | -- | Node<E> newHead = new Node<>(item); | -- |
5 | -- | -- | Node<E> newHead = new Node<>(item); |
6 | -- | newHead.next = head; | -- |
7 | -- | -- | newHead.next = head; |
8 | -- | head=newHead; | -- |
9 | -- | -- | head=newHead; |
10 | -- | (Resume) | |
11 | -- | -- | (Resume) |
當你再次看到打印結果,你會發現結果為3、1、null,“黑天鵝”出現了。
異常結果是如何產生的?
1.當thread-0執行到順序3時,head表示的鏈表為node(1)。
2.當thread-1執行到順序10時,head表示的鏈表為node(2)->node(1)。
3.當thread-2執行到順序11時,head表示的鏈表為node(3)->node(1)。
當三個線程都執行完畢之后,head的最終表示為node(3)->node(1),也就是說thread-2將thread-1的執行結果覆蓋了。
語句newHead.next = head;
是對頭部節點的讀取。語句head=newHead
;是對頭部節點的寫入操作。這兩條語句組成了一個“讀取——設置——寫入”語句模式(就像n=n+1)。
如果一個線程執行了共享頭部變量讀取語句,切換其他線程執行了修改共享變量的值,再切回到第一個線程后,第一個線程中修改頭部結點的數據就不是最新的數據為依據的,所以修改之后其他線程的修改就被覆蓋了。
只有保證這兩條語句及中間語句以原子方式執行,才能避免多線程覆蓋問題。
大家可以任意調整代碼中讀取頭部節點和寫入頭部節點的調試順序,制造多線程交錯讀寫觀察不同的異常結果。
為什么我們直接執行無法看到異常結果呢?
因為我們的run方法很簡單,在CPU分配的時間片內能運行完,沒有出現在不同的運行周期中交錯運行的狀態。所以我們才要用調試模式這種交錯運行。
為什么上文中我說過這種異常一定會發生?
原因在于我們在Stack類中對共享的、可變的變量head進行的多線程讀寫操作。
怎么才能保證類Stack在多線程情況下運行正確?
引用一段《JAVA并發編程實踐》中的話:
無論何時,只要有多于一個的線程訪問給定的狀態變量,而且其中某個線程會寫入該變量,此時必須使用同步來協調線程對該變量的訪問。
好吧,看來我們必須采用“同步”方法了,來保障我們的Stack類在多線程并行和單線程串行的情況下都有正確的結果,也就是說將Stack變成一個線程安全的類。
3.讓你搗亂,請家長!
既然多線程總來搗亂,我們就請他的家長,讓家長管管他,守守規矩,不在搗亂。
我們已經知道了Stack類問什么不能再多線程下正確的運行的原因,所有我們要限制多線程對Stack類中head變量的并發寫入,Stack方法中push()和pop()方法都會對head進行寫操作,所以要限制這兩個方法不能多線程并發訪問,所以我們想到了synchronized
關鍵字。
程序重構:
public class SynchronizedStack<E> { //鏈表結構頭部節點 private Node<E> head; /** * 入棧 * @param item */ public synchronized void push(E item) { //為新插入item創建一個新node Node<E> newHead = new Node<>(item); if(head!=null){ //將新節點的下一個節點指向原來的頭部 newHead.next = head; } //將頭部指向新的節點 head=newHead; } /** * 出棧 * @return */ public synchronized E pop() { if(head==null){ //當前鏈表為空 return null; } //暫存當前節點。 Node<E> oldHead=head; //將當前節點指向當前節點的下一個節點 head=head.next; //從暫存的當前節點記錄返回數據 return oldHead.item; } /** * 鏈表中的節點 * @param <E> */ private static class Node<E> { //節點保存的數據 public final E item; //指向下一個鏈表中下一個節點 public Node<E> next; public Node(E item) { this.item = item; } } }
將Stack
類替換為SynchronizedStack
類的測試方法。
public static void main(String[] args) { SynchronizedStack<Integer> stack=new SynchronizedStack<>(); int max=3; Thread[] threads=new Thread[max]; for (int i = 0; i < max; i++) { int temp=i; //入棧1、2、3 Thread thread=new Thread(new Runnable() { @Override public void run() { stack.push(temp+1); } }); thread.start(); threads[temp]=thread; } //等待所有線程完成。 for (int i = 0; i < max; i++) { try { threads[i].join(); } catch (InterruptedException e) { } } for (int i = 0; i < max; i++) { //出棧3、2、1 System.out.println(stack.pop()); } }
我們再次運行第二章為多線程準備的測試方法,發現當執行一個線程的方法時,其他線程的方法均被阻塞,只能等到第一個線程方法執行完成之后才能執行其他線程方法。
我們只不過是在push()
和pop()
方法上加入了synchronized
關鍵字,就將這兩個方法編程了同步方法,在多線程并發的情況下也如同單線程串行調用一般,方法再不能在線程間交替運行,也就不能對head變量做并發更改了,這樣修改的Stack類就是線程安全的了。
除了synchronized關鍵字,還有其他的方式實現加鎖嗎?
除了synchronized
關鍵字還可以使用java.util.concurrent.locks
包中各種鎖來保證同步,但是大概思路都是相同的,都是使用阻塞其他線程的方式在達到防止并發寫入的目的。
阻塞線程是否會影響執行效率?
如果和不加通過的“棧”類相比,在多線程執行的之后效率一定會有影響,因為同步方法限制了線程之間的并發性,但是為了保證“棧”類的在多線程環境時功能正確,我們不得不做出效率和正確性的權衡。
必須要對整個方法加上鎖嗎?
我們上面已經分析了需要加鎖的范圍,只要保證讀取頭部節點和寫入頭部節點之間的語句原子性就可以。所以我們可以這樣執行。
/** * 入棧 * * @param item */ public void push(E item) { //為新插入item創建一個新node Node<E> newHead = new Node<>(item); synchronized (this) { if (head != null) { //將新節點的下一個節點指向原來的頭部 newHead.next = head; } //將頭部指向新的節點 head = newHead; } } /** * 出棧 * * @return */ public E pop() { synchronized (this) { if (head == null) { //當前鏈表為空 return null; } //暫存當前節點。 Node<E> oldHead = head; //將當前節點指向當前節點的下一個節點 head = head.next; //從暫存的當前節點記錄返回數據 return oldHead.item; } }
通過synchronized
塊實現,因為方法比較簡單,所以也沒有很明顯的縮小加鎖范圍。
除了加鎖的方式,是否還有其他方式?
當然,我們還有無鎖化編程來解決線程之間同步的問題。這就是下面要介紹的比較交換算法。
4.換個思路,樂觀一點
加鎖實現線程同步的方式是預防性方式。無論共享變量是否會被并發修改,我們都只允許同一時刻只有一個線程運行方法來阻止并發發生。這就相當于我們假設并發一定會發生,所以比較悲觀。
現在我們換一種思路,樂觀一點,不要假設對變量的并發修改一定發生,這樣也就不用對方法加鎖阻止多線程并行運行方法了。但是一旦發生了并發修改,我們想法發解決就是了,解決的方法就是將這個操作重試一下。
繼續重構“棧”代碼:
public class TreiberStack<E> { private AtomicReference<Node<E>> headNode = new AtomicReference<>(); public void push(E item) { Node<E> newHead = new Node<>(item); Node<E> oldHead; do { oldHead = headNode.get(); newHead.next = oldHead; } while (!headNode.compareAndSet(oldHead, newHead)); } public E pop() { Node<E> oldHead; Node<E> newHead; do { oldHead = headNode.get(); if (oldHead == null) return null; newHead = oldHead.next; } while (!headNode.compareAndSet(oldHead, newHead)); return oldHead.item; } private static class Node<E> { public final E item; public Node<E> next; public Node(E item) { this.item = item; } } }
這個就是大名鼎鼎的Treiber Stack,我也只是做了一次代碼的搬運工。
我們來看看TreiberStack和我們前面的Stack有什么不同。
首先關注第一行:
private AtomicReference<Node<E>> headNode = new AtomicReference<>();
我們用了一個AtomicReference類存儲鏈表的頭部節點,這個類可以獲取存儲對象的最新值,并且在修改存儲值時候采用比較交換算法保證原子操作,具體大家可以自行百度。
然后重點關注pop()
和push()
方法中都有的一個代碼結構:
//略... do { oldHead = headNode.get(); //略... } while (!headNode.compareAndSet(oldHead, newHead)); //略...
我們AtomicReference
中get()
方法最新的獲取頭部節點,然后調用AtomicReference
中compareAndSet()
將設置新頭部節點,如果當前線程執行這兩端代碼的時候如果有其他已經修改了頭部節點的值,'compareAndSet()'方法返回false ,表明修改失敗,循環繼續,否則修改成功,跳出循環。
這樣一個代碼結構和synchronized
關鍵字修飾的方法一樣,都保證了對于頭部節點的讀取和寫入操作及中間代碼在一個線程下原子執行,前者是通過其他線程修改過就重試的方式,后者通過阻塞其他線程的方式,一個是樂觀的方式,一個是悲觀的方式。
大家可以按照前面的例子自己寫測試方法測試。
后記
我們通過對“棧”的一步一步代碼重構,逐步介紹了什么是線程安全及保證線程安全的各種方法。這里需要說明一點,對于一個類來說,是否需要支持線程安全是由類的使用場景決定,不是有類所提供的功能決定的,如果一個類不會被應用于多線程的情況下也就無需將他轉化為線程安全的類。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持億速云。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。