From f3ff3e9fd43fc86f0c33648f1b7f02e60e1863da Mon Sep 17 00:00:00 2001 From: Tigran Najaryan <4194920+tigrannajaryan@users.noreply.github.com> Date: Fri, 13 Aug 2021 11:54:36 +0400 Subject: [PATCH] resourcedetectionprocessor: populate SchemaURL (#4608) This changes ensures that all resources modified by resourcedetectionprocessor set the SchemaURL if it is not already set. The assumption is that the code that detects the resources uses semantic conventions defined in package "conventions". --- .../internal/aws/ec2/ec2.go | 12 +++---- .../internal/aws/ec2/ec2_test.go | 2 +- .../internal/aws/ecs/ecs.go | 10 +++--- .../internal/aws/ecs/ecs_test.go | 6 ++-- .../internal/aws/eks/detector.go | 6 ++-- .../internal/aws/eks/detector_test.go | 4 +-- .../aws/elasticbeanstalk/elasticbeanstalk.go | 9 +++--- .../elasticbeanstalk/elasticbeanstalk_test.go | 8 ++--- .../internal/azure/aks/aks.go | 8 ++--- .../internal/azure/aks/aks_test.go | 8 +++-- .../internal/azure/azure.go | 6 ++-- .../internal/azure/azure_test.go | 5 +-- .../internal/docker/docker.go | 8 ++--- .../internal/docker/docker_test.go | 3 +- .../internal/env/env.go | 10 +++--- .../internal/env/env_test.go | 12 ++++--- .../internal/gcp/gce/gce.go | 6 ++-- .../internal/gcp/gce/gce_test.go | 7 +++-- .../internal/gcp/gke/gke.go | 8 ++--- .../internal/gcp/gke/gke_test.go | 11 ++++--- .../internal/resourcedetection.go | 31 +++++++++++++++---- .../internal/resourcedetection_test.go | 14 ++++----- .../internal/system/system.go | 8 ++--- .../internal/system/system_test.go | 12 ++++--- .../resourcedetection_processor.go | 25 +++++++++------ .../resourcedetection_processor_test.go | 4 +-- 26 files changed, 142 insertions(+), 101 deletions(-) diff --git a/processor/resourcedetectionprocessor/internal/aws/ec2/ec2.go b/processor/resourcedetectionprocessor/internal/aws/ec2/ec2.go index 54b64608911fd..74c8ebc1d4cac 100644 --- a/processor/resourcedetectionprocessor/internal/aws/ec2/ec2.go +++ b/processor/resourcedetectionprocessor/internal/aws/ec2/ec2.go @@ -56,20 +56,20 @@ func NewDetector(_ component.ProcessorCreateSettings, dcfg internal.DetectorConf return &Detector{metadataProvider: newMetadataClient(sess), tagKeyRegexes: tagKeyRegexes}, nil } -func (d *Detector) Detect(ctx context.Context) (pdata.Resource, error) { +func (d *Detector) Detect(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) { res := pdata.NewResource() if !d.metadataProvider.available(ctx) { - return res, nil + return res, "", nil } meta, err := d.metadataProvider.get(ctx) if err != nil { - return res, fmt.Errorf("failed getting identity document: %w", err) + return res, "", fmt.Errorf("failed getting identity document: %w", err) } hostname, err := d.metadataProvider.hostname(ctx) if err != nil { - return res, fmt.Errorf("failed getting hostname: %w", err) + return res, "", fmt.Errorf("failed getting hostname: %w", err) } attr := res.Attributes() @@ -86,14 +86,14 @@ func (d *Detector) Detect(ctx context.Context) (pdata.Resource, error) { if len(d.tagKeyRegexes) != 0 { tags, err := connectAndFetchEc2Tags(meta.Region, meta.InstanceID, d.tagKeyRegexes) if err != nil { - return res, fmt.Errorf("failed fetching ec2 instance tags: %w", err) + return res, "", fmt.Errorf("failed fetching ec2 instance tags: %w", err) } for key, val := range tags { attr.InsertString(tagPrefix+key, val) } } - return res, nil + return res, conventions.SchemaURL, nil } func connectAndFetchEc2Tags(region string, instanceID string, tagKeyRegexes []*regexp.Regexp) (map[string]string, error) { diff --git a/processor/resourcedetectionprocessor/internal/aws/ec2/ec2_test.go b/processor/resourcedetectionprocessor/internal/aws/ec2/ec2_test.go index 506de323657dd..03fa46dd18a89 100644 --- a/processor/resourcedetectionprocessor/internal/aws/ec2/ec2_test.go +++ b/processor/resourcedetectionprocessor/internal/aws/ec2/ec2_test.go @@ -186,7 +186,7 @@ func TestDetector_Detect(t *testing.T) { d := &Detector{ metadataProvider: tt.fields.metadataProvider, } - got, err := d.Detect(tt.args.ctx) + got, _, err := d.Detect(tt.args.ctx) if tt.wantErr { require.Error(t, err) diff --git a/processor/resourcedetectionprocessor/internal/aws/ecs/ecs.go b/processor/resourcedetectionprocessor/internal/aws/ecs/ecs.go index d3d1bdb924315..c30baae902dc9 100644 --- a/processor/resourcedetectionprocessor/internal/aws/ecs/ecs.go +++ b/processor/resourcedetectionprocessor/internal/aws/ecs/ecs.go @@ -48,7 +48,7 @@ func NewDetector(params component.ProcessorCreateSettings, _ internal.DetectorCo // Detect records metadata retrieved from the ECS Task Metadata Endpoint (TMDE) as resource attributes // TODO(willarmiros): Replace all attribute fields and enums with values defined in "conventions" once they exist -func (d *Detector) Detect(context.Context) (pdata.Resource, error) { +func (d *Detector) Detect(context.Context) (resource pdata.Resource, schemaURL string, err error) { res := pdata.NewResource() tmde := getTmdeFromEnv() @@ -56,13 +56,13 @@ func (d *Detector) Detect(context.Context) (pdata.Resource, error) { // Fail fast if neither env var is present if tmde == "" { // TODO: Log a more specific error with zap - return res, nil + return res, "", nil } tmdeResp, err := d.provider.fetchTaskMetaData(tmde) if err != nil || tmdeResp == nil { - return res, err + return res, "", err } attr := res.Attributes() @@ -101,7 +101,7 @@ func (d *Detector) Detect(context.Context) (pdata.Resource, error) { selfMetaData, err := d.provider.fetchContainerMetaData(tmde) if err != nil || selfMetaData == nil { - return res, err + return res, "", err } logAttributes := [4]string{ @@ -117,7 +117,7 @@ func (d *Detector) Detect(context.Context) (pdata.Resource, error) { } } - return res, nil + return res, conventions.SchemaURL, nil } func getTmdeFromEnv() string { diff --git a/processor/resourcedetectionprocessor/internal/aws/ecs/ecs_test.go b/processor/resourcedetectionprocessor/internal/aws/ecs/ecs_test.go index 42849770a170f..e697dac050765 100644 --- a/processor/resourcedetectionprocessor/internal/aws/ecs/ecs_test.go +++ b/processor/resourcedetectionprocessor/internal/aws/ecs/ecs_test.go @@ -67,7 +67,7 @@ func Test_ecsNewDetector(t *testing.T) { func Test_detectorReturnsIfNoEnvVars(t *testing.T) { os.Clearenv() d, _ := NewDetector(componenttest.NewNopProcessorCreateSettings(), nil) - res, err := d.Detect(context.TODO()) + res, _, err := d.Detect(context.TODO()) assert.Nil(t, err) assert.Equal(t, 0, res.Attributes().Len()) @@ -136,7 +136,7 @@ func Test_ecsDetectV4(t *testing.T) { } d := Detector{provider: &mockMetaDataProvider{isV4: true}} - got, err := d.Detect(context.TODO()) + got, _, err := d.Detect(context.TODO()) assert.Nil(t, err) assert.NotNil(t, got) @@ -160,7 +160,7 @@ func Test_ecsDetectV3(t *testing.T) { attr.InsertString("cloud.account.id", "123456789123") d := Detector{provider: &mockMetaDataProvider{isV4: false}} - got, err := d.Detect(context.TODO()) + got, _, err := d.Detect(context.TODO()) assert.Nil(t, err) assert.NotNil(t, got) diff --git a/processor/resourcedetectionprocessor/internal/aws/eks/detector.go b/processor/resourcedetectionprocessor/internal/aws/eks/detector.go index a71e88789ca6f..74daa2ec73347 100644 --- a/processor/resourcedetectionprocessor/internal/aws/eks/detector.go +++ b/processor/resourcedetectionprocessor/internal/aws/eks/detector.go @@ -44,17 +44,17 @@ func NewDetector(_ component.ProcessorCreateSettings, _ internal.DetectorConfig) } // Detect returns a Resource describing the Amazon EKS environment being run in. -func (detector *Detector) Detect(ctx context.Context) (pdata.Resource, error) { +func (detector *Detector) Detect(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) { res := pdata.NewResource() // Check if running on k8s. if os.Getenv(kubernetesServiceHostEnvVar) == "" { - return res, nil + return res, "", nil } attr := res.Attributes() attr.InsertString(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAWS) attr.InsertString(conventions.AttributeCloudPlatform, conventions.AttributeCloudPlatformAWSEKS) - return res, nil + return res, conventions.SchemaURL, nil } diff --git a/processor/resourcedetectionprocessor/internal/aws/eks/detector_test.go b/processor/resourcedetectionprocessor/internal/aws/eks/detector_test.go index 778a6d451c523..2bc5d7e02733f 100644 --- a/processor/resourcedetectionprocessor/internal/aws/eks/detector_test.go +++ b/processor/resourcedetectionprocessor/internal/aws/eks/detector_test.go @@ -40,7 +40,7 @@ func TestEKS(t *testing.T) { // Call EKS Resource detector to detect resources eksResourceDetector := &Detector{} - res, err := eksResourceDetector.Detect(ctx) + res, _, err := eksResourceDetector.Detect(ctx) require.NoError(t, err) assert.Equal(t, map[string]interface{}{ @@ -53,7 +53,7 @@ func TestEKS(t *testing.T) { func TestNotEKS(t *testing.T) { detector := Detector{} require.NoError(t, os.Unsetenv("KUBERNETES_SERVICE_HOST")) - r, err := detector.Detect(context.Background()) + r, _, err := detector.Detect(context.Background()) require.NoError(t, err) assert.Equal(t, 0, r.Attributes().Len(), "Resource object should be empty") } diff --git a/processor/resourcedetectionprocessor/internal/aws/elasticbeanstalk/elasticbeanstalk.go b/processor/resourcedetectionprocessor/internal/aws/elasticbeanstalk/elasticbeanstalk.go index 405024412d5a2..938469ac657e0 100644 --- a/processor/resourcedetectionprocessor/internal/aws/elasticbeanstalk/elasticbeanstalk.go +++ b/processor/resourcedetectionprocessor/internal/aws/elasticbeanstalk/elasticbeanstalk.go @@ -51,11 +51,10 @@ func NewDetector(component.ProcessorCreateSettings, internal.DetectorConfig) (in return &Detector{fs: &ebFileSystem{}}, nil } -func (d Detector) Detect(context.Context) (pdata.Resource, error) { +func (d Detector) Detect(context.Context) (resource pdata.Resource, schemaURL string, err error) { res := pdata.NewResource() var conf io.ReadCloser - var err error if d.fs.IsWindows() { conf, err = d.fs.Open(windowsPath) } else { @@ -65,7 +64,7 @@ func (d Detector) Detect(context.Context) (pdata.Resource, error) { // Do not want to return error so it fails silently on non-EB instances if err != nil { // TODO: Log a more specific message with zap - return res, nil + return res, "", nil } ebmd := &EbMetaData{} @@ -74,7 +73,7 @@ func (d Detector) Detect(context.Context) (pdata.Resource, error) { if err != nil { // TODO: Log a more specific error with zap - return res, err + return res, "", err } attr := res.Attributes() @@ -83,5 +82,5 @@ func (d Detector) Detect(context.Context) (pdata.Resource, error) { attr.InsertString(conventions.AttributeServiceInstanceID, strconv.Itoa(ebmd.DeploymentID)) attr.InsertString(conventions.AttributeDeploymentEnvironment, ebmd.EnvironmentName) attr.InsertString(conventions.AttributeServiceVersion, ebmd.VersionLabel) - return res, nil + return res, conventions.SchemaURL, nil } diff --git a/processor/resourcedetectionprocessor/internal/aws/elasticbeanstalk/elasticbeanstalk_test.go b/processor/resourcedetectionprocessor/internal/aws/elasticbeanstalk/elasticbeanstalk_test.go index 6a8370472f7ff..9bb819f5f4d62 100644 --- a/processor/resourcedetectionprocessor/internal/aws/elasticbeanstalk/elasticbeanstalk_test.go +++ b/processor/resourcedetectionprocessor/internal/aws/elasticbeanstalk/elasticbeanstalk_test.go @@ -62,7 +62,7 @@ func Test_windowsPath(t *testing.T) { mfs := &mockFileSystem{windows: true, exists: true, contents: xrayConf} d := Detector{fs: mfs} - r, err := d.Detect(context.TODO()) + r, _, err := d.Detect(context.TODO()) assert.Nil(t, err) assert.NotNil(t, r) @@ -73,7 +73,7 @@ func Test_fileNotExists(t *testing.T) { mfs := &mockFileSystem{exists: false} d := Detector{fs: mfs} - r, err := d.Detect(context.TODO()) + r, _, err := d.Detect(context.TODO()) assert.Nil(t, err) assert.NotNil(t, r) @@ -84,7 +84,7 @@ func Test_fileMalformed(t *testing.T) { mfs := &mockFileSystem{exists: true, contents: "some overwritten value"} d := Detector{fs: mfs} - r, err := d.Detect(context.TODO()) + r, _, err := d.Detect(context.TODO()) assert.NotNil(t, err) assert.NotNil(t, r) @@ -103,7 +103,7 @@ func Test_AttributesDetectedSuccessfully(t *testing.T) { attr.InsertString("service.instance.id", "23") attr.InsertString("service.version", "env-version-1234") - r, err := d.Detect(context.TODO()) + r, _, err := d.Detect(context.TODO()) assert.Nil(t, err) assert.NotNil(t, r) diff --git a/processor/resourcedetectionprocessor/internal/azure/aks/aks.go b/processor/resourcedetectionprocessor/internal/azure/aks/aks.go index 8101c96996f66..25cf45d8088eb 100644 --- a/processor/resourcedetectionprocessor/internal/azure/aks/aks.go +++ b/processor/resourcedetectionprocessor/internal/azure/aks/aks.go @@ -43,23 +43,23 @@ func NewDetector(component.ProcessorCreateSettings, internal.DetectorConfig) (in return &Detector{provider: azure.NewProvider()}, nil } -func (d *Detector) Detect(ctx context.Context) (pdata.Resource, error) { +func (d *Detector) Detect(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) { res := pdata.NewResource() if !onK8s() { - return res, nil + return res, "", nil } // If we can't get a response from the metadata endpoint, we're not running in Azure if !azureMetadataAvailable(ctx, d.provider) { - return res, nil + return res, "", nil } attrs := res.Attributes() attrs.InsertString(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAzure) attrs.InsertString(conventions.AttributeCloudPlatform, conventions.AttributeCloudPlatformAzureAKS) - return res, nil + return res, conventions.SchemaURL, nil } func onK8s() bool { diff --git a/processor/resourcedetectionprocessor/internal/azure/aks/aks_test.go b/processor/resourcedetectionprocessor/internal/azure/aks/aks_test.go index d856c0d16416a..f9ce6486404a9 100644 --- a/processor/resourcedetectionprocessor/internal/azure/aks/aks_test.go +++ b/processor/resourcedetectionprocessor/internal/azure/aks/aks_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" + conventions "go.opentelemetry.io/collector/translator/conventions/v1.5.0" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/azure" @@ -38,8 +39,9 @@ func TestDetector_Detect_K8s_Azure(t *testing.T) { os.Clearenv() setK8sEnv(t) detector := &Detector{provider: mockProvider()} - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) require.NoError(t, err) + assert.Equal(t, conventions.SchemaURL, schemaURL) assert.Equal(t, map[string]interface{}{ "cloud.provider": "azure", "cloud.platform": "azure_aks", @@ -52,7 +54,7 @@ func TestDetector_Detect_K8s_NonAzure(t *testing.T) { mp := &azure.MockProvider{} mp.On("Metadata").Return(nil, errors.New("")) detector := &Detector{provider: mp} - res, err := detector.Detect(context.Background()) + res, _, err := detector.Detect(context.Background()) require.NoError(t, err) attrs := res.Attributes() assert.Equal(t, 0, attrs.Len()) @@ -61,7 +63,7 @@ func TestDetector_Detect_K8s_NonAzure(t *testing.T) { func TestDetector_Detect_NonK8s(t *testing.T) { os.Clearenv() detector := &Detector{provider: mockProvider()} - res, err := detector.Detect(context.Background()) + res, _, err := detector.Detect(context.Background()) require.NoError(t, err) attrs := res.Attributes() assert.Equal(t, 0, attrs.Len()) diff --git a/processor/resourcedetectionprocessor/internal/azure/azure.go b/processor/resourcedetectionprocessor/internal/azure/azure.go index 8e4b56668d54c..9f2f881667941 100644 --- a/processor/resourcedetectionprocessor/internal/azure/azure.go +++ b/processor/resourcedetectionprocessor/internal/azure/azure.go @@ -47,7 +47,7 @@ func NewDetector(p component.ProcessorCreateSettings, cfg internal.DetectorConfi } // Detect detects system metadata and returns a resource with the available ones -func (d *Detector) Detect(ctx context.Context) (pdata.Resource, error) { +func (d *Detector) Detect(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) { res := pdata.NewResource() attrs := res.Attributes() @@ -55,7 +55,7 @@ func (d *Detector) Detect(ctx context.Context) (pdata.Resource, error) { if err != nil { d.logger.Debug("Azure detector metadata retrieval failed", zap.Error(err)) // return an empty Resource and no error - return res, nil + return res, "", nil } attrs.InsertString(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAzure) @@ -68,5 +68,5 @@ func (d *Detector) Detect(ctx context.Context) (pdata.Resource, error) { attrs.InsertString("azure.vm.scaleset.name", compute.VMScaleSetName) attrs.InsertString("azure.resourcegroup.name", compute.ResourceGroupName) - return res, nil + return res, conventions.SchemaURL, nil } diff --git a/processor/resourcedetectionprocessor/internal/azure/azure_test.go b/processor/resourcedetectionprocessor/internal/azure/azure_test.go index 3d899918aa89e..08b0f84bae629 100644 --- a/processor/resourcedetectionprocessor/internal/azure/azure_test.go +++ b/processor/resourcedetectionprocessor/internal/azure/azure_test.go @@ -47,8 +47,9 @@ func TestDetectAzureAvailable(t *testing.T) { }, nil) detector := &Detector{provider: mp} - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) require.NoError(t, err) + assert.Equal(t, conventions.SchemaURL, schemaURL) mp.AssertExpectations(t) res.Attributes().Sort() @@ -73,7 +74,7 @@ func TestDetectError(t *testing.T) { mp.On("Metadata").Return(&ComputeMetadata{}, fmt.Errorf("mock error")) detector := &Detector{provider: mp, logger: zap.NewNop()} - res, err := detector.Detect(context.Background()) + res, _, err := detector.Detect(context.Background()) assert.NoError(t, err) assert.True(t, internal.IsEmptyResource(res)) } diff --git a/processor/resourcedetectionprocessor/internal/docker/docker.go b/processor/resourcedetectionprocessor/internal/docker/docker.go index 032c761eeedf5..501ec39f8ece4 100644 --- a/processor/resourcedetectionprocessor/internal/docker/docker.go +++ b/processor/resourcedetectionprocessor/internal/docker/docker.go @@ -50,22 +50,22 @@ func NewDetector(p component.ProcessorCreateSettings, dcfg internal.DetectorConf } // Detect detects system metadata and returns a resource with the available ones -func (d *Detector) Detect(ctx context.Context) (pdata.Resource, error) { +func (d *Detector) Detect(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) { res := pdata.NewResource() attrs := res.Attributes() osType, err := d.provider.OSType(ctx) if err != nil { - return res, fmt.Errorf("failed getting OS type: %w", err) + return res, "", fmt.Errorf("failed getting OS type: %w", err) } hostname, err := d.provider.Hostname(ctx) if err != nil { - return res, fmt.Errorf("failed getting OS hostname: %w", err) + return res, "", fmt.Errorf("failed getting OS hostname: %w", err) } attrs.InsertString(conventions.AttributeHostName, hostname) attrs.InsertString(conventions.AttributeOSType, osType) - return res, nil + return res, conventions.SchemaURL, nil } diff --git a/processor/resourcedetectionprocessor/internal/docker/docker_test.go b/processor/resourcedetectionprocessor/internal/docker/docker_test.go index 3d1c91f5ea5bf..e94584f1ab87b 100644 --- a/processor/resourcedetectionprocessor/internal/docker/docker_test.go +++ b/processor/resourcedetectionprocessor/internal/docker/docker_test.go @@ -47,8 +47,9 @@ func TestDetect(t *testing.T) { md.On("OSType").Return("DARWIN", nil) detector := &Detector{provider: md, logger: zap.NewNop()} - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) require.NoError(t, err) + assert.Equal(t, conventions.SchemaURL, schemaURL) md.AssertExpectations(t) res.Attributes().Sort() diff --git a/processor/resourcedetectionprocessor/internal/env/env.go b/processor/resourcedetectionprocessor/internal/env/env.go index e9c7c057cf57f..1f73a23f69e06 100644 --- a/processor/resourcedetectionprocessor/internal/env/env.go +++ b/processor/resourcedetectionprocessor/internal/env/env.go @@ -52,24 +52,24 @@ func NewDetector(component.ProcessorCreateSettings, internal.DetectorConfig) (in return &Detector{}, nil } -func (d *Detector) Detect(context.Context) (pdata.Resource, error) { +func (d *Detector) Detect(context.Context) (resource pdata.Resource, schemaURL string, err error) { res := pdata.NewResource() labels := strings.TrimSpace(os.Getenv(envVar)) if labels == "" { labels = strings.TrimSpace(os.Getenv(deprecatedEnvVar)) if labels == "" { - return res, nil + return res, "", nil } } - err := initializeAttributeMap(res.Attributes(), labels) + err = initializeAttributeMap(res.Attributes(), labels) if err != nil { res.Attributes().Clear() - return res, err + return res, "", err } - return res, nil + return res, "", nil } // labelRegex matches any key=value pair including a trailing comma or the end of the diff --git a/processor/resourcedetectionprocessor/internal/env/env_test.go b/processor/resourcedetectionprocessor/internal/env/env_test.go index cd742e5058807..88384f72cf76e 100644 --- a/processor/resourcedetectionprocessor/internal/env/env_test.go +++ b/processor/resourcedetectionprocessor/internal/env/env_test.go @@ -37,7 +37,8 @@ func TestDetectTrue(t *testing.T) { os.Setenv(envVar, "key=value") detector := &Detector{} - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) + assert.Equal(t, "", schemaURL) require.NoError(t, err) assert.Equal(t, internal.NewResource(map[string]interface{}{"key": "value"}), res) } @@ -46,8 +47,9 @@ func TestDetectFalse(t *testing.T) { os.Setenv(envVar, "") detector := &Detector{} - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) require.NoError(t, err) + assert.Equal(t, "", schemaURL) assert.True(t, internal.IsEmptyResource(res)) } @@ -56,8 +58,9 @@ func TestDetectDeprecatedEnv(t *testing.T) { os.Setenv(deprecatedEnvVar, "key=value") detector := &Detector{} - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) require.NoError(t, err) + assert.Equal(t, "", schemaURL) assert.Equal(t, internal.NewResource(map[string]interface{}{"key": "value"}), res) } @@ -65,8 +68,9 @@ func TestDetectError(t *testing.T) { os.Setenv(envVar, "key=value,key") detector := &Detector{} - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) assert.Error(t, err) + assert.Equal(t, "", schemaURL) assert.True(t, internal.IsEmptyResource(res)) } diff --git a/processor/resourcedetectionprocessor/internal/gcp/gce/gce.go b/processor/resourcedetectionprocessor/internal/gcp/gce/gce.go index 2e90410cd1e29..9201b0bed10cb 100644 --- a/processor/resourcedetectionprocessor/internal/gcp/gce/gce.go +++ b/processor/resourcedetectionprocessor/internal/gcp/gce/gce.go @@ -41,11 +41,11 @@ func NewDetector(component.ProcessorCreateSettings, internal.DetectorConfig) (in return &Detector{metadata: &gcp.MetadataImpl{}}, nil } -func (d *Detector) Detect(context.Context) (pdata.Resource, error) { +func (d *Detector) Detect(context.Context) (resource pdata.Resource, schemaURL string, err error) { res := pdata.NewResource() if !d.metadata.OnGCE() { - return res, nil + return res, "", nil } attr := res.Attributes() @@ -53,7 +53,7 @@ func (d *Detector) Detect(context.Context) (pdata.Resource, error) { var errors []error errors = append(errors, d.initializeCloudAttributes(attr)...) errors = append(errors, d.initializeHostAttributes(attr)...) - return res, consumererror.Combine(errors) + return res, conventions.SchemaURL, consumererror.Combine(errors) } func (d *Detector) initializeCloudAttributes(attr pdata.AttributeMap) []error { diff --git a/processor/resourcedetectionprocessor/internal/gcp/gce/gce_test.go b/processor/resourcedetectionprocessor/internal/gcp/gce/gce_test.go index 893b4ea3a09c7..7cf316e7e1860 100644 --- a/processor/resourcedetectionprocessor/internal/gcp/gce/gce_test.go +++ b/processor/resourcedetectionprocessor/internal/gcp/gce/gce_test.go @@ -45,9 +45,10 @@ func TestDetectTrue(t *testing.T) { md.On("Get", "instance/machine-type").Return("machine-type", nil) detector := &Detector{metadata: md} - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) require.NoError(t, err) + assert.Equal(t, conventions.SchemaURL, schemaURL) expected := internal.NewResource(map[string]interface{}{ conventions.AttributeCloudProvider: conventions.AttributeCloudProviderGCP, @@ -70,7 +71,7 @@ func TestDetectFalse(t *testing.T) { md.On("OnGCE").Return(false) detector := &Detector{metadata: md} - res, err := detector.Detect(context.Background()) + res, _, err := detector.Detect(context.Background()) require.NoError(t, err) assert.True(t, internal.IsEmptyResource(res)) @@ -87,7 +88,7 @@ func TestDetectError(t *testing.T) { md.On("Get", "instance/machine-type").Return("", errors.New("err6")) detector := &Detector{metadata: md} - res, err := detector.Detect(context.Background()) + res, _, err := detector.Detect(context.Background()) assert.EqualError(t, err, "[err1; err2; err3; err4; err6]") diff --git a/processor/resourcedetectionprocessor/internal/gcp/gke/gke.go b/processor/resourcedetectionprocessor/internal/gcp/gke/gke.go index 36e90f5c55343..3889e91b9f0dd 100644 --- a/processor/resourcedetectionprocessor/internal/gcp/gke/gke.go +++ b/processor/resourcedetectionprocessor/internal/gcp/gke/gke.go @@ -50,12 +50,12 @@ func NewDetector(params component.ProcessorCreateSettings, _ internal.DetectorCo } // Detect detects associated resources when running in GKE environment. -func (gke *Detector) Detect(ctx context.Context) (pdata.Resource, error) { +func (gke *Detector) Detect(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) { res := pdata.NewResource() // Check if on GCP. if !gke.metadata.OnGCE() { - return res, nil + return res, "", nil } attr := res.Attributes() @@ -63,7 +63,7 @@ func (gke *Detector) Detect(ctx context.Context) (pdata.Resource, error) { // Check if running on k8s. if os.Getenv(kubernetesServiceHostEnvVar) == "" { - return res, nil + return res, "", nil } attr.InsertString(conventions.AttributeCloudPlatform, conventions.AttributeCloudPlatformGCPKubernetesEngine) @@ -74,5 +74,5 @@ func (gke *Detector) Detect(ctx context.Context) (pdata.Resource, error) { attr.InsertString(conventions.AttributeK8SClusterName, clusterName) } - return res, nil + return res, conventions.SchemaURL, nil } diff --git a/processor/resourcedetectionprocessor/internal/gcp/gke/gke_test.go b/processor/resourcedetectionprocessor/internal/gcp/gke/gke_test.go index baca0af2e6636..d5fc3695434b3 100644 --- a/processor/resourcedetectionprocessor/internal/gcp/gke/gke_test.go +++ b/processor/resourcedetectionprocessor/internal/gcp/gke/gke_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" + conventions "go.opentelemetry.io/collector/translator/conventions/v1.5.0" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal" @@ -37,7 +38,7 @@ func TestNotGCE(t *testing.T) { } metadata.On("OnGCE").Return(false) - res, err := detector.Detect(context.Background()) + res, _, err := detector.Detect(context.Background()) require.NoError(t, err) assert.Equal(t, 0, res.Attributes().Len()) @@ -56,8 +57,9 @@ func TestDetectWithoutCluster(t *testing.T) { require.NoError(t, os.Setenv("KUBERNETES_SERVICE_HOST", "localhost")) - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) require.NoError(t, err) + assert.Equal(t, conventions.SchemaURL, schemaURL) assert.Equal(t, map[string]interface{}{ "cloud.provider": "gcp", @@ -78,7 +80,7 @@ func TestDetectWithoutK8s(t *testing.T) { require.NoError(t, os.Unsetenv("KUBERNETES_SERVICE_HOST")) - res, err := detector.Detect(context.Background()) + res, _, err := detector.Detect(context.Background()) require.NoError(t, err) assert.Equal(t, map[string]interface{}{ @@ -100,8 +102,9 @@ func TestDetector_Detect(t *testing.T) { require.NoError(t, os.Setenv("KUBERNETES_SERVICE_HOST", "localhost")) - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) require.NoError(t, err) + assert.Equal(t, conventions.SchemaURL, schemaURL) assert.Equal(t, map[string]interface{}{ "cloud.provider": "gcp", diff --git a/processor/resourcedetectionprocessor/internal/resourcedetection.go b/processor/resourcedetectionprocessor/internal/resourcedetection.go index e66e6b19a207d..fabdbeb91f94e 100644 --- a/processor/resourcedetectionprocessor/internal/resourcedetection.go +++ b/processor/resourcedetectionprocessor/internal/resourcedetection.go @@ -31,7 +31,7 @@ import ( type DetectorType string type Detector interface { - Detect(ctx context.Context) (pdata.Resource, error) + Detect(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) } type DetectorConfig interface{} @@ -93,8 +93,9 @@ type ResourceProvider struct { } type resourceResult struct { - resource pdata.Resource - err error + resource pdata.Resource + schemaURL string + err error } func NewResourceProvider(logger *zap.Logger, timeout time.Duration, detectors ...Detector) *ResourceProvider { @@ -105,7 +106,7 @@ func NewResourceProvider(logger *zap.Logger, timeout time.Duration, detectors .. } } -func (p *ResourceProvider) Get(ctx context.Context) (pdata.Resource, error) { +func (p *ResourceProvider) Get(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) { p.once.Do(func() { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, p.timeout) @@ -113,29 +114,32 @@ func (p *ResourceProvider) Get(ctx context.Context) (pdata.Resource, error) { p.detectResource(ctx) }) - return p.detectedResource.resource, p.detectedResource.err + return p.detectedResource.resource, p.detectedResource.schemaURL, p.detectedResource.err } func (p *ResourceProvider) detectResource(ctx context.Context) { p.detectedResource = &resourceResult{} res := pdata.NewResource() + mergedSchemaURL := "" p.logger.Info("began detecting resource information") for _, detector := range p.detectors { - r, err := detector.Detect(ctx) + r, schemaURL, err := detector.Detect(ctx) if err != nil { p.detectedResource.err = err return } + mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, schemaURL) MergeResource(res, r, false) } p.logger.Info("detected resource information", zap.Any("resource", AttributesToMap(res.Attributes()))) p.detectedResource.resource = res + p.detectedResource.schemaURL = mergedSchemaURL } func AttributesToMap(am pdata.AttributeMap) map[string]interface{} { @@ -175,6 +179,21 @@ func getSerializableArray(inArr pdata.AnyValueArray) []interface{} { return outArr } +func MergeSchemaURL(currentSchemaURL string, newSchemaURL string) string { + if currentSchemaURL == "" { + return newSchemaURL + } + if newSchemaURL == "" { + return currentSchemaURL + } + if currentSchemaURL == newSchemaURL { + return currentSchemaURL + } + // TODO: handle the case when the schema URLs are different by performing + // schema conversion. For now we simply ignore the new schema URL. + return currentSchemaURL +} + func MergeResource(to, from pdata.Resource, overrideTo bool) { if IsEmptyResource(from) { return diff --git a/processor/resourcedetectionprocessor/internal/resourcedetection_test.go b/processor/resourcedetectionprocessor/internal/resourcedetection_test.go index 381f86e3f79da..8e5f62051ffe3 100644 --- a/processor/resourcedetectionprocessor/internal/resourcedetection_test.go +++ b/processor/resourcedetectionprocessor/internal/resourcedetection_test.go @@ -35,9 +35,9 @@ type MockDetector struct { mock.Mock } -func (p *MockDetector) Detect(ctx context.Context) (pdata.Resource, error) { +func (p *MockDetector) Detect(ctx context.Context) (pdata.Resource, string, error) { args := p.Called() - return args.Get(0).(pdata.Resource), args.Error(1) + return args.Get(0).(pdata.Resource), "", args.Error(1) } type mockDetectorConfig struct{} @@ -99,7 +99,7 @@ func TestDetect(t *testing.T) { p, err := f.CreateResourceProvider(componenttest.NewNopProcessorCreateSettings(), time.Second, &mockDetectorConfig{}, mockDetectorTypes...) require.NoError(t, err) - got, err := p.Get(context.Background()) + got, _, err := p.Get(context.Background()) require.NoError(t, err) tt.expectedResource.Attributes().Sort() @@ -135,7 +135,7 @@ func TestDetectResource_Error(t *testing.T) { md2.On("Detect").Return(pdata.NewResource(), errors.New("err1")) p := NewResourceProvider(zap.NewNop(), time.Second, md1, md2) - _, err := p.Get(context.Background()) + _, _, err := p.Get(context.Background()) require.EqualError(t, err, "err1") } @@ -181,10 +181,10 @@ func NewMockParallelDetector() *MockParallelDetector { return &MockParallelDetector{ch: make(chan struct{})} } -func (p *MockParallelDetector) Detect(ctx context.Context) (pdata.Resource, error) { +func (p *MockParallelDetector) Detect(ctx context.Context) (pdata.Resource, string, error) { <-p.ch args := p.Called() - return args.Get(0).(pdata.Resource), args.Error(1) + return args.Get(0).(pdata.Resource), "", args.Error(1) } // TestDetectResource_Parallel validates that Detect is only called once, even if there @@ -209,7 +209,7 @@ func TestDetectResource_Parallel(t *testing.T) { for i := 0; i < iterations; i++ { go func() { defer wg.Done() - _, err := p.Get(context.Background()) + _, _, err := p.Get(context.Background()) require.NoError(t, err) }() } diff --git a/processor/resourcedetectionprocessor/internal/system/system.go b/processor/resourcedetectionprocessor/internal/system/system.go index 5ef6f3e67fe81..f6394a484e283 100644 --- a/processor/resourcedetectionprocessor/internal/system/system.go +++ b/processor/resourcedetectionprocessor/internal/system/system.go @@ -45,13 +45,13 @@ func NewDetector(p component.ProcessorCreateSettings, _ internal.DetectorConfig) } // Detect detects system metadata and returns a resource with the available ones -func (d *Detector) Detect(_ context.Context) (pdata.Resource, error) { +func (d *Detector) Detect(_ context.Context) (resource pdata.Resource, schemaURL string, err error) { res := pdata.NewResource() attrs := res.Attributes() osType, err := d.provider.OSType() if err != nil { - return res, fmt.Errorf("failed getting OS type: %w", err) + return res, "", fmt.Errorf("failed getting OS type: %w", err) } hostname, err := d.provider.FQDN() @@ -60,12 +60,12 @@ func (d *Detector) Detect(_ context.Context) (pdata.Resource, error) { d.logger.Debug("FQDN query failed, falling back to OS hostname", zap.Error(err)) hostname, err = d.provider.Hostname() if err != nil { - return res, fmt.Errorf("failed getting OS hostname: %w", err) + return res, "", fmt.Errorf("failed getting OS hostname: %w", err) } } attrs.InsertString(conventions.AttributeHostName, hostname) attrs.InsertString(conventions.AttributeOSType, osType) - return res, nil + return res, conventions.SchemaURL, nil } diff --git a/processor/resourcedetectionprocessor/internal/system/system_test.go b/processor/resourcedetectionprocessor/internal/system/system_test.go index c9eb540c66644..c999d7a62746b 100644 --- a/processor/resourcedetectionprocessor/internal/system/system_test.go +++ b/processor/resourcedetectionprocessor/internal/system/system_test.go @@ -60,8 +60,9 @@ func TestDetectFQDNAvailable(t *testing.T) { md.On("OSType").Return("DARWIN", nil) detector := &Detector{provider: md, logger: zap.NewNop()} - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) require.NoError(t, err) + assert.Equal(t, conventions.SchemaURL, schemaURL) md.AssertExpectations(t) res.Attributes().Sort() @@ -82,8 +83,9 @@ func TestFallbackHostname(t *testing.T) { mdHostname.On("OSType").Return("DARWIN", nil) detector := &Detector{provider: mdHostname, logger: zap.NewNop()} - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) require.NoError(t, err) + assert.Equal(t, conventions.SchemaURL, schemaURL) mdHostname.AssertExpectations(t) res.Attributes().Sort() @@ -104,8 +106,9 @@ func TestDetectError(t *testing.T) { mdFQDN.On("Hostname").Return("", errors.New("err")) detector := &Detector{provider: mdFQDN, logger: zap.NewNop()} - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) assert.Error(t, err) + assert.Equal(t, "", schemaURL) assert.True(t, internal.IsEmptyResource(res)) // OS type fails @@ -114,7 +117,8 @@ func TestDetectError(t *testing.T) { mdOSType.On("OSType").Return("", errors.New("err")) detector = &Detector{provider: mdOSType, logger: zap.NewNop()} - res, err = detector.Detect(context.Background()) + res, schemaURL, err = detector.Detect(context.Background()) assert.Error(t, err) + assert.Equal(t, "", schemaURL) assert.True(t, internal.IsEmptyResource(res)) } diff --git a/processor/resourcedetectionprocessor/resourcedetection_processor.go b/processor/resourcedetectionprocessor/resourcedetection_processor.go index 2f2b974a99c5f..7003f825b8cb6 100644 --- a/processor/resourcedetectionprocessor/resourcedetection_processor.go +++ b/processor/resourcedetectionprocessor/resourcedetection_processor.go @@ -24,15 +24,16 @@ import ( ) type resourceDetectionProcessor struct { - provider *internal.ResourceProvider - resource pdata.Resource - override bool + provider *internal.ResourceProvider + resource pdata.Resource + schemaURL string + override bool } // Start is invoked during service startup. func (rdp *resourceDetectionProcessor) Start(ctx context.Context, _ component.Host) error { var err error - rdp.resource, err = rdp.provider.Get(ctx) + rdp.resource, rdp.schemaURL, err = rdp.provider.Get(ctx) return err } @@ -40,7 +41,9 @@ func (rdp *resourceDetectionProcessor) Start(ctx context.Context, _ component.Ho func (rdp *resourceDetectionProcessor) processTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) { rs := td.ResourceSpans() for i := 0; i < rs.Len(); i++ { - res := rs.At(i).Resource() + rss := rs.At(i) + rss.SetSchemaUrl(internal.MergeSchemaURL(rss.SchemaUrl(), rdp.schemaURL)) + res := rss.Resource() internal.MergeResource(res, rdp.resource, rdp.override) } return td, nil @@ -50,7 +53,9 @@ func (rdp *resourceDetectionProcessor) processTraces(_ context.Context, td pdata func (rdp *resourceDetectionProcessor) processMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) { rm := md.ResourceMetrics() for i := 0; i < rm.Len(); i++ { - res := rm.At(i).Resource() + rss := rm.At(i) + rss.SetSchemaUrl(internal.MergeSchemaURL(rss.SchemaUrl(), rdp.schemaURL)) + res := rss.Resource() internal.MergeResource(res, rdp.resource, rdp.override) } return md, nil @@ -58,9 +63,11 @@ func (rdp *resourceDetectionProcessor) processMetrics(_ context.Context, md pdat // processLogs implements the ProcessLogsFunc type. func (rdp *resourceDetectionProcessor) processLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) { - rls := ld.ResourceLogs() - for i := 0; i < rls.Len(); i++ { - res := rls.At(i).Resource() + rl := ld.ResourceLogs() + for i := 0; i < rl.Len(); i++ { + rss := rl.At(i) + rss.SetSchemaUrl(internal.MergeSchemaURL(rss.SchemaUrl(), rdp.schemaURL)) + res := rss.Resource() internal.MergeResource(res, rdp.resource, rdp.override) } return ld, nil diff --git a/processor/resourcedetectionprocessor/resourcedetection_processor_test.go b/processor/resourcedetectionprocessor/resourcedetection_processor_test.go index 7d74cc3104920..38adfb064e71a 100644 --- a/processor/resourcedetectionprocessor/resourcedetection_processor_test.go +++ b/processor/resourcedetectionprocessor/resourcedetection_processor_test.go @@ -40,9 +40,9 @@ type MockDetector struct { mock.Mock } -func (p *MockDetector) Detect(ctx context.Context) (pdata.Resource, error) { +func (p *MockDetector) Detect(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) { args := p.Called() - return args.Get(0).(pdata.Resource), args.Error(1) + return args.Get(0).(pdata.Resource), "", args.Error(1) } func TestResourceProcessor(t *testing.T) {