您好,登錄后才能下訂單哦!
這篇“Python第三方模塊apscheduler安裝和使用的方法是什么”文章的知識點大部分人都不太理解,所以小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Python第三方模塊apscheduler安裝和使用的方法是什么”文章吧。
安裝apscheduler 模塊
pip install apscheduler
APScheduler(Advanced Python Scheduler)是一個輕量級的Python定時任務調度框架(Python庫)。
APScheduler有三個內置的調度系統,其中包括:
cron式調度(可選開始/結束時間)
基于間隔的執行(以偶數間隔運行作業,也可以選擇開始/結束時間)
一次性延遲執行任務(在指定的日期/時間內運行作業一次)
APScheduler可以任意混合和匹配調度系統和作業存儲的后端,其中支持后端存儲作業包括:
triggers(觸發器)中包含調度邏輯,每個作業都由自己的觸發器來決定下次運行時間。除了他們自己初始配置意外,觸發器完全是無狀態的。
job stores(作業存儲器)存儲被調度的作業,默認的作業存儲器只是簡單地把作業保存在內存中,其他的作業存儲器則是將作業保存在數據庫中。當作業被保存到一個持久化的作業存儲器中的時候,該作業的數據會被序列化,并在加載時被反序列化。作業存儲器不能共享調度器。
executors(執行器)處理作業的運行,他們通常通過在作業中提交指定的可調用對象到一個線程或者進程池來進行。當作業完成時,執行器將會通知調度器。
schedulers(調度器)配置作業存儲器和執行器可以在調度器中完成,例如添加、修改和移除作業。根據不同的應用場景可以選用不同的調度器,可選的有BlockingScheduler,BackgroundScheduler,AsyncIOScheduler,GeventScheduler,TornadoScheduler,TwistedScheduler,QtScheduler 7種。
觸發器
當你調度作業的時候,你需要為這個作業選擇一個觸發器,用來描述這個作業何時被觸發,APScheduler有三種內置的觸發器類型:
date: 一次性指定日期;
interval: 在某個時間范圍內間隔多長時間執行一次;
cron :Linux crontab格式兼容,最為強大。
date 最基本的一種調度,作業只會執行一次。它的參數如下:
1.run_date
(datetime|str) – 作業的運行日期或時間
2.timezone
(datetime.tzinfo|str) – 指定時區
作業存儲器
如果你的應用在每次啟動的時候都會重新創建作業,那么使用默認的作業存儲器(MemoryJobStore)即可,但是如果你需要在調度器重啟或者應用程序奔潰的情況下任然保留作業,你應該根據你的應用環境來選擇具體的作業存儲器。例如:使用Mongo或者SQLAlchemy JobStore (用于支持大多數RDBMS)
執行器
對執行器的選擇取決于你使用上面哪些框架,大多數情況下,使用默認的ThreadPoolExecutor已經能夠滿足需求。如果你的應用涉及到CPU密集型操作,你可以考慮使用ProcessPoolExecutor來使用更多的CPU核心。你也可以同時使用兩者,將ProcessPoolExecutor作為第二執行器。
選擇合適的調度器
BlockingScheduler : 當調度器是你應用中唯一要運行的東西時
BackgroundScheduler : 當你沒有運行任何其他框架并希望調度器在你應用的后臺執行時使用。
AsyncIOScheduler : 當你的程序使用了asyncio(一個異步框架)的時候使用。
GeventScheduler : 當你的程序使用了gevent(高性能的Python并發框架)的時候使用。
TornadoScheduler : 當你的程序基于Tornado(一個web框架)的時候使用。
TwistedScheduler : 當你的程序使用了Twisted(一個異步框架)的時候使用
QtScheduler : 如果你的應用是一個Qt應用的時候可以使用。
添加作業
有兩種方式可以添加一個新的作業:
add_job來添加作業;
# -*- coding:utf-8 -*- from apscheduler.schedulers.blocking import BlockingScheduler import datetime def my_job1(): print('my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) def my_job2(): print('my_job2 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) sched = BlockingScheduler() # 每隔5秒運行一次my_job1 sched.add_job(my_job1, 'interval', seconds=5, id='my_job1') # 每隔5秒運行一次my_job2 sched.add_job(my_job2, 'cron', second='*/5', id='my_job2') sched.start()
裝飾器模式添加作業。
# -*- coding:utf-8 -*- from apscheduler.schedulers.blocking import BlockingScheduler import datetime sched = BlockingScheduler() # 每隔5秒運行一次my_job1 @sched.scheduled_job('interval', seconds=5, id='my_job1') def my_job1(): print('my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # 每隔5秒運行一次my_job2 @sched.scheduled_job('cron', second='*/5', id='my_job2') def my_job2(): print('my_job2 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) sched.start()
移除作業
沒有移除作業
# -*- coding:utf-8 -*- from apscheduler.schedulers.blocking import BlockingScheduler import datetime def my_job(text=""): print(text, 'my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) sched = BlockingScheduler() job = sched.add_job(my_job, 'interval', seconds=2, args=['第一個作業']) # #如果有多個任務序列的話可以給每個任務設置ID號,可以根據ID號選擇清除對象,且remove放到start前才有效 sched.add_job(my_job, 'interval', seconds=2, id='my_job_id', args=['第二個作業']) sched.start()
代碼執行結果:
使用remove() 移除作業
# -*- coding:utf-8 -*- from apscheduler.schedulers.blocking import BlockingScheduler import datetime def my_job(text=""): print(text, 'my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) sched = BlockingScheduler() job = sched.add_job(my_job, 'interval', seconds=2, args=['第一個作業']) job.remove() # #如果有多個任務序列的話可以給每個任務設置ID號,可以根據ID號選擇清除對象,且remove放到start前才有效 sched.add_job(my_job, 'interval', seconds=2, id='my_job_id', args=['第二個作業']) sched.start()
代碼執行結果:
使用remove_job()移除作業
# -*- coding:utf-8 -*- from apscheduler.schedulers.blocking import BlockingScheduler import datetime def my_job(text=""): print(text, 'my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) sched = BlockingScheduler() job = sched.add_job(my_job, 'interval', seconds=2, args=['第一個作業']) # #如果有多個任務序列的話可以給每個任務設置ID號,可以根據ID號選擇清除對象,且remove放到start前才有效 sched.add_job(my_job, 'interval', seconds=2, id='my_job_id', args=['第二個作業']) sched.remove_job('my_job_id') sched.start()
代碼執行結果:
APScheduler有3中內置的觸發器類型:
新建一個調度器(scheduler);
添加一個調度任務(job store);
運行調度任務。
代碼實現
# -*- coding:utf-8 -*- import time import datetime from apscheduler.schedulers.blocking import BlockingScheduler def my_job(text="默認值"): print(text, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) sched = BlockingScheduler() sched.add_job(my_job, 'interval', seconds=3, args=['3秒定時']) # 2018-3-17 00:00:00 執行一次,args傳遞一個text參數 sched.add_job(my_job, 'date', run_date=datetime.date(2019, 10, 17), args=['根據年月日定時執行']) # 2018-3-17 13:46:00 執行一次,args傳遞一個text參數 sched.add_job(my_job, 'date', run_date=datetime.datetime(2019, 10, 17, 14, 10, 0), args=['根據年月日時分秒定時執行']) # sched.start() """ interval 間隔調度,參數如下: weeks (int) – 間隔幾周 days (int) – 間隔幾天 hours (int) – 間隔幾小時 minutes (int) – 間隔幾分鐘 seconds (int) – 間隔多少秒 start_date (datetime|str) – 開始日期 end_date (datetime|str) – 結束日期 timezone (datetime.tzinfo|str) – 時區 """ """ cron參數如下: year (int|str) – 年,4位數字 month (int|str) – 月 (范圍1-12) day (int|str) – 日 (范圍1-31) week (int|str) – 周 (范圍1-53) day_of_week (int|str) – 周內第幾天或者星期幾 (范圍0-6 或者 mon,tue,wed,thu,fri,sat,sun) hour (int|str) – 時 (范圍0-23) minute (int|str) – 分 (范圍0-59) second (int|str) – 秒 (范圍0-59) start_date (datetime|str) – 最早開始日期(包含) end_date (datetime|str) – 最晚結束時間(包含) timezone (datetime.tzinfo|str) – 指定時區 """ # my_job將會在6,7,8,11,12月的第3個周五的1,2,3點運行 sched.add_job(my_job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') # 截止到2018-12-30 00:00:00,每周一到周五早上五點半運行job_function sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2018-12-31') # 表示2017年3月22日17時19分07秒執行該程序 sched.add_job(my_job, 'cron', year=2017, month=3, day=22, hour=17, minute=19, second=7) # 表示任務在6,7,8,11,12月份的第三個星期五的00:00,01:00,02:00,03:00 執行該程序 sched.add_job(my_job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') # 表示從星期一到星期五5:30(AM)直到2014-05-30 00:00:00 sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30') # 表示每5秒執行該程序一次,相當于interval 間隔調度中seconds = 5 sched.add_job(my_job, 'cron', second='*/5', args=['5秒定時']) sched.start()
cron表達式 | 參數 | 描述 |
---|---|---|
* | any | Fire on every value |
*/a | any | Fire every a values, starting from the minimum |
a-b | any | Fire on any value within the a-b range (a must be smaller than b) |
a-b/c | any | Fire every c values within the a-b range |
xth y | day | Fire on the x -th occurrence of weekday y within the month |
last x | day | Fire on the last occurrence of weekday x within the month |
last | day | Fire on the last day within the month |
x,y,z | any | Fire on any matching expression; can combine any number of any of the above expressions |
使用SQLAlchemy作業存儲器存放作業
# -*- coding:utf-8 -*- from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime import logging sched = BlockingScheduler() def my_job(): print('my_job is running, Now is %s' % datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # 使用sqlalchemy作業存儲器 # 根據自己電腦安裝的庫選擇用什么連接 ,如pymysql 其中:scrapy表示數據庫的名稱,操作數據庫之前應創建對應的數據庫 url = 'mysql+pymysql://root:123456@localhost:3306/scrapy?charset=utf8' sched.add_jobstore('sqlalchemy', url=url) # 添加作業 sched.add_job(my_job, 'interval', id='myjob', seconds=5) log = logging.getLogger('apscheduler.executors.default') log.setLevel(logging.INFO) # DEBUG # 設定日志格式 fmt = logging.Formatter('%(levelname)s:%(name)s:%(message)s') h = logging.StreamHandler() h.setFormatter(fmt) log.addHandler(h) sched.start()
暫停和恢復作業
# 暫停作業: apsched.job.Job.pause() apsched.schedulers.base.BaseScheduler.pause_job() # 恢復作業: apsched.job.Job.resume() apsched.schedulers.base.BaseScheduler.resume_job()
獲得job列表
get_jobs(),它會返回所有的job實例;
使用print_jobs()來輸出所有格式化的作業列表;
get_job(job_id=“任務ID”)獲取指定任務的作業列表。
代碼實現:
# -*- coding:utf-8 -*- from apscheduler.schedulers.blocking import BlockingScheduler import datetime def my_job(text=""): print(text, 'my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) sched = BlockingScheduler() job = sched.add_job(my_job, 'interval', seconds=2, args=['第一個作業']) sched.add_job(my_job, 'interval', seconds=2, id='my_job_id', args=['第二個作業']) print(sched.get_jobs()) print(sched.get_job(job_id="my_job_id")) sched.print_jobs() sched.start()
關閉調度器
默認情況下調度器會等待所有正在運行的作業完成后,關閉所有的調度器和作業存儲。如果你不想等待,可以將wait選項設置為False。
sched.shutdown() sched.shutdown(wait=False)
以上就是關于“Python第三方模塊apscheduler安裝和使用的方法是什么”這篇文章的內容,相信大家都有了一定的了解,希望小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。