Skip to content

Commit

Permalink
new 'next' method for updater to get the next token to update in each…
Browse files Browse the repository at this point in the history
… iteration and support sorted scan
  • Loading branch information
lucasmenendez committed Jun 23, 2024
1 parent 3979a7d commit 09f8392
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 71 deletions.
2 changes: 1 addition & 1 deletion cmd/census3/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func main() {
log.Fatal(err)
}
// start the token updater with the database and the provider manager
updater := scanner.NewUpdater(database, w3p, pm, filtersDB)
updater := scanner.NewUpdater(database, w3p, pm, filtersDB, config.scannerCoolDown)
// start the holder scanner with the database and the provider manager
hc := scanner.NewScanner(database, updater, w3p, pm, config.scannerCoolDown)
// if the admin token is not defined, generate a random one
Expand Down
3 changes: 2 additions & 1 deletion scanner/providers/web3/erc20_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func (p *ERC20HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fro
alreadyProcessedLogs := uint64(0)
balances := make(map[common.Address]*big.Int)
// iterate the logs and update the balances
log.Infow("parsing logs", "address", p.address, "type", p.TypeName(), "count", len(logs))
for _, currentLog := range logs {
// skip the log if it has been removed
if currentLog.Removed {
Expand Down Expand Up @@ -208,7 +209,7 @@ func (p *ERC20HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fro
balances[logData.From] = new(big.Int).Neg(logData.Value)
}
}
log.Infow("saving blocks",
log.Infow("logs parsed",
"count", len(balances),
"new_logs", newTransfers,
"already_processed_logs", alreadyProcessedLogs,
Expand Down
3 changes: 1 addition & 2 deletions scanner/providers/web3/erc721_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ func (p *ERC721HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr
if errors.Is(err, ErrTooManyRequests) {
log.Warnf("too many requests, the provider will continue in the next iteration from block %d", lastBlock)
}
log.Warnw("logs received", "number_of_logs", len(logs), "last_block", lastBlock)
// encode the number of new transfers
newTransfers := uint64(0)
alreadyProcessedLogs := uint64(0)
Expand Down Expand Up @@ -399,6 +398,6 @@ func (p *ERC721HolderProvider) isLogAlreadyProcessed(l types.Log) (bool, error)
if _, err := hashFn.Write([]byte(transferID)); err != nil {
return false, err
}
hID := hashFn.Sum(nil)
hID := hashFn.Sum(nil)[:8]
return p.filter.TestAndAdd(hID, nil)
}
3 changes: 1 addition & 2 deletions scanner/providers/web3/erc777_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ func (p *ERC777HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fr
if errors.Is(err, ErrTooManyRequests) {
log.Warnf("too many requests, the provider will continue in the next iteration from block %d", lastBlock)
}
log.Warnw("logs received", "number_of_logs", len(logs), "last_block", lastBlock)
// encode the number of new transfers
newTransfers := uint64(0)
alreadyProcessedLogs := uint64(0)
Expand Down Expand Up @@ -399,6 +398,6 @@ func (p *ERC777HolderProvider) isLogAlreadyProcessed(l types.Log) (bool, error)
if _, err := hashFn.Write([]byte(transferID)); err != nil {
return false, err
}
hID := hashFn.Sum(nil)
hID := hashFn.Sum(nil)[:8]
return p.filter.TestAndAdd(hID, nil)
}
2 changes: 1 addition & 1 deletion scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ func (s *Scanner) Start(ctx context.Context) {
continue
}
}

log.Infow("checking token in the updater queue",
"address", token.Address.Hex(),
"chainID", token.ChainID,
Expand Down Expand Up @@ -435,6 +434,7 @@ func (s *Scanner) prepareToken(token *ScannerToken) error {
return err
}
token.CreationBlock = creationBlock
token.LastBlock = creationBlock
token.Ready = true
}
return nil
Expand Down
172 changes: 108 additions & 64 deletions scanner/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -51,62 +52,73 @@ type Updater struct {
ctx context.Context
cancel context.CancelFunc

db *db.DB
networks *web3.Web3Pool
providers *manager.ProviderManager
queue map[string]*UpdateRequest
queueMtx sync.Mutex
processing sync.Map
waiter sync.WaitGroup
kvdb dvotedb.Database
db *db.DB
networks *web3.Web3Pool
providers *manager.ProviderManager
sortedQueue []string
queue map[string]*UpdateRequest
queueMtx sync.Mutex
processing sync.Map
nextReq atomic.Uint64
waiter sync.WaitGroup
kvdb dvotedb.Database
coolDown time.Duration
}

// NewUpdater creates a new instance of Updater.
func NewUpdater(db *db.DB, networks *web3.Web3Pool, pm *manager.ProviderManager,
kvdb dvotedb.Database,
kvdb dvotedb.Database, coolDown time.Duration,
) *Updater {
return &Updater{
db: db,
networks: networks,
providers: pm,
queue: make(map[string]*UpdateRequest),
kvdb: kvdb,
db: db,
networks: networks,
providers: pm,
sortedQueue: []string{},
queue: make(map[string]*UpdateRequest),
kvdb: kvdb,
coolDown: coolDown,
}
}

// Start starts the updater process in a goroutine.
func (u *Updater) Start(ctx context.Context, concurrentTokens int) {
u.ctx, u.cancel = context.WithCancel(ctx)
sem := make(chan struct{}, concurrentTokens)
defer close(sem)
for {
select {
case <-u.ctx.Done():
return
default:
pending := u.pendingRequests()
if len(pending) == 0 {
time.Sleep(coolDown)
req, id := u.next()
if req == nil {
log.Info("no more requests to process, sleeping...")
time.Sleep(u.coolDown)
continue
}
sem := make(chan struct{}, concurrentTokens)
defer close(sem)
for id, req := range u.pendingRequests() {
u.processing.Store(id, true)
sem <- struct{}{}
go func(id string, req *UpdateRequest) {
defer func() {
<-sem
u.processing.Store(id, false)
}()
if err := u.process(req); err != nil {
log.Errorf("Error processing update request: %v", err)
return
}
// update the request in the queue
u.queueMtx.Lock()
u.queue[id] = req
u.queueMtx.Unlock()
}(id, req)
}
sem <- struct{}{}
u.waiter.Add(1)
go func(id string, req UpdateRequest) {
defer func() {
u.waiter.Done()
<-sem
}()
log.Infow("processing token",
"address", req.Address.Hex(),
"from", req.CreationBlock,
"to", req.EndBlock,
"current", req.LastBlock)
res, err := u.process(id, req)
if err != nil {
log.Errorf("error processing update request: %v", err)
return
}
// update the request in the queue
log.Infow("updating request in the queue", "lastBlock", req.LastBlock, "done", req.Done)
if err := u.SetRequest(id, &res); err != nil {
log.Errorf("error updating request in the queue: %v", err)
}
}(id, *req)
}
}
}
Expand All @@ -127,12 +139,20 @@ func (u *Updater) RequestStatus(id string, deleteOnDone bool) *UpdateRequest {
if !ok {
return nil
}
res := *req
if deleteOnDone && req.Done {
// remove the request from the processing map
u.processing.Delete(id)
// remove the request from the queue
delete(u.queue, id)
// remove the request from the sorted queue
for i, v := range u.sortedQueue {
if v == id {
u.sortedQueue = append(u.sortedQueue[:i], u.sortedQueue[i+1:]...)
break
}
}
}
return &res
return req
}

// SetRequest adds a new request to the queue. It will return an error if the
Expand All @@ -157,8 +177,10 @@ func (u *Updater) SetRequest(id string, req *UpdateRequest) error {
}
u.queueMtx.Lock()
defer u.queueMtx.Unlock()
if _, exists := u.queue[id]; !exists {
u.sortedQueue = append(u.sortedQueue, id)
}
u.queue[id] = req
u.processing.Store(id, false)
return nil
}

Expand Down Expand Up @@ -195,48 +217,71 @@ func RequestID(address common.Address, chainID uint64, externalID string) (strin
return hex.EncodeToString(bHash[:4]), nil
}

func (u *Updater) pendingRequests() map[string]*UpdateRequest {
func (u *Updater) next() (*UpdateRequest, string) {
u.queueMtx.Lock()
defer u.queueMtx.Unlock()
queue := map[string]*UpdateRequest{}
for k, v := range u.queue {
if processing, ok := u.processing.Load(k); v.Done || !ok || processing.(bool) {
continue
// if the queue is empty return nil
if len(u.sortedQueue) == 0 {
return nil, ""
}
// get the next request in the queue, if the next request is out of the
// range of the sorted queue, return nil and set the next request index to 0
i := u.nextReq.Load()
max := uint64(len(u.sortedQueue))
if i >= max {
u.nextReq.Store(0)
return nil, ""
}
// iterate over the sorted queue to find the next request that is not being
// processed or already done
for ; i < max; i++ {
id := u.sortedQueue[i]
req, exists := u.queue[id]
if !exists {
// if the request is not found, remove the ID from the sorted queue and
// return nil setting the next request index to 0
u.sortedQueue = append(u.sortedQueue[:i], u.sortedQueue[i+1:]...)
u.nextReq.Store(0)
return nil, ""
}
// if request is not done and not being processed, return it
if isProcessing, ok := u.processing.Load(id); !req.Done && (!ok || !isProcessing.(bool)) {
u.nextReq.Store(i + 1)
return req, id
}
queue[k] = v
}
return queue
// if the next request is not found, set the next request index to 0
u.nextReq.Store(0)
return nil, ""
}

// process iterates over the current queue items, getting the token holders
// balances and saving them in the database until the last block is greater or
// equal to the end block. It updates the status of the request in the queue. It
// will return an error if the provider is not found, the token is external or
// there is an error getting the token holders balances.
func (u *Updater) process(req *UpdateRequest) error {
// log the start of the process
log.Infow("rescanning token",
"address", req.Address.Hex(),
"from", req.CreationBlock,
"to", req.EndBlock,
"current", req.LastBlock)
func (u *Updater) process(id string, req UpdateRequest) (UpdateRequest, error) {
// set the request as processing and defer to set it as not processing
u.processing.Store(id, true)
defer u.processing.Store(id, false)
// create a context with a timeout to avoid blocking the process
ctx, cancel := context.WithTimeout(u.ctx, UPDATE_TIMEOUT)
defer cancel()
// get the provider by token type
provider, err := u.providers.GetProvider(ctx, req.Type)
if err != nil {
return fmt.Errorf("error getting provider for token: %v", err)
return req, fmt.Errorf("error getting provider for token: %v", err)
}
// if the provider is not external, resolve the chain address and load the
// token filter so the provider reference can be set
if !provider.IsExternal() {
chainAddress, ok := u.networks.ChainAddress(req.ChainID, req.Address.Hex())
if !ok {
return fmt.Errorf("error getting chain address for token: %v", err)
return req, fmt.Errorf("error getting chain address for token: %v", err)
}
// load filter of the token from the database
filter, err := treedb.LoadTree(u.kvdb, chainAddress)
if err != nil {
return err
return req, err
}
// set the reference of the token to update in the provider
if err := provider.SetRef(web3provider.Web3ProviderRef{
Expand All @@ -245,7 +290,7 @@ func (u *Updater) process(req *UpdateRequest) error {
CreationBlock: req.CreationBlock,
Filter: filter,
}); err != nil {
return fmt.Errorf("error setting provider reference: %v", err)
return req, fmt.Errorf("error setting provider reference: %v", err)
}
}
// update the last block number of the provider to the last block of
Expand All @@ -257,7 +302,7 @@ func (u *Updater) process(req *UpdateRequest) error {
ChainID: req.ChainID,
})
if err != nil {
return fmt.Errorf("error getting token holders from database: %v", err)
return req, fmt.Errorf("error getting token holders from database: %v", err)
}
currentHolders := map[common.Address]*big.Int{}
for _, holder := range results {
Expand All @@ -275,7 +320,7 @@ func (u *Updater) process(req *UpdateRequest) error {
}
// set the current holders in the provider
if err := provider.SetLastBalances(ctx, nil, currentHolders, req.LastBlock); err != nil {
return fmt.Errorf("error setting last balances in provider: %v", err)
return req, fmt.Errorf("error setting last balances in provider: %v", err)
}
// get range balances from the provider, it will iterate again
// over transfers logs, checking if there are new transfers using the
Expand All @@ -296,7 +341,7 @@ func (u *Updater) process(req *UpdateRequest) error {
}
}
if err != nil {
return fmt.Errorf("error getting token holders balances: %v", err)
return req, fmt.Errorf("error getting token holders balances: %v", err)
}
log.Debugw("new logs received",
"address", req.Address.Hex(),
Expand All @@ -311,13 +356,12 @@ func (u *Updater) process(req *UpdateRequest) error {
ChainID: req.ChainID,
}, balances, delta.NewLogsCount, delta.Block, delta.Synced, delta.TotalSupply)
if err != nil {
return fmt.Errorf("error saving token holders balances: %v", err)
return req, fmt.Errorf("error saving token holders balances: %v", err)
}
log.Debugw("token holders balances updated",
"token", req.Address.Hex(),
"chainID", req.ChainID,
"created", created,
"updated", updated)
log.Infow("updating request in the queue", "lastBlock", req.LastBlock, "done", req.Done)
return nil
return req, nil
}

0 comments on commit 09f8392

Please sign in to comment.