[New scheduler] Queueing refactor (ray-project#9491)
* .

* test_args passes

* .

* test_basic.py::test_many_fractional_resources causes ray to hang

* test_basic.py::test_many_fractional_resources causes ray to hang

* .

* .

* useful

* test_many_fractional_resources fails instead of hanging now :)

* Passes test_fractional_resources

* .

* .

* Some cleanup

* git is hard

* cleanup

* .

* .

* .

* .

* .

* .

* .

* cleanup

* address reviews

* address reviews

* more refactor

* :)

* travis pls

* .

* travis pls

* .
wuisawesome committed Jul 17, 2020
1 parent 026c009 commit a78c5d5
Showing 13 changed files with 387 additions and 228 deletions.
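For orientation, the call sites in this commit imply roughly the following shape for the new ClusterTaskManager. This is a sketch only: the method names (QueueTask, SchedulePendingTasks, DispatchScheduledTasksToWorkers, TasksUnblocked) come from the diff below, but the parameter types and signatures are assumptions, not the real declarations in src/ray/raylet/scheduling/cluster_task_manager.h.

// Sketch of the interface reconstructed from this commit's call sites.
// All parameter types and signatures are guesses; consult the real header.
class ClusterTaskManager {
 public:
  ClusterTaskManager(
      const ClientID &self_node_id,
      std::shared_ptr<ClusterResourceScheduler> resource_scheduler,
      std::function<bool(const Task &)> fulfills_dependencies_func,
      std::function<boost::optional<rpc::GcsNodeInfo>(const ClientID &)>
          get_node_info_func);

  // Queue a worker-lease request until it can be scheduled.
  void QueueTask(const Task &task, rpc::RequestWorkerLeaseReply *reply,
                 rpc::SendReplyCallback send_reply_callback);

  // Pick a node for each queued task, spilling back to a remote raylet when
  // the local node lacks resources.
  void SchedulePendingTasks();

  // Hand locally scheduled tasks to idle workers and allocate their resources.
  void DispatchScheduledTasksToWorkers(
      WorkerPool &worker_pool,
      absl::flat_hash_map<WorkerID, std::shared_ptr<Worker>> &leased_workers);

  // Move tasks whose arguments just became local back into the dispatch queue.
  void TasksUnblocked(const std::vector<TaskID> &ready_task_ids);
};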
16 changes: 9 additions & 7 deletions BUILD.bazel
@@ -620,17 +620,19 @@ cc_library(
name = "raylet_lib",
srcs = glob(
[
"src/ray/raylet/*.cc",
"src/ray/raylet/**/*.cc",
],
exclude = [
"src/ray/raylet/*_test.cc",
"src/ray/raylet/main.cc",
],
),
hdrs = glob([
"src/ray/raylet/*.h",
"src/ray/core_worker/common.h",
]),
hdrs = glob(
[
"src/ray/raylet/**/*.h",
"src/ray/core_worker/common.h",
],
),
copts = COPTS,
linkopts = select({
"@bazel_tools//src/conditions:windows": [
@@ -853,10 +855,10 @@ cc_test(

cc_test(
name = "scheduling_test",
srcs = ["src/ray/common/scheduling/scheduling_test.cc"],
srcs = ["src/ray/raylet/scheduling/scheduling_test.cc"],
copts = COPTS,
deps = [
":ray_common",
":raylet_lib",
"@com_google_googletest//:gtest_main",
],
)
219 changes: 29 additions & 190 deletions src/ray/raylet/node_manager.cc
@@ -203,6 +203,23 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
std::shared_ptr<ClusterResourceScheduler>(new ClusterResourceScheduler(
self_node_id_.Binary(),
local_resources.GetTotalResources().GetResourceMap()));
std::function<bool(const Task &)> fulfills_dependencies_func =
[this](const Task &task) {
bool args_ready = task_dependency_manager_.SubscribeGetDependencies(
task.GetTaskSpecification().TaskId(), task.GetDependencies());
if (args_ready) {
task_dependency_manager_.UnsubscribeGetDependencies(
task.GetTaskSpecification().TaskId());
}
return args_ready;
};

auto get_node_info_func = [this](const ClientID &node_id) {
return gcs_client_->Nodes().Get(node_id);
};
cluster_task_manager_ = std::shared_ptr<ClusterTaskManager>(
new ClusterTaskManager(self_node_id_, new_resource_scheduler_,
fulfills_dependencies_func, get_node_info_func));
}

RAY_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str()));
@@ -858,7 +875,7 @@ void NodeManager::HeartbeatAdded(const ClientID &client_id,
new_resource_scheduler_->AddOrUpdateNode(
client_id.Binary(), remote_resources.GetTotalResources().GetResourceMap(),
remote_resources.GetAvailableResources().GetResourceMap());
NewSchedulerSchedulePendingTasks();
ScheduleAndDispatch();
return;
}

@@ -1339,7 +1356,7 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr<Worker> &worker) {

// Local resource availability changed: invoke scheduling policy for local node.
if (new_scheduler_enabled_) {
NewSchedulerSchedulePendingTasks();
ScheduleAndDispatch();
} else {
cluster_resource_map_[self_node_id_].SetLoadResources(
local_queues_.GetResourceLoad());
@@ -1463,7 +1480,7 @@ void NodeManager::ProcessDisconnectClientMessage(

// Since some resources may have been released, we can try to dispatch more tasks. YYY
if (new_scheduler_enabled_) {
NewSchedulerSchedulePendingTasks();
ScheduleAndDispatch();
} else {
DispatchTasks(local_queues_.GetReadyTasksByClass());
}
@@ -1676,117 +1693,10 @@ void NodeManager::ProcessSubmitTaskMessage(const uint8_t *message_data) {
SubmitTask(Task(task_message), Lineage());
}

void NodeManager::DispatchScheduledTasksToWorkers() {
RAY_CHECK(new_scheduler_enabled_);

// Check every task in task_to_dispatch queue to see
// whether it can be dispatched and ran. This avoids head-of-line
// blocking where a task which cannot be dispatched because
// there are not enough available resources blocks other
// tasks from being dispatched.
for (size_t queue_size = tasks_to_dispatch_.size(); queue_size > 0; queue_size--) {
auto task = tasks_to_dispatch_.front();
auto reply = task.first;
auto spec = task.second.GetTaskSpecification();
tasks_to_dispatch_.pop_front();

std::shared_ptr<Worker> worker = worker_pool_.PopWorker(spec);
if (!worker) {
// No worker available to schedule this task.
// Put the task back in the dispatch queue.
tasks_to_dispatch_.push_front(task);
return;
}

std::shared_ptr<TaskResourceInstances> allocated_instances(
new TaskResourceInstances());
bool schedulable = new_resource_scheduler_->AllocateLocalTaskResources(
spec.GetRequiredResources().GetResourceMap(), allocated_instances);
if (!schedulable) {
// Not enough resources to schedule this task.
// Put it back at the end of the dispatch queue.
tasks_to_dispatch_.push_back(task);
worker_pool_.PushWorker(worker);
// Try next task in the dispatch queue.
continue;
}

worker->SetOwnerAddress(spec.CallerAddress());
if (spec.IsActorCreationTask()) {
// The actor belongs to this worker now.
worker->SetLifetimeAllocatedInstances(allocated_instances);
} else {
worker->SetAllocatedInstances(allocated_instances);
}
worker->AssignTaskId(spec.TaskId());
worker->AssignJobId(spec.JobId());
worker->SetAssignedTask(task.second);

reply(worker, ClientID::Nil(), "", -1);
}
}

void NodeManager::NewSchedulerSchedulePendingTasks() {
RAY_CHECK(new_scheduler_enabled_);
size_t queue_size = tasks_to_schedule_.size();

// Check every task in task_to_schedule queue to see
// whether it can be scheduled. This avoids head-of-line
// blocking where a task which cannot be scheduled because
// there are not enough available resources blocks other
// tasks from being scheduled.
while (queue_size-- > 0) {
auto work = tasks_to_schedule_.front();
tasks_to_schedule_.pop_front();
auto task = work.second;
auto request_resources =
task.GetTaskSpecification().GetRequiredResources().GetResourceMap();
int64_t violations = 0;
std::string node_id_string =
new_resource_scheduler_->GetBestSchedulableNode(request_resources, &violations);
if (node_id_string.empty()) {
/// There is no node that has available resources to run the request.
tasks_to_schedule_.push_back(work);
continue;
} else {
if (node_id_string == self_node_id_.Binary()) {
WaitForTaskArgsRequests(work);
} else {
// Should spill over to a different node.
new_resource_scheduler_->AllocateRemoteTaskResources(node_id_string,
request_resources);

ClientID node_id = ClientID::FromBinary(node_id_string);
auto node_info_opt = gcs_client_->Nodes().Get(node_id);
RAY_CHECK(node_info_opt)
<< "Spilling back to a node manager, but no GCS info found for node "
<< node_id;
work.first(nullptr, node_id, node_info_opt->node_manager_address(),
node_info_opt->node_manager_port());
}
}
}
DispatchScheduledTasksToWorkers();
}

void NodeManager::WaitForTaskArgsRequests(std::pair<ScheduleFn, Task> &work) {
void NodeManager::ScheduleAndDispatch() {
RAY_CHECK(new_scheduler_enabled_);
const Task &task = work.second;
const auto &object_refs = task.GetDependencies();

if (object_refs.size() > 0) {
bool args_ready = task_dependency_manager_.SubscribeGetDependencies(
task.GetTaskSpecification().TaskId(), object_refs);
if (args_ready) {
task_dependency_manager_.UnsubscribeGetDependencies(
task.GetTaskSpecification().TaskId());
tasks_to_dispatch_.push_back(work);
} else {
waiting_tasks_[task.GetTaskSpecification().TaskId()] = work;
}
} else {
tasks_to_dispatch_.push_back(work);
}
cluster_task_manager_->SchedulePendingTasks();
cluster_task_manager_->DispatchScheduledTasksToWorkers(worker_pool_, leased_workers_);
}

void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest &request,
@@ -1811,72 +1721,8 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest

if (new_scheduler_enabled_) {
auto task_spec = task.GetTaskSpecification();
auto work = std::make_pair(
[this, task_spec, reply, send_reply_callback](std::shared_ptr<Worker> worker,
ClientID spillback_to,
std::string address, int port) {
if (worker != nullptr) {
reply->mutable_worker_address()->set_ip_address(worker->IpAddress());
reply->mutable_worker_address()->set_port(worker->Port());
reply->mutable_worker_address()->set_worker_id(worker->WorkerId().Binary());
reply->mutable_worker_address()->set_raylet_id(self_node_id_.Binary());
RAY_CHECK(leased_workers_.find(worker->WorkerId()) == leased_workers_.end());
leased_workers_[worker->WorkerId()] = worker;
std::shared_ptr<TaskResourceInstances> allocated_resources;
if (task_spec.IsActorCreationTask()) {
allocated_resources = worker->GetLifetimeAllocatedInstances();
} else {
allocated_resources = worker->GetAllocatedInstances();
}
auto predefined_resources = allocated_resources->predefined_resources;
::ray::rpc::ResourceMapEntry *resource;
for (size_t res_idx = 0; res_idx < predefined_resources.size(); res_idx++) {
bool first = true; // Set resource name only if at least one of its
// instances has available capacity.
for (size_t inst_idx = 0; inst_idx < predefined_resources[res_idx].size();
inst_idx++) {
if (predefined_resources[res_idx][inst_idx] > 0.) {
if (first) {
resource = reply->add_resource_mapping();
resource->set_name(
new_resource_scheduler_->GetResourceNameFromIndex(res_idx));
first = false;
}
auto rid = resource->add_resource_ids();
rid->set_index(inst_idx);
rid->set_quantity(predefined_resources[res_idx][inst_idx].Double());
}
}
}
auto custom_resources = allocated_resources->custom_resources;
for (auto it = custom_resources.begin(); it != custom_resources.end(); ++it) {
bool first = true; // Set resource name only if at least one of its
// instances has available capacity.
for (size_t inst_idx = 0; inst_idx < it->second.size(); inst_idx++) {
if (it->second[inst_idx] > 0.) {
if (first) {
resource = reply->add_resource_mapping();
resource->set_name(
new_resource_scheduler_->GetResourceNameFromIndex(it->first));
first = false;
}
auto rid = resource->add_resource_ids();
rid->set_index(inst_idx);
rid->set_quantity(it->second[inst_idx].Double());
}
}
}
} else {
reply->mutable_retry_at_raylet_address()->set_ip_address(address);
reply->mutable_retry_at_raylet_address()->set_port(port);
reply->mutable_retry_at_raylet_address()->set_raylet_id(
spillback_to.Binary());
}
send_reply_callback(Status::OK(), nullptr, nullptr);
},
task);
tasks_to_schedule_.push_back(work);
NewSchedulerSchedulePendingTasks();
cluster_task_manager_->QueueTask(task, reply, send_reply_callback);
ScheduleAndDispatch();
return;
}

@@ -2453,7 +2299,7 @@ void NodeManager::HandleDirectCallTaskBlocked(const std::shared_ptr<Worker> &wor
worker->SetBorrowedCPUInstances(borrowed_cpu_instances);
worker->MarkBlocked();
}
NewSchedulerSchedulePendingTasks();
ScheduleAndDispatch();
return;
}

@@ -2481,7 +2327,7 @@ void NodeManager::HandleDirectCallTaskUnblocked(const std::shared_ptr<Worker> &w
new_resource_scheduler_->AddCPUResourceInstances(worker->GetBorrowedCPUInstances());
worker->MarkUnblocked();
}
NewSchedulerSchedulePendingTasks();
ScheduleAndDispatch();
return;
}

@@ -3162,15 +3008,8 @@ void NodeManager::HandleObjectLocal(const ObjectID &object_id) {
<< " tasks ready";
// Transition the tasks whose dependencies are now fulfilled to the ready state.
if (new_scheduler_enabled_) {
for (auto task_id : ready_task_ids) {
auto it = waiting_tasks_.find(task_id);
if (it != waiting_tasks_.end()) {
task_dependency_manager_.UnsubscribeGetDependencies(task_id);
tasks_to_dispatch_.push_back(it->second);
waiting_tasks_.erase(it);
}
}
NewSchedulerSchedulePendingTasks();
cluster_task_manager_->TasksUnblocked(ready_task_ids);
ScheduleAndDispatch();
} else {
if (ready_task_ids.size() > 0) {
std::unordered_set<TaskID> ready_task_id_set(ready_task_ids.begin(),
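The DispatchScheduledTasksToWorkers and NewSchedulerSchedulePendingTasks bodies removed above both rely on the same queue-rotation pattern to avoid head-of-line blocking: each call walks the queue exactly once and pushes items it cannot satisfy to the back instead of stalling on them. A minimal, self-contained illustration of that pattern follows; the Work struct and the CPU accounting are invented for the example and are not Ray code.

// Simplified model of the dispatch-queue rotation used by the new scheduler.
#include <deque>
#include <functional>
#include <iostream>

// Stand-in for the (callback, task) pairs held in the real dispatch queue.
struct Work {
  int id;
  double required_cpu;
};

// Try each queued item exactly once per call. Items that cannot run yet are
// requeued at the back so they do not block the items behind them.
void DispatchOnce(std::deque<Work> &queue,
                  const std::function<bool(const Work &)> &try_dispatch) {
  for (size_t remaining = queue.size(); remaining > 0; remaining--) {
    Work work = queue.front();
    queue.pop_front();
    if (!try_dispatch(work)) {
      queue.push_back(work);  // Not enough resources right now; retry later.
    }
  }
}

int main() {
  std::deque<Work> queue = {{1, 8.0}, {2, 1.0}, {3, 1.0}};
  double available_cpu = 2.0;
  DispatchOnce(queue, [&](const Work &w) {
    if (w.required_cpu > available_cpu) return false;
    available_cpu -= w.required_cpu;
    std::cout << "dispatched task " << w.id << std::endl;
    return true;
  });
  // Task 1 (needs 8 CPUs) is requeued; tasks 2 and 3 are dispatched.
  return 0;
}

(The real dispatch loop additionally returns early and requeues at the front of the queue when no worker is available, as the removed code above shows.)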
29 changes: 10 additions & 19 deletions src/ray/raylet/node_manager.h
@@ -25,11 +25,12 @@
#include "ray/common/client_connection.h"
#include "ray/common/task/task_common.h"
#include "ray/common/task/scheduling_resources.h"
#include "ray/common/scheduling/scheduling_ids.h"
#include "ray/common/scheduling/cluster_resource_scheduler.h"
#include "ray/object_manager/object_manager.h"
#include "ray/raylet/actor_registration.h"
#include "ray/raylet/lineage_cache.h"
#include "ray/raylet/scheduling/scheduling_ids.h"
#include "ray/raylet/scheduling/cluster_resource_scheduler.h"
#include "ray/raylet/scheduling/cluster_task_manager.h"
#include "ray/raylet/scheduling_policy.h"
#include "ray/raylet/scheduling_queue.h"
#include "ray/raylet/reconstruction_policy.h"
@@ -664,7 +665,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// in the system (local or remote) that has enough resources available to
/// run the task, if any such node exist.
/// Repeat the process as long as we can schedule a task.
void NewSchedulerSchedulePendingTasks();
/// NEW SCHEDULER_FUNCTION
void ScheduleAndDispatch();

/// Whether a task is an actor creation task.
bool IsActorCreationTask(const TaskID &task_id);
@@ -772,20 +774,12 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// on all local workers of this raylet.
bool should_local_gc_ = false;

/// The new resource scheduler for direct task calls.
/// These two classes make up the new scheduler. ClusterResourceScheduler is
/// responsible for maintaining a view of the cluster state w.r.t resource
/// usage. ClusterTaskManager is responsible for queuing, spilling back, and
/// dispatching tasks.
std::shared_ptr<ClusterResourceScheduler> new_resource_scheduler_;

typedef std::function<void(std::shared_ptr<Worker>, ClientID spillback_to,
std::string address, int port)>
ScheduleFn;

/// Queue of lease requests that are waiting for resources to become available.
/// TODO this should be a queue for each SchedulingClass
std::deque<std::pair<ScheduleFn, Task>> tasks_to_schedule_;
/// Queue of lease requests that should be scheduled onto workers.
std::deque<std::pair<ScheduleFn, Task>> tasks_to_dispatch_;
/// Queue tasks waiting for arguments to be transferred locally.
absl::flat_hash_map<TaskID, std::pair<ScheduleFn, Task>> waiting_tasks_;
std::shared_ptr<ClusterTaskManager> cluster_task_manager_;

/// Cache of gRPC clients to workers (not necessarily running on this node).
/// Also includes the number of inflight requests to each worker - when this
@@ -796,9 +790,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler {

absl::flat_hash_map<ObjectID, std::unique_ptr<RayObject>> pinned_objects_;

/// Wait for a task's arguments to become ready.
void WaitForTaskArgsRequests(std::pair<ScheduleFn, Task> &work);

// TODO(swang): Evict entries from these caches.
/// Cache for the WorkerTable in the GCS.
absl::flat_hash_set<WorkerID> failed_workers_cache_;
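The docstring retained above ("pick a node in the system (local or remote) that has enough resources available to run the task ... Repeat the process as long as we can schedule a task") describes the scheduling pass that this commit moves into ClusterTaskManager::SchedulePendingTasks. A simplified, stand-alone sketch of that decision follows; the Node and PendingTask types are invented for the example, and the real code instead consults ClusterResourceScheduler::GetBestSchedulableNode, allocates the chosen node's resources, and looks up spillback addresses through the GCS node table.

// Simplified model of one scheduling pass: schedule locally, spill back to a
// remote node, or keep the task queued until capacity appears somewhere.
#include <deque>
#include <iostream>
#include <string>
#include <vector>

struct PendingTask {
  int id;
  double required_cpu;
};

struct Node {
  std::string id;
  double available_cpu;
};

// Stand-in for GetBestSchedulableNode: the first node with room, or "" if none.
std::string BestSchedulableNode(const std::vector<Node> &nodes,
                                double required_cpu) {
  for (const auto &node : nodes) {
    if (node.available_cpu >= required_cpu) return node.id;
  }
  return "";
}

void SchedulePendingTasks(std::deque<PendingTask> &tasks_to_schedule,
                          const std::vector<Node> &nodes,
                          const std::string &self_id) {
  for (size_t remaining = tasks_to_schedule.size(); remaining > 0; remaining--) {
    PendingTask task = tasks_to_schedule.front();
    tasks_to_schedule.pop_front();
    std::string node_id = BestSchedulableNode(nodes, task.required_cpu);
    if (node_id.empty()) {
      tasks_to_schedule.push_back(task);  // No node has capacity yet.
    } else if (node_id == self_id) {
      std::cout << "task " << task.id << ": schedule locally\n";
    } else {
      std::cout << "task " << task.id << ": spill back to " << node_id << "\n";
    }
  }
}

int main() {
  std::vector<Node> nodes = {{"self", 2.0}, {"remote-1", 16.0}};
  std::deque<PendingTask> queue = {{1, 1.0}, {2, 8.0}, {3, 64.0}};
  SchedulePendingTasks(queue, nodes, "self");
  // task 1 stays local, task 2 spills to remote-1, task 3 remains queued.
  return 0;
}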