diff --git a/cmd/otelcontribcol/components.go b/cmd/otelcontribcol/components.go index 873dcd7945927..05951c228a488 100644 --- a/cmd/otelcontribcol/components.go +++ b/cmd/otelcontribcol/components.go @@ -82,7 +82,7 @@ func components() (component.Factories, error) { redisreceiver.NewFactory(), kubeletstatsreceiver.NewFactory(), simpleprometheusreceiver.NewFactory(), - &k8sclusterreceiver.Factory{}, + k8sclusterreceiver.NewFactory(), prometheusexecreceiver.NewFactory(), receivercreator.NewFactory(), } diff --git a/receiver/k8sclusterreceiver/config.go b/receiver/k8sclusterreceiver/config.go index ce3fcac349e2b..2e18080c49a7d 100644 --- a/receiver/k8sclusterreceiver/config.go +++ b/receiver/k8sclusterreceiver/config.go @@ -18,6 +18,7 @@ import ( "time" "go.opentelemetry.io/collector/config/configmodels" + k8s "k8s.io/client-go/kubernetes" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/k8sconfig" ) @@ -35,4 +36,25 @@ type Config struct { NodeConditionTypesToReport []string `mapstructure:"node_conditions_to_report"` // List of exporters to which metadata from this receiver should be forwarded to. MetadataExporters []string `mapstructure:"metadata_exporters"` + + // For mocking. + makeClient func(apiConf k8sconfig.APIConfig) (k8s.Interface, error) +} + +func (cfg *Config) getReceiverOptions() (*receiverOptions, error) { + if cfg.makeClient == nil { + cfg.makeClient = k8sconfig.MakeClient + } + client, err := cfg.makeClient(cfg.APIConfig) + if err != nil { + return nil, err + } + + return &receiverOptions{ + name: cfg.Name(), + client: client, + collectionInterval: cfg.CollectionInterval, + nodeConditionTypesToReport: cfg.NodeConditionTypesToReport, + metadataExporters: cfg.MetadataExporters, + }, nil } diff --git a/receiver/k8sclusterreceiver/config_test.go b/receiver/k8sclusterreceiver/config_test.go index 913619ea1d630..4c70b90ced782 100644 --- a/receiver/k8sclusterreceiver/config_test.go +++ b/receiver/k8sclusterreceiver/config_test.go @@ -32,7 +32,7 @@ func TestLoadConfig(t *testing.T) { factories, err := componenttest.ExampleComponents() assert.Nil(t, err) - factory := &Factory{} + factory := NewFactory() receiverType := "k8s_cluster" factories.Receivers[configmodels.Type(receiverType)] = factory cfg, err := configtest.LoadConfigFile( diff --git a/receiver/k8sclusterreceiver/factory.go b/receiver/k8sclusterreceiver/factory.go index a3eba5ffd68bd..b541095b626d1 100644 --- a/receiver/k8sclusterreceiver/factory.go +++ b/receiver/k8sclusterreceiver/factory.go @@ -19,10 +19,10 @@ import ( "time" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configerror" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer" - "go.uber.org/zap" + "go.opentelemetry.io/collector/receiver/receiverhelper" + "k8s.io/client-go/kubernetes" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/k8sconfig" ) @@ -37,17 +37,15 @@ const ( var defaultNodeConditionsToReport = []string{"Ready"} -var _ component.ReceiverFactoryOld = (*Factory)(nil) - -// Factory is the factory for kubernetes-cluster receiver. -type Factory struct { -} - -func (f Factory) Type() configmodels.Type { - return typeStr +type receiverOptions struct { + name string + client kubernetes.Interface + collectionInterval time.Duration + nodeConditionTypesToReport []string + metadataExporters []string } -func (f Factory) CreateDefaultConfig() configmodels.Receiver { +func createDefaultConfig() configmodels.Receiver { return &Config{ ReceiverSettings: configmodels.ReceiverSettings{ TypeVal: typeStr, @@ -61,17 +59,22 @@ func (f Factory) CreateDefaultConfig() configmodels.Receiver { } } -func (f Factory) CustomUnmarshaler() component.CustomUnmarshaler { - return nil -} +func createMetricsReceiver( + _ context.Context, params component.ReceiverCreateParams, cfg configmodels.Receiver, + consumer consumer.MetricsConsumer) (component.MetricsReceiver, error) { + rCfg := cfg.(*Config) -func (f Factory) CreateTraceReceiver(ctx context.Context, logger *zap.Logger, cfg configmodels.Receiver, - consumer consumer.TraceConsumerOld) (component.TraceReceiver, error) { - return nil, configerror.ErrDataTypeIsNotSupported + rOptions, err := rCfg.getReceiverOptions() + if err != nil { + return nil, err + } + return newReceiver(params.Logger, rOptions, consumer) } -func (f Factory) CreateMetricsReceiver(ctx context.Context, logger *zap.Logger, cfg configmodels.Receiver, - consumer consumer.MetricsConsumerOld) (component.MetricsReceiver, error) { - rCfg := cfg.(*Config) - return newReceiver(logger, rCfg, consumer) +// NewFactory creates a factory for k8s_cluster receiver. +func NewFactory() component.ReceiverFactory { + return receiverhelper.NewFactory( + typeStr, + createDefaultConfig, + receiverhelper.WithMetrics(createMetricsReceiver)) } diff --git a/receiver/k8sclusterreceiver/factory_test.go b/receiver/k8sclusterreceiver/factory_test.go new file mode 100644 index 0000000000000..669ec63674d0b --- /dev/null +++ b/receiver/k8sclusterreceiver/factory_test.go @@ -0,0 +1,75 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package k8sclusterreceiver + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/exporter/exportertest" + "k8s.io/client-go/kubernetes" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/k8sconfig" +) + +func TestFactory(t *testing.T) { + f := NewFactory() + require.Equal(t, configmodels.Type("k8s_cluster"), f.Type()) + + cfg := f.CreateDefaultConfig() + rCfg, ok := cfg.(*Config) + require.True(t, ok) + + require.Equal(t, &Config{ + ReceiverSettings: configmodels.ReceiverSettings{ + TypeVal: typeStr, + NameVal: typeStr, + }, + CollectionInterval: defaultCollectionInterval, + NodeConditionTypesToReport: defaultNodeConditionsToReport, + APIConfig: k8sconfig.APIConfig{ + AuthType: k8sconfig.AuthTypeServiceAccount, + }, + }, rCfg) + + r, err := f.CreateTraceReceiver( + context.Background(), component.ReceiverCreateParams{}, + &configmodels.ReceiverSettings{}, &exportertest.SinkTraceExporter{}, + ) + require.Error(t, err) + require.Nil(t, r) + + // Fails with bad K8s Config. + r, err = f.CreateMetricsReceiver( + context.Background(), component.ReceiverCreateParams{}, + rCfg, &exportertest.SinkMetricsExporter{}, + ) + require.Error(t, err) + require.Nil(t, r) + + // Override for tests. + rCfg.makeClient = func(apiConf k8sconfig.APIConfig) (kubernetes.Interface, error) { + return nil, nil + } + r, err = f.CreateMetricsReceiver( + context.Background(), component.ReceiverCreateParams{}, + rCfg, &exportertest.SinkMetricsExporter{}, + ) + require.NoError(t, err) + require.NotNil(t, r) +} diff --git a/receiver/k8sclusterreceiver/mock_resources_test.go b/receiver/k8sclusterreceiver/mock_resources_test.go new file mode 100644 index 0000000000000..b8eade2a94380 --- /dev/null +++ b/receiver/k8sclusterreceiver/mock_resources_test.go @@ -0,0 +1,79 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package k8sclusterreceiver + +import ( + "context" + "strconv" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/fake" +) + +func createPods(t *testing.T, client *fake.Clientset, numPods int) { + for i := 0; i < numPods; i++ { + p := &corev1.Pod{ + ObjectMeta: v1.ObjectMeta{ + UID: types.UID("pod" + strconv.Itoa(i)), + Name: strconv.Itoa(i), + Namespace: "test", + }, + } + _, err := client.CoreV1().Pods(p.Namespace).Create(context.Background(), p, v1.CreateOptions{}) + + if err != nil { + t.Errorf("error creating pod: %v", err) + t.FailNow() + } + + time.Sleep(2 * time.Millisecond) + } +} + +func deletePods(t *testing.T, client *fake.Clientset, numPods int) { + for i := 0; i < numPods; i++ { + err := client.CoreV1().Pods("test").Delete(context.Background(), strconv.Itoa(i), v1.DeleteOptions{}) + + if err != nil { + t.Errorf("error deleting pod: %v", err) + t.FailNow() + } + } + + time.Sleep(2 * time.Millisecond) +} + +func createNodes(t *testing.T, client *fake.Clientset, numNodes int) { + for i := 0; i < numNodes; i++ { + n := &corev1.Node{ + ObjectMeta: v1.ObjectMeta{ + UID: types.UID("node" + strconv.Itoa(i)), + Name: strconv.Itoa(i), + }, + } + _, err := client.CoreV1().Nodes().Create(context.Background(), n, v1.CreateOptions{}) + + if err != nil { + t.Errorf("error creating node: %v", err) + t.FailNow() + } + + time.Sleep(2 * time.Millisecond) + } +} diff --git a/receiver/k8sclusterreceiver/receiver.go b/receiver/k8sclusterreceiver/receiver.go index 8214f166e373a..2c627707078c9 100644 --- a/receiver/k8sclusterreceiver/receiver.go +++ b/receiver/k8sclusterreceiver/receiver.go @@ -21,10 +21,9 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/pdatautil" "go.opentelemetry.io/collector/obsreport" "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/k8sconfig" ) const ( @@ -36,32 +35,32 @@ var _ component.MetricsReceiver = (*kubernetesReceiver)(nil) type kubernetesReceiver struct { resourceWatcher *resourceWatcher - config *Config + options *receiverOptions logger *zap.Logger - consumer consumer.MetricsConsumerOld + consumer consumer.MetricsConsumer cancel context.CancelFunc } func (kr *kubernetesReceiver) Start(ctx context.Context, host component.Host) error { var c context.Context - c, kr.cancel = context.WithCancel(obsreport.ReceiverContext(ctx, typeStr, transport, kr.config.Name())) + c, kr.cancel = context.WithCancel(obsreport.ReceiverContext(ctx, typeStr, transport, kr.options.name)) exporters := host.GetExporters() if err := kr.resourceWatcher.setupMetadataExporters( - exporters[configmodels.MetricsDataType], kr.config.MetadataExporters); err != nil { + exporters[configmodels.MetricsDataType], kr.options.metadataExporters); err != nil { return err } go func() { kr.resourceWatcher.startWatchingResources(c.Done()) - ticker := time.NewTicker(kr.config.CollectionInterval) + ticker := time.NewTicker(kr.options.collectionInterval) defer ticker.Stop() for { select { case <-ticker.C: - kr.dispatchMetricData(c) + kr.dispatchMetrics(c) case <-c.Done(): return } @@ -76,41 +75,33 @@ func (kr *kubernetesReceiver) Shutdown(context.Context) error { return nil } -func (kr *kubernetesReceiver) dispatchMetricData(ctx context.Context) { +func (kr *kubernetesReceiver) dispatchMetrics(ctx context.Context) { now := time.Now() - for _, m := range kr.resourceWatcher.dataCollector.CollectMetricData(now) { - c := obsreport.StartMetricsReceiveOp(ctx, typeStr, transport) + mds := kr.resourceWatcher.dataCollector.CollectMetricData(now) + resourceMetrics := pdatautil.MetricsFromMetricsData(mds) - numTimeseries, numPoints := obsreport.CountMetricPoints(m) + c := obsreport.StartMetricsReceiveOp(ctx, typeStr, transport) - err := kr.consumer.ConsumeMetricsData(c, m) - obsreport.EndMetricsReceiveOp(c, typeStr, numPoints, numTimeseries, err) - } + numTimeseries, numPoints := pdatautil.MetricAndDataPointCount(resourceMetrics) + + err := kr.consumer.ConsumeMetrics(c, resourceMetrics) + obsreport.EndMetricsReceiveOp(c, typeStr, numPoints, numTimeseries, err) } // newReceiver creates the Kubernetes cluster receiver with the given configuration. func newReceiver( - logger *zap.Logger, - config *Config, - consumer consumer.MetricsConsumerOld, -) (component.MetricsReceiver, error) { + logger *zap.Logger, rOptions *receiverOptions, + consumer consumer.MetricsConsumer) (component.MetricsReceiver, error) { - client, err := k8sconfig.MakeClient(config.APIConfig) + resourceWatcher, err := newResourceWatcher(logger, rOptions) if err != nil { return nil, err } - resourceWatcher, err := newResourceWatcher(logger, config, client) - if err != nil { - return nil, err - } - - r := &kubernetesReceiver{ + return &kubernetesReceiver{ resourceWatcher: resourceWatcher, logger: logger, - config: config, + options: rOptions, consumer: consumer, - } - - return r, nil + }, nil } diff --git a/receiver/k8sclusterreceiver/receiver_test.go b/receiver/k8sclusterreceiver/receiver_test.go index b600d4326fcd6..e340efa6c66c9 100644 --- a/receiver/k8sclusterreceiver/receiver_test.go +++ b/receiver/k8sclusterreceiver/receiver_test.go @@ -16,7 +16,6 @@ package k8sclusterreceiver import ( "context" - "strconv" "testing" "time" @@ -26,8 +25,6 @@ import ( "go.opentelemetry.io/collector/exporter/exportertest" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/fake" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/testutils" @@ -35,7 +32,7 @@ import ( func TestReceiver(t *testing.T) { client := fake.NewSimpleClientset() - consumer := &exportertest.SinkMetricsExporterOld{} + consumer := &exportertest.SinkMetricsExporter{} r, err := setupReceiver(client, consumer) @@ -52,9 +49,11 @@ func TestReceiver(t *testing.T) { // Expects metric data from nodes and pods where each metric data // struct corresponds to one resource. - expectedResources := numPods + numNodes + expectedNumMetrics := numPods + numNodes + var initialMetricsCount int require.Eventually(t, func() bool { - return len(r.resourceWatcher.dataCollector.CollectMetricData(time.Now())) == expectedResources + initialMetricsCount = consumer.MetricsCount() + return initialMetricsCount == expectedNumMetrics }, 10*time.Second, 100*time.Millisecond, "metrics not collected") @@ -62,9 +61,11 @@ func TestReceiver(t *testing.T) { deletePods(t, client, numPodsToDelete) // Expects metric data from a node, since other resources were deleted. - expectedResources = (numPods - numPodsToDelete) + numNodes + expectedNumMetrics = (numPods - numPodsToDelete) + numNodes + var metricsCountDelta int require.Eventually(t, func() bool { - return len(r.resourceWatcher.dataCollector.CollectMetricData(time.Now())) == expectedResources + metricsCountDelta = consumer.MetricsCount() - initialMetricsCount + return metricsCountDelta == expectedNumMetrics }, 10*time.Second, 100*time.Millisecond, "updated metrics not collected") @@ -73,7 +74,7 @@ func TestReceiver(t *testing.T) { func TestReceiverWithManyResources(t *testing.T) { client := fake.NewSimpleClientset() - consumer := &exportertest.SinkMetricsExporterOld{} + consumer := &exportertest.SinkMetricsExporter{} r, err := setupReceiver(client, consumer) @@ -86,23 +87,25 @@ func TestReceiverWithManyResources(t *testing.T) { r.Start(ctx, componenttest.NewNopHost()) require.Eventually(t, func() bool { - return len(consumer.AllMetrics()) == numPods + return consumer.MetricsCount() == numPods }, 10*time.Second, 100*time.Millisecond, "metrics not collected") r.Shutdown(ctx) } -func setupReceiver(client *fake.Clientset, - consumer consumer.MetricsConsumerOld) (*kubernetesReceiver, error) { +func setupReceiver( + client *fake.Clientset, + consumer consumer.MetricsConsumer) (*kubernetesReceiver, error) { logger := zap.NewNop() - config := &Config{ - CollectionInterval: 1 * time.Second, - NodeConditionTypesToReport: []string{"Ready"}, + rOptions := &receiverOptions{ + collectionInterval: 1 * time.Second, + nodeConditionTypesToReport: []string{"Ready"}, + client: client, } - rw, err := newResourceWatcher(logger, config, client) + rw, err := newResourceWatcher(logger, rOptions) if err != nil { return nil, err @@ -113,59 +116,7 @@ func setupReceiver(client *fake.Clientset, return &kubernetesReceiver{ resourceWatcher: rw, logger: logger, - config: config, + options: rOptions, consumer: consumer, }, nil } - -func createPods(t *testing.T, client *fake.Clientset, numPods int) { - for i := 0; i < numPods; i++ { - p := &corev1.Pod{ - ObjectMeta: v1.ObjectMeta{ - UID: types.UID("pod" + strconv.Itoa(i)), - Name: strconv.Itoa(i), - Namespace: "test", - }, - } - _, err := client.CoreV1().Pods(p.Namespace).Create(context.Background(), p, v1.CreateOptions{}) - - if err != nil { - t.Errorf("error creating pod: %v", err) - t.FailNow() - } - - time.Sleep(2 * time.Millisecond) - } -} - -func deletePods(t *testing.T, client *fake.Clientset, numPods int) { - for i := 0; i < numPods; i++ { - err := client.CoreV1().Pods("test").Delete(context.Background(), strconv.Itoa(i), v1.DeleteOptions{}) - - if err != nil { - t.Errorf("error deleting pod: %v", err) - t.FailNow() - } - } - - time.Sleep(2 * time.Millisecond) -} - -func createNodes(t *testing.T, client *fake.Clientset, numNodes int) { - for i := 0; i < numNodes; i++ { - n := &corev1.Node{ - ObjectMeta: v1.ObjectMeta{ - UID: types.UID("node" + strconv.Itoa(i)), - Name: strconv.Itoa(i), - }, - } - _, err := client.CoreV1().Nodes().Create(context.Background(), n, v1.CreateOptions{}) - - if err != nil { - t.Errorf("error creating node: %v", err) - t.FailNow() - } - - time.Sleep(2 * time.Millisecond) - } -} diff --git a/receiver/k8sclusterreceiver/watcher.go b/receiver/k8sclusterreceiver/watcher.go index 66eb002b8f5fb..f4a6855d69fbe 100644 --- a/receiver/k8sclusterreceiver/watcher.go +++ b/receiver/k8sclusterreceiver/watcher.go @@ -45,12 +45,11 @@ type resourceWatcher struct { type metadataConsumer func(metadata []*collection.KubernetesMetadataUpdate) error // newResourceWatcher creates a Kubernetes resource watcher. -func newResourceWatcher(logger *zap.Logger, config *Config, - client kubernetes.Interface) (*resourceWatcher, error) { +func newResourceWatcher(logger *zap.Logger, rOptions *receiverOptions) (*resourceWatcher, error) { rw := &resourceWatcher{ - client: client, + client: rOptions.client, logger: logger, - dataCollector: collection.NewDataCollector(logger, config.NodeConditionTypesToReport), + dataCollector: collection.NewDataCollector(logger, rOptions.nodeConditionTypesToReport), } rw.prepareSharedInformerFactory()