您好,登錄后才能下訂單哦!
這篇文章主要介紹“Python進程間的通信方式是什么”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“Python進程間的通信方式是什么”文章能幫助大家解決問題。
這里舉一個例子接介紹通信的機制:通信 一詞大家并不陌生,比如一個人要給他的女友打電話。當建立了通話之后,在這個通話的過程中就是建立了一條隱形的 隊列 (記住這個詞)。此時這個人就會通過對話的方式不停的將信息告訴女友,而這個人的女友也是在傾聽著。(嗯…我個人覺得大部分情況下可能是反著來的)。
這里可以將他們兩個人比作是兩個進程,"這個人"的進程需要將信息發送給"女友"的進程,就需要一個隊列的幫助。而女友需要不停的接收隊列的信息,可以做一些其他的事情,所以兩個進程之間的通信主要依賴于隊列。
這個隊列可以支持發送消息與接收消息,“這個人"負責發送消息,反之"女友” 負責的是接收消息。
既然隊列才是重點,那么來看一下隊列要如何創建。
依然使用 multiprocessing 模塊,調用該模塊的 Queue 函數來實現隊列的創建。
函數名 | 介紹 | 參數 | 返回值 |
---|---|---|---|
Queue | 隊列的創建 | mac_count | 隊列對象 |
Queue 函數功能介紹:調用 Queue 可以創建隊列;它有一個參數 mac_count 代表隊列最大可以創建多少信息,如果不傳默認是無限長度。實例化一個隊列對象之后,需要操作這個隊列的對象進行放入與取出數據。
函數名 | 介紹 | 參數 | 返回值 |
---|---|---|---|
put | 將消息放入隊列 | message | 無 |
get | 獲取隊列消息 | 無 | str |
put 函數功能介紹:將數據傳入。它有一個參數 message ,是一個字符串類型。
get 函數功能介紹:用來接收隊列中的數據。(其實這里就是一個常用的json場景,有很多的數據傳輸都是 字符串 的,隊列的插入與獲取就是使用的字符串,所以 json 就非常適用這個場景。)
接下來就來練習一下 隊列的使用 。
代碼示例如下:
# coding:utf-8 import json import multiprocessing class Work(object): # 定義一個 Work 類 def __init__(self, queue): # 構造函數傳入一個 '隊列對象' --> queue self.queue = queue def send(self, message): # 定義一個 send(發送) 函數,傳入 message # [這里有個隱藏的bug,就是只判斷了傳入的是否字符串類型;如果傳入的是函數、類、集合等依然會報錯] if not isinstance(message, str): # 判斷傳入的 message 是否為字符串,若不是,則進行 json 序列化 message = json.dumps(message) self.queue.put(message) # 利用 queue 的隊列實例化對象將 message 發送出去 def receive(self): # 定義一個 receive(接收) 函數,不需傳入參數,但是因為接收是一個源源不斷的過程,所以需要使用 while 循環 while 1: result = self.queue.get() # 獲取 '隊列對象' --> queue 傳入的message # 由于我們接收的 message 可能不是一個字符串,所以要進程異常的捕獲 try: # 如果傳入的 message 符合 JSON 格式將賦值給 res ;若不符合,則直接使用 result 賦值 res res = json.loads(result) except: res = result print('接收到的信息為:{}'.format(res)) if __name__ == '__main__': queue = multiprocessing.Queue() work = Work(queue) send = multiprocessing.Process(target=work.send, args=({'message': '這是一條測試的消息'},)) receive = multiprocessing.Process(target=work.receive) send.start() receive.start()
使用隊列建立進程間通信遇到的異常
但是這里會出現一個 報錯,如下圖:
報錯截圖示例如下:
這里的報錯提示是 文件沒有被發現的意思 。其實這里是我們使用 隊列做 put() 和 get()的時候 有一把無形的鎖加了上去,就是上圖中圈中的 .SemLock 。我們不需要去關心造成這個錯誤的具體原因,要解決這個問題其實也很簡單。
FileNotFoundError: [Errno 2] No such file or directory 異常的解決
我們只需要給 send 或者 receive 其中一個子進程添加 join 阻塞進程即可,理論上如此。但是我們的 receive子進程是一個 while循環,它會一直執行,所以只需要給 send 子進程加上一個 join 即可。
解決示意圖如下:
PS:雖然解決了報錯問題,但是程序沒有正常退出。
實際上由于我們的 receive 進程是個 while循環,并不知道要處理到什么時候,沒有辦法立刻終止。所以我們需要在 receive 進程 使用 terminate() 函數終結接收端。
運行結果如下:
新建一個函數,寫入 for循環 模擬批量添加要發送的消息
然后再給這個模擬批量發送數據的函數添加一個線程。
示例代碼如下:
# coding:utf-8 import json import time import multiprocessing class Work(object): # 定義一個 Work 類 def __init__(self, queue): # 構造函數傳入一個 '隊列對象' --> queue self.queue = queue def send(self, message): # 定義一個 send(發送) 函數,傳入 message # [這里有個隱藏的bug,就是只判斷了傳入的是否字符串類型;如果傳入的是函數、類、集合等依然會報錯] if not isinstance(message, str): # 判斷傳入的 message 是否為字符串,若不是,則進行 json 序列化 message = json.dumps(message) self.queue.put(message) # 利用 queue 的隊列實例化對象將 message 發送出去 def send_all(self): # 定義一個 send_all(發送)函數,然后通過for循環模擬批量發送的 message for i in range(20): self.queue.put('第 {} 次循環,發送的消息為:{}'.format(i, i)) time.sleep(1) def receive(self): # 定義一個 receive(接收) 函數,不需傳入參數,但是因為接收是一個源源不斷的過程,所以需要使用 while 循環 while 1: result = self.queue.get() # 獲取 '隊列對象' --> queue 傳入的message # 由于我們接收的 message 可能不是一個字符串,所以要進程異常的捕獲 try: # 如果傳入的 message 符合 JSON 格式將賦值給 res ;若不符合,則直接使用 result 賦值 res res = json.loads(result) except: res = result print('接收到的信息為:{}'.format(res)) if __name__ == '__main__': queue = multiprocessing.Queue() work = Work(queue) send = multiprocessing.Process(target=work.send, args=({'message': '這是一條測試的消息'},)) receive = multiprocessing.Process(target=work.receive) send_all = multiprocessing.Process(target=work.send_all,) send_all.start() # 這里因為 send 只執行了1次,然后就結束了。而 send_all 卻要循環20次,它的執行時間是最長的,信息也是發送的最多的 send.start() receive.start() # send.join() # 使用 send 的阻塞會造成 send_all 循環還未結束 ,receive.terminate() 函數接收端就會終結。 send_all.join() # 所以我們只需要阻塞最長使用率的進程就可以了 receive.terminate()
運行結果如下:
從上圖中我們可以看到 send 與 send_all 兩個進程都可以通過 queue這個實例化的 Queue 對象發送消息,同樣的 receive接收函數也會將兩個進程傳入的 message 打印輸出出來。
該章節我們通過隊列的方式實現了進程間通信的方法,并且了解了隊列的使用方法。一個隊列中,有一端(這里我們演示的是 send端)通過 put方法實現添加相關的信息,另一端使用 get 方法獲取相關的信息;兩個進程相互配合達到一個進程通信的效果。
其實進程之間的通信不僅僅只有隊列這一種方式,感興趣的話還可以通過 管道、信號量、共享內存的方式來實現。可以自行拓展一下。
python提供了多種進程通信的方式,包括信號,管道,消息隊列,信號量,共享內存,socket等
主要Queue和Pipe這兩種方式,Queue用于多個進程間實現通信,Pipe是兩個進程的通信。
1.管道:分為匿名管道和命名管道
匿名管道:在內核中申請一塊固定大小的緩沖區,程序擁有寫入和讀取的權利,一般使用fock函數實現父子進程的通信
命名管道:在內存中申請一塊固定大小的緩沖區,程序擁有寫入和讀取的權利,沒有血緣關系的進程也可以進程間通信
特點:面向字節流;生命周期隨內核;自帶同步互斥機制;半雙工,單向通信,兩個管道實現雙向通信
2.消息隊列:在內核中創建一個隊列,隊列中每個元素是一個數據報,不同的進程可以通過句柄去訪問這個隊列。消息隊列提供了一個從一個進程向另外一個進程發送一塊數據的方法。每個數據塊都被認為是有一個類型,接收者進程接收的數據塊可以有不同的類型。消息隊列也有管道一樣的不足,就是每個消息的最大長度是有上限的,每個消息隊列的總的字節數是有上限的,系統上消息隊列的總數也有一個上限
特點:消息隊列可以被認為是一個全局的一個鏈表,鏈表節點中存放著數據報的類型和內容,有消息隊列的標識符進行標記;消息隊列允許一個或多個進程寫入或讀取消息;消息隊列的生命周期隨內核;消息隊列可實現雙向通信
3.信號量:在內核中創建一個信號量集合(本質上是數組),數組的元素(信號量)都是1,使用P操作進行-1,使用V操作+1
P(sv):如果sv的值大于零,就給它減1;如果它的值為零,就掛起該程序的執行
V(sv):如果有其他進程因等待sv而被掛起,就讓它恢復運行,如果沒有進程因等待sv而掛起,就給它加1
PV操作用于同一個進程,實現互斥;PV操作用于不同進程,實現同步
功能:對臨界資源進行保護
4.共享內存:將同一塊物理內存一塊映射到不同的進程的虛擬地址空間中,實現不同進程間對同一資源的共享。共享內存可以說是最有用的進程間通信方式,也是最快的IPC形式
特點:不同從用戶態到內核態的頻繁切換和拷貝數據,直接從內存中讀取就可以;共享內存是臨界資源,所以需要操作時必須要保證原子性。使用信號量或者互斥鎖都可以.
關于“Python進程間的通信方式是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識,可以關注億速云行業資訊頻道,小編每天都會為大家更新不同的知識點。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。