diff --git a/processor/resourcedetectionprocessor/internal/aws/ec2/ec2.go b/processor/resourcedetectionprocessor/internal/aws/ec2/ec2.go index 54b64608911fd..74c8ebc1d4cac 100644 --- a/processor/resourcedetectionprocessor/internal/aws/ec2/ec2.go +++ b/processor/resourcedetectionprocessor/internal/aws/ec2/ec2.go @@ -56,20 +56,20 @@ func NewDetector(_ component.ProcessorCreateSettings, dcfg internal.DetectorConf return &Detector{metadataProvider: newMetadataClient(sess), tagKeyRegexes: tagKeyRegexes}, nil } -func (d *Detector) Detect(ctx context.Context) (pdata.Resource, error) { +func (d *Detector) Detect(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) { res := pdata.NewResource() if !d.metadataProvider.available(ctx) { - return res, nil + return res, "", nil } meta, err := d.metadataProvider.get(ctx) if err != nil { - return res, fmt.Errorf("failed getting identity document: %w", err) + return res, "", fmt.Errorf("failed getting identity document: %w", err) } hostname, err := d.metadataProvider.hostname(ctx) if err != nil { - return res, fmt.Errorf("failed getting hostname: %w", err) + return res, "", fmt.Errorf("failed getting hostname: %w", err) } attr := res.Attributes() @@ -86,14 +86,14 @@ func (d *Detector) Detect(ctx context.Context) (pdata.Resource, error) { if len(d.tagKeyRegexes) != 0 { tags, err := connectAndFetchEc2Tags(meta.Region, meta.InstanceID, d.tagKeyRegexes) if err != nil { - return res, fmt.Errorf("failed fetching ec2 instance tags: %w", err) + return res, "", fmt.Errorf("failed fetching ec2 instance tags: %w", err) } for key, val := range tags { attr.InsertString(tagPrefix+key, val) } } - return res, nil + return res, conventions.SchemaURL, nil } func connectAndFetchEc2Tags(region string, instanceID string, tagKeyRegexes []*regexp.Regexp) (map[string]string, error) { diff --git a/processor/resourcedetectionprocessor/internal/aws/ec2/ec2_test.go b/processor/resourcedetectionprocessor/internal/aws/ec2/ec2_test.go index 506de323657dd..03fa46dd18a89 100644 --- a/processor/resourcedetectionprocessor/internal/aws/ec2/ec2_test.go +++ b/processor/resourcedetectionprocessor/internal/aws/ec2/ec2_test.go @@ -186,7 +186,7 @@ func TestDetector_Detect(t *testing.T) { d := &Detector{ metadataProvider: tt.fields.metadataProvider, } - got, err := d.Detect(tt.args.ctx) + got, _, err := d.Detect(tt.args.ctx) if tt.wantErr { require.Error(t, err) diff --git a/processor/resourcedetectionprocessor/internal/aws/ecs/ecs.go b/processor/resourcedetectionprocessor/internal/aws/ecs/ecs.go index d3d1bdb924315..c30baae902dc9 100644 --- a/processor/resourcedetectionprocessor/internal/aws/ecs/ecs.go +++ b/processor/resourcedetectionprocessor/internal/aws/ecs/ecs.go @@ -48,7 +48,7 @@ func NewDetector(params component.ProcessorCreateSettings, _ internal.DetectorCo // Detect records metadata retrieved from the ECS Task Metadata Endpoint (TMDE) as resource attributes // TODO(willarmiros): Replace all attribute fields and enums with values defined in "conventions" once they exist -func (d *Detector) Detect(context.Context) (pdata.Resource, error) { +func (d *Detector) Detect(context.Context) (resource pdata.Resource, schemaURL string, err error) { res := pdata.NewResource() tmde := getTmdeFromEnv() @@ -56,13 +56,13 @@ func (d *Detector) Detect(context.Context) (pdata.Resource, error) { // Fail fast if neither env var is present if tmde == "" { // TODO: Log a more specific error with zap - return res, nil + return res, "", nil } tmdeResp, err := d.provider.fetchTaskMetaData(tmde) if err != nil || tmdeResp == nil { - return res, err + return res, "", err } attr := res.Attributes() @@ -101,7 +101,7 @@ func (d *Detector) Detect(context.Context) (pdata.Resource, error) { selfMetaData, err := d.provider.fetchContainerMetaData(tmde) if err != nil || selfMetaData == nil { - return res, err + return res, "", err } logAttributes := [4]string{ @@ -117,7 +117,7 @@ func (d *Detector) Detect(context.Context) (pdata.Resource, error) { } } - return res, nil + return res, conventions.SchemaURL, nil } func getTmdeFromEnv() string { diff --git a/processor/resourcedetectionprocessor/internal/aws/ecs/ecs_test.go b/processor/resourcedetectionprocessor/internal/aws/ecs/ecs_test.go index 42849770a170f..e697dac050765 100644 --- a/processor/resourcedetectionprocessor/internal/aws/ecs/ecs_test.go +++ b/processor/resourcedetectionprocessor/internal/aws/ecs/ecs_test.go @@ -67,7 +67,7 @@ func Test_ecsNewDetector(t *testing.T) { func Test_detectorReturnsIfNoEnvVars(t *testing.T) { os.Clearenv() d, _ := NewDetector(componenttest.NewNopProcessorCreateSettings(), nil) - res, err := d.Detect(context.TODO()) + res, _, err := d.Detect(context.TODO()) assert.Nil(t, err) assert.Equal(t, 0, res.Attributes().Len()) @@ -136,7 +136,7 @@ func Test_ecsDetectV4(t *testing.T) { } d := Detector{provider: &mockMetaDataProvider{isV4: true}} - got, err := d.Detect(context.TODO()) + got, _, err := d.Detect(context.TODO()) assert.Nil(t, err) assert.NotNil(t, got) @@ -160,7 +160,7 @@ func Test_ecsDetectV3(t *testing.T) { attr.InsertString("cloud.account.id", "123456789123") d := Detector{provider: &mockMetaDataProvider{isV4: false}} - got, err := d.Detect(context.TODO()) + got, _, err := d.Detect(context.TODO()) assert.Nil(t, err) assert.NotNil(t, got) diff --git a/processor/resourcedetectionprocessor/internal/aws/eks/detector.go b/processor/resourcedetectionprocessor/internal/aws/eks/detector.go index a71e88789ca6f..74daa2ec73347 100644 --- a/processor/resourcedetectionprocessor/internal/aws/eks/detector.go +++ b/processor/resourcedetectionprocessor/internal/aws/eks/detector.go @@ -44,17 +44,17 @@ func NewDetector(_ component.ProcessorCreateSettings, _ internal.DetectorConfig) } // Detect returns a Resource describing the Amazon EKS environment being run in. -func (detector *Detector) Detect(ctx context.Context) (pdata.Resource, error) { +func (detector *Detector) Detect(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) { res := pdata.NewResource() // Check if running on k8s. if os.Getenv(kubernetesServiceHostEnvVar) == "" { - return res, nil + return res, "", nil } attr := res.Attributes() attr.InsertString(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAWS) attr.InsertString(conventions.AttributeCloudPlatform, conventions.AttributeCloudPlatformAWSEKS) - return res, nil + return res, conventions.SchemaURL, nil } diff --git a/processor/resourcedetectionprocessor/internal/aws/eks/detector_test.go b/processor/resourcedetectionprocessor/internal/aws/eks/detector_test.go index 778a6d451c523..2bc5d7e02733f 100644 --- a/processor/resourcedetectionprocessor/internal/aws/eks/detector_test.go +++ b/processor/resourcedetectionprocessor/internal/aws/eks/detector_test.go @@ -40,7 +40,7 @@ func TestEKS(t *testing.T) { // Call EKS Resource detector to detect resources eksResourceDetector := &Detector{} - res, err := eksResourceDetector.Detect(ctx) + res, _, err := eksResourceDetector.Detect(ctx) require.NoError(t, err) assert.Equal(t, map[string]interface{}{ @@ -53,7 +53,7 @@ func TestEKS(t *testing.T) { func TestNotEKS(t *testing.T) { detector := Detector{} require.NoError(t, os.Unsetenv("KUBERNETES_SERVICE_HOST")) - r, err := detector.Detect(context.Background()) + r, _, err := detector.Detect(context.Background()) require.NoError(t, err) assert.Equal(t, 0, r.Attributes().Len(), "Resource object should be empty") } diff --git a/processor/resourcedetectionprocessor/internal/aws/elasticbeanstalk/elasticbeanstalk.go b/processor/resourcedetectionprocessor/internal/aws/elasticbeanstalk/elasticbeanstalk.go index 405024412d5a2..938469ac657e0 100644 --- a/processor/resourcedetectionprocessor/internal/aws/elasticbeanstalk/elasticbeanstalk.go +++ b/processor/resourcedetectionprocessor/internal/aws/elasticbeanstalk/elasticbeanstalk.go @@ -51,11 +51,10 @@ func NewDetector(component.ProcessorCreateSettings, internal.DetectorConfig) (in return &Detector{fs: &ebFileSystem{}}, nil } -func (d Detector) Detect(context.Context) (pdata.Resource, error) { +func (d Detector) Detect(context.Context) (resource pdata.Resource, schemaURL string, err error) { res := pdata.NewResource() var conf io.ReadCloser - var err error if d.fs.IsWindows() { conf, err = d.fs.Open(windowsPath) } else { @@ -65,7 +64,7 @@ func (d Detector) Detect(context.Context) (pdata.Resource, error) { // Do not want to return error so it fails silently on non-EB instances if err != nil { // TODO: Log a more specific message with zap - return res, nil + return res, "", nil } ebmd := &EbMetaData{} @@ -74,7 +73,7 @@ func (d Detector) Detect(context.Context) (pdata.Resource, error) { if err != nil { // TODO: Log a more specific error with zap - return res, err + return res, "", err } attr := res.Attributes() @@ -83,5 +82,5 @@ func (d Detector) Detect(context.Context) (pdata.Resource, error) { attr.InsertString(conventions.AttributeServiceInstanceID, strconv.Itoa(ebmd.DeploymentID)) attr.InsertString(conventions.AttributeDeploymentEnvironment, ebmd.EnvironmentName) attr.InsertString(conventions.AttributeServiceVersion, ebmd.VersionLabel) - return res, nil + return res, conventions.SchemaURL, nil } diff --git a/processor/resourcedetectionprocessor/internal/aws/elasticbeanstalk/elasticbeanstalk_test.go b/processor/resourcedetectionprocessor/internal/aws/elasticbeanstalk/elasticbeanstalk_test.go index 6a8370472f7ff..9bb819f5f4d62 100644 --- a/processor/resourcedetectionprocessor/internal/aws/elasticbeanstalk/elasticbeanstalk_test.go +++ b/processor/resourcedetectionprocessor/internal/aws/elasticbeanstalk/elasticbeanstalk_test.go @@ -62,7 +62,7 @@ func Test_windowsPath(t *testing.T) { mfs := &mockFileSystem{windows: true, exists: true, contents: xrayConf} d := Detector{fs: mfs} - r, err := d.Detect(context.TODO()) + r, _, err := d.Detect(context.TODO()) assert.Nil(t, err) assert.NotNil(t, r) @@ -73,7 +73,7 @@ func Test_fileNotExists(t *testing.T) { mfs := &mockFileSystem{exists: false} d := Detector{fs: mfs} - r, err := d.Detect(context.TODO()) + r, _, err := d.Detect(context.TODO()) assert.Nil(t, err) assert.NotNil(t, r) @@ -84,7 +84,7 @@ func Test_fileMalformed(t *testing.T) { mfs := &mockFileSystem{exists: true, contents: "some overwritten value"} d := Detector{fs: mfs} - r, err := d.Detect(context.TODO()) + r, _, err := d.Detect(context.TODO()) assert.NotNil(t, err) assert.NotNil(t, r) @@ -103,7 +103,7 @@ func Test_AttributesDetectedSuccessfully(t *testing.T) { attr.InsertString("service.instance.id", "23") attr.InsertString("service.version", "env-version-1234") - r, err := d.Detect(context.TODO()) + r, _, err := d.Detect(context.TODO()) assert.Nil(t, err) assert.NotNil(t, r) diff --git a/processor/resourcedetectionprocessor/internal/azure/aks/aks.go b/processor/resourcedetectionprocessor/internal/azure/aks/aks.go index 8101c96996f66..25cf45d8088eb 100644 --- a/processor/resourcedetectionprocessor/internal/azure/aks/aks.go +++ b/processor/resourcedetectionprocessor/internal/azure/aks/aks.go @@ -43,23 +43,23 @@ func NewDetector(component.ProcessorCreateSettings, internal.DetectorConfig) (in return &Detector{provider: azure.NewProvider()}, nil } -func (d *Detector) Detect(ctx context.Context) (pdata.Resource, error) { +func (d *Detector) Detect(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) { res := pdata.NewResource() if !onK8s() { - return res, nil + return res, "", nil } // If we can't get a response from the metadata endpoint, we're not running in Azure if !azureMetadataAvailable(ctx, d.provider) { - return res, nil + return res, "", nil } attrs := res.Attributes() attrs.InsertString(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAzure) attrs.InsertString(conventions.AttributeCloudPlatform, conventions.AttributeCloudPlatformAzureAKS) - return res, nil + return res, conventions.SchemaURL, nil } func onK8s() bool { diff --git a/processor/resourcedetectionprocessor/internal/azure/aks/aks_test.go b/processor/resourcedetectionprocessor/internal/azure/aks/aks_test.go index d856c0d16416a..f9ce6486404a9 100644 --- a/processor/resourcedetectionprocessor/internal/azure/aks/aks_test.go +++ b/processor/resourcedetectionprocessor/internal/azure/aks/aks_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" + conventions "go.opentelemetry.io/collector/translator/conventions/v1.5.0" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/azure" @@ -38,8 +39,9 @@ func TestDetector_Detect_K8s_Azure(t *testing.T) { os.Clearenv() setK8sEnv(t) detector := &Detector{provider: mockProvider()} - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) require.NoError(t, err) + assert.Equal(t, conventions.SchemaURL, schemaURL) assert.Equal(t, map[string]interface{}{ "cloud.provider": "azure", "cloud.platform": "azure_aks", @@ -52,7 +54,7 @@ func TestDetector_Detect_K8s_NonAzure(t *testing.T) { mp := &azure.MockProvider{} mp.On("Metadata").Return(nil, errors.New("")) detector := &Detector{provider: mp} - res, err := detector.Detect(context.Background()) + res, _, err := detector.Detect(context.Background()) require.NoError(t, err) attrs := res.Attributes() assert.Equal(t, 0, attrs.Len()) @@ -61,7 +63,7 @@ func TestDetector_Detect_K8s_NonAzure(t *testing.T) { func TestDetector_Detect_NonK8s(t *testing.T) { os.Clearenv() detector := &Detector{provider: mockProvider()} - res, err := detector.Detect(context.Background()) + res, _, err := detector.Detect(context.Background()) require.NoError(t, err) attrs := res.Attributes() assert.Equal(t, 0, attrs.Len()) diff --git a/processor/resourcedetectionprocessor/internal/azure/azure.go b/processor/resourcedetectionprocessor/internal/azure/azure.go index 8e4b56668d54c..9f2f881667941 100644 --- a/processor/resourcedetectionprocessor/internal/azure/azure.go +++ b/processor/resourcedetectionprocessor/internal/azure/azure.go @@ -47,7 +47,7 @@ func NewDetector(p component.ProcessorCreateSettings, cfg internal.DetectorConfi } // Detect detects system metadata and returns a resource with the available ones -func (d *Detector) Detect(ctx context.Context) (pdata.Resource, error) { +func (d *Detector) Detect(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) { res := pdata.NewResource() attrs := res.Attributes() @@ -55,7 +55,7 @@ func (d *Detector) Detect(ctx context.Context) (pdata.Resource, error) { if err != nil { d.logger.Debug("Azure detector metadata retrieval failed", zap.Error(err)) // return an empty Resource and no error - return res, nil + return res, "", nil } attrs.InsertString(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAzure) @@ -68,5 +68,5 @@ func (d *Detector) Detect(ctx context.Context) (pdata.Resource, error) { attrs.InsertString("azure.vm.scaleset.name", compute.VMScaleSetName) attrs.InsertString("azure.resourcegroup.name", compute.ResourceGroupName) - return res, nil + return res, conventions.SchemaURL, nil } diff --git a/processor/resourcedetectionprocessor/internal/azure/azure_test.go b/processor/resourcedetectionprocessor/internal/azure/azure_test.go index 3d899918aa89e..08b0f84bae629 100644 --- a/processor/resourcedetectionprocessor/internal/azure/azure_test.go +++ b/processor/resourcedetectionprocessor/internal/azure/azure_test.go @@ -47,8 +47,9 @@ func TestDetectAzureAvailable(t *testing.T) { }, nil) detector := &Detector{provider: mp} - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) require.NoError(t, err) + assert.Equal(t, conventions.SchemaURL, schemaURL) mp.AssertExpectations(t) res.Attributes().Sort() @@ -73,7 +74,7 @@ func TestDetectError(t *testing.T) { mp.On("Metadata").Return(&ComputeMetadata{}, fmt.Errorf("mock error")) detector := &Detector{provider: mp, logger: zap.NewNop()} - res, err := detector.Detect(context.Background()) + res, _, err := detector.Detect(context.Background()) assert.NoError(t, err) assert.True(t, internal.IsEmptyResource(res)) } diff --git a/processor/resourcedetectionprocessor/internal/docker/docker.go b/processor/resourcedetectionprocessor/internal/docker/docker.go index 032c761eeedf5..501ec39f8ece4 100644 --- a/processor/resourcedetectionprocessor/internal/docker/docker.go +++ b/processor/resourcedetectionprocessor/internal/docker/docker.go @@ -50,22 +50,22 @@ func NewDetector(p component.ProcessorCreateSettings, dcfg internal.DetectorConf } // Detect detects system metadata and returns a resource with the available ones -func (d *Detector) Detect(ctx context.Context) (pdata.Resource, error) { +func (d *Detector) Detect(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) { res := pdata.NewResource() attrs := res.Attributes() osType, err := d.provider.OSType(ctx) if err != nil { - return res, fmt.Errorf("failed getting OS type: %w", err) + return res, "", fmt.Errorf("failed getting OS type: %w", err) } hostname, err := d.provider.Hostname(ctx) if err != nil { - return res, fmt.Errorf("failed getting OS hostname: %w", err) + return res, "", fmt.Errorf("failed getting OS hostname: %w", err) } attrs.InsertString(conventions.AttributeHostName, hostname) attrs.InsertString(conventions.AttributeOSType, osType) - return res, nil + return res, conventions.SchemaURL, nil } diff --git a/processor/resourcedetectionprocessor/internal/docker/docker_test.go b/processor/resourcedetectionprocessor/internal/docker/docker_test.go index 3d1c91f5ea5bf..e94584f1ab87b 100644 --- a/processor/resourcedetectionprocessor/internal/docker/docker_test.go +++ b/processor/resourcedetectionprocessor/internal/docker/docker_test.go @@ -47,8 +47,9 @@ func TestDetect(t *testing.T) { md.On("OSType").Return("DARWIN", nil) detector := &Detector{provider: md, logger: zap.NewNop()} - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) require.NoError(t, err) + assert.Equal(t, conventions.SchemaURL, schemaURL) md.AssertExpectations(t) res.Attributes().Sort() diff --git a/processor/resourcedetectionprocessor/internal/env/env.go b/processor/resourcedetectionprocessor/internal/env/env.go index e9c7c057cf57f..1f73a23f69e06 100644 --- a/processor/resourcedetectionprocessor/internal/env/env.go +++ b/processor/resourcedetectionprocessor/internal/env/env.go @@ -52,24 +52,24 @@ func NewDetector(component.ProcessorCreateSettings, internal.DetectorConfig) (in return &Detector{}, nil } -func (d *Detector) Detect(context.Context) (pdata.Resource, error) { +func (d *Detector) Detect(context.Context) (resource pdata.Resource, schemaURL string, err error) { res := pdata.NewResource() labels := strings.TrimSpace(os.Getenv(envVar)) if labels == "" { labels = strings.TrimSpace(os.Getenv(deprecatedEnvVar)) if labels == "" { - return res, nil + return res, "", nil } } - err := initializeAttributeMap(res.Attributes(), labels) + err = initializeAttributeMap(res.Attributes(), labels) if err != nil { res.Attributes().Clear() - return res, err + return res, "", err } - return res, nil + return res, "", nil } // labelRegex matches any key=value pair including a trailing comma or the end of the diff --git a/processor/resourcedetectionprocessor/internal/env/env_test.go b/processor/resourcedetectionprocessor/internal/env/env_test.go index cd742e5058807..88384f72cf76e 100644 --- a/processor/resourcedetectionprocessor/internal/env/env_test.go +++ b/processor/resourcedetectionprocessor/internal/env/env_test.go @@ -37,7 +37,8 @@ func TestDetectTrue(t *testing.T) { os.Setenv(envVar, "key=value") detector := &Detector{} - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) + assert.Equal(t, "", schemaURL) require.NoError(t, err) assert.Equal(t, internal.NewResource(map[string]interface{}{"key": "value"}), res) } @@ -46,8 +47,9 @@ func TestDetectFalse(t *testing.T) { os.Setenv(envVar, "") detector := &Detector{} - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) require.NoError(t, err) + assert.Equal(t, "", schemaURL) assert.True(t, internal.IsEmptyResource(res)) } @@ -56,8 +58,9 @@ func TestDetectDeprecatedEnv(t *testing.T) { os.Setenv(deprecatedEnvVar, "key=value") detector := &Detector{} - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) require.NoError(t, err) + assert.Equal(t, "", schemaURL) assert.Equal(t, internal.NewResource(map[string]interface{}{"key": "value"}), res) } @@ -65,8 +68,9 @@ func TestDetectError(t *testing.T) { os.Setenv(envVar, "key=value,key") detector := &Detector{} - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) assert.Error(t, err) + assert.Equal(t, "", schemaURL) assert.True(t, internal.IsEmptyResource(res)) } diff --git a/processor/resourcedetectionprocessor/internal/gcp/gce/gce.go b/processor/resourcedetectionprocessor/internal/gcp/gce/gce.go index 2e90410cd1e29..9201b0bed10cb 100644 --- a/processor/resourcedetectionprocessor/internal/gcp/gce/gce.go +++ b/processor/resourcedetectionprocessor/internal/gcp/gce/gce.go @@ -41,11 +41,11 @@ func NewDetector(component.ProcessorCreateSettings, internal.DetectorConfig) (in return &Detector{metadata: &gcp.MetadataImpl{}}, nil } -func (d *Detector) Detect(context.Context) (pdata.Resource, error) { +func (d *Detector) Detect(context.Context) (resource pdata.Resource, schemaURL string, err error) { res := pdata.NewResource() if !d.metadata.OnGCE() { - return res, nil + return res, "", nil } attr := res.Attributes() @@ -53,7 +53,7 @@ func (d *Detector) Detect(context.Context) (pdata.Resource, error) { var errors []error errors = append(errors, d.initializeCloudAttributes(attr)...) errors = append(errors, d.initializeHostAttributes(attr)...) - return res, consumererror.Combine(errors) + return res, conventions.SchemaURL, consumererror.Combine(errors) } func (d *Detector) initializeCloudAttributes(attr pdata.AttributeMap) []error { diff --git a/processor/resourcedetectionprocessor/internal/gcp/gce/gce_test.go b/processor/resourcedetectionprocessor/internal/gcp/gce/gce_test.go index 893b4ea3a09c7..7cf316e7e1860 100644 --- a/processor/resourcedetectionprocessor/internal/gcp/gce/gce_test.go +++ b/processor/resourcedetectionprocessor/internal/gcp/gce/gce_test.go @@ -45,9 +45,10 @@ func TestDetectTrue(t *testing.T) { md.On("Get", "instance/machine-type").Return("machine-type", nil) detector := &Detector{metadata: md} - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) require.NoError(t, err) + assert.Equal(t, conventions.SchemaURL, schemaURL) expected := internal.NewResource(map[string]interface{}{ conventions.AttributeCloudProvider: conventions.AttributeCloudProviderGCP, @@ -70,7 +71,7 @@ func TestDetectFalse(t *testing.T) { md.On("OnGCE").Return(false) detector := &Detector{metadata: md} - res, err := detector.Detect(context.Background()) + res, _, err := detector.Detect(context.Background()) require.NoError(t, err) assert.True(t, internal.IsEmptyResource(res)) @@ -87,7 +88,7 @@ func TestDetectError(t *testing.T) { md.On("Get", "instance/machine-type").Return("", errors.New("err6")) detector := &Detector{metadata: md} - res, err := detector.Detect(context.Background()) + res, _, err := detector.Detect(context.Background()) assert.EqualError(t, err, "[err1; err2; err3; err4; err6]") diff --git a/processor/resourcedetectionprocessor/internal/gcp/gke/gke.go b/processor/resourcedetectionprocessor/internal/gcp/gke/gke.go index 36e90f5c55343..3889e91b9f0dd 100644 --- a/processor/resourcedetectionprocessor/internal/gcp/gke/gke.go +++ b/processor/resourcedetectionprocessor/internal/gcp/gke/gke.go @@ -50,12 +50,12 @@ func NewDetector(params component.ProcessorCreateSettings, _ internal.DetectorCo } // Detect detects associated resources when running in GKE environment. -func (gke *Detector) Detect(ctx context.Context) (pdata.Resource, error) { +func (gke *Detector) Detect(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) { res := pdata.NewResource() // Check if on GCP. if !gke.metadata.OnGCE() { - return res, nil + return res, "", nil } attr := res.Attributes() @@ -63,7 +63,7 @@ func (gke *Detector) Detect(ctx context.Context) (pdata.Resource, error) { // Check if running on k8s. if os.Getenv(kubernetesServiceHostEnvVar) == "" { - return res, nil + return res, "", nil } attr.InsertString(conventions.AttributeCloudPlatform, conventions.AttributeCloudPlatformGCPKubernetesEngine) @@ -74,5 +74,5 @@ func (gke *Detector) Detect(ctx context.Context) (pdata.Resource, error) { attr.InsertString(conventions.AttributeK8SClusterName, clusterName) } - return res, nil + return res, conventions.SchemaURL, nil } diff --git a/processor/resourcedetectionprocessor/internal/gcp/gke/gke_test.go b/processor/resourcedetectionprocessor/internal/gcp/gke/gke_test.go index baca0af2e6636..d5fc3695434b3 100644 --- a/processor/resourcedetectionprocessor/internal/gcp/gke/gke_test.go +++ b/processor/resourcedetectionprocessor/internal/gcp/gke/gke_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" + conventions "go.opentelemetry.io/collector/translator/conventions/v1.5.0" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal" @@ -37,7 +38,7 @@ func TestNotGCE(t *testing.T) { } metadata.On("OnGCE").Return(false) - res, err := detector.Detect(context.Background()) + res, _, err := detector.Detect(context.Background()) require.NoError(t, err) assert.Equal(t, 0, res.Attributes().Len()) @@ -56,8 +57,9 @@ func TestDetectWithoutCluster(t *testing.T) { require.NoError(t, os.Setenv("KUBERNETES_SERVICE_HOST", "localhost")) - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) require.NoError(t, err) + assert.Equal(t, conventions.SchemaURL, schemaURL) assert.Equal(t, map[string]interface{}{ "cloud.provider": "gcp", @@ -78,7 +80,7 @@ func TestDetectWithoutK8s(t *testing.T) { require.NoError(t, os.Unsetenv("KUBERNETES_SERVICE_HOST")) - res, err := detector.Detect(context.Background()) + res, _, err := detector.Detect(context.Background()) require.NoError(t, err) assert.Equal(t, map[string]interface{}{ @@ -100,8 +102,9 @@ func TestDetector_Detect(t *testing.T) { require.NoError(t, os.Setenv("KUBERNETES_SERVICE_HOST", "localhost")) - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) require.NoError(t, err) + assert.Equal(t, conventions.SchemaURL, schemaURL) assert.Equal(t, map[string]interface{}{ "cloud.provider": "gcp", diff --git a/processor/resourcedetectionprocessor/internal/resourcedetection.go b/processor/resourcedetectionprocessor/internal/resourcedetection.go index e66e6b19a207d..fabdbeb91f94e 100644 --- a/processor/resourcedetectionprocessor/internal/resourcedetection.go +++ b/processor/resourcedetectionprocessor/internal/resourcedetection.go @@ -31,7 +31,7 @@ import ( type DetectorType string type Detector interface { - Detect(ctx context.Context) (pdata.Resource, error) + Detect(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) } type DetectorConfig interface{} @@ -93,8 +93,9 @@ type ResourceProvider struct { } type resourceResult struct { - resource pdata.Resource - err error + resource pdata.Resource + schemaURL string + err error } func NewResourceProvider(logger *zap.Logger, timeout time.Duration, detectors ...Detector) *ResourceProvider { @@ -105,7 +106,7 @@ func NewResourceProvider(logger *zap.Logger, timeout time.Duration, detectors .. } } -func (p *ResourceProvider) Get(ctx context.Context) (pdata.Resource, error) { +func (p *ResourceProvider) Get(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) { p.once.Do(func() { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, p.timeout) @@ -113,29 +114,32 @@ func (p *ResourceProvider) Get(ctx context.Context) (pdata.Resource, error) { p.detectResource(ctx) }) - return p.detectedResource.resource, p.detectedResource.err + return p.detectedResource.resource, p.detectedResource.schemaURL, p.detectedResource.err } func (p *ResourceProvider) detectResource(ctx context.Context) { p.detectedResource = &resourceResult{} res := pdata.NewResource() + mergedSchemaURL := "" p.logger.Info("began detecting resource information") for _, detector := range p.detectors { - r, err := detector.Detect(ctx) + r, schemaURL, err := detector.Detect(ctx) if err != nil { p.detectedResource.err = err return } + mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, schemaURL) MergeResource(res, r, false) } p.logger.Info("detected resource information", zap.Any("resource", AttributesToMap(res.Attributes()))) p.detectedResource.resource = res + p.detectedResource.schemaURL = mergedSchemaURL } func AttributesToMap(am pdata.AttributeMap) map[string]interface{} { @@ -175,6 +179,21 @@ func getSerializableArray(inArr pdata.AnyValueArray) []interface{} { return outArr } +func MergeSchemaURL(currentSchemaURL string, newSchemaURL string) string { + if currentSchemaURL == "" { + return newSchemaURL + } + if newSchemaURL == "" { + return currentSchemaURL + } + if currentSchemaURL == newSchemaURL { + return currentSchemaURL + } + // TODO: handle the case when the schema URLs are different by performing + // schema conversion. For now we simply ignore the new schema URL. + return currentSchemaURL +} + func MergeResource(to, from pdata.Resource, overrideTo bool) { if IsEmptyResource(from) { return diff --git a/processor/resourcedetectionprocessor/internal/resourcedetection_test.go b/processor/resourcedetectionprocessor/internal/resourcedetection_test.go index 381f86e3f79da..8e5f62051ffe3 100644 --- a/processor/resourcedetectionprocessor/internal/resourcedetection_test.go +++ b/processor/resourcedetectionprocessor/internal/resourcedetection_test.go @@ -35,9 +35,9 @@ type MockDetector struct { mock.Mock } -func (p *MockDetector) Detect(ctx context.Context) (pdata.Resource, error) { +func (p *MockDetector) Detect(ctx context.Context) (pdata.Resource, string, error) { args := p.Called() - return args.Get(0).(pdata.Resource), args.Error(1) + return args.Get(0).(pdata.Resource), "", args.Error(1) } type mockDetectorConfig struct{} @@ -99,7 +99,7 @@ func TestDetect(t *testing.T) { p, err := f.CreateResourceProvider(componenttest.NewNopProcessorCreateSettings(), time.Second, &mockDetectorConfig{}, mockDetectorTypes...) require.NoError(t, err) - got, err := p.Get(context.Background()) + got, _, err := p.Get(context.Background()) require.NoError(t, err) tt.expectedResource.Attributes().Sort() @@ -135,7 +135,7 @@ func TestDetectResource_Error(t *testing.T) { md2.On("Detect").Return(pdata.NewResource(), errors.New("err1")) p := NewResourceProvider(zap.NewNop(), time.Second, md1, md2) - _, err := p.Get(context.Background()) + _, _, err := p.Get(context.Background()) require.EqualError(t, err, "err1") } @@ -181,10 +181,10 @@ func NewMockParallelDetector() *MockParallelDetector { return &MockParallelDetector{ch: make(chan struct{})} } -func (p *MockParallelDetector) Detect(ctx context.Context) (pdata.Resource, error) { +func (p *MockParallelDetector) Detect(ctx context.Context) (pdata.Resource, string, error) { <-p.ch args := p.Called() - return args.Get(0).(pdata.Resource), args.Error(1) + return args.Get(0).(pdata.Resource), "", args.Error(1) } // TestDetectResource_Parallel validates that Detect is only called once, even if there @@ -209,7 +209,7 @@ func TestDetectResource_Parallel(t *testing.T) { for i := 0; i < iterations; i++ { go func() { defer wg.Done() - _, err := p.Get(context.Background()) + _, _, err := p.Get(context.Background()) require.NoError(t, err) }() } diff --git a/processor/resourcedetectionprocessor/internal/system/system.go b/processor/resourcedetectionprocessor/internal/system/system.go index 5ef6f3e67fe81..f6394a484e283 100644 --- a/processor/resourcedetectionprocessor/internal/system/system.go +++ b/processor/resourcedetectionprocessor/internal/system/system.go @@ -45,13 +45,13 @@ func NewDetector(p component.ProcessorCreateSettings, _ internal.DetectorConfig) } // Detect detects system metadata and returns a resource with the available ones -func (d *Detector) Detect(_ context.Context) (pdata.Resource, error) { +func (d *Detector) Detect(_ context.Context) (resource pdata.Resource, schemaURL string, err error) { res := pdata.NewResource() attrs := res.Attributes() osType, err := d.provider.OSType() if err != nil { - return res, fmt.Errorf("failed getting OS type: %w", err) + return res, "", fmt.Errorf("failed getting OS type: %w", err) } hostname, err := d.provider.FQDN() @@ -60,12 +60,12 @@ func (d *Detector) Detect(_ context.Context) (pdata.Resource, error) { d.logger.Debug("FQDN query failed, falling back to OS hostname", zap.Error(err)) hostname, err = d.provider.Hostname() if err != nil { - return res, fmt.Errorf("failed getting OS hostname: %w", err) + return res, "", fmt.Errorf("failed getting OS hostname: %w", err) } } attrs.InsertString(conventions.AttributeHostName, hostname) attrs.InsertString(conventions.AttributeOSType, osType) - return res, nil + return res, conventions.SchemaURL, nil } diff --git a/processor/resourcedetectionprocessor/internal/system/system_test.go b/processor/resourcedetectionprocessor/internal/system/system_test.go index c9eb540c66644..c999d7a62746b 100644 --- a/processor/resourcedetectionprocessor/internal/system/system_test.go +++ b/processor/resourcedetectionprocessor/internal/system/system_test.go @@ -60,8 +60,9 @@ func TestDetectFQDNAvailable(t *testing.T) { md.On("OSType").Return("DARWIN", nil) detector := &Detector{provider: md, logger: zap.NewNop()} - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) require.NoError(t, err) + assert.Equal(t, conventions.SchemaURL, schemaURL) md.AssertExpectations(t) res.Attributes().Sort() @@ -82,8 +83,9 @@ func TestFallbackHostname(t *testing.T) { mdHostname.On("OSType").Return("DARWIN", nil) detector := &Detector{provider: mdHostname, logger: zap.NewNop()} - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) require.NoError(t, err) + assert.Equal(t, conventions.SchemaURL, schemaURL) mdHostname.AssertExpectations(t) res.Attributes().Sort() @@ -104,8 +106,9 @@ func TestDetectError(t *testing.T) { mdFQDN.On("Hostname").Return("", errors.New("err")) detector := &Detector{provider: mdFQDN, logger: zap.NewNop()} - res, err := detector.Detect(context.Background()) + res, schemaURL, err := detector.Detect(context.Background()) assert.Error(t, err) + assert.Equal(t, "", schemaURL) assert.True(t, internal.IsEmptyResource(res)) // OS type fails @@ -114,7 +117,8 @@ func TestDetectError(t *testing.T) { mdOSType.On("OSType").Return("", errors.New("err")) detector = &Detector{provider: mdOSType, logger: zap.NewNop()} - res, err = detector.Detect(context.Background()) + res, schemaURL, err = detector.Detect(context.Background()) assert.Error(t, err) + assert.Equal(t, "", schemaURL) assert.True(t, internal.IsEmptyResource(res)) } diff --git a/processor/resourcedetectionprocessor/resourcedetection_processor.go b/processor/resourcedetectionprocessor/resourcedetection_processor.go index 2f2b974a99c5f..7003f825b8cb6 100644 --- a/processor/resourcedetectionprocessor/resourcedetection_processor.go +++ b/processor/resourcedetectionprocessor/resourcedetection_processor.go @@ -24,15 +24,16 @@ import ( ) type resourceDetectionProcessor struct { - provider *internal.ResourceProvider - resource pdata.Resource - override bool + provider *internal.ResourceProvider + resource pdata.Resource + schemaURL string + override bool } // Start is invoked during service startup. func (rdp *resourceDetectionProcessor) Start(ctx context.Context, _ component.Host) error { var err error - rdp.resource, err = rdp.provider.Get(ctx) + rdp.resource, rdp.schemaURL, err = rdp.provider.Get(ctx) return err } @@ -40,7 +41,9 @@ func (rdp *resourceDetectionProcessor) Start(ctx context.Context, _ component.Ho func (rdp *resourceDetectionProcessor) processTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) { rs := td.ResourceSpans() for i := 0; i < rs.Len(); i++ { - res := rs.At(i).Resource() + rss := rs.At(i) + rss.SetSchemaUrl(internal.MergeSchemaURL(rss.SchemaUrl(), rdp.schemaURL)) + res := rss.Resource() internal.MergeResource(res, rdp.resource, rdp.override) } return td, nil @@ -50,7 +53,9 @@ func (rdp *resourceDetectionProcessor) processTraces(_ context.Context, td pdata func (rdp *resourceDetectionProcessor) processMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) { rm := md.ResourceMetrics() for i := 0; i < rm.Len(); i++ { - res := rm.At(i).Resource() + rss := rm.At(i) + rss.SetSchemaUrl(internal.MergeSchemaURL(rss.SchemaUrl(), rdp.schemaURL)) + res := rss.Resource() internal.MergeResource(res, rdp.resource, rdp.override) } return md, nil @@ -58,9 +63,11 @@ func (rdp *resourceDetectionProcessor) processMetrics(_ context.Context, md pdat // processLogs implements the ProcessLogsFunc type. func (rdp *resourceDetectionProcessor) processLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) { - rls := ld.ResourceLogs() - for i := 0; i < rls.Len(); i++ { - res := rls.At(i).Resource() + rl := ld.ResourceLogs() + for i := 0; i < rl.Len(); i++ { + rss := rl.At(i) + rss.SetSchemaUrl(internal.MergeSchemaURL(rss.SchemaUrl(), rdp.schemaURL)) + res := rss.Resource() internal.MergeResource(res, rdp.resource, rdp.override) } return ld, nil diff --git a/processor/resourcedetectionprocessor/resourcedetection_processor_test.go b/processor/resourcedetectionprocessor/resourcedetection_processor_test.go index 7d74cc3104920..38adfb064e71a 100644 --- a/processor/resourcedetectionprocessor/resourcedetection_processor_test.go +++ b/processor/resourcedetectionprocessor/resourcedetection_processor_test.go @@ -40,9 +40,9 @@ type MockDetector struct { mock.Mock } -func (p *MockDetector) Detect(ctx context.Context) (pdata.Resource, error) { +func (p *MockDetector) Detect(ctx context.Context) (resource pdata.Resource, schemaURL string, err error) { args := p.Called() - return args.Get(0).(pdata.Resource), args.Error(1) + return args.Get(0).(pdata.Resource), "", args.Error(1) } func TestResourceProcessor(t *testing.T) {