Skip to content

Commit

Permalink
[core] Trigger global gc when plasma store is under pressure. (ray-pr…
Browse files Browse the repository at this point in the history
  • Loading branch information
fishbone committed May 27, 2021
1 parent 881e491 commit 5d0b302
Show file tree
Hide file tree
Showing 10 changed files with 188 additions and 17 deletions.
11 changes: 11 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,17 @@ cc_test(
],
)

# Unit test target built from src/ray/util/throttler_test.cc.
cc_test(
name = "throttler_test",
srcs = ["src/ray/util/throttler_test.cc"],
copts = COPTS,
deps = [
":ray_util",
"@com_google_absl//absl/time",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "sample_test",
srcs = ["src/ray/util/sample_test.cc"],
Expand Down
46 changes: 46 additions & 0 deletions python/ray/tests/test_basic_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,52 @@
logger = logging.getLogger(__name__)


def test_auto_global_gc(shutdown_only):
    """Check that a garbage collection runs in an actor once plasma fills up.

    An actor with Python's automatic GC disabled repeatedly creates reference
    cycles that each pin 20MB of objects in a 100MB object store. The test
    asserts that no collection has been observed in the actor after 60MB is
    pinned, and that one has been observed after 80MB is pinned.
    """
    # 100MB
    ray.init(num_cpus=1, object_store_memory=100 * 1024 * 1024)

    @ray.remote
    class Test:
        def __init__(self):
            # Stored under a private name so the flag does not share a name
            # with (and, on a plain instance, hide) the `collected` method.
            self._gc_observed = False
            import gc
            # With automatic collection disabled, the cycles built in
            # circular_ref() stay alive until something collects explicitly.
            gc.disable()

            def gc_called(phase, info):
                self._gc_observed = True

            # gc.callbacks entries are invoked around every collection.
            gc.callbacks.append(gc_called)

        def circular_ref(self):
            """Put two 10MB buffers in the store, held by a list cycle."""
            # 20MB
            buf1 = b"0" * (10 * 1024 * 1024)
            buf2 = b"1" * (10 * 1024 * 1024)
            ref1 = ray.put(buf1)
            ref2 = ray.put(buf2)
            b = []
            a = []
            b.append(a)
            a.append(b)
            b.append(ref1)
            a.append(ref2)
            return a

        def collected(self):
            """Return True once a collection has been observed in this actor."""
            return self._gc_observed

    test = Test.remote()
    # 60MB
    for _ in range(3):
        ray.get(test.circular_ref.remote())
    time.sleep(2)
    assert not ray.get(test.collected.remote())
    # 80MB
    ray.get(test.circular_ref.remote())
    time.sleep(2)
    assert ray.get(test.collected.remote())


def test_many_fractional_resources(shutdown_only):
ray.init(num_cpus=2, num_gpus=2, resources={"Custom": 2})

Expand Down
6 changes: 4 additions & 2 deletions python/ray/tests/test_global_gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class LargeObjectWithCyclicRef:
def __init__(self):
self.loop = self
self.large_object = ray.put(
np.zeros(40 * 1024 * 1024, dtype=np.uint8))
np.zeros(20 * 1024 * 1024, dtype=np.uint8))

@ray.remote(num_cpus=1)
class GarbageHolder:
Expand All @@ -119,15 +119,17 @@ def has_garbage(self):
return self.garbage() is not None

def return_large_array(self):
return np.zeros(80 * 1024 * 1024, dtype=np.uint8)
return np.zeros(60 * 1024 * 1024, dtype=np.uint8)

try:
gc.disable()

# Local driver.
# 20MB
local_ref = weakref.ref(LargeObjectWithCyclicRef())

# Remote workers.
# 20MB * 2
actors = [GarbageHolder.remote() for _ in range(2)]
assert local_ref() is not None
assert all(ray.get([a.has_garbage.remote() for a in actors]))
Expand Down
6 changes: 6 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,18 @@ RAY_CONFIG(uint32_t, object_store_full_delay_ms, 10)
/// The amount of time to wait between logging plasma space usage debug messages.
RAY_CONFIG(uint64_t, object_store_usage_log_interval_s, 10 * 60)

/// Fraction of object store memory in use (0.0-1.0) above which the raylet triggers a global GC.
RAY_CONFIG(double, high_plasma_storage_usage, 0.7)

/// The amount of time between automatic local Python GC triggers.
RAY_CONFIG(uint64_t, local_gc_interval_s, 10 * 60)

/// The min amount of time between local GCs (whether auto or mem pressure triggered).
RAY_CONFIG(uint64_t, local_gc_min_interval_s, 10)

/// The min amount of time between triggering global_gc in raylet
RAY_CONFIG(uint64_t, global_gc_min_interval_s, 30)

/// Duration to wait between retries for failed tasks.
RAY_CONFIG(uint32_t, task_retry_delay_ms, 5000)

Expand Down
4 changes: 4 additions & 0 deletions src/ray/object_manager/object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,10 @@ class ObjectManager : public ObjectManagerInterface,

int64_t GetMemoryCapacity() const { return config_.object_store_memory; }

/// Ratio of used object store memory to the configured capacity.
///
/// Despite the name, the result is not scaled by 100: it is `used_memory_`
/// divided by `config_.object_store_memory`.
///
/// \return Used memory divided by capacity, as a double.
double GetUsedMemoryPercentage() const {
  const double capacity = static_cast<double>(config_.object_store_memory);
  const double used = static_cast<double>(used_memory_);
  return used / capacity;
}

private:
friend class TestObjectManager;

Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ int main(int argc, char *argv[]) {
RayConfig::instance().object_manager_pull_timeout_ms();
object_manager_config.push_timeout_ms =
RayConfig::instance().object_manager_push_timeout_ms();
if (object_store_memory < 0) {
if (object_store_memory <= 0) {
RAY_LOG(FATAL) << "Object store memory should be set.";
}
object_manager_config.object_store_memory = object_store_memory;
Expand Down
23 changes: 16 additions & 7 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,11 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
std::make_shared<pubsub::Subscriber>(self_node_id_, config.node_manager_address,
config.node_manager_port,
worker_rpc_pool_)),
last_local_gc_ns_(absl::GetCurrentTimeNanos()),
high_plasma_storage_usage_(RayConfig::instance().high_plasma_storage_usage()),
local_gc_run_time_ns_(absl::GetCurrentTimeNanos()),
local_gc_throttler_(RayConfig::instance().local_gc_min_interval_s() * 1e9),
global_gc_throttler_(RayConfig::instance().global_gc_min_interval_s() * 1e9),
local_gc_interval_ns_(RayConfig::instance().local_gc_interval_s() * 1e9),
local_gc_min_interval_ns_(RayConfig::instance().local_gc_min_interval_s() * 1e9),
record_metrics_period_ms_(config.record_metrics_period_ms),
runtime_env_manager_([this](const std::string &uri, std::function<void(bool)> cb) {
return DeleteLocalURI(uri, cb);
Expand Down Expand Up @@ -519,20 +521,27 @@ void NodeManager::FillResourceReport(rpc::ResourcesData &resources_data) {
cluster_resource_scheduler_->FillResourceUsage(resources_data);
cluster_task_manager_->FillResourceUsage(resources_data);

// If plasma store is under high pressure, we should try to schedule a global gc.
bool plasma_high_pressure =
object_manager_.GetUsedMemoryPercentage() > high_plasma_storage_usage_;
if (plasma_high_pressure && global_gc_throttler_.AbleToRun()) {
TriggerGlobalGC();
}

// Set the global gc bit on the outgoing heartbeat message.
if (should_global_gc_) {
resources_data.set_should_global_gc(true);
should_global_gc_ = false;
global_gc_throttler_.RunNow();
}

// Trigger local GC if needed. This throttles the frequency of local GC calls
// to at most once per heartbeat interval.
auto now = absl::GetCurrentTimeNanos();
if ((should_local_gc_ || now - last_local_gc_ns_ > local_gc_interval_ns_) &&
now - last_local_gc_ns_ > local_gc_min_interval_ns_) {
if ((should_local_gc_ ||
(absl::GetCurrentTimeNanos() - local_gc_run_time_ns_ > local_gc_interval_ns_)) &&
local_gc_throttler_.AbleToRun()) {
DoLocalGC();
should_local_gc_ = false;
last_local_gc_ns_ = now;
}
}

Expand Down Expand Up @@ -579,6 +588,7 @@ void NodeManager::DoLocalGC() {
}
});
}
local_gc_run_time_ns_ = absl::GetCurrentTimeNanos();
}

void NodeManager::HandleRequestObjectSpillage(
Expand Down Expand Up @@ -1489,7 +1499,6 @@ void NodeManager::HandleUpdateResourceUsage(
void NodeManager::HandleRequestResourceReport(
const rpc::RequestResourceReportRequest &request,
rpc::RequestResourceReportReply *reply, rpc::SendReplyCallback send_reply_callback) {
// RAY_LOG(ERROR) << "Resource report requested";
auto resources_data = reply->mutable_resources();
FillResourceReport(*resources_data);

Expand Down
20 changes: 13 additions & 7 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "ray/raylet/worker_pool.h"
#include "ray/rpc/worker/core_worker_client_pool.h"
#include "ray/util/ordered_set.h"
#include "ray/util/throttler.h"
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/bundle_spec.h"
#include "ray/raylet/placement_group_resource_manager.h"
Expand Down Expand Up @@ -699,15 +700,20 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// on all local workers of this raylet.
bool should_local_gc_ = false;

/// The last time local gc was run.
int64_t last_local_gc_ns_ = 0;
/// When plasma storage usage is high, we'll run gc to reduce it.
double high_plasma_storage_usage_ = 1.0;

/// The interval in nanoseconds between local GC automatic triggers.
const int64_t local_gc_interval_ns_;
/// The timestamp, in nanoseconds, at which local GC last ran.
uint64_t local_gc_run_time_ns_;

/// The min interval in nanoseconds between local GC runs (auto + memory pressure
/// triggered).
const int64_t local_gc_min_interval_ns_;
/// Throttler for local gc
Throttler local_gc_throttler_;

/// Throttler for global gc
Throttler global_gc_throttler_;

/// The interval in nanoseconds between automatic local GC triggers.
const uint64_t local_gc_interval_ns_;

/// These two classes make up the new scheduler. ClusterResourceScheduler is
/// responsible for maintaining a view of the cluster state w.r.t resource
Expand Down
52 changes: 52 additions & 0 deletions src/ray/util/throttler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <cstdint>
#include <functional>
#include <iostream>
#include <utility>

#include "absl/time/clock.h"
namespace ray {

/// Limits how often an action may run to at most once per fixed interval.
///
/// The throttler only keeps timestamps; the caller performs the action. It
/// does no locking of its own.
class Throttler {
 public:
  /// \param interval_ns Minimum time, in nanoseconds, between two permitted
  /// runs. A value <= 0 disables throttling (every query is permitted).
  /// \param now Optional clock returning the current time in nanoseconds.
  /// When empty, absl::GetCurrentTimeNanos() is used. Mainly useful for
  /// injecting a fake clock in tests.
  explicit Throttler(int64_t interval_ns, std::function<int64_t()> now = nullptr)
      : last_run_ns_(0), interval_ns_(interval_ns), now_(std::move(now)) {}

  /// Ask for permission to run.
  ///
  /// Note that a successful query has a side effect: the current time is
  /// recorded as the last run, so the next permission is one interval away.
  ///
  /// \return true if at least `interval_ns` has passed since the last
  /// recorded run (or the clock moved backwards), false otherwise.
  bool AbleToRun() {
    const int64_t now = Now();
    if (!HasIntervalElapsed(now)) {
      return false;
    }
    last_run_ns_ = now;
    return true;
  }

  /// Record the current time as the last run without checking the interval.
  void RunNow() { last_run_ns_ = Now(); }

 private:
  /// Current time in nanoseconds, from the injected clock if one was given.
  int64_t Now() const { return now_ ? now_() : absl::GetCurrentTimeNanos(); }

  /// Whether a run is permitted at time `now`.
  bool HasIntervalElapsed(int64_t now) const {
    // A clock that stepped backwards would otherwise block runs until it
    // caught up with the stored timestamp; permit the run and let the
    // caller re-anchor to the new time instead.
    if (now < last_run_ns_ || interval_ns_ <= 0) {
      return true;
    }
    // now >= last_run_ns_ here, so the unsigned difference is the exact
    // elapsed time and cannot overflow the way a signed subtraction could.
    const uint64_t elapsed_ns =
        static_cast<uint64_t>(now) - static_cast<uint64_t>(last_run_ns_);
    return elapsed_ns >= static_cast<uint64_t>(interval_ns_);
  }

  /// Timestamp (ns) of the last permitted or forced run; 0 before any run.
  int64_t last_run_ns_;
  /// Minimum spacing (ns) between permitted runs.
  int64_t interval_ns_;
  /// Clock override; empty means use absl::GetCurrentTimeNanos().
  std::function<int64_t()> now_;
};

} // namespace ray
35 changes: 35 additions & 0 deletions src/ray/util/throttler_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "gtest/gtest.h"

#include <cstdlib>
#include <thread>

#include "ray/util/throttler.h"

// Drives a Throttler with a hand-advanced fake clock and checks when it
// grants permission to run. The interval is 5 time units.
TEST(ThrottlerTest, BasicTest) {
  int64_t fake_time = 100;
  const auto read_clock = [&fake_time]() -> int64_t { return fake_time; };
  ray::Throttler throttler(5, read_clock);

  // t=100: nothing has run yet, so the first query is granted.
  EXPECT_TRUE(throttler.AbleToRun());

  // t=105: exactly one interval after the last grant; the bound is inclusive.
  fake_time = 105;
  EXPECT_TRUE(throttler.AbleToRun());

  // t=106: only 1 unit after the grant at t=105.
  fake_time = 106;
  EXPECT_FALSE(throttler.AbleToRun());

  // t=110: RunNow() re-anchors the last run to now, so a query at the same
  // instant is refused.
  fake_time = 110;
  throttler.RunNow();
  EXPECT_FALSE(throttler.AbleToRun());

  // t=115: one full interval after the forced run.
  fake_time = 115;
  EXPECT_TRUE(throttler.AbleToRun());
}

0 comments on commit 5d0b302

Please sign in to comment.