
[Data] Remove ActorPoolStrategy from documentation (#41948)
This PR removes `ActorPoolStrategy` and replaces it with the `concurrency` parameter in all documentation.

Signed-off-by: Cheng Su <[email protected]>
c21 committed Dec 15, 2023
1 parent 7b1aebc commit 4579860
Showing 17 changed files with 51 additions and 60 deletions.
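For reference, the migration pattern applied throughout these files, as a minimal runnable sketch (assuming Ray 2.9+; `Doubler` is a hypothetical stand-in for the predictor classes in the docs):

    import ray

    class Doubler:
        def __call__(self, batch):
            batch["id"] = batch["id"] * 2
            return batch

    ds = ray.data.range(8)

    # Before (deprecated in Ray 2.9): pass an ActorPoolStrategy via `compute`.
    # out = ds.map_batches(Doubler, compute=ray.data.ActorPoolStrategy(size=2))

    # After: pass the actor pool size directly via `concurrency`.
    out = ds.map_batches(Doubler, concurrency=2)
    print(out.take_all())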
1 change: 0 additions & 1 deletion doc/source/data/api/dataset.rst
@@ -142,7 +142,6 @@ Execution
:toctree: doc/

Dataset.materialize
- ActorPoolStrategy

Serialization
-------------
42 changes: 19 additions & 23 deletions doc/source/data/batch_inference.rst
@@ -71,12 +71,11 @@ For how to configure batch inference, see :ref:`the configuration guide<batch_in
batch["output"] = [sequences[0]["generated_text"] for sequences in predictions]
return batch

+ # Step 2: Map the Predictor over the Dataset to get predictions.
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
- scale = ray.data.ActorPoolStrategy(size=2)
- # Step 3: Map the Predictor over the Dataset to get predictions.
- predictions = ds.map_batches(HuggingFacePredictor, compute=scale)
- # Step 4: Show one prediction output.
+ predictions = ds.map_batches(HuggingFacePredictor, concurrency=2)
+ # Step 3: Show one prediction output.
predictions.show(limit=1)

.. testoutput::
@@ -122,12 +121,11 @@ For how to configure batch inference, see :ref:`the configuration guide<batch_in
# Get the predictions from the input batch.
return {"output": self.model(tensor).numpy()}

+ # Step 2: Map the Predictor over the Dataset to get predictions.
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
- scale = ray.data.ActorPoolStrategy(size=2)
- # Step 3: Map the Predictor over the Dataset to get predictions.
- predictions = ds.map_batches(TorchPredictor, compute=scale)
- # Step 4: Show one prediction output.
+ predictions = ds.map_batches(TorchPredictor, concurrency=2)
+ # Step 3: Show one prediction output.
predictions.show(limit=1)

.. testoutput::
@@ -168,12 +166,11 @@ For how to configure batch inference, see :ref:`the configuration guide<batch_in
# Get the predictions from the input batch.
return {"output": self.model(batch["data"]).numpy()}

+ # Step 2: Map the Predictor over the Dataset to get predictions.
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
- scale = ray.data.ActorPoolStrategy(size=2)
- # Step 3: Map the Predictor over the Dataset to get predictions.
- predictions = ds.map_batches(TFPredictor, compute=scale)
- # Step 4: Show one prediction output.
+ predictions = ds.map_batches(TFPredictor, concurrency=2)
+ # Step 3: Show one prediction output.
predictions.show(limit=1)

.. testoutput::
@@ -239,8 +236,8 @@ The remaining is the same as the :ref:`Quickstart <batch_inference_quickstart>`.
# Specify the batch size for inference.
# Increase this for larger datasets.
batch_size=1,
- # Set the ActorPool size to the number of GPUs in your cluster.
- compute=ray.data.ActorPoolStrategy(size=2),
+ # Set the concurrency to the number of GPUs in your cluster.
+ concurrency=2,
)
predictions.show(limit=1)
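
A sketch of deriving that GPU count programmatically rather than hard-coding it — assuming `ray.cluster_resources()` reports a `"GPU"` entry on GPU clusters; the `Predictor` class here is a hypothetical stand-in for the predictor classes above:

    import ray

    class Predictor:
        def __call__(self, batch):
            return batch

    ray.init()
    ds = ray.data.range(32)
    num_gpus = int(ray.cluster_resources().get("GPU", 0))

    predictions = ds.map_batches(
        Predictor,
        batch_size=1,
        # Request one GPU per actor only if the cluster has GPUs.
        num_gpus=1 if num_gpus else 0,
        # One actor per GPU; fall back to 2 actors on a CPU-only cluster.
        concurrency=num_gpus or 2,
    )
    predictions.show(limit=1)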

@@ -287,8 +284,8 @@ The remaining is the same as the :ref:`Quickstart <batch_inference_quickstart>`.
# Specify the batch size for inference.
# Increase this for larger datasets.
batch_size=1,
- # Set the ActorPool size to the number of GPUs in your cluster.
- compute=ray.data.ActorPoolStrategy(size=2)
+ # Set the concurrency to the number of GPUs in your cluster.
+ concurrency=2,
)
predictions.show(limit=1)

@@ -331,8 +328,8 @@ The remaining is the same as the :ref:`Quickstart <batch_inference_quickstart>`.
# Specify the batch size for inference.
# Increase this for larger datasets.
batch_size=1,
- # Set the ActorPool size to the number of GPUs in your cluster.
- compute=ray.data.ActorPoolStrategy(size=2)
+ # Set the concurrency to the number of GPUs in your cluster.
+ concurrency=2,
)
predictions.show(limit=1)

@@ -420,8 +417,8 @@ Suppose your cluster has 4 nodes, each with 16 CPUs. To limit to at most
HuggingFacePredictor,
# Require 5 CPUs per actor (so at most 3 can fit per 16 CPU node).
num_cpus=5,
- # 3 actors per node, with 4 nodes in the cluster means ActorPool size of 12.
- compute=ray.data.ActorPoolStrategy(size=12)
+ # 3 actors per node, with 4 nodes in the cluster means concurrency of 12.
+ concurrency=12,
)
predictions.show(limit=1)
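
The sizing arithmetic behind that comment, spelled out for the hypothetical 4-node, 16-CPU-per-node cluster described above:

    nodes = 4
    cpus_per_node = 16
    cpus_per_actor = 5

    actors_per_node = cpus_per_node // cpus_per_actor  # 16 // 5 == 3
    concurrency = nodes * actors_per_node              # 4 * 3 == 12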

@@ -497,13 +494,12 @@ The rest of the logic looks the same as in the `Quickstart <#quickstart>`_.
return {"predictions": self.model.predict(dmatrix)}


+ # Map the Predictor over the Dataset to get predictions.
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
- scale = ray.data.ActorPoolStrategy(size=2)
- # Map the Predictor over the Dataset to get predictions.
predictions = test_dataset.map_batches(
    XGBoostPredictor,
-     compute=scale,
+     concurrency=2,
batch_format="pandas",
# Pass in the Checkpoint to the XGBoostPredictor constructor.
fn_constructor_kwargs={"checkpoint": checkpoint}
7 changes: 6 additions & 1 deletion doc/source/data/data-internals.rst
@@ -180,10 +180,15 @@ The following code is a hello world example which invokes the execution with
      time.sleep(0.1)
      return x
+ class SleepClass():
+     def __call__(self, x):
+         time.sleep(0.1)
+         return x
  for _ in (
      ray.data.range_tensor(5000, shape=(80, 80, 3), parallelism=200)
      .map_batches(sleep, num_cpus=2)
-     .map_batches(sleep, compute=ray.data.ActorPoolStrategy(min_size=2, max_size=4))
+     .map_batches(SleepClass, concurrency=(2, 4))
      .map_batches(sleep, num_cpus=1)
      .iter_batches()
  ):
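For context on the `concurrency=(2, 4)` form above — a minimal sketch of the two accepted forms, assuming Ray 2.9+ semantics: an `int` requests a fixed-size actor pool, while a `(min, max)` tuple requests an autoscaling one.

    import ray

    class Identity:
        def __call__(self, batch):
            return batch

    ds = ray.data.range(100)

    # Fixed-size pool: exactly 2 actors.
    fixed = ds.map_batches(Identity, concurrency=2)

    # Autoscaling pool: between 2 and 4 actors, scaled with the workload.
    autoscaled = ds.map_batches(Identity, concurrency=(2, 4))

    fixed.materialize()
    autoscaled.materialize()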
@@ -919,7 +919,7 @@
"source": [
"Then we use the {meth}`map_batches <ray.data.Dataset.map_batches>` API to apply the model to the whole dataset. \n",
"\n",
"The first parameter of `map` and `map_batches` is the user-defined function (UDF), which can either be a function or a class. Function-based UDFs will run as short-running [Ray tasks](https://docs.ray.io/en/latest/ray-core/key-concepts.html#tasks), and class-based UDFs will run as long-running [Ray actors](https://docs.ray.io/en/latest/ray-core/key-concepts.html#actors). For class-based UDFs, we use the `compute` argument to specify {class}`ActorPoolStrategy <ray.data.dataset_internal.compute.ActorPoolStrategy>` with the number of parallel actors. And the `batch_size` argument indicates the number of images in each batch.\n",
"The first parameter of `map` and `map_batches` is the user-defined function (UDF), which can either be a function or a class. Function-based UDFs run as short-running [Ray tasks](https://docs.ray.io/en/latest/ray-core/key-concepts.html#tasks), and class-based UDFs run as long-running [Ray actors](https://docs.ray.io/en/latest/ray-core/key-concepts.html#actors). For class-based UDFs, use the `concurrency` argument to specify the number of parallel actors. The `batch_size` argument indicates the number of images in each batch.\n",
"\n",
"The `num_gpus` argument specifies the number of GPUs needed for each `ObjectDetectionModel` instance. The Ray scheduler can handle heterogeous resource requirements in order to maximize the resource utilization. In this case, the `ObjectDetectionModel` instances will run on GPU and `preprocess_image` instances will run on CPU."
]
@@ -932,7 +932,7 @@
"source": [
"ds = ds.map_batches(\n",
" ObjectDetectionModel,\n",
" compute=ray.data.ActorPoolStrategy(size=4), # Use 4 GPUs. Change this number based on the number of GPUs in your cluster.\n",
" concurrency=4, # Use 4 GPUs. Change this number based on the number of GPUs in your cluster.\n",
" batch_size=4, # Use the largest batch size that can fit in GPU memory.\n",
" num_gpus=1, # Specify 1 GPU per model replica. Remove this if you are doing CPU inference.\n",
")"
2 changes: 1 addition & 1 deletion doc/source/data/examples/gptj_batch_prediction.ipynb
@@ -168,7 +168,7 @@
" batch_size=4,\n",
" fn_constructor_kwargs=dict(model_id=model_id, revision=revision),\n",
" batch_format=\"pandas\",\n",
" compute=ray.data.ActorPoolStrategy(),\n",
" concurrency=1,\n",
" num_gpus=1,\n",
" )\n",
")"
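Note that `fn_constructor_kwargs` is unaffected by this migration: it still forwards keyword arguments to the UDF class constructor. A minimal sketch (the `Tagger` class and `tag` argument are hypothetical):

    import ray

    class Tagger:
        def __init__(self, tag: str):
            self.tag = tag

        def __call__(self, batch):
            batch["tag"] = [self.tag] * len(batch["id"])
            return batch

    ds = ray.data.range(4)
    out = ds.map_batches(
        Tagger,
        fn_constructor_kwargs={"tag": "v1"},
        concurrency=1,
    )
    print(out.take_all())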
@@ -353,7 +353,7 @@
"source": [
"Then we use the {meth}`map_batches <ray.data.Dataset.map_batches>` API to apply the model to the whole dataset. \n",
"\n",
"The first parameter of `map_batches` is the user-defined function (UDF), which can either be a function or a class. Since we are using a class in this case, the UDF will run as long-running [Ray actors](https://docs.ray.io/en/latest/ray-core/key-concepts.html#actors). For class-based UDFs, we use the `compute` argument to specify {class}`ActorPoolStrategy <ray.data.dataset_internal.compute.ActorPoolStrategy>` with the number of parallel actors. And the `batch_size` argument indicates the number of images in each batch.\n",
"The first parameter of `map_batches` is the user-defined function (UDF), which can either be a function or a class. Because this case uses a class, the UDF runs as long-running [Ray actors](https://docs.ray.io/en/latest/ray-core/key-concepts.html#actors). For class-based UDFs, use the `concurrency` argument to specify the number of parallel actors. The `batch_size` argument indicates the number of images in each batch.\n",
"\n",
"The `num_gpus` argument specifies the number of GPUs needed for each `ImageClassifier` instance. In this case, we want 1 GPU for each model replica."
]
@@ -368,7 +368,7 @@
"source": [
"predictions = ds.map_batches(\n",
" ImageClassifier,\n",
" compute=ray.data.ActorPoolStrategy(size=4), # Use 4 GPUs. Change this number based on the number of GPUs in your cluster.\n",
" concurrency=4, # Use 4 GPUs. Change this number based on the number of GPUs in your cluster.\n",
" num_gpus=1, # Specify 1 GPU per model replica.\n",
" batch_size=BATCH_SIZE # Use the largest batch size that can fit on our GPUs\n",
")"
@@ -439,7 +439,7 @@
"source": [
"Then we use the {meth}`~ray.data.Dataset.map_batches` API to apply the model to the whole dataset.\n",
"\n",
"The first parameter of `map_batches` is the user-defined function (UDF), which can either be a function or a class. Since we are using a class in this case, the UDF will run as long-running [Ray actors](actor-guide). For class-based UDFs, we use the `compute` argument to specify {class}`~ray.data.ActorPoolStrategy` with the number of parallel actors.\n",
"The first parameter of `map_batches` is the user-defined function (UDF), which can either be a function or a class. Because this case uses a class, the UDF runs as long-running [Ray actors](actor-guide). For class-based UDFs, use the `concurrency` argument to specify the number of parallel actors.\n",
"\n",
"The `batch_size` argument indicates the number of images in each batch. See the Ray dashboard\n",
"for GPU memory usage to experiment with the `batch_size` when using your own model and dataset.\n",
@@ -458,9 +458,7 @@
"source": [
"predictions = transformed_ds.map_batches(\n",
" ResnetModel,\n",
" compute=ray.data.ActorPoolStrategy(\n",
" size=4\n",
" ), # Use 4 GPUs. Change this number based on the number of GPUs in your cluster.\n",
" concurrency=4, # Use 4 GPUs. Change this number based on the number of GPUs in your cluster.\n",
" num_gpus=1, # Specify 1 GPU per model replica.\n",
" batch_size=720, # Use the largest batch size that can fit on our GPUs\n",
")\n"
@@ -162,7 +162,7 @@
" PredictCallable,\n",
" batch_size=1,\n",
" fn_constructor_kwargs=dict(model_id=model_id),\n",
" compute=ray.data.ActorPoolStrategy(),\n",
" concurrency=1,\n",
" batch_format=\"pandas\",\n",
" num_gpus=1,\n",
")\n",
2 changes: 1 addition & 1 deletion doc/source/data/working-with-images.rst
@@ -230,7 +230,7 @@ Finally, call :meth:`Dataset.map_batches() <ray.data.Dataset.map_batches>`.

predictions = ds.map_batches(
ImageClassifier,
-     compute=ray.data.ActorPoolStrategy(size=2),
+     concurrency=2,
batch_size=4
)
predictions.show(3)
7 changes: 3 additions & 4 deletions doc/source/data/working-with-pytorch.rst
@@ -273,12 +273,11 @@ With Ray Datasets, you can do scalable offline batch inference with Torch models
# Get the predictions from the input batch.
return {"output": self.model(tensor).numpy()}

+ # Step 2: Map the Predictor over the Dataset to get predictions.
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
- scale = ray.data.ActorPoolStrategy(size=2)
- # Step 3: Map the Predictor over the Dataset to get predictions.
- predictions = ds.map_batches(TorchPredictor, compute=scale)
- # Step 4: Show one prediction output.
+ predictions = ds.map_batches(TorchPredictor, concurrency=2)
+ # Step 3: Show one prediction output.
predictions.show(limit=1)

.. testoutput::
2 changes: 1 addition & 1 deletion doc/source/data/working-with-text.rst
@@ -156,7 +156,7 @@ that sets up and invokes a model. Then, call

ds = (
    ray.data.read_text("s3://anonymous@ray-example-data/this.txt")
-     .map_batches(TextClassifier, compute=ray.data.ActorPoolStrategy(size=2))
+     .map_batches(TextClassifier, concurrency=2)
)

ds.show(3)
6 changes: 2 additions & 4 deletions doc/source/templates/01_batch_inference/start.ipynb
@@ -439,7 +439,7 @@
"source": [
"Then we use the [`map_batches`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map_batches.html) API to apply the model to the whole dataset.\n",
"\n",
"The first parameter of `map_batches` is the user-defined function (UDF), which can either be a function or a class. Since we are using a class in this case, the UDF will run as long-running [Ray actors](https://docs.ray.io/en/latest/ray-core/actors.html). For class-based UDFs, we use the `compute` argument to specify [`ActorPoolStrategy`](https://docs.ray.io/en/latest/data/api/doc/ray.data.ActorPoolStrategy.html) with the number of parallel actors.\n",
"The first parameter of `map_batches` is the user-defined function (UDF), which can either be a function or a class. Because this class uses a class, the UDF runs as long-running [Ray actors](https://docs.ray.io/en/latest/ray-core/actors.html). For class-based UDFs, use the `concurrency` argument to specify the number of parallel actors.\n",
"\n",
"The `batch_size` argument indicates the number of images in each batch. See the Ray dashboard\n",
"for GPU memory usage to experiment with the `batch_size` when using your own model and dataset.\n",
@@ -458,9 +458,7 @@
"source": [
"predictions = transformed_ds.map_batches(\n",
" ResnetModel,\n",
" compute=ray.data.ActorPoolStrategy(\n",
" size=4\n",
" ), # Use 4 GPUs. Change this number based on the number of GPUs in your cluster.\n",
" concurrency=4, # Use 4 GPUs. Change this number based on the number of GPUs in your cluster.\n",
" num_gpus=1, # Specify 1 GPU per model replica.\n",
" batch_size=720, # Use the largest batch size that can fit on our GPUs\n",
")\n"
3 changes: 1 addition & 2 deletions doc/source/train/examples/lightgbm/lightgbm_example.ipynb
@@ -165,7 +165,6 @@
"source": [
"import pandas as pd\n",
"from ray.train import Checkpoint\n",
"from ray.data import ActorPoolStrategy\n",
"\n",
"\n",
"class Predict:\n",
@@ -186,7 +185,7 @@
" scores = test_dataset.map_batches(\n",
" Predict, \n",
" fn_constructor_args=[result.checkpoint], \n",
" compute=ActorPoolStrategy(), \n",
" concurrency=1, \n",
" batch_format=\"pandas\"\n",
" )\n",
" \n",
3 changes: 1 addition & 2 deletions doc/source/train/examples/xgboost/xgboost_example.ipynb
@@ -188,7 +188,6 @@
"source": [
"import pandas as pd\n",
"from ray.train import Checkpoint\n",
"from ray.data import ActorPoolStrategy\n",
"\n",
"\n",
"class Predict:\n",
@@ -209,7 +208,7 @@
" scores = test_dataset.map_batches(\n",
" Predict, \n",
" fn_constructor_args=[result.checkpoint], \n",
" compute=ActorPoolStrategy(), \n",
" concurrency=1, \n",
" batch_format=\"pandas\"\n",
" )\n",
" \n",
@@ -103,7 +103,7 @@ class ExecutionOptions:
operators under the streaming executor. The bulk executor always preserves
order. Off by default.
actor_locality_enabled: Whether to enable locality-aware task dispatch to
- actors (on by default). This applies to both ActorPoolStrategy map and
+ actors (on by default). This parameter applies to both stateful map and
streaming_split operations.
verbose_progress: Whether to report progress individually per operator. By
default, only AllToAll operators and global progress is reported. This
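For reference, this option is toggled through the current `DataContext` — a minimal sketch, assuming the standard accessor:

    import ray

    ctx = ray.data.DataContext.get_current()
    # Disable locality-aware dispatch of tasks to actors (on by default).
    ctx.execution_options.actor_locality_enabled = False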
2 changes: 1 addition & 1 deletion python/ray/data/_internal/util.py
@@ -547,7 +547,7 @@ def get_compute_strategy(
"The argument ``compute`` is deprecated in Ray 2.9. Please specify "
"argument ``concurrency`` instead. For more information, see "
"https://docs.ray.io/en/master/data/transforming-data.html#"
"transforming-with-python-class."
"stateful-transforms."
)
if is_callable_class and (
compute == "tasks" or isinstance(compute, TaskPoolStrategy)