Skip to content

Commit

Permalink
services/horizon/internal/db2/history: Implement StreamAllOffers usin…
Browse files Browse the repository at this point in the history
…g batches (#4397)

The query to get all rows in the offers table currently takes more than 10 seconds to execute.
Given that the horizon request timeout is 10 seconds, we need to reimplement StreamAllOffers to use batching.
  • Loading branch information
tamirms authored May 24, 2022
1 parent daeecfc commit 5a0686a
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 15 deletions.
41 changes: 34 additions & 7 deletions services/horizon/internal/db2/history/offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package history

import (
"context"
"database/sql"

sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"

"github.com/stellar/go/support/errors"
)

const offersBatchSize = 50000

// QOffers defines offer related queries.
type QOffers interface {
StreamAllOffers(ctx context.Context, callback func(Offer) error) error
Expand Down Expand Up @@ -83,28 +86,52 @@ func (q *Q) GetOffers(ctx context.Context, query OffersQuery) ([]Offer, error) {

// StreamAllOffers loads all non deleted offers
func (q *Q) StreamAllOffers(ctx context.Context, callback func(Offer) error) error {
if tx := q.GetTx(); tx == nil {
return errors.New("cannot be called outside of a transaction")
}
if opts := q.GetTxOptions(); opts == nil || !opts.ReadOnly || opts.Isolation != sql.LevelRepeatableRead {
return errors.New("should only be called in a repeatable read transaction")
}

lastID := int64(0)
for {
nextID, err := q.streamAllOffersBatch(ctx, lastID, offersBatchSize, callback)
if err != nil {
return err
}
if lastID == nextID {
return nil
}
lastID = nextID
}
}

func (q *Q) streamAllOffersBatch(ctx context.Context, lastId int64, limit uint64, callback func(Offer) error) (int64, error) {
var rows *sqlx.Rows
var err error

if rows, err = q.Query(ctx, selectOffers.Where("deleted = ?", false)); err != nil {
return errors.Wrap(err, "could not run all offers select query")
rows, err = q.Query(ctx, selectOffers.
Where("deleted = ?", false).
Where("offer_id > ? ", lastId).
OrderBy("offer_id asc").Limit(limit))
if err != nil {
return 0, errors.Wrap(err, "could not run all offers select query")
}

defer rows.Close()

for rows.Next() {
offer := Offer{}
if err = rows.StructScan(&offer); err != nil {
return errors.Wrap(err, "could not scan row into offer struct")
return 0, errors.Wrap(err, "could not scan row into offer struct")
}

if err = callback(offer); err != nil {
return err
return 0, err
}
lastId = offer.OfferID
}

return rows.Err()

return lastId, rows.Err()
}

// GetUpdatedOffers returns all offers created, updated, or deleted after the given ledger sequence.
Expand Down
45 changes: 38 additions & 7 deletions services/horizon/internal/db2/history/offers_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package history

import (
"context"
"database/sql"
"github.com/stretchr/testify/assert"
"strconv"
"testing"

Expand Down Expand Up @@ -105,14 +108,42 @@ func TestGetNonExistentOfferByID(t *testing.T) {
tt.Assert.True(q.NoRows(err))
}

func streamAllOffersInTx(q *Q, ctx context.Context, f func(offer Offer) error) error {
err := q.BeginTx(&sql.TxOptions{ReadOnly: true, Isolation: sql.LevelRepeatableRead})
if err != nil {
return err
}
defer q.Rollback()
return q.StreamAllOffers(ctx, f)
}

func TestStreamAllOffersRequiresTx(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

err := q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
return nil
})
assert.EqualError(t, err, "cannot be called outside of a transaction")

assert.NoError(t, q.Begin())
defer q.Rollback()
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
return nil
})
assert.EqualError(t, err, "should only be called in a repeatable read transaction")
}

func TestQueryEmptyOffers(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

var offers []Offer
err := q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
err := streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
Expand Down Expand Up @@ -150,7 +181,7 @@ func TestInsertOffers(t *testing.T) {
tt.Assert.NoError(err)

var offers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
Expand Down Expand Up @@ -183,7 +214,7 @@ func TestInsertOffers(t *testing.T) {
tt.Assert.Equal(3, afterCompactionCount)

var afterCompactionOffers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error {
afterCompactionOffers = append(afterCompactionOffers, offer)
return nil
})
Expand All @@ -201,7 +232,7 @@ func TestUpdateOffer(t *testing.T) {
tt.Assert.NoError(err)

var offers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
Expand Down Expand Up @@ -229,7 +260,7 @@ func TestUpdateOffer(t *testing.T) {
tt.Assert.NoError(err)

offers = nil
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
Expand All @@ -256,7 +287,7 @@ func TestRemoveOffer(t *testing.T) {
err := insertOffer(tt, q, eurOffer)
tt.Assert.NoError(err)
var offers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
Expand All @@ -274,7 +305,7 @@ func TestRemoveOffer(t *testing.T) {
expectedUpdates[0].Deleted = true

offers = nil
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
Expand Down
1 change: 0 additions & 1 deletion services/horizon/internal/ingest/orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ func (o *OrderBookStream) update(ctx context.Context, status ingestionStatus) (b
o.graph.AddOffers(offerToXDR(offer))
return nil
})

if err != nil {
return true, errors.Wrap(err, "Error loading offers into orderbook")
}
Expand Down

0 comments on commit 5a0686a

Please sign in to comment.