Skip to content

Commit

Permalink
[exporter/awscloudwatchlogs] Feat: include scope in log record of clo…
Browse files Browse the repository at this point in the history
…udwatchlogs exporter (#30316)

**Description:** Include the instrumentation scope in the log records
exported by the cloudwatchlogs expoter

**Link to tracking Issue:** #29884

**Testing:** Unit tests were added.

---------

Signed-off-by: Raphael Silva <[email protected]>
Co-authored-by: bryan-aguilar <[email protected]>
  • Loading branch information
rapphil and bryan-aguilar committed Jan 10, 2024
1 parent 6b41703 commit aa2e9f7
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 17 deletions.
27 changes: 27 additions & 0 deletions .chloggen/add_scope_awscloudwatchlogsexporter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awscloudwatchlogsexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add instrumentation scope in log records exported to CloudWatch logs

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30316, 29884]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
40 changes: 29 additions & 11 deletions exporter/awscloudwatchlogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,11 @@ func pushLogsToCWLogs(logger *zap.Logger, ld plog.Logs, config *Config, pusher c
sls := rl.ScopeLogs()
for j := 0; j < sls.Len(); j++ {
sl := sls.At(j)
scope := sl.Scope()
logs := sl.LogRecords()
for k := 0; k < logs.Len(); k++ {
log := logs.At(k)
event, err := logToCWLog(resourceAttrs, log, config)
event, err := logToCWLog(resourceAttrs, scope, log, config)
if err != nil {
logger.Debug("Failed to convert to CloudWatch Log", zap.Error(err))
} else {
Expand All @@ -159,19 +160,26 @@ func pushLogsToCWLogs(logger *zap.Logger, ld plog.Logs, config *Config, pusher c
return errs
}

type scopeCwLogBody struct {
Name string `json:"name,omitempty"`
Version string `json:"version,omitempty"`
Attributes map[string]any `json:"attributes,omitempty"`
}

type cwLogBody struct {
Body any `json:"body,omitempty"`
SeverityNumber int32 `json:"severity_number,omitempty"`
SeverityText string `json:"severity_text,omitempty"`
DroppedAttributesCount uint32 `json:"dropped_attributes_count,omitempty"`
Flags uint32 `json:"flags,omitempty"`
TraceID string `json:"trace_id,omitempty"`
SpanID string `json:"span_id,omitempty"`
Attributes map[string]any `json:"attributes,omitempty"`
Resource map[string]any `json:"resource,omitempty"`
Body any `json:"body,omitempty"`
SeverityNumber int32 `json:"severity_number,omitempty"`
SeverityText string `json:"severity_text,omitempty"`
DroppedAttributesCount uint32 `json:"dropped_attributes_count,omitempty"`
Flags uint32 `json:"flags,omitempty"`
TraceID string `json:"trace_id,omitempty"`
SpanID string `json:"span_id,omitempty"`
Attributes map[string]any `json:"attributes,omitempty"`
Scope *scopeCwLogBody `json:"scope,omitempty"`
Resource map[string]any `json:"resource,omitempty"`
}

func logToCWLog(resourceAttrs map[string]any, log plog.LogRecord, config *Config) (*cwlogs.Event, error) {
func logToCWLog(resourceAttrs map[string]any, scope pcommon.InstrumentationScope, log plog.LogRecord, config *Config) (*cwlogs.Event, error) {
// TODO(jbd): Benchmark and improve the allocations.
// Evaluate go.elastic.co/fastjson as a replacement for encoding/json.
logGroupName := config.LogGroupName
Expand Down Expand Up @@ -214,6 +222,16 @@ func logToCWLog(resourceAttrs map[string]any, log plog.LogRecord, config *Config
body.Attributes = attrsValue(log.Attributes())
body.Resource = resourceAttrs

// scope should have a name at least
if scope.Name() != "" {
scopeBody := &scopeCwLogBody{
Name: scope.Name(),
Version: scope.Version(),
Attributes: attrsValue(scope.Attributes()),
}
body.Scope = scopeBody
}

bodyJSON, err = json.Marshal(body)
if err != nil {
return &cwlogs.Event{}, err
Expand Down
53 changes: 47 additions & 6 deletions exporter/awscloudwatchlogsexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func TestLogToCWLog(t *testing.T) {
tests := []struct {
name string
resource pcommon.Resource
scope pcommon.InstrumentationScope
log plog.LogRecord
config *Config
want cwlogs.Event
Expand All @@ -56,12 +57,13 @@ func TestLogToCWLog(t *testing.T) {
name: "basic",
resource: testResource(),
log: testLogRecord(),
scope: testScope(),
config: &Config{},
want: cwlogs.Event{
GeneratedTime: time.Now(),
InputLogEvent: &cloudwatchlogs.InputLogEvent{
Timestamp: aws.Int64(1609719139),
Message: aws.String(`{"body":"hello world","severity_number":5,"severity_text":"debug","dropped_attributes_count":4,"flags":1,"trace_id":"0102030405060708090a0b0c0d0e0f10","span_id":"0102030405060708","attributes":{"key1":1,"key2":"attr2"},"resource":{"host":"abc123","node":5}}`),
Message: aws.String(`{"body":"hello world","severity_number":5,"severity_text":"debug","dropped_attributes_count":4,"flags":1,"trace_id":"0102030405060708090a0b0c0d0e0f10","span_id":"0102030405060708","attributes":{"key1":1,"key2":"attr2"},"scope":{"name":"test-scope","version":"1.0.0","attributes":{"scope-attr":"value"}},"resource":{"host":"abc123","node":5}}`),
},
StreamKey: cwlogs.StreamKey{
LogGroupName: "",
Expand All @@ -72,13 +74,32 @@ func TestLogToCWLog(t *testing.T) {
{
name: "no resource",
resource: pcommon.NewResource(),
scope: testScope(),
log: testLogRecord(),
config: &Config{},
want: cwlogs.Event{
GeneratedTime: time.Now(),
InputLogEvent: &cloudwatchlogs.InputLogEvent{
Timestamp: aws.Int64(1609719139),
Message: aws.String(`{"body":"hello world","severity_number":5,"severity_text":"debug","dropped_attributes_count":4,"flags":1,"trace_id":"0102030405060708090a0b0c0d0e0f10","span_id":"0102030405060708","attributes":{"key1":1,"key2":"attr2"}}`),
Message: aws.String(`{"body":"hello world","severity_number":5,"severity_text":"debug","dropped_attributes_count":4,"flags":1,"trace_id":"0102030405060708090a0b0c0d0e0f10","span_id":"0102030405060708","attributes":{"key1":1,"key2":"attr2"},"scope":{"name":"test-scope","version":"1.0.0","attributes":{"scope-attr":"value"}}}`),
},
StreamKey: cwlogs.StreamKey{
LogGroupName: "",
LogStreamName: "",
},
},
},
{
name: "no scope",
resource: testResource(),
log: testLogRecord(),
scope: emptyScope(),
config: &Config{},
want: cwlogs.Event{
GeneratedTime: time.Now(),
InputLogEvent: &cloudwatchlogs.InputLogEvent{
Timestamp: aws.Int64(1609719139),
Message: aws.String(`{"body":"hello world","severity_number":5,"severity_text":"debug","dropped_attributes_count":4,"flags":1,"trace_id":"0102030405060708090a0b0c0d0e0f10","span_id":"0102030405060708","attributes":{"key1":1,"key2":"attr2"},"resource":{"host":"abc123","node":5}}`),
},
StreamKey: cwlogs.StreamKey{
LogGroupName: "",
Expand All @@ -89,6 +110,7 @@ func TestLogToCWLog(t *testing.T) {
{
name: "no trace",
resource: testResource(),
scope: testScope(),
log: testLogRecordWithoutTrace(),
config: &Config{
LogGroupName: "tLogGroup",
Expand All @@ -98,7 +120,7 @@ func TestLogToCWLog(t *testing.T) {
GeneratedTime: time.Now(),
InputLogEvent: &cloudwatchlogs.InputLogEvent{
Timestamp: aws.Int64(1609719139),
Message: aws.String(`{"body":"hello world","severity_number":5,"severity_text":"debug","dropped_attributes_count":4,"attributes":{"key1":1,"key2":"attr2"},"resource":{"host":"abc123","node":5}}`),
Message: aws.String(`{"body":"hello world","severity_number":5,"severity_text":"debug","dropped_attributes_count":4,"attributes":{"key1":1,"key2":"attr2"},"scope":{"name":"test-scope","version":"1.0.0","attributes":{"scope-attr":"value"}},"resource":{"host":"abc123","node":5}}`),
},
StreamKey: cwlogs.StreamKey{
LogGroupName: "tLogGroup",
Expand All @@ -109,6 +131,7 @@ func TestLogToCWLog(t *testing.T) {
{
name: "raw",
resource: testResource(),
scope: testScope(),
log: testLogRecordWithoutTrace(),
config: &Config{
LogGroupName: "tLogGroup",
Expand All @@ -130,6 +153,7 @@ func TestLogToCWLog(t *testing.T) {
{
name: "raw emf v1",
resource: testResource(),
scope: testScope(),
log: createPLog(`{"_aws":{"Timestamp":1574109732004,"LogGroupName":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}]},"Operation":"Aggregator","ProcessingLatency":100}`),
config: &Config{
LogGroupName: "tLogGroup",
Expand All @@ -151,6 +175,7 @@ func TestLogToCWLog(t *testing.T) {
{
name: "raw emf v1 with log stream",
resource: testResource(),
scope: testScope(),
log: createPLog(`{"_aws":{"Timestamp":1574109732004,"LogGroupName":"Foo","LogStreamName":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}]},"Operation":"Aggregator","ProcessingLatency":100}`),
config: &Config{
LogGroupName: "tLogGroup",
Expand All @@ -172,6 +197,7 @@ func TestLogToCWLog(t *testing.T) {
{
name: "raw emf v0",
resource: testResource(),
scope: testScope(),
log: createPLog(`{"Timestamp":1574109732004,"log_group_name":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}],"Operation":"Aggregator","ProcessingLatency":100}`),
config: &Config{
LogGroupName: "tLogGroup",
Expand All @@ -193,6 +219,7 @@ func TestLogToCWLog(t *testing.T) {
{
name: "raw emf v0 with log stream",
resource: testResource(),
scope: testScope(),
log: createPLog(`{"Timestamp":1574109732004,"log_group_name":"Foo","log_stream_name":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}],"Operation":"Aggregator","ProcessingLatency":100}`),
config: &Config{
LogGroupName: "tLogGroup",
Expand All @@ -216,13 +243,13 @@ func TestLogToCWLog(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
resourceAttrs := attrsValue(tt.resource.Attributes())
got, err := logToCWLog(resourceAttrs, tt.log, tt.config)
got, err := logToCWLog(resourceAttrs, tt.scope, tt.log, tt.config)
if (err != nil) != tt.wantErr {
t.Errorf("logToCWLog() error = %v, wantErr %v", err, tt.wantErr)
return
}
// Do not test generated time since it is time.Now()
assert.Equal(t, tt.want.InputLogEvent, got.InputLogEvent)
assert.Equal(t, *tt.want.InputLogEvent, *got.InputLogEvent)
assert.Equal(t, tt.want.LogStreamName, got.LogStreamName)
assert.Equal(t, tt.want.LogGroupName, got.LogGroupName)
})
Expand All @@ -234,8 +261,9 @@ func BenchmarkLogToCWLog(b *testing.B) {

resource := testResource()
log := testLogRecord()
scope := testScope()
for i := 0; i < b.N; i++ {
_, err := logToCWLog(attrsValue(resource.Attributes()), log, &Config{})
_, err := logToCWLog(attrsValue(resource.Attributes()), scope, log, &Config{})
if err != nil {
b.Errorf("logToCWLog() failed %v", err)
return
Expand All @@ -250,6 +278,19 @@ func testResource() pcommon.Resource {
return resource
}

func testScope() pcommon.InstrumentationScope {
scope := pcommon.NewInstrumentationScope()
scope.SetName("test-scope")
scope.SetVersion("1.0.0")
scope.Attributes().PutStr("scope-attr", "value")
return scope
}

func emptyScope() pcommon.InstrumentationScope {
scope := pcommon.NewInstrumentationScope()
return scope
}

func testLogRecord() plog.LogRecord {
record := plog.NewLogRecord()
record.SetSeverityNumber(5)
Expand Down

0 comments on commit aa2e9f7

Please sign in to comment.