From 12cd507672eb1b512f381d5610af9e9611e6159e Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 30 Jun 2023 17:55:29 +0100 Subject: [PATCH 1/3] refactor(stages): use MetricsListener for Execution gas metric --- crates/stages/src/metrics/listener.rs | 11 ++++++++ crates/stages/src/metrics/sync_metrics.rs | 9 +++++++ crates/stages/src/stages/execution.rs | 31 ++++++++++++----------- 3 files changed, 36 insertions(+), 15 deletions(-) diff --git a/crates/stages/src/metrics/listener.rs b/crates/stages/src/metrics/listener.rs index f6672a4e68d..c723402255a 100644 --- a/crates/stages/src/metrics/listener.rs +++ b/crates/stages/src/metrics/listener.rs @@ -1,5 +1,6 @@ use crate::metrics::{StageMetrics, SyncMetrics}; use reth_primitives::{ + constants::MGAS_TO_GAS, stage::{StageCheckpoint, StageId}, BlockNumber, }; @@ -26,6 +27,11 @@ pub enum MetricEvent { /// If specified, `entities_total` metric is updated. max_block_number: Option, }, + /// Execution stage processed some amount of gas. + ExecutionStageGas { + /// Gas processed. + gas: u64, + }, } /// Metrics routine that listens to new metric events on the `events_rx` receiver. @@ -62,6 +68,11 @@ impl MetricsListener { stage_metrics.entities_total.set(total as f64); } } + MetricEvent::ExecutionStageGas { gas } => self + .sync_metrics + .execution_stage + .mgas_processed_total + .increment(gas as f64 / MGAS_TO_GAS as f64), } } } diff --git a/crates/stages/src/metrics/sync_metrics.rs b/crates/stages/src/metrics/sync_metrics.rs index 859a7e6d778..ba560c5399c 100644 --- a/crates/stages/src/metrics/sync_metrics.rs +++ b/crates/stages/src/metrics/sync_metrics.rs @@ -8,6 +8,7 @@ use std::collections::HashMap; #[derive(Debug, Default)] pub(crate) struct SyncMetrics { pub(crate) stages: HashMap, + pub(crate) execution_stage: ExecutionStageMetrics, } #[derive(Metrics)] @@ -20,3 +21,11 @@ pub(crate) struct StageMetrics { /// The number of total entities of the last commit for a stage, if applicable. pub(crate) entities_total: Gauge, } + +/// Execution stage metrics. +#[derive(Metrics)] +#[metrics(scope = "sync.execution")] +pub(crate) struct ExecutionStageMetrics { + /// The total amount of gas processed (in millions) + pub(crate) mgas_processed_total: Gauge, +} diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index e54e7a446d2..4ab996950c4 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -1,4 +1,7 @@ -use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; +use crate::{ + ExecInput, ExecOutput, MetricEvent, MetricEventsSender, Stage, StageError, UnwindInput, + UnwindOutput, +}; use reth_db::{ cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}, database::Database, @@ -25,14 +28,6 @@ use reth_provider::{ use std::{ops::RangeInclusive, time::Instant}; use tracing::*; -/// Execution stage metrics. -#[derive(Metrics)] -#[metrics(scope = "sync.execution")] -pub struct ExecutionStageMetrics { - /// The total amount of gas processed (in millions) - mgas_processed_total: Gauge, -} - /// The execution stage executes all transactions and /// update history indexes. /// @@ -64,7 +59,7 @@ pub struct ExecutionStageMetrics { // false positive, we cannot derive it if !DB: Debug. #[allow(missing_debug_implementations)] pub struct ExecutionStage { - metrics: ExecutionStageMetrics, + metrics_tx: Option, /// The stage's internal executor executor_factory: EF, /// The commit thresholds of the execution stage. @@ -74,7 +69,7 @@ pub struct ExecutionStage { impl ExecutionStage { /// Create new execution stage with specified config. pub fn new(executor_factory: EF, thresholds: ExecutionStageThresholds) -> Self { - Self { metrics: ExecutionStageMetrics::default(), executor_factory, thresholds } + Self { metrics_tx: None, executor_factory, thresholds } } /// Create an execution stage with the provided executor factory. @@ -84,9 +79,14 @@ impl ExecutionStage { Self::new(executor_factory, ExecutionStageThresholds::default()) } + pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self { + self.metrics_tx = Some(metrics_tx); + self + } + /// Execute the stage. pub fn execute_inner( - &self, + &mut self, provider: &DatabaseProviderRW<'_, &DB>, input: ExecInput, ) -> Result { @@ -129,9 +129,10 @@ impl ExecutionStage { })?; // Gas metrics - self.metrics - .mgas_processed_total - .increment(block.header.gas_used as f64 / MGAS_TO_GAS as f64); + if let Some(metrics_tx) = &mut self.metrics_tx { + let _ = + metrics_tx.send(MetricEvent::ExecutionStageGas { gas: block.header.gas_used }); + } // Merge state changes state.extend(block_state); From 98c7001ce7158b4ccb0feb4e3e52dd616037c788 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 30 Jun 2023 19:08:42 +0100 Subject: [PATCH 2/3] fix lint --- bin/reth/src/node/mod.rs | 2 +- crates/stages/src/pipeline/builder.rs | 2 +- crates/stages/src/stages/execution.rs | 6 +----- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index d8c3cbf5a0c..f4dae495934 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -681,7 +681,7 @@ impl Command { if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) }; let pipeline = builder .with_tip_sender(tip_tx) - .with_metric_events(metrics_tx) + .with_metrics_tx(metrics_tx) .add_stages( DefaultStages::new( header_mode, diff --git a/crates/stages/src/pipeline/builder.rs b/crates/stages/src/pipeline/builder.rs index 5e72da6b030..7679361c839 100644 --- a/crates/stages/src/pipeline/builder.rs +++ b/crates/stages/src/pipeline/builder.rs @@ -62,7 +62,7 @@ where } /// Set the metric events sender. - pub fn with_metric_events(mut self, metrics_tx: MetricEventsSender) -> Self { + pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self { self.metrics_tx = Some(metrics_tx); self } diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 4ab996950c4..38284b1d125 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -10,12 +10,7 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, }; use reth_interfaces::db::DatabaseError; -use reth_metrics::{ - metrics::{self, Gauge}, - Metrics, -}; use reth_primitives::{ - constants::MGAS_TO_GAS, stage::{ CheckpointBlockRange, EntitiesCheckpoint, ExecutionCheckpoint, StageCheckpoint, StageId, }, @@ -79,6 +74,7 @@ impl ExecutionStage { Self::new(executor_factory, ExecutionStageThresholds::default()) } + /// Set the metric events sender. pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self { self.metrics_tx = Some(metrics_tx); self From 44cd141b19827a9307a7f580b5faca867d7a9450 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 3 Jul 2023 13:22:30 +0100 Subject: [PATCH 3/3] pass metrics_tx to execution stage --- bin/reth/src/node/mod.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 905bf32d044..cb4a9c37726 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -690,7 +690,7 @@ impl Command { if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) }; let pipeline = builder .with_tip_sender(tip_tx) - .with_metrics_tx(metrics_tx) + .with_metrics_tx(metrics_tx.clone()) .add_stages( DefaultStages::new( header_mode, @@ -706,13 +706,16 @@ impl Command { .set(SenderRecoveryStage { commit_threshold: stage_config.sender_recovery.commit_threshold, }) - .set(ExecutionStage::new( - factory, - ExecutionStageThresholds { - max_blocks: stage_config.execution.max_blocks, - max_changes: stage_config.execution.max_changes, - }, - )) + .set( + ExecutionStage::new( + factory, + ExecutionStageThresholds { + max_blocks: stage_config.execution.max_blocks, + max_changes: stage_config.execution.max_changes, + }, + ) + .with_metrics_tx(metrics_tx), + ) .set(AccountHashingStage::new( stage_config.account_hashing.clean_threshold, stage_config.account_hashing.commit_threshold,