[Data] Improve stability of Parquet metadata prefetch task (#42044)
This PR fixes the Parquet metadata prefetching task when reading a large number of Parquet files from S3 (>50k). Before this PR, the metadata prefetch task ran on the head node (with the `DEFAULT` scheduling strategy) and did not retry on transient S3 exceptions, so it could fail very quickly: it launched too many requests from a single node and got throttled by S3.

This PR does three things:
* Fix the scheduling strategy to use `SPREAD`, the same as read tasks, so that metadata prefetch tasks are spread across the cluster. This avoids hitting S3 with too many requests from the same node.
* Auto-retry on `OSError`, which S3 raises for transient errors such as `Access Denied` and `Read Timeout`.
* Extract the `num_cpus` default value into a variable, so we can tune it to control the concurrency of the metadata prefetch task for a particular workload; sometimes `num_cpus=0.5` does not work well (see the sketch after this list).
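
As a rough illustration of the last point, here is a minimal sketch of how a workload could tune these knobs. It assumes overriding the module-level variables before calling `ray.data.read_parquet`, which is an internal detail rather than a public API, and the S3 path is made up:

```python
import ray
from ray.data.datasource import parquet_datasource

# Hypothetical tuning: give each metadata prefetch task a full CPU instead of
# the default 0.5, and keep retrying on transient OSError from S3.
parquet_datasource.NUM_CPUS_FOR_META_FETCH_TASK = 1.0
parquet_datasource.RETRY_EXCEPTIONS_FOR_META_FETCH_TASK = [OSError]

# Metadata prefetch tasks launched by this read pick up the tuned values.
ds = ray.data.read_parquet("s3://my-bucket/large-parquet-dataset/")
```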

Signed-off-by: Cheng Su <[email protected]>
c21 committed Dec 21, 2023
1 parent 71a3728 commit 6fdc9e3
Showing 3 changed files with 37 additions and 2 deletions.
2 changes: 1 addition & 1 deletion python/ray/data/datasource/file_meta_provider.py
@@ -529,7 +529,7 @@ def _fetch_metadata_parallel(
     **ray_remote_args,
 ) -> Iterator[Meta]:
     """Fetch file metadata in parallel using Ray tasks."""
-    remote_fetch_func = cached_remote_fn(fetch_func, num_cpus=0.5)
+    remote_fetch_func = cached_remote_fn(fetch_func)
     if ray_remote_args:
         remote_fetch_func = remote_fetch_func.options(**ray_remote_args)
     # Choose a parallelism that results in a # of metadata fetches per task that
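
For context, removing the hard-coded `num_cpus=0.5` means the caller now controls task resources through `ray_remote_args`, using Ray's standard `.options()` override on a remote function. A minimal sketch of that pattern, with a made-up `fetch_metadata` function standing in for the real fetch logic:

```python
import ray

ray.init(ignore_reinit_error=True)


def fetch_metadata(paths):
    # Stand-in for the real metadata-fetching logic.
    return [f"metadata for {p}" for p in paths]


# Like `cached_remote_fn(fetch_func)`: no resources pinned at definition time.
remote_fetch = ray.remote(fetch_metadata)

# The caller supplies resources and scheduling, mirroring `ray_remote_args`.
ray_remote_args = {"num_cpus": 0.5, "scheduling_strategy": "SPREAD"}
remote_fetch = remote_fetch.options(**ray_remote_args)

print(ray.get(remote_fetch.remote(["a.parquet", "b.parquet"])))
```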
23 changes: 23 additions & 0 deletions python/ray/data/datasource/parquet_datasource.py
@@ -47,6 +47,15 @@
 FRAGMENTS_PER_META_FETCH = 6
 PARALLELIZE_META_FETCH_THRESHOLD = 24
 
+# The `num_cpus` for each metadata prefetching task.
+# Default to 0.5 instead of 1, because it is cheaper than a normal read task.
+NUM_CPUS_FOR_META_FETCH_TASK = 0.5
+
+# The application-level exceptions to retry for the metadata prefetching task.
+# Default to retrying on `OSError`, because AWS S3 throws this transient
+# error when the request load is too high.
+RETRY_EXCEPTIONS_FOR_META_FETCH_TASK = [OSError]
+
 # The number of rows to read per batch. This is sized to generate 10MiB batches
 # for rows about 1KiB in size.
 PARQUET_READER_ROW_BATCH_SIZE = 10_000
@@ -268,8 +277,22 @@ def __init__(
 
         try:
             prefetch_remote_args = {}
+            prefetch_remote_args["num_cpus"] = NUM_CPUS_FOR_META_FETCH_TASK
             if self._local_scheduling:
                 prefetch_remote_args["scheduling_strategy"] = self._local_scheduling
+            else:
+                # Use the scheduling strategy ("SPREAD" by default) provided in
+                # `DataContext`, to spread out prefetch tasks across the cluster
+                # and avoid AWS S3 throttling errors.
+                # Note: this is the same scheduling strategy used by read tasks.
+                prefetch_remote_args[
+                    "scheduling_strategy"
+                ] = DataContext.get_current().scheduling_strategy
+            if RETRY_EXCEPTIONS_FOR_META_FETCH_TASK is not None:
+                prefetch_remote_args[
+                    "retry_exceptions"
+                ] = RETRY_EXCEPTIONS_FOR_META_FETCH_TASK
+
             self._metadata = (
                 meta_provider.prefetch_file_metadata(
                     pq_ds.fragments, **prefetch_remote_args
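
The two new options map onto standard Ray task features: `scheduling_strategy="SPREAD"` spreads tasks across nodes, and `retry_exceptions` makes Ray re-run a task that raises one of the listed application-level exceptions (up to the task's `max_retries`). A standalone sketch of that behavior, with a simulated flaky call rather than real Ray Data internals:

```python
import random

import ray


@ray.remote(
    num_cpus=0.5,
    scheduling_strategy="SPREAD",  # spread tasks across the cluster
    retry_exceptions=[OSError],  # re-run the task on transient OSError
    max_retries=3,
)
def fetch_fragment_metadata(fragment_id):
    # Simulate an S3 throttling error on some attempts.
    if random.random() < 0.2:
        raise OSError("simulated S3 throttling")
    return f"metadata for fragment {fragment_id}"


results = ray.get([fetch_fragment_metadata.remote(i) for i in range(10)])
print(results)
```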
14 changes: 13 additions & 1 deletion python/ray/data/tests/test_parquet.py
@@ -11,13 +11,16 @@
 
 import ray
 from ray.data.block import BlockAccessor
+from ray.data.context import DataContext
 from ray.data.datasource import (
     DefaultFileMetadataProvider,
     DefaultParquetMetadataProvider,
 )
 from ray.data.datasource.parquet_base_datasource import ParquetBaseDatasource
 from ray.data.datasource.parquet_datasource import (
+    NUM_CPUS_FOR_META_FETCH_TASK,
     PARALLELIZE_META_FETCH_THRESHOLD,
+    RETRY_EXCEPTIONS_FOR_META_FETCH_TASK,
     ParquetDatasource,
     _deserialize_fragments_with_retry,
     _SerializedFragment,
@@ -206,7 +209,16 @@ def test_parquet_read_meta_provider(ray_start_regular_shared, fs, data_path):
     pq.write_table(table, path2, filesystem=fs)
 
     class TestMetadataProvider(DefaultParquetMetadataProvider):
-        def prefetch_file_metadata(self, fragments):
+        def prefetch_file_metadata(self, fragments, **ray_remote_args):
+            assert ray_remote_args["num_cpus"] == NUM_CPUS_FOR_META_FETCH_TASK
+            assert (
+                ray_remote_args["scheduling_strategy"]
+                == DataContext.get_current().scheduling_strategy
+            )
+            assert (
+                ray_remote_args["retry_exceptions"]
+                == RETRY_EXCEPTIONS_FOR_META_FETCH_TASK
+            )
             return None
 
     ds = ray.data.read_parquet(
