Skip to content

Commit

Permalink
Revert "[serve] Use soft constraint for pinning controller on head no…
Browse files Browse the repository at this point in the history
…de (#25091)" (#25857)

This reverts commit 0f60036.
  • Loading branch information
edoakes committed Jun 16, 2022
1 parent d944f74 commit e435230
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 48 deletions.
18 changes: 7 additions & 11 deletions python/ray/serve/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@

import ray
from ray import cloudpickle
from ray._private.usage import usage_lib
from ray.experimental.dag import DAGNode
from ray.util.annotations import PublicAPI
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
from ray._private.usage import usage_lib

from ray.serve.application import Application
from ray.serve.client import ServeControllerClient
Expand Down Expand Up @@ -59,12 +58,14 @@
from ray.serve.utils import (
ensure_serialization_context,
format_actor_name,
get_current_node_resource_key,
get_random_letters,
in_interactive_shell,
DEFAULT,
install_serve_encoders_to_fastapi,
)


logger = logging.getLogger(__file__)


Expand Down Expand Up @@ -149,25 +150,20 @@ def start(
if http_options is None:
http_options = HTTPOptions()

# Used for scheduling things to the head node explicitly.
head_node_id = ray.get_runtime_context().node_id.hex()
controller = ServeController.options(
num_cpus=1 if dedicated_cpu else 0,
name=controller_name,
lifetime="detached" if detached else None,
max_restarts=-1,
max_task_retries=-1,
# Schedule the controller on the head node with a soft constraint. This
# prefers it to run on the head node in most cases, but allows it to be
# restarted on other nodes in an HA cluster.
scheduling_strategy=NodeAffinitySchedulingStrategy(head_node_id, soft=True),
# Pin Serve controller on the head node.
resources={get_current_node_resource_key(): 0.01},
namespace=SERVE_NAMESPACE,
max_concurrency=CONTROLLER_MAX_CONCURRENCY,
).remote(
controller_name,
http_config=http_options,
checkpoint_path=_checkpoint_path,
head_node_id=head_node_id,
http_options,
_checkpoint_path,
detached=detached,
)

Expand Down
3 changes: 0 additions & 3 deletions python/ray/serve/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,8 @@ class ServeController:
async def __init__(
self,
controller_name: str,
*,
http_config: HTTPOptions,
checkpoint_path: str,
head_node_id: str,
detached: bool = False,
):
configure_component_logger(
Expand Down Expand Up @@ -110,7 +108,6 @@ async def __init__(
controller_name,
detached,
http_config,
head_node_id,
)
self.endpoint_state = EndpointState(self.kv_store, self.long_poll_host)
# Fetch all running actors in current cluster as source of current
Expand Down
5 changes: 3 additions & 2 deletions python/ray/serve/http_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from ray.serve.constants import SERVE_LOGGER_NAME, SERVE_NAMESPACE
from ray.serve.long_poll import LongPollClient, LongPollNamespace
from ray.serve.logging_utils import access_log_msg, configure_component_logger
from ray.serve.utils import node_id_to_ip_addr

logger = logging.getLogger(SERVE_LOGGER_NAME)

Expand Down Expand Up @@ -332,11 +333,11 @@ def __init__(
port: int,
root_path: str,
controller_name: str,
node_ip_address: str,
node_id: str,
http_middlewares: Optional[List["starlette.middleware.Middleware"]] = None,
): # noqa: F821
configure_component_logger(
component_name="http_proxy", component_id=node_ip_address
component_name="http_proxy", component_id=node_id_to_ip_addr(node_id)
)

if http_middlewares is None:
Expand Down
29 changes: 11 additions & 18 deletions python/ray/serve/http_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

import ray
from ray.actor import ActorHandle
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

from ray.serve.config import HTTPOptions, DeploymentMode
from ray.serve.constants import (
ASYNC_CONCURRENCY,
Expand All @@ -18,6 +16,7 @@
from ray.serve.utils import (
format_actor_name,
get_all_node_ids,
get_current_node_resource_key,
)
from ray.serve.common import EndpointTag, NodeId

Expand All @@ -36,7 +35,6 @@ def __init__(
controller_name: str,
detached: bool,
config: HTTPOptions,
head_node_id: str,
# Used by unit testing
_start_proxies_on_init: bool = True,
):
Expand All @@ -45,8 +43,6 @@ def __init__(
self._config = config
self._proxy_actors: Dict[NodeId, ActorHandle] = dict()
self._proxy_actor_names: Dict[NodeId, str] = dict()
self._head_node_id: str = head_node_id
assert isinstance(head_node_id, str)

# Will populate self.proxy_actors with existing actors.
if _start_proxies_on_init:
Expand All @@ -70,21 +66,20 @@ def update(self):
self._stop_proxies_if_needed()

def _get_target_nodes(self) -> List[Tuple[str, str]]:
"""Return the list of (node_id, ip_address) to deploy HTTP servers on."""
"""Return the list of (id, resource_key) to deploy HTTP servers on."""
location = self._config.location
target_nodes = get_all_node_ids()

if location == DeploymentMode.NoServer:
return []

if location == DeploymentMode.HeadOnly:
nodes = [
(node_id, ip_address)
for node_id, ip_address in target_nodes
if node_id == self._head_node_id
]
assert len(nodes) == 1, f"Head node not found! {target_nodes}"
return nodes
head_node_resource_key = get_current_node_resource_key()
return [
(node_id, node_resource)
for node_id, node_resource in target_nodes
if node_resource == head_node_resource_key
][:1]

if location == DeploymentMode.FixedNumber:
num_replicas = self._config.fixed_number_replicas
Expand All @@ -106,7 +101,7 @@ def _get_target_nodes(self) -> List[Tuple[str, str]]:

def _start_proxies_if_needed(self) -> None:
"""Start a proxy on every node if it doesn't already exist."""
for node_id, node_ip_address in self._get_target_nodes():
for node_id, node_resource in self._get_target_nodes():
if node_id in self._proxy_actors:
continue

Expand All @@ -128,15 +123,13 @@ def _start_proxies_if_needed(self) -> None:
max_concurrency=ASYNC_CONCURRENCY,
max_restarts=-1,
max_task_retries=-1,
scheduling_strategy=NodeAffinitySchedulingStrategy(
node_id, soft=False
),
resources={node_resource: 0.01},
).remote(
self._config.host,
self._config.port,
self._config.root_path,
controller_name=self._controller_name,
node_ip_address=node_ip_address,
node_id=node_id,
http_middlewares=self._config.middlewares,
)

Expand Down
16 changes: 9 additions & 7 deletions python/ray/serve/tests/test_http_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,16 @@


def test_node_selection():
head_node_id = "node_id-index-head"

def _make_http_state(http_options):
return HTTPState(
"mock_controller_name",
detached=True,
config=http_options,
head_node_id=head_node_id,
_start_proxies_on_init=False,
)

all_nodes = [(head_node_id, "fake-head-ip")] + [
(f"worker-node-id-{i}", f"fake-worker-ip-{i}") for i in range(100)
all_nodes = [("node_id-index-head", "node-id-1")] + [
(f"node_idx-worker-{i}", f"node-id-{i}") for i in range(100)
]

with patch("ray.serve.http_state.get_all_node_ids") as func:
Expand All @@ -30,8 +27,13 @@ def _make_http_state(http_options):
assert state._get_target_nodes() == []

# Test HeadOnly
state = _make_http_state(HTTPOptions(location=DeploymentMode.HeadOnly))
assert state._get_target_nodes() == all_nodes[:1]
with patch(
"ray.serve.http_state.get_current_node_resource_key"
) as get_current_node:
get_current_node.return_value = "node-id-1"

state = _make_http_state(HTTPOptions(location=DeploymentMode.HeadOnly))
assert state._get_target_nodes() == all_nodes[:1]

# Test EveryNode
state = _make_http_state(HTTPOptions(location=DeploymentMode.EveryNode))
Expand Down
8 changes: 8 additions & 0 deletions python/ray/serve/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,19 @@
from ray import serve
from ray.serve.utils import (
get_deployment_import_path,
node_id_to_ip_addr,
override_runtime_envs_except_env_vars,
serve_encoders,
)


def test_node_id_to_ip_addr():
assert node_id_to_ip_addr("node:127.0.0.1-0") == "127.0.0.1"
assert node_id_to_ip_addr("127.0.0.1-0") == "127.0.0.1"
assert node_id_to_ip_addr("127.0.0.1") == "127.0.0.1"
assert node_id_to_ip_addr("node:127.0.0.1") == "127.0.0.1"


def test_bytes_encoder():
data_before = {"inp": {"nest": b"bytes"}}
data_after = {"inp": {"nest": "bytes"}}
Expand Down
38 changes: 31 additions & 7 deletions python/ray/serve/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from functools import wraps
import importlib
from itertools import groupby
import inspect
import pickle
import random
Expand Down Expand Up @@ -146,20 +147,43 @@ def format_actor_name(actor_name, controller_name=None, *modifiers):
return name


def get_all_node_ids() -> List[Tuple[str, str]]:
"""Get IDs for all live nodes in the cluster.
def get_all_node_ids():
"""Get IDs for all nodes in the cluster.
Returns a list of (node_id: str, ip_address: str). The node_id can be
passed into the Ray SchedulingPolicy API.
Handles multiple nodes on the same IP by appending an index to the
node_id, e.g., 'node_id-index'.
Returns a list of ('node_id-index', 'node_id') tuples (the latter can be
used as a resource requirement for actor placements).
"""
node_ids = []
for node in ray.nodes():
if node["Alive"]:
node_ids.append((node["NodeID"], node["NodeName"]))
# We need to use the node_id and index here because we could
# have multiple virtual nodes on the same host. In that case
# they will have the same IP and therefore node_id.
for _, node_id_group in groupby(sorted(ray.state.node_ids())):
for index, node_id in enumerate(node_id_group):
node_ids.append(("{}-{}".format(node_id, index), node_id))

return node_ids


def node_id_to_ip_addr(node_id: str):
"""Recovers the IP address for an entry from get_all_node_ids."""
if ":" in node_id:
node_id = node_id.split(":")[1]

if "-" in node_id:
node_id = node_id.split("-")[0]

return node_id


def get_node_id_for_actor(actor_handle):
"""Given an actor handle, return the node id it's placed on."""

return ray.state.actors()[actor_handle._actor_id.hex()]["Address"]["NodeID"]


def compute_iterable_delta(old: Iterable, new: Iterable) -> Tuple[set, set, set]:
"""Given two iterables, return the entries that's (added, removed, updated).
Expand Down

0 comments on commit e435230

Please sign in to comment.