[Core] Ray drain node endpoint [2/n] (ray-project#38096)
- Implement the DrainNode endpoint
- Change the scheduler to not schedule tasks onto draining nodes
- Add an API to retrieve draining nodes

Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: harborn <[email protected]>
jjyao authored and harborn committed Aug 17, 2023
1 parent ce1e21e commit 472e8e3
Showing 50 changed files with 1,032 additions and 77 deletions.
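The tests added in this commit exercise the new endpoint end to end. As a rough orientation for the diff below, here is a minimal sketch of the intended flow, assembled from those tests; GcsClient.drain_node is documented in the diff as test-only, so treat this as illustrative rather than a stable public API:

import ray
from ray._raylet import GcsClient
from ray.core.generated import autoscaler_pb2

ray.init()

# The GCS address is available from the runtime context once Ray is initialized.
gcs_client = GcsClient(address=ray.get_runtime_context().gcs_address)

# Hex id of the node to drain; here we simply take the first alive node reported
# by ray.nodes(). In a real cluster this would be a worker node, not the head.
node_id = [n["NodeID"] for n in ray.nodes() if n["Alive"]][0]

# Ask the GCS to drain the node. An IDLE_TERMINATION drain is only accepted while
# the node is idle; a PREEMPTION drain is accepted even when the node is busy.
is_accepted = gcs_client.drain_node(
    node_id,
    autoscaler_pb2.DrainNodeReason.Value("DRAIN_NODE_REASON_PREEMPTION"),
    "preemption",
)

# While a node is draining, the scheduler stops placing new tasks and actors on
# it, and the draining set can be read back through the global state API.
print(is_accepted, ray._private.state.state.get_draining_nodes())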
6 changes: 6 additions & 0 deletions python/ray/_private/state.py
@@ -1,6 +1,7 @@
import json
import logging
from collections import defaultdict
from typing import Set

from google.protobuf.json_format import MessageToDict

@@ -734,6 +735,11 @@ def get_node_to_connect_for_driver(self, node_ip_address):
node_ip_address
)

def get_draining_nodes(self) -> Set[str]:
"""Get all the hex ids of nodes that are being drained."""
self._check_connected()
return self.global_state_accessor.get_draining_nodes()


state = GlobalState()
"""A global object used to access the cluster's global state."""
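A short usage sketch of the new accessor, mirroring test_get_draining_nodes in the test file further down; it returns the hex node id strings of nodes that currently have an accepted drain request in progress (assumptions follow that test, not a documented public API):

import ray

ray.init()

# With no drain request in flight, the set of draining nodes is empty; after a
# DrainNode request is accepted for a node, its hex id shows up here until the
# node finishes draining or is terminated.
assert ray._private.state.state.get_draining_nodes() == set()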
19 changes: 19 additions & 0 deletions python/ray/_raylet.pyx
@@ -2541,6 +2541,25 @@ cdef class GcsClient:

return serialized_reply

@_auto_reconnect
def drain_node(
self,
node_id: c_string,
reason: int32_t,
reason_message: c_string):
"""Send the DrainNode request to GCS.
This is only for testing.
"""
cdef:
int64_t timeout_ms = -1
c_bool is_accepted = False
with nogil:
check_status(self.inner.get().DrainNode(
node_id, reason, reason_message, timeout_ms, is_accepted))

return is_accepted

#############################################################
# Interface for rpc::autoscaler::AutoscalerStateService ends
#############################################################
6 changes: 6 additions & 0 deletions python/ray/includes/common.pxd
@@ -405,6 +405,12 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil:
CRayStatus GetClusterStatus(
int64_t timeout_ms,
c_string &serialized_reply)
CRayStatus DrainNode(
const c_string &node_id,
int32_t reason,
const c_string &reason_message,
int64_t timeout_ms,
c_bool &is_accepted)


cdef extern from "ray/gcs/gcs_client/gcs_client.h" namespace "ray::gcs" nogil:
1 change: 1 addition & 0 deletions python/ray/includes/global_state_accessor.pxd
@@ -25,6 +25,7 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil:
CJobID GetNextJobID()
c_vector[c_string] GetAllNodeInfo()
c_vector[c_string] GetAllAvailableResources()
c_vector[CNodeID] GetDrainingNodes()
c_vector[c_string] GetAllTaskEvents()
unique_ptr[c_string] GetObjectInfo(const CObjectID &object_id)
unique_ptr[c_string] GetAllResourceUsage()
9 changes: 9 additions & 0 deletions python/ray/includes/global_state_accessor.pxi
@@ -91,6 +91,15 @@ cdef class GlobalStateAccessor:
results.append(node_info)
return results

def get_draining_nodes(self):
cdef c_vector[CNodeID] draining_nodes
with nogil:
draining_nodes = self.inner.get().GetDrainingNodes()
results = set()
for draining_node in draining_nodes:
results.add(ray._private.utils.binary_to_hex(draining_node.Binary()))
return results

def get_all_available_resources(self):
cdef c_vector[c_string] result
with nogil:
3 changes: 2 additions & 1 deletion python/ray/tests/BUILD
@@ -121,7 +121,7 @@ py_test_module_list(

py_test_module_list(
files = [
"test_autoscaler_fake_scaledown.py", # Temporarily owned by core.
"test_autoscaler_fake_scaledown.py",
"test_log_dedup.py",
"test_logging.py",
"test_memory_scheduling.py",
@@ -172,6 +172,7 @@ py_test_module_list(
"test_scheduling.py",
"test_traceback.py",
"test_queue.py",
"test_draining.py",
],
size = "medium",
tags = ["exclusive", "medium_size_python_tests_k_to_z", "team:core"],
204 changes: 204 additions & 0 deletions python/ray/tests/test_draining.py
@@ -0,0 +1,204 @@
import sys
import pytest

import ray
import time
from ray._raylet import GcsClient
from ray.core.generated import autoscaler_pb2
from ray._private.test_utils import wait_for_condition
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy


def test_idle_termination(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(resources={"head": 1})
ray.init(address=cluster.address)
cluster.add_node(resources={"worker": 1})
cluster.wait_for_nodes()

@ray.remote
def get_node_id():
return ray.get_runtime_context().get_node_id()

head_node_id = ray.get(get_node_id.options(resources={"head": 1}).remote())
worker_node_id = ray.get(get_node_id.options(resources={"worker": 1}).remote())

wait_for_condition(
lambda: {node["NodeID"] for node in ray.nodes() if (node["Alive"])}
== {head_node_id, worker_node_id}
)

@ray.remote(num_cpus=1, resources={"worker": 1})
class Actor:
def ping(self):
pass

actor = Actor.remote()
ray.get(actor.ping.remote())

gcs_client = GcsClient(address=ray.get_runtime_context().gcs_address)

# The worker node is not idle so the drain request should be rejected.
is_accepted = gcs_client.drain_node(
worker_node_id,
autoscaler_pb2.DrainNodeReason.Value("DRAIN_NODE_REASON_IDLE_TERMINATION"),
"idle for long enough",
)
assert not is_accepted

ray.kill(actor)

def drain_until_accept():
# The worker node is idle now so the drain request should be accepted.
is_accepted = gcs_client.drain_node(
worker_node_id,
autoscaler_pb2.DrainNodeReason.Value("DRAIN_NODE_REASON_IDLE_TERMINATION"),
"idle for long enough",
)
return is_accepted

wait_for_condition(drain_until_accept)

wait_for_condition(
lambda: {node["NodeID"] for node in ray.nodes() if (node["Alive"])}
== {head_node_id}
)

# Draining a dead node is always accepted.
is_accepted = gcs_client.drain_node(
worker_node_id,
autoscaler_pb2.DrainNodeReason.Value("DRAIN_NODE_REASON_IDLE_TERMINATION"),
"idle for long enough",
)
assert is_accepted


def test_preemption(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(resources={"head": 1})
ray.init(address=cluster.address)
cluster.add_node(resources={"worker": 1})
cluster.wait_for_nodes()

@ray.remote
def get_node_id():
return ray.get_runtime_context().get_node_id()

head_node_id = ray.get(get_node_id.options(resources={"head": 1}).remote())
worker_node_id = ray.get(get_node_id.options(resources={"worker": 1}).remote())

@ray.remote(num_cpus=1, resources={"worker": 1})
class Actor:
def ping(self):
pass

actor = Actor.remote()
ray.get(actor.ping.remote())

gcs_client = GcsClient(address=ray.get_runtime_context().gcs_address)

# The worker node is not idle but the drain request should still be accepted.
is_accepted = gcs_client.drain_node(
worker_node_id,
autoscaler_pb2.DrainNodeReason.Value("DRAIN_NODE_REASON_PREEMPTION"),
"preemption",
)
assert is_accepted

time.sleep(1)

# The worker node should still be alive since it's not idle yet, so the drain hasn't completed.
wait_for_condition(
lambda: {node["NodeID"] for node in ray.nodes() if (node["Alive"])}
== {head_node_id, worker_node_id}
)

ray.kill(actor)
wait_for_condition(
lambda: {node["NodeID"] for node in ray.nodes() if (node["Alive"])}
== {head_node_id}
)


def test_scheduling_during_draining(ray_start_cluster):
"""Test that the draining node is unschedulable for new tasks and actors."""
cluster = ray_start_cluster
cluster.add_node(num_cpus=1, resources={"head": 1})
ray.init(address=cluster.address)
cluster.add_node(num_cpus=1, resources={"worker": 1})
cluster.wait_for_nodes()

@ray.remote
def get_node_id():
return ray.get_runtime_context().get_node_id()

head_node_id = ray.get(get_node_id.options(resources={"head": 1}).remote())
worker_node_id = ray.get(get_node_id.options(resources={"worker": 1}).remote())

@ray.remote
class Actor:
def ping(self):
pass

actor = Actor.options(num_cpus=0, resources={"worker": 1}).remote()
ray.get(actor.ping.remote())

gcs_client = GcsClient(address=ray.get_runtime_context().gcs_address)

# The worker node is not idle but the drain request should still be accepted.
is_accepted = gcs_client.drain_node(
worker_node_id,
autoscaler_pb2.DrainNodeReason.Value("DRAIN_NODE_REASON_PREEMPTION"),
"preemption",
)
assert is_accepted

assert (
ray.get(get_node_id.options(scheduling_strategy="SPREAD").remote())
== head_node_id
)
assert (
ray.get(get_node_id.options(scheduling_strategy="SPREAD").remote())
== head_node_id
)

assert (
ray.get(
get_node_id.options(
scheduling_strategy=NodeAffinitySchedulingStrategy(
worker_node_id, soft=True
)
).remote()
)
== head_node_id
)

with pytest.raises(ray.exceptions.TaskUnschedulableError):
ray.get(
get_node_id.options(
scheduling_strategy=NodeAffinitySchedulingStrategy(
worker_node_id, soft=False
)
).remote()
)

head_actor = Actor.options(num_cpus=1, resources={"head": 1}).remote()
ray.get(head_actor.ping.remote())

obj = get_node_id.remote()

# Cannot run on the draining worker node even though it has resources.
with pytest.raises(ray.exceptions.GetTimeoutError):
ray.get(obj, timeout=2)

ray.kill(head_actor)
assert ray.get(obj, timeout=2) == head_node_id


if __name__ == "__main__":
import os

if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
else:
sys.exit(pytest.main(["-sv", __file__]))
47 changes: 47 additions & 0 deletions python/ray/tests/test_global_state.py
@@ -6,6 +6,8 @@
import ray
import ray._private.gcs_utils as gcs_utils
import ray._private.ray_constants
from ray._raylet import GcsClient
from ray.core.generated import autoscaler_pb2
from ray._private.test_utils import (
convert_actor_state,
make_global_state_accessor,
@@ -445,6 +447,51 @@ def test_next_job_id(ray_start_regular):
assert job_id_1.int() + 1 == job_id_2.int()


def test_get_draining_nodes(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node()
ray.init(address=cluster.address)
cluster.add_node(resources={"worker": 1})
cluster.wait_for_nodes()

@ray.remote
def get_node_id():
return ray.get_runtime_context().get_node_id()

worker_node_id = ray.get(get_node_id.options(resources={"worker": 1}).remote())

# Initially there is no draining node.
assert ray._private.state.state.get_draining_nodes() == set()

@ray.remote(num_cpus=1, resources={"worker": 1})
class Actor:
def ping(self):
pass

actor = Actor.remote()
ray.get(actor.ping.remote())

gcs_client = GcsClient(address=ray.get_runtime_context().gcs_address)

# Drain the worker node.
is_accepted = gcs_client.drain_node(
worker_node_id,
autoscaler_pb2.DrainNodeReason.Value("DRAIN_NODE_REASON_PREEMPTION"),
"preemption",
)
assert is_accepted

wait_for_condition(
lambda: ray._private.state.state.get_draining_nodes() == {worker_node_id}
)

# Kill the actor running on the draining worker node so
# that the worker node becomes idle and can be drained.
ray.kill(actor)

wait_for_condition(lambda: ray._private.state.state.get_draining_nodes() == set())


if __name__ == "__main__":
import sys

6 changes: 6 additions & 0 deletions src/mock/ray/raylet_client/raylet_client.h
@@ -114,6 +114,12 @@ class MockRayletClientInterface : public RayletClientInterface {
bool graceful,
const rpc::ClientCallback<rpc::ShutdownRayletReply> &callback),
(override));
MOCK_METHOD(void,
DrainRaylet,
(const rpc::autoscaler::DrainNodeReason &reason,
const std::string &reason_message,
const rpc::ClientCallback<rpc::DrainRayletReply> &callback),
(override));
};

} // namespace ray
2 changes: 1 addition & 1 deletion src/ray/common/scheduling/cluster_resource_data.cc
@@ -135,7 +135,7 @@ std::string NodeResources::DebugString() const {
for (const auto &[key, value] : labels) {
buffer << "\"" << key << "\":\"" << value << "\",";
}
buffer << "}";
buffer << "}, \"is_draining\": " << is_draining << "}";
return buffer.str();
}
