
[core] Raylet adds job to GCS after the driver port announced. #44626

Merged (17 commits) on Apr 16, 2024

Conversation

rynewang (Contributor):

The GCS client API GetAllJobInfo makes one RPC to each driver process to fetch some up-to-date info. To make that call, the GCS uses the driver core worker's IP address and port, which the Raylet supplies when it calls ray::gcs::JobInfoAccessor::AsyncAdd. However, the Raylet may pass port = 0 in that call, leaving the GCS unable to reach the driver's core worker.

Here is the order of events during core worker init:

  1. core worker starts
  2. core worker -> raylet RegisterClientRequest
  3. raylet -> GCS AddJob
  4. core worker -> raylet AnnounceWorkerPort.

Note that step 3 should actually happen after step 4, because at step 3 the raylet does not yet know the core worker's real port. This can happen when the raylet has NodeManagerConfig::min_worker_port set to 0, which allows the core worker to pick a port on its own.

This PR moves step 3 (raylet -> GCS AddJob) into the raylet handler for the incoming AnnounceWorkerPort message, so we now give the GCS the real port instead of the assigned_port.

One catch: previously, AnnounceWorkerPort was one-way; the core worker did not wait for a reply and continued straight to user code. Here, however, we do want the driver to wait until the GCS has received the newly added job. We also don't want to add an RTT to every worker init, so the reply message is only sent when the worker is a driver. Luckily, both sides know the worker type.
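To make the reordering concrete, here is a minimal standalone sketch of the new handshake. It is an illustration only, not the PR's code: all types and names below (Gcs, Raylet, RegisterClient, AnnounceWorkerPort, the port values) are simplified stand-ins for the real raylet/GCS interfaces.

#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>

enum class WorkerType { kDriver, kWorker };

// Stand-in for the GCS job table: job_id -> driver port the GCS can dial.
struct Gcs {
  std::unordered_map<std::string, int> jobs;
  void AddJob(const std::string &job_id, int driver_port) { jobs[job_id] = driver_port; }
};

// Stand-in for the raylet. AddJob is no longer triggered by RegisterClient
// (step 2), where the port may still be 0; it now happens in the handler for
// AnnounceWorkerPort (step 4), which carries the port the worker actually bound.
struct Raylet {
  Gcs &gcs;
  int RegisterClient(WorkerType /*type*/, int requested_port) { return requested_port; }

  void AnnounceWorkerPort(WorkerType type, const std::string &job_id, int real_port,
                          const std::function<void()> &reply) {
    if (type == WorkerType::kDriver) {
      gcs.AddJob(job_id, real_port);  // The GCS now stores a reachable address.
      reply();                        // Only drivers wait for this reply.
    }
  }
};

int main() {
  Gcs gcs;
  Raylet raylet{gcs};
  // min_worker_port == 0: the raylet assigns no port, the driver picks its own.
  int assigned = raylet.RegisterClient(WorkerType::kDriver, /*requested_port=*/0);
  int real_port = (assigned == 0) ? 52345 : assigned;
  raylet.AnnounceWorkerPort(WorkerType::kDriver, "01000000", real_port, [] {
    std::cout << "Job added to GCS; driver may run user code.\n";
  });
  std::cout << "GCS sees driver port " << gcs.jobs.at("01000000") << "\n";
  return 0;
}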

Fixes #44459.

@rynewang rynewang requested a review from a team as a code owner April 10, 2024 15:04
Contributor Author (@rynewang):

Note: I don't expect this PR to introduce any performance regression. The driver needs to wait for the raylet -> GCS RPC to finish anyway, both before and after this PR.

@jjyao (Collaborator) left a comment:

lg

Resolved review threads: src/ray/raylet/node_manager.cc, src/ray/raylet_client/raylet_client.h, python/ray/tests/test_state_api.py
@jjyao (Collaborator) commented Apr 11, 2024:

Many test failures.

Contributor Author (@rynewang):

@jjyao ready to merge

Comment on lines 267 to 271
# When we create a new node, the new raylet invokes RegisterGcs -> AsyncSubscribeAll
# -> AsyncGetAll -> GetAllJobInfo, which makes the GCS create a connection to each
# driver. To keep the connection count consistent, pre-create the connection here.
ray._private.worker.global_worker.gcs_client.get_all_job_info()

Collaborator:

I still don't understand why we need to do this here?

Contributor Author:

So I found that after this PR we get one more connection created in the add-node test, and it is not closed in the remove-node test. It turns out that creating a node triggers the GCS to connect to each driver core worker, and that connection is persisted in core_worker_connection_pool under an LRU policy. Previously that connection did not exist because the GCS had no way to reach the driver (the port was 0), which is exactly what this PR fixes. So I pre-trigger the GCS -> driver core worker connection to make it count toward the initial list; then the connection count stays stable across the add-node and remove-node steps.
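For readers unfamiliar with the pooling behavior described above, the toy class below shows the general shape of an LRU-bounded connection cache: once a connection is opened it stays cached (and counted against the process's file descriptors) until newer entries evict it, which is why pre-triggering the GCS -> driver connection keeps the before/after counts stable. This is an illustration of the concept only, not Ray's core_worker_connection_pool.

#include <cstddef>
#include <list>
#include <string>
#include <unordered_map>
#include <utility>

// Toy LRU-bounded pool: one cached "connection" (here just a fake fd) per
// address, evicting the least recently used entry once capacity is exceeded.
class LruConnectionPool {
 public:
  explicit LruConnectionPool(std::size_t capacity) : capacity_(capacity) {}

  int GetOrConnect(const std::string &address) {
    auto it = index_.find(address);
    if (it != index_.end()) {
      // Reuse the cached connection and mark it most recently used.
      order_.splice(order_.begin(), order_, it->second);
      return order_.front().second;
    }
    order_.emplace_front(address, next_fd_++);  // "Open" a new connection.
    index_[address] = order_.begin();
    if (order_.size() > capacity_) {  // Evict the least recently used entry.
      index_.erase(order_.back().first);
      order_.pop_back();
    }
    return order_.front().second;
  }

 private:
  std::size_t capacity_;
  int next_fd_ = 3;
  std::list<std::pair<std::string, int>> order_;
  std::unordered_map<std::string, std::list<std::pair<std::string, int>>::iterator> index_;
};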

Collaborator:

We can move ray.init(cluster.address) down and add a corresponding ray.shutdown()?

Contributor Author:

TODO: just shut down the driver job

time.sleep(10000)

# Create some long running tasks, no need to wait.
tasks = [f.remote() for i in range(4)] # noqa: F841
Collaborator:

any reason why we need 4 tasks here instead of 1?

Contributor Author:

just random. Switched back to only 1 task.

Collaborator:

I think we need some synchronization

Contributor Author:

TODO: use a signal actor to gate the task

Comment on lines 1336 to 1338
if (!status.ok()) {
  RAY_LOG(ERROR) << "Failed to add job to GCS: " << status.ToString();
}
Collaborator:

In the original code, if this fails, RegisterClientReply will have

  // Whether the registration succeeded.
  success: bool;
  // The reason of registration failure.
  failure_reason: string;

set. We should do the same thing for AnnounceWorkerPortReply?

Contributor Author:

ok, added reply status. But at the terminal we check-fail it, since the cython binding notify_raylet returns void. Do we want to also change that to raise an exception?

auto message = protocol::CreateAnnounceWorkerPortReply(fbb);
fbb.Finish(message);

auto reply_status = client->WriteMessage(
Collaborator:

Why don't we call WriteMessageAsync here, following:

auto reply =
        ray::protocol::CreateRegisterClientReply(fbb,
                                                 status.ok(),
                                                 fbb.CreateString(status.ToString()),
                                                 to_flatbuf(fbb, self_node_id_),
                                                 assigned_port);
    fbb.Finish(reply);
    client->WriteMessageAsync(
        static_cast<int64_t>(protocol::MessageType::RegisterClientReply),
        fbb.GetSize(),
        fbb.GetBufferPointer(),
        [this, client](const ray::Status &status) {
          if (!status.ok()) {
            DisconnectClient(client,
                             rpc::WorkerExitType::SYSTEM_ERROR,
                             "Worker is failed because the raylet couldn't reply the "
                             "registration request.");
          }
        });

Contributor Author:

makes sense. updated
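For context, the updated reply path plausibly ends up looking like the sketch below, which simply mirrors the RegisterClientReply pattern quoted above. The field list of CreateAnnounceWorkerPortReply (success, failure_reason) and the AnnounceWorkerPortReply message type are assumed from this discussion, not copied from the merged code.

// Illustrative only; mirrors the RegisterClientReply pattern quoted above.
// The reply fields (success, failure_reason) are assumed from this thread.
flatbuffers::FlatBufferBuilder fbb;
auto reply = protocol::CreateAnnounceWorkerPortReply(
    fbb, status.ok(), fbb.CreateString(status.ToString()));
fbb.Finish(reply);
client->WriteMessageAsync(
    static_cast<int64_t>(protocol::MessageType::AnnounceWorkerPortReply),
    fbb.GetSize(),
    fbb.GetBufferPointer(),
    [this, client](const ray::Status &write_status) {
      if (!write_status.ok()) {
        DisconnectClient(client,
                         rpc::WorkerExitType::SYSTEM_ERROR,
                         "Failed to deliver the AnnounceWorkerPort reply to the driver.");
      }
    });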

Signed-off-by: Ruiyang Wang <[email protected]>
@rynewang rynewang assigned rynewang and unassigned jjyao Apr 15, 2024
RAY_CHECK_OK(local_raylet_client_->AnnounceWorkerPort(core_worker_server_->GetPort()));
if (options_.worker_type == WorkerType::DRIVER) {
RAY_CHECK_OK(local_raylet_client_->AnnounceWorkerPortForDriver(
core_worker_server_->GetPort(), options_.entrypoint));
Contributor Author:

TODO: add a message to the check
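A hedged sketch of what that TODO might look like on the core-worker side; the variable name and message text are illustrative, not the merged change.

// Illustrative: surface a useful message if announcing the driver port fails.
Status announce_status = local_raylet_client_->AnnounceWorkerPortForDriver(
    core_worker_server_->GetPort(), options_.entrypoint);
RAY_CHECK(announce_status.ok())
    << "Failed to announce the driver's port to the local raylet: "
    << announce_status.ToString();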

Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Resolved review thread: python/ray/tests/test_advanced_9.py
time.sleep(10)
# Note: `fds_without_workers` needs to be recorded *after* a ray start, because
# a prestarted worker is started on the first driver init. This worker keeps 1
# connection to the GCS, and it stays alive even after the driver exits. If
Collaborator:

do you know why the connection stays alive even after the driver exits?

Contributor Author (@rynewang) commented Apr 16, 2024:

The prestarted worker is not specifically for job 01000000; it's for job = nil (ffffffff): code. So it's not killed after the job finishes. Frankly, I don't know what it does - the worker can't be used by any job, right?

Collaborator:

Yeah, initially the prestarted worker is not tied to a job, but I thought that later on, when we start the actor, it would late-bind to job 01000000.

Collaborator:

Oh, those prestarted workers cannot be used for actors

void WorkerPool::PrestartDefaultCpuWorkers(ray::Language language, int64_t num_needed) {
  // default workers uses 1 cpu and doesn't support actor.
  static const WorkerCacheKey kDefaultCpuWorkerCacheKey{/*serialized_runtime_env*/ "",
                                                        {{"CPU", 1}},
                                                        /*is_actor*/ false,
                                                        /*is_gpu*/ false};

Co-authored-by: Jiajun Yao <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
@jjyao jjyao merged commit 26af5b7 into ray-project:master Apr 16, 2024
5 checks passed
harborn pushed a commit to harborn/ray that referenced this pull request Apr 18, 2024
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Jun 7, 2024

Successfully merging this pull request may close these issues.

GetAllJobInfo is_running_tasks is not returning the correct value when driver starts ray