Skip to content

Commit

Permalink
last fixes about filters
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasmenendez committed Jun 5, 2024
1 parent f320e45 commit 54b6519
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 42 deletions.
8 changes: 4 additions & 4 deletions api/tokens.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (capi *census3API) initTokenHandlers() error {
api.MethodAccessTypeAdmin, capi.rescanToken); err != nil {
return err
}
if err := capi.endpoint.RegisterMethod("/tokens/rescan/queue/{queueId}", "GET",
if err := capi.endpoint.RegisterMethod("/tokens/rescan/queue/{queueID}", "GET",
api.MethodAccessTypeAdmin, capi.checkRescanToken); err != nil {
return err
}
Expand Down Expand Up @@ -648,7 +648,7 @@ func (capi *census3API) rescanToken(msg *api.APIdata, ctx *httprouter.HTTPContex
return ErrNoSyncedToken
}
// enqueue the rescan token process
id, err := capi.tokenUpdater.AddRequest(scanner.UpdateRequest{
id, err := capi.tokenUpdater.AddRequest(&scanner.UpdateRequest{
Address: address,
ChainID: uint64(chainID),
Type: tokenData.TypeID,
Expand All @@ -674,14 +674,14 @@ func (capi *census3API) checkRescanToken(msg *api.APIdata, ctx *httprouter.HTTPC
}
// get the rescan status from the updater
status, err := capi.tokenUpdater.RequestStatus(queueID)
if err != nil {
if err != nil || status == nil {
return ErrNotFoundToken.Withf("the ID %s does not exist in the queue", queueID)
}
// encoding the result and response it
response, err := json.Marshal(RescanTokenStatus{
Address: status.Address.String(),
ChainID: status.ChainID,
Done: status.Done(),
Done: status.Done,
})
if err != nil {
return ErrEncodeQueueItem.WithErr(err)
Expand Down
10 changes: 8 additions & 2 deletions cmd/census3/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,12 @@ func main() {
panic(err)
}
config.farcaster = pviper.GetBool("farcaster")
// set the filters path into the config
// set the filters path into the config, create the folder if it does not
// exitst yet
config.filtersPath = config.dataDir + "/filters"
if err := os.MkdirAll(config.filtersPath, os.ModePerm); err != nil {
log.Fatal(err)
}
// init logger
log.Init(config.logLevel, "stdout", nil)
// check if the web3 providers are defined
Expand Down Expand Up @@ -200,7 +204,7 @@ func main() {
// start the holder scanner with the database and the provider manager
hc := scanner.NewScanner(database, w3p, pm, config.scannerCoolDown, config.filtersPath)
// start the token updater with the database and the provider manager
updater := scanner.NewUpdater(database, w3p, pm)
updater := scanner.NewUpdater(database, w3p, pm, config.filtersPath)
// if the admin token is not defined, generate a random one
if config.adminToken != "" {
if _, err := uuid.Parse(config.adminToken); err != nil {
Expand Down Expand Up @@ -235,6 +239,7 @@ func main() {
}()
// start the holder scanner
go hc.Start(ctx, config.scannerConcurrentTokens)
go updater.Start(ctx)

metrics.NewCounter(fmt.Sprintf("census3_info{version=%q,chains=%q}",
internal.Version, w3p.String())).Set(1)
Expand All @@ -249,6 +254,7 @@ func main() {
// closing database
go func() {
hc.Stop()
updater.Stop()
if err := apiService.Stop(); err != nil {
log.Fatal(err)
}
Expand Down
6 changes: 3 additions & 3 deletions scanner/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func LoadFilter(basePath string, address common.Address, chainID uint64) (*Token
}

// Add adds a key to the filter.
func (tf *TokenFilter) Add(key []byte) {
tf.filter.Add(key)
func (tf *TokenFilter) Add(key []byte) boom.Filter {
return tf.filter.Add(key)
}

// Test checks if a key is in the filter.
Expand All @@ -75,7 +75,7 @@ func (tf *TokenFilter) Commit() error {
return err
}
// write the filter to the file
if err := os.WriteFile(tf.path, bFilter, 0o644); err != nil {
if err := os.WriteFile(tf.path, bFilter, os.ModePerm); err != nil {
return err
}
return nil
Expand Down
4 changes: 3 additions & 1 deletion scanner/providers/web3/erc20_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,10 @@ func (p *ERC20HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fro
errors.Join(ErrCheckingProcessedLogs, fmt.Errorf("[ERC20] %s: %w", p.address, err))
}
if processed {
log.Info("log already processed")
continue
}
log.Info("log not processed yet, processing...")
newTransfers++
logData, err := p.contract.ERC20ContractFilterer.ParseTransfer(currentLog)
if err != nil {
Expand Down Expand Up @@ -197,7 +199,7 @@ func (p *ERC20HolderProvider) HoldersBalances(ctx context.Context, _ []byte, fro
p.synced.Store(synced)
totalSupply, err := p.TotalSupply(nil)
if err != nil {
log.Warn("error getting total supply, it will retry in the next iteration", "error", err)
log.Warnw("error getting total supply, it will retry in the next iteration", "error", err)
return balances, newTransfers, lastBlock, synced, nil, nil
}
return balances, newTransfers, lastBlock, synced, totalSupply, nil
Expand Down
3 changes: 2 additions & 1 deletion scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,14 +329,15 @@ func (s *Scanner) ScanHolders(ctx context.Context, token ScannerToken) (
defer func() {
if err := filter.Commit(); err != nil {
log.Error(err)
return
}
}()
// set the token reference in the provider
if err := provider.SetRef(web3provider.Web3ProviderRef{
HexAddress: token.Address.Hex(),
ChainID: token.ChainID,
CreationBlock: token.CreationBlock,
Filter: filter.filter,
Filter: filter,
}); err != nil {
return nil, 0, token.LastBlock, token.Synced, nil, err
}
Expand Down
76 changes: 45 additions & 31 deletions scanner/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,7 @@ type UpdateRequest struct {
CreationBlock uint64
EndBlock uint64
LastBlock uint64
}

// Done returns true if the request is done, that is, the last block is greater
// or equal to the end block.
func (ur UpdateRequest) Done() bool {
return ur.LastBlock >= ur.EndBlock
Done bool
}

// Updater is a struct to manage the update requests of the tokens. It will
Expand All @@ -46,21 +41,25 @@ type Updater struct {
ctx context.Context
cancel context.CancelFunc

db *db.DB
networks *web3.Web3Pool
providers *manager.ProviderManager
queue map[string]UpdateRequest
queueMtx sync.Mutex
waiter sync.WaitGroup
db *db.DB
networks *web3.Web3Pool
providers *manager.ProviderManager
queue map[string]*UpdateRequest
queueMtx sync.Mutex
waiter sync.WaitGroup
filtersPath string
}

// NewUpdater creates a new instance of Updater.
func NewUpdater(db *db.DB, networks *web3.Web3Pool, pm *manager.ProviderManager) *Updater {
func NewUpdater(db *db.DB, networks *web3.Web3Pool, pm *manager.ProviderManager,
filtersPath string,
) *Updater {
return &Updater{
db: db,
networks: networks,
providers: pm,
queue: make(map[string]UpdateRequest),
db: db,
networks: networks,
providers: pm,
queue: make(map[string]*UpdateRequest),
filtersPath: filtersPath,
}
}

Expand All @@ -80,7 +79,7 @@ func (u *Updater) Start(ctx context.Context) {
continue
}
if err := u.process(); err != nil {
log.Error("Error processing update request: %w", err)
log.Errorf("Error processing update request: %v", err)
}
}
}
Expand All @@ -96,24 +95,24 @@ func (u *Updater) Stop() {
// RequestStatus returns the status of a request by its ID. If the request is
// done, it will be removed from the queue. If the request is not found, it will
// return an error.
func (u *Updater) RequestStatus(id string) (UpdateRequest, error) {
func (u *Updater) RequestStatus(id string) (*UpdateRequest, error) {
u.queueMtx.Lock()
defer u.queueMtx.Unlock()
req, ok := u.queue[id]
if !ok {
return UpdateRequest{}, fmt.Errorf("request not found")
return nil, fmt.Errorf("request not found")
}
if req.Done() {
if req.Done {
delete(u.queue, id)
}
return u.queue[id], nil
return req, nil
}

// AddRequest adds a new request to the queue. It will return an error if the
// request is missing required fields or the block range is invalid. The request
// will be added to the queue with a random ID, that will be returned to allow
// the client to query the status of the request.
func (u *Updater) AddRequest(req UpdateRequest) (string, error) {
func (u *Updater) AddRequest(req *UpdateRequest) (string, error) {
if req.ChainID == 0 || req.Type == 0 || req.CreationBlock == 0 || req.EndBlock == 0 {
return "", fmt.Errorf("missing required fields")
}
Expand Down Expand Up @@ -142,17 +141,18 @@ func (u *Updater) IsEmpty() bool {
func (u *Updater) process() error {
// make a copy of current queue
u.queueMtx.Lock()
queue := map[string]UpdateRequest{}
queue := map[string]*UpdateRequest{}
for k, v := range u.queue {
queue[k] = v
}
u.queueMtx.Unlock()
// iterate over the current queue items
for id, req := range queue {
// check if the request is done
if req.Done() {
if req.Done {
continue
}
log.Infow("rescanning token", "address", req.Address.Hex(), "from", req.CreationBlock, "to", req.EndBlock, "current", req.LastBlock)
internalCtx, cancel := context.WithTimeout(u.ctx, UPDATE_TIMEOUT)
defer cancel()
// get the provider by token type
Expand All @@ -164,14 +164,30 @@ func (u *Updater) process() error {
if provider.IsExternal() {
return fmt.Errorf("external providers are not supported yet")
}
// load filter of the token from the database
filter, err := LoadFilter(u.filtersPath, req.Address, req.ChainID)
if err != nil {
return err
}
// commit the filter when the function finishes
defer func() {
if err := filter.Commit(); err != nil {
log.Error(err)
return
}
}()
// set the reference of the token to update in the provider
if err := provider.SetRef(web3provider.Web3ProviderRef{
HexAddress: req.Address.Hex(),
ChainID: req.ChainID,
CreationBlock: req.CreationBlock,
Filter: filter,
}); err != nil {
return err
}
// update the last block number of the provider to the last block of
// the request
provider.SetLastBlockNumber(req.EndBlock)
// get current token holders from database
results, err := u.db.QueriesRO.ListTokenHolders(internalCtx, queries.ListTokenHoldersParams{
TokenID: req.Address.Bytes(),
Expand All @@ -195,16 +211,14 @@ func (u *Updater) process() error {
// get range balances from the provider, it will check itereate again
// over transfers logs, checking if there are new transfers using the
// bloom filter associated to the token
rangeBalances, newTransfers, lastBlock, synced, totalSupply, err := provider.HoldersBalances(internalCtx, nil, req.EndBlock)
rangeBalances, newTransfers, lastBlock, synced, totalSupply, err := provider.HoldersBalances(internalCtx, nil, req.CreationBlock)
if err != nil {
return err
}
log.Infow("new logs received", "address", req.Address.Hex(), "from", req.LastBlock, "lastBlock", lastBlock, "newLogs", newTransfers)
// update the token last
if synced {
req.LastBlock = req.EndBlock
} else {
req.LastBlock = lastBlock
}
req.LastBlock = lastBlock
req.Done = synced
// save the new balances in the database
created, updated, err := SaveHolders(u.db, internalCtx, ScannerToken{
Address: req.Address,
Expand Down

0 comments on commit 54b6519

Please sign in to comment.