Skip to content

Commit

Permalink
[core][metric] Adding object_store_memory related metrics [1/n] (ray-…
Browse files Browse the repository at this point in the history
  • Loading branch information
rickyyx committed Oct 7, 2022
1 parent 251903b commit 4014207
Show file tree
Hide file tree
Showing 15 changed files with 429 additions and 46 deletions.
15 changes: 15 additions & 0 deletions python/ray/_private/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from ray._private import (
ray_constants,
)
from ray._private.worker import RayContext
import yaml
from grpc._channel import _InactiveRpcError

Expand Down Expand Up @@ -836,6 +837,20 @@ def fetch_prometheus_metrics(prom_addresses: List[str]) -> Dict[str, List[Any]]:
return samples_by_name


def raw_metrics(info: RayContext) -> Dict[str, List[Any]]:
    """Fetch prometheus metrics from the metrics export port of a RayContext.

    The port is read from ``info.address_info["metrics_export_port"]`` and
    queried on ``localhost``.

    Args:
        info: Ray context returned from ray.init()

    Returns:
        Dict from metric name to a list of samples for the metrics
    """
    export_port = info.address_info["metrics_export_port"]
    metrics_page = f"localhost:{export_port}"
    print("Fetch metrics from", metrics_page)
    samples_by_name = fetch_prometheus_metrics([metrics_page])
    return samples_by_name


def load_test_config(config_file_name):
"""Loads a config yaml from tests/test_cli_patterns."""
here = os.path.realpath(__file__)
Expand Down
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ py_test_module_list(
"test_memory_pressure.py",
"test_metrics.py",
"test_task_metrics.py",
"test_object_store_metrics.py",
"test_multi_node.py",
"test_multi_node_2.py",
"test_multinode_failures.py",
Expand Down
3 changes: 3 additions & 0 deletions python/ray/tests/test_metrics_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@
# NOTE: Commented out metrics are not available in this test.
# TODO(Clark): Find ways to trigger commented out metrics in cluster setup.
_METRICS = [
# TODO(rickyx): Refactoring the 3 metrics below seems to be a bit involved,
# e.g. we need to see how users currently depend on them.
"ray_object_store_available_memory",
"ray_object_store_used_memory",
"ray_object_store_num_local_objects",
"ray_object_store_memory",
"ray_object_manager_num_pull_requests",
"ray_object_directory_subscriptions",
"ray_object_directory_updates",
Expand Down
175 changes: 175 additions & 0 deletions python/ray/tests/test_object_store_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
from collections import defaultdict
import pytest
from typing import Dict
import numpy as np

import ray
from ray._private.test_utils import (
raw_metrics,
wait_for_condition,
)
from ray._private.worker import RayContext

KiB = 1 << 10
MiB = 1 << 20

_SYSTEM_CONFIG = {
"automatic_object_spilling_enabled": True,
"max_io_workers": 100,
"min_spilling_size": 1,
"object_spilling_threshold": 0.99, # to prevent premature spilling
"metrics_report_interval_ms": 200,
"event_stats_print_interval_ms": 100, # so metrics get exported
}


def objects_by_loc(info: RayContext) -> Dict:
    """Sum the ``ray_object_store_memory`` samples per ``Location`` label.

    Args:
        info: Ray context returned from ray.init()

    Returns:
        Dict from location label to the summed sample values. It is empty
        when the fetched metrics do not contain ``ray_object_store_memory``.
    """
    samples = raw_metrics(info).get("ray_object_store_memory", [])

    objects_info = defaultdict(int)
    for sample in samples:
        location = sample.labels["Location"]
        objects_info[location] += sample.value

    print(f"Objects by location: {objects_info}")
    return objects_info


def approx_eq_dict_in(actual: Dict, expected: Dict, e: int) -> bool:
    """Assert that two dicts have equal keys and values within ``e``.

    Args:
        actual: Observed values.
        expected: Reference values; must have exactly the same keys.
        e: Maximum allowed absolute difference per key.

    Returns:
        True when every check passes.

    Raises:
        AssertionError: If the key sets differ or any value is off by more
            than ``e``.
    """
    assert set(actual.keys()) == set(expected.keys()), "Unequal key sets."

    for k in actual:
        expect_v, actual_v = expected[k], actual[k]
        within_tolerance = abs(expect_v - actual_v) <= e
        assert (
            within_tolerance
        ), f"expect={expect_v}, actual={actual_v}, diff allowed={e}"

    return True


def test_all_shared_memory(shutdown_only):
    """Test the object store memory metrics for objects in shared memory.

    Puts 4 x 20MiB arrays into a 100MiB object store, waits for the metrics
    to report 80MiB as IN_MEMORY, then drops the values and waits for the
    metrics to go back to zero.
    """
    info = ray.init(
        object_store_memory=100 * MiB,
        _system_config=_SYSTEM_CONFIG,
    )

    def wait_for_usage(expected):
        # Poll the exported metrics until they match `expected`.
        wait_for_condition(
            # 1KiB for metadata difference
            lambda: approx_eq_dict_in(objects_by_loc(info), expected, 1 * KiB),
            timeout=20,
            retry_interval_ms=500,
        )

    # Allocate 80MiB data. `np` is the module-level numpy import; the
    # function-local re-import was redundant and has been removed.
    objs_in_use = ray.get(
        [ray.put(np.zeros(20 * MiB, dtype=np.uint8)) for _ in range(4)]
    )
    wait_for_usage({"IN_MEMORY": 80 * MiB, "SPILLED": 0, "UNSEALED": 0})

    # Free all of them
    del objs_in_use
    wait_for_usage({"IN_MEMORY": 0, "SPILLED": 0, "UNSEALED": 0})


def test_spilling(object_spilling_config, shutdown_only):
    """Test the object store memory metrics when object spilling occurs."""
    object_spilling_config, _ = object_spilling_config
    delta = 5
    info = ray.init(
        num_cpus=1,
        object_store_memory=(100 + delta) * MiB,
        _system_config={
            **_SYSTEM_CONFIG,
            "object_spilling_config": object_spilling_config,
        },
    )

    def wait_for_usage(in_memory, spilled):
        # Poll the exported metrics until they match the expected breakdown.
        expected = {
            "IN_MEMORY": in_memory,
            "SPILLED": spilled,
            "UNSEALED": 0,
        }
        wait_for_condition(
            # 1KiB for metadata difference
            lambda: approx_eq_dict_in(objects_by_loc(info), expected, 1 * KiB),
            timeout=20,
            retry_interval_ms=500,
        )

    # Create and use 100MiB data, which should fit in memory
    objs1 = [ray.put(np.zeros(50 * MiB, dtype=np.uint8)) for _ in range(2)]
    wait_for_usage(in_memory=100 * MiB, spilled=0)

    # Create an additional 100MiB, so that spilling needs to be triggered
    objs2 = [ray.put(np.zeros(50 * MiB, dtype=np.uint8)) for _ in range(2)]
    wait_for_usage(in_memory=100 * MiB, spilled=100 * MiB)

    # Delete spilled objects
    del objs1
    wait_for_usage(in_memory=100 * MiB, spilled=0)

    # Delete all
    del objs2
    wait_for_usage(in_memory=0, spilled=0)


if __name__ == "__main__":
    import os
    import sys

    # Default to a verbose serial run; use the parallel, boxed invocation
    # when PARALLEL_CI is set in the environment.
    pytest_args = ["-sv", __file__]
    if os.environ.get("PARALLEL_CI"):
        pytest_args = ["-n", "auto", "--boxed", "-vs", __file__]

    sys.exit(pytest.main(pytest_args))
9 changes: 1 addition & 8 deletions python/ray/tests/test_task_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import ray

from ray._private.test_utils import (
fetch_prometheus_metrics,
raw_metrics,
run_string_as_driver,
run_string_as_driver_nonblocking,
wait_for_condition,
Expand All @@ -27,13 +27,6 @@
}


def raw_metrics(info):
    """Fetch prometheus metrics from the local metrics export port.

    The port is read from ``info["metrics_export_port"]``.
    """
    export_port = info["metrics_export_port"]
    metrics_page = f"localhost:{export_port}"
    print("Fetch metrics from", metrics_page)
    return fetch_prometheus_metrics([metrics_page])


def tasks_by_state(info) -> dict:
    """Call tasks_breakdown with a function reading each sample's "State" label."""

    def state_of(s):
        return s.labels["State"]

    return tasks_breakdown(info, state_of)

Expand Down
18 changes: 17 additions & 1 deletion src/ray/object_manager/plasma/stats_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ void ObjectStatsCollector::OnObjectCreated(const LocalObject &obj) {
const auto kSource = obj.GetSource();

num_bytes_created_total_ += kObjectSize;
// TODO(rickyx):
// Add fallback memory accounting here.

if (kSource == plasma::flatbuf::ObjectSource::CreatedByWorker) {
num_objects_created_by_worker_++;
Expand Down Expand Up @@ -168,8 +170,22 @@ void ObjectStatsCollector::OnObjectRefDecreased(const LocalObject &obj) {
}
}

int64_t ObjectStatsCollector::GetNumBytesCreatedCurrent() const {
  // Sum of the four byte counters: created-by-worker, restored, received
  // and errored.
  int64_t total_bytes = num_bytes_created_by_worker_;
  total_bytes += num_bytes_restored_;
  total_bytes += num_bytes_received_;
  total_bytes += num_bytes_errored_;
  return total_bytes;
}

void ObjectStatsCollector::RecordMetrics() const {
  // Everything counted by GetNumBytesCreatedCurrent() that is not unsealed
  // is reported under the in-memory location.
  const auto in_memory_bytes = GetNumBytesCreatedCurrent() - num_bytes_unsealed_;
  ray::stats::STATS_object_store_memory.Record(
      in_memory_bytes,
      {{ray::stats::LocationKey.name(), ray::stats::kObjectLocInMemory}});

  // Unsealed bytes are reported under their own location.
  ray::stats::STATS_object_store_memory.Record(
      num_bytes_unsealed_,
      {{ray::stats::LocationKey.name(), ray::stats::kObjectLocUnsealed}});

  // TODO(rickyx):
  // Add fallback memory recording here.
}

void ObjectStatsCollector::GetDebugDump(std::stringstream &buffer) const {
Expand Down
2 changes: 2 additions & 0 deletions src/ray/object_manager/plasma/stats_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class ObjectStatsCollector {
private:
friend struct ObjectStatsCollectorTest;

int64_t GetNumBytesCreatedCurrent() const;

int64_t num_objects_spillable_ = 0;
int64_t num_bytes_spillable_ = 0;
int64_t num_objects_unsealed_ = 0;
Expand Down
10 changes: 3 additions & 7 deletions src/ray/object_manager/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,8 @@ PlasmaStore::PlasmaStore(instrumented_io_context &main_service,
this->AddToClientObjectIds(object_id, request->client);
},
[this](const auto &request) { this->ReturnFromGet(request); }) {
const auto event_stats_print_interval_ms =
RayConfig::instance().event_stats_print_interval_ms();
if (event_stats_print_interval_ms > 0 && RayConfig::instance().event_stats()) {
if (RayConfig::instance().event_stats_print_interval_ms() > 0 &&
RayConfig::instance().event_stats()) {
PrintAndRecordDebugDump();
}
}
Expand Down Expand Up @@ -553,10 +552,7 @@ void PlasmaStore::PrintAndRecordDebugDump() const {
RayConfig::instance().event_stats_print_interval_ms());
}

void PlasmaStore::RecordMetrics() const {
// TODO(sang): Add metrics.
object_lifecycle_mgr_.RecordMetrics();
}
void PlasmaStore::RecordMetrics() const { object_lifecycle_mgr_.RecordMetrics(); }

std::string PlasmaStore::GetDebugDump() const {
std::stringstream buffer;
Expand Down
21 changes: 17 additions & 4 deletions src/ray/raylet/local_object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ void LocalObjectManager::PinObjectsAndWaitForFree(
continue;
}

const auto inserted =
local_objects_.emplace(object_id, LocalObjectInfo(owner_address, generator_id));
const auto inserted = local_objects_.emplace(
object_id, LocalObjectInfo(owner_address, generator_id, object->GetSize()));
if (inserted.second) {
// This is the first time we're pinning this object.
RAY_LOG(DEBUG) << "Pinning object " << object_id;
Expand Down Expand Up @@ -217,8 +217,6 @@ bool LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill) {
auto now = absl::GetCurrentTimeNanos();
RAY_LOG(DEBUG) << "Spilled " << bytes_to_spill << " bytes in "
<< (now - start_time) / 1e6 << "ms";
spilled_bytes_total_ += bytes_to_spill;
spilled_objects_total_ += objects_to_spill.size();
// Adjust throughput timing to account for concurrent spill operations.
spill_time_total_s_ +=
(now - std::max(start_time, last_spill_finish_ns_)) / 1e9;
Expand Down Expand Up @@ -409,6 +407,11 @@ void LocalObjectManager::OnObjectSpilled(const std::vector<ObjectID> &object_ids
num_bytes_pending_spill_ -= object_size;
objects_pending_spill_.erase(it);

// Update the internal spill metrics
spilled_bytes_total_ += object_size;
spilled_bytes_current_ += object_size;
spilled_objects_total_++;

// Asynchronously Update the spilled URL.
auto freed_it = local_objects_.find(object_id);
if (freed_it == local_objects_.end() || freed_it->second.is_freed) {
Expand Down Expand Up @@ -539,6 +542,11 @@ void LocalObjectManager::ProcessSpilledObjectsDeleteQueue(uint32_t max_batch_siz
object_urls_to_delete.emplace_back(object_url);
}
spilled_objects_url_.erase(spilled_objects_url_it);

// Update current spilled objects metrics
RAY_CHECK(local_objects_.contains(object_id))
<< "local objects should contain the spilled object: " << object_id;
spilled_bytes_current_ -= local_objects_.at(object_id).object_size;
} else {
// If the object was not spilled, it gets pinned again. Unpin here to
// prevent a memory leak.
Expand Down Expand Up @@ -613,6 +621,10 @@ void LocalObjectManager::RecordMetrics() const {
ray::stats::STATS_spill_manager_request_total.Record(spilled_objects_total_, "Spilled");
ray::stats::STATS_spill_manager_request_total.Record(restored_objects_total_,
"Restored");

ray::stats::STATS_object_store_memory.Record(
spilled_bytes_current_,
{{ray::stats::LocationKey.name(), ray::stats::kObjectLocSpilled}});
}

int64_t LocalObjectManager::GetPrimaryBytes() const {
Expand All @@ -638,6 +650,7 @@ std::string LocalObjectManager::DebugString() const {
result << "- num objects pending restore: " << objects_pending_restore_.size() << "\n";
result << "- num objects pending spill: " << objects_pending_spill_.size() << "\n";
result << "- num bytes pending spill: " << num_bytes_pending_spill_ << "\n";
result << "- num bytes currently spilled: " << spilled_bytes_current_ << "\n";
result << "- cumulative spill requests: " << spilled_objects_total_ << "\n";
result << "- cumulative restore requests: " << restored_objects_total_ << "\n";
result << "- spilled objects pending delete: " << spilled_object_pending_delete_.size()
Expand Down
Loading

0 comments on commit 4014207

Please sign in to comment.