Skip to content

Commit

Permalink
feat(observability): migrating code from IC repo (#46)
Browse files Browse the repository at this point in the history
* migrating code from IC repo

* fixing conflicts

* repinning

* adding missing packages
  • Loading branch information
nikola-milosa committed Jan 8, 2024
1 parent 12868a4 commit e9a363b
Show file tree
Hide file tree
Showing 78 changed files with 9,157 additions and 98 deletions.
2,249 changes: 2,155 additions & 94 deletions Cargo.Bazel.lock

Large diffs are not rendered by default.

447 changes: 445 additions & 2 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 11 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,19 @@ members = [
"rs/decentralization",
"rs/ic-management-backend",
"rs/ic-management-types",
"rs/slack-notifications",
"rs/np-notifications",
"rs/ic-observability/config-writer-common",
"rs/ic-observability/multiservice-discovery",
"rs/ic-observability/multiservice-discovery-downloader",
"rs/ic-observability/multiservice-discovery-shared",
"rs/ic-observability/node-status-updater",
"rs/ic-observability/obs-canister-clients",
"rs/ic-observability/prometheus-config-updater",
"rs/ic-observability/service-discovery",
"rs/ic-observability/sns-downloader",
"rs/log-fetcher",
"rs/canister-log-fetcher",
"rs/np-notifications",
"rs/slack-notifications",
# Canisters below
"rs/canisters/node_status_canister/src/node_status_canister_backend",
]
Expand Down
10 changes: 10 additions & 0 deletions bazel/external_crates.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,18 @@ def external_crates_repository():
"//rs/canister-log-fetcher:Cargo.toml",
"//rs/cli:Cargo.toml",
"//rs/decentralization:Cargo.toml",
"//rs/ic-canisters:Cargo.toml",
"//rs/ic-management-backend:Cargo.toml",
"//rs/ic-management-types:Cargo.toml",
"//rs/ic-observability/config-writer-common:Cargo.toml",
"//rs/ic-observability/multiservice-discovery:Cargo.toml",
"//rs/ic-observability/multiservice-discovery-downloader:Cargo.toml",
"//rs/ic-observability/multiservice-discovery-shared:Cargo.toml",
"//rs/ic-observability/node-status-updater:Cargo.toml",
"//rs/ic-observability/obs-canister-clients:Cargo.toml",
"//rs/ic-observability/prometheus-config-updater:Cargo.toml",
"//rs/ic-observability/service-discovery:Cargo.toml",
"//rs/ic-observability/sns-downloader:Cargo.toml",
"//rs/log-fetcher:Cargo.toml",
"//rs/np-notifications:Cargo.toml",
"//rs/slack-notifications:Cargo.toml",
Expand Down
36 changes: 36 additions & 0 deletions rs/ic-observability/config-writer-common/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
load("@crate_index_dre//:defs.bzl", "aliases", "all_crate_deps")
load("@rules_rust//rust:defs.bzl", "rust_library", "rust_test")

package(default_visibility = ["//visibility:public"])

# Workspace-local Bazel targets this crate depends on. They are appended to
# the dependency lists of both the library and its unit-test target below.
DEPS = [
"//rs/ic-observability/service-discovery",
"//rs/ic-observability/multiservice-discovery-shared"
]

# The `config-writer-common` library, built from every Rust file under `src/`.
# Third-party dependencies come from `all_crate_deps` (loaded from
# `@crate_index_dre`); local ones come from DEPS.
rust_library(
name = "config-writer-common",
srcs = glob(["src/**/*.rs"]),
aliases = aliases(),
proc_macro_deps = all_crate_deps(
proc_macro = True,
),
deps = all_crate_deps(
normal = True,
) + DEPS,
)

# Unit tests for the library above (`crate = ":config-writer-common"`), using
# the dev variants of the dependency/alias sets plus the same local DEPS.
rust_test(
name = "unit_test",
aliases = aliases(
normal_dev = True,
proc_macro_dev = True,
),
crate = ":config-writer-common",
proc_macro_deps = all_crate_deps(
proc_macro_dev = True,
),
deps = all_crate_deps(
normal_dev = True,
) + DEPS,
)
27 changes: 27 additions & 0 deletions rs/ic-observability/config-writer-common/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[package]
name = "config-writer-common"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
regex = "1.7.0"
service-discovery = { path = "../service-discovery" }
ic-types = { git = "https://github.com/dfinity/ic.git", rev = "4b3b2ce76c4bde0c1c60fb80b0915931003b7eca" }
slog = { version = "2.7.0", features = [
"max_level_trace",
"nested-values",
"release_max_level_debug",
"release_max_level_trace",
] }
slog-async = { version = "2.5", features = ["nested-values"] }
slog-term = "2.6.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0.107", features = ["std"] }
ic-utils = { git = "https://github.com/dfinity/ic.git", rev = "4b3b2ce76c4bde0c1c60fb80b0915931003b7eca" }
erased-serde = "0.3.23"
serde_derive = "1.0.150"
crossbeam = "0.8.0"
crossbeam-channel = "0.5.5"
url = { version = "2.1.1", features = ["serde"] }
14 changes: 14 additions & 0 deletions rs/ic-observability/config-writer-common/src/config_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use std::collections::BTreeSet;
use std::fmt::Debug;

use service_discovery::{jobs::Job, TargetGroup};

/// A generated configuration that can be serialized and written out.
///
/// Implementors must be serializable through `erased_serde` and implement
/// `Debug`, so a configuration can be handled as a `dyn Config` trait object.
pub trait Config: erased_serde::Serialize + Debug {
/// Reports whether this configuration differs from what was produced before.
/// `ConfigWriter::update` skips writing when this returns `false`.
fn updated(&self) -> bool;
/// Name of the configuration. `ConfigWriter::update` uses it as the stem of
/// the output file (`<name>.json`).
fn name(&self) -> String;
}
// Makes `dyn Config` usable with serde serializers (for example
// `serde_json::to_writer_pretty`), which otherwise need a concrete type.
erased_serde::serialize_trait_object!(Config);

/// Produces a [`Config`] for a job from its discovered target groups.
///
/// `build` takes `&mut self`, so an implementation may keep state between
/// calls.
pub trait ConfigBuilder {
/// Builds the configuration for `job` from `target_groups` and returns it as
/// a boxed trait object.
fn build(&mut self, target_groups: BTreeSet<TargetGroup>, job: Job) -> Box<dyn Config>;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use std::error::Error;

use crate::config_builder::Config;

/// A sink that applies a generated [`Config`] (for example by writing it to
/// disk).
pub trait ConfigUpdater {
/// Applies `config`.
///
/// Returns `Ok(())` on success, or a boxed error describing why the
/// configuration could not be applied.
fn update(&self, config: &dyn Config) -> Result<(), Box<dyn Error>>;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::{collections::BTreeSet, sync::Arc};

use crossbeam::select;
use crossbeam_channel::Receiver;
use service_discovery::metrics::Metrics;
use service_discovery::{jobs::Job, IcServiceDiscovery, TargetGroup};
use slog::{info, warn};

use crate::{
config_builder::ConfigBuilder, config_updater::ConfigUpdater, filters::TargetGroupFilter,
};

/// Builds the closure that drives config regeneration for a list of jobs.
///
/// Each pass of the returned closure's loop does the following for every job
/// in `jobs`:
///
/// 1. Fetches the job's target groups from `discovery`. On failure a warning
///    is logged and the job is skipped for this pass.
/// 2. Keeps only the target groups accepted by `filters`.
/// 3. Sets `metrics.total_targets` (labelled with the job type) to the number
///    of target groups *before* filtering.
/// 4. Builds a config with `config_builder` and passes it to
///    `config_updater`. A failed update is logged as a warning and does not
///    stop the loop.
///
/// After a pass the loop blocks until one of the channels becomes ready:
/// `shutdown_signal` ends the loop (and the closure returns), while
/// `update_signal_recv` starts the next pass.
///
/// # Panics
///
/// Panics if the number of target groups does not fit into the integer type
/// expected by the metric.
pub fn config_updater_loop(
    log: slog::Logger,
    discovery: Arc<dyn IcServiceDiscovery>,
    filters: Arc<dyn TargetGroupFilter>,
    shutdown_signal: Receiver<()>,
    jobs: Vec<Job>,
    update_signal_recv: Receiver<()>,
    mut config_builder: impl ConfigBuilder,
    config_updater: impl ConfigUpdater,
    metrics: Metrics,
) -> impl FnMut() {
    move || loop {
        for job in &jobs {
            let target_groups = match discovery.get_target_groups(job._type, log.clone()) {
                Ok(t) => t,
                Err(e) => {
                    warn!(
                        log,
                        "Failed to retrieve targets for job {}: {:?}", job._type, e
                    );
                    continue;
                }
            };

            // The metric reports the unfiltered count. Capture it up front so
            // the set can be consumed by the filter below instead of being
            // deep-cloned on every pass.
            let total_targets = target_groups.len();
            let filtered_target_groups: BTreeSet<TargetGroup> = target_groups
                .into_iter()
                .filter(|tg| TargetGroupFilter::filter(filters.as_ref(), tg.clone()))
                .collect();

            metrics
                .total_targets
                .with_label_values(&[job._type.to_string().as_str()])
                .set(total_targets.try_into().unwrap());

            let config = config_builder.build(filtered_target_groups, job.clone());
            if let Err(e) = config_updater.update(config.as_ref()) {
                warn!(log, "Failed to write config {}: {:?}", &config.name(), e);
            };
        }
        select! {
            recv(shutdown_signal) -> _ => {
                info!(log, "Received shutdown signal");
                break;
            },
            // NOTE(review): the receive result is ignored, so a disconnected
            // channel (all senders dropped) would also land here and the loop
            // would re-run without blocking. Presumably callers keep a sender
            // alive for the lifetime of the loop; confirm.
            recv(update_signal_recv) -> _ => continue,
        };
    }
}
109 changes: 109 additions & 0 deletions rs/ic-observability/config-writer-common/src/config_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
use std::{
collections::{BTreeMap, BTreeSet},
error::Error,
path::{Path, PathBuf},
sync::Arc,
};

use service_discovery::{job_types::JobType, TargetGroup};

use crate::{
config_builder::Config, config_updater::ConfigUpdater, filters::TargetGroupFilter,
vector_config_structure::VectorConfigBuilder,
};
use slog::{debug, Logger};

/// Writes generated configurations as `<name>.json` files under a base
/// directory.
#[derive(Debug)]
pub struct ConfigWriter {
// Directory into which the JSON files are written.
base_directory: PathBuf,
// Target groups recorded per job (keyed by the job's string form) after the
// last successful `write_config`; used to skip rewriting unchanged input.
last_targets: BTreeMap<String, BTreeSet<TargetGroup>>,
// Filter applied to target groups in `write_config` before the config is
// built.
filters: Arc<dyn TargetGroupFilter>,
// Logger used for debug output.
log: slog::Logger,
}

impl ConfigWriter {
    /// Creates a writer that places its files under `write_path`.
    ///
    /// The writer starts with no remembered targets. `filters` decides which
    /// target groups reach the generated config in [`Self::write_config`], and
    /// `log` receives the writer's debug messages.
    pub fn new<P: AsRef<Path>>(
        write_path: P,
        filters: Arc<dyn TargetGroupFilter>,
        log: Logger,
    ) -> Self {
        ConfigWriter {
            base_directory: PathBuf::from(write_path.as_ref()),
            last_targets: Default::default(),
            filters,
            log,
        }
    }

    /// Writes the configuration file for `job` to `<base_directory>/<job>.json`.
    ///
    /// The config is built by `vector_config_builder` from the target groups
    /// accepted by the writer's filters, serialized as pretty-printed JSON and
    /// written via `ic_utils::fs::write_atomically`.
    ///
    /// Calls are memoized per job on the *unfiltered* `target_groups`: if they
    /// equal the set recorded for this job, nothing is written. A job without
    /// a recorded set is compared against the empty set, so a first call with
    /// empty `target_groups` writes nothing either. The assumption is that no
    /// external process manipulates or deletes the written files.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by the atomic write. A serialization
    /// failure is surfaced as an `std::io::ErrorKind::Other` error. On error
    /// the new targets are not recorded, so the next call retries the write.
    pub fn write_config(
        &mut self,
        job: JobType,
        target_groups: BTreeSet<TargetGroup>,
        vector_config_builder: &impl VectorConfigBuilder,
    ) -> std::io::Result<()> {
        let last_job_targets = self.last_targets.entry(job.to_string()).or_default();
        if last_job_targets == &target_groups {
            debug!(
                self.log,
                "Targets didn't change, skipped regenerating config"
            );
            return Ok(());
        }
        debug!(
            self.log,
            "Targets changed, proceeding with regenerating config"
        );
        let target_path = self.base_directory.join(format!("{}.json", job));

        // Filter by reference: `target_groups` itself is moved into
        // `last_targets` once the write succeeds, so only the groups that pass
        // the filter need to be copied instead of cloning the whole set first.
        let filtered_target_groups: BTreeSet<TargetGroup> = target_groups
            .iter()
            .filter(|tg| self.filters.filter((*tg).clone()))
            .cloned()
            .collect();

        let vector_config = vector_config_builder.build(filtered_target_groups, job);

        ic_utils::fs::write_atomically(target_path.as_path(), |f| {
            serde_json::to_writer_pretty(f, &vector_config).map_err(|e| {
                std::io::Error::new(
                    std::io::ErrorKind::Other,
                    format!("Serialization error: {:?}", e),
                )
            })
        })?;

        // Remember the unfiltered input only after a successful write.
        self.last_targets.insert(job.to_string(), target_groups);
        Ok(())
    }
}

impl ConfigUpdater for ConfigWriter {
    /// Persists `config` as `<base_directory>/<config name>.json`.
    ///
    /// Nothing is written when `config.updated()` is `false`. Otherwise the
    /// config is serialized as pretty-printed JSON and written via
    /// `ic_utils::fs::write_atomically`. A serialization failure is reported
    /// as an I/O error of kind `Other`.
    fn update(&self, config: &dyn Config) -> Result<(), Box<dyn Error>> {
        if config.updated() {
            debug!(
                self.log,
                "Targets changed, proceeding with regenerating config"
            );
            let file_name = format!("{}.json", config.name());
            let destination = self.base_directory.join(file_name);

            ic_utils::fs::write_atomically(destination.as_path(), |file| {
                match serde_json::to_writer_pretty(file, &config) {
                    Ok(()) => Ok(()),
                    Err(err) => Err(std::io::Error::new(
                        std::io::ErrorKind::Other,
                        format!("Serialization error: {:?}", err),
                    )),
                }
            })?;
        } else {
            debug!(
                self.log,
                "Targets didn't change, skipped regenerating config"
            );
        }
        Ok(())
    }
}
60 changes: 60 additions & 0 deletions rs/ic-observability/config-writer-common/src/config_writer_loop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//! An experimental component that allows scraping logs using the http-endpoint
//! exposed by systemd-journal-gatewayd.
use crossbeam::select;
use service_discovery::metrics::Metrics;
use std::path::PathBuf;
use std::sync::Arc;

use crossbeam_channel::Receiver;
use slog::{info, warn};

use service_discovery::{job_types::JobType, IcServiceDiscovery};

use crate::config_writer::ConfigWriter;
use crate::filters::TargetGroupFilter;
use crate::vector_config_structure::VectorConfigBuilder;

/// Builds the closure that keeps the per-job config files up to date.
///
/// The returned closure creates one [`ConfigWriter`] rooted at
/// `vector_config_dir` and then loops. For every job in `jobs` it fetches the
/// target groups from `discovery`, sets `metrics.total_targets` (labelled
/// with the job) to their count, and asks the writer to write the config
/// built by `vector_config_builder`. Discovery and write failures are logged
/// as warnings and do not stop the loop.
///
/// After each pass the loop blocks on the two channels: `shutdown_signal`
/// ends the loop, `update_signal_recv` triggers the next pass.
///
/// # Panics
///
/// Panics if the number of target groups does not fit into the integer type
/// expected by the metric.
pub fn config_writer_loop(
    log: slog::Logger,
    discovery: Arc<dyn IcServiceDiscovery>,
    filters: Arc<dyn TargetGroupFilter>,
    shutdown_signal: Receiver<()>,
    jobs: Vec<JobType>,
    update_signal_recv: Receiver<()>,
    vector_config_dir: PathBuf,
    vector_config_builder: impl VectorConfigBuilder,
    metrics: Metrics,
) -> impl FnMut() {
    move || {
        let mut writer =
            ConfigWriter::new(vector_config_dir.clone(), filters.clone(), log.clone());
        loop {
            for job in jobs.iter() {
                let discovered = discovery.get_target_groups(*job, log.clone());
                let targets = match discovered {
                    Err(e) => {
                        warn!(log, "Failed to retrieve targets for job {}: {:?}", job, e);
                        continue;
                    }
                    Ok(found) => found,
                };

                let job_label = job.to_string();
                metrics
                    .total_targets
                    .with_label_values(&[job_label.as_str()])
                    .set(targets.len().try_into().unwrap());

                let outcome = writer.write_config(*job, targets, &vector_config_builder);
                if let Err(e) = outcome {
                    warn!(
                        log,
                        "Failed to write config for targets for job {}: {:?}", job, e
                    );
                }
            }
            select! {
                recv(shutdown_signal) -> _ => {
                    info!(log, "Received shutdown signal in log_scraper");
                    break;
                },
                recv(update_signal_recv) -> _ => continue,
            };
        }
    }
}
Loading

0 comments on commit e9a363b

Please sign in to comment.