[Data] Remove read parallelism from Ray Data documentation (#43690)
This PR removes read parallelism from the Ray Data documentation and replaces it with the number of output blocks for reads. The motivation is that `parallelism` is already deprecated in favor of `override_num_blocks` for read APIs.

Signed-off-by: Cheng Su <[email protected]>
c21 committed Mar 6, 2024
1 parent a4ed9b4 commit c176117
Showing 8 changed files with 73 additions and 61 deletions.
2 changes: 1 addition & 1 deletion doc/source/data/data-internals.rst
@@ -187,7 +187,7 @@ The following code is a hello world example which invokes the execution with
return x
for _ in (
- ray.data.range_tensor(5000, shape=(80, 80, 3), parallelism=200)
+ ray.data.range_tensor(5000, shape=(80, 80, 3), override_num_blocks=200)
.map_batches(sleep, num_cpus=2)
.map_batches(SleepClass, concurrency=(2, 4))
.map_batches(sleep, num_cpus=1)
23 changes: 10 additions & 13 deletions doc/source/data/loading-data.rst
@@ -942,16 +942,13 @@ datasource and pass it to :func:`~ray.data.read_datasource`.
Performance considerations
==========================

- The dataset ``parallelism`` determines the number of blocks the base data is split
- into for parallel reads. Ray Data decides internally how many read tasks to run
- concurrently to best utilize the cluster, ranging from ``1...parallelism`` tasks. In
- other words, the higher the parallelism, the smaller the data blocks in the Dataset and
- hence the more opportunity for parallel execution.
-
- .. image:: images/dataset-read.svg
-    :width: 650px
-    :align: center
-
- You can override the default parallelism by setting the ``parallelism`` argument. For
- more information on how to tune the read parallelism, see
- :ref:`Advanced: Performance Tips and Tuning <data_performance_tips>`.
+ By default, the number of output blocks from all read tasks is dynamically decided
+ based on input data size and available resources. This default should work well in
+ most cases. However, you can also override it by setting the ``override_num_blocks``
+ argument. Ray Data decides internally how many read tasks to run concurrently to best
+ utilize the cluster, ranging from ``1...override_num_blocks`` tasks. In other words,
+ the higher the ``override_num_blocks``, the smaller the data blocks in the Dataset and
+ hence the more opportunities for parallel execution.
+
+ For more information on how to tune the number of output blocks, see
+ :ref:`Tuning output blocks for read <read_output_blocks>`.
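
As a quick illustration of the new argument, here is a minimal sketch (an editorial example, not part of the diff; it assumes a running Ray cluster and the block count is illustrative):

.. testcode::

    import ray

    # Ask for 64 output blocks instead of letting Ray Data pick a value
    # from the input data size and available cluster resources.
    ds = ray.data.range(100_000, override_num_blocks=64)
    print(ds.materialize())
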
65 changes: 31 additions & 34 deletions doc/source/data/performance-tips.rst
@@ -18,24 +18,24 @@ If your transformation isn't vectorized, there's no performance benefit.
Optimizing reads
----------------

- .. _read_parallelism:
+ .. _read_output_blocks:

- Tuning read parallelism
- ~~~~~~~~~~~~~~~~~~~~~~~
+ Tuning output blocks for read
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

- By default, Ray Data automatically selects the read ``parallelism`` according to the following procedure:
+ By default, Ray Data automatically selects the number of output blocks for a read according to the following procedure:

- The ``parallelism`` parameter passed to Ray Data's :ref:`read APIs <input-output>` specifies the number of read tasks to create.
- Usually, if the read is followed by a :func:`~ray.data.Dataset.map` or :func:`~ray.data.Dataset.map_batches`, the map is fused with the read; therefore ``parallelism`` also determines the number of map tasks.
+ - The ``override_num_blocks`` parameter passed to Ray Data's :ref:`read APIs <input-output>` specifies the number of output blocks, which is equivalent to the number of read tasks to create.
+ - Usually, if the read is followed by a :func:`~ray.data.Dataset.map` or :func:`~ray.data.Dataset.map_batches`, the map is fused with the read; therefore ``override_num_blocks`` also determines the number of map tasks.

- Ray Data decides the default value for ``parallelism`` based on the following heuristics, applied in order:
+ Ray Data decides the default number of output blocks based on the following heuristics, applied in order:

- 1. Start with the default parallelism of 200. You can overwrite this by setting :class:`DataContext.min_parallelism <ray.data.context.DataContext>`.
- 2. Min block size (default=1 MiB). If the parallelism would make blocks smaller than this threshold, reduce parallelism to avoid the overhead of tiny blocks. You can override by setting :class:`DataContext.target_min_block_size <ray.data.context.DataContext>` (bytes).
- 3. Max block size (default=128 MiB). If the parallelism would make blocks larger than this threshold, increase parallelism to avoid out-of-memory errors during processing. You can override by setting :class:`DataContext.target_max_block_size <ray.data.context.DataContext>` (bytes).
- 4. Available CPUs. Increase parallelism to utilize all of the available CPUs in the cluster. Ray Data chooses the number of read tasks to be at least 2x the number of available CPUs.
+ 1. Start with the default value of 200. You can overwrite this by setting :class:`DataContext.read_op_min_num_blocks <ray.data.context.DataContext>`.
+ 2. Min block size (default=1 MiB). If the number of blocks would make blocks smaller than this threshold, reduce the number of blocks to avoid the overhead of tiny blocks. You can override this by setting :class:`DataContext.target_min_block_size <ray.data.context.DataContext>` (bytes).
+ 3. Max block size (default=128 MiB). If the number of blocks would make blocks larger than this threshold, increase the number of blocks to avoid out-of-memory errors during processing. You can override this by setting :class:`DataContext.target_max_block_size <ray.data.context.DataContext>` (bytes).
+ 4. Available CPUs. Increase the number of blocks to utilize all of the available CPUs in the cluster. Ray Data chooses the number of read tasks to be at least 2x the number of available CPUs.
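
As a sketch of how you might adjust these knobs (an editorial example; the values are illustrative, not recommendations):

.. testcode::

    import ray
    from ray.data.context import DataContext

    ctx = DataContext.get_current()
    ctx.read_op_min_num_blocks = 100               # heuristic 1: default floor, normally 200
    ctx.target_min_block_size = 4 * 1024 * 1024    # heuristic 2: 4 MiB, in bytes
    ctx.target_max_block_size = 256 * 1024 * 1024  # heuristic 3: 256 MiB, in bytes

    # Subsequent reads pick their block count using the tuned heuristics.
    ds = ray.data.range(1_000_000)
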

- Occasionally, it's advantageous to manually tune the parallelism to optimize the application.
+ Occasionally, it's advantageous to manually tune the number of blocks to optimize the application.
For example, the following code batches multiple files into the same read task to avoid creating blocks that are too large.

.. testcode::
@@ -57,7 +57,6 @@ For example, the following code batches multiple files into the same read task t
.. testoutput::
:options: +MOCK

- 2023-11-20 14:28:47,597 INFO plan.py:760 -- Using autodetected parallelism=4 for operator ReadCSV to satisfy parallelism at least twice the available number of CPUs (2).
MaterializedDataset(
num_blocks=4,
num_rows=2400,
@@ -66,8 +65,8 @@ For example, the following code batches multiple files into the same read task t

But suppose that you knew that you wanted to read all 16 files in parallel.
This could be, for example, because you know that additional CPUs should get added to the cluster by the autoscaler or because you want the downstream operator to transform each file's contents in parallel.
- You can get this behavior by setting the ``parallelism`` parameter.
- Notice how the number of output blocks is equal to ``parallelism`` in the following code:
+ You can get this behavior by setting the ``override_num_blocks`` parameter.
+ Notice how the number of output blocks is equal to ``override_num_blocks`` in the following code:

.. testcode::
:hide:
@@ -82,7 +81,7 @@ Notice how the number of output blocks is equal to ``parallelism`` in the follow
ray.init(num_cpus=2)

# Repeat the iris.csv file 16 times.
- ds = ray.data.read_csv(["example://iris.csv"] * 16, parallelism=16)
+ ds = ray.data.read_csv(["example://iris.csv"] * 16, override_num_blocks=16)
print(ds.materialize())

.. testoutput::
@@ -95,10 +94,10 @@ Notice how the number of output blocks is equal to ``parallelism`` in the follow
)


- When using the default auto-detected ``parallelism``, Ray Data attempts to cap each task's output to :class:`DataContext.target_max_block_size <ray.data.context.DataContext>` many bytes.
+ When using the default auto-detected number of blocks, Ray Data attempts to cap each task's output at :class:`DataContext.target_max_block_size <ray.data.context.DataContext>` bytes.
Note however that Ray Data can't perfectly predict the size of each task's output, so it's possible that each task produces one or more output blocks.
- Thus, the total blocks in the final :class:`~ray.data.Dataset` may differ from the specified ``parallelism``.
- Here's an example where we manually specify ``parallelism=1``, but the one task still produces multiple blocks in the materialized Dataset:
+ Thus, the total number of blocks in the final :class:`~ray.data.Dataset` may differ from the specified ``override_num_blocks``.
+ Here's an example where we manually specify ``override_num_blocks=1``, but the one task still produces multiple blocks in the materialized Dataset:

.. testcode::
:hide:
@@ -113,7 +112,7 @@ Here's an example where we manually specify ``parallelism=1``, but the one task
ray.init(num_cpus=2)

# Generate ~400MB of data.
- ds = ray.data.range_tensor(5_000, shape=(10_000, ), parallelism=1)
+ ds = ray.data.range_tensor(5_000, shape=(10_000,), override_num_blocks=1)
print(ds.materialize())

.. testoutput::
@@ -127,8 +126,8 @@ Here's an example where we manually specify ``parallelism=1``, but the one task


Currently, Ray Data can assign at most one read task per input file.
- Thus, if the number of input files is smaller than ``parallelism``, the number of read tasks is capped to the number of input files.
- To ensure that downstream transforms can still execute with the desired parallelism, Ray Data splits the read tasks' outputs into a total of ``parallelism`` blocks and prevents fusion with the downstream transform.
+ Thus, if the number of input files is smaller than ``override_num_blocks``, the number of read tasks is capped at the number of input files.
+ To ensure that downstream transforms can still execute with the desired number of blocks, Ray Data splits the read tasks' outputs into a total of ``override_num_blocks`` blocks and prevents fusion with the downstream transform.
In other words, each read task's output blocks are materialized to Ray's object store before the consuming map task executes.
For example, the following code executes :func:`~ray.data.read_csv` with only one task, but its output is split into 4 blocks before executing the :func:`~ray.data.Dataset.map`:

@@ -150,17 +149,15 @@ For example, the following code executes :func:`~ray.data.read_csv` with only on
.. testoutput::
:options: +MOCK

- 2023-11-20 15:47:02,404 INFO split_read_output_blocks.py:101 -- Using autodetected parallelism=4 for operator ReadCSV to satisfy parallelism at least twice the available number of CPUs (2).
- 2023-11-20 15:47:02,405 INFO split_read_output_blocks.py:106 -- To satisfy the requested parallelism of 4, each read task output is split into 4 smaller blocks.
...
Operator 1 ReadCSV->SplitBlocks(4): 1 tasks executed, 4 blocks produced in 0.01s
...

Operator 2 Map(<lambda>): 4 tasks executed, 4 blocks produced in 0.3s
...

- To turn off this behavior and allow the read and map operators to be fused, set ``parallelism`` manually.
- For example, this code sets ``parallelism`` to equal the number of files:
+ To turn off this behavior and allow the read and map operators to be fused, set ``override_num_blocks`` manually.
+ For example, this code sets ``override_num_blocks`` equal to the number of files:

.. testcode::
:hide:
@@ -174,7 +171,7 @@ For example, this code sets ``parallelism`` to equal the number of files:
# Pretend there are two CPUs.
ray.init(num_cpus=2)

ds = ray.data.read_csv("example:https://iris.csv", parallelism=1).map(lambda row: row)
ds = ray.data.read_csv("example:https://iris.csv", override_num_blocks=1).map(lambda row: row)
print(ds.materialize().stats())

.. testoutput::
@@ -244,7 +241,7 @@ To avoid these issues:

1. Make sure no single item in your dataset is too large. Aim for rows that are <10 MB each.
2. Always call :meth:`ds.map_batches() <ray.data.Dataset.map_batches>` with a batch size small enough such that the output batch can comfortably fit into heap memory. Or, if vectorized execution is not necessary, use :meth:`ds.map() <ray.data.Dataset.map>`.
- 3. If neither of these is sufficient, manually increase the :ref:`read parallelism <read_parallelism>` or modify your application code to ensure that each task reads a smaller amount of data.
+ 3. If neither of these is sufficient, manually increase the :ref:`number of read output blocks <read_output_blocks>` (see the sketch after this list) or modify your application code to ensure that each task reads a smaller amount of data.
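
A minimal sketch of that third tip (an editorial example; the Parquet path is a placeholder and the block count is illustrative):

.. testcode::
    :skipif: True

    import ray

    # More, smaller read tasks mean each task materializes less data at once.
    ds = ray.data.read_parquet("s3://my-bucket/data/", override_num_blocks=1000)
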

As an example of tuning batch size, the following code uses one task to load a 1 GB :class:`~ray.data.Dataset` with 1000 1 MB rows and applies an identity function using :func:`~ray.data.Dataset.map_batches`.
Because the default ``batch_size`` for :func:`~ray.data.Dataset.map_batches` is 1024 rows, this code produces only one very large batch, causing the heap memory usage to increase to 4 GB.
@@ -262,7 +259,7 @@ Because the default ``batch_size`` for :func:`~ray.data.Dataset.map_batches` is
ray.init(num_cpus=2)

# Force Ray Data to use one task to show the memory issue.
- ds = ray.data.range_tensor(1000, shape=(125_000, ), parallelism=1)
+ ds = ray.data.range_tensor(1000, shape=(125_000,), override_num_blocks=1)
# The default batch size is 1024 rows.
ds = ds.map_batches(lambda batch: batch)
print(ds.materialize().stats())
@@ -291,7 +288,7 @@ Setting a lower batch size produces lower peak heap memory usage:
# Pretend there are two CPUs.
ray.init(num_cpus=2)

- ds = ray.data.range_tensor(1000, shape=(125_000, ), parallelism=1)
+ ds = ray.data.range_tensor(1000, shape=(125_000,), override_num_blocks=1)
ds = ds.map_batches(lambda batch: batch, batch_size=32)
print(ds.materialize().stats())

@@ -331,7 +328,7 @@ There are some cases where spilling is expected. In particular, if the total Dat
2. There is a call to :meth:`ds.materialize() <ray.data.Dataset.materialize>`.

Otherwise, it's best to tune your application to avoid spilling.
- The recommended strategy is to manually increase the :ref:`read parallelism <read_parallelism>` or modify your application code to ensure that each task reads a smaller amount of data.
+ The recommended strategy is to manually increase the :ref:`number of read output blocks <read_output_blocks>` or modify your application code to ensure that each task reads a smaller amount of data.
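
For example (a sketch reusing the ~400 MB dataset from the earlier example; 8 is illustrative), splitting the read into more tasks keeps each task's output small enough to consume without spilling:

.. testcode::

    import ray

    # Eight read tasks of roughly 50 MB each instead of one ~400 MB task.
    ds = ray.data.range_tensor(5_000, shape=(10_000,), override_num_blocks=8)
    print(ds.materialize().stats())
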

.. note:: This is an active area of development. If your Dataset is causing spilling and you don't know why, `file a Ray Data issue on GitHub`_.

@@ -365,11 +362,11 @@ To illustrate these, the following code uses both strategies to coalesce the 10
ray.init(num_cpus=2)

# 1. Use ds.repartition().
- ds = ray.data.range(10, parallelism=10).repartition(1)
+ ds = ray.data.range(10, override_num_blocks=10).repartition(1)
print(ds.materialize().stats())

# 2. Use ds.map_batches().
- ds = ray.data.range(10, parallelism=10).map_batches(lambda batch: batch, batch_size=10)
+ ds = ray.data.range(10, override_num_blocks=10).map_batches(lambda batch: batch, batch_size=10)
print(ds.materialize().stats())

.. testoutput::
@@ -511,7 +508,7 @@ Here is an example that shows how to limit a random shuffle operation to two out
)

ds = (
- ray.data.range(1000, parallelism=10)
+ ray.data.range(1000, override_num_blocks=10)
.random_shuffle()
.materialize()
)
4 changes: 2 additions & 2 deletions doc/source/data/transforming-data.rst
@@ -398,7 +398,7 @@ Repartitioning data

A :class:`~ray.data.dataset.Dataset` operates on a sequence of distributed data
:term:`blocks <block>`. If you want to achieve more fine-grained parallelization,
- increase the number of blocks by setting a higher ``parallelism`` at read time.
+ increase the number of blocks by setting a higher ``override_num_blocks`` at read time.

To change the number of blocks for an existing Dataset, call
:meth:`Dataset.repartition() <ray.data.Dataset.repartition>`.
@@ -407,7 +407,7 @@ To change the number of blocks for an existing Dataset, call

import ray

- ds = ray.data.range(10000, parallelism=1000)
+ ds = ray.data.range(10000, override_num_blocks=1000)

# Repartition the data into 100 blocks. Since shuffle=False, Ray Data will minimize
# data movement during this operation by merging adjacent blocks.
27 changes: 21 additions & 6 deletions python/ray/data/_internal/util.py
@@ -27,7 +27,7 @@
import ray
from ray._private.utils import _get_pyarrow_version
from ray.data._internal.arrow_ops.transform_pyarrow import unify_schemas
- from ray.data.context import WARN_PREFIX, DataContext
+ from ray.data.context import DEFAULT_READ_OP_MIN_NUM_BLOCKS, WARN_PREFIX, DataContext

if TYPE_CHECKING:
import pandas
@@ -113,8 +113,8 @@ def _autodetect_parallelism(
This detects parallelism using the following heuristics, applied in order:
- 1) We start with the default parallelism of 200. This can be overridden by
-    setting the `min_parallelism` attribute of
+ 1) We start with the default value of 200. This can be overridden by
+    setting the `read_op_min_num_blocks` attribute of
:class:`~ray.data.context.DataContext`.
2) Min block size. If the parallelism would make blocks smaller than this
threshold, the parallelism is reduced to avoid the overhead of tiny blocks.
@@ -156,18 +156,33 @@ def _autodetect_parallelism(
if parallelism < 0:
if parallelism != -1:
raise ValueError("`parallelism` must either be -1 or a positive integer.")

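+ # Backwards compatibility: if the user set the deprecated ``min_parallelism``
+ # but left ``read_op_min_num_blocks`` at its default, carry the old value
+ # over and warn about the rename.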
+ if (
+     ctx.min_parallelism is not None
+     and ctx.min_parallelism != DEFAULT_READ_OP_MIN_NUM_BLOCKS
+     and ctx.read_op_min_num_blocks == DEFAULT_READ_OP_MIN_NUM_BLOCKS
+ ):
+     logger.warning(
+         "``DataContext.min_parallelism`` is deprecated in Ray 2.10. "
+         "Please specify ``DataContext.read_op_min_num_blocks`` instead."
+     )
+     ctx.read_op_min_num_blocks = ctx.min_parallelism

# Start with 2x the number of cores as a baseline, with a min floor.
if placement_group is None:
placement_group = ray.util.get_current_placement_group()
avail_cpus = avail_cpus or _estimate_avail_cpus(placement_group)
parallelism = max(
-     min(ctx.min_parallelism, max_reasonable_parallelism),
+     min(ctx.read_op_min_num_blocks, max_reasonable_parallelism),
min_safe_parallelism,
avail_cpus * 2,
)

- if parallelism == ctx.min_parallelism:
-     reason = f"DataContext.get_current().min_parallelism={ctx.min_parallelism}"
+ if parallelism == ctx.read_op_min_num_blocks:
+     reason = (
+         "DataContext.get_current().read_op_min_num_blocks="
+         f"{ctx.read_op_min_num_blocks}"
+     )
elif parallelism == max_reasonable_parallelism:
reason = (
"output blocks of size at least "
9 changes: 6 additions & 3 deletions python/ray/data/context.py
@@ -51,9 +51,9 @@
# TODO (kfstorm): Remove this once stable.
DEFAULT_ENABLE_PANDAS_BLOCK = True

- # Minimum amount of parallelism to auto-detect for a dataset. Note that the min
+ # Minimum number of read output blocks for a dataset. Note that the min
# block size config takes precedence over this.
- DEFAULT_MIN_PARALLELISM = 200
+ DEFAULT_READ_OP_MIN_NUM_BLOCKS = 200

# Whether to use an actor-based block prefetcher.
DEFAULT_ACTOR_PREFETCHER_ENABLED = False
@@ -252,6 +252,8 @@ def __init__(
self.op_resource_reservation_enabled = DEFAULT_ENABLE_OP_RESOURCE_RESERVATION
# The reservation ratio for ReservationOpResourceAllocator.
self.op_resource_reservation_ratio = DEFAULT_OP_RESOURCE_RESERVATION_RATIO
+ # Minimum number of read output blocks for a dataset.
+ self.read_op_min_num_blocks = DEFAULT_READ_OP_MIN_NUM_BLOCKS

@staticmethod
def get_current() -> "DataContext":
@@ -285,7 +287,7 @@ def get_current() -> "DataContext":
use_polars=DEFAULT_USE_POLARS,
eager_free=DEFAULT_EAGER_FREE,
decoding_size_estimation=DEFAULT_DECODING_SIZE_ESTIMATION_ENABLED,
- min_parallelism=DEFAULT_MIN_PARALLELISM,
+ # NOTE: This parameter is deprecated. Use `read_op_min_num_blocks`.
+ min_parallelism=DEFAULT_READ_OP_MIN_NUM_BLOCKS,
enable_tensor_extension_casting=(
DEFAULT_ENABLE_TENSOR_EXTENSION_CASTING
),