diff --git a/router/src/gossip/anti_entropy/mst/mod.rs b/router/src/gossip/anti_entropy/mst/mod.rs index c997f4c57f6..130eee090b8 100644 --- a/router/src/gossip/anti_entropy/mst/mod.rs +++ b/router/src/gossip/anti_entropy/mst/mod.rs @@ -8,7 +8,7 @@ pub mod merkle; #[cfg(test)] mod tests { - use std::sync::Arc; + use std::{sync::Arc, time::Duration}; use crate::{ gossip::anti_entropy::{ @@ -20,6 +20,7 @@ mod tests { use data_types::NamespaceSchema; use proptest::prelude::*; + use test_helpers::timeout::FutureTimeout; use super::handle::AntiEntropyHandle; @@ -67,7 +68,9 @@ mod tests { ns_b.put_schema(name, update); // Invariant: after applying the same update, the content hashes - // MUST match (even if this update was a no-op / not an update) + // MUST match (even if this update was a no-op / not an + // update) + wait_for_convergence(&handle_a, &handle_b).await; assert_eq!(handle_a.content_hash().await, handle_b.content_hash().await); // Invariant: and the same serialised snapshot content assert_eq!(handle_a.snapshot().await, handle_b.snapshot().await); @@ -89,6 +92,7 @@ mod tests { // Invariant: applying the update to the other cache converges their // content hashes. ns_b.put_schema(name, last_update); + wait_for_convergence(&handle_a, &handle_b).await; assert_eq!(handle_a.content_hash().await, handle_b.content_hash().await); // Invariant: and the serialised snapshot content converges assert_eq!(handle_a.snapshot().await, handle_b.snapshot().await); @@ -198,4 +202,17 @@ mod tests { } } } + + async fn wait_for_convergence(mst_a: &AntiEntropyHandle, mst_b: &AntiEntropyHandle) { + async { + loop { + if mst_a.content_hash().await == mst_b.content_hash().await { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + } + .with_timeout_panic(Duration::from_secs(5)) + .await + } }