您好,登錄后才能下訂單哦!
先提到線程同步是個什么,概念是什么,就是線程通訊中通過使用某種技術訪問數據時,而一旦此線程訪問到,其他線程也就不能訪問了,直到該線程對數據完成操作才結束。
Event事件是一種實現方式:通過內部的標記看看是不是變化,也就是true or false了,
將set(),clear(),is_set(),為true,wait(timeout=None)此種設置true的時長,等到返回false,不等到超時返回false,無線等待為None,
來看一個wait的使用:
from threading import Event, Thread
import logging
logging.basicConfig(level=logging.INFO)
def A(event:Event, interval:int):
while not event.wait(interval): # 要么true or false
logging.info('hello')
e = Event()
Thread(target=A, args=(e, 3)).start()
e.wait(8)
e.set()
print('end--------------')
使用鎖Lock解決數據資源在爭搶,從而使資源有效利用。
lock的方法:
acquire(blocking=True,timeout=-1),默認阻塞,阻塞設置超時時間,非阻塞,timeout禁止使用,成功獲取鎖,返回True,否則False。
有阻塞就有釋放 ,解開鎖,release(),從線程釋放鎖,上鎖的鎖重置為unloced未上鎖調用,拋出RuntimeError異常。
import threading
from threading import Thread, Lock
import logging
import time
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
cups = []
lock = Lock()
def worker(count=10):
logging.info("I'm working for U.")
flag = False
while True:
lock.acquire() # 獲取鎖
if len(cups) >= count:
flag = True
time.sleep(0.0001)
if not flag:
cups.append(1)
if flag:
break
logging.info('I finished. cups = {}'.format(len(cups)))
for _ in range(10):
Thread(target=worker, args=(1000,)).start()
使用鎖的過程中,總是不經意加上鎖,出現死鎖的產生,出現了死鎖,如何解決呢?
使用try finally 將鎖釋放,另一種使用with上下文管理。
鎖的使用場景在于應該少用鎖,還要就是如若上鎖,將鎖的使用時間縮短,避免時間太長而出現無法釋放鎖的結果。
可重入鎖Lock,
import threading
import time
lock = threading.RLock()
print(lock.acquire())
print('------------')
print(lock.acquire(blocking=False))
print(lock.acquire())
print(lock.acquire(timeout=3.55))
print(lock.acquire(blocking=False))
#print(lock.acquire(blocking=False, timeout=10)) # 異常
lock.release()
lock.release()
lock.release()
lock.release()
print('main thread {}'.format(threading.current_thread().ident))
print("lock in main thread {}".format(lock)) # 注意觀察lock對象的信息
lock.release()
#lock.release() #多了一次
print('===============')
print()
print(lock.acquire(blocking=False)) # 1次
#threading.Timer(3, lambda x:x.release(), args=(lock,)).start() # 跨線程了,異常
lock.release()
print('~~~~~~~~~~~~~~~~~')
print()
# 測試多線程
print(lock.acquire())
def sub(l):
print('{}: {}'.format(threading.current_thread(), l.acquire())) # 阻塞
print('{}: {}'.format(threading.current_thread(), l.acquire(False)))
print('lock in sub thread {}'.format(lock))
l.release()
print('sub 1')
l.release()
print('sub 2')
# l.release() # 多了一次
threading.Timer(2, sub, args=(lock,)).start() # 傳入同一個lock對象
print('++++++++++++++++++++++')
print()
print(lock.acquire())
lock.release()
time.sleep(5)
print('釋放主線程鎖')
lock.release()
使用構造方法Condition(lock=None),默認是Rloc,
具體方法為;
acquire(*args),獲取鎖
wait(self,timeout=None),等待或超時
notify(n=1),喚醒線程,沒有等待就沒有任何操作,指線程
notify_all(),喚醒所有等待的線程。
Condition主要用于生產者和消費者模型中,解決匹配的問題。
使用方式:先獲取acquire,使用完了要釋放release,避免死鎖最好使用with上下文;生產者和消費者可以使用notify and notify_all。
如下例子:
from threading import Thread, Event
import logging
import random
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
## 此例只是為了演示,不考慮線程安全問題
class Dispatcher:
def __init__(self):
self.data = None
self.event = Event() # event只是為了使用方便,與邏輯無關
def produce(self, total):
for _ in range(total):
data = random.randint(0,100)
logging.info(data)
self.data = data
self.event.wait(1)
self.event.set()
def consume(self):
while not self.event.is_set():
data = self.data
logging.info("recieved {}".format(data))
self.data = None
self.event.wait(0.5)
d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')
c = Thread(target=d.consume, name='consumer')
c.start()
p.start()
這里代碼會有缺陷:優化如下:
from threading import Thread, Event, Condition
import logging
import random
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
## 此例只是為了演示,不考慮線程安全問題
class Dispatcher:
def __init__(self):
self.data = None
self.event = Event() # event只是為了使用方便,與邏輯無關
self.cond = Condition()
def produce(self, total):
for _ in range(total):
data = random.randint(0,100)
with self.cond:
logging.info(data)
self.data = data
self.cond.notify_all()
self.event.wait(1) # 模擬產生數據速度
self.event.set()
def consume(self):
while not self.event.is_set():
with self.cond:
self.cond.wait() # 阻塞等通知
logging.info("received {}".format(self.data))
self.event.wait(0.5) # 模擬消費的速度
d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')
# 增加消費者
for i in range(5):
c = Thread(target=d.consume, name='consumer-{}'.format(i))
c.start()
p.start()
Barrier的使用:
方法如下:
Barrier(parties, action=None,
timeout=None),構建barrier對象,timeout未指定的默認值;
n_waiting ,當前barrier等待的線程數。;
parties ,需要等待
wait(timeout=None),wait方法設置超時并超時發送,barrie處于broken狀態。
而broken的狀態方法:
broken,打開狀態,返回true;
abort(),barrier在broken狀態中,wait等待的線程會拋出BrokenBarrierError異常,直到reset恢復barrier;
reset(),恢復barrier,重新開始攔截。
barrier不做演示:
還有semaphore信號量,每次acquire時,都會減一,到0時的線程再到release后,大于0,恢復阻塞的線程。
方法:
Semaphore(value=1) 構造方法,alue小于0,拋ValueError異常;
acquire(blocking=True, timeout=None) 獲取信號量,計數器減1,獲取成功返回True;
release() 釋放信號量,計數器加1。
使用信號量處理時,需要注意release超界問題,邊界問題,其實,在使用中,python有GIL的存在,有的多線程就變成線程安全的,注意一點,但實際上它們并不是線程安全類型。因此我們在使用中要具體場景具體分析具體使用。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。