您好,登錄后才能下訂單哦!
本文小編為大家詳細介紹“Python怎么用sched模塊實現定時任務”,內容詳細,步驟清晰,細節處理妥當,希望這篇“Python怎么用sched模塊實現定時任務”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。
我們先來看下面這個案例,代碼如下
import sched import time def say_hello(name): print(f"Hello, world, {name}") scheduler = sched.scheduler(time.time, time.sleep) scheduler.enter(5, 1, say_hello, ("張三", )) scheduler.run()
那么上述的代碼中,第一步首先則是實例化一個定時器,通過如下的代碼
import sched scheduler = sched.scheduler()
接下來我們通過enter()
方法來執行定時任務的操作,其中的參數分別是延遲的時間、任務的優先級以及具體的執行函數和執行函數中的參數。像如上的代碼就會在延遲5秒鐘之后執行say_hello()
函數
當然要是延遲的時間相等的時候,我們可以設置任務執行的優先級來指定函數方法運行的順序,例如有如下的代碼
import sched import time def say_hello(name): print(f"Hello, world, {name}") def say_hello_2(name): print(f"Hello, {name}") scheduler = sched.scheduler(time.time, time.sleep) scheduler.enter(5, 2, say_hello, ("張三", )) scheduler.enter(5, 1, say_hello_2, ("李四", )) scheduler.run()
如上述代碼,盡管延遲的時間都是一樣的,但是say_hello()
方法的優先級明顯要比say_hello_2()
方法要低一些,因此后者會優先執行。
除了讓函數延遲執行,我們還可以讓其重復執行,具體這樣來操作,代碼如下
import sched import time def say_hello(): print("Hello, world!") scheduler = sched.scheduler(time.time, time.sleep) def repeat_task(): scheduler.enter(5, 1, say_hello, ()) scheduler.enter(5, 1, repeat_task, ()) repeat_task() scheduler.run()
這里我們新建了一個repeat_task()
自定義函數,調用了scheduler.enter()
方法5秒鐘執行一次之前定義的say_hello()
函數
同時我們還可以讓任務在指定的時間執行,這里用到scheduler.entertabs()
方法,代碼如下
import sched import time def say_hello(): print("Hello, world!") scheduler = sched.scheduler(time.time, time.sleep) # 指定時間執行任務 specific_time = time.time() + 5 # 距離現在的5秒鐘之后執行 scheduler.enterabs(specific_time, 1, say_hello, ()) scheduler.run()
我們傳入其中參數使其在指定的時間,也就是距離當下的5秒鐘之后來執行任務
這里仍然是調用enter()
方法來運行多個任務,代碼如下
import sched import time def task_one(): print("Task One - Hello, world!") def task_two(): print("Task Two - Hello, world!") scheduler = sched.scheduler(time.time, time.sleep) # 任務一在兩秒鐘只有執行 scheduler.enter(2, 1, task_one, ()) # 任務二在五秒鐘之后運行 scheduler.enter(5, 1, task_two, ()) scheduler.run()
這里定義了兩個函數,task_one
和task_two
里面分是同樣的執行邏輯,打印出“Hello, world!”,然后task_one()
是在兩秒鐘之后執行而task_two()
則是在5秒鐘之后執行,兩者執行的優先級都是一樣的。
這回我們給task_one()
和task_two()
賦予不同的優先級,看一看執行的結果如下
import sched import time def task_one(): print("Task One - Hello, world!") def task_two(): print("Task Two - Hello, world!") scheduler = sched.scheduler(time.time, time.sleep) # 優先級是1 scheduler.enter(2, 2, task_one, ()) # 優先級是2 scheduler.enter(5, 1, task_two, ()) scheduler.run()
output
Task One - Hello, world!
Task Two - Hello, world!
上述的代碼會在停頓兩秒之后運行task_one()
函數,再停頓3秒之后執行task_two()
函數
我們給定時任務添加上取消的方法,代碼如下
import sched import time def task_one(): print("Task One - Hello, world!") def task_two(): print("Task Two - Hello, world!") scheduler = sched.scheduler(time.time, time.sleep) # 任務一在兩秒鐘只有執行 task_one_event = scheduler.enter(2, 1, task_one, ()) # 任務二在五秒鐘之后運行 task_two_event = scheduler.enter(5, 1, task_two, ()) # 取消執行task_one scheduler.cancel(task_one_event) scheduler.run()
我們將兩秒鐘之后執行的task_one()
方法給取消掉,最后就只執行了task_two()
方法,也就打印出來“Task Two - Hello, world!”
我們來寫一個備份的腳本,在每天固定的時間將文件備份,代碼如下
import sched import time import shutil def backup_files(): source = '路徑/files' destination = '路徑二' shutil.copytree(source, destination) def schedule_backup(): # 創建新的定時器 scheduler = sched.scheduler(time.time, time.sleep) # 備份程序在每天的1點來執行 backup_time = time.strptime('01:00:00', '%H:%M:%S') backup_event = scheduler.enterabs(time.mktime(backup_time), 1, backup_files, ()) # 開啟定時任務 scheduler.run() schedule_backup()
我們通過shutil
模塊當中的copytree()
方法來執行拷貝文件,然后在每天的1點準時執行
最后我們來執行定時分發郵件的程序,代碼如下
import sched import time import smtplib from email.mime.text import MIMEText def send_email(subject, message, from_addr, to_addr, smtp_server): # 郵件的主體信息 email = MIMEText(message) email['Subject'] = subject email['From'] = from_addr email['To'] = to_addr # 發郵件 with smtplib.SMTP(smtp_server) as server: server.send_message(email) def send_scheduled_email(subject, message, from_addr, to_addr, smtp_server, scheduled_time): # 創建定時任務的示例 scheduler = sched.scheduler(time.time, time.sleep) # 定時郵件 scheduler.enterabs(scheduled_time, 1, send_email, argument=(subject, message, from_addr, to_addr, smtp_server)) # 開啟定時器 scheduler.run() subject = 'Test Email' message = 'This is a test email' from_addr = 'test@example.com' to_addr = 'test@example.com' smtp_server = 'smtp.test.com' scheduled_time = time.time() + 60 # 一分鐘之后執行程序 send_scheduled_email(subject, message, from_addr, to_addr, smtp_server, scheduled_time)
讀到這里,這篇“Python怎么用sched模塊實現定時任務”文章已經介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領會,如果想了解更多相關內容的文章,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。