Skip to content

Commit

Permalink
Refactor trade aggregation query. (#4389)
Browse files Browse the repository at this point in the history
  • Loading branch information
erika-sdf committed May 20, 2022
1 parent e96fc25 commit daeecfc
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 26 deletions.
1 change: 1 addition & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ file. This project adheres to [Semantic Versioning](http:https://semver.org/).
## Unreleased

- Querying claimable balances has been optimized ([4385](https://github.com/stellar/go/pull/4385)).
- Querying trade aggregations has been optimized ([4389](https://github.com/stellar/go/pull/4389)).

## V2.17.0

Expand Down
103 changes: 77 additions & 26 deletions services/horizon/internal/db2/history/trade_aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type TradeAggregation struct {
CloseD int64 `db:"close_d"`
}

const HistoryTradesTableName = "history_trades_60000"

// TradeAggregationsQ is a helper struct to aid in configuring queries to
// bucket and aggregate trades
type TradeAggregationsQ struct {
Expand Down Expand Up @@ -123,51 +125,100 @@ func (q *TradeAggregationsQ) WithEndTime(endTime strtime.Millis) (*TradeAggregat
}
}

// GetSql generates a sql statement to aggregate Trades based on given parameters
func (q *TradeAggregationsQ) GetSql() sq.SelectBuilder {
var orderPreserved bool
orderPreserved, q.baseAssetID, q.counterAssetID = getCanonicalAssetOrder(q.baseAssetID, q.counterAssetID)

var bucketSQL sq.SelectBuilder
func (q *TradeAggregationsQ) getRawTradesSql(orderPreserved bool) sq.SelectBuilder {
var rawTradesSQL sq.SelectBuilder
if orderPreserved {
bucketSQL = bucketTrades(q.resolution, q.offset)
rawTradesSQL = bucketTrades(q.resolution, q.offset)
} else {
bucketSQL = reverseBucketTrades(q.resolution, q.offset)
rawTradesSQL = reverseBucketTrades(q.resolution, q.offset)
}

bucketSQL = bucketSQL.From("history_trades_60000").
rawTradesSQL = rawTradesSQL.
Join("timestamp_range r ON 1=1").
From(fmt.Sprintf("%s AS tr", HistoryTradesTableName)).
Where(sq.Eq{"base_asset_id": q.baseAssetID, "counter_asset_id": q.counterAssetID})

//adjust time range and apply time filters
bucketSQL = bucketSQL.Where(sq.GtOrEq{"timestamp": q.startTime})
if !q.endTime.IsNil() {
bucketSQL = bucketSQL.Where(sq.Lt{"timestamp": q.endTime})
}
bucketTs := formatBucketTimestamp(q.resolution, q.offset, "tr")
rawTradesSQL = rawTradesSQL.
Where(fmt.Sprintf("r.max_ts >= %s", bucketTs)).
Where(fmt.Sprintf("r.min_ts <= %s", bucketTs))

if q.resolution != 60000 {
//ensure open/close order for cases when multiple trades occur in the same ledger
bucketSQL = bucketSQL.OrderBy("timestamp ASC", "open_ledger_toid ASC")
rawTradesSQL = rawTradesSQL.OrderBy("timestamp ASC", "open_ledger_toid ASC")
// Do on-the-fly aggregation for higher resolutions.
bucketSQL = aggregate(bucketSQL)
}
return rawTradesSQL
}

// GetSql generates a sql statement to aggregate Trades based on given parameters
func (q *TradeAggregationsQ) GetSql() sq.SelectBuilder {
var orderPreserved bool
orderPreserved, q.baseAssetID, q.counterAssetID = getCanonicalAssetOrder(q.baseAssetID, q.counterAssetID)

return bucketSQL.
bucketSQL := aggregate("raw_trades").
Limit(q.pagingParams.Limit).
OrderBy("timestamp " + q.pagingParams.Order)
OrderBy("timestamp "+q.pagingParams.Order).
Prefix("WITH last_range_ts AS (?),",
lastRangeTs(
q.baseAssetID, q.counterAssetID, q.resolution, q.offset, q.startTime, q.endTime,
q.pagingParams.Order, q.pagingParams.Limit)).
Prefix("timestamp_range AS (?),",
timestampRange()).
Prefix("raw_trades AS (?)",
q.getRawTradesSql(orderPreserved))

return bucketSQL
}

// formatBucketTimestampSelect formats a sql select clause for a bucketed timestamp, based on given resolution
// formatBucketTimestamp formats a sql select clause for a bucketed timestamp, based on given resolution
// and the offset. Given a time t, it gives it a timestamp defined by
// f(t) = ((t - offset)/resolution)*resolution + offset.
func formatBucketTimestampSelect(resolution int64, offset int64) string {
return fmt.Sprintf("((timestamp - %d) / %d) * %d + %d as timestamp", offset, resolution, resolution, offset)
func formatBucketTimestamp(resolution int64, offset int64, tsPrefix string) string {
prefix := ""
if len(tsPrefix) > 0 {
prefix = fmt.Sprintf("%s.", tsPrefix)
}
return fmt.Sprintf("((%stimestamp - %d) / %d) * %d + %d", prefix, offset, resolution, resolution, offset)
}

func formatBucketTimestampSelect(resolution int64, offset int64, tsPrefix string) string {
return fmt.Sprintf("%s AS timestamp", formatBucketTimestamp(resolution, offset, tsPrefix))
}

func lastRangeTs(baseAssetID, counterAssetID, resolution, offset int64, startTime, endTime strtime.Millis, order string, limit uint64) sq.SelectBuilder {
s := sq.Select(
formatBucketTimestampSelect(resolution, offset, ""),
).From(
HistoryTradesTableName,
).Where(
sq.Eq{"base_asset_id": baseAssetID, "counter_asset_id": counterAssetID},
).Where(sq.GtOrEq{"timestamp": startTime})
if !endTime.IsNil() {
s = s.Where(sq.Lt{"timestamp": endTime})
}
return s.GroupBy(
formatBucketTimestamp(resolution, offset, ""),
).OrderBy(
fmt.Sprintf("%s %s", formatBucketTimestamp(resolution, offset, ""), order),
).Suffix(
fmt.Sprintf("FETCH FIRST %d ROWS ONLY", limit),
)
}

func timestampRange() sq.SelectBuilder {
return sq.Select(
"min(timestamp) as min_ts",
"max(timestamp) as max_ts",
).From("last_range_ts")
}

// bucketTrades generates a select statement to filter rows from the `history_trades` table in
// a compact form, with a timestamp rounded to resolution and reversed base/counter.
func bucketTrades(resolution int64, offset int64) sq.SelectBuilder {
return sq.Select(
formatBucketTimestampSelect(resolution, offset),
formatBucketTimestampSelect(resolution, offset, "tr"),
"count",
"base_volume",
"counter_volume",
Expand All @@ -187,7 +238,7 @@ func bucketTrades(resolution int64, offset int64) sq.SelectBuilder {
// a compact form, with a timestamp rounded to resolution and reversed base/counter.
func reverseBucketTrades(resolution int64, offset int64) sq.SelectBuilder {
return sq.Select(
formatBucketTimestampSelect(resolution, offset),
formatBucketTimestampSelect(resolution, offset, "tr"),
"count",
"base_volume as counter_volume",
"counter_volume as base_volume",
Expand All @@ -203,7 +254,7 @@ func reverseBucketTrades(resolution int64, offset int64) sq.SelectBuilder {
)
}

func aggregate(query sq.SelectBuilder) sq.SelectBuilder {
func aggregate(rawTradesTable string) sq.SelectBuilder {
return sq.Select(
"timestamp",
"sum(\"count\") as count",
Expand All @@ -218,7 +269,7 @@ func aggregate(query sq.SelectBuilder) sq.SelectBuilder {
"(first(ARRAY[open_n, open_d]))[2] as open_d",
"(last(ARRAY[close_n, close_d]))[1] as close_n",
"(last(ARRAY[close_n, close_d]))[2] as close_d",
).FromSelect(query, "htrd").GroupBy("timestamp")
).From(rawTradesTable).GroupBy("timestamp")
}

// RebuildTradeAggregationTimes rebuilds a specific set of trade aggregation
Expand All @@ -228,7 +279,7 @@ func (q Q) RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Mi
from = from.RoundDown(60_000)
to = to.RoundDown(60_000)
// Clear out the old bucket values.
_, err := q.Exec(ctx, sq.Delete("history_trades_60000").Where(
_, err := q.Exec(ctx, sq.Delete(HistoryTradesTableName).Where(
sq.GtOrEq{"timestamp": from},
).Where(
sq.LtOrEq{"timestamp": to},
Expand Down Expand Up @@ -278,7 +329,7 @@ func (q Q) RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Mi
).FromSelect(trades, "trades").GroupBy("base_asset_id", "counter_asset_id", "timestamp")

// Insert the new bucket values.
_, err = q.Exec(ctx, sq.Insert("history_trades_60000").Select(rebuilt))
_, err = q.Exec(ctx, sq.Insert(HistoryTradesTableName).Select(rebuilt))
if err != nil {
return errors.Wrap(err, "could not rebuild trade aggregation bucket")
}
Expand Down

0 comments on commit daeecfc

Please sign in to comment.