在Python中,協程(coroutine)是一種特殊的函數,可以在執行過程中暫停和恢復。處理協程中的異常情況需要使用asyncio
庫中的一些功能。以下是一些處理協程異常的方法:
try-except
語句捕獲異常:在協程內部使用try-except
語句捕獲異常,就像在普通函數中一樣。當異常發生時,它會被捕獲并存儲在except
子句中。
import asyncio
async def my_coroutine():
try:
# 你的協程代碼
except Exception as e:
print(f"捕獲到異常: {e}")
asyncio.run(my_coroutine())
asyncio.gather
處理多個協程的異常:asyncio.gather
函數可以同時運行多個協程,并收集它們的結果。如果其中一個協程引發異常,gather
會立即停止執行剩余的協程,并將異常傳遞給return_exceptions
參數。
import asyncio
async def my_coroutine(num):
if num == 2:
raise ValueError("這是一個故意引發的異常")
return f"協程 {num} 完成"
async def main():
coroutines = [my_coroutine(i) for i in range(1, 4)]
results = await asyncio.gather(*coroutines, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
print(f"捕獲到異常: {result}")
else:
print(result)
asyncio.run(main())
asyncio.create_task
和await
處理異常:當你使用asyncio.create_task
創建一個任務時,可以使用await
關鍵字等待協程完成。如果協程引發異常,它會被捕獲并存儲在asyncio.Task
對象的exception()
方法中。
import asyncio
async def my_coroutine(num):
if num == 2:
raise ValueError("這是一個故意引發的異常")
return f"協程 {num} 完成"
async def main():
task = asyncio.create_task(my_coroutine(2))
try:
result = await task
except Exception as e:
print(f"捕獲到異常: {e}")
else:
print(result)
asyncio.run(main())
這些方法可以幫助你處理Python協程中的異常情況。在實際編程中,你可能需要根據具體需求選擇合適的方法。