Skip to content

Commit

Permalink
[core] Reduce assertion check in NodeManager on worker not found (ray…
Browse files Browse the repository at this point in the history
…-project#41841)

Avoid crashing the raylet when a worker that sends an "available" IPC message is not found.

Related to ray-project#41477, although there may be a Tune/Train issue there as well.

Signed-off-by: Stephanie Wang <[email protected]>
  • Loading branch information
stephanie-wang committed Dec 14, 2023
1 parent dde5461 commit 98d2e86
Show file tree
Hide file tree
Showing 10 changed files with 24 additions and 29 deletions.
2 changes: 2 additions & 0 deletions src/ray/common/client_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ class ClientConnection : public ServerConnection {
/// ProcessClientMessage handler will be called.
void ProcessMessages();

const std::string GetDebugLabel() const { return debug_label_; }

protected:
/// A protected constructor for a node client connection.
ClientConnection(MessageHandler &message_handler,
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
std::placeholders::_8);
direct_task_receiver_ = std::make_unique<CoreWorkerDirectTaskReceiver>(
worker_context_, task_execution_service_, execute_task, [this] {
return local_raylet_client_->TaskDone();
return local_raylet_client_->ActorCreationTaskDone();
});
}

Expand Down
11 changes: 6 additions & 5 deletions src/ray/core_worker/test/direct_actor_transport_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -720,12 +720,13 @@ class MockWorkerContext : public WorkerContext {

class MockCoreWorkerDirectTaskReceiver : public CoreWorkerDirectTaskReceiver {
public:
MockCoreWorkerDirectTaskReceiver(WorkerContext &worker_context,
instrumented_io_context &main_io_service,
const TaskHandler &task_handler,
const OnTaskDone &task_done)
MockCoreWorkerDirectTaskReceiver(
WorkerContext &worker_context,
instrumented_io_context &main_io_service,
const TaskHandler &task_handler,
const OnActorCreationTaskDone &actor_creation_task_done_)
: CoreWorkerDirectTaskReceiver(
worker_context, main_io_service, task_handler, task_done) {}
worker_context, main_io_service, task_handler, actor_creation_task_done_) {}

void UpdateConcurrencyGroupsCache(const ActorID &actor_id,
const std::vector<ConcurrencyGroup> &cgs) {
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/transport/direct_actor_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ void CoreWorkerDirectTaskReceiver::HandleTask(
// Tell raylet that an actor creation task has finished execution, so that
// raylet can publish actor creation event to GCS, and mark this worker as
// actor, thus if this worker dies later raylet will restart the actor.
RAY_CHECK_OK(task_done_());
RAY_CHECK_OK(actor_creation_task_done_());
if (status.IsCreationTaskError()) {
RAY_LOG(WARNING) << "Actor creation task finished with errors, task_id: "
<< task_spec.TaskId()
Expand Down
8 changes: 4 additions & 4 deletions src/ray/core_worker/transport/direct_actor_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,16 @@ class CoreWorkerDirectTaskReceiver {
bool *is_retryable_error,
std::string *application_error)>;

using OnTaskDone = std::function<Status()>;
using OnActorCreationTaskDone = std::function<Status()>;

CoreWorkerDirectTaskReceiver(WorkerContext &worker_context,
instrumented_io_context &main_io_service,
const TaskHandler &task_handler,
const OnTaskDone &task_done)
const OnActorCreationTaskDone &actor_creation_task_done_)
: worker_context_(worker_context),
task_handler_(task_handler),
task_main_io_service_(main_io_service),
task_done_(task_done),
actor_creation_task_done_(actor_creation_task_done_),
pool_manager_(std::make_shared<ConcurrencyGroupManager<BoundedExecutor>>()),
fiber_state_manager_(nullptr) {}

Expand Down Expand Up @@ -127,7 +127,7 @@ class CoreWorkerDirectTaskReceiver {
/// The IO event loop for running tasks on.
instrumented_io_context &task_main_io_service_;
/// The callback function to be invoked when finishing an actor creation task.
OnTaskDone task_done_;
OnActorCreationTaskDone actor_creation_task_done_;
/// Shared pool for producing new core worker clients.
std::shared_ptr<rpc::CoreWorkerClientPool> client_pool_;
/// Address of our RPC server.
Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/format/node_manager.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ enum MessageType:int {
SubmitTask = 1,
// Notify the raylet that an actor creation task has finished. This is sent
// from a worker to a raylet.
TaskDone,
ActorCreationTaskDone,
// Log a message to the event table. This is sent from a worker to a raylet.
EventLogMessage,
// Send an initial connection message to the raylet. This is sent
Expand Down
14 changes: 6 additions & 8 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1223,8 +1223,11 @@ void NodeManager::ProcessClientMessage(const std::shared_ptr<ClientConnection> &
case protocol::MessageType::AnnounceWorkerPort: {
ProcessAnnounceWorkerPortMessage(client, message_data);
} break;
case protocol::MessageType::TaskDone: {
HandleWorkerAvailable(client);
case protocol::MessageType::ActorCreationTaskDone: {
if (registered_worker) {
// Worker may send this message after it was disconnected.
HandleWorkerAvailable(registered_worker);
}
} break;
case protocol::MessageType::DisconnectClient: {
ProcessDisconnectClientMessage(client, message_data);
Expand Down Expand Up @@ -1400,15 +1403,10 @@ void NodeManager::ProcessAnnounceWorkerPortMessage(
worker->Connect(port);
if (is_worker) {
worker_pool_.OnWorkerStarted(worker);
HandleWorkerAvailable(worker->Connection());
HandleWorkerAvailable(worker);
}
}

void NodeManager::HandleWorkerAvailable(const std::shared_ptr<ClientConnection> &client) {
std::shared_ptr<WorkerInterface> worker = worker_pool_.GetRegisteredWorker(client);
HandleWorkerAvailable(worker);
}

void NodeManager::HandleWorkerAvailable(const std::shared_ptr<WorkerInterface> &worker) {
RAY_CHECK(worker);

Expand Down
6 changes: 0 additions & 6 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -445,12 +445,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
void ProcessAnnounceWorkerPortMessage(const std::shared_ptr<ClientConnection> &client,
const uint8_t *message_data);

/// Handle the case that a worker is available.
///
/// \param client The connection for the worker.
/// \return Void.
void HandleWorkerAvailable(const std::shared_ptr<ClientConnection> &client);

/// Handle the case that a worker is available.
///
/// \param worker The pointer to the worker
Expand Down
4 changes: 2 additions & 2 deletions src/ray/raylet_client/raylet_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ Status raylet::RayletClient::AnnounceWorkerPort(int port) {
return conn_->WriteMessage(MessageType::AnnounceWorkerPort, &fbb);
}

Status raylet::RayletClient::TaskDone() {
return conn_->WriteMessage(MessageType::TaskDone);
Status raylet::RayletClient::ActorCreationTaskDone() {
return conn_->WriteMessage(MessageType::ActorCreationTaskDone);
}

Status raylet::RayletClient::FetchOrReconstruct(
Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet_client/raylet_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ class RayletClient : public RayletClientInterface {
/// Tell the raylet that the client has finished executing an actor creation task.
///
/// \return ray::Status.
ray::Status TaskDone();
ray::Status ActorCreationTaskDone();

/// Tell the raylet to reconstruct or fetch objects.
///
Expand Down

0 comments on commit 98d2e86

Please sign in to comment.