Skip to content

Commit

Permalink
[Core] improve agent health checking mechanism (#41935) (#41964)
Browse files Browse the repository at this point in the history
Right now, we use psutil to get the parent's pid and check whether that pid has changed to figure out if the parent is dead.

This works most of the time, but there's a rare edge case (from psutil) where this mechanism doesn't work. psutil identifies a process by (pid, creation_time), but if the system clock is re-synchronized, the reported creation time can change. See the related issue for more details.

Since agent death == raylet death, this edge case is still critical even though it is rare. We fix the issue by using a more reliable health check: the parent opens a pipe and redirects its read end to the child process's stdin, and the child process reads stdin. If the read returns 0 bytes, it means the parent is dead (see https://stackoverflow.com/questions/12193581/detect-death-of-parent-process). This is the same mechanism we use to detect worker failure from the raylet (but in the opposite direction).

Note that the PR is a bit ugly because I put this feature behind a feature flag. Since this is a last-minute PR merge, we should be careful, and that's why I added the feature flag.
  • Loading branch information
rkooo567 committed Dec 17, 2023
1 parent d0300cf commit 15a558e
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 63 deletions.
4 changes: 2 additions & 2 deletions dashboard/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,9 @@ async def run(self):
tasks = [m.run(self.server) for m in modules]
if sys.platform not in ["win32", "cygwin"]:

def callback():
def callback(msg):
logger.info(
f"Terminated Raylet: ip={self.ip}, node_id={self.node_id}. "
f"Terminated Raylet: ip={self.ip}, node_id={self.node_id}. {msg}"
)

check_parent_task = create_check_raylet_task(
Expand Down
5 changes: 4 additions & 1 deletion dashboard/consts.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from ray._private.ray_constants import env_integer
from ray._private.ray_constants import env_integer, env_bool

DASHBOARD_LOG_FILENAME = "dashboard.log"
DASHBOARD_AGENT_PORT_PREFIX = "DASHBOARD_AGENT_PORT_PREFIX:"
Expand Down Expand Up @@ -79,3 +79,6 @@
"dashboard",
"gcs",
}
PARENT_HEALTH_CHECK_BY_PIPE = env_bool(
"RAY_enable_pipe_based_agent_to_parent_health_check", False
)
20 changes: 18 additions & 2 deletions dashboard/tests/test_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,17 @@ def test_raylet_and_agent_share_fate(shutdown_only):
raylet_proc.wait(15)


def test_agent_report_unexpected_raylet_death(shutdown_only):
@pytest.mark.parametrize("parent_health_check_by_pipe", [True, False])
def test_agent_report_unexpected_raylet_death(
monkeypatch, shutdown_only, parent_health_check_by_pipe
):
"""Test agent reports Raylet death if it is not SIGTERM."""

monkeypatch.setenv(
"RAY_enable_pipe_based_agent_to_parent_health_check",
parent_health_check_by_pipe,
)

ray.init()
p = init_error_pubsub()

Expand Down Expand Up @@ -247,9 +255,17 @@ def test_agent_report_unexpected_raylet_death(shutdown_only):
)


def test_agent_report_unexpected_raylet_death_large_file(shutdown_only):
@pytest.mark.parametrize("parent_health_check_by_pipe", [True, False])
def test_agent_report_unexpected_raylet_death_large_file(
monkeypatch, shutdown_only, parent_health_check_by_pipe
):
"""Test agent reports Raylet death if it is not SIGTERM."""

monkeypatch.setenv(
"RAY_enable_pipe_based_agent_to_parent_health_check",
parent_health_check_by_pipe,
)

ray.init()
p = init_error_pubsub()

Expand Down
141 changes: 90 additions & 51 deletions python/ray/_private/process_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import sys
import os

from concurrent.futures import ThreadPoolExecutor

import ray
from ray.dashboard.consts import _PARENT_DEATH_THREASHOLD
import ray.dashboard.consts as dashboard_consts
Expand Down Expand Up @@ -59,9 +61,92 @@ def create_check_raylet_task(log_dir, gcs_address, parent_dead_callback, loop):
if sys.platform in ["win32", "cygwin"]:
raise RuntimeError("can't check raylet process in Windows.")
raylet_pid = get_raylet_pid()
return run_background_task(
_check_parent(raylet_pid, log_dir, gcs_address, parent_dead_callback)
)

if dashboard_consts.PARENT_HEALTH_CHECK_BY_PIPE:
logger.info("check_parent_via_pipe")
check_parent_task = _check_parent_via_pipe(
log_dir, gcs_address, loop, parent_dead_callback
)
else:
logger.info("_check_parent")
check_parent_task = _check_parent(
raylet_pid, log_dir, gcs_address, parent_dead_callback
)

return run_background_task(check_parent_task)


def report_raylet_error_logs(log_dir: str, gcs_address: str):
    """Inspect the tail of the Raylet log and report how the Raylet terminated.

    At most the last ``_RAYLET_LOG_MAX_TAIL_SIZE`` bytes of
    ``<log_dir>/raylet.out`` are read. If any line in that tail contains
    "Raylet received SIGTERM", the termination is considered graceful and is
    only logged at info level. Otherwise, or when the log file cannot be
    read, the message is logged as an error and published to the driver as
    ``ray_constants.RAYLET_DIED_ERROR``.

    Args:
        log_dir: Directory that contains ``raylet.out``.
        gcs_address: Address handed to ``GcsPublisher`` to publish the error.
    """
    log_path = os.path.join(log_dir, "raylet.out")
    error = False
    # Exception raised while reading the log, kept so that its traceback can
    # be attached to the single error log record emitted below.
    read_failure = None
    msg = "Raylet is terminated. "
    try:
        # Read in binary mode: seeking to a computed byte offset is only
        # well-defined for binary files, and the offset may fall in the
        # middle of a multi-byte UTF-8 sequence.
        with open(log_path, "rb") as f:
            # Seek to _RAYLET_LOG_MAX_TAIL_SIZE from the end if the
            # file is larger than that.
            f.seek(0, io.SEEK_END)
            pos = max(0, f.tell() - _RAYLET_LOG_MAX_TAIL_SIZE)
            f.seek(pos, io.SEEK_SET)
            # Read remaining logs by lines. A partially-read character at
            # the start of the tail is replaced instead of raising.
            raylet_logs = [
                line.decode("utf-8", errors="replace") for line in f.readlines()
            ]
        # Assume the SIGTERM message must exist within the last
        # _RAYLET_LOG_MAX_TAIL_SIZE of the log file.
        if any("Raylet received SIGTERM" in line for line in raylet_logs):
            msg += "Termination is graceful."
        else:
            msg += (
                "Termination is unexpected. Possible reasons "
                "include: (1) SIGKILL by the user or system "
                "OOM killer, (2) Invalid memory access from "
                "Raylet causing SIGSEGV or SIGBUS, "
                "(3) Other termination signals. "
                f"Last {_RAYLET_LOG_MAX_PUBLISH_LINES} lines "
                "of the Raylet logs:\n"
            )
            msg += " " + " ".join(
                raylet_logs[-_RAYLET_LOG_MAX_PUBLISH_LINES:]
            )
            error = True
    except Exception as e:
        msg += f"Failed to read Raylet logs at {log_path}: {e}!"
        read_failure = e
        error = True
    if error:
        # Log once; the traceback is included only when reading the log failed.
        logger.error(msg, exc_info=read_failure)
        # TODO: switch to async if necessary.
        ray._private.utils.publish_error_to_driver(
            ray_constants.RAYLET_DIED_ERROR,
            msg,
            gcs_publisher=ray._raylet.GcsPublisher(address=gcs_address),
        )
    else:
        logger.info(msg)


async def _check_parent_via_pipe(
    log_dir: str, gcs_address: str, loop, parent_dead_callback
):
    """Watch stdin for EOF and exit this process once the parent is gone.

    The parent (raylet) is expected to have redirected the read end of a
    pipe to this process's stdin. Reading 0 bytes from stdin therefore means
    the write end was closed, i.e. the parent is dead.

    Args:
        log_dir: Directory containing the Raylet logs, forwarded to
            ``report_raylet_error_logs``.
        gcs_address: GCS address forwarded to ``report_raylet_error_logs``.
        loop: Event loop used to run the blocking stdin read in a thread.
        parent_dead_callback: Called with a message string right before the
            Raylet logs are reported and the process exits.
    """
    # Local import so this block does not depend on the module's import list.
    import asyncio

    # One worker thread is enough for the blocking read; create it once
    # instead of building a new executor on every loop iteration.
    with ThreadPoolExecutor(max_workers=1) as executor:
        while True:
            try:
                # Read input asynchronously so the event loop is not blocked.
                input_data = await loop.run_in_executor(
                    executor, lambda: sys.stdin.readline()
                )
                if len(input_data) == 0:
                    # cannot read bytes from parent == parent is dead.
                    parent_dead_callback(
                        "_check_parent_via_pipe: The parent is dead."
                    )
                    report_raylet_error_logs(log_dir, gcs_address)
                    sys.exit(0)
            except Exception as e:
                logger.exception(
                    "raylet health checking is failed. "
                    f"The agent process may leak. Exception: {e}"
                )
                # Back off before retrying so a persistent failure (e.g. a
                # closed stdin) does not turn into a busy loop that floods
                # the log.
                await asyncio.sleep(1)


async def _check_parent(raylet_pid, log_dir, gcs_address, parent_dead_callback):
Expand Down Expand Up @@ -100,54 +185,8 @@ async def _check_parent(raylet_pid, log_dir, gcs_address, parent_dead_callback):
)
continue

log_path = os.path.join(log_dir, "raylet.out")
error = False
parent_dead_callback()
msg = "Raylet is terminated. "
try:
with open(log_path, "r", encoding="utf-8") as f:
# Seek to _RAYLET_LOG_MAX_TAIL_SIZE from the end if the
# file is larger than that.
f.seek(0, io.SEEK_END)
pos = max(0, f.tell() - _RAYLET_LOG_MAX_TAIL_SIZE)
f.seek(pos, io.SEEK_SET)
# Read remaining logs by lines.
raylet_logs = f.readlines()
# Assume the SIGTERM message must exist within the last
# _RAYLET_LOG_MAX_TAIL_SIZE of the log file.
if any(
"Raylet received SIGTERM" in line for line in raylet_logs
):
msg += "Termination is graceful."
logger.info(msg)
else:
msg += (
"Termination is unexpected. Possible reasons "
"include: (1) SIGKILL by the user or system "
"OOM killer, (2) Invalid memory access from "
"Raylet causing SIGSEGV or SIGBUS, "
"(3) Other termination signals. "
f"Last {_RAYLET_LOG_MAX_PUBLISH_LINES} lines "
"of the Raylet logs:\n"
)
msg += " " + " ".join(
raylet_logs[-_RAYLET_LOG_MAX_PUBLISH_LINES:]
)
error = True
except Exception as e:
msg += f"Failed to read Raylet logs at {log_path}: {e}!"
logger.exception(msg)
error = True
if error:
logger.error(msg)
# TODO: switch to async if necessary.
ray._private.utils.publish_error_to_driver(
ray_constants.RAYLET_DIED_ERROR,
msg,
gcs_publisher=ray._raylet.GcsPublisher(address=gcs_address),
)
else:
logger.info(msg)
parent_dead_callback("_check_parent: The parent is dead.")
report_raylet_error_logs(log_dir, gcs_address)
sys.exit(0)
else:
parent_death_cnt = 0
Expand Down
5 changes: 3 additions & 2 deletions python/ray/_private/runtime_env/agent/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,12 @@ async def get_runtime_envs_info(request: web.Request) -> web.Response:
check_raylet_task = None
if sys.platform not in ["win32", "cygwin"]:

def parent_dead_callback():
def parent_dead_callback(msg):
agent._logger.info(
"Raylet is dead! Exiting Runtime Env Agent. "
f"addr: {args.node_ip_address}, "
f"port: {args.runtime_env_agent_port}"
f"port: {args.runtime_env_agent_port}\n"
f"{msg}"
)

# No need to await this task.
Expand Down
4 changes: 4 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,10 @@ RAY_CONFIG(uint32_t, agent_register_timeout_ms, 100 * 1000)
RAY_CONFIG(uint32_t, agent_register_timeout_ms, 30 * 1000)
#endif

/// If true, agent checks the health of parent by reading pipe.
/// If false, it checks the parent pid using psutil.
RAY_CONFIG(bool, enable_pipe_based_agent_to_parent_health_check, true)

/// If the agent manager fails to communicate with the dashboard agent or the runtime env
/// agent, we will retry after this interval.
RAY_CONFIG(uint32_t, agent_manager_retry_interval_ms, 1000)
Expand Down
16 changes: 15 additions & 1 deletion src/ray/raylet/agent_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,24 @@ void AgentManager::StartAgent() {
ProcessEnvironment env;
env.insert({"RAY_NODE_ID", options_.node_id.Hex()});
env.insert({"RAY_RAYLET_PID", std::to_string(getpid())});
env.insert({"RAY_enable_pipe_based_agent_to_parent_health_check",
RayConfig::instance().enable_pipe_based_agent_to_parent_health_check()
? "1"
: "0"});

// Launch the process to create the agent.
std::error_code ec;
process_ = Process(argv.data(), nullptr, ec, false, env);
// NOTE: we pipe to stdin so that agent can read stdin to detect when
// the parent dies. See
// https://stackoverflow.com/questions/12193581/detect-death-of-parent-process
process_ =
Process(argv.data(),
nullptr,
ec,
false,
env,
/*pipe_to_stdin*/
RayConfig::instance().enable_pipe_based_agent_to_parent_health_check());
if (!process_.IsValid() || ec) {
// The worker failed to start. This is a fatal error.
RAY_LOG(FATAL) << "Failed to start agent " << options_.agent_name
Expand Down
50 changes: 47 additions & 3 deletions src/ray/util/process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ class ProcessFD {
static ProcessFD spawnvpe(const char *argv[],
std::error_code &ec,
bool decouple,
const ProcessEnvironment &env) {
const ProcessEnvironment &env,
bool pipe_to_stdin) {
ec = std::error_code();
intptr_t fd;
pid_t pid;
Expand Down Expand Up @@ -172,10 +173,23 @@ class ProcessFD {
// TODO(mehrdadn): Use clone() on Linux or posix_spawnp() on Mac to avoid duplicating
// file descriptors into the child process, as that can be problematic.
int pipefds[2]; // Create pipe to get PID & track lifetime
int parent_lifetime_pipe[2];

// Create pipes to health check parent <> child.
// pipefds is used for parent to check child's health.
if (pipe(pipefds) == -1) {
pipefds[0] = pipefds[1] = -1;
}
// parent_lifetime_pipe is used for child to check parent's health.
if (pipe_to_stdin) {
if (pipe(parent_lifetime_pipe) == -1) {
parent_lifetime_pipe[0] = parent_lifetime_pipe[1] = -1;
}
}

pid = pipefds[1] != -1 ? fork() : -1;

// Close the ends of pipefds that this process does not use.
if (pid <= 0 && pipefds[0] != -1) {
close(pipefds[0]); // not the parent, so close the read end of the pipe
pipefds[0] = -1;
Expand All @@ -184,13 +198,42 @@ class ProcessFD {
close(pipefds[1]); // not the child, so close the write end of the pipe
pipefds[1] = -1;
}

// Create a pipe and redirect the read pipe to a child's stdin.
// Child can use it to detect the parent's lifetime.
// See the below link for details.
// https://stackoverflow.com/questions/12193581/detect-death-of-parent-process
if (pipe_to_stdin) {
if (pid <= 0 && parent_lifetime_pipe[1] != -1) {
// Child. Close the write end of the pipe in the child.
close(parent_lifetime_pipe[1]);
parent_lifetime_pipe[1] = -1;
}
if (pid != 0 && parent_lifetime_pipe[0] != -1) {
// Parent. Close the read end of the pipe.
close(parent_lifetime_pipe[0]);
parent_lifetime_pipe[0] = -1;
}
} else {
// parent_lifetime_pipe pipes are not used.
parent_lifetime_pipe[0] = -1;
parent_lifetime_pipe[1] = -1;
}

if (pid == 0) {
// Child process case. Reset the SIGCHLD handler.
signal(SIGCHLD, SIG_DFL);
// If process needs to be decoupled, double-fork to avoid zombies.
if (pid_t pid2 = decouple ? fork() : 0) {
_exit(pid2 == -1 ? errno : 0); // Parent of grandchild; must exit
}

// Redirect the read pipe to stdin so that child can track the
// parent lifetime.
if (parent_lifetime_pipe[0] != -1) {
dup2(parent_lifetime_pipe[0], STDIN_FILENO);
}

// This is the spawned process. Any intermediate parent is now dead.
pid_t my_pid = getpid();
if (write(pipefds[1], &my_pid, sizeof(my_pid)) == sizeof(my_pid)) {
Expand Down Expand Up @@ -317,10 +360,11 @@ Process::Process(const char *argv[],
void *io_service,
std::error_code &ec,
bool decouple,
const ProcessEnvironment &env) {
const ProcessEnvironment &env,
bool pipe_to_stdin) {
/// TODO: use io_service with boost asio notify_fork.
(void)io_service;
ProcessFD procfd = ProcessFD::spawnvpe(argv, ec, decouple, env);
ProcessFD procfd = ProcessFD::spawnvpe(argv, ec, decouple, env, pipe_to_stdin);
if (!ec) {
p_ = std::make_shared<ProcessFD>(std::move(procfd));
}
Expand Down
6 changes: 5 additions & 1 deletion src/ray/util/process.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,15 @@ class Process {
/// \param[in] decouple True iff the parent will not wait for the child to exit.
/// \param[in] env Additional environment variables to be set on this process besides
/// the environment variables of the parent process.
/// \param[in] pipe_to_stdin If true, creates a pipe and redirects its read end to the
/// child process' stdin. It is used for health checking from the child process: the
/// child process can read stdin to detect when the current process dies.
explicit Process(const char *argv[],
void *io_service,
std::error_code &ec,
bool decouple = false,
const ProcessEnvironment &env = {});
const ProcessEnvironment &env = {},
bool pipe_to_stdin = false);
/// Convenience function to run the given command line and wait for it to finish.
static std::error_code Call(const std::vector<std::string> &args,
const ProcessEnvironment &env = {});
Expand Down

0 comments on commit 15a558e

Please sign in to comment.