Skip to content

Commit

Permalink
Merge pull request #8833 from influxdata/dom/merkle-rpc-client
Browse files Browse the repository at this point in the history
feat(sync): SyncRpcClient implementation for gRPC
  • Loading branch information
domodwyer committed Sep 26, 2023
2 parents 0e16b6f + d0335bb commit ba647d4
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 2 deletions.
2 changes: 1 addition & 1 deletion router/src/gossip/anti_entropy/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
pub mod actor;
pub mod consistency_prober;
pub mod rpc_server;
pub(crate) mod rpc_worker;
pub mod rpc_worker;
pub mod traits;

#[cfg(test)]
Expand Down
86 changes: 86 additions & 0 deletions router/src/gossip/anti_entropy/sync/rpc_worker/grpc_connector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//! gRPC-based [`SyncRpcClient`] and it's [`SyncRpcConnector`] implementation.

use std::{net::SocketAddr, ops::RangeInclusive, time::Duration};

use async_trait::async_trait;
use data_types::NamespaceName;
use generated_types::influxdata::iox::gossip::v1::{
self as proto, anti_entropy_service_client::AntiEntropyServiceClient, NamespaceSchemaEntry,
};
use tonic::transport::Endpoint;

use crate::gossip::anti_entropy::{
mst::actor::MerkleSnapshot,
sync::traits::{BoxedError, SyncRpcClient, SyncRpcConnector},
};

const CONNECT_TIMEOUT: Duration = Duration::from_secs(2);
const REQUEST_TIMEOUT: Duration = Duration::from_secs(15);

/// A [`SyncRpcConnector`] using tonic/gRPC.
#[derive(Debug, Default, Clone)]
pub struct GrpcConnector;

#[async_trait]
impl SyncRpcConnector for GrpcConnector {
type Client = RpcClient;

async fn connect(&self, addr: SocketAddr) -> Result<Self::Client, BoxedError> {
let endpoint = Endpoint::from_shared(format!("https://{}", addr))?
.connect_timeout(CONNECT_TIMEOUT)
.timeout(REQUEST_TIMEOUT);

let c = endpoint.connect().await?;

Ok(RpcClient(AntiEntropyServiceClient::new(c)))
}
}

/// A gRPC implementation of the [`SyncRpcClient`].
///
/// Constructed by [`GrpcConnector`].
#[derive(Debug)]
pub struct RpcClient(AntiEntropyServiceClient<tonic::transport::Channel>);

#[async_trait]
impl SyncRpcClient for RpcClient {
async fn find_inconsistent_ranges(
&mut self,
pages: MerkleSnapshot,
) -> Result<Vec<RangeInclusive<NamespaceName<'static>>>, BoxedError> {
let pages = pages
.iter()
.map(|v| proto::PageRange {
min: v.start().to_string(),
max: v.end().to_string(),
page_hash: v.into_hash().as_bytes().to_vec(),
})
.collect();

self.0
.get_tree_diff(tonic::Request::new(proto::GetTreeDiffRequest { pages }))
.await?
.into_inner()
.ranges
.into_iter()
.map(|v| Ok(RangeInclusive::new(v.min.try_into()?, v.max.try_into()?)))
.collect()
}

async fn get_schemas_in_range(
&mut self,
range: RangeInclusive<NamespaceName<'static>>,
) -> Result<Vec<NamespaceSchemaEntry>, BoxedError> {
let resp = self
.0
.get_range(tonic::Request::new(proto::GetRangeRequest {
min: range.start().to_string(),
max: range.end().to_string(),
}))
.await?
.into_inner()
.namespaces;

Ok(resp)
}
}
4 changes: 3 additions & 1 deletion router/src/gossip/anti_entropy/sync/rpc_worker/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
//! Anti-entropy sync worker types and tasks.

pub mod grpc_connector;
mod worker_task;
pub(crate) use worker_task::*;

pub(crate) mod task_set;

0 comments on commit ba647d4

Please sign in to comment.