Skip to content

Commit

Permalink
Merge pull request #8832 from influxdata/dom/merkle-integration
Browse files Browse the repository at this point in the history
test: anti-entropy integration sync
  • Loading branch information
domodwyer committed Sep 26, 2023
2 parents d55a425 + c7d9003 commit 0e16b6f
Show file tree
Hide file tree
Showing 5 changed files with 363 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions router/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ test_helpers = { version = "0.1.0", path = "../test_helpers", features = [
] }
tokio = { version = "1", features = ["test-util"] }
tokio-stream = { version = "0.1.13", default_features = false, features = [] }
uuid = { version = "1.4.1", features = ["v4"] }

[lib]
# Allow --save-baseline to work
Expand Down
2 changes: 1 addition & 1 deletion router/src/gossip/anti_entropy/mst/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ where
}
};

trace!(%name, ?schema, "applying schema");
trace!(%name, ?schema, "updating merkle tree");

self.mst.upsert(name, &NamespaceContentHash(schema));
}
Expand Down
10 changes: 7 additions & 3 deletions router/src/gossip/anti_entropy/sync/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ where
// Wait at least SYNC_ROUND_INTERVAL between consistency checks.
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

let _ = ticker.tick().await;

loop {
tokio::select! {
_ = ticker.tick() =>
Expand All @@ -199,7 +201,7 @@ where
/// If the local node's MST root hash is equal to the peer root hash, this
/// is a no-op.
///
/// If the hashes differ, this function switchs the exchange to gRPC/TCP,
/// If the hashes differ, this function switches the exchange to gRPC/TCP,
/// and sends a serialised representation of the MST to the sender (request
/// 2).
async fn perform_consistency_check(
Expand Down Expand Up @@ -313,15 +315,17 @@ mod tests {

// Wait for a probe to happen.
let mut calls = async {
loop {
for _ in 0..100 {
tokio::time::pause();
let calls = consistency_prober.calls();
if !calls.is_empty() {
return calls;
}
tokio::time::resume();
tokio::time::sleep(Duration::from_millis(50)).await;
}
panic!("timeout");
}
.with_timeout_panic(Duration::from_secs(5))
.await;

// It's technically possible that multiple probes could have been sent
Expand Down
Loading

0 comments on commit 0e16b6f

Please sign in to comment.