[core] add EC2InstanceTerminator and refactor killer creation (#45630)
Signed-off-by: hongchaodeng <[email protected]>
hongchaodeng committed May 30, 2024
1 parent c94140a commit e528cb0
Showing 5 changed files with 78 additions and 38 deletions.
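At a glance, the patch splits the old NodeKillerActor into a shared NodeKillerBase plus two concrete @ray.remote killers: RayletKiller (the old raylet-killing behavior under a new name) and the new EC2InstanceTerminator. A minimal sketch of the resulting shape, with method bodies elided and a stand-in ResourceKillerActor since the real base class lives elsewhere in test_utils.py:

import ray

class ResourceKillerActor:  # stand-in for the real base class in test_utils.py
    pass

class NodeKillerBase(ResourceKillerActor):
    # Shared logic: pick the node to kill and count the nodes still alive.
    async def _find_resource_to_kill(self): ...
    def _get_alive_nodes(self, nodes):
        return sum(1 for node in nodes if node["Alive"])

@ray.remote(num_cpus=0)
class RayletKiller(NodeKillerBase):
    # Kills a node by shutting down its raylet over gRPC.
    def _kill_resource(self, node_id, node_to_kill_ip, node_to_kill_port): ...

@ray.remote(num_cpus=0)
class EC2InstanceTerminator(NodeKillerBase):
    # Kills a node by terminating the EC2 instance underneath it.
    def _kill_resource(self, node_id, node_to_kill_ip, _): ...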
62 changes: 38 additions & 24 deletions python/ray/_private/test_utils.py
@@ -1493,8 +1493,7 @@ async def get_total_killed(self):
         return self.killed
 
 
-@ray.remote(num_cpus=0)
-class NodeKillerActor(ResourceKillerActor):
+class NodeKillerBase(ResourceKillerActor):
     async def _find_resource_to_kill(self):
         node_to_kill_ip = None
         node_to_kill_port = None
@@ -1521,6 +1520,16 @@ async def _find_resource_to_kill(self):
 
         return node_id, node_to_kill_ip, node_to_kill_port
 
+    def _get_alive_nodes(self, nodes):
+        alive_nodes = 0
+        for node in nodes:
+            if node["Alive"]:
+                alive_nodes += 1
+        return alive_nodes
+
+
+@ray.remote(num_cpus=0)
+class RayletKiller(NodeKillerBase):
     def _kill_resource(self, node_id, node_to_kill_ip, node_to_kill_port):
         if node_to_kill_port is not None:
             try:
@@ -1533,6 +1542,33 @@ def _kill_resource(self, node_id, node_to_kill_ip, node_to_kill_port):
             )
             self.killed.add(node_id)
 
+    def _kill_raylet(self, ip, port, graceful=False):
+        import grpc
+        from grpc._channel import _InactiveRpcError
+        from ray.core.generated import node_manager_pb2_grpc
+
+        raylet_address = f"{ip}:{port}"
+        channel = grpc.insecure_channel(raylet_address)
+        stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel)
+        try:
+            stub.ShutdownRaylet(
+                node_manager_pb2.ShutdownRayletRequest(graceful=graceful)
+            )
+        except _InactiveRpcError:
+            assert not graceful
+
+
+@ray.remote(num_cpus=0)
+class EC2InstanceTerminator(NodeKillerBase):
+    def _kill_resource(self, node_id, node_to_kill_ip, _):
+        if node_to_kill_ip is not None:
+            try:
+                self._terminate_ec2_instance(node_to_kill_ip)
+            except Exception:
+                pass
+        logging.info(f"Terminated instance, {node_id=}, address={node_to_kill_ip}")
+        self.killed.add(node_id)
+
     def _terminate_ec2_instance(self, ip):
         # This command uses IMDSv2 to get the host instance id and region.
         # After that it terminates itself using aws cli.
@@ -1552,28 +1588,6 @@ def _terminate_ec2_instance(self, ip):
         print(f"STDOUT:\n{result.stdout}\n")
         print(f"STDERR:\n{result.stderr}\n")
 
-    def _kill_raylet(self, ip, port, graceful=False):
-        import grpc
-        from grpc._channel import _InactiveRpcError
-        from ray.core.generated import node_manager_pb2_grpc
-
-        raylet_address = f"{ip}:{port}"
-        channel = grpc.insecure_channel(raylet_address)
-        stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel)
-        try:
-            stub.ShutdownRaylet(
-                node_manager_pb2.ShutdownRayletRequest(graceful=graceful)
-            )
-        except _InactiveRpcError:
-            assert not graceful
-
-    def _get_alive_nodes(self, nodes):
-        alive_nodes = 0
-        for node in nodes:
-            if node["Alive"]:
-                alive_nodes += 1
-        return alive_nodes
-
 
 @ray.remote(num_cpus=0)
 class WorkerKillerActor(ResourceKillerActor):
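The body of _terminate_ec2_instance is collapsed in this view; its comment says the command uses IMDSv2 to look up the host's instance id and region and then terminates the instance with the AWS CLI. A hedged sketch of what such a subprocess call could look like, assuming the standard IMDSv2 endpoints; the exact command text, and how it is dispatched to the node at the given ip (for example over ssh), are assumptions rather than the committed code:

import subprocess

def terminate_ec2_instance_sketch(ip):
    # Illustrative only: fetch an IMDSv2 session token, read the instance id
    # and region from instance metadata, then self-terminate via the AWS CLI.
    # Shipping this command to the host at `ip` (e.g. via ssh) is elided here.
    command = (
        'TOKEN=$(curl -sX PUT "http://169.254.169.254/latest/api/token"'
        ' -H "X-aws-ec2-metadata-token-ttl-seconds: 60")'
        ' && ID=$(curl -sH "X-aws-ec2-metadata-token: $TOKEN"'
        " http://169.254.169.254/latest/meta-data/instance-id)"
        ' && REGION=$(curl -sH "X-aws-ec2-metadata-token: $TOKEN"'
        " http://169.254.169.254/latest/meta-data/placement/region)"
        ' && aws ec2 terminate-instances --region "$REGION" --instance-ids "$ID"'
    )
    result = subprocess.run(command, shell=True, capture_output=True, text=True)
    print(f"STDOUT:\n{result.stdout}\n")
    print(f"STDERR:\n{result.stderr}\n")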
4 changes: 2 additions & 2 deletions python/ray/tests/conftest.py
@@ -37,7 +37,7 @@
     find_available_port,
     wait_for_condition,
     find_free_port,
-    NodeKillerActor,
+    RayletKiller,
 )
 from ray.cluster_utils import AutoscalingCluster, Cluster, cluster_not_supported
@@ -921,7 +921,7 @@ def _ray_start_chaos_cluster(request):
     assert len(nodes) == 1
 
     if kill_interval is not None:
-        node_killer = get_and_run_resource_killer(NodeKillerActor, kill_interval)
+        node_killer = get_and_run_resource_killer(RayletKiller, kill_interval)
 
     yield cluster
 
7 changes: 3 additions & 4 deletions python/ray/tests/test_chaos.py
@@ -12,7 +12,7 @@
 from ray.data._internal.progress_bar import ProgressBar
 from ray.util.placement_group import placement_group
 from ray._private.test_utils import (
-    NodeKillerActor,
+    RayletKiller,
     get_log_message,
     get_and_run_resource_killer,
     WorkerKillerActor,
@@ -337,15 +337,14 @@ def test_node_killer_filter(autoscaler_v2):
     worker_nodes = [node for node in list_nodes() if not node["is_head_node"]]
     node_to_kill = random.choice(worker_nodes)
     node_killer = get_and_run_resource_killer(
-        NodeKillerActor,
+        RayletKiller,
         1,
         max_to_kill=1,
         kill_filter_fn=lambda: lambda node: node["NodeID"] == node_to_kill.node_id,
     )
 
     def check_killed():
-        # Check that killed node is consistent across list_nodes() and
-        # NodeKillerActor
+        # Check that killed node is consistent across list_nodes()
         killed = list(ray.get(node_killer.get_total_killed.remote()))
         dead = [node.node_id for node in list_nodes() if node.state == "DEAD"]
         if len(killed) != 1 or len(dead) != 1:
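One subtlety in the test above: kill_filter_fn is passed a zero-argument factory (lambda: lambda node: ...) rather than the node predicate itself, presumably so the killer actor can build the actual filter on its side after deserialization. A small standalone sketch of that shape (make_kill_filter is an illustrative name, not part of the patch):

def make_kill_filter(target_node_id):
    # The outer callable is the factory; the inner lambda is the node filter.
    return lambda: (lambda node: node["NodeID"] == target_node_id)

node_filter = make_kill_filter("abc123")()  # the killer invokes the factory once
assert node_filter({"NodeID": "abc123"})
assert not node_filter({"NodeID": "other"})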
2 changes: 1 addition & 1 deletion release/nightly_tests/chaos_test/test_chaos_basic.py
@@ -206,7 +206,7 @@ def main():
     # Step 3
     print("Running with failures")
     start = time.time()
-    node_killer = ray.get_actor("NodeKillerActor", namespace="release_test_namespace")
+    node_killer = ray.get_actor("RayletKiller", namespace="release_test_namespace")
     node_killer.run.remote()
     workload(total_num_cpus, args.smoke)
     print(f"Runtime when there are many failures: {time.time() - start}")
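The release test fetches the killer by name, which implies get_and_run_resource_killer registers it as a named actor in that namespace, with the actor name now following the new class name. A hedged sketch of the lookup-and-run pattern; the creation options shown in the comment are an assumption, since the helper's implementation is not part of this diff:

import ray

ray.init(address="auto")

# Assumed creation side (hypothetical options): a named, detached actor that
# can outlive the driver that created it, e.g.
#   RayletKiller.options(
#       name="RayletKiller",
#       namespace="release_test_namespace",
#       lifetime="detached",
#   ).remote(...)

# Retrieval side, as in test_chaos_basic.py above:
node_killer = ray.get_actor("RayletKiller", namespace="release_test_namespace")
node_killer.run.remote()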
41 changes: 34 additions & 7 deletions release/nightly_tests/setup_chaos.py
@@ -5,14 +5,28 @@
 
 from ray._private.test_utils import (
     get_and_run_resource_killer,
-    NodeKillerActor,
+    RayletKiller,
     WorkerKillerActor,
+    EC2InstanceTerminator,
 )
 
 
 def parse_script_args():
     parser = argparse.ArgumentParser()
 
+    # '--kill-workers' to be deprecated in favor of '--chaos'
     parser.add_argument("--kill-workers", action="store_true", default=False)
+
+    parser.add_argument(
+        "--chaos",
+        type=str,
+        default="",
+        help=(
+            "Chaos to inject into the test environment. "
+            "Options: KillRaylet, KillWorker, TerminateEC2Instance."
+        ),
+    )
+
     parser.add_argument("--kill-interval", type=int, default=60)
     parser.add_argument("--max-to-kill", type=int, default=2)
     parser.add_argument(
@@ -77,19 +91,32 @@ def _filter_fn(node):
     return _task_node_filter
 
 
+def get_chaos_killer(args):
+    if args.chaos != "":
+        chaos_type = args.chaos
+    elif args.kill_workers:
+        chaos_type = "KillWorker"
+    else:
+        chaos_type = "KillRaylet"  # default
+
+    if chaos_type == "KillRaylet":
+        return RayletKiller, task_node_filter(args.task_names)
+    elif chaos_type == "KillWorker":
+        return WorkerKillerActor, task_filter(args.task_names)
+    elif chaos_type == "TerminateEC2Instance":
+        return EC2InstanceTerminator, task_node_filter(args.task_names)
+    else:
+        raise ValueError(f"Chaos type {chaos_type} not supported.")
+
+
 def main():
     """Start the chaos testing.
     Currently, chaos testing only covers random node failures.
     """
     args, _ = parse_script_args()
     ray.init(address="auto")
-    if args.kill_workers:
-        resource_killer_cls = WorkerKillerActor
-        kill_filter_fn = task_filter(args.task_names)
-    else:
-        resource_killer_cls = NodeKillerActor
-        kill_filter_fn = task_node_filter(args.task_names)
+    resource_killer_cls, kill_filter_fn = get_chaos_killer(args)
 
     get_and_run_resource_killer(
         resource_killer_cls,
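With the new flag, the script can be launched as, for example, python setup_chaos.py --chaos TerminateEC2Instance --kill-interval 60. The selection precedence that get_chaos_killer implements can be checked with a standalone sketch (resolve_chaos_type is an illustrative rename, not part of the patch):

from argparse import Namespace

def resolve_chaos_type(args):
    # Mirrors get_chaos_killer: an explicit --chaos value wins, then the
    # legacy --kill-workers flag, otherwise the KillRaylet default.
    if args.chaos != "":
        return args.chaos
    if args.kill_workers:
        return "KillWorker"
    return "KillRaylet"

assert resolve_chaos_type(Namespace(chaos="", kill_workers=False)) == "KillRaylet"
assert resolve_chaos_type(Namespace(chaos="", kill_workers=True)) == "KillWorker"
assert (
    resolve_chaos_type(Namespace(chaos="TerminateEC2Instance", kill_workers=True))
    == "TerminateEC2Instance"
)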
