您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關怎么在Python中利用APScheduler實現一個定時任務,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
一、安裝APScheduler
pip install apscheduler
二、基本概念
APScheduler有四大組件:
1、觸發器 triggers :
觸發器包含調度邏輯。每個作業都有自己的觸發器,用于確定下一個任務何時運行。除了初始配置之外,觸發器是完全無狀態的。
有三種內建的trigger:
(1)date: 特定的時間點觸發
(2)interval: 固定時間間隔觸發
(3)cron: 在特定時間周期性地觸發
2、任務儲存器 job stores:用于存放任務,把任務存放在內存(為默認MemoryJobStore)或數據庫中。
3、執行器 executors: 執行器是將任務提交到線程池或進程池中運行,當任務完成時,執行器通知調度器觸發相應的事件。
4、調度器 schedulers: 把上方三個組件作為參數,通過創建調度器實例來運行
根據開發需求選擇相應的組件,下面是不同的調度器組件:
BlockingScheduler 阻塞式調度器:適用于只跑調度器的程序。
BackgroundScheduler 后臺調度器:適用于非阻塞的情況,調度器會在后臺獨立運行。
AsyncIOScheduler AsyncIO調度器,適用于應用使用AsnycIO的情況。
GeventScheduler Gevent調度器,適用于應用通過Gevent的情況。
TornadoScheduler Tornado調度器,適用于構建Tornado應用。
TwistedScheduler Twisted調度器,適用于構建Twisted應用。
QtScheduler Qt調度器,適用于構建Qt應用。
三、使用步驟
1、新建一個調度器schedulers
2、添加調度任務
3、運行調度任務
四、使用實例
1、觸發器date
特定的時間點觸發,只執行一次。參數如下:
參數 | 說明 |
run_date (datetime 或 str) | 作業的運行日期或時間 |
timezone (datetime.tzinfo 或 str) | 指定時區 |
使用例子:
from datetime import datetime from datetime import date from apscheduler.schedulers.blocking import BlockingScheduler def job(text): print(text) scheduler = BlockingScheduler() # 在 2019-8-30 運行一次 job 方法 scheduler.add_job(job, 'date', run_date=date(2019, 8, 30), args=['text1']) # 在 2019-8-30 01:00:00 運行一次 job 方法 scheduler.add_job(job, 'date', run_date=datetime(2019, 8, 30, 1, 0, 0), args=['text2']) # 在 2019-8-30 01:00:01 運行一次 job 方法 scheduler.add_job(job, 'date', run_date='2019-8-30 01:00:00', args=['text3']) scheduler.start()
2、觸發器interval
固定時間間隔觸發。參數如下:
參數 | 說明 |
weeks (int) | 間隔幾周 |
days (int) | 間隔幾天 |
hours (int) | 間隔幾小時 |
minutes (int) | 間隔幾分鐘 |
seconds (int) | 間隔多少秒 |
start_date (datetime 或 str) | 開始日期 |
end_date (datetime 或 str) | 結束日期 |
timezone (datetime.tzinfo 或str) |
使用例子:
import time from apscheduler.schedulers.blocking import BlockingScheduler def job(text): t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) print('{} --- {}'.format(text, t)) scheduler = BlockingScheduler() # 每隔 1分鐘 運行一次 job 方法 scheduler.add_job(job, 'interval', minutes=1, args=['job1']) # 在 2019-08-29 22:15:00至2019-08-29 22:17:00期間,每隔1分30秒 運行一次 job 方法 scheduler.add_job(job, 'interval', minutes=1, seconds = 30, start_date='2019-08-29 22:15:00', end_date='2019-08-29 22:17:00', args=['job2']) scheduler.start() ''' 運行結果: job2 --- 2019-08-29 22:15:00 job1 --- 2019-08-29 22:15:46 job2 --- 2019-08-29 22:16:30 job1 --- 2019-08-29 22:16:46 job1 --- 2019-08-29 22:17:46 ...余下省略... '''
3、觸發器cron
在特定時間周期性地觸發。參數如下:
參數 | 說明 |
year (int 或 str) | 年,4位數字 |
month (int 或 str) | 月 (范圍1-12) |
day (int 或 str) | 日 (范圍1-31) |
week (int 或 str) | 周 (范圍1-53) |
day_of_week (int 或 str) | 周內第幾天或者星期幾 (范圍0-6 或者 mon,tue,wed,thu,fri,sat,sun) |
hour (int 或 str) | 時 (范圍0-23) |
minute (int 或 str) | 分 (范圍0-59) |
second (int 或 str) | 秒 (范圍0-59) |
start_date (datetime 或 str) | 最早開始日期(包含) |
end_date (datetime 或 str) | 最晚結束時間(包含) |
timezone (datetime.tzinfo 或str) | 指定時區 |
這些參數支持算數表達式,取值格式有如下:
使用例子:
import time from apscheduler.schedulers.blocking import BlockingScheduler def job(text): t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) print('{} --- {}'.format(text, t)) scheduler = BlockingScheduler() # 在每天22點,每隔 1分鐘 運行一次 job 方法 scheduler.add_job(job, 'cron', hour=22, minute='*/1', args=['job1']) # 在每天22和23點的25分,運行一次 job 方法 scheduler.add_job(job, 'cron', hour='22-23', minute='25', args=['job2']) scheduler.start() ''' 運行結果: job1 --- 2019-08-29 22:25:00 job2 --- 2019-08-29 22:25:00 job1 --- 2019-08-29 22:26:00 job1 --- 2019-08-29 22:27:00 ...余下省略... '''
4、通過裝飾器scheduled_job()添加方法
添加任務的方法有兩種:
(1)通過調用add_job()---見上面1至3代碼
(2)通過裝飾器scheduled_job():
第一種方法是最常用的方法。第二種方法主要是方便地聲明在應用程序運行時不會更改的任務。該 add_job()方法返回一個apscheduler.job.Job實例,可以使用該實例稍后修改或刪除該任務。
import time from apscheduler.schedulers.blocking import BlockingScheduler scheduler = BlockingScheduler() @scheduler.scheduled_job('interval', seconds=5) def job1(): t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) print('job1 --- {}'.format(t)) @scheduler.scheduled_job('cron', second='*/7') def job2(): t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) print('job2 --- {}'.format(t)) scheduler.start() ''' 運行結果: job2 --- 2019-08-29 22:36:35 job1 --- 2019-08-29 22:36:37 job2 --- 2019-08-29 22:36:42 job1 --- 2019-08-29 22:36:42 job1 --- 2019-08-29 22:36:47 job2 --- 2019-08-29 22:36:49 ...余下省略... '''
關于怎么在Python中利用APScheduler實現一個定時任務就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。