[Data] [Docs] Standardize API Refs for Input/Output #37017

Merged
merged 21 commits into from
Jul 7, 2023
Changes from 1 commit
wip
Signed-off-by: amogkam <[email protected]>
amogkam committed Jun 30, 2023
commit c7c74ceb2930c5d22a9671c30ca0f7794f754b35
15 changes: 15 additions & 0 deletions doc/source/data/api/input_output.rst
@@ -228,6 +228,8 @@ Partitioning API
datasource.PathPartitionParser
datasource.PathPartitionFilter

.. _metadata_provider:

MetadataProvider API
--------------------

@@ -240,3 +242,16 @@ MetadataProvider API
datasource.DefaultFileMetadataProvider
datasource.DefaultParquetMetadataProvider
datasource.FastFileMetadataProvider


.. _block_write_path_provider:

BlockWritePathProvider API
--------------------------

.. autosummary::
:toctree: doc/

datasource.file_based_datasource.BlockWritePathProvider
datasource.file_based_datasource.DefaultBlockWritePathProvider

2 changes: 2 additions & 0 deletions doc/source/data/performance-tips.rst
@@ -18,6 +18,8 @@ If your transformation isn't vectorized, there's no performance benefit.
Optimizing reads
----------------

.. _read_parallelism:

Tuning read parallelism
~~~~~~~~~~~~~~~~~~~~~~~

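The ``read_parallelism`` label added here is the target of the new :ref:`Tuning read parallelism guide <read_parallelism>` cross-references in the docstrings below. A minimal sketch of what those docstrings describe, assuming the keyword-only ``parallelism`` argument on the read APIs (the values are illustrative, not recommendations):

.. code-block:: python

    import ray

    # parallelism=-1 (the default) lets Ray Data pick the number of read
    # tasks / output blocks based on the cluster and the data size.
    ds_auto = ray.data.range(100_000)

    # An explicit value caps the number of blocks produced by the read.
    ds_fixed = ray.data.range(100_000, parallelism=16)

    print(ds_auto)   # Dataset(num_blocks=..., num_rows=100000, schema={id: int64})
    print(ds_fixed)  # num_blocks is at most 16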
46 changes: 25 additions & 21 deletions python/ray/data/dataset.py
@@ -2386,41 +2386,45 @@ def write_parquet(
ray_remote_args: Dict[str, Any] = None,
**arrow_parquet_args,
) -> None:
"""Write the dataset to parquet.
"""Writes the :class:`~ray.data.Dataset` to parquet files under the provided ``path``.

This is only supported for datasets convertible to Arrow records.
To control the number of files, use ``.repartition()``.
The number of files is determined by the number of blocks in the dataset.
To control the number of blocks, call
:meth:`~ray.data.Dataset.repartition`.

Unless a custom block path provider is given, the format of the output
files will be {uuid}_{block_idx}.parquet, where ``uuid`` is a unique
id for the dataset.
This method is only supported for datasets with records that are convertible to pyarrow tables.

By default, the format of the output files will be {uuid}_{block_idx}.parquet, where ``uuid`` is a unique
id for the dataset. To modify this behavior, implement a custom :class:`~ray.data.file_based_datasource.BlockWritePathProvider` and pass it in as the ``block_path_provider`` argument.

Examples:
>>> import ray
>>> ds = ray.data.range(100) # doctest: +SKIP
>>> ds.write_parquet("s3:https://bucket/path") # doctest: +SKIP
>>> ds = ray.data.range(100)
>>> ds.write_parquet("local:https:///tmp/data/")

Time complexity: O(dataset size / parallelism)

Args:
path: The path to the destination root directory, where Parquet
files will be written to.
filesystem: The filesystem implementation to write to.
try_create_dir: Try to create all directories in destination path
if True. Does nothing if all directories already exist.
path: The path to the destination root directory, where
parquet files will be written to.
filesystem: The pyarrow filesystem
implementation to write to. These are
specified in the `pyarrow docs <https://arrow.apache.org/docs/python/api/filesystems.html#filesystem-implementations>`_. You should specify this if you need to provide specific configurations to the filesystem. By default, the filesystem is automatically selected based on the scheme of the paths. For example, if the path begins with ``s3://``, the `S3FileSystem` is used.
try_create_dir: Try to create all directories in
destination path if True. Does nothing if all directories already exist. Defaults to True.
arrow_open_stream_args: kwargs passed to
pyarrow.fs.FileSystem.open_output_stream
block_path_provider: BlockWritePathProvider implementation to
write each dataset block to a custom output path.
`pyarrow.fs.FileSystem.open_output_stream <https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html#pyarrow.fs.FileSystem.open_output_stream>`_, which is used when opening the file to write to.
block_path_provider: A :class:`~ray.data.file_based_datasource.BlockWritePathProvider` implementation
specifying the filename structure for each output parquet file. By default, the format of the output files will be {uuid}_{block_idx}.parquet, where ``uuid`` is a unique id for the dataset.
arrow_parquet_args_fn: Callable that returns a dictionary of write
arguments to use when writing each block to a file. Overrides
any duplicate keys from arrow_parquet_args. This should be used
instead of arrow_parquet_args if any of your write arguments
arguments that are provided to `pyarrow.parquet.write_table() <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow.parquet.write_table>`_ when writing each block to a file. Overrides
any duplicate keys from ``arrow_parquet_args``. This should be used
instead of ``arrow_parquet_args`` if any of your write arguments
cannot be pickled, or if you'd like to lazily resolve the write
arguments for each dataset block.
ray_remote_args: Kwargs passed to ray.remote in the write tasks.
ray_remote_args: Kwargs passed to :func:`ray.remote` in the write tasks.
arrow_parquet_args: Options to pass to
pyarrow.parquet.write_table(), which is used to write out each
`pyarrow.parquet.write_table() <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow.parquet.write_table>`_, which is used to write out each
block to a file.
"""
self.write_datasource(
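The updated ``write_parquet`` docstring above refers to a custom :class:`BlockWritePathProvider` and to ``arrow_parquet_args_fn`` without showing them together. A hedged sketch of both, assuming the ``_get_write_path_for_block`` hook used by ``DefaultBlockWritePathProvider``; the class name, output directory, and compression choice are illustrative:

.. code-block:: python

    import ray
    from ray.data.datasource import BlockWritePathProvider


    class IndexedPathProvider(BlockWritePathProvider):
        """Name each output file by block index instead of {uuid}_{block_idx}."""

        def _get_write_path_for_block(
            self,
            base_path,
            *,
            filesystem=None,
            dataset_uuid=None,
            block=None,
            block_index=None,
            file_format=None,
        ):
            return f"{base_path}/part-{block_index:05d}.{file_format}"


    ds = ray.data.range(100).repartition(4)  # 4 blocks -> 4 output files
    ds.write_parquet(
        "/tmp/indexed_output",
        block_path_provider=IndexedPathProvider(),
        # Lazily resolved kwargs forwarded to pyarrow.parquet.write_table();
        # useful when the write arguments cannot be pickled.
        arrow_parquet_args_fn=lambda: {"compression": "snappy"},
    )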
188 changes: 108 additions & 80 deletions python/ray/data/read_api.py
@@ -102,32 +102,27 @@ def from_items(
items: List[Any],
*,
parallelism: int = -1,
output_arrow_format: bool = True,
) -> MaterializedDataset:
"""Create a :class:`~ray.data.Dataset` from a list of local Python objects.

Use this method to create small datasets for testing and exploration.
Use this method to create small datasets from data that fits in memory.

Examples:

.. testcode::

import ray

ds = ray.data.from_items([1, 2, 3, 4, 5])

print(ds.schema())

.. testoutput::

Column Type
------ ----
item int64

>>> import ray
>>> ds = ray.data.from_items([1, 2, 3, 4, 5])
>>> ds # doctest: +ELLIPSIS
MaterializedDataset(num_blocks=..., num_rows=5, schema={item: int64})
>>> ds.schema()
Column Type
------ ----
item int64

Args:
items: List of local Python objects.
parallelism: The amount of parallelism to use for the dataset.
Parallelism might be limited by the number of items.
parallelism: The amount of parallelism to use for the dataset. Defaults to -1
which automatically determines the optimal parallelism for your configuration. You should not need to manually set this value in most cases.
For details on how the parallelism is automatically determined and guidance on how to tune it, see the :ref:`Tuning read parallelism guide <read_parallelism>`. Parallelism is upper bounded by len(items).

Returns:
A :class:`~ray.data.Dataset` holding the items.
@@ -188,23 +183,33 @@ def from_items(

@PublicAPI
def range(n: int, *, parallelism: int = -1) -> Dataset:
"""Create a dataset from a range of integers [0..n).
"""Creates a :class:`~ray.data.Dataset` from a range of integers [0..n).

This allows for easy creation of synthetic datasets for testing or benchmarking :ref:`Ray Data <data>`.

Examples:

>>> import ray
>>> ds = ray.data.range(10000) # doctest: +SKIP
>>> ds # doctest: +SKIP
>>> ds = ray.data.range(10000)
>>> ds # doctest: +ELLIPSIS
Dataset(num_blocks=..., num_rows=10000, schema={id: int64})
>>> ds.map(lambda x: {"id": x["id"] * 2}).take(4) # doctest: +SKIP
>>> ds.map(lambda row: {"id": row["id"] * 2}).take(4)
[{"id": 0}, {"id": 2}, {"id": 4}, {"id": 6}]

Args:
n: The upper bound of the range of integers.
parallelism: The amount of parallelism to use for the dataset.
Parallelism may be limited by the number of items.
parallelism: The amount of parallelism to use for the dataset. Defaults to -1
which automatically determines the optimal parallelism for your configuration. You should not need to manually set this value in most cases.
For details on how the parallelism is automatically determined and guidance on how to tune it, see the :ref:`Tuning read parallelism guide <read_parallelism>`. Parallelism is upper bounded by n.

Returns:
Dataset producing the integers.
A :class:`~ray.data.Dataset` producing the integers from the range 0 to n.

.. seealso::

:meth:`~ray.data.range_tensor`
Call this method for creating synthetic datasets of tensor data.

"""
return read_datasource(
RangeDatasource(),
@@ -222,9 +227,12 @@ def range_table(n: int, *, parallelism: int = -1) -> Dataset:

@PublicAPI
def range_tensor(n: int, *, shape: Tuple = (1,), parallelism: int = -1) -> Dataset:
"""Create a Tensor stream from a range of integers [0..n).
"""Creates a :class:`~ray.data.Dataset` tensors of the provided shape from range [0...n].

This allows for easy creation of synthetic tensor datasets for testing or benchmarking :ref:`Ray Data <data>`.

Examples:

>>> import ray
>>> ds = ray.data.range_tensor(1000, shape=(2, 2))
>>> ds # doctest: +ELLIPSIS
@@ -233,23 +241,27 @@ def range_tensor(n: int, *, shape: Tuple = (1,), parallelism: int = -1) -> Dataset:
num_rows=1000,
schema={data: numpy.ndarray(shape=(2, 2), dtype=int64)}
)
>>> ds.map_batches(lambda arr: arr * 2).take(2) # doctest: +SKIP
[array([[0, 0],
[0, 0]]),
array([[2, 2],
[2, 2]])]

This is similar to range_table(), but uses the ArrowTensorArray extension
type. The dataset elements take the form {"data": array(N, shape=shape)}.
>>> ds.map_batches(lambda batch: {"data": batch["data"] * 2}).take(2)
[{'data': array([[0, 0],
[0, 0]])},
{'data': array([[2, 2],
[2, 2]])}]

Args:
n: The upper bound of the range of integer records.
shape: The shape of each record.
parallelism: The amount of parallelism to use for the dataset.
Parallelism may be limited by the number of items.
n: The upper bound of the range of tensor records.
shape: The shape of each tensor in the dataset.
parallelism: The amount of parallelism to use for the dataset. Defaults to -1
which automatically determines the optimal parallelism for your configuration. You should not need to manually set this value in most cases.
For details on how the parallelism is automatically determined and guidance on how to tune it, see the :ref:`Tuning read parallelism guide <read_parallelism>`. Parallelism is upper bounded by n.

Returns:
Dataset producing the integers as Arrow tensor records.
A :class:`~ray.data.Dataset` producing the tensor data from range 0 to n.

.. seealso::

:meth:`~ray.data.range`
Call this method for creating synthetic datasets of integer data.

"""
return read_datasource(
RangeDatasource(),
@@ -513,37 +525,48 @@ def read_parquet(
meta_provider: ParquetMetadataProvider = DefaultParquetMetadataProvider(),
**arrow_parquet_args,
) -> Dataset:
"""Create an Arrow dataset from parquet files.
"""Creates a :class:`~ray.data.Dataset` from parquet files.


Examples:
>>> import ray
>>> # Read a directory of files in remote storage.
>>> ray.data.read_parquet("s3:https://bucket/path") # doctest: +SKIP

>>> # Read a file in remote storage.
>>> ds = ray.data.read_parquet("s3:https://anonymous@ray-example-data/iris.parquet")
>>> ds.schema()
Column Type
------ ----
sepal.length double
sepal.width double
petal.length double
petal.width double
variety string


>>> # Read a directory in remote storage.
>>> ds = ray.data.read_parquet("s3:https://anonymous@ray-example-data/iris-parquet/")

>>> # Read multiple local files.
>>> ray.data.read_parquet(["/path/to/file1", "/path/to/file2"]) # doctest: +SKIP
>>> ray.data.read_parquet(["local:https:///path/to/file1", "local:https:///path/to/file2"]) # doctest: +SKIP

>>> # Specify a schema for the parquet file.
>>> import pyarrow as pa
>>> fields = [("sepal.length", pa.float64()),
... ("sepal.width", pa.float64()),
... ("petal.length", pa.float64()),
... ("petal.width", pa.float64()),
>>> fields = [("sepal.length", pa.float32()),
... ("sepal.width", pa.float32()),
... ("petal.length", pa.float32()),
... ("petal.width", pa.float32()),
... ("variety", pa.string())]
>>> ray.data.read_parquet("example:https://iris.parquet",
>>> ds = ray.data.read_parquet("example:https://iris.parquet",
... schema=pa.schema(fields))
Dataset(
num_blocks=...,
num_rows=150,
schema={
sepal.length: double,
sepal.width: double,
petal.length: double,
petal.width: double,
variety: string
}
)

>>> ds.schema()
Column Type
------ ----
sepal.length float
sepal.width float
petal.length float
petal.width float
variety string


The Parquet reader also supports projection and filter pushdown, allowing column
selection and row filtering to be pushed down to the file scan.

@@ -566,31 +589,34 @@
{'sepal.length': 5.1, 'variety': 'Setosa'}
{'sepal.length': 5.4, 'variety': 'Setosa'}

For further arguments you can pass to pyarrow as a keyword argument, see
https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Scanner.html#pyarrow.dataset.Scanner.from_fragment
For further arguments you can pass to pyarrow as a keyword argument, see the `pyarrow API reference <https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Scanner.html#pyarrow.dataset.Scanner.from_fragment>`_.

Args:
paths: A single file path or directory, or a list of file paths. Multiple
directories are not supported.
filesystem: The filesystem implementation to read from. These are specified in
https://arrow.apache.org/docs/python/api/filesystems.html#filesystem-implementations.
columns: A list of column names to read.
parallelism: The requested parallelism of the read. Parallelism may be
limited by the number of files of the dataset.
ray_remote_args: kwargs passed to ray.remote in the read tasks.
tensor_column_schema: A dict of column name --> tensor dtype and shape
filesystem: The pyarrow filesystem
implementation to read from. These are
specified in the `pyarrow docs <https://arrow.apache.org/docs/python/api/filesystems.html#filesystem-implementations>`_. You should specify this if you need to provide specific configurations to the filesystem. By default, the filesystem is automatically selected based on the scheme of the paths. For example, if the path begins with ``s3://``, the `S3FileSystem` is used.
columns: A list of column names to read. Only the specified columns will be
read during the file scan.
parallelism: The amount of parallelism to use for the dataset. Defaults to -1
which automatically determines the optimal parallelism for your configuration. You should not need to manually set this value in most cases.
For details on how the parallelism is automatically determined and guidance on how to tune it, see the :ref:`Tuning read parallelism guide <read_parallelism>`. Parallelism is upper bounded by the total number of records in all the parquet files.
ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
tensor_column_schema: A dict of column name to tensor dtype and shape
mappings for converting a Parquet column containing serialized
tensors (ndarrays) as their elements to our tensor column extension
type. This assumes that the tensors were serialized in the raw
NumPy array format in C-contiguous order (e.g. via
`arr.tobytes()`).
meta_provider: File metadata provider. Custom metadata providers may
be able to resolve file metadata more quickly and/or accurately.
arrow_parquet_args: Other parquet read options to pass to pyarrow, see
https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Scanner.html#pyarrow.dataset.Scanner.from_fragment
meta_provider: A :ref:`file metadata provider <metadata_provider>`. Custom
metadata providers may be able to resolve file metadata more quickly and/or accurately. In most cases you do not need to set this.
arrow_parquet_args: Other parquet read options to pass to pyarrow. For the full
set of arguments, see `the pyarrow API <https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Scanner.html#pyarrow.dataset.Scanner.from_fragment>`_.

Returns:
Dataset producing Arrow records read from the specified paths.
:class:`~ray.data.Dataset` producing records read from the specified parquet
files.
"""
arrow_parquet_args = _resolve_parquet_args(
tensor_column_schema,
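The ``tensor_column_schema`` argument described above is easiest to see with a concrete mapping. A hedged sketch, assuming the ``{column_name: (dtype, shape)}`` form implied by the docstring; the file path and column name are hypothetical:

.. code-block:: python

    import numpy as np
    import ray

    # The "image" column of the (hypothetical) file stores ndarrays that were
    # serialized with arr.tobytes(); each value is decoded back into a
    # 28x28 int64 tensor via the tensor extension type.
    ds = ray.data.read_parquet(
        "/tmp/serialized_images.parquet",
        tensor_column_schema={"image": (np.int64, (28, 28))},
    )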
@@ -734,8 +760,9 @@ def read_parquet_bulk(
),
**arrow_parquet_args,
) -> Dataset:
"""Create an Arrow dataset from a large number (such as >1K) of parquet files
quickly.
"""Create :class:`~ray.data.Dataset` from a large number (such as >1K) of parquet files.

Unlike :meth:`~ray.data.read_parquet`, this method skips file metadata resolution by default, which speeds up reading a large number of files.

By default, ONLY file paths should be provided as input (i.e. no directory paths),
and an OSError will be raised if one or more paths point to directories. If your
@@ -908,12 +935,13 @@ def read_csv(
ignore_missing_paths: bool = False,
**arrow_csv_args,
) -> Dataset:
r"""Create an Arrow dataset from csv files.
"""Creates a :class:`~ray.data.Dataset` from CSV files.

Examples:
>>> import ray
>>> # Read a directory of files in remote storage.
>>> ray.data.read_csv("s3:https://bucket/path") # doctest: +SKIP
>>> # Read a file in remote storage.
>>> ds = ray.data.read_csv("s3:https://anonymous@ray-example-data/iris.csv)
>>>

>>> # Read multiple local files.
>>> ray.data.read_csv(["/path/to/file1", "/path/to/file2"]) # doctest: +SKIP
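The ``**arrow_csv_args`` in the ``read_csv`` signature above are forwarded to pyarrow's CSV reader. A hedged sketch of reading a tab-separated file by overriding the parse options; the path is hypothetical:

.. code-block:: python

    import ray
    from pyarrow import csv

    # Extra keyword arguments reach pyarrow, so a tab-separated file can be
    # read by passing a ParseOptions with a custom delimiter.
    ds = ray.data.read_csv(
        "/tmp/example.tsv",
        parse_options=csv.ParseOptions(delimiter="\t"),
    )
    print(ds.schema())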