Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R] state verification pipeline #795

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor
  • Loading branch information
forcodedancing committed Mar 10, 2022
commit 4d8c1481fd3de59a40f8b0bb4699569b4b3509b3
2 changes: 1 addition & 1 deletion core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD
}
statedb.Finalise(v.config.IsEIP158(header.Number))
//state verification pipeline - accounts root are not calculated here
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
statedb.AccountsIntermediateWithoutRoot()
statedb.PopulateSnapAccountAndStorage()
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
return nil
})
} else {
Expand Down
4 changes: 1 addition & 3 deletions core/state/snapshot/difflayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"math"
"math/rand"
"reflect"
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -192,7 +191,7 @@ func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]s
default:
panic("unknown parent type")
}
fmt.Println("parent type:", reflect.TypeOf(parent))

// Sanity check that accounts or storage slots are never nil
for accountHash, blob := range accounts {
if blob == nil {
Expand Down Expand Up @@ -477,7 +476,6 @@ func (dl *diffLayer) storage(accountHash, storageHash common.Hash, depth int) ([
// Update creates a new layer on top of the existing snapshot diff tree with
// the specified data items.
func (dl *diffLayer) Update(blockRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) *diffLayer {
fmt.Println("diffLayer Update")
return newDiffLayer(dl, blockRoot, destructs, accounts, storage, verified)
}

Expand Down
14 changes: 1 addition & 13 deletions core/state/snapshot/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package snapshot

import (
"bytes"
"fmt"
"sync"

"github.com/VictoriaMetrics/fastcache"
Expand Down Expand Up @@ -60,17 +59,7 @@ func (dl *diskLayer) Verified() bool {
return true
}

func (dl *diskLayer) CorrectAccounts(accounts map[common.Hash][]byte) {
dl.lock.Lock()
defer dl.lock.Unlock()

fmt.Println("diskLayer CorrectAccounts")
fmt.Println("-----")
fmt.Println("diskLayer to use :", len(accounts))
for k, v := range accounts {
fmt.Printf("key:= %s, v:= %x \n", k.Hex(), v)
}
fmt.Println("-----")
func (dl *diskLayer) CorrectAccounts(map[common.Hash][]byte) {
}

// Parent always returns nil as there's no layer below the disk.
Expand Down Expand Up @@ -186,6 +175,5 @@ func (dl *diskLayer) Storage(accountHash, storageHash common.Hash) ([]byte, erro
// the specified data items. Note, the maps are retained by the method to avoid
// copying everything.
func (dl *diskLayer) Update(blockHash common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) *diffLayer {
fmt.Println("disklayer update")
return newDiffLayer(dl, blockHash, destructs, accounts, storage, verified)
}
6 changes: 4 additions & 2 deletions core/state/snapshot/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,10 @@ func (dl *diffLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
return common.Hash{}, ErrSnapshotStale
}

// TODO: Wait the accountData in the layer is corrected
dl.WaitAndGetVerifyRes()
// Wait the snapshot(difflayer) is verified, it means the account data also been refreshed with the correct data
if !dl.WaitAndGetVerifyRes() {
return common.Hash{}, ErrSnapshotStale
}

// Everything below was journalled, persist this layer too
if err := rlp.Encode(buffer, dl.root); err != nil {
Expand Down
4 changes: 0 additions & 4 deletions core/state/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,6 @@ func (t *Tree) Snapshots(root common.Hash, limits int, nodisk bool) []Snapshot {

func (t *Tree) Update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Address]struct{}, accounts map[common.Address][]byte, storage map[common.Address]map[string][]byte, verified chan struct{}) error {
hashDestructs, hashAccounts, hashStorage := transformSnapData(destructs, accounts, storage)
fmt.Println("hashAccounts: ", len(hashAccounts))
for k, _ := range hashAccounts {
fmt.Println("key=", k)
}
return t.update(blockRoot, parentRoot, hashDestructs, hashAccounts, hashStorage, verified)
}

Expand Down
64 changes: 42 additions & 22 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,13 +1023,12 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
return s.StateIntermediateRoot()
}

func (s *StateDB) AccountsIntermediateWithoutRoot() {
func (s *StateDB) PopulateSnapAccountAndStorage() {
for addr := range s.stateObjectsPending {
if obj := s.stateObjects[addr]; !obj.deleted {
if s.snap != nil && !obj.deleted {
s.snapMux.Lock()
s.populateSnapStorage(obj)
s.snapAccounts[obj.address] = snapshot.SlimAccountRLP(obj.data.Nonce, obj.data.Balance, obj.data.Root, obj.data.CodeHash)
s.snapMux.Unlock()
}
data, err := rlp.EncodeToBytes(obj)
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
Expand Down Expand Up @@ -1117,7 +1116,6 @@ func (s *StateDB) accountDataForDiffLayer() map[common.Hash][]byte {
obj.updateRoot(s.db)
if s.snap != nil && !obj.deleted {
lock.Lock()
s.snapAccounts[obj.address] = snapshot.SlimAccountRLP(obj.data.Nonce, obj.data.Balance, obj.data.Root, obj.data.CodeHash)
accountData[crypto.Keccak256Hash(obj.address[:])] = snapshot.SlimAccountRLP(obj.data.Nonce, obj.data.Balance, obj.data.Root, obj.data.CodeHash)
lock.Unlock()
}
Expand All @@ -1135,6 +1133,38 @@ func (s *StateDB) accountDataForDiffLayer() map[common.Hash][]byte {
return accountData
}

func (s *StateDB) populateSnapStorage(obj *StateObject) {
tr := obj.getTrie(s.db)
var storage map[string][]byte
for key, value := range obj.pendingStorage {
// Skip noop changes, persist actual changes
if value == obj.originStorage[key] {
continue
}
obj.originStorage[key] = value

var v []byte
if (value == common.Hash{}) {
obj.setError(tr.TryDelete(key[:]))
} else {
// Encoding []byte cannot fail, ok to ignore the error.
v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:]))
obj.setError(tr.TryUpdate(key[:], v))
}
// If state snapshotting is active, cache the data til commit
if obj.db.snap != nil {
if storage == nil {
// Retrieve the old storage map, if available, create a new one otherwise
if storage = obj.db.snapStorage[obj.address]; storage == nil {
storage = make(map[string][]byte)
obj.db.snapStorage[obj.address] = storage
}
}
storage[string(key[:])] = v // v will be nil if value is 0x00
}
}
}

func (s *StateDB) StateIntermediateRoot() common.Hash {
// If there was a trie prefetcher operating, it gets aborted and irrevocably
// modified after we start retrieving tries. Remove it from the statedb after
Expand Down Expand Up @@ -1169,7 +1199,7 @@ func (s *StateDB) StateIntermediateRoot() common.Hash {
}
s.trie = tr
}
//TODO: stateObjectsPending here

usedAddrs := make([][]byte, 0, len(s.stateObjectsPending))
for addr := range s.stateObjectsPending {
if obj := s.stateObjects[addr]; obj.deleted {
Expand All @@ -1182,7 +1212,7 @@ func (s *StateDB) StateIntermediateRoot() common.Hash {
if prefetcher != nil {
prefetcher.used(s.originalRoot, usedAddrs)
}
//TODO: stateObjectsPending here

if len(s.stateObjectsPending) > 0 {
s.stateObjectsPending = make(map[common.Address]struct{})
}
Expand Down Expand Up @@ -1361,7 +1391,6 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er
var verified chan struct{}
var snapUpdated chan struct{}
var snapCreated chan common.Hash
forcodedancing marked this conversation as resolved.
Show resolved Hide resolved
var accountRefreshed chan struct{}
if s.snap != nil {
diffLayer = &types.DiffLayer{}
}
Expand All @@ -1370,33 +1399,24 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er
verified = make(chan struct{})
snapUpdated = make(chan struct{})
snapCreated = make(chan common.Hash)
forcodedancing marked this conversation as resolved.
Show resolved Hide resolved
accountRefreshed = make(chan struct{})
}

commmitTrie := func() error {
commitErr := func() error {
accountData := make(map[common.Hash][]byte)
if s.pipeCommit && s.snap != nil {
if s.pipeCommit {
// Due to state verification pipeline, the accounts roots are not updated, leading to the data in the difflayer is not correct
// Fix the wrong data here
accountData = s.accountDataForDiffLayer()
close(accountRefreshed)
fmt.Println("accountData:", len(accountData))
for k, _ := range accountData {
fmt.Println("key=", k)
}
}

if s.stateRoot = s.StateIntermediateRoot(); s.fullProcessed && s.expectedRoot != s.stateRoot {
return fmt.Errorf("invalid merkle root (remote: %x local: %x)", s.expectedRoot, s.stateRoot)
}

if s.pipeCommit {
fmt.Println("s.stateRoot:", s.stateRoot.Hex())
root := <-snapCreated

empty := common.Hash{}
if root != empty {
if root != (common.Hash{}) {
forcodedancing marked this conversation as resolved.
Show resolved Hide resolved
s.snaps.Snapshot(root).CorrectAccounts(accountData)
}
}
Expand Down Expand Up @@ -1545,7 +1565,6 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er
snapCreated <- common.Hash{}
}
if s.pipeCommit {
fmt.Println("s.expectedRoot", s.expectedRoot.Hex())
snapCreated <- s.expectedRoot
}
// Keep n diff layers in the memory
Expand All @@ -1557,15 +1576,16 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er
log.Warn("Failed to cap snapshot tree", "root", s.expectedRoot, "layers", s.snaps.CapLimit(), "err", err)
}
}()
} else {
if s.pipeCommit { // If no snap created, still need to put data into the channel
snapCreated <- common.Hash{}
}
}
}
return nil
},
func() error {
if s.snap != nil {
if s.pipeCommit {
<-accountRefreshed
}
diffLayer.Destructs, diffLayer.Accounts, diffLayer.Storages = s.SnapToDiffLayer()
}
return nil
Expand Down