Skip to content
This repository has been archived by the owner on Oct 23, 2022. It is now read-only.

chore: upgrade to libp2p 0.39 #472

Merged
merged 6 commits into from
Aug 18, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
chore: upgrade to libp2p 0.38
  • Loading branch information
koivunej authored and CHr15F0x committed Aug 17, 2021
commit b487c3e7fd866856bfd2f6605e86d90a5b4c1e85
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ either = { default-features = false, version = "1.5" }
futures = { default-features = false, version = "0.3.9", features = ["alloc", "std"] }
hash_hasher = "2.0.3"
ipfs-unixfs = { version = "0.2", path = "unixfs" }
libp2p = { default-features = false, features = ["floodsub", "identify", "kad", "tcp-tokio", "mplex", "noise", "ping", "yamux", "dns"], version = "0.34" }
libp2p = { default-features = false, features = ["floodsub", "identify", "kad", "tcp-tokio", "mplex", "noise", "ping", "yamux", "dns-tokio"], version = "0.38" }
multibase = { default-features = false, version = "0.8" }
multihash = { default-features = false, version = "0.11" }
prost = { default-features = false, version = "0.7" }
Expand Down
4 changes: 2 additions & 2 deletions bitswap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ cid = { default-features = false, version = "0.5" }
fnv = { default-features = false, version = "1.0" }
futures = { default-features = false, version = "0.3" }
hash_hasher = "2.0.3"
libp2p-core = { default-features = false, version = "0.27" }
libp2p-swarm = { default-features = false, version = "0.27" }
libp2p-core = { default-features = false, version = "0.28" }
libp2p-swarm = { default-features = false, version = "0.29" }
multihash = { default-features = false, version = "0.11" }
prost = { default-features = false, version = "0.7" }
thiserror = { default-features = false, version = "1.0" }
Expand Down
93 changes: 50 additions & 43 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1358,7 +1358,7 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
type Output = ();

fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
use libp2p::{swarm::SwarmEvent, Swarm};
use libp2p::swarm::SwarmEvent;

// begin by polling the swarm so that initially it'll first have chance to bind listeners
// and such.
Expand Down Expand Up @@ -1401,73 +1401,72 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {

match inner {
IpfsEvent::Connect(target, ret) => {
ret.send(self.swarm.connect(target)).ok();
ret.send(self.swarm.behaviour_mut().connect(target)).ok();
}
IpfsEvent::Addresses(ret) => {
let addrs = self.swarm.addrs();
let addrs = self.swarm.behaviour_mut().addrs();
ret.send(Ok(addrs)).ok();
}
IpfsEvent::Listeners(ret) => {
let listeners = Swarm::listeners(&self.swarm)
.cloned()
.collect::<Vec<Multiaddr>>();
let listeners = self.swarm.listeners().cloned().collect::<Vec<Multiaddr>>();
ret.send(Ok(listeners)).ok();
}
IpfsEvent::Connections(ret) => {
let connections = self.swarm.connections();
let connections = self.swarm.behaviour_mut().connections();
ret.send(Ok(connections.collect())).ok();
}
IpfsEvent::Disconnect(addr, ret) => {
if let Some(disconnector) = self.swarm.disconnect(addr) {
if let Some(disconnector) = self.swarm.behaviour_mut().disconnect(addr) {
disconnector.disconnect(&mut self.swarm);
}
ret.send(Ok(())).ok();
}
IpfsEvent::GetAddresses(ret) => {
// perhaps this could be moved under `IpfsEvent` or free functions?
let mut addresses = Vec::new();
addresses.extend(Swarm::listeners(&self.swarm).cloned());
addresses.extend(
Swarm::external_addresses(&self.swarm).map(|ar| ar.addr.clone()),
);
addresses.extend(self.swarm.listeners().map(|a| a.to_owned()));
addresses
.extend(self.swarm.external_addresses().map(|ar| ar.addr.to_owned()));
// ignore error, perhaps caller went away already
let _ = ret.send(addresses);
}
IpfsEvent::PubsubSubscribe(topic, ret) => {
let _ = ret.send(self.swarm.pubsub().subscribe(topic));
let _ = ret.send(self.swarm.behaviour_mut().pubsub().subscribe(topic));
}
IpfsEvent::PubsubUnsubscribe(topic, ret) => {
let _ = ret.send(self.swarm.pubsub().unsubscribe(topic));
let _ = ret.send(self.swarm.behaviour_mut().pubsub().unsubscribe(topic));
}
IpfsEvent::PubsubPublish(topic, data, ret) => {
self.swarm.pubsub().publish(topic, data);
self.swarm.behaviour_mut().pubsub().publish(topic, data);
let _ = ret.send(());
}
IpfsEvent::PubsubPeers(Some(topic), ret) => {
let topic = libp2p::floodsub::Topic::new(topic);
let _ = ret.send(self.swarm.pubsub().subscribed_peers(&topic));
let _ =
ret.send(self.swarm.behaviour_mut().pubsub().subscribed_peers(&topic));
}
IpfsEvent::PubsubPeers(None, ret) => {
let _ = ret.send(self.swarm.pubsub().known_peers());
let _ = ret.send(self.swarm.behaviour_mut().pubsub().known_peers());
}
IpfsEvent::PubsubSubscribed(ret) => {
let _ = ret.send(self.swarm.pubsub().subscribed_topics());
let _ = ret.send(self.swarm.behaviour_mut().pubsub().subscribed_topics());
}
IpfsEvent::WantList(peer, ret) => {
let list = if let Some(peer) = peer {
self.swarm
.behaviour_mut()
.bitswap()
.peer_wantlist(&peer)
.unwrap_or_default()
} else {
self.swarm.bitswap().local_wantlist()
self.swarm.behaviour_mut().bitswap().local_wantlist()
};
let _ = ret.send(list);
}
IpfsEvent::BitswapStats(ret) => {
let stats = self.swarm.bitswap().stats();
let peers = self.swarm.bitswap().peers();
let wantlist = self.swarm.bitswap().local_wantlist();
let stats = self.swarm.behaviour_mut().bitswap().stats();
let peers = self.swarm.behaviour_mut().bitswap().peers();
let wantlist = self.swarm.behaviour_mut().bitswap().local_wantlist();
let _ = ret.send((stats, peers, wantlist).into());
}
IpfsEvent::AddListeningAddress(addr, ret) => {
Expand All @@ -1476,7 +1475,7 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
IpfsEvent::RemoveListeningAddress(addr, ret) => {
let removed = if let Some((id, _)) = self.listening_addresses.remove(&addr)
{
Swarm::remove_listener(&mut self.swarm, id).map_err(|_: ()| {
self.swarm.remove_listener(id).map_err(|_: ()| {
format_err!(
"Failed to remove previously added listening address: {}",
addr
Expand All @@ -1489,19 +1488,20 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
let _ = ret.send(removed);
}
IpfsEvent::Bootstrap(ret) => {
let future = self.swarm.bootstrap();
let future = self.swarm.behaviour_mut().bootstrap();
let _ = ret.send(future);
}
IpfsEvent::AddPeer(peer_id, addr) => {
self.swarm.add_peer(peer_id, addr);
self.swarm.behaviour_mut().add_peer(peer_id, addr);
}
IpfsEvent::GetClosestPeers(peer_id, ret) => {
let future = self.swarm.get_closest_peers(peer_id);
let future = self.swarm.behaviour_mut().get_closest_peers(peer_id);
let _ = ret.send(future);
}
IpfsEvent::GetBitswapPeers(ret) => {
let peers = self
.swarm
.behaviour_mut()
.bitswap()
.connected_peers
.keys()
Expand All @@ -1510,52 +1510,55 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
let _ = ret.send(peers);
}
IpfsEvent::FindPeer(peer_id, local_only, ret) => {
let swarm_addrs = self.swarm.swarm.connections_to(&peer_id);
let swarm_addrs = self.swarm.behaviour_mut().swarm.connections_to(&peer_id);
let locally_known_addrs = if !swarm_addrs.is_empty() {
swarm_addrs
} else {
self.swarm.kademlia().addresses_of_peer(&peer_id)
self.swarm
.behaviour_mut()
.kademlia()
.addresses_of_peer(&peer_id)
};
let addrs = if !locally_known_addrs.is_empty() || local_only {
Either::Left(locally_known_addrs)
} else {
Either::Right(self.swarm.get_closest_peers(peer_id))
Either::Right(self.swarm.behaviour_mut().get_closest_peers(peer_id))
};
let _ = ret.send(addrs);
}
IpfsEvent::GetProviders(cid, ret) => {
let future = self.swarm.get_providers(cid);
let future = self.swarm.behaviour_mut().get_providers(cid);
let _ = ret.send(future);
}
IpfsEvent::Provide(cid, ret) => {
let _ = ret.send(self.swarm.start_providing(cid));
let _ = ret.send(self.swarm.behaviour_mut().start_providing(cid));
}
IpfsEvent::DhtGet(key, quorum, ret) => {
let future = self.swarm.dht_get(key, quorum);
let future = self.swarm.behaviour_mut().dht_get(key, quorum);
let _ = ret.send(future);
}
IpfsEvent::DhtPut(key, value, quorum, ret) => {
let future = self.swarm.dht_put(key, value, quorum);
let future = self.swarm.behaviour_mut().dht_put(key, value, quorum);
let _ = ret.send(future);
}
IpfsEvent::GetBootstrappers(ret) => {
let list = self.swarm.get_bootstrappers();
let list = self.swarm.behaviour_mut().get_bootstrappers();
let _ = ret.send(list);
}
IpfsEvent::AddBootstrapper(addr, ret) => {
let result = self.swarm.add_bootstrapper(addr);
let result = self.swarm.behaviour_mut().add_bootstrapper(addr);
let _ = ret.send(result);
}
IpfsEvent::RemoveBootstrapper(addr, ret) => {
let result = self.swarm.remove_bootstrapper(addr);
let result = self.swarm.behaviour_mut().remove_bootstrapper(addr);
let _ = ret.send(result);
}
IpfsEvent::ClearBootstrappers(ret) => {
let list = self.swarm.clear_bootstrappers();
let list = self.swarm.behaviour_mut().clear_bootstrappers();
let _ = ret.send(list);
}
IpfsEvent::RestoreBootstrappers(ret) => {
let list = self.swarm.restore_bootstrappers();
let list = self.swarm.behaviour_mut().restore_bootstrappers();
let _ = ret.send(list);
}
IpfsEvent::Exit => {
Expand All @@ -1569,21 +1572,25 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
// wants this to be written with a `while let`.
while let Poll::Ready(Some(evt)) = Pin::new(&mut self.repo_events).poll_next(ctx) {
match evt {
RepoEvent::WantBlock(cid) => self.swarm.want_block(cid),
RepoEvent::UnwantBlock(cid) => self.swarm.bitswap().cancel_block(&cid),
RepoEvent::WantBlock(cid) => self.swarm.behaviour_mut().want_block(cid),
RepoEvent::UnwantBlock(cid) => {
self.swarm.behaviour_mut().bitswap().cancel_block(&cid)
}
RepoEvent::NewBlock(cid, ret) => {
// TODO: consider if cancel is applicable in cases where we provide the
// associated Block ourselves
self.swarm.bitswap().cancel_block(&cid);
self.swarm.behaviour_mut().bitswap().cancel_block(&cid);
// currently disabled; see https://github.com/rs-ipfs/rust-ipfs/pull/281#discussion_r465583345
// for details regarding the concerns about enabling this functionality as-is
if false {
let _ = ret.send(self.swarm.start_providing(cid));
let _ = ret.send(self.swarm.behaviour_mut().start_providing(cid));
} else {
let _ = ret.send(Err(anyhow!("not actively providing blocks yet")));
}
}
RepoEvent::RemovedBlock(cid) => self.swarm.stop_providing_block(&cid),
RepoEvent::RemovedBlock(cid) => {
self.swarm.behaviour_mut().stop_providing_block(&cid)
}
}
}

Expand Down
9 changes: 4 additions & 5 deletions src/p2p/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use anyhow::anyhow;
use cid::Cid;
use ipfs_bitswap::{Bitswap, BitswapEvent};
use libp2p::core::{Multiaddr, PeerId};
use libp2p::identify::{Identify, IdentifyEvent};
use libp2p::identify::{Identify, IdentifyConfig, IdentifyEvent};
use libp2p::kad::record::{store::MemoryStore, Key, Record};
use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, Quorum};
// use libp2p::mdns::{MdnsEvent, TokioMdns};
Expand Down Expand Up @@ -185,7 +185,7 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<KademliaEvent> for Behaviour
let key = multibase::encode(Base::Base32Lower, key);
warn!("kad: timed out while trying to republish provider {}", key);
}
GetRecord(Ok(GetRecordOk { records })) => {
GetRecord(Ok(GetRecordOk { records, .. })) => {
if self.kademlia.query(&id).is_none() {
let records = records.into_iter().map(|rec| rec.record).collect();
self.kad_subscriptions
Expand Down Expand Up @@ -445,9 +445,8 @@ impl<Types: IpfsTypes> Behaviour<Types> {
let bitswap = Bitswap::default();
let ping = Ping::default();
let identify = Identify::new(
"/ipfs/0.1.0".into(),
"rust-ipfs".into(),
options.keypair.public(),
IdentifyConfig::new("/ipfs/0.1.0".into(), options.keypair.public())
.with_agent_version("rust-ipfs".into()),
);
let pubsub = Pubsub::new(options.peer_id);
let mut swarm = SwarmApi::default();
Expand Down
8 changes: 4 additions & 4 deletions src/p2p/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,12 +300,12 @@ impl NetworkBehaviour for Pubsub {
self.floodsub.inject_dial_failure(peer_id)
}

fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
self.floodsub.inject_new_listen_addr(addr)
fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.floodsub.inject_new_listen_addr(id, addr)
}

fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
self.floodsub.inject_expired_listen_addr(addr)
fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.floodsub.inject_expired_listen_addr(id, addr)
}

fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
Expand Down
12 changes: 8 additions & 4 deletions src/p2p/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,10 @@ mod tests {
Multihash::from_bytes(&peer1_id.to_bytes()).unwrap(),
));

let mut sub = swarm2.connect(addr.try_into().unwrap()).unwrap();
let mut sub = swarm2
.behaviour_mut()
.connect(addr.try_into().unwrap())
.unwrap();

loop {
tokio::select! {
Expand All @@ -481,7 +484,7 @@ mod tests {
res.unwrap();

// just to confirm that there are no connections.
assert_eq!(Vec::<Multiaddr>::new(), swarm1.connections_to(&peer2_id));
assert_eq!(Vec::<Multiaddr>::new(), swarm1.behaviour().connections_to(&peer2_id));
break;
}
}
Expand Down Expand Up @@ -509,6 +512,7 @@ mod tests {
}

let mut fut = swarm2
.behaviour_mut()
.connect(
MultiaddrWithoutPeerId::try_from(address)
.unwrap()
Expand Down Expand Up @@ -559,8 +563,8 @@ mod tests {
// these two should be attempted in parallel. since we know both of them work, and they are
// given in this order, we know that in libp2p 0.34 only the first should win, however
// both should always be finished.
connections.push(swarm2.connect(targets.0).unwrap());
connections.push(swarm2.connect(targets.1).unwrap());
connections.push(swarm2.behaviour_mut().connect(targets.0).unwrap());
connections.push(swarm2.behaviour_mut().connect(targets.1).unwrap());
let ready = connections
// turn the private error type into Option
.map_err(|e| e.into_inner())
Expand Down
4 changes: 2 additions & 2 deletions src/p2p/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::transport::upgrade::Version;
use libp2p::core::transport::Boxed;
use libp2p::core::upgrade::SelectUpgrade;
use libp2p::dns::DnsConfig;
use libp2p::dns::TokioDnsConfig;
use libp2p::identity;
use libp2p::mplex::MplexConfig;
use libp2p::noise::{self, NoiseConfig};
Expand All @@ -24,7 +24,7 @@ pub fn build_transport(keypair: identity::Keypair) -> io::Result<TTransport> {
.unwrap();
let noise_config = NoiseConfig::xx(xx_keypair).into_authenticated();

Ok(DnsConfig::new(TokioTcpConfig::new().nodelay(true))?
Ok(TokioDnsConfig::system(TokioTcpConfig::new())?
.upgrade(Version::V1)
.authenticate(noise_config)
.multiplex(SelectUpgrade::new(
Expand Down