This article explains how to perform multi-object tracking with dlib in Python and OpenCV. The walkthrough below builds two complete scripts step by step, so follow along from top to bottom.
In the first part of this guide, I'll demonstrate how to implement a simple, naive dlib multi-object tracking script. This program will track multiple objects in a video; however, we'll notice that the script runs a bit slowly. To increase our FPS, I'll then show you a faster, more efficient implementation of the dlib multi-object tracker. Finally, I'll discuss some improvements and suggestions to enhance our multi-object tracking implementation.
You can use the tree command to view our project structure (a sketch of the expected layout follows the script descriptions below):
The mobilenet_ssd/ directory contains our MobileNet + SSD Caffe model files, which allow us to detect people (as well as other objects). Today we will review two Python scripts:
multi_object_tracking_slow.py: the simple, "naive" method of dlib multi-object tracking.
multi_object_tracking_fast.py: the advanced, fast method which takes advantage of multiprocessing.
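Based on the files referenced throughout this guide, the project layout likely looks something like the sketch below (race.mp4 and the two output .avi files are just the example names used in the commands later on; your own video names may differ):

$ tree
.
├── mobilenet_ssd
│   ├── MobileNetSSD_deploy.caffemodel
│   └── MobileNetSSD_deploy.prototxt
├── multi_object_tracking_slow.py
├── multi_object_tracking_fast.py
├── race.mp4
├── race_output_slow.avi
└── race_output_fast.avi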
The first dlib multi-object tracking implementation we are covering today is "naive" in that it will:
1. Use a simple list of tracker objects.
2. Update each of the trackers sequentially, using only a single core of our processor.
For some object tracking tasks this implementation will be more than sufficient; however, to optimize our FPS we should distribute the object trackers across multiple processes.
We'll begin with the simple implementation in this section and then move on to the faster method in the next section. To get started, open up the multi_object_tracking_slow.py script and insert the following code:
# import the necessary packages
from imutils.video import FPS
import numpy as np
import argparse
import imutils
import dlib
import cv2
Let's parse our command line arguments:
# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
    help="path to input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())
Our script processes the following command line arguments at runtime:
--prototxt: the path to the Caffe "deploy" prototxt file.
--model: the path to the model file which accompanies the prototxt.
--video: the path to the input video file. We'll perform multi-object tracking with dlib on this video.
--output: an optional path to an output video file. If no path is specified, no video will be written to disk. I recommend outputting to an .avi or .mp4 file.
--confidence: the object detection confidence threshold, 0.2 by default. This value represents the minimum probability required to filter weak detections from the object detector.
Let's define the list of classes this model supports and load our model from disk:
# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])
We're only concerned with the "person" class for today's footrace example, but you could easily modify this to track other classes; a small hypothetical sketch of that change follows below. We also load our pre-trained object detector model: we'll use the pre-trained SSD to detect the presence of objects in the video, and we'll create a dlib object tracker for each object that is detected.
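As a minimal, hypothetical sketch of that modification (TRACK_CLASSES and should_track are names I'm introducing for illustration; they are not part of the original scripts), you could filter against a set of target labels instead of hard-coding "person":

# hypothetical helper: keep any detection whose class is in a target set
TRACK_CLASSES = {"person", "dog"}

def should_track(label, target_classes=TRACK_CLASSES):
    # return True only for the classes we care about tracking
    return label in target_classes

# inside the detection loop you would then replace the person-only check:
#   if not should_track(CLASSES[idx]):
#       continue
print(should_track("person"))  # True
print(should_track("car"))     # False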
We have a few more initializations to perform:
# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None

# initialize the list of object trackers and corresponding class
# labels
trackers = []
labels = []

# start the frames per second throughput estimator
fps = FPS().start()
We initialize our video stream; we'll read frames from the input video one at a time. Our video writer is then initialized to None, and we'll do more work with it in the upcoming while loop. We then initialize our trackers and labels lists, and finally we start our frames-per-second counter. We're all set to begin processing the video:
# loop over frames from the video file stream
while True:
    # grab the next frame from the video file
    (grabbed, frame) = vs.read()

    # check to see if we have reached the end of the video file
    if frame is None:
        break

    # resize the frame for faster processing and then convert the
    # frame from BGR to RGB ordering (dlib needs RGB ordering)
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (frame.shape[1], frame.shape[0]), True)
The frame is resized to a width of 600 pixels, preserving the aspect ratio. The frame is then converted to RGB channel ordering for dlib compatibility (OpenCV's default is BGR, while dlib expects RGB).
Let's begin the object detection phase:
    # if there are no object trackers we first need to detect objects
    # and then create a tracker for each object
    if len(trackers) == 0:
        # grab the frame dimensions and convert the frame to a blob
        (h, w) = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)

        # pass the blob through the network and obtain the detections
        # and predictions
        net.setInput(blob)
        detections = net.forward()
In order to perform object tracking we must first perform object detection, either:
Manually, by stopping the video stream and hand-selecting the bounding box of each object.
Programmatically, using a trained object detector to detect the presence of objects (which is what we do here).
If there are no object trackers yet, then we know we have yet to perform object detection.
We create a blob and pass it through the SSD network to detect objects.
Next, we loop over the detections to find the ones belonging to the person class, since our input video is a human footrace:
        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]

            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                label = CLASSES[idx]

                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue
We begin looping over the detections, where we:
Filter out weak detections.
Ensure each detection is a person. You can, of course, remove this line of code or customize it to your own filtering needs.
Now that we've located each person in the frame, let's instantiate our trackers and draw our initial bounding boxes and class labels:
                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")

                # construct a dlib rectangle object from the bounding
                # box coordinates and start the correlation tracker
                t = dlib.correlation_tracker()
                rect = dlib.rectangle(startX, startY, endX, endY)
                t.start_track(rgb, rect)

                # update our set of trackers and corresponding class
                # labels
                labels.append(label)
                trackers.append(t)

                # grab the corresponding class label for the detection
                # and draw the bounding box
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                    (0, 255, 0), 2)
                cv2.putText(frame, label, (startX, startY - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)
To begin tracking objects we:
Compute the bounding box of each detected object.
Instantiate and pass the bounding box coordinates to the tracker. The bounding box is especially important here: we need to create a dlib.rectangle from it and pass it to the start_track method, after which dlib can begin tracking the object.
Finally, we populate the trackers list with the individual tracker.
As a result, in the next code block we'll handle the case where trackers have already been established and we just need to update their positions. There are two additional tasks we perform in the initial detection step:
Append the class label to the labels list. If you're tracking multiple types of objects (such as dog + person), you may wish to know what type each object is.
Draw each bounding box rectangle and class label around the object.
If the length of our trackers list is greater than zero, we know we are in the object tracking phase:
    # otherwise, we've already performed detection so let's track
    # multiple objects
    else:
        # loop over each of the trackers
        for (t, l) in zip(trackers, labels):
            # update the tracker and grab the position of the tracked
            # object
            t.update(rgb)
            pos = t.get_position()

            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())

            # draw the bounding box from the correlation object tracker
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                (0, 255, 0), 2)
            cv2.putText(frame, l, (startX, startY - 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)
In the object tracking phase we loop over all of the trackers and their corresponding labels, then update the position of each object. To update a position, we simply pass in the rgb image.
After extracting the bounding box coordinates, we can draw a bounding box rectangle and label for each tracked object.
The remaining steps in the frame processing loop involve writing the output video (if necessary) and displaying the results:
    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)

    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF

    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break

    # update the FPS counter
    fps.update()
Here we:
Write the frame to video, if necessary.
Show the output frame and capture keypresses. If the q key ("quit") is pressed, we break out of the loop. Finally, we update our frames-per-second information for benchmarking purposes.
The remaining steps are to print the FPS information in the terminal and release pointers:
# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()

# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()
Let's evaluate accuracy and performance. Open up a terminal and execute the following command:
$ python multi_object_tracking_slow.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
    --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel \
    --video race.mp4 --output race_output_slow.avi
[INFO] loading model...
[INFO] starting video stream...
[INFO] elapsed time: 24.51
[INFO] approx. FPS: 13.87
It looks like our multi-object tracker is working!
But as you can see, we're only obtaining roughly 13 FPS.
For some applications this FPS may be enough; however, if you need faster throughput, I would suggest taking a look at the more efficient dlib multi-object tracker below. Secondly, understand that the tracking accuracy isn't perfect.
If you run the dlib multi-object tracking script from the previous section and open up your system's activity monitor at the same time, you'll notice that only one core of your processor is being utilized. To distribute the object trackers across multiple cores, we'll turn to Python's multiprocessing module.
Utilizing processes enables our operating system to perform better process scheduling, mapping each process to a particular processor core on our machine (most modern operating systems are able to efficiently schedule CPU-heavy processes in a parallel fashion).
Go ahead and open up multi_object_tracking_fast.py and insert the following code:
# import the necessary packages
from imutils.video import FPS
import multiprocessing
import numpy as np
import argparse
import imutils
import dlib
import cv2
We'll be using the Python Process class to spawn new processes, each of which is independent of the original process.
To spawn a process, we need to provide a function that Python can call; Python will then take that function, create a brand new process, and execute it:
def start_tracker(box, label, rgb, inputQueue, outputQueue):
    # construct a dlib rectangle object from the bounding box
    # coordinates and then start the correlation tracker
    t = dlib.correlation_tracker()
    rect = dlib.rectangle(box[0], box[1], box[2], box[3])
    t.start_track(rgb, rect)
The first three parameters to start_tracker include:
box: the bounding box coordinates of the object we are going to track, presumably returned by some kind of object detector, whether manual or programmatic.
label: the human-readable label of the object.
rgb: the RGB image we'll use to start the initial dlib object tracker.
Keep in mind how Python multiprocessing works: Python will call this function and then create a brand new interpreter to execute the code within it. Therefore, each spawned start_tracker process will be independent of its parent. To communicate with the Python driver script, we need to leverage Pipes or Queues. Both types of objects are thread/process safe, accomplished using locks and semaphores.
Essentially, we are creating a simple producer/consumer relationship:
Our parent process will produce new frames and add them to the queue of a particular object tracker.
The child process will then consume the frames, apply object tracking, and return the updated bounding box coordinates.
I decided to use Queue objects in this post; however, keep in mind that you could use a Pipe if you wish. A short standalone sketch of the pattern follows.
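To see that pattern in isolation, here is a minimal, self-contained producer/consumer sketch of my own (it is not part of the tracking scripts): the parent produces numbers, and a daemon worker consumes them and returns their squares through an output queue.

# a toy producer/consumer example with multiprocessing.Queue -- only an
# illustration of the pattern used by the tracking script, not the script itself
import multiprocessing

def worker(inputQueue, outputQueue):
    # consume items forever; as a daemon the process dies with the parent,
    # so we never need to join it
    while True:
        item = inputQueue.get()  # blocks until the parent produces something
        if item is not None:
            outputQueue.put(item * item)

if __name__ == "__main__":
    iq = multiprocessing.Queue()
    oq = multiprocessing.Queue()

    # spawn the consumer as a daemon process
    p = multiprocessing.Process(target=worker, args=(iq, oq))
    p.daemon = True
    p.start()

    # the parent acts as the producer
    for value in range(5):
        iq.put(value)
        print(oq.get())  # blocking read of the worker's result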
Now let's begin the infinite loop that will run inside the process:
    # loop indefinitely -- this function will be called as a daemon
    # process so we don't need to worry about joining it
    while True:
        # attempt to grab the next frame from the input queue
        rgb = inputQueue.get()

        # if there was an entry in our queue, process it
        if rgb is not None:
            # update the tracker and grab the position of the tracked
            # object
            t.update(rgb)
            pos = t.get_position()

            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())

            # add the label + bounding box coordinates to the output
            # queue
            outputQueue.put((label, (startX, startY, endX, endY)))
We loop indefinitely here; this function will be called as a daemon process, so we don't need to worry about joining it.
First, we attempt to grab a new frame from the inputQueue. If the frame is not empty, we grab it and then update the object tracker, which gives us the updated bounding box coordinates.
Finally, we write the label and bounding box to the outputQueue so the parent process can consume them in the main loop of the script.
Back in the parent process, we parse our command line arguments:
# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
    help="path to input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())
The command line arguments for this script are exactly the same as for our slower, non-multiprocessing script.
Let's initialize our input and output queues:
# initialize our lists of queues -- both input queue and output queue
# for *every* object that we will be tracking
inputQueues = []
outputQueues = []
These queues will hold the objects we are tracking. Each process that is spawned will need two Queue objects:
One to read input frames from.
Another to write results to.
The next block of code is identical to our previous script:
# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None

# start the frames per second throughput estimator
fps = FPS().start()
We define the model's CLASSES and load the model itself.
Now let's begin looping over the frames of the video stream:
# loop over frames from the video file stream
while True:
    # grab the next frame from the video file
    (grabbed, frame) = vs.read()

    # check to see if we have reached the end of the video file
    if frame is None:
        break

    # resize the frame for faster processing and then convert the
    # frame from BGR to RGB ordering (dlib needs RGB ordering)
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (frame.shape[1], frame.shape[0]), True)
Now let's handle the case where we have no inputQueues:
    # if our list of queues is empty then we know we have yet to
    # create our first object tracker
    if len(inputQueues) == 0:
        # grab the frame dimensions and convert the frame to a blob
        (h, w) = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)

        # pass the blob through the network and obtain the detections
        # and predictions
        net.setInput(blob)
        detections = net.forward()

        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]

            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                label = CLASSES[idx]

                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue
If there are no inputQueues, then we need to apply object detection before object tracking. We apply object detection and then loop over the detections, grabbing the confidence value and filtering out weak detections. If the confidence meets the threshold established by our command line argument, we consider the detection, but we further filter it by class label; in this case, we're only looking for person objects. Assuming we've found a person, we create the queues and spawn the tracking process:
                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")
                bb = (startX, startY, endX, endY)

                # create two brand new input and output queues,
                # respectively
                iq = multiprocessing.Queue()
                oq = multiprocessing.Queue()
                inputQueues.append(iq)
                outputQueues.append(oq)

                # spawn a daemon process for a new object tracker
                p = multiprocessing.Process(
                    target=start_tracker,
                    args=(bb, label, rgb, iq, oq))
                p.daemon = True
                p.start()

                # grab the corresponding class label for the detection
                # and draw the bounding box
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                    (0, 255, 0), 2)
                cv2.putText(frame, label, (startX, startY - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)
We first compute the bounding box coordinates. From there we create two new queues, iq and oq, appending them to inputQueues and outputQueues respectively. We then spawn a new start_tracker process, passing it the bounding box, label, rgb image, and the iq + oq.
We also draw the detected object's bounding box rectangle and class label.
Otherwise, we've already performed object detection, so we need to apply each of the dlib object trackers to the frame:
    # otherwise, we've already performed detection so let's track
    # multiple objects
    else:
        # loop over each of our input ques and add the input RGB
        # frame to it, enabling us to update each of the respective
        # object trackers running in separate processes
        for iq in inputQueues:
            iq.put(rgb)

        # loop over each of the output queues
        for oq in outputQueues:
            # grab the updated bounding box coordinates for the
            # object -- the .get method is a blocking operation so
            # this will pause our execution until the respective
            # process finishes the tracking update
            (label, (startX, startY, endX, endY)) = oq.get()

            # draw the bounding box from the correlation object
            # tracker
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                (0, 255, 0), 2)
            cv2.putText(frame, label, (startX, startY - 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)
Looping over each of the inputQueues, we add the rgb image to them. Then we loop over each of the outputQueues, obtaining the updated bounding box coordinates from each independent object tracker. Finally, we draw the bounding box and the associated class label.
    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)

    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF

    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break

    # update the FPS counter
    fps.update()

# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()

# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()
If necessary, we write the frame to the output video and display the frame on the screen. If the q key is pressed, we quit, breaking out of the loop. If we continue processing frames, our FPS calculator is updated and we start over at the top of the while loop. Otherwise, we're done processing frames, so we display the FPS information, release the pointers, and close the windows.
Open up a terminal and execute the following command:
$ python multi_object_tracking_fast.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
    --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel \
    --video race.mp4 --output race_output_fast.avi
[INFO] loading model...
[INFO] starting video stream...
[INFO] elapsed time: 14.01
[INFO] approx. FPS: 24.26
As you can see, our faster, more efficient multi-object tracker runs at 24 FPS, an improvement of over 45% compared to our previous implementation! In addition, if you open up your activity monitor while this script is running, you'll see that more of your system's CPU is being utilized. This speedup is obtained by allowing each of the dlib object trackers to run in a separate process, which in turn enables your operating system to perform more efficient scheduling of the CPU resources.
multi_object_tracking_slow.py (complete listing; note that this version has been modified to read from a webcam via cv2.VideoCapture(0) and makes --video optional):
# USAGE
# python multi_object_tracking_slow.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
#   --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel --video race.mp4

# import the necessary packages
from imutils.video import FPS
import numpy as np
import argparse
import imutils
import dlib
import cv2

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
# ap.add_argument("-v", "--video", required=True,
#     help="path to input video file")
ap.add_argument("-v", "--video",
    help="path to input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())

# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

# initialize the video stream and output video writer
print("[INFO] starting video stream...")
# vs = cv2.VideoCapture(args["video"])
vs = cv2.VideoCapture(0)
writer = None

# initialize the list of object trackers and corresponding class
# labels
trackers = []
labels = []

# start the frames per second throughput estimator
fps = FPS().start()

# loop over frames from the video file stream
while True:
    # grab the next frame from the video file
    (grabbed, frame) = vs.read()

    # check to see if we have reached the end of the video file
    if frame is None:
        break

    # resize the frame for faster processing and then convert the
    # frame from BGR to RGB ordering (dlib needs RGB ordering)
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (frame.shape[1], frame.shape[0]), True)

    # if there are no object trackers we first need to detect objects
    # and then create a tracker for each object
    if len(trackers) == 0:
        # grab the frame dimensions and convert the frame to a blob
        (h, w) = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)

        # pass the blob through the network and obtain the detections
        # and predictions
        net.setInput(blob)
        detections = net.forward()

        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]

            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                label = CLASSES[idx]

                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue

                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")

                # construct a dlib rectangle object from the bounding
                # box coordinates and start the correlation tracker
                t = dlib.correlation_tracker()
                rect = dlib.rectangle(startX, startY, endX, endY)
                t.start_track(rgb, rect)

                # update our set of trackers and corresponding class
                # labels
                labels.append(label)
                trackers.append(t)

                # grab the corresponding class label for the detection
                # and draw the bounding box
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                    (0, 255, 0), 2)
                cv2.putText(frame, label, (startX, startY - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

    # otherwise, we've already performed detection so let's track
    # multiple objects
    else:
        # loop over each of the trackers
        for (t, l) in zip(trackers, labels):
            # update the tracker and grab the position of the tracked
            # object
            t.update(rgb)
            pos = t.get_position()

            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())

            # draw the bounding box from the correlation object tracker
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                (0, 255, 0), 2)
            cv2.putText(frame, l, (startX, startY - 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)

    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF

    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break

    # update the FPS counter
    fps.update()

# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()

# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()
multi_object_tracking_fast.py (complete listing):
# USAGE
# python multi_object_tracking_fast.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
#   --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel --video race.mp4

# import the necessary packages
from imutils.video import FPS
import multiprocessing
import numpy as np
import argparse
import imutils
import dlib
import cv2

def start_tracker(box, label, rgb, inputQueue, outputQueue):
    # construct a dlib rectangle object from the bounding box
    # coordinates and then start the correlation tracker
    t = dlib.correlation_tracker()
    rect = dlib.rectangle(box[0], box[1], box[2], box[3])
    t.start_track(rgb, rect)

    # loop indefinitely -- this function will be called as a daemon
    # process so we don't need to worry about joining it
    while True:
        # attempt to grab the next frame from the input queue
        rgb = inputQueue.get()

        # if there was an entry in our queue, process it
        if rgb is not None:
            # update the tracker and grab the position of the tracked
            # object
            t.update(rgb)
            pos = t.get_position()

            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())

            # add the label + bounding box coordinates to the output
            # queue
            outputQueue.put((label, (startX, startY, endX, endY)))

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
ap.add_argument("-v", "--video", required=True,
    help="path to input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())

# initialize our list of queues -- both input queue and output queue
# for *every* object that we will be tracking
inputQueues = []
outputQueues = []

# initialize the list of class labels MobileNet SSD was trained to
# detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

# initialize the video stream and output video writer
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None

# start the frames per second throughput estimator
fps = FPS().start()

# loop over frames from the video file stream
while True:
    # grab the next frame from the video file
    (grabbed, frame) = vs.read()

    # check to see if we have reached the end of the video file
    if frame is None:
        break

    # resize the frame for faster processing and then convert the
    # frame from BGR to RGB ordering (dlib needs RGB ordering)
    frame = imutils.resize(frame, width=600)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (frame.shape[1], frame.shape[0]), True)

    # if our list of queues is empty then we know we have yet to
    # create our first object tracker
    if len(inputQueues) == 0:
        # grab the frame dimensions and convert the frame to a blob
        (h, w) = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (w, h), 127.5)

        # pass the blob through the network and obtain the detections
        # and predictions
        net.setInput(blob)
        detections = net.forward()

        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]

            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                label = CLASSES[idx]

                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue

                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")
                bb = (startX, startY, endX, endY)

                # create two brand new input and output queues,
                # respectively
                iq = multiprocessing.Queue()
                oq = multiprocessing.Queue()
                inputQueues.append(iq)
                outputQueues.append(oq)

                # spawn a daemon process for a new object tracker
                p = multiprocessing.Process(
                    target=start_tracker,
                    args=(bb, label, rgb, iq, oq))
                p.daemon = True
                p.start()

                # grab the corresponding class label for the detection
                # and draw the bounding box
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                    (0, 255, 0), 2)
                cv2.putText(frame, label, (startX, startY - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

    # otherwise, we've already performed detection so let's track
    # multiple objects
    else:
        # loop over each of our input ques and add the input RGB
        # frame to it, enabling us to update each of the respective
        # object trackers running in separate processes
        for iq in inputQueues:
            iq.put(rgb)

        # loop over each of the output queues
        for oq in outputQueues:
            # grab the updated bounding box coordinates for the
            # object -- the .get method is a blocking operation so
            # this will pause our execution until the respective
            # process finishes the tracking update
            (label, (startX, startY, endX, endY)) = oq.get()

            # draw the bounding box from the correlation object
            # tracker
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                (0, 255, 0), 2)
            cv2.putText(frame, label, (startX, startY - 15),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 2)

    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)

    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF

    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break

    # update the FPS counter
    fps.update()

# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()

# do a bit of cleanup
cv2.destroyAllWindows()
vs.release()
The dlib multi-object tracking Python scripts I've shared with you today will work just fine for processing shorter video streams; however, if you intend to use this implementation in long-running production environments (on the order of hours to days of video), there are two primary improvements I suggest you make:
The first improvement is to utilize a process pool rather than spawning a brand new process for every object to be tracked. The implementation covered here today constructs a brand new Queue and Process for each object we need to track.
For today's purposes that's fine, but consider if you wanted to track 50 objects in a video: that implies spawning 50 processes, one per object. At that point, the overhead of your system managing all of those processes will destroy any increase in FPS. Instead, you would want to utilize a process pool.
If your system has N processor cores, then you would want to create a pool with N - 1 processes, leaving one core free for your operating system to perform system operations. Each of these processes should perform multiple object tracking, maintaining a list of object trackers, similar to the first, naive multi-object tracker we covered today.
This improvement will allow you to utilize all cores of your processor without the overhead of spawning many independent processes. A hedged sketch of this pattern follows.
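Here is a rough sketch of that idea, under the assumption that each worker process owns its own list of dlib correlation trackers; the helper names pool_worker and chunk are mine, not from the original post:

# sketch: (N - 1) workers, each tracking *several* objects, instead of
# one process per object
import multiprocessing
import dlib

def pool_worker(boxes, labels, rgb, inputQueue, outputQueue):
    # each worker maintains its own list of correlation trackers
    trackers = []
    for (box, label) in zip(boxes, labels):
        t = dlib.correlation_tracker()
        t.start_track(rgb, dlib.rectangle(box[0], box[1], box[2], box[3]))
        trackers.append((t, label))

    # consume frames and return one batch of results per frame
    while True:
        rgb = inputQueue.get()
        if rgb is None:
            break
        results = []
        for (t, label) in trackers:
            t.update(rgb)
            pos = t.get_position()
            results.append((label, (int(pos.left()), int(pos.top()),
                int(pos.right()), int(pos.bottom()))))
        outputQueue.put(results)

def chunk(items, n):
    # split the detections into n roughly equal groups (round-robin)
    return [items[i::n] for i in range(n)]

# in the driver script you would then do something like:
#   numWorkers = max(1, multiprocessing.cpu_count() - 1)
#   boxChunks = chunk(allBoxes, numWorkers)
#   labelChunks = chunk(allLabels, numWorkers)
#   ...and spawn one daemon Process per (boxChunks[i], labelChunks[i]) pair.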
The second improvement I would make is to clean up the processes and queues. If dlib reports an object as "lost" or "disappeared", we never return from the start_tracker function, which means the process will live for the life of the parent script and only be killed when the parent exits.
Again, that's fine for our purposes here today, but if you intend to use this code in production environments, you should:
Update the start_tracker function to return once dlib reports the object as lost.
Also delete the inputQueue and outputQueue for the corresponding process.
Failing to perform this cleanup will lead to needless computational waste and memory overhead for long-running jobs. A hedged sketch of the change follows.
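Here is one way that cleanup might look, as a hedged sketch: dlib's correlation_tracker.update returns a confidence score (the peak-to-sidelobe ratio), so a low score can be treated as "object lost". The threshold below is only an example value that would need tuning for your footage; when the parent receives the None sentinel, it would drop the corresponding inputQueue/outputQueue pair.

# sketch of a start_tracker that exits when the tracker loses the object
import dlib

MIN_TRACK_CONFIDENCE = 7.0  # example threshold, tune for your footage

def start_tracker(box, label, rgb, inputQueue, outputQueue):
    # start the correlation tracker exactly as before
    t = dlib.correlation_tracker()
    t.start_track(rgb, dlib.rectangle(box[0], box[1], box[2], box[3]))

    while True:
        rgb = inputQueue.get()
        if rgb is None:
            break

        # update() returns the peak-to-sidelobe ratio; treat a low value
        # as "lost" and terminate this process
        score = t.update(rgb)
        if score < MIN_TRACK_CONFIDENCE:
            outputQueue.put(None)  # sentinel so the parent can prune the queues
            return

        pos = t.get_position()
        outputQueue.put((label, (int(pos.left()), int(pos.top()),
            int(pos.right()), int(pos.bottom()))))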
A third improvement is to increase tracking accuracy by running the object detector every N frames rather than only once at the start.
I demonstrated this in my post on people counting with OpenCV. It requires more logic and thought, but yields a much more accurate tracker. I chose to forgo that implementation for this script so that I could teach you the multiprocessing method concisely. Ideally, you would use this third improvement in addition to multiprocessing; a rough sketch of the frame-counter logic follows.
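As a minimal sketch of that frame-counter logic only (DETECT_EVERY, detect_fn, and track_fn are placeholder names I'm introducing, not part of the original code):

# sketch: re-run the expensive detector every N frames, track in between
DETECT_EVERY = 30  # example value: refresh detections once every 30 frames

def process_video(frames, detect_fn, track_fn):
    # frames is any iterable of frames; detect_fn rebuilds the trackers,
    # track_fn updates the existing ones -- both are placeholders here
    for (totalFrames, frame) in enumerate(frames):
        if totalFrames % DETECT_EVERY == 0:
            detect_fn(frame)  # expensive: run the object detector
        else:
            track_fn(frame)   # cheap: just update the dlib trackers

# usage sketch:
#   process_video(video_frames, detect_fn=run_ssd_and_reset_trackers,
#                 track_fn=update_all_trackers)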
Thanks for reading. That wraps up "Python OpenCV multi-object tracking with dlib"; hopefully you now have a deeper understanding of the topic, and the best way to cement it is to run both scripts against your own videos and verify the results.