Skip to content

Commit

Permalink
[processor/k8sattributes] Fix node/ns labels/annotations extraction (o…
Browse files Browse the repository at this point in the history
…pen-telemetry#28838)

Set attributes from namespace/node labels or annotations even if
`k8s.namespace.name` and `k8s.node.name` are not extracted.

Fixes
open-telemetry#28837
  • Loading branch information
dmitryax authored and jmsnll committed Nov 12, 2023
1 parent c401891 commit 5d3249a
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 4 deletions.
11 changes: 11 additions & 0 deletions .chloggen/k8s-attrs-fix-ns-node-labels-annotation-setting.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/k8sattributes

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Set attributes from namespace/node labels or annotations even if node/namespaces attribute are not set.

# One or more tracking issues related to the change
issues: [28837]
22 changes: 19 additions & 3 deletions processor/k8sattributesprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,10 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco
return
}

var pod *kube.Pod
if podIdentifierValue.IsNotEmpty() {
if pod, ok := kp.kc.GetPod(podIdentifierValue); ok {
var podFound bool
if pod, podFound = kp.kc.GetPod(podIdentifierValue); podFound {
kp.logger.Debug("getting the pod", zap.Any("pod", pod))

for key, val := range pod.Attributes {
Expand All @@ -126,7 +128,7 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco
}
}

namespace := stringAttributeFromMap(resource.Attributes(), conventions.AttributeK8SNamespaceName)
namespace := getNamespace(pod, resource.Attributes())
if namespace != "" {
attrsToAdd := kp.getAttributesForPodsNamespace(namespace)
for key, val := range attrsToAdd {
Expand All @@ -136,7 +138,7 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco
}
}

nodeName := stringAttributeFromMap(resource.Attributes(), conventions.AttributeK8SNodeName)
nodeName := getNodeName(pod, resource.Attributes())
if nodeName != "" {
attrsToAdd := kp.getAttributesForPodsNode(nodeName)
for key, val := range attrsToAdd {
Expand All @@ -147,6 +149,20 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco
}
}

func getNamespace(pod *kube.Pod, resAttrs pcommon.Map) string {
if pod != nil && pod.Namespace != "" {
return pod.Namespace
}
return stringAttributeFromMap(resAttrs, conventions.AttributeK8SNamespaceName)
}

func getNodeName(pod *kube.Pod, resAttrs pcommon.Map) string {
if pod != nil && pod.NodeName != "" {
return pod.NodeName
}
return stringAttributeFromMap(resAttrs, conventions.AttributeK8SNodeName)
}

// addContainerAttributes looks if pod has any container identifiers and adds additional container attributes
func (kp *kubernetesprocessor) addContainerAttributes(attrs pcommon.Map, pod *kube.Pod) {
containerName := stringAttributeFromMap(attrs, conventions.AttributeK8SContainerName)
Expand Down
146 changes: 145 additions & 1 deletion processor/k8sattributesprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ func TestPodUID(t *testing.T) {
})
}

func TestProcessorAddLabels(t *testing.T) {
func TestAddPodLabels(t *testing.T) {
m := newMultiTest(
t,
NewFactory().CreateDefaultConfig(),
Expand Down Expand Up @@ -727,6 +727,150 @@ func TestProcessorAddLabels(t *testing.T) {
}
}

func TestAddNamespaceLabels(t *testing.T) {
m := newMultiTest(
t,
func() component.Config {
cfg := createDefaultConfig().(*Config)
cfg.Extract.Metadata = []string{}
cfg.Extract.Labels = []FieldExtractConfig{
{
From: kube.MetadataFromNamespace,
Key: "namespace-label",
},
}
return cfg
}(),
nil,
)

podIP := "1.1.1.1"
namespaces := map[string]map[string]string{
"namespace-1": {
"nslabel": "1",
},
"namespace-2": {
"nslabel": "2",
},
}
m.kubernetesProcessorOperation(func(kp *kubernetesprocessor) {
kp.podAssociations = []kube.Association{
{
Sources: []kube.AssociationSource{
{
From: "connection",
},
},
},
}
})

m.kubernetesProcessorOperation(func(kp *kubernetesprocessor) {
pi := kube.PodIdentifier{
kube.PodIdentifierAttributeFromConnection(podIP),
}
kp.kc.(*fakeClient).Pods[pi] = &kube.Pod{Name: "test-2323", Namespace: "namespace-1"}
kp.kc.(*fakeClient).Namespaces = make(map[string]*kube.Namespace)
for ns, labels := range namespaces {
kp.kc.(*fakeClient).Namespaces[ns] = &kube.Namespace{Attributes: labels}
}
})

ctx := client.NewContext(context.Background(), client.Info{
Addr: &net.IPAddr{
IP: net.ParseIP(podIP),
},
})
m.testConsume(
ctx,
generateTraces(),
generateMetrics(),
generateLogs(),
func(err error) {
assert.NoError(t, err)
})

m.assertBatchesLen(1)
m.assertResourceObjectLen(0)
m.assertResource(0, func(res pcommon.Resource) {
assert.Equal(t, 2, res.Attributes().Len())
assertResourceHasStringAttribute(t, res, "k8s.pod.ip", podIP)
assertResourceHasStringAttribute(t, res, "nslabel", "1")
})
}

func TestAddNodeLabels(t *testing.T) {
m := newMultiTest(
t,
func() component.Config {
cfg := createDefaultConfig().(*Config)
cfg.Extract.Metadata = []string{}
cfg.Extract.Labels = []FieldExtractConfig{
{
From: kube.MetadataFromNode,
Key: "node-label",
},
}
return cfg
}(),
nil,
)

podIP := "1.1.1.1"
nodes := map[string]map[string]string{
"node-1": {
"nodelabel": "1",
},
"node-2": {
"nodelabel": "2",
},
}
m.kubernetesProcessorOperation(func(kp *kubernetesprocessor) {
kp.podAssociations = []kube.Association{
{
Sources: []kube.AssociationSource{
{
From: "connection",
},
},
},
}
})

m.kubernetesProcessorOperation(func(kp *kubernetesprocessor) {
pi := kube.PodIdentifier{
kube.PodIdentifierAttributeFromConnection(podIP),
}
kp.kc.(*fakeClient).Pods[pi] = &kube.Pod{Name: "test-2323", NodeName: "node-1"}
kp.kc.(*fakeClient).Nodes = make(map[string]*kube.Node)
for ns, labels := range nodes {
kp.kc.(*fakeClient).Nodes[ns] = &kube.Node{Attributes: labels}
}
})

ctx := client.NewContext(context.Background(), client.Info{
Addr: &net.IPAddr{
IP: net.ParseIP(podIP),
},
})
m.testConsume(
ctx,
generateTraces(),
generateMetrics(),
generateLogs(),
func(err error) {
assert.NoError(t, err)
})

m.assertBatchesLen(1)
m.assertResourceObjectLen(0)
m.assertResource(0, func(res pcommon.Resource) {
assert.Equal(t, 2, res.Attributes().Len())
assertResourceHasStringAttribute(t, res, "k8s.pod.ip", podIP)
assertResourceHasStringAttribute(t, res, "nodelabel", "1")
})
}

func TestProcessorAddContainerAttributes(t *testing.T) {
tests := []struct {
name string
Expand Down

0 comments on commit 5d3249a

Please sign in to comment.