Aristo selective read caching for rocksdb backend (#2145)
* Aristo+Kvt: Better RocksDB profiling

why:
  Providing more detailed information, mainly for `Aristo`

* Aristo: Renamed journal `stats()` to `capacity()`

why:
  `stats()` was a misnomer

* Aristo: Provide backend read caches for key and vertex IDs

why:
  Dedicated LRU caching for particular types gives a throughput advantage.
  The sizes of the LRU queues used for caching are currently constant
  but might be adjusted at a later time.

* Fix copyright year
mjfh committed Apr 22, 2024
1 parent 1d90b92 commit b9187e0
Showing 11 changed files with 238 additions and 75 deletions.
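The read caches added here follow the classic fetch-or-load LRU pattern, built on `KeyedQueue` from the `stew` package (see `rdb_get.nim` below). A minimal standalone sketch of that pattern, where the key/value types, the loader, and the `MaxItems` bound are invented and only the `lruFetch()`/`lruAppend()` calls mirror the diff:

import
  results,
  stew/keyed_queue

const MaxItems = 4096          # eviction bound, like `RdKeyLruMaxSize` below

var cache: KeyedQueue[uint64, string]

proc load(key: uint64): string =
  ## Stand-in for the expensive backend read (a RocksDB fetch in this commit).
  "value-" & $key

proc cachedGet(key: uint64): string =
  # `lruFetch()` returns the cached value and promotes it to most-recently-used.
  let rc = cache.lruFetch(key)
  if rc.isOk:
    return rc.value
  # `lruAppend()` stores the loaded value, evicts the least-recently-used
  # entry once `MaxItems` is exceeded, and returns the value just stored.
  cache.lruAppend(key, load(key), MaxItems)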
56 changes: 56 additions & 0 deletions nimbus/db/aristo/aristo_api.nim
@@ -405,6 +405,14 @@ type

AristoApiProfBeGetVtxFn = "be/getVtx"
AristoApiProfBeGetKeyFn = "be/getKey"
AristoApiProfBeGetFilFn = "be/getFil"
AristoApiProfBeGetIdgFn = "be/getIdg"
AristoApiProfBeGetFqsFn = "be/getFqs"
AristoApiProfBePutVtxFn = "be/putVtx"
AristoApiProfBePutKeyFn = "be/putKey"
AristoApiProfBePutFilFn = "be/putFil"
AristoApiProfBePutIdgFn = "be/putIdg"
AristoApiProfBePutFqsFn = "be/putFqs"
AristoApiProfBePutEndFn = "be/putEnd"

AristoApiProfRef* = ref object of AristoApiRef
@@ -709,6 +717,54 @@ func init*(
result = be.getKeyFn(a)
data.list[AristoApiProfBeGetKeyFn.ord].masked = true

beDup.getFilFn =
proc(a: QueueID): auto =
AristoApiProfBeGetFilFn.profileRunner:
result = be.getFilFn(a)
data.list[AristoApiProfBeGetFilFn.ord].masked = true

beDup.getIdgFn =
proc(): auto =
AristoApiProfBeGetIdgFn.profileRunner:
result = be.getIdgFn()
data.list[AristoApiProfBeGetIdgFn.ord].masked = true

beDup.getFqsFn =
proc(): auto =
AristoApiProfBeGetFqsFn.profileRunner:
result = be.getFqsFn()
data.list[AristoApiProfBeGetFqsFn.ord].masked = true

beDup.putVtxFn =
proc(a: PutHdlRef; b: openArray[(VertexID,VertexRef)]) =
AristoApiProfBePutVtxFn.profileRunner:
be.putVtxFn(a,b)
data.list[AristoApiProfBePutVtxFn.ord].masked = true

beDup.putKeyFn =
proc(a: PutHdlRef; b: openArray[(VertexID,HashKey)]) =
AristoApiProfBePutKeyFn.profileRunner:
be.putKeyFn(a,b)
data.list[AristoApiProfBePutKeyFn.ord].masked = true

beDup.putFilFn =
proc(a: PutHdlRef; b: openArray[(QueueID,FilterRef)]) =
AristoApiProfBePutFilFn.profileRunner:
be.putFilFn(a,b)
data.list[AristoApiProfBePutFilFn.ord].masked = true

beDup.putIdgFn =
proc(a: PutHdlRef; b: openArray[VertexID]) =
AristoApiProfBePutIdgFn.profileRunner:
be.putIdgFn(a,b)
data.list[AristoApiProfBePutIdgFn.ord].masked = true

beDup.putFqsFn =
proc(a: PutHdlRef; b: openArray[(QueueID,QueueID)]) =
AristoApiProfBePutFqsFn.profileRunner:
be.putFqsFn(a,b)
data.list[AristoApiProfBePutFqsFn.ord].masked = true

beDup.putEndFn =
proc(a: PutHdlRef): auto =
AristoApiProfBePutEndFn.profileRunner:
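For orientation: `profileRunner` is defined elsewhere in the codebase, not in this diff. A hypothetical minimal sketch of such a timing wrapper, with invented names and storage layout:

import std/[monotimes, times]

type ProfEntry = object
  count: int                   # number of profiled calls
  total: Duration              # accumulated wall-clock time

var profList: array[64, ProfEntry]   # hypothetical per-function stats table

template profileRunner(slot: int; body: untyped) =
  let t0 = getMonoTime()
  body                         # run the wrapped backend call
  profList[slot].count.inc
  profList[slot].total += getMonoTime() - t0

when isMainModule:
  profileRunner(0):
    echo "timed body"

Each `beDup` closure above wraps the real backend call this way and additionally flags its profiling slot as exercised via the `masked` field.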
38 changes: 24 additions & 14 deletions nimbus/db/aristo/aristo_filter/filter_scheduler.nim
@@ -1,5 +1,5 @@
# nimbus-eth1
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http:https://www.apache.org/licenses/LICENSE-2.0)
@@ -9,7 +9,7 @@
# except according to those terms.

import
std/[algorithm, sequtils, typetraits],
".."/[aristo_constants, aristo_desc]

type
@@ -238,11 +238,7 @@ func fifoDel(
# Delete all available
return (@[(QueueID(1), fifo[1]), (fifo[0], wrap)], ZeroQidPair)

func capacity(
ctx: openArray[tuple[size, width: int]]; # Schedule layout
): tuple[maxQueue: int, minCovered: int, maxCovered: int] =
## Number of maximally stored and covered queued entries for the argument
@@ -258,17 +254,24 @@
result.minCovered += (ctx[n].size * step).int
result.maxCovered += (size * step).int

# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------

func capacity*(
ctx: openArray[tuple[size, width, wrap: int]]; # Schedule layout
): tuple[maxQueue: int, minCovered: int, maxCovered: int] =
## Variant of `capacity()` below.
ctx.toSeq.mapIt((it[0],it[1])).capacity

func capacity*(
journal: QidSchedRef; # Cascaded fifos descriptor
): tuple[maxQueue: int, minCovered: int, maxCovered: int] =
## Number of maximally stored and covered queued entries for the layout of
## argument `journal`. The resulting value of `maxQueue` entry is the maximal
## number of database slots needed, the `minCovered` and `maxCovered` entry
## indicate the range of the backlog for a fully populated database.
journal.ctx.q.toSeq.mapIt((it[0].int,it[1].int)).capacity()
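For illustration, a hypothetical call of the 3-tuple variant (the layout numbers are invented; the printed values are not asserted here):

let layout = @[(size: 4, width: 8, wrap: 41), (size: 8, width: 4, wrap: 77)]
let (maxQueue, minCovered, maxCovered) = layout.capacity()
echo "slots=", maxQueue, " backlog=", minCovered, "..", maxCovered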


func addItem*(
@@ -549,6 +552,13 @@ func `[]`*(
return n.globalQid(wrap - inx)
inx -= qInxMax0 + 1 # Otherwise continue

func `[]`*(
fifo: QidSchedRef; # Cascaded fifos descriptor
bix: BackwardsIndex; # Index into latest items
): QueueID =
## Variant of `[]` for providing `[^bix]`.
fifo[fifo.state.len - bix.distinctBase]
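For example, given a populated journal this permits the usual Nim backwards indexing:

let newest = fifo[^1]          # same as fifo[fifo.state.len - 1]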


func `[]`*(
fifo: QidSchedRef; # Cascaded fifos descriptor
31 changes: 15 additions & 16 deletions nimbus/db/aristo/aristo_init/rocks_db.nim
@@ -81,7 +81,7 @@ proc getVtxFn(db: RdbBackendRef): GetVtxFn =
proc(vid: VertexID): Result[VertexRef,AristoError] =

# Fetch serialised data record
let data = db.rdb.getVtx(vid.uint64).valueOr:
when extraTraceMessages:
trace logTxt "getVtxFn() failed", vid, error=error[0], info=error[1]
return err(error[0])
@@ -97,7 +97,7 @@ proc getKeyFn(db: RdbBackendRef): GetKeyFn =
proc(vid: VertexID): Result[HashKey,AristoError] =

# Fetch serialised data record
let data = db.rdb.getKey(vid.uint64).valueOr:
when extraTraceMessages:
trace logTxt "getKeyFn: failed", vid, error=error[0], info=error[1]
return err(error[0])
@@ -119,8 +119,8 @@ proc getFilFn(db: RdbBackendRef): GetFilFn =
result =
proc(qid: QueueID): Result[FilterRef,AristoError] =

# Fetch serialised data record.
let data = db.rdb.getByPfx(FilPfx, qid.uint64).valueOr:
when extraTraceMessages:
trace logTxt "getFilFn: failed", qid, error=error[0], info=error[1]
return err(error[0])
@@ -135,8 +135,8 @@ proc getIdgFn(db: RdbBackendRef): GetIdgFn =
result =
proc(): Result[seq[VertexID],AristoError]=

# Fetch serialised data record.
let data = db.rdb.getByPfx(AdmPfx, AdmTabIdIdg.uint64).valueOr:
when extraTraceMessages:
trace logTxt "getIdgFn: failed", error=error[0], info=error[1]
return err(error[0])
@@ -158,8 +158,8 @@ proc getFqsFn(db: RdbBackendRef): GetFqsFn =
result =
proc(): Result[seq[(QueueID,QueueID)],AristoError]=

# Fetch serialised data record.
let data = db.rdb.getByPfx(AdmPfx, AdmTabIdFqs.uint64).valueOr:
when extraTraceMessages:
trace logTxt "getFqsFn: failed", error=error[0], info=error[1]
return err(error[0])
@@ -179,7 +179,6 @@ proc putBegFn(db: RdbBackendRef): PutBegFn =
db.rdb.begin()
db.newSession()

proc putVtxFn(db: RdbBackendRef): PutVtxFn =
result =
proc(hdl: PutHdlRef; vrps: openArray[(VertexID,VertexRef)]) =
@@ -201,8 +200,8 @@ proc putVtxFn(db: RdbBackendRef): PutVtxFn =
else:
batch.add (vid.uint64, EmptyBlob)

# Stash batch session data via LRU cache
db.rdb.putVtx(batch).isOkOr:
hdl.error = TypedPutHdlErrRef(
pfx: VtxPfx,
vid: VertexID(error[0]),
@@ -223,8 +222,8 @@ proc putKeyFn(db: RdbBackendRef): PutKeyFn =
else:
batch.add (vid.uint64, EmptyBlob)

# Stash batch session data via LRU cache
db.rdb.putKey(batch).isOkOr:
hdl.error = TypedPutHdlErrRef(
pfx: KeyPfx,
vid: VertexID(error[0]),
@@ -263,7 +262,7 @@ proc putFilFn(db: RdbBackendRef): PutFilFn =
batch.add (qid.uint64, EmptyBlob)

# Stash batch session data
db.rdb.putByPfx(FilPfx, batch).isOkOr:
hdl.error = TypedPutHdlErrRef(
pfx: FilPfx,
qid: QueueID(error[0]),
@@ -276,7 +275,7 @@ proc putIdgFn(db: RdbBackendRef): PutIdgFn =
let hdl = hdl.getSession db
if hdl.error.isNil:
let idg = if 0 < vs.len: vs.blobify else: EmptyBlob
db.rdb.putByPfx(AdmPfx, @[(AdmTabIdIdg.uint64, idg)]).isOkOr:
hdl.error = TypedPutHdlErrRef(
pfx: AdmPfx,
aid: AdmTabIdIdg,
@@ -300,7 +299,7 @@ proc putFqsFn(db: RdbBackendRef): PutFqsFn =

# Stash batch session data
let fqs = if 0 < vs.len: vs.blobify else: EmptyBlob
db.rdb.putByPfx(AdmPfx, @[(AdmTabIdFqs.uint64, fqs)]).isOkOr:
hdl.error = TypedPutHdlErrRef(
pfx: AdmPfx,
aid: AdmTabIdFqs,
25 changes: 15 additions & 10 deletions nimbus/db/aristo/aristo_init/rocks_db/rdb_desc.nim
@@ -15,29 +15,34 @@

import
std/os,
eth/common,
rocksdb,
stew/endians2,
stew/[endians2, keyed_queue],
../../aristo_desc,
../init_common

type
RdbInst* = object
store*: ColFamilyReadWrite ## Rocks DB database handler
session*: WriteBatchRef ## For batched `put()`
rdKeyLru*: KeyedQueue[RdbKey,Blob] ## Read cache
rdVtxLru*: KeyedQueue[RdbKey,Blob] ## Read cache
basePath*: string ## Database directory
noFq*: bool ## No filter queues available

RdbGuestDbRef* = ref object of GuestDbRef
guestDb*: ColFamilyReadWrite ## Piggyback feature reference

RdbKey* = array[1 + sizeof VertexID, byte]
## Sub-table key, <pfx> + VertexID
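The `toRdbKey()` helper used by the getters is not shown in this commit; a plausible sketch consistent with the `RdbKey` layout above (an assumption, not the committed implementation):

proc toRdbKey(xid: uint64; pfx: StorageType): RdbKey =
  ## Compose the sub-table key as <pfx byte> + big-endian ID.
  let id = xid.toBytesBE       # stew/endians2, yields array[8, byte]
  result[0] = pfx.ord.byte
  for n in 0 ..< sizeof(uint64):
    result[n + 1] = id[n]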

const
GuestFamily* = "Guest" ## Guest family (e.g. for Kvt)
AristoFamily* = "Aristo" ## RocksDB column family
BaseFolder* = "nimbus" ## Same as for Legacy DB
DataFolder* = "aristo" ## Legacy DB has "data"
GuestFamily* = "Guest" ## Guest family (e.g. for Kvt)
AristoFamily* = "Aristo" ## RocksDB column family
BaseFolder* = "nimbus" ## Same as for Legacy DB
DataFolder* = "aristo" ## Legacy DB has "data"
RdKeyLruMaxSize* = 4096 ## Max size of read cache for keys
RdVtxLruMaxSize* = 2048 ## Max size of read cache for vertex IDs

# ------------------------------------------------------------------------------
# Public functions
Expand Down
57 changes: 45 additions & 12 deletions nimbus/db/aristo/aristo_init/rocks_db/rdb_get.nim
@@ -17,6 +17,7 @@ import
eth/common,
rocksdb,
results,
stew/keyed_queue,
../../aristo_desc,
../init_common,
./rdb_desc
@@ -32,28 +33,60 @@
logScope:
topics = "aristo-rocksdb"

proc getImpl(rdb: RdbInst; key: RdbKey): Result[Blob,(AristoError,string)] =
var res: Blob
let onData = proc(data: openArray[byte]) =
res = @data

let gotData = rdb.store.get(key, onData).valueOr:
const errSym = RdbBeDriverGetError
when extraTraceMessages:
trace logTxt "get", pfx=key[0], error=errSym, info=error
return err((errSym,error))

# Correct result if needed
if not gotData:
res = EmptyBlob
ok res

# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------

proc getByPfx*(
rdb: RdbInst;
pfx: StorageType;
xid: uint64,
): Result[Blob,(AristoError,string)] =
rdb.getImpl(xid.toRdbKey pfx)

proc getKey*(rdb: var RdbInst; xid: uint64): Result[Blob,(AristoError,string)] =
# Try LRU cache first
let
key = xid.toRdbKey KeyPfx
rc = rdb.rdKeyLru.lruFetch(key)
if rc.isOK:
return ok(rc.value)

# Otherwise fetch from backend database
let res = ? rdb.getImpl(key)

# Update cache and return
ok rdb.rdKeyLru.lruAppend(key, res, RdKeyLruMaxSize)

proc getVtx*(rdb: var RdbInst; xid: uint64): Result[Blob,(AristoError,string)] =
# Try LRU cache first
let
key = xid.toRdbKey VtxPfx
rc = rdb.rdVtxLru.lruFetch(key)
if rc.isOK:
return ok(rc.value)

# Otherwise fetch from backend database
let res = ? rdb.getImpl(key)

# Update cache and return
ok rdb.rdVtxLru.lruAppend(key, res, RdVtxLruMaxSize)

# ------------------------------------------------------------------------------
# End