Skip to content

Commit

Permalink
Update aristo journal functionality (#2155)
Browse files Browse the repository at this point in the history
* Aristo: Code cosmetics, e.g. update some CamelCase names

* CoreDb+Aristo: Provide the oldest known state root implied by the journal

details:
  The Aristo journal allows recovering earlier state roots, but not all of
  them.

* Aristo: Fix journal backward index operator, e.g. `[^1]`

* Aristo: Fix journal updater

why:
  The `fifosStore()` store function slightly misinterpreted the update
  instructions when translating them into database `put()` calls. As a
  result, the journal kept growing because stale entries were never
  deleted.

* CoreDb+Aristo: Provide utils for purging stale data from the KVT

details:
  See the earlier patch: not all state roots are available. This patch
  provides a mapping from some state root to a block number and allows
  removing all KVT data related to a particular block number.

* Aristo+Kvt: Implement a clean up schedule for expired data in KVT

why:
  For a single state ledger like `Aristo`, there is only a limited
  backlog of states. So KVT data (e.g. headers) is cleaned up
  regularly.

* Fix copyright year
  • Loading branch information
mjfh authored Apr 26, 2024
1 parent 1512f95 commit 0d4ef02
Show file tree
Hide file tree
Showing 26 changed files with 506 additions and 83 deletions.
32 changes: 28 additions & 4 deletions nimbus/core/chain/persist_blocks.nim
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ type

PersistBlockFlags = set[PersistBlockFlag]

const
  CleanUpEpoch = 30_000.u256
    ## Regular checks for history clean up (applies to single state DB)
    ##
    ## Used as a modulus on block numbers when `persistBlocksImpl()` decides
    ## whether a persisted batch should trigger `purgeExpiredBlocks()`. The
    ## value should not be too small as each lookup pulls a journal entry
    ## from disk.

# ------------------------------------------------------------------------------
# Private
# ------------------------------------------------------------------------------
Expand All @@ -53,12 +57,22 @@ proc getVmState(c: ChainRef, header: BlockHeader):
return err()
return ok(vmState)

proc purgeExpiredBlocks(db: CoreDbRef) {.inline, raises: [RlpError].} =
  ## Remove non-reachable blocks from the KVT database.
  ##
  ## Starting one below the block number reported by
  ## `getOldestJournalBlockNumber()`, block numbers are visited in
  ## descending order and each one is handed to `forgetHistory()`. The walk
  ## stops as soon as `forgetHistory()` returns `false` or the counter
  ## reaches zero, so block number zero itself is never passed on.
  var blkNum = db.getOldestJournalBlockNumber()
  if not (0 < blkNum):
    return                              # nothing below the oldest block
  blkNum = blkNum - 1
  # The right hand side is only evaluated for a positive block number
  while 0 < blkNum and db.forgetHistory(blkNum):
    blkNum = blkNum - 1


proc persistBlocksImpl(c: ChainRef; headers: openArray[BlockHeader];
bodies: openArray[BlockBody],
flags: PersistBlockFlags = {}): ValidationResult
# wildcard exception, wrapped below in public section
{.inline, raises: [CatchableError].} =

let dbTx = c.db.beginTransaction()
defer: dbTx.dispose()

Expand All @@ -71,10 +85,13 @@ proc persistBlocksImpl(c: ChainRef; headers: openArray[BlockHeader];
let vmState = c.getVmState(headers[0]).valueOr:
return ValidationResult.Error

trace "Persisting blocks",
fromBlock = headers[0].blockNumber,
toBlock = headers[^1].blockNumber
# Check point
let stateRootChpt = vmState.parent.stateRoot

# Needed for figuring out whether KVT cleanup is due (see at the end)
let (fromBlock, toBlock) = (headers[0].blockNumber, headers[^1].blockNumber)

trace "Persisting blocks", fromBlock, toBlock
for i in 0 ..< headers.len:
let (header, body) = (headers[i], bodies[i])

Expand Down Expand Up @@ -177,6 +194,13 @@ proc persistBlocksImpl(c: ChainRef; headers: openArray[BlockHeader];
# The `c.db.persistent()` call is ignored by the legacy DB which
# automatically saves persistently when reaching the zero level transaction
c.db.persistent()

# For a single state ledger, there is only a limited backlog. So clean up
# regularly (the `CleanUpEpoch` should not be too small as each lookup pulls
# a journal entry from disk.)
if (fromBlock mod CleanUpEpoch) <= (toBlock - fromBlock):
c.db.purgeExpiredBlocks()

ValidationResult.OK

# ------------------------------------------------------------------------------
Expand Down
17 changes: 17 additions & 0 deletions nimbus/db/aristo/aristo_api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ type
## pair was found on the filter or the backend, this transaction is
## empty.

AristoApiGetFilUbeFn* =
proc(db: AristoDbRef;
qid: QueueID;
): Result[FilterRef,AristoError]
{.noRaise.}
## Get the filter from the unfiltered backend if available.

AristoApiGetKeyRcFn* =
proc(db: AristoDbRef;
vid: VertexID;
Expand Down Expand Up @@ -352,6 +359,7 @@ type
forget*: AristoApiForgetFn
forkTop*: AristoApiForkTopFn
forkWith*: AristoApiForkWithFn
getFilUbe*: AristoApiGetFilUbeFn
getKeyRc*: AristoApiGetKeyRcFn
hashify*: AristoApiHashifyFn
hasPath*: AristoApiHasPathFn
Expand Down Expand Up @@ -384,6 +392,7 @@ type
AristoApiProfForgetFn = "forget"
AristoApiProfForkTopFn = "forkTop"
AristoApiProfForkWithFn = "forkWith"
AristoApiProfGetFilUbeFn = "getFilUBE"
AristoApiProfGetKeyRcFn = "getKeyRc"
AristoApiProfHashifyFn = "hashify"
AristoApiProfHasPathFn = "hasPath"
Expand Down Expand Up @@ -434,6 +443,7 @@ when AutoValidateApiHooks:
doAssert not api.forget.isNil
doAssert not api.forkTop.isNil
doAssert not api.forkWith.isNil
doAssert not api.getFilUbe.isNil
doAssert not api.getKeyRc.isNil
doAssert not api.hashify.isNil
doAssert not api.hasPath.isNil
Expand Down Expand Up @@ -486,6 +496,7 @@ func init*(api: var AristoApiObj) =
api.forget = forget
api.forkTop = forkTop
api.forkWith = forkWith
api.getFilUbe = getFilUbe
api.getKeyRc = getKeyRc
api.hashify = hashify
api.hasPath = hasPath
Expand Down Expand Up @@ -521,6 +532,7 @@ func dup*(api: AristoApiRef): AristoApiRef =
forget: api.forget,
forkTop: api.forkTop,
forkWith: api.forkWith,
getFilUbe: api.getFilUbe,
getKeyRc: api.getKeyRc,
hashify: api.hashify,
hasPath: api.hasPath,
Expand Down Expand Up @@ -609,6 +621,11 @@ func init*(
AristoApiProfForkWithFn.profileRunner:
result = api.forkWith(a, b, c, d)

profApi.getFilUbe =
proc(a: AristoDbRef; b: QueueID): auto =
AristoApiProfGetFilUbeFn.profileRunner:
result = api.getFilUbe(a, b)

profApi.getKeyRc =
proc(a: AristoDbRef; b: VertexID): auto =
AristoApiProfGetKeyRcFn.profileRunner:
Expand Down
22 changes: 19 additions & 3 deletions nimbus/db/aristo/aristo_check.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import
results,
./aristo_walk/persistent,
"."/[aristo_desc, aristo_get, aristo_init, aristo_utils],
./aristo_check/[check_be, check_top]
./aristo_check/[check_be, check_journal, check_top]

# ------------------------------------------------------------------------------
# Public functions
Expand Down Expand Up @@ -55,7 +55,7 @@ proc checkBE*(
cache = true; # Also verify against top layer cache
fifos = false; # Also verify cascaded filter fifos
): Result[void,(VertexID,AristoError)] =
## Veryfy database backend structure. If the argument `relax` is set `false`,
## Verify database backend structure. If the argument `relax` is set `false`,
## all necessary Merkle hashes are compiled and verified. If the argument
## `cache` is set `true`, the cache is also checked so that a safe operation
## (like `resolveBackendFilter()`) will leave the backend consistent.
Expand All @@ -79,6 +79,18 @@ proc checkBE*(
of BackendVoid:
return VoidBackendRef.checkBE(db, cache=cache, relax=relax)

proc checkJournal*(
    db: AristoDbRef;                   # Database, top layer
      ): Result[void,(QueueID,AristoError)] =
  ## Verify the database backend journal.
  ##
  ## The check is dispatched on the backend kind of `db` to the matching
  ## backend specific `checkJournal()` variant. A void backend has no
  ## journal, so the check trivially succeeds for it.
  case db.backend.kind
  of BackendVoid:
    ok() # no journal
  of BackendMemory:
    MemBackendRef.checkJournal(db)
  of BackendRocksDB:
    RdbBackendRef.checkJournal(db)


proc check*(
db: AristoDbRef; # Database, top layer
Expand All @@ -89,7 +101,11 @@ proc check*(
): Result[void,(VertexID,AristoError)] =
## Shortcut for running `checkTop()` followed by `checkBE()`
? db.checkTop(proofMode = proofMode)
? db.checkBE(relax = relax, cache = cache)
? db.checkBE(relax = relax, cache = cache, fifos = fifos)
if fifos:
let rc = db.checkJournal()
if rc.isErr:
return err((VertexID(0),rc.error[1]))
ok()

# ------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion nimbus/db/aristo/aristo_check/check_be.nim
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ proc checkBE*[T: RdbBackendRef|MemBackendRef|VoidBackendRef](
if fifos and
not db.backend.isNil and
not db.backend.journal.isNil:
var lastTrg = db.getKeyUBE(VertexID(1)).get(otherwise = VOID_HASH_KEY)
var lastTrg = db.getKeyUbe(VertexID(1)).get(otherwise = VOID_HASH_KEY)
.to(Hash256)
for (qid,filter) in db.backend.T.walkFifoBe: # walk in fifo order
if filter.src != lastTrg:
Expand Down
203 changes: 203 additions & 0 deletions nimbus/db/aristo/aristo_check/check_journal.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
# nimbus-eth1
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# https://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# https://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.

{.push raises: [].}

import
std/[algorithm, sequtils, sets, tables],
eth/common,
results,
../aristo_filter/filter_scheduler,
../aristo_walk/persistent,
".."/[aristo_desc, aristo_blobify]

const
  ExtraDebugMessages = false
    # Compile time switch. When set `true`, the `aristo_debug` module is
    # imported, the local `pp()` pretty printers are compiled in, and
    # `checkJournal()` prints a `debugEcho` trace via its `moan()` helper.

type
  # Summary of a single journal filter record as collected by
  # `checkJournal()` while walking the backend.
  JrnRec = tuple
    src: Hash256                 # copy of the filter's `src` field
    trg: Hash256                 # copy of the filter's `trg` field
    size: int                    # length of the blobified filter, only
                                 # filled in when `ExtraDebugMessages` is set

when ExtraDebugMessages:
import
../aristo_debug

# ------------------------------------------------------------------------------
# Private functions and helpers
# ------------------------------------------------------------------------------

template noValueError(info: static[string]; code: untyped) =
  ## Run `code` and turn any `ValueError` escaping from it into an assertion
  ## failure. The assertion message is made up of the `info` text followed
  ## by the name and the message of the caught exception.
  try:
    code
  except ValueError as exc:
    let details = ", name=\"" & $exc.name & "\", msg=\"" & exc.msg & "\""
    raiseAssert info & details

when ExtraDebugMessages:
  proc ppQidRuns(qids: seq[QueueID]): string =
    ## Render the argument list `qids` as `{..}` enclosed, comma separated
    ## text. The list is processed in the order given.
    ##
    ## In a first pass over a private copy, an entry is blanked out (set to
    ## `QueueID(0)`) if it is the direct predecessor of the entry following
    ## it and the entry before it is either already blanked out or exactly
    ## two less than the following one. In the second pass, non-zero entries
    ## are printed via their `pp()` function while the first zero entry
    ## following a printed one turns the trailing `,` into `..`. Further
    ## zero entries are not printed at all.
    result = "{"
    var list = qids
    for n in 2 ..< list.len:
      if list[n-1] == list[n] - 1 and
         (list[n-2] == QueueID(0) or list[n-2] == list[n] - 2):
        list[n-1] = QueueID(0)
    for w in list:
      if w != QueueID(0):
        result &= w.pp & ","
      elif result[^1] == ',':
        result[^1] = '.'
        result &= "."
    # Either close the empty list or replace the trailing separator
    if result[^1] == '{':
      result &= "}"
    else:
      result[^1] = '}'

  proc pp(t: var Table[QueueID,JrnRec]): string =
    ## Render the table as `{qid#size,..}` with the keys in sorted order,
    ## where `size` is the `size` field of the corresponding record.
    result = "{"
    for qid in t.keys.toSeq.sorted:
      t.withValue(qid,w):
        result &= qid.pp & "#" & $w[].size & ","
    if result[^1] == '{':
      result &= "}"
    else:
      result[^1] = '}'

  proc pp(t: seq[QueueID]): string =
    ## Render the list in the order given, see `ppQidRuns()`.
    t.ppQidRuns

  proc pp(t: HashSet[QueueID]): string =
    ## Render the set members in sorted order, see `ppQidRuns()`.
    t.toSeq.sorted.ppQidRuns

# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------

proc checkJournal*[T: RdbBackendRef|MemBackendRef](
    _: type T;
    db: AristoDbRef;
      ): Result[void,(QueueID,AristoError)] =
  ## Verify the journal attached to the backend of `db`. Without a journal
  ## there is nothing to check and the function succeeds. Otherwise the
  ## following is verified, in this order:
  ##
  ## * No queue ID is listed twice by the journal
  ##   (error `CheckJrnCachedQidOverlap`.)
  ## * Every queue ID listed by the journal has a filter record produced by
  ##   the backend walk `walkFilBe()` (error `CheckJrnSavedQidMissing`.)
  ## * The backend walk produces no filter record for a queue ID that is
  ##   not listed by the journal (error `CheckJrnSavedQidStale`.)
  ## * In journal index order, the `trg` field of each record equals the
  ##   `src` field of the record following it (error `CheckJrnLinkingGap`.)
  ##
  ## The error is returned together with the offending queue ID. For the
  ## two set comparisons this is the smallest queue ID of the difference.
  ## If `ExtraDebugMessages` is set, filters are additionally blobified and
  ## a failure to do so is returned as error as well.
  let journal = db.backend.journal
  if journal.isNil:
    return ok()

  var
    qidsByAge: seq[QueueID]             # qids sorted by history/age
    qidsSeen: HashSet[QueueID]          # `qidsByAge[]` as set
    onDisk: Table[QueueID,JrnRec]       # records collected from backend walk
    failure: (QueueID,AristoError)      # most recent error, read by `moan()`

  when ExtraDebugMessages:
    var
      totalSize = 0                     # sum of all blobified filter sizes
      largestSize = 0                   # biggest blobified filter seen

    proc moan(n = -1, s = "", listOk = true) =
      ## Print a debugging line for check point `n` with optional text `s`,
      ## followed by the collected lists unless `listOk` is unset.
      let failed = failure[1] != AristoError(0)
      var txt = ""
      if 0 <= n:
        txt &= " (" & $n & ")"
      if failed:
        txt &= " oops"
      txt &=
        " jLen=" & $journal.len &
        " tally=" & $totalSize &
        " maxBlock=" & $largestSize &
        ""
      if 0 < s.len:
        txt &= " " & s
      if failed:
        txt &=
          " errQid=" & failure[0].pp &
          " error=" & $failure[1] &
          ""
      if listOk:
        txt &=
          "\n cached=" & qidsSeen.pp &
          "\n saved=" & onDisk.pp &
          ""
      debugEcho "*** checkJournal", txt
  else:
    template moan(n = -1, s = "", listOk = true) =
      discard

  # Gather the queue IDs listed by the journal, they must be unique
  for inx in 0 ..< journal.len:
    let qid = journal[inx]
    if qid in qidsSeen:
      failure = (qid,CheckJrnCachedQidOverlap)
      moan(2)
      return err(failure)
    qidsSeen.incl qid
    qidsByAge.add qid

  # Gather the filter records provided by the backend
  for (qid,fil) in db.backend.T.walkFilBe():
    var rec: JrnRec = (src: fil.src, trg: fil.trg, size: 0)

    when ExtraDebugMessages:
      let rc = fil.blobify
      if rc.isErr:
        moan(5)
        return err((qid,rc.error))
      rec.size = rc.value.len
      if largestSize < rec.size:
        largestSize = rec.size
      totalSize += rec.size

    onDisk[qid] = rec

  # Both collections must refer to exactly the same queue IDs
  let
    diskQids = onDisk.keys.toSeq.toHashSet
    missingQids = qidsSeen - diskQids   # listed but without record
    orphanQids = diskQids - qidsSeen    # record exists but not listed

  if missingQids.len != 0:
    failure = (missingQids.toSeq.sorted[0],CheckJrnSavedQidMissing)
    moan(6)
    return err(failure)

  if orphanQids.len != 0:
    failure = (orphanQids.toSeq.sorted[0], CheckJrnSavedQidStale)
    moan(7)
    return err(failure)

  # Neighbouring journal records must link together. The loop is void
  # unless there are at least two records.
  noValueError("linked journal records"):
    for n in 1 ..< qidsByAge.len:
      let
        prvRec = onDisk[qidsByAge[n-1]]
        thisRec = onDisk[qidsByAge[n]]
      if prvRec.trg != thisRec.src:
        failure = (qidsByAge[n],CheckJrnLinkingGap)
        moan(8, "qidInx=" & $n)
        return err(failure)

  moan(9, listOk=false)
  ok()

# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
Loading

0 comments on commit 0d4ef02

Please sign in to comment.