Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exp/lighthorizon: Isolate cursor advancement code to its own interface #4484

Merged
merged 8 commits into from
Jul 28, 2022
Merged
96 changes: 96 additions & 0 deletions exp/lighthorizon/services/cursor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package services

import (
"fmt"

"github.com/stellar/go/exp/lighthorizon/index"
"github.com/stellar/go/toid"
)

// CursorManager describes a way to control how a cursor advances for a
// particular indexing strategy.
type CursorManager interface {
Begin(cursor int64) (int64, error)
Advance() (int64, error)
}

type AccountActivityCursorManager struct {
AccountId string

store index.Store
lastCursor *toid.ID
}

func NewCursorManagerForAccountActivity(store index.Store, accountId string) *AccountActivityCursorManager {
return &AccountActivityCursorManager{AccountId: accountId, store: store}
}

func (c *AccountActivityCursorManager) Begin(cursor int64) (int64, error) {
freq := checkpointManager.GetCheckpointFrequency()
id := toid.Parse(cursor)
lastCheckpoint := uint32(0)
if id.LedgerSequence >= int32(checkpointManager.GetCheckpointFrequency()) {
lastCheckpoint = index.GetCheckpointNumber(uint32(id.LedgerSequence))
}

fmt.Println("got", id.LedgerSequence, "checkpoint #", lastCheckpoint)

// We shouldn't take the provided cursor for granted: instead, we should
// skip ahead to the first active ledger that's >= the given cursor.
//
// For example, someone might say ?cursor=0 but the first active checkpoint
// is actually 40M ledgers in.
firstCheckpoint, err := c.store.NextActive(c.AccountId, allTransactionsIndex, lastCheckpoint)
if err != nil {
return cursor, err
}

nextLedger := (firstCheckpoint - 1) * freq
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would Low from checkpointManager.GetCheckpointRange(firstCheckpoint) work here instead of manually doing the math?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, no. The checkpoint values provided by the index represent the checkpoint number, whereas checkpointManager works on ledgers. To clarify the difference: if the index returns 2, that means the ledger range is [64, 127]. So the upscale by freq is always necessary to do manually.


// However, if the given cursor is actually *more* specific than the index
// can give us (e.g. somewhere *within* an active checkpoint range), prefer
// it rather than starting over.
if nextLedger < uint32(id.LedgerSequence) {
better := toid.Parse(cursor)
c.lastCursor = &better
return cursor, nil
}

c.lastCursor = toid.New(int32(nextLedger), 1, 1)
return c.lastCursor.ToInt64(), nil
}

func (c *AccountActivityCursorManager) Advance() (int64, error) {
if c.lastCursor == nil {
panic("invalid cursor, call Begin() first")
}

// Advancing the cursor means deciding whether or not we need to query the
// index.

lastLedger := uint32(c.lastCursor.LedgerSequence)
freq := checkpointManager.GetCheckpointFrequency()

if checkpointManager.IsCheckpoint(lastLedger) {
// If the last cursor we looked at was a checkpoint ledger, then we need
// to jump ahead to the next checkpoint.
checkpoint := index.GetCheckpointNumber(uint32(c.lastCursor.LedgerSequence))
checkpoint, err := c.store.NextActive(c.AccountId, allTransactionsIndex, checkpoint+1)

if err != nil {
return c.lastCursor.ToInt64(), err
}

// We add a -1 here because an active checkpoint indicates that an
// account had activity in the *previous* 64 ledgers, so we need to
// backtrack to that ledger range.
c.lastCursor = toid.New(int32((checkpoint-1)*freq), 1, 1)
} else {
// Otherwise, we can just bump the ledger number.
c.lastCursor = toid.New(int32(lastLedger+1), 1, 1)
}

return c.lastCursor.ToInt64(), nil
}

var _ CursorManager = (*AccountActivityCursorManager)(nil) // ensure conformity to the interface
74 changes: 74 additions & 0 deletions exp/lighthorizon/services/cursor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package services

import (
"io"
"testing"

"github.com/stellar/go/exp/lighthorizon/index"
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/keypair"
"github.com/stellar/go/toid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var (
checkpointMgr = historyarchive.NewCheckpointManager(0)
)

func TestAccountTransactionCursorManager(t *testing.T) {
freq := int32(checkpointMgr.GetCheckpointFrequency())
accountId := keypair.MustRandom().Address()

// Create an index and fill it with some checkpoint details.
store, err := index.NewFileStore(index.StoreConfig{
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
Url: "file:https://" + t.TempDir(),
Workers: 4,
})
require.NoError(t, err)

for _, checkpoint := range []uint32{
1, 5, 10,
} {
require.NoError(t, store.AddParticipantsToIndexes(
checkpoint, allTransactionsIndex, []string{accountId}))
}

cursorMgr := NewCursorManagerForAccountActivity(store, accountId)

cursor := toid.New(1, 1, 1)
var nextCursor int64

nextCursor, err = cursorMgr.Begin(cursor.ToInt64())
require.NoError(t, err)
assert.EqualValues(t, 1, getLedgerFromCursor(nextCursor))

cursor.LedgerSequence = freq / 2
nextCursor, err = cursorMgr.Begin(cursor.ToInt64())
require.NoError(t, err)
assert.EqualValues(t, cursor.LedgerSequence, getLedgerFromCursor(nextCursor))

cursor.LedgerSequence = 2 * freq
nextCursor, err = cursorMgr.Begin(cursor.ToInt64())
require.NoError(t, err)
assert.EqualValues(t, 4*freq, getLedgerFromCursor(nextCursor))

for i := int32(1); i < freq; i++ {
nextCursor, err = cursorMgr.Advance()
require.NoError(t, err)
assert.EqualValues(t, 4*freq+i, getLedgerFromCursor(nextCursor))
}

nextCursor, err = cursorMgr.Advance()
require.NoError(t, err)
assert.EqualValues(t, 9*freq, getLedgerFromCursor(nextCursor))

for i := int32(1); i < freq; i++ {
nextCursor, err = cursorMgr.Advance()
require.NoError(t, err)
assert.EqualValues(t, 9*freq+i, getLedgerFromCursor(nextCursor))
}

_, err = cursorMgr.Advance()
assert.ErrorIs(t, err, io.EOF)
}
82 changes: 38 additions & 44 deletions exp/lighthorizon/services/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import (
)

const (
allIndexes = "all/all"
allTransactionsIndex = "all/all"
allPaymentsIndex = "all/payments"
)

var (
Expand Down Expand Up @@ -52,16 +53,24 @@ type TransactionRepository interface {
GetTransactionsByAccount(ctx context.Context, cursor int64, limit uint64, accountId string) ([]common.Transaction, error)
}

// searchCallback is a generic way for any endpoint to process a transaction and
// its corresponding ledger. It should return whether or not we should stop
// processing (e.g. when a limit is reached) and any error that occurred.
type searchCallback func(archive.LedgerTransaction, *xdr.LedgerHeader) (finished bool, err error)

func (os *OperationsService) GetOperationsByAccount(ctx context.Context, cursor int64, limit uint64, accountId string) ([]common.Operation, error) {
func (os *OperationsService) GetOperationsByAccount(ctx context.Context,
cursor int64, limit uint64,
accountId string,
) ([]common.Operation, error) {
ops := []common.Operation{}

opsCallback := func(tx archive.LedgerTransaction, ledgerHeader *xdr.LedgerHeader) (bool, error) {
for operationOrder, op := range tx.Envelope.Operations() {
opParticipants, opParticipantErr := os.Config.Archive.GetOperationParticipants(tx, op, operationOrder)
if opParticipantErr != nil {
return false, opParticipantErr
opParticipants, err := os.Config.Archive.GetOperationParticipants(tx, op, operationOrder)
if err != nil {
return false, err
}

if _, foundInOp := opParticipants[accountId]; foundInOp {
ops = append(ops, common.Operation{
TransactionEnvelope: &tx.Envelope,
Expand All @@ -70,11 +79,13 @@ func (os *OperationsService) GetOperationsByAccount(ctx context.Context, cursor
TxIndex: int32(tx.Index),
OpIndex: int32(operationOrder),
})
if uint64(len(ops)) == limit {

if uint64(len(ops)) >= limit {
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
return true, nil
}
}
}

return false, nil
}

Expand All @@ -85,7 +96,10 @@ func (os *OperationsService) GetOperationsByAccount(ctx context.Context, cursor
return ops, nil
}

func (ts *TransactionsService) GetTransactionsByAccount(ctx context.Context, cursor int64, limit uint64, accountId string) ([]common.Transaction, error) {
func (ts *TransactionsService) GetTransactionsByAccount(ctx context.Context,
cursor int64, limit uint64,
accountId string,
) ([]common.Transaction, error) {
txs := []common.Transaction{}

txsCallback := func(tx archive.LedgerTransaction, ledgerHeader *xdr.LedgerHeader) (bool, error) {
Expand All @@ -96,6 +110,7 @@ func (ts *TransactionsService) GetTransactionsByAccount(ctx context.Context, cur
TxIndex: int32(tx.Index),
NetworkPassphrase: ts.Config.Passphrase,
})

return (uint64(len(txs)) >= limit), nil
}

Expand All @@ -106,18 +121,23 @@ func (ts *TransactionsService) GetTransactionsByAccount(ctx context.Context, cur
}

func searchTxByAccount(ctx context.Context, cursor int64, accountId string, config Config, callback searchCallback) error {
nextLedger, err := getAccountNextLedgerCursor(accountId, cursor, config.IndexStore, allIndexes)
cursorMgr := NewCursorManagerForAccountActivity(config.IndexStore, accountId)
cursor, err := cursorMgr.Begin(cursor)
if err == io.EOF {
return nil
} else if err != nil {
return err
}
log.Debugf("Searching index by account %v starting at cursor %v", accountId, nextLedger)

nextLedger := getLedgerFromCursor(cursor)
log.Debugf("Searching %s for account %s starting at ledger %d",
allTransactionsIndex, accountId, nextLedger)

for {
ledger, ledgerErr := config.Archive.GetLedger(ctx, uint32(nextLedger))
ledger, ledgerErr := config.Archive.GetLedger(ctx, nextLedger)
if ledgerErr != nil {
return errors.Wrapf(ledgerErr, "ledger export state is out of sync, missing ledger %v from checkpoint %v", nextLedger, getIndexCheckpointCounter(uint32(nextLedger)))
return errors.Wrapf(ledgerErr,
"ledger export state is out of sync at ledger %d", nextLedger)
}

reader, readerErr := config.Archive.NewLedgerTransactionReaderFromLedgerCloseMeta(config.Passphrase, ledger)
Expand All @@ -140,54 +160,28 @@ func searchTxByAccount(ctx context.Context, cursor int64, accountId string, conf
}

if _, found := participants[accountId]; found {
if finished, callBackErr := callback(tx, &ledger.V0.LedgerHeader.Header); callBackErr != nil {
finished, callBackErr := callback(tx, &ledger.V0.LedgerHeader.Header)
if finished || callBackErr != nil {
return callBackErr
} else if finished {
return nil
}
}

if ctx.Err() != nil {
return ctx.Err()
}
}
nextCursor := toid.New(int32(nextLedger), 1, 1).ToInt64()
nextLedger, err = getAccountNextLedgerCursor(accountId, nextCursor, config.IndexStore, allIndexes)

cursor, err = cursorMgr.Advance()
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
if err == io.EOF {
return nil
} else if err != nil {
return err
}
}
}

// this deals in ledgers but adapts to the index model, which is currently keyed by checkpoint for now
func getAccountNextLedgerCursor(accountId string, cursor int64, store index.Store, indexName string) (uint64, error) {
nextLedger := uint32(toid.Parse(cursor).LedgerSequence + 1)

// done for performance reasons, skip reading the index for any requested ledger cursors
// only need to read the index when next cursor falls on checkpoint boundary
if !checkpointManager.IsCheckpoint(nextLedger) {
return uint64(nextLedger), nil
nextLedger = getLedgerFromCursor(cursor)
}

// the 'NextActive' index query takes a starting checkpoint, from which the index is scanned AFTER that checkpoint, non-inclusive
// use the the currrent checkpoint as the starting point since it represents up to the cursor's ledger
queryStartingCheckpoint := getIndexCheckpointCounter(nextLedger)
indexNextCheckpoint, err := store.NextActive(accountId, indexName, queryStartingCheckpoint)

if err != nil {
return 0, err
}

// return the first ledger of the next index checkpoint that had account activity after cursor
// so we need to go back 64 ledgers(one checkpoint's worth) relative to next index checkpoint
// to get first ledger, since checkpoint ledgers are the last/greatest ledger in the checkpoint range
return uint64((indexNextCheckpoint - 1) * checkpointManager.GetCheckpointFrequency()), nil
}

// derives what checkpoint this ledger would be in the index
func getIndexCheckpointCounter(ledger uint32) uint32 {
return (checkpointManager.GetCheckpoint(uint32(ledger)) /
checkpointManager.GetCheckpointFrequency()) + 1
func getLedgerFromCursor(cursor int64) uint32 {
return uint32(toid.Parse(cursor).LedgerSequence)
}