Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simple peer manager to handle static peers reconnection #1205

Merged
merged 1 commit into from
Aug 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions nimbus/config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,17 @@ type
defaultValue: ""
name: "static-peers-file" }: InputFile

reconnectMaxRetry* {.
desc: "Specifies max number of retries if static peers disconnected/not connected. " &
"0 = infinite."
defaultValue: 0
name: "reconnect-max-retry" }: int

reconnectInterval* {.
desc: "Interval in seconds before next attempt to reconnect to static peers. Min 5 seconds."
defaultValue: 15
name: "reconnect-interval" }: int

listenAddress* {.
desc: "Listening IP address for Ethereum P2P and Discovery traffic"
defaultValue: defaultListenAddress
Expand Down
19 changes: 15 additions & 4 deletions nimbus/nimbus.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import
eth/[keys, net/nat, trie/db],
eth/common as eth_common,
eth/p2p as eth_p2p,
eth/p2p/[peer_pool, rlpx_protocols/les_protocol],
eth/p2p/[rlpx_protocols/les_protocol],
json_rpc/rpcserver,
metrics,
metrics/[chronos_httpserver, chronicles_support],
stew/shims/net as stewNet,
websock/websock as ws,
"."/[conf_utils, config, constants, context, genesis, sealer, utils, version],
"."/[conf_utils, config, constants, context, genesis, sealer, utils, version, peers],
./db/[storage_types, db_chain, select_backend],
./graphql/ethapi,
./p2p/[chain, clique/clique_desc, clique/clique_sealer],
Expand Down Expand Up @@ -57,6 +57,7 @@ type
txPool: TxPoolRef
networkLoop: Future[void]
dbBackend: ChainDB
peerManager: PeerManagerRef

proc importBlocks(conf: NimbusConf, chainDB: BaseChainDB) =
if string(conf.blocksFile).len > 0:
Expand Down Expand Up @@ -156,8 +157,14 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,

# Connect directly to the static nodes
let staticPeers = conf.getStaticPeers()
for enode in staticPeers:
asyncSpawn nimbus.ethNode.peerPool.connectToNode(newNode(enode))
if staticPeers.len > 0:
nimbus.peerManager = PeerManagerRef.new(
nimbus.ethNode.peerPool,
conf.reconnectInterval,
conf.reconnectMaxRetry,
staticPeers
)
nimbus.peerManager.start()

# Start Eth node
if conf.maxPeers > 0:
Expand Down Expand Up @@ -412,6 +419,10 @@ proc stop*(nimbus: NimbusNode, conf: NimbusConf) {.async, gcsafe.} =
await nimbus.graphqlServer.stop()
if conf.engineSigner != ZERO_ADDRESS:
await nimbus.sealingEngine.stop()
if conf.maxPeers > 0:
await nimbus.networkLoop.cancelAndWait()
if nimbus.peerManager.isNil.not:
await nimbus.peerManager.stop()

proc process*(nimbus: NimbusNode, conf: NimbusConf) =
# Main event loop
Expand Down
122 changes: 122 additions & 0 deletions nimbus/peers.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# Nimbus
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

import
std/[tables],
chronicles,
chronos,
eth/p2p,
eth/p2p/peer_pool,
../nimbus/sync/protocol

# Currently, this module only handles static peers,
# but it can be extended to handle trusted peers
# or bootnodes as well.

type
  ReconnectState = ref object
    ## Bookkeeping for a single managed peer.
    node: Node        ## the peer this entry tracks
    retryCount: int   ## bumped by `reconnect` on every attempt
    connected: bool   ## last status reported through `setConnected`

  PMState = enum
    ## Lifecycle of the manager. `Stopping` is set by the reconnect loop
    ## when it exits because nothing needs reconnecting; `Stopped` is set
    ## by `stop`.
    Starting, Running, Stopping, Stopped

  PeerManagerRef* = ref object
    ## Keeps a fixed list of peers connected by periodically retrying the
    ## ones that are not connected.
    state: PMState
    pool: PeerPool
    maxRetryCount: int                  ## zero == infinite
    retryInterval: int                  ## in seconds
    reconnectStates: seq[ReconnectState]
    reconnectFut: Future[void]          ## the running `runReconnectLoop`

logScope:
  topics = "PeerManagerRef"

proc setConnected(pm: PeerManagerRef, peer: Peer, connected: bool) =
  ## Records the connection status of `peer` if it is one of the managed
  ## peers, matched by node id. Only the first matching entry is updated.
  ##
  ## Peers that are not in `pm.reconnectStates` are ignored. The observer
  ## installed by `setupManager` is registered per protocol
  ## (`setProtocol eth`), so it presumably also fires for peers that were
  ## never added to the list (e.g. discovered or inbound peers); asserting
  ## here would abort the whole process in that case.
  for n in pm.reconnectStates:
    if peer.remote.id == n.node.id:
      n.connected = connected
      return

  trace "Ignoring status change of unmanaged peer",
    remote=peer.remote.node, connected

proc needReconnect(pm: PeerManagerRef): bool =
  ## Returns true when at least one managed peer is currently marked as
  ## not connected, false otherwise (including when the list is empty).
  result = false
  for entry in pm.reconnectStates:
    if not entry.connected:
      result = true
      break

proc reconnect(pm: PeerManagerRef) {.async, gcsafe.} =
  ## Makes one pass over the managed peers and tries to connect to each
  ## one that is not connected, one after another.
  ##
  ## A peer is retried while its `retryCount` is below `maxRetryCount`,
  ## or without limit when `maxRetryCount` is zero.
  for entry in pm.reconnectStates:
    # The state is evaluated per entry: it can change (e.g. via `stop`)
    # while this proc is suspended in the `await` below.
    if entry.connected or pm.state != Running:
      continue

    let unlimited = pm.maxRetryCount == 0
    if unlimited or entry.retryCount < pm.maxRetryCount:
      trace "Reconnecting to", remote=entry.node.node
      await pm.pool.connectToNode(entry.node)
      inc entry.retryCount
    elif entry.retryCount == pm.maxRetryCount:
      trace "Exceed max retry count, give up reconnecting", remote=entry.node.node
      # Move the counter past the limit so this message is emitted once.
      inc entry.retryCount

proc runReconnectLoop(pm: PeerManagerRef) {.async, gcsafe.} =
  ## Background loop: while the manager is `Running`, performs a reconnect
  ## pass and then sleeps for `retryInterval` seconds.
  ##
  ## When no peer needs reconnecting the loop marks the manager as
  ## `Stopping` and exits; the disconnect observer starts it again later.
  while pm.state == Running:
    if not pm.needReconnect:
      pm.state = Stopping
      break

    await pm.reconnect
    await sleepAsync(pm.retryInterval.seconds)

proc setupManager(pm: PeerManagerRef, enodes: openArray[ENode]) =
  ## Registers a peer observer on the pool and creates one tracking entry
  ## per enode, each starting as not connected with zero retries.
  var peerObserver: PeerObserver

  peerObserver.onPeerConnected = proc(peer: Peer) {.gcsafe.} =
    trace "Peer connected", remote=peer.remote.node
    pm.setConnected(peer, true)

  peerObserver.onPeerDisconnected = proc(peer: Peer) {.gcsafe.} =
    trace "Peer disconnected", remote=peer.remote.node
    pm.setConnected(peer, false)
    # Start the loop again unless it is already running or the manager was
    # explicitly stopped.
    if pm.state in {Starting, Stopping}:
      pm.state = Running
      pm.reconnectFut = pm.runReconnectLoop()

  peerObserver.setProtocol eth
  pm.pool.addObserver(pm, peerObserver)

  # `retryCount` and `connected` rely on their zero defaults (0 / false).
  for enode in enodes:
    pm.reconnectStates.add ReconnectState(node: newNode(enode))

proc new*(_: type PeerManagerRef,
          pool: PeerPool,
          retryInterval: int,
          maxRetryCount: int,
          enodes: openArray[ENode]): PeerManagerRef =
  ## Creates a peer manager in the `Starting` state for the given enodes.
  ##
  ## `retryInterval` is in seconds and is raised to at least 5.
  ## `maxRetryCount` below zero is treated as zero, which means unlimited
  ## retries.
  let
    interval = max(5, retryInterval)
    retries = max(0, maxRetryCount)

  result = PeerManagerRef(
    state: Starting,
    pool: pool,
    retryInterval: interval,
    maxRetryCount: retries
  )
  result.setupManager(enodes)

proc start*(pm: PeerManagerRef) =
  ## Starts the reconnect loop. Does nothing if the manager is already
  ## running, has been stopped, or no managed peer needs reconnecting.
  if pm.state in {Running, Stopped}:
    return
  if not pm.needReconnect:
    return

  pm.state = Running
  pm.reconnectFut = pm.runReconnectLoop()
  info "Reconnecting to static peers"

proc stop*(pm: PeerManagerRef) {.async.} =
  ## Stops the manager and cancels the reconnect loop, if one was started.
  ## Calling it again once stopped is a no-op.
  ##
  ## The state is set to `Stopped` from every other state, including
  ## `Stopping` (the loop exited by itself): the disconnect observer
  ## restarts the loop whenever the state is not `Running`/`Stopped`, so
  ## leaving `Stopping` in place would let the loop come back after
  ## shutdown.
  if pm.state == Stopped:
    return

  pm.state = Stopped
  # No future exists if the loop was never started (`start` not called, or
  # nothing needed reconnecting at that time).
  if not pm.reconnectFut.isNil:
    await pm.reconnectFut.cancelAndWait()
  info "Peer manager stopped"
11 changes: 6 additions & 5 deletions nimbus/rpc/common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@

import
std/[strutils, tables],
nimcrypto, eth/common as eth_common, stint, json_rpc/server, json_rpc/errors,
eth/p2p, eth/p2p/enode, eth/p2p/peer_pool,
nimcrypto, eth/common as eth_common,
stint, json_rpc/server, json_rpc/errors,
eth/p2p, eth/p2p/enode,
../config, ./hexstrings

type
Expand All @@ -37,11 +38,11 @@ proc setupCommonRpc*(node: EthereumNode, conf: NimbusConf, server: RpcServer) =
result = $conf.networkId

server.rpc("net_listening") do() -> bool:
let numPeers = node.peerPool.connectedNodes.len
let numPeers = node.numPeers
result = numPeers < conf.maxPeers

server.rpc("net_peerCount") do() -> HexQuantityStr:
let peerCount = uint node.peerPool.connectedNodes.len
let peerCount = uint node.numPeers
result = encodeQuantity(peerCount)

server.rpc("net_nodeInfo") do() -> NodeInfo:
Expand All @@ -61,6 +62,6 @@ proc setupCommonRpc*(node: EthereumNode, conf: NimbusConf, server: RpcServer) =
server.rpc("nimbus_addPeer") do(enode: string) -> bool:
var res = ENode.fromString(enode)
if res.isOk:
asyncSpawn node.peerPool.connectToNode(newNode(res.get()))
asyncSpawn node.connectToNode(res.get())
return true
raise (ref InvalidRequest)(code: -32602, msg: "Invalid ENode")
2 changes: 1 addition & 1 deletion vendor/nim-eth
Submodule nim-eth updated 2 files
+18 −0 eth/p2p.nim
+20 −4 eth/p2p/peer_pool.nim