Skip to content

Commit

Permalink
[k8sclusterreceiver] Change internal metric store to use pdata (open-…
Browse files Browse the repository at this point in the history
…telemetry#18214)

This PR allows to change independently each metrics getter to pdata.

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Feb 1, 2023
1 parent b2f0015 commit dd1fa43
Show file tree
Hide file tree
Showing 30 changed files with 240 additions and 215 deletions.
11 changes: 11 additions & 0 deletions .chloggen/k8sclusterreceiverpdatastore.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sclusterreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Change internal metric store to use pdata

# One or more tracking issues related to the change
issues: [18214]
2 changes: 1 addition & 1 deletion receiver/k8sclusterreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ require (
go.opentelemetry.io/collector/semconv v0.70.0
go.uber.org/atomic v1.10.0
go.uber.org/zap v1.24.0
google.golang.org/protobuf v1.28.1
k8s.io/api v0.26.1
k8s.io/apimachinery v0.26.1
k8s.io/client-go v0.26.1
Expand Down Expand Up @@ -87,6 +86,7 @@ require (
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20221207170731-23e4bf6bdc37 // indirect
google.golang.org/grpc v1.52.3 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package collection // import "github.com/open-telemetry/opentelemetry-collector-
import (
"strings"

agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
quotav1 "github.com/openshift/api/quota/v1"
Expand Down Expand Up @@ -72,7 +73,7 @@ var appliedClusterResourceQuotaUsedMetric = &metricspb.MetricDescriptor{
},
}

func getMetricsForClusterResourceQuota(rq *quotav1.ClusterResourceQuota) []*resourceMetrics {
func getMetricsForClusterResourceQuota(rq *quotav1.ClusterResourceQuota) []*agentmetricspb.ExportMetricsServiceRequest {
var metrics []*metricspb.Metric

metrics = appendClusterQuotaMetrics(metrics, clusterResourceQuotaLimitMetric, rq.Status.Total.Hard, "")
Expand All @@ -81,10 +82,10 @@ func getMetricsForClusterResourceQuota(rq *quotav1.ClusterResourceQuota) []*reso
metrics = appendClusterQuotaMetrics(metrics, appliedClusterResourceQuotaLimitMetric, ns.Status.Hard, ns.Namespace)
metrics = appendClusterQuotaMetrics(metrics, appliedClusterResourceQuotaUsedMetric, ns.Status.Used, ns.Namespace)
}
return []*resourceMetrics{
return []*agentmetricspb.ExportMetricsServiceRequest{
{
resource: getResourceForClusterResourceQuota(rq),
metrics: metrics,
Resource: getResourceForClusterResourceQuota(rq),
Metrics: metrics,
},
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ func TestClusterRequestQuotaMetrics(t *testing.T) {

require.Equal(t, 1, len(actualResourceMetrics))

metrics := actualResourceMetrics[0].metrics
metrics := actualResourceMetrics[0].Metrics
require.Equal(t, 6, len(metrics))
testutils.AssertResource(t, actualResourceMetrics[0].resource, k8sType,
testutils.AssertResource(t, actualResourceMetrics[0].Resource, k8sType,
map[string]string{
"openshift.clusterquota.uid": "test-clusterquota-1-uid",
"openshift.clusterquota.name": "test-clusterquota-1",
Expand Down
49 changes: 29 additions & 20 deletions receiver/k8sclusterreceiver/internal/collection/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/client-go/tools/cache"

metadata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
internaldata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus"
)

// TODO: Consider moving some of these constants to
Expand Down Expand Up @@ -85,7 +86,7 @@ func NewDataCollector(logger *zap.Logger, nodeConditionsToReport, allocatableTyp
return &DataCollector{
logger: logger,
metricsStore: &metricsStore{
metricsCache: map[types.UID][]*agentmetricspb.ExportMetricsServiceRequest{},
metricsCache: make(map[types.UID]pmetric.Metrics),
},
metadataStore: &metadataStore{},
nodeConditionsToReport: nodeConditionsToReport,
Expand All @@ -108,8 +109,8 @@ func (dc *DataCollector) RemoveFromMetricsStore(obj interface{}) {
}
}

func (dc *DataCollector) UpdateMetricsStore(obj interface{}, rm []*resourceMetrics) {
if err := dc.metricsStore.update(obj.(runtime.Object), rm); err != nil {
func (dc *DataCollector) UpdateMetricsStore(obj interface{}, md pmetric.Metrics) {
if err := dc.metricsStore.update(obj.(runtime.Object), md); err != nil {
dc.logger.Error(
"failed to update metric cache",
zap.String("obj", reflect.TypeOf(obj).String()),
Expand All @@ -124,46 +125,46 @@ func (dc *DataCollector) CollectMetricData(currentTime time.Time) pmetric.Metric

// SyncMetrics updates the metric store with latest metrics from the kubernetes object.
func (dc *DataCollector) SyncMetrics(obj interface{}) {
var rm []*resourceMetrics
var md pmetric.Metrics

switch o := obj.(type) {
case *corev1.Pod:
rm = getMetricsForPod(o, dc.logger)
md = ocsToMetrics(getMetricsForPod(o, dc.logger))
case *corev1.Node:
rm = getMetricsForNode(o, dc.nodeConditionsToReport, dc.allocatableTypesToReport, dc.logger)
md = ocsToMetrics(getMetricsForNode(o, dc.nodeConditionsToReport, dc.allocatableTypesToReport, dc.logger))
case *corev1.Namespace:
rm = getMetricsForNamespace(o)
md = ocsToMetrics(getMetricsForNamespace(o))
case *corev1.ReplicationController:
rm = getMetricsForReplicationController(o)
md = ocsToMetrics(getMetricsForReplicationController(o))
case *corev1.ResourceQuota:
rm = getMetricsForResourceQuota(o)
md = ocsToMetrics(getMetricsForResourceQuota(o))
case *appsv1.Deployment:
rm = getMetricsForDeployment(o)
md = ocsToMetrics(getMetricsForDeployment(o))
case *appsv1.ReplicaSet:
rm = getMetricsForReplicaSet(o)
md = ocsToMetrics(getMetricsForReplicaSet(o))
case *appsv1.DaemonSet:
rm = getMetricsForDaemonSet(o)
md = ocsToMetrics(getMetricsForDaemonSet(o))
case *appsv1.StatefulSet:
rm = getMetricsForStatefulSet(o)
md = ocsToMetrics(getMetricsForStatefulSet(o))
case *batchv1.Job:
rm = getMetricsForJob(o)
md = ocsToMetrics(getMetricsForJob(o))
case *batchv1.CronJob:
rm = getMetricsForCronJob(o)
md = ocsToMetrics(getMetricsForCronJob(o))
case *batchv1beta1.CronJob:
rm = getMetricsForCronJobBeta(o)
md = ocsToMetrics(getMetricsForCronJobBeta(o))
case *autoscalingv2beta2.HorizontalPodAutoscaler:
rm = getMetricsForHPA(o)
md = ocsToMetrics(getMetricsForHPA(o))
case *quotav1.ClusterResourceQuota:
rm = getMetricsForClusterResourceQuota(o)
md = ocsToMetrics(getMetricsForClusterResourceQuota(o))
default:
return
}

if len(rm) == 0 {
if md.DataPointCount() == 0 {
return
}

dc.UpdateMetricsStore(obj, rm)
dc.UpdateMetricsStore(obj, md)
}

// SyncMetadata updates the metric store with latest metrics from the kubernetes object
Expand Down Expand Up @@ -196,3 +197,11 @@ func (dc *DataCollector) SyncMetadata(obj interface{}) map[metadata.ResourceID]*

return km
}

func ocsToMetrics(ocs []*agentmetricspb.ExportMetricsServiceRequest) pmetric.Metrics {
md := pmetric.NewMetrics()
for _, ocm := range ocs {
internaldata.OCToMetrics(ocm.Node, ocm.Resource, ocm.Metrics).ResourceMetrics().MoveAndAppendTo(md.ResourceMetrics())
}
return md
}
17 changes: 9 additions & 8 deletions receiver/k8sclusterreceiver/internal/collection/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package collection // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection"

import (
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
Expand All @@ -40,7 +41,7 @@ var activeJobs = &metricspb.MetricDescriptor{

// TODO: All the CronJob related functions below can be de-duplicated using generics from go 1.18

func getMetricsForCronJob(cj *batchv1.CronJob) []*resourceMetrics {
func getMetricsForCronJob(cj *batchv1.CronJob) []*agentmetricspb.ExportMetricsServiceRequest {
metrics := []*metricspb.Metric{
{
MetricDescriptor: activeJobs,
Expand All @@ -50,15 +51,15 @@ func getMetricsForCronJob(cj *batchv1.CronJob) []*resourceMetrics {
},
}

return []*resourceMetrics{
return []*agentmetricspb.ExportMetricsServiceRequest{
{
resource: getResourceForCronJob(cj),
metrics: metrics,
Resource: getResourceForCronJob(cj),
Metrics: metrics,
},
}
}

func getMetricsForCronJobBeta(cj *batchv1beta1.CronJob) []*resourceMetrics {
func getMetricsForCronJobBeta(cj *batchv1beta1.CronJob) []*agentmetricspb.ExportMetricsServiceRequest {
metrics := []*metricspb.Metric{
{
MetricDescriptor: activeJobs,
Expand All @@ -68,10 +69,10 @@ func getMetricsForCronJobBeta(cj *batchv1beta1.CronJob) []*resourceMetrics {
},
}

return []*resourceMetrics{
return []*agentmetricspb.ExportMetricsServiceRequest{
{
resource: getResourceForCronJobBeta(cj),
metrics: metrics,
Resource: getResourceForCronJobBeta(cj),
Metrics: metrics,
},
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ func TestCronJobMetrics(t *testing.T) {

require.Equal(t, 1, len(actualResourceMetrics))

require.Equal(t, 1, len(actualResourceMetrics[0].metrics))
testutils.AssertResource(t, actualResourceMetrics[0].resource, k8sType,
require.Equal(t, 1, len(actualResourceMetrics[0].Metrics))
testutils.AssertResource(t, actualResourceMetrics[0].Resource, k8sType,
map[string]string{
"k8s.cronjob.uid": "test-cronjob-1-uid",
"k8s.cronjob.name": "test-cronjob-1",
"k8s.namespace.name": "test-namespace",
},
)

testutils.AssertMetricsInt(t, actualResourceMetrics[0].metrics[0], "k8s.cronjob.active_jobs",
testutils.AssertMetricsInt(t, actualResourceMetrics[0].Metrics[0], "k8s.cronjob.active_jobs",
metricspb.MetricDescriptor_GAUGE_INT64, 2)
}

Expand Down
9 changes: 5 additions & 4 deletions receiver/k8sclusterreceiver/internal/collection/daemonsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package collection // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection"

import (
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
Expand Down Expand Up @@ -52,7 +53,7 @@ var daemonSetReadyMetric = &metricspb.MetricDescriptor{
Type: metricspb.MetricDescriptor_GAUGE_INT64,
}

func getMetricsForDaemonSet(ds *appsv1.DaemonSet) []*resourceMetrics {
func getMetricsForDaemonSet(ds *appsv1.DaemonSet) []*agentmetricspb.ExportMetricsServiceRequest {
metrics := []*metricspb.Metric{
{
MetricDescriptor: daemonSetCurrentScheduledMetric,
Expand Down Expand Up @@ -80,10 +81,10 @@ func getMetricsForDaemonSet(ds *appsv1.DaemonSet) []*resourceMetrics {
},
}

return []*resourceMetrics{
return []*agentmetricspb.ExportMetricsServiceRequest{
{
resource: getResourceForDaemonSet(ds),
metrics: metrics,
Resource: getResourceForDaemonSet(ds),
Metrics: metrics,
},
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,27 @@ func TestDaemonsetMetrics(t *testing.T) {
actualResourceMetrics := getMetricsForDaemonSet(ds)

require.Equal(t, 1, len(actualResourceMetrics))
require.Equal(t, 4, len(actualResourceMetrics[0].metrics))
require.Equal(t, 4, len(actualResourceMetrics[0].Metrics))

rm := actualResourceMetrics[0]
testutils.AssertResource(t, rm.resource, k8sType,
testutils.AssertResource(t, rm.Resource, k8sType,
map[string]string{
"k8s.daemonset.uid": "test-daemonset-1-uid",
"k8s.daemonset.name": "test-daemonset-1",
"k8s.namespace.name": "test-namespace",
},
)

testutils.AssertMetricsInt(t, rm.metrics[0], "k8s.daemonset.current_scheduled_nodes",
testutils.AssertMetricsInt(t, rm.Metrics[0], "k8s.daemonset.current_scheduled_nodes",
metricspb.MetricDescriptor_GAUGE_INT64, 3)

testutils.AssertMetricsInt(t, rm.metrics[1], "k8s.daemonset.desired_scheduled_nodes",
testutils.AssertMetricsInt(t, rm.Metrics[1], "k8s.daemonset.desired_scheduled_nodes",
metricspb.MetricDescriptor_GAUGE_INT64, 5)

testutils.AssertMetricsInt(t, rm.metrics[2], "k8s.daemonset.misscheduled_nodes",
testutils.AssertMetricsInt(t, rm.Metrics[2], "k8s.daemonset.misscheduled_nodes",
metricspb.MetricDescriptor_GAUGE_INT64, 1)

testutils.AssertMetricsInt(t, rm.metrics[3], "k8s.daemonset.ready_nodes",
testutils.AssertMetricsInt(t, rm.Metrics[3], "k8s.daemonset.ready_nodes",
metricspb.MetricDescriptor_GAUGE_INT64, 2)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,23 @@
package collection // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection"

import (
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
appsv1 "k8s.io/api/apps/v1"

metadata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
)

func getMetricsForDeployment(dep *appsv1.Deployment) []*resourceMetrics {
func getMetricsForDeployment(dep *appsv1.Deployment) []*agentmetricspb.ExportMetricsServiceRequest {
if dep.Spec.Replicas == nil {
return nil
}

return []*resourceMetrics{
return []*agentmetricspb.ExportMetricsServiceRequest{
{
resource: getResourceForDeployment(dep),
metrics: getReplicaMetrics(
Resource: getResourceForDeployment(dep),
Metrics: getReplicaMetrics(
"deployment",
*dep.Spec.Replicas,
dep.Status.AvailableReplicas,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,21 @@ func TestDeploymentMetrics(t *testing.T) {
actualResourceMetrics := getMetricsForDeployment(dep)

require.Equal(t, 1, len(actualResourceMetrics))
require.Equal(t, 2, len(actualResourceMetrics[0].metrics))
require.Equal(t, 2, len(actualResourceMetrics[0].Metrics))

rm := actualResourceMetrics[0]
testutils.AssertResource(t, rm.resource, k8sType,
testutils.AssertResource(t, rm.Resource, k8sType,
map[string]string{
"k8s.deployment.uid": "test-deployment-1-uid",
"k8s.deployment.name": "test-deployment-1",
"k8s.namespace.name": "test-namespace",
},
)

testutils.AssertMetricsInt(t, rm.metrics[0], "k8s.deployment.desired",
testutils.AssertMetricsInt(t, rm.Metrics[0], "k8s.deployment.desired",
metricspb.MetricDescriptor_GAUGE_INT64, 10)

testutils.AssertMetricsInt(t, rm.metrics[1], "k8s.deployment.available",
testutils.AssertMetricsInt(t, rm.Metrics[1], "k8s.deployment.available",
metricspb.MetricDescriptor_GAUGE_INT64, 3)
}

Expand Down
9 changes: 5 additions & 4 deletions receiver/k8sclusterreceiver/internal/collection/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package collection // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection"

import (
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
Expand Down Expand Up @@ -52,7 +53,7 @@ var hpaDesiredReplicasMetric = &metricspb.MetricDescriptor{
Type: metricspb.MetricDescriptor_GAUGE_INT64,
}

func getMetricsForHPA(hpa *autoscalingv2beta2.HorizontalPodAutoscaler) []*resourceMetrics {
func getMetricsForHPA(hpa *autoscalingv2beta2.HorizontalPodAutoscaler) []*agentmetricspb.ExportMetricsServiceRequest {
metrics := []*metricspb.Metric{
{
MetricDescriptor: hpaMaxReplicasMetric,
Expand Down Expand Up @@ -80,10 +81,10 @@ func getMetricsForHPA(hpa *autoscalingv2beta2.HorizontalPodAutoscaler) []*resour
},
}

return []*resourceMetrics{
return []*agentmetricspb.ExportMetricsServiceRequest{
{
resource: getResourceForHPA(hpa),
metrics: metrics,
Resource: getResourceForHPA(hpa),
Metrics: metrics,
},
}
}
Expand Down
Loading

0 comments on commit dd1fa43

Please sign in to comment.