您好,登錄后才能下訂單哦!
本文實例講述了Python自定義線程池實現方法。分享給大家供大家參考,具體如下:
關于python的多線程,由與GIL的存在被廣大群主所詬病,說python的多線程不是真正的多線程。但多線程處理IO密集的任務效率還是可以杠杠的。
我實現的這個線程池其實是根據銀角的思路來實現的。
主要思路:
任務獲取和執行:
1、任務加入隊列,等待線程來獲取并執行。
2、按需生成線程,每個線程循環取任務。
線程銷毀:
1、獲取任務是終止符時,線程停止。
2、線程池close()時,向任務隊列加入和已生成線程等量的終止符。
3、線程池terminate()時,設置線程下次任務取到為終止符。
流程概要設計:
詳細代碼:
import threading import contextlib from Queue import Queue import time class ThreadPool(object): def __init__(self, max_num): self.StopEvent = 0#線程任務終止符,當線程從隊列獲取到StopEvent時,代表此線程可以銷毀。可設置為任意與任務有區別的值。 self.q = Queue() self.max_num = max_num #最大線程數 self.terminal = False #是否設置線程池強制終止 self.created_list = [] #已創建線程的線程列表 self.free_list = [] #空閑線程的線程列表 self.Deamon=False #線程是否是后臺線程 def run(self, func, args, callback=None): """ 線程池執行一個任務 :param func: 任務函數 :param args: 任務函數所需參數 :param callback: :return: 如果線程池已經終止,則返回True否則None """ if len(self.free_list) == 0 and len(self.created_list) < self.max_num: self.create_thread() task = (func, args, callback,) self.q.put(task) def create_thread(self): """ 創建一個線程 """ t = threading.Thread(target=self.call) t.setDaemon(self.Deamon) t.start() self.created_list.append(t)#將當前線程加入已創建線程列表created_list def call(self): """ 循環去獲取任務函數并執行任務函數 """ current_thread = threading.current_thread() #獲取當前線程對象· event = self.q.get() #從任務隊列獲取任務 while event != self.StopEvent: #判斷獲取到的任務是否是終止符 func, arguments, callback = event#從任務中獲取函數名、參數、和回調函數名 try: result = func(*arguments) func_excute_status =True#func執行成功狀態 except Exception as e: func_excute_status = False result =None print '函數執行產生錯誤', e#打印錯誤信息 if func_excute_status:#func執行成功后才能執行回調函數 if callback is not None:#判斷回調函數是否是空的 try: callback(result) except Exception as e: print '回調函數執行產生錯誤', e # 打印錯誤信息 with self.worker_state(self.free_list,current_thread): #執行完一次任務后,將線程加入空閑列表。然后繼續去取任務,如果取到任務就將線程從空閑列表移除 if self.terminal:#判斷線程池終止命令,如果需要終止,則使下次取到的任務為StopEvent。 event = self.StopEvent else: #否則繼續獲取任務 event = self.q.get() # 當線程等待任務時,q.get()方法阻塞住線程,使其持續等待 else:#若線程取到的任務是終止符,就銷毀線程 #將當前線程從已創建線程列表created_list移除 self.created_list.remove(current_thread) def close(self): """ 執行完所有的任務后,所有線程停止 """ full_size = len(self.created_list)#按已創建的線程數量往線程隊列加入終止符。 while full_size: self.q.put(self.StopEvent) full_size -= 1 def terminate(self): """ 無論是否還有任務,終止線程 """ self.terminal = True while self.created_list: self.q.put(self.StopEvent) self.q.queue.clear()#清空任務隊列 def join(self): """ 阻塞線程池上下文,使所有線程執行完后才能繼續 """ for t in self.created_list: t.join() @contextlib.contextmanager#上下文處理器,使其可以使用with語句修飾 def worker_state(self, state_list, worker_thread): """ 用于記錄線程中正在等待的線程數 """ state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread) if __name__ == '__main__': def Foo(arg): return arg # time.sleep(0.1) def Bar(res): print res pool=ThreadPool(5) # pool.Deamon=True#需在pool.run之前設置 for i in range(1000): pool.run(func=Foo,args=(i,),callback=Bar) pool.close() pool.join() # pool.terminate() print "任務隊列里任務數%s" %pool.q.qsize() print "當前存活子線程數量:%d" % threading.activeCount() print "當前線程創建列表:%s" %pool.created_list print "當前線程創建列表:%s" %pool.free_list
關于上下文處理:
來個簡單例子說明:
下面的代碼手動自定義了一個myopen方法,模擬我們常見的with open() as f:語句。具體的contextlib模塊使用,會單獨開章來將。
# coding:utf-8 import contextlib @contextlib.contextmanager#定義該函數支持上下文with語句 def myopen(filename,mode): f=open(filename,mode) try: yield f.readlines()#正常執行返回f.readlines() except Exception as e: print e finally: f.close()#最后在with代碼快執行完畢后返回執行finally下的f.close()實現關閉文件 if __name__ == '__main__': with myopen(r'c:\ip1.txt','r') as f: for line in f: print line
更多關于Python相關內容感興趣的讀者可查看本站專題:《Python進程與線程操作技巧總結》、《Python Socket編程技巧總結》、《Python數據結構與算法教程》、《Python函數使用技巧總結》、《Python字符串操作技巧匯總》、《Python入門與進階經典教程》及《Python文件與目錄操作技巧匯總》
希望本文所述對大家Python程序設計有所幫助。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。