您好,登錄后才能下訂單哦!
這篇文章主要介紹了Python中多進程并發與多線程并發編程的示例分析,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
具體如下:
這里對python支持的幾種并發方式進行簡單的總結。
Python支持的并發分為多線程并發與多進程并發(異步IO本文不涉及)。概念上來說,多進程并發即運行多個獨立的程序,優勢在于并發處理的任務都由操作系統管理,不足之處在于程序與各進程之間的通信和數據共享不方便;多線程并發則由程序員管理并發處理的任務,這種并發方式可以方便地在線程間共享數據(前提是不能互斥)。Python對多線程和多進程的支持都比一般編程語言更高級,最小化了需要我們完成的工作。
一.多進程并發
Mark Summerfield指出,對于計算密集型程序,多進程并發優于多線程并發。計算密集型程序指的程序的運行時間大部分消耗在CPU的運算處理過程,而硬盤和內存的讀寫消耗的時間很短;相對地,IO密集型程序指的則是程序的運行時間大部分消耗在硬盤和內存的讀寫上,CPU的運算時間很短。
對于多進程并發,python支持兩種實現方式,一種是采用進程安全的數據結構:multiprocessing.JoinableQueue,這種數據結構自己管理“加鎖”的過程,程序員無需擔心“死鎖”的問題;python還提供了一種更為優雅而高級的實現方式:采用進程池。下面一一介紹。
1.隊列實現——使用multiprocessing.JoinableQueue
multiprocessing是python標準庫中支持多進程并發的模塊,我們這里采用multiprocessing中的數據結構:JoinableQueue,它本質上仍是一個FIFO的隊列,它與一般隊列(如queue中的Queue)的區別在于它是多進程安全的,這意味著我們不用擔心它的互斥和死鎖問題。JoinableQueue主要可以用來存放執行的任務和收集任務的執行結果。舉例來看(以下皆省去導入包的過程):
def read(q): while True: try: value = q.get() print('Get %s from queue.' % value) time.sleep(random.random()) finally: q.task_done() def main(): q = multiprocessing.JoinableQueue() pw1 = multiprocessing.Process(target=read, args=(q,)) pw2 = multiprocessing.Process(target=read, args=(q,)) pw1.daemon = True pw2.daemon = True pw1.start() pw2.start() for c in [chr(ord('A')+i) for i in range(26)]: q.put(c) try: q.join() except KeyboardInterrupt: print("stopped by hand") if __name__ == '__main__': main()
對于windows系統的多進程并發,程序文件里必須含有“入口函數”(如main函數),且結尾處必須調用入口點。例如以if __name__ == '__main__': main()
結尾。
在這個最簡單的多進程并發例子里,我們用多進程實現將26個字母打印出來。首先定義一個存放任務的JoinableQueue對象,然后實例化兩個Process對象(每個對象對應一個子進程),實例化Process對象需要傳送target和args參數,target是實現每個任務工作中的具體函數,args是target函數的參數。
pw1.daemon = True pw2.daemon = True
這兩句話將子進程設置為守護進程——主進程結束后隨之結束。
pw1.start() pw2.start()
一旦運行到這兩句話,子進程就開始獨立于父進程運行了,它會在單獨的進程里調用target引用的函數——在這里即read函數,它是一個死循環,將參數q中的數一一讀取并打印出來。
value = q.get()
這是多進程并發的要點,q是一個JoinableQueue對象,支持get方法讀取第一個元素,如果q中沒有元素,進程就會阻塞,直至q中被存入新元素。
因此執行完pw1.start()
pw2.start()
這兩句話后,子進程雖然開始運行了,但很快就堵塞住。
for c in [chr(ord('A')+i) for i in range(26)]: q.put(c)
將26個字母依次放入JoinableQueue對象中,這時候兩個子進程不再阻塞,開始真正地執行任務。兩個子進程都用value = q.get()來讀取數據,它們都在修改q對象,而我們并不用擔心同步問題,這就是multiProcessing.Joinable數據結構的優勢所在——它是多進程安全的,它會自動處理“加鎖”的過程。
try: q.join()
q.join()
方法會查詢q中的數據是否已讀完——這里指的就是任務是否執行完,如果沒有,程序會阻塞住等待q中數據讀完才開始繼續執行(可以用Ctrl+C強制停止)。
對Windows系統,調用任務管理器應該可以看到有多個子進程在運行。
2.進程池實現——使用concurrent.futures.ProcessPoolExecutor
Python還支持一種更為優雅的多進程并發方式,直接看例子:
def read(q): print('Get %s from queue.' % q) time.sleep(random.random()) def main(): futures = set() with concurrent.futures.ProcessPoolExecutor() as executor: for q in (chr(ord('A')+i) for i in range(26)): future = executor.submit(read, q) futures.add(future) try: for future in concurrent.futures.as_completed(futures): err = future.exception() if err is not None: raise err except KeyboardInterrupt: print("stopped by hand") if __name__ == '__main__': main()
這里我們采用concurrent.futures.ProcessPoolExecutor對象,可以把它想象成一個進程池,子進程往里“填”。我們通過submit方法實例一個Future對象,然后把這里Future對象都填到池——futures里,這里futures是一個set對象。只要進程池里有future,就會開始執行任務。這里的read函數更為簡單——只是把一個字符打印并休眠一會而已。
try: for future in concurrent.futures.as_completed(futures):
這是等待所有子進程都執行完畢。子進程執行過程中可能拋出異常,err = future.exception()
可以收集這些異常,便于后期處理。
可以看出用Future對象處理多進程并發更為簡潔,無論是target函數的編寫、子進程的啟動等等,future對象還可以向使用者匯報其狀態,也可以匯報執行結果或執行時的異常。
二.多線程并發
對于IO密集型程序,多線程并發可能要優于多進程并發。因為對于網絡通信等IO密集型任務來說,決定程序效率的主要是網絡延遲,這時候是使用進程還是線程就沒有太大關系了。
1.隊列實現——使用queue.Queue
程序與多進程基本一致,只是這里我們不必使用multiProcessing.JoinableQueue
對象了,一般的隊列(來自queue.Queue)就可以滿足要求:
def read(q): while True: try: value = q.get() print('Get %s from queue.' % value) time.sleep(random.random()) finally: q.task_done() def main(): q = queue.Queue() pw1 = threading.Thread(target=read, args=(q,)) pw2 = threading.Thread(target=read, args=(q,)) pw1.daemon = True pw2.daemon = True pw1.start() pw2.start() for c in [chr(ord('A')+i) for i in range(26)]: q.put(c) try: q.join() except KeyboardInterrupt: print("stopped by hand") if __name__ == '__main__': main()
并且這里我們實例化的是Thread對象,而不是Process對象,程序的其余部分看起來與多進程并沒有什么兩樣。
2. 線程池實現——使用concurrent.futures.ThreadPoolExecutor
直接看例子:
def read(q): print('Get %s from queue.' % q) time.sleep(random.random()) def main(): futures = set() with concurrent.futures.ThreadPoolExecutor(multiprocessing.cpu_count()*4) as executor: for q in (chr(ord('A')+i) for i in range(26)): future = executor.submit(read, q) futures.add(future) try: for future in concurrent.futures.as_completed(futures): err = future.exception() if err is not None: raise err except KeyboardInterrupt: print("stopped by hand") if __name__ == '__main__': main()
用ThreadPoolExecutor與用ProcessPoolExecutor看起來沒什么區別,只是改了一下簽名而已。
不難看出,不管是使用隊列還是使用進/線程池,從多進程轉化到多線程是十分容易的——僅僅是修改了幾個簽名而已。當然內部機制完全不同,只是python的封裝非常好,使我們可以不用關心這些細節,這正是python優雅之處。
感謝你能夠認真閱讀完這篇文章,希望小編分享的“Python中多進程并發與多線程并發編程的示例分析”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。