diff --git a/BUILD.bazel b/BUILD.bazel index ea62c4d8a31ff..b856b1cee0c29 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -705,6 +705,7 @@ cc_library( ":ray_util", ":raylet_client_lib", ":service_based_gcs_client_lib", + ":stats_lib", ":worker_cc_proto", ":worker_rpc", "@boost//:fiber", diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index d8973bc788968..c298f43ecb505 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -325,6 +325,10 @@ RAY_CONFIG(int64_t, enable_metrics_collection, true) /// Whether start the Plasma Store as a Raylet thread. RAY_CONFIG(bool, put_small_object_in_memory_store, false) +/// Metric agent port for reporting, default -1 means no such agent will be +/// listening. +RAY_CONFIG(int, metrics_agent_port, -1) + /// Maximum number of tasks that can be in flight between an owner and a worker for which /// the owner has been granted a lease. A value >1 is used when we want to enable /// pipelining task submission. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index d2e0a1d566dc4..04c56e59763d5 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -21,6 +21,7 @@ #include "ray/core_worker/transport/direct_actor_transport.h" #include "ray/core_worker/transport/raylet_transport.h" #include "ray/gcs/gcs_client/service_based_gcs_client.h" +#include "ray/stats/stats.h" #include "ray/util/process.h" #include "ray/util/util.h" @@ -127,6 +128,19 @@ CoreWorkerProcess::CoreWorkerProcess(const CoreWorkerOptions &options) CreateWorker(); } } + + // Assume stats module will be initialized exactly once in once process. + // So it must be called in CoreWorkerProcess constructor and will be reused + // by all of core worker. + RAY_LOG(DEBUG) << "Stats setup in core worker."; + // Initialize stats in core worker global tags. + const ray::stats::TagsType global_tags = {{ray::stats::ComponentKey, "core_worker"}, + {ray::stats::VersionKey, "0.9.0.dev0"}}; + + // NOTE(lingxuan.zlx): We assume RayConfig is initialized before it's used. + // RayConfig is generated in Java_io_ray_runtime_RayNativeRuntime_nativeInitialize + // for java worker or in constructor of CoreWorker for python worker. + ray::stats::Init(global_tags, RayConfig::instance().metrics_agent_port()); } CoreWorkerProcess::~CoreWorkerProcess() { @@ -136,6 +150,9 @@ CoreWorkerProcess::~CoreWorkerProcess() { absl::ReaderMutexLock lock(&worker_map_mutex_); RAY_CHECK(workers_.empty()); } + RAY_LOG(DEBUG) << "Stats stop in core worker."; + // Shutdown stats module if worker process exits. + ray::stats::Shutdown(); if (options_.enable_logging) { RayLog::ShutDownRayLog(); } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 877afb5cb946c..2915ede6fbb73 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -270,6 +270,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_id); CoreWorker(CoreWorker const &) = delete; + void operator=(CoreWorker const &other) = delete; /// diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index c078007127d09..74e9a3295cc7a 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -215,6 +215,12 @@ int main(int argc, char *argv[]) { << object_manager_config.rpc_service_threads_number << ", object_chunk_size = " << object_manager_config.object_chunk_size; + // Initialize stats. + const ray::stats::TagsType global_tags = { + {ray::stats::ComponentKey, "raylet"}, + {ray::stats::VersionKey, "0.9.0.dev0"}, + {ray::stats::NodeAddressKey, node_ip_address}}; + ray::stats::Init(global_tags, metrics_agent_port); // Initialize the node manager. server.reset(new ray::raylet::Raylet( @@ -224,13 +230,6 @@ int main(int argc, char *argv[]) { server->Start(); })); - // Initialize stats. - const ray::stats::TagsType global_tags = { - {ray::stats::JobNameKey, "raylet"}, - {ray::stats::VersionKey, "0.9.0.dev0"}, - {ray::stats::NodeAddressKey, node_ip_address}}; - ray::stats::Init(global_tags, metrics_agent_port); - // Destroy the Raylet on a SIGTERM. The pointer to main_service is // guaranteed to be valid since this function will run the event loop // instead of returning immediately. diff --git a/src/ray/stats/stats.h b/src/ray/stats/stats.h index 629087b115e32..807271afe19bd 100644 --- a/src/ray/stats/stats.h +++ b/src/ray/stats/stats.h @@ -74,6 +74,7 @@ static inline void Init(const TagsType &global_tags, const int metrics_agent_por RAY_LOG(INFO) << "Disabled stats."; return; } + RAY_LOG(DEBUG) << "Initialized stats"; metrics_io_service_pool = std::make_shared(1); metrics_io_service_pool->Run(); @@ -113,7 +114,6 @@ static inline void Shutdown() { exporter = nullptr; StatsConfig::instance().SetIsInitialized(false); } - } // namespace stats } // namespace ray diff --git a/src/ray/stats/tag_defs.h b/src/ray/stats/tag_defs.h index 462a05f2dfad9..19e35ac684551 100644 --- a/src/ray/stats/tag_defs.h +++ b/src/ray/stats/tag_defs.h @@ -20,6 +20,8 @@ using TagKeyType = opencensus::tags::TagKey; using TagsType = std::vector>; +static const TagKeyType ComponentKey = TagKeyType::Register("Component"); + static const TagKeyType JobNameKey = TagKeyType::Register("JobName"); static const TagKeyType CustomKey = TagKeyType::Register("CustomKey");