Skip to content

Commit

Permalink
[receiver/k8sclusterreceiver] Add timestamp field to entity events (o…
Browse files Browse the repository at this point in the history
…pen-telemetry#24431)

Resolves
open-telemetry#24428

This is part 4 of the work to move to entity events-as-log-records in
K8s cluster receiver:
open-telemetry#19741

Overall design document:

https://docs.google.com/document/d/1Tg18sIck3Nakxtd3TFFcIjrmRO_0GLMdHXylVqBQmJA/

I chose to use Timestamp field of the LogRecord (and not
ObservedTimestamp). Please speak if you think ObservedTimestamp is a
better place.

I am not sure if changelog entry is needed for this PR. It is an
addition to an API that is experimental
and not yet released.

Example log record emitted BEFORE this PR:
```
ObservedTimestamp: 1970-01-01 00:00:00 +0000 UTC
Timestamp: 1970-01-01 00:00:00 +0000 UTC
SeverityText:
SeverityNumber: Unspecified(0)
Body: Empty()
Attributes:
     -> otel.entity.id: Map({"k8s.pod.uid":"07cc87d9-8e76-4472-b5ee-c9ecbad94ea9"})
     -> otel.entity.event.type: Str(entity_state)
     -> otel.entity.type: Str(k8s.pod)
     -> otel.entity.attributes: Map({"k8s-app":"kubernetes-dashboard","k8s.deployment.name":"kubernetes-dashboard","k8s.deployment.uid":"4c1ee765-906b-498b-80b5-bea67a714fce","k8s.replicaset.name":"kubernetes-dashboard-6c7ccbcf87","k8s.replicasetuid":"e8c052b4-c1db-43bd-806d-f85d8a861f5b","k8s.service.kubernetes-dashboard":"","k8s.workload.kind":"Deployment","k8s.workload.name":"kubernetes-dashboard","pod-template-hash":"6c7ccbcf87","podcreation_timestamp":"2023-06-30T11:32:00-04:00"})
Trace ID:
Span ID:
Flags: 0
```

Example log record emitted AFTER this PR:
```
ObservedTimestamp: 1970-01-01 00:00:00 +0000 UTC
Timestamp: 2023-07-21 17:42:54.851743 +0000 UTC
SeverityText:
SeverityNumber: Unspecified(0)
Body: Empty()
Attributes:
     -> otel.entity.id: Map({"k8s.pod.uid":"07cc87d9-8e76-4472-b5ee-c9ecbad94ea9"})
     -> otel.entity.event.type: Str(entity_state)
     -> otel.entity.type: Str(k8s.pod)
     -> otel.entity.attributes: Map({"k8s-app":"kubernetes-dashboard","k8s.deployment.name":"kubernetes-dashboard","k8s.deployment.uid":"4c1ee765-906b-498b-80b5-bea67a714fce","k8s.replicaset.name":"kubernetes-dashboard-6c7ccbcf87","k8s.replicasetuid":"e8c052b4-c1db-43bd-806d-f85d8a861f5b","k8s.service.kubernetes-dashboard":"","k8s.workload.kind":"Deployment","k8s.workload.name":"kubernetes-dashboard","pod-template-hash":"6c7ccbcf87","podcreation_timestamp":"2023-06-30T11:32:00-04:00"})
Trace ID:
Span ID:
Flags: 0
```
  • Loading branch information
tigrannajaryan committed Jul 21, 2023
1 parent 2312eb2 commit 20e9a02
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 4 deletions.
10 changes: 10 additions & 0 deletions pkg/experimentalmetricmetadata/entity_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ type EntityEvent struct {
orig plog.LogRecord
}

// Timestamp of the event.
func (e EntityEvent) Timestamp() pcommon.Timestamp {
return e.orig.Timestamp()
}

// SetTimestamp sets the event timestamp.
func (e EntityEvent) SetTimestamp(timestamp pcommon.Timestamp) {
e.orig.SetTimestamp(timestamp)
}

// ID of the entity.
func (e EntityEvent) ID() pcommon.Map {
m, ok := e.orig.Attributes().Get(semconvOtelEntityID)
Expand Down
11 changes: 11 additions & 0 deletions pkg/experimentalmetricmetadata/entity_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ package experimentalmetricmetadata

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
)

Expand Down Expand Up @@ -126,3 +128,12 @@ func Test_EntityTypeEmpty(t *testing.T) {
e := EntityStateDetails{lr}
assert.Equal(t, "", e.EntityType())
}

func Test_EntityEventTimestamp(t *testing.T) {
lr := plog.NewLogRecord()
e := EntityEvent{lr}
ts := pcommon.NewTimestampFromTime(time.Now())
e.SetTimestamp(ts)
assert.EqualValues(t, ts, e.Timestamp())
assert.EqualValues(t, ts, lr.Timestamp())
}
6 changes: 5 additions & 1 deletion receiver/k8sclusterreceiver/internal/metadata/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@
package metadata // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata"

import (
"go.opentelemetry.io/collector/pdata/pcommon"

metadataPkg "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
)

// GetEntityEvents processes metadata updates and returns entity events that describe the metadata changes.
func GetEntityEvents(old, new map[metadataPkg.ResourceID]*KubernetesMetadata) metadataPkg.EntityEventsSlice {
func GetEntityEvents(old, new map[metadataPkg.ResourceID]*KubernetesMetadata, timestamp pcommon.Timestamp) metadataPkg.EntityEventsSlice {
out := metadataPkg.NewEntityEventsSlice()

for id, oldObj := range old {
if _, ok := new[id]; !ok {
// An object was present, but no longer is. Create a "delete" event.
entityEvent := out.AppendEmpty()
entityEvent.SetTimestamp(timestamp)
entityEvent.ID().PutStr(oldObj.ResourceIDKey, string(oldObj.ResourceID))
entityEvent.SetEntityDelete()
}
Expand All @@ -23,6 +26,7 @@ func GetEntityEvents(old, new map[metadataPkg.ResourceID]*KubernetesMetadata) me
// All "new" are current objects. Create "state" events. "old" state does not matter.
for _, newObj := range new {
entityEvent := out.AppendEmpty()
entityEvent.SetTimestamp(timestamp)
entityEvent.ID().PutStr(newObj.ResourceIDKey, string(newObj.ResourceID))
state := entityEvent.SetEntityState()
state.SetEntityType(newObj.EntityType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ package metadata // import "github.com/open-telemetry/opentelemetry-collector-co

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"

metadataPkg "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
)
Expand Down Expand Up @@ -187,11 +189,13 @@ func Test_GetEntityEvents(t *testing.T) {
}

// Convert and test expected events.
events := GetEntityEvents(tt.old, tt.new)
timestamp := pcommon.NewTimestampFromTime(time.Now())
events := GetEntityEvents(tt.old, tt.new, timestamp)
require.Equal(t, tt.events.Len(), events.Len())
for i := 0; i < events.Len(); i++ {
actual := events.At(i)
expected := tt.events.At(i)
assert.EqualValues(t, timestamp, actual.Timestamp())
assert.EqualValues(t, expected.EventType(), actual.EventType())
assert.EqualValues(t, expected.ID().AsRaw(), actual.ID().AsRaw())
if expected.EventType() == metadataPkg.EventTypeState {
Expand Down
6 changes: 4 additions & 2 deletions receiver/k8sclusterreceiver/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
quotainformersv1 "github.com/openshift/client-go/quota/informers/externalversions"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -343,6 +344,8 @@ func validateMetadataExporters(metadataExporters map[string]bool, exporters map[
}

func (rw *resourceWatcher) syncMetadataUpdate(oldMetadata, newMetadata map[experimentalmetricmetadata.ResourceID]*metadata.KubernetesMetadata) {
timestamp := pcommon.NewTimestampFromTime(time.Now())

metadataUpdate := metadata.GetMetadataUpdate(oldMetadata, newMetadata)
if len(metadataUpdate) != 0 {
for _, consume := range rw.metadataConsumers {
Expand All @@ -352,8 +355,7 @@ func (rw *resourceWatcher) syncMetadataUpdate(oldMetadata, newMetadata map[exper

if rw.entityLogConsumer != nil {
// Represent metadata update as entity events.
// TODO: record the timestamp in the events.
entityEvents := metadata.GetEntityEvents(oldMetadata, newMetadata)
entityEvents := metadata.GetEntityEvents(oldMetadata, newMetadata, timestamp)

// Convert entity events to log representation.
logs := entityEvents.ConvertAndMoveToLogs()
Expand Down
13 changes: 13 additions & 0 deletions receiver/k8sclusterreceiver/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package k8sclusterreceiver

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -236,25 +237,32 @@ func TestSyncMetadataAndEmitEntityEvents(t *testing.T) {
rw := newResourceWatcher(receivertest.NewNopCreateSettings(), &Config{})
rw.entityLogConsumer = logsConsumer

step1 := time.Now()

// Make some changes to the pod. Each change should result in an entity event represented
// as a log record.

// Pod is created.
rw.syncMetadataUpdate(nil, rw.dataCollector.SyncMetadata(origPod))
step2 := time.Now()

// Pod is updated.
rw.syncMetadataUpdate(rw.dataCollector.SyncMetadata(origPod), rw.dataCollector.SyncMetadata(updatedPod))
step3 := time.Now()

// Pod is updated again, but nothing changed in the pod.
// Should still result in entity event because they are emitted even
// if the entity is not changed.
rw.syncMetadataUpdate(rw.dataCollector.SyncMetadata(updatedPod), rw.dataCollector.SyncMetadata(updatedPod))
step4 := time.Now()

// Change pod's state back to original
rw.syncMetadataUpdate(rw.dataCollector.SyncMetadata(updatedPod), rw.dataCollector.SyncMetadata(origPod))
step5 := time.Now()

// Delete the pod
rw.syncMetadataUpdate(rw.dataCollector.SyncMetadata(origPod), nil)
step6 := time.Now()

// Must have 5 entity events.
require.EqualValues(t, 5, logsConsumer.LogRecordCount())
Expand All @@ -268,22 +276,26 @@ func TestSyncMetadataAndEmitEntityEvents(t *testing.T) {
"otel.entity.attributes": map[string]any{"pod.creation_timestamp": "0001-01-01T00:00:00Z"},
}
assert.EqualValues(t, expected, lr.Attributes().AsRaw())
assert.WithinRange(t, lr.Timestamp().AsTime(), step1, step2)

// Event 2 should contain the updated state of the pod.
lr = logsConsumer.AllLogs()[1].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
attrs := expected["otel.entity.attributes"].(map[string]any)
attrs["key"] = "value"
assert.EqualValues(t, expected, lr.Attributes().AsRaw())
assert.WithinRange(t, lr.Timestamp().AsTime(), step2, step3)

// Event 3 should be identical to the previous one since pod state didn't change.
lr = logsConsumer.AllLogs()[2].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
assert.EqualValues(t, expected, lr.Attributes().AsRaw())
assert.WithinRange(t, lr.Timestamp().AsTime(), step3, step4)

// Event 4 should contain the reverted state of the pod.
lr = logsConsumer.AllLogs()[3].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
attrs = expected["otel.entity.attributes"].(map[string]any)
delete(attrs, "key")
assert.EqualValues(t, expected, lr.Attributes().AsRaw())
assert.WithinRange(t, lr.Timestamp().AsTime(), step4, step5)

// Event 5 should indicate pod deletion.
lr = logsConsumer.AllLogs()[4].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
Expand All @@ -292,4 +304,5 @@ func TestSyncMetadataAndEmitEntityEvents(t *testing.T) {
"otel.entity.id": map[string]any{"k8s.pod.uid": "pod0"},
}
assert.EqualValues(t, expected, lr.Attributes().AsRaw())
assert.WithinRange(t, lr.Timestamp().AsTime(), step5, step6)
}

0 comments on commit 20e9a02

Please sign in to comment.