Skip to content

Commit

Permalink
kubeletstatsreceiver: Sync available volume metadata from /pods endpo…
Browse files Browse the repository at this point in the history
…int (#690)

* kubeletstatsreceiver: Sync available volume metadata from /pods endpoint

* Address feedback

* Rename getExtraVolumeMetadata to setExtraVolumeMetadata

* Fix typo

Co-authored-by: Dmitrii Anoshin <[email protected]>

Co-authored-by: Dmitrii Anoshin <[email protected]>
  • Loading branch information
asuresh4 and dmitryax committed Aug 18, 2020
1 parent 1947955 commit 14fe86a
Show file tree
Hide file tree
Showing 13 changed files with 573 additions and 122 deletions.
13 changes: 9 additions & 4 deletions receiver/kubeletstatsreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,16 @@ service:

### Extra metadata labels

By default all produced metrics get resource labels based on what kubelet /stats/summary endpoint provides.
By default, all produced metrics get resource labels based on what kubelet /stats/summary endpoint provides.
For some use cases it might be not enough. So it's possible to leverage other endpoints to fetch
additional metadata entities and set them as extra labels on metric resource.
The only additional label supported at the moment is `container.id`. If you want to have that label
added to your metrics, use `extra_metadata_labels` field to enable it, for example:
additional metadata entities and set them as extra labels on metric resource. Currently supported metadata
include the following -

- `container.id` - to augment metrics with Container ID label obtained from container statuses exposed via `/pods`.
- `k8s.volume.type` - to collect volume type from the Pod spec exposed via `/pods` and have it as a label on volume metrics.

If you want to have `container.id` label added to your metrics, use `extra_metadata_labels` field to enable
it, for example:

```yaml
receivers:
Expand Down
2 changes: 1 addition & 1 deletion receiver/kubeletstatsreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Config struct {
// ExtraMetadataLabels contains list of extra metadata that should be taken from /pods endpoint
// and put as extra labels on metrics resource.
// No additional metadata is fetched by default, so there are no extra calls to /pods endpoint.
// Only container.id label is supported at the moment.
// Supported values include container.id and k8s.volume.type.
ExtraMetadataLabels []kubelet.MetadataLabel `mapstructure:"extra_metadata_labels"`

// MetricGroupsToCollect provides a list of metrics groups to collect metrics from.
Expand Down
7 changes: 5 additions & 2 deletions receiver/kubeletstatsreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,11 @@ func TestLoadConfig(t *testing.T) {
AuthType: "serviceAccount",
},
},
CollectionInterval: duration,
ExtraMetadataLabels: []kubelet.MetadataLabel{kubelet.MetadataLabelContainerID},
CollectionInterval: duration,
ExtraMetadataLabels: []kubelet.MetadataLabel{
kubelet.MetadataLabelContainerID,
kubelet.MetadataLabelVolumeType,
},
MetricGroupsToCollect: []kubelet.MetricGroup{
kubelet.ContainerMetricGroup,
kubelet.PodMetricGroup,
Expand Down
11 changes: 10 additions & 1 deletion receiver/kubeletstatsreceiver/kubelet/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,16 @@ func (a *metricDataAccumulator) volumeStats(podResource *resourcepb.Resource, s
return
}

volume := volumeResource(podResource, s)
volume, err := volumeResource(podResource, s, a.metadata)
if err != nil {
a.logger.Warn(
"Failed to gather additional volume metadata. Skipping metric collection.",
zap.String("pod", podResource.Labels[conventions.AttributeK8sPod]),
zap.String("volume", podResource.Labels[labelVolumeName]),
zap.Error(err),
)
return
}

a.accumulate(
nil,
Expand Down
186 changes: 144 additions & 42 deletions receiver/kubeletstatsreceiver/kubelet/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,58 +30,160 @@ import (
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)

// TestContainerStatsMetadataNotFound walks through the error cases of containerStats.
// Happy paths are covered in metadata_test.go
func TestContainerStatsMetadataNotFound(t *testing.T) {
now := metav1.Now()
podResource := &resourcepb.Resource{
Labels: map[string]string{
"k8s.pod.uid": "pod-uid-123",
"k8s.container.name": "container1",
},
}
containerStats := stats.ContainerStats{
Name: "container1",
StartTime: now,
}
metadata := NewMetadata(
[]MetadataLabel{MetadataLabelContainerID},
&v1.PodList{
Items: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID("pod-uid-123"),
},
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
// different container name
Name: "container2",
ContainerID: "test-container",
// TestMetadataErrorCases walks through the error cases of collecting
// metadata. Happy paths are covered in metadata_test.go.
func TestMetadataErrorCases(t *testing.T) {
tests := []struct {
name string
metricGroupsToCollect map[MetricGroup]bool
testScenario func(acc metricDataAccumulator)
metadata Metadata
numMDs int
numLogs int
logMessages []string
}{
{
name: "Fails to get container metadata",
metricGroupsToCollect: map[MetricGroup]bool{
ContainerMetricGroup: true,
},
metadata: NewMetadata(
[]MetadataLabel{MetadataLabelContainerID},
&v1.PodList{
Items: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID("pod-uid-123"),
},
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
// different container name
Name: "container2",
ContainerID: "test-container",
},
},
},
},
},
},
),
testScenario: func(acc metricDataAccumulator) {
now := metav1.Now()
podResource := &resourcepb.Resource{
Labels: map[string]string{
"k8s.pod.uid": "pod-uid-123",
"k8s.container.name": "container1",
},
}
containerStats := stats.ContainerStats{
Name: "container1",
StartTime: now,
}

acc.containerStats(podResource, containerStats)
},
numMDs: 0,
numLogs: 1,
logMessages: []string{
"failed to fetch container metrics",
},
},
)
{
name: "Fails to get volume metadata - no pods data",
metricGroupsToCollect: map[MetricGroup]bool{
VolumeMetricGroup: true,
},
metadata: NewMetadata(
[]MetadataLabel{MetadataLabelVolumeType},
nil,
),
testScenario: func(acc metricDataAccumulator) {
podResource := &resourcepb.Resource{
Labels: map[string]string{
"k8s.pod.uid": "pod-uid-123",
},
}
volumeStats := stats.VolumeStats{
Name: "volume-1",
}

observedLogger, logs := observer.New(zapcore.WarnLevel)
logger := zap.New(observedLogger)
acc.volumeStats(podResource, volumeStats)
},
numMDs: 0,
numLogs: 1,
logMessages: []string{
"Failed to gather additional volume metadata. Skipping metric collection.",
},
},

mds := []*consumerdata.MetricsData{}
acc := metricDataAccumulator{
m: mds,
metadata: metadata,
logger: logger,
metricGroupsToCollect: map[MetricGroup]bool{
ContainerMetricGroup: true,
{
name: "Fails to get volume metadata - volume not found",
metricGroupsToCollect: map[MetricGroup]bool{
VolumeMetricGroup: true,
},
metadata: NewMetadata(
[]MetadataLabel{MetadataLabelVolumeType},
&v1.PodList{
Items: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID("pod-uid-123"),
},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: "volume-0",
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{},
},
},
},
},
},
},
},
),
testScenario: func(acc metricDataAccumulator) {
podResource := &resourcepb.Resource{
Labels: map[string]string{
"k8s.pod.uid": "pod-uid-123",
},
}
volumeStats := stats.VolumeStats{
Name: "volume-1",
}

acc.volumeStats(podResource, volumeStats)
},
numMDs: 0,
numLogs: 1,
logMessages: []string{
"Failed to gather additional volume metadata. Skipping metric collection.",
},
},
}

acc.containerStats(podResource, containerStats)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
observedLogger, logs := observer.New(zapcore.WarnLevel)
logger := zap.New(observedLogger)

assert.Equal(t, 0, len(mds))
require.Equal(t, 1, logs.Len())
assert.Equal(t, "failed to fetch container metrics", logs.All()[0].Message)
var mds []*consumerdata.MetricsData
acc := metricDataAccumulator{
m: mds,
metadata: tt.metadata,
logger: logger,
metricGroupsToCollect: tt.metricGroupsToCollect,
}

tt.testScenario(acc)

assert.Equal(t, tt.numMDs, len(mds))
require.Equal(t, tt.numLogs, logs.Len())
for i := 0; i < tt.numLogs; i++ {
assert.Equal(t, tt.logMessages[i], logs.All()[i].Message)
}
})
}
}
1 change: 1 addition & 0 deletions receiver/kubeletstatsreceiver/kubelet/conventions.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ package kubelet
const (
labelNodeName = "k8s.node.name"
labelVolumeName = "k8s.volume.name"
labelVolumeType = "k8s.volume.type"
)
73 changes: 55 additions & 18 deletions receiver/kubeletstatsreceiver/kubelet/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ type MetadataLabel string

const (
MetadataLabelContainerID MetadataLabel = conventions.AttributeContainerID
MetadataLabelVolumeType MetadataLabel = labelVolumeType
)

var supportedLabels = map[MetadataLabel]bool{
MetadataLabelContainerID: true,
MetadataLabelVolumeType: true,
}

// ValidateMetadataLabelsConfig validates that provided list of metadata labels is supported
Expand All @@ -51,28 +53,50 @@ func ValidateMetadataLabelsConfig(labels []MetadataLabel) error {
}

type Metadata struct {
Labels []MetadataLabel
Labels map[MetadataLabel]bool
PodsMetadata *v1.PodList
}

func NewMetadata(labels []MetadataLabel, podsMetadata *v1.PodList) Metadata {
return Metadata{
Labels: labels,
Labels: getLabelsMap(labels),
PodsMetadata: podsMetadata,
}
}

// setExtraLabels sets extra labels in `lables` map based on available metadata
func (m *Metadata) setExtraLabels(labels map[string]string, podUID string, containerName string) error {
for _, label := range m.Labels {
switch label {
case MetadataLabelContainerID:
containerID, err := m.getContainerID(podUID, containerName)
if err != nil {
return err
}
labels[conventions.AttributeContainerID] = containerID
return nil
func getLabelsMap(metadataLabels []MetadataLabel) map[MetadataLabel]bool {
out := make(map[MetadataLabel]bool, len(metadataLabels))
for _, l := range metadataLabels {
out[l] = true
}
return out
}

// setExtraLabels sets extra labels in `labels` map based on provided metadata label.
func (m *Metadata) setExtraLabels(
labels map[string]string, podUID string,
extraMetadataLabel MetadataLabel, extraMetadataFrom string) error {
// Ensure MetadataLabel exists before proceeding.
if !m.Labels[extraMetadataLabel] || len(m.Labels) == 0 {
return nil
}

// Cannot proceed, if metadata is unavailable.
if m.PodsMetadata == nil {
return errors.New("pods metadata were not fetched")
}

switch extraMetadataLabel {
case MetadataLabelContainerID:
containerID, err := m.getContainerID(podUID, extraMetadataFrom)
if err != nil {
return err
}
labels[conventions.AttributeContainerID] = containerID
case MetadataLabelVolumeType:
err := m.setExtraVolumeMetadata(podUID, extraMetadataFrom, labels)
if err != nil {
return err
}
}
return nil
Expand All @@ -81,12 +105,9 @@ func (m *Metadata) setExtraLabels(labels map[string]string, podUID string, conta
// getContainerID retrieves container id from metadata for given pod UID and container name,
// returns an error if no container found in the metadata that matches the requirements.
func (m *Metadata) getContainerID(podUID string, containerName string) (string, error) {
if m.PodsMetadata == nil {
return "", errors.New("pods metadata were not fetched")
}

uid := types.UID(podUID)
for _, pod := range m.PodsMetadata.Items {
if pod.UID == types.UID(podUID) {
if pod.UID == uid {
for _, containerStatus := range pod.Status.ContainerStatuses {
if containerName == containerStatus.Name {
return stripContainerID(containerStatus.ContainerID), nil
Expand All @@ -104,3 +125,19 @@ var containerSchemeRegexp = regexp.MustCompile(`^[\w_-]+:https://`)
func stripContainerID(id string) string {
return containerSchemeRegexp.ReplaceAllString(id, "")
}

func (m *Metadata) setExtraVolumeMetadata(podUID string, volumeName string, labels map[string]string) error {
uid := types.UID(podUID)
for _, pod := range m.PodsMetadata.Items {
if pod.UID == uid {
for _, volume := range pod.Spec.Volumes {
if volumeName == volume.Name {
getLabelsFromVolume(volume, labels)
return nil
}
}
}
}

return fmt.Errorf("pod %q with volume %q not found in the fetched metadata", podUID, volumeName)
}
Loading

0 comments on commit 14fe86a

Please sign in to comment.