Skip to content

Commit

Permalink
Re-merge census export PR (ray-project#28615)
Browse files Browse the repository at this point in the history
  • Loading branch information
ericl committed Sep 20, 2022
1 parent 42da444 commit 35cfb86
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 4 deletions.
45 changes: 44 additions & 1 deletion python/ray/tests/test_task_metrics.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from collections import defaultdict
import sys
import os

import pytest
Expand All @@ -7,6 +8,7 @@

from ray._private.test_utils import (
fetch_prometheus_metrics,
run_string_as_driver,
run_string_as_driver_nonblocking,
wait_for_condition,
)
Expand All @@ -18,6 +20,12 @@
}
}

SLOW_METRIC_CONFIG = {
"_system_config": {
"metrics_report_interval_ms": 3000,
}
}


def tasks_by_state(info) -> dict:
metrics_page = "localhost:{}".format(info["metrics_export_port"])
Expand Down Expand Up @@ -58,7 +66,6 @@ def f():
"SCHEDULED": 8.0,
"WAITING_FOR_DEPENDENCIES": 0.0,
}
# TODO(ekl) optimize the reporting interval to be faster for testing
wait_for_condition(
lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=500
)
Expand Down Expand Up @@ -270,6 +277,42 @@ async def f(self):
proc.kill()


@pytest.mark.skipif(sys.platform == "win32", reason="Flaky on Windows.")
def test_metrics_export_now(shutdown_only):
info = ray.init(num_cpus=2, **SLOW_METRIC_CONFIG)

driver = """
import ray
import time
ray.init("auto")
@ray.remote
def f():
pass
a = [f.remote() for _ in range(10)]
ray.get(a)
"""

# If force export at process death is broken, we won't see the recently completed
# tasks from the drivers.
for i in range(10):
print("Run job", i)
run_string_as_driver(driver)
tasks_by_state(info)

expected = {
"RUNNING": 0.0,
"WAITING_FOR_EXECUTION": 0.0,
"SCHEDULED": 0.0,
"WAITING_FOR_DEPENDENCIES": 0.0,
"FINISHED": 100.0,
}
wait_for_condition(
lambda: tasks_by_state(info) == expected, timeout=20, retry_interval_ms=500
)


if __name__ == "__main__":
import sys

Expand Down
2 changes: 1 addition & 1 deletion src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ RAY_CONFIG(int64_t, idle_worker_killing_time_threshold_ms, 1000)
RAY_CONFIG(int64_t, num_workers_soft_limit, -1)

// The interval where metrics are exported in milliseconds.
RAY_CONFIG(uint64_t, metrics_report_interval_ms, 10000)
RAY_CONFIG(uint64_t, metrics_report_interval_ms, 5000)

/// Enable the task timeline. If this is enabled, certain events such as task
/// execution are profiled and sent to the GCS.
Expand Down
2 changes: 2 additions & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,8 @@ void CoreWorker::Disconnect(
const rpc::WorkerExitType &exit_type,
const std::string &exit_detail,
const std::shared_ptr<LocalMemoryBuffer> &creation_task_exception_pb_bytes) {
// Force stats export before exiting the worker.
opencensus::stats::StatsExporter::ExportNow();
if (connected_) {
RAY_LOG(INFO) << "Disconnecting to the raylet.";
connected_ = false;
Expand Down
9 changes: 7 additions & 2 deletions thirdparty/patches/opencensus-cpp-shutdown-api.patch
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ diff --git opencensus/stats/internal/stats_exporter.cc opencensus/stats/internal
index 43ddbc7..37b4ae1 100644
--- opencensus/stats/internal/stats_exporter.cc
+++ opencensus/stats/internal/stats_exporter.cc
@@ -95,25 +95,52 @@ void StatsExporterImpl::ClearHandlersForTesting() {
@@ -95,25 +95,56 @@ void StatsExporterImpl::ClearHandlersForTesting() {
}

void StatsExporterImpl::StartExportThread() EXCLUSIVE_LOCKS_REQUIRED(mu_) {
Expand Down Expand Up @@ -138,6 +138,10 @@ index 43ddbc7..37b4ae1 100644
+ StatsExporterImpl::Get()->Shutdown();
+ StatsExporterImpl::Get()->ClearHandlersForTesting();
+}
+
+void StatsExporter::ExportNow() {
+ StatsExporterImpl::Get()->Export();
+}
+
// static
void StatsExporter::SetInterval(absl::Duration interval) {
Expand All @@ -158,11 +162,12 @@ diff --git opencensus/stats/stats_exporter.h opencensus/stats/stats_exporter.h
index 6756858..65e0262 100644
--- opencensus/stats/stats_exporter.h
+++ opencensus/stats/stats_exporter.h
@@ -45,6 +45,8 @@ class StatsExporter final {
@@ -45,6 +45,9 @@ class StatsExporter final {
// Removes the view with 'name' from the registry, if one is registered.
static void RemoveView(absl::string_view name);

+ static void Shutdown();
+ static void ExportNow();
+
// StatsExporter::Handler is the interface for push exporters that export
// recorded data for registered views. The exporter should provide a static
Expand Down

0 comments on commit 35cfb86

Please sign in to comment.