Skip to content

Commit

Permalink
Add EMF Only Flag
Browse files Browse the repository at this point in the history
  • Loading branch information
sethAmazon committed Aug 3, 2023
1 parent 10a8fba commit 89e1d4f
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 3 deletions.
7 changes: 7 additions & 0 deletions exporter/awscloudwatchlogsexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ type Config struct {
// Export raw log string instead of log wrapper
// Required for emf logs
RawLog bool `mapstructure:"raw_log,omitempty"`

// Only allow emf logs
// If this is true raw log must also be true
EmfOnly bool `mapstructure:"emf_only,omitempty"`
}

type QueueSettings struct {
Expand All @@ -80,6 +84,9 @@ func (config *Config) Validate() error {
if !isValidRetentionValue(config.LogRetention) {
return errors.New("invalid value for retention policy. Please make sure to use the following values: 0 (Never Expire), 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, 2192, 2557, 2922, 3288, or 3653")
}
if config.EmfOnly && !config.RawLog {
return errors.New("emf only is true, but raw log is false")
}
return nil
}

Expand Down
57 changes: 57 additions & 0 deletions exporter/awscloudwatchlogsexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,60 @@ func TestRetentionValidateWrong(t *testing.T) {
assert.Error(t, wrongcfg.Validate())

}

func TestRawLogEmfOnlyCombination(t *testing.T) {
tests := []struct {
RawLog bool
EmfOnly bool
Test string
wantError bool
}{
{
RawLog: true,
EmfOnly: true,
wantError: false,
Test: "Valid Combination Raw Log True Emf Only True",
},
{
RawLog: true,
EmfOnly: false,
wantError: false,
Test: "Valid Combination Raw Log True Emf Only false",
},
{
RawLog: false,
EmfOnly: false,
wantError: false,
Test: "Valid Combination Raw Log false Emf Only false",
},
{
RawLog: false,
EmfOnly: true,
wantError: true,
Test: "Invalid Combination Raw Log false Emf Only true",
},
}
for _, tt := range tests {
t.Run(tt.Test, func(t *testing.T) {
defaultRetrySettings := exporterhelper.NewDefaultRetrySettings()
cfg := &Config{
RetrySettings: defaultRetrySettings,
LogGroupName: "test-1",
LogStreamName: "testing",
Endpoint: "",
LogRetention: 365,
AWSSessionSettings: awsutil.CreateDefaultSessionConfig(),
QueueSettings: QueueSettings{
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
RawLog: tt.RawLog,
EmfOnly: tt.EmfOnly,
}
if tt.wantError {
assert.Error(t, component.ValidateConfig(cfg))
} else {
assert.NoError(t, component.ValidateConfig(cfg))
}
})
}
}
10 changes: 7 additions & 3 deletions exporter/awscloudwatchlogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func logsToCWLogs(logger *zap.Logger, ld plog.Logs, config *Config) ([]*cwlogs.E
log := logs.At(k)
event, err := logToCWLog(resourceAttrs, log, config)
if err != nil {
logger.Debug("Failed to convert to CloudWatch Log", zap.Error(err))
logger.Debug("Failed to convert to CloudWatch Log", zap.Error(err), zap.String("log", log.Body().AsString()))
dropped++
} else {
out = append(out, event)
Expand Down Expand Up @@ -238,17 +238,21 @@ func logToCWLog(resourceAttrs map[string]interface{}, log plog.LogRecord, config
var metadata emfMetadata
bodyString := log.Body().AsString()
err = json.Unmarshal([]byte(bodyString), &metadata)
switch {
// v1 emf json
if err == nil && metadata.AWSMetadata != nil && metadata.AWSMetadata.LogGroupName != "" {
case err == nil && metadata.AWSMetadata != nil && metadata.AWSMetadata.LogGroupName != "":
logGroupName = metadata.AWSMetadata.LogGroupName
if metadata.AWSMetadata.LogStreamName != "" {
logStreamName = metadata.AWSMetadata.LogStreamName
}
} else /* v0 emf json */ if err == nil && metadata.LogGroupName != "" {
// v0 emf json
case err == nil && metadata.LogGroupName != "":
logGroupName = metadata.LogGroupName
if metadata.LogStreamName != "" {
logStreamName = metadata.LogStreamName
}
case config.EmfOnly:
return &cwlogs.Event{}, errors.New("invalid emf log")
}
bodyJSON = []byte(bodyString)
} else {
Expand Down
44 changes: 44 additions & 0 deletions exporter/awscloudwatchlogsexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,50 @@ func TestLogToCWLog(t *testing.T) {
LogStreamName: "Foo",
},
},
{
name: "invalid emf log",
resource: testResource(),
log: createPLog(`1000{"Timestamp":1574109732004,"log_group_name":"Foo","log_stream_name":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}],"Operation":"Aggregator","ProcessingLatency":100}`),
config: Config{
LogGroupName: "tLogGroup",
LogStreamName: "tStreamName",
RawLog: true,
EmfOnly: true,
},
wantErr: true,
},
{
name: "invalid emf log no log group",
resource: testResource(),
log: createPLog(`{"Timestamp":1574109732004,"log_stream_name":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}],"Operation":"Aggregator","ProcessingLatency":100}`),
config: Config{
LogGroupName: "tLogGroup",
LogStreamName: "tStreamName",
RawLog: true,
EmfOnl