您好,登錄后才能下訂單哦!
前言
在python中使用多進程和多線程都能達到同時運行多個任務,和多進程和多線程的選擇上,應該優先選擇多進程的方式,因為多進程更加穩定,且對于進程的操作管理也更加方便,但有一點是多進程獨有的殺手锏,多進程可以將進程分步到多臺機器上跑,假如有很多個任務,一臺機器即使開了多進程或者多進程跑起來還是要耗很多時間,那么這時就要想一下可否將任務分配到多臺機器上跑,這樣可以更快的完成任務。
在分步式進程運算中,進程之前的通信還是依賴于Queue,但此時的隊列不能直接使用,需要使用multiprocessing.managers.BaseManager
進行包裝,通過回調以后才能使用,既然是分步式的調用,那么應該有一個服務端和一個客戶端,服務端通過網絡協議將隊列中的信息給各個客戶端進行調用,客戶端也可以通過隊列將結果返回,然后服務端進行結果的收集展示,流程如下
分步式流程
服務端將任務放到 task_queue 中,然后四個客戶端通過網絡端口從task_queue中獲取到任務,然后進行計算,再將結果放到result_queue中,最后服務端統一處理結果。整體的流程比較清晰,只是需要強調,這里的隊列不能是原始的隊列,需要使用BaseManager 進行包裝。
先看一下服務端的代碼
#coding:gbk import time, queue from multiprocessing.managers import BaseManager from multiprocessing import freeze_support # 任務個數 task_number = 10 # 定義收發隊列 task_queue = queue.Queue(task_number) result_queue = queue.Queue(task_number) def gettask(): return task_queue def getresult(): return result_queue def test(): # windows下綁定調用接口不能使用lambda,所以只能先定義函數再綁定 BaseManager.register('get_task', callable=gettask) BaseManager.register('get_result', callable=getresult) # 綁定端口并設置驗證碼,windows下需要填寫ip地址,linux下不填默認為本地 manager = BaseManager(address=('127.0.0.1', 5002), authkey=b'123') # 啟動 manager.start() try: # 通過網絡獲取任務隊列和結果隊列 task = manager.get_task() result = manager.get_result() # 添加任務 for i in range(task_number): print('Put task %d...' % i) task.put(i) # 每秒檢測一次是否所有任務都被執行完 while not result.full(): print(task.qsize()) time.sleep(1) for i in range(result.qsize()): ans = result.get() print('task %d is finish , runtime:%d s' % ans) except: print('Manager error') finally: manager.shutdown() if __name__ == '__main__': # windows下多進程可能會炸,添加這句可以緩解 freeze_support() test()
這里重點說一下 BaseManager.register('get_task', callable=gettask)
這行代碼,它的意思是注冊一個get_task的操作,執行的操作是gettask()
函數,上面定義了gettask()
函數,返回的是task_queue,這也是之前說的不能直接使用queue.Queue
,必須要使用通過BaseManager的register接口封裝過的的隊列,下面使用task = manager.get_task()
來獲取到這個隊列。
manager = BaseManager(address=('127.0.0.1', 5002), authkey=b'123')
這行代碼初始了一個manager,它綁定了本機的5002端口,并且在客戶端連接的時候需要一個密碼:123。
接下來看一下客戶端代碼。
#coding:gbk import time, sys, queue, random from multiprocessing.managers import BaseManager BaseManager.register('get_task') BaseManager.register('get_result') conn = BaseManager(address = ('127.0.0.1',5002), authkey = b'123') try: conn.connect() except: print('連接失敗') sys.exit() task = conn.get_task() result = conn.get_result() while not task.empty(): print(task.qsize()) n = task.get(timeout = 1) print('run task %d' % n) sleeptime = random.randint(0,3) time.sleep(sleeptime) rt = (n, sleeptime) result.put(rt) if __name__ == '__main__': pass;
這里主要看以下的代碼
BaseManager.register('get_task') BaseManager.register('get_result')
這兩個是注冊函數,和之前的服務端所對應,之前服務端注冊了這兩個函數,這里才能注冊使用,注意這里不能注冊服務端沒有注冊的函數
運行一下,先運行服務端,然后再啟兩個cmd運行客戶端,也可以在局域網中的另外的機器上運行,但是要修改服務端的ip地址
服務端的結果如下
Put task 0...
Put task 1...
Put task 2...
Put task 3...
Put task 4...
Put task 5...
Put task 6...
Put task 7...
Put task 8...
Put task 9...
task 0 is finish , runtime:3 s
task 1 is finish , runtime:0 s
task 2 is finish , runtime:2 s
task 4 is finish , runtime:1 s
task 3 is finish , runtime:3 s
task 6 is finish , runtime:1 s
task 7 is finish , runtime:0 s
task 5 is finish , runtime:3 s
task 8 is finish , runtime:2 s
task 9 is finish , runtime:3 s
兩個客戶端的結果分別如下
客戶端1
10
run task 0
9
run task 1
8
run task 2
6
run task 4
5
run task 5
1
run task 9
客戶端2
7
run task 3
4
run task 6
3
run task 7
2
run task 8
一起運行的截圖如下
結果
由于隊列是線程安全的,所以這里不用加鎖,在客戶端中打印print(task.qsize()) 當前的隊列大小,可以看到隊列的信息中同步到各個客戶端的。
最后還是要多說一句,分步式多進程雖然可以把任務分散到不同的機器上運行,可以處理多任務,但是如果此時服務端掛掉的話,任務就全丟掉了,所以在生產環境下還是考慮使用消息中間件如kafka等。
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,謝謝大家對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。