您好,登錄后才能下訂單哦!
前言
前面的例子都是多個線程在做相同的操作,比如4個線程都對共享數據做tickets–操作。大多情況下,程序中需要不同的線程做不同的事,比如一個線程對共享變量做tickets++操作,另一個線程對共享變量做tickets–操作,這就是大名鼎鼎的生產者和消費者模式。
正文
一,生產者-消費者模式也是多線程
生產者和消費者模式也是多線程的范例。所以其編程需要遵循多線程的規矩。
首先,既然是多線程,就必然要使用同步。上回說到,synchronized關鍵字在修飾函數的時候,使用的是“this”鎖,所以在同一個類中的函數被synchronized修飾后,使用的是同一把鎖。線程調用這些函數時,不管調用的是tickets++操作函數,還是tickets–函數,都會先去判斷是否加鎖了,得到鎖之后再去進行具體的操作。
我們先用代碼把程序中的資源,生產者,消費者表示出來。
package com.jimmy.ThreadCommunication; class Resource{ // 資源類 private String productName; // 資源名稱 private int count = 1; // 資源編號 public void produce(String name){ // 生產資源函數 this.productName = name + count; count ++; // 資源編號遞增,用來模擬資源遞增 System.out.println(Thread.currentThread().getName()+"...生產者.."+this.productName); } public void consume() { // 消費資源函數 System.out.println(Thread.currentThread().getName()+"...消費者.."+this.productName); } } class Producer implements Runnable{ // 生產者類,用于開啟生產者線程 private Resource res; //生產者初始化就要分配資源 public Producer(Resource res) { this.res = res; } @Override public void run() { for (int i = 0; i < 10; i++) { res.produce("bread"); // 循環生產10次 } } } class Comsumer implements Runnable{ // 消費者類,用于開啟消費者線程 private Resource res; //同理,消費者一初始化也要分配資源 public Comsumer(Resource res) { this.res = res; } @Override public void run() { for (int i = 0; i < 10; i++) { res.consume(); // 循環消費10次 } } } public class ProducerAndConsumer1 { public static void main(String[] args) { Resource resource = new Resource(); // 實例化資源 Producer producer = new Producer(resource); // 實例化生產者和消費者類,它們取得同一個資源 Comsumer comsumer = new Comsumer(resource); Thread threadProducer = new Thread(producer); // 創建1個生產者線程 Thread threadComsumer = new Thread(comsumer); // 創建1個消費者線程 threadProducer.start(); // 分別開啟線程 threadComsumer.start(); } }
架子搭好了,就來運行一下,當然會出現錯誤的結果,如下所示:
Thread-0...生產者..bread1 Thread-0...生產者..bread2 Thread-0...生產者..bread3 Thread-0...生產者..bread4 Thread-0...生產者..bread5 Thread-1...消費者..bread1 Thread-1...消費者..bread6 Thread-1...消費者..bread6 Thread-1...消費者..bread6 Thread-1...消費者..bread6 Thread-1...消費者..bread6 Thread-0...生產者..bread6 Thread-0...生產者..bread7 Thread-1...消費者..bread6 Thread-1...消費者..bread8 Thread-1...消費者..bread8 Thread-1...消費者..bread8 Thread-0...生產者..bread8 Thread-0...生產者..bread9 Thread-0...生產者..bread10
很明顯,出現了線程安全錯誤。這時,就需要“同步”來保證對共享變量的互斥訪問。上面代碼中需要同步的就是Resource資源類中的produce和consume方法,分別使用synchronized來修飾,由于synchronized修飾方法時使用的是“this”鎖,所以同一個類中的所有被修飾的方法用的都是同一個鎖,那么線程一次只能訪問其中一個方法。加鎖后的Resource類方法如下:
class Resource{ // 資源類 private String productName; // 資源名稱 private int count = 1; // 資源編號 public synchronized void produce(String name){ // 生產資源函數 this.productName = name + count; count ++; // 資源編號遞增,用來模擬資源遞增 System.out.println(Thread.currentThread().getName()+"...生產者.."+this.productName); } public synchronized void consume() { // 消費資源函數 System.out.println(Thread.currentThread().getName()+"...消費者.."+this.productName); } }
再來跑一次代碼,又出現問題了:
Thread-0...生產者..bread1 Thread-0...生產者..bread2 Thread-0...生產者..bread3 Thread-0...生產者..bread4 Thread-0...生產者..bread5 Thread-0...生產者..bread6 Thread-0...生產者..bread7 Thread-0...生產者..bread8 Thread-0...生產者..bread9 Thread-0...生產者..bread10 Thread-1...消費者..bread10 Thread-1...消費者..bread10 Thread-1...消費者..bread10 Thread-1...消費者..bread10 Thread-1...消費者..bread10 Thread-1...消費者..bread10 Thread-1...消費者..bread10 Thread-1...消費者..bread10 Thread-1...消費者..bread10 Thread-1...消費者..bread10
雖然沒有了線程安全錯誤,但是問題來了,生產者不停的生產,還沒等消費者消費呢,就將后面的資源覆蓋了前面的資源,導致消費者消費不到前面的資源,這樣很容易造成系統資源浪費。理想中的結果應該是,生產者生產一個,消費者消費一個,和諧運行。對此,java為多線程引入了”等待-喚醒”機制。
二,等待喚醒機制
與線程做同樣的操作不同,不同線程之間的操作需要等待喚醒機制來保證線程間的執行順序。生產者和消費者模式中,生產者和消費者是兩類不同的線程, 這兩類中又可以有很多線程來協同工作。通俗來說就是,系統為資源設置一個標志flag,該標志用來標明資源是否存在,所有的線程執行操作前都要判斷資源是否存在。舉例來說,系統初始化后,資源是空的。接下來要執行的可能是生產者線程,也可能是消費者線程。如果是消費者線程獲得執行權,先判斷資源,此時為空,就會進入阻塞狀態,交出執行權,并喚醒其他線程。如果是生產者線程獲得執行權,先判斷資源,此時為空,立馬進行生產,完了交出執行權并喚醒其他線程。
注意,上面提到了兩點,第一點是標志位flag,也就是等待機制,生產者要判斷系統沒有資源才進行生產,不然要等待,消費者要判斷系統有資源才進行消費,不然也要等待。第二點是喚醒機制,不管是生產者還是消費者,它們在生產完或者消費完后,都要執行一個喚醒操作。java提供的等待喚醒機制是由java.lang.Object類中的wait()和notify()函數組來實現的。其中notify()函數隨機喚醒一個被wait()的線程,而notifyAll()喚醒所有被wait()的線程。很遺憾,并沒有直接喚醒對方線程的函數。
notify()適用于單生產者和單消費者模式,而notifyAll()適用于多生產者或多消費者模式。
下面來看2個生產者和2個消費者線程處理一個共享變量的代碼示例:
package com.jimmy.ThreadCommunication; class Resource2{ private String productName; private int count = 1; private boolean flag = false; // 資源類增加一個標志位,默認false,也就是沒有資源 public synchronized void produce(String name){ while (flag == true) { // 如果flag為true,也就是有資源了,生產者線程就去等待。 try { wait(); // wait函數拋出的異常只能被截獲 } catch (InterruptedException e) { e.printStackTrace(); } } this.productName = name + count; count ++; System.out.println(Thread.currentThread().getName()+"....生產者.."+this.productName); flag = true; // 生產完了就將flag修改為true notifyAll(); // 然后喚醒其他線程 } public synchronized void consume() { while (flag == false) { // 如果flag為false,也就是沒有資源,消費者線程就去等待 try { // 判斷flag要用while,因為線程被喚醒后會再次判斷flag wait(); // 而如果是if來判斷,被喚醒后不會再判斷flag,那么多個生產者線程就可能死鎖 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } System.out.println(Thread.currentThread().getName()+"...消費者.."+this.productName); flag = false; // 消費完了就把標志改為false notifyAll(); // 然后喚醒其他線程,因為有多個生產者和消費者線程,所以要用notifyAll, // 因為notify只喚醒一個,喚醒到同類型的線程就不好了。 } } class Producer2 implements Runnable{ private Resource2 res; //生產者初始化就要分配資源 public Producer2(Resource2 res) { this.res = res; } @Override public void run() { for (int i = 0; i < 5; i++) { res.produce("bread"); } } } class Comsumer2 implements Runnable{ private Resource2 res; //同理,消費者一初始化也要分配資源 public Comsumer2(Resource2 res) { this.res = res; } @Override public void run() { for (int i = 0; i < 10; i++) { res.consume(); } } } public class ProducerAndConsumer2 { public static void main(String[] args) { Resource2 resource = new Resource2(); // 實例化資源 Producer2 producer = new Producer2(resource); // 實例化生產者,并傳入資源對象 Comsumer2 comsumer = new Comsumer2(resource); // 實例化消費者,并傳入相同的資源對象 Thread threadProducer1 = new Thread(producer); // 創建2個生產者線程 Thread threadProducer2 = new Thread(producer); Thread threadComsumer1 = new Thread(comsumer); // 創建2個消費者線程 Thread threadComsumer2 = new Thread(comsumer); threadProducer1.start(); threadProducer2.start(); threadComsumer1.start(); threadComsumer2.start(); } }
上述代碼的輸出結果如下,是理想中的生產一個,消費一個依次進行。
Thread-0....生產者..bread1 Thread-3...消費者..bread1 Thread-1....生產者..bread2 Thread-2...消費者..bread2 Thread-1....生產者..bread3 Thread-3...消費者..bread3 Thread-0....生產者..bread4 Thread-3...消費者..bread4 Thread-1....生產者..bread5 Thread-2...消費者..bread5 Thread-1....生產者..bread6 Thread-3...消費者..bread6 Thread-0....生產者..bread7 Thread-3...消費者..bread7 Thread-1....生產者..bread8 Thread-2...消費者..bread8 Thread-0....生產者..bread9 Thread-3...消費者..bread9 Thread-0....生產者..bread10 Thread-2...消費者..bread10
可以看出,線程0和1是生產者線程,他們每次只有一個進行生產。線程2和3是消費者線程,同樣的,每次只有一個進行消費。
注意,上述代碼中的問題有2點需要注意,第一點是用if還是while來判斷flag,第二點是用notify還是notifyAll函數。統一來說,while判斷在線程喚醒后還會再次判斷,如果只有一個生產者和消費者線程的話可以用if,如果有多個生產者或者消費者,就必須用while判斷,不然會出現死鎖。所以,最終要用while和notifyAll()的組合。
總結
多線程編程往往是多個線程執行不同的任務,不同的任務不僅需要“同步”,還需要“等待喚醒機制”。兩者結合就可以實現多線程編程,其中的生產者消費者模式就是經典范例。
然而,使用synchronized修飾同步函數和使用Object類中的wait,notify方法實現等待喚醒是有弊端的。就是效率問題,notifyAll方法喚醒所有被wait的線程,包括本類型的線程,如果本類型的線程被喚醒,還要再次判斷并進入wait,這就產生了很大的效率問題。理想狀態下,生產者線程要喚醒消費者線程,而消費者線程要喚醒生產者線程。為此,jdk1.5引入了java.util.concurrent.locks包,并提供了Lock和Condition接口及實現類。
以上就是本文關于Java多線程之線程通信生產者消費者模式及等待喚醒機制代碼詳解的全部內容,希望對大家有所幫助。感興趣的朋友可以繼續參閱本站:Java編程之多線程死鎖與線程間通信簡單實現代碼、Java多線程編程小實例模擬停車場系統等,如有不足之處,歡迎留言指出。感謝朋友們對本站的支持!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。