Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Used labels and service while calling metrics-rs macros #234

Merged
merged 51 commits into from
Aug 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
4ae7934
Added mock counter with node descriptor as metric key
Sanskar95 Jun 28, 2021
15e1389
Added gauge to get the number of messages handled at one node
Sanskar95 Jul 2, 2021
5af93ac
removed unused code and libraries
Sanskar95 Jul 2, 2021
12fab89
some refactorings
Sanskar95 Jul 2, 2021
456708b
some refactorings
Sanskar95 Jul 2, 2021
7c1316f
Added perf evets library where hardware metrics are getting exported …
Sanskar95 Jul 9, 2021
72f627e
added message per second metric exposed over a prometheus gauge
Sanskar95 Jul 9, 2021
78f0c5f
added more hardware metrics and some code refactor
Sanskar95 Jul 10, 2021
79be73e
using tota events method to get batch events
Sanskar95 Jul 11, 2021
5b290ab
added all the hardware metrics , configurable through OpearatorConf
Sanskar95 Jul 13, 2021
94477bc
Added hardware_counter feature flag and removed old metric structs fr…
Sanskar95 Jul 14, 2021
98b9a53
Added node run time event: inbound throughput through the meter
Sanskar95 Jul 16, 2021
819a9be
Variable name refactor
Sanskar95 Jul 16, 2021
afc8898
Added epoch counter node run time metric
Sanskar95 Jul 16, 2021
60b6595
Added watermark_counter
Sanskar95 Jul 16, 2021
4ad0428
added mock metrics to source node ,like error counter
Sanskar95 Jul 19, 2021
3a7cb4f
added mock metrics to source node ,like error counter
Sanskar95 Jul 19, 2021
233d196
Added description in source node so as to name the metrics
Sanskar95 Jul 19, 2021
88025fc
Counting the messages in Poll ready in process fn of SourceNode
Sanskar95 Jul 20, 2021
bc4bed7
Adding custom gauges in counter through operatorContext
Sanskar95 Jul 21, 2021
baa8068
Added metrics feature flags at required places
Sanskar95 Jul 21, 2021
118578d
Changed the join method to concatenate two string references
Sanskar95 Jul 21, 2021
a26c190
Changed to format from join method to concatenate string references
Sanskar95 Jul 21, 2021
a01c3f0
Code Cleanup
Sanskar95 Jul 22, 2021
b6a9899
changed to counter from gauge for watermark and epoch counter
Sanskar95 Jul 22, 2021
abb3e5b
Added [cfg(not(test))] for hardware counters
Sanskar95 Jul 22, 2021
5d5d9ed
added description about feature flags
Sanskar95 Jul 23, 2021
5c80ea9
syncing kafka_source with master to avoid conflicts
Sanskar95 Jul 23, 2021
677c52c
Added [cfg(feature="metrics")] to rest of the places
Sanskar95 Jul 23, 2021
ba1cd6a
Changed struct naming scheme and some refactor
Sanskar95 Jul 23, 2021
01b26b3
Code Refactor
Sanskar95 Jul 23, 2021
93b97bc
register_hardware_metric_gauges method name change
Sanskar95 Jul 23, 2021
b8ee3e1
Refactor variable names
Sanskar95 Jul 23, 2021
eb5ccc6
Resolving merge conflicts due to upstream changes
Sanskar95 Jul 23, 2021
48599cf
Enabled target os linux for perf event
Sanskar95 Jul 23, 2021
93887f9
added #[cfg(all(feature = "hardware_counters", target_os = "linux"))]
Sanskar95 Jul 23, 2021
39a5de8
Fix
Sanskar95 Jul 23, 2021
5aabaff
PR review changes and added a software metric batch_execution time
Sanskar95 Jul 26, 2021
87118f8
Code formatting
Sanskar95 Jul 26, 2021
e19336d
Improving in code docs
Sanskar95 Jul 26, 2021
c27dcf8
Changed to increment_counter! macro from counter! where the value is …
Sanskar95 Jul 27, 2021
5d6f23a
PR review comments changes
Sanskar95 Jul 27, 2021
5dbbd2e
Removed counter fields from struct
Sanskar95 Jul 27, 2021
be4ea1f
Used labels and service while calling metrics-rs macros
Sanskar95 Jul 29, 2021
ae63104
Resolved merge conflicts from upstream master
Sanskar95 Jul 29, 2021
5b383ed
Checking in the updated CI file
Sanskar95 Jul 29, 2021
7fa1843
Code Refactor
Sanskar95 Jul 29, 2021
3a11d0c
removed comments
Sanskar95 Jul 29, 2021
4d9f22e
Shifted group creation from constructor to handle_message
Sanskar95 Aug 4, 2021
114e253
removed redundant code and some refactor
Sanskar95 Aug 4, 2021
c09681b
removed var type from tuples while creating counters
Sanskar95 Aug 4, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 1 addition & 25 deletions src/metrics/perf_event.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::prelude::alloc::fmt::Formatter;
use metrics::register_histogram;
use perf_event::{events::Hardware, Counter, Group};
use perf_event::events::Hardware;
use serde::Deserialize;
use std::fmt;

Expand Down Expand Up @@ -66,26 +65,3 @@ impl PerfEvents {
self.counters.push(hardware_metric_kind);
}
}

pub struct HardwareMetricGroup {
pub group: Group,
pub counters: Vec<(String, Counter)>,
}

impl HardwareMetricGroup {
pub(crate) fn register_hardware_metric_gauges(
&mut self,
node_name: String,
perf_events: PerfEvents,
) -> std::io::Result<()> {
let iterator = perf_events.counters.iter();
for value in iterator {
register_histogram!(self.get_field_gauge_name(&node_name, &value.to_string()));
}
self.group.enable()
}

pub fn get_field_gauge_name(&self, field_name: &str, node_name: &str) -> String {
format!("{}_{}", node_name, field_name)
}
}
12 changes: 2 additions & 10 deletions src/metrics/runtime_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
use crate::metrics::meter::Meter;
use metrics::{register_counter, register_gauge, register_histogram};

#[derive(Clone, Debug, Default)]
pub struct NodeMetrics {
pub inbound_throughput: Meter,
}

impl NodeMetrics {
pub fn new(node_name: &str) -> NodeMetrics {
register_gauge!(format!("{}_{}", node_name, "inbound_throughput"));
register_counter!(format!("{}_{}", node_name, "epoch_counter"));
register_counter!(format!("{}_{}", node_name, "watermark_counter"));
register_histogram!(format!("{}_{}", node_name, "batch_execution_time"));

pub fn new() -> NodeMetrics {
NodeMetrics {
inbound_throughput: Meter::new(),
}
Expand All @@ -22,9 +16,7 @@ pub struct SourceMetrics {
pub incoming_message_rate: Meter,
}
impl SourceMetrics {
pub fn new(source_node_name: &str) -> SourceMetrics {
register_gauge!(format!("{}_{}", source_node_name, "incoming_message_rate"));
register_counter!(format!("{}_{}", source_node_name, "error_counter"));
pub fn new() -> SourceMetrics {
SourceMetrics {
incoming_message_rate: Meter::new(),
}
Expand Down
94 changes: 47 additions & 47 deletions src/stream/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ pub mod debug;
pub mod source;

#[cfg(feature = "metrics")]
use metrics::{gauge, histogram, increment_counter};
use metrics::{
gauge, histogram, increment_counter, register_counter, register_gauge, register_histogram,
};

#[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
use perf_event::{Builder, Group};
Expand Down Expand Up @@ -38,7 +40,7 @@ use std::{cell::UnsafeCell, sync::Arc};
pub type NodeDescriptor = String;

#[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
use crate::metrics::perf_event::{HardwareMetricGroup, PerfEvents};
use crate::metrics::perf_event::PerfEvents;

#[cfg(feature = "metrics")]
use crate::metrics::runtime_metrics::NodeMetrics;
Expand Down Expand Up @@ -140,7 +142,7 @@ where
logger: ArconLogger,

#[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
hardware_metric_group: HardwareMetricGroup,
perf_events: PerfEvents,

#[cfg(feature = "metrics")]
node_metrics: NodeMetrics,
Expand Down Expand Up @@ -168,32 +170,19 @@ where
let timer = ArconTimer::new(timer_id, backend);

#[cfg(feature = "metrics")]
let borrowed_descriptor: &str = &descriptor.clone();
{
register_gauge!("inbound_throughput", "node" => descriptor.clone());
register_counter!("epoch_counter", "node" => descriptor.clone());
register_counter!("watermark_counter", "node" => descriptor.clone());
register_histogram!("batch_execution_time","execution time per events batch","node" => descriptor.clone());
}

#[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
let hardware_metric_group = {
let hardware_metrics_group = Group::new().unwrap();
let mut hardware_metric_group = HardwareMetricGroup {
group: hardware_metrics_group,
counters: vec![],
};

let iterator = perf_events.counters.iter();
for val in iterator {
hardware_metric_group.counters.push((
val.to_string(),
Builder::new()
.group(&mut hardware_metric_group.group)
.kind(val.get_hardware_kind())
.build()
.unwrap(),
));
{
for value in perf_events.counters.iter() {
register_histogram!(value.to_string(),"node" => descriptor.clone());
}
hardware_metric_group
.register_hardware_metric_gauges(descriptor.clone(), perf_events)
.ok();
hardware_metric_group
};
}

Node {
ctx: ComponentContext::uninitialised(),
Expand All @@ -207,10 +196,10 @@ where
logger,

#[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
hardware_metric_group,
perf_events,

#[cfg(feature = "metrics")]
node_metrics: NodeMetrics::new(borrowed_descriptor),
node_metrics: NodeMetrics::new(),
}
}

Expand All @@ -236,40 +225,50 @@ where
return Ok(());
}

#[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
Copy link
Member

@Max-Meldrum Max-Meldrum Aug 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

registration should happen in Node::new().

I think we can remove the HardwareMetricGroup now that we operate locally. Also, a lot of code can be removed.

We don't need to reset the group.

for iterators, no need to store the iterator in a variable.

for counter in self.perf_events.counters.iter() {
}
// instead of
let iterator = self.perf_events.counters.iter();
for counter in iterator {

}

The code below has not been verified to compile but should be possible to write it like that.

// Node constructor
#[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
for counter in self.perf_events.counters.iter() {
    register_histogram!(counter.to_string(),"node" => self.descriptor.clone());
}

// handle message

#[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
let mut (group, counters) = {
    let group = Group::new()?;
    let counters = Vec::with_capacity(self.perf_events.counters.len());
    for hardware_counter in self.perf_events.counters.iter() {
        let counter = Builder::new()
                        .group(&mut group)
                        .kind(hardware_counter.get_hardware_kind())
                        .build()?;
                        
        counters.push((hardware_counter.to_string(), counter));
    }
    (group, counters)
};

#[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
group.enable()?;

// do work

#[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
group.disable()?;

#[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
let counts = group.read()?;

#[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
for (metric_name, counter) in counters.iter() {
  histogram!(String::from(metric_name), counts[counter] as f64, "node" => self.descriptor.clone());
}

let (mut group, counters) = {
let mut group = Group::new()?;
let mut counters = Vec::with_capacity(self.perf_events.counters.len());
for hardware_counter in self.perf_events.counters.iter() {
let counter = Builder::new()
.group(&mut group)
.kind(hardware_counter.get_hardware_kind())
.build()?;

counters.push((hardware_counter.to_string(), counter));
}
(group, counters)
};

#[cfg(feature = "metrics")]
let start_time = Instant::now();

#[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
group.enable()?;

match message {
MessageContainer::Raw(r) => self.handle_events(r.sender, r.events)?,
MessageContainer::Local(l) => self.handle_events(l.sender, l.events)?,
}

#[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
group.disable()?;

#[cfg(feature = "metrics")]
{
let elapsed = start_time.elapsed();
histogram!(
format!("{}_{}", &self.descriptor, "batch_execution_time"),
elapsed.as_micros() as f64
);
histogram!("batch_execution_time", elapsed.as_micros() as f64,"node" => self.descriptor.clone());
}

#[cfg(feature = "metrics")]
gauge!(
format!("{}_{}", &self.descriptor, "inbound_throughput"),
self.node_metrics.inbound_throughput.get_one_min_rate()
);
gauge!("inbound_throughput", self.node_metrics.inbound_throughput.get_one_min_rate(), "node" => self.descriptor.clone());

#[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
{
let counts = self.hardware_metric_group.group.read()?;
let counter_iterator = self.hardware_metric_group.counters.iter();
for (metric_name, counter) in counter_iterator {
histogram!(
self.hardware_metric_group
.get_field_gauge_name(metric_name, &self.descriptor),
counts[counter] as f64
);
let counts = group.read()?;
for (metric_name, counter) in counters.iter() {
histogram!(String::from(metric_name), counts[counter] as f64, "node" => self.descriptor.clone());
}
self.hardware_metric_group.group.reset()?;
}

Ok(())
Expand Down Expand Up @@ -342,7 +341,7 @@ where
};

#[cfg(feature = "metrics")]
increment_counter!(format!("{}_{}", &self.descriptor, "watermark_counter"));
increment_counter!("watermark_counter", "node" => self.descriptor.clone());

// Forward the watermark
unsafe {
Expand Down Expand Up @@ -408,7 +407,8 @@ where
#[inline]
fn complete_epoch(&mut self) -> ArconResult<()> {
#[cfg(feature = "metrics")]
increment_counter!(format!("{}_{}", &self.descriptor, "epoch_counter"));
increment_counter!("epoch_counter", "node" => self.descriptor.clone());

// flush the blocked_channels list
self.node_state.blocked_channels().clear();

Expand Down
62 changes: 40 additions & 22 deletions src/stream/node/source.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2020, KTH Royal Institute of Technology.
// SPDX-License-Identifier: AGPL-3.0-only
#[cfg(feature = "metrics")]
use metrics::{gauge, increment_counter};
use metrics::{gauge, increment_counter, register_counter, register_gauge};

#[cfg(feature = "metrics")]
use crate::metrics::runtime_metrics::SourceMetrics;
Expand Down Expand Up @@ -74,6 +74,13 @@ where
logger: ArconLogger,
) -> Self {
let borrowed_source_name: &str = &conf.name.clone();

#[cfg(feature = "metrics")]
{
register_gauge!("incoming_message_rate", "source" => conf.name.clone());
register_counter!("error_counter", "source" => conf.name.clone());
}

Self {
ctx: ComponentContext::uninitialised(),
manager_port: RequiredPort::uninitialised(),
Expand All @@ -88,31 +95,23 @@ where
logger,

#[cfg(feature = "metrics")]
source_metrics: SourceMetrics::new(borrowed_source_name),
source_metrics: SourceMetrics::new(),

descriptor: String::from(borrowed_source_name),
}
}
pub fn process(&mut self) -> ArconResult<()> {
pub fn process(&mut self) -> ArconResult<usize> {
let mut counter = 0;

loop {
if counter >= self.conf.batch_size {
return Ok(());
return Ok(counter);
}

let poll = self.source.poll_next()?;

match poll {
Ok(Poll::Ready(record)) => {
#[cfg(feature = "metrics")]
self.source_metrics.incoming_message_rate.mark_n(1);

#[cfg(feature = "metrics")]
gauge!(
format!("{}_{}", &self.descriptor, "incoming_message_rate"),
self.source_metrics.incoming_message_rate.get_one_min_rate()
);
match self.conf.time {
ArconTime::Event => match &self.conf.extractor {
Some(extractor) => {
Expand All @@ -129,17 +128,25 @@ where
}
Ok(Poll::Pending) => {
// nothing to collect, reschedule...
return Ok(());
return Ok(counter);
}
Ok(Poll::Done) => {
// signal end..
self.ended = true;
return Ok(());
return Ok(counter);
}
Err(error) => {
#[cfg(feature = "metrics")]
increment_counter!(format!("{}_{}", &self.descriptor, "error_counter"),);
return self.handle_source_error(error);
increment_counter!("error_counter", "source" => self.descriptor.clone());

match self.handle_source_error(error) {
Ok(_) => {
counter += 1;
}
Err(err) => {
return Err(err);
}
}
}
}
}
Expand All @@ -153,7 +160,7 @@ where
KafkaError::Canceled | KafkaError::ConsumerCommit(_) => {
return Err(crate::error::Error::Unsupported {
msg: error.to_string(),
})
});
}
_ => (),
}
Expand Down Expand Up @@ -235,12 +242,23 @@ where
S: Source,
{
fn handle(&mut self, _event: ProcessSource) -> Handled {
if let Err(error) = self.process() {
// fatal error, must shutdown..
// TODO: coordinate shutdown of the application..
error!(self.logger, "{}", error);
}
match self.process() {
#[cfg(not(feature = "metrics"))]
Ok(_) => (),
#[cfg(feature = "metrics")]
Ok(polled_records) => {
self.source_metrics
.incoming_message_rate
.mark_n(polled_records as u64);
gauge!("incoming_message_rate", self.source_metrics.incoming_message_rate.get_one_min_rate(), "source" => self.descriptor.clone());
}

Err(error) => {
// fatal error, must shutdown..
// TODO: coordinate shutdown of the application..
error!(self.logger, "{}", error);
}
}
if self.ended {
self.manager_port.trigger(SourceManagerEvent::End);
} else {
Expand Down