Skip to content

Commit

Permalink
[chore] [processor/resourcedetection] Create ResourceBuilder on start (
Browse files Browse the repository at this point in the history
…open-telemetry#24437)

Create ResourceBuilder during the initialization state, not in the
Detect call. We want to make sure that ResourceBuilder is initialized
only once so we can introduce the same warnings as MetricsBuilder has.
Detect is currently called only once as well, but it's better not to
depend on such behavior, so it can be changed independently in the
future if needed.
  • Loading branch information
dmitryax committed Jul 24, 2023
1 parent c55822f commit 7e3d665
Show file tree
Hide file tree
Showing 23 changed files with 250 additions and 272 deletions.
37 changes: 18 additions & 19 deletions processor/resourcedetectionprocessor/internal/aws/ec2/ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ const (
var _ internal.Detector = (*Detector)(nil)

type Detector struct {
metadataProvider ec2provider.Provider
tagKeyRegexes []*regexp.Regexp
logger *zap.Logger
resourceAttributes metadata.ResourceAttributesConfig
metadataProvider ec2provider.Provider
tagKeyRegexes []*regexp.Regexp
logger *zap.Logger
rb *metadata.ResourceBuilder
}

func NewDetector(set processor.CreateSettings, dcfg internal.DetectorConfig) (internal.Detector, error) {
Expand All @@ -50,10 +50,10 @@ func NewDetector(set processor.CreateSettings, dcfg internal.DetectorConfig) (in
}

return &Detector{
metadataProvider: ec2provider.NewProvider(sess),
tagKeyRegexes: tagKeyRegexes,
logger: set.Logger,
resourceAttributes: cfg.ResourceAttributes,
metadataProvider: ec2provider.NewProvider(sess),
tagKeyRegexes: tagKeyRegexes,
logger: set.Logger,
rb: metadata.NewResourceBuilder(cfg.ResourceAttributes),
}, nil
}

Expand All @@ -73,17 +73,16 @@ func (d *Detector) Detect(ctx context.Context) (resource pcommon.Resource, schem
return pcommon.NewResource(), "", fmt.Errorf("failed getting hostname: %w", err)
}

rb := metadata.NewResourceBuilder(d.resourceAttributes)
rb.SetCloudProvider(conventions.AttributeCloudProviderAWS)
rb.SetCloudPlatform(conventions.AttributeCloudPlatformAWSEC2)
rb.SetCloudRegion(meta.Region)
rb.SetCloudAccountID(meta.AccountID)
rb.SetCloudAvailabilityZone(meta.AvailabilityZone)
rb.SetHostID(meta.InstanceID)
rb.SetHostImageID(meta.ImageID)
rb.SetHostType(meta.InstanceType)
rb.SetHostName(hostname)
res := rb.Emit()
d.rb.SetCloudProvider(conventions.AttributeCloudProviderAWS)
d.rb.SetCloudPlatform(conventions.AttributeCloudPlatformAWSEC2)
d.rb.SetCloudRegion(meta.Region)
d.rb.SetCloudAccountID(meta.AccountID)
d.rb.SetCloudAvailabilityZone(meta.AvailabilityZone)
d.rb.SetHostID(meta.InstanceID)
d.rb.SetHostImageID(meta.ImageID)
d.rb.SetHostType(meta.InstanceType)
d.rb.SetHostName(hostname)
res := d.rb.Emit()

if len(d.tagKeyRegexes) != 0 {
client := getHTTPClientSettings(ctx, d.logger)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.uber.org/zap"

ec2provider "github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders/aws/ec2"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/aws/ec2/internal/metadata"
)

var errUnavailable = errors.New("ec2metadata unavailable")
Expand Down Expand Up @@ -172,11 +173,10 @@ func TestDetector_Detect(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
resourceAttributes := CreateDefaultConfig().ResourceAttributes
d := &Detector{
metadataProvider: tt.fields.metadataProvider,
logger: zap.NewNop(),
resourceAttributes: resourceAttributes,
metadataProvider: tt.fields.metadataProvider,
logger: zap.NewNop(),
rb: metadata.NewResourceBuilder(metadata.DefaultResourceAttributesConfig()),
}
got, _, err := d.Detect(tt.args.ctx)

Expand Down
35 changes: 17 additions & 18 deletions processor/resourcedetectionprocessor/internal/aws/ecs/ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ const (
var _ internal.Detector = (*Detector)(nil)

type Detector struct {
provider ecsutil.MetadataProvider
resourceAttributes metadata.ResourceAttributesConfig
provider ecsutil.MetadataProvider
rb *metadata.ResourceBuilder
}

func NewDetector(params processor.CreateSettings, dcfg internal.DetectorConfig) (internal.Detector, error) {
Expand All @@ -43,7 +43,7 @@ func NewDetector(params processor.CreateSettings, dcfg internal.DetectorConfig)
}
return nil, fmt.Errorf("unable to create task metadata provider: %w", err)
}
return &Detector{provider: provider, resourceAttributes: cfg.ResourceAttributes}, nil
return &Detector{provider: provider, rb: metadata.NewResourceBuilder(cfg.ResourceAttributes)}, nil
}

// Detect records metadata retrieved from the ECS Task Metadata Endpoint (TMDE) as resource attributes
Expand All @@ -60,46 +60,45 @@ func (d *Detector) Detect(context.Context) (resource pcommon.Resource, schemaURL
return pcommon.NewResource(), "", fmt.Errorf("unable to fetch task metadata: %w", err)
}

rb := metadata.NewResourceBuilder(d.resourceAttributes)
rb.SetCloudProvider(conventions.AttributeCloudProviderAWS)
rb.SetCloudPlatform(conventions.AttributeCloudPlatformAWSECS)
rb.SetAwsEcsTaskArn(tmdeResp.TaskARN)
rb.SetAwsEcsTaskFamily(tmdeResp.Family)
rb.SetAwsEcsTaskRevision(tmdeResp.Revision)
d.rb.SetCloudProvider(conventions.AttributeCloudProviderAWS)
d.rb.SetCloudPlatform(conventions.AttributeCloudPlatformAWSECS)
d.rb.SetAwsEcsTaskArn(tmdeResp.TaskARN)
d.rb.SetAwsEcsTaskFamily(tmdeResp.Family)
d.rb.SetAwsEcsTaskRevision(tmdeResp.Revision)

region, account := parseRegionAndAccount(tmdeResp.TaskARN)
if account != "" {
rb.SetCloudAccountID(account)
d.rb.SetCloudAccountID(account)
}

if region != "" {
rb.SetCloudRegion(region)
d.rb.SetCloudRegion(region)
}

// TMDE returns the cluster short name or ARN, so we need to construct the ARN if necessary
rb.SetAwsEcsClusterArn(constructClusterArn(tmdeResp.Cluster, region, account))
d.rb.SetAwsEcsClusterArn(constructClusterArn(tmdeResp.Cluster, region, account))

if tmdeResp.AvailabilityZone != "" {
rb.SetCloudAvailabilityZone(tmdeResp.AvailabilityZone)
d.rb.SetCloudAvailabilityZone(tmdeResp.AvailabilityZone)
}

// The launch type and log data attributes are only available in TMDE v4
switch lt := strings.ToLower(tmdeResp.LaunchType); lt {
case "ec2":
rb.SetAwsEcsLaunchtype("ec2")
d.rb.SetAwsEcsLaunchtype("ec2")
case "fargate":
rb.SetAwsEcsLaunchtype("fargate")
d.rb.SetAwsEcsLaunchtype("fargate")
}

selfMetaData, err := d.provider.FetchContainerMetadata()

if err != nil || selfMetaData == nil {
return rb.Emit(), "", err
return d.rb.Emit(), "", err
}

addValidLogData(tmdeResp.Containers, selfMetaData, account, rb)
addValidLogData(tmdeResp.Containers, selfMetaData, account, d.rb)

return rb.Emit(), conventions.SchemaURL, nil
return d.rb.Emit(), conventions.SchemaURL, nil
}

func constructClusterArn(cluster, region, account string) string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,7 @@ func Test_ecsDetectV4(t *testing.T) {
attr.PutEmptySlice("aws.log.stream.names").AppendEmpty().SetStr("stream")
attr.PutEmptySlice("aws.log.stream.arns").AppendEmpty().SetStr("arn:aws:logs:us-east-1:123456789123:log-group:group:log-stream:stream")

resourceAttributes := CreateDefaultConfig().ResourceAttributes
d := Detector{provider: &mockMetaDataProvider{isV4: true}, resourceAttributes: resourceAttributes}
d := Detector{provider: &mockMetaDataProvider{isV4: true}, rb: metadata.NewResourceBuilder(metadata.DefaultResourceAttributesConfig())}
got, _, err := d.Detect(context.TODO())

assert.Nil(t, err)
Expand All @@ -144,8 +143,7 @@ func Test_ecsDetectV3(t *testing.T) {
attr.PutStr("cloud.availability_zone", "us-west-2a")
attr.PutStr("cloud.account.id", "123456789123")

resourceAttributes := CreateDefaultConfig().ResourceAttributes
d := Detector{provider: &mockMetaDataProvider{isV4: false}, resourceAttributes: resourceAttributes}
d := Detector{provider: &mockMetaDataProvider{isV4: false}, rb: metadata.NewResourceBuilder(metadata.DefaultResourceAttributesConfig())}
got, _, err := d.Detect(context.TODO())

assert.Nil(t, err)
Expand Down
28 changes: 16 additions & 12 deletions processor/resourcedetectionprocessor/internal/aws/eks/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ type eksDetectorUtils struct {

// detector for EKS
type detector struct {
utils detectorUtils
logger *zap.Logger
err error
resourceAttributes metadata.ResourceAttributesConfig
utils detectorUtils
logger *zap.Logger
err error
rb *metadata.ResourceBuilder
}

var _ internal.Detector = (*detector)(nil)
Expand All @@ -54,23 +54,27 @@ var _ detectorUtils = (*eksDetectorUtils)(nil)
func NewDetector(set processor.CreateSettings, dcfg internal.DetectorConfig) (internal.Detector, error) {
cfg := dcfg.(Config)
utils, err := newK8sDetectorUtils()
return &detector{utils: utils, logger: set.Logger, err: err, resourceAttributes: cfg.ResourceAttributes}, nil
return &detector{
utils: utils,
logger: set.Logger,
err: err,
rb: metadata.NewResourceBuilder(cfg.ResourceAttributes),
}, nil
}

// Detect returns a Resource describing the Amazon EKS environment being run in.
func (detector *detector) Detect(ctx context.Context) (resource pcommon.Resource, schemaURL string, err error) {
func (d *detector) Detect(ctx context.Context) (resource pcommon.Resource, schemaURL string, err error) {
// Check if running on EKS.
isEKS, err := isEKS(ctx, detector.utils)
isEKS, err := isEKS(ctx, d.utils)
if !isEKS {
detector.logger.Debug("Unable to identify EKS environment", zap.Error(err))
d.logger.Debug("Unable to identify EKS environment", zap.Error(err))
return pcommon.NewResource(), "", err
}

rb := metadata.NewResourceBuilder(detector.resourceAttributes)
rb.SetCloudProvider(conventions.AttributeCloudProviderAWS)
rb.SetCloudPlatform(conventions.AttributeCloudPlatformAWSEKS)
d.rb.SetCloudProvider(conventions.AttributeCloudProviderAWS)
d.rb.SetCloudPlatform(conventions.AttributeCloudPlatformAWSEKS)

return rb.Emit(), conventions.SchemaURL, nil
return d.rb.Emit(), conventions.SchemaURL, nil
}

func isEKS(ctx context.Context, utils detectorUtils) (bool, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/processor/processortest"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/aws/eks/internal/metadata"
)

type MockDetectorUtils struct {
Expand All @@ -38,8 +40,7 @@ func TestEKS(t *testing.T) {
t.Setenv("KUBERNETES_SERVICE_HOST", "localhost")
detectorUtils.On("getConfigMap", authConfigmapNS, authConfigmapName).Return(map[string]string{"cluster.name": "my-cluster"}, nil)
// Call EKS Resource detector to detect resources
resourceAttributes := CreateDefaultConfig().ResourceAttributes
eksResourceDetector := &detector{utils: detectorUtils, err: nil, resourceAttributes: resourceAttributes}
eksResourceDetector := &detector{utils: detectorUtils, err: nil, rb: metadata.NewResourceBuilder(metadata.DefaultResourceAttributesConfig())}
res, _, err := eksResourceDetector.Detect(ctx)
require.NoError(t, err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ const (
var _ internal.Detector = (*Detector)(nil)

type Detector struct {
fs fileSystem
resourceAttributes metadata.ResourceAttributesConfig
fs fileSystem
rb *metadata.ResourceBuilder
}

type EbMetaData struct {
Expand All @@ -40,7 +40,7 @@ type EbMetaData struct {

func NewDetector(_ processor.CreateSettings, dcfg internal.DetectorConfig) (internal.Detector, error) {
cfg := dcfg.(Config)
return &Detector{fs: &ebFileSystem{}, resourceAttributes: cfg.ResourceAttributes}, nil
return &Detector{fs: &ebFileSystem{}, rb: metadata.NewResourceBuilder(cfg.ResourceAttributes)}, nil
}

func (d Detector) Detect(context.Context) (resource pcommon.Resource, schemaURL string, err error) {
Expand All @@ -67,12 +67,11 @@ func (d Detector) Detect(context.Context) (resource pcommon.Resource, schemaURL
return pcommon.NewResource(), "", err
}

rb := metadata.NewResourceBuilder(d.resourceAttributes)
rb.SetCloudProvider(conventions.AttributeCloudProviderAWS)
rb.SetCloudPlatform(conventions.AttributeCloudPlatformAWSElasticBeanstalk)
rb.SetServiceInstanceID(strconv.Itoa(ebmd.DeploymentID))
rb.SetDeploymentEnvironment(ebmd.EnvironmentName)
rb.SetServiceVersion(ebmd.VersionLabel)
d.rb.SetCloudProvider(conventions.AttributeCloudProviderAWS)
d.rb.SetCloudPlatform(conventions.AttributeCloudPlatformAWSElasticBeanstalk)
d.rb.SetServiceInstanceID(strconv.Itoa(ebmd.DeploymentID))
d.rb.SetDeploymentEnvironment(ebmd.EnvironmentName)
d.rb.SetServiceVersion(ebmd.VersionLabel)

return rb.Emit(), conventions.SchemaURL, nil
return d.rb.Emit(), conventions.SchemaURL, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/processor/processortest"
)
Expand All @@ -37,17 +38,11 @@ func (mfs *mockFileSystem) IsWindows() bool {
return mfs.windows
}

func Test_newDetector(t *testing.T) {
dcfg := CreateDefaultConfig()
d, err := NewDetector(processortest.NewNopCreateSettings(), dcfg)

assert.Nil(t, err)
assert.NotNil(t, d)
}

func Test_windowsPath(t *testing.T) {
mfs := &mockFileSystem{windows: true, exists: true, contents: xrayConf}
d := Detector{fs: mfs}
d, err := NewDetector(processortest.NewNopCreateSettings(), CreateDefaultConfig())
require.NoError(t, err)
d.(*Detector).fs = mfs

r, _, err := d.Detect(context.TODO())

Expand Down Expand Up @@ -79,9 +74,9 @@ func Test_fileMalformed(t *testing.T) {
}

func Test_AttributesDetectedSuccessfully(t *testing.T) {
mfs := &mockFileSystem{exists: true, contents: xrayConf}
resourceAttributes := CreateDefaultConfig().ResourceAttributes
d := Detector{fs: mfs, resourceAttributes: resourceAttributes}
d, err := NewDetector(processortest.NewNopCreateSettings(), CreateDefaultConfig())
require.NoError(t, err)
d.(*Detector).fs = &mockFileSystem{exists: true, contents: xrayConf}

want := pcommon.NewResource()
attr := want.Attributes()
Expand Down
28 changes: 13 additions & 15 deletions processor/resourcedetectionprocessor/internal/aws/lambda/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ const (
var _ internal.Detector = (*detector)(nil)

type detector struct {
logger *zap.Logger
resourceAttributes metadata.ResourceAttributesConfig
logger *zap.Logger
rb *metadata.ResourceBuilder
}

func NewDetector(set processor.CreateSettings, dcfg internal.DetectorConfig) (internal.Detector, error) {
cfg := dcfg.(Config)
return &detector{logger: set.Logger, resourceAttributes: cfg.ResourceAttributes}, nil
return &detector{logger: set.Logger, rb: metadata.NewResourceBuilder(cfg.ResourceAttributes)}, nil
}

func (d *detector) Detect(_ context.Context) (resource pcommon.Resource, schemaURL string, err error) {
Expand All @@ -49,38 +49,36 @@ func (d *detector) Detect(_ context.Context) (resource pcommon.Resource, schemaU
return pcommon.NewResource(), "", err
}

rb := metadata.NewResourceBuilder(d.resourceAttributes)

// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/cloud.md
rb.SetCloudProvider(conventions.AttributeCloudProviderAWS)
rb.SetCloudPlatform(conventions.AttributeCloudPlatformAWSLambda)
d.rb.SetCloudProvider(conventions.AttributeCloudProviderAWS)
d.rb.SetCloudPlatform(conventions.AttributeCloudPlatformAWSLambda)
if value, ok := os.LookupEnv(awsRegionEnvVar); ok {
rb.SetCloudRegion(value)
d.rb.SetCloudRegion(value)
}

// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/faas.md
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/instrumentation/aws-lambda.md#resource-detector
rb.SetFaasName(functionName)
d.rb.SetFaasName(functionName)
if value, ok := os.LookupEnv(awsLambdaFunctionVersionEnvVar); ok {
rb.SetFaasVersion(value)
d.rb.SetFaasVersion(value)
}

// Note: The FaaS spec (https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/faas.md)
// recommends setting faas.instance to the full log stream name for AWS Lambda.
if value, ok := os.LookupEnv(awsLambdaLogStreamNameEnvVar); ok {
rb.SetFaasInstance(value)
d.rb.SetFaasInstance(value)
}
if value, ok := os.LookupEnv(awsLambdaFunctionMemorySizeEnvVar); ok {
rb.SetFaasMaxMemory(value)
d.rb.SetFaasMaxMemory(value)
}

// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/cloud_provider/aws/logs.md
if value, ok := os.LookupEnv(awsLambdaLogGroupNameEnvVar); ok {
rb.SetAwsLogGroupNames([]any{value})
d.rb.SetAwsLogGroupNames([]any{value})
}
if value, ok := os.LookupEnv(awsLambdaLogStreamNameEnvVar); ok {
rb.SetAwsLogStreamNames([]any{value})
d.rb.SetAwsLogStreamNames([]any{value})
}

return rb.Emit(), conventions.SchemaURL, nil
return d.rb.Emit(), conventions.SchemaURL, nil
}
Loading

0 comments on commit 7e3d665

Please sign in to comment.