Skip to content
This repository has been archived by the owner on Oct 23, 2022. It is now read-only.

Commit

Permalink
test(pubsub): disjoint topics as new test case
Browse files Browse the repository at this point in the history
originally created in 8eae8e1 by
altering the single topic test, included in this commit as duplicating
version.

Co-authored-by: Addy Bryant <[email protected]>
  • Loading branch information
koivunej and rand0m-cloud committed Apr 1, 2022
1 parent 82453e5 commit 277954b
Showing 1 changed file with 106 additions and 0 deletions.
106 changes: 106 additions & 0 deletions tests/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,112 @@ async fn publish_between_two_nodes() {
assert!(disappeared, "timed out before a saw b's unsubscription");
}

#[tokio::test]
#[allow(clippy::mutable_key_type)] // clippy doesn't like Vec inside HashSet
async fn publish_between_two_nodes_different_topics() {
use futures::stream::StreamExt;
use std::collections::HashSet;

let nodes = spawn_nodes(2, Topology::Line).await;
let node_a = &nodes[0];
let node_b = &nodes[1];

let topic_a = "shared-a".to_owned();
let topic_b = "shared-b".to_owned();

// Node A subscribes to Topic B
// Node B subscribes to Topic A
let mut a_msgs = node_a.pubsub_subscribe(topic_b.clone()).await.unwrap();
let mut b_msgs = node_b.pubsub_subscribe(topic_a.clone()).await.unwrap();

// need to wait to see both sides so that the messages will get through
let mut appeared = false;
for _ in 0..100usize {
if node_a
.pubsub_peers(Some(topic_a.clone()))
.await
.unwrap()
.contains(&node_b.id)
&& node_b
.pubsub_peers(Some(topic_b.clone()))
.await
.unwrap()
.contains(&node_a.id)
{
appeared = true;
break;
}
timeout(Duration::from_millis(100), pending::<()>())
.await
.unwrap_err();
}

assert!(
appeared,
"timed out before both nodes appeared as pubsub peers"
);

// Each node publishes to their own topic
node_a
.pubsub_publish(topic_a.clone(), b"foobar".to_vec())
.await
.unwrap();
node_b
.pubsub_publish(topic_b.clone(), b"barfoo".to_vec())
.await
.unwrap();

// the order is not defined, but both should see the other's message
let expected = [
(&[topic_a.clone()], &node_a.id, b"foobar"),
(&[topic_b.clone()], &node_b.id, b"barfoo"),
]
.iter()
.cloned()
.map(|(topics, id, data)| (topics.to_vec(), *id, data.to_vec()))
.collect::<HashSet<_>>();

let mut actual = HashSet::new();
for (st, own_peer_id) in &mut [(b_msgs.by_ref(), node_b.id), (a_msgs.by_ref(), node_a.id)] {
let actual_msg = timeout(
Duration::from_secs(2),
st.take(1)
// Arc::try_unwrap will fail sometimes here as the sender side in src/p2p/pubsub.rs:305
// can still be looping
.map(|msg| (*msg).clone())
.map(|msg| (msg.topics, msg.source, msg.data))
.filter(|(_, source_peer_id, _)| future::ready(source_peer_id != own_peer_id))
.next(),
)
.await
.unwrap()
.unwrap();
actual.insert(actual_msg);
}

assert_eq!(expected, actual);

drop(b_msgs);

let mut disappeared = false;
for _ in 0..100usize {
if !node_a
.pubsub_peers(Some(topic_a.clone()))
.await
.unwrap()
.contains(&node_b.id)
{
disappeared = true;
break;
}
timeout(Duration::from_millis(100), pending::<()>())
.await
.unwrap_err();
}

assert!(disappeared, "timed out before a saw b's unsubscription");
}

#[cfg(any(feature = "test_go_interop", feature = "test_js_interop"))]
#[tokio::test]
#[ignore = "doesn't work yet"]
Expand Down

0 comments on commit 277954b

Please sign in to comment.