diff --git a/python/ray/data/datasource/image_folder_datasource.py b/python/ray/data/datasource/image_folder_datasource.py index 5a77e8c3e2e96..b37add256ca27 100644 --- a/python/ray/data/datasource/image_folder_datasource.py +++ b/python/ray/data/datasource/image_folder_datasource.py @@ -1,24 +1,42 @@ import io +import logging import pathlib -from typing import TYPE_CHECKING, Tuple, Optional +import time +from typing import TYPE_CHECKING, List, Optional, Tuple, Union import numpy as np from ray.data._internal.util import _check_import +from ray.data.block import Block, BlockMetadata from ray.data.datasource.binary_datasource import BinaryDatasource from ray.data.datasource.datasource import Reader from ray.data.datasource.file_based_datasource import ( + _FileBasedDatasourceReader, + FileBasedDatasource, _resolve_paths_and_filesystem, FileExtensionFilter, ) +from ray.data.datasource.file_meta_provider import DefaultFileMetadataProvider +from ray.data.datasource.partitioning import PathPartitionFilter from ray.util.annotations import DeveloperAPI if TYPE_CHECKING: import pyarrow from ray.data.block import T + +logger = logging.getLogger(__name__) + IMAGE_EXTENSIONS = ["png", "jpg", "jpeg", "tiff", "bmp", "gif"] +# The default size multiplier for reading image data source. +# This essentially is using image on-disk file size to estimate +# in-memory data size. +IMAGE_ENCODING_RATIO_ESTIMATE_DEFAULT = 1 + +# The lower bound value to estimate image encoding ratio. +IMAGE_ENCODING_RATIO_ESTIMATE_LOWER_BOUND = 0.5 + @DeveloperAPI class ImageFolderDatasource(BinaryDatasource): @@ -119,10 +137,11 @@ def create_reader( paths, filesystem = _resolve_paths_and_filesystem([root]) root = paths[0] - return super().create_reader( + return _ImageFolderDatasourceReader( + delegate=self, paths=paths, - partition_filter=FileExtensionFilter(file_extensions=IMAGE_EXTENSIONS), filesystem=filesystem, + partition_filter=FileExtensionFilter(file_extensions=IMAGE_EXTENSIONS), root=root, size=size, mode=mode, @@ -135,7 +154,7 @@ def _read_file( root: str, size: Optional[Tuple[int, int]], mode: Optional[str], - ): + ) -> Block: import pandas as pd from PIL import Image @@ -160,6 +179,103 @@ def _read_file( ) +class _ImageFileMetadataProvider(DefaultFileMetadataProvider): + def _set_encoding_ratio(self, encoding_ratio: int): + """Set image file encoding ratio, to provide accurate size in bytes metadata.""" + self._encoding_ratio = encoding_ratio + + def _get_block_metadata( + self, + paths: List[str], + schema: Optional[Union[type, "pyarrow.lib.Schema"]], + *, + rows_per_file: Optional[int], + file_sizes: List[Optional[int]], + ) -> BlockMetadata: + metadata = super()._get_block_metadata( + paths, schema, rows_per_file=rows_per_file, file_sizes=file_sizes + ) + if metadata.size_bytes is not None: + metadata.size_bytes = int(metadata.size_bytes * self._encoding_ratio) + return metadata + + +class _ImageFolderDatasourceReader(_FileBasedDatasourceReader): + def __init__( + self, + delegate: FileBasedDatasource, + paths: List[str], + filesystem: "pyarrow.fs.FileSystem", + partition_filter: PathPartitionFilter, + meta_provider: _ImageFileMetadataProvider = _ImageFileMetadataProvider(), + **reader_args, + ): + super().__init__( + delegate=delegate, + paths=paths, + filesystem=filesystem, + schema=None, + open_stream_args=None, + meta_provider=meta_provider, + partition_filter=partition_filter, + **reader_args, + ) + self._encoding_ratio = self._estimate_files_encoding_ratio() + meta_provider._set_encoding_ratio(self._encoding_ratio) + + def estimate_inmemory_data_size(self) -> Optional[int]: + return sum(self._file_sizes) * self._encoding_ratio + + def _estimate_files_encoding_ratio(self) -> float: + """Return an estimate of the image files encoding ratio.""" + start_time = time.perf_counter() + # Filter out empty file to avoid noise. + non_empty_path_and_size = list( + filter(lambda p: p[1] > 0, zip(self._paths, self._file_sizes)) + ) + num_files = len(non_empty_path_and_size) + if num_files == 0: + logger.warn( + "All input image files are empty. " + "Use on-disk file size to estimate images in-memory size." + ) + return IMAGE_ENCODING_RATIO_ESTIMATE_DEFAULT + + size = self._reader_args.get("size") + mode = self._reader_args.get("mode") + if size is not None and mode is not None: + # Use image size and mode to calculate data size for all images, + # because all images are homogeneous with same size after resizing. + # Resizing is enforced when reading every image in ImageFolderDatasource + # when `size` argument is provided. + if mode in ["1", "L", "P"]: + dimension = 1 + elif mode in ["RGB", "YCbCr", "LAB", "HSV"]: + dimension = 3 + elif mode in ["RGBA", "CMYK", "I", "F"]: + dimension = 4 + else: + logger.warn(f"Found unknown image mode: {mode}.") + return IMAGE_ENCODING_RATIO_ESTIMATE_DEFAULT + height, width = size + single_image_size = height * width * dimension + total_estimated_size = single_image_size * num_files + total_file_size = sum(p[1] for p in non_empty_path_and_size) + ratio = total_estimated_size / total_file_size + else: + # TODO(chengsu): sample images to estimate data size + ratio = IMAGE_ENCODING_RATIO_ESTIMATE_DEFAULT + + sampling_duration = time.perf_counter() - start_time + if sampling_duration > 5: + logger.warn( + "Image input size estimation took " + f"{round(sampling_duration, 2)} seconds." + ) + logger.debug(f"Estimated image encoding ratio from sampling is {ratio}.") + return max(ratio, IMAGE_ENCODING_RATIO_ESTIMATE_LOWER_BOUND) + + def _get_class_from_path(path: str, root: str) -> str: # The class is the name of the first directory after the root. For example, if # the root is "/data/imagenet/train" and the path is diff --git a/python/ray/data/tests/test_dataset_formats.py b/python/ray/data/tests/test_dataset_formats.py index e8be7d429a30c..4ba1d12ce455b 100644 --- a/python/ray/data/tests/test_dataset_formats.py +++ b/python/ray/data/tests/test_dataset_formats.py @@ -11,6 +11,10 @@ import pyarrow.parquet as pq import pytest from ray.data.datasource.file_meta_provider import _handle_read_os_error +from ray.data.datasource.image_folder_datasource import ( + IMAGE_EXTENSIONS, + _ImageFolderDatasourceReader, +) import requests import snappy from fsspec.implementations.local import LocalFileSystem @@ -36,7 +40,10 @@ SimpleTorchDatasource, WriteResult, ) -from ray.data.datasource.file_based_datasource import _unwrap_protocol +from ray.data.datasource.file_based_datasource import ( + FileExtensionFilter, + _unwrap_protocol, +) from ray.data.datasource.parquet_datasource import ( PARALLELIZE_META_FETCH_THRESHOLD, _ParquetDatasourceReader, @@ -2952,6 +2959,49 @@ def preprocess(df): predictor.predict(dataset, feature_columns=["image"]) +@pytest.mark.parametrize( + "image_size,image_mode,expected_size,expected_ratio", + [(64, "RGB", 30000, 4), (32, "L", 3500, 0.5), (256, "RGBA", 750000, 85)], +) +def test_image_folder_reader_estimate_data_size( + ray_start_regular_shared, image_size, image_mode, expected_size, expected_ratio +): + root = "example://image-folders/different-sizes" + ds = ray.data.read_datasource( + ImageFolderDatasource(), + root=root, + size=(image_size, image_size), + mode=image_mode, + ) + + data_size = ds.size_bytes() + assert ( + data_size >= expected_size and data_size <= expected_size * 1.5 + ), "estimated data size is out of expected bound" + data_size = ds.fully_executed().size_bytes() + assert ( + data_size >= expected_size and data_size <= expected_size * 1.5 + ), "actual data size is out of expected bound" + + reader = _ImageFolderDatasourceReader( + delegate=ImageFolderDatasource(), + paths=[root], + filesystem=LocalFileSystem(), + partition_filter=FileExtensionFilter(file_extensions=IMAGE_EXTENSIONS), + root=root, + size=(image_size, image_size), + mode=image_mode, + ) + assert ( + reader._encoding_ratio >= expected_ratio + and reader._encoding_ratio <= expected_ratio * 1.5 + ), "encoding ratio is out of expected bound" + data_size = reader.estimate_inmemory_data_size() + assert ( + data_size >= expected_size and data_size <= expected_size * 1.5 + ), "estimated data size is out of expected bound" + + # NOTE: The last test using the shared ray_start_regular_shared cluster must use the # shutdown_only fixture so the shared cluster is shut down, otherwise the below # test_write_datasource_ray_remote_args test, which uses a cluster_utils cluster, will