您好,登錄后才能下訂單哦!
本篇內容介紹了“Python異步方法如何使用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
要了解異步編程的動機,我們首先必須了解是什么限制了我們的代碼運行速度。理想情況下,我們希望我們的代碼以光速運行,立即跳過我們的代碼,沒有任何延遲。然而,由于兩個因素,實際上代碼運行速度要慢得多:
CPU時間(處理器執行指令的時間)
IO時間(等待網絡請求或存儲讀/寫的時間)
當我們的代碼在等待 IO 時,CPU 基本上是空閑的,等待某個外部設備響應。通常,內核會檢測到這一點并立即切換到執行系統中的其他線程。因此,如果我們想加快處理一組 IO 密集型任務,我們可以為每個任務創建一個線程。當其中一個線程停止,等待 IO 時,內核將切換到另一個線程繼續處理。
這在實踐中效果很好,但有兩個缺點:
線程有開銷(尤其是在 Python 中)
我們無法控制內核何時選擇在線程之間切換
例如,如果我們想要執行 10,000 個任務,我們要么必須創建 10,000 個線程,這將占用大量 RAM,要么我們需要創建較少數量的工作線程并以較少的并發性執行任務。此外,最初生成這些線程會占用 CPU 時間。
由于內核可以隨時選擇在線程之間切換,因此我們代碼中的任何時候都可能出現相互競爭。
在傳統的基于同步線程的代碼中,內核必須檢測線程何時是IO綁定的,并選擇在線程之間隨意切換。使用 Python 異步,程序員使用關鍵字await
確認聲明 IO 綁定的代碼行,并確認授予執行其他任務的權限。例如,考慮以下執行Web請求的代碼:
async def request_google(): reader, writer = await asyncio.open_connection('google.com', 80) writer.write(b'GET / HTTP/2\n\n') await writer.drain() response = await reader.read() return response.decode()
在這里,在這里,我們看到該代碼在兩個地方await
。因此,在等待我們的字節被發送到服務器(writer.drain()
)時,在等待服務器用一些字節(reader.read()
)回復時,我們知道其他代碼可能會執行,全局變量可能會更改。然而,從函數開始到第一次等待,我們可以確保代碼逐行運行,而不會切換到運行程序中的其他代碼。這就是異步的美妙之處。
asyncio
是一個標準庫,可以讓我們用這些異步函數做一些有趣的事情。例如,如果我們想同時向Google執行兩個請求,我們可以:
async def request_google_twice(): response_1, response_2 = await asyncio.gather(request_google(), request_google()) return response_1, response_2
當我們調用request_google_twice()
時,神奇的asyncio.gather
會啟動一個函數調用,但是當我們調用時await writer.drain()
,它會開始執行第二個函數調用,這樣兩個請求就會并行發生。然后,它等待第一個或第二個請求的writer.drain()
調用完成并繼續執行該函數。
最后,有一個重要的細節被遺漏了:asyncio.run
。要從常規的 [同步] Python 函數實際調用異步函數,我們將調用包裝在asyncio.run(...)
:
async def async_main(): r1, r2 = await request_google_twice() print('Response one:', r1) print('Response two:', r2) return 12 return_val = asyncio.run(async_main())
請注意,如果我們只調用async_main()
而不調用await ...
或者 asyncio.run(...)
,則不會發生任何事情。這只是由異步工作方式的性質所限制的。
那么,異步究竟是如何工作的,這些神奇的asyncio.run
和asyncio.gather
函數有什么作用呢?閱讀下文以了解詳情。
要了解async
的魔力,我們首先需要了解一個更簡單的 Python 構造:生成器
生成器是 Python 函數,它逐個返回一系列值(可迭代)。例如:
def get_numbers(): print("|| get_numbers begin") print("|| get_numbers Giving 1...") yield 1 print("|| get_numbers Giving 2...") yield 2 print("|| get_numbers Giving 3...") yield 3 print("|| get_numbers end") print("| for begin") for number in get_numbers(): print(f"| Got {number}.") print("| for end")
| for begin || get_numbers begin || get_numbers Giving 1... | Got 1. || get_numbers Giving 2... | Got 2. || get_numbers Giving 3... | Got 3. || get_numbers end | for end
因此,我們看到,對于for循環的每個迭代,我們在生成器中只執行一次。我們可以使用Python的next()
函數更明確地執行此迭代:
In [3]: generator = get_numbers() In [4]: next(generator) || get_numbers begin || get_numbers Giving 1... Out[4]: 1 In [5]: next(generator) || get_numbers Giving 2... Out[5]: 2 In [6]: next(generator) || get_numbers Giving 3... Out[6]: 3 In [7]: next(generator) || get_numbers end --------------------------------------- StopIteration Traceback (most recent call last) <ipython-input-154-323ce5d717bb> in <module> ----> 1 next(generator) StopIteration:
這與異步函數的行為非常相似。正如異步函數從函數開始直到第一次等待時連續執行代碼一樣,我們第一次調用next()
時,生成器將從函數頂部執行到第一個yield
語句。然而,現在我們只是從生成器返回數字。我們將使用相同的思想,但返回一些不同的東西來使用生成器創建類似異步的函數。
讓我們使用生成器來創建我們自己的小型異步框架。
但是,為簡單起見,讓我們將實際 IO 替換為睡眠(即。time.sleep
)。讓我們考慮一個需要定期發送更新的應用程序:
def send_updates(count: int, interval_seconds: float): for i in range(1, count + 1): time.sleep(interval_seconds) print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))
因此,如果我們調用send_updates(3, 1.0)
,它將輸出這三條消息,每條消息間隔 1 秒:
[1.0] Sending update 1/3. [1.0] Sending update 2/3. [1.0] Sending update 3/3.
現在,假設我們要同時運行幾個不同的時間間隔。例如,send_updates(10, 1.0)
,send_updates(5, 2.0)
和send_updates(4, 3.0)
。我們可以使用線程來做到這一點,如下所示:
threads = [ threading.Thread(target=send_updates, args=(10, 1.0)), threading.Thread(target=send_updates, args=(5, 2.0)), threading.Thread(target=send_updates, args=(4, 3.0)) ] for i in threads: i.start() for i in threads: i.join()
這可行,在大約 12 秒內完成,但使用具有前面提到的缺點的線程。讓我們使用生成器構建相同的東西。
在演示生成器的示例中,我們返回了整數。為了獲得類似異步的行為,而不是返回任意值,我們希望返回一些描述要等待的IO的對象。在我們的例子中,我們的“IO”只是一個計時器,它將等待一段時間。因此,讓我們創建一個計時器對象,用于此目的:
class AsyncTimer: def __init__(self, duration: float): self.done_time = time.time() + duration
現在,讓我們從我們的函數中產生這個而不是調用time.sleep
:
def send_updates(count: int, interval_seconds: float): for i in range(1, count + 1): yield AsyncTimer(interval_seconds) print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))
現在,每次我們調用send_updates(...)
時調用next(...)
,我們都會得到一個AsyncTimer
對象,告訴我們直到我們應該等待什么時候:
generator = send_updates(3, 1.5) timer = next(generator) # [1.5] Sending update 1/3. print(timer.done_time - time.time()) # 1.498...
由于我們的代碼現在實際上并沒有調用time.sleep
,我們現在可以同時執行另一個send_updates
調用。
所以,為了把這一切放在一起,我們需要退后一步,意識到一些事情:
生成器就像部分執行的函數,等待一些 IO(計時器)。
每個部分執行的函數都有一些 IO(計時器),它在繼續執行之前等待。
因此,我們程序的當前狀態是每個部分執行的函數(生成器)和該函數正在等待的 IO(計時器)對的對列表
現在,要運行我們的程序,我們只需要等到某個 IO 準備就緒(即我們的一個計時器已過期),然后再向前一步執行相應的函數,得到一個阻塞該函數的新 IO。
實現此邏輯為我們提供了以下信息:
# Initialize each generator with a timer of 0 so it immediately executes generator_timer_pairs = [ (send_updates(10, 1.0), AsyncTimer(0)), (send_updates(5, 2.0), AsyncTimer(0)), (send_updates(4, 3.0), AsyncTimer(0)) ] while generator_timer_pairs: pair = min(generator_timer_pairs, key=lambda x: x[1].done_time) generator, min_timer = pair # Wait until this timer is ready time.sleep(max(0, min_timer.done_time - time.time())) del generator_timer_pairs[generator_timer_pairs.index(pair)] try: # Execute one more step of this function new_timer = next(generator) generator_timer_pairs.append((generator, new_timer)) except StopIteration: # When the function is complete pass
有了這個,我們有了一個使用生成器的類似異步函數的工作示例。請注意,當生成器完成時,它會引發StopIteration
,并且當我們不再有部分執行的函數(生成器)時,我們的函數就完成了
現在,我們把它包裝在一個函數中,我們得到了類似于asyncio.run
的東西。結合asyncio.gather
運行:
def async_run_all(*generators): generator_timer_pairs = [ (generator, AsyncTimer(0)) for generator in generators ] while generator_timer_pairs: pair = min(generator_timer_pairs, key=lambda x: x[1].done_time) generator, min_timer = pair time.sleep(max(0, min_timer.done_time - time.time())) del generator_timer_pairs[generator_timer_pairs.index(pair)] try: new_timer = next(generator) generator_timer_pairs.append((generator, new_timer)) except StopIteration: pass async_run_all( send_updates(10, 1.0), send_updates(5, 2.0), send_updates(4, 3.0) )
實現我們的caveman版本的asyncio
的最后一步是支持Python 3.5中引入的async/await
語法。await
的行為類似于yield
,只是它不是直接返回提供的值,而是返回next((...).__await__())
。async
函數返回“協程”,其行為類似于生成器,但需要使用.send(None)
而不是next()
(請注意,正如生成器在最初調用時不返回任何內容一樣,異步函數在逐步執行之前不會執行任何操作,這解釋了我們前面提到的)。
因此,鑒于這些信息,我們只需進行一些調整即可將我們的示例轉換為async/await
。以下是最終結果:
class AsyncTimer: def __init__(self, duration: float): self.done_time = time.time() + duration def __await__(self): yield self async def send_updates(count: int, interval_seconds: float): for i in range(1, count + 1): await AsyncTimer(interval_seconds) print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count)) def _wait_until_io_ready(ios): min_timer = min(ios, key=lambda x: x.done_time) time.sleep(max(0, min_timer.done_time - time.time())) return ios.index(min_timer) def async_run_all(*coroutines): coroutine_io_pairs = [ (coroutine, AsyncTimer(0)) for coroutine in coroutines ] while coroutine_io_pairs: ios = [io for cor, io in coroutine_io_pairs] ready_index = _wait_until_io_ready(ios) coroutine, _ = coroutine_io_pairs.pop(ready_index) try: new_io = coroutine.send(None) coroutine_io_pairs.append((coroutine, new_io)) except StopIteration: pass async_run_all( send_updates(10, 1.0), send_updates(5, 2.0), send_updates(4, 3.0) )
我們有了它,我們的迷你異步示例完成了,使用async/await
. 現在,您可能已經注意到我將 timer 重命名為 io 并將查找最小計時器的邏輯提取到一個名為_wait_until_io_ready
. 這是有意將這個示例與最后一個主題聯系起來:真實 IO。
在這里,我們完成了我們的小型異步示例,使用了async/await
。現在,你可能已經注意到我將timer
重命名為io,并將用于查找最小計時器的邏輯提取到一個名為_wait_until_io_ready
的函數中。這是為了將本示例與最后一個主題:真正的IO,連接起來。
所以,所有這些例子都很棒,但是它們與真正的 asyncio 有什么關系,我們希望在真正 IO 上等待 TCP 套接字和文件讀/寫?嗯,美麗就在那個_wait_until_io_ready
功能中。為了讓真正的 IO 正常工作,我們所要做的就是創建一些AsyncReadFile
類似于AsyncTimer
包含文件描述符的新對象。然后,AsyncReadFile
我們正在等待的對象集對應于一組文件描述符。最后,我們可以使用函數 (syscall) select()等待這些文件描述符之一準備好。由于 TCP/UDP 套接字是使用文件描述符實現的,因此這也涵蓋了網絡請求。
所以,所有這些例子都很好,但它們與真正的異步IO有什么關系呢?我們希望等待實際的IO,比如TCP套接字和文件讀/寫?好吧,其優點在于_wait_until_io_ready
函數。要使真正的IO工作,我們需要做的就是創建一些新的AsyncReadFile
,類似于AsyncTimer
,它包含一個文件描述符。然后,我們正在等待的一組AsyncReadFile
對象對應于一組文件描述符。最后,我們可以使用函數(syscall
)select()
等待這些文件描述符之一準備好。由于TCP/UDP套接字是使用文件描述符實現的,因此這也涵蓋了網絡請求。
“Python異步方法如何使用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。