[WIP][Dependent] Fix task creation with cloud storage data #7903

Open · wants to merge 17 commits into base: develop
Changes from 10 commits
@@ -0,0 +1,6 @@
### Fixed

- Optimized memory usage by not keeping all downloaded images (or parts of images) in memory while creating a manifest file
  (<https://github.com/cvat-ai/cvat/pull/7903>)
- Optimized the number of requests to cloud storage providers by downloading only the images from the specified frame range,
  both when `use_cache` is `True` and when it is `False` (<https://github.com/cvat-ai/cvat/pull/7903>)
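As background for these entries: downloading into a list keeps every image buffer alive until the whole batch finishes, while a generator hands buffers to the manifest writer one at a time. A minimal, self-contained sketch of that difference (illustrative names only, not CVAT APIs):

from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
from typing import Iterator, List

def download(key: str) -> BytesIO:
    # Stand-in for a real cloud-storage download.
    return BytesIO(b"x" * 1024)

def download_all(keys: List[str]) -> List[BytesIO]:
    # Old shape: the whole batch is materialized in memory at once.
    with ThreadPoolExecutor(max_workers=4) as executor:
        return list(executor.map(download, keys))

def stream_all(keys: List[str]) -> Iterator[BytesIO]:
    # New shape: buffers are yielded one by one, so the consumer can
    # drop each buffer as soon as its manifest entry is written.
    with ThreadPoolExecutor(max_workers=4) as executor:
        yield from executor.map(download, keys)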
2 changes: 1 addition & 1 deletion cvat/apps/engine/cache.py
@@ -312,7 +312,7 @@ def _prepare_cloud_preview(self, db_storage):
            manifest.set_index()
            if not len(manifest):
                continue
-            preview_info = manifest[0]
+            preview_info = manifest.get_first_not_empty_item()
            preview_filename = ''.join([preview_info['name'], preview_info['extension']])
            preview_path = os.path.join(manifest_prefix, preview_filename)
            break
82 changes: 51 additions & 31 deletions cvat/apps/engine/cloud_provider.py
@@ -6,11 +6,12 @@
import functools
import json
import os
+import math
from abc import ABC, abstractmethod, abstractproperty
from enum import Enum
from io import BytesIO
-from multiprocessing.pool import ThreadPool
-from typing import Dict, List, Optional, Any, Callable, TypeVar
+from typing import Dict, List, Optional, Any, Callable, TypeVar, Iterator
+from concurrent.futures import ThreadPoolExecutor, as_completed

import boto3
from azure.core.exceptions import HttpResponseError, ResourceExistsError
@@ -31,10 +32,27 @@
from cvat.apps.engine.models import CloudProviderChoice, CredentialsTypeChoice
from cvat.apps.engine.utils import get_cpu_number

+class NamedBytesIO(BytesIO):
+    @property
+    def filename(self) -> Optional[str]:
+        return getattr(self, '_filename', None)
+
+    @filename.setter
+    def filename(self, value: str) -> None:
+        self._filename = value
+
slogger = ServerLogManager(__name__)

ImageFile.LOAD_TRUNCATED_IMAGES = True

+CPU_NUMBER = get_cpu_number()
+
+def normalize_threads_number(threads_number: Optional[int], number_of_files: int) -> int:
+    if threads_number is None:
+        return min(CPU_NUMBER, settings.CVAT_MAX_THREADS_NUMBER_FOR_DATA_DOWNLOADING, max(math.ceil(number_of_files / settings.CVAT_NUMBER_OF_FILES_PER_THREAD), 1))
+
+    return min(threads_number, CPU_NUMBER, settings.CVAT_MAX_THREADS_NUMBER_FOR_DATA_DOWNLOADING)
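For intuition about the thread-count heuristic above: with the defaults added in this PR (4 max threads, 1000 files per thread), the pool grows by one thread per thousand files and is capped by both the CPU count and the setting. A standalone sketch with the settings inlined as constants (CPU_NUMBER = 8 is an assumption for the example):

import math

CPU_NUMBER = 8           # assumed core count for this example
MAX_THREADS = 4          # stands in for settings.CVAT_MAX_THREADS_NUMBER_FOR_DATA_DOWNLOADING
FILES_PER_THREAD = 1000  # stands in for settings.CVAT_NUMBER_OF_FILES_PER_THREAD

def normalize_threads_number(threads_number, number_of_files):
    if threads_number is None:
        return min(CPU_NUMBER, MAX_THREADS, max(math.ceil(number_of_files / FILES_PER_THREAD), 1))
    return min(threads_number, CPU_NUMBER, MAX_THREADS)

assert normalize_threads_number(None, 500) == 1   # small batch: a single thread
assert normalize_threads_number(None, 2500) == 3  # ceil(2500 / 1000) = 3
assert normalize_threads_number(None, 9000) == 4  # capped by MAX_THREADS
assert normalize_threads_number(16, 100) == 4     # explicit requests are capped too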
class Status(str, Enum):
    AVAILABLE = 'AVAILABLE'
    NOT_FOUND = 'NOT_FOUND'
@@ -128,7 +146,7 @@ def get_file_last_modified(self, key):
        pass

    @abstractmethod
-    def download_fileobj(self, key):
+    def download_fileobj(self, key: str) -> NamedBytesIO:
Contributor review comment:
The method download_fileobj is implemented in multiple classes but seems to have a similar structure. Consider refactoring to reduce duplication and improve maintainability. Perhaps a base class implementation could be used, with subclasses overriding only the necessary parts.

Also applies to: 551-559, 753-762, 867-872
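One possible shape for the suggested refactor, sketched against the abstract base class with a hypothetical _download_to_buffer hook (it assumes the NamedBytesIO helper added in this PR; this is an illustrative sketch, not code from the PR):

from abc import ABC, abstractmethod

class _CloudStorageSketch(ABC):
    def download_fileobj(self, key: str) -> NamedBytesIO:
        # Shared wrapper: allocate the buffer, delegate the provider-specific
        # transfer, then rewind and tag the buffer with its source key.
        buf = NamedBytesIO()
        self._download_to_buffer(key, buf)
        buf.seek(0)
        buf.filename = key
        return buf

    @abstractmethod
    def _download_to_buffer(self, key: str, buf: NamedBytesIO) -> None:
        # Each provider (S3, Azure Blob, GCS) writes the object into buf.
        ...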

        pass

    def download_file(self, key, path):
@@ -163,7 +181,7 @@ def download_range_of_bytes(self, key: str, stop_byte: int, start_byte: int = 0)
    def _download_range_of_bytes(self, key: str, stop_byte: int, start_byte: int):
        pass

-    def optimally_image_download(self, key: str, chunk_size: int = 65536) -> BytesIO:
+    def optimally_image_download(self, key: str, chunk_size: int = 65536) -> NamedBytesIO:
"""
Method downloads image by the following approach:
Firstly we try to download the first N bytes of image which will be enough for determining image properties.
Expand All @@ -176,51 +194,50 @@ def optimally_image_download(self, key: str, chunk_size: int = 65536) -> BytesIO
Returns:
BytesIO: Buffer with image
"""
image_parser=ImageFile.Parser()
image_parser = ImageFile.Parser()
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved

        chunk = self.download_range_of_bytes(key, chunk_size - 1)
        image_parser.feed(chunk)

        if image_parser.image:
-            buff = BytesIO(chunk)
+            buff = NamedBytesIO(chunk)
+            buff.filename = key
        else:
            buff = self.download_fileobj(key)
            image_size_in_bytes = len(buff.getvalue())
            slogger.glob.warning(
                f'The {chunk_size} bytes were not enough to parse "{key}" image. '
                f'Image size was {image_size_in_bytes} bytes. Image resolution was {Image.open(buff).size}. '
                f'Downloaded percent was {round(min(chunk_size, image_size_in_bytes) / image_size_in_bytes * 100)}')
            buff.filename = key

        return buff
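For orientation, a caller would typically consume the returned buffer like this (a hedged usage sketch; storage stands for any concrete provider instance):

buff = storage.optimally_image_download('images/0001.jpg')
img = Image.open(buff)          # PIL opens the buffer directly
print(buff.filename, img.size)  # the buffer remembers its object key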

    def bulk_download_to_memory(
        self,
        files: List[str],
-        threads_number: int = min(get_cpu_number(), 4),
+        *,
+        threads_number: Optional[int] = None,
        _use_optimal_downloading: bool = True,
-    ) -> List[BytesIO]:
+    ) -> Iterator[BytesIO]:
        func = self.optimally_image_download if _use_optimal_downloading else self.download_fileobj
-        if threads_number > 1:
-            with ThreadPool(threads_number) as pool:
-                return pool.map(func, files)
-        else:
-            slogger.glob.warning('Download files to memory in series in one thread.')
-            return [func(f) for f in files]
+        threads_number = normalize_threads_number(threads_number, len(files))
+
+        with ThreadPoolExecutor(max_workers=threads_number) as executor:
+            yield from executor.map(func, files)
Contributor review comment on lines +234 to +242:

The implementation of bulk_download_to_memory and bulk_download_to_dir using threading is a robust enhancement. However, ensure that exceptions during thread execution are handled to prevent the application from crashing. Consider wrapping the thread execution in a try-except block.

+        try:
+            with ThreadPoolExecutor(max_workers=threads_number) as executor:
+                yield from executor.map(func, files)
+        except Exception as ex:
+            slogger.glob.error(f"Error during bulk download: {ex}")
+            raise

Also applies to: 248-258

    def bulk_download_to_dir(
        self,
        files: List[str],
        upload_dir: str,
-        threads_number: int = min(get_cpu_number(), 4),
-    ):
-        args = zip(files, [os.path.join(upload_dir, f) for f in files])
-        if threads_number > 1:
-            with ThreadPool(threads_number) as pool:
-                return pool.map(lambda x: self.download_file(*x), args)
-        else:
-            slogger.glob.warning(f'Download files to {upload_dir} directory in series in one thread.')
-            for f, path in args:
-                self.download_file(f, path)
+        *,
+        threads_number: Optional[int] = None,
+    ) -> None:
+        threads_number = normalize_threads_number(threads_number, len(files))
+
+        with ThreadPoolExecutor(max_workers=threads_number) as executor:
+            futures = [executor.submit(self.download_file, f, os.path.join(upload_dir, f)) for f in files]
+            for future in as_completed(futures):
+                future.result()
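A design note on the loop above: future.result() re-raises any exception from a worker thread, so a failed download surfaces in the caller instead of being silently dropped. A small self-contained illustration of that behavior (not PR code):

from concurrent.futures import ThreadPoolExecutor, as_completed

def flaky(n: int) -> int:
    if n == 2:
        raise IOError(f"download of item {n} failed")
    return n

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(flaky, n) for n in range(4)]
    for future in as_completed(futures):
        try:
            print(future.result())  # re-raises the worker's exception here
        except IOError as ex:
            print("propagated:", ex)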

    @abstractmethod
    def upload_fileobj(self, file_obj, file_name):
@@ -513,14 +530,15 @@ def _list_raw_content_on_one_page(

    @validate_file_status
    @validate_bucket_status
-    def download_fileobj(self, key):
-        buf = BytesIO()
+    def download_fileobj(self, key: str) -> NamedBytesIO:
+        buf = NamedBytesIO()
        self.bucket.download_fileobj(
            Key=key,
            Fileobj=buf,
            Config=TransferConfig(max_io_queue=self.transfer_config['max_io_queue'])
        )
        buf.seek(0)
+        buf.filename = key
        return buf

    @validate_file_status
@@ -714,15 +732,16 @@ def _list_raw_content_on_one_page(

    @validate_file_status
    @validate_bucket_status
-    def download_fileobj(self, key):
-        buf = BytesIO()
+    def download_fileobj(self, key: str) -> NamedBytesIO:
+        buf = NamedBytesIO()
        storage_stream_downloader = self._client.download_blob(
            blob=key,
            offset=None,
            length=None,
        )
        storage_stream_downloader.download_to_stream(buf, max_concurrency=self.MAX_CONCURRENCY)
        buf.seek(0)
+        buf.filename = key
        return buf

    @validate_file_status
@@ -827,11 +846,12 @@ def _list_raw_content_on_one_page(

    @validate_file_status
    @validate_bucket_status
-    def download_fileobj(self, key):
-        buf = BytesIO()
+    def download_fileobj(self, key: str) -> NamedBytesIO:
+        buf = NamedBytesIO()
        blob = self.bucket.blob(key)
        self._client.download_blob_to_file(blob, buf)
        buf.seek(0)
+        buf.filename = key
        return buf

    @validate_file_status
48 changes: 42 additions & 6 deletions cvat/apps/engine/task.py
@@ -481,11 +481,27 @@ def _create_task_manifest_from_cloud_data(
    db_storage: models.CloudStorage,
    sorted_media: List[str],
    manifest: ImageManifestManager,
+    *,
+    start_frame: int = 0,
+    stop_frame: Optional[int] = None,
+    step: int = 1,
    dimension: models.DimensionType = models.DimensionType.DIM_2D,
) -> None:
+    if stop_frame is None:
+        stop_frame = len(sorted_media) - 1
    cloud_storage_instance = db_storage_to_storage_instance(db_storage)
-    content = cloud_storage_instance.bulk_download_to_memory(sorted_media)
-    manifest.link(sources=content, DIM_3D=dimension == models.DimensionType.DIM_3D)
+    filtered_sorted_media = sorted_media
+    if start_frame != 0 or step != 1 or stop_frame != len(sorted_media) - 1:
+        filtered_sorted_media = sorted_media[start_frame : stop_frame + 1 : step]
+    content_generator = cloud_storage_instance.bulk_download_to_memory(filtered_sorted_media)
+
+    manifest.link(
+        sources=content_generator,
+        DIM_3D=dimension == models.DimensionType.DIM_3D,
+        stop=stop_frame,
+        start=start_frame,
+        step=step,
+    )
    manifest.create()
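To make the frame-range arithmetic concrete, a small worked example of the slice used above (plain Python, independent of CVAT):

sorted_media = [f"frame_{i:03d}.jpg" for i in range(10)]

start_frame, stop_frame, step = 2, 8, 3
# stop_frame is inclusive, hence the "+ 1"
selected = sorted_media[start_frame : stop_frame + 1 : step]
assert selected == ["frame_002.jpg", "frame_005.jpg", "frame_008.jpg"]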

@transaction.atomic
@@ -657,7 +673,20 @@ def _update_status(msg: str) -> None:
            filtered_data = []
            for files in (i for i in media.values() if i):
                filtered_data.extend(files)
-            _download_data_from_cloud_storage(db_data.cloud_storage, filtered_data, upload_dir)
+            media_to_download = filtered_data
+
+            if media['image']:
+                start_frame = db_data.start_frame
+                stop_frame = len(filtered_data) - 1
+                if data['stop_frame'] is not None:
+                    stop_frame = min(stop_frame, data['stop_frame'])
+
+                step = db_data.get_frame_step()
+                if start_frame or step != 1 or stop_frame != len(filtered_data) - 1:
+                    media_to_download = filtered_data[start_frame : stop_frame + 1 : step]
+            _download_data_from_cloud_storage(db_data.cloud_storage, media_to_download, upload_dir)
+            del media_to_download
+            del filtered_data
            is_data_in_cloud = False
            db_data.storage = models.StorageChoice.LOCAL
        else:
@@ -686,7 +715,13 @@ def _update_status(msg: str) -> None:
                cloud_storage_manifest, manifest)
        else: # without manifest file but with use_cache option
            # Define task manifest content based on list with uploaded files
-            _create_task_manifest_from_cloud_data(db_data.cloud_storage, sorted_media, manifest)
+            stop_frame = len(sorted_media) - 1
+            if data['stop_frame'] is not None:
+                stop_frame = min(stop_frame, data['stop_frame'])
+            _create_task_manifest_from_cloud_data(
+                db_data.cloud_storage, sorted_media, manifest,
+                start_frame=db_data.start_frame, stop_frame=stop_frame, step=db_data.get_frame_step()
+            )

    av_scan_paths(upload_dir)

@@ -910,10 +945,11 @@ def update_progress(progress):
    # calculate chunk size if it isn't specified
    if db_data.chunk_size is None:
        if isinstance(compressed_chunk_writer, ZipCompressedChunkWriter):
+            first_image_idx = db_data.start_frame
            if not is_data_in_cloud:
-                w, h = extractor.get_image_size(0)
+                w, h = extractor.get_image_size(first_image_idx)
            else:
-                img_properties = manifest[0]
+                img_properties = manifest[first_image_idx]
                w, h = img_properties['width'], img_properties['height']
            area = h * w
            db_data.chunk_size = max(2, min(72, 36 * 1920 * 1080 // area))
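The heuristic on the last line scales the chunk size inversely with image area and clamps it to the range [2, 72]; a quick arithmetic check (plain Python):

# Full HD frames: 36 * 1920 * 1080 // (1920 * 1080) == 36 images per chunk
assert max(2, min(72, 36 * 1920 * 1080 // (1920 * 1080))) == 36
# Tiny 480x270 frames: the raw value 576 is clamped down to 72
assert max(2, min(72, 36 * 1920 * 1080 // (480 * 270))) == 72
# 8K (7680x4320) frames: the raw value 2 sits right at the floor
assert max(2, min(72, 36 * 1920 * 1080 // (7680 * 4320))) == 2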
3 changes: 3 additions & 0 deletions cvat/settings/base.py
@@ -717,3 +717,6 @@ class CVAT_QUEUES(Enum):

from cvat.rq_patching import update_started_job_registry_cleanup
update_started_job_registry_cleanup()

+CVAT_MAX_THREADS_NUMBER_FOR_DATA_DOWNLOADING = 4
+CVAT_NUMBER_OF_FILES_PER_THREAD = 1000