diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py
index e3f20a7dfb16e..665e9380877bc 100644
--- a/python/ray/data/dataset.py
+++ b/python/ray/data/dataset.py
@@ -1127,52 +1127,70 @@ def streaming_split(
         """Returns ``n`` :class:`DataIterators <ray.data.DataIterator>` that can
         be used to read disjoint subsets of the dataset in parallel.
 
-        This method is the recommended way to consume Datasets from multiple
-        processes (e.g., for distributed training), and requires streaming execution
-        mode.
+        This method is the recommended way to consume
+        :class:`Datasets <ray.data.Dataset>` for distributed training.
 
-        Streaming split works by delegating the execution of this Dataset to a
+        Streaming split works by delegating the execution of this :class:`Dataset` to a
         coordinator actor. The coordinator pulls block references from the executed
-        stream, and divides those blocks among `n` output iterators. Iterators pull
-        blocks from the coordinator actor to return to their caller on `next`.
+        stream, and divides those blocks among ``n`` output iterators. Iterators pull
+        blocks from the coordinator actor to return to their caller on ``next``.
 
         The returned iterators are also repeatable; each iteration will trigger a
         new execution of the Dataset. There is an implicit barrier at the start of
         each iteration, which means that `next` must be called on all iterators
         before the iteration starts.
 
-        Warning: because iterators are pulling blocks from the same Dataset
-        execution, if one iterator falls behind other iterators may be stalled.
+        .. warning::
+
+            Because iterators are pulling blocks from the same :class:`Dataset`
+            execution, if one iterator falls behind, other iterators may be stalled.
 
         Examples:
-            >>> import ray
-            >>> ds = ray.data.range(1000000)
-            >>> it1, it2 = ds.streaming_split(2, equal=True)
-
-            >>> # Can consume from both iterators in parallel.
-            >>> @ray.remote
-            ... def consume(it):
-            ...     for batch in it.iter_batches():
-            ...         print(batch)
-            >>> ray.get([consume.remote(it1), consume.remote(it2)]) # doctest: +SKIP
-
-            >>> # Can loop over the iterators multiple times (multiple epochs).
-            >>> @ray.remote
-            ... def train(it):
-            ...     NUM_EPOCHS = 100
-            ...     for _ in range(NUM_EPOCHS):
-            ...         for batch in it.iter_batches():
-            ...             print(batch)
-            >>> ray.get([train.remote(it1), train.remote(it2)]) # doctest: +SKIP
-
-            >>> # ERROR: this will block waiting for a read on `it2` to start.
-            >>> ray.get(train.remote(it1)) # doctest: +SKIP
+
+            .. testcode::
+
+                import ray
+
+                ds = ray.data.range(100)
+                it1, it2 = ds.streaming_split(2, equal=True)
+
+            Consume data from iterators in parallel.
+
+            .. testcode::
+
+                @ray.remote
+                def consume(it):
+                    for batch in it.iter_batches():
+                        pass
+
+                ray.get([consume.remote(it1), consume.remote(it2)])
+
+            You can loop over the iterators multiple times (multiple epochs).
+
+            .. testcode::
+
+                @ray.remote
+                def train(it):
+                    NUM_EPOCHS = 2
+                    for _ in range(NUM_EPOCHS):
+                        for batch in it.iter_batches():
+                            pass
+
+                ray.get([train.remote(it1), train.remote(it2)])
+
+            The following remote function call blocks waiting for a read on ``it2``
+            to start.
+
+            .. testcode::
+                :skipif: True
+
+                ray.get(train.remote(it1))
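+
+            If your training workers run as Ray actors, you can also pass
+            ``locality_hints`` so that each iterator prefers blocks located on the
+            node of its consumer. The sketch below is illustrative rather than part
+            of the documented examples: the ``TrainWorker`` actor and its
+            ``get_node_id`` helper are hypothetical, and block placement is
+            best-effort.
+
+            .. testcode::
+                :skipif: True
+
+                @ray.remote
+                class TrainWorker:
+                    def get_node_id(self):
+                        # Node id of the node this actor was scheduled on.
+                        return ray.get_runtime_context().get_node_id()
+
+                    def train(self, it):
+                        for batch in it.iter_batches():
+                            pass
+
+                workers = [TrainWorker.remote() for _ in range(2)]
+                # One node id per output iterator, in order.
+                node_ids = ray.get([w.get_node_id.remote() for w in workers])
+                it1, it2 = ds.streaming_split(2, equal=True, locality_hints=node_ids)
+                ray.get([w.train.remote(it) for w, it in zip(workers, [it1, it2])])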
 
         Args:
             n: Number of output iterators to return.
-            equal: If True, each output iterator will see an exactly equal number
-                of rows, dropping data if necessary. If False, some iterators may see
-                slightly more or less rows than other, but no data is dropped.
+            equal: If ``True``, each output iterator sees an exactly equal number
+                of rows, dropping data if necessary. If ``False``, some iterators may
+                see slightly more or fewer rows than others, but no data is dropped.
             locality_hints: Specify the node ids corresponding to each iterator
                 location. Dataset will try to minimize data movement based on the
                 iterator output locations. This list must have length ``n``. You can
@@ -1182,6 +1200,12 @@ def streaming_split(
         Returns:
             The output iterator splits. These iterators are Ray-serializable and can
             be freely passed to any Ray task or actor.
+
+        .. seealso::
+
+            :meth:`Dataset.split`
+                Unlike :meth:`~Dataset.streaming_split`, :meth:`~Dataset.split`
+                materializes the dataset in memory.
         """
         return StreamSplitDataIterator.create(self, n, equal, locality_hints)
 
@@ -1191,34 +1215,52 @@ def split(
     ) -> List["MaterializedDataset"]:
         """Materialize and split the dataset into ``n`` disjoint pieces.
 
-        This returns a list of MaterializedDatasets that can be passed to Ray tasks
-        and actors and used to read the dataset records in parallel.
+        This method returns a list of ``MaterializedDataset`` that can be passed to
+        Ray tasks and actors and used to read the dataset rows in parallel.
 
         Examples:
-            >>> import ray
-            >>> ds = ray.data.range(100) # doctest: +SKIP
-            >>> workers = ... # doctest: +SKIP
-            >>> # Split up a dataset to process over `n` worker actors.
-            >>> shards = ds.split(len(workers), locality_hints=workers) # doctest: +SKIP
-            >>> for shard, worker in zip(shards, workers): # doctest: +SKIP
-            ...     worker.consume.remote(shard) # doctest: +SKIP
-
-        Time complexity: O(1)
-
-        See also: ``Dataset.split_at_indices``, ``Dataset.split_proportionately``,
-        and ``Dataset.streaming_split``.
+
+            .. testcode::
+
+                import ray
+
+                @ray.remote
+                class Worker:
+
+                    def train(self, data_iterator):
+                        for batch in data_iterator.iter_batches(batch_size=8):
+                            pass
+
+                workers = [Worker.remote() for _ in range(4)]
+                shards = ray.data.range(100).split(n=4, equal=True)
+                ray.get([w.train.remote(s) for w, s in zip(workers, shards)])
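+
+            With ``equal=True``, every shard is guaranteed the same number of
+            rows, and leftover rows are dropped. A small sketch of the resulting
+            row counts, which follow from that documented behavior:
+
+            .. testcode::
+
+                ds = ray.data.range(10)
+                splits = ds.split(3, equal=True)
+                # Each split gets exactly 10 // 3 == 3 rows; one row is dropped.
+                assert [s.count() for s in splits] == [3, 3, 3]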
+
+        Time complexity: O(1)
 
         Args:
             n: Number of child datasets to return.
             equal: Whether to guarantee each split has an equal
-                number of records. This may drop records if they cannot be
+                number of records. This might drop records if the rows can't be
                 divided equally among the splits.
             locality_hints: [Experimental] A list of Ray actor handles of size ``n``.
-                The system will try to co-locate the blocks of the i-th dataset
+                The system tries to co-locate the blocks of the i-th dataset
                 with the i-th actor to maximize data locality.
 
         Returns:
             A list of ``n`` disjoint dataset splits.
+
+        .. seealso::
+
+            :meth:`Dataset.split_at_indices`
+                Unlike :meth:`~Dataset.split`, which splits a dataset into
+                approximately equal splits, :meth:`~Dataset.split_at_indices` lets
+                you split a dataset into different sizes.
+
+            :meth:`Dataset.split_proportionately`
+                :meth:`~Dataset.split_proportionately` is equivalent to
+                :meth:`~Dataset.split_at_indices` if you compute the indices
+                manually.
+
+            :meth:`Dataset.streaming_split`
+                Unlike :meth:`~Dataset.split`, :meth:`~Dataset.streaming_split`
+                doesn't materialize the dataset in memory.
         """
         if n <= 0:
             raise ValueError(f"The number of splits {n} is not positive.")
@@ -1408,7 +1450,7 @@ def build_node_id_by_actor(actors: List[Any]) -> Dict[Any, str]:
 
     @ConsumptionAPI
     def split_at_indices(self, indices: List[int]) -> List["MaterializedDataset"]:
-        """Materialize and split the dataset at the given indices (like np.split).
+        """Materialize and split the dataset at the given indices (like ``np.split``).
 
         Examples:
             >>> import ray
@@ -1423,16 +1465,28 @@ def split_at_indices(self, indices: List[int]) -> List["MaterializedDataset"]:
 
         Time complexity: O(num splits)
 
-        See also: ``Dataset.split_at_indices``, ``Dataset.split_proportionately``,
-        and ``Dataset.streaming_split``.
-
         Args:
             indices: List of sorted integers which indicate where the dataset
                 is split. If an index exceeds the length of the dataset,
                 an empty dataset is returned.
 
         Returns:
             The dataset splits.
+
+        .. seealso::
+
+            :meth:`Dataset.split`
+                Unlike :meth:`~Dataset.split_at_indices`, which lets you split a
+                dataset into different sizes, :meth:`Dataset.split` splits a dataset
+                into approximately equal splits.
+
+            :meth:`Dataset.split_proportionately`
+                :meth:`~Dataset.split_proportionately` is equivalent to this method
+                if you compute the indices manually.
+
+            :meth:`Dataset.streaming_split`
+                Unlike :meth:`~Dataset.split_at_indices`,
+                :meth:`~Dataset.streaming_split` doesn't materialize the dataset
+                in memory.
         """
 
         if len(indices) < 1:
@@ -1486,16 +1540,16 @@ def split_proportionately(
     ) -> List["MaterializedDataset"]:
         """Materialize and split the dataset using proportions.
 
-        A common use case for this would be splitting the dataset into train
+        A common use case for this is splitting the dataset into train
         and test sets (equivalent to e.g. scikit-learn's ``train_test_split``).
-        See also ``Dataset.train_test_split`` for a higher level abstraction.
+        For a higher level abstraction, see :meth:`Dataset.train_test_split`.
 
-        The indices to split at is calculated in such a way so that all splits
-        always contains at least one element. If that is not possible,
+        This method splits datasets so that all splits
+        always contain at least one row. If that isn't possible,
         an exception is raised.
 
         This is equivalent to calculating the indices manually and calling
-        ``Dataset.split_at_indices``.
+        :meth:`Dataset.split_at_indices`.
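+
+        For example, on a 10-row dataset, the proportions ``[0.2, 0.3]``
+        correspond to the indices ``[2, 5]``. A small sketch of that equivalence;
+        the row counts follow from the documented behavior:
+
+        .. testcode::
+
+            import ray
+
+            ds = ray.data.range(10)
+            a, b, c = ds.split_proportionately([0.2, 0.3])
+            # Same sizes as ds.split_at_indices([2, 5]) would produce.
+            assert [a.count(), b.count(), c.count()] == [2, 3, 5]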
 
         Examples:
             >>> import ray
@@ -1510,16 +1564,27 @@ def split_proportionately(
 
         Time complexity: O(num splits)
 
-        See also: ``Dataset.split``, ``Dataset.split_at_indices``,
-        ``Dataset.train_test_split``
-
        Args:
             proportions: List of proportions to split the dataset according to.
-                Must sum up to less than 1, and each proportion has to be bigger
+                Must sum up to less than 1, and each proportion must be bigger
                 than 0.
 
         Returns:
             The dataset splits.
+
+        .. seealso::
+
+            :meth:`Dataset.split`
+                Unlike :meth:`~Dataset.split_proportionately`, which lets you split a
+                dataset into different sizes, :meth:`Dataset.split` splits a dataset
+                into approximately equal splits.
+
+            :meth:`Dataset.split_at_indices`
+                :meth:`Dataset.split_proportionately` uses this method under the hood.
+
+            :meth:`Dataset.streaming_split`
+                Unlike :meth:`~Dataset.split_proportionately`,
+                :meth:`~Dataset.streaming_split` doesn't materialize the dataset
+                in memory.
         """
 
         if len(proportions) < 1:
@@ -1572,16 +1637,20 @@ def train_test_split(
         Args:
             test_size: If float, should be between 0.0 and 1.0 and represent the
                 proportion of the dataset to include in the test split. If int,
-                represents the absolute number of test samples. The train split will
-                always be the compliment of the test split.
+                represents the absolute number of test samples. The train split
+                always complements the test split.
             shuffle: Whether or not to globally shuffle the dataset before splitting.
-                Defaults to False. This may be a very expensive operation with large
-                dataset.
+                Defaults to ``False``. This may be a very expensive operation with a
+                large dataset.
             seed: Fix the random seed to use for shuffle, otherwise one is chosen
                 based on system randomness. Ignored if ``shuffle=False``.
 
         Returns:
-            Train and test subsets as two MaterializedDatasets.
+            Train and test subsets as two ``MaterializedDatasets``.
+
+        .. seealso::
+
+            :meth:`Dataset.split_proportionately`
         """
         ds = self
@@ -1607,24 +1676,32 @@ def train_test_split(
         )
         return ds.split_at_indices([ds_length - test_size])
 
-    @ConsumptionAPI(pattern="Args:")
+    @ConsumptionAPI
     def union(self, *other: List["Dataset"]) -> "Dataset":
-        """Materialize and combine this dataset with others of the same type.
+        """Materialize and concatenate :class:`Datasets <ray.data.Dataset>` across rows.
 
         The order of the blocks in the datasets is preserved, as is the
         relative ordering between the datasets passed in the argument list.
 
-        .. note::
-            Unioned datasets are not lineage-serializable, i.e. they can not be
+        .. caution::
+            Unioned datasets aren't lineage-serializable. As a result, they can't be
             used as a tunable hyperparameter in Ray Tune.
 
+        Examples:
+
+            >>> import ray
+            >>> ds1 = ray.data.range(2)
+            >>> ds2 = ray.data.range(3)
+            >>> ds1.union(ds2).take_all()
+            [{'id': 0}, {'id': 1}, {'id': 0}, {'id': 1}, {'id': 2}]
+
         Args:
             other: List of datasets to combine with this one. The datasets
                 must have the same schema as this dataset, otherwise the
                 behavior is undefined.
 
         Returns:
-            A new dataset holding the union of their data.
+            A new dataset holding the rows of the input datasets.
         """
 
         start_time = time.perf_counter()
@@ -2099,17 +2176,18 @@ def sort(self, key: Optional[str] = None, descending: bool = False) -> "Dataset"
         return Dataset(plan, self._epoch, self._lazy, logical_plan)
 
     def zip(self, other: "Dataset") -> "Dataset":
-        """Materialize and zip this dataset with the elements of another.
+        """Materialize and zip the columns of this dataset with the columns of another.
 
         The datasets must have the same number of rows. Their column sets are
-        merged, and any duplicate column names disambiguated with _1, _2, etc. suffixes.
+        merged, and any duplicate column names are disambiguated with suffixes like
+        ``"_1"``.
 
         .. note::
-            The smaller of the two datasets are repartitioned to align the number
+            The smaller of the two datasets is repartitioned to align the number
             of rows per block with the larger dataset.
 
         .. note::
-            Zipped datasets are not lineage-serializable, i.e. they can not be used
+            Zipped datasets aren't lineage-serializable. As a result, they can't be used
             as a tunable hyperparameter in Ray Tune.
 
         Examples:
@@ -2125,9 +2203,9 @@ def zip(self, other: "Dataset") -> "Dataset":
             other: The dataset to zip with on the right hand side.
 
         Returns:
-            A ``Dataset`` containing the columns of the second dataset
+            A :class:`Dataset` containing the columns of the second dataset
             concatenated horizontally with the columns of the first dataset,
-            with duplicate column names disambiguated with _1, _2, etc. suffixes.
+            with duplicate column names disambiguated with suffixes like ``"_1"``.
         """
 
         plan = self._plan.with_stage(ZipStage(other))