Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: heartbeat task&peer lookup in proc #4179

Merged
merged 11 commits into from
Jun 24, 2024
Prev Previous commit
Next Next commit
refactor: per bot advices
  • Loading branch information
discord9 committed Jun 24, 2024
commit efc1d02e8aa0e963f046decbb998c637fe9c91e8
5 changes: 3 additions & 2 deletions src/common/meta/src/ddl/drop_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,16 @@ impl DropFlowProcedure {
let flownode_ids = &self.data.flow_info_value.as_ref().unwrap().flownode_ids;
let flow_id = self.data.task.flow_id;
let mut drop_flow_tasks = Vec::with_capacity(flownode_ids.len());
let cluster_id = self.data.cluster_id;

for flownode in flownode_ids.values() {
let peer = self
.context
.peer_lookup_service
.flownode(0, *flownode)
.flownode(cluster_id, *flownode)
.await?
.with_context(|| UnexpectedSnafu {
err_msg: "Flownode not found when trying to drop flow on it",
err_msg: "Attempted to drop flow on a node that could not be found. Consider verifying node availability.",
})?;
let requester = self.context.node_manager.flownode(&peer).await;
let request = FlowRequest {
Expand Down
4 changes: 2 additions & 2 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ mod tests {
use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager};
use crate::peer::{DummyPeerLookupService, Peer};
use crate::peer::{Peer, StandalonePeerLookupService};
use crate::region_keeper::MemoryRegionKeeper;
use crate::sequence::SequenceBuilder;
use crate::state_store::KvStateStore;
Expand Down Expand Up @@ -855,7 +855,7 @@ mod tests {
flow_metadata_manager,
flow_metadata_allocator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
peer_lookup_service: Arc::new(DummyPeerLookupService {}),
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
},
procedure_manager.clone(),
true,
Expand Down
36 changes: 15 additions & 21 deletions src/common/meta/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,46 +78,40 @@ pub trait PeerLookupService {

pub type PeerLookupServiceRef = Arc<dyn PeerLookupService + Send + Sync>;

/// A dummy implementation of [PeerLookupService] for testing purpose.
pub struct DummyPeerLookupService;
/// always return `Peer::new(0, "")` for any query
pub struct StandalonePeerLookupService {
default_peer: Peer,
}

#[async_trait::async_trait]
impl PeerLookupService for DummyPeerLookupService {
async fn datanode(
&self,
_cluster_id: ClusterId,
_id: DatanodeId,
) -> Result<Option<Peer>, Error> {
Ok(None)
impl StandalonePeerLookupService {
pub fn new() -> Self {
Self {
default_peer: Peer::new(0, ""),
}
}
}

async fn flownode(
&self,
_cluster_id: ClusterId,
_id: FlownodeId,
) -> Result<Option<Peer>, Error> {
Ok(None)
impl Default for StandalonePeerLookupService {
fn default() -> Self {
Self::new()
}
}

/// always return `Peer::new(0, "")` for any query
pub struct StandalonePeerLookupService;

#[async_trait::async_trait]
impl PeerLookupService for StandalonePeerLookupService {
async fn datanode(
&self,
_cluster_id: ClusterId,
_id: DatanodeId,
) -> Result<Option<Peer>, Error> {
Ok(Some(Peer::new(0, "")))
Ok(Some(self.default_peer.clone()))
}

async fn flownode(
&self,
_cluster_id: ClusterId,
_id: FlownodeId,
) -> Result<Option<Peer>, Error> {
Ok(Some(Peer::new(0, "")))
Ok(Some(self.default_peer.clone()))
}
}
2 changes: 1 addition & 1 deletion src/common/meta/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,6 @@ pub fn new_ddl_context_with_kv_backend(
table_metadata_manager,
flow_metadata_allocator,
flow_metadata_manager,
peer_lookup_service: Arc::new(StandalonePeerLookupService),
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
}
}
36 changes: 22 additions & 14 deletions src/flow/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl HeartbeatTask {
}

pub async fn start(&self) -> Result<(), Error> {
info!("Start to establish the heartbeat connection to metasrv.");
let (req_sender, resp_stream) = self
.meta_client
.heartbeat()
Expand All @@ -81,6 +82,26 @@ impl HeartbeatTask {
Ok(())
}

fn create_heartbeat_request(
message: OutgoingMessage,
self_peer: &Option<Peer>,
) -> Option<HeartbeatRequest> {
match outgoing_message_to_mailbox_message(message) {
Ok(message) => {
let req = HeartbeatRequest {
mailbox_message: Some(message),
peer: self_peer.clone(),
..Default::default()
};
Some(req)
}
Err(e) => {
error!(e; "Failed to encode mailbox messages");
None
}
}
}

fn start_heartbeat_report(
&self,
req_sender: HeartbeatSender,
Expand All @@ -102,20 +123,7 @@ impl HeartbeatTask {
let req = tokio::select! {
message = outgoing_rx.recv() => {
if let Some(message) = message {
match outgoing_message_to_mailbox_message(message) {
Ok(message) => {
let req = HeartbeatRequest {
mailbox_message: Some(message),
peer: self_peer.clone(),
..Default::default()
};
Some(req)
}
Err(e) => {
error!(e; "Failed to encode mailbox messages");
None
}
}
Self::create_heartbeat_request(message, &self_peer)
} else {
// Receives None that means Sender was dropped, we need to break the current loop
break
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/procedure/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ pub mod test_data {
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::{DummyPeerLookupService, Peer};
use common_meta::peer::{Peer, StandalonePeerLookupService};
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::rpc::router::RegionRoute;
use common_meta::sequence::SequenceBuilder;
Expand Down Expand Up @@ -225,7 +225,7 @@ pub mod test_data {
flow_metadata_manager,
flow_metadata_allocator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
peer_lookup_service: Arc::new(DummyPeerLookupService {}),
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
}
}
}