您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關怎么在python中實現事件驅動event,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
一、連續運行型程序
我們要開發的交易系統就是屬于連續運行型程序,而這種程序根據其計算邏輯的運行機制不同,又可以粗略的分為時間驅動和事件驅動兩種。
1.1 時間驅動
時間驅動的程序邏輯相對容易設計,簡單來說就是讓電腦每隔一段時間自動做一些事情。這個事情本身可以很復雜、包括很多步驟,但這些步驟都是線性的,按照順序一步步執行下來。
from time import sleep def demo(): print('BB') while True: demo() sleep(1.0)
時間驅動的程序本質上就是每隔一段時間固定運行一次腳本。盡管腳本自身可以很長、包含非常多的步驟,但是我們可以看出這種程序的運行機制相對比較簡單、容易理解。
時間驅動的程序在量化交易方面還存在一些其他的缺點:如浪費CPU的計算資源、實現異步邏輯復雜度高等等。
1.2 事件驅動
與時間驅動對應的就是事件驅動的程序:當某個新的事件被推送到程序中時,程序立即調用和這個事件相對應的處理函數進行相關的操作。
舉個例子:
有些人喜歡的某個公眾號,然后去關注這個公眾號,哪天這個公眾號發布了篇新的文章,沒多久訂閱者就會在微信里收到這個公眾號推送的新消息,如果感興趣就打開來閱讀。
上面公眾號例子可以翻譯為,監聽器(訂閱者)監聽了(關注了)事件源(公眾號),當事件源的發送事件時(公眾號發布文章),所有監聽該事件的監聽器(訂閱者)都會接收到消息并作出響應(閱讀文章)。
公眾號為事件源
訂閱者為事件監聽器
訂閱者關注公眾號,相當于監聽器監聽了事件源
公眾號發布文章這個動作為發送事件
訂閱者收到事件后,做出閱讀文章的響應動作
事件驅動主要包含以下元素和操作函數:
1.2.1 元素
事件源
事件監聽器
事件對象
1.2.2 操作函數
監聽動作
發送事件
調用監聽器響應函數
現在用python實現來實現上述的業務邏輯,先看流程圖:
1.2.3 EventManager事件管理類代碼如下:
# -*- coding: utf-8 -*- """ Created on Tue Nov 13 13:51:31 2018 @author: 18665 """ # 系統模塊 from queue import Queue, Empty from threading import * ######################################################################## class EventManager: #---------------------------------------------------------------------- def __init__(self): """初始化事件管理器""" # 事件對象列表 self.__eventQueue = Queue() # 事件管理器開關 self.__active = False # 事件處理線程 self.__thread = Thread(target = self.__Run) self.count = 0 # 這里的__handlers是一個字典,用來保存對應的事件的響應函數 # 其中每個鍵對應的值是一個列表,列表中保存了對該事件監聽的響應函數,一對多 self.__handlers = {} #---------------------------------------------------------------------- def __Run(self): """引擎運行""" print('{}_run'.format(self.count)) while self.__active == True: try: # 獲取事件的阻塞時間設為1秒 event = self.__eventQueue.get(block = True, timeout = 1) self.__EventProcess(event) except Empty: pass self.count += 1 #---------------------------------------------------------------------- def __EventProcess(self, event): """處理事件""" print('{}_EventProcess'.format(self.count)) # 檢查是否存在對該事件進行監聽的處理函數 if event.type_ in self.__handlers: # 若存在,則按順序將事件傳遞給處理函數執行 for handler in self.__handlers[event.type_]: handler(event) self.count += 1 #---------------------------------------------------------------------- def Start(self): """啟動""" print('{}_Start'.format(self.count)) # 將事件管理器設為啟動 self.__active = True # 啟動事件處理線程 self.__thread.start() self.count += 1 #---------------------------------------------------------------------- def Stop(self): """停止""" print('{}_Stop'.format(self.count)) # 將事件管理器設為停止 self.__active = False # 等待事件處理線程退出 self.__thread.join() self.count += 1 #---------------------------------------------------------------------- def AddEventListener(self, type_, handler): """綁定事件和監聽器處理函數""" print('{}_AddEventListener'.format(self.count)) # 嘗試獲取該事件類型對應的處理函數列表,若無則創建 try: handlerList = self.__handlers[type_] except KeyError: handlerList = [] self.__handlers[type_] = handlerList # 若要注冊的處理器不在該事件的處理器列表中,則注冊該事件 if handler not in handlerList: handlerList.append(handler) print(self.__handlers) self.count += 1 #---------------------------------------------------------------------- def RemoveEventListener(self, type_, handler): """移除監聽器的處理函數""" print('{}_RemoveEventListener'.format(self.count)) try: handlerList = self.handlers[type_] # 如果該函數存在于列表中,則移除 if handler in handlerList: handlerList.remove(handler) # 如果函數列表為空,則從引擎中移除該事件類型 if not handlerList: del self.handlers[type_] except KeyError: pass self.count += 1 #---------------------------------------------------------------------- def SendEvent(self, event): """發送事件,向事件隊列中存入事件""" print('{}_SendEvent'.format(self.count)) self.__eventQueue.put(event) self.count += 1 ######################################################################## """事件對象""" class Event: def __init__(self, type_=None): self.type_ = type_ # 事件類型 self.dict = {} # 字典用于保存具體的事件數據
1.2.4 測試代碼
# -*- coding: utf-8 -*- """ Created on Tue Nov 13 13:50:45 2018 @author: 18665 """ # encoding: UTF-8 import sys from datetime import datetime from threading import * #sys.path.append('D:\\works\\TestFile') #print(sys.path) from eventManager import * #事件名稱 新文章 EVENT_ARTICAL = "Event_Artical" #事件源 公眾號 class PublicAccounts: def __init__(self,eventManager): self.__eventManager = eventManager def WriteNewArtical(self): #事件對象,寫了新文章 event = Event(type_=EVENT_ARTICAL) event.dict["artical"] = u'如何寫出更優雅的代碼\n' #發送事件 self.__eventManager.SendEvent(event) print(u'公眾號發送新文章\n') #監聽器 訂閱者 class Listener: def __init__(self,username): self.__username = username #監聽器的處理函數 讀文章 def ReadArtical(self,event): print(u'%s 收到新文章' % self.__username) print(u'正在閱讀新文章內容:%s' % event.dict["artical"]) """測試函數""" #-------------------------------------------------------------------- def test(): # 實例化監聽器 listner1 = Listener("thinkroom") #訂閱者1 listner2 = Listener("steve") #訂閱者2 # 實例化事件操作函數 eventManager = EventManager() #綁定事件和監聽器響應函數(新文章) eventManager.AddEventListener(EVENT_ARTICAL, listner1.ReadArtical) eventManager.AddEventListener(EVENT_ARTICAL, listner2.ReadArtical) # 啟動事件管理器,# 啟動事件處理線程 eventManager.Start() publicAcc = PublicAccounts(eventManager) timer = Timer(2, publicAcc.WriteNewArtical) timer.start() if __name__ == '__main__': test()
關于怎么在python中實現事件驅動event就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。