From 2fac6ecd43fb14af8392d1ded3fd8c2da0d14dba Mon Sep 17 00:00:00 2001 From: Huy Vo Date: Mon, 7 Nov 2022 08:36:44 -0800 Subject: [PATCH] [exporter/awsemfexporter] add log retention feature for Cloudwatch Log Group (#15678) * Add changes for Log Retention * Add comments to the code * Update README * Update CHANGELOG * Add Log Retention config option to awscloudwatchlogsxporter --- .chloggen/add-log-retention.yaml | 16 +++ exporter/awscloudwatchlogsexporter/README.md | 2 + exporter/awscloudwatchlogsexporter/config.go | 38 ++++++ .../awscloudwatchlogsexporter/config_test.go | 36 ++++++ .../awscloudwatchlogsexporter/exporter.go | 2 +- exporter/awsemfexporter/README.md | 1 + exporter/awsemfexporter/config.go | 43 +++++++ exporter/awsemfexporter/config_test.go | 32 +++++ exporter/awsemfexporter/emf_exporter.go | 2 +- internal/aws/cwlogs/cwlog_client.go | 25 +++- internal/aws/cwlogs/cwlog_client_test.go | 110 +++++++++++++++--- 11 files changed, 282 insertions(+), 25 deletions(-) create mode 100644 .chloggen/add-log-retention.yaml diff --git a/.chloggen/add-log-retention.yaml b/.chloggen/add-log-retention.yaml new file mode 100644 index 0000000000000..7443693e9ec5b --- /dev/null +++ b/.chloggen/add-log-retention.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awsemfexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Added a `log_retention` field to the config to specify log retention policy for a Cloudwatch Log Group + +# One or more tracking issues related to the change +issues: [15678] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: \ No newline at end of file diff --git a/exporter/awscloudwatchlogsexporter/README.md b/exporter/awscloudwatchlogsexporter/README.md index 96363f0026d2d..7c36bd7e62fd8 100644 --- a/exporter/awscloudwatchlogsexporter/README.md +++ b/exporter/awscloudwatchlogsexporter/README.md @@ -23,6 +23,7 @@ The following settings can be optionally configured: - `region`: The AWS region where the log stream is in. - `endpoint`: The CloudWatch Logs service endpoint which the requests are forwarded to. [See the CloudWatch Logs endpoints](https://docs.aws.amazon.com/general/latest/gr/cwl_region.html) for a list. +- `log_retention`: LogRetention is the option to set the log retention policy for only newly created CloudWatch Log Groups. Defaults to Never Expire if not specified or set to 0. Possible values for retention in days are 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, 2192, 2557, 2922, 3288, or 3653. ### Examples @@ -44,6 +45,7 @@ exporters: log_stream_name: "testing-integrations-stream" region: "us-east-1" endpoint: "logs.us-east-1.amazonaws.com" + log_retention: 365 sending_queue: queue_size: 50 retry_on_failure: diff --git a/exporter/awscloudwatchlogsexporter/config.go b/exporter/awscloudwatchlogsexporter/config.go index c5dd121df6c0a..5942134e61fdb 100644 --- a/exporter/awscloudwatchlogsexporter/config.go +++ b/exporter/awscloudwatchlogsexporter/config.go @@ -44,6 +44,10 @@ type Config struct { // Optional. Endpoint string `mapstructure:"endpoint"` + // LogRetention is the option to set the log retention policy for the CloudWatch Log Group. Defaults to Never Expire if not specified or set to 0 + // Possible values are 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, 2192, 2557, 2922, 3288, or 3653 + LogRetention int64 `mapstructure:"log_retention"` + // QueueSettings is a subset of exporterhelper.QueueSettings, // because only QueueSize is user-settable due to how AWS CloudWatch API works QueueSettings QueueSettings `mapstructure:"sending_queue"` @@ -71,9 +75,43 @@ func (config *Config) Validate() error { if config.QueueSettings.QueueSize < 1 { return errors.New("'sending_queue.queue_size' must be 1 or greater") } + if !isValidRetentionValue(config.LogRetention) { + return errors.New("invalid value for retention policy. Please make sure to use the following values: 0 (Never Expire), 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, 2192, 2557, 2922, 3288, or 3653") + } return nil } +// Added function to check if value is an accepted number of log retention days +func isValidRetentionValue(input int64) bool { + switch input { + case + 0, + 1, + 3, + 5, + 7, + 14, + 30, + 60, + 90, + 120, + 150, + 180, + 365, + 400, + 545, + 731, + 1827, + 2192, + 2557, + 2922, + 3288, + 3653: + return true + } + return false +} + func (config *Config) enforcedQueueSettings() exporterhelper.QueueSettings { return exporterhelper.QueueSettings{ Enabled: true, diff --git a/exporter/awscloudwatchlogsexporter/config_test.go b/exporter/awscloudwatchlogsexporter/config_test.go index fa64ca9d23ac7..f2677eecf559f 100644 --- a/exporter/awscloudwatchlogsexporter/config_test.go +++ b/exporter/awscloudwatchlogsexporter/config_test.go @@ -110,3 +110,39 @@ func TestLoadConfig(t *testing.T) { }) } } + +func TestRetentionValidateCorrect(t *testing.T) { + defaultRetrySettings := exporterhelper.NewDefaultRetrySettings() + cfg := &Config{ + ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "1")), + RetrySettings: defaultRetrySettings, + LogGroupName: "test-1", + LogStreamName: "testing", + Endpoint: "", + LogRetention: 365, + AWSSessionSettings: awsutil.CreateDefaultSessionConfig(), + QueueSettings: QueueSettings{ + QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, + }, + } + assert.NoError(t, cfg.Validate()) + +} + +func TestRetentionValidateWrong(t *testing.T) { + defaultRetrySettings := exporterhelper.NewDefaultRetrySettings() + wrongcfg := &Config{ + ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "2")), + RetrySettings: defaultRetrySettings, + LogGroupName: "test-1", + LogStreamName: "testing", + Endpoint: "", + LogRetention: 366, + AWSSessionSettings: awsutil.CreateDefaultSessionConfig(), + QueueSettings: QueueSettings{ + QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, + }, + } + assert.Error(t, wrongcfg.Validate()) + +} diff --git a/exporter/awscloudwatchlogsexporter/exporter.go b/exporter/awscloudwatchlogsexporter/exporter.go index eae429a463f42..ee14afab7c162 100644 --- a/exporter/awscloudwatchlogsexporter/exporter.go +++ b/exporter/awscloudwatchlogsexporter/exporter.go @@ -58,7 +58,7 @@ func newCwLogsPusher(expConfig *Config, params component.ExporterCreateSettings) } // create CWLogs client with aws session config - svcStructuredLog := cwlogs.NewClient(params.Logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, session) + svcStructuredLog := cwlogs.NewClient(params.Logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, expConfig.LogRetention, session) collectorIdentifier, err := uuid.NewRandom() if err != nil { diff --git a/exporter/awsemfexporter/README.md b/exporter/awsemfexporter/README.md index f80b4a580438c..a2ab37af643d1 100644 --- a/exporter/awsemfexporter/README.md +++ b/exporter/awsemfexporter/README.md @@ -22,6 +22,7 @@ The following exporter configuration parameters are supported. |:---------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ------- | | `log_group_name` | Customized log group name which supports `{ClusterName}` and `{TaskId}` placeholders. One valid example is `/aws/metrics/{ClusterName}`. It will search for `ClusterName` (or `aws.ecs.cluster.name`) resource attribute in the metrics data and replace with the actual cluster name. If none of them are found in the resource attribute map, `{ClusterName}` will be replaced by `undefined`. Similar way, for the `{TaskId}`, it searches for `TaskId` (or `aws.ecs.task.id`) key in the resource attribute map. For `{NodeName}`, it searches for `NodeName` (or `k8s.node.name`) |"/metrics/default"| | `log_stream_name` | Customized log stream name which supports `{TaskId}`, `{ClusterName}`, `{NodeName}`, `{ContainerInstanceId}`, and `{TaskDefinitionFamily}` placeholders. One valid example is `{TaskId}`. It will search for `TaskId` (or `aws.ecs.task.id`) resource attribute in the metrics data and replace with the actual task id. If none of them are found in the resource attribute map, `{TaskId}` will be replaced by `undefined`. Similarly, for the `{TaskDefinitionFamily}`, it searches for `TaskDefinitionFamily` (or `aws.ecs.task.family`). For the `{ClusterName}`, it searches for `ClusterName` (or `aws.ecs.cluster.name`). For `{NodeName}`, it searches for `NodeName` (or `k8s.node.name`). For `{ContainerInstanceId}`, it searches for `ContainerInstanceId` (or `aws.ecs.container.instance.id`). (Note: ContainerInstanceId (or `aws.ecs.container.instance.id`) only works for AWS ECS EC2 launch type. |"otel-stream"| +| `log_retention` | LogRetention is the option to set the log retention policy for only newly created CloudWatch Log Groups. Defaults to Never Expire if not specified or set to 0. Possible values for retention in days are 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, 2192, 2557, 2922, 3288, or 3653. |"Never Expire"| | `namespace` | Customized CloudWatch metrics namespace | "default" | | `endpoint` | Optionally override the default CloudWatch service endpoint. | | | `no_verify_ssl` | Enable or disable TLS certificate verification. | false | diff --git a/exporter/awsemfexporter/config.go b/exporter/awsemfexporter/config.go index b45a6fe728805..f95826485ba71 100644 --- a/exporter/awsemfexporter/config.go +++ b/exporter/awsemfexporter/config.go @@ -15,6 +15,8 @@ package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter" import ( + "errors" + "go.opentelemetry.io/collector/config" "go.uber.org/zap" @@ -48,6 +50,11 @@ type Config struct { // "SingleDimensionRollupOnly" - Enable single dimension rollup // "NoDimensionRollup" - No dimension rollup (only keep original metrics which contain all dimensions) DimensionRollupOption string `mapstructure:"dimension_rollup_option"` + + // LogRetention is the option to set the log retention policy for the CloudWatch Log Group. Defaults to Never Expire if not specified or set to 0 + // Possible values are 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, 2192, 2557, 2922, 3288, or 3653 + LogRetention int64 `mapstructure:"log_retention"` + // ParseJSONEncodedAttributeValues is an array of attribute keys whose corresponding values are JSON-encoded as strings. // Those strings will be decoded to its original json structure. ParseJSONEncodedAttributeValues []string `mapstructure:"parse_json_encoded_attr_values"` @@ -113,9 +120,45 @@ func (config *Config) Validate() error { } } config.MetricDescriptors = validDescriptors + + if !isValidRetentionValue(config.LogRetention) { + return errors.New("invalid value for retention policy. Please make sure to use the following values: 0 (Never Expire), 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, 2192, 2557, 2922, 3288, or 3653") + } + return nil } +// Added function to check if value is an accepted number of log retention days +func isValidRetentionValue(input int64) bool { + switch input { + case + 0, + 1, + 3, + 5, + 7, + 14, + 30, + 60, + 90, + 120, + 150, + 180, + 365, + 400, + 545, + 731, + 1827, + 2192, + 2557, + 2922, + 3288, + 3653: + return true + } + return false +} + func newEMFSupportedUnits() map[string]interface{} { unitIndexer := map[string]interface{}{} for _, unit := range []string{"Seconds", "Microseconds", "Milliseconds", "Bytes", "Kilobytes", "Megabytes", diff --git a/exporter/awsemfexporter/config_test.go b/exporter/awsemfexporter/config_test.go index efd53ff9bb880..c8e0fcd8cd76d 100644 --- a/exporter/awsemfexporter/config_test.go +++ b/exporter/awsemfexporter/config_test.go @@ -126,3 +126,35 @@ func TestConfigValidate(t *testing.T) { {unit: "Megabytes", metricName: "memory_usage"}, }, cfg.MetricDescriptors) } + +func TestRetentionValidateCorrect(t *testing.T) { + cfg := &Config{ + ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "1")), + AWSSessionSettings: awsutil.AWSSessionSettings{ + RequestTimeoutSeconds: 30, + MaxRetries: 1, + }, + DimensionRollupOption: "ZeroAndSingleDimensionRollup", + LogRetention: 365, + ResourceToTelemetrySettings: resourcetotelemetry.Settings{Enabled: true}, + logger: zap.NewNop(), + } + assert.NoError(t, cfg.Validate()) + +} + +func TestRetentionValidateWrong(t *testing.T) { + wrongcfg := &Config{ + ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "2")), + AWSSessionSettings: awsutil.AWSSessionSettings{ + RequestTimeoutSeconds: 30, + MaxRetries: 1, + }, + DimensionRollupOption: "ZeroAndSingleDimensionRollup", + LogRetention: 366, + ResourceToTelemetrySettings: resourcetotelemetry.Settings{Enabled: true}, + logger: zap.NewNop(), + } + assert.Error(t, wrongcfg.Validate()) + +} diff --git a/exporter/awsemfexporter/emf_exporter.go b/exporter/awsemfexporter/emf_exporter.go index 6c9f4c117b387..b33eca88cc2ee 100644 --- a/exporter/awsemfexporter/emf_exporter.go +++ b/exporter/awsemfexporter/emf_exporter.go @@ -78,7 +78,7 @@ func newEmfPusher( } // create CWLogs client with aws session config - svcStructuredLog := cwlogs.NewClient(logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, session) + svcStructuredLog := cwlogs.NewClient(logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, expConfig.LogRetention, session) collectorIdentifier, _ := uuid.NewRandom() emfExporter := &emfExporter{ diff --git a/internal/aws/cwlogs/cwlog_client.go b/internal/aws/cwlogs/cwlog_client.go index 3164b79a4da50..632e4b6b65fd9 100644 --- a/internal/aws/cwlogs/cwlog_client.go +++ b/internal/aws/cwlogs/cwlog_client.go @@ -40,23 +40,25 @@ const ( // Possible exceptions are combination of common errors (https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/CommonErrors.html) // and API specific erros (e.g. https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html#API_PutLogEvents_Errors) type Client struct { - svc cloudwatchlogsiface.CloudWatchLogsAPI - logger *zap.Logger + svc cloudwatchlogsiface.CloudWatchLogsAPI + logRetention int64 + logger *zap.Logger } // Create a log client based on the actual cloudwatch logs client. -func newCloudWatchLogClient(svc cloudwatchlogsiface.CloudWatchLogsAPI, logger *zap.Logger) *Client { +func newCloudWatchLogClient(svc cloudwatchlogsiface.CloudWatchLogsAPI, logRetention int64, logger *zap.Logger) *Client { logClient := &Client{svc: svc, - logger: logger} + logRetention: logRetention, + logger: logger} return logClient } // NewClient create Client -func NewClient(logger *zap.Logger, awsConfig *aws.Config, buildInfo component.BuildInfo, logGroupName string, sess *session.Session) *Client { +func NewClient(logger *zap.Logger, awsConfig *aws.Config, buildInfo component.BuildInfo, logGroupName string, logRetention int64, sess *session.Session) *Client { client := cloudwatchlogs.New(sess, awsConfig) client.Handlers.Build.PushBackNamed(handler.RequestStructuredLogHandler) client.Handlers.Build.PushFrontNamed(newCollectorUserAgentHandler(buildInfo, logGroupName)) - return newCloudWatchLogClient(client, logger) + return newCloudWatchLogClient(client, logRetention, logger) } // PutLogEvents mainly handles different possible error could be returned from server side, and retries them @@ -156,6 +158,17 @@ func (client *Client) CreateStream(logGroup, streamName *string) (token string, LogGroupName: logGroup, }) if err == nil { + // For newly created log groups, set the log retention polic if specified or non-zero. Otheriwse, set to Never Expire + if client.logRetention != 0 { + _, err = client.svc.PutRetentionPolicy(&cloudwatchlogs.PutRetentionPolicyInput{LogGroupName: logGroup, RetentionInDays: &client.logRetention}) + if err != nil { + var awsErr awserr.Error + if errors.As(err, &awsErr) { + client.logger.Debug("CreateLogStream / CreateLogGroup has errors related to log retention policy.", zap.String("LogGroupName", *logGroup), zap.String("LogStreamName", *streamName), zap.Error(e)) + return token, err + } + } + } _, err = client.svc.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ LogGroupName: logGroup, LogStreamName: streamName, diff --git a/internal/aws/cwlogs/cwlog_client_test.go b/internal/aws/cwlogs/cwlog_client_test.go index afff42bd01d94..4c8318856696a 100644 --- a/internal/aws/cwlogs/cwlog_client_test.go +++ b/internal/aws/cwlogs/cwlog_client_test.go @@ -49,7 +49,7 @@ func newAlwaysPassMockLogClient(putLogEventsFunc func(args mock.Arguments)) *Cli &cloudwatchlogs.DescribeLogStreamsOutput{ LogStreams: []*cloudwatchlogs.LogStream{{UploadSequenceToken: &expectedNextSequenceToken}}}, nil) - return newCloudWatchLogClient(svc, logger) + return newCloudWatchLogClient(svc, 0, logger) } type mockCloudWatchLogsClient struct { @@ -77,6 +77,11 @@ func (svc *mockCloudWatchLogsClient) DescribeLogStreams(input *cloudwatchlogs.De return args.Get(0).(*cloudwatchlogs.DescribeLogStreamsOutput), args.Error(1) } +func (svc *mockCloudWatchLogsClient) PutRetentionPolicy(input *cloudwatchlogs.PutRetentionPolicyInput) (*cloudwatchlogs.PutRetentionPolicyOutput, error) { + args := svc.Called(input) + return args.Get(0).(*cloudwatchlogs.PutRetentionPolicyOutput), args.Error(1) +} + // Tests var previousSequenceToken = "0000" var expectedNextSequenceToken = "1111" @@ -97,7 +102,7 @@ func TestPutLogEvents_HappyCase(t *testing.T) { svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, nil) - client := newCloudWatchLogClient(svc, logger) + client := newCloudWatchLogClient(svc, 0, logger) tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) @@ -123,7 +128,7 @@ func TestPutLogEvents_HappyCase_SomeRejectedInfo(t *testing.T) { svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, nil) - client := newCloudWatchLogClient(svc, logger) + client := newCloudWatchLogClient(svc, 0, logger) tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) @@ -143,7 +148,7 @@ func TestPutLogEvents_NonAWSError(t *testing.T) { svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, errors.New("some random error")).Once() - client := newCloudWatchLogClient(svc, logger) + client := newCloudWatchLogClient(svc, 0, logger) tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) @@ -164,7 +169,7 @@ func TestPutLogEvents_InvalidParameterException(t *testing.T) { invalidParameterException := &cloudwatchlogs.InvalidParameterException{} svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, invalidParameterException).Once() - client := newCloudWatchLogClient(svc, logger) + client := newCloudWatchLogClient(svc, 0, logger) tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) @@ -187,7 +192,7 @@ func TestPutLogEvents_InvalidSequenceTokenException(t *testing.T) { svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, awsErr).Once() svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, nil).Once() - client := newCloudWatchLogClient(svc, logger) + client := newCloudWatchLogClient(svc, 0, logger) tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) @@ -208,7 +213,7 @@ func TestPutLogEvents_DataAlreadyAcceptedException(t *testing.T) { svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, awsErr).Once() - client := newCloudWatchLogClient(svc, logger) + client := newCloudWatchLogClient(svc, 0, logger) tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) @@ -229,7 +234,7 @@ func TestPutLogEvents_OperationAbortedException(t *testing.T) { operationAbortedException := &cloudwatchlogs.OperationAbortedException{} svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, operationAbortedException).Once() - client := newCloudWatchLogClient(svc, logger) + client := newCloudWatchLogClient(svc, 0, logger) tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) @@ -250,7 +255,7 @@ func TestPutLogEvents_ServiceUnavailableException(t *testing.T) { serviceUnavailableException := &cloudwatchlogs.ServiceUnavailableException{} svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, serviceUnavailableException).Once() - client := newCloudWatchLogClient(svc, logger) + client := newCloudWatchLogClient(svc, 0, logger) tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) @@ -271,7 +276,7 @@ func TestPutLogEvents_UnknownException(t *testing.T) { unknownException := awserr.New("unknownException", "", nil) svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, unknownException).Once() - client := newCloudWatchLogClient(svc, logger) + client := newCloudWatchLogClient(svc, 0, logger) tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) @@ -292,7 +297,7 @@ func TestPutLogEvents_ThrottlingException(t *testing.T) { throttlingException := awserr.New(errCodeThrottlingException, "", nil) svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, throttlingException).Once() - client := newCloudWatchLogClient(svc, logger) + client := newCloudWatchLogClient(svc, 0, logger) tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) @@ -319,7 +324,78 @@ func TestPutLogEvents_ResourceNotFoundException(t *testing.T) { svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, nil).Once() - client := newCloudWatchLogClient(svc, logger) + client := newCloudWatchLogClient(svc, 0, logger) + tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) + + svc.AssertExpectations(t) + assert.Equal(t, expectedNextSequenceToken, *tokenP) +} + +func TestLogRetention_NeverExpire(t *testing.T) { + logger := zap.NewNop() + svc := new(mockCloudWatchLogsClient) + putLogEventsInput := &cloudwatchlogs.PutLogEventsInput{ + LogGroupName: &logGroup, + LogStreamName: &logStreamName, + SequenceToken: &emptySequenceToken, + } + + putLogEventsOutput := &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: &expectedNextSequenceToken} + awsErr := &cloudwatchlogs.ResourceNotFoundException{} + + svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, awsErr).Once() + + svc.On("CreateLogStream", + &cloudwatchlogs.CreateLogStreamInput{LogGroupName: &logGroup, LogStreamName: &logStreamName}).Return(new(cloudwatchlogs.CreateLogStreamOutput), awsErr).Once() + + svc.On("CreateLogGroup", + &cloudwatchlogs.CreateLogGroupInput{LogGroupName: &logGroup}).Return(new(cloudwatchlogs.CreateLogGroupOutput), nil).Once() + + // PutRetentionPolicy is not called because it is set to 0 + + svc.On("CreateLogStream", + &cloudwatchlogs.CreateLogStreamInput{LogGroupName: &logGroup, LogStreamName: &logStreamName}).Return(new(cloudwatchlogs.CreateLogStreamOutput), nil).Once() + + svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, nil).Once() + + client := newCloudWatchLogClient(svc, 0, logger) + tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) + + svc.AssertExpectations(t) + assert.Equal(t, expectedNextSequenceToken, *tokenP) +} + +func TestLogRetention_RetentionDaysInputted(t *testing.T) { + logger := zap.NewNop() + svc := new(mockCloudWatchLogsClient) + putLogEventsInput := &cloudwatchlogs.PutLogEventsInput{ + LogGroupName: &logGroup, + LogStreamName: &logStreamName, + SequenceToken: &emptySequenceToken, + } + + putLogEventsOutput := &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: &expectedNextSequenceToken} + awsErr := &cloudwatchlogs.ResourceNotFoundException{} + + svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, awsErr).Once() + + svc.On("CreateLogStream", + &cloudwatchlogs.CreateLogStreamInput{LogGroupName: &logGroup, LogStreamName: &logStreamName}).Return(new(cloudwatchlogs.CreateLogStreamOutput), awsErr).Once() + + svc.On("CreateLogGroup", + &cloudwatchlogs.CreateLogGroupInput{LogGroupName: &logGroup}).Return(new(cloudwatchlogs.CreateLogGroupOutput), nil).Once() + + svc.On("PutRetentionPolicy", + &cloudwatchlogs.PutRetentionPolicyInput{LogGroupName: &logGroup, RetentionInDays: aws.Int64(365)}).Return(new(cloudwatchlogs.PutRetentionPolicyOutput), nil).Once() + + svc.On("CreateLogStream", + &cloudwatchlogs.CreateLogStreamInput{LogGroupName: &logGroup, LogStreamName: &logStreamName}).Return(new(cloudwatchlogs.CreateLogStreamOutput), nil).Once() + + svc.On("PutLogEvents", putLogEventsInput).Return(putLogEventsOutput, nil).Once() + + client := newCloudWatchLogClient(svc, 365, logger) tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) @@ -344,7 +420,7 @@ func TestPutLogEvents_AllRetriesFail(t *testing.T) { svc.On("CreateLogStream", &cloudwatchlogs.CreateLogStreamInput{LogGroupName: &logGroup, LogStreamName: &logStreamName}).Return(new(cloudwatchlogs.CreateLogStreamOutput), nil).Twice() - client := newCloudWatchLogClient(svc, logger) + client := newCloudWatchLogClient(svc, 0, logger) tokenP, _ := client.PutLogEvents(putLogEventsInput, defaultRetryCount) svc.AssertExpectations(t) @@ -358,7 +434,7 @@ func TestCreateStream_HappyCase(t *testing.T) { svc.On("CreateLogStream", &cloudwatchlogs.CreateLogStreamInput{LogGroupName: &logGroup, LogStreamName: &logStreamName}).Return(new(cloudwatchlogs.CreateLogStreamOutput), nil) - client := newCloudWatchLogClient(svc, logger) + client := newCloudWatchLogClient(svc, 0, logger) token, err := client.CreateStream(&logGroup, &logStreamName) svc.AssertExpectations(t) @@ -375,7 +451,7 @@ func TestCreateStream_CreateLogStream_ResourceAlreadyExists(t *testing.T) { &cloudwatchlogs.CreateLogStreamInput{LogGroupName: &logGroup, LogStreamName: &logStreamName}).Return( new(cloudwatchlogs.CreateLogStreamOutput), resourceAlreadyExistsException) - client := newCloudWatchLogClient(svc, logger) + client := newCloudWatchLogClient(svc, 0, logger) token, err := client.CreateStream(&logGroup, &logStreamName) svc.AssertExpectations(t) @@ -400,7 +476,7 @@ func TestCreateStream_CreateLogStream_ResourceNotFound(t *testing.T) { &cloudwatchlogs.CreateLogStreamInput{LogGroupName: &logGroup, LogStreamName: &logStreamName}).Return( new(cloudwatchlogs.CreateLogStreamOutput), nil).Once() - client := newCloudWatchLogClient(svc, logger) + client := newCloudWatchLogClient(svc, 0, logger) token, err := client.CreateStream(&logGroup, &logStreamName) svc.AssertExpectations(t) @@ -487,7 +563,7 @@ func TestUserAgent(t *testing.T) { session, _ := session.NewSession() for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - cwlog := NewClient(logger, &aws.Config{}, tc.buildInfo, tc.logGroupName, session) + cwlog := NewClient(logger, &aws.Config{}, tc.buildInfo, tc.logGroupName, 0, session) logClient := cwlog.svc.(*cloudwatchlogs.CloudWatchLogs) req := request.New(aws.Config{}, metadata.ClientInfo{}, logClient.Handlers, nil, &request.Operation{