Skip to content

Commit

Permalink
[Data] Introduce concurrency argument to replace ComputeStrategy in…
Browse files Browse the repository at this point in the history
… map-like APIs (#41461)

Generated doc for review - https://anyscale-ray--41461.com.readthedocs.build/en/41461/data/transforming-data.html#transforming-with-python-class .

This PR is to add an extra `concurrency` argument into all map-like APIs (`map_batches`, `map`, `filter`, `flat_map`, `add_column`, `drop_columns`, `select_columns`), with the motivation to deprecate `compute` argument.

The typing for new `concurrency` is
```
Optional[Union[int, Tuple[int, int]]]
```

So it allows user to set a fixed-sized actors pool, or an auto-scaling actors pool. For 2.9, the `compute` argument would still work, but will print out a warning message for users to migrate to use `concurrency`. So this PR does not break any existing code and maintains backward compatibility.

Several other alternatives:
* Use two arguments `min_concurrency`, `max_concurrency`: `max_concurrency` is already a reserved parameter for Ray Core. This represents the number of concurrent actor tasks in Ray Core. So this would introduce extra confusion for users. In addition, we are recommending our users to use a fixed-sized actors pool for now. These two arguments are only useful for auto-scaling actors pool.

* Introduce a class like `ConcurrencyOption`: Do not see a need right now, and it would go back to have same issue with `ActorPoolStrategy`. We can always overload the type of `concurrency` and add more new types later, without breaking backward compatibility.

* Overload the type of existing argument `compute`: This would also work and requires minimal change from user side. The naming of `compute` is more vague than `concurrency` though.

Signed-off-by: Cheng Su <[email protected]>
Signed-off-by: Stephanie Wang <[email protected]>
Co-authored-by: Stephanie Wang <[email protected]>
  • Loading branch information
c21 and stephanie-wang committed Dec 4, 2023
1 parent 19bedd1 commit c71f43c
Show file tree
Hide file tree
Showing 15 changed files with 465 additions and 273 deletions.
2 changes: 1 addition & 1 deletion doc/source/data/examples/gptj_batch_prediction.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Since we will be using a pretrained model from Hugging Face hub, the simplest way is to use {meth}`map_batches <ray.data.Dataset.map_batches>` with a [callable class UDF](transforming_data_actors). This will allow us to save time by initializing a model just once and then feed it multiple batches of data."
"Since we will be using a pretrained model from Hugging Face hub, the simplest way is to use {meth}`map_batches <ray.data.Dataset.map_batches>` with a [callable class UDF](stateful_transforms). This will allow us to save time by initializing a model just once and then feed it multiple batches of data."
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Since we will be using a pretrained model from Hugging Face hub, the simplest way is to use {meth}`map_batches <ray.data.Dataset.map_batches>` with a [callable class UDF](transforming_data_actors). This will allow us to save time by initializing a model just once and then feed it multiple batches of data."
"Since we will be using a pretrained model from Hugging Face hub, the simplest way is to use {meth}`map_batches <ray.data.Dataset.map_batches>` with a [callable class UDF](stateful_transforms). This will allow us to save time by initializing a model just once and then feed it multiple batches of data."
]
},
{
Expand Down
164 changes: 79 additions & 85 deletions doc/source/data/transforming-data.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ This guide shows you how to:

* :ref:`Transform rows <transforming_rows>`
* :ref:`Transform batches <transforming_batches>`
* :ref:`Stateful Transforms <stateful_transforms>`
* :ref:`Groupby and transform groups <transforming_groupby>`
* :ref:`Shuffle rows <shuffling_rows>`
* :ref:`Repartition data <repartitioning_data>`
Expand Down Expand Up @@ -84,22 +85,6 @@ Transforming batches
If your transformation is vectorized like most NumPy or pandas operations, transforming
batches is more performant than transforming rows.

Choosing between tasks and actors
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Ray Data transforms batches with either tasks or actors. Actors perform setup exactly
once. In contrast, tasks require setup every batch. So, if your transformation involves
expensive setup like downloading model weights, use actors. Otherwise, use tasks.

To learn more about tasks and actors, read the
:ref:`Ray Core Key Concepts <core-key-concepts>`.

Transforming batches with tasks
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

To transform batches with tasks, call :meth:`~ray.data.Dataset.map_batches`. Ray Data
uses tasks by default.

.. testcode::

from typing import Dict
Expand All @@ -115,19 +100,87 @@ uses tasks by default.
.map_batches(increase_brightness)
)

.. _transforming_data_actors:
.. _configure_batch_format:

Transforming batches with actors
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Configuring batch format
~~~~~~~~~~~~~~~~~~~~~~~~

Ray Data represents batches as dicts of NumPy ndarrays or pandas DataFrames. By
default, Ray Data represents batches as dicts of NumPy ndarrays.

To transform batches with actors, complete these steps:
To configure the batch type, specify ``batch_format`` in
:meth:`~ray.data.Dataset.map_batches`. You can return either format from your function.

1. Implement a class. Perform setup in ``__init__`` and transform data in ``__call__``.
.. tab-set::

.. tab-item:: NumPy

.. testcode::

from typing import Dict
import numpy as np
import ray

def increase_brightness(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
batch["image"] = np.clip(batch["image"] + 4, 0, 255)
return batch

ds = (
ray.data.read_images("s3:https://anonymous@ray-example-data/image-datasets/simple")
.map_batches(increase_brightness, batch_format="numpy")
)

2. Create an :class:`~ray.data.ActorPoolStrategy` and configure the number of concurrent
workers. Each worker transforms a partition of data.
.. tab-item:: pandas

3. Call :meth:`~ray.data.Dataset.map_batches` and pass your ``ActorPoolStrategy`` to ``compute``.
.. testcode::

import pandas as pd
import ray

def drop_nas(batch: pd.DataFrame) -> pd.DataFrame:
return batch.dropna()

ds = (
ray.data.read_csv("s3:https://anonymous@air-example-data/iris.csv")
.map_batches(drop_nas, batch_format="pandas")
)

Configuring batch size
~~~~~~~~~~~~~~~~~~~~~~

Increasing ``batch_size`` improves the performance of vectorized transformations like
NumPy functions and model inference. However, if your batch size is too large, your
program might run out of memory. If you encounter an out-of-memory error, decrease your
``batch_size``.

.. note::

The default batch size depends on your resource type. If you're using CPUs,
the default batch size is 4096. If you're using GPUs, you must specify an explicit
batch size.

.. _stateful_transforms:

Stateful Transforms
==============================

If your transform requires expensive setup such as downloading
model weights, use a callable Python class instead of a function to make the transform stateful. When a Python class
is used, the ``__init__`` method is called to perform setup exactly once on each worker.
In contrast, functions are stateless, so any setup must be performed for each data item.

Internally, Ray Data uses tasks to execute functions, and uses actors to execute classes.
To learn more about tasks and actors, read the
:ref:`Ray Core Key Concepts <core-key-concepts>`.

To transform data with a Python class, complete these steps:

1. Implement a class. Perform setup in ``__init__`` and transform data in ``__call__``.

2. Call :meth:`~ray.data.Dataset.map_batches`, :meth:`~ray.data.Dataset.map`, or
:meth:`~ray.data.Dataset.flat_map`. Pass the number of concurrent workers to use with the ``concurrency`` argument. Each worker transforms a partition of data in parallel.
Fixing the number of concurrent workers gives the most predictable performance, but you can also pass a tuple of ``(min, max)`` to allow Ray Data to automatically
scale the number of concurrent workers.

.. tab-set::

Expand All @@ -154,7 +207,7 @@ To transform batches with actors, complete these steps:

ds = (
ray.data.from_numpy(np.ones((32, 100)))
.map_batches(TorchPredictor, compute=ray.data.ActorPoolStrategy(size=2))
.map_batches(TorchPredictor, concurrency=2)
)

.. testcode::
Expand Down Expand Up @@ -188,7 +241,7 @@ To transform batches with actors, complete these steps:
.map_batches(
TorchPredictor,
# Two workers with one GPU each
compute=ray.data.ActorPoolStrategy(size=2),
concurrency=2,
# Batch size is required if you're using GPUs.
batch_size=4,
num_gpus=1
Expand All @@ -200,65 +253,6 @@ To transform batches with actors, complete these steps:

ds.materialize()

.. _configure_batch_format:

Configuring batch format
~~~~~~~~~~~~~~~~~~~~~~~~

Ray Data represents batches as dicts of NumPy ndarrays or pandas DataFrames. By
default, Ray Data represents batches as dicts of NumPy ndarrays.

To configure the batch type, specify ``batch_format`` in
:meth:`~ray.data.Dataset.map_batches`. You can return either format from your function.

.. tab-set::

.. tab-item:: NumPy

.. testcode::

from typing import Dict
import numpy as np
import ray

def increase_brightness(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
batch["image"] = np.clip(batch["image"] + 4, 0, 255)
return batch

ds = (
ray.data.read_images("s3:https://anonymous@ray-example-data/image-datasets/simple")
.map_batches(increase_brightness, batch_format="numpy")
)

.. tab-item:: pandas

.. testcode::

import pandas as pd
import ray

def drop_nas(batch: pd.DataFrame) -> pd.DataFrame:
return batch.dropna()

ds = (
ray.data.read_csv("s3:https://anonymous@air-example-data/iris.csv")
.map_batches(drop_nas, batch_format="pandas")
)

Configuring batch size
~~~~~~~~~~~~~~~~~~~~~~

Increasing ``batch_size`` improves the performance of vectorized transformations like
NumPy functions and model inference. However, if your batch size is too large, your
program might run out of memory. If you encounter an out-of-memory error, decrease your
``batch_size``.

.. note::

The default batch size depends on your resource type. If you're using CPUs,
the default batch size is 4096. If you're using GPUs, you must specify an explicit
batch size.

.. _transforming_groupby:

Groupby and transforming groups
Expand Down
2 changes: 1 addition & 1 deletion doc/source/data/working-with-images.rst
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ Finally, call :meth:`Dataset.map_batches() <ray.data.Dataset.map_batches>`.

For more information on performing inference, see
:ref:`End-to-end: Offline Batch Inference <batch_inference_home>`
and :ref:`Transforming batches with actors <transforming_data_actors>`.
and :ref:`Stateful Transforms <stateful_transforms>`.

.. _saving_images:

Expand Down
2 changes: 1 addition & 1 deletion doc/source/data/working-with-text.rst
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ that sets up and invokes a model. Then, call

For more information on performing inference, see
:ref:`End-to-end: Offline Batch Inference <batch_inference_home>`
and :ref:`Transforming batches with actors <transforming_data_actors>`.
and :ref:`Stateful Transforms <stateful_transforms>`.

.. _saving-text:

Expand Down
11 changes: 9 additions & 2 deletions doc/source/ray-core/_examples/datasets_train/datasets_train.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,12 +263,18 @@ def column_standard_scaler(s: pd.Series):


def inference(
dataset, model_cls: type, batch_size: int, result_path: str, use_gpu: bool
dataset,
load_model_func,
model_cls: type,
batch_size: int,
result_path: str,
use_gpu: bool,
):
print("inferencing...")
num_gpus = 1 if use_gpu else 0
dataset.map_batches(
model_cls,
fn_constructor_args=[load_model_func],
compute=ray.data.ActorPoolStrategy(),
batch_size=batch_size,
batch_format="pandas",
Expand Down Expand Up @@ -699,7 +705,8 @@ def __call__(self, batch) -> "pd.DataFrame":
)
inference(
inference_dataset,
BatchInferModel(load_model_func),
load_model_func,
BatchInferModel,
100,
inference_output_path,
use_gpu,
Expand Down
2 changes: 0 additions & 2 deletions python/ray/data/_internal/execution/legacy_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
)
from ray.data._internal.stage_impl import LimitStage, RandomizeBlocksStage
from ray.data._internal.stats import DatasetStats, StatsDict
from ray.data._internal.util import validate_compute
from ray.data.block import Block, BlockMetadata, CallableClass, List
from ray.data.context import DataContext
from ray.data.datasource import ReadTask
Expand Down Expand Up @@ -323,7 +322,6 @@ def _stage_to_operator(stage: Stage, input_op: PhysicalOperator) -> PhysicalOper

if isinstance(stage, OneToOneStage):
compute = get_compute(stage.compute)
validate_compute(stage.fn, compute)

block_fn = stage.block_fn
if stage.fn:
Expand Down
3 changes: 1 addition & 2 deletions python/ray/data/_internal/planner/plan_udf_map_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
MapRows,
)
from ray.data._internal.numpy_support import is_valid_udf_return
from ray.data._internal.util import _truncated_repr, validate_compute
from ray.data._internal.util import _truncated_repr
from ray.data.block import (
Block,
BlockAccessor,
Expand All @@ -51,7 +51,6 @@ def plan_udf_map_op(
"""

compute = get_compute(op._compute)
validate_compute(op._fn, compute)
fn, init_fn = _parse_op_fn(op)

if isinstance(op, MapBatches):
Expand Down
Loading

0 comments on commit c71f43c

Please sign in to comment.