Skip to content
This repository has been archived by the owner on Oct 23, 2022. It is now read-only.

Commit

Permalink
fix: update libp2p and renamed the changed types
Browse files Browse the repository at this point in the history
  • Loading branch information
rand0m-cloud committed Mar 6, 2022
1 parent 3eff4e1 commit e049b6a
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 128 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ either = { default-features = false, version = "1.5" }
futures = { default-features = false, version = "0.3.9", features = ["alloc", "std"] }
hash_hasher = "2.0.3"
ipfs-unixfs = { version = "0.2", path = "unixfs" }
libp2p = { default-features = false, features = ["floodsub", "identify", "kad", "tcp-tokio", "mplex", "noise", "ping", "yamux", "dns-tokio"], version = "0.39.1" }
libp2p = { default-features = false, features = ["floodsub", "identify", "kad", "tcp-tokio", "mplex", "noise", "ping", "yamux", "dns-tokio"], version = "0.43.0" }
multibase = { default-features = false, version = "0.9" }
multihash = { default-features = false, version = "0.11" }
prost = { default-features = false, version = "0.8" }
Expand Down
4 changes: 2 additions & 2 deletions src/p2p/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<KademliaEvent> for Behaviour
};

match event {
InboundRequestServed { request } => {
InboundRequest { request } => {
trace!("kad: inbound {:?} request handled", request);
}
OutboundQueryCompleted { result, id, .. } => {
Expand Down Expand Up @@ -377,7 +377,7 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<BitswapEvent> for Behaviour<

impl<Types: IpfsTypes> NetworkBehaviourEventProcess<PingEvent> for Behaviour<Types> {
fn inject_event(&mut self, event: PingEvent) {
use libp2p::ping::handler::{PingFailure, PingSuccess};
use libp2p::ping::{PingFailure, PingSuccess};
match event {
PingEvent {
peer,
Expand Down
28 changes: 5 additions & 23 deletions src/p2p/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use libp2p::core::{
Multiaddr, PeerId,
};
use libp2p::floodsub::{Floodsub, FloodsubConfig, FloodsubEvent, FloodsubMessage, Topic};
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler};
use libp2p::swarm::{ConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters};

/// Currently a thin wrapper around Floodsub, perhaps supporting both Gossipsub and Floodsub later.
/// Allows single subscription to a topic with only unbounded senders. Tracks the peers subscribed
Expand Down Expand Up @@ -233,30 +233,22 @@ impl Pubsub {
}

type PubsubNetworkBehaviourAction = NetworkBehaviourAction<
<<Pubsub as NetworkBehaviour>::ProtocolsHandler as ProtocolsHandler>::InEvent,
<<Pubsub as NetworkBehaviour>::ConnectionHandler as ConnectionHandler>::InEvent,
<Pubsub as NetworkBehaviour>::OutEvent,
>;

impl NetworkBehaviour for Pubsub {
type ProtocolsHandler = <Floodsub as NetworkBehaviour>::ProtocolsHandler;
type ConnectionHandler = <Floodsub as NetworkBehaviour>::ConnectionHandler;
type OutEvent = void::Void;

fn new_handler(&mut self) -> Self::ProtocolsHandler {
fn new_handler(&mut self) -> Self::ConnectionHandler {
self.floodsub.new_handler()
}

fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
self.floodsub.addresses_of_peer(peer_id)
}

fn inject_connected(&mut self, peer_id: &PeerId) {
self.floodsub.inject_connected(peer_id)
}

fn inject_disconnected(&mut self, peer_id: &PeerId) {
self.floodsub.inject_disconnected(peer_id)
}

fn inject_connection_established(
&mut self,
peer_id: &PeerId,
Expand All @@ -281,21 +273,11 @@ impl NetworkBehaviour for Pubsub {
&mut self,
peer_id: PeerId,
connection: ConnectionId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
event: <Self::ConnectionHandler as ConnectionHandler>::OutEvent,
) {
self.floodsub.inject_event(peer_id, connection, event)
}

fn inject_addr_reach_failure(
&mut self,
peer_id: Option<&PeerId>,
addr: &Multiaddr,
error: &dyn std::error::Error,
) {
self.floodsub
.inject_addr_reach_failure(peer_id, addr, error)
}

fn inject_dial_failure(&mut self, peer_id: &PeerId) {
self.floodsub.inject_dial_failure(peer_id)
}
Expand Down
129 changes: 27 additions & 102 deletions src/p2p/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ use crate::p2p::{MultiaddrWithPeerId, MultiaddrWithoutPeerId};
use crate::subscription::{SubscriptionFuture, SubscriptionRegistry};
use core::task::{Context, Poll};
use libp2p::core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId};
use libp2p::swarm::protocols_handler::{
DummyProtocolsHandler, IntoProtocolsHandler, ProtocolsHandler,
use libp2p::swarm::handler::DummyConnectionHandler;
use libp2p::swarm::{
self,
dial_opts::{DialOpts, PeerCondition},
ConnectionHandler, NetworkBehaviour, PollParameters, Swarm,
};
use libp2p::swarm::{self, DialPeerCondition, NetworkBehaviour, PollParameters, Swarm};
use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque};
use std::convert::{TryFrom, TryInto};
use std::time::Duration;
Expand Down Expand Up @@ -33,7 +35,10 @@ impl Disconnector {
}

// Currently this is swarm::NetworkBehaviourAction<Void, Void>
type NetworkBehaviourAction = swarm::NetworkBehaviourAction<<<<SwarmApi as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, <SwarmApi as NetworkBehaviour>::OutEvent>;
type NetworkBehaviourAction = swarm::NetworkBehaviourAction<
<<SwarmApi as NetworkBehaviour>::ConnectionHandler as ConnectionHandler>::InEvent,
<<SwarmApi as NetworkBehaviour>::ConnectionHandler as ConnectionHandler>::OutEvent,
>;

#[derive(Debug, Default)]
pub struct SwarmApi {
Expand Down Expand Up @@ -106,11 +111,12 @@ impl SwarmApi {
.connect_registry
.create_subscription(addr.clone().into(), None);

self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: addr.peer_id,
self.events.push_back(NetworkBehaviourAction::Dial {
// rationale: this is sort of explicit command, perhaps the old address is no longer
// valid. Always would be even better but it's bugged at the moment.
condition: DialPeerCondition::NotDialing,
opt: DialOpts::peer_id(addr.peer_id)
.condition(PeerCondition::Always)
.build(),
});

self.pending_addresses
Expand Down Expand Up @@ -140,10 +146,10 @@ impl SwarmApi {
}

impl NetworkBehaviour for SwarmApi {
type ProtocolsHandler = DummyProtocolsHandler;
type ConnectionHandler = DummyConnectionHandler;
type OutEvent = void::Void;

fn new_handler(&mut self) -> Self::ProtocolsHandler {
fn new_handler(&mut self) -> Self::ConnectionHandler {
Default::default()
}

Expand All @@ -165,12 +171,14 @@ impl NetworkBehaviour for SwarmApi {
fn inject_connection_established(
&mut self,
peer_id: &PeerId,
_id: &ConnectionId,
cp: &ConnectedPoint,
connection_id: &ConnectionId,
endpoint: &ConnectedPoint,
_failed_addresses: Option<&Vec<Multiaddr>>,
_other_established: usize,
) {
// TODO: could be that the connection is not yet fully established at this point
trace!("inject_connection_established {} {:?}", peer_id, cp);
let addr = connection_point_addr(cp);
trace!("inject_connection_established {} {:?}", peer_id, endpoint);
let addr = connection_point_addr(endpoint);

self.peers.insert(*peer_id);
let connections = self.connected_peers.entry(*peer_id).or_default();
Expand All @@ -185,7 +193,7 @@ impl NetworkBehaviour for SwarmApi {
);
}

if let ConnectedPoint::Dialer { address } = cp {
if let ConnectedPoint::Dialer { address } = endpoint {
// we dialed to the `address`
match self.pending_connections.entry(*peer_id) {
Entry::Occupied(mut oe) => {
Expand Down Expand Up @@ -213,36 +221,6 @@ impl NetworkBehaviour for SwarmApi {
}
}

fn inject_connected(&mut self, peer_id: &PeerId) {
// we have at least one fully open connection and handler is running
//
// just finish all of the subscriptions that remain.
trace!("inject connected {}", peer_id);

let all_subs = self
.pending_addresses
.remove(peer_id)
.unwrap_or_default()
.into_iter()
.chain(
self.pending_connections
.remove(peer_id)
.unwrap_or_default()
.into_iter(),
);

for addr in all_subs {
// fail the other than already connected subscriptions in
// inject_connection_established. while the whole swarmapi is quite unclear on the
// actual use cases, assume that connecting one is good enough for all outstanding
// connection requests.
self.connect_registry.finish_subscription(
addr.into(),
Err("finished connecting to another address".into()),
);
}
}

fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
Expand Down Expand Up @@ -290,7 +268,7 @@ impl NetworkBehaviour for SwarmApi {

// this needs to be guarded, so that the connect test case doesn't cause a
// panic following inject_connection_established, inject_connection_closed
// if there's only the DummyProtocolsHandler, which doesn't open a
// if there's only the DummyConnectionHandler, which doesn't open a
// substream and closes up immediatedly.
self.connect_registry.finish_subscription(
addr.into(),
Expand All @@ -310,29 +288,6 @@ impl NetworkBehaviour for SwarmApi {
}
}

fn inject_disconnected(&mut self, peer_id: &PeerId) {
trace!("inject_disconnected: {}", peer_id);
assert!(!self.connected_peers.contains_key(peer_id));
self.roundtrip_times.remove(peer_id);

let failed = self
.pending_addresses
.remove(peer_id)
.unwrap_or_default()
.into_iter()
.chain(
self.pending_connections
.remove(peer_id)
.unwrap_or_default()
.into_iter(),
);

for addr in failed {
self.connect_registry
.finish_subscription(addr.into(), Err("disconnected".into()));
}
}

fn inject_event(&mut self, _peer_id: PeerId, _connection: ConnectionId, _event: void::Void) {}

fn inject_dial_failure(&mut self, peer_id: &PeerId) {
Expand All @@ -342,8 +297,9 @@ impl NetworkBehaviour for SwarmApi {
// for soon.
self.events
.push_back(swarm::NetworkBehaviourAction::DialPeer {
peer_id: *peer_id,
condition: DialPeerCondition::NotDialing,
opt: DialOpts::peer_id(peer_id)
.condition(PeerCondition::NotDialing)
.build(),
});
}

Expand All @@ -355,37 +311,6 @@ impl NetworkBehaviour for SwarmApi {
}
}

fn inject_addr_reach_failure(
&mut self,
peer_id: Option<&PeerId>,
addr: &Multiaddr,
error: &dyn std::error::Error,
) {
trace!("inject_addr_reach_failure {} {}", addr, error);

if let Some(peer_id) = peer_id {
match self.pending_connections.entry(*peer_id) {
Entry::Occupied(mut oe) => {
let addresses = oe.get_mut();
let addr = MultiaddrWithPeerId::try_from(addr.clone())
.expect("dialed address contains peerid in libp2p 0.38");
let pos = addresses.iter().position(|a| *a == addr);

if let Some(pos) = pos {
addresses.swap_remove(pos);
self.connect_registry
.finish_subscription(addr.into(), Err(error.to_string()));
}

if addresses.is_empty() {
oe.remove();
}
}
Entry::Vacant(_) => {}
}
}
}

fn poll(
&mut self,
_: &mut Context,
Expand Down Expand Up @@ -455,7 +380,7 @@ mod tests {
_ = (&mut swarm2).next() => {},
res = (&mut sub) => {
// this is currently a success even though the connection is never really
// established, the DummyProtocolsHandler doesn't do anything nor want the
// established, the DummyConnectionHandler doesn't do anything nor want the
// connection to be kept alive and thats it.
//
// it could be argued that this should be `Err("keepalive disconnected")`
Expand Down

0 comments on commit e049b6a

Please sign in to comment.