-
Notifications
You must be signed in to change notification settings - Fork 5.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Datasets] Autodetect dataset parallelism based on available resources and data size #25883
Changes from 1 commit
94fa31e
d52cbba
b813603
34c5eb3
7d46ef0
f7fc883
622d211
7163e0b
c831447
5145ab4
d2c5a18
7b8794c
55a48d4
ed8e8f5
510b8c6
d88fcd0
dad1569
a7a6138
8dc156d
af722b0
04e7771
745105a
277c89f
0b0e967
42c03b9
748f7e9
a7123b3
3b4277b
0f7630e
40a3b13
ee9073d
2ee5e04
653fb01
902e404
e899426
e80b08a
4bca8a3
f651581
558fb4b
2e39005
1700f73
59f3bc7
2d149cf
7a6a07b
3a00261
20e09a8
f57f5a8
36f92cb
c1986c8
5048fbb
a5c6d85
2544e84
a488e9c
4297fcd
73a5802
e74b78d
df476e2
b7eee3f
013a449
4556a15
d6d4ce2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -75,7 +75,7 @@ def from_items(items: List[Any], *, parallelism: int = -1) -> Dataset[Any]: | |
Returns: | ||
Dataset holding the items. | ||
""" | ||
block_size = max(1, len(items) // parallelism) | ||
block_size = max(1, len(items) // _autodetect_parallelism(parallelism)) | ||
|
||
blocks: List[ObjectRef[Block]] = [] | ||
metadata: List[BlockMetadata] = [] | ||
|
@@ -1056,28 +1056,35 @@ def _prepare_read( | |
kwargs = _unwrap_arrow_serialization_workaround(kwargs) | ||
DatasetContext._set_current(ctx) | ||
reader = ds.create_reader(**kwargs) | ||
parallelism = _autodetect_parallelism(parallelism, reader) | ||
return reader.prepare_read(parallelism) | ||
|
||
|
||
def _autodetect_parallelism(parallelism: int, reader=None) -> int: | ||
# Autodetect parallelism requested. The heuristic here are that we should try | ||
# to create as many blocks needed to saturate available resources, and also keep | ||
# block sizes below the target memory size, but no more. Creating too many | ||
# blocks is inefficient. | ||
if parallelism < 0: | ||
ctx = DatasetContext.get_current() | ||
if parallelism != -1: | ||
raise ValueError("`parallelism` must either be -1 or a positive integer.") | ||
# Start with 2x the number of cores as a baseline, with a min floor. | ||
avail_cpus = _estimate_avail_cpus() | ||
parallelism = max(8, avail_cpus * 2) | ||
# Increase it to avoid overly-large blocks as needed. | ||
mem_size = reader.estimate_inmemory_data_size() | ||
if mem_size is not None: | ||
parallelism = max(int(mem_size / ctx.target_max_block_size), parallelism) | ||
logger.debug( | ||
f"Autodetected parallelism={parallelism} based on " | ||
f"estimated_available_cpus={avail_cpus} and " | ||
f"estimated_data_size={mem_size}." | ||
) | ||
|
||
return reader.prepare_read(parallelism) | ||
if reader: | ||
# Increase it to avoid overly-large blocks as needed. | ||
mem_size = reader.estimate_inmemory_data_size() | ||
if mem_size is not None: | ||
parallelism = max( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In cases where the actual parallelism will be severely limited by the number of files (and block splitting is not enabled), can we add a warning message that a partition is too large? This might help UX in cases where we'll definitely OOM. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, let me add this. |
||
int(mem_size / ctx.target_max_block_size), parallelism | ||
) | ||
logger.debug( | ||
f"Autodetected parallelism={parallelism} based on " | ||
f"estimated_available_cpus={avail_cpus} and " | ||
f"estimated_data_size={mem_size}." | ||
) | ||
return parallelism | ||
|
||
|
||
def _estimate_avail_cpus() -> int: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this incorporate the num_files?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The auto-detected parallelism will be truncated to the number of files in the datasource
prepare_read()
functions, which I think should be sufficient. Although we should note that this may result the target block size to be exceeded, but that will eventually be solved by dynamic block splitting.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's better to centralize these in one oracle. It's difficult to find where the parallelism got adjusted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The number of files isn't known here, so that is not a change we can do easily. It's in the datasource contract that the parallelism is just a "hint" and the actual datasource can decide a lower amount of parallelism if not feasible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can the parallelism be tuned inside Reader? Like outside Reader, it's still the requested parallelism from user, and then there is a single place to tune/finalize the number inside Reader when all information is ready. It seems no use of parallelism outside Reader?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's currently tuned based on the number of files. Other readers might be reading from a database / REST API and may have other constraints, such as number of connections, etc., that cannot be guessed from outside the reader.