From 718b4f3fd783a7ae4fb8d391195df6778272a077 Mon Sep 17 00:00:00 2001 From: Blake Blackshear Date: Sat, 13 Feb 2021 09:33:32 -0600 Subject: [PATCH] relay mqtt to clients --- frigate/app.py | 2 +- frigate/http.py | 43 ++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/frigate/app.py b/frigate/app.py index e3e63802fe..8e5178c793 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -137,7 +137,7 @@ def init_stats(self): self.stats_tracking = stats_init(self.camera_metrics, self.detectors) def init_web_server(self): - self.flask_app = create_app(self.config, self.db, self.stats_tracking, self.detected_frames_processor) + self.flask_app = create_app(self.config, self.db, self.stats_tracking, self.detected_frames_processor, self.mqtt_client) def init_mqtt(self): self.mqtt_client = create_mqtt_client(self.config, self.camera_metrics) diff --git a/frigate/http.py b/frigate/http.py index d292281a63..ae4587ad52 100644 --- a/frigate/http.py +++ b/frigate/http.py @@ -1,11 +1,13 @@ import base64 import datetime +import json import logging import os import time from functools import reduce import cv2 +import gevent import numpy as np from flask import (Blueprint, Flask, Response, current_app, jsonify, make_response, request) @@ -24,7 +26,37 @@ bp = Blueprint('frigate', __name__) ws = Blueprint('ws', __name__) -def create_app(frigate_config, database: SqliteDatabase, stats_tracking, detected_frames_processor): +class MqttBackend(): + """Interface for registering and updating WebSocket clients.""" + + def __init__(self, mqtt_client, topic_prefix): + self.clients = list() + self.mqtt_client = mqtt_client + self.topic_prefix = topic_prefix + + def register(self, client): + """Register a WebSocket connection for Mqtt updates.""" + self.clients.append(client) + + def run(self): + def send(client, userdata, message): + """Sends mqtt messages to clients.""" + logger.info(f"Sending mqtt to ws clients {len(self.clients)}") + ws_message = json.dumps({ + 'topic': message.topic, + 'payload': message.payload.decode() + }) + for client in self.clients: + client.send(ws_message) + + logger.info(f"Subscribing to {self.topic_prefix}/#") + self.mqtt_client.message_callback_add(f"{self.topic_prefix}/#", send) + + def start(self): + """Maintains mqtt subscription in the background.""" + gevent.spawn(self.run) + +def create_app(frigate_config, database: SqliteDatabase, stats_tracking, detected_frames_processor, mqtt_client): app = Flask(__name__) sockets = Sockets(app) @@ -44,6 +76,9 @@ def _db_close(exc): app.register_blueprint(bp) sockets.register_blueprint(ws) + app.mqtt_backend = MqttBackend(mqtt_client, frigate_config.mqtt.topic_prefix) + app.mqtt_backend.start() + return app @bp.route('/') @@ -311,6 +346,8 @@ def imagestream(detected_frames_processor, camera_name, fps, height, draw_option @ws.route('/ws') def echo_socket(socket): + current_app.mqtt_backend.register(socket) + while not socket.closed: - message = socket.receive() - socket.send(message) + # Context switch while `ChatBackend.start` is running in the background. + gevent.sleep(0.1)