Skip to content

Commit

Permalink
k8sclusterreceiver: Set datapoint timestamp in receiver (#693)
Browse files Browse the repository at this point in the history
  • Loading branch information
asuresh4 committed Aug 13, 2020
1 parent 4533aa7 commit b1dbdb5
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 9 deletions.
5 changes: 3 additions & 2 deletions receiver/k8sclusterreceiver/collection/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package collection

import (
"reflect"
"time"

"go.opentelemetry.io/collector/consumer/consumerdata"
"go.uber.org/zap"
Expand Down Expand Up @@ -111,8 +112,8 @@ func (dc *DataCollector) UpdateMetricsStore(obj interface{}, rm []*resourceMetri
}
}

func (dc *DataCollector) CollectMetricData() []consumerdata.MetricsData {
return dc.metricsStore.getMetricData()
func (dc *DataCollector) CollectMetricData(currentTime time.Time) []consumerdata.MetricsData {
return dc.metricsStore.getMetricData(currentTime)
}

// SyncMetrics updates the metric store with latest metrics from the kubernetes object.
Expand Down
28 changes: 26 additions & 2 deletions receiver/k8sclusterreceiver/collection/metricsstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ package collection

import (
"sync"
"time"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
"go.opentelemetry.io/collector/consumer/consumerdata"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -77,15 +80,36 @@ func (ms *metricsStore) remove(obj runtime.Object) error {
}

// getMetricData returns metricsCache stored in the cache at a given point in time.
func (ms *metricsStore) getMetricData() []consumerdata.MetricsData {
func (ms *metricsStore) getMetricData(currentTime time.Time) []consumerdata.MetricsData {
ms.RLock()
defer ms.RUnlock()

var out []consumerdata.MetricsData

for _, mds := range ms.metricsCache {
out = append(out, mds...)
for _, md := range mds {
// Set datapoint timestamp to be time of retrieval from cache.
applyCurrentTime(md.Metrics, currentTime)
out = append(out, md)
}
}

return out
}

func applyCurrentTime(metrics []*metricspb.Metric, t time.Time) []*metricspb.Metric {
currentTime := timestampProto(t)
for _, metric := range metrics {
if metric != nil {
for i := range metric.Timeseries {
metric.Timeseries[i].Points[0].Timestamp = currentTime
}
}
}
return metrics
}

func timestampProto(t time.Time) *timestamp.Timestamp {
out, _ := ptypes.TimestampProto(t)
return out
}
5 changes: 3 additions & 2 deletions receiver/k8sclusterreceiver/collection/metricsstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package collection

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/consumerdata"
Expand Down Expand Up @@ -59,7 +60,7 @@ func TestMetricsStoreOperations(t *testing.T) {
require.True(t, len(ms.metricsCache[u.id]) == len(u.rm))
expectedMetricData += len(u.rm)
}
require.Equal(t, expectedMetricData, len(ms.getMetricData()))
require.Equal(t, expectedMetricData, len(ms.getMetricData(time.Now())))

// Remove non existent item
ms.remove(&corev1.Pod{
Expand All @@ -77,6 +78,6 @@ func TestMetricsStoreOperations(t *testing.T) {
})
expectedMetricData -= len(updates[1].rm)
require.Equal(t, len(updates)-1, len(ms.metricsCache))
require.Equal(t, expectedMetricData, len(ms.getMetricData()))
require.Equal(t, expectedMetricData, len(ms.getMetricData(time.Now())))

}
1 change: 1 addition & 0 deletions receiver/k8sclusterreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.14

require (
github.com/census-instrumentation/opencensus-proto v0.3.0
github.com/golang/protobuf v1.4.2
github.com/iancoleman/strcase v0.0.0-20171129010253-3de563c3dc08
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.0.0-00010101000000-000000000000
github.com/stretchr/testify v1.6.1
Expand Down
3 changes: 2 additions & 1 deletion receiver/k8sclusterreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func (kr *kubernetesReceiver) Shutdown(context.Context) error {
}

func (kr *kubernetesReceiver) dispatchMetricData(ctx context.Context) {
for _, m := range kr.resourceWatcher.dataCollector.CollectMetricData() {
now := time.Now()
for _, m := range kr.resourceWatcher.dataCollector.CollectMetricData(now) {
c := obsreport.StartMetricsReceiveOp(ctx, typeStr, transport)

numTimeseries, numPoints := obsreport.CountMetricPoints(m)
Expand Down
4 changes: 2 additions & 2 deletions receiver/k8sclusterreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestReceiver(t *testing.T) {
// struct corresponds to one resource.
expectedResources := numPods + numNodes
require.Eventually(t, func() bool {
return len(r.resourceWatcher.dataCollector.CollectMetricData()) == expectedResources
return len(r.resourceWatcher.dataCollector.CollectMetricData(time.Now())) == expectedResources
}, 10*time.Second, 100*time.Millisecond,
"metrics not collected")

Expand All @@ -64,7 +64,7 @@ func TestReceiver(t *testing.T) {
// Expects metric data from a node, since other resources were deleted.
expectedResources = (numPods - numPodsToDelete) + numNodes
require.Eventually(t, func() bool {
return len(r.resourceWatcher.dataCollector.CollectMetricData()) == expectedResources
return len(r.resourceWatcher.dataCollector.CollectMetricData(time.Now())) == expectedResources
}, 10*time.Second, 100*time.Millisecond,
"updated metrics not collected")

Expand Down

0 comments on commit b1dbdb5

Please sign in to comment.