您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關python如何實現多進程,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
多進程的含義
進程(Process)是具有一定獨立功能的程序關于某個數據集合上的一次運行活動,是系統進行資源分配和調度的一個獨立單位。
顧名思義,多進程就是啟用多個進程同時運行。由于進程是線程的集合,而且進程是由一個或多個線程構成的,所以多進程的運行意味著有大于或等于進程數量的線程在運行。
由于進程中 GIL 的存在,Python 中的多線程并不能很好地發揮多核優勢,一個進程中的多個線程,在同一時刻只能有一個線程運行。
而對于多進程來說,每個進程都有屬于自己的 GIL,所以,在多核處理器下,多進程的運行是不會受 GIL 的影響的。因此,多進程能更好地發揮多核的優勢。
當然,對于爬蟲這種 IO 密集型任務來說,多線程和多進程影響差別并不大。對于計算密集型任務來說,Python 的多進程相比多線程,其多核運行效率會有成倍的提升。
總的來說,Python 的多進程整體來看是比多線程更有優勢的。所以,在條件允許的情況下,能用多進程就盡量用多進程。
不過值得注意的是,由于進程是系統進行資源分配和調度的一個獨立單位,所以各個進程之間的數據是無法共享的,如多個進程無法共享一個全局變量,
進程之間的數據共享需要有單獨的機制來實現。
在Python中也有內置的庫來實現多進程,它就是multiprocessing。multiprocessing提供了一系列的組件,如Process(進程)、Queue(隊列)、
Semaphore(信號量)、Pipe(管道)、Lock(鎖)、Pool(進程池)等。
import multiprocessing import time now = lambda: time.time() def work(index): print(index) time.sleep(index) def main(): start_dt = now() for i in range(5): p = multiprocessing.Process(target=work, args=(i,)) p.start() print(f"Time: {now() - start_dt}") if __name__ == "__main__": main()
import multiprocessingimport timenow = lambda: time.time()def work(i, index): print(f'{i}進程啟動') time.sleep(index) print(f'{i}進程結束')def main(): start_dt = now() for i in range(5): p = multiprocessing.Process(target=work, args=(i, 5)) p.start() # 查看本機的 cpu 數量 print(f"CPU Numbers: {multiprocessing.cpu_count()}") # 查看全部活躍子進程的名稱以及pid for p in multiprocessing.active_children(): print(p.name, p.pid) print(f"Time End: {now() - start_dt}")if __name__ == "__main__": main()
import multiprocessing import time class MyProcess(multiprocessing.Process): def __init__(self, loop): super(MyProcess, self).__init__() self.loop = loop def run(self): for count in range(self.loop): time.sleep(1) print(f"Pid: {self.pid}, Name: {self.name}") def main(): for i in range(2, 5): p = MyProcess(i) p.start() if __name__ == "__main__": main()
在多進程中,同樣存在守護進程的概念,如果一個進程被設置為守護進程,當父進程結束后,子進程會自動被終止,我們可以通過設置daemon屬性來控制是否為守護進程。
還是原來的例子,增加了deamon屬性的設置:
import multiprocessing import time class MyProcess(multiprocessing.Process): def __init__(self, loop): super(MyProcess, self).__init__() self.loop = loop def run(self): for count in range(self.loop): time.sleep(1) print(f"Loop: {self.loop}, Pid: {self.pid}, Loop Count: {count}") def main(): for i in range(2, 5): p = MyProcess(i) p.daemon = True p.start() if __name__ == "__main__": main() # 主進程沒有做任何事情 直接輸入后結束 同時也終止了子進程的運行。 print("Main End.")
這樣可以有效防止無控制地生成子進程。這樣的寫法可以讓我們在主進程運行結束后無需額外擔心子進程是否關閉,避免了獨立子進程的運行。
import multiprocessing import time class MyProcess(multiprocessing.Process): def __init__(self, loop): super(MyProcess, self).__init__() self.loop = loop def run(self): for count in range(self.loop): time.sleep(1) print(f"Loop: {self.loop}, Pid: {self.pid}, Loop Count: {count}") def main(): processes = [] for i in range(2, 5): p = MyProcess(i) p.daemon = True p.start() processes.append(p) for p in processes: p.join() if __name__ == "__main__": main() # 主進程沒有做任何事情 直接輸入后結束 同時也終止了子進程的運行。 print("Main End.")
默認情況下,join是無限期的。也就是說,如果有子進程沒有運行完畢,主進程會一直等待。這種情況下,如果子進程出現問題陷入了死循環,主進程也會無限等待下去。
怎么解決這個問題呢?可以給 join 方法傳遞一個超時參數,代表最長等待秒數。如果子進程沒有在這個指定秒數之內完成,會被強制返回,主進程不再會等待。
也就是說這個參數設置了主進程等待該子進程的最長時間。
import multiprocessing import time class MyProcess(multiprocessing.Process): def __init__(self, loop): super(MyProcess, self).__init__() self.loop = loop def run(self): for count in range(self.loop): time.sleep(1) print(f"Loop: {self.loop}, Pid: {self.pid}, Loop Count: {count}") def main(): processes = [] for i in range(2, 5): p = MyProcess(i) p.daemon = True p.start() processes.append(p) for p in processes: # 主進程最多等待改進程 1 s p.join(1) if __name__ == "__main__": main() # 主進程沒有做任何事情 直接輸入后結束 同時也終止了子進程的運行。 print("Main End.")
當然,終止進程不止有守護進程這一種做法,我們也可以通過 terminate 方法來終止某個子進程,另外我們還可以通過 is_alive 方法判斷進程是否還在運行。
import multiprocessing import time def task(): print("1") time.sleep(5) print("2") if __name__ == "__main__": p = multiprocessing.Process(target=task) # 使用 is_alive 判斷當前進程進程是否在運行 print(f"First: {p}, {p.is_alive()}") p.start() print(f"During: {p}, {p.is_alive()}") p.terminate() # 即使此時已經調用了 terminate 進程的狀態還是 True, 即運行狀態 # 在調用 join 之后才變成了終止狀態 print(f"After: {p}, {p.is_alive()}") p.join() print(f"Joined: {p}, {p.is_alive()}")
我們發現,有的輸出結果沒有換行。這是什么原因造成的呢?這種情況是由多個進程并行執行導致的,兩個進程同時進行了輸出,結果第一個進程的換行沒有來得及輸出,
第二個進程就輸出了結果,導致最終輸出沒有換行。
那如何來避免這種問題?如果我們能保證,多個進程運行期間的任一時間,只能一個進程輸出,其他進程等待,等剛才那個進程輸出完畢之后,另一個進程再進行輸出,
這樣就不會出現輸出沒有換行的現象了。
這種解決方案實際上就是實現了進程互斥,避免了多個進程同時搶占臨界區(輸出)資源。我們可以通過multiprocessing中的Lock來實現。Lock,即鎖,
在一個進程輸出時,加鎖,其他進程等待。等此進程執行結束后,釋放鎖,其他進程可以進行輸出。
首先是一個不加鎖的實例:
import multiprocessing import time class MyProcess(multiprocessing.Process): def __init__(self, loop, lock: multiprocessing.Lock): super(MyProcess, self).__init__() self.loop = loop self.lock = lock def run(self): for count in range(self.loop): time.sleep(0.1) # self.lock.acquire() print(f"Pid: {self.pid}, LoopCount: {count}") # self.lock.release() def main(): lock = multiprocessing.Lock() for i in range(10, 15): p = MyProcess(i, lock) p.start() if __name__ == "__main__": main()
然后取消注釋再次運行。
進程互斥鎖可以使同一時刻只有一個進程能訪問共享資源,如上面的例子所展示的那樣,在同一時刻只能有一個進程輸出結果。
但有時候我們需要允許多個進程來訪問共享資源,同時還需要限制能訪問共享資源的進程的數量。
這種需求該如何實現呢?可以用信號量,信號量是進程同步過程中一個比較重要的角色。它可以控制臨界資源的數量,實現多個進程同時訪問共享資源,限制進程的并發量。
我們可以用 multiprocessing 庫中的 Semaphore 來實現信號量。
那么接下來我們就用一個實例來演示一下進程之間利用 Semaphore 做到多個進程共享資源,同時又限制同時可訪問的進程數量,代碼如下:
import multiprocessing import time ''' Semaphore管理一個內置的計數器, 每當調用acquire()時內置計數器-1; 調用release() 時內置計數器+1; 計數器不能小于0;當計數器為0時,acquire()將阻塞線程直到其他線程(進程)調用release()。 ''' buffer = multiprocessing.Queue(10) empty = multiprocessing.Semaphore(2) # 緩沖區閑適區空余數 full = multiprocessing.Semaphore(0) # 緩沖區占用區占用數 lock = multiprocessing.Lock() class Consumer(multiprocessing.Process): def run(self): global buffer, empty, full, lock while True: full.acquire() lock.acquire() buffer.get() print("Consumer pop an element.") time.sleep(1) lock.release() empty.release() class Producer(multiprocessing.Process): def __init__(self, name): super(Producer, self).__init__() self.name = name def run(self): global buffer, empty, full, lock # 生產者 Producer 使用 acquire 方法來占用一個緩沖區位置,緩沖區空閑區大小減 1,接下來進行加鎖,對緩沖區進行操作, # 然后釋放鎖,最后讓代表占用的緩沖區位置數量加 1,消費者則相反。 # 通過 Semaphore 我們很好地控制了進程對資源的并發訪問數量。 while True: empty.acquire() lock.acquire() buffer.put(1) print(f"{self.name} Producer put an element.") time.sleep(1) lock.release() full.release() def main(): lst = [] for i in range(3): p = Producer(str(i)) p.daemon = True p.start() lst.append(p) c = Consumer() c.daemon = True c.start() c.join() for p in lst: p.join() print("Main End.") if __name__ == "__main__": main()
在上面的例子中我們使用Queue作為進程通信的共享隊列使用。而如果我們把上面程序中的Queue換成普通的list,是完全起不到效果的,因為進程和進程之間的資源是不共享的。
即使在一個進程中改變了這個list,在另一個進程也不能獲取到這個list的狀態,所以聲明全局變量對多進程是沒有用處的。那進程如何共享數據呢?可以用Queue,即隊列。
當然這里的隊列指的是 multiprocessing 里面的 Queue。
剛才我們使用Queue實現了進程間的數據共享,那么進程之間直接通信,如收發信息,用什么比較好呢?可以用Pipe,管道。管道,我們可以把它理解為兩個進程之間通信的通道。
管道可以是單向的,即half-duplex:一個進程負責發消息,另一個進程負責收消息;也可以是雙向的duplex,即互相收發消息。
默認聲明Pipe對象是雙向管道,如果要創建單向管道,可以在初始化的時候傳入 deplex 參數為 False。
import multiprocessing class Consumer(multiprocessing.Process): def __init__(self, pipe): super(Consumer, self).__init__() self.pipe = pipe def run(self): self.pipe.send("Consumer Words.") print(f"Consumer Recv: {self.pipe.recv()}") class Producer(multiprocessing.Process): def __init__(self, pipe): super(Producer, self).__init__() self.pipe = pipe def run(self): self.pipe.send("Producer Words.") print(f"Producer Recv: {self.pipe.recv()}") def main(): # 聲明了一個默認為雙向的管道,然后將管道的兩端分別傳給兩個進程。 # 管道 Pipe 就像進程之間搭建的橋梁,利用它我們就可以很方便地實現進程間通信了。 pipe = multiprocessing.Pipe() c = Consumer(pipe[0]) p = Producer(pipe[1]) c.daemon = True p.daemon = True c.start() p.start() c.join() p.join() print("Main Process Ended.") if __name__ == "__main__": main()
我們講了可以使用Process來創建進程,同時也講了如何用Semaphore來控制進程的并發執行數量。假如現在我們遇到這么一個問題,我有10000個任務,
每個任務需要啟動一個進程來執行,并且一個進程運行完畢之后要緊接著啟動下一個進程,同時我還需要控制進程的并發數量,不能并發太高,
不然CPU處理不過來(如果同時運行的進程能維持在一個最高恒定值當然利用率是最高的)。那么我們該如何來實現這個需求呢?
用Process和Semaphore可以實現,但是實現起來比較煩瑣。而這種需求在平時又是非常常見的。此時,我們就可以派上進程池了,即 multiprocessing 中的 Pool。
Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,就會創建一個新的進程用來執行該請求;
但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來執行它。
進程池1:
import multiprocessing import time def function(index): print(f"{index} start") time.sleep(3) print(f"{index} end") def main(): # 聲明了一個大小為 3 的進程池,通過 processes 參數來指定,如果不指定,那么會自動根據處理器內核來分配進程數。 pool = multiprocessing.Pool(processes=3) for i in range(4): # 使用 apply_async 方法將進程添加進去,args 可以用來傳遞參數。 pool.apply_async(function, args=(i, )) print("Main start.") # 關閉進程池 使之不再接受新任務 pool.close() pool.join() print("Main end.") if __name__ == "__main__": main()
再介紹進程池一個更好用的map方法,可以將上述寫法簡化很多。map方法是怎么用的呢?第一個參數就是要啟動的進程對應的執行方法,
第2個參數是一個可迭代對象,其中的每個元素會被傳遞給這個執行方法。
舉個例子:現在我們有一個list,里面包含了很多URL,另外我們也定義了一個方法用來抓取每個URL內容并解析,
那么我們可以直接在map的第一個參數傳入方法名,第 2 個參數傳入 URL 數組。
進程池2:
import multiprocessing import requests def scrape(url): try: ret = requests.get(url).text print(ret[:10]) print() except: print("Get Error") def main(): pool = multiprocessing.Pool(processes=3) urls = [ 'http://data.eastmoney.com/hsgt/index.html', 'http://data.eastmoney.com/hsgtcg/gzcglist.html', 'https://www.runoob.com/mysql/mysql-alter.html', 'https://blog.csdn.net/weixin_42329277/article/details/80735009?depth_1-utm_source=distribute.pc_relevant.none-task&utm_source=distribute.pc_relevant.none-task', ] pool.map(scrape, urls) pool.close() # pool.join() print("Main End.") if __name__ == "__main__": main()
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。