Skip to content

Commit

Permalink
feature/hash-file-contents (#60)
Browse files Browse the repository at this point in the history
* Bump deps

* Add hash file contents function and tests

* Impl hash file content in pipeline and extra funcs

* Update gcsfs test mocks after dep update

* Fix typing on tests
  • Loading branch information
Jackson Maxfield Brown committed Jun 6, 2021
1 parent 5f09efc commit 47d9fc5
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 45 deletions.
48 changes: 21 additions & 27 deletions cdp_backend/pipeline/event_gather_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import hashlib
import logging
from typing import Callable, List

from prefect import Flow, case
from prefect import Flow
from prefect.tasks.control_flow import case

from ..database import functions as db_functions
from ..file_store import functions as fs_functions
Expand Down Expand Up @@ -70,11 +70,8 @@ def create_event_gather_flow(
for session in event.sessions:
# TODO create/get transcript

# Create unique key for video uri
key = hashlib.sha256(session.video_uri.encode("utf8")).hexdigest()

# create/get audio (happens as part of transcript process)
create_or_get_audio(key, session.video_uri, bucket, credentials_file)
# Create or get audio (happens as part of transcript process)
create_or_get_audio(session.video_uri, bucket, credentials_file)

db_functions.upload_db_model_task(
db_functions.create_session_from_ingestion_model(
Expand All @@ -87,16 +84,14 @@ def create_event_gather_flow(
return flow


def create_or_get_audio(
key: str, video_uri: str, bucket: str, credentials_file: str
) -> str:
def create_or_get_audio(video_uri: str, bucket: str, credentials_file: str) -> str:
"""
Creates an audio file from a video uri and uploads it to the filestore and db.
Parameters
----------
key: str
The unique key made from a hash value of the video uri.
video_uri: str
The uri to the video file to split audio from.
bucket: str
Name of the GCS bucket to upload files to.
credentials_file: str
Expand All @@ -107,26 +102,25 @@ def create_or_get_audio(
audio_uri: str
The uri of the created audio file in the file store.
"""
# Get just the video filename from the full uri
video_filename = video_uri.split("/")[-1]
tmp_video_filepath = file_util_functions.external_resource_copy_task(
uri=video_uri, dst=video_filename
)

# Hash the video contents
key = file_util_functions.hash_file_contents_task(uri=tmp_video_filepath)

tmp_audio_filepath = f"{key}_audio.wav"
# Check for existing audio
tmp_audio_filepath = file_util_functions.join_strs_and_extension(
parts=[key, "audio"], extension="wav"
)
audio_uri = fs_functions.get_file_uri_task(
bucket=bucket, filename=tmp_audio_filepath, credentials_file=credentials_file
)

# If no existing audio uri
with case(audio_uri, None): # type: ignore
# Store the video in temporary file
filename = video_uri.split("/")[-1]
if "." in filename:
suffix = filename.split(".")[-1]
else:
suffix = ""

tmp_video_filename = f"tmp_{key}_video.{suffix}"
tmp_video_filepath = file_util_functions.external_resource_copy_task(
uri=video_uri, dst=tmp_video_filename
)

# If no existing audio uri, process video
with case(audio_uri, None):
# Split and store the audio in temporary file prior to upload
(
tmp_audio_filepath,
Expand Down
4 changes: 2 additions & 2 deletions cdp_backend/tests/file_store/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@


def test_initialize_gcs_file_system() -> None:
with mock.patch("gcsfs.GCSFileSystem.connect"):
with mock.patch("gcsfs.credentials.GoogleCredentials.connect"):
assert isinstance(
functions.initialize_gcs_file_system("path/to/credentials"), GCSFileSystem
)
Expand All @@ -48,7 +48,7 @@ def test_get_file_uri(
exists: bool,
expected: Optional[str],
) -> None:
with mock.patch("gcsfs.GCSFileSystem.connect"):
with mock.patch("gcsfs.credentials.GoogleCredentials.connect"):
with mock.patch("gcsfs.GCSFileSystem.exists") as mock_exists:
mock_exists.return_value = exists

Expand Down
12 changes: 8 additions & 4 deletions cdp_backend/tests/pipeline/test_event_gather_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,39 +37,43 @@ def test_create_event_gather_flow(func: Callable) -> None:
@mock.patch("cdp_backend.database.functions.upload_db_model_task")
@mock.patch("cdp_backend.database.functions.create_file")
@mock.patch("cdp_backend.file_store.functions.create_filename_from_filepath")
@mock.patch("cdp_backend.utils.file_utils.hash_file_contents_task")
@mock.patch("cdp_backend.utils.file_utils.join_strs_and_extension")
@mock.patch("cdp_backend.utils.file_utils.split_audio_task")
@mock.patch("cdp_backend.utils.file_utils.external_resource_copy_task")
@mock.patch("cdp_backend.file_store.functions.upload_file_task")
@mock.patch("cdp_backend.file_store.functions.get_file_uri_task")
@pytest.mark.parametrize(
"key, video_uri, get_file_uri_value, upload_file_value, expected_audio_uri",
"video_uri, get_file_uri_value, upload_file_value, expected_audio_uri",
[
# TODO add test case for when audio uri already exists
("123", "video_uri", None, "audio_uri", "audio_uri")
("video_uri", None, "audio_uri", "audio_uri")
],
)
def test_create_or_get_audio(
mock_get_file_uri: MagicMock,
mock_upload_file: MagicMock,
mock_external_copy: MagicMock,
mock_audio: MagicMock,
mock_hash_file: MagicMock,
mock_create_filename_from_parts: MagicMock,
mock_create_filename: MagicMock,
mock_create_file: MagicMock,
mock_upload_db: MagicMock,
mock_remove_file: MagicMock,
key: str,
video_uri: str,
get_file_uri_value: str,
upload_file_value: str,
expected_audio_uri: str,
) -> None:
mock_get_file_uri.return_value = get_file_uri_value
mock_external_copy.return_value = "mock value"
mock_hash_file.return_value = "abc"
mock_create_filename_from_parts.return_value = "abc_audio.wav"
mock_audio.return_value = ("audio path", "err", "out")
mock_upload_file.return_value = upload_file_value

actual_audio_uri = pipeline.create_or_get_audio(
key=key,
video_uri=video_uri,
bucket="bucket",
credentials_file="/fake/credentials/path",
Expand Down
39 changes: 38 additions & 1 deletion cdp_backend/tests/utils/test_file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from __future__ import annotations

from pathlib import Path
from typing import BinaryIO, Generator, Optional
from typing import BinaryIO, Generator, List, Optional
from unittest import mock

import pytest
Expand Down Expand Up @@ -74,6 +74,43 @@ def test_external_resource_copy(tmpdir: LocalPath, mocked_request: Generator) ->
external_resource_copy("https://doesntmatter.com/example.mp4", save_path)


def test_hash_file_contents(tmpdir: LocalPath) -> None:
    """
    Check that hash_file_contents_task returns the SHA256 of the file's bytes.

    The same path is hashed twice with different contents. The two digests
    must differ from each other and each must equal the SHA256 hex digest
    of exactly the bytes that were written.
    """
    # Function-scope import so the reference digests are computed
    # independently of the code under test.
    from hashlib import sha256

    test_file = Path(tmpdir) / "a.txt"
    resolved_path = str(test_file.absolute())

    # Write raw bytes so the expected digest does not depend on the
    # platform's default text encoding or newline translation.
    test_file.write_bytes(b"hello")
    hash_a = file_utils.hash_file_contents_task.run(resolved_path)  # type: ignore

    # Overwrite the same file so only the contents change, not the path.
    test_file.write_bytes(b"world")
    hash_b = file_utils.hash_file_contents_task.run(resolved_path)  # type: ignore

    # Different contents must produce different hashes...
    assert hash_a != hash_b

    # ...and each hash must be the actual SHA256 of the contents.
    assert hash_a == sha256(b"hello").hexdigest()
    assert hash_b == sha256(b"world").hexdigest()


@pytest.mark.parametrize(
    "parts, extension, delimiter, expected",
    [
        (["hello", "world"], "mp4", "_", "hello_world.mp4"),
        (["a", "b", "c"], "wav", "-", "a-b-c.wav"),
        (["single"], "png", "***", "single.png"),
    ],
)
def test_join_strs_and_extension(
    parts: List[str], extension: str, delimiter: str, expected: str
) -> None:
    """
    Check that parts are joined by the delimiter and suffixed with the extension.

    The single-part case confirms the delimiter does not appear when there
    is nothing to separate.
    """
    call_kwargs = {
        "parts": parts,
        "extension": extension,
        "delimiter": delimiter,
    }
    joined = file_utils.join_strs_and_extension.run(**call_kwargs)  # type: ignore
    assert joined == expected


@pytest.mark.parametrize(
"audio_save_path",
[
Expand Down
47 changes: 46 additions & 1 deletion cdp_backend/utils/file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
# -*- coding: utf-8 -*-
import logging
import shutil
from hashlib import sha256
from pathlib import Path
from typing import Optional, Tuple, Union
from typing import List, Optional, Tuple, Union

import dask.dataframe as dd
import ffmpeg
import fsspec
import requests
from prefect import task

Expand Down Expand Up @@ -62,6 +64,7 @@ def external_resource_copy(
) -> str:
"""
Copy an external resource to a local destination on the machine.
Parameters
----------
uri: str
Expand All @@ -72,6 +75,7 @@ def external_resource_copy(
overwrite: bool
Boolean value indicating whether or not to overwrite a local resource with
the same name if it already exists.
Returns
-------
saved_path: str
Expand Down Expand Up @@ -106,12 +110,14 @@ def split_audio(
) -> Tuple[str, str, str]:
"""
Split and store the audio from a video file using ffmpeg.
Parameters
----------
video_read_path: str
Path to the video to split the audio from.
audio_save_path: str
Path to where the audio should be stored.
Returns
-------
resolved_audio_save_path: str
Expand Down Expand Up @@ -163,6 +169,45 @@ def split_audio(
)


@task
def hash_file_contents_task(uri: str, buffer_size: int = 2 ** 16) -> str:
    """
    Compute the SHA256 hex digest of a file's contents.

    Parameters
    ----------
    uri: str
        The uri for the file to hash.
    buffer_size: int
        The number of bytes to read at a time.
        Default: 2^16 (64KB)

    Returns
    -------
    hash: str
        The SHA256 hash for the file contents.
    """
    digest = sha256()

    # Feed the file to the hasher in fixed-size chunks; the read loop stops
    # at the first empty bytes object, which signals end of file.
    with fsspec.open(uri, "rb") as stream:
        for chunk in iter(lambda: stream.read(buffer_size), b""):
            digest.update(chunk)

    return digest.hexdigest()


@task
def join_strs_and_extension(
    parts: List[str], extension: str, delimiter: str = "_"
) -> str:
    """
    Join string parts with a delimiter and append a file extension.

    Parameters
    ----------
    parts: List[str]
        The strings to join, in order.
    extension: str
        The extension to place after a "." separator.
    delimiter: str
        The string inserted between consecutive parts.
        Default: "_"

    Returns
    -------
    name: str
        The delimiter-joined parts followed by "." and the extension.
    """
    stem = delimiter.join(parts)
    return "{}.{}".format(stem, extension)


@task
def external_resource_copy_task(
uri: str, dst: Optional[Union[str, Path]] = None, overwrite: bool = False
Expand Down
19 changes: 9 additions & 10 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,22 @@
]

requirements = [
"dask[bag]~=2020.12.0",
"dask[bag]~=2021.6.0",
"dataclasses-json~=0.5.2",
"ffmpeg-python~=0.2.0",
"fireo~=1.3.7",
"fsspec~=0.8.3",
"gcsfs~=0.7.1",
"fsspec~=2021.5.0",
"gcsfs~=2021.5.0",
"google-cloud-speech~=1.3.2",
"graphviz~=0.14",
"nltk>=3.5",
"pandas~=1.1.3",
"graphviz~=0.16",
"pandas~=1.2.4",
"prefect~=0.14.0",
"pulumi~=3.0.0",
"pulumi~=3.3.0",
"pulumi-google-native~=0.1.0",
"pulumi-gcp~=5.0.0",
"pulumi-gcp~=5.7.0",
"spacy~=3.0.6",
"truecase>=0.0.9",
"webvtt-py>=0.4.5",
"truecase~=0.0.9",
"webvtt-py~=0.4.6",
]

extra_requirements = {
Expand Down

0 comments on commit 47d9fc5

Please sign in to comment.