From 478ac164d3f1ee08c0aca35aceaf34aee530abca Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Sun, 25 Feb 2024 21:16:59 -0800
Subject: [PATCH 1/2] Fix logging of set_read_parallelism rule

Signed-off-by: Cheng Su
---
 .../logical/rules/set_read_parallelism.py | 22 +++++++++----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/python/ray/data/_internal/logical/rules/set_read_parallelism.py b/python/ray/data/_internal/logical/rules/set_read_parallelism.py
index d39c0bbce42ae..bbb471008db9a 100644
--- a/python/ray/data/_internal/logical/rules/set_read_parallelism.py
+++ b/python/ray/data/_internal/logical/rules/set_read_parallelism.py
@@ -22,10 +22,10 @@ def compute_additional_split_factor(
     cur_additional_split_factor: Optional[int] = None,
 ) -> Tuple[int, str, int, Optional[int]]:
     ctx = DataContext.get_current()
-    parallelism, reason, _, _ = _autodetect_parallelism(
+    detected_parallelism, reason, _, _ = _autodetect_parallelism(
         parallelism, target_max_block_size, ctx, datasource_or_legacy_reader, mem_size
     )
-    num_read_tasks = len(datasource_or_legacy_reader.get_read_tasks(parallelism))
+    num_read_tasks = len(datasource_or_legacy_reader.get_read_tasks(detected_parallelism))
     expected_block_size = None
     if mem_size:
         expected_block_size = mem_size / num_read_tasks
@@ -47,18 +47,18 @@ def compute_additional_split_factor(
     available_cpu_slots = ray_available_resources().get("CPU", 1)
     if (
-        parallelism
+        parallelism > 0
         and num_read_tasks >= available_cpu_slots * 4
         and num_read_tasks >= 5000
     ):
         logger.get_logger().warn(
-            f"{WARN_PREFIX} The requested parallelism of {parallelism} "
+            f"{WARN_PREFIX} The requested number of read blocks of {parallelism} "
             "is more than 4x the number of available CPU slots in the cluster of "
             f"{available_cpu_slots}. This can "
             "lead to slowdowns during the data reading phase due to excessive "
-            "task creation. Reduce the parallelism to match with the available "
-            "CPU slots in the cluster, or set parallelism to -1 for Ray Data "
-            "to automatically determine the parallelism. "
+            "task creation. Reduce the value to match with the available "
+            "CPU slots in the cluster, or set override_num_blocks to -1 for Ray Data "
+            "to automatically determine the number of read blocks. "
             "You can ignore this message if the cluster is expected to autoscale."
         )
@@ -67,12 +67,12 @@ def compute_additional_split_factor(
     # parallelism), and if the following operator produces much larger blocks,
     # we should scale down the target max block size here instead of using
     # splitting, which can have higher memory usage.
-    if estimated_num_blocks < parallelism and estimated_num_blocks > 0:
-        k = math.ceil(parallelism / estimated_num_blocks)
+    if estimated_num_blocks < detected_parallelism and estimated_num_blocks > 0:
+        k = math.ceil(detected_parallelism / estimated_num_blocks)
         estimated_num_blocks = estimated_num_blocks * k
-        return parallelism, reason, estimated_num_blocks, k
+        return detected_parallelism, reason, estimated_num_blocks, k
 
-    return parallelism, reason, estimated_num_blocks, None
+    return detected_parallelism, reason, estimated_num_blocks, None
 
 
 class SetReadParallelismRule(Rule):

From a2a2031bf17574adc84089bdf3e708fec7607ecf Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Mon, 26 Feb 2024 13:50:06 -0800
Subject: [PATCH 2/2] Address comment and fix unit test

Signed-off-by: Cheng Su
---
 .../data/_internal/logical/rules/set_read_parallelism.py | 6 ++++--
 python/ray/data/tests/test_consumption.py                | 2 +-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/python/ray/data/_internal/logical/rules/set_read_parallelism.py b/python/ray/data/_internal/logical/rules/set_read_parallelism.py
index bbb471008db9a..6aaaf3b4a7b53 100644
--- a/python/ray/data/_internal/logical/rules/set_read_parallelism.py
+++ b/python/ray/data/_internal/logical/rules/set_read_parallelism.py
@@ -25,7 +25,9 @@ def compute_additional_split_factor(
     detected_parallelism, reason, _, _ = _autodetect_parallelism(
         parallelism, target_max_block_size, ctx, datasource_or_legacy_reader, mem_size
     )
-    num_read_tasks = len(datasource_or_legacy_reader.get_read_tasks(detected_parallelism))
+    num_read_tasks = len(
+        datasource_or_legacy_reader.get_read_tasks(detected_parallelism)
+    )
     expected_block_size = None
     if mem_size:
         expected_block_size = mem_size / num_read_tasks
@@ -47,7 +49,7 @@ def compute_additional_split_factor(
     available_cpu_slots = ray_available_resources().get("CPU", 1)
     if (
-        parallelism > 0
+        parallelism != -1
         and num_read_tasks >= available_cpu_slots * 4
         and num_read_tasks >= 5000
     ):
diff --git a/python/ray/data/tests/test_consumption.py b/python/ray/data/tests/test_consumption.py
index e6a9236b42517..c612b86657d18 100644
--- a/python/ray/data/tests/test_consumption.py
+++ b/python/ray/data/tests/test_consumption.py
@@ -1540,7 +1540,7 @@ def test_read_warning_large_parallelism(ray_start_regular, propagate_logs, caplo
     with caplog.at_level(logging.WARNING, logger="ray.data.read_api"):
         ray.data.range(5000, parallelism=5000).materialize()
     assert (
-        "The requested parallelism of 5000 is "
+        "The requested number of read blocks of 5000 is "
         "more than 4x the number of available CPU slots in the cluster"
         in caplog.text
     ), caplog.text
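
Note for reviewers: a minimal, self-contained sketch of the two behaviors this
series changes. The names _autodetect_parallelism and
compute_additional_split_factor echo the real helpers, but the bodies, the 200
fallback value, and the num_cpu_slots parameter below are illustrative
assumptions, not Ray's implementation:

    import logging
    from typing import Tuple

    logging.basicConfig(level=logging.WARNING)
    logger = logging.getLogger(__name__)


    def _autodetect_parallelism(parallelism: int) -> Tuple[int, str]:
        # Stand-in for Ray's helper: resolve the -1 sentinel ("auto") to a
        # concrete value; explicit requests pass through unchanged.
        if parallelism == -1:
            return 200, "auto-detected from cluster resources"
        return parallelism, "user request"


    def compute_additional_split_factor(parallelism: int, num_cpu_slots: int) -> int:
        # Patch 1: bind the detected value to a NEW name instead of rebinding
        # `parallelism`, so the warning below can still report what the user
        # actually asked for rather than the auto-detected value.
        detected_parallelism, reason = _autodetect_parallelism(parallelism)

        # Patch 1 also replaces the bare truthiness test (`if parallelism`)
        # with an explicit comparison: once the rename lands, the user's -1
        # sentinel reaches this check, and -1 is truthy. Patch 2 then tightens
        # `> 0` to `!= -1`, naming the sentinel directly, per review comment.
        if parallelism != -1 and detected_parallelism >= num_cpu_slots * 4:
            logger.warning(
                "The requested number of read blocks of %d is more than 4x "
                "the number of available CPU slots (%d).",
                parallelism,
                num_cpu_slots,
            )
        return detected_parallelism


    compute_additional_split_factor(5000, 8)  # warns: 5000 blocks vs 8 slots
    compute_additional_split_factor(-1, 8)    # silent: auto-detection requested

The real rule's guard additionally requires num_read_tasks >= 5000; the sketch
keeps only the 4x-CPU-slots condition to stay short.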