
Commit

Pipelining task submission to workers (ray-project#9363)
* first step of pipelining

* pipelining tests & default configs
- added pipelining unit tests in direct_task_transport_test.cc
- added an entry in ray_config_def.h, ray_config.pxi, and ray_config.pxd for the parameter that controls the maximum number of tasks that can be in flight to each worker
- consolidated the lease client and lease expiration metadata in direct_task_transport.h (previously held as a std::pair in the worker_to_lease_client_ hash map) into a single map called worker_to_lease_entry_

* post-review revisions

* linting, following naming/style conventions

* linting
Gabriele Oliaro committed Jul 17, 2020
1 parent b351d13 commit 026c009
Showing 7 changed files with 214 additions and 19 deletions.
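
Before the per-file diffs, here is a minimal self-contained C++ sketch of the behavior this change enables (illustrative only, not part of the commit): once max_tasks_in_flight_per_worker is raised above 1, an idle-worker callback drains up to that many queued tasks instead of exactly one. The LeaseEntry counter and the loop mirror the direct_task_transport changes below; the Task type, the queue, and the printing stand-in for PushNormalTask are simplifying assumptions.

#include <cstdint>
#include <deque>
#include <iostream>

// Illustrative stand-ins for the real task and queue types.
struct Task {
  int id;
};
using TaskQueue = std::deque<Task>;

// Mirrors the LeaseEntry bookkeeping added in direct_task_transport.h
// (the lease client pointer is omitted for brevity).
struct LeaseEntry {
  int64_t lease_expiration_time = 0;
  uint32_t tasks_in_flight = 0;
};

// Push up to max_tasks_in_flight_per_worker queued tasks to an idle worker,
// as the reworked OnWorkerIdle now does, instead of exactly one per callback.
void OnWorkerIdleSketch(TaskQueue &queue, LeaseEntry &lease,
                        uint32_t max_tasks_in_flight_per_worker) {
  while (!queue.empty() && lease.tasks_in_flight < max_tasks_in_flight_per_worker) {
    Task task = queue.front();
    queue.pop_front();
    lease.tasks_in_flight++;  // Decremented again when the push reply arrives.
    std::cout << "pushing task " << task.id << "\n";  // Stand-in for PushNormalTask.
  }
}

int main() {
  TaskQueue queue;
  for (int i = 1; i <= 20; i++) queue.push_back({i});
  LeaseEntry lease;
  OnWorkerIdleSketch(queue, lease, /*max_tasks_in_flight_per_worker=*/10);
  // With a limit of 10, the first idle callback drains 10 of the 20 tasks;
  // the remaining 10 wait for a second worker or for replies to free up slots.
  std::cout << lease.tasks_in_flight << " in flight, " << queue.size() << " still queued\n";
  return 0;
}
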
2 changes: 2 additions & 0 deletions python/ray/includes/ray_config.pxd
@@ -90,3 +90,5 @@ cdef extern from "ray/common/ray_config.h" nogil:
c_bool gcs_actor_service_enabled() const

c_bool put_small_object_in_memory_store() const

uint32_t max_tasks_in_flight_per_worker() const
4 changes: 4 additions & 0 deletions python/ray/includes/ray_config.pxi
@@ -161,3 +161,7 @@ cdef class Config:
@staticmethod
def put_small_object_in_memory_store():
return RayConfig.instance().put_small_object_in_memory_store()

@staticmethod
def max_tasks_in_flight_per_worker():
return RayConfig.instance().max_tasks_in_flight_per_worker()
5 changes: 5 additions & 0 deletions src/ray/common/ray_config_def.h
@@ -324,3 +324,8 @@ RAY_CONFIG(int64_t, enable_metrics_collection, true)

/// Whether start the Plasma Store as a Raylet thread.
RAY_CONFIG(bool, put_small_object_in_memory_store, false)

/// Maximum number of tasks that can be in flight between an owner and a worker for which
/// the owner has been granted a lease. A value >1 is used when we want to enable
/// pipelining task submission.
RAY_CONFIG(uint32_t, max_tasks_in_flight_per_worker, 1)
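
Conceptually, each RAY_CONFIG(type, name, default) entry exposes a getter on the process-wide config singleton with that default, which is why the new option can be read as RayConfig::instance().max_tasks_in_flight_per_worker() in the core_worker.cc change below. A tiny self-contained sketch of the pattern (not Ray's actual implementation, which also supports overriding values at startup):

#include <cstdint>
#include <iostream>

// Minimal illustration of the singleton-getter pattern the RAY_CONFIG entry maps to.
class ConfigSketch {
 public:
  static ConfigSketch &instance() {
    static ConfigSketch config;
    return config;
  }
  // Default of 1 keeps pipelining disabled unless the value is explicitly raised.
  uint32_t max_tasks_in_flight_per_worker() const { return max_tasks_in_flight_per_worker_; }

 private:
  uint32_t max_tasks_in_flight_per_worker_ = 1;
};

int main() {
  std::cout << ConfigSketch::instance().max_tasks_in_flight_per_worker() << "\n";  // prints 1
  return 0;
}
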
1 change: 1 addition & 0 deletions src/ray/core_worker/core_worker.cc
@@ -447,6 +447,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
rpc_address_, local_raylet_client_, client_factory, raylet_client_factory,
memory_store_, task_manager_, local_raylet_id,
RayConfig::instance().worker_lease_timeout_milliseconds(),
RayConfig::instance().max_tasks_in_flight_per_worker(),
std::move(actor_create_callback), boost::asio::steady_timer(io_service_)));
future_resolver_.reset(new FutureResolver(memory_store_, client_factory, rpc_address_));
// Unfortunately the raylet client has to be constructed after the receivers.
141 changes: 141 additions & 0 deletions src/ray/core_worker/test/direct_task_transport_test.cc
@@ -1037,6 +1037,147 @@ TEST(DirectTaskTransportTest, TestKillResolvingTask) {
ASSERT_EQ(task_finisher->num_tasks_failed, 1);
}

TEST(DirectTaskTransportTest, TestPipeliningConcurrentWorkerLeases) {
rpc::Address address;
auto raylet_client = std::make_shared<MockRayletClient>();
auto worker_client = std::make_shared<MockWorkerClient>();
auto store = std::make_shared<CoreWorkerMemoryStore>();
auto factory = [&](const rpc::Address &addr) { return worker_client; };
auto task_finisher = std::make_shared<MockTaskFinisher>();

// Set max_tasks_in_flight_per_worker to a value larger than 1 to enable the pipelining
// of task submissions. This is done by passing a max_tasks_in_flight_per_worker
// parameter to the CoreWorkerDirectTaskSubmitter.
uint32_t max_tasks_in_flight_per_worker = 10;
CoreWorkerDirectTaskSubmitter submitter(address, raylet_client, factory, nullptr, store,
task_finisher, ClientID::Nil(), kLongTimeout,
max_tasks_in_flight_per_worker);

// Prepare 20 tasks and save them in a vector.
std::unordered_map<std::string, double> empty_resources;
ray::FunctionDescriptor empty_descriptor =
ray::FunctionDescriptorBuilder::BuildPython("", "", "", "");
std::vector<TaskSpecification> tasks;
for (int i = 1; i <= 20; i++) {
tasks.push_back(BuildTaskSpec(empty_resources, empty_descriptor));
}
ASSERT_EQ(tasks.size(), 20);

// Submit the 20 tasks and check that one worker is requested.
for (auto task : tasks) {
ASSERT_TRUE(submitter.SubmitTask(task).ok());
}
ASSERT_EQ(raylet_client->num_workers_requested, 1);

// First 10 tasks are pushed; worker 2 is requested.
ASSERT_TRUE(raylet_client->GrantWorkerLease("localhost", 1000, ClientID::Nil()));
ASSERT_EQ(worker_client->callbacks.size(), 10);
ASSERT_EQ(raylet_client->num_workers_requested, 2);

// Last 10 tasks are pushed; no more workers are requested.
ASSERT_TRUE(raylet_client->GrantWorkerLease("localhost", 1001, ClientID::Nil()));
ASSERT_EQ(worker_client->callbacks.size(), 20);
ASSERT_EQ(raylet_client->num_workers_requested, 2);

for (int i = 1; i <= 20; i++) {
ASSERT_FALSE(worker_client->callbacks.empty());
ASSERT_TRUE(worker_client->ReplyPushTask());
// No worker should be returned until all the tasks that were submitted to it have
// been completed. In our case, the first worker should only be returned after the
// 10th task has been executed. The second worker should only be returned at the end,
// or after the 20th task has been executed.
if (i < 10) {
ASSERT_EQ(raylet_client->num_workers_returned, 0);
} else if (i >= 10 && i < 20) {
ASSERT_EQ(raylet_client->num_workers_returned, 1);
} else if (i == 20) {
ASSERT_EQ(raylet_client->num_workers_returned, 2);
}
}

ASSERT_EQ(raylet_client->num_workers_requested, 2);
ASSERT_EQ(raylet_client->num_workers_returned, 2);
ASSERT_EQ(raylet_client->num_workers_disconnected, 0);
ASSERT_EQ(task_finisher->num_tasks_complete, 20);
ASSERT_EQ(task_finisher->num_tasks_failed, 0);
ASSERT_EQ(raylet_client->num_leases_canceled, 0);

ASSERT_FALSE(raylet_client->ReplyCancelWorkerLease());
}

TEST(DirectTaskTransportTest, TestPipeliningReuseWorkerLease) {
rpc::Address address;
auto raylet_client = std::make_shared<MockRayletClient>();
auto worker_client = std::make_shared<MockWorkerClient>();
auto store = std::make_shared<CoreWorkerMemoryStore>();
auto factory = [&](const rpc::Address &addr) { return worker_client; };
auto task_finisher = std::make_shared<MockTaskFinisher>();

// Set max_tasks_in_flight_per_worker to a value larger than 1 to enable the pipelining
// of task submissions. This is done by passing a max_tasks_in_flight_per_worker
// parameter to the CoreWorkerDirectTaskSubmitter.
uint32_t max_tasks_in_flight_per_worker = 10;
CoreWorkerDirectTaskSubmitter submitter(address, raylet_client, factory, nullptr, store,
task_finisher, ClientID::Nil(), kLongTimeout,
max_tasks_in_flight_per_worker);

// Prepare 30 tasks and save them in a vector.
std::unordered_map<std::string, double> empty_resources;
ray::FunctionDescriptor empty_descriptor =
ray::FunctionDescriptorBuilder::BuildPython("", "", "", "");
std::vector<TaskSpecification> tasks;
for (int i = 0; i < 30; i++) {
tasks.push_back(BuildTaskSpec(empty_resources, empty_descriptor));
}
ASSERT_EQ(tasks.size(), 30);

// Submit the 30 tasks and check that one worker is requested.
for (auto task : tasks) {
ASSERT_TRUE(submitter.SubmitTask(task).ok());
}
ASSERT_EQ(raylet_client->num_workers_requested, 1);

// Tasks 1-10 are pushed, and a new worker is requested.
ASSERT_TRUE(raylet_client->GrantWorkerLease("localhost", 1000, ClientID::Nil()));
ASSERT_EQ(worker_client->callbacks.size(), 10);
ASSERT_EQ(raylet_client->num_workers_requested, 2);
// The lease is not cancelled, as there is more work to do
ASSERT_EQ(raylet_client->num_leases_canceled, 0);

// Tasks 1-10 finish; tasks 11-20 are scheduled on the same worker.
for (int i = 1; i <= 10; i++) {
ASSERT_TRUE(worker_client->ReplyPushTask());
}
ASSERT_EQ(worker_client->callbacks.size(), 10);
ASSERT_EQ(raylet_client->num_workers_returned, 0);
ASSERT_EQ(raylet_client->num_leases_canceled, 0);

// Tasks 11-20 finish; tasks 21-30 are scheduled on the same worker.
for (int i = 11; i <= 20; i++) {
ASSERT_TRUE(worker_client->ReplyPushTask());
}
ASSERT_EQ(worker_client->callbacks.size(), 10);
ASSERT_EQ(raylet_client->num_workers_returned, 0);
ASSERT_EQ(raylet_client->num_leases_canceled, 1);
ASSERT_TRUE(raylet_client->ReplyCancelWorkerLease());

// Tasks 21-30 finish, and the worker is finally returned.
for (int i = 21; i <= 30; i++) {
ASSERT_TRUE(worker_client->ReplyPushTask());
}
ASSERT_EQ(raylet_client->num_workers_returned, 1);

// The second lease request is returned immediately.
ASSERT_TRUE(raylet_client->GrantWorkerLease("localhost", 1001, ClientID::Nil()));
ASSERT_EQ(worker_client->callbacks.size(), 0);
ASSERT_EQ(raylet_client->num_workers_returned, 2);
ASSERT_EQ(raylet_client->num_workers_disconnected, 0);
ASSERT_EQ(task_finisher->num_tasks_complete, 30);
ASSERT_EQ(task_finisher->num_tasks_failed, 0);
ASSERT_EQ(raylet_client->num_leases_canceled, 1);
ASSERT_FALSE(raylet_client->ReplyCancelWorkerLease());
}

} // namespace ray

int main(int argc, char **argv) {
49 changes: 35 additions & 14 deletions src/ray/core_worker/transport/direct_task_transport.cc
@@ -86,32 +86,48 @@ void CoreWorkerDirectTaskSubmitter::AddWorkerLeaseClient(
RAY_LOG(INFO) << "Connected to " << addr.ip_address << ":" << addr.port;
}
int64_t expiration = current_time_ms() + lease_timeout_ms_;
worker_to_lease_client_.emplace(addr,
std::make_pair(std::move(lease_client), expiration));
LeaseEntry new_lease_entry = LeaseEntry(std::move(lease_client), expiration, 0);
worker_to_lease_entry_.emplace(addr, new_lease_entry);
}

void CoreWorkerDirectTaskSubmitter::OnWorkerIdle(
const rpc::WorkerAddress &addr, const SchedulingKey &scheduling_key, bool was_error,
const google::protobuf::RepeatedPtrField<rpc::ResourceMapEntry> &assigned_resources) {
auto lease_entry = worker_to_lease_client_[addr];
RAY_CHECK(lease_entry.first);
auto &lease_entry = worker_to_lease_entry_[addr];
if (!lease_entry.lease_client_) {
return;
}
RAY_CHECK(lease_entry.lease_client_);

auto queue_entry = task_queues_.find(scheduling_key);
// Return the worker if there was an error executing the previous task,
// the previous task is an actor creation task,
// there are no more applicable queued tasks, or the lease is expired.
if (was_error || queue_entry == task_queues_.end() ||
current_time_ms() > lease_entry.second) {
auto status = lease_entry.first->ReturnWorker(addr.port, addr.worker_id, was_error);
if (!status.ok()) {
RAY_LOG(ERROR) << "Error returning worker to raylet: " << status.ToString();
current_time_ms() > lease_entry.lease_expiration_time_) {
// Return the worker only if there are no tasks in flight
if (lease_entry.tasks_in_flight_ == 0) {
auto status =
lease_entry.lease_client_->ReturnWorker(addr.port, addr.worker_id, was_error);
if (!status.ok()) {
RAY_LOG(ERROR) << "Error returning worker to raylet: " << status.ToString();
}
worker_to_lease_entry_.erase(addr);
}
worker_to_lease_client_.erase(addr);

} else {
auto &client = *client_cache_[addr];
auto task_spec = queue_entry->second.front();
PushNormalTask(addr, client, scheduling_key, task_spec, assigned_resources);
executing_tasks_.emplace(task_spec.TaskId(), addr);
queue_entry->second.pop_front();

while (!queue_entry->second.empty() &&
lease_entry.tasks_in_flight_ < max_tasks_in_flight_per_worker_) {
auto task_spec = queue_entry->second.front();
lease_entry
.tasks_in_flight_++; // Increment the number of tasks in flight to the worker
executing_tasks_.emplace(task_spec.TaskId(), addr);
PushNormalTask(addr, client, scheduling_key, task_spec, assigned_resources);
queue_entry->second.pop_front();
}

// Delete the queue if it's now empty. Note that the queue cannot already be empty
// because this is the only place tasks are removed from it.
if (queue_entry->second.empty()) {
@@ -269,12 +285,17 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask(
{
absl::MutexLock lock(&mu_);
executing_tasks_.erase(task_id);

// Decrement the number of tasks in flight to the worker
auto &lease_entry = worker_to_lease_entry_[addr];
RAY_CHECK(lease_entry.tasks_in_flight_ > 0);
lease_entry.tasks_in_flight_--;
}
if (reply.worker_exiting()) {
// The worker is draining and will shutdown after it is done. Don't return
// it to the Raylet since that will kill it early.
absl::MutexLock lock(&mu_);
worker_to_lease_client_.erase(addr);
worker_to_lease_entry_.erase(addr);
} else if (!status.ok() || !is_actor_creation) {
// Successful actor creation leases the worker indefinitely from the raylet.
absl::MutexLock lock(&mu_);
31 changes: 26 additions & 5 deletions src/ray/core_worker/transport/direct_task_transport.h
@@ -54,6 +54,8 @@ class CoreWorkerDirectTaskSubmitter {
std::shared_ptr<CoreWorkerMemoryStore> store,
std::shared_ptr<TaskFinisherInterface> task_finisher, ClientID local_raylet_id,
int64_t lease_timeout_ms,
uint32_t max_tasks_in_flight_per_worker =
RayConfig::instance().max_tasks_in_flight_per_worker(),
std::function<Status(const TaskSpecification &, const gcs::StatusCallback &)>
actor_create_callback = nullptr,
absl::optional<boost::asio::steady_timer> cancel_timer = absl::nullopt)
@@ -64,6 +66,7 @@
resolver_(store, task_finisher),
task_finisher_(task_finisher),
lease_timeout_ms_(lease_timeout_ms),
max_tasks_in_flight_per_worker_(max_tasks_in_flight_per_worker),
local_raylet_id_(local_raylet_id),
actor_create_callback_(std::move(actor_create_callback)),
cancel_retry_timer_(std::move(cancel_timer)) {}
@@ -174,11 +177,29 @@
absl::flat_hash_map<rpc::WorkerAddress, std::shared_ptr<rpc::CoreWorkerClientInterface>>
client_cache_ GUARDED_BY(mu_);

/// Map from worker address to the lease client through which it should be
/// returned and its lease expiration time.
absl::flat_hash_map<rpc::WorkerAddress,
std::pair<std::shared_ptr<WorkerLeaseInterface>, int64_t>>
worker_to_lease_client_ GUARDED_BY(mu_);
// max_tasks_in_flight_per_worker_ limits the number of tasks that can be pipelined to a
// worker using a single lease.
const uint32_t max_tasks_in_flight_per_worker_;

/// A LeaseEntry struct is used to condense the metadata about a single executor:
/// (1) The lease client through which the worker should be returned
/// (2) The expiration time of a worker's lease.
/// (3) The number of tasks that are currently in flight to the worker
struct LeaseEntry {
std::shared_ptr<WorkerLeaseInterface> lease_client_;
int64_t lease_expiration_time_;
uint32_t tasks_in_flight_;

LeaseEntry(std::shared_ptr<WorkerLeaseInterface> lease_client = nullptr,
int64_t lease_expiration_time = 0, uint32_t tasks_in_flight = 0)
: lease_client_(lease_client),
lease_expiration_time_(lease_expiration_time),
tasks_in_flight_(tasks_in_flight) {}
};

// Map from worker address to a LeaseEntry struct containing the lease's metadata.
absl::flat_hash_map<rpc::WorkerAddress, LeaseEntry> worker_to_lease_entry_
GUARDED_BY(mu_);

// Keeps track of pending worker lease requests to the raylet.
absl::flat_hash_map<SchedulingKey,
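
To summarize the lifecycle the new map encodes, a small self-contained sketch (illustrative names, not part of the commit): the in-flight counter is incremented when a task is pushed, decremented when the push reply arrives, and the worker can only be returned to the raylet once the counter is back at zero and no applicable work remains.

#include <cassert>
#include <cstdint>

// Mirrors the counter added to LeaseEntry above; the lease client and
// expiration time are omitted for brevity.
struct LeaseEntrySketch {
  uint32_t tasks_in_flight = 0;
};

// Called when PushNormalTask sends a task to the leased worker.
void OnTaskPushed(LeaseEntrySketch &entry) { entry.tasks_in_flight++; }

// Called from the push reply; returns true when the worker may be handed
// back to the raylet (no queued work left and nothing still in flight).
bool OnTaskReply(LeaseEntrySketch &entry, bool no_more_queued_work) {
  assert(entry.tasks_in_flight > 0);
  entry.tasks_in_flight--;
  return no_more_queued_work && entry.tasks_in_flight == 0;
}
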
