Skip to content

Commit

Permalink
save clips for tracked objects
Browse files Browse the repository at this point in the history
  • Loading branch information
blakeblackshear committed Aug 1, 2020
1 parent 53ccc90 commit 7383db6
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 5 deletions.
3 changes: 3 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ RUN apt -qq update && apt -qq install --no-install-recommends -y \
numpy \
imutils \
scipy \
psutil \
&& python3.7 -m pip install -U \
Flask \
paho-mqtt \
Expand All @@ -49,6 +50,8 @@ RUN wget -q https://dl.google.com/coral/canned_models/coco_labels.txt -O /labelm
RUN wget -q https://github.com/google-coral/edgetpu/raw/master/test_data/ssd_mobilenet_v2_coco_quant_postprocess.tflite -O /cpu_model.tflite


RUN mkdir /cache && mkdir /clips

WORKDIR /opt/frigate/
ADD frigate frigate/
COPY detect_objects.py .
Expand Down
9 changes: 9 additions & 0 deletions config/config.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ cameras:
################
take_frame: 1

################
# This will save a clip for each tracked object by frigate along with a json file that contains
# data related to the tracked object. This works by telling ffmpeg to write video segments to /cache
# from the video stream without re-encoding. Clips are them created by using ffmpeg to merge segments
# without re-encoding. The segements saved are unaltered from what frigate receives to avoid re-encoding.
# They do not contain bounding boxes. 30 seconds of video is added to the start of the clip.
################
save_clips: False

################
# Configuration for the snapshots in the debug view and mqtt
################
Expand Down
30 changes: 28 additions & 2 deletions detect_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from frigate.video import track_camera, get_ffmpeg_input, get_frame_shape, CameraCapture, start_or_restart_ffmpeg
from frigate.object_processing import TrackedObjectProcessor
from frigate.events import EventProcessor
from frigate.util import EventsPerSecond
from frigate.edgetpu import EdgeTPUProcess

Expand Down Expand Up @@ -176,6 +177,9 @@ def on_connect(client, userdata, flags, rc):

# Queue for cameras to push tracked objects to
tracked_objects_queue = mp.SimpleQueue()

# Queue for clip processing
event_queue = mp.Queue()

# Start the shared tflite process
tflite_process = EdgeTPUProcess()
Expand All @@ -190,6 +194,25 @@ def on_connect(client, userdata, flags, rc):
ffmpeg_hwaccel_args = ffmpeg.get('hwaccel_args', FFMPEG_DEFAULT_CONFIG['hwaccel_args'])
ffmpeg_input_args = ffmpeg.get('input_args', FFMPEG_DEFAULT_CONFIG['input_args'])
ffmpeg_output_args = ffmpeg.get('output_args', FFMPEG_DEFAULT_CONFIG['output_args'])
if config.get('save_clips', False):
ffmpeg_output_args = [
"-f",
"segment",
"-segment_time",
"10",
"-segment_format",
"mp4",
"-reset_timestamps",
"1",
"-strftime",
"1",
"-c",
"copy",
"-an",
"-map",
"0",
f"/cache/{name}-%Y%m%d%H%M%S.mp4"
] + ffmpeg_output_args
ffmpeg_cmd = (['ffmpeg'] +
ffmpeg_global_args +
ffmpeg_hwaccel_args +
Expand Down Expand Up @@ -239,8 +262,11 @@ def on_connect(client, userdata, flags, rc):
for name, camera_process in camera_processes.items():
camera_process['process'].start()
print(f"Camera_process started for {name}: {camera_process['process'].pid}")

object_processor = TrackedObjectProcessor(CONFIG['cameras'], client, MQTT_TOPIC_PREFIX, tracked_objects_queue)

event_processor = EventProcessor(CONFIG['cameras'], camera_processes, '/cache', '/clips', event_queue)
event_processor.start()

object_processor = TrackedObjectProcessor(CONFIG['cameras'], client, MQTT_TOPIC_PREFIX, tracked_objects_queue, event_queue)
object_processor.start()

camera_watchdog = CameraWatchdog(camera_processes, CONFIG['cameras'], tflite_process, tracked_objects_queue, plasma_process)
Expand Down
152 changes: 152 additions & 0 deletions frigate/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import os
import time
import psutil
import threading
from collections import defaultdict
import json
import datetime
import subprocess as sp
import queue

class EventProcessor(threading.Thread):
def __init__(self, config, camera_processes, cache_dir, clip_dir, event_queue):
threading.Thread.__init__(self)
self.config = config
self.camera_processes = camera_processes
self.cache_dir = cache_dir
self.clip_dir = clip_dir
self.cached_clips = {}
self.event_queue = event_queue
self.events_in_process = {}

def refresh_cache(self):
cached_files = os.listdir(self.cache_dir)

files_in_use = []
for process_data in self.camera_processes.values():
try:
ffmpeg_process = psutil.Process(pid=process_data['ffmpeg_process'].pid)
flist = ffmpeg_process.open_files()
if flist:
for nt in flist:
if nt.path.startswith(self.cache_dir):
files_in_use.append(nt.path.split('/')[-1])
except:
continue

for f in cached_files:
if f in files_in_use or f in self.cached_clips:
continue

camera = f.split('-')[0]
start_time = datetime.datetime.strptime(f.split('-')[1].split('.')[0], '%Y%m%d%H%M%S')

ffprobe_cmd = " ".join([
'ffprobe',
'-v',
'error',
'-show_entries',
'format=duration',
'-of',
'default=noprint_wrappers=1:nokey=1',
f"{os.path.join(self.cache_dir,f)}"
])
p = sp.Popen(ffprobe_cmd, stdout=sp.PIPE, shell=True)
(output, err) = p.communicate()
p_status = p.wait()
if p_status == 0:
duration = float(output.decode('utf-8').strip())
else:
print(f"bad file: {f}")
os.remove(os.path.join(self.cache_dir,f))
continue

self.cached_clips[f] = {
'path': f,
'camera': camera,
'start_time': start_time.timestamp(),
'duration': duration
}

if len(self.events_in_process) > 0:
earliest_event = min(self.events_in_process.values(), key=lambda x:x['start_time'])['start_time']
else:
earliest_event = datetime.datetime.now().timestamp()

for f, data in list(self.cached_clips.items()):
if earliest_event-90 > data['start_time']+data['duration']:
del self.cached_clips[f]
os.remove(os.path.join(self.cache_dir,f))

def create_clip(self, camera, event_data):
# get all clips from the camera with the event sorted
sorted_clips = sorted([c for c in self.cached_clips.values() if c['camera'] == camera], key = lambda i: i['start_time'])

while sorted_clips[-1]['start_time'] + sorted_clips[-1]['duration'] < event_data['end_time']:
time.sleep(5)
self.refresh_cache()
# get all clips from the camera with the event sorted
sorted_clips = sorted([c for c in self.cached_clips.values() if c['camera'] == camera], key = lambda i: i['start_time'])

playlist_start = event_data['start_time']-30
playlist_end = event_data['end_time']+5
playlist_lines = []
for clip in sorted_clips:
# clip ends before playlist start time, skip
if clip['start_time']+clip['duration'] < playlist_start:
continue
# clip starts after playlist ends, finish
if clip['start_time'] > playlist_end:
break
playlist_lines.append(f"file '{os.path.join(self.cache_dir,clip['path'])}'")
# if this is the starting clip, add an inpoint
if clip['start_time'] < playlist_start:
playlist_lines.append(f"inpoint {int(playlist_start-clip['start_time'])}")
# if this is the ending clip, add an outpoint
if clip['start_time']+clip['duration'] > playlist_end:
playlist_lines.append(f"outpoint {int(playlist_end-clip['start_time'])}")

clip_name = f"{camera}-{event_data['id']}"
ffmpeg_cmd = [
'ffmpeg',
'-y',
'-protocol_whitelist',
'pipe,file',
'-f',
'concat',
'-safe',
'0',
'-i',
'-',
'-c',
'copy',
f"{os.path.join(self.clip_dir, clip_name)}.mp4"
]

p = sp.run(ffmpeg_cmd, input="\n".join(playlist_lines), encoding='ascii', capture_output=True)
if p.returncode != 0:
print(p.stderr)
return

with open(f"{os.path.join(self.clip_dir, clip_name)}.json", 'w') as outfile:
json.dump(event_data, outfile)

def run(self):
while True:
try:
event_type, camera, event_data = self.event_queue.get(timeout=10)
except queue.Empty:
self.refresh_cache()
continue

self.refresh_cache()

if event_type == 'start':
self.events_in_process[event_data['id']] = event_data

if event_type == 'end':
if self.config[camera].get('save_clips', False) and len(self.cached_clips) > 0:
self.create_clip(camera, event_data)
del self.events_in_process[event_data['id']]


30 changes: 27 additions & 3 deletions frigate/object_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@
COLOR_MAP[val] = tuple(int(round(255 * c)) for c in cmap(key)[:3])

class TrackedObjectProcessor(threading.Thread):
def __init__(self, config, client, topic_prefix, tracked_objects_queue):
def __init__(self, config, client, topic_prefix, tracked_objects_queue, event_queue):
threading.Thread.__init__(self)
self.config = config
self.client = client
self.topic_prefix = topic_prefix
self.tracked_objects_queue = tracked_objects_queue
self.event_queue = event_queue
self.camera_data = defaultdict(lambda: {
'best_objects': {},
'object_status': defaultdict(lambda: defaultdict(lambda: 'OFF')),
Expand All @@ -50,12 +51,35 @@ def get_current_frame(self, camera):

def run(self):
while True:
camera, frame_time, tracked_objects = self.tracked_objects_queue.get()
camera, frame_time, current_tracked_objects = self.tracked_objects_queue.get()

config = self.config[camera]
best_objects = self.camera_data[camera]['best_objects']
current_object_status = self.camera_data[camera]['object_status']
self.camera_data[camera]['tracked_objects'] = tracked_objects
tracked_objects = self.camera_data[camera]['tracked_objects']

current_ids = current_tracked_objects.keys()
previous_ids = tracked_objects.keys()
removed_ids = list(set(previous_ids).difference(current_ids))
new_ids = list(set(current_ids).difference(previous_ids))
updated_ids = list(set(current_ids).intersection(previous_ids))

for id in new_ids:
tracked_objects[id] = current_tracked_objects[id]
# publish events to mqtt
self.client.publish(f"{self.topic_prefix}/{camera}/events/start", json.dumps(tracked_objects[id]), retain=False)
self.event_queue.put(('start', camera, tracked_objects[id]))

for id in updated_ids:
tracked_objects[id] = current_tracked_objects[id]

for id in removed_ids:
# publish events to mqtt
tracked_objects[id]['end_time'] = frame_time
self.client.publish(f"{self.topic_prefix}/{camera}/events/end", json.dumps(tracked_objects[id]), retain=False)
self.event_queue.put(('end', camera, tracked_objects[id]))
del tracked_objects[id]

self.camera_data[camera]['current_frame_time'] = frame_time

###
Expand Down
4 changes: 4 additions & 0 deletions frigate/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def __init__(self, max_disappeared):
def register(self, index, obj):
id = f"{obj['frame_time']}-{index}"
obj['id'] = id
obj['start_time'] = obj['frame_time']
obj['top_score'] = obj['score']
self.add_history(obj)
self.tracked_objects[id] = obj
Expand All @@ -45,6 +46,9 @@ def add_history(self, obj):
}
if 'history' in obj:
obj['history'].append(entry)
# only maintain the last 20 in history
if len(obj['history']) > 20:
obj['history'] = obj['history'][-20:]
else:
obj['history'] = [entry]

Expand Down

0 comments on commit 7383db6

Please sign in to comment.