Skip to content

Commit

Permalink
Revert "Revert "Revert "use an agent-id rather than the process PID (r…
Browse files Browse the repository at this point in the history
  • Loading branch information
simon-mo committed Jun 13, 2022
1 parent b574f75 commit feb8c29
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 73 deletions.
26 changes: 8 additions & 18 deletions dashboard/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,17 @@ def __init__(
dashboard_agent_port,
gcs_address,
minimal,
temp_dir=None,
session_dir=None,
runtime_env_dir=None,
log_dir=None,
metrics_export_port=None,
node_manager_port=None,
listen_port=0,
object_store_name=None,
raylet_name=None,
logging_params=None,
disable_metrics_collection: bool = False,
*, # the following are required kwargs
object_store_name: str,
raylet_name: str,
log_dir: str,
temp_dir: str,
session_dir: str,
runtime_env_dir: str,
logging_params: dict,
agent_id: int,
):
"""Initialize the DashboardAgent object."""
# Public attributes are accessible for all agent modules.
Expand All @@ -88,7 +86,6 @@ def __init__(
self.logging_params = logging_params
self.node_id = os.environ["RAY_NODE_ID"]
self.metrics_collection_disabled = disable_metrics_collection
self.agent_id = agent_id
# TODO(edoakes): RAY_RAYLET_PID isn't properly set on Windows. This is
# only used for fate-sharing with the raylet and we need a different
# fate-sharing mechanism for Windows anyways.
Expand Down Expand Up @@ -266,7 +263,7 @@ async def _check_parent():

await raylet_stub.RegisterAgent(
agent_manager_pb2.RegisterAgentRequest(
agent_id=self.agent_id,
agent_pid=os.getpid(),
agent_port=self.grpc_port,
agent_ip_address=self.ip,
)
Expand Down Expand Up @@ -417,12 +414,6 @@ async def _check_parent():
action="store_true",
help=("If this arg is set, metrics report won't be enabled from the agent."),
)
parser.add_argument(
"--agent-id",
required=True,
type=int,
help="ID to report when registering with raylet",
)

args = parser.parse_args()
try:
Expand Down Expand Up @@ -452,7 +443,6 @@ async def _check_parent():
raylet_name=args.raylet_name,
logging_params=logging_params,
disable_metrics_collection=args.disable_metrics_collection,
agent_id=args.agent_id,
)
if os.environ.get("_RAY_AGENT_FAILING"):
raise Exception("Failure injection failure.")
Expand Down
2 changes: 1 addition & 1 deletion src/ray/protobuf/agent_manager.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ enum AgentRpcStatus {
}

message RegisterAgentRequest {
int32 agent_id = 1;
int32 agent_pid = 1;
int32 agent_port = 2;
string agent_ip_address = 3;
}
Expand Down
81 changes: 30 additions & 51 deletions src/ray/raylet/agent_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,19 @@ namespace raylet {
void AgentManager::HandleRegisterAgent(const rpc::RegisterAgentRequest &request,
rpc::RegisterAgentReply *reply,
rpc::SendReplyCallback send_reply_callback) {
reported_agent_ip_address_ = request.agent_ip_address();
reported_agent_port_ = request.agent_port();
reported_agent_id_ = request.agent_id();
agent_ip_address_ = request.agent_ip_address();
agent_port_ = request.agent_port();
agent_pid_ = request.agent_pid();
// TODO(SongGuyang): We should remove this after we find better port resolution.
// Note: `agent_port_` should be 0 if the grpc port of agent is in conflict.
if (reported_agent_port_ != 0) {
runtime_env_agent_client_ = runtime_env_agent_client_factory_(
reported_agent_ip_address_, reported_agent_port_);
RAY_LOG(INFO) << "HandleRegisterAgent, ip: " << reported_agent_ip_address_
<< ", port: " << reported_agent_port_ << ", id: " << reported_agent_id_;
if (agent_port_ != 0) {
runtime_env_agent_client_ =
runtime_env_agent_client_factory_(agent_ip_address_, agent_port_);
RAY_LOG(INFO) << "HandleRegisterAgent, ip: " << agent_ip_address_
<< ", port: " << agent_port_ << ", pid: " << agent_pid_;
} else {
RAY_LOG(WARNING) << "The GRPC port of the Ray agent is invalid (0), ip: "
<< reported_agent_ip_address_ << ", id: " << reported_agent_id_
<< agent_ip_address_ << ", pid: " << agent_pid_
<< ". The agent client in the raylet has been disabled.";
disable_agent_client_ = true;
}
Expand All @@ -56,79 +56,58 @@ void AgentManager::StartAgent() {
return;
}

// Create a non-zero random agent_id to pass to the child process
// We cannot use pid an id because os.getpid() from the python process is not
// reliable when using a launcher.
// See https://github.com/ray-project/ray/issues/24361 and Python issue
// https://github.com/python/cpython/issues/83086
int agent_id = 0;
while (agent_id == 0) {
agent_id = rand();
if (RAY_LOG_ENABLED(DEBUG)) {
std::stringstream stream;
stream << "Starting agent process with command:";
for (const auto &arg : options_.agent_commands) {
stream << " " << arg;
}
RAY_LOG(DEBUG) << stream.str();
}
const std::string agent_id_str = std::to_string(agent_id);

// Launch the process to create the agent.
std::error_code ec;
std::vector<const char *> argv;
for (const std::string &arg : options_.agent_commands) {
argv.push_back(arg.c_str());
}
argv.push_back("--agent-id");
argv.push_back(agent_id_str.c_str());

// Disable metrics report if needed.
if (!RayConfig::instance().enable_metrics_collection()) {
argv.push_back("--disable-metrics-collection");
}
argv.push_back(NULL);

if (RAY_LOG_ENABLED(DEBUG)) {
std::stringstream stream;
stream << "Starting agent process with command:";
for (const auto &arg : argv) {
stream << " " << arg;
}
RAY_LOG(DEBUG) << stream.str();
}

// Set node id to agent.
ProcessEnvironment env;
env.insert({"RAY_NODE_ID", options_.node_id.Hex()});
env.insert({"RAY_RAYLET_PID", std::to_string(getpid())});

// Launch the process to create the agent.
std::error_code ec;
Process child(argv.data(), nullptr, ec, false, env);
if (!child.IsValid() || ec) {
// The worker failed to start. This is a fatal error.
RAY_LOG(FATAL) << "Failed to start agent with return value " << ec << ": "
<< ec.message();
}

std::thread monitor_thread([this, child, agent_id]() mutable {
std::thread monitor_thread([this, child]() mutable {
SetThreadName("agent.monitor");
RAY_LOG(INFO) << "Monitor agent process with id " << agent_id << ", register timeout "
RAY_LOG(INFO) << "Monitor agent process with pid " << child.GetId()
<< ", register timeout "
<< RayConfig::instance().agent_register_timeout_ms() << "ms.";
auto timer = delay_executor_(
[this, child, agent_id]() mutable {
if (reported_agent_id_ != agent_id) {
if (reported_agent_id_ == 0) {
RAY_LOG(WARNING) << "Agent process expected id " << agent_id
<< " timed out before registering. ip "
<< reported_agent_ip_address_ << ", id "
<< reported_agent_id_;
} else {
RAY_LOG(WARNING) << "Agent process expected id " << agent_id
<< " but got id " << reported_agent_id_
<< ", this is a fatal error";
}
[this, child]() mutable {
if (agent_pid_ != child.GetId()) {
RAY_LOG(WARNING) << "Agent process with pid " << child.GetId()
<< " has not registered. ip " << agent_ip_address_
<< ", pid " << agent_pid_;
child.Kill();
}
},
RayConfig::instance().agent_register_timeout_ms());

int exit_code = child.Wait();
timer->cancel();
RAY_LOG(WARNING) << "Agent process with id " << agent_id << " exit, return value "
<< exit_code << ". ip " << reported_agent_ip_address_ << ". id "
<< reported_agent_id_;
RAY_LOG(WARNING) << "Agent process with pid " << child.GetId()
<< " exit, return value " << exit_code << ". ip "
<< agent_ip_address_ << ". pid " << agent_pid_;
RAY_LOG(ERROR)
<< "The raylet exited immediately because the Ray agent failed. "
"The raylet fate shares with the agent. This can happen because the "
Expand Down
6 changes: 3 additions & 3 deletions src/ray/raylet/agent_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ class AgentManager : public rpc::AgentManagerServiceHandler {

private:
Options options_;
pid_t reported_agent_id_ = 0;
int reported_agent_port_ = 0;
pid_t agent_pid_ = 0;
int agent_port_ = 0;
/// Whether or not we intend to start the agent. This is false if we
/// are missing Ray Dashboard dependencies, for example.
bool should_start_agent_ = true;
std::string reported_agent_ip_address_;
std::string agent_ip_address_;
DelayExecutorFn delay_executor_;
RuntimeEnvAgentClientFactoryFn runtime_env_agent_client_factory_;
std::shared_ptr<rpc::RuntimeEnvAgentClientInterface> runtime_env_agent_client_;
Expand Down

0 comments on commit feb8c29

Please sign in to comment.