From 4e17b31608a276d00479a9e87f4349ebf8fe77e9 Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Tue, 23 Jan 2024 17:17:17 -0800
Subject: [PATCH 1/6] Allow tasks to control concurrency in map-like APIs

Signed-off-by: Cheng Su
---
 python/ray/data/_internal/compute.py | 41 ++++++++++++++++++-
 .../concurrency_cap_backpressure_policy.py | 30 ++++++++++----
 .../execution/operators/map_operator.py | 24 ++++++++++-
 .../execution/streaming_executor_state.py | 7 +++-
 python/ray/data/_internal/util.py | 26 ++++++------
 5 files changed, 104 insertions(+), 24 deletions(-)

diff --git a/python/ray/data/_internal/compute.py b/python/ray/data/_internal/compute.py
index 8c96688925980..e52ff4ca7816b 100644
--- a/python/ray/data/_internal/compute.py
+++ b/python/ray/data/_internal/compute.py
@@ -56,6 +56,42 @@ def _apply(

 @DeveloperAPI
 class TaskPoolStrategy(ComputeStrategy):
+    def __init__(
+        self,
+        size: Optional[int] = None,
+        min_size: Optional[int] = None,
+        max_size: Optional[int] = None,
+    ):
+        """Construct TaskPoolStrategy for a Dataset transform.
+
+        Args:
+            size: Specify a fixed size task pool of this size. It is an error to
+                specify `size` together with `min_size` or `max_size`.
+            min_size: The minimum size of the task pool.
+            max_size: The maximum size of the task pool.
+        """
+
+        if size is not None:
+            if size < 1:
+                raise ValueError("`size` must be >= 1", size)
+            if max_size is not None or min_size is not None:
+                raise ValueError(
+                    "`min_size` and `max_size` cannot be set at the same time as `size`"
+                )
+            min_size = size
+            max_size = size
+        elif max_size is not None and min_size is not None:
+            if min_size < 1:
+                raise ValueError("`min_size` must be >= 1", min_size)
+            if min_size > max_size:
+                raise ValueError("`min_size` must be <= `max_size`", min_size, max_size)
+        else:
+            # Legacy code path to support `TaskPoolStrategy()`
+            min_size = None
+            max_size = None
+        self.min_size = min_size
+        self.max_size = max_size
+
     def _apply(
         self,
         block_fn: BlockTransform,
@@ -148,7 +184,10 @@ def _apply(
         )

     def __eq__(self, other: Any) -> bool:
-        return isinstance(other, TaskPoolStrategy) or other == "tasks"
+        if isinstance(other, TaskPoolStrategy):
+            return self.min_size == other.min_size and self.max_size == other.max_size
+        else:
+            return other == "tasks" and self.min_size is None and self.max_size is None


 @PublicAPI
diff --git a/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py b/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py
index 9052447c2587b..e6c4a8594c6c1 100644
--- a/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py
+++ b/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py
@@ -1,5 +1,5 @@
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Optional

 import ray
 from .backpressure_policy import BackpressurePolicy
@@ -20,7 +20,8 @@ class ConcurrencyCapBackpressurePolicy(BackpressurePolicy):
     It will be set to an initial value, and will ramp up exponentially.

     The concrete strategy is as follows:
-    - Each PhysicalOperator is assigned an initial concurrency cap.
+    - Each PhysicalOperator is assigned an initial concurrency cap, and
+      optionally a maximal concurrency cap.
     - A PhysicalOperator can run new tasks if the number of running tasks is less
       than the cap.
    - When the number of finished tasks reaches a threshold, the concurrency cap will
@@ -44,13 +45,22 @@ class ConcurrencyCapBackpressurePolicy(BackpressurePolicy):
     CAP_MULTIPLIER = 2.0
     CAP_MULTIPLIER_CONFIG_KEY = "backpressure_policies.concurrency_cap.cap_multiplier"

-    def __init__(self, topology: "Topology"):
+    def __init__(
+        self,
+        topology: "Topology",
+        min_cap: Optional[int] = None,
+        max_cap: Optional[int] = None,
+    ):
         self._concurrency_caps: dict["PhysicalOperator", float] = {}

         data_context = ray.data.DataContext.get_current()
-        self._init_cap = data_context.get_config(
-            self.INIT_CAP_CONFIG_KEY, self.INIT_CAP
-        )
+        if min_cap:
+            self._init_cap = min_cap
+        else:
+            self._init_cap = data_context.get_config(
+                self.INIT_CAP_CONFIG_KEY, self.INIT_CAP
+            )
+        self._max_cap = max_cap
         self._cap_multiplier = data_context.get_config(
             self.CAP_MULTIPLIER_CONFIG_KEY, self.CAP_MULTIPLIER
         )
@@ -59,12 +69,15 @@ def __init__(self, topology: "Topology"):
         )

         assert self._init_cap > 0
+        assert self._max_cap is None or self._max_cap >= self._init_cap
         assert 0 < self._cap_multiply_threshold <= 1
         assert self._cap_multiplier >= 1

         logger.debug(
             "ConcurrencyCapBackpressurePolicy initialized with config: "
-            f"{self._init_cap}, {self._cap_multiply_threshold}, {self._cap_multiplier}"
+            f"init_cap: {self._init_cap}, max_cap: {self._max_cap}, "
+            f"cap_multiply_threshold: {self._cap_multiply_threshold}, "
+            f"cap_multiplier: {self._cap_multiplier}"
         )

         for op, _ in topology.items():
@@ -76,6 +89,9 @@ def can_add_input(self, op: "PhysicalOperator") -> bool:
             self._concurrency_caps[op] * self._cap_multiply_threshold
         ):
             self._concurrency_caps[op] *= self._cap_multiplier
+            if self._max_cap and self._concurrency_caps[op] >= self._max_cap:
+                self._concurrency_caps[op] = self._max_cap
+                break
             logger.debug(
                 f"Concurrency cap for {op} increased to {self._concurrency_caps[op]}"
             )
diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py
index 774fde880f0a5..f94daf6f40e26 100644
--- a/python/ray/data/_internal/execution/operators/map_operator.py
+++ b/python/ray/data/_internal/execution/operators/map_operator.py
@@ -13,6 +13,12 @@
     ComputeStrategy,
     TaskPoolStrategy,
 )
+from ray.data._internal.execution.backpressure_policy.backpressure_policy import (
+    BackpressurePolicy,
+)
+from ray.data._internal.execution.backpressure_policy.concurrency_cap_backpressure_policy import (
+    ConcurrencyCapBackpressurePolicy,
+)
 from ray.data._internal.execution.interfaces import (
     ExecutionOptions,
     ExecutionResources,
@@ -63,6 +69,9 @@ def __init__(
         self._ray_remote_args_factory = None
         self._remote_args_for_metrics = copy.deepcopy(self._ray_remote_args)

+        # Initialize the backpressure policies to an empty list.
+        self._backpressure_policies = []
+
         # Bundles block references up to the min_rows_per_bundle target.
        self._block_ref_bundler = _BlockRefBundler(min_rows_per_bundle)
@@ -146,7 +155,7 @@ def create(
                 TaskPoolMapOperator,
             )

-            return TaskPoolMapOperator(
+            op = TaskPoolMapOperator(
                 map_transformer,
                 input_op,
                 name=name,
                 target_max_block_size=target_max_block_size,
                 min_rows_per_bundle=min_rows_per_bundle,
                 ray_remote_args=ray_remote_args,
             )
+            if compute_strategy.min_size is not None and compute_strategy.max_size is not None:
+                concurrency_policy = ConcurrencyCapBackpressurePolicy(
+                    topology={op: None},
+                    min_cap=compute_strategy.min_size,
+                    max_cap=compute_strategy.max_size,
+                )
+                op.set_backpressure_policies([concurrency_policy])
+            return op
         elif isinstance(compute_strategy, ActorPoolStrategy):
             from ray.data._internal.execution.operators.actor_pool_map_operator import (
                 ActorPoolMapOperator,
@@ -364,6 +380,12 @@ def _get_next_inner(self) -> RefBundle:
             self._output_metadata.append(meta)
         return bundle

+    def set_backpressure_policies(self, policies: List[BackpressurePolicy]):
+        self._backpressure_policies = policies
+
+    def get_backpressure_policies(self) -> List[BackpressurePolicy]:
+        return self._backpressure_policies
+
     @abstractmethod
     def progress_str(self) -> str:
         raise NotImplementedError
diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py
index 88108ba86847e..5b5a37e6c2e2e 100644
--- a/python/ray/data/_internal/execution/streaming_executor_state.py
+++ b/python/ray/data/_internal/execution/streaming_executor_state.py
@@ -32,6 +32,7 @@
     AllToAllOperator,
 )
 from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
+from ray.data._internal.execution.operators.map_operator import MapOperator
 from ray.data._internal.execution.util import memory_string
 from ray.data._internal.progress_bar import ProgressBar
 from ray.data.context import DataContext
@@ -570,13 +571,17 @@ def select_operator_to_run(
     ops = []
     for op, state in topology.items():
         under_resource_limits = _execution_allowed(op, cur_usage, limits)
+        policies_per_op = []
+        if isinstance(op, MapOperator):
+            policies_per_op += op.get_backpressure_policies()
+
         if (
             op.need_more_inputs()
             and state.num_queued() > 0
             and op.should_add_input()
             and under_resource_limits
             and not op.completed()
-            and all(p.can_add_input(op) for p in backpressure_policies)
+            and all(p.can_add_input(op) for p in (backpressure_policies + policies_per_op))
         ):
             ops.append(op)
         # Update the op in all cases to enable internal autoscaling, etc.
diff --git a/python/ray/data/_internal/util.py b/python/ray/data/_internal/util.py
index d7b0b644f8bcb..106d79201a59f 100644
--- a/python/ray/data/_internal/util.py
+++ b/python/ray/data/_internal/util.py
@@ -587,32 +587,30 @@ def get_compute_strategy(
         )
         return compute
     elif concurrency is not None:
-        if not is_callable_class:
-            # Currently do not support concurrency control with function,
-            # i.e., running with Ray Tasks (`TaskPoolMapOperator`).
-            logger.warning(
-                "``concurrency`` is set, but ``fn`` is not a callable class: "
-                f"{fn}. ``concurrency`` are currently only supported when "
-                "``fn`` is a callable class."
-            )
-            return TaskPoolStrategy()
-
         if isinstance(concurrency, tuple):
             if (
                 len(concurrency) == 2
                 and isinstance(concurrency[0], int)
                 and isinstance(concurrency[1], int)
             ):
-                return ActorPoolStrategy(
-                    min_size=concurrency[0], max_size=concurrency[1]
-                )
+                if is_callable_class:
+                    return ActorPoolStrategy(
+                        min_size=concurrency[0], max_size=concurrency[1]
+                    )
+                else:
+                    return TaskPoolStrategy(
+                        min_size=concurrency[0], max_size=concurrency[1]
+                    )
             else:
                 raise ValueError(
                     "``concurrency`` is expected to be set as a tuple of "
                     f"integers, but got: {concurrency}."
                 )
         elif isinstance(concurrency, int):
-            return ActorPoolStrategy(size=concurrency)
+            if is_callable_class:
+                return ActorPoolStrategy(size=concurrency)
+            else:
+                return TaskPoolStrategy(size=concurrency)
         else:
             raise ValueError(
                 "``concurrency`` is expected to be set as an integer or a "
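At this point in the series, `get_compute_strategy` dispatches the `concurrency` argument to a task pool for plain functions and to an actor pool for callable classes. The following sketch is illustrative only; it assumes a Ray Data build that includes the patch above, and the `fn` and `Fn` names are hypothetical placeholders:

    import ray

    def fn(row):  # Plain function: runs as Ray tasks.
        return row

    class Fn:  # Callable class: runs as Ray actors.
        def __call__(self, row):
            return row

    ds = ray.data.range(10)
    ds.map(fn, concurrency=2)       # -> TaskPoolStrategy(size=2)
    ds.map(fn, concurrency=(2, 4))  # -> TaskPoolStrategy(min_size=2, max_size=4)
    ds.map(Fn, concurrency=2)       # -> ActorPoolStrategy(size=2)
    ds.map(Fn, concurrency=(2, 4))  # -> ActorPoolStrategy(min_size=2, max_size=4)

The next patch drops the tuple form for plain functions, keeping only a single maximum task pool size.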
From 398105b96f9466e2e572712454c6a6bca5903888 Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Thu, 25 Jan 2024 16:07:53 -0800
Subject: [PATCH 2/6] Change to remove exponential rampup

Signed-off-by: Cheng Su
---
 python/ray/data/_internal/compute.py | 36 ++------
 .../execution/backpressure_policy/__init__.py | 3 +-
 .../concurrency_cap_backpressure_policy.py | 84 ++++---------------
 .../execution/operators/map_operator.py | 22 +----
 .../operators/task_pool_map_operator.py | 7 ++
 .../execution/streaming_executor_state.py | 7 +-
 python/ray/data/_internal/util.py | 6 +-
 python/ray/data/tests/test_map.py | 9 +-
 8 files changed, 44 insertions(+), 130 deletions(-)

diff --git a/python/ray/data/_internal/compute.py b/python/ray/data/_internal/compute.py
index e52ff4ca7816b..60c9afef6766f 100644
--- a/python/ray/data/_internal/compute.py
+++ b/python/ray/data/_internal/compute.py
@@ -59,38 +59,16 @@ class TaskPoolStrategy(ComputeStrategy):
     def __init__(
         self,
         size: Optional[int] = None,
-        min_size: Optional[int] = None,
-        max_size: Optional[int] = None,
     ):
         """Construct TaskPoolStrategy for a Dataset transform.

         Args:
-            size: Specify a fixed size task pool of this size. It is an error to
-                specify `size` together with `min_size` or `max_size`.
-            min_size: The minimum size of the task pool.
-            max_size: The maximum size of the task pool.
+            size: Specify the maximum size of the task pool.
""" - if size: - if size < 1: - raise ValueError("`size` must be >= 1", size) - if max_size is not None or min_size is not None: - raise ValueError( - "`min_size` and `max_size` cannot be set at the same time as `size`" - ) - min_size = size - max_size = size - elif max_size is not None and min_size is not None: - if min_size < 1: - raise ValueError("`min_size` must be >= 1", min_size) - if min_size > max_size: - raise ValueError("`min_size` must be <= `max_size`", min_size, max_size) - else: - # Legacy code path to support `TaskPoolStrategy()` - min_size = None - max_size = None - self.min_size = min_size - self.max_size = max_size + if size is not None and size < 1: + raise ValueError("`size` must be >= 1", size) + self.size = size def _apply( self, @@ -184,10 +162,8 @@ def _apply( ) def __eq__(self, other: Any) -> bool: - if isinstance(other, TaskPoolStrategy) or other == "tasks": - return self.min_size == other.min_size and self.max_size == other.max_size - else: - return False + return (isinstance(other, TaskPoolStrategy) and self.size == other.size)\ + or (other == "tasks" and self.size is None) @PublicAPI diff --git a/python/ray/data/_internal/execution/backpressure_policy/__init__.py b/python/ray/data/_internal/execution/backpressure_policy/__init__.py index 9f15fddc59cbf..57d52f96460fa 100644 --- a/python/ray/data/_internal/execution/backpressure_policy/__init__.py +++ b/python/ray/data/_internal/execution/backpressure_policy/__init__.py @@ -10,9 +10,8 @@ # Default enabled backpressure policies and its config key. # Use `DataContext.set_config` to config it. -# TODO(hchen): Enable ConcurrencyCapBackpressurePolicy by default. # TODO(hchen): Enable StreamingOutputBackpressurePolicy by default. -ENABLED_BACKPRESSURE_POLICIES = [] +ENABLED_BACKPRESSURE_POLICIES = [ConcurrencyCapBackpressurePolicy] ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY = "backpressure_policies.enabled" diff --git a/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py b/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py index e6c4a8594c6c1..e1b307c3e6d1b 100644 --- a/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py +++ b/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py @@ -1,8 +1,10 @@ import logging -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING -import ray from .backpressure_policy import BackpressurePolicy +from ray.data._internal.execution.operators.task_pool_map_operator import ( + TaskPoolMapOperator, +) if TYPE_CHECKING: from ray.data._internal.execution.interfaces.physical_operator import ( @@ -17,82 +19,28 @@ class ConcurrencyCapBackpressurePolicy(BackpressurePolicy): """A backpressure policy that caps the concurrency of each operator. The concurrency cap limits the number of concurrently running tasks. - It will be set to an intial value, and will ramp up exponentially. The concrete stategy is as follows: - - Each PhysicalOperator is assigned an initial concurrency cap, and maximal - concurrency cap (optional). + - Each PhysicalOperator is assigned a concurrency cap. - An PhysicalOperator can run new tasks if the number of running tasks is less than the cap. - - When the number of finished tasks reaches a threshold, the concurrency cap will - increase. - """ - - # Following are the default values followed by the config keys of the - # available configs. - # Use `DataContext.set_config` to config them. 
-    # The initial concurrency cap for each operator.
-    INIT_CAP = 4
-    INIT_CAP_CONFIG_KEY = "backpressure_policies.concurrency_cap.init_cap"
-    # When the number of finished tasks reaches this threshold, the concurrency cap
-    # will be multiplied by the multiplier.
-    CAP_MULTIPLY_THRESHOLD = 0.5
-    CAP_MULTIPLY_THRESHOLD_CONFIG_KEY = (
-        "backpressure_policies.concurrency_cap.cap_multiply_threshold"
-    )
-    # The multiplier to multiply the concurrency cap by.
-    CAP_MULTIPLIER = 2.0
-    CAP_MULTIPLIER_CONFIG_KEY = "backpressure_policies.concurrency_cap.cap_multiplier"
+    NOTE: Only supports setting concurrency cap for `TaskPoolMapOperator` for now.
+    """

-    def __init__(
-        self,
-        topology: "Topology",
-        min_cap: Optional[int] = None,
-        max_cap: Optional[int] = None,
-    ):
+    def __init__(self, topology: "Topology"):
         self._concurrency_caps: dict["PhysicalOperator", float] = {}

-        data_context = ray.data.DataContext.get_current()
-        if min_cap:
-            self._init_cap = min_cap
-        else:
-            self._init_cap = data_context.get_config(
-                self.INIT_CAP_CONFIG_KEY, self.INIT_CAP
-            )
-        self._max_cap = max_cap
-        self._cap_multiplier = data_context.get_config(
-            self.CAP_MULTIPLIER_CONFIG_KEY, self.CAP_MULTIPLIER
-        )
-        self._cap_multiply_threshold = data_context.get_config(
-            self.CAP_MULTIPLY_THRESHOLD_CONFIG_KEY, self.CAP_MULTIPLY_THRESHOLD
-        )
-
-        assert self._init_cap > 0
-        assert self._max_cap is None or self._max_cap >= self._init_cap
-        assert 0 < self._cap_multiply_threshold <= 1
-        assert self._cap_multiplier >= 1
+        for op, _ in topology.items():
+            if isinstance(op, TaskPoolMapOperator) and op.get_concurrency() is not None:
+                self._concurrency_caps[op] = op.get_concurrency()
+            else:
+                self._concurrency_caps[op] = float("inf")

         logger.debug(
-            "ConcurrencyCapBackpressurePolicy initialized with config: "
-            f"init_cap: {self._init_cap}, max_cap: {self._max_cap}, "
-            f"cap_multiply_threshold: {self._cap_multiply_threshold}, "
-            f"cap_multiplier: {self._cap_multiplier}"
+            "ConcurrencyCapBackpressurePolicy initialized with: "
+            f"{self._concurrency_caps}"
         )

-        for op, _ in topology.items():
-            self._concurrency_caps[op] = self._init_cap
-
     def can_add_input(self, op: "PhysicalOperator") -> bool:
-        metrics = op.metrics
-        while self._cap_multiplier > 1 and metrics.num_tasks_finished >= (
-            self._concurrency_caps[op] * self._cap_multiply_threshold
-        ):
-            self._concurrency_caps[op] *= self._cap_multiplier
-            if self._max_cap and self._concurrency_caps[op] >= self._max_cap:
-                self._concurrency_caps[op] = self._max_cap
-                break
-            logger.debug(
-                f"Concurrency cap for {op} increased to {self._concurrency_caps[op]}"
-            )
-        return metrics.num_tasks_running < self._concurrency_caps[op]
+        return op.metrics.num_tasks_running < self._concurrency_caps[op]
diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py
index f94daf6f40e26..353526372fbe4 100644
--- a/python/ray/data/_internal/execution/operators/map_operator.py
+++ b/python/ray/data/_internal/execution/operators/map_operator.py
@@ -13,6 +13,12 @@
     ComputeStrategy,
     TaskPoolStrategy,
 )
-from ray.data._internal.execution.backpressure_policy.backpressure_policy import (
-    BackpressurePolicy,
-)
-from ray.data._internal.execution.backpressure_policy.concurrency_cap_backpressure_policy import (
-    ConcurrencyCapBackpressurePolicy,
-)
 from ray.data._internal.execution.interfaces import (
     ExecutionOptions,
     ExecutionResources,
@@ -146,7 +155,7 @@ def create(
                 TaskPoolMapOperator,
             )

-            op = TaskPoolMapOperator(
+            return TaskPoolMapOperator(
                 map_transformer,
                 input_op,
                 name=name,
                 target_max_block_size=target_max_block_size,
                 min_rows_per_bundle=min_rows_per_bundle,
+                concurrency=compute_strategy.size,
                 ray_remote_args=ray_remote_args,
             )
-            if compute_strategy.min_size is not None and compute_strategy.max_size is not None:
-                concurrency_policy = ConcurrencyCapBackpressurePolicy(
-                    topology={op: None},
-                    min_cap=compute_strategy.min_size,
-                    max_cap=compute_strategy.max_size,
-                )
-                op.set_backpressure_policies([concurrency_policy])
-            return op
         elif isinstance(compute_strategy, ActorPoolStrategy):
             from ray.data._internal.execution.operators.actor_pool_map_operator import (
                 ActorPoolMapOperator,
@@ -380,6 +368,12 @@ def _get_next_inner(self) -> RefBundle:
             self._output_metadata.append(meta)
         return bundle

-    def set_backpressure_policies(self, policies: List[BackpressurePolicy]):
-        self._backpressure_policies = policies
-
-    def get_backpressure_policies(self) -> List[BackpressurePolicy]:
-        return self._backpressure_policies
-
     @abstractmethod
     def progress_str(self) -> str:
         raise NotImplementedError
diff --git a/python/ray/data/_internal/execution/operators/task_pool_map_operator.py b/python/ray/data/_internal/execution/operators/task_pool_map_operator.py
index a3404897d4407..01afc6830357c 100644
--- a/python/ray/data/_internal/execution/operators/task_pool_map_operator.py
+++ b/python/ray/data/_internal/execution/operators/task_pool_map_operator.py
@@ -23,6 +23,7 @@ def __init__(
         target_max_block_size: Optional[int],
         name: str = "TaskPoolMap",
         min_rows_per_bundle: Optional[int] = None,
+        concurrency: Optional[int] = None,
         ray_remote_args: Optional[Dict[str, Any]] = None,
     ):
         """Create a TaskPoolMapOperator instance.
@@ -37,8 +38,11 @@
             transform_fn, or None to use the block size. Setting the batch size
             is important for the performance of GPU-accelerated transform functions.
             The actual rows passed may be less if the dataset is small.
+            concurrency: The maximum number of Ray tasks to use concurrently,
+                or None to use as many tasks as possible.
             ray_remote_args: Customize the ray remote args for this op's tasks.
""" + self._concurrency = concurrency super().__init__( map_transformer, input_op, @@ -114,3 +118,6 @@ def incremental_resource_usage(self) -> ExecutionResources: gpu=self._ray_remote_args.get("num_gpus", 0), object_store_memory=self._metrics.average_bytes_outputs_per_task, ) + + def get_concurrency(self) -> Optional[int]: + return self._concurrency diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index 5b5a37e6c2e2e..88108ba86847e 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -32,7 +32,6 @@ AllToAllOperator, ) from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer -from ray.data._internal.execution.operators.map_operator import MapOperator from ray.data._internal.execution.util import memory_string from ray.data._internal.progress_bar import ProgressBar from ray.data.context import DataContext @@ -571,17 +570,13 @@ def select_operator_to_run( ops = [] for op, state in topology.items(): under_resource_limits = _execution_allowed(op, cur_usage, limits) - policies_per_op = [] - if isinstance(op, MapOperator): - policies_per_op += op.get_backpressure_polices - if ( op.need_more_inputs() and state.num_queued() > 0 and op.should_add_input() and under_resource_limits and not op.completed() - and all(p.can_add_input(op) for p in (backpressure_policies + policies_per_op)) + and all(p.can_add_input(op) for p in backpressure_policies) ): ops.append(op) # Update the op in all cases to enable internal autoscaling, etc. diff --git a/python/ray/data/_internal/util.py b/python/ray/data/_internal/util.py index 106d79201a59f..2d251163f5ced 100644 --- a/python/ray/data/_internal/util.py +++ b/python/ray/data/_internal/util.py @@ -598,8 +598,10 @@ def get_compute_strategy( min_size=concurrency[0], max_size=concurrency[1] ) else: - return TaskPoolStrategy( - min_size=concurrency[0], max_size=concurrency[1] + raise ValueError( + "``concurrency`` is set as a tuple of integers, but ``fn`` " + f"is not a callable class: {fn}. Use ``concurrency=n`` to " + "control maximal number of workers to use." ) else: raise ValueError( diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py index a3dc65e921460..a08642c8caba3 100644 --- a/python/ray/data/tests/test_map.py +++ b/python/ray/data/tests/test_map.py @@ -250,8 +250,13 @@ def __call__(self, x): for fn in [udf, UDFClass]: # Test concurrency with None, single integer and a tuple of integers. for concurrency in [2, (2, 4)]: - result = ds.map(fn, concurrency=concurrency).take_all() - assert sorted(extract_values("id", result)) == list(range(10)), result + if fn == udf and concurrency == (2, 4): + error_message = "``concurrency`` is set as a tuple of integers" + with pytest.raises(ValueError, match=error_message): + ds.map(fn, concurrency=concurrency).take_all() + else: + result = ds.map(fn, concurrency=concurrency).take_all() + assert sorted(extract_values("id", result)) == list(range(10)), result # Test concurrency with an illegal value. 
From 2ca833df82830ac64f406821c6d0f0fed673fc8a Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Fri, 26 Jan 2024 11:50:01 -0800
Subject: [PATCH 3/6] Fix unit test

Signed-off-by: Cheng Su
---
 python/ray/data/_internal/compute.py | 5 +-
 .../execution/operators/map_operator.py | 3 -
 .../data/tests/test_backpressure_policies.py | 128 ++++--------------
 3 files changed, 32 insertions(+), 104 deletions(-)

diff --git a/python/ray/data/_internal/compute.py b/python/ray/data/_internal/compute.py
index 60c9afef6766f..fecc415c8983c 100644
--- a/python/ray/data/_internal/compute.py
+++ b/python/ray/data/_internal/compute.py
@@ -162,8 +162,9 @@ def _apply(
         )

     def __eq__(self, other: Any) -> bool:
-        return (isinstance(other, TaskPoolStrategy) and self.size == other.size)\
-            or (other == "tasks" and self.size is None)
+        return (isinstance(other, TaskPoolStrategy) and self.size == other.size) or (
+            other == "tasks" and self.size is None
+        )


 @PublicAPI
diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py
index 353526372fbe4..83eeb58f0a594 100644
--- a/python/ray/data/_internal/execution/operators/map_operator.py
+++ b/python/ray/data/_internal/execution/operators/map_operator.py
@@ -63,9 +63,6 @@ def __init__(
         self._ray_remote_args_factory = None
         self._remote_args_for_metrics = copy.deepcopy(self._ray_remote_args)

-        # Initialize the backpressure policies to an empty list.
-        self._backpressure_policies = []
-
         # Bundles block references up to the min_rows_per_bundle target.
         self._block_ref_bundler = _BlockRefBundler(min_rows_per_bundle)

diff --git a/python/ray/data/tests/test_backpressure_policies.py b/python/ray/data/tests/test_backpressure_policies.py
index 2673d069e802d..a855c92c3ad76 100644
--- a/python/ray/data/tests/test_backpressure_policies.py
+++ b/python/ray/data/tests/test_backpressure_policies.py
@@ -2,7 +2,6 @@
 import time
 import unittest
 from collections import defaultdict
-from contextlib import contextmanager
 from unittest.mock import MagicMock, patch

 import numpy as np
@@ -14,6 +13,10 @@
     ConcurrencyCapBackpressurePolicy,
     StreamingOutputBackpressurePolicy,
 )
+from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
+from ray.data._internal.execution.operators.task_pool_map_operator import (
+    TaskPoolMapOperator,
+)
 from ray.data.tests.conftest import restore_data_context  # noqa: F401
 from ray.data.tests.conftest import (
     CoreExecutionMetrics,
@@ -42,94 +45,39 @@ def tearDownClass(cls):
         data_context = ray.data.DataContext.get_current()
         data_context.remove_config(ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY)

-    @contextmanager
-    def _patch_config(self, init_cap, cap_multiply_threshold, cap_multiplier):
-        data_context = ray.data.DataContext.get_current()
-        data_context.set_config(
-            ConcurrencyCapBackpressurePolicy.INIT_CAP_CONFIG_KEY,
-            init_cap,
-        )
-        data_context.set_config(
-            ConcurrencyCapBackpressurePolicy.CAP_MULTIPLY_THRESHOLD_CONFIG_KEY,
-            cap_multiply_threshold,
-        )
-        data_context.set_config(
-            ConcurrencyCapBackpressurePolicy.CAP_MULTIPLIER_CONFIG_KEY,
-            cap_multiplier,
-        )
-        yield
-        data_context.remove_config(ConcurrencyCapBackpressurePolicy.INIT_CAP_CONFIG_KEY)
-        data_context.remove_config(
-            ConcurrencyCapBackpressurePolicy.CAP_MULTIPLY_THRESHOLD_CONFIG_KEY
-        )
-        data_context.remove_config(
-            ConcurrencyCapBackpressurePolicy.CAP_MULTIPLIER_CONFIG_KEY
-        )
-
     def test_basic(self):
-        op = MagicMock()
-        op.metrics = MagicMock(
-            num_tasks_running=0,
-            num_tasks_finished=0,
+        concurrency = 16
+        input_op = InputDataBuffer(input_data=[MagicMock()])
+        map_op_no_concurrency = TaskPoolMapOperator(
+            map_transformer=MagicMock(),
+            input_op=input_op,
+            target_max_block_size=None,
         )
-        topology = {op: MagicMock()}
+        map_op = TaskPoolMapOperator(
+            map_transformer=MagicMock(),
+            input_op=map_op_no_concurrency,
+            target_max_block_size=None,
+            concurrency=concurrency,
+        )
+        map_op.metrics.num_tasks_running = 0
+        map_op.metrics.num_tasks_finished = 0
+        topology = {map_op: MagicMock()}

-        init_cap = 4
-        cap_multiply_threshold = 0.5
-        cap_multiplier = 2.0
+        policy = ConcurrencyCapBackpressurePolicy(topology)

-        with self._patch_config(init_cap, cap_multiply_threshold, cap_multiplier):
-            policy = ConcurrencyCapBackpressurePolicy(topology)
+        self.assertEqual(policy._concurrency_caps[map_op], concurrency)
+        self.assertTrue(input_op not in policy._concurrency_caps)
+        self.assertTrue(map_op_no_concurrency not in policy._concurrency_caps)

-        self.assertEqual(policy._concurrency_caps[op], 4)
         # Gradually increase num_tasks_running to the cap.
-        for i in range(1, init_cap + 1):
-            self.assertTrue(policy.can_add_input(op))
-            op.metrics.num_tasks_running = i
+        for i in range(1, concurrency + 1):
+            self.assertTrue(policy.can_add_input(map_op))
+            map_op.metrics.num_tasks_running = i
         # Now num_tasks_running reaches the cap, so can_add_input should return False.
-        self.assertFalse(policy.can_add_input(op))
+        self.assertFalse(policy.can_add_input(map_op))

-        # If we increase num_task_finished to the threshold (4 * 0.5 = 2),
-        # it should trigger the cap to increase.
-        op.metrics.num_tasks_finished = init_cap * cap_multiply_threshold
-        self.assertEqual(policy.can_add_input(op), True)
-        self.assertEqual(policy._concurrency_caps[op], init_cap * cap_multiplier)
-
-        # Now the cap is 8 (4 * 2).
-        # If we increase num_tasks_finished directly to the next-level's threshold
-        # (8 * 2 * 0.5 = 8), it should trigger the cap to increase twice.
-        op.metrics.num_tasks_finished = (
-            policy._concurrency_caps[op] * cap_multiplier * cap_multiply_threshold
-        )
-        op.metrics.num_tasks_running = 0
-        self.assertEqual(policy.can_add_input(op), True)
-        self.assertEqual(policy._concurrency_caps[op], init_cap * cap_multiplier**3)
+        map_op.metrics.num_tasks_running = concurrency / 2
+        self.assertEqual(policy.can_add_input(map_op), True)

-    def test_config(self):
-        topology = {}
-        # Test good config.
-        with self._patch_config(10, 0.3, 1.5):
-            policy = ConcurrencyCapBackpressurePolicy(topology)
-        self.assertEqual(policy._init_cap, 10)
-        self.assertEqual(policy._cap_multiply_threshold, 0.3)
-        self.assertEqual(policy._cap_multiplier, 1.5)
-
-        with self._patch_config(10, 0.3, 1):
-            policy = ConcurrencyCapBackpressurePolicy(topology)
-        self.assertEqual(policy._init_cap, 10)
-        self.assertEqual(policy._cap_multiply_threshold, 0.3)
-        self.assertEqual(policy._cap_multiplier, 1)
-
-        # Test bad configs.
- with self._patch_config(-1, 0.3, 1.5): - with self.assertRaises(AssertionError): - policy = ConcurrencyCapBackpressurePolicy(topology) - with self._patch_config(10, 1.1, 1.5): - with self.assertRaises(AssertionError): - policy = ConcurrencyCapBackpressurePolicy(topology) - with self._patch_config(10, 0.3, 0.5): - with self.assertRaises(AssertionError): - policy = ConcurrencyCapBackpressurePolicy(topology) + map_op.metrics.num_tasks_running = concurrency / 2 + self.assertEqual(policy.can_add_input(map_op), True) def _create_record_time_actor(self): @ray.remote(num_cpus=0) @@ -185,24 +133,6 @@ def test_e2e_normal(self): start2, end2 = ray.get(actor.get_start_and_end_time_for_op.remote(2)) assert start1 < start2 < end1 < end2, (start1, start2, end1, end2) - def test_e2e_no_ramping_up(self): - """Test setting the multiplier to 1.0, which means no ramping up of the - concurrency cap.""" - with self._patch_config(1, 1, 1): - actor = self._create_record_time_actor() - map_func1 = self._get_map_func(actor, 1) - N = self.__class__._cluster_cpus - ds = ray.data.range(N, parallelism=N) - ds = ds.map_batches(map_func1, batch_size=None, num_cpus=1) - res = ds.take_all() - self.assertEqual(len(res), N) - - start, end = ray.get( - actor.get_start_and_end_time_for_all_tasks_of_op.remote(1) - ) - for i in range(len(start) - 1): - assert start[i] < end[i] < start[i + 1], (i, start, end) - class TestStreamOutputBackpressurePolicy(unittest.TestCase): """Tests for StreamOutputBackpressurePolicy.""" From 20f185bebc8108786f171fdaf6c9af59d37e1e84 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Fri, 26 Jan 2024 14:12:21 -0800 Subject: [PATCH 4/6] Update test Signed-off-by: Cheng Su --- python/ray/data/tests/test_backpressure_policies.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/python/ray/data/tests/test_backpressure_policies.py b/python/ray/data/tests/test_backpressure_policies.py index a855c92c3ad76..9df247454713b 100644 --- a/python/ray/data/tests/test_backpressure_policies.py +++ b/python/ray/data/tests/test_backpressure_policies.py @@ -1,4 +1,5 @@ import functools +import math import time import unittest from collections import defaultdict @@ -61,13 +62,17 @@ def test_basic(self): ) map_op.metrics.num_tasks_running = 0 map_op.metrics.num_tasks_finished = 0 - topology = {map_op: MagicMock()} + topology = { + map_op: MagicMock(), + input_op: MagicMock(), + map_op_no_concurrency: MagicMock(), + } policy = ConcurrencyCapBackpressurePolicy(topology) self.assertEqual(policy._concurrency_caps[map_op], concurrency) - self.assertTrue(input_op not in policy._concurrency_caps) - self.assertTrue(map_op_no_concurrency not in policy._concurrency_caps) + self.assertTrue(math.isinf(policy._concurrency_caps[input_op])) + self.assertTrue(math.isinf(policy._concurrency_caps[map_op_no_concurrency])) # Gradually increase num_tasks_running to the cap. 
        for i in range(1, concurrency + 1):

From 6e7033849eadad34aa7a34547e14ed3f2b9c411c Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Mon, 29 Jan 2024 12:46:22 -0800
Subject: [PATCH 5/6] Address comments

Signed-off-by: Cheng Su
---
 .../_internal/execution/operators/task_pool_map_operator.py | 2 +-
 python/ray/data/_internal/util.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/ray/data/_internal/execution/operators/task_pool_map_operator.py b/python/ray/data/_internal/execution/operators/task_pool_map_operator.py
index 01afc6830357c..b46a039f44ff9 100644
--- a/python/ray/data/_internal/execution/operators/task_pool_map_operator.py
+++ b/python/ray/data/_internal/execution/operators/task_pool_map_operator.py
@@ -42,7 +42,6 @@ def __init__(
                 or None to use as many tasks as possible.
             ray_remote_args: Customize the ray remote args for this op's tasks.
         """
-        self._concurrency = concurrency
         super().__init__(
             map_transformer,
             input_op,
@@ -51,6 +50,7 @@ def __init__(
             min_rows_per_bundle,
             ray_remote_args,
         )
+        self._concurrency = concurrency

     def _add_bundled_input(self, bundle: RefBundle):
         # Submit the task as a normal Ray task.
diff --git a/python/ray/data/_internal/util.py b/python/ray/data/_internal/util.py
index 2d251163f5ced..d5dfedff84fe2 100644
--- a/python/ray/data/_internal/util.py
+++ b/python/ray/data/_internal/util.py
@@ -601,7 +601,7 @@ def get_compute_strategy(
                 raise ValueError(
                     "``concurrency`` is set as a tuple of integers, but ``fn`` "
                     f"is not a callable class: {fn}. Use ``concurrency=n`` to "
-                    "control maximal number of workers to use."
+                    "control maximum number of workers to use."
                 )
         else:
             raise ValueError(

From 06f5d4a143dfb3103ad310385147878f1f5fe88d Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Mon, 29 Jan 2024 15:13:57 -0800
Subject: [PATCH 6/6] Address comments

Signed-off-by: Cheng Su
---
 .../concurrency_cap_backpressure_policy.py | 9 +++------
 python/ray/data/tests/test_backpressure_policies.py | 4 ++--
 2 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py b/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py
index e1b307c3e6d1b..a52bd1f6ab9f7 100644
--- a/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py
+++ b/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py
@@ -18,14 +18,11 @@
 class ConcurrencyCapBackpressurePolicy(BackpressurePolicy):
     """A backpressure policy that caps the concurrency of each operator.

-    The concurrency cap limits the number of concurrently running tasks.
-
-    The concrete strategy is as follows:
-    - Each PhysicalOperator is assigned a concurrency cap.
-    - A PhysicalOperator can run new tasks if the number of running tasks is less
-      than the cap.
+    The policy will limit the number of concurrently running tasks based on its
+    concurrency cap parameter.

     NOTE: Only supports setting concurrency cap for `TaskPoolMapOperator` for now.
+    TODO(chengsu): Consolidate with actor scaling logic of `ActorPoolMapOperator`.
""" def __init__(self, topology: "Topology"): diff --git a/python/ray/data/tests/test_backpressure_policies.py b/python/ray/data/tests/test_backpressure_policies.py index 9df247454713b..e852e182f4149 100644 --- a/python/ray/data/tests/test_backpressure_policies.py +++ b/python/ray/data/tests/test_backpressure_policies.py @@ -125,8 +125,8 @@ def test_e2e_normal(self): N = self.__class__._cluster_cpus ds = ray.data.range(N, parallelism=N) # Use different `num_cpus` to make sure they don't fuse. - ds = ds.map_batches(map_func1, batch_size=None, num_cpus=1) - ds = ds.map_batches(map_func2, batch_size=None, num_cpus=1.1) + ds = ds.map_batches(map_func1, batch_size=None, num_cpus=1, concurrency=1) + ds = ds.map_batches(map_func2, batch_size=None, num_cpus=1.1, concurrency=1) res = ds.take_all() self.assertEqual(len(res), N)