Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/hash-file-contents #60

Merged
merged 5 commits into from
Jun 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 21 additions & 27 deletions cdp_backend/pipeline/event_gather_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import hashlib
import logging
from typing import Callable, List

from prefect import Flow, case
from prefect import Flow
from prefect.tasks.control_flow import case

from ..database import functions as db_functions
from ..file_store import functions as fs_functions
Expand Down Expand Up @@ -70,11 +70,8 @@ def create_event_gather_flow(
for session in event.sessions:
# TODO create/get transcript

# Create unique key for video uri
key = hashlib.sha256(session.video_uri.encode("utf8")).hexdigest()

# create/get audio (happens as part of transcript process)
create_or_get_audio(key, session.video_uri, bucket, credentials_file)
# Create or get audio (happens as part of transcript process)
create_or_get_audio(session.video_uri, bucket, credentials_file)

db_functions.upload_db_model_task(
db_functions.create_session_from_ingestion_model(
Expand All @@ -87,16 +84,14 @@ def create_event_gather_flow(
return flow


def create_or_get_audio(
key: str, video_uri: str, bucket: str, credentials_file: str
) -> str:
def create_or_get_audio(video_uri: str, bucket: str, credentials_file: str) -> str:
"""
Creates an audio file from a video uri and uploads it to the filestore and db.

Parameters
----------
key: str
The unique key made from a hash value of the video uri.
video_uri: str
The uri to the video file to split audio from.
Comment on lines +93 to +94
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding the documentation here, I must've forgotten this earlier

bucket: str
Name of the GCS bucket to upload files to.
credentials_file: str
Expand All @@ -107,26 +102,25 @@ def create_or_get_audio(
audio_uri: str
The uri of the created audio file in the file store.
"""
# Get just the video filename from the full uri
video_filename = video_uri.split("/")[-1]
tmp_video_filepath = file_util_functions.external_resource_copy_task(
uri=video_uri, dst=video_filename
)

# Hash the video contents
key = file_util_functions.hash_file_contents_task(uri=tmp_video_filepath)

tmp_audio_filepath = f"{key}_audio.wav"
# Check for existing audio
tmp_audio_filepath = file_util_functions.join_strs_and_extension(
parts=[key, "audio"], extension="wav"
)
audio_uri = fs_functions.get_file_uri_task(
bucket=bucket, filename=tmp_audio_filepath, credentials_file=credentials_file
)

# If no existing audio uri
with case(audio_uri, None): # type: ignore
# Store the video in temporary file
filename = video_uri.split("/")[-1]
if "." in filename:
suffix = filename.split(".")[-1]
else:
suffix = ""

tmp_video_filename = f"tmp_{key}_video.{suffix}"
tmp_video_filepath = file_util_functions.external_resource_copy_task(
uri=video_uri, dst=tmp_video_filename
)

# If no existing audio uri, process video
with case(audio_uri, None):
# Split and store the audio in temporary file prior to upload
(
tmp_audio_filepath,
Expand Down
4 changes: 2 additions & 2 deletions cdp_backend/tests/file_store/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@


def test_initialize_gcs_file_system() -> None:
with mock.patch("gcsfs.GCSFileSystem.connect"):
with mock.patch("gcsfs.credentials.GoogleCredentials.connect"):
assert isinstance(
functions.initialize_gcs_file_system("path/to/credentials"), GCSFileSystem
)
Expand All @@ -48,7 +48,7 @@ def test_get_file_uri(
exists: bool,
expected: Optional[str],
) -> None:
with mock.patch("gcsfs.GCSFileSystem.connect"):
with mock.patch("gcsfs.credentials.GoogleCredentials.connect"):
with mock.patch("gcsfs.GCSFileSystem.exists") as mock_exists:
mock_exists.return_value = exists

Expand Down
12 changes: 8 additions & 4 deletions cdp_backend/tests/pipeline/test_event_gather_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,39 +37,43 @@ def test_create_event_gather_flow(func: Callable) -> None:
@mock.patch("cdp_backend.database.functions.upload_db_model_task")
@mock.patch("cdp_backend.database.functions.create_file")
@mock.patch("cdp_backend.file_store.functions.create_filename_from_filepath")
@mock.patch("cdp_backend.utils.file_utils.hash_file_contents_task")
@mock.patch("cdp_backend.utils.file_utils.join_strs_and_extension")
@mock.patch("cdp_backend.utils.file_utils.split_audio_task")
@mock.patch("cdp_backend.utils.file_utils.external_resource_copy_task")
@mock.patch("cdp_backend.file_store.functions.upload_file_task")
@mock.patch("cdp_backend.file_store.functions.get_file_uri_task")
@pytest.mark.parametrize(
"key, video_uri, get_file_uri_value, upload_file_value, expected_audio_uri",
"video_uri, get_file_uri_value, upload_file_value, expected_audio_uri",
[
# TODO add test case for when audio uri doesn't exist
("123", "video_uri", None, "audio_uri", "audio_uri")
("video_uri", None, "audio_uri", "audio_uri")
],
)
def test_create_or_get_audio(
mock_get_file_uri: MagicMock,
mock_upload_file: MagicMock,
mock_external_copy: MagicMock,
mock_audio: MagicMock,
mock_hash_file: MagicMock,
mock_create_filename_from_parts: MagicMock,
mock_create_filename: MagicMock,
mock_create_file: MagicMock,
mock_upload_db: MagicMock,
mock_remove_file: MagicMock,
key: str,
video_uri: str,
get_file_uri_value: str,
upload_file_value: str,
expected_audio_uri: str,
) -> None:
mock_get_file_uri.return_value = get_file_uri_value
mock_external_copy.return_value = "mock value"
mock_hash_file.return_value = "abc"
mock_create_filename_from_parts.return_value = "abc_audio.wav"
mock_audio.return_value = ("audio path", "err", "out")
mock_upload_file.return_value = upload_file_value

actual_audio_uri = pipeline.create_or_get_audio(
key=key,
video_uri=video_uri,
bucket="bucket",
credentials_file="/fake/credentials/path",
Expand Down
39 changes: 38 additions & 1 deletion cdp_backend/tests/utils/test_file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from __future__ import annotations

from pathlib import Path
from typing import BinaryIO, Generator, Optional
from typing import BinaryIO, Generator, List, Optional
from unittest import mock

import pytest
Expand Down Expand Up @@ -74,6 +74,43 @@ def test_external_resource_copy(tmpdir: LocalPath, mocked_request: Generator) ->
external_resource_copy("https://doesntmatter.com/example.mp4", save_path)


def test_hash_file_contents(tmpdir: LocalPath) -> None:
    """Check that changing a file's contents changes its hash."""
    target = Path(tmpdir) / "a.txt"
    target_uri = str(target.absolute())

    # Hash the same path twice, rewriting the contents in between
    digests = []
    for contents in ("hello", "world"):
        target.write_text(contents)
        digests.append(
            file_utils.hash_file_contents_task.run(target_uri)  # type: ignore
        )

    first, second = digests
    assert first != second


@pytest.mark.parametrize(
    "parts, extension, delimiter, expected",
    [
        (["hello", "world"], "mp4", "_", "hello_world.mp4"),
        (["a", "b", "c"], "wav", "-", "a-b-c.wav"),
        (["single"], "png", "***", "single.png"),
    ],
)
def test_join_strs_and_extension(
    parts: List[str], extension: str, delimiter: str, expected: str
) -> None:
    """Check the joined name for several part/delimiter combinations."""
    task_kwargs = {"parts": parts, "extension": extension, "delimiter": delimiter}
    actual = file_utils.join_strs_and_extension.run(**task_kwargs)  # type: ignore
    assert actual == expected


@pytest.mark.parametrize(
"audio_save_path",
[
Expand Down
47 changes: 46 additions & 1 deletion cdp_backend/utils/file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
# -*- coding: utf-8 -*-
import logging
import shutil
from hashlib import sha256
from pathlib import Path
from typing import Optional, Tuple, Union
from typing import List, Optional, Tuple, Union

import dask.dataframe as dd
import ffmpeg
import fsspec
import requests
from prefect import task

Expand Down Expand Up @@ -62,6 +64,7 @@ def external_resource_copy(
) -> str:
"""
Copy an external resource to a local destination on the machine.

Parameters
----------
uri: str
Expand All @@ -72,6 +75,7 @@ def external_resource_copy(
overwrite: bool
Boolean value indicating whether or not to overwrite a local resource with
the same name if it already exists.

Returns
-------
saved_path: str
Expand Down Expand Up @@ -106,12 +110,14 @@ def split_audio(
) -> Tuple[str, str, str]:
"""
Split and store the audio from a video file using ffmpeg.

Parameters
----------
video_read_path: str
Path to the video to split the audio from.
audio_save_path: str
Path to where the audio should be stored.

Returns
-------
resolved_audio_save_path: str
Expand Down Expand Up @@ -163,6 +169,45 @@ def split_audio(
)


@task
def hash_file_contents_task(uri: str, buffer_size: int = 2 ** 16) -> str:
    """
    Compute the SHA256 digest of the contents of a file.

    Parameters
    ----------
    uri: str
        The uri of the file whose contents should be hashed.
    buffer_size: int
        How many bytes to read from the file per chunk.
        Default: 2^16 (64KB)

    Returns
    -------
    hash: str
        The hexadecimal SHA256 digest of the file contents.
    """
    digest = sha256()

    # Read in fixed-size chunks so the whole file is never held in memory at once
    with fsspec.open(uri, "rb") as stream:
        chunk = stream.read(buffer_size)
        while chunk:
            digest.update(chunk)
            chunk = stream.read(buffer_size)

    return digest.hexdigest()


@task
def join_strs_and_extension(
    parts: List[str], extension: str, delimiter: str = "_"
) -> str:
    """
    Join string parts with a delimiter and append a file extension.

    Parameters
    ----------
    parts: List[str]
        The strings to join to form the name, in order.
    extension: str
        The file extension to append, without the leading period.
    delimiter: str
        The string placed between each part.
        Default: "_"

    Returns
    -------
    name: str
        The joined parts followed by a period and the extension.
    """
    return f"{delimiter.join(parts)}.{extension}"


@task
def external_resource_copy_task(
uri: str, dst: Optional[Union[str, Path]] = None, overwrite: bool = False
Expand Down
19 changes: 9 additions & 10 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,22 @@
]

requirements = [
"dask[bag]~=2020.12.0",
"dask[bag]~=2021.6.0",
"dataclasses-json~=0.5.2",
"ffmpeg-python~=0.2.0",
"fireo~=1.3.7",
"fsspec~=0.8.3",
"gcsfs~=0.7.1",
"fsspec~=2021.5.0",
"gcsfs~=2021.5.0",
"google-cloud-speech~=1.3.2",
"graphviz~=0.14",
"nltk>=3.5",
"pandas~=1.1.3",
"graphviz~=0.16",
"pandas~=1.2.4",
"prefect~=0.14.0",
"pulumi~=3.0.0",
"pulumi~=3.3.0",
"pulumi-google-native~=0.1.0",
"pulumi-gcp~=5.0.0",
"pulumi-gcp~=5.7.0",
"spacy~=3.0.6",
"truecase>=0.0.9",
"webvtt-py>=0.4.5",
"truecase~=0.0.9",
"webvtt-py~=0.4.6",
]

extra_requirements = {
Expand Down