您好,登錄后才能下訂單哦!
本篇內容主要講解“python怎么使用contextvars模塊”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“python怎么使用contextvars模塊”吧!
在Python3.7后官方庫出現了contextvars
模塊, 它的主要功能就是可以為多線程以及asyncio生態添加上下文功能,即使程序在多個協程并發運行的情況下,也能調用到程序的上下文變量, 從而使我們的邏輯解耦.
上下文,可以理解為我們說話的語境, 在聊天的過程中, 有些話脫離了特定的語境,他的意思就變了,程序的運行也是如此.在線程中也是有他的上下文,只不過稱為堆棧,如在python中就是保存在thread.local變量中,而協程也有他自己的上下文,但是沒有暴露出來,不過有了contextvars
模塊后我們可以通過contextvars
模塊去保存與讀取.
使用contextvars
的好處不僅可以防止’一個變量傳遍天’的事情發生外,還能很好的結合TypeHint,可以讓自己的代碼可以被mypy以及IDE檢查,讓自己的代碼更加適應工程化.
不過用了contextvars
后會多了一些隱性的調用, 需要解決好這些隱性的成本.
切換web框架sanic
為starlette
增加一個自己編寫且可用于starlette
,fastapi
的context說明
更新fast_tools.context的最新示例以及簡單的修改行文。
如果有用過Flask
框架, 就知道了Flask
擁有自己的上下文功能, 而contextvars跟它很像, 而且還增加了對asyncio的上下文提供支持。Flask
的上下文是基于threading.local
實現的, threading.local
的隔離效果很好,但是他是只針對線程的,只隔離線程之間的數據狀態, 而werkzeug
為了支持在gevent
中運行,自己實現了一個Local
變量, 常用的Flask
上下文變量request
的例子如下:
from flask import Flask, request app = Flask(__name__) @app.route('/') def root(): so1n_name = request.get('so1n_name') return f'Name is {so1n_name}'
與之相比的是Python
的另一個經典Web框架Djano
, 它沒有上下文的支持, 所以只能顯示的傳request
對象, 例子如下:
from django.http import HttpResponse def root(request): so1n_name = request.get('so1n_name') return HttpResponse(f'Name is {so1n_name}')
通過上面兩者的對比可以發現, 在Django
中,我們需要顯示的傳一個叫request的變量,而Flask
則是import一個叫request的全局變量,并在視圖中直接使用,達到解耦的目的.
可能會有人說, 也就是傳個變量的區別,為了省傳這個變量,而花許多功夫去維護一個上下文變量,有點不值得,那可以看看下面的例子,如果層次多就會出現’一個參數傳一天’的情況(不過分層做的好或者需求不坑爹一般不會出現像下面的情況,一個好的程序員能做好代碼的分層, 但可能也有出現一堆爛需求的時候)
# 偽代碼,舉個例子一個request傳了3個函數 from django.http import HttpResponse def is_allow(request, uid): if request.ip == '127.0.0.1' and check_permissions(uid): return True else: return False def check_permissions(request, uid): pass def root(request): user_id = request.GET.get('uid') if is_allow(request, id): return HttpResponse('ok') else return HttpResponse('error')
此外, 除了防止一個參數傳一天
這個問題外, 通過上下文, 可以進行一些解耦, 比如有一個最經典的技術業務需求就是在日志打印request_id, 從而方便鏈路排查, 這時候如果有上下文模塊, 就可以把讀寫request_id給解耦出來, 比如下面這個基于Flask
框架讀寫request_id的例子:
import logging from typing import Any from flask import g # type: ignore from flask.logging import default_handler # 這是一個Python logging.Filter的對象, 日志在生成之前會經過Filter步驟, 這時候我們可以為他綁定request_id變量 class RequestIDLogFilter(logging.Filter): """ Log filter to inject the current request id of the request under `log_record.request_id` """ def filter(self, record: Any) -> Any: record.request_id = g.request_id or None return record # 配置日志的format格式, 這里多配了一個request_id變量 format_string: str = ( "[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d:%(funcName)s:%(request_id)s]" " %(message)s" ) # 為flask的默認logger設置format和增加一個logging.Filter對象 default_handler.setFormatter(logging.Formatter(format_string)) default_handler.addFilter(RequestIDLogFilter()) # 該方法用于設置request_id def set_request_id() -> None: g.request_id = request.headers.get("X-Request-Id", str(uuid4())) # 初始化FLask對象, 并設置before_request app: Flask = Flask("demo") app.before_request(set_request_id)
這里舉了一個例子, 但這個例子也有別的解決方案. 只不過通過這個例子順便說如何使用contextvar模塊
首先看看未使用contextvars
時,asyncio的web框架是如何傳變量的,根據starlette
的文檔,在未使用contextvars
時,傳遞Redis
客戶端實例的辦法是通過request.stat這個變量保存Redis
客戶端的實例,改寫代碼如下:
# demo/web_tools.py # 通過中間件把變量給存進去 class RequestContextMiddleware(BaseHTTPMiddleware): async def dispatch( self, request: Request, call_next: RequestResponseEndpoint ) -> Response: request.stat.redis = REDIS_POOL response = await call_next(request) return response # demo/server.py # 調用變量 @APP.route('/') async def homepage(request): # 偽代碼,這里是執行redis命令 await request.stat.redis.execute() return JSONResponse({'hello': 'world'})
代碼非常簡便, 也可以正常的運行, 但你下次在重構時, 比如簡單的把redis這個變量名改為new_redis, 那IDE不會識別出來, 需要一個一個改。 同時, 在寫代碼的時候, IDE永遠不知道這個方法調用到的變量的類型是什么, IDE也無法智能的幫你檢查(如輸入request.stat.redis.時,IDE不會出現execute,或者出錯時,IDE并不會提示). 這非常不利于項目的工程化, 而通過contextvars
和TypeHints
, 恰好能解決這個問題.
說了那么多, 下面以一個Redis
client為例子,展示如何在asyncio生態中使用contextvars
, 并引入TypeHints
(詳細解釋見代碼).
# demo/context.py # 該文件存放contextvars相關 import contextvars if TYPE_CHECKING: from demo.redis_dal import RDS # 這里是一個redis的封裝實例 # 初始化一個redis相關的全局context redis_pool_context = contextvars.ContextVar('redis_pool') # 通過函數調用可以獲取到當前協程運行時的context上下文 def get_redis() -> 'RDS': return redis_pool_context.get() # demo/web_tool.py # 該文件存放starlette相關模塊 from starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request from starlette.middleware.base import RequestResponseEndpoint from starlette.responses import Response from demo.redis_dal import RDS # 初始化一個redis客戶端變量,當前為空 REDIS_POOL = None # type: Optional[RDS] class RequestContextMiddleware(BaseHTTPMiddleware): async def dispatch( self, request: Request, call_next: RequestResponseEndpoint ) -> Response: # 通過中間件,在進入路由之前,把redis客戶端放入當前協程的上下文之中 token = redis_pool_context.set(REDIS_POOL) try: response = await call_next(request) return response finally: # 調用完成,回收當前請求設置的redis客戶端的上下文 redis_pool_context.reset(token) async def startup_event() -> None: global REDIS_POOL REDIS_POOL = RDS() # 初始化客戶端,里面通過asyncio.ensure_future邏輯延后連接 async def shutdown_event() -> None: if REDIS_POOL: await REDIS_POOL.close() # 關閉redis客戶端 # demo/server.py # 該文件存放starlette main邏輯 from starlette.applications import Starlette from starlette.responses import JSONResponse from demo.web_tool import RequestContextMiddleware from demo.context import get_redis APP = Starlette() APP.add_middleware(RequestContextMiddleware) @APP.route('/') async def homepage(request): # 偽代碼,這里是執行redis命令 # 只要驗證 id(get_redis())等于demo.web_tool里REDID_POOL的id一致,那證明contextvars可以為asyncio維護一套上下文狀態 await get_redis().execute() return JSONResponse({'hello': 'world'})
從上面的示例代碼來看, 使用contextvar
和TypeHint
確實能讓讓IDE可以識別到這個變量是什么了, 但增加的代碼太多了,更恐怖的是, 每多一個變量,就需要自己去寫一個context,一個變量的初始化,一個變量的get函數,同時在引用時使用函數會比較別扭.
自己在使用了contextvars
一段時間后,覺得這樣太麻煩了,每次都要做一堆重復的操作,且平時使用最多的就是把一個實例或者提煉出Headers的參數放入contextvars中,所以寫了一個封裝fast_tools.context(同時兼容fastapi
和starlette
), 它能屏蔽所有與contextvars的相關邏輯,其中由ContextModel負責contextvars的set和get操作,ContextMiddleware管理contextvars的周期,HeaderHeader負責托管Headers相關的參數, 調用者只需要在ContextModel中寫入自己需要的變量,引用時調用ContextModel的屬性即可.
以下是調用者的代碼示例, 這里的實例化變量由一個http client代替, 且都會每次請求分配一個客戶端實例, 但在實際使用中并不會為每一個請求都分配一個客戶端實例, 很影響性能:
import asyncio import uuid from contextvars import Context, copy_context from functools import partial from typing import Optional, Set import httpx from fastapi import FastAPI, Request, Response from fast_tools.context import ContextBaseModel, ContextMiddleware, HeaderHelper app: FastAPI = FastAPI() check_set: Set[int] = set() class ContextModel(ContextBaseModel): """ 通過該實例可以屏蔽大部分與contextvars相關的操作,如果要添加一個變量,則在該實例添加一個屬性即可. 屬性必須要使用Type Hints的寫法,不然不會識別(強制使用Type Hints) """ # 用于把自己的實例(如上文所說的redis客戶端)存放于contextvars中 http_client: httpx.AsyncClient # HeaderHepler用于把header的變量存放于contextvars中 request_id: str = HeaderHelper.i("X-Request-Id", default_func=lambda request: str(uuid.uuid4())) ip: str = HeaderHelper.i("X-Real-IP", default_func=lambda request: request.client.host) user_agent: str = HeaderHelper.i("User-Agent") async def before_request(self, request: Request) -> None: # 請求之前的鉤子, 通過該鉤子可以設置自己的變量 self.http_client = httpx.AsyncClient() check_set.add(id(self.http_client)) async def before_reset_context(self, request: Request, response: Optional[Response]) -> None: # 準備退出中間件的鉤子, 這步奏后會清掉上下文 await self.http_client.aclose() context_model: ContextModel = ContextModel() app.add_middleware(ContextMiddleware, context_model=context_model) async def test_ensure_future() -> None: assert id(context_model.http_client) in check_set def test_run_in_executor() -> None: assert id(context_model.http_client) in check_set def test_call_soon() -> None: assert id(context_model.http_client) in check_set @app.get("/") async def root() -> dict: # 在使用asyncio.ensure_future開啟另外一個子協程跑任務時, 也可以復用上下文 asyncio.ensure_future(test_ensure_future()) loop: "asyncio.AbstractEventLoop" = asyncio.get_event_loop() # 使用call_soon也能復用上下文 loop.call_soon(test_call_soon) # 使用run_in_executor也能復用上下文, 但必須使用上下文的run方法, copy_context表示復制當前的上下文 ctx: Context = copy_context() await loop.run_in_executor(None, partial(ctx.run, test_run_in_executor)) # type: ignore return { "message": context_model.to_dict(is_safe_return=True), # not return CustomQuery "client_id": id(context_model.http_client), } if __name__ == "__main__": import uvicorn # type: ignore uvicorn.run(app)
可以從例子中看到, 通過封裝的上下文調用會變得非常愉快, 只要通過一兩步方法就能設置好自己的上下文屬性, 同時不用考慮如何編寫上下文的生命周期. 另外也能通過這個例子看出, 在asyncio生態中, contextvars能運用到包括子協程, 多線程等所有的場景中.
在第一次使用時,我就很好奇contextvars是如何去維護程序的上下文的,好在contextvars的作者出了一個向下兼容的contextvars庫,雖然他不支持asyncio,但我們還是可以通過代碼了解到他的基本原理.
代碼倉中有ContextMeta
,ContextVarMeta
和TokenMeta
這幾個對象, 它們的功能都是防止用戶來繼承Context
,ContextVar
和Token
,原理都是通過元類來判斷類名是否是自己編寫類的名稱,如果不是則拋錯.
class ContextMeta(type(collections.abc.Mapping)): # contextvars.Context is not subclassable. def __new__(mcls, names, bases, dct): cls = super().__new__(mcls, names, bases, dct) if cls.__module__ != 'contextvars' or cls.__name__ != 'Context': raise TypeError("type 'Context' is not an acceptable base type") return cls
上下文的本質是一個堆棧, 每次set一次對象就向堆棧增加一層數據, 每次reset就是pop掉最上層的數據, 而在Contextvars
中, 通過Token
對象來維護堆棧之間的交互.
class Token(metaclass=TokenMeta): MISSING = object() def __init__(self, context, var, old_value): # 分別存放上下文變量, 當前set的數據以及上次set的數據 self._context = context self._var = var self._old_value = old_value self._used = False @property def var(self): return self._var @property def old_value(self): return self._old_value def __repr__(self): r = '<Token ' if self._used: r += ' used' r += ' var={!r} at {:0x}>'.format(self._var, id(self)) return r
可以看到Token
的代碼很少, 它只保存當前的context
變量, 本次調用set的數據和上一次被set的舊數據. 用戶只有在調用contextvar.context
后才能得到Token
, 返回的Token
可以被用戶在調用context后, 通過調用context.reset(token)來清空保存的上下文,方便本次context的變量能及時的被回收, 回到上上次的數據.
前面說過, Python中由threading.local()
負責每個線程的context, 協程屬于線程的’子集’,所以contextvar直接基于threading.local()
生成自己的全局context. 從他的源代碼可以看到, _state
就是threading.local()
的引用, 并通過設置和讀取_state
的context
屬性來寫入和讀取當前的上下文, copy_context
調用也很簡單, 同樣也是調用到threading.local()
API.
def copy_context(): return _get_context().copy() def _get_context(): ctx = getattr(_state, 'context', None) if ctx is None: ctx = Context() _state.context = ctx return ctx def _set_context(ctx): _state.context = ctx _state = threading.local()
關于threading.local()
,雖然不是本文重點,但由于contextvars
是基于threading.local()
進行封裝的,所以還是要明白threading.local()
的原理,這里并不直接通過源碼分析, 而是做一個簡單的示例解釋.
在一個線程里面使用線程的局部變量會比直接使用全局變量的性能好,因為局部變量只有線程自己能看見,不會影響其他線程,而全局變量的修改必須加鎖, 性能會變得很差, 比如下面全局變量的例子:
pet_dict = {} def get_pet(pet_name): return pet_dict[pet_name] def set_pet(pet_name): return pet_dict[pet_name]
這份代碼就是模仿一個簡單的全局變量調用, 如果是多線程調用的話, 那就需要加鎖啦, 每次在讀寫之前都要等到持有鎖的線程放棄了鎖后再去競爭, 而且還可能污染到了別的線程存放的數據.
而線程的局部變量則是讓每個線程有一個自己的pet_dict
, 假設每個線程調用get_pet
,set_pet
時,都會把自己的pid傳入進來, 那么就可以避免多個線程去同時競爭資源, 同時也不會污染到別的線程的數據, 那么代碼可以改為這樣子:
pet_dict = {} def get_pet(pet_name, pid): return pet_dict[pid][pet_name] def set_pet(pet_name, pid): return pet_dict[pid][pet_name]
不過這樣子使用起來非常方便, 同時示例例子沒有對異常檢查和初始化等處理, 如果值比較復雜, 我們還要維護異常狀況, 這樣太麻煩了.
這時候threading.local()
就應運而生了,他負責幫我們處理這些維護的工作,我們只要對他進行一些調用即可,調用起來跟單線程調用一樣簡單方便, 應用threading.local()
后的代碼如下:
import threading thread_local=threading.local() def get_pet(pet_name): return thread_local[pet_name] def set_pet(pet_name): return thread_local[pet_name]
可以看到代碼就像調用全局變量一樣, 但是又不會產生競爭狀態。
contextvars
自己封裝的Context比較簡單, 這里只展示他的兩個核心方法(其他的魔術方法就像dict
的魔術方法一樣):
class Context(collections.abc.Mapping, metaclass=ContextMeta): def __init__(self): self._data = immutables.Map() self._prev_context = None def run(self, callable, *args, **kwargs): if self._prev_context is not None: raise RuntimeError( 'cannot enter context: {} is already entered'.format(self)) self._prev_context = _get_context() try: _set_context(self) return callable(*args, **kwargs) finally: _set_context(self._prev_context) self._prev_context = None def copy(self): new = Context() new._data = self._data return new
首先, 在__init__
方法可以看到self._data,這里使用到了一個叫immutables.Map()的不可變對象,并對immutables.Map()進行一些封裝,所以context可以看成一個不可變的dict。這樣可以防止調用copy方法后得到的上下文的變動會影響到了原本的上下文變量。
查看immutables.Map()的示例代碼可以看到,每次對原對象的修改時,原對象并不會發生改變,并會返回一個已經發生改變的新對象.
map2 = map.set('a', 10) print(map, map2) # will print: # <immutables.Map({'a': 1, 'b': 2})> # <immutables.Map({'a': 10, 'b': 2})> map3 = map2.delete('b') print(map, map2, map3) # will print: # <immutables.Map({'a': 1, 'b': 2})> # <immutables.Map({'a': 10, 'b': 2})> # <immutables.Map({'a': 10})>
此外,context還有一個叫run
的方法, 上面在執行loop.run_in_executor
時就用過run
方法, 目的就是可以產生一個新的上下文變量給另外一個線程使用, 同時這個新的上下文變量跟原來的上下文變量是一致的.
執行run的時候,可以看出會copy一個新的上下文來調用傳入的函數, 由于immutables.Map
的存在, 函數中對上下文的修改并不會影響舊的上下文變量, 達到進程復制數據時的寫時復制的目的. 在run
方法的最后, 函數執行完了會再次set舊的上下文, 從而完成一次上下文切換.
def run(self, callable, *args, **kwargs): # 已經存在舊的context,拋出異常,防止多線程循環調用 if self._prev_context is not None: raise RuntimeError( 'cannot enter context: {} is already entered'.format(self)) self._prev_context = _get_context() # 保存當前的context try: _set_context(self) # 設置新的context return callable(*args, **kwargs) # 執行函數 finally: _set_context(self._prev_context) # 設置為舊的context self._prev_context = None
我們一般在使用contextvars模塊時,經常使用的就是ContextVar
這個類了,這個類很簡單,主要提供了set–設置值,get–獲取值,reset–重置值三個方法, 從Context
類中寫入和獲取值, 而set和reset的就是通過上面的token類進行交互的.
set – 為當前上下文設置變量
def set(self, value): ctx = _get_context() # 獲取當前上下文對象`Context` data = ctx._data try: old_value = data[self] # 獲取Context舊對象 except KeyError: old_value = Token.MISSING # 獲取不到則填充一個object(全局唯一) updated_data = data.set(self, value) # 設置新的值 ctx._data = updated_data return Token(ctx, self, old_value) # 返回帶有舊值的token
get – 從當前上下文獲取變量
def get(self, default=_NO_DEFAULT): ctx = _get_context() # 獲取當前上下文對象`Context` try: return ctx[self] # 返回獲取的值 except KeyError: pass if default is not _NO_DEFAULT: return default # 返回調用get時設置的值 if self._default is not _NO_DEFAULT: return self._default # 返回初始化context時設置的默認值 raise LookupError # 都沒有則會拋錯
reset – 清理本次用到的上下文數據
def reset(self, token): if token._used: # 判斷token是否已經被使用 raise RuntimeError("Token has already been used once") if token._var is not self: # 判斷token是否是當前contextvar返回的 raise ValueError( "Token was created by a different ContextVar") if token._context is not _get_context(): # 判斷token的上下文是否跟contextvar上下文一致 raise ValueError( "Token was created in a different Context") ctx = token._context if token._old_value is Token.MISSING: # 如果沒有舊值則刪除該值 ctx._data = ctx._data.delete(token._var) else: # 有舊值則當前contextvar變為舊值 ctx._data = ctx._data.set(token._var, token._old_value) token._used = True # 設置flag,標記token已經被使用了
則此,contextvar的原理了解完了,接下來再看看他是如何在asyncio運行的.
由于向下兼容的contextvars
并不支持asyncio, 所以這里通過aiotask-context的源碼簡要的了解如何在asyncio中如何獲取和設置context。
相比起contextvars復雜的概念,在asyncio中,我們可以很簡單的獲取到當前協程的task, 然后通過task就可以很方便的獲取到task的context了,由于Pyhon3.7對asyncio的高級API 重新設計,所以可以看到需要對獲取當前task進行封裝
PY37 = sys.version_info >= (3, 7) if PY37: def asyncio_current_task(loop=None): """Return the current task or None.""" try: return asyncio.current_task(loop) except RuntimeError: # simulate old behaviour return None else: asyncio_current_task = asyncio.Task.current_task
不同的版本有不同的獲取task方法, 之后我們就可以通過調用asyncio_current_task().context
即可獲取到當前的上下文了…
同樣的,在得到上下文后, 我們這里也需要set, get, reset的操作,不過十分簡單, 類似dict一樣的操作即可, 它沒有token的邏輯:
set
def set(key, value): """ Sets the given value inside Task.context[key]. If the key does not exist it creates it. :param key: identifier for accessing the context dict. :param value: value to store inside context[key]. :raises """ current_task = asyncio_current_task() if not current_task: raise ValueError(NO_LOOP_EXCEPTION_MSG.format(key)) current_task.context[key] = value
get
def get(key, default=None): """ Retrieves the value stored in key from the Task.context dict. If key does not exist, or there is no event loop running, default will be returned :param key: identifier for accessing the context dict. :param default: None by default, returned in case key is not found. :return: Value stored inside the dict[key]. """ current_task = asyncio_current_task() if not current_task: raise ValueError(NO_LOOP_EXCEPTION_MSG.format(key)) return current_task.context.get(key, default)
clear – 也就是contextvar.ContextVars
中的reset
def clear(): """ Clear the Task.context. :raises ValueError: if no current task. """ current_task = asyncio_current_task() if not current_task: raise ValueError("No event loop found") current_task.context.clear()
在Python的更高級版本中,已經支持設置context了,所以這兩個方法可以不再使用了.他們最后都用到了task_factory
的方法.task_factory
簡單說就是創建一個新的task,再通過工廠方法合成context,最后把context設置到task
def task_factory(loop, coro, copy_context=False, context_factory=None): """ By default returns a task factory that uses a simple dict as the task context, but allows context creation and inheritance to be customized via ``context_factory``. """ # 生成context工廠函數 context_factory = context_factory or partial( dict_context_factory, copy_context=copy_context) # 創建task, 跟asyncio.ensure_future一樣 task = asyncio.tasks.Task(coro, loop=loop) if task._source_traceback: del [-1] # 獲取task的context try: context = asyncio_current_task(loop=loop).context except AttributeError: context = None # 從context工廠中處理context并賦值在task task.context = context_factory(context) return task
aiotask-context
提供了兩個對context處理的函數dict_context_factory
和chainmap_context_factory
.在aiotask-context
中,context是一個dict對象,dict_context_factory
可以選擇賦值或者設置新的context
def dict_context_factory(parent_context=None, copy_context=False): """A traditional ``dict`` context to keep things simple""" if parent_context is None: # initial context return {} else: # inherit context new_context = parent_context if copy_context: new_context = deepcopy(new_context) return new_context
chainmap_context_factory
與dict_context_factory
的區別就是在合并context而不是直接繼承.同時借用ChainMap
保證合并context后,還能同步context的改變
def chainmap_context_factory(parent_context=None): """ A ``ChainMap`` context, to avoid copying any data and yet preserve strict one-way inheritance (just like with dict copying) """ if parent_context is None: # initial context return ChainMap() else: # inherit context if not isinstance(parent_context, ChainMap): # if a dict context was previously used, then convert # (without modifying the original dict) parent_context = ChainMap(parent_context) return parent_context.new_child()
到此,相信大家對“python怎么使用contextvars模塊”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。