[Data] Fix logging of set_read_parallelism rule (ray-project#43423)
This PR fixes the logging of the set_read_parallelism rule: avoid printing the warning about parallelism being set too high when the user does not set parallelism manually, and reword the warning so it no longer mentions parallelism.

Signed-off-by: Cheng Su <[email protected]>
c21 committed Feb 26, 2024
1 parent afcae80 commit efdf651
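
For context on the fix: in the old code the auto-detected value overwrote the parallelism argument, so the truthiness check on parallelism passed even when the user never set it (default -1) and the warning could fire spuriously. The change keeps the user's value intact, stores the detection result in detected_parallelism, and only warns when parallelism != -1. Below is a minimal, standalone Python sketch of the corrected guard; the function name, arguments, and numbers are illustrative, not the actual Ray Data implementation.

# Minimal illustrative sketch of the corrected warning guard. should_warn and
# its arguments are hypothetical names, not the real Ray Data API.
def should_warn(
    user_parallelism: int,
    num_read_tasks: int,
    available_cpu_slots: float,
) -> bool:
    # After the fix, the warning fires only when the user explicitly set a
    # value (anything other than the default of -1) and the number of read
    # tasks is both large in absolute terms and large relative to the cluster.
    return (
        user_parallelism != -1
        and num_read_tasks >= available_cpu_slots * 4
        and num_read_tasks >= 5000
    )

# User left parallelism at the default (-1): no warning, even though 5000
# read tasks were auto-detected on a 16-CPU cluster.
assert not should_warn(-1, 5000, 16)
# User explicitly requested 5000 read blocks on the same cluster: warn.
assert should_warn(5000, 5000, 16)
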
Showing 2 changed files with 14 additions and 12 deletions.
24 changes: 13 additions & 11 deletions python/ray/data/_internal/logical/rules/set_read_parallelism.py
@@ -22,10 +22,12 @@ def compute_additional_split_factor(
     cur_additional_split_factor: Optional[int] = None,
 ) -> Tuple[int, str, int, Optional[int]]:
     ctx = DataContext.get_current()
-    parallelism, reason, _, _ = _autodetect_parallelism(
+    detected_parallelism, reason, _, _ = _autodetect_parallelism(
         parallelism, target_max_block_size, ctx, datasource_or_legacy_reader, mem_size
     )
-    num_read_tasks = len(datasource_or_legacy_reader.get_read_tasks(parallelism))
+    num_read_tasks = len(
+        datasource_or_legacy_reader.get_read_tasks(detected_parallelism)
+    )
     expected_block_size = None
     if mem_size:
         expected_block_size = mem_size / num_read_tasks
@@ -47,18 +49,18 @@ def compute_additional_split_factor(

     available_cpu_slots = ray_available_resources().get("CPU", 1)
     if (
-        parallelism
+        parallelism != -1
         and num_read_tasks >= available_cpu_slots * 4
         and num_read_tasks >= 5000
     ):
         logger.get_logger().warn(
-            f"{WARN_PREFIX} The requested parallelism of {parallelism} "
+            f"{WARN_PREFIX} The requested number of read blocks of {parallelism} "
             "is more than 4x the number of available CPU slots in the cluster of "
             f"{available_cpu_slots}. This can "
             "lead to slowdowns during the data reading phase due to excessive "
-            "task creation. Reduce the parallelism to match with the available "
-            "CPU slots in the cluster, or set parallelism to -1 for Ray Data "
-            "to automatically determine the parallelism. "
+            "task creation. Reduce the value to match with the available "
+            "CPU slots in the cluster, or set override_num_blocks to -1 for Ray Data "
+            "to automatically determine the number of read tasks blocks."
             "You can ignore this message if the cluster is expected to autoscale."
         )

@@ -67,12 +69,12 @@ def compute_additional_split_factor(
     # parallelism), and if the following operator produces much larger blocks,
     # we should scale down the target max block size here instead of using
     # splitting, which can have higher memory usage.
-    if estimated_num_blocks < parallelism and estimated_num_blocks > 0:
-        k = math.ceil(parallelism / estimated_num_blocks)
+    if estimated_num_blocks < detected_parallelism and estimated_num_blocks > 0:
+        k = math.ceil(detected_parallelism / estimated_num_blocks)
         estimated_num_blocks = estimated_num_blocks * k
-        return parallelism, reason, estimated_num_blocks, k
+        return detected_parallelism, reason, estimated_num_blocks, k

-    return parallelism, reason, estimated_num_blocks, None
+    return detected_parallelism, reason, estimated_num_blocks, None


 class SetReadParallelismRule(Rule):
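
A quick worked example of the split-factor math in the last hunk, with made-up numbers: if detection settles on 200 blocks but only 60 are estimated from the read tasks, the rule scales the estimate up by k = ceil(200 / 60) = 4, giving 240 estimated blocks.

import math

# Illustrative numbers only: scale the estimated block count up so it is at
# least the detected parallelism, mirroring the hunk above.
detected_parallelism, estimated_num_blocks = 200, 60
if 0 < estimated_num_blocks < detected_parallelism:
    k = math.ceil(detected_parallelism / estimated_num_blocks)  # 4
    estimated_num_blocks *= k  # 240
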
2 changes: 1 addition & 1 deletion python/ray/data/tests/test_consumption.py
@@ -1540,7 +1540,7 @@ def test_read_warning_large_parallelism(ray_start_regular, propagate_logs, caplog):
     with caplog.at_level(logging.WARNING, logger="ray.data.read_api"):
         ray.data.range(5000, parallelism=5000).materialize()
     assert (
-        "The requested parallelism of 5000 is "
+        "The requested number of read blocks of 5000 is "
         "more than 4x the number of available CPU slots in the cluster" in caplog.text
     ), caplog.text
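
As the updated test shows, the reworded warning is triggered by explicitly requesting far more read blocks than the cluster has CPU slots. A rough usage sketch (assumes a small local cluster; log capture and cluster setup omitted):

import ray

# Explicitly asking for 5000 read blocks on a small cluster logs the
# "requested number of read blocks ... more than 4x ..." warning.
ray.data.range(5000, parallelism=5000).materialize()

# Leaving the value unset (parallelism defaults to -1) lets Ray Data decide
# and, after this change, no longer logs the warning.
ray.data.range(5000).materialize()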
