Skip to content

Commit

Permalink
[AIR] Remove PredictorDeployment from examples (ray-project#37457)
Browse files Browse the repository at this point in the history
This PR removes the AIR PredictorDeployment from examples and replaces them with Ray Serve deployments directly. This is more explicit and often time simpler and more understandable. It also reduces the number of ways in which things are done according to the Zen of Python `There should be one -- and preferably only one -- obvious way to do it.`
  • Loading branch information
pcmoritz committed Jul 29, 2023
1 parent 2424388 commit 702c36d
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 135 deletions.
22 changes: 9 additions & 13 deletions doc/source/ray-air/computer-vision.rst
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,8 @@ standard way to preprocess data with Ray.
To apply TorchVision transforms, create a :class:`~ray.data.preprocessors.TorchVisionPreprocessor`.

Create two :class:`TorchVisionPreprocessors <ray.data.preprocessors.TorchVisionPreprocessor>`
-- one to normalize images, and another to augment images. Later, you'll pass the preprocessors to :class:`Trainers <ray.train.trainer.BaseTrainer>`,
:class:`Predictors <ray.train.predictor.Predictor>`, and
:class:`PredictorDeployments <ray.serve.air_integrations.PredictorDeployment>`.
-- one to normalize images, and another to augment images. Later, you'll pass the preprocessors to :class:`Trainers <ray.train.trainer.BaseTrainer>` and
:class:`Predictors <ray.train.predictor.Predictor>`.

.. literalinclude:: ./doc_code/computer_vision.py
:start-after: __torch_preprocessors_start__
Expand All @@ -148,9 +147,8 @@ standard way to preprocess data with Ray.
To apply TorchVision transforms, create a :class:`~ray.data.preprocessors.BatchMapper`.

Create two :class:`~ray.data.preprocessors.BatchMapper` -- one to normalize images, and another to
augment images. Later, you'll pass the preprocessors to :class:`Trainers <ray.train.trainer.BaseTrainer>`,
:class:`Predictors <ray.train.predictor.Predictor>`, and
:class:`PredictorDeployments <ray.serve.air_integrations.PredictorDeployment>`.
augment images. Later, you'll pass the preprocessors to :class:`Trainers <ray.train.trainer.BaseTrainer>` and
:class:`Predictors <ray.train.predictor.Predictor>`.

.. literalinclude:: ./doc_code/computer_vision.py
:start-after: __tensorflow_preprocessors_start__
Expand Down Expand Up @@ -279,7 +277,7 @@ image datasets.
Serving vision models
---------------------

:class:`~ray.serve.air_integrations.PredictorDeployment` lets you
:class:`~ray.serve.Deployment` lets you
deploy a model to an endpoint and make predictions over the Internet.

Deployments use :ref:`HTTP adapters <serve-http>` to define how HTTP messages are converted to model
Expand All @@ -300,9 +298,8 @@ To NumPy ndarrays like this:

.. tab-item:: Torch

To deploy a Torch model to an endpoint, pass the checkpoint you created in `Creating checkpoints`_
to :meth:`PredictorDeployment.bind <ray.serve.air_integrations.PredictorDeployment.bind>` and specify
:func:`~ray.serve.http_adapters.json_to_ndarray` as the HTTP adapter.
To deploy a Torch model to an endpoint, create a predictor from the checkpoint you created in `Creating checkpoints`_
and serve via a Ray Serve deployment.

.. literalinclude:: ./doc_code/computer_vision.py
:start-after: __torch_serve_start__
Expand All @@ -320,9 +317,8 @@ To NumPy ndarrays like this:

.. tab-item:: TensorFlow

To deploy a TensorFlow model to an endpoint, pass the checkpoint you created in `Creating checkpoints`_
to :meth:`PredictorDeployment.bind <ray.serve.air_integrations.PredictorDeployment.bind>` and specify
:func:`~ray.serve.http_adapters.json_to_multi_ndarray` as the HTTP adapter.
To deploy a TensorFlow model to an endpoint, use the checkpoint you created in `Creating checkpoints`_
to create a Ray Serve deployment serving the model.

.. literalinclude:: ./doc_code/computer_vision.py
:start-after: __tensorflow_serve_start__
Expand Down
66 changes: 31 additions & 35 deletions doc/source/ray-air/doc_code/computer_vision.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,68 +356,64 @@ def batch_predict_tensorflow(dataset, checkpoint):

def online_predict_torch(checkpoint):
# __torch_serve_start__
from io import BytesIO
import numpy as np
from PIL import Image
from ray import serve
from ray.serve import PredictorDeployment
from ray.serve.http_adapters import json_to_ndarray
from ray.train.torch import TorchPredictor

serve.run(
PredictorDeployment.bind(
TorchPredictor,
checkpoint,
http_adapter=json_to_ndarray,
)
)
@serve.deployment
class TorchDeployment:
def __init__(self, checkpoint):
self.predictor = TorchPredictor.from_checkpoint(checkpoint)

async def __call__(self, request):
image = Image.open(BytesIO(await request.body()))
return self.predictor.predict(np.array(image)[np.newaxis])

serve.run(TorchDeployment.bind(checkpoint))
# __torch_serve_stop__

# __torch_online_predict_start__
from io import BytesIO

import numpy as np
import requests
from PIL import Image

response = requests.get("http:https://placekitten.com/200/300")
image = Image.open(BytesIO(response.content))

payload = {"array": np.array(image).tolist(), "dtype": "float32"}
response = requests.post("http:https://localhost:8000/", json=payload)
response = requests.post("http:https://localhost:8000/", data=response.content)
predictions = response.json()
# __torch_online_predict_stop__
predictions


def online_predict_tensorflow(checkpoint):
# __tensorflow_serve_start__
from io import BytesIO
import numpy as np
from PIL import Image
import tensorflow as tf

from ray import serve
from ray.serve import PredictorDeployment
from ray.serve.http_adapters import json_to_multi_ndarray
from ray.train.tensorflow import TensorflowPredictor

serve.run(
PredictorDeployment.bind(
TensorflowPredictor,
checkpoint,
http_adapter=json_to_multi_ndarray,
model_definition=tf.keras.applications.resnet50.ResNet50,
)
)
@serve.deployment
class TensorflowDeployment:
def __init__(self, checkpoint):
self.predictor = TensorflowPredictor.from_checkpoint(
checkpoint,
model_definition=tf.keras.applications.resnet50.ResNet50,
)

async def __call__(self, request):
image = Image.open(BytesIO(await request.body()))
return self.predictor.predict({"image": np.array(image)[np.newaxis]})

serve.run(TensorflowDeployment.bind(checkpoint))
# __tensorflow_serve_stop__

# __tensorflow_online_predict_start__
from io import BytesIO

import numpy as np
import requests
from PIL import Image

response = requests.get("http:https://placekitten.com/200/300")
image = Image.open(BytesIO(response.content))

payload = {"image": {"array": np.array(image).tolist(), "dtype": "float32"}}
response = requests.post("http:https://localhost:8000/", json=payload)
response = requests.post("http:https://localhost:8000/", data=response.content)
predictions = response.json()
# __tensorflow_online_predict_stop__
predictions
Expand Down
52 changes: 38 additions & 14 deletions doc/source/ray-air/examples/tfx_tabular_train_to_serve.ipynb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"id": "VaFMt6AIhYbK"
Expand All @@ -21,6 +22,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"id": "sQbdfyWQhYbO"
Expand All @@ -45,6 +47,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"id": "pvSRaEHChYbP"
Expand All @@ -54,6 +57,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"id": "LRdL3kWBhYbQ"
Expand Down Expand Up @@ -143,6 +147,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"id": "oJiSdWy2hYbR"
Expand Down Expand Up @@ -178,6 +183,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"id": "jAgvLbhT8nB0"
Expand All @@ -187,6 +193,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"id": "IXQb4--97_Cf"
Expand Down Expand Up @@ -242,6 +249,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"id": "1WALC3kT8WgL"
Expand Down Expand Up @@ -417,6 +425,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"id": "xzNQKJMA9YV-"
Expand Down Expand Up @@ -483,6 +492,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"id": "N7tiwqdP-zVS"
Expand All @@ -492,6 +502,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"id": "4RRkXuteIrIh"
Expand Down Expand Up @@ -587,6 +598,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"id": "V2BIiegi_brE"
Expand Down Expand Up @@ -619,6 +631,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"id": "whPRbBNbIrIl"
Expand All @@ -628,6 +641,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"id": "W7QYTpxXIrIl"
Expand Down Expand Up @@ -655,6 +669,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"id": "UVVji2YKADrh"
Expand Down Expand Up @@ -699,6 +714,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"id": "RzfPtOMoIrIu"
Expand Down Expand Up @@ -739,6 +755,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"id": "Nb0HkOV2R4uL"
Expand All @@ -748,21 +765,17 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"id": "OlzjlW8QR_q6"
},
"source": [
"We will use Ray Serve to serve the trained model. A core concept of Ray Serve is a [Deployment](https://docs.ray.io/en/latest/serve/getting_started.html#converting-to-a-ray-serve-deployment). It allows you to define and update your business logic or models that will handle incoming requests as well as how this is exposed over HTTP or in Python.\n",
"\n",
"In the case of serving a model, `ray.serve.air_integrations.Predictor` and `ray.serve.air_integrations.PredictorDeployment` wrap a `ray.air.checkpoint.Checkpoint` into a Ray Serve deployment that can readily serve HTTP requests.\n",
"Note, ``Checkpoint`` captures both model and preprocessing steps in a way compatible with Ray Serve and ensures that the ML workload can transition seamlessly between training and\n",
"serving.\n",
"\n",
"This removes the boilerplate code and minimizes the effort to bring your model to production!"
"We will use Ray Serve to serve the trained model. A core concept of Ray Serve is a [Deployment](https://docs.ray.io/en/latest/serve/getting_started.html#converting-to-a-ray-serve-deployment). It allows you to define and update your business logic or models that will handle incoming requests as well as how this is exposed over HTTP or in Python. We create a Predictor from the `ray.air.checkpoint.Checkpoint` and serve it with the Ray Serve deployment."
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"id": "SOnl90IuRywD"
Expand All @@ -784,20 +797,30 @@
"from ray import serve\n",
"from ray.air.checkpoint import Checkpoint\n",
"from ray.train.tensorflow import TensorflowPredictor\n",
"from ray.serve import PredictorDeployment\n",
"from ray.serve.drivers import DAGDriver\n",
"from ray.serve.http_adapters import pandas_read_json\n",
"\n",
"def serve_model(checkpoint: Checkpoint, model_definition, name=\"Model\") -> str:\n",
"@serve.deployment\n",
"class TensorflowDeployment:\n",
" def __init__(self, checkpoint, model_definition):\n",
" self.predictor = TensorflowPredictor.from_checkpoint(\n",
" checkpoint,\n",
" model_definition=model_definition\n",
" )\n",
"\n",
" async def __call__(self, data):\n",
" result = self.predictor.predict(data.reset_index(drop=True))\n",
" return result[\"predictions\"]\n",
"\n",
"def serve_model(checkpoint: Checkpoint, model_definition) -> str:\n",
" \"\"\"Expose a serve endpoint.\n",
"\n",
" Returns:\n",
" serve URL.\n",
" \"\"\"\n",
" serve.run(\n",
" PredictorDeployment.options(name=name).bind(\n",
" TensorflowPredictor,\n",
" checkpoint,\n",
" model_definition=model_definition,\n",
" DAGDriver.bind(\n",
" TensorflowDeployment.bind(checkpoint, model_definition),\n",
" http_adapter=pandas_read_json,\n",
" )\n",
" )\n",
Expand All @@ -821,6 +844,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"id": "rzHSwa2bSyee"
Expand Down Expand Up @@ -849,7 +873,7 @@
" one_row = df.iloc[[i]].to_dict()\n",
" serve_result = requests.post(endpoint_uri, data=json.dumps(one_row), headers={\"Content-Type\": \"application/json\"}).json()\n",
" print(\n",
" f\"request{i} prediction: {serve_result[0]['predictions']} \"\n",
" f\"request{i} prediction: {serve_result} \"\n",
" f\"- label: {str(label[i])}\"\n",
" )"
]
Expand Down
Loading

0 comments on commit 702c36d

Please sign in to comment.