[Data] Add concurrency and deprecate parallelism for read_parquet APIs (#42849)

This PR adds a new `concurrency` parameter for read APIs. The motivation is to let users control concurrency for the read operator as well, not just for map operators.

TODO: Add the `concurrency` parameter to all read APIs besides `read_parquet` once we agree on the change; otherwise too many documentation changes would need to be made.
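For illustration, a minimal sketch of how the new parameter is intended to be used once this lands (the dataset path below is a placeholder, not from this PR):

import ray

# Cap the read at 4 concurrent Ray read tasks. This only limits how many
# tasks run at once; the total number of read tasks / output blocks is
# still decided automatically.
ds = ray.data.read_parquet(
    "s3://example-bucket/data/",  # placeholder path for illustration
    concurrency=4,
)
print(ds.schema())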

Signed-off-by: Cheng Su <[email protected]>
c21 committed Feb 7, 2024
1 parent d8b0fe9 commit d9b296e
Showing 4 changed files with 42 additions and 12 deletions.
2 changes: 2 additions & 0 deletions python/ray/data/_internal/logical/operators/read_operator.py
@@ -15,6 +15,7 @@ def __init__(
mem_size: Optional[int],
num_outputs: Optional[int] = None,
ray_remote_args: Optional[Dict[str, Any]] = None,
concurrency: Optional[int] = None,
):
super().__init__(
f"Read{datasource.get_name()}",
@@ -26,6 +27,7 @@ def __init__(
self._datasource_or_legacy_reader = datasource_or_legacy_reader
self._parallelism = parallelism
self._mem_size = mem_size
self._concurrency = concurrency
self._detected_parallelism = None

def set_detected_parallelism(self, parallelism: int):
2 changes: 2 additions & 0 deletions python/ray/data/_internal/planner/plan_read_op.py
@@ -2,6 +2,7 @@

import ray
import ray.cloudpickle as cloudpickle
from ray.data._internal.compute import TaskPoolStrategy
from ray.data._internal.execution.interfaces import PhysicalOperator, RefBundle
from ray.data._internal.execution.interfaces.task_context import TaskContext
from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
@@ -108,6 +109,7 @@ def do_read(blocks: Iterable[ReadTask], _: TaskContext) -> Iterable[Block]:
inputs,
name=op.name,
target_max_block_size=None,
compute_strategy=TaskPoolStrategy(op._concurrency),
ray_remote_args=op._ray_remote_args,
)

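For context, `TaskPoolStrategy` is the task-based compute strategy Ray Data already uses elsewhere; threading the read operator's `concurrency` into it is what caps the number of in-flight read tasks. A minimal sketch of that behavior, assuming (as the call above suggests) that the first positional argument is the concurrency cap:

from ray.data._internal.compute import TaskPoolStrategy

# No explicit cap: the scheduler launches as many read tasks as cluster
# resources allow.
unbounded = TaskPoolStrategy(None)

# Cap to at most 4 read tasks running at the same time.
capped = TaskPoolStrategy(4)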
45 changes: 33 additions & 12 deletions python/ray/data/read_api.py
@@ -281,23 +281,38 @@ def read_datasource(
*,
parallelism: int = -1,
ray_remote_args: Dict[str, Any] = None,
concurrency: Optional[int] = None,
override_num_blocks: Optional[int] = None,
**read_args,
) -> Dataset:
"""Read a stream from a custom :class:`~ray.data.Datasource`.
Args:
datasource: The :class:`~ray.data.Datasource` to read data from.
parallelism: The requested parallelism of the read. Parallelism might be
limited by the available partitioning of the datasource. If set to -1,
parallelism is automatically chosen based on the available cluster
resources and estimated in-memory data size.
parallelism: This argument is deprecated. Use the ``override_num_blocks`` argument instead.
ray_remote_args: kwargs passed to :meth:`ray.remote` in the read tasks.
concurrency: The maximum number of Ray tasks to run concurrently. Set this
to control how many read tasks run at once. This doesn't change the
total number of tasks run or the total number of output blocks. By default,
concurrency is dynamically decided based on the available resources.
override_num_blocks: Override the number of output blocks of read tasks. By
default, the number of output blocks is dynamically decided based on
input data size and available resources. You should not need to manually
set this value in most cases.
read_args: Additional kwargs to pass to the :class:`~ray.data.Datasource`
implementation.
ray_remote_args: kwargs passed to :meth:`ray.remote` in the read tasks.
Returns:
:class:`~ray.data.Dataset` that reads data from the :class:`~ray.data.Datasource`.
""" # noqa: E501
if parallelism != -1:
logger.warning(
"The argument ``parallelism`` is deprecated in Ray 2.10. Please specify "
"argument ``override_num_blocks`` instead."
)
elif override_num_blocks is not None:
parallelism = override_num_blocks

ctx = DataContext.get_current()

if ray_remote_args is None:
@@ -559,6 +574,8 @@ def read_parquet(
shuffle: Union[Literal["files"], None] = None,
include_paths: bool = False,
file_extensions: Optional[List[str]] = None,
concurrency: Optional[int] = None,
override_num_blocks: Optional[int] = None,
**arrow_parquet_args,
) -> Dataset:
"""Creates a :class:`~ray.data.Dataset` from parquet files.
Expand Down Expand Up @@ -645,13 +662,7 @@ def read_parquet(
used. If ``None``, this function uses a system-chosen implementation.
columns: A list of column names to read. Only the specified columns are
read during the file scan.
parallelism: The amount of parallelism to use for the dataset. Defaults to -1,
which automatically determines the optimal parallelism for your
configuration. You should not need to manually set this value in most cases.
For details on how the parallelism is automatically determined and guidance
on how to tune it, see :ref:`Tuning read parallelism
<read_parallelism>`. Parallelism is upper bounded by the total number of
records in all the parquet files.
parallelism: This argument is deprecated. Use the ``override_num_blocks`` argument instead.
ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks.
tensor_column_schema: A dict of column name to PyArrow dtype and shape
mappings for converting a Parquet column containing serialized
@@ -674,6 +685,14 @@
include_paths: If ``True``, include the path to each file. File paths are
stored in the ``'path'`` column.
file_extensions: A list of file extensions to filter files by.
concurrency: The maximum number of Ray tasks to run concurrently. Set this
to control how many read tasks run at once. This doesn't change the
total number of tasks run or the total number of output blocks. By default,
concurrency is dynamically decided based on the available resources.
override_num_blocks: Override the number of output blocks of read tasks. By
default, the number of output blocks is dynamically decided based on
input data size and available resources. You should not need to manually
set this value in most cases.
Returns:
:class:`~ray.data.Dataset` producing records read from the specified parquet
Expand Down Expand Up @@ -707,6 +726,8 @@ def read_parquet(
datasource,
parallelism=parallelism,
ray_remote_args=ray_remote_args,
concurrency=concurrency,
override_num_blocks=override_num_blocks,
)


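Taken together, the read_api.py changes mean existing ``parallelism=`` callers keep working but log a deprecation warning, while new code uses ``override_num_blocks`` for the total block count and ``concurrency`` for the in-flight task cap. A rough before/after sketch (the file path is a placeholder):

import ray

path = "data.parquet"  # placeholder path for illustration

# Before: deprecated in Ray 2.10; logs a warning but still works.
ds_old = ray.data.read_parquet(path, parallelism=200)

# After: override_num_blocks sets the total number of read tasks / output
# blocks; concurrency only caps how many of those tasks run at once.
ds_new = ray.data.read_parquet(path, override_num_blocks=200, concurrency=16)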
5 changes: 5 additions & 0 deletions python/ray/data/tests/test_parquet.py
@@ -181,6 +181,11 @@ def test_parquet_read_basic(ray_start_regular_shared, fs, data_path):
assert sorted(values) == [1, 2, 3, 4, 5, 6]
assert ds.schema().names == ["one"]

# Test concurrency.
ds = ray.data.read_parquet(data_path, filesystem=fs, concurrency=1)
values = [s["one"] for s in ds.take()]
assert sorted(values) == [1, 2, 3, 4, 5, 6]


@pytest.mark.parametrize(
"fs,data_path",
