您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關如何在python 項目中利用Apscheduler實現一個定時任務,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
Trigger觸發器
Job作業
Excutor執行器
Scheduler調度器
大概了解了Apscheduler包含的幾種概念,現在先來看一下一個簡單的示例:
# -*- coding: utf-8 -*- from apscheduler.schedulers.blocking import BlockingScheduler import time def hello(): print(time.strftime("%c")) if __name__ == "__main__": scheduler = BlockingScheduler() scheduler.add_job(hello, 'interval', seconds=5) scheduler.start()
示例的輸出:
Thu Dec 3 16:01:20 2020 Thu Dec 3 16:01:25 2020 Thu Dec 3 16:01:30 2020 Thu Dec 3 16:01:35 2020 Thu Dec 3 16:01:40 2020 ..........
這個簡單的示例,我們用上面提到幾種組件分析一下運行邏輯:
首先是Scheduler調度器,這個示例使用的BlockingScheduler調度器,在官方文檔中的解釋是,BlockingScheduler適合當你的這個定時任務程序是唯一運行的程序;換言之,則是BlockingScheduler調度器是一個阻塞調度器,當程序運行這種調度器,進程則會阻塞,無法執行其他操作;
其次是Job作業和觸發器,這兩個放在一起講是因為,在定義作業的時候,你就需要選擇一個觸發器,這里選擇的是interval觸發器,這種觸發器會以固定時間間隔運行作業。換言之,為調度器添加一個hello的工作,并以每5秒的時間間隔執行任務。
最后就是執行器,默認是ThreadPoolExcutor執行器,他們將任務中可調用對象交給線程池執行操作,等完成操作后,執行器會通知調度程序。
內置的三種Trigger觸發器類型:
date:特定時間僅運行一次作業
interval: 固定的時間間隔內運行一次作業
cron: 在一天內特定的時間定期運行作業
常見的Scheduler調度器:
BlockingScheduler: 調度程序是流程中唯一運行的東西
BackgroundScheduler: 調度程序在應用程序內部的后臺運行時使用
AsyncIOScheduler: 應用程序使用asyncio模塊
GeventScheduler: 應用程序使用gevent模塊
TornadoScheduler:構建Tornado應用程序時使用
TwistedScheduler: 構建Tornado應用程序時使用
QtScheduler: 在構建QT應用程序時使用
常見的JobStore:
通過上面一個簡單的示例了解大概的工作流程,以及各個組件在整個流程中的作用,以下的示例是Flask Web框架結合使用Apscheduler定時器,定時執行任務。
# -*- coding: utf-8 -*- from flask import Flask, Blueprint, request from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.redis import RedisJobStore import time app = Flask(__name__) executors = {"default": ThreadPoolExecutor(5)} default_redis_jobstore = RedisJobStore(db=2, jobs_key="apschedulers.default_jobs", run_times_key="apschedulers.default_run_times", host = '127.0.0.1', port = 6379 ) scheduler = BackgroundScheduler(executors=executors) scheduler.add_jobstore(default_redis_jobstore) scheduler.start() def say_hello(): print(time.strftime("%c")) @app.route("/get_job", methods=['GET']) def get_job(): if scheduler.get_job("say_hello_test"): return "YES" else: return "NO" @app.route("/start_job", methods=["GET"]) def start_job(): if not scheduler.get_job("say_hello_test"): scheduler.add_job(say_hello, "interval", seconds=5, id="say_hello_test") return "Start Scuessfully!" else: return "Started Failed" @app.route("/remove_job", methods=["GET"]) def remove_job(): if scheduler.get_job("say_hello_test"): scheduler.remove_job("say_hello_test") return "Delete Successfully!" else: return "Delete Failed" if __name__ == "__main__": app.run(host="127.0.0.1", port=8787, debug=True)
先分析Jobstore,這里使用的是RedisJobstore,將任務序列化存入到Redis數據庫中。這里順便提一下,為什么需要設置作業存儲器,原因是當調度器程序崩潰時,仍然能夠保留作業,當然選擇什么作業存儲器,可以根據具體的工作場景,目前主流的mysql,mongodb,redis,SQLite基本都支持;
然后再看看Scheduler,這里使用的時BackgroundScheduler,因為這里要求調度程序不能阻塞flask程序的正常接收請求,所以選在BackgrounScheduler讓它在開始執行任務時是在后臺運行的,不會阻塞主線程;
最后看看工作的邏輯,這里get_job獲取作業的狀態,查看作業是否存在,start_job則是先判斷作業是否啟動,然后再決定啟動操作,remove_job則是停止作業。而這里的作業定義則是通過interval觸發器,每五秒執行一次say_hello任務;
以上就是如何在python 項目中利用Apscheduler實現一個定時任務,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。