Skip to content

Commit

Permalink
cortex/querier: fix analysis merging (thanos-io#7179)
Browse files Browse the repository at this point in the history
We were not merging analysis properly - mergo was overwriting data.
Instead of using a whole library for this, just write two small
functions and use them. Add test to cover this.

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS committed Mar 5, 2024
1 parent 4166776 commit c06d55d
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 35 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ require (
)

require (
dario.cat/mergo v1.0.0
github.com/mitchellh/go-ps v1.0.0
github.com/onsi/gomega v1.27.10
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -603,8 +603,6 @@ cloud.google.com/go/workflows v1.7.0/go.mod h1:JhSrZuVZWuiDfKEFxU0/F1PQjmpnpcoIS
cloud.google.com/go/workflows v1.8.0/go.mod h1:ysGhmEajwZxGn1OhGOGKsTXc5PyxOc0vfKf5Af+to4M=
cloud.google.com/go/workflows v1.9.0/go.mod h1:ZGkj1aFIOd9c8Gerkjjq7OW7I5+l6cSvT3ujaO/WwSA=
cloud.google.com/go/workflows v1.10.0/go.mod h1:fZ8LmRmZQWacon9UCX1r/g/DfAXx5VcPALq2CxzdePw=
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8=
git.sr.ht/~sbinet/gg v0.3.1/go.mod h1:KGYtlADtqsqANL9ueOFkWymvzUvLMQllU5Ixo+8v3pc=
Expand Down
53 changes: 31 additions & 22 deletions internal/cortex/querier/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,12 @@ import (
"math"
"net/http"
"net/url"
"reflect"
"sort"
"strconv"
"strings"
"time"
"unsafe"

"dario.cat/mergo"
"github.com/gogo/protobuf/proto"
github_com_gogo_protobuf_types "github.com/gogo/protobuf/types"
"github.com/gogo/status"
Expand Down Expand Up @@ -211,22 +209,38 @@ func NewEmptyPrometheusInstantQueryResponse() *PrometheusInstantQueryResponse {
}
}

type TimeDurationTransformer struct{}
func traverseAnalysis(a *Analysis, results *[]*Analysis) {
if a == nil {
return
}

func (t TimeDurationTransformer) Transformer(typ reflect.Type) func(dst, src reflect.Value) error {
if typ == reflect.TypeOf(Duration(0)) {
return func(dst, src reflect.Value) error {
if dst.CanSet() {
d := dst.Interface().(Duration)
s := src.Interface().(Duration)
*results = append(*results, a)

merged := d + s
dst.Set(reflect.ValueOf(merged))
}
return nil
for _, ch := range a.Children {
traverseAnalysis(ch, results)
}
}

func AnalyzesMerge(analysis ...*Analysis) *Analysis {
if len(analysis) == 0 {
return &Analysis{}
}

root := analysis[0]

var rootElements []*Analysis
traverseAnalysis(root, &rootElements)

for _, a := range analysis[1:] {
var elements []*Analysis
traverseAnalysis(a, &elements)

for i := 0; i < len(elements) && i < len(rootElements); i++ {
rootElements[i].ExecutionTime += analysis[i].ExecutionTime
}
}
return nil

return root
}

func (prometheusCodec) MergeResponse(_ Request, responses ...Response) (Response, error) {
Expand All @@ -246,18 +260,13 @@ func (prometheusCodec) MergeResponse(_ Request, responses ...Response) (Response
// Merge the responses.
sort.Sort(byFirstTime(promResponses))

var analysis Analysis
analyzes := make([]*Analysis, 0, len(responses))
for i := range promResponses {
if promResponses[i].Data.GetAnalysis() == nil {
continue
}

if err := mergo.Merge(&analysis,
promResponses[i].Data.GetAnalysis(),
mergo.WithTransformers(TimeDurationTransformer{}),
); err != nil {
return nil, err
}
analyzes = append(analyzes, promResponses[i].Data.GetAnalysis())
}

response := PrometheusResponse{
Expand All @@ -266,7 +275,7 @@ func (prometheusCodec) MergeResponse(_ Request, responses ...Response) (Response
ResultType: model.ValMatrix.String(),
Result: matrixMerge(promResponses),
Stats: StatsMerge(responses),
Analysis: &analysis,
Analysis: AnalyzesMerge(analyzes...),
},
}

Expand Down
66 changes: 66 additions & 0 deletions internal/cortex/querier/queryrange/query_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,72 @@ func TestMergeAPIResponses(t *testing.T) {
},
},

{
name: "Basic merging of two responses with nested analysis trees.",
input: []Response{
&PrometheusResponse{
Data: PrometheusData{
ResultType: matrix,
Analysis: &Analysis{
Name: "foo",
Children: []*Analysis{{Name: "bar", ExecutionTime: Duration(1 * time.Second)}},
ExecutionTime: Duration(1 * time.Second),
},
Result: []SampleStream{
{
Labels: []cortexpb.LabelAdapter{},
Samples: []cortexpb.Sample{
{Value: 0, TimestampMs: 0},
{Value: 1, TimestampMs: 1},
},
},
},
},
},
&PrometheusResponse{
Data: PrometheusData{
ResultType: matrix,
Analysis: &Analysis{
Name: "foo",
Children: []*Analysis{{Name: "bar", ExecutionTime: Duration(1 * time.Second)}},
ExecutionTime: Duration(1 * time.Second),
},
Result: []SampleStream{
{
Labels: []cortexpb.LabelAdapter{},
Samples: []cortexpb.Sample{
{Value: 2, TimestampMs: 2},
{Value: 3, TimestampMs: 3},
},
},
},
},
},
},
expected: &PrometheusResponse{
Status: StatusSuccess,
Data: PrometheusData{
ResultType: matrix,
Analysis: &Analysis{
Name: "foo",
Children: []*Analysis{{Name: "bar", ExecutionTime: Duration(2 * time.Second)}},
ExecutionTime: Duration(2 * time.Second),
},
Result: []SampleStream{
{
Labels: []cortexpb.LabelAdapter{},
Samples: []cortexpb.Sample{
{Value: 0, TimestampMs: 0},
{Value: 1, TimestampMs: 1},
{Value: 2, TimestampMs: 2},
{Value: 3, TimestampMs: 3},
},
},
},
},
},
},

{
name: "Merging of responses when labels are in different order.",
input: []Response{
Expand Down
14 changes: 4 additions & 10 deletions pkg/queryfrontend/queryinstant_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"strconv"
"strings"

"dario.cat/mergo"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -55,18 +54,13 @@ func (c queryInstantCodec) MergeResponse(req queryrange.Request, responses ...qu
promResponses = append(promResponses, resp.(*queryrange.PrometheusInstantQueryResponse))
}

var analysis queryrange.Analysis
var analyzes []*queryrange.Analysis
for i := range promResponses {
if promResponses[i].Data.GetAnalysis() == nil {
continue
}

if err := mergo.Merge(&analysis,
promResponses[i].Data.GetAnalysis(),
mergo.WithTransformers(queryrange.TimeDurationTransformer{}),
); err != nil {
return nil, err
}
analyzes = append(analyzes, promResponses[i].Data.GetAnalysis())
}

var res queryrange.Response
Expand All @@ -81,7 +75,7 @@ func (c queryInstantCodec) MergeResponse(req queryrange.Request, responses ...qu
Matrix: matrixMerge(promResponses),
},
},
Analysis: &analysis,
Analysis: queryrange.AnalyzesMerge(analyzes...),
Stats: queryrange.StatsMerge(responses),
},
}
Expand All @@ -99,7 +93,7 @@ func (c queryInstantCodec) MergeResponse(req queryrange.Request, responses ...qu
Vector: v,
},
},
Analysis: &analysis,
Analysis: queryrange.AnalyzesMerge(analyzes...),
Stats: queryrange.StatsMerge(responses),
},
}
Expand Down

0 comments on commit c06d55d

Please sign in to comment.