# Fluffy
# Copyright (c) 2021-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
## Implementation of the Portal wire protocol as specified at:
## https://github.com/ethereum/portal-network-specs/blob/master/portal-wire-protocol.md
{.push raises: [].}
import
std/[sequtils, sets, algorithm, tables],
stew/[byteutils, leb128, endians2],
results,
chronicles,
chronos,
nimcrypto/hash,
bearssl,
ssz_serialization,
metrics,
faststreams,
eth/rlp,
eth/p2p/discoveryv5/
[protocol, node, enr, routing_table, random2, nodes_verification, lru],
"."/[portal_stream, portal_protocol_config],
./messages
export messages, routing_table, protocol
declareCounter portal_message_requests_incoming,
"Portal wire protocol incoming message requests",
labels = ["protocol_id", "message_type"]
declareCounter portal_message_decoding_failures,
"Portal wire protocol message decoding failures", labels = ["protocol_id"]
declareCounter portal_message_requests_outgoing,
"Portal wire protocol outgoing message requests",
labels = ["protocol_id", "message_type"]
declareCounter portal_message_response_incoming,
"Portal wire protocol incoming message responses",
labels = ["protocol_id", "message_type"]
const requestBuckets = [1.0, 3.0, 5.0, 7.0, 9.0, Inf]
declareHistogram portal_lookup_node_requests,
"Portal wire protocol amount of requests per node lookup",
labels = ["protocol_id"],
buckets = requestBuckets
declareHistogram portal_lookup_content_requests,
"Portal wire protocol amount of requests per node lookup",
labels = ["protocol_id"],
buckets = requestBuckets
declareCounter portal_lookup_content_failures,
"Portal wire protocol content lookup failures", labels = ["protocol_id"]
const contentKeysBuckets = [0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, Inf]
declareHistogram portal_content_keys_offered,
"Portal wire protocol amount of content keys per offer message send",
labels = ["protocol_id"],
buckets = contentKeysBuckets
declareHistogram portal_content_keys_accepted,
"Portal wire protocol amount of content keys per accept message received",
labels = ["protocol_id"],
buckets = contentKeysBuckets
declareCounter portal_gossip_offers_successful,
"Portal wire protocol successful content offers from neighborhood gossip",
labels = ["protocol_id"]
declareCounter portal_gossip_offers_failed,
"Portal wire protocol failed content offers from neighborhood gossip",
labels = ["protocol_id"]
declareCounter portal_gossip_with_lookup,
"Portal wire protocol neighborhood gossip that required a node lookup",
labels = ["protocol_id"]
declareCounter portal_gossip_without_lookup,
"Portal wire protocol neighborhood gossip that did not require a node lookup",
labels = ["protocol_id"]
# Note: These metrics are to get some idea of how many ENRs are sent on average.
# Relevant issue: https://github.com/ethereum/portal-network-specs/issues/136
const enrsBuckets = [0.0, 1.0, 3.0, 5.0, 8.0, 9.0, Inf]
declareHistogram portal_nodes_enrs_packed,
"Portal wire protocol amount of enrs packed in a nodes message",
labels = ["protocol_id"],
buckets = enrsBuckets
# This one will currently hit the max numbers because all neighbours are sent,
# not only the ones closer to the content.
declareHistogram portal_content_enrs_packed,
"Portal wire protocol amount of enrs packed in a content message",
labels = ["protocol_id"],
buckets = enrsBuckets
const distanceBuckets = [
float64 239, 240, 241, 242, 243, 244, 245, 246, 247, 248, 249, 250, 251, 252, 253,
254, 255, 256,
]
declareHistogram portal_find_content_log_distance,
"Portal wire protocol logarithmic distance of requested content",
labels = ["protocol_id"],
buckets = distanceBuckets
declareHistogram portal_offer_log_distance,
"Portal wire protocol logarithmic distance of offered content",
labels = ["protocol_id"],
buckets = distanceBuckets
logScope:
topics = "portal_wire"
const
alpha = 3 ## Kademlia concurrency factor
enrsResultLimit* = 32 ## Maximum amount of ENRs in the total Nodes messages
## that will be processed
refreshInterval = 5.minutes ## Interval of launching a random query to
## refresh the routing table.
revalidateMax = 10000 ## Revalidation of a peer is done after a random
## interval between 0 and this value in milliseconds
initialLookups = 1 ## Amount of lookups done when populating the routing table
# TalkResp message is a response message so the session is established and a
# regular discv5 packet is assumed for size calculation.
# Regular message = IV + header + message
# talkResp message = rlp: [request-id, response]
talkRespOverhead =
16 + # IV size
55 + # header size
1 + # talkResp msg id
3 + # rlp encoding outer list, max length will be encoded in 2 bytes
9 + # request id (max = 8) + 1 byte from rlp encoding byte string
3 + # rlp encoding response byte string, max length in 2 bytes
16 # HMAC
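# Worked example (sketch): the overhead terms above sum to
# 16 + 55 + 1 + 3 + 9 + 3 + 16 = 103 bytes, so with the discv5 maximum
# packet size of 1280 bytes roughly 1177 bytes remain for the actual
# Portal response payload.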
# This is the maximum number of concurrent offers per running Portal wire
# protocol. Using the `offerQueue` allows for limiting the amount of offers
# sent and thus how many streams can be started.
# TODO:
# More thought needs to go into this as it is currently on a per network
# basis. Keep it simple like that? Or limit it better at the stream transport
# level? In the latter case, this might still need to be checked/blocked at
# the very start of sending the offer, because blocking/waiting too long
# between the received accept message and actually starting the stream and
# sending data could cause issues due to timeouts on the other side.
# And then there are still limits to be applied also for FindContent and the
# incoming directions.
concurrentOffers = 50
type
ToContentIdHandler* =
proc(contentKey: ByteList): results.Opt[ContentId] {.raises: [], gcsafe.}
DbGetHandler* = proc(
contentKey: ByteList, contentId: ContentId
): results.Opt[seq[byte]] {.raises: [], gcsafe.}
DbStoreHandler* = proc(contentKey: ByteList, contentId: ContentId, content: seq[byte]) {.
raises: [], gcsafe
.}
PortalProtocolId* = array[2, byte]
RadiusCache* = LRUCache[NodeId, UInt256]
ContentKV* = object
contentKey*: ByteList
content*: seq[byte]
OfferRequestType = enum
Direct
Database
OfferRequest = object
dst: Node
case kind: OfferRequestType
of Direct:
contentList: List[ContentKV, contentKeysLimit]
of Database:
contentKeys: ContentKeysList
PortalProtocol* = ref object of TalkProtocol
protocolId*: PortalProtocolId
routingTable*: RoutingTable
baseProtocol*: protocol.Protocol
toContentId*: ToContentIdHandler
dbGet*: DbGetHandler
dbPut*: DbStoreHandler
radiusConfig: RadiusConfig
dataRadius*: UInt256
bootstrapRecords*: seq[Record]
lastLookup: chronos.Moment
refreshLoop: Future[void]
revalidateLoop: Future[void]
stream*: PortalStream
radiusCache: RadiusCache
offerQueue: AsyncQueue[OfferRequest]
offerWorkers: seq[Future[void]]
disablePoke: bool
pingTimings: Table[NodeId, chronos.Moment]
PortalResult*[T] = Result[T, string]
FoundContentKind* = enum
Nodes
Content
FoundContent* = object
src*: Node
case kind*: FoundContentKind
of Content:
content*: seq[byte]
utpTransfer*: bool
of Nodes:
nodes*: seq[Node]
ContentLookupResult* = object
content*: seq[byte]
utpTransfer*: bool
# List of nodes that do not have the requested content but for which
# the content does fall within their radius
nodesInterestedInContent*: seq[Node]
TraceResponse* = object
durationMs*: int64
respondedWith*: seq[NodeId]
NodeMetadata* = object
enr*: Record
distance*: UInt256
TraceObject* = object
origin*: NodeId
targetId: UInt256
receivedFrom*: Opt[NodeId]
responses*: Table[string, TraceResponse]
metadata*: Table[string, NodeMetadata]
cancelled*: seq[NodeId]
startedAtMs*: int64
TraceContentLookupResult* = object
content*: Opt[seq[byte]]
utpTransfer*: bool
trace*: TraceObject
func init*(T: type ContentKV, contentKey: ByteList, content: seq[byte]): T =
ContentKV(contentKey: contentKey, content: content)
func init*(
T: type ContentLookupResult,
content: seq[byte],
utpTransfer: bool,
nodesInterestedInContent: seq[Node],
): T =
ContentLookupResult(
content: content,
utpTransfer: utpTransfer,
nodesInterestedInContent: nodesInterestedInContent,
)
func `$`(id: PortalProtocolId): string =
id.toHex()
proc addNode*(p: PortalProtocol, node: Node): NodeStatus =
p.routingTable.addNode(node)
proc addNode*(p: PortalProtocol, r: Record): bool =
let node = newNode(r)
if node.isOk():
p.addNode(node[]) == Added
else:
false
func getNode*(p: PortalProtocol, id: NodeId): Opt[Node] =
p.routingTable.getNode(id)
func localNode*(p: PortalProtocol): Node =
p.baseProtocol.localNode
func neighbours*(p: PortalProtocol, id: NodeId, seenOnly = false): seq[Node] =
p.routingTable.neighbours(id = id, seenOnly = seenOnly)
func distance(p: PortalProtocol, a, b: NodeId): UInt256 =
p.routingTable.distance(a, b)
func logDistance(p: PortalProtocol, a, b: NodeId): uint16 =
p.routingTable.logDistance(a, b)
func inRange(
p: PortalProtocol, nodeId: NodeId, nodeRadius: UInt256, contentId: ContentId
): bool =
let distance = p.distance(nodeId, contentId)
distance <= nodeRadius
func inRange*(p: PortalProtocol, contentId: ContentId): bool =
p.inRange(p.localNode.id, p.dataRadius, contentId)
func truncateEnrs(
nodes: seq[Node], maxSize: int, enrOverhead: int
): List[ByteList, 32] =
var enrs: List[ByteList, 32]
var totalSize = 0
for n in nodes:
let enr = ByteList.init(n.record.raw)
if totalSize + enr.len() + enrOverhead <= maxSize:
let res = enrs.add(enr)
# With max payload of discv5 and the sizes of ENRs this should not occur.
doAssert(res, "32 limit will not be reached")
totalSize = totalSize + enr.len() + enrOverhead
else:
break
enrs
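# Illustrative example (hypothetical sizes): with maxSize = 1177 and
# enrOverhead = 4, three ENRs of 300 bytes each consume
# 3 * (300 + 4) = 912 bytes; a fourth would need 1216 bytes in total,
# which exceeds 1177, so packing stops after the first three (the
# closest nodes, since the input is sorted by distance).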
func handlePing(p: PortalProtocol, ping: PingMessage, srcId: NodeId): seq[byte] =
# TODO: This should become custom per Portal Network
# TODO: Need to think about the effect of malicious actor sending lots of
# pings from different nodes to clear the LRU.
let customPayloadDecoded =
try:
SSZ.decode(ping.customPayload.asSeq(), CustomPayload)
except SerializationError:
# invalid custom payload, send empty back
return @[]
p.radiusCache.put(srcId, customPayloadDecoded.dataRadius)
let customPayload = CustomPayload(dataRadius: p.dataRadius)
let pong = PongMessage(
enrSeq: p.localNode.record.seqNum,
customPayload: ByteList(SSZ.encode(customPayload)),
)
encodeMessage(pong)
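# Note (sketch): the ping/pong custom payload is currently just the SSZ
# encoding of CustomPayload(dataRadius: UInt256), i.e. a fixed 32-byte
# little-endian value. Both sides cache the peer's advertised radius in
# their radiusCache via this exchange.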
proc handleFindNodes(p: PortalProtocol, fn: FindNodesMessage): seq[byte] =
if fn.distances.len == 0:
let enrs = List[ByteList, 32](@[])
encodeMessage(NodesMessage(total: 1, enrs: enrs))
elif fn.distances.contains(0):
# A request for our own record.
let enr = ByteList(rlp.encode(p.localNode.record))
encodeMessage(NodesMessage(total: 1, enrs: List[ByteList, 32](@[enr])))
else:
let distances = fn.distances.asSeq()
if distances.all(
proc(x: uint16): bool =
return x <= 256
):
let nodes = p.routingTable.neighboursAtDistances(distances, seenOnly = true)
# TODO: Total amount of messages is set fixed to 1 for now, else we would
# need to either move the sending of the talkresp messages here, or allow
# for returning multiple messages.
# In the long run, it might just be better to use a stream in these cases?
# Size calculation is done to truncate the ENR results in order to not go
# over the discv5 packet size limits. ENRs are sorted so the closest nodes
# will still be passed.
const
nodesOverhead = 1 + 1 + 4 # msg id + total + container offset
maxPayloadSize = maxDiscv5PacketSize - talkRespOverhead - nodesOverhead
enrOverhead = 4 # per added ENR, 4 bytes offset overhead
let enrs = truncateEnrs(nodes, maxPayloadSize, enrOverhead)
portal_nodes_enrs_packed.observe(enrs.len().int64, labelValues = [$p.protocolId])
encodeMessage(NodesMessage(total: 1, enrs: enrs))
else:
# invalid request, send empty back
let enrs = List[ByteList, 32](@[])
encodeMessage(NodesMessage(total: 1, enrs: enrs))
proc handleFindContent(
p: PortalProtocol, fc: FindContentMessage, srcId: NodeId
): seq[byte] =
const
contentOverhead = 1 + 1 # msg id + SSZ Union selector
maxPayloadSize = maxDiscv5PacketSize - talkRespOverhead - contentOverhead
enrOverhead = 4 # per added ENR, 4 bytes offset overhead
let contentId = p.toContentId(fc.contentKey).valueOr:
# Return empty response when content key validation fails
# TODO: Better would be to return no message at all? Needs changes on
# discv5 layer.
return @[]
let logDistance = p.logDistance(contentId, p.localNode.id)
portal_find_content_log_distance.observe(
int64(logDistance), labelValues = [$p.protocolId]
)
# Check first if content is in range, as this is a cheaper operation
if p.inRange(contentId):
let contentResult = p.dbGet(fc.contentKey, contentId)
if contentResult.isOk():
let content = contentResult.get()
if content.len <= maxPayloadSize:
return encodeMessage(
ContentMessage(contentMessageType: contentType, content: ByteList(content))
)
else:
let connectionId = p.stream.addContentRequest(srcId, content)
return encodeMessage(
ContentMessage(
contentMessageType: connectionIdType, connectionId: connectionId
)
)
# Node does not have the content, or content is not even in radius,
# send closest neighbours to the requested content id.
let
closestNodes = p.routingTable.neighbours(NodeId(contentId), seenOnly = true)
enrs = truncateEnrs(closestNodes, maxPayloadSize, enrOverhead)
portal_content_enrs_packed.observe(enrs.len().int64, labelValues = [$p.protocolId])
encodeMessage(ContentMessage(contentMessageType: enrsType, enrs: enrs))
proc handleOffer(p: PortalProtocol, o: OfferMessage, srcId: NodeId): seq[byte] =
# Early return when our contentQueue is full. This means there is a backlog
# of content to process and potentially gossip around. Don't accept more
# data in this case.
if p.stream.contentQueue.full():
return encodeMessage(
AcceptMessage(
connectionId: Bytes2([byte 0x00, 0x00]),
contentKeys: ContentKeysBitList.init(o.contentKeys.len),
)
)
var contentKeysBitList = ContentKeysBitList.init(o.contentKeys.len)
var contentKeys = ContentKeysList.init(@[])
# TODO: Do we need some protection against a peer offering lots (64x) of
# content that fits our Radius but is actually bogus?
# Additional TODO, but more of a specification clarification: What if we don't
# want any of the content? Reply with empty bitlist and a connectionId of
# all zeroes but don't actually allow an uTP connection?
for i, contentKey in o.contentKeys:
let contentIdResult = p.toContentId(contentKey)
if contentIdResult.isOk():
let contentId = contentIdResult.get()
let logDistance = p.logDistance(contentId, p.localNode.id)
portal_offer_log_distance.observe(
int64(logDistance), labelValues = [$p.protocolId]
)
if p.inRange(contentId):
if p.dbGet(contentKey, contentId).isErr:
contentKeysBitList.setBit(i)
discard contentKeys.add(contentKey)
else:
# Return empty response when content key validation fails
return @[]
let connectionId =
if contentKeysBitList.countOnes() != 0:
p.stream.addContentOffer(srcId, contentKeys)
else:
# When the node does not accept any of the content offered, reply with an
# all zeroes bitlist and connectionId.
# Note: What to do in this scenario is not defined in the Portal spec.
Bytes2([byte 0x00, 0x00])
encodeMessage(
AcceptMessage(connectionId: connectionId, contentKeys: contentKeysBitList)
)
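# Illustrative flow (hypothetical keys): for an offer of [k0, k1, k2] where
# only k0 and k2 are in radius and not yet stored locally, the reply is an
# AcceptMessage with bitlist 101 and a connectionId registered through
# addContentOffer; the offering peer then opens a uTP stream with that id to
# transfer k0 and k2.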
proc messageHandler(
protocol: TalkProtocol,
request: seq[byte],
srcId: NodeId,
srcUdpAddress: Address,
nodeOpt: Opt[Node],
): seq[byte] =
doAssert(protocol of PortalProtocol)
let p = PortalProtocol(protocol)
logScope:
protocolId = p.protocolId
let decoded = decodeMessage(request)
if decoded.isOk():
let message = decoded.get()
trace "Received message request", srcId, srcUdpAddress, kind = message.kind
# Received a proper Portal message, check first if an ENR is provided by
# the discovery v5 layer and add it to the portal network routing table.
# If not provided through the handshake, try to get it from the discovery v5
# routing table.
# When the node would be eligible for the portal network routing table, it
# is possible that it exists in the base discv5 routing table as the same
# node ids are used. It is not certain at all however, as more nodes might
# exist on the base layer, and it will also depend on the distance,
# order of lookups, etc.
# Note: As third measure, could run a findNodes request with distance 0.
if nodeOpt.isSome():
let node = nodeOpt.value()
let status = p.addNode(node)
trace "Adding new node to routing table after incoming request", status, node
else:
let nodeOpt = p.baseProtocol.getNode(srcId)
if nodeOpt.isSome():
let node = nodeOpt.value()
let status = p.addNode(node)
trace "Adding new node to routing table after incoming request", status, node
portal_message_requests_incoming.inc(labelValues = [$p.protocolId, $message.kind])
case message.kind
of MessageKind.ping:
p.handlePing(message.ping, srcId)
of MessageKind.findNodes:
p.handleFindNodes(message.findNodes)
of MessageKind.findContent:
p.handleFindContent(message.findContent, srcId)
of MessageKind.offer:
p.handleOffer(message.offer, srcId)
else:
# This would mean that a Portal wire response message is being sent over a
# discv5 talkreq message.
debug "Invalid Portal wire message type over talkreq", kind = message.kind
@[]
else:
portal_message_decoding_failures.inc(labelValues = [$p.protocolId])
debug "Packet decoding error", error = decoded.error, srcId, srcUdpAddress
@[]
proc new*(
T: type PortalProtocol,
baseProtocol: protocol.Protocol,
protocolId: PortalProtocolId,
toContentId: ToContentIdHandler,
dbGet: DbGetHandler,
stream: PortalStream,
bootstrapRecords: openArray[Record] = [],
distanceCalculator: DistanceCalculator = XorDistanceCalculator,
config: PortalProtocolConfig = defaultPortalProtocolConfig,
): T =
let initialRadius: UInt256 = config.radiusConfig.getInitialRadius()
let proto = PortalProtocol(
protocolHandler: messageHandler,
protocolId: protocolId,
routingTable: RoutingTable.init(
baseProtocol.localNode, config.bitsPerHop, config.tableIpLimits, baseProtocol.rng,
distanceCalculator,
),
baseProtocol: baseProtocol,
toContentId: toContentId,
dbGet: dbGet,
radiusConfig: config.radiusConfig,
dataRadius: initialRadius,
bootstrapRecords: @bootstrapRecords,
stream: stream,
radiusCache: RadiusCache.init(256),
offerQueue: newAsyncQueue[OfferRequest](concurrentOffers),
disablePoke: config.disablePoke,
pingTimings: initTable[NodeId, chronos.Moment](),
)
proto.baseProtocol.registerTalkProtocol(@(proto.protocolId), proto).expect(
"Only one protocol should have this id"
)
proto
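# Example wiring (sketch; `discv5`, the handlers and the protocol id bytes are
# hypothetical placeholders, not values defined in this file):
#   let portal = PortalProtocol.new(
#     discv5, [byte 0x50, 0x00], myToContentId, myDbGet, myStream,
#     bootstrapRecords = myBootstrapRecords)
#   portal.dbPut = myDbStoreHandler # dbPut is assigned separately by the caller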
# Sends the discv5 talkreq message with provided Portal message, awaits and
# validates the proper response, and updates the Portal Network routing table.
proc reqResponse[Request: SomeMessage, Response: SomeMessage](
p: PortalProtocol, dst: Node, request: Request
): Future[PortalResult[Response]] {.async.} =
logScope:
protocolId = p.protocolId
trace "Send message request", dstId = dst.id, kind = messageKind(Request)
portal_message_requests_outgoing.inc(
labelValues = [$p.protocolId, $messageKind(Request)]
)
let talkresp =
await talkReq(p.baseProtocol, dst, @(p.protocolId), encodeMessage(request))
# Note: Failure of `decodeMessage` might also simply mean that the peer is
# not supporting the specific talk protocol, as according to the specification
# an empty response needs to be sent in that case.
# See: https://github.com/ethereum/devp2p/blob/master/discv5/discv5-wire.md#talkreq-request-0x05
let messageResponse = talkresp
.mapErr(
proc(x: cstring): string =
$x
)
.flatMap(
proc(x: seq[byte]): Result[Message, string] =
decodeMessage(x)
)
.flatMap(
proc(m: Message): Result[Response, string] =
getInnerMessage[Response](m)
)
if messageResponse.isOk():
trace "Received message response",
srcId = dst.id, srcAddress = dst.address, kind = messageKind(Response)
portal_message_response_incoming.inc(
labelValues = [$p.protocolId, $messageKind(Response)]
)
p.routingTable.setJustSeen(dst)
else:
debug "Error receiving message response",
error = messageResponse.error, srcId = dst.id, srcAddress = dst.address
p.pingTimings.del(dst.id)
p.routingTable.replaceNode(dst)
return messageResponse
proc pingImpl*(
p: PortalProtocol, dst: Node
): Future[PortalResult[PongMessage]] {.async.} =
let customPayload = CustomPayload(dataRadius: p.dataRadius)
let ping = PingMessage(
enrSeq: p.localNode.record.seqNum,
customPayload: ByteList(SSZ.encode(customPayload)),
)
return await reqResponse[PingMessage, PongMessage](p, dst, ping)
proc findNodesImpl*(
p: PortalProtocol, dst: Node, distances: List[uint16, 256]
): Future[PortalResult[NodesMessage]] {.async.} =
let fn = FindNodesMessage(distances: distances)
# TODO Add nodes validation
return await reqResponse[FindNodesMessage, NodesMessage](p, dst, fn)
proc findContentImpl*(
p: PortalProtocol, dst: Node, contentKey: ByteList
): Future[PortalResult[ContentMessage]] {.async.} =
let fc = FindContentMessage(contentKey: contentKey)
return await reqResponse[FindContentMessage, ContentMessage](p, dst, fc)
proc offerImpl*(
p: PortalProtocol, dst: Node, contentKeys: ContentKeysList
): Future[PortalResult[AcceptMessage]] {.async.} =
let offer = OfferMessage(contentKeys: contentKeys)
return await reqResponse[OfferMessage, AcceptMessage](p, dst, offer)
proc recordsFromBytes*(rawRecords: List[ByteList, 32]): PortalResult[seq[Record]] =
var records: seq[Record]
for r in rawRecords.asSeq():
var record: Record
if record.fromBytes(r.asSeq()):
records.add(record)
else:
# If any of the ENRs is invalid, fail immediately. This is similar to what
# is done on the discovery v5 layer.
return err("Deserialization of an ENR failed")
ok(records)
proc ping*(p: PortalProtocol, dst: Node): Future[PortalResult[PongMessage]] {.async.} =
let pongResponse = await p.pingImpl(dst)
if pongResponse.isOk():
# Update last time we pinged this node
p.pingTimings[dst.id] = now(chronos.Moment)
let pong = pongResponse.get()
# TODO: This should become custom per Portal Network
let customPayloadDecoded =
try:
SSZ.decode(pong.customPayload.asSeq(), CustomPayload)
except MalformedSszError, SszSizeMismatchError:
# invalid custom payload
return err("Pong message contains invalid custom payload")
p.radiusCache.put(dst.id, customPayloadDecoded.dataRadius)
return pongResponse
proc findNodes*(
p: PortalProtocol, dst: Node, distances: seq[uint16]
): Future[PortalResult[seq[Node]]] {.async.} =
let nodesMessage = await p.findNodesImpl(dst, List[uint16, 256](distances))
if nodesMessage.isOk():
let records = recordsFromBytes(nodesMessage.get().enrs)
if records.isOk():
# TODO: distance function is wrong here for state, fix + tests
return ok(verifyNodesRecords(records.get(), dst, enrsResultLimit, distances))
else:
return err(records.error)
else:
return err(nodesMessage.error)
proc findContent*(
p: PortalProtocol, dst: Node, contentKey: ByteList
): Future[PortalResult[FoundContent]] {.async.} =
logScope:
node = dst
contentKey
let contentMessageResponse = await p.findContentImpl(dst, contentKey)
if contentMessageResponse.isOk():
let m = contentMessageResponse.get()
case m.contentMessageType
of connectionIdType:
let nodeAddress = NodeAddress.init(dst)
if nodeAddress.isNone():
# This should not happen as we are already past the successful
# talkreq/talkresp cycle
error "Trying to connect to node with unknown address", id = dst.id
return err("Trying to connect to node with unknown address")
# uTP protocol uses BE for all values in the header, incl. connection id
let socket = (
await p.stream.connectTo(
nodeAddress.unsafeGet(), uint16.fromBytesBE(m.connectionId)
)
).valueOr:
debug "uTP connection error for find content", error
return err("Error connecting uTP socket")
try:
# Read all bytes from the socket
# This will either end with a FIN, or because the read action times out.
# A FIN does not necessarily mean that the data read is complete.
# Further validation is required; using a length prefix here might be
# beneficial for this.
let readFut = socket.read()
readFut.cancelCallback = proc(udate: pointer) {.gcsafe.} =
debug "Socket read cancelled", socketKey = socket.socketKey
# In case this `findContent` gets cancelled while reading the data,
# send a FIN and clean up the socket.
socket.close()
if await readFut.withTimeout(p.stream.contentReadTimeout):
let content = readFut.read
# Socket received remote FIN and drained the whole buffer, so it can be
# safely destroyed without notifying the remote
debug "Socket read fully", socketKey = socket.socketKey
socket.destroy()
return ok(
FoundContent(src: dst, kind: Content, content: content, utpTransfer: true)
)
else:
debug "Socket read time-out", socketKey = socket.socketKey
# Note: This might look a bit strange, but not doing a socket.close()
# here as this is already done internally. utp_socket `checkTimeouts`
# already does a socket.destroy() on timeout. Might want to change the
# API on this later though.
return err("Reading data from socket timed out, content request failed")
except CancelledError as exc:
# Even though we already installed cancelCallback on readFut, it is worth
# catching CancelledError in case withTimeout throws CancelledError but
# readFut has already finished.
debug "Socket read cancelled", socketKey = socket.socketKey
socket.close()
raise exc
of contentType:
return ok(
FoundContent(
src: dst, kind: Content, content: m.content.asSeq(), utpTransfer: false
)
)
of enrsType:
let records = recordsFromBytes(m.enrs)
if records.isOk():
let verifiedNodes = verifyNodesRecords(records.get(), dst, enrsResultLimit)
return ok(FoundContent(src: dst, kind: Nodes, nodes: verifiedNodes))
else:
return err("Content message returned invalid ENRs")
else:
warn "FindContent failed due to find content request failure ",
error = contentMessageResponse.error
return err("No content response")
proc getContentKeys(o: OfferRequest): ContentKeysList =
case o.kind
of Direct:
var contentKeys: ContentKeysList
for info in o.contentList:
discard contentKeys.add(info.contentKey)
return contentKeys
of Database:
return o.contentKeys
func getMaxOfferedContentKeys*(protocolIdLen: uint32, maxKeySize: uint32): int =
## Calculates how many ContentKeys will fit in one offer message that
## will be small enough to fit within the discv5 packet limit.
## This is necessary as contentKeysLimit (64) is sometimes too big, and even
## half of that can be too much to fit within the discv5 limits.
let maxTalkReqPayload = maxDiscv5PacketSize - getTalkReqOverhead(int(protocolIdLen))
# To calculate how many bytes `n` content keys of size `maxKeySize` will
# take, we can use the following equation:
# bytes = (n * (maxKeySize + perContentKeyOverhead)) + offerMessageOverhead
# To calculate the maximal number of keys that fit in the given space, this
# can be transformed to:
# n = trunc((bytes - offerMessageOverhead) / (maxKeySize + perContentKeyOverhead))
return ((maxTalkReqPayload - 5) div (int(maxKeySize) + 4))
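# Worked example (hypothetical numbers): with maxTalkReqPayload = 1100 and
# maxKeySize = 33, n = (1100 - 5) div (33 + 4) = 1095 div 37 = 29, so at
# most 29 such keys fit in a single offer message.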
proc offer(
p: PortalProtocol, o: OfferRequest
): Future[PortalResult[ContentKeysBitList]] {.async.} =
## Offer triggers an offer-accept interaction with one peer.
## The whole flow has two phases:
## 1. Come to an agreement on what content to transfer, by using offer and
## accept messages.
## 2. Open a uTP stream from the content provider to the content receiver and
## transfer the agreed content.
## There are two types of possible offer requests:
## Direct - the caller provides the content to transfer. This way, content is
## guaranteed to be transferred as it stays in memory until the whole transfer
## is completed.
## Database - the caller provides the keys of the content to be transferred.
## The content is then read from the database just before it is transferred
## through the uTP socket. This is useful when there is a lot of content to be
## transferred to many peers, and keeping it all in memory could exhaust node
## resources. The main drawback is that content may be deleted from the node
## database by the cleanup process before it is transferred, so this mode does
## not guarantee content transfer.
let contentKeys = getContentKeys(o)
logScope:
node = o.dst
contentKeys
debug "Offering content"
portal_content_keys_offered.observe(
contentKeys.len().int64, labelValues = [$p.protocolId]
)
let acceptMessageResponse = await p.offerImpl(o.dst, contentKeys)
if acceptMessageResponse.isOk():
let m = acceptMessageResponse.get()
let contentKeysLen =
case o.kind
of Direct:
o.contentList.len()
of Database:
o.contentKeys.len()
if m.contentKeys.len() != contentKeysLen:
# TODO:
# When there is such a scoring system, the peer should get scored negatively
# here.
error "Accepted content key bitlist has invalid size"
return err("Accepted content key bitlist has invalid size")
let acceptedKeysAmount = m.contentKeys.countOnes()
portal_content_keys_accepted.observe(
acceptedKeysAmount.int64, labelValues = [$p.protocolId]
)
if acceptedKeysAmount == 0:
debug "No content accepted"
# Don't open an uTP stream if no content was requested
return ok(m.contentKeys)
let nodeAddress = NodeAddress.init(o.dst)
if nodeAddress.isNone():
# This should not happen as we are already past the successful
# talkreq/talkresp cycle
error "Trying to connect to node with unknown address", id = o.dst.id
return err("Trying to connect to node with unknown address")
let socket = (
await p.stream.connectTo(
nodeAddress.unsafeGet(), uint16.fromBytesBE(m.connectionId)
)
).valueOr:
debug "uTP connection error for offer content", error
return err("Error connecting uTP socket")
template lenu32(x: untyped): untyped =
uint32(len(x))
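# Framing sketch: each accepted content item is written to the uTP stream
# as an unsigned LEB128 length prefix followed by the raw content bytes.
# E.g. a 300-byte item is framed as bytes [0xAC, 0x02] ++ content, since
# 300 encodes to 0xAC 0x02 in LEB128.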
case o.kind
of Direct:
for i, b in m.contentKeys:
if b:
let content = o.contentList[i].content
var output = memoryOutput()
output.write(toBytes(content.lenu32, Leb128).toOpenArray())
output.write(content)
let dataWritten = (await socket.write(output.getOutput)).valueOr:
debug "Error writing requested data", error
# No point in trying to continue writing data
socket.close()
return err("Error writing requested data")
trace "Offered content item send", dataWritten = dataWritten
of Database:
for i, b in m.contentKeys:
if b:
let
contentKey = o.contentKeys[i]
contentIdResult = p.toContentId(contentKey)
if contentIdResult.isOk():
let
contentId = contentIdResult.get()
contentResult = p.dbGet(contentKey, contentId)
var output = memoryOutput()
if contentResult.isOk():
let content = contentResult.get()
output.write(toBytes(content.lenu32, Leb128).toOpenArray())
output.write(content)
else:
# When the data turns out to be missing, add a zero-size varint
output.write(toBytes(0'u8, Leb128).toOpenArray())
let dataWritten = (await socket.write(output.getOutput)).valueOr:
debug "Error writing requested data", error
# No point in trying to continue writing data
socket.close()
return err("Error writing requested data")
trace "Offered content item send", dataWritten = dataWritten
await socket.closeWait()
debug "Content successfully offered"
return ok(m.contentKeys)
else:
warn "Offer failed due to accept request failure ",
error = acceptMessageResponse.error
return err("No accept response")
proc offer*(
p: PortalProtocol, dst: Node, contentKeys: ContentKeysList
): Future[PortalResult[ContentKeysBitList]] {.async.} =
let req = OfferRequest(dst: dst, kind: Database, contentKeys: contentKeys)
return await p.offer(req)
proc offer*(
p: PortalProtocol, dst: Node, content: seq[ContentKV]
): Future[PortalResult[ContentKeysBitList]] {.async.} =
if len(content) > contentKeysLimit:
return err("Cannot offer more than 64 content items")
let contentList = List[ContentKV, contentKeysLimit].init(content)
let req = OfferRequest(dst: dst, kind: Direct, contentList: contentList)
return await p.offer(req)
proc offerWorker(p: PortalProtocol) {.async.} =
while true:
let req = await p.offerQueue.popFirst()
let res = await p.offer(req)
if res.isOk():
portal_gossip_offers_successful.inc(labelValues = [$p.protocolId])
else:
portal_gossip_offers_failed.inc(labelValues = [$p.protocolId])
proc offerQueueEmpty*(p: PortalProtocol): bool =
p.offerQueue.empty()
proc lookupWorker(
p: PortalProtocol, dst: Node, target: NodeId
): Future[seq[Node]] {.async.} =
let distances = lookupDistances(target, dst.id)
let nodesMessage = await p.findNodes(dst, distances)
if nodesMessage.isOk():
let nodes = nodesMessage.get()
# Attempt to add all nodes discovered
for n in nodes:
discard p.addNode(n)
return nodes
else:
return @[]
proc lookup*(p: PortalProtocol, target: NodeId): Future[seq[Node]] {.async.} =
## Perform a lookup for the given target, return the closest n nodes to the
## target. Maximum value for n is `BUCKET_SIZE`.
# `closestNodes` holds the k closest nodes to target found, sorted by distance
# Unvalidated nodes are used for requests as a form of validation.
var closestNodes = p.routingTable.neighbours(target, BUCKET_SIZE, seenOnly = false)
var asked, seen = initHashSet[NodeId]()
asked.incl(p.localNode.id) # No need to ask our own node
seen.incl(p.localNode.id) # No need to discover our own node
for node in closestNodes:
seen.incl(node.id)
var pendingQueries = newSeqOfCap[Future[seq[Node]]](alpha)
var requestAmount = 0'i64
while true:
var i = 0
# Do `alpha` requests at once as long as closer non-queried nodes are
# discovered.
while i < closestNodes.len and pendingQueries.len < alpha: