Skip to content
This repository has been archived by the owner on Oct 23, 2022. It is now read-only.

Expose Floodsub target_peer list. #498

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ enum IpfsEvent {
Disconnect(MultiaddrWithPeerId, Channel<()>),
/// Request background task to return the listened and external addresses
GetAddresses(OneshotSender<Vec<Multiaddr>>),
PubsubAddPeer(PeerId, OneshotSender<()>),
PubsubRemovePeer(PeerId, OneshotSender<()>),
PubsubSubscribe(String, OneshotSender<Option<SubscriptionStream>>),
PubsubUnsubscribe(String, OneshotSender<bool>),
PubsubPublish(String, Vec<u8>, OneshotSender<()>),
Expand Down Expand Up @@ -755,6 +757,43 @@ impl<Types: IpfsTypes> Ipfs<Types> {
.await
}

/// Add a peer to list of nodes to propagate messages to.
///
/// A peer will not receive any pubsub messages from this node until it is added using this function,
/// unless it has added this node in the same way.
pub async fn pubsub_add_peer(&self, peer_id: PeerId) -> Result<(), Error> {
async move {
let (tx, rx) = oneshot_channel::<()>();

self.to_task
.clone()
.send(IpfsEvent::PubsubAddPeer(peer_id, tx))
.await?;

Ok(rx.await?)
}
.instrument(self.span.clone())
.await
}

/// Remove a peer from the list of nodes that messages are propagated to.
///
/// Calling this function will not stop messages being sent to the specified peers for subscribed topics which have already been communicated.
pub async fn pubsub_remove_peer(&self, peer_id: PeerId) -> Result<(), Error> {
async move {
let (tx, rx) = oneshot_channel::<()>();

self.to_task
.clone()
.send(IpfsEvent::PubsubRemovePeer(peer_id, tx))
.await?;

Ok(rx.await?)
}
.instrument(self.span.clone())
.await
}

/// Subscribes to a given topic. Can be done at most once without unsubscribing in the between.
/// The subscription can be unsubscribed by dropping the stream or calling
/// [`Ipfs::pubsub_unsubscribe`].
Expand Down Expand Up @@ -1431,6 +1470,20 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
// ignore error, perhaps caller went away already
let _ = ret.send(addresses);
}
IpfsEvent::PubsubAddPeer(peer_id, ret) => {
self.swarm
.behaviour_mut()
.pubsub()
.add_node_to_partial_view(peer_id);
let _ = ret.send(());
}
IpfsEvent::PubsubRemovePeer(peer_id, ret) => {
self.swarm
.behaviour_mut()
.pubsub()
.remove_node_from_partial_view(&peer_id);
let _ = ret.send(());
}
IpfsEvent::PubsubSubscribe(topic, ret) => {
let _ = ret.send(self.swarm.behaviour_mut().pubsub().subscribe(topic));
}
Expand Down Expand Up @@ -1780,8 +1833,11 @@ mod node {

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::*;
use crate::make_ipld;
use futures::{stream::poll_immediate, StreamExt};
use multihash::Sha2_256;

#[tokio::test]
Expand Down Expand Up @@ -1819,4 +1875,47 @@ mod tests {
ipfs.remove_pin(&cid, false).await.unwrap();
assert!(!ipfs.is_pinned(&cid).await.unwrap());
}

#[tokio::test]
async fn test_pubsub_send_and_receive() {
let alice = Node::new("alice").await;
let bob = Node::new("bob").await;
let bob_addr = bob.addrs_local().await.unwrap()[0].clone();

let topic = String::from("test_topic");
alice
.connect(bob_addr.with(Protocol::P2p(bob.id.into())))
.await
.expect("alice failed to connect to bob");
let _alice_messages = alice.pubsub_subscribe(topic.clone()).await.unwrap();
let mut bob_messages = poll_immediate(bob.pubsub_subscribe(topic.clone()).await.unwrap());

let data = vec![1, 2, 3];

alice
.pubsub_publish(topic.clone(), data.clone())
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;

assert_eq!(bob_messages.next().await, Some(Poll::Pending));

bob.pubsub_add_peer(alice.id).await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;

assert_eq!(bob_messages.next().await, Some(Poll::Pending));

alice
.pubsub_publish(topic.clone(), data.clone())
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;

let received_data = bob_messages
.next()
.await
.expect("unexpected end of stream")
.map(|msg| msg.data.clone());
assert_eq!(received_data, Poll::Ready(data.clone()));
}
}