From e4352305dd3a6c64f73a114f41201c1822cbf1a0 Mon Sep 17 00:00:00 2001
From: Edward Oakes
Date: Thu, 16 Jun 2022 11:16:20 -0500
Subject: [PATCH] Revert "[serve] Use soft constraint for pinning controller on head node (#25091)" (#25857)

This reverts commit 0f600362dd91f1fc83555c3d166b5370bc77ed87.
---
 python/ray/serve/api.py                   | 18 +++++------
 python/ray/serve/controller.py            |  3 --
 python/ray/serve/http_proxy.py            |  5 +--
 python/ray/serve/http_state.py            | 29 +++++++----------
 python/ray/serve/tests/test_http_state.py | 16 +++++-----
 python/ray/serve/tests/test_util.py       |  8 +++++
 python/ray/serve/utils.py                 | 38 ++++++++++++++++++-----
 7 files changed, 69 insertions(+), 48 deletions(-)

diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py
index 7eb16a1d4c5f7..5474a8cf7990f 100644
--- a/python/ray/serve/api.py
+++ b/python/ray/serve/api.py
@@ -18,10 +18,9 @@
 
 import ray
 from ray import cloudpickle
+from ray._private.usage import usage_lib
 from ray.experimental.dag import DAGNode
 from ray.util.annotations import PublicAPI
-from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
-from ray._private.usage import usage_lib
 
 from ray.serve.application import Application
 from ray.serve.client import ServeControllerClient
@@ -59,12 +58,14 @@
 
 from ray.serve.utils import (
     ensure_serialization_context,
     format_actor_name,
+    get_current_node_resource_key,
     get_random_letters,
     in_interactive_shell,
     DEFAULT,
     install_serve_encoders_to_fastapi,
 )
+
 
 logger = logging.getLogger(__file__)
 
@@ -149,25 +150,20 @@ def start(
     if http_options is None:
         http_options = HTTPOptions()
 
-    # Used for scheduling things to the head node explicitly.
-    head_node_id = ray.get_runtime_context().node_id.hex()
     controller = ServeController.options(
         num_cpus=1 if dedicated_cpu else 0,
         name=controller_name,
         lifetime="detached" if detached else None,
         max_restarts=-1,
         max_task_retries=-1,
-        # Schedule the controller on the head node with a soft constraint. This
-        # prefers it to run on the head node in most cases, but allows it to be
-        # restarted on other nodes in an HA cluster.
-        scheduling_strategy=NodeAffinitySchedulingStrategy(head_node_id, soft=True),
+        # Pin Serve controller on the head node.
+        resources={get_current_node_resource_key(): 0.01},
         namespace=SERVE_NAMESPACE,
         max_concurrency=CONTROLLER_MAX_CONCURRENCY,
     ).remote(
         controller_name,
-        http_config=http_options,
-        checkpoint_path=_checkpoint_path,
-        head_node_id=head_node_id,
+        http_options,
+        _checkpoint_path,
         detached=detached,
     )
 
diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py
index 54c1635d980be..f654f1e4db41e 100644
--- a/python/ray/serve/controller.py
+++ b/python/ray/serve/controller.py
@@ -79,10 +79,8 @@ class ServeController:
     async def __init__(
         self,
         controller_name: str,
-        *,
         http_config: HTTPOptions,
         checkpoint_path: str,
-        head_node_id: str,
         detached: bool = False,
     ):
         configure_component_logger(
@@ -110,7 +108,6 @@ async def __init__(
             controller_name,
             detached,
             http_config,
-            head_node_id,
         )
         self.endpoint_state = EndpointState(self.kv_store, self.long_poll_host)
         # Fetch all running actors in current cluster as source of current
diff --git a/python/ray/serve/http_proxy.py b/python/ray/serve/http_proxy.py
index 22021c71f435b..915b96e076bf3 100644
--- a/python/ray/serve/http_proxy.py
+++ b/python/ray/serve/http_proxy.py
@@ -28,6 +28,7 @@
 from ray.serve.constants import SERVE_LOGGER_NAME, SERVE_NAMESPACE
 from ray.serve.long_poll import LongPollClient, LongPollNamespace
 from ray.serve.logging_utils import access_log_msg, configure_component_logger
+from ray.serve.utils import node_id_to_ip_addr
 
 logger = logging.getLogger(SERVE_LOGGER_NAME)
 
@@ -332,11 +333,11 @@ def __init__(
         port: int,
         root_path: str,
         controller_name: str,
-        node_ip_address: str,
+        node_id: str,
         http_middlewares: Optional[List["starlette.middleware.Middleware"]] = None,
     ):  # noqa: F821
         configure_component_logger(
-            component_name="http_proxy", component_id=node_ip_address
+            component_name="http_proxy", component_id=node_id_to_ip_addr(node_id)
         )
 
         if http_middlewares is None:
diff --git a/python/ray/serve/http_state.py b/python/ray/serve/http_state.py
index cecc17e0a9ab6..de094b40dcbf8 100644
--- a/python/ray/serve/http_state.py
+++ b/python/ray/serve/http_state.py
@@ -5,8 +5,6 @@
 
 import ray
 from ray.actor import ActorHandle
-from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
-
 from ray.serve.config import HTTPOptions, DeploymentMode
 from ray.serve.constants import (
     ASYNC_CONCURRENCY,
@@ -18,6 +16,7 @@
 from ray.serve.utils import (
     format_actor_name,
     get_all_node_ids,
+    get_current_node_resource_key,
 )
 
 from ray.serve.common import EndpointTag, NodeId
@@ -36,7 +35,6 @@ def __init__(
         controller_name: str,
         detached: bool,
         config: HTTPOptions,
-        head_node_id: str,
         # Used by unit testing
         _start_proxies_on_init: bool = True,
     ):
@@ -45,8 +43,6 @@
         self._config = config
         self._proxy_actors: Dict[NodeId, ActorHandle] = dict()
         self._proxy_actor_names: Dict[NodeId, str] = dict()
-        self._head_node_id: str = head_node_id
-        assert isinstance(head_node_id, str)
 
         # Will populate self.proxy_actors with existing actors.
         if _start_proxies_on_init:
@@ -70,7 +66,7 @@ def update(self):
         self._stop_proxies_if_needed()
 
     def _get_target_nodes(self) -> List[Tuple[str, str]]:
-        """Return the list of (node_id, ip_address) to deploy HTTP servers on."""
+        """Return the list of (id, resource_key) to deploy HTTP servers on."""
         location = self._config.location
         target_nodes = get_all_node_ids()
 
@@ -78,13 +74,12 @@ def _get_target_nodes(self) -> List[Tuple[str, str]]:
             return []
 
         if location == DeploymentMode.HeadOnly:
-            nodes = [
-                (node_id, ip_address)
-                for node_id, ip_address in target_nodes
-                if node_id == self._head_node_id
-            ]
-            assert len(nodes) == 1, f"Head node not found! {target_nodes}"
-            return nodes
+            head_node_resource_key = get_current_node_resource_key()
+            return [
+                (node_id, node_resource)
+                for node_id, node_resource in target_nodes
+                if node_resource == head_node_resource_key
+            ][:1]
 
         if location == DeploymentMode.FixedNumber:
             num_replicas = self._config.fixed_number_replicas
@@ -106,7 +101,7 @@
 
     def _start_proxies_if_needed(self) -> None:
         """Start a proxy on every node if it doesn't already exist."""
-        for node_id, node_ip_address in self._get_target_nodes():
+        for node_id, node_resource in self._get_target_nodes():
             if node_id in self._proxy_actors:
                 continue
 
@@ -128,15 +123,13 @@ def _start_proxies_if_needed(self) -> None:
                 max_concurrency=ASYNC_CONCURRENCY,
                 max_restarts=-1,
                 max_task_retries=-1,
-                scheduling_strategy=NodeAffinitySchedulingStrategy(
-                    node_id, soft=False
-                ),
+                resources={node_resource: 0.01},
             ).remote(
                 self._config.host,
                 self._config.port,
                 self._config.root_path,
                 controller_name=self._controller_name,
-                node_ip_address=node_ip_address,
+                node_id=node_id,
                 http_middlewares=self._config.middlewares,
             )
 
diff --git a/python/ray/serve/tests/test_http_state.py b/python/ray/serve/tests/test_http_state.py
index 88546be60e087..1bdbd17963f62 100644
--- a/python/ray/serve/tests/test_http_state.py
+++ b/python/ray/serve/tests/test_http_state.py
@@ -7,19 +7,16 @@
 
 
 def test_node_selection():
-    head_node_id = "node_id-index-head"
-
     def _make_http_state(http_options):
         return HTTPState(
             "mock_controller_name",
             detached=True,
             config=http_options,
-            head_node_id=head_node_id,
             _start_proxies_on_init=False,
         )
 
-    all_nodes = [(head_node_id, "fake-head-ip")] + [
-        (f"worker-node-id-{i}", f"fake-worker-ip-{i}") for i in range(100)
+    all_nodes = [("node_id-index-head", "node-id-1")] + [
+        (f"node_idx-worker-{i}", f"node-id-{i}") for i in range(100)
     ]
 
     with patch("ray.serve.http_state.get_all_node_ids") as func:
@@ -30,8 +27,13 @@ def _make_http_state(http_options):
     assert state._get_target_nodes() == []
 
     # Test HeadOnly
-    state = _make_http_state(HTTPOptions(location=DeploymentMode.HeadOnly))
-    assert state._get_target_nodes() == all_nodes[:1]
+    with patch(
+        "ray.serve.http_state.get_current_node_resource_key"
+    ) as get_current_node:
+        get_current_node.return_value = "node-id-1"
+
+        state = _make_http_state(HTTPOptions(location=DeploymentMode.HeadOnly))
+        assert state._get_target_nodes() == all_nodes[:1]
 
     # Test EveryNode
     state = _make_http_state(HTTPOptions(location=DeploymentMode.EveryNode))
diff --git a/python/ray/serve/tests/test_util.py b/python/ray/serve/tests/test_util.py
index f710bee89b314..1ea450b283c74 100644
--- a/python/ray/serve/tests/test_util.py
+++ b/python/ray/serve/tests/test_util.py
@@ -12,11 +12,19 @@
 from ray import serve
 from ray.serve.utils import (
     get_deployment_import_path,
+    node_id_to_ip_addr,
     override_runtime_envs_except_env_vars,
     serve_encoders,
 )
 
 
+def test_node_id_to_ip_addr():
+    assert node_id_to_ip_addr("node:127.0.0.1-0") == "127.0.0.1"
+    assert node_id_to_ip_addr("127.0.0.1-0") == "127.0.0.1"
+    assert node_id_to_ip_addr("127.0.0.1") == "127.0.0.1"
+    assert node_id_to_ip_addr("node:127.0.0.1") == "127.0.0.1"
+
+
 def test_bytes_encoder():
     data_before = {"inp": {"nest": b"bytes"}}
     data_after = {"inp": {"nest": "bytes"}}
diff --git a/python/ray/serve/utils.py b/python/ray/serve/utils.py
index a255cda3f85a5..15c55975c41db 100644
--- a/python/ray/serve/utils.py
+++ b/python/ray/serve/utils.py
@@ -1,5 +1,6 @@
 from functools import wraps
 import importlib
+from itertools import groupby
 import inspect
 import pickle
 import random
@@ -146,20 +147,43 @@ def format_actor_name(actor_name, controller_name=None, *modifiers):
     return name
 
 
-def get_all_node_ids() -> List[Tuple[str, str]]:
-    """Get IDs for all live nodes in the cluster.
+def get_all_node_ids():
+    """Get IDs for all nodes in the cluster.
 
-    Returns a list of (node_id: str, ip_address: str). The node_id can be
-    passed into the Ray SchedulingPolicy API.
+    Handles multiple nodes on the same IP by appending an index to the
+    node_id, e.g., 'node_id-index'.
+
+    Returns a list of ('node_id-index', 'node_id') tuples (the latter can be
+    used as a resource requirement for actor placements).
     """
     node_ids = []
-    for node in ray.nodes():
-        if node["Alive"]:
-            node_ids.append((node["NodeID"], node["NodeName"]))
+    # We need to use the node_id and index here because we could
+    # have multiple virtual nodes on the same host. In that case
+    # they will have the same IP and therefore node_id.
+    for _, node_id_group in groupby(sorted(ray.state.node_ids())):
+        for index, node_id in enumerate(node_id_group):
+            node_ids.append(("{}-{}".format(node_id, index), node_id))
 
     return node_ids
 
 
+def node_id_to_ip_addr(node_id: str):
+    """Recovers the IP address for an entry from get_all_node_ids."""
+    if ":" in node_id:
+        node_id = node_id.split(":")[1]
+
+    if "-" in node_id:
+        node_id = node_id.split("-")[0]
+
+    return node_id
+
+
+def get_node_id_for_actor(actor_handle):
+    """Given an actor handle, return the node id it's placed on."""
+
+    return ray.state.actors()[actor_handle._actor_id.hex()]["Address"]["NodeID"]
+
+
 def compute_iterable_delta(old: Iterable, new: Iterable) -> Tuple[set, set, set]:
     """Given two iterables, return the entries that's (added, removed, updated).
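
Background on the mechanism this revert restores: Ray automatically defines a
custom resource named "node:<ip_address>" for every node in the cluster, so
requesting a small fraction of that resource (0.01 here) pins an actor to a
specific node without consuming meaningful scheduling capacity. The following
is a minimal, self-contained sketch of the idea, not part of the patch;
find_current_node_resource_key is a hypothetical stand-in for the
get_current_node_resource_key helper referenced in the diff.

    import ray

    ray.init()

    def find_current_node_resource_key() -> str:
        # Ray exposes each node as a custom resource named "node:<ip>".
        current_ip = ray.util.get_node_ip_address()
        key = f"node:{current_ip}"
        for node in ray.nodes():
            if node["Alive"] and key in node["Resources"]:
                return key
        raise ValueError(f"no resource key found for IP {current_ip}")

    @ray.remote
    class Pinned:
        def ip(self) -> str:
            return ray.util.get_node_ip_address()

    # Requesting a sliver (0.01) of the node's resource pins the actor to
    # this node without meaningfully reducing its schedulable capacity.
    pinned = Pinned.options(
        resources={find_current_node_resource_key(): 0.01}
    ).remote()
    assert ray.get(pinned.ip.remote()) == ray.util.get_node_ip_address()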
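Illustration of the node-ID bookkeeping above: get_all_node_ids suffixes each
raw node ID with an index ("<node_id>-<index>") so that multiple virtual nodes
sharing one host stay distinct, and node_id_to_ip_addr reverses the encoding.
A self-contained sketch of the round trip, using made-up IDs of the
"node:<ip>" form Ray uses for node resources (the helper body mirrors the
node_id_to_ip_addr added to utils.py in this patch):

    from itertools import groupby

    # Duplicate IDs model multiple virtual nodes sharing one host/IP.
    raw_ids = ["node:10.0.0.1", "node:10.0.0.1", "node:10.0.0.2"]

    # Suffix each ID with its index within its group of duplicates,
    # as get_all_node_ids does.
    suffixed = []
    for _, group in groupby(sorted(raw_ids)):
        for index, node_id in enumerate(group):
            suffixed.append((f"{node_id}-{index}", node_id))

    assert suffixed == [
        ("node:10.0.0.1-0", "node:10.0.0.1"),
        ("node:10.0.0.1-1", "node:10.0.0.1"),
        ("node:10.0.0.2-0", "node:10.0.0.2"),
    ]

    def node_id_to_ip_addr(node_id: str) -> str:
        # Strip the "node:" prefix, then the "-<index>" suffix.
        if ":" in node_id:
            node_id = node_id.split(":")[1]
        if "-" in node_id:
            node_id = node_id.split("-")[0]
        return node_id

    assert node_id_to_ip_addr("node:10.0.0.1-1") == "10.0.0.1"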