Skip to content

Commit

Permalink
feat(collector)!: add collector config and solidifier names (#134)
Browse files Browse the repository at this point in the history
* Add collector config and solidifier names

* fmt

* clamp

Co-authored-by: Jochen Görtler <[email protected]>
  • Loading branch information
Alexandcoats and grtlr authored May 10, 2022
1 parent ba0b82a commit 095921b
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 12 deletions.
3 changes: 3 additions & 0 deletions bin/inx-chronicle/config.template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@ connection_retry_interval = "5s"
[api]
port = 9092
allow_origins = ["0.0.0.0"]

[collector]
solidifier_count = 10
3 changes: 3 additions & 0 deletions bin/inx-chronicle/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@ pub struct CliArgs {
/// The address of the MongoDB database.
#[clap(long)]
pub db: Option<String>,
/// The number of solidifier worker tasks to spawn.
#[clap(long)]
pub solidifier_count: Option<usize>,
}
25 changes: 25 additions & 0 deletions bin/inx-chronicle/src/collector/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2022 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct CollectorConfig {
pub solidifier_count: usize,
}

impl CollectorConfig {
const MAX_SOLIDIFIERS: usize = 100;

pub fn new(solidifier_count: usize) -> Self {
Self {
solidifier_count: solidifier_count.clamp(1, Self::MAX_SOLIDIFIERS),
}
}
}

impl Default for CollectorConfig {
fn default() -> Self {
Self::new(10)
}
}
17 changes: 7 additions & 10 deletions bin/inx-chronicle/src/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ use chronicle::{
db::{bson::DocError, MongoDb},
runtime::{Actor, ActorContext, ActorError, Addr, ConfigureActor, HandleEvent, Report, RuntimeError},
};
pub use config::CollectorConfig;
use mongodb::bson::document::ValueAccessError;
use solidifier::Solidifier;
use thiserror::Error;

mod config;
pub mod solidifier;

#[derive(Debug, Error)]
Expand All @@ -29,17 +31,12 @@ pub enum CollectorError {
#[derive(Debug)]
pub struct Collector {
db: MongoDb,
solidifier_count: usize,
config: CollectorConfig,
}

impl Collector {
const MAX_SOLIDIFIERS: usize = 100;

pub fn new(db: MongoDb, solidifier_count: usize) -> Self {
Self {
db,
solidifier_count: solidifier_count.max(1).min(Self::MAX_SOLIDIFIERS),
}
pub fn new(db: MongoDb, config: CollectorConfig) -> Self {
Self { db, config }
}
}

Expand All @@ -50,7 +47,7 @@ impl Actor for Collector {

async fn init(&mut self, cx: &mut ActorContext<Self>) -> Result<Self::State, Self::Error> {
let mut solidifiers = HashMap::new();
for i in 0..self.solidifier_count {
for i in 0..self.config.solidifier_count {
solidifiers.insert(
i,
cx.spawn_child(Solidifier::new(i, self.db.clone()).with_registration(false))
Expand Down Expand Up @@ -211,7 +208,7 @@ pub mod stardust {
.extend(Vec::from(rec.payload.essence.parents).into_iter());
solidifiers
// Divide solidifiers fairly by milestone
.get(&(rec.milestone_index as usize % self.solidifier_count))
.get(&(rec.milestone_index as usize % self.config.solidifier_count))
// Unwrap: We never remove solidifiers, so they should always exist
.unwrap()
.send(state)?;
Expand Down
4 changes: 4 additions & 0 deletions bin/inx-chronicle/src/collector/solidifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ impl Actor for Solidifier {
async fn init(&mut self, _cx: &mut ActorContext<Self>) -> Result<Self::State, Self::Error> {
Ok(())
}

fn name(&self) -> std::borrow::Cow<'static, str> {
format!("Solidifier {}", self.id).into()
}
}

#[cfg(feature = "stardust")]
Expand Down
5 changes: 5 additions & 0 deletions bin/inx-chronicle/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use thiserror::Error;

#[cfg(feature = "api")]
use crate::api::ApiConfig;
use crate::collector::CollectorConfig;
#[cfg(feature = "inx")]
use crate::inx::InxConfig;

Expand All @@ -28,6 +29,7 @@ pub struct ChronicleConfig {
pub inx: InxConfig,
#[cfg(feature = "api")]
pub api: ApiConfig,
pub collector: CollectorConfig,
}

impl ChronicleConfig {
Expand All @@ -46,6 +48,9 @@ impl ChronicleConfig {
if let Some(connect_url) = args.db {
self.mongodb = MongoDbConfig::new().with_connect_url(connect_url);
}
if let Some(solidifier_count) = args.solidifier_count {
self.collector = CollectorConfig::new(solidifier_count);
}
}
}

Expand Down
5 changes: 3 additions & 2 deletions bin/inx-chronicle/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ impl Actor for Launcher {
log::info!("No node status has been found in the database, it seems like the database is empty.");
};

cx.spawn_child(Collector::new(db.clone(), 1)).await;
cx.spawn_child(Collector::new(db.clone(), config.collector.clone()))
.await;

#[cfg(feature = "inx")]
cx.spawn_child(InxWorker::new(config.inx.clone())).await;
Expand Down Expand Up @@ -101,7 +102,7 @@ impl HandleEvent<Report<Collector>> for Launcher {
// Only a few possible errors we could potentially recover from
ErrorKind::Io(_) | ErrorKind::ServerSelection { message: _, .. } => {
let db = MongoDb::connect(&config.mongodb).await?;
cx.spawn_child(Collector::new(db, 1)).await;
cx.spawn_child(Collector::new(db, config.collector.clone())).await;
}
_ => {
cx.shutdown();
Expand Down

0 comments on commit 095921b

Please sign in to comment.