Skip to content

Commit

Permalink
[docs] batch prediction guide, strict mode (#34969)
Browse files Browse the repository at this point in the history
Signed-off-by: Max Pumperla <[email protected]>
Signed-off-by: amogkam <[email protected]>
Co-authored-by: amogkam <[email protected]>
  • Loading branch information
maxpumperla and amogkam authored May 2, 2023
1 parent 827bc8a commit 9f35bda
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 89 deletions.
102 changes: 26 additions & 76 deletions doc/source/data/batch_inference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,15 @@ Once you have your Ray Datastream ``ds`` and your predictor class, you can use
In the example below, we use two CPUs to run inference in parallel and then print the results.
We cover resource allocation in more detail in :ref:`the configuration section of this guide <batch_inference_config>`.

.. note::

Defining your :meth:`ds.map_batches() <ray.data.Dataset.map_batches>` function requires
you to write a Python function that takes a batch of data and returns a batch of predictions.
An easy way to do this and validate it is to use ``ds.take_batch(N)`` to get a batch of data
first, and then locally test your predictor function on that batch, without using Ray.
Once you are happy with the results, you can use the same function in ``map_batches``
on the full dataset. The examples below show you how.

.. tabs::

.. group-tab:: HuggingFace
Expand Down Expand Up @@ -446,7 +455,7 @@ Working with batch formats
--------------------------

Now that you've seen examples of batch inference with Ray, let's have a closer look
at how to deal with different data formats.
at how to deal with different data formats for batches.
First of all, you need to distinguish between two types of batch formats:

- Input batch formats: This is the format of the input to your UDFs. You will often have to
Expand All @@ -462,57 +471,43 @@ but it's good to be aware of the differences.
We often use batch format names and the libraries they represent interchangeably.

Let's focus on the three available input batch formats first,
namely Pandas, NumPy, and Arrow, and how they're used in Ray Data:
namely NumPy, Pandas and Arrow, and how they're used in Ray Data.
By default, the batch format will be ``"numpy"``, but you can specify other formats
as you see fit.

.. tabbed:: Pandas
.. tabbed:: NumPy (default)

The ``"pandas"`` batch format presents batches in
`pandas.DataFrame <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`__
format. If converting a simple dataset to Pandas DataFrame batches, a single-column
dataframe with the column ``"__value__"`` will be created.
The ``"numpy"`` batch format presents batches as dictionary of
`numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`__ (``Dict[str, np.ndarray]``), with each key-value pair representing one column.

.. literalinclude:: ./doc_code/batch_formats.py
:language: python
:start-after: __simple_pandas_start__
:end-before: __simple_pandas_end__

.. tabbed:: NumPy

The ``"numpy"`` batch format presents batches in
`numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`__
format as follows:

* **Tabular datasets**: Each batch will be a dictionary of NumPy
ndarrays (``Dict[str, np.ndarray]``), with each key-value pair representing a column
in the table.
:start-after: __simple_numpy_start__
:end-before: __simple_numpy_end__

* **Tensor datasets** (single-column): Each batch will be a single
`numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`__
containing the single tensor column for this batch.
.. tabbed:: Pandas

* **Simple datasets**: Each batch will be a single NumPy ndarray, where Ray Data will
attempt to convert each list-batch to an ndarray.
The ``"pandas"`` batch format presents batches in
`pandas.DataFrame <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`__
format.

.. literalinclude:: ./doc_code/batch_formats.py
:language: python
:start-after: __simple_numpy_start__
:end-before: __simple_numpy_end__
:start-after: __simple_pandas_start__
:end-before: __simple_pandas_end__

.. tabbed:: Arrow

The ``"pyarrow"`` batch format presents batches in ``pyarrow.Table`` format.
If converting a simple dataset to Arrow Table batches, a single-column table
with the column ``"__value__"`` will be created.

.. literalinclude:: ./doc_code/batch_formats.py
:language: python
:start-after: __simple_pyarrow_start__
:end-before: __simple_pyarrow_end__

When defining the return value of your UDF, you can choose between
Pandas dataframes (``pandas.DataFrame``), NumPy arrays (``numpy.ndarray``), Arrow tables
(``pyarrow.Table``), dictionaries of NumPy arrays (``Dict[str, np.ndarray]``) or simple
Python lists (``list``).
Pandas dataframes (``pandas.DataFrame``), Arrow tables
(``pyarrow.Table``), or dictionaries of NumPy arrays (``Dict[str, np.ndarray]``).
You can learn more about output formats in :ref:`the output format guide<transform_datasets_batch_output_types>`.

.. important::
Expand All @@ -522,50 +517,6 @@ You can learn more about output formats in :ref:`the output format guide<transfo
``"pandas"`` batch format, you will need to know the basics of interacting with
dataframes to make your batch inference jobs work.

Default data formats
~~~~~~~~~~~~~~~~~~~~

In all the examples we've seen so far, we didn't have to specify the batch format.
In fact, the format is inferred from the input dataset, which can be straightforward.
For instance, when loading a NumPy array with :meth:`ray.data.from_numpy() <ray.data.from_numpy>`,
the batch format will be ``"numpy"``, but it's not always that easy.

In any case, Ray Data has a ``"default"`` batch format that is computed per data type
as follows:

.. tabbed:: Tabular data

Each batch will be a
`pandas.DataFrame <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`__.
This may incur a conversion cost if the underlying Datastream block is not
zero-copy convertible from an Arrow table.

.. literalinclude:: ./doc_code/transforming_datastreams.py
:language: python
:start-after: __writing_default_udfs_tabular_begin__
:end-before: __writing_default_udfs_tabular_end__

.. tabbed:: Tensor data (single-column)

Each batch will be a single
`numpy.ndarray <https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html>`__
containing the single tensor column for this batch.

.. literalinclude:: ./doc_code/transforming_datastreams.py
:language: python
:start-after: __writing_default_udfs_tensor_begin__
:end-before: __writing_default_udfs_tensor_end__

.. tabbed:: Simple data

Each batch will be a Python list.

.. literalinclude:: ./doc_code/transforming_datastreams.py
:language: python
:start-after: __writing_default_udfs_list_begin__
:end-before: __writing_default_udfs_list_end__


.. seealso::

As we've discussed in this guide, using :meth:`ds.map_batches() <ray.data.Dataset.map_batches>`
Expand All @@ -585,7 +536,6 @@ as follows:
:start-after: __hf_quickstart_air_start__
:end-before: __hf_quickstart_air_end__


.. _batch_inference_config:
Configuration & Troubleshooting
-------------------------------
Expand Down
27 changes: 21 additions & 6 deletions doc/source/data/doc_code/batch_formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
def map_function(data):
return data[data["sepal.length"] < 5]

batch = ds.take_batch(10)
mapped_batch = map_function(batch)

transformed = ds.map_batches(map_function, batch_size=10)
# __simple_map_function_end__

Expand All @@ -21,7 +24,6 @@ def map_function(data):
ds.show(1)
# -> {'sepal.length': 5.1, ..., 'petal.width': 0.2, 'variety': 'Setosa'}

ds.default_batch_format()
# pandas.core.frame.DataFrame

def transform_pandas(df_batch: pd.DataFrame) -> pd.DataFrame:
Expand All @@ -30,20 +32,27 @@ def transform_pandas(df_batch: pd.DataFrame) -> pd.DataFrame:
df_batch = df_batch.drop(columns=["sepal.length"])
return df_batch

ds.map_batches(transform_pandas).show(1)
ds.map_batches(transform_pandas, batch_format="pandas").show(1)
# -> {..., 'variety': 'Versicolor', 'normalized.sepal.length': 1.0}
# __simple_pandas_end__

# __simple_numpy_start__
from typing import Dict

import ray
import numpy as np


ds = ray.data.range_tensor(1000, shape=(2, 2))
ds.default_batch_format()
# 'numpy.ndarray'

def transform_numpy(arr: np.ndarray) -> np.ndarray:
return arr * 2
def transform_numpy(arr: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
arr["data"] = arr["data"] * 2
return arr


# test map function on a batch
batch = ds.take_batch(1)
mapped_batch = transform_numpy(batch)

ds.map_batches(transform_numpy)
# __simple_numpy_end__
Expand All @@ -56,10 +65,16 @@ def transform_numpy(arr: np.ndarray) -> np.ndarray:

ds = ray.data.read_csv("example:https://iris.csv")


def transform_pyarrow(batch: pa.Table) -> pa.Table:
batch = batch.filter(pac.equal(batch["variety"], "Versicolor"))
return batch.drop(["sepal.length"])


# test map function on a batch
batch = ds.take_batch(1)
mapped_batch = transform_pyarrow(batch)

ds.map_batches(transform_pyarrow, batch_format="pyarrow").show(1)
# -> {'sepal.width': 3.2, ..., 'variety': 'Versicolor'}
# __simple_pyarrow_end__
Expand Down
7 changes: 6 additions & 1 deletion doc/source/data/doc_code/hf_quick_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@ def __init__(self): # <1>
self.model = pipeline("text-generation", model="gpt2")

def __call__(self, batch): # <2>
# TODO make this run with "numpy" format
return self.model(list(batch["text"]), max_length=20)
# __hf_quickstart_model_end__


# __hf_quickstart_prediction_start__
scale = ray.data.ActorPoolStrategy(2)
hfp = HuggingFacePredictor()
batch = ds.take_batch(10)
test = hfp(batch)

scale = ray.data.ActorPoolStrategy(size=2)
predictions = ds.map_batches(HuggingFacePredictor, compute=scale)

predictions.show(limit=1)
Expand Down
10 changes: 7 additions & 3 deletions doc/source/data/doc_code/pytorch_quick_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import numpy as np


dataset = ray.data.from_numpy(np.ones((1, 100)))
ds = ray.data.from_numpy(np.ones((1, 100)))
# __pt_quickstart_load_end__


Expand All @@ -32,8 +32,12 @@ def __call__(self, batch): # <2>


# __pt_quickstart_prediction_start__
scale = ray.data.ActorPoolStrategy(2)
predictions = dataset.map_batches(TorchPredictor, compute=scale)
tp = TorchPredictor()
batch = ds.take_batch(10)
test = tp(batch)

scale = ray.data.ActorPoolStrategy(size=2)
predictions = ds.map_batches(TorchPredictor, compute=scale)
predictions.show(limit=1)
# [0.45092654]
# __pt_quickstart_prediction_end__
Expand Down
10 changes: 7 additions & 3 deletions doc/source/data/doc_code/tf_quick_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import numpy as np


dataset = ray.data.from_numpy(np.ones((1, 100)))
ds = ray.data.from_numpy(np.ones((1, 100)))
# __tf_quickstart_load_end__


Expand All @@ -26,9 +26,13 @@ def __call__(self, batch: np.ndarray): # <2>


# __tf_quickstart_prediction_start__
scale = ray.data.ActorPoolStrategy(2)
tfp = TFPredictor()
batch = ds.take_batch(10)
test = tfp(batch)

predicted_probabilities = dataset.map_batches(TFPredictor, compute=scale)
scale = ray.data.ActorPoolStrategy(size=2)

predicted_probabilities = ds.map_batches(TFPredictor, compute=scale)
predicted_probabilities.show(limit=1)
# [0.45119727]
# __tf_quickstart_prediction_end__
Expand Down

0 comments on commit 9f35bda

Please sign in to comment.