[k8sclusterreceiver] Split scrapers in separate packages, allows metadata per package (open-telemetry#18216)

Signed-off-by: Bogdan Drutu <[email protected]>
bogdandrutu committed Feb 1, 2023
1 parent 42b19da commit c54adac
Showing 37 changed files with 914 additions and 867 deletions.
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package collection // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection"
package clusterresourcequota // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/clusterresourcequota"

import (
"strings"
@@ -24,6 +24,7 @@ import (
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
corev1 "k8s.io/api/core/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils"
)

@@ -73,7 +74,7 @@ var appliedClusterResourceQuotaUsedMetric = &metricspb.MetricDescriptor{
},
}

func getMetricsForClusterResourceQuota(rq *quotav1.ClusterResourceQuota) []*agentmetricspb.ExportMetricsServiceRequest {
func GetMetrics(rq *quotav1.ClusterResourceQuota) []*agentmetricspb.ExportMetricsServiceRequest {
var metrics []*metricspb.Metric

metrics = appendClusterQuotaMetrics(metrics, clusterResourceQuotaLimitMetric, rq.Status.Total.Hard, "")
@@ -84,7 +85,7 @@ func getMetricsForClusterResourceQuota(rq *quotav1.ClusterResourceQuota) []*agen
}
return []*agentmetricspb.ExportMetricsServiceRequest{
{
Resource: getResourceForClusterResourceQuota(rq),
Resource: getResource(rq),
Metrics: metrics,
},
}
@@ -113,12 +114,12 @@ func appendClusterQuotaMetrics(metrics []*metricspb.Metric, metric *metricspb.Me
return metrics
}

func getResourceForClusterResourceQuota(rq *quotav1.ClusterResourceQuota) *resourcepb.Resource {
func getResource(rq *quotav1.ClusterResourceQuota) *resourcepb.Resource {
return &resourcepb.Resource{
Type: k8sType,
Type: constants.K8sType,
Labels: map[string]string{
k8sKeyClusterResourceQuotaUID: string(rq.UID),
k8sKeyClusterResourceQuotaName: rq.Name,
constants.K8sKeyClusterResourceQuotaUID: string(rq.UID),
constants.K8sKeyClusterResourceQuotaName: rq.Name,
},
}
}
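
For orientation, here is a minimal sketch of how a caller inside the receiver module might use the relocated scraper after this change. The quota object below is illustrative only, and the package is internal to the contrib module, so this shows just the call shape, not a definitive integration:

package example

import (
	quotav1 "github.com/openshift/api/quota/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/clusterresourcequota"
)

func scrapeQuota() {
	// Illustrative object only; real objects arrive from the OpenShift quota informer.
	rq := &quotav1.ClusterResourceQuota{
		ObjectMeta: metav1.ObjectMeta{
			Name: "test-clusterquota-1",
			UID:  "test-clusterquota-1-uid",
		},
	}

	// The scraper is now exported from its own package instead of the
	// unexported getMetricsForClusterResourceQuota in the collection package.
	requests := clusterresourcequota.GetMetrics(rq)
	_ = requests
}
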
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package collection
package clusterresourcequota

import (
"testing"
@@ -25,19 +25,20 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/constants"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils"
)

func TestClusterRequestQuotaMetrics(t *testing.T) {
rq := newMockClusterResourceQuota("1")

actualResourceMetrics := getMetricsForClusterResourceQuota(rq)
actualResourceMetrics := GetMetrics(rq)

require.Equal(t, 1, len(actualResourceMetrics))

metrics := actualResourceMetrics[0].Metrics
require.Equal(t, 6, len(metrics))
testutils.AssertResource(t, actualResourceMetrics[0].Resource, k8sType,
testutils.AssertResource(t, actualResourceMetrics[0].Resource, constants.K8sType,
map[string]string{
"openshift.clusterquota.uid": "test-clusterquota-1-uid",
"openshift.clusterquota.name": "test-clusterquota-1",
106 changes: 45 additions & 61 deletions receiver/k8sclusterreceiver/internal/collection/collector.go
@@ -32,51 +32,35 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"

metadata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
internaldata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/clusterresourcequota"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/cronjob"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/demonset"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/deployment"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/hpa"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/jobs"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/namespace"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/node"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/pod"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replicaset"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/replicationcontroller"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/resourcequota"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/statefulset"
)

// TODO: Consider moving some of these constants to
// https://go.opentelemetry.io/collector/blob/main/model/semconv/opentelemetry.go.

// Resource label keys.
const (
// TODO: Remove after switch to new Metrics definition
// Resource Type
k8sType = "k8s"
containerType = "container"

// Resource labels keys for UID.
k8sKeyNamespaceUID = "k8s.namespace.uid"
k8sKeyReplicationControllerUID = "k8s.replicationcontroller.uid"
k8sKeyHPAUID = "k8s.hpa.uid"
k8sKeyResourceQuotaUID = "k8s.resourcequota.uid"
k8sKeyClusterResourceQuotaUID = "openshift.clusterquota.uid"

// Resource labels keys for Name.
k8sKeyReplicationControllerName = "k8s.replicationcontroller.name"
k8sKeyHPAName = "k8s.hpa.name"
k8sKeyResourceQuotaName = "k8s.resourcequota.name"
k8sKeyClusterResourceQuotaName = "openshift.clusterquota.name"

// Kubernetes resource kinds
k8sKindCronJob = "CronJob"
k8sKindDaemonSet = "DaemonSet"
k8sKindDeployment = "Deployment"
k8sKindJob = "Job"
k8sKindReplicationController = "ReplicationController"
k8sKindReplicaSet = "ReplicaSet"
k8sStatefulSet = "StatefulSet"
)

// DataCollector wraps around a metricsStore and a metadaStore exposing
// methods to perform on the underlying stores. DataCollector also provides
// an interface to interact with refactored code from SignalFx Agent which is
// confined to the collection package.
type DataCollector struct {
logger *zap.Logger
metricsStore *metricsStore
metadataStore *metadataStore
metadataStore *metadata.Store
nodeConditionsToReport []string
allocatableTypesToReport []string
}
@@ -88,15 +72,15 @@ func NewDataCollector(logger *zap.Logger, nodeConditionsToReport, allocatableTyp
metricsStore: &metricsStore{
metricsCache: make(map[types.UID]pmetric.Metrics),
},
metadataStore: &metadataStore{},
metadataStore: &metadata.Store{},
nodeConditionsToReport: nodeConditionsToReport,
allocatableTypesToReport: allocatableTypesToReport,
}
}

// SetupMetadataStore initializes a metadata store for the kubernetes kind.
func (dc *DataCollector) SetupMetadataStore(gvk schema.GroupVersionKind, store cache.Store) {
dc.metadataStore.setupStore(gvk, store)
dc.metadataStore.Setup(gvk, store)
}

func (dc *DataCollector) RemoveFromMetricsStore(obj interface{}) {
@@ -129,33 +113,33 @@ func (dc *DataCollector) SyncMetrics(obj interface{}) {

switch o := obj.(type) {
case *corev1.Pod:
md = ocsToMetrics(getMetricsForPod(o, dc.logger))
md = ocsToMetrics(pod.GetMetrics(o, dc.logger))
case *corev1.Node:
md = ocsToMetrics(getMetricsForNode(o, dc.nodeConditionsToReport, dc.allocatableTypesToReport, dc.logger))
md = ocsToMetrics(node.GetMetrics(o, dc.nodeConditionsToReport, dc.allocatableTypesToReport, dc.logger))
case *corev1.Namespace:
md = ocsToMetrics(getMetricsForNamespace(o))
md = ocsToMetrics(namespace.GetMetrics(o))
case *corev1.ReplicationController:
md = ocsToMetrics(getMetricsForReplicationController(o))
md = ocsToMetrics(replicationcontroller.GetMetrics(o))
case *corev1.ResourceQuota:
md = ocsToMetrics(getMetricsForResourceQuota(o))
md = ocsToMetrics(resourcequota.GetMetrics(o))
case *appsv1.Deployment:
md = ocsToMetrics(getMetricsForDeployment(o))
md = ocsToMetrics(deployment.GetMetrics(o))
case *appsv1.ReplicaSet:
md = ocsToMetrics(getMetricsForReplicaSet(o))
md = ocsToMetrics(replicaset.GetMetrics(o))
case *appsv1.DaemonSet:
md = ocsToMetrics(getMetricsForDaemonSet(o))
md = ocsToMetrics(demonset.GetMetrics(o))
case *appsv1.StatefulSet:
md = ocsToMetrics(getMetricsForStatefulSet(o))
md = ocsToMetrics(statefulset.GetMetrics(o))
case *batchv1.Job:
md = ocsToMetrics(getMetricsForJob(o))
md = ocsToMetrics(jobs.GetMetrics(o))
case *batchv1.CronJob:
md = ocsToMetrics(getMetricsForCronJob(o))
md = ocsToMetrics(cronjob.GetMetrics(o))
case *batchv1beta1.CronJob:
md = ocsToMetrics(getMetricsForCronJobBeta(o))
md = ocsToMetrics(cronjob.GetMetricsBeta(o))
case *autoscalingv2beta2.HorizontalPodAutoscaler:
md = ocsToMetrics(getMetricsForHPA(o))
md = ocsToMetrics(hpa.GetMetrics(o))
case *quotav1.ClusterResourceQuota:
md = ocsToMetrics(getMetricsForClusterResourceQuota(o))
md = ocsToMetrics(clusterresourcequota.GetMetrics(o))
default:
return
}
@@ -168,31 +152,31 @@ func (dc *DataCollector) SyncMetrics(obj interface{}) {
}

// SyncMetadata updates the metric store with latest metrics from the kubernetes object
func (dc *DataCollector) SyncMetadata(obj interface{}) map[metadata.ResourceID]*KubernetesMetadata {
km := map[metadata.ResourceID]*KubernetesMetadata{}
func (dc *DataCollector) SyncMetadata(obj interface{}) map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata {
km := map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata{}
switch o := obj.(type) {
case *corev1.Pod:
km = getMetadataForPod(o, dc.metadataStore, dc.logger)
km = pod.GetMetadata(o, dc.metadataStore, dc.logger)
case *corev1.Node:
km = getMetadataForNode(o)
km = node.GetMetadata(o)
case *corev1.ReplicationController:
km = getMetadataForReplicationController(o)
km = replicationcontroller.GetMetadata(o)
case *appsv1.Deployment:
km = getMetadataForDeployment(o)
km = deployment.GetMetadata(o)
case *appsv1.ReplicaSet:
km = getMetadataForReplicaSet(o)
km = replicaset.GetMetadata(o)
case *appsv1.DaemonSet:
km = getMetadataForDaemonSet(o)
km = demonset.GetMetadata(o)
case *appsv1.StatefulSet:
km = getMetadataForStatefulSet(o)
km = statefulset.GetMetadata(o)
case *batchv1.Job:
km = getMetadataForJob(o)
km = jobs.GetMetadata(o)
case *batchv1.CronJob:
km = getMetadataForCronJob(o)
km = cronjob.GetMetadata(o)
case *batchv1beta1.CronJob:
km = getMetadataForCronJobBeta(o)
km = cronjob.GetMetadataBeta(o)
case *autoscalingv2beta2.HorizontalPodAutoscaler:
km = getMetadataForHPA(o)
km = hpa.GetMetadata(o)
}

return km
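
The switch statements above remain the only integration point: the collector routes objects by type, while each kind-specific package now owns its metric and metadata conversion. A minimal sketch of driving the collector follows; the constructor arguments and the empty Pod are assumptions made for illustration, and these packages are internal to the contrib module:

package example

import (
	"go.uber.org/zap"
	corev1 "k8s.io/api/core/v1"

	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection"
)

func collect() {
	// The node conditions and allocatable types to report are illustrative values.
	dc := collection.NewDataCollector(zap.NewNop(), []string{"Ready"}, []string{"cpu", "memory"})

	// Informer event handlers hand raw Kubernetes objects to the collector;
	// SyncMetrics and SyncMetadata dispatch them to the per-kind packages
	// (pod, node, deployment, ...) introduced by this change.
	dc.SyncMetrics(&corev1.Pod{})
	metadataByResource := dc.SyncMetadata(&corev1.Pod{})
	_ = metadataByResource
}
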