Skip to content

Commit

Permalink
feat: add metasrv start time to node info
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed May 8, 2024
1 parent cc8d6b1 commit c52b0b0
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 44 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "07de9a47ed46eae0fab4cbc5c32a17c67ed5cb38" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e15ed92dfe60cb75946038844ddc9a12079fb4ba" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
5 changes: 2 additions & 3 deletions src/meta-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ impl ClusterInfo for MetaClient {

let mut nodes = if get_metasrv_nodes {
let last_activity_ts = -1; // Metasrv does not provide this information.
let start_time_ms = 0;

let (leader, followers) = cluster_client.get_metasrv_peers().await?;
followers
Expand All @@ -275,15 +274,15 @@ impl ClusterInfo for MetaClient {
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
version: node.version,
git_commit: node.git_commit,
start_time_ms,
start_time_ms: node.start_time_ms,
})
.chain(leader.into_iter().map(|node| NodeInfo {
peer: node.peer.map(|p| p.into()).unwrap_or_default(),
last_activity_ts,
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
version: node.version,
git_commit: node.git_commit,
start_time_ms,
start_time_ms: node.start_time_ms,
}))
.collect::<Vec<_>>()
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/meta-srv/src/election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub trait Election: Send + Sync {
fn in_infancy(&self) -> bool;

/// Registers a candidate for the election.
async fn register_candidate(&self) -> Result<()>;
async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()>;

/// Gets all candidates in the election.
async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>>;
Expand Down
12 changes: 3 additions & 9 deletions src/meta-srv/src/election/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl Election for EtcdElection {
.is_ok()
}

async fn register_candidate(&self) -> Result<()> {
async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
const CANDIDATE_LEASE_SECS: u64 = 600;
const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;

Expand All @@ -144,15 +144,9 @@ impl Election for EtcdElection {

// The register info: key is the candidate key, value is its node info(addr, version, git_commit).
let key = self.candidate_key().into_bytes();
let build_info = common_version::build_info();
let value = MetasrvNodeInfo {
addr: self.leader_value.clone(),
version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(),
};
let value = serde_json::to_string(&value)
let value = serde_json::to_string(node_info)
.with_context(|_| error::SerializeToJsonSnafu {
input: format!("{value:?}"),
input: format!("{node_info:?}"),
})?
.into_bytes();
// Puts with the lease id
Expand Down
35 changes: 26 additions & 9 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ pub struct MetasrvNodeInfo {
pub version: String,
// The node build git commit hash
pub git_commit: String,
// The node start timestamp in milliseconds
pub start_time_ms: u64,
}

impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
Expand All @@ -236,6 +238,7 @@ impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
}),
version: node_info.version,
git_commit: node_info.git_commit,
start_time_ms: node_info.start_time_ms,
}
}
}
Expand Down Expand Up @@ -305,6 +308,7 @@ impl MetaStateHandler {
pub struct Metasrv {
state: StateRef,
started: Arc<AtomicBool>,
start_time_ms: u64,
options: MetasrvOptions,
// It is only valid at the leader node and is used to temporarily
// store some data that will not be persisted.
Expand Down Expand Up @@ -339,7 +343,11 @@ impl Metasrv {
return Ok(());
}

self.create_default_schema_if_not_exist().await?;
// Creates default schema if not exists
self.table_metadata_manager
.init()
.await
.context(InitMetadataSnafu)?;

if let Some(election) = self.election() {
let procedure_manager = self.procedure_manager.clone();
Expand Down Expand Up @@ -394,9 +402,10 @@ impl Metasrv {
{
let election = election.clone();
let started = self.started.clone();
let node_info = self.node_info();
let _handle = common_runtime::spawn_bg(async move {
while started.load(Ordering::Relaxed) {
let res = election.register_candidate().await;
let res = election.register_candidate(&node_info).await;
if let Err(e) = res {
warn!("Metasrv register candidate error: {}", e);
}
Expand Down Expand Up @@ -435,14 +444,8 @@ impl Metasrv {
}

info!("Metasrv started");
Ok(())
}

async fn create_default_schema_if_not_exist(&self) -> Result<()> {
self.table_metadata_manager
.init()
.await
.context(InitMetadataSnafu)
Ok(())
}

pub async fn shutdown(&self) -> Result<()> {
Expand All @@ -453,6 +456,20 @@ impl Metasrv {
.context(StopProcedureManagerSnafu)
}

pub fn start_time_ms(&self) -> u64 {
self.start_time_ms
}

pub fn node_info(&self) -> MetasrvNodeInfo {
let build_info = common_version::build_info();
MetasrvNodeInfo {
addr: self.options().server_addr.clone(),
version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(),
start_time_ms: self.start_time_ms(),
}
}

/// Lookup a peer by peer_id, return it only when it's alive.
pub(crate) async fn lookup_peer(
&self,
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ impl MetasrvBuilder {
Ok(Metasrv {
state,
started,
start_time_ms: common_time::util::current_time_millis() as u64,
options,
in_memory,
kv_backend,
Expand Down
33 changes: 16 additions & 17 deletions src/meta-srv/src/service/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,35 +88,23 @@ impl cluster_server::Cluster for Metasrv {
return Ok(Response::new(resp));
}

fn make_node_info(addr: String) -> Option<MetasrvNodeInfo> {
let build_info = common_version::build_info();
Some(
metasrv::MetasrvNodeInfo {
addr,
version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(),
}
.into(),
)
}

let leader_addr = &self.options().server_addr;
let (leader, followers) = match self.election() {
Some(election) => {
let leader = election.leader().await?;
let nodes = election.all_candidates().await?;
let followers = nodes
.into_iter()
.filter(|node_info| node_info.addr != leader.0)
.filter(|node_info| &node_info.addr != leader_addr)
.map(api::v1::meta::MetasrvNodeInfo::from)
.collect();
(make_node_info(leader.0.clone()), followers)
(self.node_info().into(), followers)
}
None => (make_node_info(self.options().server_addr.clone()), vec![]),
None => (self.make_node_info(leader_addr), vec![]),
};

let resp = MetasrvPeersResponse {
header: Some(ResponseHeader::success(0)),
leader,
leader: Some(leader),
followers,
};

Expand All @@ -129,4 +117,15 @@ impl Metasrv {
// Returns true when there is no `election`, indicating the presence of only one `Metasrv` node, which is the leader.
self.election().map(|x| x.is_leader()).unwrap_or(true)
}

fn make_node_info(&self, addr: &str) -> MetasrvNodeInfo {
let build_info = common_version::build_info();
metasrv::MetasrvNodeInfo {
addr: addr.to_string(),
version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(),
start_time_ms: self.start_time_ms(),
}
.into()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ DESC TABLE CLUSTER_INFO;
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO ORDER BY peer_type;

+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||1|FRONTEND|127.0.0.1:4001|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:3002|Version|Hash||||+++++++++
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||1|FRONTEND|127.0.0.1:4001|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:3002|Version|Hash|Start_time|Duration||+++++++++

-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
Expand All @@ -35,7 +35,7 @@ SELECT * FROM CLUSTER_INFO ORDER BY peer_type;
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type;

+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|METASRV|127.0.0.1:3002|Version|Hash||||+++++++++
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|METASRV|127.0.0.1:3002|Version|Hash|Start_time|Duration||+++++++++

-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
Expand All @@ -55,7 +55,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type;
-- SQLNESS REPLACE [\s\-]+
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type;

+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:3002|Version|Hash||||+++++++++
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:3002|Version|Hash|Start_time|Duration||+++++++++

-- SQLNESS REPLACE version node_version
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
Expand Down

0 comments on commit c52b0b0

Please sign in to comment.