[Data] Move default metadata providers to separate file (ray-project#38031)

This PR consolidates the default metadata providers into a single constants file to improve maintainability.
bveeramani committed Aug 4, 2023
1 parent 5fac9f7 commit 6263634
Showing 2 changed files with 39 additions and 14 deletions.
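To see the change in miniature: each `read_*` signature previously instantiated its own default provider inline, and after this commit they all reference shared module-level constants. A minimal sketch of the pattern (illustrative names, not code from the diff):

```python
from ray.data.datasource import DefaultFileMetadataProvider

# Before: every reader's signature minted its own default instance.
def read_csv_before(paths, *, meta_provider=DefaultFileMetadataProvider()):
    ...

# After: the default lives once, as a named constant in
# _default_metadata_providers.py, and signatures just reference it.
DEFAULT_GENERIC_METADATA_PROVIDER = DefaultFileMetadataProvider()

def read_csv_after(paths, *, meta_provider=DEFAULT_GENERIC_METADATA_PROVIDER):
    ...
```

Either way the default is constructed once at import time; the constant simply gives it a single, named home that every reader shares.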
23 changes: 23 additions & 0 deletions python/ray/data/datasource/_default_metadata_providers.py
@@ -0,0 +1,23 @@
+from ray.data.datasource import (
+    DefaultFileMetadataProvider,
+    DefaultParquetMetadataProvider,
+    FastFileMetadataProvider,
+)
+from ray.data.datasource.image_datasource import _ImageFileMetadataProvider
+
+# Used by `read_parquet`
+DEFAULT_PARQUET_METADATA_PROVIDER = DefaultParquetMetadataProvider()
+# Used by `read_images`
+DEFAULT_IMAGE_METADATA_PROVIDER = _ImageFileMetadataProvider()
+# Used by `read_parquet_bulk`
+DEFAULT_BULK_PARQUET_METADATA_PROVIDER = FastFileMetadataProvider()
+# Used by all other file-based `read_*` APIs
+DEFAULT_GENERIC_METADATA_PROVIDER = DefaultFileMetadataProvider()
+
+
+__all__ = [
+    "DEFAULT_PARQUET_METADATA_PROVIDER",
+    "DEFAULT_IMAGE_METADATA_PROVIDER",
+    "DEFAULT_BULK_PARQUET_METADATA_PROVIDER",
+    "DEFAULT_GENERIC_METADATA_PROVIDER",
+]
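One maintainability payoff of the new module: a future file-based reader can reuse a shared default instead of minting yet another provider instance. A hypothetical sketch (`read_yaml` is invented for illustration and is not part of this commit):

```python
from ray.data.datasource import BaseFileMetadataProvider
from ray.data.datasource._default_metadata_providers import (
    DEFAULT_GENERIC_METADATA_PROVIDER,
)

def read_yaml(
    paths,
    *,
    meta_provider: BaseFileMetadataProvider = DEFAULT_GENERIC_METADATA_PROVIDER,
):
    """Hypothetical reader; real ones follow the read_api.py pattern below."""
    ...
```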
30 changes: 16 additions & 14 deletions python/ray/data/read_api.py
@@ -49,9 +49,6 @@
     Connection,
     CSVDatasource,
     Datasource,
-    DefaultFileMetadataProvider,
-    DefaultParquetMetadataProvider,
-    FastFileMetadataProvider,
     ImageDatasource,
     JSONDatasource,
     MongoDatasource,
@@ -67,11 +64,16 @@
     TFRecordDatasource,
     WebDatasetDatasource,
 )
+from ray.data.datasource._default_metadata_providers import (
+    DEFAULT_BULK_PARQUET_METADATA_PROVIDER,
+    DEFAULT_GENERIC_METADATA_PROVIDER,
+    DEFAULT_IMAGE_METADATA_PROVIDER,
+    DEFAULT_PARQUET_METADATA_PROVIDER,
+)
 from ray.data.datasource.file_based_datasource import (
     _unwrap_arrow_serialization_workaround,
     _wrap_arrow_serialization_workaround,
 )
-from ray.data.datasource.image_datasource import _ImageFileMetadataProvider
 from ray.data.datasource.partitioning import Partitioning
 from ray.types import ObjectRef
 from ray.util.annotations import Deprecated, DeveloperAPI, PublicAPI
@@ -547,7 +549,7 @@ def read_parquet(
     parallelism: int = -1,
     ray_remote_args: Dict[str, Any] = None,
     tensor_column_schema: Optional[Dict[str, Tuple[np.dtype, Tuple[int, ...]]]] = None,
-    meta_provider: ParquetMetadataProvider = DefaultParquetMetadataProvider(),
+    meta_provider: ParquetMetadataProvider = DEFAULT_PARQUET_METADATA_PROVIDER,
     **arrow_parquet_args,
 ) -> Dataset:
     """Creates a :class:`~ray.data.Dataset` from parquet files.
@@ -681,7 +683,7 @@ def read_images(
     *,
     filesystem: Optional["pyarrow.fs.FileSystem"] = None,
     parallelism: int = -1,
-    meta_provider: BaseFileMetadataProvider = _ImageFileMetadataProvider(),
+    meta_provider: BaseFileMetadataProvider = DEFAULT_IMAGE_METADATA_PROVIDER,
     ray_remote_args: Dict[str, Any] = None,
     arrow_open_file_args: Optional[Dict[str, Any]] = None,
     partition_filter: Optional[
@@ -820,7 +822,7 @@ def read_parquet_bulk(
     ray_remote_args: Dict[str, Any] = None,
     arrow_open_file_args: Optional[Dict[str, Any]] = None,
     tensor_column_schema: Optional[Dict[str, Tuple[np.dtype, Tuple[int, ...]]]] = None,
-    meta_provider: BaseFileMetadataProvider = FastFileMetadataProvider(),
+    meta_provider: BaseFileMetadataProvider = DEFAULT_BULK_PARQUET_METADATA_PROVIDER,
     partition_filter: Optional[PathPartitionFilter] = (
         ParquetBaseDatasource.file_extension_filter()
     ),
@@ -925,7 +927,7 @@ def read_json(
     parallelism: int = -1,
     ray_remote_args: Dict[str, Any] = None,
     arrow_open_stream_args: Optional[Dict[str, Any]] = None,
-    meta_provider: BaseFileMetadataProvider = DefaultFileMetadataProvider(),
+    meta_provider: BaseFileMetadataProvider = DEFAULT_GENERIC_METADATA_PROVIDER,
     partition_filter: Optional[
         PathPartitionFilter
     ] = JSONDatasource.file_extension_filter(),
@@ -1045,7 +1047,7 @@ def read_csv(
     parallelism: int = -1,
     ray_remote_args: Dict[str, Any] = None,
     arrow_open_stream_args: Optional[Dict[str, Any]] = None,
-    meta_provider: BaseFileMetadataProvider = DefaultFileMetadataProvider(),
+    meta_provider: BaseFileMetadataProvider = DEFAULT_GENERIC_METADATA_PROVIDER,
     partition_filter: Optional[PathPartitionFilter] = None,
     partitioning: Partitioning = Partitioning("hive"),
     ignore_missing_paths: bool = False,
@@ -1195,7 +1197,7 @@ def read_text(
     parallelism: int = -1,
     ray_remote_args: Optional[Dict[str, Any]] = None,
     arrow_open_stream_args: Optional[Dict[str, Any]] = None,
-    meta_provider: BaseFileMetadataProvider = DefaultFileMetadataProvider(),
+    meta_provider: BaseFileMetadataProvider = DEFAULT_GENERIC_METADATA_PROVIDER,
     partition_filter: Optional[PathPartitionFilter] = None,
     partitioning: Partitioning = None,
     ignore_missing_paths: bool = False,
@@ -1284,7 +1286,7 @@ def read_numpy(
     filesystem: Optional["pyarrow.fs.FileSystem"] = None,
     parallelism: int = -1,
     arrow_open_stream_args: Optional[Dict[str, Any]] = None,
-    meta_provider: BaseFileMetadataProvider = DefaultFileMetadataProvider(),
+    meta_provider: BaseFileMetadataProvider = DEFAULT_GENERIC_METADATA_PROVIDER,
     partition_filter: Optional[
         PathPartitionFilter
     ] = NumpyDatasource.file_extension_filter(),
@@ -1353,7 +1355,7 @@ def read_tfrecords(
     filesystem: Optional["pyarrow.fs.FileSystem"] = None,
     parallelism: int = -1,
     arrow_open_stream_args: Optional[Dict[str, Any]] = None,
-    meta_provider: BaseFileMetadataProvider = DefaultFileMetadataProvider(),
+    meta_provider: BaseFileMetadataProvider = DEFAULT_GENERIC_METADATA_PROVIDER,
     partition_filter: Optional[PathPartitionFilter] = None,
     ignore_missing_paths: bool = False,
     tf_schema: Optional["schema_pb2.Schema"] = None,
@@ -1454,7 +1456,7 @@ def read_webdataset(
     filesystem: Optional["pyarrow.fs.FileSystem"] = None,
     parallelism: int = -1,
     arrow_open_stream_args: Optional[Dict[str, Any]] = None,
-    meta_provider: BaseFileMetadataProvider = DefaultFileMetadataProvider(),
+    meta_provider: BaseFileMetadataProvider = DEFAULT_GENERIC_METADATA_PROVIDER,
     partition_filter: Optional[PathPartitionFilter] = None,
     decoder: Optional[Union[bool, str, callable, list]] = True,
     fileselect: Optional[Union[list, callable]] = None,
@@ -1519,7 +1521,7 @@ def read_binary_files(
     parallelism: int = -1,
     ray_remote_args: Dict[str, Any] = None,
     arrow_open_stream_args: Optional[Dict[str, Any]] = None,
-    meta_provider: BaseFileMetadataProvider = DefaultFileMetadataProvider(),
+    meta_provider: BaseFileMetadataProvider = DEFAULT_GENERIC_METADATA_PROVIDER,
     partition_filter: Optional[PathPartitionFilter] = None,
     partitioning: Partitioning = None,
     ignore_missing_paths: bool = False,
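A side effect worth noting: because the defaults are now module-level singletons, every reader's `meta_provider` default is identity-comparable against the shared constant. A quick check, assuming a Ray build with this commit applied and that the `@PublicAPI` decorator leaves signatures intact:

```python
import inspect

import ray.data
from ray.data.datasource._default_metadata_providers import (
    DEFAULT_GENERIC_METADATA_PROVIDER,
)

# read_csv's default should be the exact shared instance, not a fresh copy.
sig = inspect.signature(ray.data.read_csv)
assert sig.parameters["meta_provider"].default is DEFAULT_GENERIC_METADATA_PROVIDER
```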
