Skip to content

Commit

Permalink
k8sclusterreceiver: Switch to new interface (#753)
Browse files Browse the repository at this point in the history
  • Loading branch information
asuresh4 committed Aug 18, 2020
1 parent 0ad332d commit 4c56271
Show file tree
Hide file tree
Showing 9 changed files with 246 additions and 126 deletions.
2 changes: 1 addition & 1 deletion cmd/otelcontribcol/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func components() (component.Factories, error) {
redisreceiver.NewFactory(),
kubeletstatsreceiver.NewFactory(),
simpleprometheusreceiver.NewFactory(),
&k8sclusterreceiver.Factory{},
k8sclusterreceiver.NewFactory(),
prometheusexecreceiver.NewFactory(),
receivercreator.NewFactory(),
}
Expand Down
22 changes: 22 additions & 0 deletions receiver/k8sclusterreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

"go.opentelemetry.io/collector/config/configmodels"
k8s "k8s.io/client-go/kubernetes"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/k8sconfig"
)
Expand All @@ -35,4 +36,25 @@ type Config struct {
NodeConditionTypesToReport []string `mapstructure:"node_conditions_to_report"`
// List of exporters to which metadata from this receiver should be forwarded to.
MetadataExporters []string `mapstructure:"metadata_exporters"`

// For mocking.
makeClient func(apiConf k8sconfig.APIConfig) (k8s.Interface, error)
}

func (cfg *Config) getReceiverOptions() (*receiverOptions, error) {
if cfg.makeClient == nil {
cfg.makeClient = k8sconfig.MakeClient
}
client, err := cfg.makeClient(cfg.APIConfig)
if err != nil {
return nil, err
}

return &receiverOptions{
name: cfg.Name(),
client: client,
collectionInterval: cfg.CollectionInterval,
nodeConditionTypesToReport: cfg.NodeConditionTypesToReport,
metadataExporters: cfg.MetadataExporters,
}, nil
}
2 changes: 1 addition & 1 deletion receiver/k8sclusterreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestLoadConfig(t *testing.T) {
factories, err := componenttest.ExampleComponents()
assert.Nil(t, err)

factory := &Factory{}
factory := NewFactory()
receiverType := "k8s_cluster"
factories.Receivers[configmodels.Type(receiverType)] = factory
cfg, err := configtest.LoadConfigFile(
Expand Down
45 changes: 24 additions & 21 deletions receiver/k8sclusterreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.uber.org/zap"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"k8s.io/client-go/kubernetes"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/k8sconfig"
)
Expand All @@ -37,17 +37,15 @@ const (

var defaultNodeConditionsToReport = []string{"Ready"}

var _ component.ReceiverFactoryOld = (*Factory)(nil)

// Factory is the factory for kubernetes-cluster receiver.
type Factory struct {
}

func (f Factory) Type() configmodels.Type {
return typeStr
type receiverOptions struct {
name string
client kubernetes.Interface
collectionInterval time.Duration
nodeConditionTypesToReport []string
metadataExporters []string
}

func (f Factory) CreateDefaultConfig() configmodels.Receiver {
func createDefaultConfig() configmodels.Receiver {
return &Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
Expand All @@ -61,17 +59,22 @@ func (f Factory) CreateDefaultConfig() configmodels.Receiver {
}
}

func (f Factory) CustomUnmarshaler() component.CustomUnmarshaler {
return nil
}
func createMetricsReceiver(
_ context.Context, params component.ReceiverCreateParams, cfg configmodels.Receiver,
consumer consumer.MetricsConsumer) (component.MetricsReceiver, error) {
rCfg := cfg.(*Config)

func (f Factory) CreateTraceReceiver(ctx context.Context, logger *zap.Logger, cfg configmodels.Receiver,
consumer consumer.TraceConsumerOld) (component.TraceReceiver, error) {
return nil, configerror.ErrDataTypeIsNotSupported
rOptions, err := rCfg.getReceiverOptions()
if err != nil {
return nil, err
}
return newReceiver(params.Logger, rOptions, consumer)
}

func (f Factory) CreateMetricsReceiver(ctx context.Context, logger *zap.Logger, cfg configmodels.Receiver,
consumer consumer.MetricsConsumerOld) (component.MetricsReceiver, error) {
rCfg := cfg.(*Config)
return newReceiver(logger, rCfg, consumer)
// NewFactory creates a factory for k8s_cluster receiver.
func NewFactory() component.ReceiverFactory {
return receiverhelper.NewFactory(
typeStr,
createDefaultConfig,
receiverhelper.WithMetrics(createMetricsReceiver))
}
75 changes: 75 additions & 0 deletions receiver/k8sclusterreceiver/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package k8sclusterreceiver

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exportertest"
"k8s.io/client-go/kubernetes"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/k8sconfig"
)

func TestFactory(t *testing.T) {
f := NewFactory()
require.Equal(t, configmodels.Type("k8s_cluster"), f.Type())

cfg := f.CreateDefaultConfig()
rCfg, ok := cfg.(*Config)
require.True(t, ok)

require.Equal(t, &Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
CollectionInterval: defaultCollectionInterval,
NodeConditionTypesToReport: defaultNodeConditionsToReport,
APIConfig: k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeServiceAccount,
},
}, rCfg)

r, err := f.CreateTraceReceiver(
context.Background(), component.ReceiverCreateParams{},
&configmodels.ReceiverSettings{}, &exportertest.SinkTraceExporter{},
)
require.Error(t, err)
require.Nil(t, r)

// Fails with bad K8s Config.
r, err = f.CreateMetricsReceiver(
context.Background(), component.ReceiverCreateParams{},
rCfg, &exportertest.SinkMetricsExporter{},
)
require.Error(t, err)
require.Nil(t, r)

// Override for tests.
rCfg.makeClient = func(apiConf k8sconfig.APIConfig) (kubernetes.Interface, error) {
return nil, nil
}
r, err = f.CreateMetricsReceiver(
context.Background(), component.ReceiverCreateParams{},
rCfg, &exportertest.SinkMetricsExporter{},
)
require.NoError(t, err)
require.NotNil(t, r)
}
79 changes: 79 additions & 0 deletions receiver/k8sclusterreceiver/mock_resources_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package k8sclusterreceiver

import (
"context"
"strconv"
"testing"
"time"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/fake"
)

func createPods(t *testing.T, client *fake.Clientset, numPods int) {
for i := 0; i < numPods; i++ {
p := &corev1.Pod{
ObjectMeta: v1.ObjectMeta{
UID: types.UID("pod" + strconv.Itoa(i)),
Name: strconv.Itoa(i),
Namespace: "test",
},
}
_, err := client.CoreV1().Pods(p.Namespace).Create(context.Background(), p, v1.CreateOptions{})

if err != nil {
t.Errorf("error creating pod: %v", err)
t.FailNow()
}

time.Sleep(2 * time.Millisecond)
}
}

func deletePods(t *testing.T, client *fake.Clientset, numPods int) {
for i := 0; i < numPods; i++ {
err := client.CoreV1().Pods("test").Delete(context.Background(), strconv.Itoa(i), v1.DeleteOptions{})

if err != nil {
t.Errorf("error deleting pod: %v", err)
t.FailNow()
}
}

time.Sleep(2 * time.Millisecond)
}

func createNodes(t *testing.T, client *fake.Clientset, numNodes int) {
for i := 0; i < numNodes; i++ {
n := &corev1.Node{
ObjectMeta: v1.ObjectMeta{
UID: types.UID("node" + strconv.Itoa(i)),
Name: strconv.Itoa(i),
},
}
_, err := client.CoreV1().Nodes().Create(context.Background(), n, v1.CreateOptions{})

if err != nil {
t.Errorf("error creating node: %v", err)
t.FailNow()
}

time.Sleep(2 * time.Millisecond)
}
}
51 changes: 21 additions & 30 deletions receiver/k8sclusterreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/obsreport"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/k8sconfig"
)

const (
Expand All @@ -36,32 +35,32 @@ var _ component.MetricsReceiver = (*kubernetesReceiver)(nil)
type kubernetesReceiver struct {
resourceWatcher *resourceWatcher

config *Config
options *receiverOptions
logger *zap.Logger
consumer consumer.MetricsConsumerOld
consumer consumer.MetricsConsumer
cancel context.CancelFunc
}

func (kr *kubernetesReceiver) Start(ctx context.Context, host component.Host) error {
var c context.Context
c, kr.cancel = context.WithCancel(obsreport.ReceiverContext(ctx, typeStr, transport, kr.config.Name()))
c, kr.cancel = context.WithCancel(obsreport.ReceiverContext(ctx, typeStr, transport, kr.options.name))

exporters := host.GetExporters()
if err := kr.resourceWatcher.setupMetadataExporters(
exporters[configmodels.MetricsDataType], kr.config.MetadataExporters); err != nil {
exporters[configmodels.MetricsDataType], kr.options.metadataExporters); err != nil {
return err
}

go func() {
kr.resourceWatcher.startWatchingResources(c.Done())

ticker := time.NewTicker(kr.config.CollectionInterval)
ticker := time.NewTicker(kr.options.collectionInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
kr.dispatchMetricData(c)
kr.dispatchMetrics(c)
case <-c.Done():
return
}
Expand All @@ -76,41 +75,33 @@ func (kr *kubernetesReceiver) Shutdown(context.Context) error {
return nil
}

func (kr *kubernetesReceiver) dispatchMetricData(ctx context.Context) {
func (kr *kubernetesReceiver) dispatchMetrics(ctx context.Context) {
now := time.Now()
for _, m := range kr.resourceWatcher.dataCollector.CollectMetricData(now) {
c := obsreport.StartMetricsReceiveOp(ctx, typeStr, transport)
mds := kr.resourceWatcher.dataCollector.CollectMetricData(now)
resourceMetrics := pdatautil.MetricsFromMetricsData(mds)

numTimeseries, numPoints := obsreport.CountMetricPoints(m)
c := obsreport.StartMetricsReceiveOp(ctx, typeStr, transport)

err := kr.consumer.ConsumeMetricsData(c, m)
obsreport.EndMetricsReceiveOp(c, typeStr, numPoints, numTimeseries, err)
}
numTimeseries, numPoints := pdatautil.MetricAndDataPointCount(resourceMetrics)

err := kr.consumer.ConsumeMetrics(c, resourceMetrics)
obsreport.EndMetricsReceiveOp(c, typeStr, numPoints, numTimeseries, err)
}

// newReceiver creates the Kubernetes cluster receiver with the given configuration.
func newReceiver(
logger *zap.Logger,
config *Config,
consumer consumer.MetricsConsumerOld,
) (component.MetricsReceiver, error) {
logger *zap.Logger, rOptions *receiverOptions,
consumer consumer.MetricsConsumer) (component.MetricsReceiver, error) {

client, err := k8sconfig.MakeClient(config.APIConfig)
resourceWatcher, err := newResourceWatcher(logger, rOptions)
if err != nil {
return nil, err
}

resourceWatcher, err := newResourceWatcher(logger, config, client)
if err != nil {
return nil, err
}

r := &kubernetesReceiver{
return &kubernetesReceiver{
resourceWatcher: resourceWatcher,
logger: logger,
config: config,
options: rOptions,
consumer: consumer,
}

return r, nil
}, nil
}
Loading

0 comments on commit 4c56271

Please sign in to comment.