WIP: Storing records on remote s3 storage #6586

Draft: wants to merge 8 commits into base: dev
18 changes: 18 additions & 0 deletions docs/docs/configuration/index.md
@@ -115,6 +115,24 @@ database:
# The path to store the SQLite DB (default: shown below)
path: /config/frigate.db

# Optional: S3 Storage configuration
storage:
s3:
# Write records to s3 instead of local storage (default: shown below)
enabled: false
# Upload expired records to s3 before deleting them from local storage (default: shown below)
archive: false
# Required: S3 Access Key ID
access_key_id:
# Required: S3 Secret Access Key
secret_access_key:
# Required: S3 bucket name
bucket_name:
# Required: S3 Endpoint address
endpoint_url: http://hostname:port
# Optional: S3 Endpoint Path (default: shown below)
path: "/"

# Optional: model modifications
model:
# Optional: path to the model (default: automatic based on detector)
21 changes: 21 additions & 0 deletions frigate/config.py
@@ -521,6 +521,24 @@ class SnapshotsConfig(FrigateBaseModel):
)


class StorageS3Config(FrigateBaseModel):
enabled: bool = Field(default=False, title="S3 enabled.")
archive: bool = Field(
default=False, title="Archive expired records to S3 instead of deleting them"
)
access_key_id: str = Field(default="", title="AWS_ACCESS_KEY_ID")
secret_access_key: str = Field(default="", title="AWS_SECRET_ACCESS_KEY")
bucket_name: str = Field(default="", title="Bucket name")
endpoint_url: str = Field(default="", title="Endpoint URL")
path: str = Field(default="/", title="Base Path")


class StorageConfig(FrigateBaseModel):
s3: StorageS3Config = Field(
default_factory=StorageS3Config, title="S3 configuration"
)


class ColorConfig(FrigateBaseModel):
red: int = Field(default=255, ge=0, le=255, title="Red")
green: int = Field(default=255, ge=0, le=255, title="Green")
@@ -880,6 +898,9 @@ class FrigateConfig(FrigateBaseModel):
snapshots: SnapshotsConfig = Field(
default_factory=SnapshotsConfig, title="Global snapshots configuration."
)
storage: StorageConfig = Field(
default_factory=StorageConfig, title="Global storage configuration."
)
rtmp: RtmpConfig = Field(
default_factory=RtmpConfig, title="Global RTMP restreaming configuration."
)
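For reviewers who want to poke at the new models in isolation, here is a minimal sketch using plain pydantic (the real classes inherit from FrigateBaseModel, so validation details may differ slightly):

    from pydantic import BaseModel, Field

    class S3Config(BaseModel):
        # Mirrors StorageS3Config above; names and defaults copied from the diff.
        enabled: bool = Field(default=False)
        archive: bool = Field(default=False)
        access_key_id: str = Field(default="")
        secret_access_key: str = Field(default="")
        bucket_name: str = Field(default="")
        endpoint_url: str = Field(default="")
        path: str = Field(default="/")

    # Parse the same keys a user would put under storage.s3 in the YAML config.
    cfg = S3Config.parse_obj(
        {"enabled": True, "bucket_name": "frigate", "endpoint_url": "http://minio:9000"}
    )
    assert cfg.archive is False  # unset fields fall back to their defaults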
11 changes: 10 additions & 1 deletion frigate/http.py
@@ -46,7 +46,7 @@
vainfo_hwaccel,
get_tz_modifiers,
)
-from frigate.storage import StorageMaintainer
+from frigate.storage import StorageMaintainer, StorageS3
from frigate.version import VERSION

logger = logging.getLogger(__name__)
@@ -1323,6 +1323,12 @@ def recordings(camera_name):
def recording_clip(camera_name, start_ts, end_ts):
download = request.args.get("download", type=bool)

if (
current_app.frigate_config.storage.s3.enabled
or current_app.frigate_config.storage.s3.archive
):
s3 = StorageS3(current_app.frigate_config)

recordings = (
Recordings.select()
.where(
@@ -1337,6 +1343,9 @@ def recording_clip(camera_name, start_ts, end_ts):
playlist_lines = []
clip: Recordings
for clip in recordings:
if clip.storage == "s3":
clip.path = s3.download_file_from_s3(clip.path)

playlist_lines.append(f"file '{clip.path}'")
# if this is the starting clip, add an inpoint
if clip.start_time < start_ts:
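Worth noting for reviewers: ffmpeg's concat demuxer, which consumes playlist_lines downstream, reads local file paths, not S3 URLs, which is why S3-backed segments are pulled to a temp file first. A sketch of what the loop produces for a three-segment clip (paths are hypothetical):

    # Hypothetical playlist for a clip whose first segment started before start_ts:
    playlist_lines = [
        "file '/tmp/1690000000.0-ab12cd.mp4'",  # S3 segment, downloaded to tmp
        "inpoint 12",  # trim 12 seconds into the first segment
        "file '/media/frigate/recordings/front/2023-07/22/01/02.mp4'",  # local
        "file '/media/frigate/recordings/front/2023-07/22/01/12.mp4'",
    ]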
1 change: 1 addition & 0 deletions frigate/models.py
@@ -66,3 +66,4 @@ class Recordings(Model): # type: ignore[misc]
motion = IntegerField(null=True)
objects = IntegerField(null=True)
segment_size = FloatField(default=0) # this should be stored as MB
storage = CharField(max_length=20)
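With the new column in place, callers can filter segments by backend; a small peewee sketch (assuming the migration below has been applied):

    # Count segments that have been offloaded to S3.
    s3_count = Recordings.select().where(Recordings.storage == "s3").count()

    # Oldest local segments for one camera, e.g. as archive candidates.
    local_segments = (
        Recordings.select()
        .where((Recordings.camera == "front") & (Recordings.storage == "local"))
        .order_by(Recordings.start_time.asc())
        .limit(100)
    )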
23 changes: 21 additions & 2 deletions frigate/record/cleanup.py
@@ -8,12 +8,14 @@
from pathlib import Path

from peewee import DoesNotExist
from multiprocessing.synchronize import Event as MpEvent

from frigate.config import RetainModeEnum, FrigateConfig
from frigate.const import RECORD_DIR, SECONDS_IN_DAY
from frigate.models import Event, Recordings, Timeline
from frigate.record.util import remove_empty_directories
from frigate.storage import StorageS3

logger = logging.getLogger(__name__)

@@ -27,6 +29,9 @@ def __init__(self, config: FrigateConfig, stop_event: MpEvent) -> None:
self.config = config
self.stop_event = stop_event

if self.config.storage.s3.enabled or self.config.storage.s3.archive:
self.s3 = StorageS3(config)

def clean_tmp_clips(self) -> None:
# delete any clips more than 5 minutes old
for p in Path("/tmp/cache").rglob("clip_*.mp4"):
@@ -54,6 +59,7 @@ def expire_recordings(self) -> None:
)

deleted_recordings = set()
moved_recordings = []
for recording in no_camera_recordings:
Path(recording.path).unlink(missing_ok=True)
deleted_recordings.add(recording.id)
@@ -99,6 +105,7 @@ def expire_recordings(self) -> None:
# TODO: expire segments based on segment stats according to config
event_start = 0
deleted_recordings = set()
moved_recordings = []
for recording in recordings.objects().iterator():
keep = False
# Now look for a reason to keep this recording segment
@@ -137,8 +144,15 @@ def expire_recordings(self) -> None:
and recording.objects == 0
)
):
-Path(recording.path).unlink(missing_ok=True)
-deleted_recordings.add(recording.id)
+if self.config.storage.s3.archive:
+    s3path = self.s3.upload_file_to_s3(recording.path)
+    if s3path != "":
+        # Archived successfully: keep the DB row and repoint it at S3 below.
+        moved_recordings.append({"id": recording.id, "path": s3path})
+    else:
+        # Upload failed: fall back to deleting the local segment.
+        Path(recording.path).unlink(missing_ok=True)
+        deleted_recordings.add(recording.id)
+else:
+    Path(recording.path).unlink(missing_ok=True)
+    deleted_recordings.add(recording.id)

# delete timeline entries relevant to this recording segment
Timeline.delete().where(
@@ -158,6 +172,11 @@ def expire_recordings(self) -> None:
Recordings.id << deleted_recordings_list[i : i + max_deletes]
).execute()

for recording in moved_recordings:
Recordings.update(
{Recordings.storage: "s3", Recordings.path: recording["path"]}
).where(Recordings.id == recording["id"]).execute()

logger.debug(f"End camera: {camera}.")

logger.debug("End all cameras.")
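The delete above is batched through an IN clause capped at max_deletes, while the moved_recordings loop issues one UPDATE per row. A sketch of wrapping those updates in chunked transactions so an interrupted cleanup pass cannot leave a half-updated batch (flush_moved is a hypothetical helper, not part of the diff):

    from frigate.models import Recordings

    def flush_moved(moved_recordings: list, chunk: int = 25) -> None:
        # Each path is unique, so rows are still updated one by one, but each
        # chunk commits atomically via peewee's transaction helper.
        for i in range(0, len(moved_recordings), chunk):
            with Recordings._meta.database.atomic():
                for rec in moved_recordings[i : i + chunk]:
                    Recordings.update(
                        {Recordings.storage: "s3", Recordings.path: rec["path"]}
                    ).where(Recordings.id == rec["id"]).execute()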
18 changes: 18 additions & 0 deletions frigate/record/maintainer.py
@@ -21,6 +21,7 @@
from frigate.models import Event, Recordings
from frigate.types import RecordMetricsTypes
from frigate.util import area
from frigate.storage import StorageS3

logger = logging.getLogger(__name__)

@@ -42,6 +43,9 @@ def __init__(
self.recordings_info: dict[str, Any] = defaultdict(list)
self.end_time_cache: dict[str, Tuple[datetime.datetime, float]] = {}

if self.config.storage.s3.enabled:
self.s3 = StorageS3(config)

def move_files(self) -> None:
cache_files = sorted(
[
@@ -335,6 +339,19 @@ def store_segment(
rand_id = "".join(
random.choices(string.ascii_lowercase + string.digits, k=6)
)
storage = "local"
if self.config.storage.s3.enabled:
s3path = self.s3.upload_file_to_s3(file_path)
if s3path != "":
Path(file_path).unlink(missing_ok=True)
file_path = s3path
storage = "s3"
else:
logger.error(
f"Unable to upload recording segment {file_path} to s3, falling back to local storage"
)

Recordings.create(
id=f"{start_time.timestamp()}-{rand_id}",
camera=camera,
@@ -346,6 +363,7 @@ def store_segment(
# TODO: update this to store list of active objects at some point
objects=active_count,
segment_size=segment_size,
storage=storage,
)
except Exception as e:
logger.error(f"Unable to store recording segment {cache_path}")
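Since a single failed upload currently falls straight back to local storage, transient S3 errors will scatter segments across backends. A sketch of a small retry wrapper one could put around upload_file_to_s3 (a hypothetical helper, not part of the diff; it relies on the method's contract of returning "" on failure):

    import time

    def upload_with_retry(s3, file_path: str, attempts: int = 3) -> str:
        # Retry transient failures with simple exponential backoff before
        # giving up and letting the caller keep the segment local.
        for attempt in range(attempts):
            s3path = s3.upload_file_to_s3(file_path)
            if s3path != "":
                return s3path
            time.sleep(2**attempt)
        return ""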
6 changes: 6 additions & 0 deletions frigate/record/util.py
@@ -1,6 +1,12 @@
"""Recordings Utilities."""

import os
import boto3
import tempfile
import logging
from frigate.config import FrigateConfig

logger = logging.getLogger(__name__)


def remove_empty_directories(directory: str) -> None:
5 changes: 5 additions & 0 deletions frigate/stats.py
@@ -18,6 +18,7 @@
from frigate.version import VERSION
from frigate.util import get_cpu_stats, get_bandwidth_stats
from frigate.object_detection import ObjectDetectProcess
from frigate.storage import StorageS3

logger = logging.getLogger(__name__)

@@ -274,6 +275,10 @@ def stats_snapshot(
"mount_type": get_fs_type(path),
}

if config.storage.s3.enabled or config.storage.s3.archive:
s3 = StorageS3(config)
stats["service"]["storage"]["s3"] = s3.get_bucket_stats()

stats["processes"] = {}
for name, pid in stats_tracking["processes"].items():
stats["processes"][name] = {
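With this change the storage stats gain an s3 entry alongside the per-mount entries. Roughly this shape (the local-mount keys and all values here are illustrative, not taken from the diff):

    # stats["service"]["storage"] after this change, values illustrative:
    storage_stats = {
        "/media/frigate/recordings": {
            "total": 512000.0,  # assumed per-mount keys, MB
            "used": 128000.0,
            "free": 384000.0,
            "mount_type": "ext4",
        },
        "s3": {"total_files": 1842, "total_size_gb": 37.6},
    }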
101 changes: 100 additions & 1 deletion frigate/storage.py
@@ -4,8 +4,15 @@
from pathlib import Path
import shutil
import threading

import boto3
from botocore.config import Config
from botocore.session import Session as boto_session
from botocore.exceptions import BotoCoreError, ClientError
from peewee import fn
import os
import tempfile


from frigate.config import FrigateConfig
from frigate.const import RECORD_DIR
@@ -17,6 +24,98 @@
)


class StorageS3:
def __init__(self, config: FrigateConfig) -> None:
self.config = config
if self.config.storage.s3.enabled or self.config.storage.s3.archive:
if self.config.storage.s3.endpoint_url.startswith("http://"):

Review comment: Shouldn't we use SSL by default here?

try:
session = boto_session()
session.set_config_variable(
"s3",
{
"use_ssl": False,
"verify": False,
},
)
self.s3_client = session.create_client(
"s3",
aws_access_key_id=self.config.storage.s3.access_key_id,
aws_secret_access_key=self.config.storage.s3.secret_access_key,

Review comment: You are forcing the use of keys. If config.storage.s3.access_key_id is NOT set, you should follow the default credential chain, which lets users use environment variables, ~/.aws/credentials, etc. See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html

endpoint_url=self.config.storage.s3.endpoint_url,
config=Config(),
)
except (BotoCoreError, ClientError) as error:
logger.error(f"Failed to create S3 client: {error}")
return None
else:
try:
self.s3_client = boto3.client(
"s3",
aws_access_key_id=self.config.storage.s3.access_key_id,
aws_secret_access_key=self.config.storage.s3.secret_access_key,
endpoint_url=self.config.storage.s3.endpoint_url,
)
except (BotoCoreError, ClientError) as error:
logger.error(f"Failed to create S3 client: {error}")
return None

self.s3_bucket = self.config.storage.s3.bucket_name
self.s3_path = self.config.storage.s3.path
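
Following up on the credential-chain comment above, a sketch of building the client kwargs conditionally so boto3 can fall back to its default chain (environment variables, ~/.aws/credentials, instance profiles) when no keys are configured:

    client_kwargs = {"endpoint_url": self.config.storage.s3.endpoint_url}
    if self.config.storage.s3.access_key_id:
        # Explicit keys in the Frigate config take precedence.
        client_kwargs["aws_access_key_id"] = self.config.storage.s3.access_key_id
        client_kwargs["aws_secret_access_key"] = self.config.storage.s3.secret_access_key
    # With no keys passed, boto3 walks its default credential chain.
    self.s3_client = boto3.client("s3", **client_kwargs)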

def upload_file_to_s3(self, file_path) -> str:
try:
s3_filename = self.s3_path + "/" + os.path.relpath(file_path, RECORD_DIR)
self.s3_client.upload_file(file_path, self.s3_bucket, s3_filename)
logger.debug(f"Uploading {file_path} to S3 {s3_filename}")
except Exception as e:
logger.error(
f"Error occurred while uploading {file_path} to S3 {s3_filename}: {e}"
)
return ""
return s3_filename

def download_file_from_s3(self, s3_file_name) -> str:
if self.config.storage.s3.enabled or self.config.storage.s3.archive:
# Create a temporary directory
temp_dir = tempfile.gettempdir()

# Create a temporary file name with the same name as the original S3 file
local_file_path = os.path.join(temp_dir, os.path.basename(s3_file_name))

try:
# Download the file from S3
self.s3_client.download_file(
self.s3_bucket, s3_file_name, local_file_path
)
logger.debug(f"Downloaded {s3_file_name} to {local_file_path}")
return local_file_path
except Exception as e:
logger.error(
f"Error occurred while downloading {s3_file_name} from S3: {e}"
)
return ""
else:
return ""

def get_bucket_stats(self):
try:
total_size = 0
total_files = 0
for obj in self.s3_client.list_objects(Bucket=self.s3_bucket).get(

Review comment: You should use list_objects_v2 with pagination; otherwise listing silently stops at 1000 objects. Something along these lines:

    import boto3

    def calculate_s3_bucket_size_and_file_count(bucket_name):
        s3 = boto3.client('s3')
        total_size = 0
        total_files = 0

        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name):
            for obj in page.get('Contents', []):  # empty buckets have no 'Contents' key
                total_size += obj['Size']
                total_files += 1

        return total_size, total_files

    bucket_name = 'my-bucket'  # Replace with your bucket name
    total_size, total_files = calculate_s3_bucket_size_and_file_count(bucket_name)

    print(f'Total size: {total_size / 1024**3} GB')  # Convert bytes to GB
    print(f'Total number of files: {total_files}')
"Contents", []
):
total_size += obj["Size"]
total_files += 1

total_size_gb = total_size / (1024**3) # Convert bytes to gigabytes
return {"total_files": total_files, "total_size_gb": total_size_gb}

except ClientError as e:
print(f"Error getting bucket stats: {e}")
return None
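
Folding the reviewer's pagination suggestion into this method would look roughly like this (same return shape as above; a sketch, not tested against large buckets):

    def get_bucket_stats(self):
        try:
            total_size = 0
            total_files = 0
            # list_objects_v2 returns at most 1000 keys per call; the
            # paginator transparently walks every page.
            paginator = self.s3_client.get_paginator("list_objects_v2")
            for page in paginator.paginate(Bucket=self.s3_bucket):
                for obj in page.get("Contents", []):
                    total_size += obj["Size"]
                    total_files += 1
            total_size_gb = total_size / (1024**3)  # bytes to gigabytes
            return {"total_files": total_files, "total_size_gb": total_size_gb}
        except ClientError as e:
            logger.error(f"Error getting bucket stats: {e}")
            return None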


class StorageMaintainer(threading.Thread):
"""Maintain frigates recording storage."""

16 changes: 16 additions & 0 deletions migrations/017_recording_storage_type.py
@@ -0,0 +1,16 @@
import datetime as dt
import peewee as pw
from playhouse.sqlite_ext import *
from decimal import ROUND_HALF_EVEN
from frigate.models import Recordings


def migrate(migrator, database, fake=False, **kwargs):
migrator.add_fields(
Recordings,
storage=pw.CharField(max_length=20, default="local"),
)


def rollback(migrator, database, fake=False, **kwargs):
migrator.remove_fields(Recordings, ["storage"])
1 change: 1 addition & 0 deletions requirements-wheels.txt
@@ -1,3 +1,4 @@
boto3 == 1.26.*
click == 8.1.*
Flask == 2.3.*
imutils == 0.5.*