Skip to content

Commit

Permalink
Implement query sample statistics in promql interface
Browse files Browse the repository at this point in the history
Signed-off-by: Pedro Tanaka <[email protected]>
  • Loading branch information
pedro-stanaka committed May 23, 2024
1 parent c4b3f05 commit 7b7aa38
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 5 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -290,4 +290,4 @@ replace (
k8s.io/klog/v2 => github.com/simonpasquier/klog-gokit/v3 v3.0.0
)

replace github.com/thanos-io/promql-engine => github.com/pedro-stanaka/promql-engine v0.0.0-20240515101825-a418ffe88a7d
replace github.com/thanos-io/promql-engine => github.com/pedro-stanaka/promql-engine v0.0.0-20240515145052-82c65aee0d7b
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1386,8 +1386,8 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pedro-stanaka/promql-engine v0.0.0-20240515101825-a418ffe88a7d h1:ppcJWGo/RxQRtNAHbpNSiSZcesNWPPraN76UoWWM0vk=
github.com/pedro-stanaka/promql-engine v0.0.0-20240515101825-a418ffe88a7d/go.mod h1:FEPnabuTql1bDA4OUM41mwcZOJ20R436k8vq+xtGEG0=
github.com/pedro-stanaka/promql-engine v0.0.0-20240515145052-82c65aee0d7b h1:p+zTMsyzklnCF6UB2fhQ83xDH11FwAFQuYIbS4nCid8=
github.com/pedro-stanaka/promql-engine v0.0.0-20240515145052-82c65aee0d7b/go.mod h1:FEPnabuTql1bDA4OUM41mwcZOJ20R436k8vq+xtGEG0=
github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac=
github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY=
github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI=
Expand Down
53 changes: 51 additions & 2 deletions pkg/query/remote_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,24 @@ type Client struct {
tsdbInfos infopb.TSDBInfos
}

type remoteEnginePhases int

const (
remoteExecutionPhase remoteEnginePhases = iota
remoteSeriesPhase
)

func (r remoteEnginePhases) String() string {
switch r {
case remoteExecutionPhase:
return "execution"
case remoteSeriesPhase:
return "series"
default:
return "unknown"
}
}

// NewClient creates a new Client.
func NewClient(queryClient querypb.QueryClient, address string, tsdbInfos infopb.TSDBInfos) Client {
return Client{
Expand Down Expand Up @@ -221,11 +239,25 @@ type remoteQuery struct {
interval time.Duration
remoteAddr string

samplesStats *stats.QuerySamples

cancel context.CancelFunc
}

func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
start := time.Now()
defer func() {
keys := []any{
"msg", "Executed query",
"query", r.plan.String(),
"time", time.Since(start),
}
if r.samplesStats != nil {
keys = append(keys, "peak_samples", r.samplesStats.PeakSamples)
keys = append(keys, "total_samples", r.samplesStats.TotalSamples)
}
level.Debug(r.logger).Log(keys...)
}()

qctx, cancel := context.WithCancel(ctx)
r.cancel = cancel
Expand All @@ -252,6 +284,8 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
level.Warn(r.logger).Log("msg", "Failed to encode query plan", "err", err)
}

r.samplesStats = stats.NewQuerySamples(false)

// Instant query.
if r.start == r.end {
request := &querypb.QueryRequest{
Expand All @@ -275,6 +309,7 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
result = make(promql.Vector, 0)
warnings annotations.Annotations
builder = labels.NewScratchBuilder(8)
qryStats *querypb.QueryStats
)
for {
msg, err := qry.Recv()
Expand All @@ -290,6 +325,7 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
continue
}

qryStats = msg.Stats
ts := msg.GetTimeseries()
builder.Reset()
for _, l := range ts.Labels {
Expand All @@ -304,6 +340,10 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
result = append(result, promql.Sample{Metric: builder.Labels(), F: ts.Samples[0].Value, T: r.start.UnixMilli()})
}
}
if qryStats != nil {
r.samplesStats.UpdatePeak(int(qryStats.PeakSamples))
r.samplesStats.TotalSamples = qryStats.SamplesTotal
}

return &promql.Result{
Value: result,
Expand Down Expand Up @@ -334,6 +374,7 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
result = make(promql.Matrix, 0)
warnings annotations.Annotations
builder = labels.NewScratchBuilder(8)
qryStats *querypb.QueryStats
)
for {
msg, err := qry.Recv()
Expand All @@ -353,6 +394,7 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
if ts == nil {
continue
}
qryStats = msg.Stats
builder.Reset()
for _, l := range ts.Labels {
builder.Add(strings.Clone(l.Name), strings.Clone(l.Value))
Expand All @@ -376,7 +418,10 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
}
result = append(result, series)
}
level.Debug(r.logger).Log("msg", "Executed query", "query", r.plan.String(), "time", time.Since(start))
if qryStats != nil {
r.samplesStats.UpdatePeak(int(qryStats.PeakSamples))
r.samplesStats.TotalSamples = qryStats.SamplesTotal
}

return &promql.Result{Value: result, Warnings: warnings}
}
Expand All @@ -385,7 +430,11 @@ func (r *remoteQuery) Close() { r.Cancel() }

func (r *remoteQuery) Statement() parser.Statement { return nil }

func (r *remoteQuery) Stats() *stats.Statistics { return nil }
func (r *remoteQuery) Stats() *stats.Statistics {
return &stats.Statistics{
Samples: r.samplesStats,
}
}

func (r *remoteQuery) Cancel() {
if r.cancel != nil {
Expand Down

0 comments on commit 7b7aa38

Please sign in to comment.