91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何在python 項目中利用Apscheduler實現一個定時任務

發布時間:2020-12-15 17:27:03 來源:億速云 閱讀:304 作者:Leah 欄目:開發技術

本篇文章給大家分享的是有關如何在python 項目中利用Apscheduler實現一個定時任務,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

先簡單介紹一下Apscheduler模塊包含的四種組件:

  • Trigger觸發器

  • Job作業

  • Excutor執行器

  • Scheduler調度器

大概了解了Apscheduler包含的幾種概念,現在先來看一下一個簡單的示例:

# -*- coding: utf-8 -*-

from apscheduler.schedulers.blocking import BlockingScheduler
import time


def hello():
  print(time.strftime("%c"))


if __name__ == "__main__":
  scheduler = BlockingScheduler()
  scheduler.add_job(hello, 'interval', seconds=5)
  scheduler.start()

示例的輸出:

Thu Dec 3 16:01:20 2020
Thu Dec 3 16:01:25 2020
Thu Dec 3 16:01:30 2020
Thu Dec 3 16:01:35 2020
Thu Dec 3 16:01:40 2020
..........

這個簡單的示例,我們用上面提到幾種組件分析一下運行邏輯:

  • 首先是Scheduler調度器,這個示例使用的BlockingScheduler調度器,在官方文檔中的解釋是,BlockingScheduler適合當你的這個定時任務程序是唯一運行的程序;換言之,則是BlockingScheduler調度器是一個阻塞調度器,當程序運行這種調度器,進程則會阻塞,無法執行其他操作;

  • 其次是Job作業和觸發器,這兩個放在一起講是因為,在定義作業的時候,你就需要選擇一個觸發器,這里選擇的是interval觸發器,這種觸發器會以固定時間間隔運行作業。換言之,為調度器添加一個hello的工作,并以每5秒的時間間隔執行任務。

  • 最后就是執行器,默認是ThreadPoolExcutor執行器,他們將任務中可調用對象交給線程池執行操作,等完成操作后,執行器會通知調度程序。

內置的三種Trigger觸發器類型:

  • date:特定時間僅運行一次作業

  • interval: 固定的時間間隔內運行一次作業

  • cron: 在一天內特定的時間定期運行作業

常見的Scheduler調度器:

  • BlockingScheduler: 調度程序是流程中唯一運行的東西

  • BackgroundScheduler: 調度程序在應用程序內部的后臺運行時使用

  • AsyncIOScheduler: 應用程序使用asyncio模塊

  • GeventScheduler: 應用程序使用gevent模塊

  • TornadoScheduler:構建Tornado應用程序時使用

  • TwistedScheduler: 構建Tornado應用程序時使用

  • QtScheduler: 在構建QT應用程序時使用

常見的JobStore:

  • MemoryJobStore

  • MongoDBJobStore

  • SQLAlchemyJobStore

  • RedisJobStore

進階使用

通過上面一個簡單的示例了解大概的工作流程,以及各個組件在整個流程中的作用,以下的示例是Flask Web框架結合使用Apscheduler定時器,定時執行任務。

# -*- coding: utf-8 -*-

from flask import Flask, Blueprint, request
from apscheduler.executors.pool import ThreadPoolExecutor 
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.redis import RedisJobStore
import time

app = Flask(__name__)
executors = {"default": ThreadPoolExecutor(5)}
default_redis_jobstore = RedisJobStore(db=2, 
    jobs_key="apschedulers.default_jobs",
    run_times_key="apschedulers.default_run_times",
    host = '127.0.0.1',
    port = 6379
    )

scheduler = BackgroundScheduler(executors=executors)
scheduler.add_jobstore(default_redis_jobstore)
scheduler.start()

def say_hello():
  print(time.strftime("%c"))


@app.route("/get_job", methods=['GET'])
def get_job():
  if scheduler.get_job("say_hello_test"):
    return "YES"
  else:
    return "NO"

@app.route("/start_job", methods=["GET"])
def start_job():
  if not scheduler.get_job("say_hello_test"):
    scheduler.add_job(say_hello, "interval", seconds=5, id="say_hello_test")
    return "Start Scuessfully!"
  else:
    return "Started Failed"
  
@app.route("/remove_job", methods=["GET"])
def remove_job():
  if scheduler.get_job("say_hello_test"):
    scheduler.remove_job("say_hello_test")
    return "Delete Successfully!"
  else:
    return "Delete Failed"


if __name__ == "__main__":
  app.run(host="127.0.0.1", port=8787, debug=True)
  • 先分析Jobstore,這里使用的是RedisJobstore,將任務序列化存入到Redis數據庫中。這里順便提一下,為什么需要設置作業存儲器,原因是當調度器程序崩潰時,仍然能夠保留作業,當然選擇什么作業存儲器,可以根據具體的工作場景,目前主流的mysql,mongodb,redis,SQLite基本都支持;

  • 然后再看看Scheduler,這里使用的時BackgroundScheduler,因為這里要求調度程序不能阻塞flask程序的正常接收請求,所以選在BackgrounScheduler讓它在開始執行任務時是在后臺運行的,不會阻塞主線程;

  • 最后看看工作的邏輯,這里get_job獲取作業的狀態,查看作業是否存在,start_job則是先判斷作業是否啟動,然后再決定啟動操作,remove_job則是停止作業。而這里的作業定義則是通過interval觸發器,每五秒執行一次say_hello任務;

以上就是如何在python 項目中利用Apscheduler實現一個定時任務,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

静宁县| 青龙| 广河县| 西充县| 新乡县| 岱山县| 乌拉特后旗| 丹凤县| 揭东县| 大埔区| 绍兴县| 潼南县| 卢龙县| 镇平县| 台江县| 文成县| 金寨县| 宿迁市| 汨罗市| 邛崃市| 雷波县| 安乡县| 缙云县| 哈尔滨市| 兴化市| 鄂托克旗| 扎鲁特旗| 太谷县| 青田县| 桃园市| 惠水县| 德州市| 云林县| 阳西县| 襄垣县| 龙川县| 游戏| 丹江口市| 双辽市| 深圳市| 长宁县|