Skip to content

Commit

Permalink
feat(testing): altered publish_between_two_nodes test
Browse files Browse the repository at this point in the history
  • Loading branch information
rand0m-cloud committed Mar 25, 2022
1 parent 5eaace7 commit 8eae8e1
Showing 1 changed file with 36 additions and 24 deletions.
60 changes: 36 additions & 24 deletions tests/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use futures::future;
use futures::future::pending;
use futures::stream::StreamExt;
use ipfs::Node;
Expand Down Expand Up @@ -54,25 +55,30 @@ async fn publish_between_two_nodes() {
use std::collections::HashSet;

let nodes = spawn_nodes(2, Topology::Line).await;
let node_a = &nodes[0];
let node_b = &nodes[1];

let topic = "shared".to_owned();
let topic_a = "shared-a".to_owned();
let topic_b = "shared-b".to_owned();

let mut a_msgs = nodes[0].pubsub_subscribe(topic.clone()).await.unwrap();
let mut b_msgs = nodes[1].pubsub_subscribe(topic.clone()).await.unwrap();
// Node A subscribes to Topic B
// Node B subscribes to Topic A
let mut a_msgs = node_a.pubsub_subscribe(topic_b.clone()).await.unwrap();
let mut b_msgs = node_b.pubsub_subscribe(topic_a.clone()).await.unwrap();

// need to wait to see both sides so that the messages will get through
let mut appeared = false;
for _ in 0..100usize {
if nodes[0]
.pubsub_peers(Some(topic.clone()))
if node_a
.pubsub_peers(Some(topic_a.clone()))
.await
.unwrap()
.contains(&nodes[1].id)
&& nodes[1]
.pubsub_peers(Some(topic.clone()))
.contains(&node_b.id)
&& node_b
.pubsub_peers(Some(topic_b.clone()))
.await
.unwrap()
.contains(&nodes[0].id)
.contains(&node_a.id)
{
appeared = true;
break;
Expand All @@ -87,49 +93,55 @@ async fn publish_between_two_nodes() {
"timed out before both nodes appeared as pubsub peers"
);

nodes[0]
.pubsub_publish(topic.clone(), b"foobar".to_vec())
// Each node publishes to their own topic
node_a
.pubsub_publish(topic_a.clone(), b"foobar".to_vec())
.await
.unwrap();
nodes[1]
.pubsub_publish(topic.clone(), b"barfoo".to_vec())
node_b
.pubsub_publish(topic_b.clone(), b"barfoo".to_vec())
.await
.unwrap();

// the order is not defined, but both should see the other's message and the message they sent
// the order is not defined, but both should see the other's message
let expected = [
(&[topic.clone()], &nodes[0].id, b"foobar"),
(&[topic.clone()], &nodes[1].id, b"barfoo"),
(&[topic_a.clone()], &node_a.id, b"foobar"),
(&[topic_b.clone()], &node_b.id, b"barfoo"),
]
.iter()
.cloned()
.map(|(topics, id, data)| (topics.to_vec(), *id, data.to_vec()))
.collect::<HashSet<_>>();

for st in &mut [b_msgs.by_ref(), a_msgs.by_ref()] {
let actual = timeout(
let mut actual = HashSet::new();
for (st, own_peer_id) in &mut [(b_msgs.by_ref(), node_b.id), (a_msgs.by_ref(), node_a.id)] {
let actual_msg = timeout(
Duration::from_secs(2),
st.take(2)
st.take(1)
// Arc::try_unwrap will fail sometimes here as the sender side in src/p2p/pubsub.rs:305
// can still be looping
.map(|msg| (*msg).clone())
.map(|msg| (msg.topics, msg.source, msg.data))
.collect::<HashSet<_>>(),
.filter(|(_, source_peer_id, _)| future::ready(source_peer_id != own_peer_id))
.next(),
)
.await
.unwrap()
.unwrap();
assert_eq!(expected, actual);
actual.insert(actual_msg);
}

assert_eq!(expected, actual);

drop(b_msgs);

let mut disappeared = false;
for _ in 0..100usize {
if !nodes[0]
.pubsub_peers(Some(topic.clone()))
if !node_a
.pubsub_peers(Some(topic_a.clone()))
.await
.unwrap()
.contains(&nodes[1].id)
.contains(&node_b.id)
{
disappeared = true;
break;
Expand Down

0 comments on commit 8eae8e1

Please sign in to comment.