您好,登錄后才能下訂單哦!
Python 利用selectors模塊實現非阻塞式編程?很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
前面介紹的 socket 都是采用阻塞方式進行通信的,當程序調用 recv() 方法從 socket 中讀取數據時,如果沒有讀取到有效的數據,當前線程就會被阻塞。為了解決這個問題,上面程序采用了多線程并發編程,即服務器端為每個客戶端連接都啟動一個單獨的線程,不同的線程負責對應的 socket 的通信工作。
通過 selectors 模塊允許 socket 以非阻塞方式進行通信,selectors 相當于一個事件注冊中心,程序只要將 socket 的所有事件注冊給 selectors 管理,當 selectors 檢測到 socket 中的特定事件之后,程序就調用相應的監聽方法進行處理。
selectors 主要支持兩種事件:
selectors.EVENT_READ:當 socket 有數據可讀時觸發該事件。當有客戶端連接進來時也會觸發該事件。
selectors.EVENT_WRITE:當 socket 將要寫數據時觸發該事件。
使用 selectors 實現非阻塞式編程的步驟大致如下:
創建 selectors 對象。
通過 selectors 對象為 socket 的 selectors.EVENT_READ 或 selectors.EVENT_WRITE 事件注冊監聽器函數。每當 socket 有數據需要讀寫時,系統負責觸發所注冊的監昕器函數。
在監聽器函數中處理 socket 通信。
下面程序使用 selectors 模塊實現非阻塞式通信的服務器端:
import selectors, socket # 創建默認的selectors對象 sel = selectors.DefaultSelector() # 負責監聽“有數據可讀”事件的函數 def read(skt, mask): try: # 讀取數據 data = skt.recv(1024) if data: # 將讀取的數據采用循環向每個socket發送一次 for s in socket_list: s.send(data) # Hope it won't block else: # 如果該socket已被對方關閉,關閉該socket, # 并從socket_list列表中刪除 print('關閉', skt) sel.unregister(skt) skt.close() socket_list.remove(skt) # 如果捕捉到異常, 將該socket關閉,并從socket_list列表中刪除 except: print('關閉', skt) sel.unregister(skt) skt.close() socket_list.remove(skt) socket_list = [] # 負責監聽“客戶端連接進來”事件的函數 def accept(sock, mask): conn, addr = sock.accept() # 使用socket_list保存代表客戶端的socket socket_list.append(conn) conn.setblocking(False) # 使用sel為conn的EVENT_READ事件注冊read監聽函數 sel.register(conn, selectors.EVENT_READ, read) #② sock = socket.socket() sock.bind(('192.168.1.88', 30000)) sock.listen() # 設置該socket是非阻塞的 sock.setblocking(False) # 使用sel為sock的EVENT_READ事件注冊accept監聽函數 sel.register(sock, selectors.EVENT_READ, accept) #① # 采用死循環不斷提取sel的事件 while True: events = sel.select() for key, mask in events: # key的data屬性獲取為該事件注冊的監聽函數 callback = key.data # 調用監聽函數, key的fileobj屬性獲取被監聽的socket對象 callback(key.fileobj, mask)
上面程序中定義了兩個監聽器函數 accept() 和 read(),其中 accept() 函數作為“有客戶端連接進來”事件的監聽函數,主程序中的 ① 號代碼負責為 socket 的 selectors.EVENT_READ 事件注冊該函數;read() 函數則作為“有數據可讀”事件的監聽函數,如 accept() 函數中的 ② 號代碼所示。
通過上面這種方式,程序避免了采用死循環不斷地調用 socket 的 accept() 方法來接受客戶端連接,也避免了采用死循環不斷地調用 socket 的 recv() 方法來接收數據。socket 的 accept()、recv() 方法調用都是寫在事件監聽函數中的,只有當事件(如“有客戶端連接進來”事件、“有數據可讀”事件)發生時,accept() 和 recv() 方法才會被調用,這樣就避免了阻塞式編程。
為了不斷地提取 selectors 中的事件,程序最后使用一個死循環不斷地調用 selectors 的 select() 方法“監測”事件,每當監測到相應的事件之后,程序就會調用對應的事件監聽函數。
下面是該示例的客戶端程序。該客戶端程序更加簡單,客戶端程序只需要讀取 socket 中的數據,因此只要使用 selectors 為 socket 注冊“有數據可讀”事件的監聽函數即可。
import selectors, socket, threading # 創建默認的selectors對象 sel = selectors.DefaultSelector() # 負責監聽“有數據可讀”事件的函數 def read(conn, mask): data = conn.recv(1024) # Should be ready if data: print(data.decode('utf-8')) else: print('closing', conn) sel.unregister(conn) conn.close() # 創建socket對象 s = socket.socket() # 連接遠程主機 s.connect(('192.168.1.88', 30000)) # 設置該socket是非阻塞的 s.setblocking(False) # 使用sel為s的EVENT_READ事件注冊read監聽函數 sel.register(s, selectors.EVENT_READ, read) # ① # 定義不斷讀取用戶鍵盤輸入的函數 def keyboard_input(s): while True: line = input('') if line is None or line == 'exit': break # 將用戶的鍵盤輸入內容寫入socket s.send(line.encode('utf-8')) # 采用線程不斷讀取用戶的鍵盤輸入 threading.Thread(target=keyboard_input, args=(s, )).start() while True: # 獲取事件 events = sel.select() for key, mask in events: # key的data屬性獲取為該事件注冊的監聽函數 callback = key.data # 調用監聽函數, key的fileobj屬性獲取被監聽的socket對象 callback(key.fileobj, mask)
上面程序中的 ① 號代碼為 socket 的 EVENT_READ 事件注冊了 read() 監聽函數,這樣每當 socket 中有數據可讀時,程序就會觸發 read() 函數來讀取 socket 中的數據。
程序最后也采用死循環不斷地調用 selectors 的 select() 方法“監測”事件,每當監測到相應的事件之后,程序就會調用對應的事件監聽函數。
先運行上面的服務器端程序,該程序運行后只是作為服務器,看不到任何輸出信息。再運行多個客戶端程序(相當于啟動多個聊天室客戶端登錄該服務器)。接下來可以在任何一個客戶端通過鍵盤輸入一些內容,然后按回車鍵,即可在所有客戶端(包括自己)的控制臺上接收到剛剛輸入的內容。這也是一個粗略的 C/S 結構的聊天室應用。
看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。