Skip to content

Commit

Permalink
simple peer manager to handle static peers reconnection
Browse files Browse the repository at this point in the history
fix #618
  • Loading branch information
jangko committed Aug 26, 2022
1 parent 5355f4e commit 355134a
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 10 deletions.
11 changes: 11 additions & 0 deletions nimbus/config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,17 @@ type
defaultValue: ""
name: "static-peers-file" }: InputFile

reconnectMaxRetry* {.
desc: "Specifies max number of retries if static peers disconnected/not connected. " &
"0 = infinite."
defaultValue: 0
name: "reconnect-max-retry" }: int

reconnectInterval* {.
desc: "Interval in seconds before next attempt to reconnect to static peers. Min 5 seconds."
defaultValue: 15
name: "reconnect-interval" }: int

listenAddress* {.
desc: "Listening IP address for Ethereum P2P and Discovery traffic"
defaultValue: defaultListenAddress
Expand Down
19 changes: 15 additions & 4 deletions nimbus/nimbus.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import
eth/[keys, net/nat, trie/db],
eth/common as eth_common,
eth/p2p as eth_p2p,
eth/p2p/[peer_pool, rlpx_protocols/les_protocol],
eth/p2p/[rlpx_protocols/les_protocol],
json_rpc/rpcserver,
metrics,
metrics/[chronos_httpserver, chronicles_support],
stew/shims/net as stewNet,
websock/websock as ws,
"."/[conf_utils, config, constants, context, genesis, sealer, utils, version],
"."/[conf_utils, config, constants, context, genesis, sealer, utils, version, peers],
./db/[storage_types, db_chain, select_backend],
./graphql/ethapi,
./p2p/[chain, clique/clique_desc, clique/clique_sealer],
Expand Down Expand Up @@ -57,6 +57,7 @@ type
txPool: TxPoolRef
networkLoop: Future[void]
dbBackend: ChainDB
peerManager: PeerManagerRef

proc importBlocks(conf: NimbusConf, chainDB: BaseChainDB) =
if string(conf.blocksFile).len > 0:
Expand Down Expand Up @@ -156,8 +157,14 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,

# Connect directly to the static nodes
let staticPeers = conf.getStaticPeers()
for enode in staticPeers:
asyncSpawn nimbus.ethNode.peerPool.connectToNode(newNode(enode))
if staticPeers.len > 0:
nimbus.peerManager = PeerManagerRef.new(
nimbus.ethNode.peerPool,
conf.reconnectInterval,
conf.reconnectMaxRetry,
staticPeers
)
nimbus.peerManager.start()

# Start Eth node
if conf.maxPeers > 0:
Expand Down Expand Up @@ -412,6 +419,10 @@ proc stop*(nimbus: NimbusNode, conf: NimbusConf) {.async, gcsafe.} =
await nimbus.graphqlServer.stop()
if conf.engineSigner != ZERO_ADDRESS:
await nimbus.sealingEngine.stop()
if conf.maxPeers > 0:
await nimbus.networkLoop.cancelAndWait()
if nimbus.peerManager.isNil.not:
await nimbus.peerManager.stop()

proc process*(nimbus: NimbusNode, conf: NimbusConf) =
# Main event loop
Expand Down
122 changes: 122 additions & 0 deletions nimbus/peers.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# Nimbus
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

import
std/[tables],
chronicles,
chronos,
eth/p2p,
eth/p2p/peer_pool,
../nimbus/sync/protocol

# Currently, this module only handles static peers,
# but it can be extended to handle trusted peers
# or bootnodes as well.

type
  # Bookkeeping for a single managed peer.
  ReconnectState = ref object
    node: Node # the peer to dial, created from a configured enode
    retryCount: int # incremented by `reconnect` on every pass that handles this peer
    connected: bool # last status written by `setConnected`

  # Life-cycle of the peer manager. `Starting` is the state assigned by `new`;
  # `Stopping` is set by the reconnect loop itself when nothing needs
  # reconnecting; `Stopped` is set only by `stop`.
  PMState = enum
    Starting, Running, Stopping, Stopped

  # Owns the list of managed peers and the background reconnect loop.
  PeerManagerRef* = ref object
    state: PMState
    pool: PeerPool # pool used to dial peers and to observe connect/disconnect
    maxRetryCount: int # zero == infinite
    retryInterval: int # in seconds
    reconnectStates: seq[ReconnectState]
    reconnectFut: Future[void] # future of the running reconnect loop; nil until a loop is started

# Log topic attached to the log statements of this module.
logScope:
  topics = "PeerManagerRef"

proc setConnected(pm: PeerManagerRef, peer: Peer, connected: bool) =
  ## Record the connection status of `peer` when it is one of the managed
  ## peers listed in `pm.reconnectStates`.
  ##
  ## The observer that calls this proc is registered on the whole peer pool,
  ## so it may be invoked for peers this manager does not track. Those peers
  ## are ignored; asserting here would abort the node as soon as any other
  ## peer connects or disconnects.
  for n in pm.reconnectStates:
    if peer.remote.id == n.node.id:
      n.connected = connected
      return

  # Not a managed peer: nothing to update.
  trace "Ignoring status change of unmanaged peer", remote=peer.remote.node

proc needReconnect(pm: PeerManagerRef): bool =
  ## True when at least one managed peer is currently flagged as not
  ## connected; false otherwise (including when no peers are managed).
  result = false
  for entry in pm.reconnectStates:
    if not entry.connected:
      result = true
      break

proc reconnect(pm: PeerManagerRef) {.async, gcsafe.} =
  ## Perform one pass over the managed peers and dial every peer that is not
  ## connected, as long as the manager is still `Running` and the peer has
  ## retries left (`maxRetryCount == 0` means unlimited).
  for entry in pm.reconnectStates:
    # The state is re-checked for every peer because it can change while
    # a dial attempt is being awaited.
    if entry.connected or pm.state != Running:
      continue

    let unlimited = pm.maxRetryCount == 0
    if unlimited or entry.retryCount < pm.maxRetryCount:
      trace "Reconnecting to", remote=entry.node.node
      await pm.pool.connectToNode(entry.node)
      inc entry.retryCount
    elif entry.retryCount == pm.maxRetryCount:
      # Bump the counter past the limit so this message is logged only once.
      trace "Exceed max retry count, give up reconnecting", remote=entry.node.node
      inc entry.retryCount

proc runReconnectLoop(pm: PeerManagerRef) {.async, gcsafe.} =
  ## Background loop: while the manager is `Running`, run a reconnect pass
  ## and then sleep for `retryInterval` seconds. When no peer needs
  ## reconnecting the loop marks the manager `Stopping` and exits.
  while pm.state == Running:
    if not pm.needReconnect():
      pm.state = Stopping
      break

    await pm.reconnect()
    await sleepAsync(pm.retryInterval.seconds)

proc setupManager(pm: PeerManagerRef, enodes: openArray[ENode]) =
  ## Register a peer observer for the `eth` protocol on the pool, then create
  ## one `ReconnectState` per entry of `enodes`.
  var observer: PeerObserver

  observer.onPeerConnected = proc(peer: Peer) {.gcsafe.} =
    trace "Peer connected", remote=peer.remote.node
    pm.setConnected(peer, true)

  observer.onPeerDisconnected = proc(peer: Peer) {.gcsafe.} =
    trace "Peer disconnected", remote=peer.remote.node
    pm.setConnected(peer, false)
    # Launch the loop again unless it is already running or the manager
    # has been stopped.
    if pm.state in {Starting, Stopping}:
      pm.state = Running
      pm.reconnectFut = pm.runReconnectLoop()

  observer.setProtocol eth
  pm.pool.addObserver(pm, observer)

  # Every peer starts out as not connected with a zero retry count
  # (the default values of the remaining fields).
  for enode in enodes:
    pm.reconnectStates.add ReconnectState(node: newNode(enode))

proc new*(_: type PeerManagerRef,
          pool: PeerPool,
          retryInterval: int,
          maxRetryCount: int,
          enodes: openArray[ENode]): PeerManagerRef =
  ## Create a peer manager in the `Starting` state for the given `enodes`.
  ##
  ## `retryInterval` (seconds) is raised to at least 5 and a negative
  ## `maxRetryCount` is treated as 0, which means unlimited retries.
  const minIntervalSecs = 5
  let
    retries = if maxRetryCount < 0: 0 else: maxRetryCount
    interval =
      if retryInterval < minIntervalSecs: minIntervalSecs
      else: retryInterval

  result = PeerManagerRef(
    state: Starting,
    pool: pool,
    retryInterval: interval,
    maxRetryCount: retries
  )
  result.setupManager(enodes)

proc start*(pm: PeerManagerRef) =
  ## Launch the reconnect loop. Does nothing when the manager is already
  ## running, has been stopped, or has no peer that needs reconnecting.
  if pm.state in {Stopped, Running} or not pm.needReconnect():
    return

  pm.state = Running
  pm.reconnectFut = pm.runReconnectLoop()
  info "Reconnecting to static peers"

proc stop*(pm: PeerManagerRef) {.async.} =
  ## Stop the peer manager and cancel the reconnect loop if one is active.
  ## Calling it again after the manager is `Stopped` is a no-op.
  ##
  ## The state is switched to `Stopped` from every other state, including
  ## `Stopping` (the loop already exited on its own). Otherwise the
  ## disconnect handler, which only refuses to restart the loop in the
  ## `Running` and `Stopped` states, could launch a new loop after shutdown.
  if pm.state == Stopped:
    return

  pm.state = Stopped

  # `reconnectFut` is nil when no loop was ever started, and already
  # finished when the loop exited by itself; only a live loop is cancelled.
  let fut = pm.reconnectFut
  if not fut.isNil and not fut.finished:
    await fut.cancelAndWait()
  info "Peer manager stopped"
11 changes: 6 additions & 5 deletions nimbus/rpc/common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@

import
std/[strutils, tables],
nimcrypto, eth/common as eth_common, stint, json_rpc/server, json_rpc/errors,
eth/p2p, eth/p2p/enode, eth/p2p/peer_pool,
nimcrypto, eth/common as eth_common,
stint, json_rpc/server, json_rpc/errors,
eth/p2p, eth/p2p/enode,
../config, ./hexstrings

type
Expand All @@ -37,11 +38,11 @@ proc setupCommonRpc*(node: EthereumNode, conf: NimbusConf, server: RpcServer) =
result = $conf.networkId

server.rpc("net_listening") do() -> bool:
let numPeers = node.peerPool.connectedNodes.len
let numPeers = node.numPeers
result = numPeers < conf.maxPeers

server.rpc("net_peerCount") do() -> HexQuantityStr:
let peerCount = uint node.peerPool.connectedNodes.len
let peerCount = uint node.numPeers
result = encodeQuantity(peerCount)

server.rpc("net_nodeInfo") do() -> NodeInfo:
Expand All @@ -61,6 +62,6 @@ proc setupCommonRpc*(node: EthereumNode, conf: NimbusConf, server: RpcServer) =
server.rpc("nimbus_addPeer") do(enode: string) -> bool:
var res = ENode.fromString(enode)
if res.isOk:
asyncSpawn node.peerPool.connectToNode(newNode(res.get()))
asyncSpawn node.connectToNode(res.get())
return true
raise (ref InvalidRequest)(code: -32602, msg: "Invalid ENode")
2 changes: 1 addition & 1 deletion vendor/nim-eth
Submodule nim-eth updated 2 files
+18 −0 eth/p2p.nim
+20 −4 eth/p2p/peer_pool.nim

0 comments on commit 355134a

Please sign in to comment.