您好,登錄后才能下訂單哦!
這篇文章主要講解了“python多進程和多線程的實際用法”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“python多進程和多線程的實際用法”吧!
寫在前面
總所周知,unix/linux 為多任務操作系統,,即可以支持遠大于CPU數量的任務同時運行
理解多任務就需要知道操作系統的CPU上下文:
首先,我們都知道cpu一個時間段其實只能運行單個任務,只不過在很短的時間內,CPU快速切換到不同的任務進行執行,造成一種多任務同時執行的錯覺
而在CPU切換到其他任務執行之前,為了確保在切換任務之后還能夠繼續切換回原來的任務繼續執行,并且看起來是一種連續的狀態,就必須將任務的狀態保持起來,以便恢復原始任務時能夠繼續之前的狀態執行,狀態保存的位置位于CPU的寄存器和程序計數器(,PC)
簡單來說寄存器是CPU內置的容量小、但速度極快的內存,用來保存程序的堆棧信息即數據段信息。程序計數器保存程序的下一條指令的位置即代碼段信息。
所以,CPU上下文就是指CPU寄存器和程序計數器中保存的任務狀態信息;CPU上下文切換就是把前一個任務的CPU上下文保存起來,然后加載下一個任務的上下文到這些寄存器和程序計數器,再跳轉到程序計數器所指示的位置運行程序。
python程序默認都是執行單任務的進程,也就是只有一個線程。如果我們要同時執行多個任務怎么辦?
有兩種解決方案:
一種是啟動多個進程,每個進程雖然只有一個線程,但多個進程可以一塊執行多個任務。
還有一種方法是啟動一個進程,在一個進程內啟動多個線程,這樣,多個線程也可以一塊執行多個任務。
當然還有第三種方法,就是啟動多個進程,每個進程再啟動多個線程,這樣同時執行的任務就更多了,當然這種模型更復雜,實際很少采用。
Python中的多進程
在Unix/Linux系統中,提供了一個fork()函數調用,相較于普通函數調用一次,返回一次的機制,fork()調用一次,返回兩次,具體表現為操作系統自動把當前進程(稱為父進程)復制了一份(稱為子進程),然后分別在父進程和子進程內返回。
子進程永遠返回0,而父進程返回子進程的ID,這樣一個父進程可以輕松fork出很多子進程。且父進程會記下每個子進程的ID,而子進程只需要調用getppid()就可以拿到父進程的ID。
python的os模塊封裝了fork調用方法以實現在python程序中創建子進程,下面具體看兩個例子:
[root@test-yw-01 opt]# cat test.py
import os
print('Process ({}) start...'.format(os.getpid()))
pid = os.fork()
print(pid)
[root@test-yw-01 opt]# python3 test.py
Process (26620) start...
26621
0
[root@test-yunwei-01 opt]# cat process.py
import os
print('Process ({}) start...'.format(os.getpid()))
pid = os.fork()
if pid == 0:
print('The child process is {} and parent process is{}'.format(os.getpid(),os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
[root@test-yunwei-01 opt]# pyhton3 process.py
Process (25863) start...
I (25863) just created a child process (25864)
The child process is 25864 and parent process is 25863
通過fork調用這種方法,一個進程在接到新任務時就可以復制出一個子進程來處理新任務,例如nginx就是由父進程(master process)監聽端口,再fork出子進程(work process)來處理新的http請求。
注意:
Windows沒有fork調用,所以在window pycharm上運行以上代碼無法實現以上效果。
multiprocessing模塊
雖然Windows沒有fork調用,但是可以憑借multiprocessing該多進程模塊所提供的Process類來實現。
下面看一例子:
首先模擬一個使用單進程的下載任務,并打印出進程號
1)單進程執行:
import os
from random import randint
import time
def download(filename):
print("進程號是:%s"%os.getpid())
downloadtime = 3
print('現在開始下載:{}'.format(filename))
time.sleep(downloadtime)
def runtask():
start_time = time.time()
download('水滸傳')
download('西游記')
stop_time = time.time()
print('下載耗時:{}'.format(stop_time - start_time))
if __name__ == '__main__':
runtask()
得出結果
接著通過調用Process模擬開啟兩個子進程:
import time
from os import getpid
from multiprocessing import Process
def download(filename):
print("進程號是:%s"%getpid())
downloadtime = 3
print('現在開始下載:{}'.format(filename))
time.sleep(downloadtime)
def runtask():
start_time = time.time()
task1 = Process(target=download,args=('西游記',))
task1.start() #調用start()開始執行
task2 = Process(target=download,args=('水滸傳',))
task2.start()
task1.join() # join()方法可以等待子進程結束后再繼續往下運行,通常用于進程間的同步
task2.join()
stop_time = time.time()
print('下載耗時:{}'.format(stop_time - start_time))
if __name__ == '__main__':
runtask()
連接池Pool
可以用進程池Pool批量創建子進程的方式來創建大量工作子進程
import os
from random import randint
from multiprocessing import Process,Pool
import time
def download(taskname):
print("進程號是:%s"%os.getpid())
downloadtime = randint(1,3)
print('現在開始下載:{}'.format(taskname))
time.sleep(downloadtime)
def runtask():
start_time = time.time()
pool = Pool(4) #定義進程連接池可用連接數量
for task in range(5):
pool.apply_async(download,args=(task,))
pool.close()
pool.join()
stop_time = time.time()
print('完成下載,下載耗時:{}'.format(stop_time - start_time))
if __name__ == '__main__':
runtask()
需要注意的點:
對pool對象調用join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close(),調用close()之后就不能繼續添加新的進程
pool的默認大小是主機CPU的核數,所以這里設置成4個進程,這樣就避免了cpu關于進程間切換帶來的額外資源消耗,提高了任務的執行效率
進程間通信
from multiprocessing.Queue import Queue
相較于普通Queue普通的隊列的先進先出模式,get方法會阻塞請求,直到有數據get出來為止。這個是多進程并發的Queue隊列,用于解決多進程間的通信問題。
from multiprocessing import Process, Queue
import os, time, random
datas = []
def write_data(args):
print('Process to write: %s' % os.getpid())
for v in "helloword":
datas.append(v)
print("write {} to queue".format(v))
args.put(v)
time.sleep(random.random())
print(datas)
def read_data(args):
print('Process to read: %s' % os.getpid())
while True:
value = args.get(True)
print("read {} from queue".format(value))
if __name__ == '__main__':
queue = Queue()
write = Process(target=write_data,args=(queue,))
read = Process(target=read_data,args=(queue,))
write.start()
read.start()
write.join()
read.terminate()
進程池中使用隊列
由于隊列對象不能在父進程與子進程間通信,所以需要使用Manager().Queue()才能實現隊列中各子進程間進行通信
from multiprocessing import Manager
if __name__=='__main__':
manager = multiprocessing.Manager()
# 父進程創建Queue,并傳給各個子進程:
queue = manager.Queue()
pool = Pool()
write = Process(target=write_data,args=(queue,))
read = Process(target=read_data,args=(queue,))
write.start()
read.start()
write.join()
read.terminate()
如果是用進程池,就需要使用Manager().Queue()隊列才能實現在各子進程間進行通信
參考文檔: https://blog.csdn.net/qq_32446743/article/details/79785684
https://blog.csdn.net/u013713010/article/details/53325438
Python中的多線程
相較于資源分配的基本單位進程,線程是任務運行調度的基本單位,且由于每一個進程擁有自己獨立的內存空間,而線程共享所屬進程的內存空間,所以在涉及多任務執行時,線程的上下文切換比進程少了操作系統內核將虛擬內存資源即寄存器中的內容切換出這一步驟,也就大大提升了多任務執行的效率。
首先需要明確幾個概念:
1.當一個進程啟動之后,會默認產生一個主線程,因為線程是程序執行流的最小單元,當設置多線程時,主線程會創建多個子線程,在python中,默認情況下(其實就是setDaemon(False)),主線程執行完自己的任務以后,就退出了,此時子線程會繼續執行自己的任務,直到自己的任務結束
python的提供了關于多線程的threading模塊,和多進程的啟動類似,就是把函數傳入并創建Thread實例,然后調用start()開始執行:
import os
import random
from threading import Thread
import threading
import time
def mysql_dump():
print('開始執行線程{}'.format(threading.current_thread().name)) #返回當前線程實例名稱
dumptime = random.randint(1,3)
time.sleep(dumptime) #利用time.sleep()方法模擬備份數據庫所花費時間
class Mutil_thread(Thread):
def runtask(slef):
thread_list = []
print('當前線程的名字是: ', threading.current_thread().name)
start_time = time.time()
for t in range(5):
task = Mutil_thread(target=slef.tasks)
thread_list.append(task)
for i in thread_list:
# i.setDaemon(False)
i.start()
i.join()#join()所完成的工作就是線程同步,即主線程任務結束之后,進入阻塞狀態,一直等待其他的子線程執行結束之后,主線程在終止
stop_time = time.time()
print('主線程結束!', threading.current_thread().name)
print('下載耗時:{}'.format(stop_time - start_time))
if __name__ == '__main__':
run = Mutil_thread()
run.tasks = mysql_dump
run.runtask()
執行結果:
進程默認就會啟動一個線程,我們把該線程稱為主線程,實例的名為MainThread,主線程又可以啟動新的線程,線程命名依次為Thread-1,Thread-2…
LOCK
多線程不同于進程,在多進程中,例如針對同一個變量,各自有一份拷貝存在于每個進程中,資源相互隔離,互不影響
但在進程中,線程間可以共享進程像系統申請的內存空間,雖然實現多個線程間的通信相對簡單,但是當同一個資源(臨界資源)被多個線程競爭使用時,例如線程共享進程的變量,其就有可能被任何一個線程修改,所以對這種臨界資源的訪問需要加上保護,否則資源會處于“混亂”的狀態。
import time
from threading import Thread,Lock
class Account(object): # 假定這是一個銀行賬戶
def __init__(self):
self.balance = 0 #初始余額為0元
def count(self,money):
new_balance = self.balance + money
time.sleep(0.01) # 模擬每次存款需要花費的時間
self.balance = new_balance #存完之后更新賬戶余額
@property
def get_count(self):
return(self.balance)
class Addmoney(Thread): #模擬存款業務,直接繼承Thread
def __init__(self,action,money):
super().__init__() #在繼承Thread類的基礎上,再新增action及money屬性,便于main()的直接調用
self.action = action
self.money = money
def run(self):
self.action.count(self.money)
def main():
action = Account()
threads = []
for i in range(1000): #開啟1000個線程同時向賬戶存款
t = Addmoney(action,1) #每次只存入一元
threads.append(t)
t.start()
for task in threads:
task.join()
print('賬戶余額為: ¥%s元'%action.get_count)
main()
查看執行結果: 鄭州哪個婦科醫院好 http://www.sptdfk.com/
運行上面的程序,1000線程分別向賬戶中轉入1元錢,結果小于100元。之所以出現這種情況是因為我們沒有對balance余額該線程共享的變量加以保護,當多個線程同時向賬戶中存錢時,會一起執行到new_balance = self.balance + money這行代碼,多個線程得到的賬戶余額都是初始狀態下的0,所以都是0上面做了+1的操作,因此得到了錯誤的結果。
如果我們要確保balance計算正確,就要給Account().count()上一把鎖,當某個線程開始執行Account().count()時,該線程因為獲得了鎖,因此其他線程不能同時執行,只能等待鎖被釋放后,獲得該鎖以后才能更改改。由于鎖只有一個,無論多少線程,同一時刻最多只有一個線程持有該鎖,所以,不會造成修改的沖突。創建一個鎖就是通過threading.Lock()來實現:
import time
from threading import Thread,Lock
import time
from threading import Thread,Lock
class Account(object): # 假定這是一個銀行賬戶
def __init__(self):
self.balance = 0 #初始余額為0元
self.lock = Lock()
def count(self,money):
self.lock.acquire()
try:
new_balance = self.balance + money
time.sleep(0.01) # 模擬每次存款需要花費的時間
self.balance = new_balance #存完之后更新賬戶余額
finally: #在finally中執行釋放鎖的操作保證正常異常鎖都能釋放
self.lock.release()
def get_count(self):
return(self.balance)
class Addmoney(Thread): #模擬存款業務
def __init__(self,action,money):
super().__init__()
self.action = action
self.money = money
self.lock = Lock()
def run(self):
self.action.count(self.money)
def main():
action = Account()
threads = []
for i in range(1000): #開啟100000個線程同時向賬戶存款
t = Addmoney(action,1) #每次只存入一元
threads.append(t)
t.start()
for task in threads:
task.join()
print('賬戶余額為: ¥%s元'%action.get_count())
main()
執行結果:
感謝各位的閱讀,以上就是“python多進程和多線程的實際用法”的內容了,經過本文的學習后,相信大家對python多進程和多線程的實際用法這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。