您好,登錄后才能下訂單哦!
這篇文章主要講解了“Python Asyncio庫之同步原語常用函數有哪些”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Python Asyncio庫之同步原語常用函數有哪些”吧!
Asyncio
的同步原語可以簡化我們編寫資源競爭的代碼和規避資源競爭導致的Bug的出現。 但是由于協程的特性,在大部分業務代碼中并不需要去考慮資源競爭的出現,導致Asyncio
同步原語被使用的頻率比較低,但是如果想基于Asyncio
編寫框架則需要學習同步原語的使用。
同步原語都是適用于某些條件下對某個資源的爭奪,在代碼中大部分的資源都是屬于一個代碼塊,而Python
對于代碼塊的管理的最佳實踐是使用with
語法,with
語法實際上是調用了一個類中的__enter__
和__exit__
方法,比如下面的代碼:
class Demo(object): def __enter__(self): return def __exit__(self, exc_type, exc_val, exc_tb): return with Demo(): pass
代碼中的Demo
類實現了__enter__
和__exit__
方法后,就可以被with
語法調用,其中__enter__
方法是進入代碼塊執行的邏輯,__enxi__
方法是用于退出代碼塊(包括異常退出)的邏輯。這兩個方法符合同步原語中對資源的爭奪和釋放,但是__enter__
和__exit__
兩個方法都是不支持await
調用的,為了解決這個問題,Python
引入了async with
語法。
async with
語法和with
語法類似 ,我們只要編寫一個擁有__aenter__
和__aexit__
方法的類,那么這個類就支持asyncio with
語法了,如下:
import asyncio class Demo(object): async def __aenter__(self): return async def __aexit__(self, exc_type, exc_val, exc_tb): return async def main(): async with Demo(): pass asyncio.run(main())
其中,類中的__aenter__
方法是進入代碼塊時執行的方法,__aexit__
是退出代碼塊時執行的方法。
有了async with
語法的加持,asyncio
的同步原語使用起來會比較方便,所以asyncio
中對資源爭奪的同步原語都會繼承于_ContextManagerMixin
類:
class _ContextManagerMixin: async def __aenter__(self): await self.acquire() # We have no use for the "as ..." clause in the with # statement for locks. return None async def __aexit__(self, exc_type, exc, tb): self.release()
并實現了acquire
和release
方法,供__aenter__
和__aexit__
方法調用,同時我們在使用同步原語的時候盡量用到async with
語法防止忘記釋放資源的占用。
由于協程的特性,在編寫協程代碼時基本上可以不考慮到鎖的情況,但在一些情況下我們還是需要用到鎖,并通過鎖來維護并發時的數據安全性,如下例子:
import asyncio share_data = {} async def sub(i): # 賦上相同的key和value share_data[i] = i await asyncio.sleep(0) print(i, share_data[i] == i) async def sub_add(i): # 賦上的value值是原來的+1 share_data[i] = i + 1 await asyncio.sleep(0) print(i, share_data[i] == i + 1) async def main(): # 創建并發任務 task_list = [] for i in range(10): task_list.append(sub(i)) task_list.append(sub_add(i)) # 并發執行 await asyncio.gather(*task_list) if __name__ == "__main__": asyncio.run(main())
在這個例子中程序會并發的執行sub
和sub_add
函數,他們是由不同的asyncio.Task
驅動的,這意味著會出現這樣一個場景。 當負責執行sub(1)
函數的asyncio.Task
在執行完share_data[i]=i
后就執行await asyncio.sleep(0)
從而主動讓出控制權并交還給事件循環,等待事件循環的下一次調度。 不過事件循環不會空下來,而是馬上安排下一個asyncio.Task
執行,此時會先執行到sub_add(1)
函數的share_data[i] = i + 1
,并同樣的在執行到await asyncio.sleep(0)
的時候把控制權交會給事件循環。 這時候控制權會由事件循環轉移給原先執行sub(1)
函數的asyncio.Task
,獲取到控制權l后sub(1)
函數的邏輯會繼續走,但由于share_data[i]
的數據已經被share_data[i] = i + 1
修改了,導致最后執行print
時,share_data[i]
的數據已經變為臟數據,而不是原本想要的數據了。
為了解決這個問題,我們可以使用asyncio.Lock
來解決資源的沖突,如下:
import asyncio share_data = {} # 存放對應資源的鎖 lock_dict = {} async def sub(i): async with lock_dict[i]: # <-- 通過async with語句來控制鎖的粒度 share_data[i] = i await asyncio.sleep(0) print(i, share_data[i] == i) async def sub_add(i): async with lock_dict[i]: share_data[i] = i + 1 await asyncio.sleep(0) print(i, share_data[i] == i + 1) async def main(): task_list = [] for i in range(10): lock_dict[i] = asyncio.Lock() task_list.append(sub(i)) task_list.append(sub_add(i)) await asyncio.gather(*task_list) if __name__ == "__main__": asyncio.run(main())
從例子可以看到asyncio.Lock
的使用方法跟多線程的Lock
差不多,通過async with
語法來獲取和釋放鎖,它的原理也很簡單,主要做了如下幾件事:
1.確保某一協程獲取鎖后的執行期間,別的協程在獲取鎖時需要一直等待,直到執行完成并釋放鎖。
2.當有協程持有鎖的時候,其他協程必須等待,直到持有鎖的協程釋放了鎖。
2.確保所有協程能夠按照獲取的順序獲取到鎖。
這意味著需要有一個數據結構來維護當前持有鎖的協程的和下一個獲取鎖協程的關系,同時也需要一個隊列來維護多個獲取鎖的協程的喚醒順序。
asyncio.Lock
跟其它asyncio
功能的用法一樣,使用asyncio.Future
來同步協程之間鎖的狀態,使用deque
維護協程間的喚醒順序,源碼如下:
class Lockl(_ContextManagerMixin, mixins._LoopBoundMixin): def __init__(self): self._waiters = None self._locked = False def locked(self): return self._locked async def acquire(self): if (not self._locked and (self._waiters is None or all(w.cancelled() for w in self._waiters))): # 目前沒有其他協程持有鎖,當前協程可以運行 self._locked = True return True if self._waiters is None: self._waiters = collections.deque() # 創建屬于自己的容器,并推送到`_waiters`這個雙端隊列中 fut = self._get_loop().create_future() self._waiters.append(fut) try: try: await fut finally: # 如果執行完畢,需要把自己移除,防止被`wake_up_first`調用 self._waiters.remove(fut) except exceptions.CancelledError: # 如果是等待的過程中被取消了,需要喚醒下一個調用`acquire` if not self._locked: self._wake_up_first() raise # 持有鎖 self._locked = True return True def release(self): if self._locked: # 釋放鎖 self._locked = False self._wake_up_first() else: raise RuntimeError('Lock is not acquired.') def _wake_up_first(self): if not self._waiters: return # 獲取還處于鎖狀態協程對應的容器 try: # 獲取下一個等待獲取鎖的waiter fut = next(iter(self._waiters)) except StopIteration: return # 設置容器為True,這樣對應協程就可以繼續運行了。 if not fut.done(): fut.set_result(True)
通過源碼可以知道,鎖主要提供了獲取和釋放的功能,對于獲取鎖需要區分兩種情況:
1:當有協程想要獲取鎖時會先判斷鎖是否被持有,如果當前鎖沒有被持有就直接返回,使協程能夠正常運行。
2:如果協程獲取鎖時,鎖發現自己已經被其他協程持有則創建一個屬于當前協程的asyncio.Future
,用來同步狀態,并添加到deque
中。
而對于釋放鎖就比較簡單,只要獲取deque
中的第一個asyncio.Future
,并通過fut.set_result(True)
進行標記,使asyncio.Future
從peding
狀態變為done
狀態,這樣一來,持有該asyncio.Future
的協程就能繼續運行,從而持有鎖。
不過需要注意源碼中acquire
方法中對CancelledError
異常進行捕獲,再喚醒下一個鎖,這是為了解決acquire
方法執行異常導致鎖一直被卡住的場景,通常情況下這能解決大部分的問題,但是如果遇到錯誤的封裝時,我們需要親自處理異常,并執行鎖的喚醒。比如在通過繼承asyncio.Lock
編寫一個超時鎖時,最簡單的實現代碼如下:
import asyncio class TimeoutLock(asyncio.Lock): def __init__(self, timeout, *, loop=None): self.timeout = timeout super().__init__(loop=loop) async def acquire(self) -> bool: return await asyncio.wait_for(super().acquire(), self.timeout)
這份代碼非常簡單,他只需要在__init__
方法傳入timeout
參數,并在acuiqre
方法中通過wait_for
來實現鎖超時即可,現在假設wait_for
方法是一個無法傳遞協程cancel
的方法,且編寫的acquire
沒有進行捕獲異常再釋放鎖的操作,當異常發生的時候會導致鎖一直被卡住。 為了解決這個問題,只需要對TimeoutLock
的acquire
方法添加異常捕獲,并在捕獲到異常時釋放鎖即可,代碼如下:
class TimeoutLock(asyncio.Lock): def __init__(self, timeout, *, loop=None): self.timeout = timeout super().__init__(loop=loop) async def acquire(self) -> bool: try: return await asyncio.wait_for(super().acquire(), self.timeout) except Exception: self._wake_up_first() raise
asyncio.Event
也是一個簡單的同步原語,但它跟asyncio.Lock
不一樣,asyncio.Lock
是確保每個資源只能被一個協程操作,而asyncio.Event
是確保某個資源何時可以被協程操作,可以認為asyncio.Lock
鎖的是資源,asyncio.Event
鎖的是協程,所以asyncio.Event
并不需要acquire
來鎖資源,release
釋放資源,所以也用不到async with
語法。
asyncio.Event
的簡單使用示例如下:
import asyncio async def sub(event: asyncio.Event) -> None: await event.wait() print("I'm Done") async def main() -> None: event = asyncio.Event() for _ in range(10): asyncio.create_task(sub(event)) await asyncio.sleep(1) event.set() asyncio.run(main())
在這個例子中會先創建10個asyncio.Task
來執行sub
函數,但是所有sub
函數都會在event.wait
處等待,直到main
函數中調用event.set
后,所有的sub
函數的event.wait
會放行,使sub
函數能繼續執行。
可以看到asyncio.Event
功能比較簡單,它的源碼實現也很簡單,源碼如下:
class Event(mixins._LoopBoundMixin): def __init__(self): self._waiters = collections.deque() self._value = False def is_set(self): return self._value def set(self): if not self._value: # 確保每次只能set一次 self._value = True # 設置每個協程存放的容器為True,這樣對應的協程就可以運行了 for fut in self._waiters: if not fut.done(): fut.set_result(True) def clear(self): # 清理上一次的set self._value = False async def wait(self): if self._value: # 如果設置了,就不需要等待了 return True # 否則需要創建一個容器,并需要等待容器完成 fut = self._get_loop().create_future() self._waiters.append(fut) try: await fut return True finally: self._waiters.remove(fut)
通過源碼可以看到wait
方法主要是創建了一個asyncio.Future
,并把它加入到deque
隊列后就一直等待著,而set
方法被調用時會遍歷整個deque
隊列,并把處于peding
狀態的asyncio.Future
設置為done
,這時其他在調用event.wait
方法的協程就會得到放行。
通過源碼也可以看出,asyncio.Event
并沒有繼承于_ContextManagerMixin
,這是因為它鎖的是協程,而不是資源。
asyncio.Event
的使用頻率比asyncio.Lock
多許多,不過通常都會讓asyncio.Event
和其他數據結構進行封裝再使用,比如實現一個服務器的優雅關閉功能,這個功能會確保服務器在等待n秒后或者所有連接都關閉后才關閉服務器,這個功能就可以使用set
與asyncio.Event
結合,如下:
import asyncio class SetEvent(asyncio.Event): def __init__(self, *, loop=None): self._set = set() super().__init__(loop=loop) def add(self, value): self._set.add(value) self.clear() def remove(self, value): self._set.remove(value) if not self._set: self.set()
這個SetEvent
結合了set
和SetEvent
的功能,當set
有數據的時候,會通過clear
方法使SetEvent
變為等待狀態,而set
沒數據的時候,會通過set
方法使SetEvent
變為無需等待的狀態,所有調用wait
的協程都可以放行,通過這種結合,SetEvent
擁有了等待資源為空的功能。 接下來就可以用于服務器的優雅退出功能:
async def mock_conn_io() -> None: await asyncio.sleep(1) def conn_handle(set_event: SetEvent): task: asyncio.Task = asyncio.create_task(mock_conn_io()) set_event.add(task) task.add_done_callback(lambda t: set_event.remove(t)) async def main(): set_event: SetEvent = SetEvent() for _ in range(10): conn_handle(set_event) # 假設這里收到了退出信號 await asyncio.wait(set_event.wait(), timeout=9) asyncio.run(main())
在這個演示功能中,mock_conn_io
用于模擬服務器的連接正在處理中,而conn_handle
用于創建服務器連接,main
則是先創建10個連接,并模擬在收到退出信號后等待資源為空或者超時才退出服務。
這只是簡單的演示,實際上的優雅關閉功能要考慮的東西不僅僅是這些。
condition只做簡單介紹
asyncio.Condition
是同步原語中使用最少的一種,因為他使用情況很奇怪,而且大部分場景可以被其他寫法代替,比如下面這個例子:
import asyncio async def task(condition, work_list): await asyncio.sleep(1) work_list.append(33) print('Task sending notification...') async with condition: condition.notify() async def main(): condition = asyncio.Condition() work_list = list() print('Main waiting for data...') async with condition: _ = asyncio.create_task(task(condition, work_list)) await condition.wait() print(f'Got data: {work_list}') asyncio.run(main()) # >>> Main waiting for data... # >>> Task sending notification... # >>> Got data: [33]
在這個例子中可以看到,notify
和wait
方法只能在async with condition
中可以使用,如果沒有在async with condition
中使用則會報錯,同時這個示例代碼有點復雜,沒辦法一看就知道執行邏輯是什么,其實這個邏輯可以轉變成一個更簡單的寫法:
import asyncio async def task(work_list): await asyncio.sleep(1) work_list.append(33) print('Task sending notification...') return async def main(): work_list = list() print('Main waiting for data...') _task = asyncio.create_task(task(work_list)) await _task print(f'Got data: {work_list}') asyncio.run(main()) # >>> Main waiting for data... # >>> Task sending notification... # >>> Got data: [33]
通過這個代碼可以看到這個寫法更簡單一點,而且更有邏輯性,而condition
的寫法卻更有點Go
協程寫法/或者回調函數寫法的感覺。 所以建議在認為自己的代碼可能會用到asyncio.Conditon
時需要先考慮到是否需要asyncio.Codition
?是否有別的方案代替,如果沒有才考慮去使用asyncio.Conditon
k。
asyncio.Semaphore
--信號量是同步原語中被使用最頻繁的,大多數都是用在限流場景中,比如用在爬蟲中和客戶端網關中限制請求頻率。
asyncio.Semaphore
可以認為是一個延緩觸發的asyncio.Lock
,asyncio.Semaphore
內部會維護一個計數器,無論何時進行獲取或釋放,它都會遞增或者遞減(但不會超過邊界值),當計數器歸零時,就會進入到鎖的邏輯,但是這個鎖邏輯會在計數器大于0的時候釋放j,它的用法如下:`
import asyncio async def main(): semaphore = asyncio.Semaphore(10): async with semaphore: pass asyncio.run(main())
示例中代碼通過async with
來指明一個代碼塊(代碼用pass
代替),這個代碼塊是被asyncio.Semaphore
管理的,每次協程在進入代碼塊時,asyncio.Semaphore
的內部計數器就會遞減一,而離開代碼塊則asyncio.Semaphore
的內部計數器會遞增一。
當有一個協程進入代碼塊時asyncio.Semaphore
發現計數器已經為0了,則會使當前協程進入等待狀態,直到某個協程離開這個代碼塊時,計數器會遞增一,并喚醒等待的協程,使其能夠進入代碼塊中繼續執行。
asyncio.Semaphore
的源碼如下,需要注意的是由于asyncio.Semaphore
是一個延緩的asyncio.Lock
,所以當調用一次release
后可能會導致被喚醒的協程和剛進入代碼塊的協程起沖突,所以在acquire
方法中要通過一個while
循環來解決這個問題:`
class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin): def __init__(self, value=1): if value < 0: raise ValueError("Semaphore initial value must be >= 0") self._value = value self._waiters = collections.deque() self._wakeup_scheduled = False def _wake_up_next(self): while self._waiters: # 按照放置順序依次彈出容器 waiter = self._waiters.popleft() if not waiter.done(): # 設置容器狀態,使對應的協程可以繼續執行 waiter.set_result(None) # 設置標記 self._wakeup_scheduled = True return def locked(self): return self._value == 0 async def acquire(self): # 如果`self._wakeup_scheduled`為True或者value小于0 while self._wakeup_scheduled or self._value <= 0: # 創建容器并等待執行完成 fut = self._get_loop().create_future() self._waiters.append(fut) try: await fut self._wakeup_scheduled = False except exceptions.CancelledError: # 如果被取消了,也要喚醒下一個協程 self._wake_up_next() raise self._value -= 1 return True def release(self): # 釋放資源占用,喚醒下一個協程。 self._value += 1 self._wake_up_next()
針對asyncio.Semaphore
進行修改可以實現很多功能,比如基于信號量可以實現一個簡單的協程池,這個協程池可以限制創建協程的量,當協程池滿的時候就無法繼續創建協程,只有協程中的協程執行完畢后才能繼續創建(當然無法控制在協程中創建新的協程),代碼如下:
import asyncio import time from typing import Coroutine class Pool(object): def __init__(self, max_concurrency: int): self._semaphore: asyncio.Semaphore = asyncio.Semaphore(max_concurrency) async def create_task(self, coro: Coroutine) -> asyncio.Task: await self._semaphore.acquire() task: asyncio.Task = asyncio.create_task(coro) task.add_done_callback(lambda t: self._semaphore.release()) return task async def demo(cnt: int) -> None: print(f"{int(time.time())} create {cnt} task...") await asyncio.sleep(cnt) async def main() -> None: pool: Pool = Pool(3) for i in range(10): await pool.create_task(demo(i)) asyncio.run(main()) # >>> 1677517996 create 0 task... # >>> 1677517996 create 1 task... # >>> 1677517996 create 2 task... # >>> 1677517996 create 3 task... # >>> 1677517997 create 4 task... # >>> 1677517998 create 5 task... # >>> 1677517999 create 6 task... # >>> 1677518001 create 7 task... # >>> 1677518003 create 8 task... # >>> 1677518005 create 9 task...
感謝各位的閱讀,以上就是“Python Asyncio庫之同步原語常用函數有哪些”的內容了,經過本文的學習后,相信大家對Python Asyncio庫之同步原語常用函數有哪些這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。