您好,登錄后才能下訂單哦!
這篇文章主要介紹“Python中怎么使用multiprocessing實現進程間通信”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“Python中怎么使用multiprocessing實現進程間通信”文章能幫助大家解決問題。
python的多線程代碼效率由于受制于GIL,不能利用多核CPU來加速,而多進程方式可以繞過GIL, 發揮多CPU加速的優勢,能夠明顯提高程序的性能
但進程間通信卻是不得不考慮的問題。 進程不同于線程,進程有自己的獨立內存空間,不能使用全局變量在進程間傳遞數據。
實際項目需求中,常常存在密集計算、或實時性任務,進程之間有時需要傳遞大量數據,如圖片、大對象等,傳遞數據如果通過文件序列化、或網絡接口來進行,難以滿足實時性要求,采用redis,或者kaffka, rabbitMQ 之第3方消息隊列包,又使系統復雜化了。
Python multiprocessing 模塊本身就提供了消息機制、同步機制、共享內存等各種非常高效的進程間通信方式。
了解并掌握 python 進程間通信的各類方式的使用,以及安全機制,可以幫助大幅提升程序運行性能。
進程間通信的主要方式總結如下
關于進程間通信的內存安全
內存安全意味著,多進程間可能會因同搶,意外銷毀等原因造成共享變量異常。
Multiprocessing 模塊提供的Queue, Pipe, Lock, Event 對象,都已實現了進程間通信安全機制。
采用共享內存方式通信,需要在代碼中自已來跟蹤、銷毀這些共享內存變量,否則可能會出同搶、未正常銷毀等。造成系統異常。 除非開發者很清楚共享內存使用特點,否則不建議直接使用此共享內存,而是通過Manager管理器來使用共享內存。
內存管理器Manager
Multiprocessing提供了內存管理器Manager類,可統一解決進程通信的內存安全問題,可以將各種共享數據加入管理器,包括 list, dict, Queue, Lock, Event, Shared Memory 等,由其統一跟蹤與銷毀。
類似于1上簡單的socket通道,雙端均可收發消息。
Pipe 對象的構建方法:
parent_conn, child_conn = Pipe(duplex=True/False)
參數說明
duplex=True, 管道為雙向通信
duplex=False, 管道為單向通信,只有child_conn可以發消息,parent_conn只能接收。
示例代碼:
from multiprocessing import Process, Pipe def myfunction(conn): conn.send(['hi!! I am Python']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=myfunction, args=(child_conn,)) p.start() print (parent_conn.recv() ) p.join()
Multiprocessing 的Queue 類,是在python queue 3.0版本上修改的, 可以很容易實現生產者 – 消息者間傳遞數據,而且Multiprocessing的Queue 模塊實現了lock安全機制。
Queue模塊共提供了3種類型的隊列。
(1) FIFO queue , 先進先出,
class queue.Queue(maxsize=0)
(2) LIFO queue, 后進先出, 實際上就是堆棧
class queue.LifoQueue(maxsize=0)
(3) 帶優先級隊列, 優先級最低entry value lowest 先了列
class queue.PriorityQueue(maxsize=0)
Multiprocessing.Queue類的主要方法:
method | Description |
---|---|
queue.qsize() | 返回隊列長度 |
queue.full() | 隊列滿,返回 True, 否則返回False |
queue.empty() | 隊列空,返回 True, 否則返回False |
queue.put(item) | 將數據寫入隊列 |
queue.get() | 將數據拋出隊列 , |
queue.put_nowait(item), queue.get_nowait() | 無等待寫入或拋出 |
說明:
put(), get() 是阻塞方法, 而put_notwait(), get_nowait()是非阻塞方法。
Multiprocessing 的Queue類沒有提供Task_done, join方法
Queue模塊的其它隊列類:
(1) SimpleQueue
簡潔版的FIFO隊列, 適事簡單場景使用
(2) JoinableQueue子類
Python 3.5 后新增的 Queue的子類,擁有 task_done(), join() 方法
task_done()表示,最近讀出的1個任務已經完成。
join()阻塞隊列,直到queue中的所有任務都已完成。
producer – consumer 場景,使用Queue的示例
import multiprocessing def producer(numbers, q): for x in numbers: if x % 2 == 0: if q.full(): print("queue is full") break q.put(x) print(f"put {x} in queue by producer") return None def consumer(q): while not q.empty(): print(f"take data {q.get()} from queue by consumer") return None if __name__ == "__main__": # 設置1個queue對象,最大長度為5 qu = multiprocessing.Queue(maxsize=5,) # 創建producer子進程,把queue做為其中1個參數傳給它,該進程負責寫 p5 = multiprocessing.Process( name="producer-1", target=producer, args=([random.randint(1, 100) for i in range(0, 10)], qu) ) p5.start() p5.join() #創建consumer子進程,把queue做為1個參數傳給它,該進程中隊列中讀 p6 = multiprocessing.Process( name="consumer-1", target=consumer, args=(qu,) ) p6.start() p6.join() print(qu.qsize())
Multiprocessing也提供了與threading 類似的同步鎖機制,確保某個時刻只有1個子進程可以訪問某個資源或執行某項任務, 以避免同搶。
例如:多個子進程同時訪問數據庫表時,如果沒有同步鎖,用戶A修改1條數據后,還未提交,此時,用戶B也進行了修改,可以預見,用戶A提交的將是B個修改的數據。
添加了同步鎖,可以確保同時只有1個子進程能夠進行寫入數據庫與提交操作。
如下面的示例,同時只有1個進程可以執行打印操作。
from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
Event 機制的工作原理:
1個event 對象實例管理著1個 flag標記, 可以用set()方法將其置為true, 用clear()方法將其置為false, 使用wait()將阻塞當前子進程,直至flag被置為true.
這樣由1個進程通過event flag 就可以控制、協調各子進程運行。
Event object的使用方法:
1)主函數: 創建1個event 對象, flag = multiprocessing.Event() , 做為參數傳給各子進程
2) 子進程A: 不受event影響,通過event 控制其它進程的運行
o 先clear(),將event 置為False, 占用運行權.
o 完成工作后,用set()把flag置為True。
3) 子進程B, C: 受event 影響
o 設置 wait() 狀態,暫停運行
o 直到flag重新變為True,恢復運行
主要方法:
set(), clear()設置 True/False,
wait() 使進程等待,直到flag被改為true.
is_set(), Return True if and only if the internal flag is true.
驗證進程間通信 – Event
import multiprocessing import time import random def joo_a(q, ev): print("subprocess joo_a start") if not ev.is_set(): ev.wait() q.put(random.randint(1, 100)) print("subprocess joo_a ended") def joo_b(q, ev): print("subprocess joo_b start") ev.clear() time.sleep(2) q.put(random.randint(200, 300)) ev.set() print("subprocess joo_b ended") def main_event(): qu = multiprocessing.Queue() ev = multiprocessing.Event() sub_a = multiprocessing.Process(target=joo_a, args=(qu, ev)) sub_b = multiprocessing.Process(target=joo_b, args=(qu, ev,)) sub_a.start() sub_b.start() # ev.set() sub_a.join() sub_b.join() while not qu.empty(): print(qu.get()) if __name__ == "__main__": main_event()
子進程之間共存內存變量,要用 multiprocessing.Value(), Array() 來定義變量。 實際上是ctypes 類型,由multiprocessing.sharedctypes模塊提供相關功能
注意 使用 share memory 要考慮同搶等問題,釋放等問題,需要手工實現。因此在使用共享變量時,建議使用Manager管程來管理這些共享變量。
def func(num): num.value=10.78 #子進程改變數值的值,主進程跟著改變 if __name__=="__main__": num = multiprocessing.Value("d", 10.0) # d表示數值,主進程與子進程可共享這個變量。 p=multiprocessing.Process(target=func,args=(num,)) p.start() p.join() print(num.value)
進程之間共享數據(數組型):
import multiprocessing def func(num): num[2]=9999 #子進程改變數組,主進程跟著改變 if __name__=="__main__": num=multiprocessing.Array("i",[1,2,3,4,5]) p=multiprocessing.Process(target=func,args=(num,)) p.start() p.join() print(num[:])
如果進程間需要共享對象數據,或共享內容,數據較大,multiprocessing 提供了SharedMemory類來實現進程間實時通信,不需要通過發消息,讀寫磁盤文件來實現,速度更快。
注意:直接使用SharedMemory 存在著同搶、泄露隱患,應通過SharedMemory Manager 管程類來使用, 以確保內存安全。
創建共享內存區:
multiprocessing.shared_memory.SharedMemory(name=none, create=False, size=0)
方法:
父進程創建shared_memory 后,子進程可以使用它,當不再需要后,使用close(), 刪除使用unlink()方法
相關屬性:
獲取內存區內容: shm.buf
獲取內存區名稱: shm.name
獲取內存區字節數: shm.size
示例:
>>> from multiprocessing import shared_memory >>> shm_a = shared_memory.SharedMemory(create=True, size=10) >>> type(shm_a.buf) <class 'memoryview'> >>> buffer = shm_a.buf >>> len(buffer) 10 >>> buffer[:4] = bytearray([22, 33, 44, 55]) # Modify multiple at once >>> buffer[4] = 100 # Modify single byte at a time >>> # Attach to an existing shared memory block >>> shm_b = shared_memory.SharedMemory(shm_a.name) >>> import array >>> array.array('b', shm_b.buf[:5]) # Copy the data into a new array.array array('b', [22, 33, 44, 55, 100]) >>> shm_b.buf[:5] = b'howdy' # Modify via shm_b using bytes >>> bytes(shm_a.buf[:5]) # Access via shm_a b'howdy' >>> shm_b.close() # Close each SharedMemory instance >>> shm_a.close() >>> shm_a.unlink() # Call unlink only once to release the shared memory
sharedMemory類還提供了1個共享列表類型,這樣就更方便了,進程間可以直接共享python強大的列表
構建方法:
multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None)
from multiprocessing import shared_memory >>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42]) >>> [ type(entry) for entry in a ] [<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, <class 'NoneType'>, <class 'bool'>, <class 'int'>] >>> a[2] -273.154 >>> a[2] = -78.5 >>> a[2] -78.5 >>> a[2] = 'dry ice' # Changing data types is supported as well >>> a[2] 'dry ice' >>> a[2] = 'larger than previously allocated storage space' Traceback (most recent call last): ... ValueError: exceeds available storage for existing str >>> a[2] 'dry ice' >>> len(a) 7 >>> a.index(42) 6 >>> a.count(b'howdy') 0 >>> a.count(b'HoWdY') 1 >>> a.shm.close() >>> a.shm.unlink() >>> del a # Use of a ShareableList after call to unlink() is unsupported b = shared_memory.ShareableList(range(5)) # In a first process >>> c = shared_memory.ShareableList(name=b.shm.name) # In a second process >>> c ShareableList([0, 1, 2, 3, 4], name='...') >>> c[-1] = -999 >>> b[-1] -999 >>> b.shm.close() >>> c.shm.close() >>> c.shm.unlink()
Multiprocessing 提供了 Manager 內存管理器類,當調用1個Manager實例對象的start()方法時,會創建1個manager進程,其唯一目的就是管理共享內存, 避免出現進程間共享數據不同步,內存泄露等現象。
其原理如下:
Manager管理器相當于提供了1個共享內存的服務,不僅可以被主進程創建的多個子進程使用,還可以被其它進程訪問,甚至跨網絡訪問。本文僅聚焦于由單一主進程創建的各進程之間的通信。
相關類:multiprocessing.Manager
子類有:
multiprocessing.managers.SharedMemoryManager
multiprocessing.managers.BaseManager
支持共享變量類型:
python基本類型 int, str, list, tuple, list
進程通信對象: Queue, Lock, Event,
Condition, Semaphore, Barrier ctypes類型: Value, Array
1)創建管理器對象
snm = Manager() snm = SharedMemoryManager()
2)創建共享內存變量
新建list, dict
sl = snm.list(), snm.dict()
新建1塊bytes共享內存變量,需要指定大小
sx = snm.SharedMemory(size)
新建1個共享列表變量,可用列表來初始化
sl = snm.ShareableList(sequence) 如 sl = smm.ShareableList([‘howdy', b'HoWdY', -273.154, 100, True])
新建1個queue, 使用multiprocessing 的Queue類型
snm = Manager() q = snm.Queue()
示例 :
from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l)
將打印
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
方法一:
調用snm.shutdown()方法,會自動調用每個內存塊的unlink()方法釋放內存。或者 snm.close()
方法二:
使用with語句,結束后會自動釋放所有manager變量
>>> with SharedMemoryManager() as smm: ... sl = smm.ShareableList(range(2000)) ... # Divide the work among two processes, storing partial results in sl ... p1 = Process(target=do_work, args=(sl, 0, 1000)) ... p2 = Process(target=do_work, args=(sl, 1000, 2000)) ... p1.start() ... p2.start() # A multiprocessing.Pool might be more efficient ... p1.join() ... p2.join() # Wait for all work to complete in both processes ... total_result = sum(sl) # Consolidate the partial results now in sl
managers的子類BaseManager提供register()方法,支持注冊自定義數據類型。如下例,注冊1個自定義MathsClass類,并生成實例。
from multiprocessing.managers import BaseManager class MathsClass: def add(self, x, y): return x + y def mul(self, x, y): return x * y class MyManager(BaseManager): pass MyManager.register('Maths', MathsClass) if __name__ == '__main__': with MyManager() as manager: maths = manager.Maths() print(maths.add(4, 3)) # prints 7 print(maths.mul(7, 8))
關于“Python中怎么使用multiprocessing實現進程間通信”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識,可以關注億速云行業資訊頻道,小編每天都會為大家更新不同的知識點。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。