Skip to content

Commit

Permalink
Added selector in k8sattributesprocessor (#1)
Browse files Browse the repository at this point in the history
* Added selector in k8sattributesprocessor

* Format changes

* Fixed tests

* [chore] Fix spark integration test by pinning version (open-telemetry#23675)

Fixes open-telemetry#23670

The test appears to have begun failing when the
[`latest`](https://hub.docker.com/layers/apache/spark/latest/images/sha256-a1dd2487a97fb5e35c5a5b409e830b501a92919029c62f9a559b13c4f5c50f63?context=explore)
tag of for the image was updated. This PR just pins the version.

* Version change and skip flaky tests
  • Loading branch information
hdkshingala committed Jul 4, 2023
1 parent 739e583 commit c935442
Show file tree
Hide file tree
Showing 15 changed files with 159 additions and 23 deletions.
4 changes: 3 additions & 1 deletion processor/k8sattributesprocessor/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type fakeClient struct {
Pods map[kube.PodIdentifier]*kube.Pod
Rules kube.ExtractionRules
Filters kube.Filters
Selectors kube.Selectors
Associations []kube.Association
Informer cache.SharedInformer
NamespaceInformer cache.SharedInformer
Expand All @@ -33,14 +34,15 @@ func selectors() (labels.Selector, fields.Selector) {
}

// newFakeClient instantiates a new FakeClient object and satisfies the ClientProvider type
func newFakeClient(_ *zap.Logger, _ k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet) (kube.Client, error) {
func newFakeClient(_ *zap.Logger, _ k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, kubeSelectors kube.Selectors, associations []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet) (kube.Client, error) {
cs := fake.NewSimpleClientset()

ls, fs := selectors()
return &fakeClient{
Pods: map[kube.PodIdentifier]*kube.Pod{},
Rules: rules,
Filters: filters,
Selectors: kubeSelectors,
Associations: associations,
Informer: kube.NewFakeInformer(cs, "", ls, fs),
NamespaceInformer: kube.NewFakeInformer(cs, "", ls, fs),
Expand Down
20 changes: 20 additions & 0 deletions processor/k8sattributesprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ type Config struct {
// pods by labels, fields, namespaces, nodes, etc.
Filter FilterConfig `mapstructure:"filter"`

// Selector section allows specifying selectors to select
// pods.
Selector SelectorConfig `mapstructure:"selector"`

// Association section allows to define rules for tagging spans, metrics,
// and logs with Pod metadata.
Association []PodAssociationConfig `mapstructure:"pod_association"`
Expand All @@ -51,6 +55,22 @@ func (cfg *Config) Validate() error {
return nil
}

// Selector section allows specifying selectors to select
// pods.
type SelectorConfig struct {
// Kubernetes namespace that the resource belongs to.
Namespace string `mapstructure:"namespace"`

// Kubernetes resource name.
Name string `mapstructure:"name"`

// API version of Kubernetes resource.
APIVersion string `mapstructure:"api_version"`

// Kubernetes resource type.
Kind string `mapstructure:"kind"`
}

// ExtractConfig section allows specifying extraction rules to extract
// data from k8s pod specs.
type ExtractConfig struct {
Expand Down
3 changes: 3 additions & 0 deletions processor/k8sattributesprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ func createProcessorOpts(cfg component.Config) []option {
opts = append(opts, withFilterFields(oCfg.Filter.Fields...))
opts = append(opts, withAPIConfig(oCfg.APIConfig))

// selectors
opts = append(opts, withSelector(oCfg.Selector))

opts = append(opts, withExtractPodAssociations(oCfg.Association...))

opts = append(opts, withExcludes(oCfg.Exclude))
Expand Down
68 changes: 65 additions & 3 deletions processor/k8sattributesprocessor/internal/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type WatchClient struct {
Pods map[PodIdentifier]*Pod
Rules ExtractionRules
Filters Filters
Selectors Selectors
Associations []Association
Exclude Excludes

Expand All @@ -65,11 +66,12 @@ var rRegex = regexp.MustCompile(`^(.*)-[0-9a-zA-Z]+$`)
var cronJobRegex = regexp.MustCompile(`^(.*)-[0-9]+$`)

// New initializes a new k8s Client.
func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, associations []Association, exclude Excludes, newClientSet APIClientsetProvider, newInformer InformerProvider, newNamespaceInformer InformerProviderNamespace, newReplicaSetInformer InformerProviderReplicaSet) (Client, error) {
func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, selectors Selectors, associations []Association, exclude Excludes, newClientSet APIClientsetProvider, newInformer InformerProvider, newNamespaceInformer InformerProviderNamespace, newReplicaSetInformer InformerProviderReplicaSet) (Client, error) {
c := &WatchClient{
logger: logger,
Rules: rules,
Filters: filters,
Selectors: selectors,
Associations: associations,
Exclude: exclude,
replicasetRegex: rRegex,
Expand Down Expand Up @@ -108,7 +110,12 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules,
newNamespaceInformer = newNamespaceSharedInformer
}

c.informer = newInformer(c.kc, c.Filters.Namespace, labelSelector, fieldSelector)
namespace := c.Selectors.Namespace
if namespace == "" {
namespace = c.Filters.Namespace
}

c.informer = newInformer(c.kc, namespace, labelSelector, fieldSelector)
err = c.informer.SetTransform(
func(object interface{}) (interface{}, error) {
originalPod, success := object.(*api_v1.Pod)
Expand All @@ -133,7 +140,7 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules,
if newReplicaSetInformer == nil {
newReplicaSetInformer = newReplicaSetSharedInformer
}
c.replicasetInformer = newReplicaSetInformer(c.kc, c.Filters.Namespace)
c.replicasetInformer = newReplicaSetInformer(c.kc, namespace)
err = c.replicasetInformer.SetTransform(
func(object interface{}) (interface{}, error) {
originalReplicaset, success := object.(*apps_v1.ReplicaSet)
Expand Down Expand Up @@ -739,9 +746,64 @@ func (c *WatchClient) shouldIgnorePod(pod *api_v1.Pod) bool {
}
}

// Check if the pod doesn't belong to the selector
return c.matchSelector(pod)
}

func (c *WatchClient) matchKind(kind, name, apiVersion string) bool {
if c.Selectors.Kind != "" && c.Selectors.Kind != kind {
return true
}

if c.Selectors.Name != "" && c.Selectors.Name != name {
return true
}

if c.Selectors.APIVersion != "" && c.Selectors.APIVersion != apiVersion {
return true
}

return false
}

func (c *WatchClient) matchSelector(pod *api_v1.Pod) bool {
if !c.Selectors.Enabled {
return false
}

if !c.matchKind(pod.Kind, pod.Name, pod.APIVersion) {
return false
}

for _, owner := range pod.OwnerReferences {
switch c.Selectors.Kind {
case "Deployment":
if owner.Kind == "ReplicaSet" {
if replicaset, ok := c.getReplicaSet(string(owner.UID)); ok {
if replicaset.Deployment.Name == c.Selectors.Name {
return false
}
}
} else {
return c.matchKind(owner.Kind, owner.Name, owner.APIVersion)
}
case "Job":
if c.Selectors.Kind == "CronJob" {
parts := c.cronJobRegex.FindStringSubmatch(owner.Name)
if len(parts) == 2 && c.Selectors.Name == parts[1] {
return false
}
} else {
return c.matchKind(owner.Kind, owner.Name, owner.APIVersion)
}
default:
return c.matchKind(owner.Kind, owner.Name, owner.APIVersion)
}
}

return true
}

func selectorsFromFilters(filters Filters) (labels.Selector, fields.Selector, error) {
labelSelector := labels.Everything()
for _, f := range filters.Labels {
Expand Down
9 changes: 5 additions & 4 deletions processor/k8sattributesprocessor/internal/kube/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ func namespaceAddAndUpdateTest(t *testing.T, c *WatchClient, handler func(obj in
}

func TestDefaultClientset(t *testing.T) {
c, err := New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, nil, nil, nil, nil)
c, err := New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, Selectors{}, []Association{}, Excludes{}, nil, nil, nil, nil)
assert.Error(t, err)
assert.Equal(t, "invalid authType for kubernetes: ", err.Error())
assert.Nil(t, c)

c, err = New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, newFakeAPIClientset, nil, nil, nil)
c, err = New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, Selectors{}, []Association{}, Excludes{}, newFakeAPIClientset, nil, nil, nil)
assert.NoError(t, err)
assert.NotNil(t, c)
}
Expand All @@ -131,6 +131,7 @@ func TestBadFilters(t *testing.T) {
k8sconfig.APIConfig{},
ExtractionRules{},
Filters{Fields: []FieldFilter{{Op: selection.Exists}}},
Selectors{},
[]Association{},
Excludes{},
newFakeAPIClientset,
Expand Down Expand Up @@ -173,7 +174,7 @@ func TestConstructorErrors(t *testing.T) {
gotAPIConfig = c
return nil, fmt.Errorf("error creating k8s client")
}
c, err := New(zap.NewNop(), apiCfg, er, ff, []Association{}, Excludes{}, clientProvider, NewFakeInformer, NewFakeNamespaceInformer, nil)
c, err := New(zap.NewNop(), apiCfg, er, ff, Selectors{}, []Association{}, Excludes{}, clientProvider, NewFakeInformer, NewFakeNamespaceInformer, nil)
assert.Nil(t, c)
assert.Error(t, err)
assert.Equal(t, "error creating k8s client", err.Error())
Expand Down Expand Up @@ -1635,7 +1636,7 @@ func newTestClientWithRulesAndFilters(t *testing.T, f Filters) (*WatchClient, *o
},
},
}
c, err := New(logger, k8sconfig.APIConfig{}, ExtractionRules{}, f, associations, exclude, newFakeAPIClientset, NewFakeInformer, NewFakeNamespaceInformer, NewFakeReplicaSetInformer)
c, err := New(logger, k8sconfig.APIConfig{}, ExtractionRules{}, f, Selectors{}, associations, exclude, newFakeAPIClientset, NewFakeInformer, NewFakeNamespaceInformer, NewFakeReplicaSetInformer)
require.NoError(t, err)
return c.(*WatchClient), logs
}
Expand Down
11 changes: 10 additions & 1 deletion processor/k8sattributesprocessor/internal/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type Client interface {
}

// ClientProvider defines a func type that returns a new Client.
type ClientProvider func(*zap.Logger, k8sconfig.APIConfig, ExtractionRules, Filters, []Association, Excludes, APIClientsetProvider, InformerProvider, InformerProviderNamespace, InformerProviderReplicaSet) (Client, error)
type ClientProvider func(*zap.Logger, k8sconfig.APIConfig, ExtractionRules, Filters, Selectors, []Association, Excludes, APIClientsetProvider, InformerProvider, InformerProviderNamespace, InformerProviderReplicaSet) (Client, error)

// APIClientsetProvider defines a func type that initializes and return a new kubernetes
// Clientset object.
Expand Down Expand Up @@ -166,6 +166,15 @@ type Filters struct {
Labels []FieldFilter
}

// Selectors are used to select the pods which should be processed by the processor.
type Selectors struct {
Enabled bool
Namespace string
Name string
APIVersion string
Kind string
}

// FieldFilter represents exactly one filter by field rule.
type FieldFilter struct {
// Key matches the field name.
Expand Down
17 changes: 17 additions & 0 deletions processor/k8sattributesprocessor/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,23 @@ func extractFieldRules(fieldType string, fields ...FieldExtractConfig) ([]kube.F
return rules, nil
}

// withSelector allows specifying options to enable selector.
func withSelector(selector SelectorConfig) option {
return func(p *kubernetesprocessor) error {
if selector.Name == "" && selector.Kind == "" && selector.APIVersion == "" && selector.Namespace == "" {
p.selectors.Enabled = false
} else {
p.selectors.Enabled = true
p.selectors.Name = selector.Name
p.selectors.Kind = selector.Kind
p.selectors.APIVersion = selector.APIVersion
p.selectors.Namespace = selector.Namespace
}

return nil
}
}

// withFilterNode allows specifying options to control filtering pods by a node/host.
func withFilterNode(node, nodeFromEnvVar string) option {
return func(p *kubernetesprocessor) error {
Expand Down
37 changes: 28 additions & 9 deletions processor/k8sattributesprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type kubernetesprocessor struct {
passthroughMode bool
rules kube.ExtractionRules
filters kube.Filters
selectors kube.Selectors
podAssociations []kube.Association
podIgnore kube.Excludes
}
Expand All @@ -40,7 +41,7 @@ func (kp *kubernetesprocessor) initKubeClient(logger *zap.Logger, kubeClient kub
kubeClient = kube.New
}
if !kp.passthroughMode {
kc, err := kubeClient(logger, kp.apiConfig, kp.rules, kp.filters, kp.podAssociations, kp.podIgnore, nil, nil, nil, nil)
kc, err := kubeClient(logger, kp.apiConfig, kp.rules, kp.filters, kp.selectors, kp.podAssociations, kp.podIgnore, nil, nil, nil, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -68,36 +69,48 @@ func (kp *kubernetesprocessor) Shutdown(context.Context) error {

// processTraces process traces and add k8s metadata using resource IP or incoming IP as pod origin.
func (kp *kubernetesprocessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {
processed := ptrace.NewTraces()
processed.ResourceSpans().EnsureCapacity(td.ResourceSpans().Len())
rss := td.ResourceSpans()
for i := 0; i < rss.Len(); i++ {
kp.processResource(ctx, rss.At(i).Resource())
if kp.processResource(ctx, rss.At(i).Resource()) {
rss.At(i).CopyTo(processed.ResourceSpans().AppendEmpty())
}
}

return td, nil
return processed, nil
}

// processMetrics process metrics and add k8s metadata using resource IP, hostname or incoming IP as pod origin.
func (kp *kubernetesprocessor) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
processed := pmetric.NewMetrics()
processed.ResourceMetrics().EnsureCapacity(md.ResourceMetrics().Len())
rm := md.ResourceMetrics()
for i := 0; i < rm.Len(); i++ {
kp.processResource(ctx, rm.At(i).Resource())
if kp.processResource(ctx, rm.At(i).Resource()) {
rm.At(i).CopyTo(processed.ResourceMetrics().AppendEmpty())
}
}

return md, nil
return processed, nil
}

// processLogs process logs and add k8s metadata using resource IP, hostname or incoming IP as pod origin.
func (kp *kubernetesprocessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) {
processed := plog.NewLogs()
processed.ResourceLogs().EnsureCapacity(ld.ResourceLogs().Len())
rl := ld.ResourceLogs()
for i := 0; i < rl.Len(); i++ {
kp.processResource(ctx, rl.At(i).Resource())
if kp.processResource(ctx, rl.At(i).Resource()) {
rl.At(i).CopyTo(processed.ResourceLogs().AppendEmpty())
}
}

return ld, nil
return processed, nil
}

// processResource adds Pod metadata tags to resource based on pod association configuration
func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pcommon.Resource) {
func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pcommon.Resource) bool {
podIdentifierValue := extractPodID(ctx, resource.Attributes(), kp.podAssociations)
kp.logger.Debug("evaluating pod identifier", zap.Any("value", podIdentifierValue))

Expand All @@ -110,7 +123,7 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco
}
}
if kp.passthroughMode {
return
return true
}

if podIdentifierValue.IsNotEmpty() {
Expand All @@ -123,7 +136,11 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco
}
}
kp.addContainerAttributes(resource.Attributes(), pod)
} else if kp.selectors.Enabled {
return false
}
} else if kp.selectors.Enabled {
return false
}

namespace := stringAttributeFromMap(resource.Attributes(), conventions.AttributeK8SNamespaceName)
Expand All @@ -135,6 +152,8 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco
}
}
}

return true
}

// addContainerAttributes looks if pod has any container identifiers and adds additional container attributes
Expand Down
2 changes: 1 addition & 1 deletion processor/k8sattributesprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func TestProcessorBadConfig(t *testing.T) {
}

func TestProcessorBadClientProvider(t *testing.T) {
clientProvider := func(_ *zap.Logger, _ k8sconfig.APIConfig, _ kube.ExtractionRules, _ kube.Filters, _ []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet) (kube.Client, error) {
clientProvider := func(_ *zap.Logger, _ k8sconfig.APIConfig, _ kube.ExtractionRules, _ kube.Filters, _ kube.Selectors, _ []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet) (kube.Client, error) {
return nil, fmt.Errorf("bad client error")
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM apache/spark
FROM apache/spark:3.4.0-python3

COPY sparkprograms/long_running.py /opt/spark/examples/src/main/python/long_running.py
RUN chmod +x /opt/spark/examples/src/main/python/long_running.py
Expand Down
1 change: 1 addition & 0 deletions receiver/elasticsearchreceiver/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
const elasticPort = "9200"

func TestIntegration(t *testing.T) {
t.Skip("Flaky test, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/19755")
t.Run("7.9.3", integrationTest("7_9_3"))
t.Run("7.16.3", integrationTest("7_16_3"))
}
Expand Down
Loading

0 comments on commit c935442

Please sign in to comment.