您好,登錄后才能下訂單哦!
本文實例為大家分享了python模擬事件觸發機制的具體代碼,供大家參考,具體內容如下
EventManager.py
# -*- encoding: UTF-8 -*- # 系統模塊 from queue import Queue, Empty from threading import * class EventManager: def __init__(self): """初始化事件管理器""" # 事件對象列表 self.__eventQueue = Queue() # 事件管理器開關 self.__active = False # 事件處理線程 self.__thread = Thread(target = self.__Run) # 這里的__handlers是一個字典,用來保存對應的事件的響應函數 # 其中每個鍵對應的值是一個列表,列表中保存了對該事件監聽的響應函數,一對多 self.__handlers = {} # {事件類型:[處理事件的方法]} def __Run(self): """引擎運行""" while self.__active == True: try: # 獲取事件的阻塞時間設為1秒 event = self.__eventQueue.get(block = True, timeout = 1) self.__EventProcess(event) except Empty: pass def __EventProcess(self, event): """處理事件""" # 檢查是否存在對該事件進行監聽的處理函數 if event.type_ in self.__handlers: # 若存在,則按順序將事件傳遞給處理函數執行 for handler in self.__handlers[event.type_]: handler(event) def Start(self): """啟動""" # 將事件管理器設為啟動 self.__active = True # 啟動事件處理線程 self.__thread.start() def Stop(self): """停止""" # 將事件管理器設為停止 self.__active = False # 等待事件處理線程退出 self.__thread.join() def AddEventListener(self, type_, handler): """綁定事件和監聽器處理函數""" # 嘗試獲取該事件類型對應的處理函數列表,若無則創建 try: handlerList = self.__handlers[type_] except KeyError: handlerList = [] self.__handlers[type_] = handlerList # 若要注冊的處理器不在該事件的處理器列表中,則注冊該事件 if handler not in handlerList: handlerList.append(handler) def RemoveEventListener(self, type_, handler): """移除監聽器的處理函數""" #讀者自己試著實現 def SendEvent(self, event): """發送事件,向事件隊列中存入事件""" self.__eventQueue.put(event) """事件對象""" class Event: def __init__(self, type_=None): self.type_ = type_ # 事件類型 self.dict = {} # 字典用于保存具體的事件數據
test.py
# -*- encoding: UTF-8 -*- from threading import * from EventManager import * import time #事件名稱 新文章 EVENT_ARTICAL = "Event_Artical" #事件源 公眾號 class PublicAccounts: def __init__(self,eventManager): self.__eventManager = eventManager def WriteNewArtical(self): #事件對象,寫了新文章 event = Event(type_=EVENT_ARTICAL) event.dict["artical"] = u'如何寫出更優雅的代碼\n' #發送事件 self.__eventManager.SendEvent(event) print(u'公眾號發送新文章') #監聽器 訂閱者 class Listener: def __init__(self,username): self.__username = username #監聽器的處理函數 讀文章 def ReadArtical(self,event): print(u'%s 收到新文章' % self.__username) print(u'正在閱讀新文章內容:%s' % event.dict["artical"]) """測試函數""" def test(): listner1 = Listener("thinkroom") #訂閱者1 listner2 = Listener("steve")#訂閱者2 eventManager = EventManager() #綁定事件和監聽器響應函數(新文章) eventManager.AddEventListener(EVENT_ARTICAL, listner1.ReadArtical) eventManager.AddEventListener(EVENT_ARTICAL, listner2.ReadArtical) eventManager.Start() publicAcc = PublicAccounts(eventManager) while True: publicAcc.WriteNewArtical() time.sleep(2) if __name__ == '__main__': test()
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持億速云。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。