Skip to content

Commit

Permalink
[k8sobserver] incorporate EndpointsWatcher (open-telemetry#11544)
Browse files Browse the repository at this point in the history
These changes move the k8s observer to include the existing observer.EndpointsWatcher functionality in preparation for supporting multiple subscribers.
  • Loading branch information
rmfitzpatrick committed Jul 12, 2022
1 parent ec809ba commit ebae915
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 156 deletions.
51 changes: 29 additions & 22 deletions extension/observer/k8sobserver/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ package k8sobserver // import "github.com/open-telemetry/opentelemetry-collector

import (
"context"
"fmt"
"sync"
"time"

"go.opentelemetry.io/collector/component"
v1 "k8s.io/api/core/v1"
Expand All @@ -30,27 +33,39 @@ var _ component.Extension = (*k8sObserver)(nil)
var _ observer.Observable = (*k8sObserver)(nil)

// k8sObserver is an extension that watches the Kubernetes API for pod and node
// changes and exposes the resulting endpoints to observer subscribers.
type k8sObserver struct {
	// EndpointsWatcher provides the observer.Observable subscription machinery;
	// it is constructed with the handler below as its Endpointslister.
	*observer.EndpointsWatcher
	// telemetry supplies the logger used throughout Start.
	telemetry component.TelemetrySettings
	// podInformer caches a pod informer.
	// NOTE(review): Start now creates its informers locally under the sync.Once,
	// so this field appears to be stale — confirm it is still read anywhere.
	podInformer cache.SharedInformer
	// podListerWatcher is the source for the pod informer; nil disables pod observation.
	podListerWatcher cache.ListerWatcher
	// nodeInformer caches a node informer. NOTE(review): possibly stale, see podInformer.
	nodeInformer cache.SharedInformer
	// nodeListerWatcher is the source for the node informer; nil disables node observation.
	nodeListerWatcher cache.ListerWatcher
	// handler is registered as the event handler on each created informer and
	// maintains the endpoint map served to subscribers.
	handler *handler
	// once guarantees the informers are created and started at most one time.
	once *sync.Once
	// stop is closed to terminate the informer Run goroutines.
	stop chan struct{}
	// config is the extension configuration (used e.g. for the endpoint ID namespace).
	config *Config
}

// Start will populate the cache.SharedInformers for pods and nodes as configured and run them as goroutines.
func (k *k8sObserver) Start(ctx context.Context, host component.Host) error {
if k.podListerWatcher != nil && k.podInformer == nil {
k.telemetry.Logger.Debug("creating and starting pod informer")
k.podInformer = cache.NewSharedInformer(k.podListerWatcher, &v1.Pod{}, 0)
go k.podInformer.Run(k.stop)
if k.once == nil {
return fmt.Errorf("cannot Start() partial k8sObserver (nil *sync.Once)")
}
if k.nodeListerWatcher != nil && k.nodeInformer == nil {
k.telemetry.Logger.Debug("creating and starting node informer")
k.nodeInformer = cache.NewSharedInformer(k.nodeListerWatcher, &v1.Node{}, 0)
go k.nodeInformer.Run(k.stop)
if k.handler == nil {
return fmt.Errorf("cannot Start() partial k8sObserver (nil *handler)")
}

k.once.Do(func() {
if k.podListerWatcher != nil {
k.telemetry.Logger.Debug("creating and starting pod informer")
podInformer := cache.NewSharedInformer(k.podListerWatcher, &v1.Pod{}, 0)
podInformer.AddEventHandler(k.handler)
go podInformer.Run(k.stop)
}
if k.nodeListerWatcher != nil {
k.telemetry.Logger.Debug("creating and starting node informer")
nodeInformer := cache.NewSharedInformer(k.nodeListerWatcher, &v1.Node{}, 0)
go nodeInformer.Run(k.stop)
nodeInformer.AddEventHandler(k.handler)
}
})
return nil
}

Expand All @@ -60,17 +75,6 @@ func (k *k8sObserver) Shutdown(ctx context.Context) error {
return nil
}

// ListAndWatch sets the respective cache.SharedInformer event handlers to inform the
// provided observer.Notify listener of pod and node entity updates
func (k *k8sObserver) ListAndWatch(listener observer.Notify) {
if k.podInformer != nil {
k.podInformer.AddEventHandler(&handler{listener: listener, idNamespace: k.config.ID().String(), logger: k.telemetry.Logger})
}
if k.nodeInformer != nil {
k.nodeInformer.AddEventHandler(&handler{listener: listener, idNamespace: k.config.ID().String(), logger: k.telemetry.Logger})
}
}

// newObserver creates a new k8s observer extension.
func newObserver(config *Config, telemetrySettings component.TelemetrySettings) (component.Extension, error) {
client, err := k8sconfig.MakeClient(config.APIConfig)
Expand Down Expand Up @@ -102,13 +106,16 @@ func newObserver(config *Config, telemetrySettings component.TelemetrySettings)
telemetrySettings.Logger.Debug("observing nodes")
nodeListerWatcher = cache.NewListWatchFromClient(restClient, "nodes", v1.NamespaceAll, nodeSelector)
}

h := &handler{idNamespace: config.ID().String(), endpoints: &sync.Map{}, logger: telemetrySettings.Logger}
obs := &k8sObserver{
EndpointsWatcher: &observer.EndpointsWatcher{Endpointslister: h, RefreshInterval: time.Second},
telemetry: telemetrySettings,
podListerWatcher: podListerWatcher,
nodeListerWatcher: nodeListerWatcher,
stop: make(chan struct{}),
config: config,
handler: h,
once: &sync.Once{},
}

return obs, nil
Expand Down
46 changes: 46 additions & 0 deletions extension/observer/k8sobserver/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,26 @@ func TestExtensionObservePods(t *testing.T) {
},
}, sink.added[0])

podListerWatcher.Modify(pod1V2)

requireSink(t, sink, func() bool {
return len(sink.changed) == 1
})

assert.Equal(t, observer.Endpoint{
ID: "k8s_observer/pod1-UID",
Target: "1.2.3.4",
Details: &observer.Pod{
Name: "pod1",
Namespace: "default",
UID: "pod1-UID",
Labels: map[string]string{
"env": "prod",
"pod-version": "2",
},
},
}, sink.changed[0])

podListerWatcher.Delete(pod1V2)

requireSink(t, sink, func() bool {
Expand Down Expand Up @@ -149,6 +169,32 @@ func TestExtensionObserveNodes(t *testing.T) {
},
}, sink.added[0])

nodeListerWatcher.Modify(node1V2)

requireSink(t, sink, func() bool {
return len(sink.changed) == 1
})

assert.Equal(t, observer.Endpoint{
ID: "k8s_observer/node1-uid",
Target: "internalIP",
Details: &observer.K8sNode{
UID: "uid",
Annotations: map[string]string{"annotation-key": "annotation-value"},
Labels: map[string]string{
"label-key": "label-value",
"node-version": "2",
},
Name: "node1",
InternalIP: "internalIP",
InternalDNS: "internalDNS",
Hostname: "localhost",
ExternalIP: "externalIP",
ExternalDNS: "externalDNS",
KubeletEndpointPort: 1234,
},
}, sink.changed[0])

nodeListerWatcher.Delete(node1V2)

requireSink(t, sink, func() bool {
Expand Down
79 changes: 43 additions & 36 deletions extension/observer/k8sobserver/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package k8sobserver // import "github.com/open-telemetry/opentelemetry-collector
import (
"encoding/json"
"reflect"
"sync"

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
Expand All @@ -25,13 +26,30 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
)

// Compile-time checks that handler satisfies the informer callback and
// endpoint-listing interfaces.
var _ cache.ResourceEventHandler = (*handler)(nil)
var _ observer.EndpointsLister = (*handler)(nil)

// handler handles k8s cache informer callbacks.
type handler struct {
	// idNamespace should be some unique token to distinguish multiple handler instances.
	idNamespace string
	// listener is the callback for discovered endpoints.
	// NOTE(review): the EndpointsWatcher-based flow stores endpoints in the map
	// below instead — confirm this field is still assigned anywhere.
	listener observer.Notify
	// endpoints is a map[observer.EndpointID]observer.Endpoint of all existing
	// endpoints at any given moment.
	endpoints *sync.Map
	// logger records endpoint lifecycle events. The original text declared this
	// field twice (a stripped-diff artifact), which is a compile error; it is
	// declared once here.
	logger *zap.Logger
}

// ListEndpoints returns a snapshot of every endpoint currently tracked in the
// handler's endpoint map. Entries that are not observer.Endpoint values are
// logged and skipped.
func (h *handler) ListEndpoints() []observer.Endpoint {
	var out []observer.Endpoint
	h.endpoints.Range(func(id, value interface{}) bool {
		endpoint, ok := value.(observer.Endpoint)
		if !ok {
			h.logger.Info("failed listing endpoint", zap.Any("endpointID", id), zap.Any("endpoint", value))
			return true
		}
		out = append(out, endpoint)
		return true
	})
	return out
}

// OnAdd is called in response to a new pod or node being detected.
Expand All @@ -47,15 +65,11 @@ func (h *handler) OnAdd(objectInterface interface{}) {
return
}

h.logEndpointEvent("endpoints added", endpoints)

for _, endpoint := range endpoints {
if env, err := endpoint.Env(); err == nil {
if marshaled, err := json.Marshal(env); err == nil {
h.logger.Debug("endpoint added", zap.String("env", string(marshaled)))
}
}
h.endpoints.Store(endpoint.ID, endpoint)
}

h.listener.OnAdd(endpoints)
}

// OnUpdate is called in response to an existing pod or node changing.
Expand Down Expand Up @@ -112,42 +126,25 @@ func (h *handler) OnUpdate(oldObjectInterface, newObjectInterface interface{}) {
}

if len(removedEndpoints) > 0 {
h.logEndpointEvent("endpoints removed (via update)", removedEndpoints)
for _, endpoint := range removedEndpoints {
if env, err := endpoint.Env(); err == nil {
if marshaled, err := json.Marshal(env); err == nil {
h.logger.Debug("endpoint removed (via update)", zap.String("env", string(marshaled)))
}
}
h.endpoints.Delete(endpoint.ID)
}
h.listener.OnRemove(removedEndpoints)
}

if len(updatedEndpoints) > 0 {
h.logEndpointEvent("endpoints changed (via update)", updatedEndpoints)
for _, endpoint := range updatedEndpoints {
if env, err := endpoint.Env(); err == nil {
if marshaled, err := json.Marshal(env); err == nil {
h.logger.Debug("endpoint changed (via update)", zap.String("env", string(marshaled)))
}
}
h.endpoints.Store(endpoint.ID, endpoint)
}
h.listener.OnChange(updatedEndpoints)
}

if len(addedEndpoints) > 0 {
h.logEndpointEvent("endpoints added (via update)", addedEndpoints)
for _, endpoint := range addedEndpoints {
if env, err := endpoint.Env(); err == nil {
if marshaled, err := json.Marshal(env); err == nil {
h.logger.Debug("endpoint added (via update)", zap.String("env", string(marshaled)))
}
}
h.endpoints.Store(endpoint.ID, endpoint)
}
h.listener.OnAdd(addedEndpoints)
}

// TODO: can changes be missed where a pod is deleted but we don't
// send remove notifications for some of its endpoints? If not provable
// then maybe keep track of pod -> endpoint association to be sure
// they are all cleaned up.
}

// OnDelete is called in response to a pod or node being deleted.
Expand All @@ -172,13 +169,23 @@ func (h *handler) OnDelete(objectInterface interface{}) {
return
}
if len(endpoints) != 0 {
h.logEndpointEvent("endpoints deleted", endpoints)
for _, endpoint := range endpoints {
h.endpoints.Delete(endpoint.ID)
}
}
}

func (h *handler) logEndpointEvent(msg string, endpoints []observer.Endpoint) {
if ce := h.logger.Check(zap.DebugLevel, msg); ce != nil {
var fields []zap.Field
for _, endpoint := range endpoints {
if env, err := endpoint.Env(); err == nil {
if marshaled, err := json.Marshal(env); err == nil {
h.logger.Debug("endpoint deleted", zap.String("env", string(marshaled)))
if marshaled, e := json.Marshal(env); e == nil {
fields = append(fields, zap.String(string(endpoint.ID), string(marshaled)))
}
}
}
h.listener.OnRemove(endpoints)
ce.Write(fields...)
}
}
Loading

0 comments on commit ebae915

Please sign in to comment.