refactor: per review
discord9 committed Jun 21, 2024
1 parent 4da3af2 commit 40589a1
Showing 4 changed files with 54 additions and 28 deletions.
3 changes: 2 additions & 1 deletion src/common/meta/src/ddl/create_flow/metadata.rs
@@ -23,10 +23,11 @@ impl CreateFlowProcedure {
     pub(crate) async fn allocate_flow_id(&mut self) -> Result<()> {
         // TODO(weny, ruihang): We don't support partitions yet; it's always 1 for now.
         let partitions = 1;
+        let cluster_id = self.data.cluster_id;
         let (flow_id, peers) = self
             .context
             .flow_metadata_allocator
-            .create(0, partitions)
+            .create(cluster_id, partitions)
             .await?;
         self.data.flow_id = Some(flow_id);
         self.data.peers = peers;
15 changes: 6 additions & 9 deletions src/flow/src/heartbeat.rs
@@ -39,8 +39,8 @@ pub struct HeartbeatTask {
     node_id: u64,
     server_addr: String,
     meta_client: Arc<MetaClient>,
-    report_interval: u64,
-    retry_interval: u64,
+    report_interval: Duration,
+    retry_interval: Duration,
     resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
 }

@@ -55,8 +55,8 @@ impl HeartbeatTask {
             node_id: opts.node_id.unwrap_or(0),
             server_addr: opts.grpc.addr.clone(),
             meta_client,
-            report_interval: heartbeat_opts.interval.as_millis() as u64,
-            retry_interval: heartbeat_opts.retry_interval.as_millis() as u64,
+            report_interval: heartbeat_opts.interval,
+            retry_interval: heartbeat_opts.retry_interval,
             resp_handler_executor,
         }
     }
@@ -88,15 +88,14 @@ impl HeartbeatTask {
     ) {
         let report_interval = self.report_interval;
         let self_peer = Some(Peer {
-            // The peer id doesn't make sense for frontend, so we just set it 0.
             id: self.node_id,
             addr: self.server_addr.clone(),
         });

         common_runtime::spawn_hb(async move {
             // Note that using `interval` causes the first heartbeat to be
             // sent immediately.
-            let mut interval = tokio::time::interval(Duration::from_millis(report_interval));
+            let mut interval = tokio::time::interval(report_interval);
             interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

             loop {
@@ -161,9 +160,7 @@ impl HeartbeatTask {
                     Ok(None) => break,
                     Err(e) => {
                         error!(e; "Error occurred while reading heartbeat response");
-                        capture_self
-                            .start_with_retry(Duration::from_millis(retry_interval))
-                            .await;
+                        capture_self.start_with_retry(retry_interval).await;

                         break;
                     }
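
The heartbeat loop relies on two `tokio::time::interval` behaviors that the inline comment points at: the first tick completes immediately (so a heartbeat goes out as soon as the loop starts), and `MissedTickBehavior::Delay` pushes missed ticks back instead of firing a burst to catch up. A minimal standalone sketch of that pattern (not GreptimeDB code):

    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        // The first tick resolves immediately: "heartbeat 0" prints at t = 0,
        // subsequent ticks arrive every 3 seconds.
        let mut interval = tokio::time::interval(Duration::from_secs(3));
        // If a tick is missed (e.g. the task was busy), delay the next tick
        // rather than bursting to catch up.
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        for n in 0..3 {
            interval.tick().await;
            println!("heartbeat {n}");
        }
    }

Passing `Duration` end to end, instead of `u64` milliseconds, also removes the lossy `as_millis`/`from_millis` round-trips the old fields required.
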
54 changes: 38 additions & 16 deletions src/meta-srv/src/lease.rs
@@ -18,7 +18,7 @@ use std::hash::Hash;
 use common_error::ext::BoxedError;
 use common_meta::kv_backend::KvBackend;
 use common_meta::peer::{Peer, PeerLookupService};
-use common_meta::{util, ClusterId, DatanodeId, FlownodeId};
+use common_meta::{distributed_time_constants, util, ClusterId, DatanodeId, FlownodeId};
 use common_time::util as time_util;
 use snafu::ResultExt;
@@ -33,11 +33,14 @@ fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseValue) -> bool {
     }
 }

+/// Looks up the [`Peer`] for the given [`ClusterId`] and [`DatanodeId`]; if `lease_secs` is `Some`, only returns the peer if its lease is still alive.
 pub async fn lookup_datanode_peer(
     cluster_id: ClusterId,
-    datanode_id: u64,
+    datanode_id: DatanodeId,
     meta_peer_client: &MetaPeerClientRef,
+    lease_secs: Option<u64>,
 ) -> Result<Option<Peer>> {
+    let lease_filter = lease_secs.map(build_lease_filter);
     let lease_key = DatanodeLeaseKey {
         cluster_id,
         node_id: datanode_id,
@@ -47,13 +50,18 @@ pub async fn lookup_datanode_peer(
         return Ok(None);
     };
     let lease_value: LeaseValue = kv.value.try_into()?;

-    Ok(Some(Peer {
-        id: lease_key.node_id,
-        addr: lease_value.node_addr,
-    }))
+    let is_alive = lease_filter.map(|f| f(&lease_value)).unwrap_or(true);
+    if is_alive {
+        Ok(Some(Peer {
+            id: lease_key.node_id,
+            addr: lease_value.node_addr,
+        }))
+    } else {
+        Ok(None)
+    }
 }

+/// Find all alive datanodes
 pub async fn alive_datanodes(
     cluster_id: ClusterId,
     meta_peer_client: &MetaPeerClientRef,
@@ -68,11 +76,14 @@ pub async fn alive_datanodes(
         .await
 }

+/// Looks up the [`Peer`] for the given [`ClusterId`] and [`FlownodeId`]; if `lease_secs` is `Some`, only returns the peer if its lease is still alive.
 pub async fn lookup_flownode_peer(
     cluster_id: ClusterId,
     flownode_id: FlownodeId,
     meta_peer_client: &MetaPeerClientRef,
+    lease_secs: Option<u64>,
 ) -> Result<Option<Peer>> {
+    let lease_filter = lease_secs.map(build_lease_filter);
     let lease_key = FlownodeLeaseKey {
         cluster_id,
         node_id: flownode_id,
@@ -83,12 +94,18 @@ pub async fn lookup_flownode_peer(
     };
     let lease_value: LeaseValue = kv.value.try_into()?;

-    Ok(Some(Peer {
-        id: lease_key.node_id,
-        addr: lease_value.node_addr,
-    }))
+    let is_alive = lease_filter.map(|f| f(&lease_value)).unwrap_or(true);
+    if is_alive {
+        Ok(Some(Peer {
+            id: lease_key.node_id,
+            addr: lease_value.node_addr,
+        }))
+    } else {
+        Ok(None)
+    }
 }

+/// Find all alive flownodes
 pub async fn alive_flownodes(
     cluster_id: ClusterId,
     meta_peer_client: &MetaPeerClientRef,
@@ -151,17 +168,22 @@ impl PeerLookupService for MetaPeerLookupService {
         cluster_id: ClusterId,
         id: DatanodeId,
     ) -> common_meta::error::Result<Option<Peer>> {
-        lookup_datanode_peer(cluster_id, id, &self.meta_peer_client)
-            .await
-            .map_err(BoxedError::new)
-            .context(common_meta::error::ExternalSnafu)
+        lookup_datanode_peer(
+            cluster_id,
+            id,
+            &self.meta_peer_client,
+            Some(distributed_time_constants::DATANODE_LEASE_SECS),
+        )
+        .await
+        .map_err(BoxedError::new)
+        .context(common_meta::error::ExternalSnafu)
     }
     async fn flownode(
         &self,
         cluster_id: ClusterId,
         id: FlownodeId,
     ) -> common_meta::error::Result<Option<Peer>> {
-        lookup_flownode_peer(cluster_id, id, &self.meta_peer_client)
+        lookup_flownode_peer(cluster_id, id, &self.meta_peer_client, None)
             .await
             .map_err(BoxedError::new)
             .context(common_meta::error::ExternalSnafu)
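
The new `lease_secs: Option<u64>` parameter makes liveness checking opt-in: `None` keeps the old behavior of returning whatever peer is registered, while `Some(secs)` returns the peer only when its last heartbeat falls inside the lease window. A standalone sketch of the `Option`-as-filter idiom used above, with a simplified `LeaseValue` and liveness rule standing in for the real ones:

    use std::time::{SystemTime, UNIX_EPOCH};

    // Simplified stand-in for the real lease value.
    struct LeaseValue {
        timestamp_millis: i64,
    }

    // Mirrors the shape of `build_lease_filter` in the diff; the real
    // liveness math may differ.
    fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseValue) -> bool {
        move |v: &LeaseValue| {
            let now_millis = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_millis() as i64;
            // Alive if the last heartbeat is within the lease window.
            now_millis - v.timestamp_millis < (lease_secs as i64) * 1000
        }
    }

    fn is_alive(value: &LeaseValue, lease_secs: Option<u64>) -> bool {
        // `None` means "skip the check": map to the filter when present,
        // default to `true` otherwise, exactly as the lookups above do.
        lease_secs
            .map(build_lease_filter)
            .map(|f| f(value))
            .unwrap_or(true)
    }

    fn main() {
        let stale = LeaseValue { timestamp_millis: 0 };
        assert!(is_alive(&stale, None)); // no filter: always returned
        assert!(!is_alive(&stale, Some(10))); // filtered: lease long expired
    }
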
10 changes: 8 additions & 2 deletions src/meta-srv/src/metasrv.rs
@@ -29,7 +29,7 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBac
 use common_meta::peer::Peer;
 use common_meta::region_keeper::MemoryRegionKeeperRef;
 use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
-use common_meta::ClusterId;
+use common_meta::{distributed_time_constants, ClusterId};
 use common_procedure::options::ProcedureConfig;
 use common_procedure::ProcedureManagerRef;
 use common_telemetry::logging::{LoggingOptions, TracingOptions};
@@ -484,7 +484,13 @@ impl Metasrv {
         cluster_id: ClusterId,
         peer_id: u64,
     ) -> Result<Option<Peer>> {
-        lookup_datanode_peer(cluster_id, peer_id, &self.meta_peer_client).await
+        lookup_datanode_peer(
+            cluster_id,
+            peer_id,
+            &self.meta_peer_client,
+            Some(distributed_time_constants::DATANODE_LEASE_SECS),
+        )
+        .await
     }

     pub fn options(&self) -> &MetasrvOptions {
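
Taken together, both datanode lookup call sites (here and in `MetaPeerLookupService::datanode`) now pass `Some(distributed_time_constants::DATANODE_LEASE_SECS)`, so a datanode whose lease has lapsed resolves to `None`, while the flownode path passes `None` and still returns whatever peer is registered. A toy sketch of those semantics, with a hypothetical stub in place of the real lookup and an arbitrary lease window:

    // Hypothetical stub mirroring the lookup signature in this commit;
    // the real function lives in src/meta-srv/src/lease.rs.
    async fn lookup_peer(heartbeat_age_secs: u64, lease_secs: Option<u64>) -> Option<&'static str> {
        match lease_secs {
            // Liveness enforced: a stale lease yields no peer.
            Some(secs) if heartbeat_age_secs >= secs => None,
            _ => Some("node-1:4001"),
        }
    }

    #[tokio::main]
    async fn main() {
        // Datanode path: lease window enforced, stale peer filtered out.
        assert_eq!(lookup_peer(90, Some(20)).await, None);
        // Flownode path: `None` skips the check and keeps the old behavior.
        assert_eq!(lookup_peer(90, None).await, Some("node-1:4001"));
    }
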
