K8S processor extensions #142

Closed
wants to merge 27 commits into from
27 commits
e86c8ea
Add more fields being extracted from K8S
pmm-sumo Feb 14, 2020
0016ccc
Add tag configuration capabilities
pmm-sumo Feb 17, 2020
76972df
Normalize attribute names to keep them consistent with K8S API
pmm-sumo Feb 18, 2020
124894a
Adjust gofmt
pmm-sumo Feb 18, 2020
9f3f488
Adjust import order
pmm-sumo Feb 18, 2020
bdc70db
Fetch owner tree recursively for k8s processing
pmm-sumo Feb 19, 2020
6837106
Allow for generic label/annotation tagging
pmm-sumo Feb 20, 2020
071343a
Move label/annotation template to more relevant config section
pmm-sumo Feb 21, 2020
b423910
Fix newOwner initialization
pmm-sumo Feb 21, 2020
4870952
Initialize namespace cache
pmm-sumo Feb 21, 2020
67aa693
Basic microbenchmarks for k8s metadata tagging
pmm-sumo Feb 25, 2020
9f64f42
Fix import grouping
pmm-sumo Feb 25, 2020
e045dc7
Fix client_test imports
pmm-sumo Feb 25, 2020
a52ea8b
Add obligatory cache warmup
pmm-sumo Feb 26, 2020
8ea11c9
Add observability to K8S API calls
pmm-sumo Feb 27, 2020
fe55fed
Fix cache warmup and follow code conventions
pmm-sumo Feb 27, 2020
210a6a5
Add unit to duration
pmm-sumo Feb 27, 2020
b7b050d
Handle Zipkin processing
pmm-sumo Mar 5, 2020
9b60435
Create empty map if not initialized
pmm-sumo Mar 7, 2020
c86aa0f
Add podIP debugging option
pmm-sumo Mar 9, 2020
943e200
Tidy go.mod
pmm-sumo Mar 20, 2020
037badb
Add license header
pmm-sumo Mar 20, 2020
43a6d80
Extract hostname from pod.name when the former is not set
pmm-sumo Mar 21, 2020
4783a42
Move the documentation to README.md
pmm-sumo Mar 21, 2020
0872e70
Improve configuration specification
pmm-sumo Mar 23, 2020
b0c3d3c
Use Informers for retrieving K8S metadata
pmm-sumo Apr 1, 2020
b53c391
Tidy the go.mod files
pmm-sumo Apr 2, 2020
Use Informers for retrieving K8S metadata
pmm-sumo committed Apr 7, 2020
commit b0c3d3c5b4e735adc976c645df1270e1acfe7e2b
8 changes: 3 additions & 5 deletions processor/k8sprocessor/README.md
@@ -16,7 +16,7 @@ There are several top level sections of the processor config:
does not try to extract any other metadata. It does not need access to the K8S cluster API.
Agent/Collector must receive spans directly from services to be able to correctly detect the pod IPs.
- `owner_lookup_enabled` (default = false): when set to true, fields such as `daemonSetName`,
`replicaSetName`, etc. can be extracted, though it requires additional Kubernetes API calls to traverse
`replicaSetName`, `service`, etc. can be extracted, though it requires fetching additional data to traverse
the `owner` relationship. See the [list of fields](#k8sprocessor-extract) for more information about
which tags require the flag to be enabled.
- `extract`: the section (see [below](#k8sprocessor-extract)) allows specifying extraction rules
@@ -36,12 +36,12 @@ can be extracted:
- `deploymentName`
- `hostName`
- `namespace`
- `namespaceId` _(`owner_lookup_enabled` must be set to `true`)_
- `nodeName`
- `podId`
- `podName`
- `replicaSetName` _(`owner_lookup_enabled` must be set to `true`)_
- `serviceName` _(`owner_lookup_enabled` must be set to `true`)_
- `serviceName` _(`owner_lookup_enabled` must be set to `true`)_ - in case more than one service is assigned
to the pod, they are comma-separated
- `startTime`
- `statefulSetName` _(`owner_lookup_enabled` must be set to `true`)_
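
The multi-service behavior of `serviceName` can be illustrated with a minimal sketch. It assumes the service names are already available as a plain string slice; `joinServiceNames` is a hypothetical helper introduced here for illustration and is not part of the processor. The separator matches the `", "` used by the `strings.Join` call in this commit's `kube/client.go` change.

```go
package main

import (
	"fmt"
	"strings"
)

// joinServiceNames is a hypothetical helper (not part of the processor) that
// builds a single tag value from all services assigned to a pod.
func joinServiceNames(services []string) string {
	// strings.Join yields "" for an empty slice and returns a single
	// element unchanged, so no special cases are needed.
	return strings.Join(services, ", ")
}

func main() {
	fmt.Println(joinServiceNames([]string{"foo", "bar"}))
}
```

With this approach a pod that matches no service gets an empty tag value, so a caller may prefer to skip the tag entirely when the slice is empty.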

@@ -55,7 +55,6 @@ can be extracted:
- `deploymentName `: `k8s.deployment.name`
- `hostName `: `k8s.pod.hostname`
- `namespaceName `: `k8s.namespace.name`
- `namespaceID `: `k8s.namespace.id`
- `nodeName `: `k8s.node.name`
- `podID `: `k8s.pod.id`
- `podName `: `k8s.pod.name`
@@ -183,7 +182,6 @@ processors:
- deploymentName
- hostName
- namespace
- namespaceId
- nodeName
- podId
- podName
4 changes: 2 additions & 2 deletions processor/k8sprocessor/config_test.go
@@ -64,8 +64,8 @@ func TestLoadConfig(t *testing.T) {
Extract: ExtractConfig{
Metadata: []string{
"containerId", "containerName", "containerImage", "clusterName", "daemonSetName",
"deploymentName", "hostName", "namespace", "namespaceId", "nodeName", "podId",
"podName", "replicaSetName", "serviceName", "startTime", "statefulSetName",
"deploymentName", "hostName", "namespace", "nodeName", "podId", "podName",
"replicaSetName", "serviceName", "startTime", "statefulSetName",
},
Tags: map[string]string{
"containerid": "my.namespace.containerId",
3 changes: 1 addition & 2 deletions processor/k8sprocessor/go.sum
@@ -605,6 +605,7 @@ github.com/hashicorp/serf v0.8.3 h1:MWYcmct5EtKz0efYooPcL0yNkem+7kWxqXDi/UIh+8k=
github.com/hashicorp/serf v0.8.3/go.mod h1:UpNcs7fFbpKIyZaUuSW6EPiH+eZC7OuyFD+wc1oal+k=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q=
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
@@ -863,8 +864,6 @@ github.com/parnurzeal/gorequest v0.2.15/go.mod h1:3Kh2QUMJoqw3icWAecsyzkpY7UzRfD
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pavius/impi v0.0.0-20180302134524-c1cbdcb8df2b h1:yS0+/i6mwRZCdssUd+MkFJkCn/Evh1PlUKCYe3aCtQw=
github.com/pavius/impi v0.0.0-20180302134524-c1cbdcb8df2b/go.mod h1:x/hU0bfdWIhuOT1SKwiJg++yvkk6EuOtJk8WtDZqgr8=
github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g=
49 changes: 17 additions & 32 deletions processor/k8sprocessor/kube/client.go
@@ -72,21 +72,22 @@ func New(logger *zap.Logger, rules ExtractionRules, filters Filters, newClientSe
}
c.kc = kc

labelSelector, fieldSelector, err := selectorsFromFilters(c.Filters)
if err != nil {
return nil, err
}

if c.Rules.OwnerLookupEnabled {
if newOwnerProviderFunc == nil {
newOwnerProviderFunc = newOwnerProvider
}

c.op, err = newOwnerProviderFunc(logger, kc, !shouldWarmCache(filters))
c.op, err = newOwnerProviderFunc(logger, kc, labelSelector, fieldSelector, c.Filters.Namespace)
if err != nil {
return nil, err
}
}

labelSelector, fieldSelector, err := selectorsFromFilters(c.Filters)
if err != nil {
return nil, err
}
logger.Info(
"k8s filtering",
zap.String("labelSelector", labelSelector.String()),
@@ -102,6 +103,10 @@ func New(logger *zap.Logger, rules ExtractionRules, filters Filters, newClientSe

// Start registers pod event handlers and starts watching the kubernetes cluster for pod changes.
func (c *WatchClient) Start() {
if c.op != nil {
c.op.Start()
}

c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handlePodAdd,
UpdateFunc: c.handlePodUpdate,
@@ -113,6 +118,10 @@ func (c *WatchClient) Start() {
// Stop signals the k8s watcher/informer to stop watching for new events.
func (c *WatchClient) Stop() {
close(c.stopCh)

if c.op != nil {
c.op.Stop()
}
}
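
The `Start`/`Stop` changes above delegate to the owner provider only when one was created, that is, when `OwnerLookupEnabled` is set. A minimal sketch of that pattern follows. The `lifecycle`, `recorder`, and `watcher` types are hypothetical simplifications introduced for illustration; they stand in for `OwnerAPI` and `WatchClient` and omit the informer entirely.

```go
package main

import "fmt"

// lifecycle is a hypothetical stand-in for the Start/Stop part of OwnerAPI.
type lifecycle interface {
	Start()
	Stop()
}

// recorder is a hypothetical implementation that records the calls it receives.
type recorder struct{ events []string }

func (r *recorder) Start() { r.events = append(r.events, "start") }
func (r *recorder) Stop()  { r.events = append(r.events, "stop") }

// watcher is a simplified stand-in for WatchClient.
type watcher struct {
	op     lifecycle     // left nil when owner lookup is disabled
	stopCh chan struct{} // closed to signal shutdown
}

// Start launches the optional owner provider, if any.
func (w *watcher) Start() {
	if w.op != nil {
		w.op.Start()
	}
}

// Stop signals shutdown and then stops the optional owner provider.
func (w *watcher) Stop() {
	close(w.stopCh)
	if w.op != nil {
		w.op.Stop()
	}
}

func main() {
	r := &recorder{}
	w := &watcher{op: r, stopCh: make(chan struct{})}
	w.Start()
	w.Stop()
	fmt.Println(r.events)
}
```

The nil check only guards against an unset interface field. A typed nil pointer stored in the interface would pass the check, so the field is best left unassigned when the provider is disabled.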

func (c *WatchClient) handlePodAdd(obj interface{}) {
@@ -256,10 +265,6 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
if c.Rules.ReplicaSetName {
tags[c.Rules.Tags.ReplicaSetName] = owner.name
}
case "Service":
if c.Rules.ServiceName {
tags[c.Rules.Tags.ServiceName] = owner.name
}
case "StatefulSet":
if c.Rules.StatefulSetName {
tags[c.Rules.Tags.StatefulSetName] = owner.name
@@ -269,12 +274,10 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
}
}

if c.Rules.NamespaceID {
ns := c.op.GetNamespace(pod.Namespace)
if ns != nil {
tags[c.Rules.Tags.NamespaceID] = string(ns.UID)
}
if c.Rules.ServiceName {
tags[c.Rules.Tags.ServiceName] = strings.Join(c.op.GetServices(pod), ", ")
}

}

if len(pod.Status.ContainerStatuses) > 0 {
@@ -417,24 +420,6 @@ func (c *WatchClient) shouldIgnorePod(pod *api_v1.Pod) bool {
return false
}

// shouldWarmCache check if there are filters applied; if this is the case, then pod scope is being
// limited and it is better to not do cache warmup and instead rely on just the lazy lookups
func shouldWarmCache(filters Filters) bool {
if len(filters.Labels) > 0 {
return true
}

if len(filters.Fields) > 0 {
return true
}

if filters.Node != "" {
return true
}

return false
}

func selectorsFromFilters(filters Filters) (labels.Selector, fields.Selector, error) {
labelSelector := labels.Everything()
for _, f := range filters.Labels {
5 changes: 1 addition & 4 deletions processor/k8sprocessor/kube/client_test.go
@@ -230,7 +230,6 @@ func TestExtractionRules(t *testing.T) {
StatefulSetName: true,
StartTime: true,
Namespace: true,
NamespaceID: true,
NodeName: true,
OwnerLookupEnabled: true,
Tags: NewExtractionFieldTags(),
@@ -246,8 +245,8 @@ func TestExtractionRules(t *testing.T) {
"k8s.pod.name": "auth-service-abc12-xyz3",
"k8s.pod.startTime": pod.GetCreationTimestamp().String(),
"k8s.replicaset.name": "SomeReplicaSet",
"k8s.service.name": "foo, bar",
"k8s.namespace.name": "ns1",
"k8s.namespace.id": "33333-66666",
"k8s.node.name": "node1",
},
}, {
@@ -267,7 +266,6 @@ func TestExtractionRules(t *testing.T) {
StatefulSetName: false,
StartTime: false,
Namespace: false,
NamespaceID: false,
NodeName: false,
Tags: ExtractionFieldTags{
ClusterName: "cc",
@@ -507,7 +505,6 @@ func newBenchmarkClient(b *testing.B) *WatchClient {
StatefulSetName: true,
StartTime: true,
Namespace: true,
NamespaceID: true,
NodeName: true,
Tags: NewExtractionFieldTags(),
}
68 changes: 24 additions & 44 deletions processor/k8sprocessor/kube/fake_owner.go
@@ -15,63 +15,51 @@
package kube

import (
"time"

gocache "github.com/patrickmn/go-cache"
"go.uber.org/zap"
api_v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor/observability"
)

// fakeOwnerCache is a simple structure which aids querying for owners
type fakeOwnerCache struct {
logger *zap.Logger
objectOwnersCache *gocache.Cache
apiCallDuration time.Duration
logger *zap.Logger
objectOwners map[string]*ObjectOwner
}

// newFakeOwnerProvider creates a new fake instance of the owners API for use in tests
func newFakeOwnerProvider(logger *zap.Logger,
clientset *kubernetes.Clientset,
cacheWarmupEnabled bool) (OwnerAPI, error) {
labelSelector labels.Selector,
fieldSelector fields.Selector,
namespace string) (OwnerAPI, error) {
ownerCache := fakeOwnerCache{}
ownerCache.objectOwnersCache = gocache.New(15*time.Minute, 30*time.Minute)
ownerCache.apiCallDuration = 50 * time.Millisecond
ownerCache.objectOwners = map[string]*ObjectOwner{}
ownerCache.logger = logger
return &ownerCache, nil
}

// GetNamespace retrieves relevant metadata from API or from cache
func (op *fakeOwnerCache) GetNamespace(namespace string) *ObjectOwner {
oo := ObjectOwner{
UID: "33333-66666",
namespace: namespace,
UID: "1a1658f9-7818-11e9-90f1-02324f7e0d1e",
namespace: "kube-system",
ownerUIDs: []types.UID{},
kind: "namespace",
name: namespace,
kind: "ReplicaSet",
name: "SomeReplicaSet",
}
ownerCache.objectOwners[string(oo.UID)] = &oo

return &oo
return &ownerCache, nil
}

func (op *fakeOwnerCache) deepCacheObject(namespace string, kind string, name string, objectUID types.UID) {
startTime := time.Now()

time.Sleep(op.apiCallDuration)
// Start is a no-op in the fake; there is no background work to launch.
func (op *fakeOwnerCache) Start() {}

oo := ObjectOwner{
UID: objectUID,
namespace: namespace,
ownerUIDs: []types.UID{},
kind: kind,
name: name,
}
observability.RecordAPICallMadeAndLatency(&startTime)
// Stop is a no-op in the fake; there is nothing to shut down.
func (op *fakeOwnerCache) Stop() {}

op.objectOwnersCache.Add(string(oo.UID), &oo, gocache.DefaultExpiration)
// GetServices fetches list of services for a given pod
func (op *fakeOwnerCache) GetServices(pod *api_v1.Pod) []string {
return []string{"foo", "bar"}
}

// GetOwners fetches deep tree of owners for a given pod
@@ -80,19 +68,11 @@ func (op *fakeOwnerCache) GetOwners(pod *api_v1.Pod) []*ObjectOwner {

// Make sure the tree is cached/traversed first
for _, or := range pod.OwnerReferences {
_, found := op.objectOwnersCache.Get(string(or.UID))
if !found {
op.deepCacheObject(pod.Namespace, or.Kind, or.Name, or.UID)
oo, found := op.objectOwners[string(or.UID)]
if found {
objectOwners = append(objectOwners, oo)
}
}
oo := ObjectOwner{
UID: "12345",
namespace: pod.Namespace,
ownerUIDs: []types.UID{},
kind: "ReplicaSet",
name: "SomeReplicaSet",
}

objectOwners = append(objectOwners, &oo)
return objectOwners
}
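
The rewritten fake replaces the expiring cache and simulated API latency with a plain map keyed by UID, and `GetOwners` now returns only the referenced owners that are present in that map. A minimal sketch of that lookup is shown below, under the assumption that string UIDs are sufficient; `owner`, `ownerStore`, `newOwnerStore`, and `lookup` are hypothetical names, not the real `ObjectOwner` or `OwnerAPI` types.

```go
package main

import "fmt"

// owner is a hypothetical, simplified stand-in for ObjectOwner.
type owner struct {
	uid  string
	kind string
	name string
}

// ownerStore keeps pre-populated owners in a map keyed by UID, with no
// expiry and no lazy fetching.
type ownerStore struct {
	byUID map[string]*owner
}

// newOwnerStore builds a store from a fixed set of owners.
func newOwnerStore(owners ...*owner) *ownerStore {
	s := &ownerStore{byUID: map[string]*owner{}}
	for _, o := range owners {
		s.byUID[o.uid] = o
	}
	return s
}

// lookup returns the owners found for the given reference UIDs, in the
// order of the references; unknown UIDs are skipped rather than fetched.
func (s *ownerStore) lookup(refUIDs []string) []*owner {
	found := []*owner{}
	for _, uid := range refUIDs {
		if o, ok := s.byUID[uid]; ok {
			found = append(found, o)
		}
	}
	return found
}

func main() {
	s := newOwnerStore(&owner{uid: "a", kind: "ReplicaSet", name: "SomeReplicaSet"})
	for _, o := range s.lookup([]string{"missing", "a"}) {
		fmt.Println(o.kind, o.name)
	}
}
```

Skipping unknown UIDs keeps the lookup free of side effects, which suits a test fake: the result depends only on what was seeded at construction time.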
4 changes: 0 additions & 4 deletions processor/k8sprocessor/kube/kube.go
@@ -36,7 +36,6 @@ const (
defaultTagDeploymentName = "k8s.deployment.name"
defaultTagHostName = "k8s.pod.hostname"
defaultTagNamespaceName = "k8s.namespace.name"
defaultTagNamespaceID = "k8s.namespace.id"
defaultTagNodeName = "k8s.node.name"
defaultTagPodID = "k8s.pod.id"
defaultTagPodName = "k8s.pod.name"
@@ -128,7 +127,6 @@ type ExtractionRules struct {
StatefulSetName bool
StartTime bool
Namespace bool
NamespaceID bool
NodeName bool

OwnerLookupEnabled bool
@@ -150,7 +148,6 @@ type ExtractionFieldTags struct {
PodID string
PodName string
Namespace string
NamespaceID string
NodeName string
ReplicaSetName string
ServiceName string
@@ -171,7 +168,6 @@ func NewExtractionFieldTags() ExtractionFieldTags {
tags.PodID = defaultTagPodID
tags.PodName = defaultTagPodName
tags.Namespace = defaultTagNamespaceName
tags.NamespaceID = defaultTagNamespaceID
tags.NodeName = defaultTagNodeName
tags.ReplicaSetName = defaultTagReplicaSetName
tags.ServiceName = defaultTagServiceName