-
Notifications
You must be signed in to change notification settings - Fork 107
/
nimbus_import.nim
192 lines (164 loc) · 5.28 KB
/
nimbus_import.nim
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# Nimbus
# Copyright (c) 2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [].}
import
chronicles,
chronos/timer,
std/[strformat, strutils],
stew/io2,
./config,
./common/common,
./core/[block_import, chain],
./db/era1_db,
beacon_chain/era_db
var running {.volatile.} = true
func shortLog(a: timer.Duration, parts = int.high): string {.inline.} =
## Returns string representation of Duration ``a`` as nanoseconds value.
var
res = ""
v = a.nanoseconds()
parts = parts
template f(n: string, T: Duration) =
if v >= T.nanoseconds():
res.add($(uint64(v div T.nanoseconds())))
res.add(n)
v = v mod T.nanoseconds()
dec parts
if v == 0 or parts <= 0:
return res
f("w", Week)
f("d", Day)
f("h", Hour)
f("m", Minute)
f("s", Second)
f("ms", Millisecond)
f("us", Microsecond)
f("ns", Nanosecond)
res
proc importBlocks*(conf: NimbusConf, com: CommonRef) =
proc controlCHandler() {.noconv.} =
when defined(windows):
# workaround for https://github.com/nim-lang/Nim/issues/4057
setupForeignThreadGc()
running = false
setControlCHook(controlCHandler)
let
start =
try:
com.db.getSavedStateBlockNumber().truncate(uint64) + 1
except RlpError as exc:
error "Could not read block number", err = exc.msg
quit(QuitFailure)
chain = com.newChain()
var
imported = 0'u64
gas = 0.u256
txs = 0
time0 = Moment.now()
csv =
if conf.csvStats.isSome:
try:
let f = open(conf.csvStats.get(), fmAppend)
if f.getFileSize() == 0:
f.writeLine("block_number,blocks,txs,gas,time")
f
except IOError as exc:
error "Could not open statistics output file",
file = conf.csvStats, err = exc.msg
quit(QuitFailure)
else:
File(nil)
defer:
if csv != nil:
close(csv)
template blockNumber(): uint64 =
start + imported
if isDir(conf.era1Dir.string):
doAssert conf.networkId == MainNet, "Only mainnet era1 current supported"
const
# TODO the merge block number could be fetched from the era1 file instead,
# specially if the accumulator is added to the chain metadata
lastEra1Block = 15537393
if start <= lastEra1Block:
notice "Importing era1 archive",
start, dataDir = conf.dataDir.string, era1Dir = conf.era1Dir.string
var
headers: seq[BlockHeader]
bodies: seq[BlockBody]
func f(value: float): string =
try:
&"{value:4.3f}"
except ValueError:
raiseAssert "valid fmt string"
template process() =
let
time1 = Moment.now()
statsRes = chain.persistBlocks(headers, bodies)
if statsRes.isErr():
error "Failed to persist blocks", error = statsRes.error
quit(QuitFailure)
txs += statsRes[].txs
gas += uint64 statsRes[].gas
let
time2 = Moment.now()
diff1 = (time2 - time1).nanoseconds().float / 1000000000
diff0 = (time2 - time0).nanoseconds().float / 1000000000
info "Imported blocks",
blockNumber,
blocks = imported,
txs,
gas,
bps = f(headers.len.float / diff1),
tps = f(statsRes[].txs.float / diff1),
gps = f(statsRes[].gas.float / diff1),
avgBps = f(imported.float / diff0),
avgTps = f(txs.float / diff0),
avgGps = f(gas.truncate(uint64).float / diff0), # TODO fix truncate
elapsed = shortLog(time2 - time0, 3)
if csv != nil:
# In the CSV, we store a line for every chunk of blocks processed so
# that the file can meaningfully be appended to when restarting the
# process - this way, each sample is independent
try:
csv.writeLine(
[
$blockNumber,
$headers.len,
$statsRes[].txs,
$statsRes[].gas,
$(time2 - time1).nanoseconds(),
].join(",")
)
csv.flushFile()
except IOError as exc:
warn "Could not write csv", err = exc.msg
headers.setLen(0)
bodies.setLen(0)
let db =
Era1DbRef.init(conf.era1Dir.string, "mainnet").expect("Era files present")
defer:
db.dispose()
while running and imported < conf.maxBlocks and blockNumber <= lastEra1Block:
var blk = db.getBlockTuple(blockNumber).valueOr:
error "Could not load block from era1", blockNumber, error
break
imported += 1
headers.add move(blk.header)
bodies.add move(blk.body)
if headers.lenu64 mod conf.chunkSize == 0:
process()
if headers.len > 0:
process() # last chunk, if any
for blocksFile in conf.blocksFile:
if isFile(string blocksFile):
# success or not, we quit after importing blocks
if not importRlpBlock(string blocksFile, com):
quit(QuitFailure)
else:
quit(QuitSuccess)