[Data][Docs] Standardize "Splitting and Merging Datasets" references (ray-project#37013)

This PR standardizes API references listed under "Splitting and Merging Datasets".

---------

Signed-off-by: Balaji Veeramani <[email protected]>
Signed-off-by: Balaji Veeramani <[email protected]>
Co-authored-by: angelinalg <[email protected]>
Co-authored-by: Amog Kamsetty <[email protected]>
3 people committed Jul 25, 2023
1 parent 0a383da commit beae077
Showing 1 changed file with 155 additions and 77 deletions: python/ray/data/dataset.py
@@ -1127,52 +1127,70 @@ def streaming_split(
"""Returns ``n`` :class:`DataIterators <ray.data.DataIterator>` that can
be used to read disjoint subsets of the dataset in parallel.
This method is the recommended way to consume :class:`Datasets <Dataset>` for
distributed training.
Streaming split works by delegating the execution of this :class:`Dataset` to a
coordinator actor. The coordinator pulls block references from the executed
stream, and divides those blocks among ``n`` output iterators. Iterators pull
blocks from the coordinator actor to return to their caller on ``next``.
The returned iterators are also repeatable; each iteration triggers a
new execution of the :class:`Dataset`. There is an implicit barrier at the start of
each iteration, which means that ``next`` must be called on all iterators before
the iteration starts.
.. warning::
    Because iterators are pulling blocks from the same :class:`Dataset`
    execution, if one iterator falls behind, other iterators may be stalled.
Examples:
.. testcode::

    import ray

    ds = ray.data.range(100)
    it1, it2 = ds.streaming_split(2, equal=True)
Consume data from iterators in parallel.
.. testcode::

    @ray.remote
    def consume(it):
        for batch in it.iter_batches():
            pass

    ray.get([consume.remote(it1), consume.remote(it2)])
You can loop over the iterators multiple times (multiple epochs).
.. testcode::

    @ray.remote
    def train(it):
        NUM_EPOCHS = 2
        for _ in range(NUM_EPOCHS):
            for batch in it.iter_batches():
                pass

    ray.get([train.remote(it1), train.remote(it2)])
The following remote function call blocks waiting for a read on ``it2`` to
start.
.. testcode::
    :skipif: True

    ray.get(train.remote(it1))
Args:
n: Number of output iterators to return.
equal: If ``True``, each output iterator sees an exactly equal number
of rows, dropping data if necessary. If ``False``, some iterators may
see slightly more or fewer rows than others, but no data is dropped.
locality_hints: Specify the node ids corresponding to each iterator
location. Dataset will try to minimize data movement based on the
iterator output locations. This list must have length ``n``. You can
@@ -1182,6 +1200,12 @@ def streaming_split(
Returns:
The output iterator splits. These iterators are Ray-serializable and can
be freely passed to any Ray task or actor.
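A minimal sketch of the ``locality_hints`` argument, assuming a running Ray cluster; the node ID comes from ``ray.get_runtime_context().get_node_id()``, and pinning both iterators to one node is only illustrative:

.. testcode::

    import ray

    ds = ray.data.range(1000)
    # Pin both iterators to the current node in this single-node sketch.
    node_id = ray.get_runtime_context().get_node_id()
    it1, it2 = ds.streaming_split(2, equal=True, locality_hints=[node_id, node_id])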
.. seealso::

    :meth:`Dataset.split`
        Unlike :meth:`~Dataset.streaming_split`, :meth:`~Dataset.split`
        materializes the dataset in memory.
"""
return StreamSplitDataIterator.create(self, n, equal, locality_hints)

@@ -1191,34 +1215,52 @@ def split(
) -> List["MaterializedDataset"]:
"""Materialize and split the dataset into ``n`` disjoint pieces.
This method returns a list of ``MaterializedDataset`` that can be passed to Ray
tasks and actors and used to read the dataset rows in parallel.
Examples:
.. testcode::

    import ray

    @ray.remote
    class Worker:

        def train(self, data_iterator):
            for batch in data_iterator.iter_batches(batch_size=8):
                pass

    workers = [Worker.remote() for _ in range(4)]
    shards = ray.data.range(100).split(n=4, equal=True)
    ray.get([w.train.remote(s) for w, s in zip(workers, shards)])
Time complexity: O(1)
Args:
n: Number of child datasets to return.
equal: Whether to guarantee each split has an equal
number of records. This might drop records if the rows can't be
divided equally among the splits.
locality_hints: [Experimental] A list of Ray actor handles of size ``n``.
The system tries to co-locate the blocks of the i-th dataset
with the i-th actor to maximize data locality.
Returns:
A list of ``n`` disjoint dataset splits.
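A minimal sketch of the experimental ``locality_hints`` argument; the ``Consumer`` actor is illustrative, and a running Ray cluster is assumed:

.. testcode::

    import ray

    @ray.remote
    class Consumer:
        def consume(self, shard):
            return shard.count()

    actors = [Consumer.remote() for _ in range(2)]
    # Ray Data tries to co-locate the i-th shard's blocks with the i-th actor.
    shards = ray.data.range(100).split(2, equal=True, locality_hints=actors)
    print(ray.get([a.consume.remote(s) for a, s in zip(actors, shards)]))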
.. seealso::

    :meth:`Dataset.split_at_indices`
        Unlike :meth:`~Dataset.split`, which splits a dataset into approximately
        equal splits, :meth:`Dataset.split_at_indices` lets you split a
        dataset into different sizes.

    :meth:`Dataset.split_proportionately`
        This method is equivalent to :meth:`Dataset.split_at_indices` if
        you compute indices manually.

    :meth:`Dataset.streaming_split`
        Unlike :meth:`~Dataset.split`, :meth:`~Dataset.streaming_split`
        doesn't materialize the dataset in memory.
"""
if n <= 0:
raise ValueError(f"The number of splits {n} is not positive.")
@@ -1408,7 +1450,7 @@ def build_node_id_by_actor(actors: List[Any]) -> Dict[Any, str]:

@ConsumptionAPI
def split_at_indices(self, indices: List[int]) -> List["MaterializedDataset"]:
"""Materialize and split the dataset at the given indices (like np.split).
"""Materialize and split the dataset at the given indices (like ``np.split``).
Examples:
>>> import ray
@@ -1423,16 +1465,28 @@ def split_at_indices(self, indices: List[int]) -> List["MaterializedDataset"]:
Time complexity: O(num splits)
Args:
indices: List of sorted integers which indicate where the dataset
is split. If an index exceeds the length of the dataset,
an empty dataset is returned.
Returns:
The dataset splits.
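As a concrete sketch of the split semantics (the split points ``[2, 5]`` are illustrative):

.. testcode::

    import ray

    ds = ray.data.range(10)
    d1, d2, d3 = ds.split_at_indices([2, 5])
    print(d1.take_batch())  # rows 0-1
    print(d2.take_batch())  # rows 2-4
    print(d3.take_batch())  # rows 5-9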
.. seealso::

    :meth:`Dataset.split`
        Unlike :meth:`~Dataset.split_at_indices`, which lets you split a
        dataset into different sizes, :meth:`Dataset.split` splits a dataset
        into approximately equal splits.

    :meth:`Dataset.split_proportionately`
        This method is equivalent to :meth:`Dataset.split_at_indices` if
        you compute indices manually.

    :meth:`Dataset.streaming_split`
        Unlike :meth:`~Dataset.split_at_indices`, :meth:`~Dataset.streaming_split`
        doesn't materialize the dataset in memory.
"""

if len(indices) < 1:
@@ -1486,16 +1540,16 @@ def split_proportionately(
) -> List["MaterializedDataset"]:
"""Materialize and split the dataset using proportions.
A common use case for this is splitting the dataset into train
and test sets (equivalent to, e.g., scikit-learn's ``train_test_split``).
For a higher-level abstraction, see :meth:`Dataset.train_test_split`.
This method splits datasets so that all splits
always contain at least one row. If that isn't possible,
an exception is raised.
This is equivalent to calculating the indices manually and calling
:meth:`Dataset.split_at_indices`.
Examples:
>>> import ray
@@ -1510,16 +1564,27 @@ def split_proportionately(
Time complexity: O(num splits)
Args:
proportions: List of proportions to split the dataset according to.
Must sum up to less than 1, and each proportion must be bigger
than 0.
Returns:
The dataset splits.
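To make the equivalence with :meth:`Dataset.split_at_indices` concrete, here's a minimal sketch (the proportions are illustrative):

.. testcode::

    import ray

    ds = ray.data.range(10)
    # On a 10-row dataset, proportions [0.2, 0.5] correspond to indices [2, 7].
    a = ds.split_proportionately([0.2, 0.5])
    b = ds.split_at_indices([2, 7])
    assert [s.count() for s in a] == [s.count() for s in b] == [2, 5, 3]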
.. seealso::

    :meth:`Dataset.split`
        Unlike :meth:`~Dataset.split_proportionately`, which lets you split a
        dataset into different sizes, :meth:`Dataset.split` splits a dataset
        into approximately equal splits.

    :meth:`Dataset.split_at_indices`
        :meth:`Dataset.split_proportionately` uses this method under the hood.

    :meth:`Dataset.streaming_split`
        Unlike :meth:`~Dataset.split_proportionately`, :meth:`~Dataset.streaming_split`
        doesn't materialize the dataset in memory.
"""

if len(proportions) < 1:
@@ -1572,16 +1637,20 @@ def train_test_split(
Args:
test_size: If float, should be between 0.0 and 1.0 and represent the
proportion of the dataset to include in the test split. If int,
represents the absolute number of test samples. The train split
always complements the test split.
shuffle: Whether or not to globally shuffle the dataset before splitting.
Defaults to ``False``. This may be a very expensive operation with a
large dataset.
seed: Fix the random seed to use for shuffle, otherwise one is chosen
based on system randomness. Ignored if ``shuffle=False``.
Returns:
Train and test subsets as two ``MaterializedDatasets``.
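A minimal usage sketch (the sizes are illustrative):

.. testcode::

    import ray

    ds = ray.data.range(100)
    # 75/25 split; pass shuffle=True and a seed for a reproducible shuffle.
    train, test = ds.train_test_split(test_size=0.25)
    assert train.count() == 75 and test.count() == 25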
.. seealso::

    :meth:`Dataset.split_proportionately`
"""
ds = self

@@ -1607,24 +1676,32 @@ def train_test_split(
)
return ds.split_at_indices([ds_length - test_size])

@ConsumptionAPI(pattern="Args:")
@ConsumptionAPI
def union(self, *other: List["Dataset"]) -> "Dataset":
"""Materialize and combine this dataset with others of the same type.
"""Materialize and concatenate :class:`Datasets <ray.data.Dataset>` across rows.
The order of the blocks in the datasets is preserved, as is the
relative ordering between the datasets passed in the argument list.
.. caution::
    Unioned datasets aren't lineage-serializable. As a result, they can't be
    used as a tunable hyperparameter in Ray Tune.
Examples:
>>> import ray
>>> ds1 = ray.data.range(2)
>>> ds2 = ray.data.range(3)
>>> ds1.union(ds2).take_all()
[{'id': 0}, {'id': 1}, {'id': 0}, {'id': 1}, {'id': 2}]
Args:
other: List of datasets to combine with this one. The datasets
must have the same schema as this dataset, otherwise the
behavior is undefined.
Returns:
A new dataset holding the rows of the input datasets.
"""

start_time = time.perf_counter()
@@ -2099,17 +2176,18 @@ def sort(self, key: Optional[str] = None, descending: bool = False) -> "Dataset"
return Dataset(plan, self._epoch, self._lazy, logical_plan)

def zip(self, other: "Dataset") -> "Dataset":
"""Materialize and zip this dataset with the elements of another.
"""Materialize and zip the columns of this dataset with the columns of another.
The datasets must have the same number of rows. Their column sets are
merged, and any duplicate column names are disambiguated with suffixes like
``"_1"``.
.. note::
The smaller of the two datasets is repartitioned to align the number
of rows per block with the larger dataset.
.. note::
Zipped datasets aren't lineage-serializable. As a result, they can't be used
as a tunable hyperparameter in Ray Tune.
Examples:
@@ -2125,9 +2203,9 @@ def zip(self, other: "Dataset") -> "Dataset":
other: The dataset to zip with on the right-hand side.
Returns:
A :class:`Dataset` containing the columns of the second dataset
concatenated horizontally with the columns of the first dataset,
with duplicate column names disambiguated with suffixes like ``"_1"``.
"""

plan = self._plan.with_stage(ZipStage(other))