您好,登錄后才能下訂單哦!
這篇“Python協程是怎么實現的”文章的知識點大部分人都不太理解,所以小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Python協程是怎么實現的”文章吧。
我們知道一臺主機的資源有限,一顆 CPU、一塊磁盤、一張網卡,如何同時服務上百個請求呢?多進程模式是最初的解決方案。內核把 CPU 的執行時間切分成許多時間片(timeslice),比如 1 秒鐘可以切分為 100 個 10 毫秒的時間片,每個時間片再分發給不同的進程,通常,每個進程需要多個時間片才能完成一個請求。
這樣雖然微觀上,比如說就這 10 毫秒時間 CPU 只能執行一個進程,但宏觀上 1 秒鐘執行了 100 個時間片,于是每個時間片所屬進程中的請求也得到了執行,這就實現了請求的并發執行。
不過,每個進程的內存空間都是獨立的,因此使用多進程實現并發就有兩個缺點:一是內核的管理成本高,二是無法簡單地通過內存同步數據,很不方便。于是,多線程模式就出現了,多線程模式通過共享內存地址空間,解決了這兩個問題。
然而,共享地址空間雖然可以方便地共享對象,但這也導致一個問題,那就是任何一個線程出錯時,進程中的所有線程會跟著一起崩潰。這也是如 Nginx 等強調穩定性的服務堅持使用多進程模式的原因。
但事實上無論基于多進程還是多線程,都難以實現高并發,主要有以下兩個原因。
首先,單個線程消耗的內存過多,比如 64 位的 Linux 為每個線程的棧分配了 8MB 的內存,此外為了提升后續內存分配的性能,還為每個線程預分配了 64MB 的內存作為堆內存池(Thread Area)。所以,我們沒有足夠的內存去開啟幾萬個線程實現并發。
其次,切換請求是內核通過切換線程實現的,什么時候會切換線程呢?不只時間片用盡,當調用阻塞方法時,內核為了讓 CPU 充分工作,也會切換到其他線程執行。而一次上下文切換的成本在幾十納秒到幾微秒之間,當線程繁忙且數量眾多時,這些切換會消耗絕大部分的 CPU 運算能力。
下圖以磁盤 IO 為例,描述了多線程中使用阻塞方法讀磁盤,2 個線程間的切換方式。
通過多線程的方式,一個線程處理一個請求,從而實現并發。但很明顯,操作系統能創建線程數是有限的,因為線程越多資源占用就越多,而且線程之間的切換成本也比較大,因為涉及到內核態和用戶態之間的切換。
那么問題來了,怎么才能實現高并發呢?答案是「把上圖中由內核實現的請求切換工作,交由用戶態的代碼來完成就可以了」。異步化編程通過應用層代碼實現了請求切換,降低了切換成本和內存占用空間。
異步化依賴于 IO 多路復用機制,比如 Linux 的 epoll,同時,必須把阻塞方法更改為非阻塞方法,才能避免內核切換帶來的巨大消耗。Nginx、Redis 等高性能服務都依賴異步化實現了百萬量級的并發。
下圖描述了異步 IO 的非阻塞讀和異步框架結合后,是如何切換請求的。
注意圖中的變化,之前是一個線程處理一個請求,現在是一個線程處理多個請求,這就是我們之前說的「非阻塞+回調」的方式。它依賴操作系統提供的 IO 多路復用,比如 Linux 的 epoll,BSD 的 kqueue。
此時的讀寫操作都相當于一個事件,并為每一個事件都注冊相應的回調函數,然后線程不會阻塞(因為讀寫操作此時是非阻塞的),而是可以做其它事情,然后由 epoll 來對這些事件進行統一管理。
一旦事件發生(滿足可讀、可寫時),那么 epoll 就會告知線程,然后線程執行為該事件注冊的回調函數。
為了更好地理解,我們再以 Redis 為例,介紹一下非阻塞 IO 和 IO 多路復用。
127.0.0.1:6379> get name "satori"
首先我們可以使用 get 命令,獲取一個 key 對應的 value,那么問題來了,以上對于 Redis 服務端而言,都發生了哪些事情呢?
服務端必須要先監聽客戶端請求(bind/listen),然后當客戶端到來時與其建立連接(accept),從 socket 中讀取客戶端的請求(recv),對請求進行解析(parse),這里解析出的請求類型是 get、key 是 "name",再根據 key 獲取對應的 value,最后返回給客戶端,也就是向 socket 寫入數據(send)。
以上所有操作都是由 Redis 主線程依次執行的,但是里面會有潛在的阻塞點,分別是 accept 和 recv。
如果是阻塞 IO,當 Redis 監聽到一個客戶端有連接請求、但卻一直未能成功建立連接,那么主線程會一直阻塞在 accept 函數這里,導致其它客戶端無法和 Redis 建立連接。類似的,當 Redis 通過 recv 從客戶端讀取數據時,如果數據一直沒有到達,那么 Redis 主線程也會一直阻塞在 recv 這一步,因此這就導致了 Redis 的效率會變得低下。
但很明顯,Redis 不會允許這種情況發生,因為以上都是阻塞 IO 會面臨的情況,而 Redis 采用的是非阻塞 IO,也就是將 socket 設置成了非阻塞模式。首先在 socket 模型中,調用 socket() 方法會返回主動套接字;調用 bind() 方法綁定 IP 和 端口,再調用 listen() 方法將主動套接字轉化為監聽套接字;最后監聽套接字調用 accept() 方法等待客戶端連接的到來,當和客戶端建立連接時再返回已連接套接字,而后續就通過已連接套接字來和客戶端進行數據的接收與發送。
但是注意:我們說在 listen() 這一步,會將主動套接字轉化為監聽套接字,而此時的監聽套接字的類型是阻塞的,阻塞類型的監聽套接字在調用 accept() 方法時,如果沒有客戶端來連接的話,就會一直處于阻塞狀態,那么此時主線程就沒法干其它事情了。所以在 listen() 的時候可以將其設置為非阻塞,而非阻塞的監聽套接字在調用 accept() 時,如果沒有客戶端連接請求到達時,那么主線程就不會傻傻地等待了,而是會直接返回,然后去做其它的事情。
類似的,我們在創建已連接套接字的時候也可以將其類型設置為非阻塞,因為阻塞類型的已連接套接字在調用 send() / recv() 的時候也會處于阻塞狀態,比如當客戶端一直不發數據的時候,已連接套接字就會一直阻塞在 rev() 這一步。如果是非阻塞類型的已連接套接字,那么當調用 recv() 但卻收不到數據時,也不用處于阻塞狀態,同樣可以直接返回去做其它事情。
但是有兩點需要注意:
1)雖然 accept() 不阻塞了,在沒有客戶端連接時 Redis 主線程可以去做其它事情,但如果后續有客戶端來連接,Redis 要如何得知呢?因此必須要有一種機制,能夠繼續在監聽套接字上等待后續連接請求,并在請求到來時通知 Redis。
2)send() / recv() 不阻塞了,相當于 IO 的讀寫流程不再是阻塞的,讀寫方法都會瞬間完成并且返回,也就是它會采用能讀多少就讀多少、能寫多少就寫多少的策略來執行 IO 操作,這顯然更符合我們對性能的追求。但這樣同樣會面臨一個問題,就是當我們執行讀取操作時,有可能只讀取了一部分數據,剩余的數據客戶端還沒發過來,那么這些這些數據何時可讀呢?同理寫數據也是這種情況,當緩沖區滿了,而我們的數據還沒有寫完,那么剩下的數據又何時可寫呢?因此同樣要有一種機制,能夠在 Redis 主線程做別的事情的時候繼續監聽已連接套接字,并且有數據可讀寫的時候通知 Redis。
這樣才能保證 Redis 線程既不會像基本 IO 模型中一直在阻塞點等待,也不會無法處理實際到達的客戶端連接請求和可讀寫的數據,而上面所提到的機制便是 IO 多路復用。
I/O 多路復用機制是指一個線程處理多個 IO 流,也就是我們經常聽到的 select/poll/epoll。關于這三者的區別我們就不說了,它們所做的事情都一樣,無非是性能和實現原理上有差異。select 是所有系統都支持,而 epoll 只有 Linux 支持。
簡單來說,在 Redis 只運行單線程的情況下,該機制允許內核中同時存在多個監聽套接字和已連接套接字。內核會一直監聽這些套接字上的連接請求或數據請求,一旦有請求到達就會交給 Redis 線程處理,這樣就實現了一個 Redis 線程處理多個 IO 流的效果。
上圖就是基于多路復用的 Redis IO 模型,圖中的 FD 就是套接字,可以是監聽套接字,也可以是已連接套接字,Redis 會通過 epoll 機制來讓內核幫忙監聽這些套接字。而此時 Redis 線程或者說主線程,不會阻塞在某一個特定的套接字上,也就是說不會阻塞在某一個特定的客戶端請求處理上。因此 Redis 可以同時和多個客戶端連接并處理請求,從而提升并發性。
但為了在請求到達時能夠通知 Redis 線程,epoll 提供了基于事件的回調機制,即針對不同事件的發生,調用相應的處理函數。
那么回調機制是怎么工作的呢?以上圖為例,首先 epoll 一旦監測到 FD 上有請求到達,就會觸發相應的事件。這些事件會被放進一個隊列中,Redis 主線程會對該事件隊列不斷進行處理,這樣一來 Redis 就無需一直輪詢是否有請求發生,從而避免資源的浪費。
同時,Redis 在對事件隊列中的事件進行處理時,會調用相應的處理函數,這就實現了基于事件的回調。因為 Redis 一直在對事件隊列進行處理,所以能及時響應客戶端請求,提升 Redis 的響應性能。
我們以實際的連接請求和數據讀取請求為例,再解釋一下。連接請求和數據讀取請求分別對應 Accept 事件和 Read 事件,Redis 分別對這兩個事件注冊 accept 和 get 回調函數,當 Linux 內核監聽到有連接請求或數據讀取請求時,就會觸發 Accept 事件或 Read 事件,然后通知主線程,回調注冊的 accept 函數或 get 函數。
就像病人去醫院看病,在醫生實際診斷之前每個病人(類似于請求)都需要先分診、測體溫、登記等等。如果這些工作都由醫生完成,那么醫生的工作效率就會很低。所以醫院設置了分診臺,分診臺會一直處理這些診斷前的工作(類似于 Linux 內核監聽請求),然后再轉交給醫生做實際診斷,這樣即使一個醫生(相當于 Redis 的主線程)也能有很高的效率。
這里需要再補充一下:我們上面提到的異步 IO 不是真正意義上的異步 IO,而是基于 IO 多路復用實現的異步化。但 IO 多路復用本質上是同步 IO,只是它可以同時監聽多個文件描述符,一旦某個描述符的讀寫操作就緒,就能夠通知應用程序進行相應的讀寫操作。至于真正意義的異步 IO,操作系統也是支持的,但支持的不太理想,所以現在使用的都是 IO 多用復用,并代指異步 IO。
必須要承認的是,編寫這種異步化代碼能夠帶來很高的性能收益,Redis、Nginx 已經證明了這一點。
但是這種編程模式,在實際工作中很容易出錯,因為所有阻塞函數,都需要通過非阻塞的系統調用加上回調注冊的方式拆分成兩個函數。說白了就是我們的邏輯不能夠直接執行,必須把它們放在一個單獨的函數里面,然后這個函數以回調的方式注冊給 IO 多路復用。
這種編程模式違反了軟件工程的內聚性原則,函數之間同步數據也更復雜。特別是條件分支眾多、涉及大量系統調用時,異步化的改造工作會非常困難,盡管它的性能很高。
下面我們用 Python 編寫一段代碼,實際體驗一下這種編程模式,看看它復雜在哪里。
from urllib.parse import urlparse import socket from io import BytesIO # selectors 里面提供了多種"多路復用器" # 除了 select、poll、epoll 之外 # 還有 kqueue,這個是針對 BSD 平臺的 try: from selectors import ( SelectSelector, PollSelector, EpollSelector, KqueueSelector ) except ImportError: pass # 由于種類比較多,所以提供了DefaultSelector # 會根據當前的系統種類,自動選擇一個合適的多路復用器 from selectors import ( DefaultSelector, EVENT_READ,# 讀事件 EVENT_WRITE,# 寫事件 ) class RequestHandler: """ 向指定的 url 發請求 獲取返回的內容 """ selector = DefaultSelector() tasks = {"unfinished": 0} def __init__(self, url): """ :param url: http://localhost:9999/v1/index """ self.tasks["unfinished"] += 1 url = urlparse(url) # 根據 url 解析出 域名、端口、查詢路徑 self.netloc = url.netloc# 域名:端口 self.path = url.path or "/"# 查詢路徑 # 創建 socket self.client = socket.socket() # 設置成非阻塞 self.client.setblocking(False) # 用于接收數據的緩存 self.buffer = BytesIO() def get_result(self): """ 發送請求,進行下載 :return: """ # 連接到指定的服務器 # 如果沒有 : 說明只有域名沒有端口 # 那么默認訪問 80 端口 if ":" not in self.netloc: host, port = self.netloc, 80 else: host, port = self.netloc.split(":") # 由于 socket 非阻塞,所以連接可能尚未建立好 try: self.client.connect((host, int(port))) except BlockingIOError: pass # 我們上面是建立連接,連接建立好就該發請求了 # 但是連接什么時候建立好我們并不知道,只能交給操作系統 # 所以我們需要通過 register 給 socket 注冊一個回調函數 # 參數一:socket 的文件描述符 # 參數二:事件 # 參數三:當事件發生時執行的回調函數 self.selector.register(self.client.fileno(), EVENT_WRITE, self.send) # 表示當 self.client 這個 socket 滿足可寫時 # 就去執行 self.send # 翻譯過來就是連接建立好了,就去發請求 # 可以看到,一個阻塞調用,我們必須拆成兩個函數去寫 def send(self, key): """ 連接建立好之后,執行的回調函數 回調需要接收一個參數,這是一個 namedtuple 內部有如下字段:'fileobj', 'fd', 'events', 'data' key.fd 就是 socket 的文件描述符 key.data 就是給 socket 綁定的回調 :param key: :return: """ payload = (f"GET {self.path} HTTP/1.1rn" f"Host: {self.netloc}rn" "Connection: closernrn") # 執行此函數,說明事件已經觸發 # 我們要將綁定的回調函數取消 self.selector.unregister(key.fd) # 發送請求 self.client.send(payload.encode("utf-8")) # 請求發送之后就要接收了,但是啥時候能接收呢? # 還是要交給操作系統,所以仍然需要注冊回調 self.selector.register(self.client.fileno(), EVENT_READ, self.recv) # 表示當 self.client 這個 socket 滿足可讀時 # 就去執行 self.recv # 翻譯過來就是數據返回了,就去接收數據 def recv(self, key): """ 數據返回時執行的回調函數 :param key: :return: """ # 接收數據,但是只收了 1024 個字節 # 如果實際返回的數據超過了 1024 個字節怎么辦? data = self.client.recv(1024) # 很簡單,只要數據沒收完,那么數據到來時就會可讀 # 那么會再次調用此函數,直到數據接收完為止 # 注意:此時是非阻塞的,數據有多少就收多少 # 沒有接收的數據,會等到下一次再接收 # 所以這里不能寫 while True if data: # 如果有數據,那么寫入到 buffer 中 self.buffer.write(data) else: # 否則說明數據讀完了,那么將注冊的回調取消 self.selector.unregister(key.fd) # 此時就拿到了所有的數據 all_data = self.buffer.getvalue() # 按照 rnrn 進行分隔得到列表 # 第一個元素是響應頭,第二個元素是響應體 result = all_data.split(b"rnrn")[1] print(f"result: {result.decode('utf-8')}") self.client.close() self.tasks["unfinished"] -= 1 @classmethod def run_until_complete(cls): # 基于 IO 多路復用創建事件循環 # 驅動內核不斷輪詢 socket,檢測事件是否發生 # 當事件發生時,調用相應的回調函數 while cls.tasks["unfinished"]: # 輪詢,返回事件已經就緒的 socket ready = cls.selector.select() # 這個 key 就是回調里面的 key for key, mask in ready: # 拿到回調函數并調用,這一步需要我們手動完成 callback = key.data callback(key) # 因此當事件發生時,調用綁定的回調,就是這么實現的 # 整個過程就是給 socket 綁定一個事件 + 回調 # 事件循環不停地輪詢檢測,一旦事件發生就會告知我們 # 但是調用回調不是內核自動完成的,而是由我們手動完成的 # "非阻塞 + 回調 + 基于 IO 多路復用的事件循環" # 所有框架基本都是這個套路
一個簡單的 url 獲取,居然要寫這么多代碼,而它的好處就是性能高,因為不用把時間浪費在建立連接、等待數據上面。只要有事件發生,就會執行相應的回調,極大地提高了 CPU 利用率。而且這是單線程,也沒有線程切換帶來的開銷。
那么下面測試一下吧。
import time start = time.perf_counter() for _ in range(10): # 這里面只是注冊了回調,但還沒有真正執行 RequestHandler(url="https://localhost:9999/index").get_result() # 創建事件循環,驅動執行 RequestHandler.run_until_complete() end = time.perf_counter() print(f"總耗時: {end - start}")
我用 FastAPI 編寫了一個服務,為了更好地看到現象,服務里面刻意 sleep 了 1 秒。然后發送十次請求,看看效果如何。
總共耗時 1 秒鐘,我們再采用同步的方式進行編寫,看看效果如何。
可以看到回調的這種寫法性能非常高,但是它和我們傳統的同步代碼的寫法大相徑庭。如果是同步代碼,那么會先建立連接、然后發送數據、再接收數據,這顯然更符合我們人類的思維,邏輯自上而下,非常自然。
但是回調的方式,就讓人很不適應,我們在建立完連接之后,不能直接發送數據,必須將發送數據的邏輯放在一個單獨的函數(方法)中,然后再將這個函數以回調的方式注冊進去。
同理,在發送完數據之后,也不能立刻接收。同樣要將接收數據的邏輯放在一個單獨的函數中,然后再以回調的方式注冊進去。
所以好端端的自上而下的邏輯,因為回調而被分割的四分五裂,這種代碼在編寫和維護的時候是非常痛苦的。
比如回調可能會層層嵌套,容易陷入回調地獄,如果某一個回調執行出錯了怎么辦?代碼的可讀性差導致不好排查,即便排查到了也難處理。
另外,如果多個回調需要共享一個變量該怎么辦?因為回調是通過事件循環調用的,在注冊回調的時候很難把變量傳過去。簡單的做法是把該變量設置為全局變量,或者說多個回調都是某個類的成員函數,然后把共享的變量作為一個屬性綁定在 self 上面。但當邏輯復雜時,就很容易導致全局變量滿天飛的問題。
所以這種模式就使得開發人員在編寫業務邏輯的同時,還要關注并發細節。
因此使用回調的方式編寫異步化代碼,雖然并發量能上去,但是對開發者很不友好;而使用同步的方式編寫同步代碼,雖然很容易理解,可并發量卻又上不去。那么問題來了,有沒有一種辦法,能夠讓我們在享受異步化帶來的高并發的同時,又能以同步的方式去編寫代碼呢?也就是我們能不能以同步的方式去編寫異步化的代碼呢?
答案是可以的,使用「協程」便可以辦到。協程在異步化之上包了一層外衣,兼顧了開發效率與運行效率。
協程與異步編程相似的地方在于,它們必須使用非阻塞的系統調用與內核交互,把切換請求的權力牢牢掌握在用戶態的代碼中。但不同的地方在于,協程把異步化中的兩段函數,封裝為一個阻塞的協程函數。
這個函數執行時,會使調用它的協程無感知地放棄執行權,由協程框架切換到其他就緒的協程繼續執行。當這個函數的結果滿足后,協程框架再選擇合適的時機,切換回它所在的協程繼續執行。我們還是以讀取磁盤文件為例,看一張協程的示意圖:
看起來非常棒,所以異步化是通過回調函數來完成請求切換的,業務邏輯與并發實現關聯在一起,很容易出錯。而協程不需要什么「回調函數」,它允許用戶調用「阻塞的」協程方法,用同步編程方式寫業務邏輯。
再回到之前的那個 socket 發請求的例子,我們用協程的方式重寫一遍,看看它和基于回調的異步化編程有什么區別?
import time from urllib.parse import urlparse import asyncio async def download(url): url = urlparse(url) # 域名:端口 netloc = url.netloc if ":" not in netloc: host, port = netloc, 80 else: host, port = netloc.split(":") path = url.path or "/" # 創建連接 reader, writer = await asyncio.open_connection(host, port) # 發送數據 payload = (f"GET {path} HTTP/1.1rn" f"Host: {netloc}rn" "Connection: closernrn") writer.write(payload.encode("utf-8")) await writer.drain() # 接收數據 result = (await reader.read()).split(b"rnrn")[1] writer.close() print(f"result: {result.decode('utf-8')}") # 以上就是發送請求相關的邏輯 # 我們看到代碼是自上而下的,沒有涉及到任何的回調 # 完全就像寫同步代碼一樣 async def main(): # 發送 10 個請求 await asyncio.gather( *[download("http://localhost:9999/index") for _ in range(10)] ) start = time.perf_counter() # 同樣需要創建基于 IO 多路復用的事件循環 # 協程會被丟進事件循環中,依靠事件循環驅動執行 loop = asyncio.get_event_loop() loop.run_until_complete(main()) end = time.perf_counter() print(f"總耗時: {end - start}")
代碼邏輯很好理解,和我們平時編寫的同步代碼沒有太大的區別,那么它的效率如何呢?
我們看到用了 3 秒鐘,比同步的方式快,但是比異步化的方式要慢。因為一開始就說過,協程并不比異步化的方式快,但我們之所以選擇它,是因為它的編程模型更簡單,能夠讓我們以同步的方式編寫異步的代碼。如果是基于回調方式的異步化,雖然性能很高(比如 Redis、Nginx),但對開發者是一個挑戰。
回到上面那個協程的例子中,我們一共發了 10 個請求,并在可能阻塞的地方加上了 await。意思就是,在執行某個協程 await 后面的代碼時如果阻塞了,那么該協程會主動將執行權交給事件循環,然后事件循環再選擇其它的協程執行。并且協程本質上也是個單線程,雖然協程可以有多個,但是背后的線程只有一個。
那么問題來了,協程的切換是如何完成的呢?
實際上,用戶態的代碼切換協程,與內核切換線程的原理是一樣的。內核通過管理 CPU 的寄存器來切換線程,我們以最重要的棧寄存器和指令寄存器為例,看看協程切換時如何切換程序指令與內存。
每個線程有獨立的棧,而棧既保留了變量的值,也保留了函數的調用關系、參數和返回值,CPU 中的棧寄存器 SP 指向了當前線程的棧,而指令寄存器 IP 保存著下一條要執行的指令地址。
因此,從線程 1 切換到線程 2 時,首先要把 SP、IP 寄存器的值為線程 1 保存下來,再從內存中找出線程 2 上一次切換前保存好的寄存器的值,并寫入 CPU 的寄存器,這樣就完成了線程切換(其他寄存器也需要管理、替換,原理與此相同,不再贅述)。
協程的切換與此相同,只是把內核的工作轉移到協程框架來實現而已,下圖是協程切換前的狀態:
當遇到阻塞時會進行協程切換,從協程 1 切換到協程 2 后的狀態如下圖所示:
創建協程時,會從進程的堆中分配一段內存作為協程的棧。線程的棧有 8MB,而協程棧的大小通常只有幾十 KB。而且,C 庫內存池也不會為協程預分配內存,它感知不到協程的存在。這樣,更低的內存占用空間為高并發提供了保證,畢竟十萬并發請求,就意味著 10 萬個協程。
另外棧縮小后,就盡量不要使用遞歸函數,也不能在棧中申請過多的內存,這是實現高并發必須付出的代價。當然啦,如果能像 Go 一樣,協程棧可以自由伸縮的話,就不用擔心了。
由此可見,協程就是用戶態的線程。然而,為了保證所有切換都在用戶態進行,協程必須重新封裝所有的阻塞系統調用,否則一旦協程觸發了線程切換,會導致這個線程進入休眠狀態,進而其上的所有協程都得不到執行。
比如普通的 sleep 函數會讓當前線程休眠,由內核來喚醒線程,而協程化改造后,sleep 只會讓當前協程休眠,由協程框架在指定時間后喚醒協程,所以在 Python 的協程里面我們不能寫 time.sleep,而是應該寫 asyncio.sleep。再比如,線程間的互斥鎖是使用信號量實現的,而信號量也會導致線程休眠,協程化改造互斥鎖后,同樣由框架來協調、同步各協程的執行。
所以協程的高性能,建立在切換必須由用戶態代碼完成之上,這要求協程生態是完整的,要盡量覆蓋常見的組件。
還是以 Python 為例,我經常看見有人在 async def 里面寫 requests.get 發請求,這是不對的。requests.get 底層調用的是同步阻塞的 socket,這會使得線程阻塞,而線程一旦阻塞,就會導致所有的協程阻塞,此時就等價于串行。所以把它放在 async def 里面沒有任何意義,正確的做法是使用 aiohttp 或 httpx。因此如果想使用協程,那么需要重新封裝底層的系統調用,如果實在沒辦法就扔到線程池中運行。
再比如 MySQL 官方提供的客戶端 SDK,它使用了阻塞 socket 做網絡訪問,會導致線程休眠,必須用非阻塞 socket 把 SDK 改造為協程函數后,才能在協程中使用。
當然,并不是所有的函數都能用協程改造,比如磁盤的異步 IO 讀。它雖然是非阻塞的,但無法使用 PageCache,反而降低了系統吞吐量。如果使用緩存 IO 讀文件,在沒有命中 PageCache 時是可能發生阻塞的。這種時候,如果對性能有更高的要求,就需要把線程與協程結合起來用,把可能阻塞的操作扔到線程池中執行,通過生產者 / 消費者模型與協程配合工作。
實際上,面對多核系統,也需要協程與線程配合工作。因為協程的載體是線程,而一個線程同一時間只能使用一顆 CPU,所以通過開啟更多的線程,將所有協程分布在這些線程中,就能充分使用 CPU 資源。有過 Go 語言使用經驗的話,應該很清楚這一點。
除此之外,為了讓協程獲得更多的 CPU 時間,還可以設置所在線程的優先級,比如在 Linux 中把線程的優先級設置到 -20,就可以每次獲得更長的時間片。另外 CPU 緩存對程序性能也是有影響的,為了減少 CPU 緩存失效的比例,還可以把線程綁定到某個 CPU 上,增加協程執行時命中 CPU 緩存的機率。
雖然這里一直說協程框架在調度協程,然而你會發現,很多協程庫只提供了創建、掛起、恢復執行等基本方法,并沒有協程框架的存在,需要業務代碼自行調度協程。這是因為,這些通用的協程庫(比如 asyncio)并不是專為服務器設計的,服務器中可以由客戶端網絡連接的建立,驅動著創建出協程,同時伴隨著請求的結束而終止。
而在協程的運行條件不滿足時,多路復用框架會將它掛起,并根據優先級策略選擇另一個協程執行。因此,使用協程實現服務器端的高并發服務時,并不只是選擇協程庫,還要從其生態中找到結合 IO 多路復用的協程框架(比如 Tornado),這樣可以加快開發速度。
從廣義上講,協程是一種輕量級的并發模型,說的比較高大上。但從狹義上講,協程就是調用一個可以暫停并切換的函數。像我們使用 async def 定義的就是一個協程函數,本質上也是個函數,而調用協程函數就會得到一個協程。
將協程丟進事件循環,由事件循環驅動執行,一旦發生阻塞,便將執行權主動交給事件循環,事件循環再驅動其它協程執行。所以自始至終都只有一個線程,而協程只不過是我們參考線程的結構,在用戶態模擬出來的。
所以調用一個普通函數,會一直將內部的代碼邏輯全部執行完;而調用一個協程函數,在內部出現了阻塞,那么會切換到其它的協程。
但是協程出現阻塞能夠切換有一個重要的前提,就是這個阻塞不能涉及任何的系統調用,比如 time.sleep、同步的 socket 等等。這些都需要內核參與,而內核一旦參與了,那么造成的阻塞就不單單是阻塞某個協程那么簡單了(OS 也感知不到協程),而是會使線程阻塞。線程一旦阻塞,在其之上的所有協程都會阻塞,由于協程是以線程作為載體的,實際執行的肯定是線程,如果每個協程都會使得線程阻塞,那么此時不就相當于串行了嗎?
所以想使用協程,必須將阻塞的系統調用重新封裝,我們舉個栗子:
@app.get(r"/index1") async def index1(): time.sleep(30) return "index1" @app.get(r"/index2") async def index2(): return "index2"
這是一個基于 FastAPI 編寫的服務,我們只看視圖函數。如果我們先訪問 /index1,然后訪問 /index2,那么必須等到 30 秒之后,/index2 才會響應。因為這是一個單線程,/index1 里面的 time.sleep 會觸發系統調用,使得整個線程都進入阻塞,線程一旦阻塞了,所有的協程就都別想執行了。
如果將上面的例子改一下:
@app.get(r"/index1") async def index(): await asyncio.sleep(30) return "index1" @app.get(r"/index2") async def index(): return "index2"
訪問 /index1 依舊會進行 30 秒的休眠,但此時再訪問 /index2 的話則是立刻返回。原因是 asyncio.sleep(30) 重新封裝了阻塞的系統調用,此時的休眠是在用戶態完成的,沒有經過內核。換句話說,此時只會導致協程休眠,不會導致線程休眠,那么當訪問 /index2 的時候,對應的協程會立刻執行,然后返回結果。
同理我們在發網絡請求的時候,也不能使用 requests.get,因為它會導致線程阻塞。當然,還有一些數據庫的驅動,例如 pymysql, psycopg2 等等,這些阻塞的都是線程。為此,在開發協程項目時,我們應該使用 aiohttp, asyncmy, asyncpg 等等。
為什么早期 Python 的協程都沒有人用,原因就是協程想要運行,必須基于協程庫 asyncio,但問題是 asyncio 只支持發送 TCP 請求(對于協程庫而言足夠了)。如果你想通過網絡連接到某個組件(比如數據庫、Redis),只能手動發 TCP 請求,而且這些組件對發送的數據還有格式要求,返回的數據也要手動解析,可以想象這是多么麻煩的事情。
如果想解決這一點,那么必須基于 asyncio 重新封裝一個 SDK。所以同步 SDK 和協程 SDK 最大的區別就是,一個是基于同步阻塞的 socket,一個是基于 asyncio。比如 redis 和 aioredis,連接的都是 Redis,只是在 TCP 層面發送數據的方式不同,至于其它方面則是類似的。
而早期,還沒有出現這些協程 SDK,自己封裝的話又是一個龐大的工程,所以 Python 的協程用起來就很艱難,因為達不到期望的效果。不像 Go 在語言層面上就支持協程,一個 go 關鍵字就搞定了。而且 Python 里面一處異步、處處異步,如果某處的阻塞切換不了,那么協程也就沒有意義了。
但現在 Python 已經進化到 3.10 了,協程相關的生態也越來越完善,感謝這些開源的作者們。發送網絡請求、連接數據庫、編寫 web 服務等等,都有協程化的 SDK 和框架,現在完全可以開發以協程為主導的項目了。
以上就是關于“Python協程是怎么實現的”這篇文章的內容,相信大家都有了一定的了解,希望小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。