-
-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
7ec8a91
commit 9a8d4e9
Showing
3 changed files
with
277 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,263 @@ | ||
from collections import defaultdict | ||
import random | ||
import string | ||
|
||
import numpy as np | ||
from frigate.config import DetectConfig | ||
from frigate.track import ObjectTracker | ||
from frigate.util import intersection_over_union | ||
from norfair import Detection, Tracker, Drawable, draw_boxes | ||
from norfair.drawing.drawer import Drawer | ||
|
||
|
||
# Normalizes distance from estimate relative to object size | ||
# Other ideas: | ||
# - if estimates are inaccurate for first N detections, compare with last_detection (may be fine) | ||
# - could be variable based on time since last_detection | ||
# - include estimated velocity in the distance (car driving by of a parked car) | ||
# - include some visual similarity factor in the distance for occlusions | ||
def frigate_distance(detection: Detection, tracked_object) -> float: | ||
# calculate distances and normalize it by width and height of previous detection | ||
ld = tracked_object.last_detection | ||
width = ld.points[1][0] - ld.points[0][0] | ||
height = ld.points[1][1] - ld.points[0][1] | ||
difference = (detection.points - tracked_object.estimate).astype(float) | ||
difference[:, 0] /= width | ||
difference[:, 1] /= height | ||
|
||
# calculate euclidean distance and average | ||
return np.linalg.norm(difference, axis=1).mean() | ||
|
||
|
||
class NorfairTracker(ObjectTracker): | ||
def __init__(self, config: DetectConfig): | ||
self.tracked_objects = {} | ||
self.disappeared = {} | ||
self.positions = {} | ||
self.max_disappeared = config.max_disappeared | ||
self.detect_config = config | ||
self.track_id_map = {} | ||
# TODO: could also initialize a tracker per object class if there | ||
# was a good reason to have different distance calculations | ||
self.tracker = Tracker( | ||
distance_function=frigate_distance, | ||
# distance is relative to the size of the last | ||
# detection | ||
distance_threshold=4.0, | ||
initialization_delay=0, | ||
hit_counter_max=self.max_disappeared, | ||
) | ||
|
||
def register(self, track_id, obj): | ||
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6)) | ||
id = f"{obj['frame_time']}-{rand_id}" | ||
self.track_id_map[track_id] = id | ||
obj["id"] = id | ||
obj["start_time"] = obj["frame_time"] | ||
obj["motionless_count"] = 0 | ||
obj["position_changes"] = 0 | ||
self.tracked_objects[id] = obj | ||
self.disappeared[id] = 0 | ||
self.positions[id] = { | ||
"xmins": [], | ||
"ymins": [], | ||
"xmaxs": [], | ||
"ymaxs": [], | ||
"xmin": 0, | ||
"ymin": 0, | ||
"xmax": self.detect_config.width, | ||
"ymax": self.detect_config.height, | ||
} | ||
|
||
def deregister(self, id): | ||
del self.tracked_objects[id] | ||
del self.disappeared[id] | ||
|
||
# tracks the current position of the object based on the last N bounding boxes | ||
# returns False if the object has moved outside its previous position | ||
def update_position(self, id, box): | ||
position = self.positions[id] | ||
position_box = ( | ||
position["xmin"], | ||
position["ymin"], | ||
position["xmax"], | ||
position["ymax"], | ||
) | ||
|
||
xmin, ymin, xmax, ymax = box | ||
|
||
iou = intersection_over_union(position_box, box) | ||
|
||
# if the iou drops below the threshold | ||
# assume the object has moved to a new position and reset the computed box | ||
if iou < 0.6: | ||
self.positions[id] = { | ||
"xmins": [xmin], | ||
"ymins": [ymin], | ||
"xmaxs": [xmax], | ||
"ymaxs": [ymax], | ||
"xmin": xmin, | ||
"ymin": ymin, | ||
"xmax": xmax, | ||
"ymax": ymax, | ||
} | ||
return False | ||
|
||
# if there are less than 10 entries for the position, add the bounding box | ||
# and recompute the position box | ||
if len(position["xmins"]) < 10: | ||
position["xmins"].append(xmin) | ||
position["ymins"].append(ymin) | ||
position["xmaxs"].append(xmax) | ||
position["ymaxs"].append(ymax) | ||
# by using percentiles here, we hopefully remove outliers | ||
position["xmin"] = np.percentile(position["xmins"], 15) | ||
position["ymin"] = np.percentile(position["ymins"], 15) | ||
position["xmax"] = np.percentile(position["xmaxs"], 85) | ||
position["ymax"] = np.percentile(position["ymaxs"], 85) | ||
|
||
return True | ||
|
||
def is_expired(self, id): | ||
obj = self.tracked_objects[id] | ||
# get the max frames for this label type or the default | ||
max_frames = self.detect_config.stationary.max_frames.objects.get( | ||
obj["label"], self.detect_config.stationary.max_frames.default | ||
) | ||
|
||
# if there is no max_frames for this label type, continue | ||
if max_frames is None: | ||
return False | ||
|
||
# if the object has exceeded the max_frames setting, deregister | ||
if ( | ||
obj["motionless_count"] - self.detect_config.stationary.threshold | ||
> max_frames | ||
): | ||
return True | ||
|
||
return False | ||
|
||
def update(self, track_id, obj): | ||
id = self.track_id_map[track_id] | ||
self.disappeared[id] = 0 | ||
# update the motionless count if the object has not moved to a new position | ||
if self.update_position(id, obj["box"]): | ||
self.tracked_objects[id]["motionless_count"] += 1 | ||
if self.is_expired(id): | ||
self.deregister(id) | ||
return | ||
else: | ||
# register the first position change and then only increment if | ||
# the object was previously stationary | ||
if ( | ||
self.tracked_objects[id]["position_changes"] == 0 | ||
or self.tracked_objects[id]["motionless_count"] | ||
>= self.detect_config.stationary.threshold | ||
): | ||
self.tracked_objects[id]["position_changes"] += 1 | ||
self.tracked_objects[id]["motionless_count"] = 0 | ||
|
||
self.tracked_objects[id].update(obj) | ||
|
||
def update_frame_times(self, frame_time): | ||
# if the object was there in the last frame, assume it's still there | ||
detections = [ | ||
( | ||
obj["label"], | ||
obj["score"], | ||
obj["box"], | ||
obj["area"], | ||
obj["ratio"], | ||
obj["region"], | ||
) | ||
for id, obj in self.tracked_objects.items() | ||
if self.disappeared[id] == 0 | ||
] | ||
self.match_and_update(frame_time, detections=detections) | ||
|
||
def match_and_update(self, frame_time, detections): | ||
norfair_detections = [] | ||
|
||
for obj in detections: | ||
# centroid is used for other things downstream | ||
centroid_x = int((obj[2][0] + obj[2][2]) / 2.0) | ||
centroid_y = int((obj[2][1] + obj[2][3]) / 2.0) | ||
|
||
# track based on top,left and bottom,right corners instead of centroid | ||
points = np.array([[obj[2][0], obj[2][1]], [obj[2][2], obj[2][3]]]) | ||
|
||
norfair_detections.append( | ||
Detection( | ||
points=points, | ||
label=obj[0], | ||
data={ | ||
"label": obj[0], | ||
"score": obj[1], | ||
"box": obj[2], | ||
"area": obj[3], | ||
"ratio": obj[4], | ||
"region": obj[5], | ||
"frame_time": frame_time, | ||
"centroid": (centroid_x, centroid_y), | ||
}, | ||
) | ||
) | ||
|
||
tracked_objects = self.tracker.update(detections=norfair_detections) | ||
|
||
# update or create new tracks | ||
active_ids = [] | ||
for t in tracked_objects: | ||
active_ids.append(t.global_id) | ||
if not t.global_id in self.track_id_map: | ||
self.register(t.global_id, t.last_detection.data) | ||
# if there wasn't a detection in this frame, increment disappeared | ||
elif t.last_detection.data["frame_time"] != frame_time: | ||
id = self.track_id_map[t.global_id] | ||
self.disappeared[id] += 1 | ||
# else update it | ||
else: | ||
self.update(t.global_id, t.last_detection.data) | ||
|
||
# clear expired tracks | ||
expired_ids = [k for k in self.track_id_map.keys() if k not in active_ids] | ||
for e_id in expired_ids: | ||
self.deregister(self.track_id_map[e_id]) | ||
del self.track_id_map[e_id] | ||
|
||
def debug_draw(self, frame, frame_time): | ||
active_detections = [ | ||
Drawable(id=obj.id, points=obj.last_detection.points, label=obj.label) | ||
for obj in self.tracker.tracked_objects | ||
if obj.last_detection.data["frame_time"] == frame_time | ||
] | ||
missing_detections = [ | ||
Drawable(id=obj.id, points=obj.last_detection.points, label=obj.label) | ||
for obj in self.tracker.tracked_objects | ||
if obj.last_detection.data["frame_time"] != frame_time | ||
] | ||
# draw the estimated bounding box | ||
draw_boxes(frame, self.tracker.tracked_objects, color="green", draw_ids=True) | ||
# draw the detections that were detected in the current frame | ||
draw_boxes(frame, active_detections, color="blue", draw_ids=True) | ||
# draw the detections that are missing in the current frame | ||
draw_boxes(frame, missing_detections, color="red", draw_ids=True) | ||
|
||
# draw the distance calculation for the last detection | ||
# estimate vs detection | ||
for obj in self.tracker.tracked_objects: | ||
ld = obj.last_detection | ||
# bottom right | ||
text_anchor = ( | ||
ld.points[1, 0], | ||
ld.points[1, 1], | ||
) | ||
frame = Drawer.text( | ||
frame, | ||
f"{obj.id}: {str(obj.last_distance)}", | ||
position=text_anchor, | ||
size=None, | ||
color=(255, 0, 0), | ||
thickness=None, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters