[core] Raylet adds job to GCS after the driver port announced. #44626
Conversation
Signed-off-by: Ruiyang Wang <[email protected]>
Note: I expect this PR to not have any performance regressions. The driver needs to wait for the raylet -> GCS RPC to finish anyway, both before and after this PR.
lg
Many test failures.
@jjyao ready to merge
python/ray/tests/test_advanced_9.py
Outdated
# When we create a new node, the new raylet invokes RegisterGcs -> AsyncSubscribeAll
# -> AsyncGetAll -> GetAllJobInfo, which makes the GCS open a connection to each
# driver. To keep the connection counts consistent, pre-create the connection here.
ray._private.worker.global_worker.gcs_client.get_all_job_info()
I still don't understand why we need to do this here?
So I found that after this PR, one more connection is created in the test's add-node step, and it is not closed in the remove-node step. It turns out that creating a node triggers GCS to connect to the driver core workers, and that connection is kept in core_worker_connection_pool under an LRU policy. Previously this connection did not exist because GCS had no way to reach the driver (its port was 0), which is exactly what this PR fixes. So I pre-trigger the GCS -> driver core worker connection to make it count toward the initial list; after the add-node and remove-node steps, the connection count is then stable.
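The LRU-pool behavior described above can be sketched as follows. This is a toy stand-in, not Ray's actual core_worker_connection_pool API; the class and method names are illustrative only. It shows why a connection created once (by the pre-trigger) stays cached, so later add-node/remove-node cycles reuse it instead of changing the count.

```python
from collections import OrderedDict


class ConnectionPool:
    """Toy model of an LRU-evicting connection pool (illustrative names)."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._conns = OrderedDict()  # address -> connection object

    def get(self, address):
        # Reuse an existing connection if present, marking it recently used.
        if address in self._conns:
            self._conns.move_to_end(address)
            return self._conns[address]
        # Otherwise create one; it stays cached until LRU eviction, which is
        # why a connection opened during "add node" is still there after
        # "remove node".
        conn = object()
        self._conns[address] = conn
        if len(self._conns) > self.capacity:
            self._conns.popitem(last=False)  # evict least recently used
        return conn

    def __len__(self):
        return len(self._conns)


pool = ConnectionPool(capacity=8)
pool.get("driver:10001")   # pre-trigger the GCS -> driver connection
baseline = len(pool)
pool.get("driver:10001")   # a later lookup reuses the cached connection
assert len(pool) == baseline  # count is stable across reuse
```

Pre-creating the connection up front makes the baseline count already include it, which is the trick the test comment relies on.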
We can move ray.init(cluster.address) down and add a corresponding ray.shutdown()?
todo: just shutdown the driver job
python/ray/tests/test_state_api.py
Outdated
time.sleep(10000)

# Create some long running tasks, no need to wait.
tasks = [f.remote() for i in range(4)]  # noqa: F841
any reason why we need 4 tasks here instead of 1?
just random. Switched back to only 1 task.
I think we need some synchronization
TODO: make it a signal actor enabling task
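As a rough illustration of the synchronization the TODO proposes, here is a plain-threads analogue of the signal-actor pattern, with threading.Event standing in for a Ray SignalActor. None of this is Ray's API; it only shows why blocking on a signal is deterministic where time.sleep is not.

```python
import threading

# The long-running "task" blocks on a signal instead of sleeping, so the
# test controls exactly when it proceeds.
ready = threading.Event()
started = threading.Event()
results = []


def long_running_task():
    started.set()  # tell the test the task is actually running
    ready.wait()   # block until the test sends the signal
    results.append("done")


t = threading.Thread(target=long_running_task)
t.start()
started.wait(timeout=5)  # deterministic: we know the task is running
assert not results       # the task has not finished yet
ready.set()              # release the task
t.join(timeout=5)
assert results == ["done"]
```

In a Ray test the same shape uses a remote signal actor that the task awaits, so the "long running" state is entered and exited at well-defined points rather than after an arbitrary sleep.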
src/ray/raylet/node_manager.cc
Outdated
if (!status.ok()) {
  RAY_LOG(ERROR) << "Failed to add job to GCS: " << status.ToString();
}
In the original code, if this fails, RegisterClientReply will have

// Whether the registration succeeded.
success: bool;
// The reason of registration failure.
failure_reason: string;

set. We should do the same thing for AnnounceWorkerPortReply?
ok, added a reply status. But at the terminal we check-fail it, since the cython binding notify_raylet returns void. Do we want to also change that to raise an exception?
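The two options in that question can be sketched like this. Status, RaySystemError, and the notify_raylet_* functions below are hypothetical stand-ins for the cython binding being discussed, not Ray's real classes; the point is just the behavioral difference between check-failing and raising.

```python
class Status:
    """Minimal stand-in for a C++-style status object."""

    def __init__(self, ok, message=""):
        self._ok = ok
        self.message = message

    def ok(self):
        return self._ok


class RaySystemError(Exception):
    pass


def notify_raylet_check_fail(status):
    # Behavior in the PR as written: a failed reply aborts the process
    # (assert here plays the role of RAY_CHECK).
    assert status.ok(), f"Check failed: {status.message}"


def notify_raylet_raising(status):
    # Alternative being asked about: surface the failure as an exception
    # the driver can catch or report.
    if not status.ok():
        raise RaySystemError(status.message)


notify_raylet_raising(Status(True))  # succeeds silently
try:
    notify_raylet_raising(Status(False, "GCS unreachable"))
except RaySystemError as e:
    print(f"announce failed: {e}")  # prints "announce failed: GCS unreachable"
```

Check-failing is simpler but turns a transient GCS error into a hard crash of the driver process; raising lets callers decide.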
src/ray/raylet/node_manager.cc
Outdated
auto message = protocol::CreateAnnounceWorkerPortReply(fbb);
fbb.Finish(message);

auto reply_status = client->WriteMessage(
Why don't we call WriteMessageAsync here, following
auto reply =
ray::protocol::CreateRegisterClientReply(fbb,
status.ok(),
fbb.CreateString(status.ToString()),
to_flatbuf(fbb, self_node_id_),
assigned_port);
fbb.Finish(reply);
client->WriteMessageAsync(
static_cast<int64_t>(protocol::MessageType::RegisterClientReply),
fbb.GetSize(),
fbb.GetBufferPointer(),
[this, client](const ray::Status &status) {
if (!status.ok()) {
DisconnectClient(client,
rpc::WorkerExitType::SYSTEM_ERROR,
"Worker is failed because the raylet couldn't reply the "
"registration request.");
}
});
Makes sense. Updated.
src/ray/core_worker/core_worker.cc
Outdated
RAY_CHECK_OK(local_raylet_client_->AnnounceWorkerPort(core_worker_server_->GetPort()));
if (options_.worker_type == WorkerType::DRIVER) {
  RAY_CHECK_OK(local_raylet_client_->AnnounceWorkerPortForDriver(
      core_worker_server_->GetPort(), options_.entrypoint));
todo: add a msg to check
time.sleep(10)
# Note: `fds_without_workers` needs to be recorded *after* a ray start, because
# a prestarted worker is started on the first driver init. This worker keeps 1
# connection to the GCS, and it stays alive even after the driver exits. If
do you know why the connection stays alive even after the driver exits?
The prestarted worker is not specifically for job 01000000; it's for job = nil (ffffffff), per the code. So it's not killed after the job finishes. Frankly I don't know what it does - the worker can't be used by any jobs, right?
Yea, initially the prestarted worker is not tied to a job, but I thought later on, when we start the actor, it would late-bind to job 01000000.
Oh, those prestarted workers cannot be used for actors
void WorkerPool::PrestartDefaultCpuWorkers(ray::Language language, int64_t num_needed) {
// default workers uses 1 cpu and doesn't support actor.
static const WorkerCacheKey kDefaultCpuWorkerCacheKey{/*serialized_runtime_env*/ "",
{{"CPU", 1}},
/*is_actor*/ false,
/*is_gpu*/ false};
Co-authored-by: Jiajun Yao <[email protected]> Signed-off-by: Ruiyang Wang <[email protected]>
…roject#44626) GCS Client API GetAllJobInfo makes 1 RPC to each driver process for some up-to-date info. To make the call, GCS uses the driver core worker's IP and port address from the Raylet calling ray::gcs::JobInfoAccessor::AsyncAdd method. However Raylet may give a port = 0 in the call, causing GCS not able to access the driver's core worker. Signed-off-by: Ruiyang Wang <[email protected]>
GCS Client API GetAllJobInfo makes 1 RPC to each driver process for some up-to-date info. To make the call, GCS uses the driver core worker's IP and port, which the Raylet supplies when calling ray::gcs::JobInfoAccessor::AsyncAdd. However, the Raylet may pass port = 0 in that call, leaving GCS unable to reach the driver's core worker.

Here is the time order in core worker init:

1. Core worker -> Raylet: RegisterClientRequest
2. Raylet -> core worker: RegisterClientReply (with the assigned port)
3. Raylet -> GCS: AddJob
4. Core worker -> Raylet: AnnounceWorkerPort

Note that step 3 should actually happen after step 4, because at step 3 the Raylet does not yet know the core worker's real port. This can happen when the Raylet has NodeManagerConfig::min_worker_port set to 0, allowing the core worker to pick a port on its own.

This PR moves step 3 (the Raylet -> GCS AddJob call) into the Raylet handler for the incoming AnnounceWorkerPort message, and instead of the assigned_port, we now give GCS the real port.

One catch: previously AnnounceWorkerPort was one-way; the core worker did not wait for a reply and continued to user code immediately. Here we do want it to wait until GCS has received the newly added job, but we don't want to add an RTT to every worker init either, so the reply message is added only when the worker is a driver. Luckily, both sides know the worker type.

Fixes #44459.
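The reordering described above can be sketched end to end. FakeGcs and FakeRaylet below are illustrative stand-ins, not Ray's real classes; the sketch only demonstrates the bug (AddJob recording port 0 at registration time) versus the fix (AddJob at port-announcement time, drivers waiting for the reply).

```python
class FakeGcs:
    """Stand-in for GCS job storage: job_id -> driver port it can dial."""

    def __init__(self):
        self.jobs = {}

    def add_job(self, job_id, driver_port):
        self.jobs[job_id] = driver_port


class FakeRaylet:
    """Stand-in for the raylet's two relevant message handlers."""

    def __init__(self, gcs):
        self.gcs = gcs

    def register_client_buggy(self, job_id):
        # Before the fix: AddJob at registration time, when the raylet only
        # knows assigned_port == 0 (the worker will pick its own port).
        self.gcs.add_job(job_id, driver_port=0)

    def announce_worker_port(self, job_id, real_port, is_driver):
        # After the fix: AddJob happens here with the announced real port,
        # and only drivers get (and wait for) a reply, so non-driver worker
        # init pays no extra round trip.
        if is_driver:
            self.gcs.add_job(job_id, driver_port=real_port)
            return "ok"  # driver blocks until GCS has the job
        return None      # workers continue immediately


gcs = FakeGcs()
raylet = FakeRaylet(gcs)

raylet.register_client_buggy("job1")
assert gcs.jobs["job1"] == 0           # GCS cannot reach the driver

reply = raylet.announce_worker_port("job1", real_port=10005, is_driver=True)
assert reply == "ok"
assert gcs.jobs["job1"] == 10005       # GCS now has a reachable port
```

The key invariant is that GCS never stores a driver record until a real, dialable port exists for it.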