Skip to content

Commit

Permalink
[chore] prometheusreceiver: reduce number of allocations for getKey (#…
Browse files Browse the repository at this point in the history
…16142)

Signed-off-by: Bogdan Drutu <[email protected]>

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Nov 7, 2022
1 parent a547ad9 commit 5c0438f
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 26 deletions.
22 changes: 7 additions & 15 deletions receiver/prometheusreceiver/internal/metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type metricFamily struct {
// a couple data complexValue (buckets and count/sum), a group of a metric family always share a same set of tags. for
// simple types like counter and gauge, each data point is a group of itself
type metricGroup struct {
family *metricFamily
mtype pmetric.MetricType
ts int64
ls labels.Labels
count float64
Expand Down Expand Up @@ -87,12 +87,6 @@ func (mf *metricFamily) includesMetric(metricName string) bool {
return metricName == mf.name
}

func (mf *metricFamily) getGroupKey(ls labels.Labels) uint64 {
bytes := make([]byte, 0, 2048)
hash, _ := ls.HashWithoutLabels(bytes, getSortedNotUsefulLabels(mf.mtype)...)
return hash
}

func (mg *metricGroup) sortPoints() {
sort.Slice(mg.complexValue, func(i, j int) bool {
return mg.complexValue[i].boundary < mg.complexValue[j].boundary
Expand Down Expand Up @@ -209,7 +203,7 @@ func (mg *metricGroup) toNumberDataPoint(dest pmetric.NumberDataPointSlice) {
tsNanos := timestampFromMs(mg.ts)
point := dest.AppendEmpty()
// gauge/undefined types have no start time.
if mg.family.mtype == pmetric.MetricTypeSum {
if mg.mtype == pmetric.MetricTypeSum {
point.SetStartTimestamp(tsNanos) // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
}
point.SetTimestamp(tsNanos)
Expand Down Expand Up @@ -245,7 +239,7 @@ func (mf *metricFamily) loadMetricGroupOrCreate(groupKey uint64, ls labels.Label
mg, ok := mf.groups[groupKey]
if !ok {
mg = &metricGroup{
family: mf,
mtype: mf.mtype,
ts: ts,
ls: ls,
exemplars: pmetric.NewExemplarSlice(),
Expand All @@ -257,9 +251,8 @@ func (mf *metricFamily) loadMetricGroupOrCreate(groupKey uint64, ls labels.Label
return mg
}

func (mf *metricFamily) Add(metricName string, ls labels.Labels, t int64, v float64) error {
groupKey := mf.getGroupKey(ls)
mg := mf.loadMetricGroupOrCreate(groupKey, ls, t)
func (mf *metricFamily) addSeries(seriesRef uint64, metricName string, ls labels.Labels, t int64, v float64) error {
mg := mf.loadMetricGroupOrCreate(seriesRef, ls, t)
if mg.ts != t {
return fmt.Errorf("inconsistent timestamps on metric points for metric %v", metricName)
}
Expand Down Expand Up @@ -340,9 +333,8 @@ func (mf *metricFamily) appendMetric(metrics pmetric.MetricSlice) {
metric.MoveTo(metrics.AppendEmpty())
}

func (mf *metricFamily) addExemplar(l labels.Labels, e exemplar.Exemplar) {
gk := mf.getGroupKey(l)
mg := mf.groups[gk]
func (mf *metricFamily) addExemplar(seriesRef uint64, e exemplar.Exemplar) {
mg := mf.groups[seriesRef]
if mg == nil {
return
}
Expand Down
17 changes: 8 additions & 9 deletions receiver/prometheusreceiver/internal/metricfamily_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) {
} else {
lbls = tt.labels.Copy()
}
err := mp.Add(tv.metric, lbls, tv.at, tv.value)
sRef, _ := getSeriesRef(nil, lbls, mp.mtype)
err := mp.addSeries(sRef, tv.metric, lbls, tv.at, tv.value)
if tt.wantErr {
if i != 0 {
require.Error(t, err)
Expand All @@ -202,8 +203,6 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) {
}

require.Len(t, mp.groups, 1)
groupKey := mp.getGroupKey(tt.labels.Copy())
require.NotNil(t, mp.groups[groupKey])

sl := pmetric.NewMetricSlice()
mp.appendMetric(sl)
Expand Down Expand Up @@ -400,7 +399,9 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
mp := newMetricFamily(tt.name, mc, zap.NewNop())
for _, lbs := range tt.labelsScrapes {
for i, scrape := range lbs.scrapes {
err := mp.Add(scrape.metric, lbs.labels.Copy(), scrape.at, scrape.value)
lb := lbs.labels.Copy()
sRef, _ := getSeriesRef(nil, lb, mp.mtype)
err := mp.addSeries(sRef, scrape.metric, lb, scrape.at, scrape.value)
if tt.wantErr {
// The first scrape won't have an error
if i != 0 {
Expand All @@ -417,8 +418,6 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
}

require.Len(t, mp.groups, 1)
groupKey := mp.getGroupKey(tt.labelsScrapes[0].labels.Copy())
require.NotNil(t, mp.groups[groupKey])

sl := pmetric.NewMetricSlice()
mp.appendMetric(sl)
Expand Down Expand Up @@ -496,12 +495,12 @@ func TestMetricGroupData_toNumberDataUnitTest(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
mp := newMetricFamily(tt.metricKind, mc, zap.NewNop())
for _, tv := range tt.scrapes {
require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
lb := tt.labels.Copy()
sRef, _ := getSeriesRef(nil, lb, mp.mtype)
require.NoError(t, mp.addSeries(sRef, tv.metric, lb, tv.at, tv.value))
}

require.Len(t, mp.groups, 1)
groupKey := mp.getGroupKey(tt.labels.Copy())
require.NotNil(t, mp.groups[groupKey])

sl := pmetric.NewMetricSlice()
mp.appendMetric(sl)
Expand Down
17 changes: 15 additions & 2 deletions receiver/prometheusreceiver/internal/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type transaction struct {
logger *zap.Logger
metricAdjuster MetricsAdjuster
obsrecv *obsreport.Receiver
// Used as buffer to calculate series ref hash.
bufBytes []byte
}

func newTransaction(
Expand All @@ -68,6 +70,7 @@ func newTransaction(
externalLabels: externalLabels,
logger: settings.Logger,
obsrecv: obsrecv,
bufBytes: make([]byte, 0, 1024),
}
}

Expand Down Expand Up @@ -126,7 +129,7 @@ func (t *transaction) Append(ref storage.SeriesRef, ls labels.Labels, atMs int64

curMF := t.getOrCreateMetricFamily(metricName)

return 0, curMF.Add(metricName, ls, atMs, val)
return 0, curMF.addSeries(t.getSeriesRef(ls, curMF.mtype), metricName, ls, atMs, val)
}

func (t *transaction) getOrCreateMetricFamily(mn string) *metricFamily {
Expand Down Expand Up @@ -171,11 +174,17 @@ func (t *transaction) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e e
}

mf := t.getOrCreateMetricFamily(mn)
mf.addExemplar(l, e)
mf.addExemplar(t.getSeriesRef(l, mf.mtype), e)

return 0, nil
}

func (t *transaction) getSeriesRef(ls labels.Labels, mtype pmetric.MetricType) uint64 {
var hash uint64
hash, t.bufBytes = getSeriesRef(t.bufBytes, ls, mtype)
return hash
}

// getMetrics returns all metrics to the given slice.
// The only error returned by this function is errNoDataToBuild.
func (t *transaction) getMetrics(resource pcommon.Resource) (pmetric.Metrics, error) {
Expand Down Expand Up @@ -263,3 +272,7 @@ func (t *transaction) AddTargetInfo(labels labels.Labels) error {

return nil
}

func getSeriesRef(bytes []byte, ls labels.Labels, mtype pmetric.MetricType) (uint64, []byte) {
return ls.HashWithoutLabels(bytes, getSortedNotUsefulLabels(mtype)...)
}

0 comments on commit 5c0438f

Please sign in to comment.