From d9b296e348a0f0dacf7bfc2561a064e90e30314f Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Wed, 7 Feb 2024 12:21:28 -0800
Subject: [PATCH] [Data] Add concurrency and deprecate parallelism for read_parquet APIs (#42849)

This PR adds a new `concurrency` parameter to read APIs. The motivation is to
allow users to control concurrency for the read operator as well, not just for
map operators.

TODO: Add the `concurrency` parameter to all read APIs besides `read_parquet`
once we agree on the change; otherwise too many documentation changes would be
needed.
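Example usage (a minimal sketch for illustration; the `example://iris.parquet`
path and the specific values 4 and 8 are placeholders, not part of this PR):

    import ray

    # Cap the read at 4 concurrently running read tasks. This only limits how
    # many tasks execute at once; it does not change the total number of read
    # tasks or output blocks.
    ds = ray.data.read_parquet("example://iris.parquet", concurrency=4)

    # override_num_blocks replaces the deprecated parallelism argument: it
    # controls how many output blocks (and therefore read tasks) are created.
    ds = ray.data.read_parquet("example://iris.parquet", override_num_blocks=8)

    print(ds.schema())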
Signed-off-by: Cheng Su
---
 .../logical/operators/read_operator.py     |  2 +
 .../data/_internal/planner/plan_read_op.py |  2 +
 python/ray/data/read_api.py                | 45 ++++++++++++++-----
 python/ray/data/tests/test_parquet.py      |  5 +++
 4 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/python/ray/data/_internal/logical/operators/read_operator.py b/python/ray/data/_internal/logical/operators/read_operator.py
index 61c7bc8f8fd5a..8f31802281417 100644
--- a/python/ray/data/_internal/logical/operators/read_operator.py
+++ b/python/ray/data/_internal/logical/operators/read_operator.py
@@ -15,6 +15,7 @@ def __init__(
         mem_size: Optional[int],
         num_outputs: Optional[int] = None,
         ray_remote_args: Optional[Dict[str, Any]] = None,
+        concurrency: Optional[int] = None,
     ):
         super().__init__(
             f"Read{datasource.get_name()}",
@@ -26,6 +27,7 @@ def __init__(
         self._datasource_or_legacy_reader = datasource_or_legacy_reader
         self._parallelism = parallelism
         self._mem_size = mem_size
+        self._concurrency = concurrency
         self._detected_parallelism = None

     def set_detected_parallelism(self, parallelism: int):
diff --git a/python/ray/data/_internal/planner/plan_read_op.py b/python/ray/data/_internal/planner/plan_read_op.py
index 511b8c3e26b8d..4dd3cb03edbeb 100644
--- a/python/ray/data/_internal/planner/plan_read_op.py
+++ b/python/ray/data/_internal/planner/plan_read_op.py
@@ -2,6 +2,7 @@

 import ray
 import ray.cloudpickle as cloudpickle
+from ray.data._internal.compute import TaskPoolStrategy
 from ray.data._internal.execution.interfaces import PhysicalOperator, RefBundle
 from ray.data._internal.execution.interfaces.task_context import TaskContext
 from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
@@ -108,6 +109,7 @@ def do_read(blocks: Iterable[ReadTask], _: TaskContext) -> Iterable[Block]:
         inputs,
         name=op.name,
         target_max_block_size=None,
+        compute_strategy=TaskPoolStrategy(op._concurrency),
         ray_remote_args=op._ray_remote_args,
     )

diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py
index d096e0a3319d5..8b2144d58f5f3 100644
--- a/python/ray/data/read_api.py
+++ b/python/ray/data/read_api.py
@@ -281,23 +281,38 @@ def read_datasource(
     *,
     parallelism: int = -1,
     ray_remote_args: Dict[str, Any] = None,
+    concurrency: Optional[int] = None,
+    override_num_blocks: Optional[int] = None,
     **read_args,
 ) -> Dataset:
     """Read a stream from a custom :class:`~ray.data.Datasource`.

     Args:
         datasource: The :class:`~ray.data.Datasource` to read data from.
-        parallelism: The requested parallelism of the read. Parallelism might be
-            limited by the available partitioning of the datasource. If set to -1,
-            parallelism is automatically chosen based on the available cluster
-            resources and estimated in-memory data size.
+        parallelism: This argument is deprecated. Use ``override_num_blocks`` instead.
+        ray_remote_args: kwargs passed to :meth:`ray.remote` in the read tasks.
+        concurrency: The maximum number of Ray tasks to run concurrently. Set this
+            to control the number of tasks to run concurrently. This doesn't change
+            the total number of tasks run or the total number of output blocks. By
+            default, concurrency is dynamically decided based on available resources.
+        override_num_blocks: Override the number of output blocks of read tasks. By
+            default, the number of output blocks is dynamically decided based on
+            input data size and available resources. You should not need to manually
+            set this value in most cases.
         read_args: Additional kwargs to pass to the :class:`~ray.data.Datasource`
             implementation.
-        ray_remote_args: kwargs passed to :meth:`ray.remote` in the read tasks.

     Returns:
         :class:`~ray.data.Dataset` that reads data from the :class:`~ray.data.Datasource`.
     """  # noqa: E501
+    if parallelism != -1:
+        logger.warning(
+            "The argument ``parallelism`` is deprecated in Ray 2.10. Please specify "
+            "argument ``override_num_blocks`` instead."
+        )
+    elif override_num_blocks is not None:
+        parallelism = override_num_blocks
+
     ctx = DataContext.get_current()

     if ray_remote_args is None:
@@ -559,6 +574,8 @@ def read_parquet(
     shuffle: Union[Literal["files"], None] = None,
     include_paths: bool = False,
     file_extensions: Optional[List[str]] = None,
+    concurrency: Optional[int] = None,
+    override_num_blocks: Optional[int] = None,
     **arrow_parquet_args,
 ) -> Dataset:
     """Creates a :class:`~ray.data.Dataset` from parquet files.
@@ -645,13 +662,7 @@ def read_parquet(
             used. If ``None``, this function uses a system-chosen implementation.
         columns: A list of column names to read. Only the specified columns are
             read during the file scan.
-        parallelism: The amount of parallelism to use for the dataset. Defaults to -1,
-            which automatically determines the optimal parallelism for your
-            configuration. You should not need to manually set this value in most cases.
-            For details on how the parallelism is automatically determined and guidance
-            on how to tune it, see :ref:`Tuning read parallelism
-            `. Parallelism is upper bounded by the total number of
-            records in all the parquet files.
+        parallelism: This argument is deprecated. Use ``override_num_blocks`` instead.
         ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks.
         tensor_column_schema: A dict of column name to PyArrow dtype and shape
             mappings for converting a Parquet column containing serialized
@@ -674,6 +685,14 @@ def read_parquet(
         include_paths: If ``True``, include the path to each file. File paths are
            stored in the ``'path'`` column.
         file_extensions: A list of file extensions to filter files by.
+        concurrency: The maximum number of Ray tasks to run concurrently. Set this
+            to control the number of tasks to run concurrently. This doesn't change
+            the total number of tasks run or the total number of output blocks. By
+            default, concurrency is dynamically decided based on available resources.
+        override_num_blocks: Override the number of output blocks of read tasks. By
+            default, the number of output blocks is dynamically decided based on
+            input data size and available resources. You should not need to manually
+            set this value in most cases.

     Returns:
         :class:`~ray.data.Dataset` producing records read from the specified parquet
@@ -707,6 +726,8 @@ def read_parquet(
         datasource,
         parallelism=parallelism,
         ray_remote_args=ray_remote_args,
+        concurrency=concurrency,
+        override_num_blocks=override_num_blocks,
     )


diff --git a/python/ray/data/tests/test_parquet.py b/python/ray/data/tests/test_parquet.py
index a9829d3e0133c..92fdc1f709732 100644
--- a/python/ray/data/tests/test_parquet.py
+++ b/python/ray/data/tests/test_parquet.py
@@ -181,6 +181,11 @@ def test_parquet_read_basic(ray_start_regular_shared, fs, data_path):
     assert sorted(values) == [1, 2, 3, 4, 5, 6]
     assert ds.schema().names == ["one"]

+    # Test concurrency.
+    ds = ray.data.read_parquet(data_path, filesystem=fs, concurrency=1)
+    values = [s["one"] for s in ds.take()]
+    assert sorted(values) == [1, 2, 3, 4, 5, 6]
+

 @pytest.mark.parametrize(
     "fs,data_path",