您好,登錄后才能下訂單哦!
這篇文章主要介紹了python定時任務apscheduler如何使用的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇python定時任務apscheduler如何使用文章都會有所收獲,下面我們一起來看看吧。
pip install apscheduler
概念性東西,混個臉熟,代碼比這些定義好理解。
觸發器(trigger)包含調度邏輯,每一個作業有它自己的觸發器,用于決定接下來哪一個作業會運行。除了他們自己初始配置意外,觸發器完全是無狀態的。說人話就是你指定那種方式觸發當前的任務。
類型 | 解釋 |
---|---|
DateTrigger | 到期執行(到xxxx年x月x日 x時x分x秒執行) 對應DateTrigger |
IntervalTrigger | 間隔執行(每5秒執行一次) |
CronTrigger | 一個crontab類型的條件(這個比較復雜,比如周一到周四的4-5點每5秒執行一次) |
作業存儲(job store)存儲被調度的作業,默認的作業存儲是簡單地把作業保存在內存中,其他的作業存儲是將作業保存在數據庫中。一個作業的數據講在保存在持久化作業存儲時被序列化,并在加載時被反序列化。調度器不能分享同一個作業存儲。
Jobstore在scheduler中初始化,另外也可通過scheduler的add_jobstore動態添加Jobstore。每個jobstore
都會綁定一個alias,scheduler在Add Job時,根據指定的jobstore在scheduler中找到相應的jobstore,并
將job添加到jobstore中。Jobstore主要是通過pickle庫的loads和dumps【實現核心是通過python的__getstate__和__setstate__重寫
實現】,每次變更時將Job動態保存到存儲中,使用時再動態的加載出來,作為存儲的可以是redis,也可以
是數據庫【通過sqlarchemy這個庫集成多種數據庫】,也可以是mongodb等
目前APScheduler支持的Jobstore:MemoryJobStore
MongoDBJobStore
RedisJobStore
RethinkDBJobStore
SQLAlchemyJobStore
ZooKeeperJobStore
執行器(executor)處理作業的運行,他們通常通過在作業中提交制定的可調用對象到一個線程或者進城池來進行。當作業完成時,執行器將會通知調度器。
- 說人話就是添加任務時候用它來包裝的,executor的種類會根據不同的調度來選擇,如果選擇AsyncIO作為調度的庫,那么選擇AsyncIOExecutor,如果選擇tornado作為調度的庫,選擇TornadoExecutor,如果選擇啟動進程作為調度,選擇ThreadPoolExecutor或者ProcessPoolExecutor都可以Executor的選擇需要根據實際的scheduler來選擇不同的執行器
目前APScheduler支持的Executor:
AsyncIOExecutor
GeventExecutor
ThreadPoolExecutor
ProcessPoolExecutor
TornadoExecutor
TwistedExecutor
調度器(scheduler)是其他的組成部分。你通常在應用只有一個調度器,應用的開發者通常不會直接處理作業存儲、調度器和觸發器,相反,調度器提供了處理這些的合適的接口。配置作業存儲和執行器可以在調度器中完成,例如添加、修改和移除作業.
Scheduler是APScheduler的核心,所有相關組件通過其定義。scheduler啟動之后,將開始按照配置的任務進行調度。
除了依據所有定義Job的trigger生成的將要調度時間喚醒調度之外。當發生Job信息變更時也會觸發調度。scheduler可根據自身的需求選擇不同的組件,如果是使用AsyncIO則選擇AsyncIOScheduler,使用tornado則
選擇TornadoScheduler。
目前APScheduler支持的Scheduler:AsyncIOScheduler
BackgroundScheduler
BlockingScheduler
GeventScheduler
QtScheduler
TornadoScheduler
TwistedScheduler
import time from apscheduler.schedulers.blocking import BlockingScheduler # 引入后臺 def my_job(): print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) sched = BlockingScheduler() sched.add_job(my_job, 'interval', seconds=5) sched.start()
# trigeers 觸發器 # job stores job 存儲 # executors 執行器 # schedulers 調度器 from pytz import utc from sqlalchemy import func from apscheduler.schedulers.background import BackgroundScheduler,AsyncIOScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ProcessPoolExecutor jobstores = { # 可以配置多個存儲 #'mongo': {'type': 'mongodb'}, 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') # SQLAlchemyJobStore指定存儲鏈接 } executors = { 'default': {'type': 'threadpool', 'max_workers': 20}, # 最大工作線程數20 'processpool': ProcessPoolExecutor(max_workers=5) # 最大工作進程數為5 } job_defaults = { 'coalesce': False, # 關閉新job的合并,當job延誤或者異常原因未執行時 'max_instances': 3 # 并發運行新job默認最大實例多少 } scheduler = BackgroundScheduler() # .. do something else here, maybe add jobs etc. scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc) # utc作為調度程序的時區 import os import time def print_time(name): print(f'{name} - {time.ctime()}') def add_job(job_id, func, args, seconds): """添加job""" print(f"添加間隔執行任務job - {job_id}") scheduler.add_job(id=job_id, func=func, args=args, trigger='interval', seconds=seconds) def add_coun_job(job_id, func, args, start_time): """添加job""" print(f"添加一次執行任務job - {job_id}") scheduler.add_job(id=job_id, func=func, args=args, trigger='date',timezone='Asia/Shanghai', run_date=start_time) # scheduler.add_job(func=print_time, trigger='date',timezone='Asia/Shanghai', run_date=datetime(2022, 2, 19, 17, 57, 0).astimezone(), args=['text2']) def remove_job(job_id): """移除job""" scheduler.remove_job(job_id) print(f"移除job - {job_id}") def pause_job(job_id): """停止job""" scheduler.pause_job(job_id) print(f"停止job - {job_id}") def resume_job(job_id): """恢復job""" scheduler.resume_job(job_id) print(f"恢復job - {job_id}") def get_jobs(): """獲取所有job信息,包括已停止的""" res = scheduler.get_jobs() print(f"所有job - {res}") def print_jobs(): print(f"詳細job信息") scheduler.print_jobs() def start(): """啟動調度器""" scheduler.start() def shutdown(): """關閉調度器""" scheduler.shutdown() if __name__ == '__main__': scheduler = BackgroundScheduler() # start() # print('Press Ctrl+{0} to exit \n'.format('Break' if os.name == 'nt' else 'C')) # add_job('job_A', func=print_time, args=("A", ), seconds=1) # add_job('job_B', func=print_time, args=("B", ), seconds=2) # time.sleep(6) # pause_job('job_A') # 停止a # get_jobs() #得到所有job # time.sleep(6) # print_jobs() # resume_job('job_A') # time.sleep(6) # remove_job('job_A') # time.sleep(6) from datetime import datetime import pytz start() date_temp = datetime(2022, 2, 19, 17, 30, 5) # scheduler.add_job(print_time, 'date', run_date=datetime.now(pytz.timezone('America/Manaus')), args=['text']) # scheduler.add_job(print_time, 'date',timezone='Asia/Shanghai', run_date=datetime(2022, 2, 19, 17, 57, 0).astimezone(), args=['text2']) add_coun_job(job_id="job_C",func=print_time,args=('一次性執行任務',),start_time=datetime(2022, 2, 19, 18, 4, 0).astimezone()) time.sleep(130) try: shutdown() except RuntimeError: pass
關于“python定時任務apscheduler如何使用”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“python定時任務apscheduler如何使用”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。