[Datasets] Autodetect dataset parallelism based on available resources and data size #25883

Merged · 61 commits · Jul 13, 2022

Commits
94fa31e
wip
ericl Jun 17, 2022
d52cbba
wip
ericl Jun 17, 2022
b813603
wip
ericl Jun 17, 2022
34c5eb3
wip
ericl Jun 17, 2022
7d46ef0
lint
ericl Jun 17, 2022
f7fc883
wip
ericl Jun 17, 2022
622d211
lint
ericl Jun 17, 2022
7163e0b
update
ericl Jun 17, 2022
c831447
rename
ericl Jun 17, 2022
5145ab4
update
ericl Jun 17, 2022
d2c5a18
update
ericl Jun 17, 2022
7b8794c
update
ericl Jun 17, 2022
55a48d4
update
ericl Jun 17, 2022
ed8e8f5
update
ericl Jun 17, 2022
510b8c6
Merge remote-tracking branch 'upstream/master' into detect-parallelism
ericl Jun 17, 2022
d88fcd0
update
ericl Jun 17, 2022
dad1569
fix from_items
ericl Jun 17, 2022
a7a6138
Merge remote-tracking branch 'upstream/master' into detect-parallelism
ericl Jun 18, 2022
8dc156d
fix test
ericl Jun 18, 2022
af722b0
wip
ericl Jun 18, 2022
04e7771
update
ericl Jun 18, 2022
745105a
fix
ericl Jun 18, 2022
277c89f
fix
ericl Jun 18, 2022
0b0e967
fx
ericl Jun 18, 2022
42c03b9
update
ericl Jun 18, 2022
748f7e9
update
ericl Jun 18, 2022
a7123b3
fix
ericl Jun 18, 2022
3b4277b
Merge remote-tracking branch 'upstream/master' into detect-parallelism
ericl Jun 19, 2022
0f7630e
update
ericl Jun 19, 2022
40a3b13
fix
ericl Jun 19, 2022
ee9073d
update
ericl Jun 23, 2022
2ee5e04
fix
ericl Jun 23, 2022
653fb01
Merge remote-tracking branch 'upstream/master' into detect-parallelism
ericl Jun 23, 2022
902e404
fix
ericl Jun 23, 2022
e899426
fix
ericl Jun 23, 2022
e80b08a
improve doc
ericl Jun 24, 2022
4bca8a3
update
ericl Jun 24, 2022
f651581
lint
ericl Jun 24, 2022
558fb4b
update
ericl Jun 24, 2022
2e39005
fix
ericl Jun 24, 2022
1700f73
try out
ericl Jun 24, 2022
59f3bc7
Merge remote-tracking branch 'upstream/master' into detect-parallelism
ericl Jun 29, 2022
2d149cf
add warning
ericl Jun 30, 2022
7a6a07b
fix
ericl Jun 30, 2022
3a00261
Merge remote-tracking branch 'upstream/master' into detect-parallelism
ericl Jun 30, 2022
20e09a8
fix test
ericl Jun 30, 2022
f57f5a8
fix
ericl Jun 30, 2022
36f92cb
Merge remote-tracking branch 'upstream/master' into detect-parallelism
ericl Jul 5, 2022
c1986c8
try fixing
ericl Jul 6, 2022
5048fbb
fix closure capture
ericl Jul 6, 2022
a5c6d85
stale comment
Jul 6, 2022
2544e84
lint
Jul 7, 2022
a488e9c
Merge branch 'master' of https://github.com/ray-project/ray into eric…
Jul 7, 2022
4297fcd
feedback
Jul 7, 2022
73a5802
Merge branch 'master' of https://github.com/ray-project/ray into eric…
Jul 8, 2022
e74b78d
Merge remote-tracking branch 'upstream/master' into detect-parallelism
ericl Jul 12, 2022
df476e2
Merge branch 'detect-parallelism' of github.com:ericl/ray into detect…
ericl Jul 12, 2022
b7eee3f
handle subclass
Jul 13, 2022
013a449
Merge branch 'master' of https://github.com/ray-project/ray into eric…
Jul 13, 2022
4556a15
Merge branch 'detect-parallelism' of github.com:ericl/ray into ericl-…
Jul 13, 2022
d6d4ce2
fix file read
Jul 13, 2022
9 changes: 7 additions & 2 deletions doc/source/data/creating-datasets.rst
@@ -288,8 +288,13 @@
Each of these APIs takes a path or list of paths to files or directories. Any directories
provided will be walked in order to obtain concrete file paths, at which point all files
will be read in parallel.

-Datasets uses a default parallelism of 200, truncated by the number of files being read:
-``parallelism = min(num_files, 200)``. ``parallelism`` parallel read tasks will be
+Datasets automatically selects the read ``parallelism`` according to the following procedure:
+
+1. The number of available CPUs is estimated. If running in a placement group, this is the number of CPUs in the cluster scaled by the placement group's share of the cluster; otherwise, it is the total number of CPUs in the cluster.
+2. The parallelism is set to twice the estimated number of CPUs, with a minimum of 8.
+3. The in-memory data size is estimated. If the parallelism would create in-memory blocks that are larger on average than the target block size (512MiB), the parallelism is increased until the blocks are under 512MiB each.
+4. The parallelism is truncated to ``min(num_files, parallelism)``.
+
+To perform the read, ``parallelism`` parallel read tasks will be
launched, each reading one or more files and each creating a single block of data.
When reading from remote datasources, these parallel read tasks will be spread across
the nodes in your Ray cluster, creating the distributed collection of blocks that makes
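The selection procedure documented above can be summarized in a short Python sketch (illustrative only; ``autodetect_parallelism`` below is not Ray's internal API):

```python
import math
from typing import Optional

def autodetect_parallelism(
    num_files: int,
    estimated_cpus: int,
    in_memory_size: Optional[int],
    min_parallelism: int = 8,
    target_max_block_size: int = 512 * 1024 * 1024,
) -> int:
    # Steps 1-2: twice the estimated CPU count, floored at the minimum of 8.
    parallelism = max(estimated_cpus * 2, min_parallelism)
    # Step 3: raise parallelism until the average block fits the 512MiB target.
    if in_memory_size is not None:
        parallelism = max(parallelism, math.ceil(in_memory_size / target_max_block_size))
    # Step 4: never create more read tasks than there are files.
    return min(num_files, parallelism)
```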
2 changes: 1 addition & 1 deletion doc/source/data/key-concepts.rst
@@ -61,7 +61,7 @@ This page overviews the execution model of Datasets, which may be useful for und…
Reading Data
============

-Datasets uses Ray tasks to read data from remote storage. When reading from a file-based datasource (e.g., S3, GCS), it creates a number of read tasks equal to the specified read parallelism (200 by default). One or more files will be assigned to each read task. Each read task reads its assigned files and produces one or more output blocks (Ray objects):
+Datasets uses Ray tasks to read data from remote storage. When reading from a file-based datasource (e.g., S3, GCS), it creates a number of read tasks equal to the specified read parallelism (autodetected by default). One or more files will be assigned to each read task. Each read task reads its assigned files and produces one or more output blocks (Ray objects):

.. image:: images/dataset-read.svg
:width: 650px
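For example (a sketch; the S3 path is a placeholder), the block count of a freshly read dataset reflects the selected parallelism:

```python
import ray

# Parallelism is autodetected by default; pass parallelism= to override it.
ds = ray.data.read_parquet("s3://example-bucket/data")  # placeholder path
print(ds.num_blocks())  # roughly equal to the selected read parallelism
```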
8 changes: 4 additions & 4 deletions doc/source/data/performance-tips.rst
@@ -109,13 +109,13 @@ This can be used in conjunction with column pruning when appropriate to get the…
Tuning Read Parallelism
~~~~~~~~~~~~~~~~~~~~~~~

-By default, Ray requests 0.5 CPUs per read task, which means two read tasks can concurrently execute per CPU.
+By default, Ray requests 1 CPU per read task, which means one read task per CPU can execute concurrently.
For data sources that can benefit from higher degrees of I/O parallelism, you can specify a lower ``num_cpus`` value for the read function via the ``ray_remote_args`` parameter.
For example, use ``ray.data.read_parquet(path, ray_remote_args={"num_cpus": 0.25})`` to allow up to four read tasks per CPU.

-The number of read tasks can also be increased by increasing the ``parallelism`` parameter.
-For example, use ``ray.data.read_parquet(path, parallelism=1000)`` to create up to 1000 read tasks.
-Typically, increasing the number of read tasks only helps if you have more cluster CPUs than the default parallelism.
+By default, Datasets automatically selects the read parallelism based on the current cluster size and dataset size.
+However, the number of read tasks can also be increased manually via the ``parallelism`` parameter.
+For example, use ``ray.data.read_parquet(path, parallelism=1000)`` to force up to 1000 read tasks to be created.
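As a combined sketch of both knobs (the path is a placeholder):

```python
import ray

ds = ray.data.read_parquet(
    "s3://example-bucket/data",          # placeholder path
    parallelism=1000,                    # force up to 1000 read tasks
    ray_remote_args={"num_cpus": 0.25},  # allow up to 4 concurrent read tasks per CPU
)
```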

Tuning Max Block Size
~~~~~~~~~~~~~~~~~~~~~
2 changes: 1 addition & 1 deletion python/ray/data/_internal/progress_bar.py
@@ -87,7 +87,7 @@ def set_description(self, name: str) -> None:
        self._bar.set_description(name)

    def update(self, i: int) -> None:
-        if self._bar:
+        if self._bar and i != 0:
            self._bar.update(i)

    def close(self):
8 changes: 7 additions & 1 deletion python/ray/data/context.py
@@ -11,7 +11,7 @@
_context_lock = threading.Lock()

# The max target block size in bytes for reads and transformations.
-DEFAULT_TARGET_MAX_BLOCK_SIZE = 2048 * 1024 * 1024
+DEFAULT_TARGET_MAX_BLOCK_SIZE = 512 * 1024 * 1024
Comment (Contributor):

Is it possible to make this user-configurable per dataset?

Reply (@ericl, Contributor Author, Jun 29, 2022):

Yes, you can configure it via the DatasetContext (before creating the dataset).
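For example, a minimal sketch (set the context before creating any dataset; ``target_max_block_size`` is the attribute behind this constant):

```python
import ray
from ray.data.context import DatasetContext

# Lower the target max block size for datasets created afterwards.
ctx = DatasetContext.get_current()
ctx.target_max_block_size = 256 * 1024 * 1024  # 256MiB, down from the 512MiB default

ds = ray.data.range(100000)  # reads and transforms now target smaller blocks
```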

Comment (Contributor):

Ah I see. It would be nice if the user could pass it as a top-level arg like parallelism, but maybe we can do this as a follow-up?

Reply (@ericl, Contributor Author):

I see, agreed that could be nice. I think we should do this when enabling block splitting by default (and maybe deprecating parallelism at the same time). That way, the story becomes pretty simple: parallelism is usually auto-detected, and the block size can be reduced if you're running into OOMs.


# Whether block splitting is on by default
DEFAULT_BLOCK_SPLITTING_ENABLED = False
@@ -33,6 +33,9 @@
# Whether to furthermore fuse prior map tasks with shuffle stages.
DEFAULT_OPTIMIZE_FUSE_SHUFFLE_STAGES = True

+# Minimum amount of parallelism to auto-detect for a dataset.
+DEFAULT_MIN_PARALLELISM = 8
+
# Whether to use actor-based block prefetcher.
DEFAULT_ACTOR_PREFETCHER_ENABLED = True

@@ -71,6 +74,7 @@ def __init__(
        pipeline_push_based_shuffle_reduce_tasks: bool,
        scheduling_strategy: SchedulingStrategyT,
        use_polars: bool,
+        min_parallelism: int,
    ):
        """Private constructor (use get_current() instead)."""
        self.block_owner = block_owner
@@ -88,6 +92,7 @@ def __init__(
        )
        self.scheduling_strategy = scheduling_strategy
        self.use_polars = use_polars
+        self.min_parallelism = min_parallelism

    @staticmethod
    def get_current() -> "DatasetContext":
@@ -118,6 +123,7 @@ def get_current() -> "DatasetContext":
            pipeline_push_based_shuffle_reduce_tasks=True,
            scheduling_strategy=DEFAULT_SCHEDULING_STRATEGY,
            use_polars=DEFAULT_USE_POLARS,
+            min_parallelism=DEFAULT_MIN_PARALLELISM,
        )

        if (
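A short sketch of adjusting the new floor at runtime (using the ``min_parallelism`` attribute added in this diff):

```python
from ray.data.context import DatasetContext

# Raise the autodetection floor from the default of 8 to 16 read tasks.
DatasetContext.get_current().min_parallelism = 16
```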
2 changes: 2 additions & 0 deletions python/ray/data/datasource/__init__.py
@@ -6,6 +6,7 @@
    RandomIntRowDatasource,
    RangeDatasource,
    ReadTask,
+    Reader,
    WriteResult,
)
from ray.data.datasource.file_based_datasource import (
@@ -64,6 +65,7 @@
    "RandomIntRowDatasource",
    "RangeDatasource",
    "ReadTask",
+    "Reader",
    "SimpleTensorFlowDatasource",
    "SimpleTorchDatasource",
    "WriteResult",
130 changes: 108 additions & 22 deletions python/ray/data/datasource/datasource.py
@@ -1,29 +1,29 @@
import builtins
-from typing import Any, Generic, List, Dict, Callable, Union, Tuple, Iterable
+from typing import Any, Callable, Dict, Generic, Iterable, List, Optional, Tuple, Union

import numpy as np

import ray
-from ray.types import ObjectRef
+from ray.data._internal.arrow_block import ArrowRow
+from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
+from ray.data._internal.util import _check_pyarrow_version
from ray.data.block import (
    Block,
    BlockAccessor,
    BlockMetadata,
-    T,
    BlockPartition,
    BlockPartitionMetadata,
    MaybeBlockPartition,
+    T,
)
from ray.data.context import DatasetContext
-from ray.data._internal.arrow_block import ArrowRow
-from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
-from ray.data._internal.util import _check_pyarrow_version
-from ray.util.annotations import DeveloperAPI, PublicAPI
+from ray.types import ObjectRef
+from ray.util.annotations import Deprecated, DeveloperAPI, PublicAPI

WriteResult = Any


-@DeveloperAPI
+@PublicAPI
class Datasource(Generic[T]):
    """Interface for defining a custom ``ray.data.Dataset`` datasource.

@@ -33,22 +33,24 @@ class Datasource(Generic[T]):
    See ``RangeDatasource`` and ``DummyOutputDatasource`` for examples
    of how to implement readable and writable datasources.

-    Datasource instances must be serializable, since ``prepare_read()`` and
+    Datasource instances must be serializable, since ``create_reader()`` and
    ``do_write()`` are called in remote tasks.
    """

-    def prepare_read(self, parallelism: int, **read_args) -> List["ReadTask[T]"]:
-        """Return the list of tasks needed to perform a read.
+    def create_reader(self, **read_args) -> "Reader[T]":
+        """Return a Reader for the given read arguments.
+
+        The reader object will be responsible for querying the read metadata, and
+        generating the actual read tasks to retrieve the data blocks upon request.

        Args:
-            parallelism: The requested read parallelism. The number of read
-                tasks should be as close to this value as possible.
            read_args: Additional kwargs to pass to the datasource impl.
-
-        Returns:
-            A list of read tasks that can be executed to read blocks from the
-            datasource in parallel.
        """
+        return _LegacyDatasourceReader(self, **read_args)
+
+    @Deprecated
+    def prepare_read(self, parallelism: int, **read_args) -> List["ReadTask[T]"]:
+        """Deprecated: Please implement create_reader() instead."""
        raise NotImplementedError

    def do_write(
@@ -100,11 +102,54 @@ def on_write_failed(
        pass


+@PublicAPI
+class Reader(Generic[T]):
+    """A bound read operation for a datasource.
+
+    This is a stateful class so that reads can be prepared in multiple stages.
+    For example, it is useful for Datasets to know the in-memory size of the read
+    prior to executing it.
+    """
+
+    def estimate_inmemory_data_size(self) -> Optional[int]:
+        """Return an estimate of the in-memory data size, or None if unknown.
+
+        Note that the in-memory data size may be larger than the on-disk data size.
+        """
+        raise NotImplementedError
+
+    def get_read_tasks(self, parallelism: int) -> List["ReadTask[T]"]:
+        """Generate and return the read tasks for this read.
+
+        Args:
+            parallelism: The requested read parallelism. The number of read
+                tasks should equal this value if possible.
+
+        Returns:
+            A list of read tasks that can be executed to read blocks from the
+            datasource in parallel.
+        """
+        raise NotImplementedError
+
+
+class _LegacyDatasourceReader(Reader):
+    def __init__(self, datasource: Datasource, **read_args):
+        self._datasource = datasource
+        self._read_args = read_args
+
+    def estimate_inmemory_data_size(self) -> Optional[int]:
+        return None
+
+    def get_read_tasks(self, parallelism: int) -> List["ReadTask[T]"]:
+        return self._datasource.prepare_read(parallelism, **self._read_args)


@DeveloperAPI
class ReadTask(Callable[[], BlockPartition]):
"""A function used to read blocks from the dataset.
ericl marked this conversation as resolved.
Show resolved Hide resolved

Read tasks are generated by ``datasource.prepare_read()``, and return
Read tasks are generated by ``reader.get_read_tasks()``, and return
a list of ``ray.data.Block`` when called. Initial metadata about the read
operation can be retrieved via ``get_metadata()`` prior to executing the
read. Final metadata is returned after the read along with the blocks.
@@ -171,14 +216,36 @@ class RangeDatasource(Datasource[Union[ArrowRow, int]]):
        [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    """

-    def prepare_read(
+    def create_reader(
        self,
-        parallelism: int,
        n: int,
        block_format: str = "list",
        tensor_shape: Tuple = (1,),
    ) -> List[ReadTask]:
+        return _RangeDatasourceReader(n, block_format, tensor_shape)
+
+
+class _RangeDatasourceReader(Reader):
+    def __init__(self, n: int, block_format: str = "list", tensor_shape: Tuple = (1,)):
+        self._n = n
+        self._block_format = block_format
+        self._tensor_shape = tensor_shape
+
+    def estimate_inmemory_data_size(self) -> Optional[int]:
+        if self._block_format == "tensor":
+            element_size = np.product(self._tensor_shape)
+        else:
+            element_size = 1
+        return 8 * self._n * element_size
+
+    def get_read_tasks(
+        self,
+        parallelism: int,
+    ) -> List[ReadTask]:
        read_tasks: List[ReadTask] = []
+        n = self._n
+        block_format = self._block_format
+        tensor_shape = self._tensor_shape
        block_size = max(1, n // parallelism)

        # Example of a read task. In a real datasource, this would pull data
@@ -316,13 +383,32 @@ class RandomIntRowDatasource(Datasource[ArrowRow]):
        {'c_0': 4983608804013926748, 'c_1': 1160140066899844087}
    """

-    def prepare_read(
-        self, parallelism: int, n: int, num_columns: int
+    def create_reader(
+        self,
+        n: int,
+        num_columns: int,
    ) -> List[ReadTask]:
+        return _RandomIntRowDatasourceReader(n, num_columns)
+
+
+class _RandomIntRowDatasourceReader(Reader):
+    def __init__(self, n: int, num_columns: int):
+        self._n = n
+        self._num_columns = num_columns
+
+    def estimate_inmemory_data_size(self) -> Optional[int]:
+        return self._n * self._num_columns * 8
+
+    def get_read_tasks(
+        self,
+        parallelism: int,
+    ) -> List[ReadTask]:
        _check_pyarrow_version()
        import pyarrow

        read_tasks: List[ReadTask] = []
+        n = self._n
+        num_columns = self._num_columns
        block_size = max(1, n // parallelism)

        def make_block(count: int, num_columns: int) -> Block:
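To illustrate the new two-step read path end to end, here is a minimal custom datasource sketch. ``ItemsDatasource`` and ``_ItemsReader`` are illustrative, not part of Ray, and the ``BlockMetadata`` fields assume the signature implied elsewhere in this diff:

```python
from typing import List, Optional

import ray
from ray.data.block import BlockMetadata
from ray.data.datasource import Datasource, Reader, ReadTask


class ItemsDatasource(Datasource):
    """Hypothetical datasource serving an in-memory list of ints."""

    def create_reader(self, items: List[int]) -> "Reader":
        return _ItemsReader(items)


class _ItemsReader(Reader):
    def __init__(self, items: List[int]):
        self._items = items

    def estimate_inmemory_data_size(self) -> Optional[int]:
        return 8 * len(self._items)  # assume ~8 bytes per int

    def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
        chunk = max(1, len(self._items) // parallelism)
        tasks = []
        for i in range(0, len(self._items), chunk):
            part = self._items[i : i + chunk]
            metadata = BlockMetadata(
                num_rows=len(part),
                size_bytes=8 * len(part),
                schema=int,
                input_files=None,
                exec_stats=None,
            )
            # Bind `part` via a default argument to avoid late binding in the loop.
            tasks.append(ReadTask(lambda part=part: [part], metadata))
        return tasks


# Usage: parallelism is autodetected from the size estimate unless overridden.
ds = ray.data.read_datasource(ItemsDatasource(), items=list(range(100)))
```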