您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關利用Django-celery-beat怎么動態添加一個周期性任務,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
前期準備
1.beat插件安裝
pip3 install django-celery-beat
2.注冊APP
INSTALLED_APPS = [
....
'django_celery_beat',
]
3.數據庫變更
python3 manage.py migrate django_celery_beat
配置工作
目錄結構請參考://www.jb51.net/article/200659.htm
1.配置celerypro.py
from __future__ import absolute_import import os from celery import Celery from django.conf import settings from django.utils import timezone # set the default Django settings module for the 'celery' program. # 為celery設置環境變量 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'voice_quality_assurance_configure.settings') # 創建celery app app = Celery('voice_quality_assurance_configure') # Using a string here means the worker will not have to # pickle the object when using Windows. # 從單獨的配置模塊中加載配置 app.config_from_object('voice_quality_assurance_configure.celeryconfig') # 設置app自動加載任務 app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) # 解決時區問題,定時任務啟動就循環輸出 app.now = timezone.now
2.配置celeryconfig.py
from __future__ import absolute_import from kombu import Queue from django.conf import settings # 設置代理人broker CELERY_BROKER_URL = 'redis://127.0.0.1:6379/2' # 指定 Backend CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1' # 指定時區,默認是 UTC CELERY_TIMEZONE='Asia/Shanghai' # celery 序列化與反序列化配置 CELERY_TASK_SERIALIZER = 'pickle' CELERY_RESULT_SERIALIZER = 'pickle' CELERY_ACCEPT_CONTENT = ['pickle', 'json'] CELERY_IGNORE_RESULT = True # celery 的啟動工作數量設置 CELERY_WORKER_CONCURRENCY = 10 # 任務預取功能,會盡量多拿 n 個,以保證獲取的通訊成本可以壓縮。 CELERYD_PREFETCH_MULTIPLIER = 20 # 有些情況下可以防止死鎖 CELERYD_FORCE_EXECV = True # celery 的 worker 執行多少個任務后進行重啟操作 CELERY_WORKER_MAX_TASKS_PER_CHILD = 100 # 禁用所有速度限制,如果網絡資源有限,不建議開足馬力。 CELERY_DISABLE_RATE_LIMITS = True # celery beat配置(周期性任務設置) CELERY_ENABLE_UTC = False CELERY_TIMEZONE = settings.TIME_ZONE DJANGO_CELERY_BEAT_TZ_AWARE = False CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
3.分別啟動woker和beta
項目根目錄終端執行(voice_quality_assurance_configure為項目名稱,簡單來說,和manage.py文件同級)
celery -A voice_quality_assurance_configure beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler #
啟動beta 調度器使用數據庫
celery worker -A voice_quality_assurance_configure --loglevel=info -n worker1 #啟動celery worker
4.創建周期性任務
from datetime import datetime, timedelta import json import os,django os.environ.setdefault("DJANGO_SETTINGS_MODULE", "voice_quality_assurance_configure.settings")# project_name 項目名稱 django.setup() from django_celery_beat.models import PeriodicTask, IntervalSchedule schedule, created = IntervalSchedule.objects.get_or_create(every=10,period=IntervalSchedule.SECONDS,) # 帶參數的創建方法,如下: PeriodicTask.objects.create( interval=schedule, # 上面創建10秒的間隔 interval 對象 name='test_task', # 設置任務的name值 task='mission.tasks.my_task', # 指定需要周期性執行的任務 args=json.dumps([10, 2, 76]), expires=datetime.utcnow() + timedelta(seconds=30) )
詳解創建周期性任務的方法
創建基于interval的周期性任務
第一步創建間隔對象
schedule, created = IntervalSchedule.objects.get_or_create( every=10, period=IntervalSchedule.SECONDS, )
IntervalSchedule.DAYS 固定間隔天數
IntervalSchedule.HOURS 固定間隔小時數
IntervalSchedule.MINUTES 固定間隔分鐘數
IntervalSchedule.SECONDS 固定間隔秒數
IntervalSchedule.MICROSECONDS 固定間隔微秒
第二步創建任務
無參數的創建方法:
PeriodicTask.objects.create( interval=schedule, # we created this above. name='test_task', # simply describes this periodic task. task='app名.tasks.任務函數名', # name of task.)
有參數的創建方法:
PeriodicTask.objects.create( interval=schedule, # we created this above. name='test'_task', # simply describes this periodic task. task='app名.tasks.任務函數名', # name of task. args=json.dumps(['arg1', 'arg2']), kwargs=json.dumps({ 'be_careful': True, }), expires=datetime.utcnow() + timedelta(seconds=30) )
class MonitorDeviceTask(object): """ 設備創建,增加周期性任務 """ def __init__(self, device_obj): self.device_obj = device_obj self.periodic_task = PeriodicTask.objects.create( interval=schedule, name='test_task', task='mission.tasks.my_task', args=json.dumps([self.device_obj.ip]) ) def starttask(self): """ 啟動任務 """ self.periodic_task.enabled = True self.periodic_task.save() def stoptask(self): """ 停止任務 """ self.periodic_task.enabled = False self.periodic_task.save() def deltask(self): """ 刪除任務 """ self.periodic_task.delete() self.periodic_task.save()
創建基于 crontab 的周期性任務
from django_celery_beat.models import CrontabSchedule, PeriodicTask schedule, _ = CrontabSchedule.objects.get_or_create( minute='30', hour='*', day_of_week='*', day_of_month='*', month_of_year='*', timezone=pytz.timezone('Canada/Pacific') )
關于利用Django-celery-beat怎么動態添加一個周期性任務就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。