您好,登錄后才能下訂單哦!
本文小編為大家詳細介紹“Python怎么實現熱加載配置文件”,內容詳細,步驟清晰,細節處理妥當,希望這篇“Python怎么實現熱加載配置文件”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。
由于最近工作需求,需要在已有項目添加一個新功能,實現配置熱加載的功能。所謂的配置熱加載,也就是說當服務收到配置更新消息之后,我們不用重啟服務就可以使用最新的配置去執行任務。
下面我分別采用多進程、多線程、協程的方式去實現配置熱加載。
如果我們代碼實現上使用多進程, 主進程1來更新配置并發送指令,任務的調用是進程2,如何實現配置熱加載呢?
當主進程收到配置更新的消息之后(配置讀取是如何收到配置更新的消息的? 這里我們暫不討論), 主進程就向進子程1發送kill信號,子進程1收到kill的信號就退出,之后由信號處理函數來啟動一個新的進程,使用最新的配置文件來繼續執行任務。
main 函數
def main(): # 啟動一個進程執行任務 p1 = Process(target=run, args=("p1",)) p1.start() monitor(p1, run) # 注冊信號 processes["case100"] = p1 #將進程pid保存 num = 0 while True: # 模擬獲取配置更新 print( f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n") print(f"{processes=}\n") sleep(2) if num == 4: kill_process(processes["case100"]) # kill 當前進程 if num == 8: kill_process(processes["case100"]) # kill 當前進程 if num == 12: kill_process(processes["case100"]) # kill 當前進程 num += 1
signal_handler 函數
def signal_handler(process: Process, func, signum, frame): # print(f"{signum=}") global counts if signum == 17: # 17 is SIGCHILD # 這個循環是為了忽略SIGTERM發出的信號,避免搶占了主進程發出的SIGCHILD for signame in [SIGTERM, SIGCHLD, SIGQUIT]: signal.signal(signame, SIG_DFL) print("Launch a new process") p = multiprocessing.Process(target=func, args=(f"p{counts}",)) p.start() monitor(p, run) processes["case100"] = p counts += 1 if signum == 2: if process.is_alive(): print(f"Kill {process} process") process.terminate() signal.signal(SIGCHLD, SIG_IGN) sys.exit("kill parent process")
完整代碼如下
#! /usr/local/bin/python3.8 from multiprocessing import Process from typing import Dict import signal from signal import SIGCHLD, SIGTERM, SIGINT, SIGQUIT, SIG_DFL, SIG_IGN import multiprocessing from multiprocessing import Process from typing import Callable from data import processes import sys from functools import partial import time processes: Dict[str, Process] = {} counts = 2 def run(process: Process): while True: print(f"{process} running...") time.sleep(1) def kill_process(process: Process): print(f"kill {process}") process.terminate() def monitor(process: Process, func: Callable): for signame in [SIGTERM, SIGCHLD, SIGINT, SIGQUIT]: # SIGTERM is kill signal. # No SIGCHILD is not trigger singnal_handler, # No SIGINT is not handler ctrl+c, # No SIGQUIT is RuntimeError: reentrant call inside <_io.BufferedWriter name='<stdout>'> signal.signal(signame, partial(signal_handler, process, func)) def signal_handler(process: Process, func, signum, frame): print(f"{signum=}") global counts if signum == 17: # 17 is SIGTERM for signame in [SIGTERM, SIGCHLD, SIGQUIT]: signal.signal(signame, SIG_DFL) print("Launch a new process") p = multiprocessing.Process(target=func, args=(f"p{counts}",)) p.start() monitor(p, run) processes["case100"] = p counts += 1 if signum == 2: if process.is_alive(): print(f"Kill {process} process") process.terminate() signal.signal(SIGCHLD, SIG_IGN) sys.exit("kill parent process") def main(): p1 = Process(target=run, args=("p1",)) p1.start() monitor(p1, run) processes["case100"] = p1 num = 0 while True: print( f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n") print(f"{processes=}\n") time.sleep(2) if num == 4: kill_process(processes["case100"]) if num == 8: kill_process(processes["case100"]) if num == 12: kill_process(processes["case100"]) num += 1 if __name__ == '__main__': main()
執行結果如下
multiprocessing.active_children()=[<Process name='Process-1' pid=2533 parent=2532 started>], count=1 processes={'case100': <Process name='Process-1' pid=2533 parent=2532 started>} p1 running... p1 running... kill <Process name='Process-1' pid=2533 parent=2532 started> multiprocessing.active_children()=[<Process name='Process-1' pid=2533 parent=2532 started>], count=1 processes={'case100': <Process name='Process-1' pid=2533 parent=2532 started>} signum=17 Launch a new process p2 running... p2 running... multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1 processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>} p2 running... p2 running... multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1 processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>} p2 running... p2 running... multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1 processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>} p2 running... p2 running... kill <Process name='Process-2' pid=2577 parent=2532 started> signum=17 Launch a new process multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 stopped exitcode=-SIGTERM>], count=1 processes={'case100': <Process name='Process-3' pid=2675 parent=2532 started>} p3 running... p3 running... multiprocessing.active_children()=[<Process name='Process-3' pid=2675 parent=2532 started>], count=1
總結
好處:使用信號量可以處理多進程之間通信的問題。
壞處:代碼不好寫,寫出來代碼不好理解。信號量使用必須要很熟悉,不然很容易自己給自己寫了一個bug.(所有初學者慎用,老司機除外。)
還有一點不是特別理解的就是process.terminate()
發送出信號是SIGTERM
number是15,但是第一次signal_handler
收到信號卻是number=17,如果我要去處理15的信號,就會導致前一個進程不能kill掉的問題。歡迎有對信號量比較熟悉的大佬,前來指點迷津,不甚感謝。
實現邏輯是主進程1 更新配置并發送指令。進程2啟動調度任務。
這時候當主進程1更新好配置之后,發送指令給進程2,這時候的指令就是用Event一個異步事件通知。
直接上代碼
scheduler 函數
def scheduler(): while True: print('wait message...') case_configurations = scheduler_notify_queue.get() print(f"Got case configurations {case_configurations=}...") task_schedule_event.set() # 設置set之后, is_set 為True print(f"Schedule will start ...") while task_schedule_event.is_set(): # is_set 為True的話,那么任務就會一直執行 run(case_configurations) print("Clearing all scheduling job ...")
event_scheduler 函數
def event_scheduler(case_config): scheduler_notify_queue.put(case_config) print(f"Put cases config to the Queue ...") task_schedule_event.clear() # clear之后,is_set 為False print(f"Clear scheduler jobs ...") print(f"Schedule job ...")
完整代碼如下
import multiprocessing import time scheduler_notify_queue = multiprocessing.Queue() task_schedule_event = multiprocessing.Event() def run(case_configurations: str): print(f'{case_configurations} running...') time.sleep(3) def scheduler(): while True: print('wait message...') case_configurations = scheduler_notify_queue.get() print(f"Got case configurations {case_configurations=}...") task_schedule_event.set() print(f"Schedule will start ...") while task_schedule_event.is_set(): run(case_configurations) print("Clearing all scheduling job ...") def event_scheduler(case_config: str): scheduler_notify_queue.put(case_config) print(f"Put cases config to the Queue ...") task_schedule_event.clear() print(f"Clear scheduler jobs ...") print(f"Schedule job ...") def main(): scheduler_notify_queue.put('1') p = multiprocessing.Process(target=scheduler) p.start() count = 1 print(f'{count=}') while True: if count == 5: event_scheduler('100') if count == 10: event_scheduler('200') count += 1 time.sleep(1) if __name__ == '__main__': main()
執行結果如下
wait message... Got case configurations case_configurations='1'... Schedule will start ... 1 running... 1 running... Put cases config to the Queue ... Clear scheduler jobs ... Schedule job ... Clearing all scheduling job ... wait message... Got case configurations case_configurations='100'... Schedule will start ... 100 running... Put cases config to the Queue ... Clear scheduler jobs ... Schedule job ... Clearing all scheduling job ... wait message... Got case configurations case_configurations='200'... Schedule will start ... 200 running... 200 running...
總結
使用Event事件通知,代碼不易出錯,代碼編寫少,易讀。相比之前信號量的方法,推薦大家多使用這種方式。
使用多線程或協程的方式,其實和上述實現方式一致。唯一區別就是調用了不同庫中,queue
和 event
.
# threading scheduler_notify_queue = queue.Queue() task_schedule_event = threading.Event() # async scheduler_notify_queue = asyncio.Queue() task_schedule_event = asyncio.Event()
讀到這里,這篇“Python怎么實現熱加載配置文件”文章已經介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領會,如果想了解更多相關內容的文章,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。