Skip to content

Commit

Permalink
[exporter/awsemfexporter] add log retention feature for Cloudwatch Lo…
Browse files Browse the repository at this point in the history
…g Group (open-telemetry#15678)

* Add changes for Log Retention

* Add comments to the code

* Update README

* Update CHANGELOG

* Add Log Retention config option to awscloudwatchlogsxporter
  • Loading branch information
humivo authored and shalper2 committed Dec 6, 2022
1 parent 9bc88ee commit 2fac6ec
Show file tree
Hide file tree
Showing 11 changed files with 282 additions and 25 deletions.
16 changes: 16 additions & 0 deletions .chloggen/add-log-retention.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awsemfexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Added a `log_retention` field to the config to specify log retention policy for a Cloudwatch Log Group

# One or more tracking issues related to the change
issues: [15678]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
2 changes: 2 additions & 0 deletions exporter/awscloudwatchlogsexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ The following settings can be optionally configured:

- `region`: The AWS region where the log stream is in.
- `endpoint`: The CloudWatch Logs service endpoint which the requests are forwarded to. [See the CloudWatch Logs endpoints](https://docs.aws.amazon.com/general/latest/gr/cwl_region.html) for a list.
- `log_retention`: LogRetention is the option to set the log retention policy for only newly created CloudWatch Log Groups. Defaults to Never Expire if not specified or set to 0. Possible values for retention in days are 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, 2192, 2557, 2922, 3288, or 3653.

### Examples

Expand All @@ -44,6 +45,7 @@ exporters:
log_stream_name: "testing-integrations-stream"
region: "us-east-1"
endpoint: "logs.us-east-1.amazonaws.com"
log_retention: 365
sending_queue:
queue_size: 50
retry_on_failure:
Expand Down
38 changes: 38 additions & 0 deletions exporter/awscloudwatchlogsexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ type Config struct {
// Optional.
Endpoint string `mapstructure:"endpoint"`

// LogRetention is the option to set the log retention policy for the CloudWatch Log Group. Defaults to Never Expire if not specified or set to 0
// Possible values are 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, 2192, 2557, 2922, 3288, or 3653
LogRetention int64 `mapstructure:"log_retention"`

// QueueSettings is a subset of exporterhelper.QueueSettings,
// because only QueueSize is user-settable due to how AWS CloudWatch API works
QueueSettings QueueSettings `mapstructure:"sending_queue"`
Expand Down Expand Up @@ -71,9 +75,43 @@ func (config *Config) Validate() error {
if config.QueueSettings.QueueSize < 1 {
return errors.New("'sending_queue.queue_size' must be 1 or greater")
}
if !isValidRetentionValue(config.LogRetention) {
return errors.New("invalid value for retention policy. Please make sure to use the following values: 0 (Never Expire), 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, 2192, 2557, 2922, 3288, or 3653")
}
return nil
}

// Added function to check if value is an accepted number of log retention days
func isValidRetentionValue(input int64) bool {
switch input {
case
0,
1,
3,
5,
7,
14,
30,
60,
90,
120,
150,
180,
365,
400,
545,
731,
1827,
2192,
2557,
2922,
3288,
3653:
return true
}
return false
}

func (config *Config) enforcedQueueSettings() exporterhelper.QueueSettings {
return exporterhelper.QueueSettings{
Enabled: true,
Expand Down
36 changes: 36 additions & 0 deletions exporter/awscloudwatchlogsexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,39 @@ func TestLoadConfig(t *testing.T) {
})
}
}

func TestRetentionValidateCorrect(t *testing.T) {
defaultRetrySettings := exporterhelper.NewDefaultRetrySettings()
cfg := &Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "1")),
RetrySettings: defaultRetrySettings,
LogGroupName: "test-1",
LogStreamName: "testing",
Endpoint: "",
LogRetention: 365,
AWSSessionSettings: awsutil.CreateDefaultSessionConfig(),
QueueSettings: QueueSettings{
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
}
assert.NoError(t, cfg.Validate())

}

func TestRetentionValidateWrong(t *testing.T) {
defaultRetrySettings := exporterhelper.NewDefaultRetrySettings()
wrongcfg := &Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "2")),
RetrySettings: defaultRetrySettings,
LogGroupName: "test-1",
LogStreamName: "testing",
Endpoint: "",
LogRetention: 366,
AWSSessionSettings: awsutil.CreateDefaultSessionConfig(),
QueueSettings: QueueSettings{
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
}
assert.Error(t, wrongcfg.Validate())

}
2 changes: 1 addition & 1 deletion exporter/awscloudwatchlogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func newCwLogsPusher(expConfig *Config, params component.ExporterCreateSettings)
}

// create CWLogs client with aws session config
svcStructuredLog := cwlogs.NewClient(params.Logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, session)
svcStructuredLog := cwlogs.NewClient(params.Logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, expConfig.LogRetention, session)
collectorIdentifier, err := uuid.NewRandom()

if err != nil {
Expand Down
1 change: 1 addition & 0 deletions exporter/awsemfexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The following exporter configuration parameters are supported.
|:---------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ------- |
| `log_group_name` | Customized log group name which supports `{ClusterName}` and `{TaskId}` placeholders. One valid example is `/aws/metrics/{ClusterName}`. It will search for `ClusterName` (or `aws.ecs.cluster.name`) resource attribute in the metrics data and replace with the actual cluster name. If none of them are found in the resource attribute map, `{ClusterName}` will be replaced by `undefined`. Similar way, for the `{TaskId}`, it searches for `TaskId` (or `aws.ecs.task.id`) key in the resource attribute map. For `{NodeName}`, it searches for `NodeName` (or `k8s.node.name`) |"/metrics/default"|
| `log_stream_name` | Customized log stream name which supports `{TaskId}`, `{ClusterName}`, `{NodeName}`, `{ContainerInstanceId}`, and `{TaskDefinitionFamily}` placeholders. One valid example is `{TaskId}`. It will search for `TaskId` (or `aws.ecs.task.id`) resource attribute in the metrics data and replace with the actual task id. If none of them are found in the resource attribute map, `{TaskId}` will be replaced by `undefined`. Similarly, for the `{TaskDefinitionFamily}`, it searches for `TaskDefinitionFamily` (or `aws.ecs.task.family`). For the `{ClusterName}`, it searches for `ClusterName` (or `aws.ecs.cluster.name`). For `{NodeName}`, it searches for `NodeName` (or `k8s.node.name`). For `{ContainerInstanceId}`, it searches for `ContainerInstanceId` (or `aws.ecs.container.instance.id`). (Note: ContainerInstanceId (or `aws.ecs.container.instance.id`) only works for AWS ECS EC2 launch type. |"otel-stream"|
| `log_retention` | LogRetention is the option to set the log retention policy for only newly created CloudWatch Log Groups. Defaults to Never Expire if not specified or set to 0. Possible values for retention in days are 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, 2192, 2557, 2922, 3288, or 3653. |"Never Expire"|
| `namespace` | Customized CloudWatch metrics namespace | "default" |
| `endpoint` | Optionally override the default CloudWatch service endpoint. | |
| `no_verify_ssl` | Enable or disable TLS certificate verification. | false |
Expand Down
43 changes: 43 additions & 0 deletions exporter/awsemfexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter"

import (
"errors"

"go.opentelemetry.io/collector/config"
"go.uber.org/zap"

Expand Down Expand Up @@ -48,6 +50,11 @@ type Config struct {
// "SingleDimensionRollupOnly" - Enable single dimension rollup
// "NoDimensionRollup" - No dimension rollup (only keep original metrics which contain all dimensions)
DimensionRollupOption string `mapstructure:"dimension_rollup_option"`

// LogRetention is the option to set the log retention policy for the CloudWatch Log Group. Defaults to Never Expire if not specified or set to 0
// Possible values are 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, 2192, 2557, 2922, 3288, or 3653
LogRetention int64 `mapstructure:"log_retention"`

// ParseJSONEncodedAttributeValues is an array of attribute keys whose corresponding values are JSON-encoded as strings.
// Those strings will be decoded to its original json structure.
ParseJSONEncodedAttributeValues []string `mapstructure:"parse_json_encoded_attr_values"`
Expand Down Expand Up @@ -113,9 +120,45 @@ func (config *Config) Validate() error {
}
}
config.MetricDescriptors = validDescriptors

if !isValidRetentionValue(config.LogRetention) {
return errors.New("invalid value for retention policy. Please make sure to use the following values: 0 (Never Expire), 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, 2192, 2557, 2922, 3288, or 3653")
}

return nil
}

// Added function to check if value is an accepted number of log retention days
func isValidRetentionValue(input int64) bool {
switch input {
case
0,
1,
3,
5,
7,
14,
30,
60,
90,
120,
150,
180,
365,
400,
545,
731,
1827,
2192,
2557,
2922,
3288,
3653:
return true
}
return false
}

func newEMFSupportedUnits() map[string]interface{} {
unitIndexer := map[string]interface{}{}
for _, unit := range []string{"Seconds", "Microseconds", "Milliseconds", "Bytes", "Kilobytes", "Megabytes",
Expand Down
32 changes: 32 additions & 0 deletions exporter/awsemfexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,35 @@ func TestConfigValidate(t *testing.T) {
{unit: "Megabytes", metricName: "memory_usage"},
}, cfg.MetricDescriptors)
}

func TestRetentionValidateCorrect(t *testing.T) {
cfg := &Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "1")),
AWSSessionSettings: awsutil.AWSSessionSettings{
RequestTimeoutSeconds: 30,
MaxRetries: 1,
},
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
LogRetention: 365,
ResourceToTelemetrySettings: resourcetotelemetry.Settings{Enabled: true},
logger: zap.NewNop(),
}
assert.NoError(t, cfg.Validate())

}

func TestRetentionValidateWrong(t *testing.T) {
wrongcfg := &Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "2")),
AWSSessionSettings: awsutil.AWSSessionSettings{
RequestTimeoutSeconds: 30,
MaxRetries: 1,
},
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
LogRetention: 366,
ResourceToTelemetrySettings: resourcetotelemetry.Settings{Enabled: true},
logger: zap.NewNop(),
}
assert.Error(t, wrongcfg.Validate())

}
2 changes: 1 addition & 1 deletion exporter/awsemfexporter/emf_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func newEmfPusher(
}

// create CWLogs client with aws session config
svcStructuredLog := cwlogs.NewClient(logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, session)
svcStructuredLog := cwlogs.NewClient(logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, expConfig.LogRetention, session)
collectorIdentifier, _ := uuid.NewRandom()

emfExporter := &emfExporter{
Expand Down
25 changes: 19 additions & 6 deletions internal/aws/cwlogs/cwlog_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,25 @@ const (
// Possible exceptions are combination of common errors (https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/CommonErrors.html)
// and API specific erros (e.g. https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html#API_PutLogEvents_Errors)
type Client struct {
svc cloudwatchlogsiface.CloudWatchLogsAPI
logger *zap.Logger
svc cloudwatchlogsiface.CloudWatchLogsAPI
logRetention int64
logger *zap.Logger
}

// Create a log client based on the actual cloudwatch logs client.
func newCloudWatchLogClient(svc cloudwatchlogsiface.CloudWatchLogsAPI, logger *zap.Logger) *Client {
func newCloudWatchLogClient(svc cloudwatchlogsiface.CloudWatchLogsAPI, logRetention int64, logger *zap.Logger) *Client {
logClient := &Client{svc: svc,
logger: logger}
logRetention: logRetention,
logger: logger}
return logClient
}

// NewClient create Client
func NewClient(logger *zap.Logger, awsConfig *aws.Config, buildInfo component.BuildInfo, logGroupName string, sess *session.Session) *Client {
func NewClient(logger *zap.Logger, awsConfig *aws.Config, buildInfo component.BuildInfo, logGroupName string, logRetention int64, sess *session.Session) *Client {
client := cloudwatchlogs.New(sess, awsConfig)
client.Handlers.Build.PushBackNamed(handler.RequestStructuredLogHandler)
client.Handlers.Build.PushFrontNamed(newCollectorUserAgentHandler(buildInfo, logGroupName))
return newCloudWatchLogClient(client, logger)
return newCloudWatchLogClient(client, logRetention, logger)
}

// PutLogEvents mainly handles different possible error could be returned from server side, and retries them
Expand Down Expand Up @@ -156,6 +158,17 @@ func (client *Client) CreateStream(logGroup, streamName *string) (token string,
LogGroupName: logGroup,
})
if err == nil {
// For newly created log groups, set the log retention polic if specified or non-zero. Otheriwse, set to Never Expire
if client.logRetention != 0 {
_, err = client.svc.PutRetentionPolicy(&cloudwatchlogs.PutRetentionPolicyInput{LogGroupName: logGroup, RetentionInDays: &client.logRetention})
if err != nil {
var awsErr awserr.Error
if errors.As(err, &awsErr) {
client.logger.Debug("CreateLogStream / CreateLogGroup has errors related to log retention policy.", zap.String("LogGroupName", *logGroup), zap.String("LogStreamName", *streamName), zap.Error(e))
return token, err
}
}
}
_, err = client.svc.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
LogGroupName: logGroup,
LogStreamName: streamName,
Expand Down
Loading

0 comments on commit 2fac6ec

Please sign in to comment.