From 20e9a0257d02e20e75f9bdd9d58cf6246190a351 Mon Sep 17 00:00:00 2001 From: Tigran Najaryan <4194920+tigrannajaryan@users.noreply.github.com> Date: Fri, 21 Jul 2023 18:56:10 -0400 Subject: [PATCH] [receiver/k8sclusterreceiver] Add timestamp field to entity events (#24431) Resolves https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/24428 This is part 4 of the work to move to entity events-as-log-records in K8s cluster receiver: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/19741 Overall design document: https://docs.google.com/document/d/1Tg18sIck3Nakxtd3TFFcIjrmRO_0GLMdHXylVqBQmJA/ I chose to use Timestamp field of the LogRecord (and not ObservedTimestamp). Please speak if you think ObservedTimestamp is a better place. I am not sure if changelog entry is needed for this PR. It is an addition to an API that is experimental and not yet released. Example log record emitted BEFORE this PR: ``` ObservedTimestamp: 1970-01-01 00:00:00 +0000 UTC Timestamp: 1970-01-01 00:00:00 +0000 UTC SeverityText: SeverityNumber: Unspecified(0) Body: Empty() Attributes: -> otel.entity.id: Map({"k8s.pod.uid":"07cc87d9-8e76-4472-b5ee-c9ecbad94ea9"}) -> otel.entity.event.type: Str(entity_state) -> otel.entity.type: Str(k8s.pod) -> otel.entity.attributes: Map({"k8s-app":"kubernetes-dashboard","k8s.deployment.name":"kubernetes-dashboard","k8s.deployment.uid":"4c1ee765-906b-498b-80b5-bea67a714fce","k8s.replicaset.name":"kubernetes-dashboard-6c7ccbcf87","k8s.replicasetuid":"e8c052b4-c1db-43bd-806d-f85d8a861f5b","k8s.service.kubernetes-dashboard":"","k8s.workload.kind":"Deployment","k8s.workload.name":"kubernetes-dashboard","pod-template-hash":"6c7ccbcf87","podcreation_timestamp":"2023-06-30T11:32:00-04:00"}) Trace ID: Span ID: Flags: 0 ``` Example log record emitted AFTER this PR: ``` ObservedTimestamp: 1970-01-01 00:00:00 +0000 UTC Timestamp: 2023-07-21 17:42:54.851743 +0000 UTC SeverityText: SeverityNumber: Unspecified(0) Body: Empty() Attributes: -> otel.entity.id: Map({"k8s.pod.uid":"07cc87d9-8e76-4472-b5ee-c9ecbad94ea9"}) -> otel.entity.event.type: Str(entity_state) -> otel.entity.type: Str(k8s.pod) -> otel.entity.attributes: Map({"k8s-app":"kubernetes-dashboard","k8s.deployment.name":"kubernetes-dashboard","k8s.deployment.uid":"4c1ee765-906b-498b-80b5-bea67a714fce","k8s.replicaset.name":"kubernetes-dashboard-6c7ccbcf87","k8s.replicasetuid":"e8c052b4-c1db-43bd-806d-f85d8a861f5b","k8s.service.kubernetes-dashboard":"","k8s.workload.kind":"Deployment","k8s.workload.name":"kubernetes-dashboard","pod-template-hash":"6c7ccbcf87","podcreation_timestamp":"2023-06-30T11:32:00-04:00"}) Trace ID: Span ID: Flags: 0 ``` --- pkg/experimentalmetricmetadata/entity_events.go | 10 ++++++++++ .../entity_events_test.go | 11 +++++++++++ .../internal/metadata/entities.go | 6 +++++- .../internal/metadata/entities_test.go | 6 +++++- receiver/k8sclusterreceiver/watcher.go | 6 ++++-- receiver/k8sclusterreceiver/watcher_test.go | 13 +++++++++++++ 6 files changed, 48 insertions(+), 4 deletions(-) diff --git a/pkg/experimentalmetricmetadata/entity_events.go b/pkg/experimentalmetricmetadata/entity_events.go index 5edb703cc8aa6..3dd14864b6689 100644 --- a/pkg/experimentalmetricmetadata/entity_events.go +++ b/pkg/experimentalmetricmetadata/entity_events.go @@ -75,6 +75,16 @@ type EntityEvent struct { orig plog.LogRecord } +// Timestamp of the event. +func (e EntityEvent) Timestamp() pcommon.Timestamp { + return e.orig.Timestamp() +} + +// SetTimestamp sets the event timestamp. +func (e EntityEvent) SetTimestamp(timestamp pcommon.Timestamp) { + e.orig.SetTimestamp(timestamp) +} + // ID of the entity. func (e EntityEvent) ID() pcommon.Map { m, ok := e.orig.Attributes().Get(semconvOtelEntityID) diff --git a/pkg/experimentalmetricmetadata/entity_events_test.go b/pkg/experimentalmetricmetadata/entity_events_test.go index 7ded1c7bb5031..b76c6dcd2060f 100644 --- a/pkg/experimentalmetricmetadata/entity_events_test.go +++ b/pkg/experimentalmetricmetadata/entity_events_test.go @@ -5,8 +5,10 @@ package experimentalmetricmetadata import ( "testing" + "time" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" ) @@ -126,3 +128,12 @@ func Test_EntityTypeEmpty(t *testing.T) { e := EntityStateDetails{lr} assert.Equal(t, "", e.EntityType()) } + +func Test_EntityEventTimestamp(t *testing.T) { + lr := plog.NewLogRecord() + e := EntityEvent{lr} + ts := pcommon.NewTimestampFromTime(time.Now()) + e.SetTimestamp(ts) + assert.EqualValues(t, ts, e.Timestamp()) + assert.EqualValues(t, ts, lr.Timestamp()) +} diff --git a/receiver/k8sclusterreceiver/internal/metadata/entities.go b/receiver/k8sclusterreceiver/internal/metadata/entities.go index fd955584be04f..ca3bb1295001f 100644 --- a/receiver/k8sclusterreceiver/internal/metadata/entities.go +++ b/receiver/k8sclusterreceiver/internal/metadata/entities.go @@ -4,17 +4,20 @@ package metadata // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" import ( + "go.opentelemetry.io/collector/pdata/pcommon" + metadataPkg "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" ) // GetEntityEvents processes metadata updates and returns entity events that describe the metadata changes. -func GetEntityEvents(old, new map[metadataPkg.ResourceID]*KubernetesMetadata) metadataPkg.EntityEventsSlice { +func GetEntityEvents(old, new map[metadataPkg.ResourceID]*KubernetesMetadata, timestamp pcommon.Timestamp) metadataPkg.EntityEventsSlice { out := metadataPkg.NewEntityEventsSlice() for id, oldObj := range old { if _, ok := new[id]; !ok { // An object was present, but no longer is. Create a "delete" event. entityEvent := out.AppendEmpty() + entityEvent.SetTimestamp(timestamp) entityEvent.ID().PutStr(oldObj.ResourceIDKey, string(oldObj.ResourceID)) entityEvent.SetEntityDelete() } @@ -23,6 +26,7 @@ func GetEntityEvents(old, new map[metadataPkg.ResourceID]*KubernetesMetadata) me // All "new" are current objects. Create "state" events. "old" state does not matter. for _, newObj := range new { entityEvent := out.AppendEmpty() + entityEvent.SetTimestamp(timestamp) entityEvent.ID().PutStr(newObj.ResourceIDKey, string(newObj.ResourceID)) state := entityEvent.SetEntityState() state.SetEntityType(newObj.EntityType) diff --git a/receiver/k8sclusterreceiver/internal/metadata/entities_test.go b/receiver/k8sclusterreceiver/internal/metadata/entities_test.go index 5061d22c3cb70..e27fb52e39d47 100644 --- a/receiver/k8sclusterreceiver/internal/metadata/entities_test.go +++ b/receiver/k8sclusterreceiver/internal/metadata/entities_test.go @@ -5,9 +5,11 @@ package metadata // import "github.com/open-telemetry/opentelemetry-collector-co import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" metadataPkg "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" ) @@ -187,11 +189,13 @@ func Test_GetEntityEvents(t *testing.T) { } // Convert and test expected events. - events := GetEntityEvents(tt.old, tt.new) + timestamp := pcommon.NewTimestampFromTime(time.Now()) + events := GetEntityEvents(tt.old, tt.new, timestamp) require.Equal(t, tt.events.Len(), events.Len()) for i := 0; i < events.Len(); i++ { actual := events.At(i) expected := tt.events.At(i) + assert.EqualValues(t, timestamp, actual.Timestamp()) assert.EqualValues(t, expected.EventType(), actual.EventType()) assert.EqualValues(t, expected.ID().AsRaw(), actual.ID().AsRaw()) if expected.EventType() == metadataPkg.EventTypeState { diff --git a/receiver/k8sclusterreceiver/watcher.go b/receiver/k8sclusterreceiver/watcher.go index ada240927c601..824803602e888 100644 --- a/receiver/k8sclusterreceiver/watcher.go +++ b/receiver/k8sclusterreceiver/watcher.go @@ -14,6 +14,7 @@ import ( quotainformersv1 "github.com/openshift/client-go/quota/informers/externalversions" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -343,6 +344,8 @@ func validateMetadataExporters(metadataExporters map[string]bool, exporters map[ } func (rw *resourceWatcher) syncMetadataUpdate(oldMetadata, newMetadata map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata) { + timestamp := pcommon.NewTimestampFromTime(time.Now()) + metadataUpdate := metadata.GetMetadataUpdate(oldMetadata, newMetadata) if len(metadataUpdate) != 0 { for _, consume := range rw.metadataConsumers { @@ -352,8 +355,7 @@ func (rw *resourceWatcher) syncMetadataUpdate(oldMetadata, newMetadata map[exper if rw.entityLogConsumer != nil { // Represent metadata update as entity events. - // TODO: record the timestamp in the events. - entityEvents := metadata.GetEntityEvents(oldMetadata, newMetadata) + entityEvents := metadata.GetEntityEvents(oldMetadata, newMetadata, timestamp) // Convert entity events to log representation. logs := entityEvents.ConvertAndMoveToLogs() diff --git a/receiver/k8sclusterreceiver/watcher_test.go b/receiver/k8sclusterreceiver/watcher_test.go index 28ec630023e12..19a5f7f81d151 100644 --- a/receiver/k8sclusterreceiver/watcher_test.go +++ b/receiver/k8sclusterreceiver/watcher_test.go @@ -5,6 +5,7 @@ package k8sclusterreceiver import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -236,25 +237,32 @@ func TestSyncMetadataAndEmitEntityEvents(t *testing.T) { rw := newResourceWatcher(receivertest.NewNopCreateSettings(), &Config{}) rw.entityLogConsumer = logsConsumer + step1 := time.Now() + // Make some changes to the pod. Each change should result in an entity event represented // as a log record. // Pod is created. rw.syncMetadataUpdate(nil, rw.dataCollector.SyncMetadata(origPod)) + step2 := time.Now() // Pod is updated. rw.syncMetadataUpdate(rw.dataCollector.SyncMetadata(origPod), rw.dataCollector.SyncMetadata(updatedPod)) + step3 := time.Now() // Pod is updated again, but nothing changed in the pod. // Should still result in entity event because they are emitted even // if the entity is not changed. rw.syncMetadataUpdate(rw.dataCollector.SyncMetadata(updatedPod), rw.dataCollector.SyncMetadata(updatedPod)) + step4 := time.Now() // Change pod's state back to original rw.syncMetadataUpdate(rw.dataCollector.SyncMetadata(updatedPod), rw.dataCollector.SyncMetadata(origPod)) + step5 := time.Now() // Delete the pod rw.syncMetadataUpdate(rw.dataCollector.SyncMetadata(origPod), nil) + step6 := time.Now() // Must have 5 entity events. require.EqualValues(t, 5, logsConsumer.LogRecordCount()) @@ -268,22 +276,26 @@ func TestSyncMetadataAndEmitEntityEvents(t *testing.T) { "otel.entity.attributes": map[string]any{"pod.creation_timestamp": "0001-01-01T00:00:00Z"}, } assert.EqualValues(t, expected, lr.Attributes().AsRaw()) + assert.WithinRange(t, lr.Timestamp().AsTime(), step1, step2) // Event 2 should contain the updated state of the pod. lr = logsConsumer.AllLogs()[1].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) attrs := expected["otel.entity.attributes"].(map[string]any) attrs["key"] = "value" assert.EqualValues(t, expected, lr.Attributes().AsRaw()) + assert.WithinRange(t, lr.Timestamp().AsTime(), step2, step3) // Event 3 should be identical to the previous one since pod state didn't change. lr = logsConsumer.AllLogs()[2].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) assert.EqualValues(t, expected, lr.Attributes().AsRaw()) + assert.WithinRange(t, lr.Timestamp().AsTime(), step3, step4) // Event 4 should contain the reverted state of the pod. lr = logsConsumer.AllLogs()[3].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) attrs = expected["otel.entity.attributes"].(map[string]any) delete(attrs, "key") assert.EqualValues(t, expected, lr.Attributes().AsRaw()) + assert.WithinRange(t, lr.Timestamp().AsTime(), step4, step5) // Event 5 should indicate pod deletion. lr = logsConsumer.AllLogs()[4].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) @@ -292,4 +304,5 @@ func TestSyncMetadataAndEmitEntityEvents(t *testing.T) { "otel.entity.id": map[string]any{"k8s.pod.uid": "pod0"}, } assert.EqualValues(t, expected, lr.Attributes().AsRaw()) + assert.WithinRange(t, lr.Timestamp().AsTime(), step5, step6) }