91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

【從0開始Python開發實戰】Django集成Celery

發布時間:2020-03-11 22:57:27 來源:網絡 閱讀:470 作者:wx5b3c0a4298f7b 欄目:編程語言

目錄:

1.?Django集成Celery

2.?聲明異步任務

3.?封裝工具task_util.py

4.?單元測試test_task_util.py

5.?創建異步任務

6.?常見問題和解決方法


Celery是一個靈活可靠的分布式系統,用于異步任務調度,主要有3部分組成:消息中間件broker,任務執行單元worker,執行結果存儲task result store。Celery使用第三方消息中間件Redis,RabbitMQ等。

【從0開始Python開發實戰】Django集成Celery?

系統通常將一些耗時的操作任務提交給Celery去異步執行,典型架構示意圖如下本文詳細介紹Django集成Celery配置方法和功能測試

【從0開始Python開發實戰】Django集成Celery

時序圖如下:

【從0開始Python開發實戰】Django集成Celery?

示例代碼:https://github.com/rickding/HelloPython/tree/master/hello_celery

├── __init__.py

├── settings.py

├── celery.py

├── tasks.py

├── util

│ ??└── task_util.py

├── tests

│ ??└── test_task_util.py


一,Django集成Celery


代碼文件

功能要點

Django集成Celery

requirements.txt

安裝Celery, Redis和工具包:

celery == 4.2.1

flower == 0.9.2

redis == 3.2.0

eventlet == 0.24.1

celery.py

配置Celery,依賴的消息中間件broker和后端backend地址配置在settings.py中集中維護。

__init__.py

配置項目加載celery.app

聲明異步任務

tasks.py

聲明Celery可調度的任務@shared_task

封裝工具task_util

task_util.py

異步任務創建和分發

單元測試

test_task_util.py

測試異步任務創建和分發功能

創建異步任務

views.py

增加REST接口/chk/job


1.?新建Django項目,運行:django-admin startproject hello_celery

2.?進到目錄hello_celery,增加應用:python manage.py startapp app

【從0開始Python開發實戰】Django集成Celery

項目的目錄文件結構如下:

【從0開始Python開發實戰】Django集成Celery

3.?安裝Celery和依賴包,pip install celery >= 4.2.1,如果不是新建項目,注意版本兼容問題。

celery == 4.2.1
flower == 0.9.2
redis == 3.2.0
eventlet == 0.24.1

4.?增加celery.py,配置信息:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'hello_celery.settings')

app = Celery(
????'hello_celery',
????include=['hello_celery.tasks'],
????broker=settings.CELERY_BROKER,
????backend=settings.CELERY_BACKEND
)

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# ??should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

app.conf.update(
????CELERY_ACKS_LATE=True,
????CELERY_ACCEPT_CONTENT=['pickle', 'json'],
????CELERYD_FORCE_EXECV=True,
????CELERYD_MAX_TASKS_PER_CHILD=500,
????BROKER_HEARTBEAT=0,
)

# Optional configuration, see the application user guide.
app.conf.update(
????CELERY_TASK_RESULT_EXPIRES=3600, ?# celery任務執行結果的超時時間,即結果在backend里的保存時間,單位s
)

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

platforms.C_FORCE_ROOT = True

5.?打開settings.py,配置BROKER和BACKEND地址:

CELERY_BROKER = 'redis://127.0.0.1:6379/2'
CELERY_BACKEND = 'redis://127.0.0.1:6379/3'

6.?打開__init__.py,增加代碼:

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ['celery_app']


二,增加tasks.py,聲明異步任務

from __future__ import absolute_import, unicode_literals
import logging
import json

from celery import shared_task
from hello_celery.util.task_util import dispatch_task

log = logging.getLogger(__name__)


@shared_task
def task(param_str):
????log.info('task starts: %s, %s' % (type(param_str), param_str))

????param_dict = None
????try:
????????param_dict = json.loads(param_str)
????except Exception as e:
????????log.warning('Exception when parse param: %s' % str(e))

????log.info('parsed param: {}, {}'.format(type(param_dict), param_dict))
????return 'finished'


正確配置后,運行命令:celery -A hello_celery worker -l info -P eventlet,注意Win10環境中需要增加eventlet,Celery成功啟動信息:

【從0開始Python開發實戰】Django集成Celery?

三,封裝工具task_util.py

封裝兩個有用的工具函數,分別用于分發(創建)異步任務和獲取任務信息:

import logging
import json

log = logging.getLogger(__name__)

def dispatch_task(task_func, param_dict):
????param_json = json.dumps(param_dict)

????try:
????????return task_func.apply_async(
????????????[param_json],
????????????retry=True,
????????????retry_policy={
????????????????'max_retries': 1,
????????????????'interval_start': 0,
????????????????'interval_step': 0.2,
????????????????'interval_max': 0.2,
????????????},
????????)
????except Exception as ex:
????????log.info(ex)
????????raise


def get_task_status(task_func, task_id):
????t = task_func.AsyncResult(task_id)
????status = t.state
????progress = 0

????if status == u'SUCCESS':
????????progress = 100
????elif status == u'FAILURE':
????????progress = 0
????elif status == 'PROGRESS':
????????progress = t.info['progress']

????return {'status': status, 'progress': progress}


四,單元測試test_task_util.py

增加測試函數,創建一個任務任務并獲取信息:

import logging

from django.test import TestCase

from hello_celery.tasks import task
from hello_celery.util.task_util import dispatch_task, get_task_status

log = logging.getLogger(__name__)


class TasksTest(TestCase):
????def test_get_task_status(self):
????????t = dispatch_task(task, {'msg': 'test_task'})
????????self.assertIsNotNone(t)

????????ret = get_task_status(task, t.id)
????????log.info('task status: %s,%s, %s' % (ret, t.id, str(task)))
????????self.assertIsNotNone(ret.get('status'))

運行python manage.py test,同時Celery將執行測試函數創建的任務:

【從0開始Python開發實戰】Django集成Celery

五,創建異步任務

1.?views.py中增加請求處理函數,創建一個異步執行的任務:

from django.http import JsonResponse
from hello_celery.tasks import do_task


def chk_job(req):
????param_dict = {
????????'url': req.get_raw_uri(),
????????'path': req.get_full_path(),
????}
????job = do_task(param_dict)
????return JsonResponse({'code': 0, 'msg': 'success', 'job': job.task_id})

2.?urls.py中配置路由

from django.urls import path
from app.views import chk_job

urlpatterns = [
????path('', chk_job, name='chk'),
]

3.?運行命令啟動服務:python manage.py runserver 0.0.0.0:8001

【從0開始Python開發實戰】Django集成Celery

4.?REST接口創建異步任務示例

【從0開始Python開發實戰】Django集成Celery?

六,常見問題和解決方法

1.?啟動Celery: celery -A hello_celery worker -l info,運行出錯:

Unrecoverable error: VersionMismatch('Redis transport requires redis-py versions 3.2.0 or later. You have 2.10.6',)

解決:指定Redis使用3.2.0或更高pip install redis>=3.2.0

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

瓦房店市| 海阳市| 英吉沙县| 府谷县| 京山县| 亳州市| 永善县| 井陉县| 瑞丽市| 南康市| 千阳县| 天峻县| 周宁县| 扶沟县| 太康县| 成都市| 慈利县| 贵州省| 宜阳县| 修水县| 高安市| 商水县| 中阳县| 涿鹿县| 亚东县| 鄂州市| 红桥区| 岳池县| 方山县| 南溪县| 保亭| 政和县| 安龙县| 霍山县| 潼南县| 开化县| 醴陵市| 舟曲县| 甘南县| 金华市| 禹城市|