Skip to content

Commit

Permalink
[Stats] enable core worker stats (ray-project#9355)
Browse files Browse the repository at this point in the history
  • Loading branch information
ashione committed Jul 29, 2020
1 parent a484947 commit 156067b
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 8 deletions.
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,7 @@ cc_library(
":ray_util",
":raylet_client_lib",
":service_based_gcs_client_lib",
":stats_lib",
":worker_cc_proto",
":worker_rpc",
"@boost//:fiber",
Expand Down
4 changes: 4 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ RAY_CONFIG(int64_t, enable_metrics_collection, true)
/// Whether start the Plasma Store as a Raylet thread.
RAY_CONFIG(bool, put_small_object_in_memory_store, false)

/// Metric agent port for reporting, default -1 means no such agent will be
/// listening.
RAY_CONFIG(int, metrics_agent_port, -1)

/// Maximum number of tasks that can be in flight between an owner and a worker for which
/// the owner has been granted a lease. A value >1 is used when we want to enable
/// pipelining task submission.
Expand Down
17 changes: 17 additions & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "ray/core_worker/transport/direct_actor_transport.h"
#include "ray/core_worker/transport/raylet_transport.h"
#include "ray/gcs/gcs_client/service_based_gcs_client.h"
#include "ray/stats/stats.h"
#include "ray/util/process.h"
#include "ray/util/util.h"

Expand Down Expand Up @@ -127,6 +128,19 @@ CoreWorkerProcess::CoreWorkerProcess(const CoreWorkerOptions &options)
CreateWorker();
}
}

// Assume stats module will be initialized exactly once in once process.
// So it must be called in CoreWorkerProcess constructor and will be reused
// by all of core worker.
RAY_LOG(DEBUG) << "Stats setup in core worker.";
// Initialize stats in core worker global tags.
const ray::stats::TagsType global_tags = {{ray::stats::ComponentKey, "core_worker"},
{ray::stats::VersionKey, "0.9.0.dev0"}};

// NOTE(lingxuan.zlx): We assume RayConfig is initialized before it's used.
// RayConfig is generated in Java_io_ray_runtime_RayNativeRuntime_nativeInitialize
// for java worker or in constructor of CoreWorker for python worker.
ray::stats::Init(global_tags, RayConfig::instance().metrics_agent_port());
}

CoreWorkerProcess::~CoreWorkerProcess() {
Expand All @@ -136,6 +150,9 @@ CoreWorkerProcess::~CoreWorkerProcess() {
absl::ReaderMutexLock lock(&worker_map_mutex_);
RAY_CHECK(workers_.empty());
}
RAY_LOG(DEBUG) << "Stats stop in core worker.";
// Shutdown stats module if worker process exits.
ray::stats::Shutdown();
if (options_.enable_logging) {
RayLog::ShutDownRayLog();
}
Expand Down
1 change: 1 addition & 0 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_id);

CoreWorker(CoreWorker const &) = delete;

void operator=(CoreWorker const &other) = delete;

///
Expand Down
13 changes: 6 additions & 7 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,12 @@ int main(int argc, char *argv[]) {
<< object_manager_config.rpc_service_threads_number
<< ", object_chunk_size = "
<< object_manager_config.object_chunk_size;
// Initialize stats.
const ray::stats::TagsType global_tags = {
{ray::stats::ComponentKey, "raylet"},
{ray::stats::VersionKey, "0.9.0.dev0"},
{ray::stats::NodeAddressKey, node_ip_address}};
ray::stats::Init(global_tags, metrics_agent_port);

// Initialize the node manager.
server.reset(new ray::raylet::Raylet(
Expand All @@ -224,13 +230,6 @@ int main(int argc, char *argv[]) {
server->Start();
}));

// Initialize stats.
const ray::stats::TagsType global_tags = {
{ray::stats::JobNameKey, "raylet"},
{ray::stats::VersionKey, "0.9.0.dev0"},
{ray::stats::NodeAddressKey, node_ip_address}};
ray::stats::Init(global_tags, metrics_agent_port);

// Destroy the Raylet on a SIGTERM. The pointer to main_service is
// guaranteed to be valid since this function will run the event loop
// instead of returning immediately.
Expand Down
2 changes: 1 addition & 1 deletion src/ray/stats/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ static inline void Init(const TagsType &global_tags, const int metrics_agent_por
RAY_LOG(INFO) << "Disabled stats.";
return;
}
RAY_LOG(DEBUG) << "Initialized stats";

metrics_io_service_pool = std::make_shared<IOServicePool>(1);
metrics_io_service_pool->Run();
Expand Down Expand Up @@ -113,7 +114,6 @@ static inline void Shutdown() {
exporter = nullptr;
StatsConfig::instance().SetIsInitialized(false);
}

} // namespace stats

} // namespace ray
2 changes: 2 additions & 0 deletions src/ray/stats/tag_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
using TagKeyType = opencensus::tags::TagKey;
using TagsType = std::vector<std::pair<opencensus::tags::TagKey, std::string>>;

static const TagKeyType ComponentKey = TagKeyType::Register("Component");

static const TagKeyType JobNameKey = TagKeyType::Register("JobName");

static const TagKeyType CustomKey = TagKeyType::Register("CustomKey");
Expand Down

0 comments on commit 156067b

Please sign in to comment.