Skip to content
This repository has been archived by the owner on Oct 23, 2022. It is now read-only.

Expose Floodsub target_peer list. #498

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Expose Floodsub target_peer list.
The IPFS facade does not have access to the functions which add and
remove nodes to the floodsub target_peer list. As such, messages are
never propagated to connected peers.
  • Loading branch information
cobward committed Feb 28, 2022
commit a1bc6111241ca35e2d9bf1d5fc2d01d9a61f40f1
101 changes: 101 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ enum IpfsEvent {
Disconnect(MultiaddrWithPeerId, Channel<()>),
/// Request background task to return the listened and external addresses
GetAddresses(OneshotSender<Vec<Multiaddr>>),
PubsubAddPeer(PeerId, OneshotSender<()>),
PubsubRemovePeer(PeerId, OneshotSender<()>),
PubsubSubscribe(String, OneshotSender<Option<SubscriptionStream>>),
PubsubUnsubscribe(String, OneshotSender<bool>),
PubsubPublish(String, Vec<u8>, OneshotSender<()>),
Expand Down Expand Up @@ -755,6 +757,42 @@ impl<Types: IpfsTypes> Ipfs<Types> {
.await
}

/// Add a peer to list of nodes to propagate messages to.
///
/// Unless a peer is added to the list in this way it will not receive any pubsub messages from this node.
pub async fn pubsub_add_peer(&self, peer_id: PeerId) -> Result<(), Error> {
async move {
let (tx, rx) = oneshot_channel::<()>();

self.to_task
.clone()
.send(IpfsEvent::PubsubAddPeer(peer_id, tx))
.await?;

Ok(rx.await?)
}
.instrument(self.span.clone())
.await
}

/// Remove a peer from the list of nodes that messages are propagated to.
///
/// This will not stop messages being sent to the specified peers for subscribed topics which have already been communicated.
pub async fn pubsub_remove_peer(&self, peer_id: PeerId) -> Result<(), Error> {
async move {
let (tx, rx) = oneshot_channel::<()>();

self.to_task
.clone()
.send(IpfsEvent::PubsubRemovePeer(peer_id, tx))
.await?;

Ok(rx.await?)
}
.instrument(self.span.clone())
.await
}

/// Subscribes to a given topic. Can be done at most once without unsubscribing in the between.
/// The subscription can be unsubscribed by dropping the stream or calling
/// [`Ipfs::pubsub_unsubscribe`].
Expand Down Expand Up @@ -1431,6 +1469,20 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
// ignore error, perhaps caller went away already
let _ = ret.send(addresses);
}
IpfsEvent::PubsubAddPeer(peer_id, ret) => {
self.swarm
.behaviour_mut()
.pubsub()
.add_node_to_partial_view(peer_id);
let _ = ret.send(());
}
IpfsEvent::PubsubRemovePeer(peer_id, ret) => {
self.swarm
.behaviour_mut()
.pubsub()
.remove_node_from_partial_view(&peer_id);
let _ = ret.send(());
}
IpfsEvent::PubsubSubscribe(topic, ret) => {
let _ = ret.send(self.swarm.behaviour_mut().pubsub().subscribe(topic));
}
Expand Down Expand Up @@ -1780,8 +1832,11 @@ mod node {

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::*;
use crate::make_ipld;
use futures::{stream::poll_immediate, StreamExt};
use multihash::Sha2_256;

#[tokio::test]
Expand Down Expand Up @@ -1819,4 +1874,50 @@ mod tests {
ipfs.remove_pin(&cid, false).await.unwrap();
assert!(!ipfs.is_pinned(&cid).await.unwrap());
}

#[tokio::test]
async fn test_pubsub_send_and_receive() {
let alice = Node::new("alice").await;
let alice_addr: Multiaddr = "/ip4/127.0.0.1/tcp/10001".parse().unwrap();
alice.add_listening_address(alice_addr).await.unwrap();
let bob = Node::new("bob").await;
let bob_addr: Multiaddr = "/ip4/127.0.0.1/tcp/10002".parse().unwrap();
bob.add_listening_address(bob_addr.clone()).await.unwrap();

let topic = String::from("test_topic");
alice
.connect(bob_addr.with(Protocol::P2p(bob.id.into())))
.await
.expect("alice failed to connect to bob");
let _alice_messages = alice.pubsub_subscribe(topic.clone()).await.unwrap();
let mut bob_messages = poll_immediate(bob.pubsub_subscribe(topic.clone()).await.unwrap());

let data = vec![1, 2, 3];

alice
.pubsub_publish(topic.clone(), data.clone())
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;

assert_eq!(bob_messages.next().await, Some(Poll::Pending));

bob.pubsub_add_peer(alice.id).await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;

assert_eq!(bob_messages.next().await, Some(Poll::Pending));

alice
.pubsub_publish(topic.clone(), data.clone())
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;

let received_data = bob_messages
.next()
.await
.expect("unexpected end of stream")
.map(|msg| msg.data.clone());
assert_eq!(received_data, Poll::Ready(data.clone()));
}
}