forked from open-telemetry/opentelemetry-collector-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
exporter.go
146 lines (123 loc) · 3.89 KB
/
exporter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package awskinesisexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter"
import (
"context"
"errors"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/aws-sdk-go-v2/service/sts"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/batch"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/compress"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/producer"
)
// Exporter implements an OpenTelemetry trace exporter that exports all spans to AWS Kinesis
type Exporter struct {
producer producer.Batcher
batcher batch.Encoder
}
// options is used to override the default shipped behavior
// to allow for testing correct setup of components
type options struct {
NewKinesisClient func(conf aws.Config, opts ...func(*kinesis.Options)) *kinesis.Client
}
func createExporter(ctx context.Context, c component.Config, log *zap.Logger, opts ...func(opt *options)) (*Exporter, error) {
options := &options{
NewKinesisClient: kinesis.NewFromConfig,
}
for _, opt := range opts {
opt(options)
}
conf, ok := c.(*Config)
if !ok || conf == nil {
return nil, errors.New("incorrect config provided")
}
var configOpts []func(*awsconfig.LoadOptions) error
if conf.AWS.Region != "" {
configOpts = append(configOpts, func(lo *awsconfig.LoadOptions) error {
lo.Region = conf.AWS.Region
return nil
})
}
awsconf, err := awsconfig.LoadDefaultConfig(ctx, configOpts...)
if err != nil {
return nil, err
}
var kinesisOpts []func(*kinesis.Options)
if conf.AWS.Role != "" {
kinesisOpts = append(kinesisOpts, func(o *kinesis.Options) {
o.Credentials = stscreds.NewAssumeRoleProvider(
sts.NewFromConfig(awsconf),
conf.AWS.Role,
)
})
}
if conf.AWS.KinesisEndpoint != "" {
kinesisOpts = append(kinesisOpts,
func(o *kinesis.Options) {
o.BaseEndpoint = aws.String(conf.AWS.KinesisEndpoint)
},
)
}
producer, err := producer.NewBatcher(
options.NewKinesisClient(awsconf, kinesisOpts...),
conf.AWS.StreamName,
producer.WithLogger(log),
)
if err != nil {
return nil, err
}
compressor, err := compress.NewCompressor(conf.Encoding.Compression)
if err != nil {
return nil, err
}
encoder, err := batch.NewEncoder(
conf.Encoding.Name,
batch.WithMaxRecordSize(conf.MaxRecordSize),
batch.WithMaxRecordsPerBatch(conf.MaxRecordsPerBatch),
batch.WithCompression(compressor),
)
if err != nil {
return nil, err
}
if conf.Encoding.Name == "otlp_json" {
log.Info("otlp_json is considered experimental and should not be used in a production environment")
}
return &Exporter{
producer: producer,
batcher: encoder,
}, nil
}
// start validates that the Kinesis stream is available.
func (e Exporter) start(ctx context.Context, _ component.Host) error {
return e.producer.Ready(ctx)
}
// ConsumeTraces receives a span batch and exports it to AWS Kinesis
func (e Exporter) consumeTraces(ctx context.Context, td ptrace.Traces) error {
bt, err := e.batcher.Traces(td)
if err != nil {
return err
}
return e.producer.Put(ctx, bt)
}
func (e Exporter) consumeMetrics(ctx context.Context, md pmetric.Metrics) error {
bt, err := e.batcher.Metrics(md)
if err != nil {
return err
}
return e.producer.Put(ctx, bt)
}
func (e Exporter) consumeLogs(ctx context.Context, ld plog.Logs) error {
bt, err := e.batcher.Logs(ld)
if err != nil {
return err
}
return e.producer.Put(ctx, bt)
}