-
Notifications
You must be signed in to change notification settings - Fork 108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use queries with custom xor function #1113
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,16 +29,11 @@ export kvstore_sqlite3 | |
# 3. Or databases are created per network (and kvstores pre content type) and | ||
# thus depending on the network the right db needs to be selected. | ||
|
||
const | ||
# Maximal number of ObjInfo objects held in memory per database scan. 100k | ||
# objects should result in memory usage of around 7mb which should be | ||
# appropriate for even low resource devices | ||
maxObjPerScan = 100000 | ||
|
||
type | ||
RowInfo = tuple | ||
contentId: array[32, byte] | ||
payloadLength: int64 | ||
distance: array[32, byte] | ||
|
||
ObjInfo* = object | ||
contentId*: array[32, byte] | ||
|
@@ -51,7 +46,8 @@ type | |
sizeStmt: SqliteStmt[NoParams, int64] | ||
unusedSizeStmt: SqliteStmt[NoParams, int64] | ||
vacStmt: SqliteStmt[NoParams, void] | ||
getAll: SqliteStmt[NoParams, RowInfo] | ||
contentSizeStmt: SqliteStmt[NoParams, int64] | ||
getAllOrderedByDistanceStmt: SqliteStmt[array[32, byte], RowInfo] | ||
|
||
PutResultType* = enum | ||
ContentStored, DbPruned | ||
|
@@ -65,9 +61,21 @@ type | |
fractionOfDeletedContent*: float64 | ||
numOfDeletedElements*: int64 | ||
|
||
# Objects must be sorted from largest to closest distance | ||
proc `<`(a, b: ObjInfo): bool = | ||
return a.distFrom < b.distFrom | ||
func xorDistance( | ||
a: openArray[byte], | ||
b: openArray[byte] | ||
): Result[seq[byte], cstring] {.cdecl.} = | ||
var s: seq[byte] = newSeq[byte](32) | ||
|
||
if len(a) != 32 or len(b) != 32: | ||
return err("Blobs should have 32 byte length") | ||
|
||
var i = 0 | ||
while i < 32: | ||
s[i] = a[i] xor b[i] | ||
inc i | ||
|
||
return ok(s) | ||
|
||
template expectDb(x: auto): untyped = | ||
# There's no meaningful error handling implemented for a corrupt database or | ||
|
@@ -82,6 +90,9 @@ proc new*(T: type ContentDB, path: string, maxSize: uint32, inMemory = false): C | |
else: | ||
SqStoreRef.init(path, "fluffy").expectDb() | ||
|
||
db.registerCustomScalarFunction("xorDistance", xorDistance) | ||
.expect("Couldn't register custom xor function") | ||
|
||
let getSizeStmt = db.prepareStmt( | ||
"SELECT page_count * page_size as size FROM pragma_page_count(), pragma_page_size();", | ||
NoParams, int64).get() | ||
|
@@ -96,71 +107,26 @@ proc new*(T: type ContentDB, path: string, maxSize: uint32, inMemory = false): C | |
|
||
let kvStore = kvStore db.openKvStore().expectDb() | ||
|
||
# This needs to go after `openKvStore`, as it checks whether the table name | ||
# kvstore already exists. | ||
let getKeysStmt = db.prepareStmt( | ||
"SELECT key, length(value) FROM kvstore", | ||
NoParams, RowInfo | ||
let contentSizeStmt = db.prepareStmt( | ||
"SELECT SUM(length(value)) FROM kvstore", | ||
NoParams, int64 | ||
).get() | ||
|
||
let getAllOrderedByDistanceStmt = db.prepareStmt( | ||
"SELECT key, length(value), xorDistance(?, key) as distance FROM kvstore ORDER BY distance DESC", | ||
array[32, byte], RowInfo | ||
).get() | ||
|
||
ContentDB( | ||
kv: kvStore, | ||
maxSize: maxSize, | ||
sizeStmt: getSizeStmt, | ||
vacStmt: vacStmt, | ||
getAll: getKeysStmt, | ||
unusedSizeStmt: unusedSize | ||
unusedSizeStmt: unusedSize, | ||
contentSizeStmt: contentSizeStmt, | ||
getAllOrderedByDistanceStmt: getAllOrderedByDistanceStmt | ||
) | ||
|
||
proc getNFurthestElements*( | ||
db: ContentDB, target: UInt256, n: uint64): (seq[ObjInfo], int64) = | ||
## Get at most n furthest elements from db in order from furthest to closest. | ||
## Payload lengths are also returned so the caller can decide how many of | ||
## those elements need to be deleted. | ||
## | ||
## Currently it uses xor metric | ||
## | ||
## Currently works by querying for all elements in database and doing all | ||
## necessary work on program level. This is mainly due to two facts: | ||
## - sqlite does not have build xor function, also it does not handle bitwise | ||
## operations on blobs as expected | ||
## - our nim wrapper for sqlite does not support create_function api of sqlite | ||
## so we cannot create custom function comparing blobs at sql level. If that | ||
## would be possible we may be able to all this work by one sql query | ||
|
||
if n == 0: | ||
return (newSeq[ObjInfo](), 0'i64) | ||
|
||
var heap = initHeapQueue[ObjInfo]() | ||
var totalContentSize: int64 = 0 | ||
|
||
var ri: RowInfo | ||
for e in db.getAll.exec(ri): | ||
let contentId = UInt256.fromBytesBE(ri.contentId) | ||
# TODO: Currently it assumes xor distance, but when we start testing | ||
# networks with other distance functions this needs to be adjusted to the | ||
# custom distance function | ||
let dist = contentId xor target | ||
let obj = ObjInfo( | ||
contentId: ri.contentId, payloadLength: ri.payloadLength, distFrom: dist) | ||
|
||
if (uint64(len(heap)) < n): | ||
heap.push(obj) | ||
else: | ||
if obj > heap[0]: | ||
discard heap.replace(obj) | ||
|
||
totalContentSize = totalContentSize + ri.payloadLength | ||
|
||
var res: seq[ObjInfo] = newSeq[ObjInfo](heap.len()) | ||
|
||
var i = heap.len() - 1 | ||
while heap.len() > 0: | ||
res[i] = heap.pop() | ||
dec i | ||
|
||
return (res, totalContentSize) | ||
|
||
proc reclaimSpace*(db: ContentDB): void = | ||
## Runs sqlite VACUUM commands which rebuilds the db, repacking it into a | ||
## minimal amount of disk space. | ||
|
@@ -195,6 +161,13 @@ proc unusedSize(db: ContentDB): int64 = | |
proc realSize*(db: ContentDB): int64 = | ||
db.size() - db.unusedSize() | ||
|
||
proc contentSize(db: ContentDB): int64 = | ||
## Returns total size of content stored in DB | ||
var size: int64 = 0 | ||
discard (db.contentSizeStmt.exec do(res: int64): | ||
size = res).expectDb() | ||
return size | ||
|
||
proc get*(db: ContentDB, key: openArray[byte]): Option[seq[byte]] = | ||
var res: Option[seq[byte]] | ||
proc onData(data: openArray[byte]) = res = some(@data) | ||
|
@@ -233,41 +206,35 @@ proc contains*(db: ContentDB, key: ContentId): bool = | |
proc del*(db: ContentDB, key: ContentId) = | ||
db.del(key.toByteArrayBE()) | ||
|
||
proc deleteFractionOfContent*( | ||
db: ContentDB, | ||
target: Uint256, | ||
targetFraction: float64): (UInt256, int64, int64, int64) = | ||
## Procedure which tries to delete fraction of database by scanning maxObjPerScan | ||
## furthest elements. | ||
## If the maxObjPerScan furthest elements, is not enough to attain required fraction | ||
## procedure deletes all but one element and report how many bytes have been | ||
## deleted | ||
## Procedure do not call reclaim space, it is left to the caller. | ||
|
||
let (furthestElements, totalContentSize) = db.getNFurthestElements(target, maxObjPerScan) | ||
var bytesDeleted: int64 = 0 | ||
let bytesToDelete = int64(targetFraction * float64(totalContentSize)) | ||
let numOfElements = len(furthestElements) | ||
var numOfDeletedElements: int64 = 0 | ||
|
||
if numOfElements == 0: | ||
# no elements in database, return some zero value | ||
return (UInt256.zero, 0'i64, 0'i64, 0'i64) | ||
proc deleteContentFraction( | ||
db: ContentDB, | ||
target: UInt256, | ||
fraction: float64): (UInt256, int64, int64, int64) = | ||
|
||
let lastIdx = len(furthestElements) - 1 | ||
doAssert( | ||
fraction > 0 and fraction < 1, | ||
"Deleted fraction shohould be > 0 and < 1" | ||
) | ||
|
||
for i, elem in furthestElements: | ||
if i == lastIdx: | ||
# this is our last element, do not delete it and report it as last non deleted | ||
# element | ||
return (elem.distFrom, bytesDeleted, totalContentSize, numOfDeletedElements) | ||
let totalContentSize = db.contentSize() | ||
let bytesToDelete = int64(fraction * float64(totalContentSize)) | ||
var numOfDeletedElements: int64 = 0 | ||
|
||
if bytesDeleted + elem.payloadLength < bytesToDelete: | ||
db.del(elem.contentId) | ||
bytesDeleted = bytesDeleted + elem.payloadLength | ||
var ri: RowInfo | ||
var bytesDeleted: int64 = 0 | ||
let targetBytes = target.toByteArrayBE() | ||
for e in db.getAllOrderedByDistanceStmt.exec(targetBytes, ri): | ||
if bytesDeleted + ri.payloadLength < bytesToDelete: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It doesn't matter that much in our current setting, as an arbitrary fraction is chosen anyhow, but it would be more correct if you still allow the last deletion to reach the amount of bytesToDelete (or slightly more). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it is matter of preference i.e current approach interprets |
||
db.del(ri.contentId) | ||
bytesDeleted = bytesDeleted + ri.payloadLength | ||
inc numOfDeletedElements | ||
else: | ||
return (elem.distFrom, bytesDeleted, totalContentSize, numOfDeletedElements) | ||
return ( | ||
UInt256.fromBytesBE(ri.contentid), | ||
bytesDeleted, | ||
totalContentSize, | ||
numOfDeletedElements | ||
) | ||
|
||
proc put*( | ||
db: ContentDB, | ||
|
@@ -299,7 +266,7 @@ proc put*( | |
totalContentSize, | ||
deletedElements | ||
) = | ||
db.deleteFractionOfContent(target, 0.25) | ||
db.deleteContentFraction(target, 0.25) | ||
|
||
let deletedFraction = float64(deletedBytes) / float64(totalContentSize) | ||
|
||
|
+64 −0 | eth/db/kvstore_sqlite3.nim | |
+51 −2 | tests/db/test_kvstore_sqlite3.nim |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.