Skip to content
This repository has been archived by the owner on Oct 23, 2022. It is now read-only.

Commit

Permalink
test(pubsub): simplify, comment
Browse files Browse the repository at this point in the history
simplify away the use of hashset's for messages along with any
filtering, instead simply assert that who witnessed what message and
include the sent message in the assertion as well.

comment as in use less broad technical names and more context specific
names.

also removes some of the duplicate comments.
  • Loading branch information
koivunej committed Apr 1, 2022
1 parent 277954b commit 50ad10f
Showing 1 changed file with 46 additions and 26 deletions.
72 changes: 46 additions & 26 deletions tests/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,8 @@ async fn can_publish_without_subscribing() {
}

#[tokio::test]
#[allow(clippy::mutable_key_type)] // clippy doesn't like Vec inside HashSet
async fn publish_between_two_nodes() {
async fn publish_between_two_nodes_single_topic() {
use futures::stream::StreamExt;
use std::collections::HashSet;

let nodes = spawn_nodes(2, Topology::Line).await;

Expand Down Expand Up @@ -98,29 +96,50 @@ async fn publish_between_two_nodes() {

// the order is not defined, but both should see the other's message and the message they sent
let expected = [
(&[topic.clone()], &nodes[0].id, b"foobar"),
(&[topic.clone()], &nodes[1].id, b"barfoo"),
// first node should witness it's the message it sent
(&[topic.clone()], nodes[0].id, b"foobar", nodes[0].id),
// second node should witness first nodes message, and so on.
(&[topic.clone()], nodes[0].id, b"foobar", nodes[1].id),
(&[topic.clone()], nodes[1].id, b"barfoo", nodes[0].id),
(&[topic.clone()], nodes[1].id, b"barfoo", nodes[1].id),
]
.iter()
.cloned()
.map(|(topics, id, data)| (topics.to_vec(), *id, data.to_vec()))
.collect::<HashSet<_>>();
.map(|(topics, sender, data, witness)| (topics.to_vec(), sender, data.to_vec(), witness))
.collect::<Vec<_>>();

for st in &mut [b_msgs.by_ref(), a_msgs.by_ref()] {
let actual = timeout(
let mut actual = Vec::new();

for (st, own_peer_id) in &mut [
(b_msgs.by_ref(), nodes[1].id),
(a_msgs.by_ref(), nodes[0].id),
] {
let received = timeout(
Duration::from_secs(2),
st.take(2)
// Arc::try_unwrap will fail sometimes here as the sender side in src/p2p/pubsub.rs:305
// can still be looping
.map(|msg| (*msg).clone())
.map(|msg| (msg.topics, msg.source, msg.data))
.collect::<HashSet<_>>(),
.map(|msg| (msg.topics, msg.source, msg.data, *own_peer_id))
.collect::<Vec<_>>(),
)
.await
.unwrap();
assert_eq!(expected, actual);

actual.extend(received);
}

// sort the received messages both in expected and actual to make sure they are comparable;
// order of receiving is not part of the tuple and shouldn't matter.
let mut expected = expected;
expected.sort_unstable();
actual.sort_unstable();

assert_eq!(
actual, expected,
"sent and received messages must be present on both nodes' streams"
);

drop(b_msgs);

let mut disappeared = false;
Expand All @@ -143,10 +162,8 @@ async fn publish_between_two_nodes() {
}

#[tokio::test]
#[allow(clippy::mutable_key_type)] // clippy doesn't like Vec inside HashSet
async fn publish_between_two_nodes_different_topics() {
use futures::stream::StreamExt;
use std::collections::HashSet;

let nodes = spawn_nodes(2, Topology::Line).await;
let node_a = &nodes[0];
Expand Down Expand Up @@ -197,34 +214,37 @@ async fn publish_between_two_nodes_different_topics() {
.await
.unwrap();

// the order is not defined, but both should see the other's message
// the order between messages is not defined, but both should see the other's message. since we
// receive messages first from node_b's stream we expect this order.
//
// in this test case the nodes are not expected to see their own message because nodes are not
// subscribing to the streams they are sending to.
let expected = [
(&[topic_a.clone()], &node_a.id, b"foobar"),
(&[topic_b.clone()], &node_b.id, b"barfoo"),
(&[topic_a.clone()], node_a.id, b"foobar", node_b.id),
(&[topic_b.clone()], node_b.id, b"barfoo", node_a.id),
]
.iter()
.cloned()
.map(|(topics, id, data)| (topics.to_vec(), *id, data.to_vec()))
.collect::<HashSet<_>>();
.map(|(topics, sender, data, witness)| (topics.to_vec(), sender, data.to_vec(), witness))
.collect::<Vec<_>>();

let mut actual = HashSet::new();
let mut actual = Vec::new();
for (st, own_peer_id) in &mut [(b_msgs.by_ref(), node_b.id), (a_msgs.by_ref(), node_a.id)] {
let actual_msg = timeout(
let received = timeout(
Duration::from_secs(2),
st.take(1)
// Arc::try_unwrap will fail sometimes here as the sender side in src/p2p/pubsub.rs:305
// can still be looping
.map(|msg| (*msg).clone())
.map(|msg| (msg.topics, msg.source, msg.data))
.filter(|(_, source_peer_id, _)| future::ready(source_peer_id != own_peer_id))
.map(|msg| (msg.topics, msg.source, msg.data, *own_peer_id))
.next(),
)
.await
.unwrap()
.unwrap();
actual.insert(actual_msg);
actual.push(received);
}

// ordering is defined for expected and actual by the order of the looping above and the
// initial expected creation.
assert_eq!(expected, actual);

drop(b_msgs);
Expand Down

0 comments on commit 50ad10f

Please sign in to comment.