您好,登錄后才能下訂單哦!
本文小編為大家詳細介紹“如何實現Python重試超時裝飾器”,內容詳細,步驟清晰,細節處理妥當,希望這篇“如何實現Python重試超時裝飾器”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。
在寫業務代碼時候,有許多場景需要重試某塊業務邏輯,例如網絡請求、購物下單等,希望發生異常的時候多重試幾次。
一個重試裝飾器,最重要的就是發生意外異常處理失敗自動重試,有如下幾點需要注意
失敗不能一直重試,因為可能會出現死循環浪費資源,因此需要有 最大重試次數 或者 最大超時時間
不能重試太頻繁,因為太頻繁容易導致重試次數很快用完,卻沒有成功響應,需要有 重試時間間隔 來限制,有時可以加大成功概率,例如網絡請求時有一段時間是堵塞的,或者對方服務負載太高導致一段時間無法響應等。
簡單分析完,我們的重試裝飾器,就要支持可配置最大重試次數、最大超時時間、重試間隔,所以裝飾器就要設計成帶參數裝飾器。
分析完畢后,看看第一版的裝飾器
import time from functools import wraps def task_retry(max_retry_count: int = 5, time_interval: int = 2): """ 任務重試裝飾器 Args: max_retry_count: 最大重試次數 默認5次 time_interval: 每次重試間隔 默認2s """ def _task_retry(task_func): @wraps(task_func) def wrapper(*args, **kwargs): # 函數循環重試 for retry_count in range(max_retry_count): print(f"execute count {retry_count + 1}") try: task_result = task_func(*args, **kwargs) return task_result except Exception as e: print(f"fail {str(e)}") time.sleep(time_interval) return wrapper return _task_retry
裝飾器內部閉包,就簡單通過 for 循環 來執行指定重試次數,成功獲取結果就直接 return 返回,發生異常則睡眠配置重試間隔時間后繼續循環
寫個例子來模擬測試下看看效果
沒有異常正常執行,在函數中模擬一個異常來進行重試看看
@task_retry(max_retry_count=3, time_interval=1) def user_place_order(): a = 1 / 0 print("user place order success") return {"code": 0, "msg": "ok"} ret = user_place_order() print("user place order ret", ret) >>>out fail division by zero execute count 2 fail division by zero execute count 3 fail division by zero user place order ret None
可以看到 user_place_order 函數執行了三遍,都發生了除零異常,最后超過最大執行次數,返回了 None 值,我們可以在主邏輯中來判斷返回值是否為 None 來進行超過最大重試次數失敗的業務邏輯處理
ret = user_place_order() print("user place order ret", ret) if not ret: print("user place order failed") ...
現在只能配置 最大重試次數 沒有最大超時時間,有時候我們想不但有重試,還得在規定時間內完成,不想浪費太多試錯時間。所以增加一個 最大超時時間配置選項默認為None,有值時超過最大超時時間退出重試。
def task_retry(max_retry_count: int = 5, time_interval: int = 2, max_timeout: int = None): """ 任務重試裝飾器 Args: max_retry_count: 最大重試次數 默認 5 次 time_interval: 每次重試間隔 默認 2s max_timeout: 最大超時時間,單位s 默認為 None, """ def _task_retry(task_func): @wraps(task_func) def wrapper(*args, **kwargs): # 函數循環重試 start_time = time.time() for retry_count in range(max_retry_count): print(f"execute count {retry_count + 1}") use_time = time.time() - start_time if max_timeout and use_time > max_timeout: # 超出最大超時時間 print(f"execute timeout, use time {use_time}s, max timeout {max_timeout}") return try: return task_func(*args, **kwargs) except Exception as e: print(f"fail {str(e)}") time.sleep(time_interval) return wrapper return _task_retry
看看效果
# 超時 @task_retry(max_retry_count=3, time_interval=1, max_timeout=2) def user_place_order(): a = 1 / 0 print("user place order success") return {"code": 0, "msg": "ok"} >>>out execute count 1 fail division by zero execute count 2 fail division by zero execute count 3 execute timeout, use time 2.010528802871704s, max timeout 2 user place order ret None # 超過最大重試次數 @task_retry(max_retry_count=3, time_interval=1) def user_place_order(): a = 1 / 0 print("user place order success") return {"code": 0, "msg": "ok"} >>>out execute count 1 fail division by zero execute count 2 fail division by zero execute count 3 fail division by zero user place order ret None # 正常 @task_retry(max_retry_count=3, time_interval=1, max_timeout=2) def user_place_order(): # a = 1 / 0 print("user place order success") return {"code": 0, "msg": "ok"} >>>out execute count 1 user place order success user place order ret {'code': 0, 'msg': 'ok'}
到這重試裝飾器基本功能就實現了,但還可以加強,Python現在支持 async 異步方式寫法,因此要是可以兼容異步寫法那就更好了。先看看裝飾異步函數會是什么樣的效果
import time import asyncio import functools def task_retry(max_retry_count: int = 5, time_interval: int = 2, max_timeout: int = None): """ 任務重試裝飾器 Args: max_retry_count: 最大重試次數 默認 5 次 time_interval: 每次重試間隔 默認 2s max_timeout: 最大超時時間,單位s 默認為 None, """ def _task_retry(task_func): @wraps(task_func) def wrapper(*args, **kwargs): # 函數循環重試 start_time = time.time() for retry_count in range(max_retry_count): print(f"execute count {retry_count + 1}") use_time = time.time() - start_time if max_timeout and use_time > max_timeout: # 超出最大超時時間 print(f"execute timeout, use time {use_time}s, max timeout {max_timeout}") return try: return task_func(*args, **kwargs) except Exception as e: print(f"fail {str(e)}") time.sleep(time_interval) return wrapper return _task_retry @task_retry(max_retry_count=3, time_interval=1, max_timeout=2) def user_place_order(): # a = 1 / 0 print("user place order success") return {"code": 0, "msg": "ok"} @task_retry(max_retry_count=3, time_interval=2, max_timeout=5) async def user_place_order_async(): """異步函數重試案例""" a = 1 / 0 print("user place order success") return {"code": 0, "msg": "ok"} async def main(): # 同步案例 # ret = user_place_order() # print(f"user place order ret {ret}") # 異步案例 ret = await user_place_order_async() print(f"user place order ret {ret}") if __name__ == '__main__': asyncio.run(main()) # 正常時候 execute count 1 user place order success user place order ret {'code': 0, 'msg': 'ok'} # 異常時候 >>>out execute count 1 Traceback (most recent call last): File "G:/code/python/py-tools/decorator/base.py", line 138, in <module> asyncio.run(main()) File "G:\softs\DevEnv\python-3.7.9\lib\asyncio\runners.py", line 43, in run return loop.run_until_complete(main) File "G:\softs\DevEnv\python-3.7.9\lib\asyncio\base_events.py", line 587, in run_until_complete return future.result() File "G:/code/python/py-tools/decorator/base.py", line 133, in main ret = await user_place_order_async() File "G:/code/python/py-tools/decorator/base.py", line 121, in user_place_order_async a = 1 / 0 ZeroDivisionError: division by zero Process finished with exit code 1
發現發生異常的時候并沒有重試,為什么呢?其實在執行 task_func() 它并沒有真正的執行內部邏輯,而是返回一個 coroutine 協程對象,并不會報異常,所以再裝飾器中執行一遍就成功就出來了,外面 ret = await user_place_order_async(), 后才真正的等待執行,然后執行函數內的邏輯再報異常就沒有捕獲到。我們可以打斷點驗證下
這樣裝飾器就不支持異步函數的重試,需要加強它,可以使用 asyncio.iscoroutinefunction() 來進行異步函數的判斷, 然后再加一個異步函數的閉包就可以實現異步、同步函數都兼容的重試裝飾器。
def task_retry(max_retry_count: int = 5, time_interval: int = 2, max_timeout: int = None): """ 任務重試裝飾器 Args: max_retry_count: 最大重試次數 默認 5 次 time_interval: 每次重試間隔 默認 2s max_timeout: 最大超時時間,單位s 默認為 None, """ def _task_retry(task_func): @functools.wraps(task_func) def sync_wrapper(*args, **kwargs): # 同步循環重試 start_time = time.time() for retry_count in range(max_retry_count): print(f"execute count {retry_count + 1}") use_time = time.time() - start_time if max_timeout and use_time > max_timeout: # 超出最大超時時間 print(f"execute timeout, use time {use_time}s, max timeout {max_timeout}") return try: task_ret = task_func(*args, **kwargs) return task_ret except Exception as e: print(f"fail {str(e)}") time.sleep(time_interval) @functools.wraps(task_func) async def async_wrapper(*args, **kwargs): # 異步循環重試 start_time = time.time() for retry_count in range(max_retry_count): print(f"execute count {retry_count + 1}") use_time = time.time() - start_time if max_timeout and use_time > max_timeout: # 超出最大超時時間 print(f"execute timeout, use time {use_time}s, max timeout {max_timeout}") return try: return await task_func(*args, **kwargs) except Exception as e: print(f"fail {str(e)}") await asyncio.sleep(time_interval) # 異步函數判斷 wrapper_func = async_wrapper if asyncio.iscoroutinefunction(task_func) else sync_wrapper return wrapper_func return _task_retry
注意時間等待 await asyncio.sleep(time_interval) 會導致函數掛起,程序不會在這里等待,而是去事件循環loop中執行其他的已經就緒的任務,如果其他函數運行時間太久了才切換回來,會導致時間超時,換成 time.sleep()的話其實也沒有用,如果函數內部還有異步函數執行還是會切換出去,因此異步的時候感覺超時參數意義不大。
模擬測試下
@task_retry(max_retry_count=5, time_interval=2, max_timeout=5) async def user_place_order_async(): """異步函數重試案例""" a = 1 / 0 print("user place order success") return {"code": 0, "msg": "ok"} async def io_test(): """模擬io阻塞""" print("io test start") time.sleep(3) print("io test end") return "io test end" async def main(): # 同步案例 # ret = user_place_order() # print(f"user place order ret {ret}") # 異步案例 # ret = await user_place_order_async() # print(f"user place order ret {ret}") # 并發異步 order_ret, io_ret = await asyncio.gather( user_place_order_async(), io_test(), ) print(f"io ret {io_ret}") print(f"user place order ret {order_ret}") if __name__ == '__main__': asyncio.run(main()) >>>out execute count 1 fail division by zero io test start io test end execute count 2 fail division by zero execute count 3 execute timeout, use time 5.015768527984619s, max timeout 5 io ret io test end user place order ret None
可以看出執行一遍后自動切換到了 io_test 中執行由于 io test 中的 time.sleep(3) 會導致整個線程阻塞,一定要等到io_test執行完后才會切換回去,然后再執行兩遍就超時了,你可能會說都用異步的庫,是的異步的庫是可以加速,但我想表達就是這時候統計的耗時是整個程序的而不是單獨一個函數的。大家可以在評論區幫我想想有沒有其他的方法,要么就不要用這個超時參數。
可以兼容異步函數、然后超時參數可以不配置,影響不大,O(∩_∩)O~
最終版就是利用拋異常的方式來結束超過最大重試次數、最大超時,而不是直接返回None,然后再添加一個可配置捕獲指定異常的參數,當發生特定異常的時候才重試。
import time import asyncio import functools from typing import Type class MaxRetryException(Exception): """最大重試次數異常""" pass class MaxTimeoutException(Exception): """最大超時異常""" pass def task_retry( max_retry_count: int = 5, time_interval: int = 2, max_timeout: int = None, catch_exc: Type[BaseException] = Exception ): """ 任務重試裝飾器 Args: max_retry_count: 最大重試次數 默認 5 次 time_interval: 每次重試間隔 默認 2s max_timeout: 最大超時時間,單位s 默認為 None, catch_exc: 指定捕獲的異常類用于特定的異常重試 默認捕獲 Exception """ def _task_retry(task_func): @functools.wraps(task_func) def sync_wrapper(*args, **kwargs): # 函數循環重試 start_time = time.time() for retry_count in range(max_retry_count): print(f"execute count {retry_count + 1}") use_time = time.time() - start_time if max_timeout and use_time > max_timeout: # 超出最大超時時間 raise MaxTimeoutException(f"execute timeout, use time {use_time}s, max timeout {max_timeout}") try: task_ret = task_func(*args, **kwargs) return task_ret except catch_exc as e: print(f"fail {str(e)}") time.sleep(time_interval) else: # 超過最大重試次數, 拋異常終止 raise MaxRetryException(f"超過最大重試次數失敗, max_retry_count {max_retry_count}") @functools.wraps(task_func) async def async_wrapper(*args, **kwargs): # 異步循環重試 start_time = time.time() for retry_count in range(max_retry_count): print(f"execute count {retry_count + 1}") use_time = time.time() - start_time if max_timeout and use_time > max_timeout: # 超出最大超時時間 raise MaxTimeoutException(f"execute timeout, use time {use_time}s, max timeout {max_timeout}") try: return await task_func(*args, **kwargs) except catch_exc as e: print(f"fail {str(e)}") await asyncio.sleep(time_interval) else: # 超過最大重試次數, 拋異常終止 raise MaxRetryException(f"超過最大重試次數失敗, max_retry_count {max_retry_count}") # 異步函數判斷 wrapper_func = async_wrapper if asyncio.iscoroutinefunction(task_func) else sync_wrapper return wrapper_func return _task_retry @task_retry(max_retry_count=3, time_interval=1, catch_exc=ZeroDivisionError,max_timeout=5) def user_place_order(): a = 1 / 0 print("user place order success") return {"code": 0, "msg": "ok"} @task_retry(max_retry_count=5, time_interval=2, max_timeout=5) async def user_place_order_async(): """異步函數重試案例""" a = 1 / 0 print("user place order success") return {"code": 0, "msg": "ok"} async def io_test(): """模擬io阻塞""" print("io test start") time.sleep(3) print("io test end") return "io test end" async def main(): # 同步案例 try: ret = user_place_order() print(f"user place order ret {ret}") except MaxRetryException as e: # 超過最大重試次數處理 print("MaxRetryException", e) except MaxTimeoutException as e: # 超過最大超時處理 print("MaxTimeoutException", e) # 異步案例 # ret = await user_place_order_async() # print(f"user place order ret {ret}") # 并發異步 # order_ret, io_ret = await asyncio.gather( # user_place_order_async(), # io_test(), # ) # print(f"io ret {io_ret}") # print(f"user place order ret {order_ret}") if __name__ == '__main__': asyncio.run(main())
測試捕獲指定異常
# 指定捕獲除零錯誤,正常捕獲重試 @task_retry(max_retry_count=3, time_interval=1, catch_exc=ZeroDivisionError) def user_place_order(): a = 1 / 0 # a = [] # b = a[0] print("user place order success") return {"code": 0, "msg": "ok"} # out execute count 1 fail division by zero execute count 2 fail division by zero execute count 3 fail division by zero MaxRetryException 超過最大重試次數失敗, max_retry_count 3 # 指定捕獲除零錯誤,報索引越界錯誤,未正常捕獲重試,直接退出 @task_retry(max_retry_count=3, time_interval=1, catch_exc=ZeroDivisionError) def user_place_order(): # a = 1 / 0 a = [] b = a[0] print("user place order success") return {"code": 0, "msg": "ok"} # out Traceback (most recent call last): File "G:/code/python/py-tools/decorator/base.py", line 184, in <module> asyncio.run(main()) File "G:\softs\DevEnv\python-3.7.9\lib\asyncio\runners.py", line 43, in run return loop.run_until_complete(main) File "G:\softs\DevEnv\python-3.7.9\lib\asyncio\base_events.py", line 587, in run_until_complete return future.result() File "G:/code/python/py-tools/decorator/base.py", line 161, in main ret = user_place_order() File "G:/code/python/py-tools/decorator/base.py", line 97, in sync_wrapper task_ret = task_func(*args, **kwargs) File "G:/code/python/py-tools/decorator/base.py", line 137, in user_place_order b = a[0] IndexError: list index out of range Process finished with exit code 1
把重試里的超時計算單獨抽離出來,這樣功能不會太藕合,分兩個裝飾實現
def set_timeout(timeout: int, use_signal=False): """ 超時處理裝飾器 Args: timeout: 超時時間,單位秒 use_signal: 使用信號量機制只能在 unix內核上使用,默認False Raises: TimeoutException """ def _timeout(func: Callable): def _handle_timeout(signum, frame): raise MaxTimeoutException(f"Function timed out after {timeout} seconds") @functools.wraps(func) def sync_wrapper(*args, **kwargs): # 同步函數處理超時 if use_signal: # 使用信號量計算超時 signal.signal(signal.SIGALRM, _handle_timeout) signal.alarm(timeout) try: return func(*args, **kwargs) finally: signal.alarm(0) else: # 使用線程 with ThreadPoolExecutor() as executor: future = executor.submit(func, *args, **kwargs) try: return future.result(timeout) except TimeoutError: raise MaxTimeoutException(f"Function timed out after {timeout} seconds") @functools.wraps(func) async def async_wrapper(*args, **kwargs): # 異步函數處理超時 try: ret = await asyncio.wait_for(func(*args, **kwargs), timeout) return ret except asyncio.TimeoutError: raise MaxTimeoutException(f"Function timed out after {timeout} seconds") return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper return _timeout def retry( max_count: int = 5, interval: int = 2, catch_exc: Type[BaseException] = Exception ): """ 重試裝飾器 Args: max_count: 最大重試次數 默認 5 次 interval: 每次異常重試間隔 默認 2s catch_exc: 指定捕獲的異常類用于特定的異常重試 默認捕獲 Exception Raises: MaxRetryException """ def _retry(task_func): @functools.wraps(task_func) def sync_wrapper(*args, **kwargs): # 函數循環重試 for retry_count in range(max_count): logger.info(f"{task_func} execute count {retry_count + 1}") try: return task_func(*args, **kwargs) except catch_exc: logger.error(f"fail {traceback.print_exc()}") if retry_count < max_count - 1: # 最后一次異常不等待 time.sleep(interval) # 超過最大重試次數, 拋異常終止 raise MaxRetryException(f"超過最大重試次數失敗, max_retry_count {max_count}") @functools.wraps(task_func) async def async_wrapper(*args, **kwargs): # 異步循環重試 for retry_count in range(max_count): logger.info(f"{task_func} execute count {retry_count + 1}") try: return await task_func(*args, **kwargs) except catch_exc as e: logger.error(f"fail {str(e)}") if retry_count < max_count - 1: await asyncio.sleep(interval) # 超過最大重試次數, 拋異常終止 raise MaxRetryException(f"超過最大重試次數失敗, max_retry_count {max_count}") # 異步函數判斷 wrapper_func = async_wrapper if asyncio.iscoroutinefunction(task_func) else sync_wrapper return wrapper_func return _retry
讀到這里,這篇“如何實現Python重試超時裝飾器”文章已經介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領會,如果想了解更多相關內容的文章,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。