Skip to content

Commit

Permalink
resourcedetectionprocessor: populate SchemaURL (#4608)
Browse files Browse the repository at this point in the history
This changes ensures that all resources modified by resourcedetectionprocessor
set the SchemaURL if it is not already set. The assumption is that the code that
detects the resources uses semantic conventions defined in package "conventions".
  • Loading branch information
tigrannajaryan committed Aug 13, 2021
1 parent 22ade2c commit f3ff3e9
Show file tree
Hide file tree
Showing 26 changed files with 142 additions and 101 deletions.
12 changes: 6 additions & 6 deletions processor/resourcedetectionprocessor/internal/aws/ec2/ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,20 @@ func NewDetector(_ component.ProcessorCreateSettings, dcfg internal.DetectorConf
return &Detector{metadataProvider: newMetadataClient(sess), tagKeyRegexes: tagKeyRegexes}, nil
}

func (d *Detector) Detect(ctx context.Context) (pdata.Resource, error) {
func (d *Detector) Detect(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) {
res := pdata.NewResource()
if !d.metadataProvider.available(ctx) {
return res, nil
return res, "", nil
}

meta, err := d.metadataProvider.get(ctx)
if err != nil {
return res, fmt.Errorf("failed getting identity document: %w", err)
return res, "", fmt.Errorf("failed getting identity document: %w", err)
}

hostname, err := d.metadataProvider.hostname(ctx)
if err != nil {
return res, fmt.Errorf("failed getting hostname: %w", err)
return res, "", fmt.Errorf("failed getting hostname: %w", err)
}

attr := res.Attributes()
Expand All @@ -86,14 +86,14 @@ func (d *Detector) Detect(ctx context.Context) (pdata.Resource, error) {
if len(d.tagKeyRegexes) != 0 {
tags, err := connectAndFetchEc2Tags(meta.Region, meta.InstanceID, d.tagKeyRegexes)
if err != nil {
return res, fmt.Errorf("failed fetching ec2 instance tags: %w", err)
return res, "", fmt.Errorf("failed fetching ec2 instance tags: %w", err)
}
for key, val := range tags {
attr.InsertString(tagPrefix+key, val)
}
}

return res, nil
return res, conventions.SchemaURL, nil
}

func connectAndFetchEc2Tags(region string, instanceID string, tagKeyRegexes []*regexp.Regexp) (map[string]string, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func TestDetector_Detect(t *testing.T) {
d := &Detector{
metadataProvider: tt.fields.metadataProvider,
}
got, err := d.Detect(tt.args.ctx)
got, _, err := d.Detect(tt.args.ctx)

if tt.wantErr {
require.Error(t, err)
Expand Down
10 changes: 5 additions & 5 deletions processor/resourcedetectionprocessor/internal/aws/ecs/ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,21 @@ func NewDetector(params component.ProcessorCreateSettings, _ internal.DetectorCo

// Detect records metadata retrieved from the ECS Task Metadata Endpoint (TMDE) as resource attributes
// TODO(willarmiros): Replace all attribute fields and enums with values defined in "conventions" once they exist
func (d *Detector) Detect(context.Context) (pdata.Resource, error) {
func (d *Detector) Detect(context.Context) (resource pdata.Resource, schemaURL string, err error) {
res := pdata.NewResource()

tmde := getTmdeFromEnv()

// Fail fast if neither env var is present
if tmde == "" {
// TODO: Log a more specific error with zap
return res, nil
return res, "", nil
}

tmdeResp, err := d.provider.fetchTaskMetaData(tmde)

if err != nil || tmdeResp == nil {
return res, err
return res, "", err
}

attr := res.Attributes()
Expand Down Expand Up @@ -101,7 +101,7 @@ func (d *Detector) Detect(context.Context) (pdata.Resource, error) {
selfMetaData, err := d.provider.fetchContainerMetaData(tmde)

if err != nil || selfMetaData == nil {
return res, err
return res, "", err
}

logAttributes := [4]string{
Expand All @@ -117,7 +117,7 @@ func (d *Detector) Detect(context.Context) (pdata.Resource, error) {
}
}

return res, nil
return res, conventions.SchemaURL, nil
}

func getTmdeFromEnv() string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func Test_ecsNewDetector(t *testing.T) {
func Test_detectorReturnsIfNoEnvVars(t *testing.T) {
os.Clearenv()
d, _ := NewDetector(componenttest.NewNopProcessorCreateSettings(), nil)
res, err := d.Detect(context.TODO())
res, _, err := d.Detect(context.TODO())

assert.Nil(t, err)
assert.Equal(t, 0, res.Attributes().Len())
Expand Down Expand Up @@ -136,7 +136,7 @@ func Test_ecsDetectV4(t *testing.T) {
}

d := Detector{provider: &mockMetaDataProvider{isV4: true}}
got, err := d.Detect(context.TODO())
got, _, err := d.Detect(context.TODO())

assert.Nil(t, err)
assert.NotNil(t, got)
Expand All @@ -160,7 +160,7 @@ func Test_ecsDetectV3(t *testing.T) {
attr.InsertString("cloud.account.id", "123456789123")

d := Detector{provider: &mockMetaDataProvider{isV4: false}}
got, err := d.Detect(context.TODO())
got, _, err := d.Detect(context.TODO())

assert.Nil(t, err)
assert.NotNil(t, got)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,17 @@ func NewDetector(_ component.ProcessorCreateSettings, _ internal.DetectorConfig)
}

// Detect returns a Resource describing the Amazon EKS environment being run in.
func (detector *Detector) Detect(ctx context.Context) (pdata.Resource, error) {
func (detector *Detector) Detect(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) {
res := pdata.NewResource()

// Check if running on k8s.
if os.Getenv(kubernetesServiceHostEnvVar) == "" {
return res, nil
return res, "", nil
}

attr := res.Attributes()
attr.InsertString(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAWS)
attr.InsertString(conventions.AttributeCloudPlatform, conventions.AttributeCloudPlatformAWSEKS)

return res, nil
return res, conventions.SchemaURL, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestEKS(t *testing.T) {

// Call EKS Resource detector to detect resources
eksResourceDetector := &Detector{}
res, err := eksResourceDetector.Detect(ctx)
res, _, err := eksResourceDetector.Detect(ctx)
require.NoError(t, err)

assert.Equal(t, map[string]interface{}{
Expand All @@ -53,7 +53,7 @@ func TestEKS(t *testing.T) {
func TestNotEKS(t *testing.T) {
detector := Detector{}
require.NoError(t, os.Unsetenv("KUBERNETES_SERVICE_HOST"))
r, err := detector.Detect(context.Background())
r, _, err := detector.Detect(context.Background())
require.NoError(t, err)
assert.Equal(t, 0, r.Attributes().Len(), "Resource object should be empty")
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,10 @@ func NewDetector(component.ProcessorCreateSettings, internal.DetectorConfig) (in
return &Detector{fs: &ebFileSystem{}}, nil
}

func (d Detector) Detect(context.Context) (pdata.Resource, error) {
func (d Detector) Detect(context.Context) (resource pdata.Resource, schemaURL string, err error) {
res := pdata.NewResource()
var conf io.ReadCloser

var err error
if d.fs.IsWindows() {
conf, err = d.fs.Open(windowsPath)
} else {
Expand All @@ -65,7 +64,7 @@ func (d Detector) Detect(context.Context) (pdata.Resource, error) {
// Do not want to return error so it fails silently on non-EB instances
if err != nil {
// TODO: Log a more specific message with zap
return res, nil
return res, "", nil
}

ebmd := &EbMetaData{}
Expand All @@ -74,7 +73,7 @@ func (d Detector) Detect(context.Context) (pdata.Resource, error) {

if err != nil {
// TODO: Log a more specific error with zap
return res, err
return res, "", err
}

attr := res.Attributes()
Expand All @@ -83,5 +82,5 @@ func (d Detector) Detect(context.Context) (pdata.Resource, error) {
attr.InsertString(conventions.AttributeServiceInstanceID, strconv.Itoa(ebmd.DeploymentID))
attr.InsertString(conventions.AttributeDeploymentEnvironment, ebmd.EnvironmentName)
attr.InsertString(conventions.AttributeServiceVersion, ebmd.VersionLabel)
return res, nil
return res, conventions.SchemaURL, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func Test_windowsPath(t *testing.T) {
mfs := &mockFileSystem{windows: true, exists: true, contents: xrayConf}
d := Detector{fs: mfs}

r, err := d.Detect(context.TODO())
r, _, err := d.Detect(context.TODO())

assert.Nil(t, err)
assert.NotNil(t, r)
Expand All @@ -73,7 +73,7 @@ func Test_fileNotExists(t *testing.T) {
mfs := &mockFileSystem{exists: false}
d := Detector{fs: mfs}

r, err := d.Detect(context.TODO())
r, _, err := d.Detect(context.TODO())

assert.Nil(t, err)
assert.NotNil(t, r)
Expand All @@ -84,7 +84,7 @@ func Test_fileMalformed(t *testing.T) {
mfs := &mockFileSystem{exists: true, contents: "some overwritten value"}
d := Detector{fs: mfs}

r, err := d.Detect(context.TODO())
r, _, err := d.Detect(context.TODO())

assert.NotNil(t, err)
assert.NotNil(t, r)
Expand All @@ -103,7 +103,7 @@ func Test_AttributesDetectedSuccessfully(t *testing.T) {
attr.InsertString("service.instance.id", "23")
attr.InsertString("service.version", "env-version-1234")

r, err := d.Detect(context.TODO())
r, _, err := d.Detect(context.TODO())

assert.Nil(t, err)
assert.NotNil(t, r)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,23 @@ func NewDetector(component.ProcessorCreateSettings, internal.DetectorConfig) (in
return &Detector{provider: azure.NewProvider()}, nil
}

func (d *Detector) Detect(ctx context.Context) (pdata.Resource, error) {
func (d *Detector) Detect(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) {
res := pdata.NewResource()

if !onK8s() {
return res, nil
return res, "", nil
}

// If we can't get a response from the metadata endpoint, we're not running in Azure
if !azureMetadataAvailable(ctx, d.provider) {
return res, nil
return res, "", nil
}

attrs := res.Attributes()
attrs.InsertString(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAzure)
attrs.InsertString(conventions.AttributeCloudPlatform, conventions.AttributeCloudPlatformAzureAKS)

return res, nil
return res, conventions.SchemaURL, nil
}

func onK8s() bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
conventions "go.opentelemetry.io/collector/translator/conventions/v1.5.0"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/azure"
Expand All @@ -38,8 +39,9 @@ func TestDetector_Detect_K8s_Azure(t *testing.T) {
os.Clearenv()
setK8sEnv(t)
detector := &Detector{provider: mockProvider()}
res, err := detector.Detect(context.Background())
res, schemaURL, err := detector.Detect(context.Background())
require.NoError(t, err)
assert.Equal(t, conventions.SchemaURL, schemaURL)
assert.Equal(t, map[string]interface{}{
"cloud.provider": "azure",
"cloud.platform": "azure_aks",
Expand All @@ -52,7 +54,7 @@ func TestDetector_Detect_K8s_NonAzure(t *testing.T) {
mp := &azure.MockProvider{}
mp.On("Metadata").Return(nil, errors.New(""))
detector := &Detector{provider: mp}
res, err := detector.Detect(context.Background())
res, _, err := detector.Detect(context.Background())
require.NoError(t, err)
attrs := res.Attributes()
assert.Equal(t, 0, attrs.Len())
Expand All @@ -61,7 +63,7 @@ func TestDetector_Detect_K8s_NonAzure(t *testing.T) {
func TestDetector_Detect_NonK8s(t *testing.T) {
os.Clearenv()
detector := &Detector{provider: mockProvider()}
res, err := detector.Detect(context.Background())
res, _, err := detector.Detect(context.Background())
require.NoError(t, err)
attrs := res.Attributes()
assert.Equal(t, 0, attrs.Len())
Expand Down
6 changes: 3 additions & 3 deletions processor/resourcedetectionprocessor/internal/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ func NewDetector(p component.ProcessorCreateSettings, cfg internal.DetectorConfi
}

// Detect detects system metadata and returns a resource with the available ones
func (d *Detector) Detect(ctx context.Context) (pdata.Resource, error) {
func (d *Detector) Detect(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) {
res := pdata.NewResource()
attrs := res.Attributes()

compute, err := d.provider.Metadata(ctx)
if err != nil {
d.logger.Debug("Azure detector metadata retrieval failed", zap.Error(err))
// return an empty Resource and no error
return res, nil
return res, "", nil
}

attrs.InsertString(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAzure)
Expand All @@ -68,5 +68,5 @@ func (d *Detector) Detect(ctx context.Context) (pdata.Resource, error) {
attrs.InsertString("azure.vm.scaleset.name", compute.VMScaleSetName)
attrs.InsertString("azure.resourcegroup.name", compute.ResourceGroupName)

return res, nil
return res, conventions.SchemaURL, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ func TestDetectAzureAvailable(t *testing.T) {
}, nil)

detector := &Detector{provider: mp}
res, err := detector.Detect(context.Background())
res, schemaURL, err := detector.Detect(context.Background())
require.NoError(t, err)
assert.Equal(t, conventions.SchemaURL, schemaURL)
mp.AssertExpectations(t)
res.Attributes().Sort()

Expand All @@ -73,7 +74,7 @@ func TestDetectError(t *testing.T) {
mp.On("Metadata").Return(&ComputeMetadata{}, fmt.Errorf("mock error"))

detector := &Detector{provider: mp, logger: zap.NewNop()}
res, err := detector.Detect(context.Background())
res, _, err := detector.Detect(context.Background())
assert.NoError(t, err)
assert.True(t, internal.IsEmptyResource(res))
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,22 @@ func NewDetector(p component.ProcessorCreateSettings, dcfg internal.DetectorConf
}

// Detect detects system metadata and returns a resource with the available ones
func (d *Detector) Detect(ctx context.Context) (pdata.Resource, error) {
func (d *Detector) Detect(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) {
res := pdata.NewResource()
attrs := res.Attributes()

osType, err := d.provider.OSType(ctx)
if err != nil {
return res, fmt.Errorf("failed getting OS type: %w", err)
return res, "", fmt.Errorf("failed getting OS type: %w", err)
}

hostname, err := d.provider.Hostname(ctx)
if err != nil {
return res, fmt.Errorf("failed getting OS hostname: %w", err)
return res, "", fmt.Errorf("failed getting OS hostname: %w", err)
}

attrs.InsertString(conventions.AttributeHostName, hostname)
attrs.InsertString(conventions.AttributeOSType, osType)

return res, nil
return res, conventions.SchemaURL, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ func TestDetect(t *testing.T) {
md.On("OSType").Return("DARWIN", nil)

detector := &Detector{provider: md, logger: zap.NewNop()}
res, err := detector.Detect(context.Background())
res, schemaURL, err := detector.Detect(context.Background())
require.NoError(t, err)
assert.Equal(t, conventions.SchemaURL, schemaURL)
md.AssertExpectations(t)
res.Attributes().Sort()

Expand Down
Loading

0 comments on commit f3ff3e9

Please sign in to comment.