diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 8b2144d58f5f3..a16202018383e 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -105,6 +105,7 @@ def from_items( items: List[Any], *, parallelism: int = -1, + override_num_blocks: Optional[int] = None, ) -> MaterializedDataset: """Create a :class:`~ray.data.Dataset` from a list of local Python objects. @@ -123,19 +124,18 @@ def from_items( Args: items: List of local Python objects. - parallelism: The amount of parallelism to use for the dataset. Defaults to -1, - which automatically determines the optimal parallelism for your - configuration. You should not need to manually set this value in most cases. - For details on how the parallelism is automatically determined and guidance - on how to tune it, see - :ref:`Tuning read parallelism `. - Parallelism is upper bounded by ``len(items)``. + parallelism: This argument is deprecated. Use ``override_num_blocks`` argument. + override_num_blocks: Override the number of output blocks from all read tasks. + By default, the number of output blocks is dynamically decided based on + input data size and available resources. You shouldn't manually set this + value in most cases. Returns: A :class:`~ray.data.Dataset` holding the items. """ import builtins + parallelism = _get_num_output_blocks(parallelism, override_num_blocks) if parallelism == 0: raise ValueError(f"parallelism must be -1 or > 0, got: {parallelism}") @@ -187,7 +187,13 @@ def from_items( @PublicAPI -def range(n: int, *, parallelism: int = -1) -> Dataset: +def range( + n: int, + *, + parallelism: int = -1, + concurrency: Optional[int] = None, + override_num_blocks: Optional[int] = None, +) -> Dataset: """Creates a :class:`~ray.data.Dataset` from a range of integers [0..n). This function allows for easy creation of synthetic datasets for testing or @@ -204,13 +210,15 @@ def range(n: int, *, parallelism: int = -1) -> Dataset: Args: n: The upper bound of the range of integers. - parallelism: The amount of parallelism to use for the dataset. Defaults to -1, - which automatically determines the optimal parallelism for your - configuration. You should not need to manually set this value in most cases. - For details on how the parallelism is automatically determined and guidance - on how to tune it, see - :ref:`Tuning read parallelism `. - Parallelism is upper bounded by n. + parallelism: This argument is deprecated. Use ``override_num_blocks`` argument. + concurrency: The maximum number of Ray tasks to run concurrently. Set this + to control number of tasks to run concurrently. This doesn't change the + total number of tasks run or the total number of output blocks. By default, + concurrency is dynamically decided based on the available resources. + override_num_blocks: Override the number of output blocks from all read tasks. + By default, the number of output blocks is dynamically decided based on + input data size and available resources. You shouldn't manually set this + value in most cases. Returns: A :class:`~ray.data.Dataset` producing the integers from the range 0 to n. 
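Note: the `from_items` / `range` / `range_tensor` hunks above all replace `parallelism` with `override_num_blocks`. A minimal migration sketch, assuming a local Ray runtime (the `1_000` and `4` are illustrative, not taken from this diff):

```python
import ray

# Old spelling, now deprecated (still honored, but logs a warning):
#   ds = ray.data.range(1_000, parallelism=4)

# New spelling introduced by this change:
ds = ray.data.range(1_000, override_num_blocks=4)

# The requested block count should be reflected once the dataset is materialized.
print(ds.materialize().num_blocks())  # typically 4 for a range this small
```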
@@ -222,11 +230,23 @@ def range(n: int, *, parallelism: int = -1) -> Dataset: """ datasource = RangeDatasource(n=n, block_format="arrow", column_name="id") - return read_datasource(datasource, parallelism=parallelism) + return read_datasource( + datasource, + parallelism=parallelism, + concurrency=concurrency, + override_num_blocks=override_num_blocks, + ) @PublicAPI -def range_tensor(n: int, *, shape: Tuple = (1,), parallelism: int = -1) -> Dataset: +def range_tensor( + n: int, + *, + shape: Tuple = (1,), + parallelism: int = -1, + concurrency: Optional[int] = None, + override_num_blocks: Optional[int] = None, +) -> Dataset: """Creates a :class:`~ray.data.Dataset` tensors of the provided shape from range [0...n]. @@ -251,13 +271,15 @@ def range_tensor(n: int, *, shape: Tuple = (1,), parallelism: int = -1) -> Datas Args: n: The upper bound of the range of tensor records. shape: The shape of each tensor in the dataset. - parallelism: The amount of parallelism to use for the dataset. Defaults to -1, - which automatically determines the optimal parallelism for your - configuration. You should not need to manually set this value in most cases. - For details on how the parallelism is automatically determined and guidance - on how to tune it, see - :ref:`Tuning read parallelism `. - Parallelism is upper bounded by n. + parallelism: This argument is deprecated. Use ``override_num_blocks`` argument. + concurrency: The maximum number of Ray tasks to run concurrently. Set this + to control number of tasks to run concurrently. This doesn't change the + total number of tasks run or the total number of output blocks. By default, + concurrency is dynamically decided based on the available resources. + override_num_blocks: Override the number of output blocks from all read tasks. + By default, the number of output blocks is dynamically decided based on + input data size and available resources. You shouldn't manually set this + value in most cases. Returns: A :class:`~ray.data.Dataset` producing the tensor data from range 0 to n. @@ -271,7 +293,12 @@ def range_tensor(n: int, *, shape: Tuple = (1,), parallelism: int = -1) -> Datas datasource = RangeDatasource( n=n, block_format="tensor", column_name="data", tensor_shape=tuple(shape) ) - return read_datasource(datasource, parallelism=parallelism) + return read_datasource( + datasource, + parallelism=parallelism, + concurrency=concurrency, + override_num_blocks=override_num_blocks, + ) @PublicAPI @@ -294,24 +321,18 @@ def read_datasource( concurrency: The maximum number of Ray tasks to run concurrently. Set this to control number of tasks to run concurrently. This doesn't change the total number of tasks run or the total number of output blocks. By default, - concurrency is dynamically decided based on available resource. - override_num_blocks: Override the number of output blocks of read tasks. By - default, the number of output blocks is dynamically decided based on - input data size and available resource. You should not need to manually - set this value in most cases. + concurrency is dynamically decided based on the available resources. + override_num_blocks: Override the number of output blocks from all read tasks. + By default, the number of output blocks is dynamically decided based on + input data size and available resources. You shouldn't manually set this + value in most cases. read_args: Additional kwargs to pass to the :class:`~ray.data.Datasource` implementation. 
Returns: :class:`~ray.data.Dataset` that reads data from the :class:`~ray.data.Datasource`. """ # noqa: E501 - if parallelism != -1: - logger.warning( - "The argument ``parallelism`` is deprecated in Ray 2.10. Please specify " - "argument ``override_num_blocks`` instead." - ) - elif override_num_blocks is not None: - parallelism = override_num_blocks + parallelism = _get_num_output_blocks(parallelism, override_num_blocks) ctx = DataContext.get_current() @@ -394,6 +415,7 @@ def read_datasource( inmemory_size, block_list._estimated_num_blocks, ray_remote_args, + concurrency, ) logical_plan = LogicalPlan(read_op) @@ -414,6 +436,8 @@ def read_mongo( schema: Optional["pymongoarrow.api.Schema"] = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, + concurrency: Optional[int] = None, + override_num_blocks: Optional[int] = None, **mongo_args, ) -> Dataset: """Create a :class:`~ray.data.Dataset` from a MongoDB database. @@ -463,13 +487,16 @@ def read_mongo( be read. schema: The schema used to read the collection. If None, it'll be inferred from the results of pipeline. - parallelism: The requested parallelism of the read. Defaults to -1, - which automatically determines the optimal parallelism for your - configuration. You should not need to manually set this value in most cases. - For details on how the parallelism is automatically determined and guidance - on how to tune it, see :ref:`Tuning read parallelism - `. + parallelism: This argument is deprecated. Use ``override_num_blocks`` argument. ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks. + concurrency: The maximum number of Ray tasks to run concurrently. Set this + to control number of tasks to run concurrently. This doesn't change the + total number of tasks run or the total number of output blocks. By default, + concurrency is dynamically decided based on the available resources. + override_num_blocks: Override the number of output blocks from all read tasks. + By default, the number of output blocks is dynamically decided based on + input data size and available resources. You shouldn't manually set this + value in most cases. mongo_args: kwargs passed to `aggregate_arrow_all() `_ in pymongoarrow in producing @@ -491,7 +518,11 @@ def read_mongo( **mongo_args, ) return read_datasource( - datasource, parallelism=parallelism, ray_remote_args=ray_remote_args + datasource, + parallelism=parallelism, + ray_remote_args=ray_remote_args, + concurrency=concurrency, + override_num_blocks=override_num_blocks, ) @@ -503,6 +534,8 @@ def read_bigquery( *, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, + concurrency: Optional[int] = None, + override_num_blocks: Optional[int] = None, ) -> Dataset: """Create a dataset from BigQuery. @@ -543,10 +576,16 @@ def read_bigquery( For more information, see `Creating and Managing Projects `_. dataset: The name of the dataset hosted in BigQuery in the format of ``dataset_id.table_id``. Both the dataset_id and table_id must exist otherwise an exception will be raised. - parallelism: The requested parallelism of the read. If -1, it will be - automatically chosen based on the available cluster resources and estimated - in-memory data size. + parallelism: This argument is deprecated. Use ``override_num_blocks`` argument. ray_remote_args: kwargs passed to ray.remote in the read tasks. + concurrency: The maximum number of Ray tasks to run concurrently. Set this + to control number of tasks to run concurrently. 
This doesn't change the + total number of tasks run or the total number of output blocks. By default, + concurrency is dynamically decided based on the available resources. + override_num_blocks: Override the number of output blocks from all read tasks. + By default, the number of output blocks is dynamically decided based on + input data size and available resources. You shouldn't manually set this + value in most cases. Returns: Dataset producing rows from the results of executing the query (or reading the entire dataset) @@ -557,6 +596,8 @@ def read_bigquery( datasource, parallelism=parallelism, ray_remote_args=ray_remote_args, + concurrency=concurrency, + override_num_blocks=override_num_blocks, ) @@ -688,11 +729,11 @@ def read_parquet( concurrency: The maximum number of Ray tasks to run concurrently. Set this to control number of tasks to run concurrently. This doesn't change the total number of tasks run or the total number of output blocks. By default, - concurrency is dynamically decided based on available resource. - override_num_blocks: Override the number of output blocks of read tasks. By - default, the number of output blocks is dynamically decided based on - input data size and available resource. You should not need to manually - set this value in most cases. + concurrency is dynamically decided based on the available resources. + override_num_blocks: Override the number of output blocks from all read tasks. + By default, the number of output blocks is dynamically decided based on + input data size and available resources. You shouldn't manually set this + value in most cases. Returns: :class:`~ray.data.Dataset` producing records read from the specified parquet @@ -748,6 +789,8 @@ def read_images( ignore_missing_paths: bool = False, shuffle: Union[Literal["files"], None] = None, file_extensions: Optional[List[str]] = ImageDatasource._FILE_EXTENSIONS, + concurrency: Optional[int] = None, + override_num_blocks: Optional[int] = None, ) -> Dataset: """Creates a :class:`~ray.data.Dataset` from image files. @@ -805,13 +848,7 @@ class string you need to provide specific configurations to the filesystem. By default, the filesystem is automatically selected based on the scheme of the paths. For example, if the path begins with ``s3://``, the `S3FileSystem` is used. - parallelism: The amount of parallelism to use for the dataset. Defaults to -1, - which automatically determines the optimal parallelism for your - configuration. You should not need to manually set this value in most cases. - For details on how the parallelism is automatically determined and guidance - on how to tune it, see :ref:`Tuning read parallelism - `. Parallelism is upper bounded by the total number of - records in all the CSV files. + parallelism: This argument is deprecated. Use ``override_num_blocks`` argument. meta_provider: A :ref:`file metadata provider `. Custom metadata providers may be able to resolve file metadata more quickly and/or accurately. In most cases, you do not need to set this. If ``None``, this @@ -843,6 +880,14 @@ class string shuffle: If setting to "files", randomly shuffle input files order before read. Defaults to not shuffle with ``None``. file_extensions: A list of file extensions to filter files by. + concurrency: The maximum number of Ray tasks to run concurrently. Set this + to control number of tasks to run concurrently. This doesn't change the + total number of tasks run or the total number of output blocks. 
By default, + concurrency is dynamically decided based on the available resources. + override_num_blocks: Override the number of output blocks from all read tasks. + By default, the number of output blocks is dynamically decided based on + input data size and available resources. You shouldn't manually set this + value in most cases. Returns: A :class:`~ray.data.Dataset` producing tensors that represent the images at @@ -871,7 +916,11 @@ class string file_extensions=file_extensions, ) return read_datasource( - datasource, parallelism=parallelism, ray_remote_args=ray_remote_args + datasource, + parallelism=parallelism, + ray_remote_args=ray_remote_args, + concurrency=concurrency, + override_num_blocks=override_num_blocks, ) @@ -890,6 +939,8 @@ def read_parquet_bulk( shuffle: Union[Literal["files"], None] = None, include_paths: bool = False, file_extensions: Optional[List[str]] = ParquetBaseDatasource._FILE_EXTENSIONS, + concurrency: Optional[int] = None, + override_num_blocks: Optional[int] = None, **arrow_parquet_args, ) -> Dataset: """Create :class:`~ray.data.Dataset` from parquet files without reading metadata. @@ -930,13 +981,7 @@ def read_parquet_bulk( the `S3FileSystem` is used. columns: A list of column names to read. Only the specified columns are read during the file scan. - parallelism: The amount of parallelism to use for - the dataset. Defaults to -1, which automatically determines the optimal - parallelism for your configuration. You should not need to manually set - this value in most cases. For details on how the parallelism is - automatically determined and guidance on how to tune it, see - :ref:`Tuning read parallelism `. Parallelism is - upper bounded by the total number of records in all the parquet files. + parallelism: This argument is deprecated. Use ``override_num_blocks`` argument. ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks. arrow_open_file_args: kwargs passed to `pyarrow.fs.FileSystem.open_input_file Dataset: """Creates a :class:`~ray.data.Dataset` from JSON and JSONL files. @@ -1069,13 +1126,7 @@ def read_json( you need to provide specific configurations to the filesystem. By default, the filesystem is automatically selected based on the scheme of the paths. For example, if the path begins with ``s3://``, the `S3FileSystem` is used. - parallelism: The amount of parallelism to use for the dataset. Defaults to -1, - which automatically determines the optimal parallelism for your - configuration. You should not need to manually set this value in most cases. - For details on how the parallelism is automatically determined and guidance - on how to tune it, see :ref:`Tuning read parallelism - `. Parallelism is upper bounded by the total number of - records in all the JSON files. + parallelism: This argument is deprecated. Use ``override_num_blocks`` argument. ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks. arrow_open_stream_args: kwargs passed to `pyarrow.fs.FileSystem.open_input_file `_. file_extensions: A list of file extensions to filter files by. + concurrency: The maximum number of Ray tasks to run concurrently. Set this + to control number of tasks to run concurrently. This doesn't change the + total number of tasks run or the total number of output blocks. By default, + concurrency is dynamically decided based on the available resources. + override_num_blocks: Override the number of output blocks from all read tasks. 
+ By default, the number of output blocks is dynamically decided based on + input data size and available resources. You shouldn't manually set this + value in most cases. Returns: :class:`~ray.data.Dataset` producing records read from the specified paths. @@ -1127,7 +1186,11 @@ def read_json( file_extensions=file_extensions, ) return read_datasource( - datasource, parallelism=parallelism, ray_remote_args=ray_remote_args + datasource, + parallelism=parallelism, + ray_remote_args=ray_remote_args, + concurrency=concurrency, + override_num_blocks=override_num_blocks, ) @@ -1146,6 +1209,8 @@ def read_csv( ignore_missing_paths: bool = False, shuffle: Union[Literal["files"], None] = None, file_extensions: Optional[List[str]] = None, + concurrency: Optional[int] = None, + override_num_blocks: Optional[int] = None, **arrow_csv_args, ) -> Dataset: """Creates a :class:`~ray.data.Dataset` from CSV files. @@ -1228,13 +1293,7 @@ def read_csv( you need to provide specific configurations to the filesystem. By default, the filesystem is automatically selected based on the scheme of the paths. For example, if the path begins with ``s3://``, the `S3FileSystem` is used. - parallelism: The amount of parallelism to use for the dataset. Defaults to -1, - which automatically determines the optimal parallelism for your - configuration. You should not need to manually set this value in most cases. - For details on how the parallelism is automatically determined and guidance - on how to tune it, see :ref:`Tuning read parallelism - `. Parallelism is upper bounded by the total number of - records in all the CSV files. + parallelism: This argument is deprecated. Use ``override_num_blocks`` argument. ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks. arrow_open_stream_args: kwargs passed to `pyarrow.fs.FileSystem.open_input_file `_ when opening CSV files. file_extensions: A list of file extensions to filter files by. + concurrency: The maximum number of Ray tasks to run concurrently. Set this + to control number of tasks to run concurrently. This doesn't change the + total number of tasks run or the total number of output blocks. By default, + concurrency is dynamically decided based on the available resources. + override_num_blocks: Override the number of output blocks from all read tasks. + By default, the number of output blocks is dynamically decided based on + input data size and available resources. You shouldn't manually set this + value in most cases. Returns: :class:`~ray.data.Dataset` producing records read from the specified paths. @@ -1288,6 +1355,8 @@ def read_csv( datasource, parallelism=parallelism, ray_remote_args=ray_remote_args, + concurrency=concurrency, + override_num_blocks=override_num_blocks, ) @@ -1308,6 +1377,8 @@ def read_text( ignore_missing_paths: bool = False, shuffle: Union[Literal["files"], None] = None, file_extensions: Optional[List[str]] = None, + concurrency: Optional[int] = None, + override_num_blocks: Optional[int] = None, ) -> Dataset: """Create a :class:`~ray.data.Dataset` from lines stored in text files. @@ -1337,13 +1408,7 @@ def read_text( you need to provide specific configurations to the filesystem. By default, the filesystem is automatically selected based on the scheme of the paths. For example, if the path begins with ``s3://``, the `S3FileSystem` is used. - parallelism: The amount of parallelism to use for the dataset. Defaults to -1, - which automatically determines the optimal parallelism for your - configuration. 
You should not need to manually set this value in most cases. - For details on how the parallelism is automatically determined and guidance - on how to tune it, see :ref:`Tuning read parallelism - `. Parallelism is upper bounded by the total number of - lines in all the text files. + parallelism: This argument is deprecated. Use ``override_num_blocks`` argument. ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks and in the subsequent text decoding map task. arrow_open_stream_args: kwargs passed to @@ -1368,6 +1433,14 @@ def read_text( shuffle: If setting to "files", randomly shuffle input files order before read. Defaults to not shuffle with ``None``. file_extensions: A list of file extensions to filter files by. + concurrency: The maximum number of Ray tasks to run concurrently. Set this + to control number of tasks to run concurrently. This doesn't change the + total number of tasks run or the total number of output blocks. By default, + concurrency is dynamically decided based on the available resources. + override_num_blocks: Override the number of output blocks from all read tasks. + By default, the number of output blocks is dynamically decided based on + input data size and available resources. You shouldn't manually set this + value in most cases. Returns: :class:`~ray.data.Dataset` producing lines of text read from the specified @@ -1391,7 +1464,11 @@ def read_text( file_extensions=file_extensions, ) return read_datasource( - datasource, parallelism=parallelism, ray_remote_args=ray_remote_args + datasource, + parallelism=parallelism, + ray_remote_args=ray_remote_args, + concurrency=concurrency, + override_num_blocks=override_num_blocks, ) @@ -1409,6 +1486,8 @@ def read_numpy( ignore_missing_paths: bool = False, shuffle: Union[Literal["files"], None] = None, file_extensions: Optional[List[str]] = NumpyDatasource._FILE_EXTENSIONS, + concurrency: Optional[int] = None, + override_num_blocks: Optional[int] = None, **numpy_load_args, ) -> Dataset: """Create an Arrow dataset from numpy files. @@ -1432,8 +1511,7 @@ def read_numpy( paths: A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories. filesystem: The filesystem implementation to read from. - parallelism: The requested parallelism of the read. Parallelism may be - limited by the number of files of the dataset. + parallelism: This argument is deprecated. Use ``override_num_blocks`` argument. arrow_open_stream_args: kwargs passed to `pyarrow.fs.FileSystem.open_input_stream `_. numpy_load_args: Other options to pass to np.load. @@ -1453,6 +1531,14 @@ def read_numpy( shuffle: If setting to "files", randomly shuffle input files order before read. Defaults to not shuffle with ``None``. file_extensions: A list of file extensions to filter files by. + concurrency: The maximum number of Ray tasks to run concurrently. Set this + to control number of tasks to run concurrently. This doesn't change the + total number of tasks run or the total number of output blocks. By default, + concurrency is dynamically decided based on the available resources. + override_num_blocks: Override the number of output blocks from all read tasks. + By default, the number of output blocks is dynamically decided based on + input data size and available resources. You shouldn't manually set this + value in most cases. Returns: Dataset holding Tensor records read from the specified paths. 
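As above, `read_text` and `read_numpy` gain the same two knobs. A sketch of how they compose, assuming a hypothetical input prefix (`s3://my-bucket/logs/` is not part of this diff):

```python
import ray

ds = ray.data.read_text(
    "s3://my-bucket/logs/",   # assumed input location, for illustration only
    override_num_blocks=32,   # total read tasks / output blocks
    concurrency=4,            # at most 4 read tasks in flight at once
)
# `concurrency` only throttles scheduling; it does not reduce the 32 output blocks.
```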
@@ -1476,6 +1562,8 @@ def read_numpy( return read_datasource( datasource, parallelism=parallelism, + concurrency=concurrency, + override_num_blocks=override_num_blocks, ) @@ -1493,6 +1581,8 @@ def read_tfrecords( tf_schema: Optional["schema_pb2.Schema"] = None, shuffle: Union[Literal["files"], None] = None, file_extensions: Optional[List[str]] = None, + concurrency: Optional[int] = None, + override_num_blocks: Optional[int] = None, ) -> Dataset: """Create a :class:`~ray.data.Dataset` from TFRecord files that contain `tf.train.Example `_ @@ -1536,13 +1626,7 @@ def read_tfrecords( you need to provide specific configurations to the filesystem. By default, the filesystem is automatically selected based on the scheme of the paths. For example, if the path begins with ``s3://``, the `S3FileSystem` is used. - parallelism: The amount of parallelism to use for the dataset. Defaults to -1, - which automatically determines the optimal parallelism for your - configuration. You should not need to manually set this value in most cases. - For details on how the parallelism is automatically determined and guidance - on how to tune it, see :ref:`Tuning read parallelism - `. Parallelism is upper bounded by the total number of - records in all the CSV files. + parallelism: This argument is deprecated. Use ``override_num_blocks`` argument. arrow_open_stream_args: kwargs passed to `pyarrow.fs.FileSystem.open_input_file Dataset: """Create a :class:`~ray.data.Dataset` from `WebDataset `_ files. @@ -1619,8 +1718,7 @@ def read_webdataset( paths: A single file/directory path or a list of file/directory paths. A list of paths can contain both files and directories. filesystem: The filesystem implementation to read from. - parallelism: The requested parallelism of the read. Parallelism may be - limited by the number of files in the dataset. + parallelism: This argument is deprecated. Use ``override_num_blocks`` argument. arrow_open_stream_args: Key-word arguments passed to `pyarrow.fs.FileSystem.open_input_stream `_. To read a compressed TFRecord file, @@ -1641,6 +1739,14 @@ def read_webdataset( include_paths: If ``True``, include the path to each file. File paths are stored in the ``'path'`` column. file_extensions: A list of file extensions to filter files by. + concurrency: The maximum number of Ray tasks to run concurrently. Set this + to control number of tasks to run concurrently. This doesn't change the + total number of tasks run or the total number of output blocks. By default, + concurrency is dynamically decided based on the available resources. + override_num_blocks: Override the number of output blocks from all read tasks. + By default, the number of output blocks is dynamically decided based on + input data size and available resources. You shouldn't manually set this + value in most cases. Returns: A :class:`~ray.data.Dataset` that contains the example features. 
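The TFRecord and WebDataset readers follow the same pattern. A sketch with an assumed shard directory, for the case where per-task decode memory is the constraint:

```python
import ray

ds = ray.data.read_tfrecords(
    "s3://my-bucket/tfrecord-shards/",  # assumed path, not from this diff
    override_num_blocks=64,             # keep 64 read tasks / output blocks
    concurrency=8,                      # but cap in-flight read tasks at 8
)
```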
@@ -1670,7 +1776,12 @@ def read_webdataset( include_paths=include_paths, file_extensions=file_extensions, ) - return read_datasource(datasource, parallelism=parallelism) + return read_datasource( + datasource, + parallelism=parallelism, + concurrency=concurrency, + override_num_blocks=override_num_blocks, + ) @PublicAPI @@ -1688,6 +1799,8 @@ def read_binary_files( ignore_missing_paths: bool = False, shuffle: Union[Literal["files"], None] = None, file_extensions: Optional[List[str]] = None, + concurrency: Optional[int] = None, + override_num_blocks: Optional[int] = None, ) -> Dataset: """Create a :class:`~ray.data.Dataset` from binary files of arbitrary contents. @@ -1728,13 +1841,7 @@ def read_binary_files( the filesystem is automatically selected based on the scheme of the paths. For example, if the path begins with ``s3://``, the `S3FileSystem` is used. ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks. - parallelism: The amount of parallelism to use for the dataset. Defaults to -1, - which automatically determines the optimal parallelism for your - configuration. You should not need to manually set this value in most cases. - For details on how the parallelism is automatically determined and guidance - on how to tune it, see :ref:`Tuning read parallelism - `. Parallelism is upper bounded by the total number of - files. + parallelism: This argument is deprecated. Use ``override_num_blocks`` argument. arrow_open_stream_args: kwargs passed to `pyarrow.fs.FileSystem.open_input_file Dataset: """Read from a database that provides a `Python DB API2-compliant `_ connector. @@ -1852,13 +1973,16 @@ def create_connection(): connection_factory: A function that takes no arguments and returns a Python DB API2 `Connection object `_. - parallelism: The requested parallelism of the read. Defaults to -1, - which automatically determines the optimal parallelism for your - configuration. You should not need to manually set this value in most cases. - For details on how the parallelism is automatically determined and guidance - on how to tune it, see :ref:`Tuning read parallelism - `. + parallelism: This argument is deprecated. Use ``override_num_blocks`` argument. ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks. + concurrency: The maximum number of Ray tasks to run concurrently. Set this + to control number of tasks to run concurrently. This doesn't change the + total number of tasks run or the total number of output blocks. By default, + concurrency is dynamically decided based on the available resources. + override_num_blocks: Override the number of output blocks from all read tasks. + By default, the number of output blocks is dynamically decided based on + input data size and available resources. You shouldn't manually set this + value in most cases. Returns: A :class:`Dataset` containing the queried data. @@ -1868,6 +1992,8 @@ def create_connection(): datasource, parallelism=parallelism, ray_remote_args=ray_remote_args, + concurrency=concurrency, + override_num_blocks=override_num_blocks, ) @@ -1881,6 +2007,8 @@ def read_databricks_tables( schema: Optional[str] = None, parallelism: int = -1, ray_remote_args: Optional[Dict[str, Any]] = None, + concurrency: Optional[int] = None, + override_num_blocks: Optional[int] = None, ) -> Dataset: """Read a Databricks unity catalog table or Databricks SQL execution result. @@ -1927,13 +2055,16 @@ def read_databricks_tables( you can't set ``table_name`` argument. 
catalog: (Optional) The default catalog name used by the query. schema: (Optional) The default schema used by the query. - parallelism: The requested parallelism of the read. Defaults to -1, - which automatically determines the optimal parallelism for your - configuration. You should not need to manually set this value in most cases. - For details on how the parallelism is automatically determined and guidance - on how to tune it, see :ref:`Tuning read parallelism - `. + parallelism: This argument is deprecated. Use ``override_num_blocks`` argument. ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks. + concurrency: The maximum number of Ray tasks to run concurrently. Set this + to control number of tasks to run concurrently. This doesn't change the + total number of tasks run or the total number of output blocks. By default, + concurrency is dynamically decided based on the available resources. + override_num_blocks: Override the number of output blocks from all read tasks. + By default, the number of output blocks is dynamically decided based on + input data size and available resources. You shouldn't manually set this + value in most cases. Returns: A :class:`Dataset` containing the queried data. @@ -2001,7 +2132,11 @@ def get_dbutils(): query=query, ) return read_datasource( - datasource=datasource, parallelism=parallelism, ray_remote_args=ray_remote_args + datasource=datasource, + parallelism=parallelism, + ray_remote_args=ray_remote_args, + concurrency=concurrency, + override_num_blocks=override_num_blocks, ) @@ -2354,28 +2489,37 @@ def from_arrow_refs( @PublicAPI def from_spark( - df: "pyspark.sql.DataFrame", *, parallelism: Optional[int] = None + df: "pyspark.sql.DataFrame", + *, + parallelism: Optional[int] = None, + override_num_blocks: Optional[int] = None, ) -> MaterializedDataset: """Create a :class:`~ray.data.Dataset` from a `Spark DataFrame `_. Args: df: A `Spark DataFrame`_, which must be created by RayDP (Spark-on-Ray). - parallelism: The amount of parallelism to use for the dataset. If - not provided, the parallelism is equal to the number of partitions of - the original Spark DataFrame. + parallelism: This argument is deprecated. Use ``override_num_blocks`` argument. + override_num_blocks: Override the number of output blocks from all read tasks. + By default, the number of output blocks is dynamically decided based on + input data size and available resources. You shouldn't manually set this + value in most cases. Returns: A :class:`~ray.data.MaterializedDataset` holding rows read from the DataFrame. """ # noqa: E501 import raydp + parallelism = _get_num_output_blocks(parallelism, override_num_blocks) return raydp.spark.spark_dataframe_to_ray_dataset(df, parallelism) @PublicAPI def from_huggingface( - dataset: Union["datasets.Dataset", "datasets.IterableDataset"], parallelism=-1 + dataset: Union["datasets.Dataset", "datasets.IterableDataset"], + parallelism: int = -1, + concurrency: Optional[int] = None, + override_num_blocks: Optional[int] = None, ) -> Union[MaterializedDataset, Dataset]: """Create a :class:`~ray.data.MaterializedDataset` from a `Hugging Face Datasets Dataset `_ @@ -2425,14 +2569,15 @@ def from_huggingface( `DatasetDict `_ and `IterableDatasetDict `_ are not supported. - parallelism: The amount of parallelism to use for the dataset if applicable (i.e. - if the dataset is a public Hugging Face Dataset without transforms applied). - Defaults to -1, which automatically determines the optimal parallelism for your - configuration. 
You should not need to manually set this value in most cases. - For details on how the parallelism is automatically determined and guidance - on how to tune it, see :ref:`Tuning read parallelism - `. Parallelism is upper bounded by the total number of - records in all the parquet files. + parallelism: This argument is deprecated. Use ``override_num_blocks`` argument. + concurrency: The maximum number of Ray tasks to run concurrently. Set this + to control number of tasks to run concurrently. This doesn't change the + total number of tasks run or the total number of output blocks. By default, + concurrency is dynamically decided based on the available resources. + override_num_blocks: Override the number of output blocks from all read tasks. + By default, the number of output blocks is dynamically decided based on + input data size and available resources. You shouldn't manually set this + value in most cases. Returns: A :class:`~ray.data.Dataset` holding rows from the `Hugging Face Datasets Dataset`_. @@ -2452,7 +2597,13 @@ def from_huggingface( import fsspec.implementations.http http = fsspec.implementations.http.HTTPFileSystem() - return read_parquet(file_urls, parallelism=parallelism, filesystem=http) + return read_parquet( + file_urls, + parallelism=parallelism, + filesystem=http, + concurrency=concurrency, + override_num_blocks=override_num_blocks, + ) if isinstance(dataset, datasets.IterableDataset): # For an IterableDataset, we can use a streaming implementation to read data. @@ -2579,9 +2730,9 @@ def from_torch( } return read_datasource( TorchDatasource(dataset=dataset), - # Only non-parallel, streaming read is currently supported - parallelism=1, ray_remote_args=ray_remote_args, + # Only non-parallel, streaming read is currently supported + override_num_blocks=1, ) @@ -2646,3 +2797,17 @@ def _block_udf(block: "pyarrow.Table") -> "pyarrow.Table": arrow_parquet_args["_block_udf"] = _block_udf return arrow_parquet_args + + +def _get_num_output_blocks( + parallelism: int = -1, + override_num_blocks: Optional[int] = None, +) -> int: + if parallelism != -1: + logger.warning( + "The argument ``parallelism`` is deprecated in Ray 2.10. Please specify " + "argument ``override_num_blocks`` instead." 
+ ) + elif override_num_blocks is not None: + parallelism = override_num_blocks + return parallelism diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py index 192fcc7549aec..742d69f1bd779 100644 --- a/python/ray/data/tests/test_map.py +++ b/python/ray/data/tests/test_map.py @@ -14,6 +14,7 @@ import ray from ray.data.context import DataContext from ray.data.tests.conftest import * # noqa +from ray.data.tests.test_util import ConcurrencyCounter # noqa from ray.data.tests.util import column_udf, column_udf_class, extract_values from ray.tests.conftest import * # noqa @@ -647,25 +648,6 @@ def test_map_with_memory_resources(shutdown_only): max_concurrency = 5 ray.init(num_cpus=num_blocks, _memory=memory_per_task * max_concurrency) - @ray.remote - class ConcurrencyCounter: - def __init__(self): - self.concurrency = 0 - self.max_concurrency = 0 - - def inc(self): - self.concurrency += 1 - if self.concurrency > self.max_concurrency: - self.max_concurrency = self.concurrency - return self.concurrency - - def decr(self): - self.concurrency -= 1 - return self.concurrency - - def get_max_concurrency(self): - return self.max_concurrency - concurrency_counter = ConcurrencyCounter.remote() def map_batches(batch): diff --git a/python/ray/data/tests/test_parquet.py b/python/ray/data/tests/test_parquet.py index 73c598a8d594c..0c59c89e587c7 100644 --- a/python/ray/data/tests/test_parquet.py +++ b/python/ray/data/tests/test_parquet.py @@ -1,5 +1,6 @@ import os import shutil +import time from typing import Any import numpy as np @@ -27,6 +28,7 @@ from ray.data.datasource.path_util import _unwrap_protocol from ray.data.tests.conftest import * # noqa from ray.data.tests.mock_http_server import * # noqa +from ray.data.tests.test_util import ConcurrencyCounter # noqa from ray.tests.conftest import * # noqa @@ -1173,6 +1175,48 @@ def test_parquet_datasource_names(ray_start_regular_shared, tmp_path): assert ParquetDatasource(path).get_name() == "Parquet" +@pytest.mark.parametrize( + "fs,data_path", + [ + (lazy_fixture("local_fs"), lazy_fixture("local_path")), + ], +) +def test_parquet_concurrency(ray_start_regular_shared, fs, data_path): + df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) + table = pa.Table.from_pandas(df1) + setup_data_path = _unwrap_protocol(data_path) + path1 = os.path.join(setup_data_path, "test1.parquet") + pq.write_table(table, path1, filesystem=fs) + df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) + table = pa.Table.from_pandas(df2) + path2 = os.path.join(setup_data_path, "test2.parquet") + pq.write_table(table, path2, filesystem=fs) + + concurrency_counter = ConcurrencyCounter.remote() + + def map_batches(batch): + ray.get(concurrency_counter.inc.remote()) + time.sleep(0.5) + ray.get(concurrency_counter.decr.remote()) + return batch + + concurrency = 1 + ds = ray.data.read_parquet( + data_path, + filesystem=fs, + concurrency=concurrency, + override_num_blocks=2, + ) + ds = ds.map_batches( + map_batches, + batch_size=None, + concurrency=concurrency, + ) + assert ds.count() == 6 + actual_max_concurrency = ray.get(concurrency_counter.get_max_concurrency.remote()) + assert actual_max_concurrency <= concurrency + + # NOTE: All tests above share a Ray cluster, while the tests below do not. These # tests should only be carefully reordered to retain this invariant! 
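The `_get_num_output_blocks` shim above keeps legacy call sites working while steering users to the new argument. A small sketch of the intended behavior (note the warning goes through the module logger, not the `warnings` module):

```python
import ray

# Legacy spelling: still honored, but _get_num_output_blocks logs the
# "parallelism is deprecated in Ray 2.10" warning.
ds_old = ray.data.range(100, parallelism=5)

# Preferred spelling: same effect, no warning.
ds_new = ray.data.range(100, override_num_blocks=5)

assert ds_old.materialize().num_blocks() == ds_new.materialize().num_blocks()
```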
diff --git a/python/ray/data/tests/test_util.py b/python/ray/data/tests/test_util.py
index f898eb007ee4e..7c7208fca9a9f 100644
--- a/python/ray/data/tests/test_util.py
+++ b/python/ray/data/tests/test_util.py
@@ -111,6 +111,26 @@ def get_parquet_read_logical_op(
     return read_op
 
 
+@ray.remote(num_cpus=0)
+class ConcurrencyCounter:
+    def __init__(self):
+        self.concurrency = 0
+        self.max_concurrency = 0
+
+    def inc(self):
+        self.concurrency += 1
+        if self.concurrency > self.max_concurrency:
+            self.max_concurrency = self.concurrency
+        return self.concurrency
+
+    def decr(self):
+        self.concurrency -= 1
+        return self.concurrency
+
+    def get_max_concurrency(self):
+        return self.max_concurrency
+
+
 if __name__ == "__main__":
     import sys
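For anyone reusing the now-shared `ConcurrencyCounter` actor outside `test_parquet_concurrency`, a usage sketch that mirrors the new test (the sleep duration and sizes are illustrative):

```python
import time

import ray
from ray.data.tests.test_util import ConcurrencyCounter  # helper added in this diff

counter = ConcurrencyCounter.remote()

def traced(batch):
    # Bracket the work with inc/decr so the actor records peak concurrency.
    ray.get(counter.inc.remote())
    time.sleep(0.1)
    ray.get(counter.decr.remote())
    return batch

ds = ray.data.range(64, override_num_blocks=8).map_batches(traced, concurrency=2)
ds.materialize()
assert ray.get(counter.get_max_concurrency.remote()) <= 2
```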