From 4014207a59bb7255df177100251f4117f09df1e0 Mon Sep 17 00:00:00 2001 From: Ricky Xu Date: Thu, 6 Oct 2022 17:43:18 -0700 Subject: [PATCH] [core][metric] Adding object_store_memory related metrics [1/n] (#28941) --- python/ray/_private/test_utils.py | 15 ++ python/ray/tests/BUILD | 1 + python/ray/tests/test_metrics_agent.py | 3 + python/ray/tests/test_object_store_metrics.py | 175 +++++++++++++++++ python/ray/tests/test_task_metrics.py | 9 +- .../object_manager/plasma/stats_collector.cc | 18 +- .../object_manager/plasma/stats_collector.h | 2 + src/ray/object_manager/plasma/store.cc | 10 +- src/ray/raylet/local_object_manager.cc | 21 +- src/ray/raylet/local_object_manager.h | 11 +- .../raylet/test/local_object_manager_test.cc | 180 +++++++++++++++--- src/ray/stats/metric_defs.cc | 18 ++ src/ray/stats/metric_defs.h | 4 + src/ray/stats/tag_defs.cc | 2 + src/ray/stats/tag_defs.h | 6 + 15 files changed, 429 insertions(+), 46 deletions(-) create mode 100644 python/ray/tests/test_object_store_metrics.py diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index 3668fa708f459..81e133ba2fea7 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -24,6 +24,7 @@ from ray._private import ( ray_constants, ) +from ray._private.worker import RayContext import yaml from grpc._channel import _InactiveRpcError @@ -836,6 +837,20 @@ def fetch_prometheus_metrics(prom_addresses: List[str]) -> Dict[str, List[Any]]: return samples_by_name +def raw_metrics(info: RayContext) -> Dict[str, List[Any]]: + """Return prometheus metrics from a RayContext + + Args: + info: Ray context returned from ray.init() + + Returns: + Dict from metric name to a list of samples for the metrics + """ + metrics_page = "localhost:{}".format(info.address_info["metrics_export_port"]) + print("Fetch metrics from", metrics_page) + return fetch_prometheus_metrics([metrics_page]) + + def load_test_config(config_file_name): """Loads a config yaml from tests/test_cli_patterns.""" here = os.path.realpath(__file__) diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 300d19f0fb963..e316b59e3ea72 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -118,6 +118,7 @@ py_test_module_list( "test_memory_pressure.py", "test_metrics.py", "test_task_metrics.py", + "test_object_store_metrics.py", "test_multi_node.py", "test_multi_node_2.py", "test_multinode_failures.py", diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py index 1fafe2937b5f2..6812a4d111f0d 100644 --- a/python/ray/tests/test_metrics_agent.py +++ b/python/ray/tests/test_metrics_agent.py @@ -31,9 +31,12 @@ # NOTE: Commented out metrics are not available in this test. # TODO(Clark): Find ways to trigger commented out metrics in cluster setup. _METRICS = [ + # TODO(rickyx): refactoring the below 3 metric seem to be a bit involved + # , e.g. need to see how users currently depend on them. "ray_object_store_available_memory", "ray_object_store_used_memory", "ray_object_store_num_local_objects", + "ray_object_store_memory", "ray_object_manager_num_pull_requests", "ray_object_directory_subscriptions", "ray_object_directory_updates", diff --git a/python/ray/tests/test_object_store_metrics.py b/python/ray/tests/test_object_store_metrics.py new file mode 100644 index 0000000000000..a51b10ec8755a --- /dev/null +++ b/python/ray/tests/test_object_store_metrics.py @@ -0,0 +1,175 @@ +from collections import defaultdict +import pytest +from typing import Dict +import numpy as np + +import ray +from ray._private.test_utils import ( + raw_metrics, + wait_for_condition, +) +from ray._private.worker import RayContext + +KiB = 1 << 10 +MiB = 1 << 20 + +_SYSTEM_CONFIG = { + "automatic_object_spilling_enabled": True, + "max_io_workers": 100, + "min_spilling_size": 1, + "object_spilling_threshold": 0.99, # to prevent premature spilling + "metrics_report_interval_ms": 200, + "event_stats_print_interval_ms": 100, # so metrics get exported +} + + +def objects_by_loc(info: RayContext) -> Dict: + res = raw_metrics(info) + objects_info = defaultdict(int) + if "ray_object_store_memory" in res: + for sample in res["ray_object_store_memory"]: + objects_info[sample.labels["Location"]] += sample.value + + print(f"Objects by location: {objects_info}") + return objects_info + + +def approx_eq_dict_in(actual: Dict, expected: Dict, e: int) -> bool: + """Check if two dict are approximately similar (with error allowed)""" + assert set(actual.keys()) == set(expected.keys()), "Unequal key sets." + + for k, actual_v in actual.items(): + expect_v = expected[k] + assert ( + abs(expect_v - actual_v) <= e + ), f"expect={expect_v}, actual={actual_v}, diff allowed={e}" + + return True + + +def test_all_shared_memory(shutdown_only): + """Test objects allocated in shared memory""" + import numpy as np + + info = ray.init( + object_store_memory=100 * MiB, + _system_config=_SYSTEM_CONFIG, + ) + + # Allocate 80MiB data + objs_in_use = ray.get( + [ray.put(np.zeros(20 * MiB, dtype=np.uint8)) for _ in range(4)] + ) + + expected = { + "IN_MEMORY": 80 * MiB, + "SPILLED": 0, + "UNSEALED": 0, + } + + wait_for_condition( + # 1KiB for metadata difference + lambda: approx_eq_dict_in(objects_by_loc(info), expected, 1 * KiB), + timeout=20, + retry_interval_ms=500, + ) + + # Free all of them + del objs_in_use + + expected = { + "IN_MEMORY": 0, + "SPILLED": 0, + "UNSEALED": 0, + } + + wait_for_condition( + # 1KiB for metadata difference + lambda: approx_eq_dict_in(objects_by_loc(info), expected, 1 * KiB), + timeout=20, + retry_interval_ms=500, + ) + + +def test_spilling(object_spilling_config, shutdown_only): + """Test metrics with object spilling occurred""" + + object_spilling_config, _ = object_spilling_config + delta = 5 + info = ray.init( + num_cpus=1, + object_store_memory=100 * MiB + delta * MiB, + _system_config={ + **_SYSTEM_CONFIG, + **{"object_spilling_config": object_spilling_config}, + }, + ) + + # Create and use 100MiB data, which should fit in memory + objs1 = [ray.put(np.zeros(50 * MiB, dtype=np.uint8)) for _ in range(2)] + + expected = { + "IN_MEMORY": 100 * MiB, + "SPILLED": 0, + "UNSEALED": 0, + } + + wait_for_condition( + # 1KiB for metadata difference + lambda: approx_eq_dict_in(objects_by_loc(info), expected, 1 * KiB), + timeout=20, + retry_interval_ms=500, + ) + + # Create additional 100MiB, so that it needs to be triggered + objs2 = [ray.put(np.zeros(50 * MiB, dtype=np.uint8)) for _ in range(2)] + + expected = { + "IN_MEMORY": 100 * MiB, + "SPILLED": 100 * MiB, + "UNSEALED": 0, + } + wait_for_condition( + # 1KiB for metadata difference + lambda: approx_eq_dict_in(objects_by_loc(info), expected, 1 * KiB), + timeout=20, + retry_interval_ms=500, + ) + + # Delete spilled objects + del objs1 + expected = { + "IN_MEMORY": 100 * MiB, + "SPILLED": 0, + "UNSEALED": 0, + } + wait_for_condition( + # 1KiB for metadata difference + lambda: approx_eq_dict_in(objects_by_loc(info), expected, 1 * KiB), + timeout=20, + retry_interval_ms=500, + ) + + # Delete all + del objs2 + expected = { + "IN_MEMORY": 0, + "SPILLED": 0, + "UNSEALED": 0, + } + wait_for_condition( + # 1KiB for metadata difference + lambda: approx_eq_dict_in(objects_by_loc(info), expected, 1 * KiB), + timeout=20, + retry_interval_ms=500, + ) + + +if __name__ == "__main__": + import sys + import os + + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/tests/test_task_metrics.py b/python/ray/tests/test_task_metrics.py index 166f349ed070c..6169c9035151a 100644 --- a/python/ray/tests/test_task_metrics.py +++ b/python/ray/tests/test_task_metrics.py @@ -7,7 +7,7 @@ import ray from ray._private.test_utils import ( - fetch_prometheus_metrics, + raw_metrics, run_string_as_driver, run_string_as_driver_nonblocking, wait_for_condition, @@ -27,13 +27,6 @@ } -def raw_metrics(info): - metrics_page = "localhost:{}".format(info["metrics_export_port"]) - print("Fetch metrics from", metrics_page) - res = fetch_prometheus_metrics([metrics_page]) - return res - - def tasks_by_state(info) -> dict: return tasks_breakdown(info, lambda s: s.labels["State"]) diff --git a/src/ray/object_manager/plasma/stats_collector.cc b/src/ray/object_manager/plasma/stats_collector.cc index c722adf134328..5372815784ed4 100644 --- a/src/ray/object_manager/plasma/stats_collector.cc +++ b/src/ray/object_manager/plasma/stats_collector.cc @@ -26,6 +26,8 @@ void ObjectStatsCollector::OnObjectCreated(const LocalObject &obj) { const auto kSource = obj.GetSource(); num_bytes_created_total_ += kObjectSize; + // TODO(rickyx): + // Add fallback memory accounting here. if (kSource == plasma::flatbuf::ObjectSource::CreatedByWorker) { num_objects_created_by_worker_++; @@ -168,8 +170,22 @@ void ObjectStatsCollector::OnObjectRefDecreased(const LocalObject &obj) { } } +int64_t ObjectStatsCollector::GetNumBytesCreatedCurrent() const { + return num_bytes_created_by_worker_ + num_bytes_restored_ + num_bytes_received_ + + num_bytes_errored_; +} + void ObjectStatsCollector::RecordMetrics() const { - // TODO(sang): Add metrics. + ray::stats::STATS_object_store_memory.Record( + GetNumBytesCreatedCurrent() - num_bytes_unsealed_, + {{ray::stats::LocationKey.name(), ray::stats::kObjectLocInMemory}}); + + ray::stats::STATS_object_store_memory.Record( + num_bytes_unsealed_, + {{ray::stats::LocationKey.name(), ray::stats::kObjectLocUnsealed}}); + + // TODO(rickyx): + // Add fallback memory recording here. } void ObjectStatsCollector::GetDebugDump(std::stringstream &buffer) const { diff --git a/src/ray/object_manager/plasma/stats_collector.h b/src/ray/object_manager/plasma/stats_collector.h index c1980cbc355de..a28948fc35a9a 100644 --- a/src/ray/object_manager/plasma/stats_collector.h +++ b/src/ray/object_manager/plasma/stats_collector.h @@ -60,6 +60,8 @@ class ObjectStatsCollector { private: friend struct ObjectStatsCollectorTest; + int64_t GetNumBytesCreatedCurrent() const; + int64_t num_objects_spillable_ = 0; int64_t num_bytes_spillable_ = 0; int64_t num_objects_unsealed_ = 0; diff --git a/src/ray/object_manager/plasma/store.cc b/src/ray/object_manager/plasma/store.cc index 932da27b0029d..16271d7ac2ccb 100644 --- a/src/ray/object_manager/plasma/store.cc +++ b/src/ray/object_manager/plasma/store.cc @@ -113,9 +113,8 @@ PlasmaStore::PlasmaStore(instrumented_io_context &main_service, this->AddToClientObjectIds(object_id, request->client); }, [this](const auto &request) { this->ReturnFromGet(request); }) { - const auto event_stats_print_interval_ms = - RayConfig::instance().event_stats_print_interval_ms(); - if (event_stats_print_interval_ms > 0 && RayConfig::instance().event_stats()) { + if (RayConfig::instance().event_stats_print_interval_ms() > 0 && + RayConfig::instance().event_stats()) { PrintAndRecordDebugDump(); } } @@ -553,10 +552,7 @@ void PlasmaStore::PrintAndRecordDebugDump() const { RayConfig::instance().event_stats_print_interval_ms()); } -void PlasmaStore::RecordMetrics() const { - // TODO(sang): Add metrics. - object_lifecycle_mgr_.RecordMetrics(); -} +void PlasmaStore::RecordMetrics() const { object_lifecycle_mgr_.RecordMetrics(); } std::string PlasmaStore::GetDebugDump() const { std::stringstream buffer; diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc index 3bb8c39d5cb8d..aded48bd5baa2 100644 --- a/src/ray/raylet/local_object_manager.cc +++ b/src/ray/raylet/local_object_manager.cc @@ -36,8 +36,8 @@ void LocalObjectManager::PinObjectsAndWaitForFree( continue; } - const auto inserted = - local_objects_.emplace(object_id, LocalObjectInfo(owner_address, generator_id)); + const auto inserted = local_objects_.emplace( + object_id, LocalObjectInfo(owner_address, generator_id, object->GetSize())); if (inserted.second) { // This is the first time we're pinning this object. RAY_LOG(DEBUG) << "Pinning object " << object_id; @@ -217,8 +217,6 @@ bool LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill) { auto now = absl::GetCurrentTimeNanos(); RAY_LOG(DEBUG) << "Spilled " << bytes_to_spill << " bytes in " << (now - start_time) / 1e6 << "ms"; - spilled_bytes_total_ += bytes_to_spill; - spilled_objects_total_ += objects_to_spill.size(); // Adjust throughput timing to account for concurrent spill operations. spill_time_total_s_ += (now - std::max(start_time, last_spill_finish_ns_)) / 1e9; @@ -409,6 +407,11 @@ void LocalObjectManager::OnObjectSpilled(const std::vector &object_ids num_bytes_pending_spill_ -= object_size; objects_pending_spill_.erase(it); + // Update the internal spill metrics + spilled_bytes_total_ += object_size; + spilled_bytes_current_ += object_size; + spilled_objects_total_++; + // Asynchronously Update the spilled URL. auto freed_it = local_objects_.find(object_id); if (freed_it == local_objects_.end() || freed_it->second.is_freed) { @@ -539,6 +542,11 @@ void LocalObjectManager::ProcessSpilledObjectsDeleteQueue(uint32_t max_batch_siz object_urls_to_delete.emplace_back(object_url); } spilled_objects_url_.erase(spilled_objects_url_it); + + // Update current spilled objects metrics + RAY_CHECK(local_objects_.contains(object_id)) + << "local objects should contain the spilled object: " << object_id; + spilled_bytes_current_ -= local_objects_.at(object_id).object_size; } else { // If the object was not spilled, it gets pinned again. Unpin here to // prevent a memory leak. @@ -613,6 +621,10 @@ void LocalObjectManager::RecordMetrics() const { ray::stats::STATS_spill_manager_request_total.Record(spilled_objects_total_, "Spilled"); ray::stats::STATS_spill_manager_request_total.Record(restored_objects_total_, "Restored"); + + ray::stats::STATS_object_store_memory.Record( + spilled_bytes_current_, + {{ray::stats::LocationKey.name(), ray::stats::kObjectLocSpilled}}); } int64_t LocalObjectManager::GetPrimaryBytes() const { @@ -638,6 +650,7 @@ std::string LocalObjectManager::DebugString() const { result << "- num objects pending restore: " << objects_pending_restore_.size() << "\n"; result << "- num objects pending spill: " << objects_pending_spill_.size() << "\n"; result << "- num bytes pending spill: " << num_bytes_pending_spill_ << "\n"; + result << "- num bytes currently spilled: " << spilled_bytes_current_ << "\n"; result << "- cumulative spill requests: " << spilled_objects_total_ << "\n"; result << "- cumulative restore requests: " << restored_objects_total_ << "\n"; result << "- spilled objects pending delete: " << spilled_object_pending_delete_.size() diff --git a/src/ray/raylet/local_object_manager.h b/src/ray/raylet/local_object_manager.h index c1e2c4dfcd3d2..8af249ec4f87d 100644 --- a/src/ray/raylet/local_object_manager.h +++ b/src/ray/raylet/local_object_manager.h @@ -168,13 +168,17 @@ class LocalObjectManager { private: struct LocalObjectInfo { - LocalObjectInfo(const rpc::Address &owner_address, const ObjectID &generator_id) + LocalObjectInfo(const rpc::Address &owner_address, + const ObjectID &generator_id, + size_t object_size) : owner_address(owner_address), generator_id(generator_id.IsNil() ? std::nullopt - : std::optional(generator_id)) {} + : std::optional(generator_id)), + object_size(object_size) {} rpc::Address owner_address; bool is_freed = false; const std::optional generator_id; + size_t object_size; }; FRIEND_TEST(LocalObjectManagerTest, TestSpillObjectsOfSizeZero); @@ -341,6 +345,9 @@ class LocalObjectManager { /// The total wall time in seconds spent in spilling. double spill_time_total_s_ = 0; + /// The total number of bytes spilled currently. + int64_t spilled_bytes_current_ = 0; + /// The total number of bytes spilled. int64_t spilled_bytes_total_ = 0; diff --git a/src/ray/raylet/test/local_object_manager_test.cc b/src/ray/raylet/test/local_object_manager_test.cc index c46a1fe248b43..30f005c55d187 100644 --- a/src/ray/raylet/test/local_object_manager_test.cc +++ b/src/ray/raylet/test/local_object_manager_test.cc @@ -342,6 +342,10 @@ class LocalObjectManagerTestWithMinSpillingSize { int64_t NumBytesPendingSpill() { return manager.num_bytes_pending_spill_; } + int64_t GetCurrentSpilledBytes() { return manager.spilled_bytes_current_; } + + size_t GetCurrentSpilledCount() { return manager.spilled_objects_url_.size(); } + void AssertNoLeaks() { // TODO(swang): Assert this for all tests. ASSERT_TRUE(manager.pinned_objects_size_ == 0); @@ -361,8 +365,22 @@ class LocalObjectManagerTestWithMinSpillingSize { "&offset=" + std::to_string(offset); } + void AssertIOWorkersDoSpill(size_t num_objects, size_t num_batches) { + ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks()); + EXPECT_CALL(worker_pool, PushSpillWorker(_)); + std::vector urls; + for (size_t i = 0; i < num_objects; i++) { + urls.push_back(BuildURL("url" + std::to_string(i))); + } + ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); + for (size_t i = 0; i < num_batches; i++) { + ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); + } + } + instrumented_io_context io_service_; size_t free_objects_batch_size = 3; + size_t object_size = 4; std::shared_ptr subscriber_; std::shared_ptr owner_client; rpc::CoreWorkerClientPool client_pool; @@ -431,7 +449,7 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) { for (size_t i = 0; i < free_objects_batch_size; i++) { ObjectID object_id = ObjectID::FromRandom(); object_ids.push_back(object_id); - auto data_buffer = std::make_shared(0, object_id, unpins); + auto data_buffer = std::make_shared(object_size, object_id, unpins); auto object = std::make_unique( data_buffer, nullptr, std::vector()); objects.push_back(std::move(object)); @@ -448,6 +466,11 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) { urls.push_back(BuildURL("url" + std::to_string(i))); } ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); + + // Spilled + ASSERT_EQ(GetCurrentSpilledCount(), object_ids.size()); + ASSERT_EQ(GetCurrentSpilledBytes(), object_ids.size() * object_size); + // The first update is sent out immediately and the remaining ones are batched // since the first one is still in-flight. for (size_t i = 0; i < 2; i++) { @@ -463,7 +486,6 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) { const auto url = urls[0]; int num_times_fired = 0; EXPECT_CALL(worker_pool, PushRestoreWorker(_)); - int64_t object_size = 100; // Subsequent calls should be deduped, so that only one callback should be fired. for (int i = 0; i < 10; i++) { manager.AsyncRestoreSpilledObject( @@ -489,11 +511,10 @@ TEST_F(LocalObjectManagerTest, TestExplicitSpill) { std::vector> objects; rpc::Address owner_address; owner_address.set_worker_id(WorkerID::FromRandom().Binary()); - for (size_t i = 0; i < free_objects_batch_size; i++) { ObjectID object_id = ObjectID::FromRandom(); object_ids.push_back(object_id); - auto data_buffer = std::make_shared(0, object_id, unpins); + auto data_buffer = std::make_shared(object_size, object_id, unpins); auto object = std::make_unique( data_buffer, nullptr, std::vector()); objects.push_back(std::move(object)); @@ -527,6 +548,9 @@ TEST_F(LocalObjectManagerTest, TestExplicitSpill) { for (const auto &id : object_ids) { ASSERT_EQ((*unpins)[id], 1); } + + ASSERT_EQ(GetCurrentSpilledCount(), object_ids.size()); + ASSERT_EQ(GetCurrentSpilledBytes(), object_ids.size() * object_size); } TEST_F(LocalObjectManagerTest, TestDuplicateSpill) { @@ -539,7 +563,7 @@ TEST_F(LocalObjectManagerTest, TestDuplicateSpill) { for (size_t i = 0; i < free_objects_batch_size; i++) { ObjectID object_id = ObjectID::FromRandom(); object_ids.push_back(object_id); - auto data_buffer = std::make_shared(0, object_id, unpins); + auto data_buffer = std::make_shared(object_size, object_id, unpins); auto object = std::make_unique( data_buffer, nullptr, std::vector()); objects.push_back(std::move(object)); @@ -579,6 +603,10 @@ TEST_F(LocalObjectManagerTest, TestDuplicateSpill) { for (const auto &id : object_ids) { ASSERT_EQ((*unpins)[id], 1); } + + // Only spilled once + ASSERT_EQ(GetCurrentSpilledCount(), object_ids.size()); + ASSERT_EQ(GetCurrentSpilledBytes(), object_ids.size() * object_size); } TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSizeZero) { @@ -609,6 +637,8 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSizeZero) { ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({url})); ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks()); + ASSERT_EQ(GetCurrentSpilledCount(), 1); + ASSERT_EQ(GetCurrentSpilledBytes(), 1 * object_size); } TEST_F(LocalObjectManagerTest, TestSpillUptoMaxFuseCount) { @@ -658,6 +688,8 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxFuseCount) { ASSERT_TRUE(it != urls.end()); ASSERT_EQ((*unpins)[object_url.first], 1); } + ASSERT_EQ(GetCurrentSpilledCount(), max_fused_object_count_); + ASSERT_EQ(GetCurrentSpilledBytes(), max_fused_object_count_ * object_size); } TEST_F(LocalObjectManagerTest, TestSpillObjectNotEvictable) { @@ -685,7 +717,10 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectNotEvictable) { // Now object is evictable. Spill should succeed. unevictable_objects_.erase(object_id); ASSERT_TRUE(manager.SpillObjectsOfSize(1000)); - ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks()); + + AssertIOWorkersDoSpill(/*num_objects*/ 1, /*num_batches*/ 1); + ASSERT_EQ(GetCurrentSpilledCount(), 1); + ASSERT_EQ(GetCurrentSpilledBytes(), 1 * object_size); } TEST_F(LocalObjectManagerTest, TestSpillUptoMaxThroughput) { @@ -733,6 +768,8 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxThroughput) { ASSERT_EQ((*unpins)[object_url.first], 1); } } + ASSERT_EQ(GetCurrentSpilledCount(), 1); + ASSERT_EQ(GetCurrentSpilledBytes(), 1 * object_size); // Now, there's only one object that is current spilling. // SpillObjectUptoMaxThroughput will spill one more object (since one worker is @@ -752,6 +789,9 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxThroughput) { ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({urls[i]})); ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); } + ASSERT_EQ(GetCurrentSpilledCount(), object_ids.size()); + ASSERT_EQ(GetCurrentSpilledBytes(), object_size * object_ids.size()); + ASSERT_EQ(owner_client->object_urls.size(), 3); for (auto &object_url : owner_client->object_urls) { auto it = std::find(urls.begin(), urls.end(), object_url.second); @@ -772,7 +812,7 @@ TEST_F(LocalObjectManagerTest, TestSpillError) { owner_address.set_worker_id(WorkerID::FromRandom().Binary()); ObjectID object_id = ObjectID::FromRandom(); - auto data_buffer = std::make_shared(0, object_id, unpins); + auto data_buffer = std::make_shared(object_size, object_id, unpins); auto object = std::make_unique( std::move(data_buffer), nullptr, std::vector()); @@ -795,6 +835,10 @@ TEST_F(LocalObjectManagerTest, TestSpillError) { ASSERT_EQ(num_times_fired, 1); ASSERT_EQ((*unpins)[object_id], 0); + // Make sure no spilled + ASSERT_EQ(GetCurrentSpilledCount(), 0); + ASSERT_EQ(GetCurrentSpilledBytes(), 0); + // Try to spill the same object again. manager.SpillObjects({object_id}, [&](const Status &status) mutable { ASSERT_TRUE(status.ok()); @@ -808,6 +852,10 @@ TEST_F(LocalObjectManagerTest, TestSpillError) { ASSERT_EQ(owner_client->object_urls[object_id], url); ASSERT_EQ(num_times_fired, 2); ASSERT_EQ((*unpins)[object_id], 1); + + // Now spill happened + ASSERT_EQ(GetCurrentSpilledCount(), 1); + ASSERT_EQ(GetCurrentSpilledBytes(), object_size); } TEST_F(LocalObjectManagerTest, TestPartialSpillError) { @@ -820,7 +868,7 @@ TEST_F(LocalObjectManagerTest, TestPartialSpillError) { for (size_t i = 0; i < free_objects_batch_size; i++) { ObjectID object_id = ObjectID::FromRandom(); object_ids.push_back(object_id); - auto data_buffer = std::make_shared(0, object_id, unpins); + auto data_buffer = std::make_shared(object_size, object_id, unpins); auto object = std::make_unique( data_buffer, nullptr, std::vector()); objects.push_back(std::move(object)); @@ -837,6 +885,10 @@ TEST_F(LocalObjectManagerTest, TestPartialSpillError) { } ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); + // only tracking 2 spilled objected + ASSERT_EQ(GetCurrentSpilledCount(), 2); + ASSERT_EQ(GetCurrentSpilledBytes(), 2 * object_size); + for (size_t i = 0; i < free_objects_batch_size; i++) { if (i < urls.size()) { ASSERT_EQ(urls[i], manager.GetLocalSpilledObjectURL(object_ids[i])); @@ -885,30 +937,34 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpilledObjects) { for (size_t i = 0; i < free_objects_batch_size; i++) { ObjectID object_id = ObjectID::FromRandom(); object_ids.push_back(object_id); + std::string meta = std::to_string(static_cast(rpc::ErrorType::OBJECT_IN_PLASMA)); + auto metadata = const_cast(reinterpret_cast(meta.data())); + auto meta_buffer = std::make_shared(metadata, meta.size()); + auto data_buffer = std::make_shared(0, object_id, unpins); auto object = std::make_unique( - data_buffer, nullptr, std::vector()); + data_buffer, meta_buffer, std::vector()); + objects.push_back(std::move(object)); } - manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); - // 2 Objects are spilled out of 3. + // 2 objects should be spilled out of 3. + int64_t total_spill_size = 0; std::vector object_ids_to_spill; int spilled_urls_size = free_objects_batch_size - 1; for (int i = 0; i < spilled_urls_size; i++) { object_ids_to_spill.push_back(object_ids[i]); + total_spill_size += objects[i]->GetSize(); } + + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); manager.SpillObjects(object_ids_to_spill, [&](const Status &status) mutable { ASSERT_TRUE(status.ok()); }); - ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks()); - std::vector urls; - for (size_t i = 0; i < object_ids_to_spill.size(); i++) { - urls.push_back(BuildURL("url" + std::to_string(i))); - } - ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); - for (size_t i = 0; i < 2; i++) { - ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); - } + + AssertIOWorkersDoSpill(/*num_objects*/ object_ids_to_spill.size(), /*num_batches*/ 2); + + ASSERT_EQ(GetCurrentSpilledCount(), object_ids_to_spill.size()); + ASSERT_EQ(GetCurrentSpilledBytes(), total_spill_size); // All objects are out of scope now. for (size_t i = 0; i < free_objects_batch_size; i++) { @@ -920,6 +976,8 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpilledObjects) { manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30); int deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects(); ASSERT_EQ(deleted_urls_size, object_ids_to_spill.size()); + ASSERT_EQ(GetCurrentSpilledCount(), 0); + ASSERT_EQ(GetCurrentSpilledBytes(), 0); } TEST_F(LocalObjectManagerTest, TestDeleteURLRefCount) { @@ -931,10 +989,11 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCount) { std::vector> objects; // Objects are pinned. + size_t total_spill_size = object_size * free_objects_batch_size; for (size_t i = 0; i < free_objects_batch_size; i++) { ObjectID object_id = ObjectID::FromRandom(); object_ids.push_back(object_id); - auto data_buffer = std::make_shared(0, object_id, unpins); + auto data_buffer = std::make_shared(object_size, object_id, unpins); auto object = std::make_unique( data_buffer, nullptr, std::vector()); objects.push_back(std::move(object)); @@ -963,6 +1022,9 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCount) { ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); } + ASSERT_EQ(GetCurrentSpilledCount(), object_ids_to_spill.size()); + ASSERT_EQ(GetCurrentSpilledBytes(), total_spill_size); + // Everything is evicted except the last object. In this case, ref count is still > 0. for (size_t i = 0; i < free_objects_batch_size - 1; i++) { EXPECT_CALL(*subscriber_, Unsubscribe(_, _, object_ids[i].Binary())); @@ -973,6 +1035,10 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCount) { // Nothing is deleted yet because the ref count is > 0. ASSERT_EQ(deleted_urls_size, 0); + // Only 1 spilled object left + ASSERT_EQ(GetCurrentSpilledCount(), 1); + ASSERT_EQ(GetCurrentSpilledBytes(), 1 * object_size); + // The last reference is deleted. EXPECT_CALL(*subscriber_, Unsubscribe(_, _, object_ids[free_objects_batch_size - 1].Binary())); @@ -981,6 +1047,9 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCount) { deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects(); // Now the object is deleted. ASSERT_EQ(deleted_urls_size, 1); + + ASSERT_EQ(GetCurrentSpilledCount(), 0); + ASSERT_EQ(GetCurrentSpilledBytes(), 0); } TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) { @@ -995,7 +1064,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) { for (size_t i = 0; i < spilled_urls_size; i++) { ObjectID object_id = ObjectID::FromRandom(); object_ids.push_back(object_id); - auto data_buffer = std::make_shared(0, object_id, unpins); + auto data_buffer = std::make_shared(object_size, object_id, unpins); auto object = std::make_unique( data_buffer, nullptr, std::vector()); objects.push_back(std::move(object)); @@ -1007,6 +1076,8 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) { std::vector spill_set_2; size_t spill_set_1_size = spilled_urls_size / 2; size_t spill_set_2_size = spilled_urls_size - spill_set_1_size; + size_t spill_set_1_bytes = spill_set_1_size * object_size; + size_t spill_set_2_bytes = spill_set_2_size * object_size; for (size_t i = 0; i < spill_set_1_size; i++) { spill_set_1.push_back(object_ids[i]); @@ -1033,6 +1104,9 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) { // Spillset 1 objects are spilled. ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls_spill_set_1)); ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); + ASSERT_EQ(GetCurrentSpilledCount(), spill_set_1_size); + ASSERT_EQ(GetCurrentSpilledBytes(), spill_set_1_bytes); + // Every object has gone out of scope. for (size_t i = 0; i < spilled_urls_size; i++) { EXPECT_CALL(*subscriber_, Unsubscribe(_, _, object_ids[i].Binary())); @@ -1050,11 +1124,15 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) { // These fail because the object is already freed, so the raylet does not // send the RPC. ASSERT_FALSE(owner_client->ReplyUpdateObjectLocationBatch()); + ASSERT_EQ(GetCurrentSpilledCount(), spill_set_2_size); + ASSERT_EQ(GetCurrentSpilledBytes(), spill_set_2_bytes); // Every object is now deleted. manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30); deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects(); ASSERT_EQ(deleted_urls_size, spill_set_2_size); + ASSERT_EQ(GetCurrentSpilledCount(), 0); + ASSERT_EQ(GetCurrentSpilledBytes(), 0); } TEST_F(LocalObjectManagerTest, TestDeleteMaxObjects) { @@ -1068,7 +1146,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteMaxObjects) { for (size_t i = 0; i < free_objects_batch_size + 1; i++) { ObjectID object_id = ObjectID::FromRandom(); object_ids.push_back(object_id); - auto data_buffer = std::make_shared(0, object_id, unpins); + auto data_buffer = std::make_shared(object_size, object_id, unpins); auto object = std::make_unique( data_buffer, nullptr, std::vector()); objects.push_back(std::move(object)); @@ -1094,6 +1172,9 @@ TEST_F(LocalObjectManagerTest, TestDeleteMaxObjects) { ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); } + ASSERT_EQ(GetCurrentSpilledCount(), spilled_urls_size); + ASSERT_EQ(GetCurrentSpilledBytes(), object_size * spilled_urls_size); + // Every reference has gone out of scope. for (size_t i = 0; i < free_objects_batch_size; i++) { EXPECT_CALL(*subscriber_, Unsubscribe(_, _, object_ids[i].Binary())); @@ -1120,7 +1201,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCountRaceCondition) { for (size_t i = 0; i < free_objects_batch_size; i++) { ObjectID object_id = ObjectID::FromRandom(); object_ids.push_back(object_id); - auto data_buffer = std::make_shared(0, object_id, unpins); + auto data_buffer = std::make_shared(object_size, object_id, unpins); auto object = std::make_unique( data_buffer, nullptr, std::vector()); objects.push_back(std::move(object)); @@ -1145,6 +1226,9 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCountRaceCondition) { } ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); + ASSERT_EQ(GetCurrentSpilledCount(), object_ids_to_spill.size()); + ASSERT_EQ(GetCurrentSpilledBytes(), object_size * object_ids_to_spill.size()); + EXPECT_CALL(*subscriber_, Unsubscribe(_, _, object_ids[0].Binary())); ASSERT_TRUE(subscriber_->PublishObjectEviction()); // Delete operation is called. In this case, the file with the url should not be @@ -1153,6 +1237,10 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCountRaceCondition) { int deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects(); ASSERT_EQ(deleted_urls_size, 0); + // But 1 spilled object shoudl be deleted + ASSERT_EQ(GetCurrentSpilledCount(), free_objects_batch_size - 1); + ASSERT_EQ(GetCurrentSpilledBytes(), object_size * (free_objects_batch_size - 1)); + // Everything else is now deleted. for (size_t i = 1; i < free_objects_batch_size; i++) { EXPECT_CALL(*subscriber_, Unsubscribe(_, _, object_ids[i].Binary())); @@ -1162,6 +1250,9 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCountRaceCondition) { deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects(); // Nothing is deleted yet because the ref count is > 0. ASSERT_EQ(deleted_urls_size, 1); + + ASSERT_EQ(GetCurrentSpilledCount(), 0); + ASSERT_EQ(GetCurrentSpilledBytes(), 0); } TEST_F(LocalObjectManagerTest, TestDuplicatePin) { @@ -1331,6 +1422,11 @@ TEST_F(LocalObjectManagerFusedTest, TestMinSpillingSize) { for (size_t i = 0; i < 2; i++) { ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); } + + // Spilled 2 objects + ASSERT_EQ(GetCurrentSpilledCount(), 2); + ASSERT_EQ(GetCurrentSpilledBytes(), object_size * 2); + ASSERT_EQ(owner_client->object_urls.size(), 2); int num_unpinned = 0; for (const auto &id : object_ids) { @@ -1386,12 +1482,27 @@ TEST_F(LocalObjectManagerFusedTest, TestMinSpillingSizeMaxFusionCount) { for (size_t i = 0; i < 2; i++) { ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); } + // Spilled first 2 batches + ASSERT_EQ(GetCurrentSpilledCount(), 30); + ASSERT_EQ(GetCurrentSpilledBytes(), object_size * 30); // We will spill the last objects even though we're under the min spilling // size because they are the only spillable objects. manager.SpillObjectUptoMaxThroughput(); ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks()); ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks()); + + urls.clear(); + for (int i = 15; i < 25; i++) { + urls.push_back(BuildURL("url", i)); + } + EXPECT_CALL(worker_pool, PushSpillWorker(_)).Times(1); + ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); + ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); + + // Spilled all objects + ASSERT_EQ(GetCurrentSpilledCount(), 40); + ASSERT_EQ(GetCurrentSpilledBytes(), object_size * 40); } TEST_F(LocalObjectManagerTest, TestPinBytes) { @@ -1406,12 +1517,14 @@ TEST_F(LocalObjectManagerTest, TestPinBytes) { } std::vector> objects; + size_t total_objects_size = 0; for (size_t i = 0; i < free_objects_batch_size; i++) { std::string meta = std::to_string(static_cast(rpc::ErrorType::OBJECT_IN_PLASMA)); auto metadata = const_cast(reinterpret_cast(meta.data())); auto meta_buffer = std::make_shared(metadata, meta.size()); auto object = std::make_unique( nullptr, meta_buffer, std::vector()); + total_objects_size += object->GetSize(); objects.push_back(std::move(object)); } @@ -1447,6 +1560,9 @@ TEST_F(LocalObjectManagerTest, TestPinBytes) { } ASSERT_TRUE(spilled); + ASSERT_EQ(GetCurrentSpilledCount(), object_ids.size()); + ASSERT_EQ(GetCurrentSpilledBytes(), total_objects_size); + // With all objects spilled, the pinned bytes would be 0. ASSERT_EQ(manager.GetPrimaryBytes(), 0); ASSERT_TRUE(manager.HasLocallySpilledObjects()); @@ -1460,6 +1576,9 @@ TEST_F(LocalObjectManagerTest, TestPinBytes) { int deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects(); ASSERT_EQ(deleted_urls_size, object_ids.size()); + ASSERT_EQ(GetCurrentSpilledCount(), 0); + ASSERT_EQ(GetCurrentSpilledBytes(), 0); + // With no pinned or spilled object, the pinned bytes should be 0. ASSERT_EQ(manager.GetPrimaryBytes(), 0); ASSERT_FALSE(manager.HasLocallySpilledObjects()); @@ -1480,12 +1599,14 @@ TEST_F(LocalObjectManagerTest, TestConcurrentSpillAndDelete1) { } std::vector> objects; + size_t total_size = 0; for (size_t i = 0; i < free_objects_batch_size; i++) { std::string meta = std::to_string(static_cast(rpc::ErrorType::OBJECT_IN_PLASMA)); auto metadata = const_cast(reinterpret_cast(meta.data())); auto meta_buffer = std::make_shared(metadata, meta.size()); auto object = std::make_unique( nullptr, meta_buffer, std::vector()); + total_size += object->GetSize(); objects.push_back(std::move(object)); } @@ -1523,6 +1644,10 @@ TEST_F(LocalObjectManagerTest, TestConcurrentSpillAndDelete1) { ASSERT_FALSE(owner_client->ReplyUpdateObjectLocationBatch()); ASSERT_TRUE(spilled); + // No spill actually happens on the IO worker + ASSERT_EQ(GetCurrentSpilledCount(), object_ids.size()); + ASSERT_EQ(GetCurrentSpilledBytes(), total_size); + manager.ProcessSpilledObjectsDeleteQueue(free_objects_batch_size); int deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects(); ASSERT_EQ(deleted_urls_size, object_ids.size()); @@ -1564,6 +1689,9 @@ TEST_F(LocalObjectManagerTest, TestConcurrentSpillAndDelete2) { // Pinned object memory should be reported. ASSERT_GT(manager.GetPrimaryBytes(), 0); + // No spill reported + ASSERT_EQ(GetCurrentSpilledBytes(), 0); + // Spill all objects. bool spilled = false; manager.SpillObjects(object_ids, [&](const Status &status) { @@ -1585,6 +1713,10 @@ TEST_F(LocalObjectManagerTest, TestConcurrentSpillAndDelete2) { ASSERT_FALSE(owner_client->ReplyUpdateObjectLocationBatch()); ASSERT_TRUE(spilled); + // No spill actually happens on the IO worker + ASSERT_EQ(GetCurrentSpilledCount(), 0); + ASSERT_EQ(GetCurrentSpilledBytes(), 0); + manager.ProcessSpilledObjectsDeleteQueue(free_objects_batch_size); int deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects(); ASSERT_EQ(deleted_urls_size, 0); diff --git a/src/ray/stats/metric_defs.cc b/src/ray/stats/metric_defs.cc index 176258ee66bc9..3edda46fcedae 100644 --- a/src/ray/stats/metric_defs.cc +++ b/src/ray/stats/metric_defs.cc @@ -213,6 +213,24 @@ DEFINE_stats(gcs_storage_operation_count, (), ray::stats::COUNT); +/// Object store +DEFINE_stats(object_store_memory, + "Object store memory by various sub-kinds on this node", + /// Location: + /// TODO(rickyx): spill fallback from in memory + /// - IN_MEMORY: currently in shared memory(e.g. /dev/shm) and + /// fallback allocated. This is memory already sealed. + /// - SPILLED: current number of bytes from objects spilled + /// to external storage. Note this might be smaller than + /// the physical storage incurred on the external storage because + /// Ray might fuse spilled objects into a single file, so a deleted + /// spill object might still exist in the spilled file. Check + /// spilled object fusing for more details. + /// - UNSEALED: unsealed bytes that come from objects just created. + ("Location"), + (), + ray::stats::GAUGE); + /// Placement Group // The end to end placement group creation latency. // The time from placement group creation request has received diff --git a/src/ray/stats/metric_defs.h b/src/ray/stats/metric_defs.h index 830e5a83a09fa..d08218d1581a8 100644 --- a/src/ray/stats/metric_defs.h +++ b/src/ray/stats/metric_defs.h @@ -84,6 +84,7 @@ DECLARE_stats(scheduler_unscheduleable_tasks); /// Raylet Resource Manager DECLARE_stats(resources); +/// TODO(rickyx): migrate legacy metrics /// Local Object Manager DECLARE_stats(spill_manager_objects); DECLARE_stats(spill_manager_objects_bytes); @@ -94,6 +95,9 @@ DECLARE_stats(spill_manager_throughput_mb); DECLARE_stats(gcs_storage_operation_latency_ms); DECLARE_stats(gcs_storage_operation_count); +/// Object Store +DECLARE_stats(object_store_memory); + /// Placement Group DECLARE_stats(gcs_placement_group_creation_latency_ms); DECLARE_stats(gcs_placement_group_scheduling_latency_ms); diff --git a/src/ray/stats/tag_defs.cc b/src/ray/stats/tag_defs.cc index 46272e408c69e..51a069810229d 100644 --- a/src/ray/stats/tag_defs.cc +++ b/src/ray/stats/tag_defs.cc @@ -41,5 +41,7 @@ const TagKeyType WorkerIdKey = TagKeyType::Register("WorkerId"); const TagKeyType JobIdKey = TagKeyType::Register("JobId"); const TagKeyType NameKey = TagKeyType::Register("Name"); + +const TagKeyType LocationKey = TagKeyType::Register("Location"); } // namespace stats } // namespace ray diff --git a/src/ray/stats/tag_defs.h b/src/ray/stats/tag_defs.h index 081ca1c40a8db..2e601b92482f5 100644 --- a/src/ray/stats/tag_defs.h +++ b/src/ray/stats/tag_defs.h @@ -45,3 +45,9 @@ extern const TagKeyType WorkerIdKey; extern const TagKeyType JobIdKey; extern const TagKeyType NameKey; + +// Object store memory location tag constants +extern const TagKeyType LocationKey; +constexpr char kObjectLocInMemory[] = "IN_MEMORY"; +constexpr char kObjectLocSpilled[] = "SPILLED"; +constexpr char kObjectLocUnsealed[] = "UNSEALED";