Skip to content

Commit

Permalink
including arbo as key-value database to be used with updater filter t…
Browse files Browse the repository at this point in the history
…o avoid reprocessing transactions
  • Loading branch information
lucasmenendez committed Jun 21, 2024
1 parent b04d12e commit 3979a7d
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 73 deletions.
11 changes: 9 additions & 2 deletions cmd/census3/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/vocdoni/census3/scanner/providers/manager"
"github.com/vocdoni/census3/scanner/providers/poap"
web3provider "github.com/vocdoni/census3/scanner/providers/web3"
dvotedb "go.vocdoni.io/dvote/db"
"go.vocdoni.io/dvote/db/metadb"
"go.vocdoni.io/dvote/log"
)

Expand Down Expand Up @@ -201,10 +203,15 @@ func main() {
DB: farcasterDB,
})
}
// init the filters database
filtersDB, err := metadb.New(dvotedb.TypePebble, config.filtersPath)
if err != nil {
log.Fatal(err)
}
// start the token updater with the database and the provider manager
updater := scanner.NewUpdater(database, w3p, pm, config.filtersPath)
updater := scanner.NewUpdater(database, w3p, pm, filtersDB)
// start the holder scanner with the database and the provider manager
hc := scanner.NewScanner(database, updater, w3p, pm, config.scannerCoolDown, config.filtersPath)
hc := scanner.NewScanner(database, updater, w3p, pm, config.scannerCoolDown)
// if the admin token is not defined, generate a random one
if config.adminToken != "" {
if _, err := uuid.Parse(config.adminToken); err != nil {
Expand Down
128 changes: 128 additions & 0 deletions db/treedb/treedb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package treedb

// The treedb package provides a wrapper of key-value database that uses a
// merkle tree under the hood. Every tree is stored in the same database, but
// with a different prefix.

import (
"fmt"

"go.vocdoni.io/dvote/db"
"go.vocdoni.io/dvote/db/prefixeddb"
"go.vocdoni.io/dvote/log"
"go.vocdoni.io/dvote/tree"
"go.vocdoni.io/dvote/tree/arbo"
)

// filterTreeLevels is the number of levels of the tree used to store the
// filter. It is a constant to avoid re-creating the tree with a different
// number of levels. The available number of leaves is 2^filterTreeLevels.
// It also limits the size of the key to filterTreeLevels/8 bytes.
const filterTreeLevels = 64

// ErrNotInitialized is returned when no tree is initialized in a TreeDB
// instance, which means that LoadTree has not been called and the tree is
// not ready to be used.
var ErrNotInitialized = fmt.Errorf("tree not initialized, call Load first")

// TokenFilter is a filter associated with a token.
type TreeDB struct {
prefix string
parentDB db.Database
tree *tree.Tree
}

// LoadTree loads a tree from the database identified by the given prefix. If it
// does not exist, it creates a new tree with the given prefix. It also creates
// the index if it does not exist. It returns an error if the tree cannot be
// loaded or created.
func LoadTree(db db.Database, prefix string) (*TreeDB, error) {
treeDB := prefixeddb.NewPrefixedDatabase(db, []byte(prefix))
tree, err := tree.New(nil, tree.Options{
DB: treeDB,
MaxLevels: filterTreeLevels,
HashFunc: arbo.HashFunctionBlake2b,
})
if err != nil {
return nil, err
}
// ensure index is created
wTx := tree.DB().WriteTx()
defer wTx.Discard()
return &TreeDB{
prefix: prefix,
parentDB: db,
tree: tree,
}, wTx.Commit()
}

func (tdb *TreeDB) Close() error {
if tdb.tree != nil {
if err := tdb.tree.DB().Close(); err != nil {
return err
}
}
if tdb.parentDB != nil {
return tdb.parentDB.Close()
}
return nil
}

// DeleteTree deletes a tree from the database identified by current prefix.
// It iterates over all the keys in the tree and deletes them. If some key
// cannot be deleted, it logs a warning and continues with the next key. It
// commits the transaction at the end.
func (tdb *TreeDB) Delete() error {
treeDB := prefixeddb.NewPrefixedDatabase(tdb.parentDB, []byte(tdb.prefix))
wTx := treeDB.WriteTx()
if err := treeDB.Iterate(nil, func(k, _ []byte) bool {
if err := wTx.Delete(k); err != nil {
log.Warnw("error deleting key", "key", k, "err", err)
}
return true
}); err != nil {
return err
}
return wTx.Commit()
}

// Add adds a key to the tree.
func (tdb *TreeDB) Add(key, value []byte) error {
if tdb.tree == nil {
return ErrNotInitialized
}
wTx := tdb.tree.DB().WriteTx()
defer wTx.Discard()
if err := tdb.tree.Add(wTx, key, value); err != nil {
return err
}
return wTx.Commit()
}

// Test checks if a key is in the tree.
func (tdb *TreeDB) Test(key []byte) (bool, error) {
if tdb.tree == nil {
return false, ErrNotInitialized
}
_, err := tdb.tree.Get(nil, key)
if err != nil {
if err == arbo.ErrKeyNotFound {
return false, nil
}
return false, err
}
return true, nil
}

// TestAndAdd checks if a key is in the tree, if not, add it to the tree. It
// is the combination of Test and conditional Add.
func (tdb *TreeDB) TestAndAdd(key, value []byte) (bool, error) {
exists, err := tdb.Test(key)
if err != nil {
return false, err
}
if exists {
return true, nil
}
return false, tdb.Add(key, value)
}
32 changes: 0 additions & 32 deletions scanner/filter/filter.go

This file was deleted.

15 changes: 15 additions & 0 deletions scanner/providers/holders_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import (
"github.com/ethereum/go-ethereum/common"
)

// BlocksDelta struct defines the delta of blocks processed by any
// HolderProvider. It includes the total number of logs processed, the new logs
// processed, the logs already processed, the last block processed, and if the
// provider is synced. It also includes the current total supply of the token
// set in the provider.
type BlocksDelta struct {
LogsCount uint64
NewLogsCount uint64
Expand All @@ -16,6 +21,16 @@ type BlocksDelta struct {
TotalSupply *big.Int
}

// Filter interface defines the basic methods to interact with a filter to
// store the processed transfers identifiers and avoid to process them again,
// for example, if a token is rescanned. It allows to implement different
// filters, such as in-memory, disk, merkle tree, etc.
type Filter interface {
Add(key, value []byte) error
Test(key []byte) (bool, error)
TestAndAdd(key, value []byte) (bool, error)
}

// HolderProvider is the interface that wraps the basic methods to interact with
// a holders provider. It is used by the HoldersScanner to get the balances of
// the token holders. It allows to implement different providers, such as
Expand Down
5 changes: 2 additions & 3 deletions scanner/providers/web3/erc20_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/ethereum/go-ethereum/core/types"
erc20 "github.com/vocdoni/census3/contracts/erc/erc20"
"github.com/vocdoni/census3/helpers/web3"
"github.com/vocdoni/census3/scanner/filter"
"github.com/vocdoni/census3/scanner/providers"
"go.vocdoni.io/dvote/log"
)
Expand All @@ -32,7 +31,7 @@ type ERC20HolderProvider struct {
creationBlock uint64
lastNetworkBlock uint64
synced atomic.Bool
filter *filter.TokenFilter
filter providers.Filter
}

func (p *ERC20HolderProvider) Init(_ context.Context, iconf any) error {
Expand Down Expand Up @@ -404,5 +403,5 @@ func (p *ERC20HolderProvider) isLogAlreadyProcessed(l types.Log) (bool, error) {
return false, err
}
hID := hashFn.Sum(nil)[:8]
return p.filter.TestAndAdd(hID), nil
return p.filter.TestAndAdd(hID, nil)
}
5 changes: 2 additions & 3 deletions scanner/providers/web3/erc721_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/ethereum/go-ethereum/core/types"
erc721 "github.com/vocdoni/census3/contracts/erc/erc721"
"github.com/vocdoni/census3/helpers/web3"
"github.com/vocdoni/census3/scanner/filter"
"github.com/vocdoni/census3/scanner/providers"
"go.vocdoni.io/dvote/log"
)
Expand All @@ -32,7 +31,7 @@ type ERC721HolderProvider struct {
creationBlock uint64
lastNetworkBlock uint64
synced atomic.Bool
filter *filter.TokenFilter
filter providers.Filter
}

func (p *ERC721HolderProvider) Init(_ context.Context, iconf any) error {
Expand Down Expand Up @@ -401,5 +400,5 @@ func (p *ERC721HolderProvider) isLogAlreadyProcessed(l types.Log) (bool, error)
return false, err
}
hID := hashFn.Sum(nil)
return p.filter.TestAndAdd(hID), nil
return p.filter.TestAndAdd(hID, nil)
}
5 changes: 2 additions & 3 deletions scanner/providers/web3/erc777_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/ethereum/go-ethereum/core/types"
erc777 "github.com/vocdoni/census3/contracts/erc/erc777"
"github.com/vocdoni/census3/helpers/web3"
"github.com/vocdoni/census3/scanner/filter"
"github.com/vocdoni/census3/scanner/providers"
"go.vocdoni.io/dvote/log"
)
Expand All @@ -32,7 +31,7 @@ type ERC777HolderProvider struct {
creationBlock uint64
lastNetworkBlock uint64
synced atomic.Bool
filter *filter.TokenFilter
filter providers.Filter
}

func (p *ERC777HolderProvider) Init(_ context.Context, iconf any) error {
Expand Down Expand Up @@ -401,5 +400,5 @@ func (p *ERC777HolderProvider) isLogAlreadyProcessed(l types.Log) (bool, error)
return false, err
}
hID := hashFn.Sum(nil)
return p.filter.TestAndAdd(hID), nil
return p.filter.TestAndAdd(hID, nil)
}
3 changes: 1 addition & 2 deletions scanner/providers/web3/web3_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/vocdoni/census3/helpers/web3"
"github.com/vocdoni/census3/scanner/filter"
"github.com/vocdoni/census3/scanner/providers"
"go.vocdoni.io/dvote/db"
"go.vocdoni.io/dvote/log"
Expand All @@ -22,7 +21,7 @@ type Web3ProviderRef struct {
HexAddress string
ChainID uint64
CreationBlock uint64
Filter *filter.TokenFilter
Filter providers.Filter
}

type Web3ProviderConfig struct {
Expand Down
6 changes: 2 additions & 4 deletions scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ type Scanner struct {
networks *web3.Web3Pool
providerManager *manager.ProviderManager
coolDown time.Duration
filtersPath string

tokens []*ScannerToken
tokensMtx sync.Mutex
Expand All @@ -56,16 +55,15 @@ type Scanner struct {

// NewScanner returns a new scanner instance with the required parameters
// initialized.
func NewScanner(db *db.DB, updater *Updater, networks *web3.Web3Pool, pm *manager.ProviderManager,
coolDown time.Duration, filtersPath string,
func NewScanner(db *db.DB, updater *Updater, networks *web3.Web3Pool,
pm *manager.ProviderManager, coolDown time.Duration,
) *Scanner {
return &Scanner{
db: db,
updater: updater,
networks: networks,
providerManager: pm,
coolDown: coolDown,
filtersPath: filtersPath,
tokens: []*ScannerToken{},
tokensMtx: sync.Mutex{},
waiter: sync.WaitGroup{},
Expand Down
Loading

0 comments on commit 3979a7d

Please sign in to comment.