This repository has been archived by the owner on Oct 23, 2022. It is now read-only.

Update libp2p to v0.43.0 #499

Merged
merged 27 commits into from
Apr 1, 2022
f601b8d
fix: update libp2p and renamed the changed types
rand0m-cloud Mar 18, 2022
c3a48c9
fix: updated libp2p in the bitswap crate
rand0m-cloud Mar 18, 2022
4e5ff4d
more libp2p updating
rand0m-cloud Mar 18, 2022
918d4d8
more updating of types
rand0m-cloud Mar 18, 2022
c1a5bba
some updates to pubsub
rand0m-cloud Mar 18, 2022
7e9da72
fix the pubsub network behaviour action type
rand0m-cloud Mar 18, 2022
085be77
replaced todo placeholders
rand0m-cloud Mar 18, 2022
e4002d6
re-add connection closed and established
rand0m-cloud Mar 18, 2022
a996922
added change to changelog
rand0m-cloud Mar 18, 2022
bdf977c
enable event_process for BehaviourEvent
rand0m-cloud Mar 18, 2022
93b31b3
chore: clean up type signature
rand0m-cloud Mar 18, 2022
25c8d58
fix: removed unneeded BehaviourEvent struct
rand0m-cloud Mar 18, 2022
3b59193
temp fix: changed field order to workaround bug in libp2p
rand0m-cloud Mar 18, 2022
31262b5
chore: more updating to libp2p
rand0m-cloud Mar 18, 2022
6c6fc3d
fix: update libp2p and renamed the changed types
rand0m-cloud Mar 18, 2022
77291ee
fix(swarm-test): add biased to tokio::select for non-random behavior
rand0m-cloud Mar 18, 2022
888e6f1
wip: re-add code fragment to handle dial failure
rand0m-cloud Mar 18, 2022
72ff95d
fix(swarm): corrected dial failure logic
rand0m-cloud Mar 21, 2022
1cee67d
fix: corrected faulty Vec::retain logic and updated WrongPeerId test
rand0m-cloud Mar 21, 2022
897c16f
fix: apply review suggestions and fix clippy lints
rand0m-cloud Mar 24, 2022
d4d3def
fix(pubsub): tell Floodsub about the peers we want to hear from
rand0m-cloud Mar 25, 2022
87a4114
ci(win): use windows-2019 image
koivunej Mar 30, 2022
82453e5
fix(build): stop building while writing an error
koivunej Mar 30, 2022
277954b
test(pubsub): disjoint topics as new test case
koivunej Apr 1, 2022
50ad10f
test(pubsub): simplify, comment
koivunej Apr 1, 2022
081a598
test(conf): ignore pubsub tests on windows for now
koivunej Apr 1, 2022
bf7a807
doc(p2p): add fixme for possible issue
koivunej Apr 1, 2022
fix(swarm): corrected dial failure logic
rand0m-cloud authored and koivunej committed Apr 1, 2022
commit 72ff95dca0f11f697ecefd14992f2811a8f66149
75 changes: 32 additions & 43 deletions src/p2p/swarm.rs
@@ -322,27 +322,6 @@ impl NetworkBehaviour for SwarmApi {
// we were not dialing to the peer, thus we cannot have a pending subscription to
// finish.
}

trace!("inject_disconnected: {}", peer_id);
assert!(!self.connected_peers.contains_key(peer_id));
self.roundtrip_times.remove(peer_id);

let failed = self
.pending_addresses
.remove(peer_id)
.unwrap_or_default()
.into_iter()
.chain(
self.pending_connections
.remove(peer_id)
.unwrap_or_default()
.into_iter(),
);

for addr in failed {
self.connect_registry
.finish_subscription(addr.into(), Err("disconnected".into()));
}
}

fn inject_event(&mut self, _peer_id: PeerId, _connection: ConnectionId, _event: void::Void) {}
@@ -353,33 +332,43 @@ impl NetworkBehaviour for SwarmApi {
_handler: Self::ConnectionHandler,
error: &DialError,
) {
trace!("inject_dial_failure: {:?} ({})", peer_id, error);
if let Some(peer_id) = peer_id {
if self.pending_addresses.contains_key(&peer_id) {
// it is possible that these addresses have not been tried yet; they will be asked
// for soon.
let handler = self.new_handler();
self.events.push_back(swarm::NetworkBehaviourAction::Dial {
opts: DialOpts::peer_id(peer_id)
.condition(PeerCondition::NotDialing)
.build(),
handler,
});
}
}

if let Some(peer_id) = peer_id {
match self.pending_connections.entry(peer_id) {
Entry::Occupied(mut oe) => {
let addresses = oe.get_mut();
let addr = MultiaddrWithPeerId::try_from(addr.clone())
.expect("dialed address contains peerid in libp2p 0.38");
let pos = addresses.iter().position(|a| *a == addr);

if let Some(pos) = pos {
addresses.swap_remove(pos);
self.connect_registry
.finish_subscription(addr.into(), Err(error.to_string()));
match error {
DialError::Transport(multiaddrs) => {
for (addr, error) in multiaddrs {
let addr = MultiaddrWithPeerId::try_from(addr.clone())
.expect("to recieve an MultiAddrWithPeerId from DialError");
self.connect_registry
.finish_subscription(addr.into(), Err(error.to_string()));
}

let peer_ids = multiaddrs
.into_iter()
Collaborator


Suggested change
- .into_iter()
+ .iter()

The error is behind a borrow, so we cannot take ownership here. I was going to ask about the addr.clone() on line 352, but this explains it.

However, building this vec seems unnecessary: we are already handling a failure to dial a specific peer, and we only allow connects to a MultiaddrWithPeerId on line 97 with pub fn connect(&mut self, addr: MultiaddrWithPeerId) -> ....

So perhaps it would be better to remove from addresses within the finish_subscription loop above. It will need to be a position-based swap_remove.

Contributor Author


Sure, I will update it with your suggestion of using swap_remove. My only hesitation comes from reading its docs and seeing that it moves elements to fill the gap. I have no idea which is actually better; my intuition said retain was better because it describes what I want rather than how I want it done.
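
To make the trade-off in this thread concrete, here is a minimal, standalone sketch of both approaches on a plain `Vec`. The function names `remove_failed_swap` and `remove_failed_retain` are hypothetical and not part of the crate; the element type is generic instead of `MultiaddrWithPeerId`. Note that `Vec::retain` keeps the elements for which the closure returns `true`, so removing the failed entries needs a negated membership test, whereas the `retain` call shown in this diff keeps the entries that appear in the failed list.

```rust
/// Remove every element of `pending` that appears in `failed`, using a
/// position lookup followed by `swap_remove`. The last element is moved into
/// the freed slot, so the order of `pending` is not preserved.
/// Returns the removed elements.
fn remove_failed_swap<T: PartialEq>(pending: &mut Vec<T>, failed: &[T]) -> Vec<T> {
    let mut removed = Vec::new();
    for f in failed {
        if let Some(pos) = pending.iter().position(|p| p == f) {
            removed.push(pending.swap_remove(pos));
        }
    }
    removed
}

/// The same removal expressed with `retain`: keep only the elements that are
/// NOT in `failed`. Order is preserved, but nothing is handed back.
fn remove_failed_retain<T: PartialEq>(pending: &mut Vec<T>, failed: &[T]) {
    pending.retain(|p| !failed.contains(p));
}
```

The `swap_remove` variant hands back each removed element, which is convenient when every removed address also has to be reported somewhere, as in the `finish_subscription` loop. The `retain` variant reads more declaratively but needs a second pass for any reporting.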

.map(|(addr, _err)| {
MultiaddrWithPeerId::try_from(addr.clone()).unwrap()
})
.collect::<Vec<_>>();

addresses.retain(|peer_id| peer_ids.iter().any(|id| peer_id == id));
rand0m-cloud marked this conversation as resolved.
}
DialError::WrongPeerId {
obtained: _,
endpoint: _,
} => {
rand0m-cloud marked this conversation as resolved.
for addr in addresses.iter() {
self.connect_registry.finish_subscription(
addr.clone().into(),
Err(error.to_string()),
);
}

addresses.clear();
}
err => trace!("unhandled DialError {}", err),
Contributor Author


Should we trace every DialError? I was thinking about the WrongPeerId failure above, and I'm not sure whether it would otherwise be reported to the user.

Collaborator

@koivunej koivunej Mar 21, 2022


I would probably go with

Suggested change
- err => trace!("unhandled DialError {}", err),
+ error => {
+ warn!(?error, "unexpected DialError; some futures might never complete");
+ },

Unless you can see a way to finish all of the subscriptions under this peer ID? I cannot, at least; we had not considered this situation originally because, as mentioned, the events all used to arrive. It looks like the pending subscriptions should be finishable with self.pending_connections.
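
One way the idea in the last sentence might look is sketched below, using only the standard library. Everything here is an assumption made for illustration: `Registry` is a hypothetical stand-in for the crate's subscription registry, peers and addresses are plain `String`s rather than `PeerId` and `MultiaddrWithPeerId`, and `fail_all_pending` is not a function in the crate.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the subscription registry: it only records the
/// outcome delivered for each address.
#[derive(Default)]
struct Registry {
    finished: Vec<(String, Result<(), String>)>,
}

impl Registry {
    fn finish_subscription(&mut self, addr: String, result: Result<(), String>) {
        self.finished.push((addr, result));
    }
}

/// Fail every pending address of `peer` with the same error text and drop the
/// peer's entry from the map. Returns how many subscriptions were finished.
fn fail_all_pending(
    pending_connections: &mut HashMap<String, Vec<String>>,
    registry: &mut Registry,
    peer: &str,
    error: &str,
) -> usize {
    // Removing the entry takes ownership of the addresses, so no clone is needed.
    let addrs = pending_connections.remove(peer).unwrap_or_default();
    let count = addrs.len();
    for addr in addrs {
        registry.finish_subscription(addr, Err(error.to_string()));
    }
    count
}
```

With something along these lines in the catch-all arm, an unexpected `DialError` would still complete every waiting future with an error instead of leaving it pending; whether that is the right behaviour for each error variant is left open in the thread.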

}

if addresses.is_empty() {
Collaborator

@koivunej koivunej Mar 30, 2022


We should test this as well; I am wondering about the lack of a new dial attempt.

19 changes: 11 additions & 8 deletions tests/pubsub.rs
@@ -107,14 +107,17 @@ async fn publish_between_two_nodes() {
.collect::<HashSet<_>>();

for st in &mut [b_msgs.by_ref(), a_msgs.by_ref()] {
let actual = st
.take(2)
// Arc::try_unwrap will fail sometimes here as the sender side in src/p2p/pubsub.rs:305
// can still be looping
.map(|msg| (*msg).clone())
.map(|msg| (msg.topics, msg.source, msg.data))
.collect::<HashSet<_>>()
.await;
let actual = timeout(
Duration::from_secs(2),
st.take(2)
// Arc::try_unwrap will fail sometimes here as the sender side in src/p2p/pubsub.rs:305
// can still be looping
.map(|msg| (*msg).clone())
.map(|msg| (msg.topics, msg.source, msg.data))
.collect::<HashSet<_>>(),
)
.await
.unwrap();
assert_eq!(expected, actual);
}
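
The change above bounds the wait so that a missing message fails the test instead of hanging it. The sketch below shows the same idea with only the standard library. It deliberately swaps in a blocking `std::sync::mpsc` channel and `recv_timeout` in place of the async stream and `timeout` used in the test, and `collect_with_deadline` is a hypothetical helper, not something from this repository.

```rust
use std::collections::HashSet;
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Receive `n` messages from `rx` into a set, giving up once the overall
/// `limit` has elapsed. The deadline covers the whole batch, not each message.
fn collect_with_deadline(
    rx: &Receiver<String>,
    n: usize,
    limit: Duration,
) -> Result<HashSet<String>, RecvTimeoutError> {
    let deadline = Instant::now() + limit;
    let mut out = HashSet::new();
    for _ in 0..n {
        // Time left until the shared deadline; zero once it has passed.
        let remaining = deadline.saturating_duration_since(Instant::now());
        out.insert(rx.recv_timeout(remaining)?);
    }
    Ok(out)
}
```

A test can then `unwrap()` the result, as the diff does, so that a stalled sender shows up as a clear failure after the limit rather than as a hung test run.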
