91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

python模擬事件觸發機制詳解

發布時間:2020-10-03 14:51:57 來源:腳本之家 閱讀:176 作者:魂~ 欄目:開發技術

本文實例為大家分享了python模擬事件觸發機制的具體代碼,供大家參考,具體內容如下

EventManager.py

# -*- encoding: UTF-8 -*-

# 系統模塊
from queue import Queue, Empty
from threading import *


class EventManager:
  def __init__(self):
    """初始化事件管理器"""
    # 事件對象列表
    self.__eventQueue = Queue()
    # 事件管理器開關
    self.__active = False
    # 事件處理線程
    self.__thread = Thread(target = self.__Run)

    # 這里的__handlers是一個字典,用來保存對應的事件的響應函數
    # 其中每個鍵對應的值是一個列表,列表中保存了對該事件監聽的響應函數,一對多
    self.__handlers = {}  # {事件類型:[處理事件的方法]}

  def __Run(self):
    """引擎運行"""
    while self.__active == True:
      try:
        # 獲取事件的阻塞時間設為1秒
        event = self.__eventQueue.get(block = True, timeout = 1) 
        self.__EventProcess(event)
      except Empty:
        pass

  def __EventProcess(self, event):
    """處理事件"""
    # 檢查是否存在對該事件進行監聽的處理函數
    if event.type_ in self.__handlers:
      # 若存在,則按順序將事件傳遞給處理函數執行
      for handler in self.__handlers[event.type_]:
        handler(event)

  def Start(self):
    """啟動"""
    # 將事件管理器設為啟動
    self.__active = True
    # 啟動事件處理線程
    self.__thread.start()

  def Stop(self):
    """停止"""
    # 將事件管理器設為停止
    self.__active = False
    # 等待事件處理線程退出
    self.__thread.join()

  def AddEventListener(self, type_, handler):
    """綁定事件和監聽器處理函數"""
    # 嘗試獲取該事件類型對應的處理函數列表,若無則創建
    try:
      handlerList = self.__handlers[type_]
    except KeyError:
      handlerList = []

    self.__handlers[type_] = handlerList
    # 若要注冊的處理器不在該事件的處理器列表中,則注冊該事件
    if handler not in handlerList:
      handlerList.append(handler)

  def RemoveEventListener(self, type_, handler):
    """移除監聽器的處理函數"""
    #讀者自己試著實現

  def SendEvent(self, event):
    """發送事件,向事件隊列中存入事件"""
    self.__eventQueue.put(event)

"""事件對象"""
class Event:
  def __init__(self, type_=None):
    self.type_ = type_   # 事件類型
    self.dict = {}     # 字典用于保存具體的事件數據

test.py

# -*- encoding: UTF-8 -*-

from threading import *
from EventManager import *
import time

#事件名稱 新文章
EVENT_ARTICAL = "Event_Artical"


#事件源 公眾號
class PublicAccounts:
  def __init__(self,eventManager):
    self.__eventManager = eventManager

  def WriteNewArtical(self):
    #事件對象,寫了新文章
    event = Event(type_=EVENT_ARTICAL)
    event.dict["artical"] = u'如何寫出更優雅的代碼\n'
    #發送事件
    self.__eventManager.SendEvent(event)
    print(u'公眾號發送新文章')


#監聽器 訂閱者
class Listener:
  def __init__(self,username):
    self.__username = username

  #監聽器的處理函數 讀文章
  def ReadArtical(self,event):
    print(u'%s 收到新文章' % self.__username)
    print(u'正在閱讀新文章內容:%s' % event.dict["artical"])


"""測試函數"""
def test():
  listner1 = Listener("thinkroom") #訂閱者1
  listner2 = Listener("steve")#訂閱者2

  eventManager = EventManager()

  #綁定事件和監聽器響應函數(新文章)
  eventManager.AddEventListener(EVENT_ARTICAL, listner1.ReadArtical)
  eventManager.AddEventListener(EVENT_ARTICAL, listner2.ReadArtical)
  eventManager.Start()

  publicAcc = PublicAccounts(eventManager)
  while True:
    publicAcc.WriteNewArtical()
    time.sleep(2)

if __name__ == '__main__':
  test()

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持億速云。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

黄陵县| 普宁市| 商丘市| 朝阳县| 朝阳市| 天柱县| 海门市| 定州市| 威海市| 呼图壁县| 青岛市| 乌恰县| 石阡县| 永和县| 全州县| 宝鸡市| 宣武区| 淮安市| 济南市| 江陵县| 万宁市| 张北县| 焉耆| 大同市| 高阳县| 巴林左旗| 浙江省| 安丘市| 石狮市| 天镇县| 江安县| 怀化市| 喀喇沁旗| 前郭尔| 尼勒克县| 孟津县| 巴马| 常山县| 鹤庆县| 武义县| 会泽县|