91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python怎么使用APScheduler調度任務

發布時間:2022-06-01 11:42:18 來源:億速云 閱讀:140 作者:iii 欄目:大數據

這篇文章主要介紹“Python怎么使用APScheduler調度任務”,在日常操作中,相信很多人在Python怎么使用APScheduler調度任務問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Python怎么使用APScheduler調度任務”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

 任務調度應用場景

所謂的任務調度是指安排任務的執行計劃,即何時執行,怎么執行等。在現實項目中經常出現它們的身影;特別是數據類項目,比如實時統計每5分鐘網站的訪問量,就需要每5分鐘定時從日志數據分析訪問量。

總結下任務調度應用場景:

  •  離線作業調度:按時間粒度執行某項任務

  •  共享緩存更新:定時刷新緩存,如redis緩存;不同進程間的共享數據

任務調度工具

  •  linux的crontab, 支持按照分鐘/小時/天/月/周粒度,執行任務

  •  java的Quartz

  •  windows的任務計劃

本文介紹的是python中的任務調度庫,APScheduler(advance python scheduler)。如果你了解Quartz的話,可以看出APScheduler是Quartz的python實現;APScheduler提供了基于時間,固定時間點和crontab方式的任務調用方案, 可以當作一個跨平臺的調度工具來使用。

APScheduler

組件介紹

APScheduler由5個部分組成:觸發器、調度器、任務存儲器、執行器和任務事件。

  •  任務job:任務id和任務執行func

  •  觸發器triggers:確定任務何時開始執行

  •  任務存儲器job stores: 保存任務的狀態

  •  執行器executors:確定任務怎么執行

  •  任務事件event:監控任務執行異常情況

  •  調度器schedulers:串聯任務的整個生命周期,添加編輯任務到任務存儲器,在任務的執行時間到來時,把任務交給執行器執行返回結果;同時發出事件監聽,監控任務事件 。

安裝

pip install apscheduler

簡單例子

from apscheduler.schedulers.background import BackgroundScheduler  from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor  from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore  from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR  import logging  import datetime  # 任務執行函數  def job_func(job_id):      print('job %s is runed at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))  # 事件監聽  def job_exception_listener(event):      if event.exception:          # todo:異常處理, 告警等          print('The job crashed :(')      else:          print('The job worked :)')  # 日志  logging.basicConfig()  logging.getLogger('apscheduler').setLevel(logging.DEBUG)  # 定義一個后臺任務非阻塞調度器  scheduler = BackgroundScheduler()  # 添加一個任務到內存中   # 觸發器:trigger='interval' seconds=10 每10s觸發執行一次  # 執行器:executor='default' 線程執行  # 任務存儲器:jobstore='default' 默認內存存儲  # 最大并發數:max_instances  scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)  # 設置任務監聽  scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)  # 啟動調度器  scheduler.start()

運行情況:

job 1 is runed at 2020-03-21 20:00:38  The job worked :)  job 1 is runed at 2020-03-21 20:00:48  The job worked :)  job 1 is runed at 2020-03-21 20:00:58  The job worked :)

觸發器

觸發器決定何時執行任務,APScheduler支持的觸發器有3種

  •  trigger='interval':按固定時間周期執行,支持weeks,days,hours,minutes, seconds, 還可指定時間范圍   

sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00')
  •  trigger='date': 固定時間,執行一次   

sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])
  •   trigger='cron': 支持crontab方式,執行任務

  •   參數:分鐘/小時/天/月/周粒度,也可指定時間范圍     

year (int|str) – 4-digit year        month (int|str) – month (1-12)        day (int|str) – day of the (1-31)        week (int|str) – ISO week (1-53)        day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)        hour (int|str) – hour (0-23)        minute (int|str) – minute (0-59)        second (int|str) – second (0-59)        start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)        end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
  •   例子           

# 星期一到星期五,5點30執行任務job_function,直到2014-05-30 00:00:00             sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')             # 按照crontab格式執行, 格式為:分鐘 小時 天 月 周,*表示所有             # 5月到8月的1號到15號,0點0分執行任務job_function             sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *'))

執行器

執行器決定如何執行任務;APScheduler支持4種不同執行器,常用的有pool(線程/進程)和gevent(io多路復用,支持高并發),默認為pool中線程池, 不同的執行器可以在調度器的配置中進行配置(見調度器)

  •  apscheduler.executors.asyncio:同步io,阻塞

  •  apscheduler.executors.gevent:io多路復用,非阻塞

  •  apscheduler.executors.pool: 線程ThreadPoolExecutor和進程ProcessPoolExecutor

  •  apscheduler.executors.twisted:基于事件驅動

任務存儲器

任務存儲器決定任務的保存方式, 默認存儲在內存中(MemoryJobStore),重啟后就沒有了。APScheduler支持的任務存儲器有:

  •  apscheduler.jobstores.memory:內存

  •  apscheduler.jobstores.mongodb:存儲在mongodb

  •  apscheduler.jobstores.redis:存儲在redis

  •  apscheduler.jobstores.rethinkdb:存儲在rethinkdb

  •  apscheduler.jobstores.sqlalchemy:支持sqlalchemy的數據庫如mysql,sqlite等

  •  apscheduler.jobstores.zookeeper:zookeeper

不同的任務存儲器可以在調度器的配置中進行配置(見調度器)

調度器

APScheduler支持的調度器方式如下,比較常用的為BlockingScheduler和BackgroundScheduler

  •  BlockingScheduler:適用于調度程序是進程中唯一運行的進程,調用start函數會阻塞當前線程,不能立即返回。

  •  BackgroundScheduler:適用于調度程序在應用程序的后臺運行,調用start后主線程不會阻塞。

  •  AsyncIOScheduler:適用于使用了asyncio模塊的應用程序。

  •  GeventScheduler:適用于使用gevent模塊的應用程序。

  •  TwistedScheduler:適用于構建Twisted的應用程序。

  •  QtScheduler:適用于構建Qt的應用程序。

從前面的例子,我們可以看到,調度器可以操作任務(并為任務指定觸發器、任務存儲器和執行器)和監控任務。

scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)

我們來詳細看下各個部分

  •  調度器配置:在add_job我們看到jobstore和executor都是default,APScheduler在定義調度器時可以指定不同的任務存儲和執行器,以及初始的參數   

from pytz import utc     from apscheduler.schedulers.background import BackgroundScheduler     from apscheduler.jobstores.mongodb import MongoDBJobStore     from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore     from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor     # 通過dict方式執行不同的jobstores、executors和默認的參數     jobstores = {         'mongo': MongoDBJobStore(),         'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')     }     executors = {         'default': ThreadPoolExecutor(20),         'processpool': ProcessPoolExecutor(5)     }     job_defaults = {         'coalesce': False,         'max_instances': 3     }     # 定義調度器     scheduler = BackgroundScheduler(jobstoresjobstores=jobstores, executorsexecutors=executors, job_defaultsjob_defaults=job_defaults, timezone=utc)     def job_func(job_id):         print('job %s is runed at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))     # 添加任務     scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', jobstore='default', executor='processpool', seconds=10)     # 啟動調度器     scheduler.start()
  •  操作任務:調度器可以增加,刪除,暫停,恢復和修改任務。需要注意的是這里的操作只是對未執行的任務起作用,已經執行和正在執行的任務不受這些操作的影響。

  •   add_job     

scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)
  •   remove_job: 通過任務唯一的id,刪除的時候對應的任務存儲器里記錄也會刪除 

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')   scheduler.remove_job('my_job_id')
  •   Pausing and resuming jobs:暫停和重啟任務       

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')          scheduler.pause_job('my_job_id')          scheduler.resume_job('my_job_id')
  •   Modifying jobs:修改任務的配置       

job = scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id', max_instances=10)         # 修改任務的屬性         job.modify(max_instances=6, name='Alternate name')         # 修改任務的觸發器         scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
  •  監控任務事件類型,比較常用的類型有:

    •   EVENT_JOB_ERROR: 表示任務在執行過程的出現異常觸發

    •   EVENT_JOB_EXECUTED:任務執行成功時

    •   EVENT_JOB_MAX_INSTANCES:調度器上執行的任務超過配置的參數時       

scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

到此,關于“Python怎么使用APScheduler調度任務”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

江山市| 五家渠市| 沈丘县| 临沭县| 扶绥县| 彭州市| 广汉市| 桃源县| 延吉市| 禹城市| 平山县| 龙门县| 翁牛特旗| 鞍山市| 峨眉山市| 天长市| 江北区| 磴口县| 明溪县| 金阳县| 同仁县| 锡林郭勒盟| 南漳县| 皋兰县| 凤城市| 龙口市| 遂昌县| 苍南县| 永修县| 金湖县| 万安县| 安远县| 桐柏县| 卢湾区| 涿州市| 阜城县| 离岛区| 楚雄市| 永川市| 巫溪县| 濉溪县|