Skip to content

Commit

Permalink
[Core] Remove multiple core workers in one process 2/n (ray-project#34942)

Browse files Browse the repository at this point in the history

Remove some legacy code related to multiple core workers in one process since we now no longer support it.

Signed-off-by: Jiajun Yao <[email protected]>
  • Loading branch information
jjyao committed May 2, 2023
1 parent ff11cb0 commit 19c172b
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 143 deletions.
200 changes: 66 additions & 134 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ void WorkerPool::AddWorkerProcess(
const std::vector<std::string> &dynamic_options) {
state.worker_processes.emplace(worker_startup_token_counter_,
WorkerProcessInfo{/*is_pending_registration=*/true,
{},
worker_type,
proc,
start,
Expand Down Expand Up @@ -773,7 +772,6 @@ void WorkerPool::OnWorkerStarted(const std::shared_ptr<WorkerInterface> &worker)
auto it = state.worker_processes.find(worker_startup_token);
if (it != state.worker_processes.end()) {
it->second.is_pending_registration = false;
it->second.alive_started_workers.insert(worker);
// We may have slots to start more workers now.
TryStartIOWorkers(worker->GetLanguage());
}
Expand Down Expand Up @@ -1063,126 +1061,76 @@ void WorkerPool::TryKillingIdleWorkers() {
RAY_LOG(DEBUG) << "idle worker is already dead. Not going to kill worker "
<< idle_worker->WorkerId();
// This worker has already been killed.
// This is possible because a Java worker process may hold multiple workers.
// It will be removed from idle_of_all_languages_ later.
// This happens when ExitReply is received but the worker is not removed from
// idle_of_all_languages_ yet.
continue;
}
auto worker_startup_token = idle_worker->GetStartupToken();
auto &worker_state = GetStateForLanguage(idle_worker->GetLanguage());

auto it = worker_state.worker_processes.find(worker_startup_token);
if (it != worker_state.worker_processes.end() && it->second.is_pending_registration) {
// A Java worker process may hold multiple workers.
// Some workers of this process are pending registration. Skip killing this worker.
// Skip killing the worker process if there's any inflight `Exit` RPC requests to
// this worker process.
if (pending_exit_idle_workers_.count(idle_worker->WorkerId())) {
continue;
}

// TODO(clarng): get rid of multiple workers per process code here, as that is
// not longer supported.
auto process = idle_worker->GetProcess();
// Make sure all workers in this worker process are idle.
// This block of code is needed by Java workers.
auto workers_in_the_same_process = GetWorkersByProcess(process);
bool can_be_killed = true;
for (const auto &worker : workers_in_the_same_process) {
if (worker_state.idle.count(worker) == 0 ||
now - idle_of_all_languages_map_[worker] <
RayConfig::instance().idle_worker_killing_time_threshold_ms()) {
// Another worker in this process isn't idle, or hasn't been idle for a while, so
// this process can't be killed.
can_be_killed = false;
break;
}

// Skip killing the worker process if there's any inflight `Exit` RPC requests to
// this worker process.
if (pending_exit_idle_workers_.count(worker->WorkerId())) {
can_be_killed = false;
break;
}
}
if (!can_be_killed) {
continue;
RAY_LOG(DEBUG) << "The worker pool has " << running_size
<< " registered workers which exceeds the soft limit of "
<< num_workers_soft_limit_ << ", and worker "
<< idle_worker->WorkerId() << " with pid "
<< idle_worker->GetProcess().GetId()
<< " has been idle for a a while. Kill it.";
// To avoid object lost issue caused by forcibly killing, send an RPC request to the
// worker to allow it to do cleanup before exiting. We kill it anyway if the driver
// is already exited.
RAY_LOG(DEBUG) << "Sending exit message to worker " << idle_worker->WorkerId();
// Register the worker to pending exit so that we can correctly calculate the
// running_size.
// This also means that there's an inflight `Exit` RPC request to the worker.
pending_exit_idle_workers_.emplace(idle_worker->WorkerId(), idle_worker);
auto rpc_client = idle_worker->rpc_client();
RAY_CHECK(rpc_client);
RAY_CHECK(running_size > 0);
running_size--;
rpc::ExitRequest request;
if (finished_jobs_.contains(job_id) &&
RayConfig::instance().kill_idle_workers_of_terminated_job()) {
RAY_LOG(INFO) << "Force exiting worker whose job has exited "
<< idle_worker->WorkerId();
request.set_force_exit(true);
}
rpc_client->Exit(
request, [this, idle_worker](const ray::Status &status, const rpc::ExitReply &r) {
RAY_CHECK(pending_exit_idle_workers_.erase(idle_worker->WorkerId()));
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to send exit request: " << status.ToString();
}

RAY_CHECK(running_size >= workers_in_the_same_process.size());
if (running_size - workers_in_the_same_process.size() <
static_cast<size_t>(num_workers_soft_limit_)) {
// A Java worker process may contain multiple workers. Killing more workers than we
// expect may slow the job.
if (!finished_jobs_.count(job_id)) {
// Ignore the soft limit for jobs that have already finished, as we
// should always clean up these workers.
return;
}
}

for (const auto &worker : workers_in_the_same_process) {
RAY_LOG(DEBUG) << "The worker pool has " << running_size
<< " registered workers which exceeds the soft limit of "
<< num_workers_soft_limit_ << ", and worker " << worker->WorkerId()
<< " with pid " << process.GetId()
<< " has been idle for a a while. Kill it.";
// To avoid object lost issue caused by forcibly killing, send an RPC request to the
// worker to allow it to do cleanup before exiting. We kill it anyway if the driver
// is already exited.
if (!worker->IsDead()) {
RAY_LOG(DEBUG) << "Sending exit message to worker " << worker->WorkerId();
// Register the worker to pending exit so that we can correctly calculate the
// running_size.
// This also means that there's an inflight `Exit` RPC request to the worker.
pending_exit_idle_workers_.emplace(worker->WorkerId(), worker);
auto rpc_client = worker->rpc_client();
RAY_CHECK(rpc_client);
RAY_CHECK(running_size > 0);
running_size--;
rpc::ExitRequest request;
if (finished_jobs_.contains(job_id) &&
RayConfig::instance().kill_idle_workers_of_terminated_job()) {
RAY_LOG(INFO) << "Force exiting worker whose job has exited "
<< worker->WorkerId();
request.set_force_exit(true);
}
rpc_client->Exit(
request, [this, worker](const ray::Status &status, const rpc::ExitReply &r) {
RAY_CHECK(pending_exit_idle_workers_.erase(worker->WorkerId()));
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to send exit request: " << status.ToString();
}

// In case of failed to send request, we remove it from pool as well
// TODO (iycheng): We should handle the grpc failure in better way.
if (!status.ok() || r.success()) {
RAY_LOG(DEBUG) << "Removed worker " << worker->WorkerId();
auto &worker_state = GetStateForLanguage(worker->GetLanguage());
// If we could kill the worker properly, we remove them from the idle
// pool.
RemoveWorker(worker_state.idle, worker);
// We always mark the worker as dead.
// If the worker is not idle at this moment, we'd want to mark it as dead
// so it won't be reused later.
if (!worker->IsDead()) {
worker->MarkDead();
}
} else {
RAY_LOG(DEBUG) << "Failed to remove worker " << worker->WorkerId();
// We re-insert the idle worker to the back of the queue if it fails to
// kill the worker (e.g., when the worker owns the object). Without this,
// if the first N workers own objects, it can't kill idle workers that are
// >= N+1.
const auto &idle_pair = idle_of_all_languages_.front();
idle_of_all_languages_.push_back(idle_pair);
idle_of_all_languages_.pop_front();
RAY_CHECK(idle_of_all_languages_.size() ==
idle_of_all_languages_map_.size());
}
});
} else {
RAY_LOG(DEBUG) << "Removing dead worker " << worker->WorkerId();

// Even it's a dead worker, we still need to remove them from the pool.
RemoveWorker(worker_state.idle, worker);
}
}
// In case of failed to send request, we remove it from pool as well
// TODO (iycheng): We should handle the grpc failure in better way.
if (!status.ok() || r.success()) {
RAY_LOG(DEBUG) << "Removed worker " << idle_worker->WorkerId();
auto &worker_state = GetStateForLanguage(idle_worker->GetLanguage());
// If we could kill the worker properly, we remove them from the idle
// pool.
RemoveWorker(worker_state.idle, idle_worker);
// We always mark the worker as dead.
// If the worker is not idle at this moment, we'd want to mark it as dead
// so it won't be reused later.
if (!idle_worker->IsDead()) {
idle_worker->MarkDead();
}
} else {
RAY_LOG(DEBUG) << "Failed to remove worker " << idle_worker->WorkerId();
// We re-insert the idle worker to the back of the queue if it fails to
// kill the worker (e.g., when the worker owns the object). Without this,
// if the first N workers own objects, it can't kill idle workers that are
// >= N+1.
const auto &idle_pair = idle_of_all_languages_.front();
idle_of_all_languages_.push_back(idle_pair);
idle_of_all_languages_.pop_front();
RAY_CHECK(idle_of_all_languages_.size() == idle_of_all_languages_map_.size());
}
});
}

std::list<std::pair<std::shared_ptr<WorkerInterface>, int64_t>>
Expand Down Expand Up @@ -1417,19 +1365,17 @@ void WorkerPool::DisconnectWorker(const std::shared_ptr<WorkerInterface> &worker
auto &state = GetStateForLanguage(worker->GetLanguage());
auto it = state.worker_processes.find(worker->GetStartupToken());
if (it != state.worker_processes.end()) {
if (!RemoveWorker(it->second.alive_started_workers, worker)) {
if (it->second.is_pending_registration) {
// Worker is either starting or started,
// if it's not started, we should remove it from starting.
it->second.is_pending_registration = false;
if (worker->GetWorkerType() == rpc::WorkerType::WORKER) {
TryPendingPopWorkerRequests(worker->GetLanguage());
}
}
if (it->second.alive_started_workers.size() == 0 &&
!it->second.is_pending_registration) {
DeleteRuntimeEnvIfPossible(it->second.runtime_env_info.serialized_runtime_env());
RemoveWorkerProcess(state, worker->GetStartupToken());
}

DeleteRuntimeEnvIfPossible(it->second.runtime_env_info.serialized_runtime_env());
RemoveWorkerProcess(state, worker->GetStartupToken());
}
RAY_CHECK(RemoveWorker(state.registered_workers, worker));

Expand Down Expand Up @@ -1605,20 +1551,6 @@ void WorkerPool::TryStartIOWorkers(const Language &language,
}
}

std::unordered_set<std::shared_ptr<WorkerInterface>> WorkerPool::GetWorkersByProcess(
const Process &process) {
std::unordered_set<std::shared_ptr<WorkerInterface>> workers_of_process;
for (auto &entry : states_by_lang_) {
auto &worker_state = entry.second;
for (const auto &worker : worker_state.registered_workers) {
if (worker->GetProcess().GetId() == process.GetId()) {
workers_of_process.insert(worker);
}
}
}
return workers_of_process;
}

std::string WorkerPool::DebugString() const {
std::stringstream result;
result << "WorkerPool:";
Expand Down
9 changes: 0 additions & 9 deletions src/ray/raylet/worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -471,8 +471,6 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
struct WorkerProcessInfo {
/// Whether this worker is pending registration or is started.
bool is_pending_registration = true;
/// The started workers which is alive.
std::unordered_set<std::shared_ptr<WorkerInterface>> alive_started_workers;
/// The type of the worker.
rpc::WorkerType worker_type;
/// The worker process instance.
Expand Down Expand Up @@ -585,13 +583,6 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// \param language The language of the PopWorker requests.
void TryPendingPopWorkerRequests(const Language &language);

/// Get all workers of the given process.
///
/// \param process The process of workers.
/// \return The workers of the given process.
std::unordered_set<std::shared_ptr<WorkerInterface>> GetWorkersByProcess(
const Process &process);

/// Get either restore or spill worker state from state based on worker_type.
///
/// \param worker_type IO Worker Type.
Expand Down

0 comments on commit 19c172b

Please sign in to comment.