From a78c5d5ef251150ddc961c1fc0931edcf09d9603 Mon Sep 17 00:00:00 2001 From: Alex Wu Date: Fri, 17 Jul 2020 11:08:03 -0700 Subject: [PATCH] [New scheduler] Queueing refactor (#9491) * . * test_args passes * . * test_basic.py::test_many_fractional_resources causes ray to hang * test_basic.py::test_many_fractional_resources causes ray to hang * . * . * useful * test_many_fractional_resources fails instead of hanging now :) * Passes test_fractional_resources * . * . * Some cleanup * git is hard * cleanup * . * . * . * . * . * . * . * cleanup * address reviews * address reviews * more refactor * :) * travis pls * . * travis pls * . --- BUILD.bazel | 16 +- src/ray/raylet/node_manager.cc | 219 +++--------------- src/ray/raylet/node_manager.h | 29 +-- .../scheduling/cluster_resource_scheduler.cc | 0 .../scheduling/cluster_resource_scheduler.h | 6 +- .../raylet/scheduling/cluster_task_manager.cc | 218 +++++++++++++++++ .../raylet/scheduling/cluster_task_manager.h | 109 +++++++++ .../scheduling/fixed_point.cc | 0 .../scheduling/fixed_point.h | 0 .../scheduling/scheduling_ids.cc | 0 .../scheduling/scheduling_ids.h | 4 +- .../scheduling/scheduling_test.cc | 10 +- src/ray/raylet/worker.h | 4 +- 13 files changed, 387 insertions(+), 228 deletions(-) rename src/ray/{common => raylet}/scheduling/cluster_resource_scheduler.cc (100%) rename src/ray/{common => raylet}/scheduling/cluster_resource_scheduler.h (99%) create mode 100644 src/ray/raylet/scheduling/cluster_task_manager.cc create mode 100644 src/ray/raylet/scheduling/cluster_task_manager.h rename src/ray/{common => raylet}/scheduling/fixed_point.cc (100%) rename src/ray/{common => raylet}/scheduling/fixed_point.h (100%) rename src/ray/{common => raylet}/scheduling/scheduling_ids.cc (100%) rename src/ray/{common => raylet}/scheduling/scheduling_ids.h (100%) rename src/ray/{common => raylet}/scheduling/scheduling_test.cc (99%) diff --git a/BUILD.bazel b/BUILD.bazel index 0805a6cb6c1d2..21076fc2c5dd6 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -620,17 +620,19 @@ cc_library( name = "raylet_lib", srcs = glob( [ - "src/ray/raylet/*.cc", + "src/ray/raylet/**/*.cc", ], exclude = [ "src/ray/raylet/*_test.cc", "src/ray/raylet/main.cc", ], ), - hdrs = glob([ - "src/ray/raylet/*.h", - "src/ray/core_worker/common.h", - ]), + hdrs = glob( + [ + "src/ray/raylet/**/*.h", + "src/ray/core_worker/common.h", + ], + ), copts = COPTS, linkopts = select({ "@bazel_tools//src/conditions:windows": [ @@ -853,10 +855,10 @@ cc_test( cc_test( name = "scheduling_test", - srcs = ["src/ray/common/scheduling/scheduling_test.cc"], + srcs = ["src/ray/raylet/scheduling/scheduling_test.cc"], copts = COPTS, deps = [ - ":ray_common", + ":raylet_lib", "@com_google_googletest//:gtest_main", ], ) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 8c64e32a76c4c..8c057669eddaf 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -203,6 +203,23 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, std::shared_ptr(new ClusterResourceScheduler( self_node_id_.Binary(), local_resources.GetTotalResources().GetResourceMap())); + std::function fulfills_dependencies_func = + [this](const Task &task) { + bool args_ready = task_dependency_manager_.SubscribeGetDependencies( + task.GetTaskSpecification().TaskId(), task.GetDependencies()); + if (args_ready) { + task_dependency_manager_.UnsubscribeGetDependencies( + task.GetTaskSpecification().TaskId()); + } + return args_ready; + }; + + auto get_node_info_func = [this](const ClientID 
&node_id) { + return gcs_client_->Nodes().Get(node_id); + }; + cluster_task_manager_ = std::shared_ptr( + new ClusterTaskManager(self_node_id_, new_resource_scheduler_, + fulfills_dependencies_func, get_node_info_func)); } RAY_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str())); @@ -858,7 +875,7 @@ void NodeManager::HeartbeatAdded(const ClientID &client_id, new_resource_scheduler_->AddOrUpdateNode( client_id.Binary(), remote_resources.GetTotalResources().GetResourceMap(), remote_resources.GetAvailableResources().GetResourceMap()); - NewSchedulerSchedulePendingTasks(); + ScheduleAndDispatch(); return; } @@ -1339,7 +1356,7 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr &worker) { // Local resource availability changed: invoke scheduling policy for local node. if (new_scheduler_enabled_) { - NewSchedulerSchedulePendingTasks(); + ScheduleAndDispatch(); } else { cluster_resource_map_[self_node_id_].SetLoadResources( local_queues_.GetResourceLoad()); @@ -1463,7 +1480,7 @@ void NodeManager::ProcessDisconnectClientMessage( // Since some resources may have been released, we can try to dispatch more tasks. YYY if (new_scheduler_enabled_) { - NewSchedulerSchedulePendingTasks(); + ScheduleAndDispatch(); } else { DispatchTasks(local_queues_.GetReadyTasksByClass()); } @@ -1676,117 +1693,10 @@ void NodeManager::ProcessSubmitTaskMessage(const uint8_t *message_data) { SubmitTask(Task(task_message), Lineage()); } -void NodeManager::DispatchScheduledTasksToWorkers() { - RAY_CHECK(new_scheduler_enabled_); - - // Check every task in task_to_dispatch queue to see - // whether it can be dispatched and ran. This avoids head-of-line - // blocking where a task which cannot be dispatched because - // there are not enough available resources blocks other - // tasks from being dispatched. - for (size_t queue_size = tasks_to_dispatch_.size(); queue_size > 0; queue_size--) { - auto task = tasks_to_dispatch_.front(); - auto reply = task.first; - auto spec = task.second.GetTaskSpecification(); - tasks_to_dispatch_.pop_front(); - - std::shared_ptr worker = worker_pool_.PopWorker(spec); - if (!worker) { - // No worker available to schedule this task. - // Put the task back in the dispatch queue. - tasks_to_dispatch_.push_front(task); - return; - } - - std::shared_ptr allocated_instances( - new TaskResourceInstances()); - bool schedulable = new_resource_scheduler_->AllocateLocalTaskResources( - spec.GetRequiredResources().GetResourceMap(), allocated_instances); - if (!schedulable) { - // Not enough resources to schedule this task. - // Put it back at the end of the dispatch queue. - tasks_to_dispatch_.push_back(task); - worker_pool_.PushWorker(worker); - // Try next task in the dispatch queue. - continue; - } - - worker->SetOwnerAddress(spec.CallerAddress()); - if (spec.IsActorCreationTask()) { - // The actor belongs to this worker now. - worker->SetLifetimeAllocatedInstances(allocated_instances); - } else { - worker->SetAllocatedInstances(allocated_instances); - } - worker->AssignTaskId(spec.TaskId()); - worker->AssignJobId(spec.JobId()); - worker->SetAssignedTask(task.second); - - reply(worker, ClientID::Nil(), "", -1); - } -} - -void NodeManager::NewSchedulerSchedulePendingTasks() { - RAY_CHECK(new_scheduler_enabled_); - size_t queue_size = tasks_to_schedule_.size(); - - // Check every task in task_to_schedule queue to see - // whether it can be scheduled. 
This avoids head-of-line - // blocking where a task which cannot be scheduled because - // there are not enough available resources blocks other - // tasks from being scheduled. - while (queue_size-- > 0) { - auto work = tasks_to_schedule_.front(); - tasks_to_schedule_.pop_front(); - auto task = work.second; - auto request_resources = - task.GetTaskSpecification().GetRequiredResources().GetResourceMap(); - int64_t violations = 0; - std::string node_id_string = - new_resource_scheduler_->GetBestSchedulableNode(request_resources, &violations); - if (node_id_string.empty()) { - /// There is no node that has available resources to run the request. - tasks_to_schedule_.push_back(work); - continue; - } else { - if (node_id_string == self_node_id_.Binary()) { - WaitForTaskArgsRequests(work); - } else { - // Should spill over to a different node. - new_resource_scheduler_->AllocateRemoteTaskResources(node_id_string, - request_resources); - - ClientID node_id = ClientID::FromBinary(node_id_string); - auto node_info_opt = gcs_client_->Nodes().Get(node_id); - RAY_CHECK(node_info_opt) - << "Spilling back to a node manager, but no GCS info found for node " - << node_id; - work.first(nullptr, node_id, node_info_opt->node_manager_address(), - node_info_opt->node_manager_port()); - } - } - } - DispatchScheduledTasksToWorkers(); -} - -void NodeManager::WaitForTaskArgsRequests(std::pair &work) { +void NodeManager::ScheduleAndDispatch() { RAY_CHECK(new_scheduler_enabled_); - const Task &task = work.second; - const auto &object_refs = task.GetDependencies(); - - if (object_refs.size() > 0) { - bool args_ready = task_dependency_manager_.SubscribeGetDependencies( - task.GetTaskSpecification().TaskId(), object_refs); - if (args_ready) { - task_dependency_manager_.UnsubscribeGetDependencies( - task.GetTaskSpecification().TaskId()); - tasks_to_dispatch_.push_back(work); - } else { - waiting_tasks_[task.GetTaskSpecification().TaskId()] = work; - } - } else { - tasks_to_dispatch_.push_back(work); - } + cluster_task_manager_->SchedulePendingTasks(); + cluster_task_manager_->DispatchScheduledTasksToWorkers(worker_pool_, leased_workers_); } void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest &request, @@ -1811,72 +1721,8 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest if (new_scheduler_enabled_) { auto task_spec = task.GetTaskSpecification(); - auto work = std::make_pair( - [this, task_spec, reply, send_reply_callback](std::shared_ptr worker, - ClientID spillback_to, - std::string address, int port) { - if (worker != nullptr) { - reply->mutable_worker_address()->set_ip_address(worker->IpAddress()); - reply->mutable_worker_address()->set_port(worker->Port()); - reply->mutable_worker_address()->set_worker_id(worker->WorkerId().Binary()); - reply->mutable_worker_address()->set_raylet_id(self_node_id_.Binary()); - RAY_CHECK(leased_workers_.find(worker->WorkerId()) == leased_workers_.end()); - leased_workers_[worker->WorkerId()] = worker; - std::shared_ptr allocated_resources; - if (task_spec.IsActorCreationTask()) { - allocated_resources = worker->GetLifetimeAllocatedInstances(); - } else { - allocated_resources = worker->GetAllocatedInstances(); - } - auto predefined_resources = allocated_resources->predefined_resources; - ::ray::rpc::ResourceMapEntry *resource; - for (size_t res_idx = 0; res_idx < predefined_resources.size(); res_idx++) { - bool first = true; // Set resource name only if at least one of its - // instances has available capacity. 
- for (size_t inst_idx = 0; inst_idx < predefined_resources[res_idx].size(); - inst_idx++) { - if (predefined_resources[res_idx][inst_idx] > 0.) { - if (first) { - resource = reply->add_resource_mapping(); - resource->set_name( - new_resource_scheduler_->GetResourceNameFromIndex(res_idx)); - first = false; - } - auto rid = resource->add_resource_ids(); - rid->set_index(inst_idx); - rid->set_quantity(predefined_resources[res_idx][inst_idx].Double()); - } - } - } - auto custom_resources = allocated_resources->custom_resources; - for (auto it = custom_resources.begin(); it != custom_resources.end(); ++it) { - bool first = true; // Set resource name only if at least one of its - // instances has available capacity. - for (size_t inst_idx = 0; inst_idx < it->second.size(); inst_idx++) { - if (it->second[inst_idx] > 0.) { - if (first) { - resource = reply->add_resource_mapping(); - resource->set_name( - new_resource_scheduler_->GetResourceNameFromIndex(it->first)); - first = false; - } - auto rid = resource->add_resource_ids(); - rid->set_index(inst_idx); - rid->set_quantity(it->second[inst_idx].Double()); - } - } - } - } else { - reply->mutable_retry_at_raylet_address()->set_ip_address(address); - reply->mutable_retry_at_raylet_address()->set_port(port); - reply->mutable_retry_at_raylet_address()->set_raylet_id( - spillback_to.Binary()); - } - send_reply_callback(Status::OK(), nullptr, nullptr); - }, - task); - tasks_to_schedule_.push_back(work); - NewSchedulerSchedulePendingTasks(); + cluster_task_manager_->QueueTask(task, reply, send_reply_callback); + ScheduleAndDispatch(); return; } @@ -2453,7 +2299,7 @@ void NodeManager::HandleDirectCallTaskBlocked(const std::shared_ptr &wor worker->SetBorrowedCPUInstances(borrowed_cpu_instances); worker->MarkBlocked(); } - NewSchedulerSchedulePendingTasks(); + ScheduleAndDispatch(); return; } @@ -2481,7 +2327,7 @@ void NodeManager::HandleDirectCallTaskUnblocked(const std::shared_ptr &w new_resource_scheduler_->AddCPUResourceInstances(worker->GetBorrowedCPUInstances()); worker->MarkUnblocked(); } - NewSchedulerSchedulePendingTasks(); + ScheduleAndDispatch(); return; } @@ -3162,15 +3008,8 @@ void NodeManager::HandleObjectLocal(const ObjectID &object_id) { << " tasks ready"; // Transition the tasks whose dependencies are now fulfilled to the ready state. 
if (new_scheduler_enabled_) { - for (auto task_id : ready_task_ids) { - auto it = waiting_tasks_.find(task_id); - if (it != waiting_tasks_.end()) { - task_dependency_manager_.UnsubscribeGetDependencies(task_id); - tasks_to_dispatch_.push_back(it->second); - waiting_tasks_.erase(it); - } - } - NewSchedulerSchedulePendingTasks(); + cluster_task_manager_->TasksUnblocked(ready_task_ids); + ScheduleAndDispatch(); } else { if (ready_task_ids.size() > 0) { std::unordered_set ready_task_id_set(ready_task_ids.begin(), diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 58fb51a7e820d..db01fc30c3ba7 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -25,11 +25,12 @@ #include "ray/common/client_connection.h" #include "ray/common/task/task_common.h" #include "ray/common/task/scheduling_resources.h" -#include "ray/common/scheduling/scheduling_ids.h" -#include "ray/common/scheduling/cluster_resource_scheduler.h" #include "ray/object_manager/object_manager.h" #include "ray/raylet/actor_registration.h" #include "ray/raylet/lineage_cache.h" +#include "ray/raylet/scheduling/scheduling_ids.h" +#include "ray/raylet/scheduling/cluster_resource_scheduler.h" +#include "ray/raylet/scheduling/cluster_task_manager.h" #include "ray/raylet/scheduling_policy.h" #include "ray/raylet/scheduling_queue.h" #include "ray/raylet/reconstruction_policy.h" @@ -664,7 +665,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// in the system (local or remote) that has enough resources available to /// run the task, if any such node exist. /// Repeat the process as long as we can schedule a task. - void NewSchedulerSchedulePendingTasks(); + /// NEW SCHEDULER_FUNCTION + void ScheduleAndDispatch(); /// Whether a task is an actor creation task. bool IsActorCreationTask(const TaskID &task_id); @@ -772,20 +774,12 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// on all local workers of this raylet. bool should_local_gc_ = false; - /// The new resource scheduler for direct task calls. + /// These two classes make up the new scheduler. ClusterResourceScheduler is + /// responsible for maintaining a view of the cluster state w.r.t resource + /// usage. ClusterTaskManager is responsible for queuing, spilling back, and + /// dispatching tasks. std::shared_ptr new_resource_scheduler_; - - typedef std::function, ClientID spillback_to, - std::string address, int port)> - ScheduleFn; - - /// Queue of lease requests that are waiting for resources to become available. - /// TODO this should be a queue for each SchedulingClass - std::deque> tasks_to_schedule_; - /// Queue of lease requests that should be scheduled onto workers. - std::deque> tasks_to_dispatch_; - /// Queue tasks waiting for arguments to be transferred locally. - absl::flat_hash_map> waiting_tasks_; + std::shared_ptr cluster_task_manager_; /// Cache of gRPC clients to workers (not necessarily running on this node). /// Also includes the number of inflight requests to each worker - when this @@ -796,9 +790,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler { absl::flat_hash_map> pinned_objects_; - /// Wait for a task's arguments to become ready. - void WaitForTaskArgsRequests(std::pair &work); - // TODO(swang): Evict entries from these caches. /// Cache for the WorkerTable in the GCS. 
absl::flat_hash_set failed_workers_cache_; diff --git a/src/ray/common/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc similarity index 100% rename from src/ray/common/scheduling/cluster_resource_scheduler.cc rename to src/ray/raylet/scheduling/cluster_resource_scheduler.cc diff --git a/src/ray/common/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h similarity index 99% rename from src/ray/common/scheduling/cluster_resource_scheduler.h rename to src/ray/raylet/scheduling/cluster_resource_scheduler.h index a1f04e64823c7..7d0bcfc3342a9 100644 --- a/src/ray/common/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -20,15 +20,15 @@ #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" -#include "ray/common/scheduling/fixed_point.h" -#include "ray/common/scheduling/scheduling_ids.h" #include "ray/common/task/scheduling_resources.h" +#include "ray/raylet/scheduling/fixed_point.h" +#include "ray/raylet/scheduling/scheduling_ids.h" #include "ray/util/logging.h" /// List of predefined resources. enum PredefinedResources { CPU, MEM, GPU, TPU, PredefinedResources_MAX }; // Specify resources that consists of unit-size instances. -static std::unordered_set UnitInstanceResources{GPU, TPU}; +static std::unordered_set UnitInstanceResources{CPU, GPU, TPU}; /// Helper function to compare two vectors with FixedPoint values. bool EqualVectors(const std::vector &v1, const std::vector &v2); diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc new file mode 100644 index 0000000000000..932608dea74e6 --- /dev/null +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -0,0 +1,218 @@ +#include "cluster_task_manager.h" + +#include "ray/util/logging.h" + +namespace ray { +namespace raylet { + +ClusterTaskManager::ClusterTaskManager( + const ClientID &self_node_id, + std::shared_ptr cluster_resource_scheduler, + std::function fulfills_dependencies_func, + NodeInfoGetter get_node_info) + : self_node_id_(self_node_id), + cluster_resource_scheduler_(cluster_resource_scheduler), + fulfills_dependencies_func_(fulfills_dependencies_func), + get_node_info_(get_node_info) {} + +bool ClusterTaskManager::SchedulePendingTasks() { + size_t queue_size = tasks_to_schedule_.size(); + bool did_schedule = false; + + // Check every task in task_to_schedule queue to see + // whether it can be scheduled. This avoids head-of-line + // blocking where a task which cannot be scheduled because + // there are not enough available resources blocks other + // tasks from being scheduled. + while (queue_size-- > 0) { + Work work = tasks_to_schedule_.front(); + tasks_to_schedule_.pop_front(); + Task task = std::get<0>(work); + auto request_resources = + task.GetTaskSpecification().GetRequiredResources().GetResourceMap(); + int64_t _unused; + std::string node_id_string = + cluster_resource_scheduler_->GetBestSchedulableNode(request_resources, &_unused); + if (node_id_string.empty()) { + /// There is no node that has available resources to run the request. + tasks_to_schedule_.push_back(work); + continue; + } else { + if (node_id_string == self_node_id_.Binary()) { + did_schedule = did_schedule || WaitForTaskArgsRequests(work); + } else { + // Should spill over to a different node. 
+ cluster_resource_scheduler_->AllocateRemoteTaskResources(node_id_string, + request_resources); + + ClientID node_id = ClientID::FromBinary(node_id_string); + auto node_info_opt = get_node_info_(node_id); + // gcs_client_->Nodes().Get(node_id); + RAY_CHECK(node_info_opt) + << "Spilling back to a node manager, but no GCS info found for node " + << node_id; + auto reply = std::get<1>(work); + auto callback = std::get<2>(work); + Spillback(node_id, node_info_opt->node_manager_address(), + node_info_opt->node_manager_port(), reply, callback); + } + } + } + return did_schedule; +} + +bool ClusterTaskManager::WaitForTaskArgsRequests(Work work) { + Task task = std::get<0>(work); + auto object_ids = task.GetTaskSpecification().GetDependencies(); + bool can_dispatch = true; + if (object_ids.size() > 0) { + bool args_ready = fulfills_dependencies_func_(task); + if (args_ready) { + tasks_to_dispatch_.push_back(work); + } else { + can_dispatch = false; + TaskID task_id = task.GetTaskSpecification().TaskId(); + waiting_tasks_[task_id] = work; + } + } else { + tasks_to_dispatch_.push_back(work); + } + return can_dispatch; +} + +void ClusterTaskManager::DispatchScheduledTasksToWorkers( + WorkerPool &worker_pool, + std::unordered_map> &leased_workers) { + // Check every task in task_to_dispatch queue to see + // whether it can be dispatched and ran. This avoids head-of-line + // blocking where a task which cannot be dispatched because + // there are not enough available resources blocks other + // tasks from being dispatched. + for (size_t queue_size = tasks_to_dispatch_.size(); queue_size > 0; queue_size--) { + auto work = tasks_to_dispatch_.front(); + auto task = std::get<0>(work); + auto spec = task.GetTaskSpecification(); + tasks_to_dispatch_.pop_front(); + + std::shared_ptr worker = worker_pool.PopWorker(spec); + if (!worker) { + // No worker available to schedule this task. + // Put the task back in the dispatch queue. + tasks_to_dispatch_.push_front(work); + return; + } + + std::shared_ptr allocated_instances( + new TaskResourceInstances()); + bool schedulable = cluster_resource_scheduler_->AllocateLocalTaskResources( + spec.GetRequiredResources().GetResourceMap(), allocated_instances); + if (!schedulable) { + // Not enough resources to schedule this task. + // Put it back at the end of the dispatch queue. + tasks_to_dispatch_.push_back(work); + worker_pool.PushWorker(worker); + // Try next task in the dispatch queue. + continue; + } + + auto reply = std::get<1>(work); + auto callback = std::get<2>(work); + worker->SetOwnerAddress(spec.CallerAddress()); + if (spec.IsActorCreationTask()) { + // The actor belongs to this worker now. 
+ worker->SetLifetimeAllocatedInstances(allocated_instances); + } else { + worker->SetAllocatedInstances(allocated_instances); + } + worker->AssignTaskId(spec.TaskId()); + worker->AssignJobId(spec.JobId()); + worker->SetAssignedTask(task); + Dispatch(worker, leased_workers, spec, reply, callback); + } +} + +void ClusterTaskManager::QueueTask(const Task &task, rpc::RequestWorkerLeaseReply *reply, + rpc::SendReplyCallback send_reply_callback) { + Work work = std::make_tuple(task, reply, send_reply_callback); + tasks_to_schedule_.push_back(work); +} + +void ClusterTaskManager::TasksUnblocked(const std::vector ready_ids) { + for (const auto &task_id : ready_ids) { + auto it = waiting_tasks_.find(task_id); + if (it != waiting_tasks_.end()) { + tasks_to_dispatch_.push_back(it->second); + waiting_tasks_.erase(it); + } + } +} + +void ClusterTaskManager::Dispatch( + std::shared_ptr worker, + std::unordered_map> &leased_workers_, + const TaskSpecification &task_spec, rpc::RequestWorkerLeaseReply *reply, + rpc::SendReplyCallback send_reply_callback) { + reply->mutable_worker_address()->set_ip_address(worker->IpAddress()); + reply->mutable_worker_address()->set_port(worker->Port()); + reply->mutable_worker_address()->set_worker_id(worker->WorkerId().Binary()); + reply->mutable_worker_address()->set_raylet_id(self_node_id_.Binary()); + RAY_CHECK(leased_workers_.find(worker->WorkerId()) == leased_workers_.end()); + leased_workers_[worker->WorkerId()] = worker; + std::shared_ptr allocated_resources; + if (task_spec.IsActorCreationTask()) { + allocated_resources = worker->GetLifetimeAllocatedInstances(); + } else { + allocated_resources = worker->GetAllocatedInstances(); + } + auto predefined_resources = allocated_resources->predefined_resources; + ::ray::rpc::ResourceMapEntry *resource; + for (size_t res_idx = 0; res_idx < predefined_resources.size(); res_idx++) { + bool first = true; // Set resource name only if at least one of its + // instances has available capacity. + for (size_t inst_idx = 0; inst_idx < predefined_resources[res_idx].size(); + inst_idx++) { + if (predefined_resources[res_idx][inst_idx] > 0.) { + if (first) { + resource = reply->add_resource_mapping(); + resource->set_name( + cluster_resource_scheduler_->GetResourceNameFromIndex(res_idx)); + first = false; + } + auto rid = resource->add_resource_ids(); + rid->set_index(inst_idx); + rid->set_quantity(predefined_resources[res_idx][inst_idx].Double()); + } + } + } + auto custom_resources = allocated_resources->custom_resources; + for (auto it = custom_resources.begin(); it != custom_resources.end(); ++it) { + bool first = true; // Set resource name only if at least one of its + // instances has available capacity. + for (size_t inst_idx = 0; inst_idx < it->second.size(); inst_idx++) { + if (it->second[inst_idx] > 0.) 
{ + if (first) { + resource = reply->add_resource_mapping(); + resource->set_name( + cluster_resource_scheduler_->GetResourceNameFromIndex(it->first)); + first = false; + } + auto rid = resource->add_resource_ids(); + rid->set_index(inst_idx); + rid->set_quantity(it->second[inst_idx].Double()); + } + } + } + send_reply_callback(Status::OK(), nullptr, nullptr); +} + +void ClusterTaskManager::Spillback(ClientID spillback_to, std::string address, int port, + rpc::RequestWorkerLeaseReply *reply, + rpc::SendReplyCallback send_reply_callback) { + reply->mutable_retry_at_raylet_address()->set_ip_address(address); + reply->mutable_retry_at_raylet_address()->set_port(port); + reply->mutable_retry_at_raylet_address()->set_raylet_id(spillback_to.Binary()); + send_reply_callback(Status::OK(), nullptr, nullptr); +} + +} // namespace raylet +} // namespace ray diff --git a/src/ray/raylet/scheduling/cluster_task_manager.h b/src/ray/raylet/scheduling/cluster_task_manager.h new file mode 100644 index 0000000000000..4fe289d334f64 --- /dev/null +++ b/src/ray/raylet/scheduling/cluster_task_manager.h @@ -0,0 +1,109 @@ +#pragma once + +#include "ray/common/task/task.h" +#include "ray/common/task/task_common.h" +#include "ray/raylet/scheduling/cluster_resource_scheduler.h" +#include "ray/raylet/worker.h" +#include "ray/raylet/worker_pool.h" +#include "ray/rpc/grpc_client.h" +#include "ray/rpc/node_manager/node_manager_client.h" +#include "ray/rpc/node_manager/node_manager_server.h" + +namespace ray { +namespace raylet { + +typedef std::tuple Work; + +typedef std::function(const ClientID &node_id)> + NodeInfoGetter; + +/// Manages the queuing and dispatching of tasks. The logic is as follows: +/// 1. Queue tasks for scheduling. +/// 2. Pick a node on the cluster which has the available resources to run a +/// task. +/// * Step 2 should occur anytime any time the state of the cluster is +/// changed, or a new task is queued. +/// 3. If a task has unresolved dependencies, set it aside to wait for +/// dependencies to be resolved. +/// 4. When a task is ready to be dispatched, ensure that the local node is +/// still capable of running the task. +/// * Step 4 should be run any time there is a new task to dispatch *or* +/// there is a new worker which can dispatch the tasks. +class ClusterTaskManager { + public: + /// fullfills_dependencies_func Should return if all dependencies are + /// fulfilled and unsubscribe from dependencies only if they're fulfilled. If + /// a task has dependencies which are not fulfilled, wait for the + /// dependencies to be fulfilled, then run on the local node. + /// + /// \param self_node_id: ID of local node. + /// \param cluster_resource_scheduler: The resource scheduler which contains + /// the state of the cluster. + /// \param fulfills_dependencies_func: Returns true if all of a task's + /// dependencies are fulfilled. + /// \param gcs_client: A gcs client. + ClusterTaskManager(const ClientID &self_node_id, + std::shared_ptr cluster_resource_scheduler, + std::function fulfills_dependencies_func, + NodeInfoGetter get_node_info); + + /// (Step 2) For each task in tasks_to_schedule_, pick a node in the system + /// (local or remote) that has enough resources available to run the task, if + /// any such node exist. Skip tasks which are not schedulable. + /// + /// \return True if any tasks are ready for dispatch. + bool SchedulePendingTasks(); + + /// (Step 3) Attempts to dispatch all tasks which are ready to run. 
A task + /// will be dispatched if it is on `tasks_to_dispatch_` and there are still + /// avaialable resources on the node. + /// \param worker_pool: The pool of workers which will be dispatched to. + /// `worker_pool` state will be modified (idle workers will be popped) during + /// dispatching. + void DispatchScheduledTasksToWorkers( + WorkerPool &worker_pool, + std::unordered_map> &leased_workers); + + /// (Step 1) Queue tasks for scheduling. + /// \param fn: The function used during dispatching. + /// \param task: The incoming task to schedule. + void QueueTask(const Task &task, rpc::RequestWorkerLeaseReply *reply, + rpc::SendReplyCallback send_reply_callback); + + /// Move tasks from waiting to ready for dispatch. Called when a task's + /// dependencies are resolved. + /// + /// \param readyIds: The tasks which are now ready to be dispatched. + void TasksUnblocked(const std::vector ready_ids); + + private: + const ClientID &self_node_id_; + std::shared_ptr cluster_resource_scheduler_; + std::function fulfills_dependencies_func_; + NodeInfoGetter get_node_info_; + + /// Queue of lease requests that are waiting for resources to become available. + /// TODO this should be a queue for each SchedulingClass + std::deque tasks_to_schedule_; + /// Queue of lease requests that should be scheduled onto workers. + std::deque tasks_to_dispatch_; + /// Tasks waiting for arguments to be transferred locally. + absl::flat_hash_map waiting_tasks_; + + /// Determine whether a task should be immediately dispatched, + /// or placed on a wait queue. + /// + /// \return True if the work can be immediately dispatched. + bool WaitForTaskArgsRequests(Work work); + + void Dispatch(std::shared_ptr worker, + std::unordered_map> &leased_workers_, + const TaskSpecification &task_spec, rpc::RequestWorkerLeaseReply *reply, + rpc::SendReplyCallback send_reply_callback); + + void Spillback(ClientID spillback_to, std::string address, int port, + rpc::RequestWorkerLeaseReply *reply, + rpc::SendReplyCallback send_reply_callback); +}; +} // namespace raylet +} // namespace ray diff --git a/src/ray/common/scheduling/fixed_point.cc b/src/ray/raylet/scheduling/fixed_point.cc similarity index 100% rename from src/ray/common/scheduling/fixed_point.cc rename to src/ray/raylet/scheduling/fixed_point.cc diff --git a/src/ray/common/scheduling/fixed_point.h b/src/ray/raylet/scheduling/fixed_point.h similarity index 100% rename from src/ray/common/scheduling/fixed_point.h rename to src/ray/raylet/scheduling/fixed_point.h diff --git a/src/ray/common/scheduling/scheduling_ids.cc b/src/ray/raylet/scheduling/scheduling_ids.cc similarity index 100% rename from src/ray/common/scheduling/scheduling_ids.cc rename to src/ray/raylet/scheduling/scheduling_ids.cc diff --git a/src/ray/common/scheduling/scheduling_ids.h b/src/ray/raylet/scheduling/scheduling_ids.h similarity index 100% rename from src/ray/common/scheduling/scheduling_ids.h rename to src/ray/raylet/scheduling/scheduling_ids.h index 1f19f56bf62fd..81cd71ba9eb78 100644 --- a/src/ray/common/scheduling/scheduling_ids.h +++ b/src/ray/raylet/scheduling/scheduling_ids.h @@ -14,11 +14,11 @@ #pragma once +#include + #include "absl/container/flat_hash_map.h" #include "ray/util/logging.h" -#include - /// Limit the ID range to test for collisions. 
#define MAX_ID_TEST 8 diff --git a/src/ray/common/scheduling/scheduling_test.cc b/src/ray/raylet/scheduling/scheduling_test.cc similarity index 99% rename from src/ray/common/scheduling/scheduling_test.cc rename to src/ray/raylet/scheduling/scheduling_test.cc index 89e3152b049e4..c8c964403b76b 100644 --- a/src/ray/common/scheduling/scheduling_test.cc +++ b/src/ray/raylet/scheduling/scheduling_test.cc @@ -16,8 +16,8 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" -#include "ray/common/scheduling/cluster_resource_scheduler.h" -#include "ray/common/scheduling/scheduling_ids.h" +#include "ray/raylet/scheduling/cluster_resource_scheduler.h" +#include "ray/raylet/scheduling/scheduling_ids.h" #ifdef UNORDERED_VS_ABSL_MAPS_EVALUATION #include @@ -575,7 +575,7 @@ TEST_F(SchedulingTest, GetLocalAvailableResourcesTest) { cluster_resources.GetLocalResources().GetAvailableResourceInstances(); TaskResourceInstances expected_cluster_resources; - addTaskResourceInstances(true, {3.}, 0, &expected_cluster_resources); + addTaskResourceInstances(true, {1., 1., 1.}, 0, &expected_cluster_resources); addTaskResourceInstances(true, {4.}, 1, &expected_cluster_resources); addTaskResourceInstances(true, {1., 1., 1., 1., 1.}, 2, &expected_cluster_resources); @@ -704,7 +704,7 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) { ASSERT_EQ(success, true); TaskResourceInstances expected_task_allocation; - addTaskResourceInstances(true, {0.}, CPU, &expected_task_allocation); + addTaskResourceInstances(true, {0., 0., 0.}, CPU, &expected_task_allocation); addTaskResourceInstances(true, {2.}, MEM, &expected_task_allocation); addTaskResourceInstances(true, {0., 0.5, 1., 1., 1.}, GPU, &expected_task_allocation); @@ -797,7 +797,7 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) { ASSERT_EQ(success, true); TaskResourceInstances expected_task_allocation; - addTaskResourceInstances(true, {0.}, CPU, &expected_task_allocation); + addTaskResourceInstances(true, {0., 0., 0.}, CPU, &expected_task_allocation); addTaskResourceInstances(true, {2.}, MEM, &expected_task_allocation); addTaskResourceInstances(true, {0., 0.5, 1., 1., 1.}, GPU, &expected_task_allocation); addTaskResourceInstances(false, {1.}, 1, &expected_task_allocation); diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index 94a38c4b14495..e03c6abc1679d 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -18,11 +18,11 @@ #include "ray/common/client_connection.h" #include "ray/common/id.h" -#include "ray/common/scheduling/cluster_resource_scheduler.h" -#include "ray/common/scheduling/scheduling_ids.h" #include "ray/common/task/scheduling_resources.h" #include "ray/common/task/task.h" #include "ray/common/task/task_common.h" +#include "ray/raylet/scheduling/cluster_resource_scheduler.h" +#include "ray/raylet/scheduling/scheduling_ids.h" #include "ray/rpc/worker/core_worker_client.h" #include "ray/util/process.h"
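
A minimal, self-contained sketch of the queueing flow this patch factors out of NodeManager into ClusterTaskManager: QueueTask -> SchedulePendingTasks -> DispatchScheduledTasksToWorkers, with a side map of tasks whose arguments are still being fetched (TasksUnblocked moves them back onto the dispatch queue). MiniTask and MiniTaskManager below are simplified stand-ins, not Ray APIs, and a plain CPU count takes the place of ClusterResourceScheduler and the worker pool; the sketch mirrors only the structure of the new class.

// Simplified stand-ins; not Ray types.
#include <deque>
#include <functional>
#include <iostream>
#include <unordered_map>
#include <vector>

struct MiniTask {
  int id;
  double required_cpus;
  bool has_unresolved_deps;
};

class MiniTaskManager {
 public:
  MiniTaskManager(double local_cpus,
                  std::function<bool(const MiniTask &)> deps_ready)
      : local_cpus_(local_cpus),
        available_cpus_(local_cpus),
        deps_ready_(std::move(deps_ready)) {}

  // Step 1: queue an incoming lease request.
  void QueueTask(const MiniTask &task) { tasks_to_schedule_.push_back(task); }

  // Step 2: pick a node for each queued task. Popping a fixed number of items
  // and re-queueing unschedulable ones avoids head-of-line blocking.
  void SchedulePendingTasks() {
    for (size_t n = tasks_to_schedule_.size(); n > 0; n--) {
      MiniTask task = tasks_to_schedule_.front();
      tasks_to_schedule_.pop_front();
      if (task.required_cpus > local_cpus_) {
        tasks_to_schedule_.push_back(task);  // No feasible node right now.
        continue;
      }
      // Step 3: park tasks whose arguments are not yet local.
      if (task.has_unresolved_deps && !deps_ready_(task)) {
        waiting_tasks_[task.id] = task;
        continue;
      }
      tasks_to_dispatch_.push_back(task);
    }
  }

  // Analogue of TasksUnblocked: a task's arguments became local.
  void TasksUnblocked(const std::vector<int> &ready_ids) {
    for (int id : ready_ids) {
      auto it = waiting_tasks_.find(id);
      if (it != waiting_tasks_.end()) {
        tasks_to_dispatch_.push_back(it->second);
        waiting_tasks_.erase(it);
      }
    }
  }

  // Step 4: grant leases while local resources last; skip tasks that no
  // longer fit instead of blocking the ones behind them.
  void DispatchScheduledTasksToWorkers() {
    for (size_t n = tasks_to_dispatch_.size(); n > 0; n--) {
      MiniTask task = tasks_to_dispatch_.front();
      tasks_to_dispatch_.pop_front();
      if (task.required_cpus > available_cpus_) {
        tasks_to_dispatch_.push_back(task);  // Try the next task instead.
        continue;
      }
      available_cpus_ -= task.required_cpus;
      std::cout << "dispatched task " << task.id << "\n";
    }
  }

 private:
  double local_cpus_;
  double available_cpus_;
  std::function<bool(const MiniTask &)> deps_ready_;
  std::deque<MiniTask> tasks_to_schedule_;
  std::deque<MiniTask> tasks_to_dispatch_;
  std::unordered_map<int, MiniTask> waiting_tasks_;
};

int main() {
  // Dependencies resolve immediately for even task ids, later for odd ones.
  MiniTaskManager mgr(/*local_cpus=*/4.0,
                      [](const MiniTask &t) { return t.id % 2 == 0; });
  mgr.QueueTask({1, 1.0, true});
  mgr.QueueTask({2, 2.0, true});
  mgr.QueueTask({3, 8.0, false});  // Never fits on this node; stays queued.
  mgr.SchedulePendingTasks();
  mgr.DispatchScheduledTasksToWorkers();  // grants a lease for task 2
  mgr.TasksUnblocked({1});                // task 1's arguments arrived
  mgr.DispatchScheduledTasksToWorkers();  // grants a lease for task 1
  return 0;
}

The re-queueing loop (pop a fixed count, push infeasible work to the back) is what the comments in the patch describe as avoiding head-of-line blocking: one task that cannot be scheduled or dispatched no longer stalls the tasks queued behind it. The patch also adds CPU to UnitInstanceResources, which is why the expectations in scheduling_test.cc change from a single CPU instance of capacity 3. to three unit instances {1., 1., 1.}.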