您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關如何進行Deep SORT多目標跟蹤算法代碼解析,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
在《DEEP LEARNING IN VIDEO MULTI-OBJECT TRACKING: A SURVEY》這篇基于深度學習的多目標跟蹤的綜述中,描述了MOT問題中四個主要步驟:
以上就是四個核心步驟,其中核心是檢測,SORT論文的摘要中提到,僅僅換一個更好的檢測器,就可以將目標跟蹤表現提升18.9%。
Deep SORT算法的前身是SORT, 全稱是Simple Online and Realtime Tracking。簡單介紹一下,SORT最大特點是基于Faster R-CNN的目標檢測方法,并利用卡爾曼濾波算法+匈牙利算法,極大提高了多目標跟蹤的速度,同時達到了SOTA的準確率。
這個算法確實是在實際應用中使用較為廣泛的一個算法,核心就是兩個算法:卡爾曼濾波和匈牙利算法。
卡爾曼濾波算法分為兩個過程,預測和更新。該算法將目標的運動狀態定義為8個正態分布的向量。
預測:當目標經過移動,通過上一幀的目標框和速度等參數,預測出當前幀的目標框位置和速度等參數。
更新:預測值和觀測值,兩個正態分布的狀態進行線性加權,得到目前系統預測的狀態。
**匈牙利算法:**解決的是一個分配問題,在MOT主要步驟中的計算相似度的,得到了前后兩幀的相似度矩陣。匈牙利算法就是通過求解這個相似度矩陣,從而解決前后兩幀真正匹配的目標。這部分sklearn庫有對應的函數linear_assignment來進行求解。
SORT算法中是通過前后兩幀IOU來構建相似度矩陣,所以SORT計算速度非常快。
下圖是一張SORT核心算法流程圖:
Detections是通過目標檢測器得到的目標框,Tracks是一段軌跡。核心是匹配的過程與卡爾曼濾波的預測和更新過程。
流程如下:目標檢測器得到目標框Detections,同時卡爾曼濾波器預測當前的幀的Tracks, 然后將Detections和Tracks進行IOU匹配,最終得到的結果分為:
卡爾曼濾波可以根據Tracks狀態預測下一幀的目標框狀態。
卡爾曼濾波更新是對觀測值(匹配上的Track)和估計值更新所有track的狀態。
DeepSort中最大的特點是加入外觀信息,借用了ReID領域模型來提取特征,減少了ID switch的次數。整體流程圖如下:
可以看出,Deep SORT算法在SORT算法的基礎上增加了級聯匹配(Matching Cascade)+新軌跡的確認(confirmed)。總體流程就是:
其中上圖中的級聯匹配展開如下:
上圖非常清晰地解釋了如何進行級聯匹配,上圖由虛線劃分為兩部分:
上半部分中計算相似度矩陣的方法使用到了外觀模型(ReID)和運動模型(馬氏距離)來計算相似度,得到代價矩陣,另外一個則是門控矩陣,用于限制代價矩陣中過大的值。
下半部分中是是級聯匹配的數據關聯步驟,匹配過程是一個循環(max age個迭代,默認為70),也就是從missing age=0到missing age=70的軌跡和Detections進行匹配,沒有丟失過的軌跡優先匹配,丟失較為久遠的就靠后匹配。通過這部分處理,可以重新將被遮擋目標找回,降低被遮擋然后再出現的目標發生的ID Switch次數。
將Detection和Track進行匹配,所以出現幾種情況
論文中提供的代碼是如下地址: https://github.com/nwojke/deep_sort
上圖是Github庫中有關Deep SORT的核心代碼,不包括Faster R-CNN檢測部分,所以主要將講解這部分的幾個文件,筆者也對其中核心代碼進行了部分注釋,地址在: https://github.com/pprp/deep_sort_yolov3_pytorch , 將其中的目標檢測器換成了U版的yolov3, 將deep_sort文件中的核心進行了調用。
下圖是筆者總結的這幾個類調用的類圖(不是特別嚴謹,但是能大概展示各個模塊的關系):
DeepSort是核心類,調用其他模塊,大體上可以分為三個模塊:
DeepSort類對外接口非常簡單:
self.deepsort = DeepSort(args.deepsort_checkpoint)#實例化
outputs = self.deepsort.update(bbox_xcycwh, cls_conf, im)#通過接收目標檢測結果進行更新
在外部調用的時候只需要以上兩步即可,非常簡單。
通過類圖,對整體模塊有了框架上理解,下面深入理解一下這些模塊。
class Detection(object):
"""
This class represents a bounding box detection in a single image.
"""
def __init__(self, tlwh, confidence, feature):
self.tlwh = np.asarray(tlwh, dtype=np.float)
self.confidence = float(confidence)
self.feature = np.asarray(feature, dtype=np.float32)
def to_tlbr(self):
"""Convert bounding box to format `(min x, min y, max x, max y)`, i.e.,
`(top left, bottom right)`.
"""
ret = self.tlwh.copy()
ret[2:] += ret[:2]
return ret
def to_xyah(self):
"""Convert bounding box to format `(center x, center y, aspect ratio,
height)`, where the aspect ratio is `width / height`.
"""
ret = self.tlwh.copy()
ret[:2] += ret[2:] / 2
ret[2] /= ret[3]
return ret
Detection類用于保存通過目標檢測器得到的一個檢測框,包含top left坐標+框的寬和高,以及該bbox的置信度還有通過reid獲取得到的對應的embedding。除此以外提供了不同bbox位置格式的轉換方法:
class Track:
# 一個軌跡的信息,包含(x,y,a,h) & v
"""
A single target track with state space `(x, y, a, h)` and associated
velocities, where `(x, y)` is the center of the bounding box, `a` is the
aspect ratio and `h` is the height.
"""
def __init__(self, mean, covariance, track_id, n_init, max_age,
feature=None):
# max age是一個存活期限,默認為70幀,在
self.mean = mean
self.covariance = covariance
self.track_id = track_id
self.hits = 1
# hits和n_init進行比較
# hits每次update的時候進行一次更新(只有match的時候才進行update)
# hits代表匹配上了多少次,匹配次數超過n_init就會設置為confirmed狀態
self.age = 1 # 沒有用到,和time_since_update功能重復
self.time_since_update = 0
# 每次調用predict函數的時候就會+1
# 每次調用update函數的時候就會設置為0
self.state = TrackState.Tentative
self.features = []
# 每個track對應多個features, 每次更新都將最新的feature添加到列表中
if feature is not None:
self.features.append(feature)
self._n_init = n_init # 如果連續n_init幀都沒有出現失配,設置為deleted狀態
self._max_age = max_age # 上限
Track類主要存儲的是軌跡信息,mean和covariance是保存的框的位置和速度信息,track_id代表分配給這個軌跡的ID。state代表框的狀態,有三種:
max_age代表一個Track存活期限,他需要和time_since_update變量進行比對。time_since_update是每次軌跡調用predict函數的時候就會+1,每次調用predict的時候就會重置為0,也就是說如果一個軌跡長時間沒有update(沒有匹配上)的時候,就會不斷增加,直到time_since_update超過max age(默認70),將這個Track從Tracker中的列表刪除。
hits代表連續確認多少次,用在從不確定態轉為確定態的時候。每次Track進行update的時候,hits就會+1, 如果hits>n_init(默認為3),也就是連續三幀的該軌跡都得到了匹配,這時候才將不確定態轉為確定態。
需要說明的是每個軌跡還有一個重要的變量,features列表,存儲該軌跡在不同幀對應位置通過ReID提取到的特征。為何要保存這個列表,而不是將其更新為當前最新的特征呢?這是為了解決目標被遮擋后再次出現的問題,需要從以往幀對應的特征進行匹配。另外,如果特征過多會嚴重拖慢計算速度,所以有一個參數budget用來控制特征列表的長度,取最新的budget個features,將舊的刪除掉。
ReID網絡是獨立于目標檢測和跟蹤器的模塊,功能是提取對應bounding box中的feature,得到一個固定維度的embedding作為該bbox的代表,供計算相似度時使用。
class Extractor(object):
def __init__(self, model_name, model_path, use_cuda=True):
self.net = build_model(name=model_name,
num_classes=96)
self.device = "cuda" if torch.cuda.is_available(
) and use_cuda else "cpu"
state_dict = torch.load(model_path)['net_dict']
self.net.load_state_dict(state_dict)
print("Loading weights from {}... Done!".format(model_path))
self.net.to(self.device)
self.size = (128,128)
self.norm = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize([0.3568, 0.3141, 0.2781],
[0.1752, 0.1857, 0.1879])
])
def _preprocess(self, im_crops):
"""
TODO:
1. to float with scale from 0 to 1
2. resize to (64, 128) as Market1501 dataset did
3. concatenate to a numpy array
3. to torch Tensor
4. normalize
"""
def _resize(im, size):
return cv2.resize(im.astype(np.float32) / 255., size)
im_batch = torch.cat([
self.norm(_resize(im, self.size)).unsqueeze(0) for im in im_crops
],dim=0).float()
return im_batch
def __call__(self, im_crops):
im_batch = self._preprocess(im_crops)
with torch.no_grad():
im_batch = im_batch.to(self.device)
features = self.net(im_batch)
return features.cpu().numpy()
模型訓練是按照傳統ReID的方法進行,使用Extractor類的時候輸入為一個list的圖片,得到圖片對應的特征。
這個類中用到了兩個計算距離的函數:
def _pdist(a, b):
# 用于計算成對的平方距離
# a NxM 代表N個對象,每個對象有M個數值作為embedding進行比較
# b LxM 代表L個對象,每個對象有M個數值作為embedding進行比較
# 返回的是NxL的矩陣,比如dist[i][j]代表a[i]和b[j]之間的平方和距離
# 實現見:https://blog.csdn.net/frankzd/article/details/80251042
a, b = np.asarray(a), np.asarray(b) # 拷貝一份數據
if len(a) == 0 or len(b) == 0:
return np.zeros((len(a), len(b)))
a2, b2 = np.square(a).sum(axis=1), np.square(
b).sum(axis=1) # 求每個embedding的平方和
# sum(N) + sum(L) -2 x [NxM]x[MxL] = [NxL]
r2 = -2. * np.dot(a, b.T) + a2[:, None] + b2[None, :]
r2 = np.clip(r2, 0., float(np.inf))
return r2
def _cosine_distance(a, b, data_is_normalized=False):
# a和b之間的余弦距離
# a : [NxM] b : [LxM]
# 余弦距離 = 1 - 余弦相似度
# https://blog.csdn.net/u013749540/article/details/51813922
if not data_is_normalized:
# 需要將余弦相似度轉化成類似歐氏距離的余弦距離。
a = np.asarray(a) / np.linalg.norm(a, axis=1, keepdims=True)
# np.linalg.norm 操作是求向量的范式,默認是L2范式,等同于求向量的歐式距離。
b = np.asarray(b) / np.linalg.norm(b, axis=1, keepdims=True)
return 1. - np.dot(a, b.T)
以上代碼對應公式,注意余弦距離=1-余弦相似度。
class NearestNeighborDistanceMetric(object):
# 對于每個目標,返回一個最近的距離
def __init__(self, metric, matching_threshold, budget=None):
# 默認matching_threshold = 0.2 budge = 100
if metric == "euclidean":
# 使用最近鄰歐氏距離
self._metric = _nn_euclidean_distance
elif metric == "cosine":
# 使用最近鄰余弦距離
self._metric = _nn_cosine_distance
else:
raise ValueError("Invalid metric; must be either 'euclidean' or 'cosine'")
self.matching_threshold = matching_threshold
# 在級聯匹配的函數中調用
self.budget = budget
# budge 預算,控制feature的多少
self.samples = {}
# samples是一個字典{id->feature list}
def partial_fit(self, features, targets, active_targets):
# 作用:部分擬合,用新的數據更新測量距離
# 調用:在特征集更新模塊部分調用,tracker.update()中
for feature, target in zip(features, targets):
self.samples.setdefault(target, []).append(feature)
# 對應目標下添加新的feature,更新feature集合
# 目標id : feature list
if self.budget is not None:
self.samples[target] = self.samples[target][-self.budget:]
# 設置預算,每個類最多多少個目標,超過直接忽略
# 篩選激活的目標
self.samples = {k: self.samples[k] for k in active_targets}
def distance(self, features, targets):
# 作用:比較feature和targets之間的距離,返回一個代價矩陣
# 調用:在匹配階段,將distance封裝為gated_metric,
# 進行外觀信息(reid得到的深度特征)+
# 運動信息(馬氏距離用于度量兩個分布相似程度)
cost_matrix = np.zeros((len(targets), len(features)))
for i, target in enumerate(targets):
cost_matrix[i, :] = self._metric(self.samples[target], features)
return cost_matrix
Tracker類是最核心的類,Tracker中保存了所有的軌跡信息,負責初始化第一幀的軌跡、卡爾曼濾波的預測和更新、負責級聯匹配、IOU匹配等等核心工作。
class Tracker:
# 是一個多目標tracker,保存了很多個track軌跡
# 負責調用卡爾曼濾波來預測track的新狀態+進行匹配工作+初始化第一幀
# Tracker調用update或predict的時候,其中的每個track也會各自調用自己的update或predict
"""
This is the multi-target tracker.
"""
def __init__(self, metric, max_iou_distance=0.7, max_age=70, n_init=3):
# 調用的時候,后邊的參數全部是默認的
self.metric = metric
# metric是一個類,用于計算距離(余弦距離或馬氏距離)
self.max_iou_distance = max_iou_distance
# 最大iou,iou匹配的時候使用
self.max_age = max_age
# 直接指定級聯匹配的cascade_depth參數
self.n_init = n_init
# n_init代表需要n_init次數的update才會將track狀態設置為confirmed
self.kf = kalman_filter.KalmanFilter()# 卡爾曼濾波器
self.tracks = [] # 保存一系列軌跡
self._next_id = 1 # 下一個分配的軌跡id
def predict(self):
# 遍歷每個track都進行一次預測
"""Propagate track state distributions one time step forward.
This function should be called once every time step, before `update`.
"""
for track in self.tracks:
track.predict(self.kf)
然后來看最核心的update函數和match函數,可以對照下面的流程圖一起看:
update函數
def update(self, detections):
# 進行測量的更新和軌跡管理
"""Perform measurement update and track management.
Parameters
----------
detections : List[deep_sort.detection.Detection]
A list of detections at the current time step.
"""
# Run matching cascade.
matches, unmatched_tracks, unmatched_detections = \
self._match(detections)
# Update track set.
# 1. 針對匹配上的結果
for track_idx, detection_idx in matches:
# track更新對應的detection
self.tracks[track_idx].update(self.kf, detections[detection_idx])
# 2. 針對未匹配的tracker,調用mark_missed標記
# track失配,若待定則刪除,若update時間很久也刪除
# max age是一個存活期限,默認為70幀
for track_idx in unmatched_tracks:
self.tracks[track_idx].mark_missed()
# 3. 針對未匹配的detection, detection失配,進行初始化
for detection_idx in unmatched_detections:
self._initiate_track(detections[detection_idx])
# 得到最新的tracks列表,保存的是標記為confirmed和Tentative的track
self.tracks = [t for t in self.tracks if not t.is_deleted()]
# Update distance metric.
active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]
# 獲取所有confirmed狀態的track id
features, targets = [], []
for track in self.tracks:
if not track.is_confirmed():
continue
features += track.features # 將tracks列表拼接到features列表
# 獲取每個feature對應的track id
targets += [track.track_id for _ in track.features]
track.features = []
# 距離度量中的 特征集更新
self.metric.partial_fit(np.asarray(features), np.asarray(targets),
active_targets)
match函數:
def _match(self, detections):
# 主要功能是進行匹配,找到匹配的,未匹配的部分
def gated_metric(tracks, dets, track_indices, detection_indices):
# 功能: 用于計算track和detection之間的距離,代價函數
# 需要使用在KM算法之前
# 調用:
# cost_matrix = distance_metric(tracks, detections,
# track_indices, detection_indices)
features = np.array([dets[i].feature for i in detection_indices])
targets = np.array([tracks[i].track_id for i in track_indices])
# 1. 通過最近鄰計算出代價矩陣 cosine distance
cost_matrix = self.metric.distance(features, targets)
# 2. 計算馬氏距離,得到新的狀態矩陣
cost_matrix = linear_assignment.gate_cost_matrix(
self.kf, cost_matrix, tracks, dets, track_indices,
detection_indices)
return cost_matrix
# Split track set into confirmed and unconfirmed tracks.
# 劃分不同軌跡的狀態
confirmed_tracks = [
i for i, t in enumerate(self.tracks) if t.is_confirmed()
]
unconfirmed_tracks = [
i for i, t in enumerate(self.tracks) if not t.is_confirmed()
]
# 進行級聯匹配,得到匹配的track、不匹配的track、不匹配的detection
'''
!!!!!!!!!!!
級聯匹配
!!!!!!!!!!!
'''
# gated_metric->cosine distance
# 僅僅對確定態的軌跡進行級聯匹配
matches_a, unmatched_tracks_a, unmatched_detections = \
linear_assignment.matching_cascade(
gated_metric,
self.metric.matching_threshold,
self.max_age,
self.tracks,
detections,
confirmed_tracks)
# 將所有狀態為未確定態的軌跡和剛剛沒有匹配上的軌跡組合為iou_track_candidates,
# 進行IoU的匹配
iou_track_candidates = unconfirmed_tracks + [
k for k in unmatched_tracks_a
if self.tracks[k].time_since_update == 1 # 剛剛沒有匹配上
]
# 未匹配
unmatched_tracks_a = [
k for k in unmatched_tracks_a
if self.tracks[k].time_since_update != 1 # 已經很久沒有匹配上
]
'''
!!!!!!!!!!!
IOU 匹配
對級聯匹配中還沒有匹配成功的目標再進行IoU匹配
!!!!!!!!!!!
'''
# 雖然和級聯匹配中使用的都是min_cost_matching作為核心,
# 這里使用的metric是iou cost和以上不同
matches_b, unmatched_tracks_b, unmatched_detections = \
linear_assignment.min_cost_matching(
iou_matching.iou_cost,
self.max_iou_distance,
self.tracks,
detections,
iou_track_candidates,
unmatched_detections)
matches = matches_a + matches_b # 組合兩部分match得到的結果
unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
return matches, unmatched_tracks, unmatched_detections
以上兩部分結合注釋和以下流程圖可以更容易理解。
下邊是論文中給出的級聯匹配的偽代碼:
以下代碼是偽代碼對應的實現
# 1. 分配track_indices和detection_indices
if track_indices is None:
track_indices = list(range(len(tracks)))
if detection_indices is None:
detection_indices = list(range(len(detections)))
unmatched_detections = detection_indices
matches = []
# cascade depth = max age 默認為70
for level in range(cascade_depth):
if len(unmatched_detections) == 0: # No detections left
break
track_indices_l = [
k for k in track_indices
if tracks[k].time_since_update == 1 + level
]
if len(track_indices_l) == 0: # Nothing to match at this level
continue
# 2. 級聯匹配核心內容就是這個函數
matches_l, _, unmatched_detections = \
min_cost_matching( # max_distance=0.2
distance_metric, max_distance, tracks, detections,
track_indices_l, unmatched_detections)
matches += matches_l
unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
門控矩陣的作用就是通過計算卡爾曼濾波的狀態分布和測量值之間的距離對代價矩陣進行限制。
代價矩陣中的距離是Track和Detection之間的表觀相似度,假如一個軌跡要去匹配兩個表觀特征非常相似的Detection,這樣就很容易出錯,但是這個時候分別讓兩個Detection計算與這個軌跡的馬氏距離,并使用一個閾值gating_threshold進行限制,所以就可以將馬氏距離較遠的那個Detection區分開,可以降低錯誤的匹配。
def gate_cost_matrix(
kf, cost_matrix, tracks, detections, track_indices, detection_indices,
gated_cost=INFTY_COST, only_position=False):
# 根據通過卡爾曼濾波獲得的狀態分布,使成本矩陣中的不可行條目無效。
gating_dim = 2 if only_position else 4
gating_threshold = kalman_filter.chi2inv95[gating_dim] # 9.4877
measurements = np.asarray([detections[i].to_xyah()
for i in detection_indices])
for row, track_idx in enumerate(track_indices):
track = tracks[track_idx]
gating_distance = kf.gating_distance(
track.mean, track.covariance, measurements, only_position)
cost_matrix[row, gating_distance >
gating_threshold] = gated_cost # 設置為inf
return cost_matrix
在Deep SORT中,需要估計Track的以下狀態:
下圖代表卡爾曼濾波器主要過程:
下面這部分主要參考:https://zhuanlan.zhihu.com/p/90835266
如果對卡爾曼濾波算法有較為深入的了解,可以結合卡爾曼濾波算法和代碼進行理解。
預測分兩個公式:
第一個公式:
其中F是狀態轉移矩陣,如下圖:
第二個公式:
P是當前幀(time=t)的協方差,Q是卡爾曼濾波器的運動估計誤差,代表不確定程度。
def predict(self, mean, covariance):
# 相當于得到t時刻估計值
# Q 預測過程中噪聲協方差
std_pos = [
self._std_weight_position * mean[3],
self._std_weight_position * mean[3],
1e-2,
self._std_weight_position * mean[3]]
std_vel = [
self._std_weight_velocity * mean[3],
self._std_weight_velocity * mean[3],
1e-5,
self._std_weight_velocity * mean[3]]
# np.r_ 按列連接兩個矩陣
# 初始化噪聲矩陣Q
motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))
# x' = Fx
mean = np.dot(self._motion_mat, mean)
# P' = FPF^T+Q
covariance = np.linalg.multi_dot((
self._motion_mat, covariance, self._motion_mat.T)) + motion_cov
return mean, covariance
更新的公式
def project(self, mean, covariance):
# R 測量過程中噪聲的協方差
std = [
self._std_weight_position * mean[3],
self._std_weight_position * mean[3],
1e-1,
self._std_weight_position * mean[3]]
# 初始化噪聲矩陣R
innovation_cov = np.diag(np.square(std))
# 將均值向量映射到檢測空間,即Hx'
mean = np.dot(self._update_mat, mean)
# 將協方差矩陣映射到檢測空間,即HP'H^T
covariance = np.linalg.multi_dot((
self._update_mat, covariance, self._update_mat.T))
return mean, covariance + innovation_cov
def update(self, mean, covariance, measurement):
# 通過估計值和觀測值估計最新結果
# 將均值和協方差映射到檢測空間,得到 Hx' 和 S
projected_mean, projected_cov = self.project(mean, covariance)
# 矩陣分解
chol_factor, lower = scipy.linalg.cho_factor(
projected_cov, lower=True, check_finite=False)
# 計算卡爾曼增益K
kalman_gain = scipy.linalg.cho_solve(
(chol_factor, lower), np.dot(covariance, self._update_mat.T).T,
check_finite=False).T
# z - Hx'
innovation = measurement - projected_mean
# x = x' + Ky
new_mean = mean + np.dot(innovation, kalman_gain.T)
# P = (I - KH)P'
new_covariance = covariance - np.linalg.multi_dot((
kalman_gain, projected_cov, kalman_gain.T))
return new_mean, new_covariance
這個公式中,z是Detection的mean,不包含變化值,狀態為[cx,cy,a,h]。H是測量矩陣,將Track的均值向量 映射到檢測空間。計算的y是Detection和Track的均值誤差。
R是目標檢測器的噪聲矩陣,是一個4x4的對角矩陣。對角線上的值分別為中心點兩個坐標以及寬高的噪聲。
計算的是卡爾曼增益,是作用于衡量估計誤差的權重。
更新后的均值向量x。
更新后的協方差矩陣。
卡爾曼濾波筆者理解也不是很深入,沒有推導過公式,對這部分感興趣的推薦幾個博客:
流程部分主要按照以下流程圖來走一遍:
感謝知乎@貓弟總結的流程圖,講解非常地清晰,如果單純看代碼,非常容易混淆。比如說代價矩陣的計算這部分,連續套了三個函數,才被真正調用。上圖將整體流程總結地非常棒。筆者將參考以上流程結合代碼進行梳理:
class Detector(object):
def __init__(self, args):
self.args = args
if args.display:
cv2.namedWindow("test", cv2.WINDOW_NORMAL)
cv2.resizeWindow("test", args.display_width, args.display_height)
device = torch.device(
'cuda') if torch.cuda.is_available() else torch.device('cpu')
self.vdo = cv2.VideoCapture()
self.yolo3 = InferYOLOv3(args.yolo_cfg,
args.img_size,
args.yolo_weights,
args.data_cfg,
device,
conf_thres=args.conf_thresh,
nms_thres=args.nms_thresh)
self.deepsort = DeepSort(args.deepsort_checkpoint)
初始化DeepSORT對象,更新部分接收目標檢測得到的框的位置,置信度和圖片:
outputs = self.deepsort.update(bbox_xcycwh, cls_conf, im)
class DeepSort(object):
def __init__(self, model_path, max_dist=0.2):
self.min_confidence = 0.3
# yolov3中檢測結果置信度閾值,篩選置信度小于0.3的detection。
self.nms_max_overlap = 1.0
# 非極大抑制閾值,設置為1代表不進行抑制
# 用于提取圖片的embedding,返回的是一個batch圖片對應的特征
self.extractor = Extractor("resnet18",
model_path,
use_cuda=True)
max_cosine_distance = max_dist
# 用在級聯匹配的地方,如果大于改閾值,就直接忽略
nn_budget = 100
# 預算,每個類別最多的樣本個數,如果超過,刪除舊的
# 第一個參數可選'cosine' or 'euclidean'
metric = NearestNeighborDistanceMetric("cosine",
max_cosine_distance,
nn_budget)
self.tracker = Tracker(metric)
def update(self, bbox_xywh, confidences, ori_img):
self.height, self.width = ori_img.shape[:2]
# generate detections
features = self._get_features(bbox_xywh, ori_img)
# 從原圖中crop bbox對應圖片并計算得到embedding
bbox_tlwh = self._xywh_to_tlwh(bbox_xywh)
detections = [
Detection(bbox_tlwh[i], conf, features[i])
for i, conf in enumerate(confidences) if conf > self.min_confidence
] # 篩選小于min_confidence的目標,并構造一個Detection對象構成的列表
# Detection是一個存儲圖中一個bbox結果
# 需要:1. bbox(tlwh形式) 2. 對應置信度 3. 對應embedding
# run on non-maximum supression
boxes = np.array([d.tlwh for d in detections])
scores = np.array([d.confidence for d in detections])
# 使用非極大抑制
# 默認nms_thres=1的時候開啟也沒有用,實際上并沒有進行非極大抑制
indices = non_max_suppression(boxes, self.nms_max_overlap, scores)
detections = [detections[i] for i in indices]
# update tracker
# tracker給出一個預測結果,然后將detection傳入,進行卡爾曼濾波操作
self.tracker.predict()
self.tracker.update(detections)
# output bbox identities
# 存儲結果以及可視化
outputs = []
for track in self.tracker.tracks:
if not track.is_confirmed() or track.time_since_update > 1:
continue
box = track.to_tlwh()
x1, y1, x2, y2 = self._tlwh_to_xyxy(box)
track_id = track.track_id
outputs.append(np.array([x1, y1, x2, y2, track_id], dtype=np.int))
if len(outputs) > 0:
outputs = np.stack(outputs, axis=0)
return np.array(outputs)
從這里開始對照以上流程圖會更加清晰。在Deep SORT初始化的過程中有一個核心metric,NearestNeighborDistanceMetric類會在匹配和特征集更新的時候用到。
梳理DeepSORT的update流程:
根據傳入的參數(bbox_xywh, conf, img)使用ReID模型提取對應bbox的表觀特征。
構建detections的列表,列表中的內容就是Detection類,在此處限制了bbox的最小置信度。
使用非極大抑制算法,由于默認nms_thres=1,實際上并沒有用。
Tracker類進行一次預測,然后將detections傳入,進行更新。
最后將Tracker中保存的軌跡中狀態屬于確認態的軌跡返回。
以上核心在Tracker的predict和update函數,接著梳理。
Tracker是一個多目標跟蹤器,保存了很多個track軌跡,負責調用卡爾曼濾波來預測track的新狀態+進行匹配工作+初始化第一幀。Tracker調用update或predict的時候,其中的每個track也會各自調用自己的update或predict
class Tracker:
def __init__(self, metric, max_iou_distance=0.7, max_age=70, n_init=3):
# 調用的時候,后邊的參數全部是默認的
self.metric = metric
self.max_iou_distance = max_iou_distance
# 最大iou,iou匹配的時候使用
self.max_age = max_age
# 直接指定級聯匹配的cascade_depth參數
self.n_init = n_init
# n_init代表需要n_init次數的update才會將track狀態設置為confirmed
self.kf = kalman_filter.KalmanFilter() # 卡爾曼濾波器
self.tracks = [] # 保存一系列軌跡
self._next_id = 1 # 下一個分配的軌跡id
def predict(self):
# 遍歷每個track都進行一次預測
"""Propagate track state distributions one time step forward.
This function should be called once every time step, before `update`.
"""
for track in self.tracks:
track.predict(self.kf)
predict主要是對軌跡列表中所有的軌跡使用卡爾曼濾波算法進行狀態的預測。
Tracker的更新屬于最核心的部分。
def update(self, detections):
# 進行測量的更新和軌跡管理
"""Perform measurement update and track management.
Parameters
----------
detections : List[deep_sort.detection.Detection]
A list of detections at the current time step.
"""
# Run matching cascade.
matches, unmatched_tracks, unmatched_detections = \
self._match(detections)
# Update track set.
# 1. 針對匹配上的結果
for track_idx, detection_idx in matches:
# track更新對應的detection
self.tracks[track_idx].update(self.kf, detections[detection_idx])
# 2. 針對未匹配的tracker,調用mark_missed標記
# track失配,若待定則刪除,若update時間很久也刪除
# max age是一個存活期限,默認為70幀
for track_idx in unmatched_tracks:
self.tracks[track_idx].mark_missed()
# 3. 針對未匹配的detection, detection失配,進行初始化
for detection_idx in unmatched_detections:
self._initiate_track(detections[detection_idx])
# 得到最新的tracks列表,保存的是標記為confirmed和Tentative的track
self.tracks = [t for t in self.tracks if not t.is_deleted()]
# Update distance metric.
active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]
# 獲取所有confirmed狀態的track id
features, targets = [], []
for track in self.tracks:
if not track.is_confirmed():
continue
features += track.features # 將tracks列表拼接到features列表
# 獲取每個feature對應的track id
targets += [track.track_id for _ in track.features]
track.features = []
# 距離度量中的 特征集更新
self.metric.partial_fit(np.asarray(features), np.asarray(targets),active_targets)
這部分注釋已經很詳細了,主要是一些后處理代碼,需要關注的是對匹配上的,未匹配的Detection,未匹配的Track三者進行的處理以及最后進行特征集更新部分,可以對照流程圖梳理。
Tracker的update函數的核心函數是match函數,描述如何進行匹配的流程:
def _match(self, detections):
# 主要功能是進行匹配,找到匹配的,未匹配的部分
def gated_metric(tracks, dets, track_indices, detection_indices):
# 功能: 用于計算track和detection之間的距離,代價函數
# 需要使用在KM算法之前
# 調用:
# cost_matrix = distance_metric(tracks, detections,
# track_indices, detection_indices)
features = np.array([dets[i].feature for i in detection_indices])
targets = np.array([tracks[i].track_id for i in track_indices])
# 1. 通過最近鄰計算出代價矩陣 cosine distance
cost_matrix = self.metric.distance(features, targets)
# 2. 計算馬氏距離,得到新的狀態矩陣
cost_matrix = linear_assignment.gate_cost_matrix(
self.kf, cost_matrix, tracks, dets, track_indices,
detection_indices)
return cost_matrix
# Split track set into confirmed and unconfirmed tracks.
# 劃分不同軌跡的狀態
confirmed_tracks = [
i for i, t in enumerate(self.tracks) if t.is_confirmed()
]
unconfirmed_tracks = [
i for i, t in enumerate(self.tracks) if not t.is_confirmed()
]
# 進行級聯匹配,得到匹配的track、不匹配的track、不匹配的detection
'''
!!!!!!!!!!!
級聯匹配
!!!!!!!!!!!
'''
# gated_metric->cosine distance
# 僅僅對確定態的軌跡進行級聯匹配
matches_a, unmatched_tracks_a, unmatched_detections = \
linear_assignment.matching_cascade(
gated_metric,
self.metric.matching_threshold,
self.max_age,
self.tracks,
detections,
confirmed_tracks)
# 將所有狀態為未確定態的軌跡和剛剛沒有匹配上的軌跡組合為iou_track_candidates,
# 進行IoU的匹配
iou_track_candidates = unconfirmed_tracks + [
k for k in unmatched_tracks_a
if self.tracks[k].time_since_update == 1 # 剛剛沒有匹配上
]
# 未匹配
unmatched_tracks_a = [
k for k in unmatched_tracks_a
if self.tracks[k].time_since_update != 1 # 已經很久沒有匹配上
]
'''
!!!!!!!!!!!
IOU 匹配
對級聯匹配中還沒有匹配成功的目標再進行IoU匹配
!!!!!!!!!!!
'''
# 雖然和級聯匹配中使用的都是min_cost_matching作為核心,
# 這里使用的metric是iou cost和以上不同
matches_b, unmatched_tracks_b, unmatched_detections = \
linear_assignment.min_cost_matching(
iou_matching.iou_cost,
self.max_iou_distance,
self.tracks,
detections,
iou_track_candidates,
unmatched_detections)
matches = matches_a + matches_b # 組合兩部分match得到的結果
unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
return matches, unmatched_tracks, unmatched_detections
對照下圖來看會順暢很多:
可以看到,匹配函數的核心是級聯匹配+IOU匹配,先來看看級聯匹配:
調用在這里:
matches_a, unmatched_tracks_a, unmatched_detections = \
linear_assignment.matching_cascade(
gated_metric,
self.metric.matching_threshold,
self.max_age,
self.tracks,
detections,
confirmed_tracks)
級聯匹配函數展開:
def matching_cascade(
distance_metric, max_distance, cascade_depth, tracks, detections,
track_indices=None, detection_indices=None):
# 級聯匹配
# 1. 分配track_indices和detection_indices
if track_indices is None:
track_indices = list(range(len(tracks)))
if detection_indices is None:
detection_indices = list(range(len(detections)))
unmatched_detections = detection_indices
matches = []
# cascade depth = max age 默認為70
for level in range(cascade_depth):
if len(unmatched_detections) == 0: # No detections left
break
track_indices_l = [
k for k in track_indices
if tracks[k].time_since_update == 1 + level
]
if len(track_indices_l) == 0: # Nothing to match at this level
continue
# 2. 級聯匹配核心內容就是這個函數
matches_l, _, unmatched_detections = \
min_cost_matching( # max_distance=0.2
distance_metric, max_distance, tracks, detections,
track_indices_l, unmatched_detections)
matches += matches_l
unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
return matches, unmatched_tracks, unmatched_detections
可以看到和偽代碼是一致的,文章上半部分也有提到這部分代碼。這部分代碼中還有一個核心的函數min_cost_matching,這個函數可以接收不同的distance_metric,在級聯匹配和IoU匹配中都有用到。
min_cost_matching函數:
def min_cost_matching(
distance_metric, max_distance, tracks, detections, track_indices=None,
detection_indices=None):
if track_indices is None:
track_indices = np.arange(len(tracks))
if detection_indices is None:
detection_indices = np.arange(len(detections))
if len(detection_indices) == 0 or len(track_indices) == 0:
return [], track_indices, detection_indices # Nothing to match.
# -----------------------------------------
# Gated_distance——>
# 1. cosine distance
# 2. 馬氏距離
# 得到代價矩陣
# -----------------------------------------
# iou_cost——>
# 僅僅計算track和detection之間的iou距離
# -----------------------------------------
cost_matrix = distance_metric(
tracks, detections, track_indices, detection_indices)
# -----------------------------------------
# gated_distance中設置距離中最高上限,
# 這里最遠距離實際是在deep sort類中的max_dist參數設置的
# 默認max_dist=0.2, 距離越小越好
# -----------------------------------------
# iou_cost情況下,max_distance的設置對應tracker中的max_iou_distance,
# 默認值為max_iou_distance=0.7
# 注意結果是1-iou,所以越小越好
# -----------------------------------------
cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5
# 匈牙利算法或者KM算法
row_indices, col_indices = linear_assignment(cost_matrix)
matches, unmatched_tracks, unmatched_detections = [], [], []
# 這幾個for循環用于對匹配結果進行篩選,得到匹配和未匹配的結果
for col, detection_idx in enumerate(detection_indices):
if col not in col_indices:
unmatched_detections.append(detection_idx)
for row, track_idx in enumerate(track_indices):
if row not in row_indices:
unmatched_tracks.append(track_idx)
for row, col in zip(row_indices, col_indices):
track_idx = track_indices[row]
detection_idx = detection_indices[col]
if cost_matrix[row, col] > max_distance:
unmatched_tracks.append(track_idx)
unmatched_detections.append(detection_idx)
else:
matches.append((track_idx, detection_idx))
# 得到匹配,未匹配軌跡,未匹配檢測
return matches, unmatched_tracks, unmatched_detections
注釋中提到distance_metric是有兩個的:
def gated_metric(tracks, dets, track_indices, detection_indices):
# 功能: 用于計算track和detection之間的距離,代價函數
# 需要使用在KM算法之前
# 調用:
# cost_matrix = distance_metric(tracks, detections,
# track_indices, detection_indices)
features = np.array([dets[i].feature for i in detection_indices])
targets = np.array([tracks[i].track_id for i in track_indices])
# 1. 通過最近鄰計算出代價矩陣 cosine distance
cost_matrix = self.metric.distance(features, targets)
# 2. 計算馬氏距離,得到新的狀態矩陣
cost_matrix = linear_assignment.gate_cost_matrix(
self.kf, cost_matrix, tracks, dets, track_indices,
detection_indices)
return cost_matrix
對應下圖進行理解(下圖上半部分就是對應的gated_metric函數):
# 雖然和級聯匹配中使用的都是min_cost_matching作為核心,
# 這里使用的metric是iou cost和以上不同
matches_b, unmatched_tracks_b, unmatched_detections = \
linear_assignment.min_cost_matching(
iou_matching.iou_cost,
self.max_iou_distance,
self.tracks,
detections,
iou_track_candidates,
unmatched_detections)
iou_cost代價很容易理解,用于計算Track和Detection之間的IOU距離矩陣。
def iou_cost(tracks, detections, track_indices=None,
detection_indices=None):
# 計算track和detection之間的iou距離矩陣
if track_indices is None:
track_indices = np.arange(len(tracks))
if detection_indices is None:
detection_indices = np.arange(len(detections))
cost_matrix = np.zeros((len(track_indices), len(detection_indices)))
for row, track_idx in enumerate(track_indices):
if tracks[track_idx].time_since_update > 1:
cost_matrix[row, :] = linear_assignment.INFTY_COST
continue
bbox = tracks[track_idx].to_tlwh()
candidates = np.asarray(
[detections[i].tlwh for i in detection_indices])
cost_matrix[row, :] = 1. - iou(bbox, candidates)
return cost_matrix
以上就是如何進行Deep SORT多目標跟蹤算法代碼解析,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。