Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Datasets] Autodetect dataset parallelism based on available resources and data size #25883

Merged
merged 61 commits into from
Jul 13, 2022
Merged
Changes from 1 commit
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
94fa31e
wip
ericl Jun 17, 2022
d52cbba
wip
ericl Jun 17, 2022
b813603
wip
ericl Jun 17, 2022
34c5eb3
wip
ericl Jun 17, 2022
7d46ef0
lint
ericl Jun 17, 2022
f7fc883
wip
ericl Jun 17, 2022
622d211
lint
ericl Jun 17, 2022
7163e0b
update
ericl Jun 17, 2022
c831447
rename
ericl Jun 17, 2022
5145ab4
update
ericl Jun 17, 2022
d2c5a18
update
ericl Jun 17, 2022
7b8794c
update
ericl Jun 17, 2022
55a48d4
update
ericl Jun 17, 2022
ed8e8f5
update
ericl Jun 17, 2022
510b8c6
Merge remote-tracking branch 'upstream/master' into detect-parallelism
ericl Jun 17, 2022
d88fcd0
update
ericl Jun 17, 2022
dad1569
fix from_items
ericl Jun 17, 2022
a7a6138
Merge remote-tracking branch 'upstream/master' into detect-parallelism
ericl Jun 18, 2022
8dc156d
fix test
ericl Jun 18, 2022
af722b0
wip
ericl Jun 18, 2022
04e7771
update
ericl Jun 18, 2022
745105a
fix
ericl Jun 18, 2022
277c89f
fix
ericl Jun 18, 2022
0b0e967
fx
ericl Jun 18, 2022
42c03b9
update
ericl Jun 18, 2022
748f7e9
update
ericl Jun 18, 2022
a7123b3
fix
ericl Jun 18, 2022
3b4277b
Merge remote-tracking branch 'upstream/master' into detect-parallelism
ericl Jun 19, 2022
0f7630e
update
ericl Jun 19, 2022
40a3b13
fix
ericl Jun 19, 2022
ee9073d
update
ericl Jun 23, 2022
2ee5e04
fix
ericl Jun 23, 2022
653fb01
Merge remote-tracking branch 'upstream/master' into detect-parallelism
ericl Jun 23, 2022
902e404
fix
ericl Jun 23, 2022
e899426
fix
ericl Jun 23, 2022
e80b08a
improve doc
ericl Jun 24, 2022
4bca8a3
update
ericl Jun 24, 2022
f651581
lint
ericl Jun 24, 2022
558fb4b
update
ericl Jun 24, 2022
2e39005
fix
ericl Jun 24, 2022
1700f73
try out
ericl Jun 24, 2022
59f3bc7
Merge remote-tracking branch 'upstream/master' into detect-parallelism
ericl Jun 29, 2022
2d149cf
add warning
ericl Jun 30, 2022
7a6a07b
fix
ericl Jun 30, 2022
3a00261
Merge remote-tracking branch 'upstream/master' into detect-parallelism
ericl Jun 30, 2022
20e09a8
fix test
ericl Jun 30, 2022
f57f5a8
fix
ericl Jun 30, 2022
36f92cb
Merge remote-tracking branch 'upstream/master' into detect-parallelism
ericl Jul 5, 2022
c1986c8
try fixing
ericl Jul 6, 2022
5048fbb
fix closure capture
ericl Jul 6, 2022
a5c6d85
stale comment
Jul 6, 2022
2544e84
lint
Jul 7, 2022
a488e9c
Merge branch 'master' of https://github.com/ray-project/ray into eric…
Jul 7, 2022
4297fcd
feedback
Jul 7, 2022
73a5802
Merge branch 'master' of https://github.com/ray-project/ray into eric…
Jul 8, 2022
e74b78d
Merge remote-tracking branch 'upstream/master' into detect-parallelism
ericl Jul 12, 2022
df476e2
Merge branch 'detect-parallelism' of github.com:ericl/ray into detect…
ericl Jul 12, 2022
b7eee3f
handle subclass
Jul 13, 2022
013a449
Merge branch 'master' of https://github.com/ray-project/ray into eric…
Jul 13, 2022
4556a15
Merge branch 'detect-parallelism' of github.com:ericl/ray into ericl-…
Jul 13, 2022
d6d4ce2
fix file read
Jul 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update
  • Loading branch information
ericl committed Jun 17, 2022
commit 7163e0bb41ad29e4f7c28902c44765f0e978a0e4
58 changes: 30 additions & 28 deletions python/ray/data/datasource/file_based_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,13 +270,27 @@ def _file_format(self):


class _FileBasedDatasourceReader(Reader):
def __init__(self, delegate, **read_args):
self._delegate = delegate
self._read_args = read_args
def __init__(
self,
delegate,
ericl marked this conversation as resolved.
Show resolved Hide resolved
paths: Union[str, List[str]],
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
schema: Optional[Union[type, "pyarrow.lib.Schema"]] = None,
open_stream_args: Optional[Dict[str, Any]] = None,
meta_provider: BaseFileMetadataProvider = DefaultFileMetadataProvider(),
partition_filter: PathPartitionFilter = None,
# TODO(ekl) deprecate this once read fusion is available.
_block_udf: Optional[Callable[[Block], Block]] = None,
**reader_args,
):
_check_pyarrow_version()
paths = read_args["paths"]
filesystem = read_args.get("filesystem")
meta_provider = read_args["meta_provider"]
self._delegate = delegate
self._schema = schema
self._open_stream_args = open_stream_args
self._meta_provider = meta_provider
self._partition_filter = partition_filter
self._block_udf = _block_udf
self._reader_args = reader_args
paths, self._filesystem = _resolve_paths_and_filesystem(paths, filesystem)
self._paths, self._file_sizes = meta_provider.expand_paths(
paths, self._filesystem
Expand All @@ -290,30 +304,18 @@ def estimate_inmemory_data_size(self) -> Optional[int]:
return total_size

def read(self, parallelism: int) -> List[ReadTask]:
return self._read(parallelism, **self._read_args)

def _read(
self,
parallelism: int,
paths: Union[str, List[str]],
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
schema: Optional[Union[type, "pyarrow.lib.Schema"]] = None,
open_stream_args: Optional[Dict[str, Any]] = None,
meta_provider: BaseFileMetadataProvider = DefaultFileMetadataProvider(),
partition_filter: PathPartitionFilter = None,
# TODO(ekl) deprecate this once read fusion is available.
_block_udf: Optional[Callable[[Block], Block]] = None,
**reader_args,
) -> List[ReadTask]:
import numpy as np

paths, file_sizes, filesystem = self._paths, self._file_sizes, self._filesystem
if partition_filter is not None:
paths = partition_filter(paths)
open_stream_args = self._open_stream_args
reader_args = self._reader_args
_block_udf = self._block_udf

read_stream = self._delegate._read_stream
paths, file_sizes = self._paths, self._file_sizes
if self._partition_filter is not None:
paths = self._partition_filter(paths)

filesystem = _wrap_s3_serialization_workaround(filesystem)
read_stream = self._delegate._read_stream
filesystem = _wrap_s3_serialization_workaround(self._filesystem)

if open_stream_args is None:
open_stream_args = {}
Expand Down Expand Up @@ -379,9 +381,9 @@ def read_files(
if len(read_paths) <= 0:
continue

meta = meta_provider(
meta = self._meta_provider(
read_paths,
schema,
self._schema,
rows_per_file=self._delegate._rows_per_file(),
file_sizes=file_sizes,
)
Expand Down