您好,登錄后才能下訂單哦!
一、進程間通信
IPC(Inter-Process Communication)
IPC機制:實現進程之間通訊
管道:pipe 基于共享的內存空間
隊列:pipe+鎖的概念--->queue
二、隊列(Queue)
2.1 概念-----multiProcess.Queue
創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。
Queue([maxsize])
創建共享的進程隊列。
參數 :maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。
2.2 Queue方法使用
2.2.1 q.get的使用:
是從隊列里面取值并且把隊列面的取出來的值刪掉,沒有參數的情況下就是是默認一直等著取值
就算是隊列里面沒有可取的值的時候,程序也不會結束,就會卡在哪里,一直等著
from multiprocessing import Queue q = Queue() # 生成一個隊列對象 # put方法是往隊列里面放值 q.put('Cecilia陳') q.put('xuchen') q.put('喜陳') # get方法是從隊列里面取值 print(q.get()) print(q.get()) print(q.get()) q.put(5) q.put(6) print(q.get())
Cecilia陳
xuchen
喜陳
5
2.2.2 Queue(參數) +參數的使用:
Queue加參數以后,參數是數值
參數實幾就表示實例化的這個Queue隊列可以放幾個值
當隊列已經滿的時候,再放值,程序會阻塞,但不會結束
from multiprocessing import Queue q = Queue(3) q.put('Cecilia陳') q.put('xuchen') q.put('喜陳') print(q.full()) # 判斷隊列是否滿了 返回的是True/False q.put(2) # 當隊列已經滿的時候,再放值,程序會阻塞,但不會結束
True 隊列已經滿了
2.2.3 q.put(參數1,參數2,參數3,參數4):
q.put(self, obj, block=True, timeout=None)
self :put就相當于是Queue里的一個方法,這個時候q.put就相當于是隊列對象q來調用對象的綁定方法,這個參數可以省略即可
obj:是我們需要往隊列里面放的值
block=True :隊列如果滿了的話,再往隊列里放值的話會等待,程序不會結束
timeout=None:是再block這個參數的基礎上的,當block的值為真的時候,timeout是用來等待多少秒,如果再這個時間里,隊列一直是滿的,那么程序就會報錯并結束(Queue.Full異常)
from multiprocessing import Queue q = Queue(3) q.put('zhao',block=True,timeout=2) q.put('zhao',block=True,timeout=2) q.put('zhao',block=True,timeout=2) q.put('zhao',block=True,timeout=5) # 此時程序將對等待5秒以后報錯了
2.2.4 q.get(參數1,參數2,參數3,參數4):
q.get(self,block=True, timeout=None)
self :get就相當于是Queue里的一個方法,這個時候q.get就相當于是隊列對象q來調用對象的綁定方法,這個參數可以省略即可
block=True :從隊列q對象里面取值,如果娶不到值的話,程序不會結束
timeout=None:是再block這個參數的基礎上的,當block的值為真的時候,timeout是用來等待多少秒,如果再這個時間里,get取不到隊列里面的值的話,那么程序就會報錯并結束(queue.Empty異常)
from multiprocessing import Queue q = Queue() q.put('Cecilia陳') print(q.get()) q.get(block=True,timeout=2) # 此時程序會等待2秒后,報錯了,隊列里面沒有值了
2.2.5 block=False:
如果block的值是False的話,那么put方法再隊列是滿的情況下,不會等待阻塞,程序直接報錯(Queue.Full異常)結束
如果block的值是False的話,那么get方法再隊列里面沒有值的情況下,再去取的時候,不會等待阻塞,程序直接報錯(queue.Empty異常)結束
1.put()的block=False
from multiprocessing import Queue q = Queue(2) q.put('Cecilia陳') q.put('喜陳') print(q.full()) q.put('xichen',block=False) # 隊列已經滿了,我不等待了,直接報錯
2.get()的block=Flase
from multiprocessing import Queue q = Queue(2) q.put('Cecilia陳') q.put('喜陳') print(q.get()) print(q.get()) print(q.get(block=False)) # 隊列已經沒有值了,我不等待了,直接報錯
2.2.6 put_nowait()/get_nowait()
1.put_nowait() 相當于bolok=False,隊列滿的時候,再放值的時候,程序不等待,不阻塞,直接報錯
from multiprocessing import Queue q = Queue(2) q.put('Cecilia陳') q.put('喜陳') print(q.full()) q.put_nowait('xichen') # 程序不等待,不阻塞,直接報錯
2.get_nowait() 相當于bolok=False,當隊列里沒有值的時候,再取值的時候,程序不等待,不阻塞,程序直接報錯
from multiprocessing import Queue q = Queue(2) q.put('Cecilia陳') q.put('喜陳') print(q.get()) print(q.get()) print(q.full()) q.get_nowait()# 再取值的時候,程序不等待,不阻塞,程序直接報錯
三、代碼實例
3.1 單看隊列的存取數據用法
這個例子還沒有加入進程通信,只是先來看看隊列為我們提供的方法,以及這些方法的使用和現象。
''' multiprocessing模塊支持進程間通信的兩種主要形式:管道和隊列 都是基于消息傳遞實現的,但是隊列接口 ''' from multiprocessing import Queue q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(3) q.put(3) # q.put(3) # 如果隊列已經滿了,程序就會停在這里,等待數據被別人取走,再將數據放入隊列。 # 如果隊列中的數據一直不被取走,程序就會永遠停在這里。 try: q.put_nowait(3) # 可以使用put_nowait,如果隊列滿了不會阻塞,但是會因為隊列滿了而報錯。 except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,但是會丟掉這個消息。 print('隊列已經滿了') # 因此,我們再放入數據之前,可以先看一下隊列的狀態,如果已經滿了,就不繼續put了。 print(q.full()) #滿了 print(q.get()) print(q.get()) print(q.get()) # print(q.get()) # 同put方法一樣,如果隊列已經空了,那么繼續取就會出現阻塞。 try: q.get_nowait(3) # 可以使用get_nowait,如果隊列滿了不會阻塞,但是會因為沒取到值而報錯。 except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。 print('隊列已經空了') print(q.empty()) #空了
3.2 子進程向父進程發送數據
這是一個queue的簡單應用,使用隊列q對象調用get函數來取得隊列中最先進入的數據。
from multiprocessing import Process, Queue def f(q,name,age): q.put(name,age) #調用主函數中p進程傳遞過來的進程參數 put函數為向隊列中添加一條數據。 if __name__ == '__main__': q = Queue() #創建一個Queue對象 p = Process(target=f, args=(q,'Cecilia陳',18)) #創建一個進程 p.start() print(q.get()) p.join()
['Cecilia陳', 18]
四、生產者消費者模型
生產者: 生產數據的任務
消費者: 處理數據的任務
生產者--隊列(盆)-->消費者
生產者可以不停的生產,達到了自己最大的生產效率,消費者可以不停的消費,也達到了自己最大的消費效率.
生產者消費者模型大大提高了生產者生產的效率和消費者消費的效率.
補充: queue不適合傳大文件,通產傳一些消息.
在并發編程中使用生產者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。
4.1 為什么要使用生產者和消費者模型
在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大于生產者,那么消費者就必須等待生產者。為了解決這個問題于是引入了生產者和消費者模式。
4.2 什么是生產者消費者模型
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。
4.3 基于Queue隊列實現的生產者消費者模型
from multiprocessing import Queue,Process # 生產者 def producer(q,name,food): for i in range(3): print(f'{name}生產了{food}{i}') res = f'{food}{i}' q.put(res) # 消費者 def consumer(q,name): while True: res = q.get(timeout=5) print(f'{name}吃了{res}') if __name__ == '__main__': q = Queue() # 為的是讓生產者和消費者使用同一個隊列,使用同一個隊列進行通訊 p1 = Process(target=producer,args=(q,'Cecilia陳','巧克力')) c1 = Process(target=consumer,args=(q,'Tom')) p1.start() c1.start()
此時的問題是主進程永遠不會結束,原因是:生產者p在生產完后就結束了,但是消費者c在取空了q之后,則一直處于死循環中且卡在q.get()這一步。
解決方式無非是讓生產者在生產完畢后,往隊列中再發一個結束信號,這樣消費者在接收到結束信號后就可以break出死循環。
4.4 改良版----生產者消費者模型
注意:結束信號None,不一定要由生產者發,主進程里同樣可以發,但主進程需要等生產者結束后才應該發送該信號
from multiprocessing import Queue,Process def producer(q,name,food): for i in range(3): print(f'{name}生產了{food}{i}') res = f'{food}{i}' q.put(res) q.put(None) # 當生產者結束生產的的時候,我們再隊列的最后再做一個表示,告訴消費者,生產者已經不生產了,讓消費者不要再去隊列里拿東西了 def consumer(q,name): while True: res = q.get(timeout=5) if res == None:break # 判斷隊列拿出的是不是生產者放的結束生產的標識,如果是則不取,直接退出,結束程序 print(f'{name}吃了{res}') if __name__ == '__main__': q = Queue() # 為的是讓生產者和消費者使用同一個隊列,使用同一個隊列進行通訊 p1 = Process(target=producer,args=(q,'Cecilia陳','巧克力')) c1 = Process(target=consumer,args=(q,'Tom')) p1.start() c1.start()
4.5 主進程在生產者生產結束以后,發送結束信號
使用這個方法的話,是很low的,有幾個消費者就要在主進程中向隊列中put幾個結束信號
from multiprocessing import Queue,Process import time,random def producer(q,name,food): for i in range(3): print(f'{name}生產了{food}{i}') time.sleep((random.randint(1,3))) res = f'{food}{i}' q.put(res) # q.put(None) # 當生產者結束生產的的時候,我們再隊列的最后再做一個表示,告訴消費者,生產者已經不生產了,讓消費者不要再去隊列里拿東西了 def consumer(q,name): while True: res = q.get(timeout=5) if res == None:break # 判斷隊列拿出的是不是生產者放的結束生產的標識,如果是則不取,直接退出,結束程序 time.sleep((random.randint(1, 3))) print(f'{name}吃了{res}') if __name__ == '__main__': q = Queue() # 為的是讓生產者和消費者使用同一個隊列,使用同一個隊列進行通訊 # 多個生產者進程 p1 = Process(target=producer,args=(q,'Cecilia陳','巧克力')) p2 = Process(target=producer,args=(q,'xichen','冰激凌')) p3 = Process(target=producer,args=(q,'喜陳','可樂')) # 多個消費者進程 c1 = Process(target=consumer,args=(q,'Tom')) c2 = Process(target=consumer,args=(q,'jack')) # 告訴操作系統啟動生產者進程 p1.start() p2.start() p3.start() # 告訴操作系統啟動消費者進程 c1.start() c2.start() p1.join() p2.join() p3.join() q.put(None) # 幾個消費者put幾次 q.put(None)
五、JoinableQueue方法
創建可連接的共享進程隊列。這就像是一個Queue對象,但隊列允許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
5.1 方法介紹
JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法:
q.task_done():使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。如果調用此方法的次數大于從隊列中刪除的項目數量,將引發ValueError異常。
q.join():生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。阻塞將持續到為隊列中的每個項目均調用q.task_done()方法為止。
5.2 joinableQueue隊列實現生產者消費者模型
from multiprocessing import Queue,Process,JoinableQueue import time,random def producer(q,name,food): for i in range(3): print(f'{name}生產了{food}{i}') # time.sleep((random.randint(1,3))) res = f'{food}{i}' q.put(res) # q.put(None) # 當生產者結束生產的的時候,我們再隊列的最后再做一個表示,告訴消費者,生產者已經不生產了,讓消費者不要再去隊列里拿東西了 q.join() def consumer(q,name): while True: res = q.get(timeout=5) # if res == None:break # 判斷隊列拿出的是不是生產者放的結束生產的標識,如果是則不取,直接退出,結束程序 # time.sleep((random.randint(1, 3))) print(f'{name}吃了{res}') q.task_done()#向q.join()發送一次信號,證明一個數據已經被取走了 if __name__ == '__main__': q = JoinableQueue() # 為的是讓生產者和消費者使用同一個隊列,使用同一個隊列進行通訊 # 多個生產者進程 p1 = Process(target=producer,args=(q,'Cecilia陳','巧克力')) p2 = Process(target=producer,args=(q,'xichen','冰激凌')) p3 = Process(target=producer,args=(q,'喜陳','可樂')) # 多個消費者進程 c1 = Process(target=consumer,args=(q,'Tom')) c2 = Process(target=consumer,args=(q,'jack')) # 告訴操作系統啟動生產者進程 p1.start() p2.start() p3.start() # 把生產者設為守護進程 c1.daemon = True c2.daemon = True # 告訴操作系統啟動消費者進程 c1.start() c2.start() p1.join() p2.join() p3.join() # 等待生產者生產完畢 print('主進程') ### 分析 # 生產者生產完畢--這是主進程最后一行代碼結束--q.join()消費者已經取干凈了,沒有存在的意義了 # 這是主進程最后一行代碼結束,消費者已經取干凈了,沒有存在的意義了.守護進程的概念.
5.3 測試joinableQueue
from multiprocessing import Process,Queue,JoinableQueue q = JoinableQueue() q.put('zhao') # 放隊列里一個任務 q.put('qian') print(q.get()) q.task_done() # 完成了一次任務 print(q.get()) q.task_done() # 完成了一次任務 q.join() #計數器不為0的時候 阻塞等待計數器為0后通過 # 想象成一個計數器 :put +1 task_done -1
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持億速云。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。