-
Notifications
You must be signed in to change notification settings - Fork 107
/
beacon_db.nim
407 lines (365 loc) · 13.7 KB
/
beacon_db.nim
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
# fluffy
# Copyright (c) 2022-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import
chronicles,
metrics,
eth/db/kvstore,
eth/db/kvstore_sqlite3,
stint,
results,
ssz_serialization,
beacon_chain/db_limits,
beacon_chain/spec/datatypes/[phase0, altair, bellatrix],
beacon_chain/spec/forks,
beacon_chain/spec/forks_light_client,
./beacon_content,
./beacon_chain_historical_summaries,
./beacon_init_loader,
../wire/portal_protocol
from beacon_chain/spec/helpers import is_better_update, toMeta
export kvstore_sqlite3
type
  BestLightClientUpdateStore = ref object
    ## Prepared SQLite statements operating on the table that holds one encoded
    ## light client update per sync committee period.
    getStmt: SqliteStmt[int64, seq[byte]] # period -> encoded update
    getBulkStmt: SqliteStmt[(int64, int64), seq[byte]]
      # (from period, to period) -> encoded updates, upper bound exclusive
    putStmt: SqliteStmt[(int64, seq[byte]), void] # (period, encoded update)
    delStmt: SqliteStmt[int64, void] # period of the row to delete

  BeaconDb* = ref object
    ## Storage for beacon light client content: a key-value store, a table with
    ## the best light client update per period and two single-entry caches.
    backend: SqStoreRef # SQLite store that backs both `kv` and `bestUpdates`
    kv: KvStoreRef # key-value store opened on `backend`
    bestUpdates: BestLightClientUpdateStore
    forkDigests: ForkDigests # passed to the fork aware encode/decode helpers
    cfg: RuntimeConfig # passed to `forkDigestAtEpoch`
    # The two caches below are plain object fields: the store handler only
    # assigns them and nothing in this file writes them to `backend`.
    finalityUpdateCache: Opt[LightClientFinalityUpdateCache]
    optimisticUpdateCache: Opt[LightClientOptimisticUpdateCache]

  # Storing the content encoded here. Could also store decoded and access the
  # slot directly. However, that would require us to have access to the
  # fork digests here to be able to re-encode the data.
  LightClientFinalityUpdateCache = object
    lastFinalityUpdate: seq[byte] # encoded content as received
    lastFinalityUpdateSlot: uint64 # finalized slot taken from the content key

  LightClientOptimisticUpdateCache = object
    lastOptimisticUpdate: seq[byte] # encoded content as received
    lastOptimisticUpdateSlot: uint64 # optimistic slot taken from the content key
template expectDb(x: auto): untyped =
  # A corrupt database or a full disk cannot be handled in a meaningful way
  # here - it requires manual intervention, so for now this panics.
  expect(x, "working database (disk broken/full?)")
template disposeSafe(s: untyped): untyped =
  # Dispose `s` unless its underlying handle is already nil, then reset it to
  # nil so that a repeated call does nothing.
  if not isNil(distinctBase(s)):
    dispose(s)
    s = typeof(s)(nil)
proc initBestUpdatesStore(
    backend: SqStoreRef, name: string
): KvResult[BestLightClientUpdateStore] =
  ## Create the table `name` when it does not exist yet and prepare the
  ## statements to get, bulk get, put and delete updates by period.
  ##
  ## A failing `CREATE TABLE` is returned as an error. A statement that fails
  ## to prepare is not returned as an error: `expect` is called on it instead.
  ##
  ## `name` is concatenated into the SQL text as-is, so it must be a trusted
  ## identifier.
  ?backend.exec(
    """
CREATE TABLE IF NOT EXISTS `""" & name &
      """` (
`period` INTEGER PRIMARY KEY, -- `SyncCommitteePeriod`
`update` BLOB -- `altair.LightClientUpdate` (SSZ)
);
"""
  )

  # NOTE(review): all statements are prepared with `managed = false`;
  # presumably this leaves their disposal to `close` - confirm against
  # kvstore_sqlite3.
  let
    # The update stored for a single period.
    getStmt = backend
      .prepareStmt(
        """
SELECT `update`
FROM `""" & name &
          """`
WHERE `period` = ?;
""",
        int64,
        seq[byte],
        managed = false,
      )
      .expect("SQL query OK")
    # All updates with a period in a range; the upper bound is exclusive.
    getBulkStmt = backend
      .prepareStmt(
        """
SELECT `update`
FROM `""" & name &
          """`
WHERE `period` >= ? AND `period` < ?;
""",
        (int64, int64),
        seq[byte],
        managed = false,
      )
      .expect("SQL query OK")
    # Insert an update, replacing the row already stored for that period.
    putStmt = backend
      .prepareStmt(
        """
REPLACE INTO `""" & name &
          """` (
`period`, `update`
) VALUES (?, ?);
""",
        (int64, seq[byte]),
        void,
        managed = false,
      )
      .expect("SQL query OK")
    # Remove the update stored for a single period.
    delStmt = backend
      .prepareStmt(
        """
DELETE FROM `""" & name &
          """`
WHERE `period` = ?;
""",
        int64,
        void,
        managed = false,
      )
      .expect("SQL query OK")

  ok BestLightClientUpdateStore(
    getStmt: getStmt, getBulkStmt: getBulkStmt, putStmt: putStmt, delStmt: delStmt
  )
func close*(store: var BestLightClientUpdateStore) =
  ## Dispose every prepared statement held by `store`. Statements that were
  ## disposed before are skipped.
  disposeSafe(store.getStmt)
  disposeSafe(store.getBulkStmt)
  disposeSafe(store.putStmt)
  disposeSafe(store.delStmt)
proc new*(
    T: type BeaconDb, networkData: NetworkInitData, path: string, inMemory = false
): BeaconDb =
  ## Open the beacon database and return it.
  ##
  ## With `inMemory` set an in-memory store is created and `path` is not used,
  ## otherwise the store is opened at `path`. The fork digests and runtime
  ## config are taken from `networkData`. Any failure to set up the store
  ## panics.
  let backend =
    if inMemory:
      SqStoreRef.init("", "lc-test", inMemory = true).expect(
        "working database (out of memory?)"
      )
    else:
      SqStoreRef.init(path, "lc").expectDb()
  let kv = kvStore(backend.openKvStore().expectDb())
  let updateStore = initBestUpdatesStore(backend, "lcu").expectDb()

  BeaconDb(
    backend: backend,
    kv: kv,
    bestUpdates: updateStore,
    cfg: networkData.metadata.cfg,
    forkDigests: (newClone networkData.forks)[],
  )
## Private KvStoreRef Calls
proc get(kv: KvStoreRef, key: openArray[byte]): results.Opt[seq[byte]] =
  ## Look up `key` in `kv`. The result stays `none` when the store never
  ## invokes the data callback. A failing lookup panics.
  var found = Opt.none(seq[byte])

  proc capture(data: openArray[byte]) =
    # An openArray cannot be stored, so the bytes are copied into a seq.
    found = Opt.some(@data)

  discard kv.get(key, capture).expectDb()

  found
## Private BeaconDb calls
proc get(db: BeaconDb, key: openArray[byte]): results.Opt[seq[byte]] =
  ## Look up the raw `key` in the key-value store of `db`.
  get(db.kv, key)
proc put(db: BeaconDb, key, value: openArray[byte]) =
  ## Store `value` under the raw `key` in the key-value store of `db`.
  ## A failing write panics.
  expectDb(put(db.kv, key, value))
## Public ContentId based ContentDB calls
proc get*(db: BeaconDb, key: ContentId): results.Opt[seq[byte]] =
  ## Look up the content stored under the content id `key`, which is used in
  ## its big-endian byte form as the database key.
  # TODO: Here it is unfortunate that ContentId is a uint256 instead of Digest256.
  let rawKey = key.toBytesBE()
  db.get(rawKey)
proc put*(db: BeaconDb, key: ContentId, value: openArray[byte]) =
  ## Store `value` under the content id `key`, which is used in its big-endian
  ## byte form as the database key.
  let rawKey = key.toBytesBE()
  db.put(rawKey, value)
# TODO Add checks that uint64 can be safely casted to int64
proc getLightClientUpdates(
    db: BeaconDb, start: uint64, to: uint64
): ForkedLightClientUpdateBytesList =
  ## Get multiple consecutive LightClientUpdates for given periods
  ##
  ## Returns the encoded updates stored for the periods in `[start, to)`.
  ## The list is empty when the range is empty or when `start` cannot be
  ## represented as the signed 64-bit integer that SQLite stores.
  ##
  ## `start` and `to` may come from a content key received over the network,
  ## so they are checked here: a plain `uint64` to `int64` conversion of an
  ## out of range value raises a `RangeDefect` instead of failing gracefully.
  const maxPeriod = uint64(int64.high)

  var updates: ForkedLightClientUpdateBytesList
  if start > maxPeriod or start >= to:
    # Nothing can be stored for such a range. `start >= to` also covers a
    # `to` that wrapped around in the caller's unsigned addition.
    return updates

  # Stored periods never exceed `maxPeriod`, so an upper bound beyond it is
  # clamped to keep the conversion below safe.
  let upper = min(to, maxPeriod)

  var update: seq[byte]
  for res in db.bestUpdates.getBulkStmt.exec((start.int64, upper.int64), update):
    res.expect("SQL query OK")
    let byteList = List[byte, MAX_LIGHT_CLIENT_UPDATE_SIZE].init(update)
    # `add` reports whether the item still fit in the list; updates beyond the
    # list limit are dropped, as before.
    discard updates.add(byteList)
  return updates
proc getBestUpdate*(
    db: BeaconDb, period: SyncCommitteePeriod
): Result[ForkedLightClientUpdate, string] =
  ## Get the best ForkedLightClientUpdate for given period
  ## Note: Only the best one for a given period is being stored.
  ##
  ## Returns an error when no update is stored for `period` or when the stored
  ## bytes fail to decode. A failing SQL query panics.
  doAssert period.isSupportedBySQLite
  doAssert distinctBase(db.bestUpdates.getStmt) != nil

  var update: seq[byte]
  for res in db.bestUpdates.getStmt.exec(period.int64, update):
    res.expect("SQL query OK")
    # `period` is the primary key of the table, so there is at most one row.
    return decodeLightClientUpdateForked(db.forkDigests, update)

  # No row was found. State this explicitly instead of relying on the
  # default-initialized `Result`, which is an error with an empty message.
  err("No light client update stored for period " & $uint64(period))
proc putBootstrap*(
    db: BeaconDb, blockRoot: Digest, bootstrap: ForkedLightClientBootstrap
) =
  ## Encode `bootstrap` with the fork digest of the epoch of its header slot
  ## and store it under the content id of the bootstrap content key built from
  ## `blockRoot`. Nothing is stored when the bootstrap is of fork
  ## `LightClientDataFork.None`.
  withForkyBootstrap(bootstrap):
    when lcDataFork > LightClientDataFork.None:
      let
        bootstrapEpoch = epoch(forkyBootstrap.header.beacon.slot)
        forkDigest = forkDigestAtEpoch(db.forkDigests, bootstrapEpoch, db.cfg)
        id = toContentId(bootstrapContentKey(blockRoot))

      db.put(id, encodeBootstrapForked(forkDigest, bootstrap))
func putLightClientUpdate*(db: BeaconDb, period: uint64, update: seq[byte]) =
  ## Store the already encoded ForkedLightClientUpdate `update` for `period`,
  ## replacing the update stored for that period, if any.
  ## A failing SQL query panics.
  db.bestUpdates.putStmt.exec((period.int64, update)).expect("SQL query OK")
func putBestUpdate*(
    db: BeaconDb, period: SyncCommitteePeriod, update: ForkedLightClientUpdate
) =
  ## Store `update` as the best update of `period`.
  ##
  ## The row of `period` is deleted instead when the update is of fork
  ## `LightClientDataFork.None` or when it has fewer than
  ## `MIN_SYNC_COMMITTEE_PARTICIPANTS` active participants.
  doAssert not db.backend.readOnly # All `stmt` are non-nil
  doAssert period.isSupportedBySQLite

  let periodKey = period.int64
  withForkyUpdate(update):
    when lcDataFork > LightClientDataFork.None:
      if forkyUpdate.sync_aggregate.num_active_participants <
          MIN_SYNC_COMMITTEE_PARTICIPANTS:
        db.bestUpdates.delStmt.exec(periodKey).expect("SQL query OK")
      else:
        let
          updateEpoch = epoch(forkyUpdate.attested_header.beacon.slot)
          forkDigest = forkDigestAtEpoch(db.forkDigests, updateEpoch, db.cfg)
          encodedUpdate = encodeForkedLightClientObject(update, forkDigest)
        db.bestUpdates.putStmt.exec((periodKey, encodedUpdate)).expect(
          "SQL query OK"
        )
    else:
      db.bestUpdates.delStmt.exec(periodKey).expect("SQL query OK")
proc putUpdateIfBetter*(
    db: BeaconDb, period: SyncCommitteePeriod, update: ForkedLightClientUpdate
) =
  ## Store `update` for `period` when no current update can be retrieved for
  ## that period, or when `update` is better than the current one.
  let current = db.getBestUpdate(period)

  # Without a current update for that period this one can simply be put.
  if current.isErr() or is_better_update(update, current.get()):
    db.putBestUpdate(period, update)
proc putUpdateIfBetter*(db: BeaconDb, period: SyncCommitteePeriod, update: seq[byte]) =
  ## Decode the encoded `update` and store it for `period` when it is better
  ## than the current one. An update that fails to decode is silently dropped.
  let decoded = decodeLightClientUpdateForked(db.forkDigests, update)

  # TODO:
  # Need to go over the usage in offer/accept vs findcontent/content
  # and in some (all?) decoding has already been verified.
  if decoded.isOk():
    db.putUpdateIfBetter(period, decoded.get())
proc getLastFinalityUpdate*(db: BeaconDb): Opt[ForkedLightClientFinalityUpdate] =
  ## Return the decoded finality update held in the cache, or `none` when the
  ## cache is empty. A cached update that fails to decode raises an assertion.
  if db.finalityUpdateCache.isNone():
    Opt.none(ForkedLightClientFinalityUpdate)
  else:
    let cached = db.finalityUpdateCache.get()
    let decoded = decodeLightClientFinalityUpdateForked(
      db.forkDigests, cached.lastFinalityUpdate
    ).valueOr:
      raiseAssert "Stored finality update must be valid"
    Opt.some(decoded)
proc createGetHandler*(db: BeaconDb): DbGetHandler =
  ## Create the handler that looks up content in `db` by content key and
  ## content id. The handler returns `none` when the content key does not
  ## decode or when no matching content is available.
  return (
    proc(contentKey: ByteList, contentId: ContentId): results.Opt[seq[byte]] =
      let key = contentKey.decode().valueOr:
        # TODO: as this should not fail, maybe it is better to raiseAssert ?
        return Opt.none(seq[byte])

      case key.contentType
      of unused:
        raiseAssert "Should not be used and fail at decoding"
      of lightClientBootstrap, beacon_content.ContentType.historicalSummaries:
        # Both are stored in the key-value store under their content id.
        db.get(contentId)
      of lightClientUpdate:
        # TODO: add validation that startPeriod is not from the future,
        # this requires db to be aware of the current beacon time
        let
          firstPeriod = key.lightClientUpdateKey.startPeriod
          # Serve at most MAX_REQUEST_LIGHT_CLIENT_UPDATES updates
          numUpdates = min(
            uint64(MAX_REQUEST_LIGHT_CLIENT_UPDATES), key.lightClientUpdateKey.count
          )
          # The upper bound of the period range is not inclusive
          updates = db.getLightClientUpdates(firstPeriod, firstPeriod + numUpdates)

        if updates.len > 0:
          Opt.some(SSZ.encode(updates))
        else:
          Opt.none(seq[byte])
      of lightClientFinalityUpdate:
        # TODO:
        # Return only when the update is better than what is requested by
        # contentKey. This is currently not possible as the contentKey does not
        # include best update information.
        let wantedSlot = key.lightClientFinalityUpdateKey.finalizedSlot
        if db.finalityUpdateCache.isNone():
          Opt.none(seq[byte])
        else:
          let cached = db.finalityUpdateCache.get()
          if cached.lastFinalityUpdateSlot < wantedSlot:
            Opt.none(seq[byte])
          else:
            Opt.some(cached.lastFinalityUpdate)
      of lightClientOptimisticUpdate:
        # TODO: the same as for the finality update applies here too.
        let wantedSlot = key.lightClientOptimisticUpdateKey.optimisticSlot
        if db.optimisticUpdateCache.isNone():
          Opt.none(seq[byte])
        else:
          let cached = db.optimisticUpdateCache.get()
          if cached.lastOptimisticUpdateSlot < wantedSlot:
            Opt.none(seq[byte])
          else:
            Opt.some(cached.lastOptimisticUpdate)
  )
proc createStoreHandler*(db: BeaconDb): DbStoreHandler =
  ## Create the handler that stores content in `db`, dispatching on the type of
  ## the decoded content key:
  ## - bootstraps go into the key-value store under their content id,
  ## - light client updates go into the best updates table, one per period and
  ##   only when better than the stored one,
  ## - finality and optimistic updates replace the respective cache,
  ## - historical summaries go into the key-value store, replacing the stored
  ##   ones only when the epoch of the new ones is higher.
  ##
  ## Content with a key that does not decode is dropped without any error.
  return (
    proc(
        contentKey: ByteList, contentId: ContentId, content: seq[byte]
    ) {.raises: [], gcsafe.} =
      let contentKey = decode(contentKey).valueOr:
        # TODO: as this should not fail, maybe it is better to raiseAssert ?
        return

      case contentKey.contentType
      of unused:
        raiseAssert "Should not be used and fail at decoding"
      of lightClientBootstrap:
        db.put(contentId, content)
      of lightClientUpdate:
        # Content that is not a valid list of updates is dropped.
        let updates = decodeSsz(content, ForkedLightClientUpdateBytesList).valueOr:
          return

        # Lot of assumptions here:
        # - that updates are continuous, i.e. there are no period gaps
        # - that updates start from startPeriod of content key
        # NOTE(review): `getBestUpdate` asserts `period.isSupportedBySQLite`;
        # confirm that a start period beyond that range cannot reach this point.
        var period = contentKey.lightClientUpdateKey.startPeriod
        for update in updates.asSeq():
          # Only put the update if it is better, although currently a new offer
          # should not be accepted as it is based on only the period.
          db.putUpdateIfBetter(SyncCommitteePeriod(period), update.asSeq())
          inc period
      of lightClientFinalityUpdate:
        # The cache is replaced unconditionally; the slots are not compared.
        db.finalityUpdateCache = Opt.some(
          LightClientFinalityUpdateCache(
            lastFinalityUpdateSlot:
              contentKey.lightClientFinalityUpdateKey.finalizedSlot,
            lastFinalityUpdate: content,
          )
        )
      of lightClientOptimisticUpdate:
        # The cache is replaced unconditionally; the slots are not compared.
        db.optimisticUpdateCache = Opt.some(
          LightClientOptimisticUpdateCache(
            lastOptimisticUpdateSlot:
              contentKey.lightClientOptimisticUpdateKey.optimisticSlot,
            lastOptimisticUpdate: content,
          )
        )
      of beacon_content.ContentType.historicalSummaries:
        # TODO: It's probably better to not use the kvstore here and instead use a sql
        # table with slot as index and move the slot logic to the db store handler.
        let current = db.get(contentId)
        if current.isSome():
          # Content that is already stored must decode, anything else is
          # treated as a bug.
          let summariesWithProof = decodeSsz(
            db.forkDigests, current.get(), HistoricalSummariesWithProof
          ).valueOr:
            raiseAssert error
          # New content that does not decode is dropped.
          let newSummariesWithProof = decodeSsz(
            db.forkDigests, content, HistoricalSummariesWithProof
          ).valueOr:
            return
          if newSummariesWithProof.epoch > summariesWithProof.epoch:
            db.put(contentId, content)
        else:
          db.put(contentId, content)
  )