在FastAPI中實現背景任務可以使用Python的asyncio
庫來實現。以下是一個簡單的示例代碼:
from fastapi import BackgroundTasks, FastAPI
import asyncio
app = FastAPI()
def background_task():
# 模擬一個長時間運行的任務
asyncio.sleep(5)
print("Background task completed")
@app.post("/send-notification/{message}")
async def send_notification(message: str, background_tasks: BackgroundTasks):
background_tasks.add_task(background_task)
return {"message": f"Notification '{message}' sent in the background"}
在上面的示例中,我們定義了一個背景任務background_task
,它模擬了一個長時間運行的任務。然后我們定義了一個路由/send-notification/{message}
,當用戶訪問這個路由時,會觸發發送通知的操作,并將背景任務background_task
添加到BackgroundTasks
中。這樣在接收到請求后,就會異步執行這個背景任務,不會阻塞主線程。
請注意,需要在啟動應用程序時運行uvicorn
服務器時添加--reload
參數,以便在代碼更改時重新加載應用程序。