Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Data] Fix logging of set_read_parallelism rule #43423

Merged
merged 2 commits into from
Feb 26, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Fix logging of set_read_parallelism rule
Signed-off-by: Cheng Su <[email protected]>
  • Loading branch information
c21 committed Feb 26, 2024
commit 478ac164d3f1ee08c0aca35aceaf34aee530abca
22 changes: 11 additions & 11 deletions python/ray/data/_internal/logical/rules/set_read_parallelism.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ def compute_additional_split_factor(
cur_additional_split_factor: Optional[int] = None,
) -> Tuple[int, str, int, Optional[int]]:
ctx = DataContext.get_current()
parallelism, reason, _, _ = _autodetect_parallelism(
detected_parallelism, reason, _, _ = _autodetect_parallelism(
parallelism, target_max_block_size, ctx, datasource_or_legacy_reader, mem_size
)
num_read_tasks = len(datasource_or_legacy_reader.get_read_tasks(parallelism))
num_read_tasks = len(datasource_or_legacy_reader.get_read_tasks(detected_parallelism))
expected_block_size = None
if mem_size:
expected_block_size = mem_size / num_read_tasks
Expand All @@ -47,18 +47,18 @@ def compute_additional_split_factor(

available_cpu_slots = ray_available_resources().get("CPU", 1)
if (
parallelism
parallelism > 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: might be easier to understand as "user-specified parallelism"

Suggested change
parallelism > 0
parallelism != -1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

and num_read_tasks >= available_cpu_slots * 4
and num_read_tasks >= 5000
):
logger.get_logger().warn(
f"{WARN_PREFIX} The requested parallelism of {parallelism} "
f"{WARN_PREFIX} The requested number of read blocks of {parallelism} "
"is more than 4x the number of available CPU slots in the cluster of "
f"{available_cpu_slots}. This can "
"lead to slowdowns during the data reading phase due to excessive "
"task creation. Reduce the parallelism to match with the available "
"CPU slots in the cluster, or set parallelism to -1 for Ray Data "
"to automatically determine the parallelism. "
"task creation. Reduce the value to match with the available "
"CPU slots in the cluster, or set override_num_blocks to -1 for Ray Data "
"to automatically determine the number of read tasks blocks."
"You can ignore this message if the cluster is expected to autoscale."
)

Expand All @@ -67,12 +67,12 @@ def compute_additional_split_factor(
# parallelism), and if the following operator produces much larger blocks,
# we should scale down the target max block size here instead of using
# splitting, which can have higher memory usage.
if estimated_num_blocks < parallelism and estimated_num_blocks > 0:
k = math.ceil(parallelism / estimated_num_blocks)
if estimated_num_blocks < detected_parallelism and estimated_num_blocks > 0:
k = math.ceil(detected_parallelism / estimated_num_blocks)
estimated_num_blocks = estimated_num_blocks * k
return parallelism, reason, estimated_num_blocks, k
return detected_parallelism, reason, estimated_num_blocks, k

return parallelism, reason, estimated_num_blocks, None
return detected_parallelism, reason, estimated_num_blocks, None


class SetReadParallelismRule(Rule):
Expand Down