您好,登錄后才能下訂單哦!
如何分析Python全棧中的隊列,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
知識點:
lock.acquire()# 上鎖 lock.release()# 解鎖 #同一時間允許一個進程上一把鎖 就是Lock 加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲速度卻保證了數據安全。 #同一時間允許多個進程上多把鎖 就是[信號量Semaphore] 信號量是鎖的變形: 實際實現是 計數器 + 鎖,同時允許多個進程上鎖 # 互斥鎖Lock : 互斥鎖就是進程的互相排斥,誰先搶到資源,誰就上鎖改資源內容,為了保證數據的同步性 # 注意:多個鎖一起上,不開鎖,會造成死鎖.上鎖和解鎖是一對.
程序實現:
# ### 鎖 lock 互斥鎖 from multiprocessing import Process,Lock """ 上鎖和解鎖是一對, 連續上鎖不解鎖是死鎖 ,只有在解鎖的狀態下,其他進程才有機會上鎖 """ """ # 創建一把鎖 lock = Lock() # 上鎖 lock.acquire() # lock.acquire() # 連續上鎖,造成了死鎖現象; print("我在裊裊炊煙 .. 你在焦急等待 ... 廁所進行時 ... ") # 解鎖 lock.release() """ # ### 12306 搶票軟件 import json,time,random # 1.讀寫數據庫當中的票數 def wr_info(sign , dic=None): if sign == "r": with open("ticket",mode="r",encoding="utf-8") as fp: dic = json.load(fp) return dic elif sign == "w": with open("ticket",mode="w",encoding="utf-8") as fp: json.dump(dic,fp) # dic = wr_info("w",dic={"count":0}) # print(dic , type(dic) ) # 2.執行搶票的方法 def get_ticket(person): # 先獲取數據庫中實際票數 dic = wr_info("r") # 模擬一下網絡延遲 time.sleep(random.uniform(0.1,0.7)) # 判斷票數 if dic["count"] > 0: print("{}搶到票了".format(person)) # 搶到票后,讓當前票數減1 dic["count"] -= 1 # 更新數據庫中的票數 wr_info("w",dic) else: print("{}沒有搶到票哦".format(person)) # 3.對搶票和讀寫票數做一個統一的調用 def main(person,lock): # 查看剩余票數 dic = wr_info("r") print("{}查看票數剩余: {}".format(person,dic["count"])) # 上鎖 lock.acquire() # 開始搶票 get_ticket(person) # 解鎖 lock.release() if __name__ == "__main__": lock = Lock() lst = ["梁新宇","康裕康","張保張","于朝志","薛宇健","韓瑞瑞","假摔先","劉子濤","黎明輝","趙鳳勇"] for i in lst: p = Process( target=main,args=( i , lock ) ) p.start() """ 創建進程,開始搶票是異步并發程序 直到開始搶票的時候,變成同步程序, 先搶到鎖資源的先執行,后搶到鎖資源的后執行; 按照順序依次執行;是同步程序; 搶票的時候,變成同步程序,好處是可以等到數據修改完成之后,在讓下一個人搶,保證數據不亂。 如果不上鎖的話,只剩一張票的時候,那么所有的人都能搶到票,因為程序執行的速度太快,所以接近同步進程,導致數據也不對。 """
ticket文件
{"count": 0}
# ### 信號量 Semaphore 本質上就是鎖,只不過是多個進程上多把鎖,可以控制上鎖的數量 """Semaphore = lock + 數量 """ from multiprocessing import Semaphore , Process import time , random """ # 同一時間允許多個進程上5把鎖 sem = Semaphore(5) #上鎖 sem.acquire() print("執行操作 ... ") #解鎖 sem.release() """ def singsong_ktv(person,sem): # 上鎖 sem.acquire() print("{}進入了唱吧ktv , 正在唱歌 ~".format(person)) # 唱一段時間 time.sleep( random.randrange(4,8) ) # 4 5 6 7 print("{}離開了唱吧ktv , 唱完了 ... ".format(person)) # 解鎖 sem.release() if __name__ == "__main__": sem = Semaphore(5) lst = ["趙鳳勇" , "沈思雨", "趙萬里" , "張宇" , "假率先" , "孫杰龍" , "陳璐" , "王雨涵" , "楊元濤" , "劉一鳳" ] for i in lst: p = Process(target=singsong_ktv , args = (i , sem) ) p.start() """ # 總結: Semaphore 可以設置上鎖的數量 , 同一時間上多把鎖 創建進程時,是異步并發,執行任務時,是同步程序; """ # 趙萬里進入了唱吧ktv , 正在唱歌 ~ # 趙鳳勇進入了唱吧ktv , 正在唱歌 ~ # 張宇進入了唱吧ktv , 正在唱歌 ~ # 沈思雨進入了唱吧ktv , 正在唱歌 ~ # 孫杰龍進入了唱吧ktv , 正在唱歌 ~
# ### 事件 (Event) """ # 阻塞事件 : e = Event()生成事件對象e e.wait()動態給程序加阻塞 , 程序當中是否加阻塞完全取決于該對象中的is_set() [默認返回值是False] # 如果是True 不加阻塞 # 如果是False 加阻塞 # 控制這個屬性的值 # set()方法 將這個屬性的值改成True # clear()方法 將這個屬性的值改成False # is_set()方法 判斷當前的屬性是否為True (默認上來是False) """ from multiprocessing import Process,Event import time , random # 1 ''' e = Event() # 默認屬性值是False. print(e.is_set()) # 判斷內部成員屬性是否是False e.wait() # 如果是False , 代碼程序阻塞 print(" 代碼執行中 ... ") ''' # 2 ''' e = Event() # 將這個屬性的值改成True e.set() # 判斷內部成員屬性是否是True e.wait() # 如果是True , 代碼程序不阻塞 print(" 代碼執行中 ... ") # 將這個屬性的值改成False e.clear() e.wait() print(" 代碼執行中 .... 2") ''' # 3 """ e = Event() # wait(3) 代表最多等待3秒; e.wait(3) print(" 代碼執行中 .... 3") """ # ### 模擬經典紅綠燈效果 # 紅綠燈切換 def traffic_light(e): print("紅燈亮") while True: if e.is_set(): # 綠燈狀態 -> 切紅燈 time.sleep(1) print("紅燈亮") # True => False e.clear() else: # 紅燈狀態 -> 切綠燈 time.sleep(1) print("綠燈亮") # False => True e.set() # e = Event() # traffic_light(e) # 車的狀態 def car(e,i): # 判斷是否是紅燈,如果是加上wait阻塞 if not e.is_set(): print("car{} 在等待 ... ".format(i)) e.wait() # 否則不是,代表綠燈通行; print("car{} 通行了 ... ".format(i)) """ # 1.全國紅綠燈 if __name__ == "__main__": e = Event() # 創建交通燈 p1 = Process(target=traffic_light , args=(e,)) p1.start() # 創建小車進程 for i in range(1,21): time.sleep(random.randrange(2)) p2 = Process(target=car , args=(e,i)) p2.start() """ # 2.包頭紅綠燈,沒有車的時候,把紅綠燈關了,省電; if __name__ == "__main__": lst = [] e = Event() # 創建交通燈 p1 = Process(target=traffic_light , args=(e,)) # 設置紅綠燈為守護進程 p1.daemon = True p1.start() # 創建小車進程 for i in range(1,21): time.sleep(random.randrange(2)) p2 = Process(target=car , args=(e,i)) lst.append(p2) p2.start() # 讓所有的小車全部跑完,把紅綠燈炸飛 print(lst) for i in lst: i.join() print("關閉成功 .... ")
事件知識點:
# 阻塞事件 : e = Event()生成事件對象e e.wait()動態給程序加阻塞 , 程序當中是否加阻塞完全取決于該對象中的is_set() [默認返回值是False] # 如果是True 不加阻塞 # 如果是False 加阻塞 # 控制這個屬性的值 # set()方法 將這個屬性的值改成True # clear()方法 將這個屬性的值改成False # is_set()方法 判斷當前的屬性是否為True (默認上來是False)
# ### 進程隊列(進程與子進程是相互隔離的,如果兩者想要進行通信,可以利用隊列實現) from multiprocessing import Process,Queue # 引入線程模塊; 為了捕捉queue.Empty異常; import queue # 1.基本語法 """順序: 先進先出,后進后出""" # 創建進程隊列 q = Queue() # put() 存放 q.put(1) q.put(2) q.put(3) # get() 獲取 """在獲取不到任何數據時,會出現阻塞""" # print( q.get() ) # print( q.get() ) # print( q.get() ) # print( q.get() ) # get_nowait() 拿不到數據報異常 """[windows]效果正常 [linux]不兼容""" try: print( q.get_nowait() ) print( q.get_nowait() ) print( q.get_nowait() ) print( q.get_nowait() ) except : #queue.Empty pass # put_nowait() 非阻塞版本的put # 設置當前隊列最大長度為3 ( 元素個數最多是3個 ) """在指定隊列長度的情況下,如果塞入過多的數據,會導致阻塞""" # q2 = Queue(3) # q2.put(111) # q2.put(222) # q2.put(333) # q2.put(444) """使用put_nowait 在隊列已滿的情況下,塞入數據會直接報錯""" q2 = Queue(3) try: q2.put_nowait(111) q2.put_nowait(222) q2.put_nowait(333) q2.put_nowait(444) except: pass # 2.進程間的通信IPC def func(q): # 2.子進程獲取主進程存放的數據 res = q.get() print(res,"<22>") # 3.子進程中存放數據 q.put("劉一縫") if __name__ == "__main__": q3 = Queue() p = Process(target=func,args=(q3,)) p.start() # 1.主進程存入數據 q3.put("趙鳳勇") # 為了等待子進程把數據存放隊列后,主進程在獲取數據; p.join() # 4.主進程獲取子進程存放的數據 print(q3.get() , "<33>")
小提示: 一般主進程比子進程執行的快一些
隊列知識點:
# 進程間通信 IPC # IPC Inter-Process Communication # 實現進程之間通信的兩種機制: # 管道 Pipe # 隊列 Queue # put() 存放 # get() 獲取 # get_nowait() 拿不到報異常 # put_nowait() 非阻塞版本的put q.empty() 檢測是否為空 (了解) q.full() 檢測是否已經存滿 (了解)
# ### 生產者和消費者模型 """ # 爬蟲案例 1號進程負責抓取其他多個網站中相關的關鍵字信息,正則匹配到隊列中存儲(mysql) 2號進程負責把隊列中的內容拿取出來,將經過修飾后的內容布局到自個的網站中 1號進程可以理解成生產者 2號進程可以理解成消費者 從程序上來看 生產者負責存儲數據 (put) 消費者負責獲取數據 (get) 生產者和消費者比較理想的模型: 生產多少,消費多少 . 生產數據的速度 和 消費數據的速度 相對一致 """ # 1.基礎版生產著消費者模型 """問題 : 當前模型,程序不能正常終止 """ """ from multiprocessing import Process,Queue import time,random # 消費者模型 def consumer(q,name): while True: # 獲取隊列中的數據 food = q.get() time.sleep(random.uniform(0.1,1)) print("{}吃了{}".format(name,food)) # 生產者模型 def producer(q,name,food): for i in range(5): time.sleep(random.uniform(0.1,1)) # 展示生產的數據 print( "{}生產了{}".format( name , food+str(i) ) ) # 存儲生產的數據在隊列中 q.put(food+str(i)) if __name__ == "__main__": q = Queue() p1 = Process( target=consumer,args=(q , "趙萬里") ) p2 = Process( target=producer,args=(q , "趙沈陽" , "香蕉" ) ) p1.start() p2.start() p2.join() """ # 2.優化模型 """特點 : 手動在隊列的最后,加入標識None, 終止消費者模型""" """ from multiprocessing import Process,Queue import time,random # 消費者模型 def consumer(q,name): while True: # 獲取隊列中的數據 food = q.get() # 如果最后一次獲取的數據是None , 代表隊列已經沒有更多數據可以獲取了,終止循環; if food is None: break time.sleep(random.uniform(0.1,1)) print("{}吃了{}".format(name,food)) # 生產者模型 def producer(q,name,food): for i in range(5): time.sleep(random.uniform(0.1,1)) # 展示生產的數據 print( "{}生產了{}".format( name , food+str(i) ) ) # 存儲生產的數據在隊列中 q.put(food+str(i)) if __name__ == "__main__": q = Queue() p1 = Process( target=consumer,args=(q , "趙萬里") ) p2 = Process( target=producer,args=(q , "趙沈陽" , "香蕉" ) ) p1.start() p2.start() p2.join() q.put(None) # 香蕉0 香蕉1 香蕉2 香蕉3 香蕉4 None """ # 3.多個生產者和消費者 """ 問題 : 雖然可以解決問題 , 但是需要加入多個None , 代碼冗余""" from multiprocessing import Process,Queue import time,random # 消費者模型 def consumer(q,name): while True: # 獲取隊列中的數據 food = q.get() # 如果最后一次獲取的數據是None , 代表隊列已經沒有更多數據可以獲取了,終止循環; if food is None: break time.sleep(random.uniform(0.1,1)) print("{}吃了{}".format(name,food)) # 生產者模型 def producer(q,name,food): for i in range(5): time.sleep(random.uniform(0.1,1)) # 展示生產的數據 print( "{}生產了{}".format( name , food+str(i) ) ) # 存儲生產的數據在隊列中 q.put(food+str(i)) if __name__ == "__main__": q = Queue() p1 = Process( target=consumer,args=(q , "趙萬里") ) p1_1 = Process( target=consumer,args=(q , "趙世超") ) p2 = Process( target=producer,args=(q , "趙沈陽" , "香蕉" ) ) p2_2 = Process( target=producer,args=(q , "趙鳳勇" , "大蒜" ) ) p1.start() p1_1.start() p2.start() p2_2.start() # 等待所有數據填充完畢 p2.join() p2_2.join() # 把None 關鍵字放在整個隊列的最后,作為跳出消費者循環的標識符; q.put(None) # 給第一個消費者加一個None , 用來終止 q.put(None) # 給第二個消費者加一個None , 用來終止 # ...
# ### JoinableQueue 隊列 """ put 存放 get 獲取 task_done 計算器屬性值-1 join 配合task_done來使用 , 阻塞 put 一次數據, 隊列的內置計數器屬性值+1 get 一次數據, 通過task_done讓隊列的內置計數器屬性值-1 join: 會根據隊列計數器的屬性值來判斷是否阻塞或者放行 隊列計數器屬性是 等于 0 , 代碼不阻塞放行 隊列計數器屬性是 不等 0 , 意味著代碼阻塞 """ from multiprocessing import JoinableQueue jq = JoinableQueue() jq.put("王同培") # +1 jq.put("王偉") # +2 print(jq.get()) print(jq.get()) # print(jq.get()) 阻塞 jq.task_done() # -1 jq.task_done() # -1 jq.join() print(" 代碼執行結束 .... ") # ### 2.使用JoinableQueue 改造生產著消費者模型 from multiprocessing import Process,Queue import time,random # 消費者模型 def consumer(q,name): while True: # 獲取隊列中的數據 food = q.get() time.sleep(random.uniform(0.1,1)) print("{}吃了{}".format(name,food)) # 讓隊列的內置計數器屬性-1 q.task_done() # 生產者模型 def producer(q,name,food): for i in range(5): time.sleep(random.uniform(0.1,1)) # 展示生產的數據 print( "{}生產了{}".format( name , food+str(i) ) ) # 存儲生產的數據在隊列中 q.put(food+str(i)) if __name__ == "__main__": q = JoinableQueue() p1 = Process( target=consumer,args=(q , "趙萬里") ) p2 = Process( target=producer,args=(q , "趙沈陽" , "香蕉" ) ) p1.daemon = True p1.start() p2.start() p2.join() # 必須等待隊列中的所有數據全部消費完畢,再放行 q.join() print("程序結束 ... ")
ipc可以讓進程之間進行通信 lock其實也讓進程之間進行通信了,多個進程去搶一把鎖,一個進程搶到 這 把鎖了,其他的進程就搶不到這把鎖了,進程通過socket底層互相發 消息,告訴其他進程當前狀態已經被鎖定了,不能再強了。 進程之間默認是隔離的,不能通信的,如果想要通信,必須通過ipc的 方式(lock、joinablequeue、Manager)
看完上述內容,你們掌握如何分析Python全棧中的隊列的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。