Skip to content

Commit

Permalink
refactor(gossip): reusable table serialisation
Browse files Browse the repository at this point in the history
Extract the "new table was created" gossip event constructor for reuse.
  • Loading branch information
domodwyer committed Sep 25, 2023
1 parent 95ae5ed commit cb1d666
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 25 deletions.
31 changes: 29 additions & 2 deletions router/src/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ pub mod namespace_cache;
pub mod schema_change_observer;
pub mod traits;

use data_types::NamespaceSchema;
use generated_types::influxdata::iox::gossip::v1::NamespaceCreated;
use data_types::{NamespaceName, NamespaceSchema};
use generated_types::influxdata::iox::gossip::v1::{
Column, NamespaceCreated, TableCreated, TableUpdated,
};

/// Make a `NamespaceCreated` protobuf instance from the specified name and schema.
pub(crate) fn namespace_created(
Expand All @@ -70,6 +72,31 @@ pub(crate) fn namespace_created(
}
}

/// Construct a [`TableCreated`] protobuf message for the given `schema`.
pub(crate) fn table_created(
table_name: String,
namespace_name: &NamespaceName<'_>,
schema: &data_types::TableSchema,
) -> TableCreated {
TableCreated {
table: Some(TableUpdated {
table_name,
namespace_name: namespace_name.into(),
table_id: schema.id.get(),
columns: schema
.columns
.iter()
.map(|(col_name, col_schema)| Column {
column_id: col_schema.id.get(),
name: col_name.to_owned(),
column_type: col_schema.column_type as i32,
})
.collect(),
}),
partition_template: schema.partition_template.as_proto().cloned(),
}
}

#[cfg(test)]
mod mock_schema_broadcast;

Expand Down
27 changes: 4 additions & 23 deletions router/src/gossip/schema_change_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ use std::{collections::BTreeMap, fmt::Debug, sync::Arc};

use async_trait::async_trait;
use data_types::{ColumnsByName, NamespaceName, NamespaceSchema};
use generated_types::influxdata::iox::gossip::v1::{
schema_message::Event, Column, TableCreated, TableUpdated,
};
use generated_types::influxdata::iox::gossip::v1::{schema_message::Event, Column, TableUpdated};

use crate::namespace_cache::{ChangeStats, NamespaceCache};

use super::{namespace_created, traits::SchemaBroadcast};
use super::{namespace_created, table_created, traits::SchemaBroadcast};

/// A [`NamespaceCache`] decorator implementing cluster-wide, best-effort
/// propagation of local schema changes via the gossip subsystem.
Expand Down Expand Up @@ -129,24 +127,7 @@ where
new_tables: &BTreeMap<String, data_types::TableSchema>,
) {
for (table_name, schema) in new_tables {
let msg = TableCreated {
table: Some(TableUpdated {
table_name: table_name.to_owned(),
namespace_name: namespace_name.to_string(),
table_id: schema.id.get(),
columns: schema
.columns
.iter()
.map(|(col_name, col_schema)| Column {
column_id: col_schema.id.get(),
name: col_name.to_owned(),
column_type: col_schema.column_type as i32,
})
.collect(),
}),
partition_template: schema.partition_template.as_proto().cloned(),
};

let msg = table_created(table_name.to_string(), namespace_name, schema);
self.tx.broadcast(Event::TableCreated(msg));
}
}
Expand Down Expand Up @@ -208,7 +189,7 @@ mod tests {
use data_types::{
partition_template::test_table_partition_override, ColumnId, TableId, TableSchema,
};
use generated_types::influxdata::iox::gossip::v1::column::ColumnType;
use generated_types::influxdata::iox::gossip::v1::{column::ColumnType, TableCreated};

macro_rules! test_observe {
(
Expand Down

0 comments on commit cb1d666

Please sign in to comment.