您好,登錄后才能下訂單哦!
背景
在開發中,我們常常會遇到一些耗時任務,舉個例子:
上傳并解析一個 1w 條數據的 Excel 文件,最后持久化至數據庫。
在我的程序中,這個任務耗時大約 6s,對于用戶來說,6s 的等待已經是個災難了。
比較好的處理方式是:
我們按照這個思路,借助 Celery 進行實現。
實現
本文所使用的環境如下:
使用 Docker 安裝 RabbitMQ
Celery 依賴一個消息后端,可選方案有 RabbitMQ, Redis 等,本文選用 RabbitMQ 。
同時為了安裝方便,RabbitMQ 我直接使用 Docker 安裝:
docker run -d --name anno-rabbit -p 5672:5672 rabbitmq:3
啟動成功后,即可通過 amqp://localhost 訪問該消息隊列。
安裝并配置 Celery
Celery 是 Python 實現的工具,安裝可以直接通過 Pip 完成:
pip install celery
同時假設當前我的項目文件夾為 proj ,項目名為 myproj ,應用名為 myapp
安裝完成后,在 proj/myproj/ 路徑下創建一個 celery.py 文件,用來初始化 Celery 實例:
proj/myproj/celery.py
from __future__ import absolute_import, unicode_literals import os from celery import Celery, platforms # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproj.settings') app = Celery('myproj', broker='amqp://localhost//', backend='amqp://localhost//') # Using a string here means the worker don't have to serialize # the configuration object to child processes.s # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django app configs. app.autodiscover_tasks()
然后在 proj/myproj/__init__.py 中添加對 Celery 對象的引用,確保 Django 啟動后能夠初始化 Celery:
proj/myproj/__init__.py
from __future__ import absolute_import, unicode_literals # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app __all__ = ('celery_app',)
無其他特殊配置的話,Celery 的基本配置就是這些。
編寫一個耗時任務
為了模擬一個耗時任務,我們直接創建一個方法,使其「睡」10s ,并將其設置為 Celery 的任務:
proj/myapp/tasks.py
import time from myproj.celery import app as celery_app @celery_app.task def waste_time(): time.sleep(10) return "Run function 'waste_time' finished."
啟動 Celery Worker
Celery 配置完成,并且任務創建成功后,我們以異步任務的模式啟動 Celery :
celery -A myproj worker -l info
注意到我強調了異步模式,是因為 Celery 除了支持異步任務,還支持定時任務,因此啟動時候要指明。
同時要注意,Celery 一旦啟動,對 Task(此處為 waste_time) 的修改必須重啟 Celery 才會生效。
任務調用
在請求處理的邏輯代碼中,調用上面創建好的任務:
proj/myapp/views.py
from django.http import JsonResponse from django.views.decorators.http import require_http_methods from .tasks import waste_time @require_http_methods(["POST"]) def upload_files(request): waste_time.delay() # Status code 202: Accepted, 表示異步任務已接受,可能還在處理中 return JsonResponse({"results": "操作成功,正在上傳,請稍候..."}, status=202)
調用 waste_time.delay() 方法后, waste_time 會被加入到任務隊列中,等待空閑的 Celery Worker 調用。
效果
當我們發送請求時,這個接口會直接返回 {"results": "操作成功,正在上傳,請稍候..."} 的響應內容而非卡住十秒,用戶體驗要好許多。
總結
用 Celery 處理這種異步任務是 Python 常用的方法,雖然實際執行成功耗時不變甚至有所增加(如 Worker 繁忙導致處理滯后),但是對于用戶體驗來說更容易接受,點擊上傳大文件后可以繼續處理其他事務,而不需要在頁面等待。
Celery 還有更多用法本文未介紹到,其文檔已經非常詳盡,有需要可直接參考。
參考
http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html
https://hub.docker.com/_/rabbitmq
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持億速云。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。