Skip to content

Commit

Permalink
[exporter/signalfx_correlation] Move correlation from sapm to signalf…
Browse files Browse the repository at this point in the history
…x_correlation (#1376)
  • Loading branch information
jrcamp committed Oct 27, 2020
1 parent 9856d15 commit 1d4b8cf
Show file tree
Hide file tree
Showing 20 changed files with 51 additions and 153 deletions.
2 changes: 2 additions & 0 deletions cmd/otelcontribcol/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/newrelicexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sapmexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sentryexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxcorrelationexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stackdriverexporter"
Expand Down Expand Up @@ -128,6 +129,7 @@ func components() (component.Factories, error) {
newrelicexporter.NewFactory(),
sapmexporter.NewFactory(),
sentryexporter.NewFactory(),
signalfxcorrelationexporter.NewFactory(),
signalfxexporter.NewFactory(),
splunkhecexporter.NewFactory(),
stackdriverexporter.NewFactory(),
Expand Down
11 changes: 0 additions & 11 deletions exporter/sapmexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,6 @@ trace resource attribute, if any, as SFx access token. In either case this attr
during final translation. Intended to be used in tandem with identical configuration option for
[SAPM receiver](../../receiver/sapmreceiver/README.md) to preserve trace origin.
- `timeout` (default = 5s): Is the timeout for every attempt to send data to the backend.
- `correlation`
- `enabled` (default = false): Whether to enable spans to metric correlation. If this block is not set at all then correlation is not enabled due to the default.
- `endpoint` (default = ""): Required if `enabled` is `true`. This is the base URL for API requests (e.g. https://api.signalfx.com).
- `stale_service_timeout` (default = 5 minutes): How long to wait after a span's service name is last seen before uncorrelating it.
- `max_requests` (default = 20): Max HTTP requests to be made in parallel.
- `max_buffered` (default = 10,000): Max number of correlation updates that can be buffered before updates are dropped.
- `max_retries` (default = 2): Max number of retries that will be made for failed correlation updates.
- `log_updates` (default = false): Whether or not to log correlation updates to dimensions (at `DEBUG` level).
- `retry_delay` (default = 30 seconds): How long to wait between retries.
- `cleanup_interval` (default = 1 minute): How frequently to purge duplicate requests.
- `sync_attributes` (default = `{"k8s.pod.uid": "k8s.pod.uid", "container.id": "container.id"}`) Map containing key of the attribute to read from spans to sync to dimensions specified as the value.

In addition, this exporter offers queued retry which is enabled by default.
Information about queued retry configuration parameters can be found
Expand Down
23 changes: 0 additions & 23 deletions exporter/sapmexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@ package sapmexporter
import (
"errors"
"net/url"
"time"

sapmclient "github.com/signalfx/sapm-proto/client"
"github.com/signalfx/signalfx-agent/pkg/apm/correlations"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"

Expand All @@ -33,19 +30,6 @@ const (
defaultNumWorkers = 8
)

// CorrelationConfig defines correlation settings.
type CorrelationConfig struct {
confighttp.HTTPClientSettings `mapstructure:",squash"`
correlations.Config `mapstructure:",squash"`
// Enabled determines whether correlation is enabled or not.
Enabled bool `mapstructure:"enabled"`
// How long to wait after a trace span's service name is last seen before
// uncorrelating that service.
StaleServiceTimeout time.Duration `mapstructure:"stale_service_timeout"`
// SyncAttributes is a key of the span attribute name to sync to the dimension as the value.
SyncAttributes map[string]string `mapstructure:"sync_attributes"`
}

// Config defines configuration for SAPM exporter.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
Expand All @@ -54,9 +38,6 @@ type Config struct {
// It must be a full URL and include the scheme, port and path e.g, https://ingest.signalfx.com/v2/trace
Endpoint string `mapstructure:"endpoint"`

// Correlation settings for associating environment and services observed from traces to metrics.
Correlation CorrelationConfig `mapstructure:"correlation"`

// AccessToken is the authentication token provided by SignalFx.
AccessToken string `mapstructure:"access_token"`

Expand All @@ -82,10 +63,6 @@ func (c *Config) validate() error {
return errors.New("`endpoint` not specified")
}

if c.Correlation.Enabled && c.Correlation.Endpoint == "" {
return errors.New("`correlation.endpoint` must be set when `correlation.enabled` is true")
}

e, err := url.Parse(c.Endpoint)
if err != nil {
return err
Expand Down
17 changes: 0 additions & 17 deletions exporter/sapmexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"testing"
"time"

"github.com/signalfx/signalfx-agent/pkg/apm/correlations"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
Expand Down Expand Up @@ -73,22 +72,6 @@ func TestLoadConfig(t *testing.T) {
NumConsumers: 2,
QueueSize: 10,
},
Correlation: CorrelationConfig{
Enabled: false,
StaleServiceTimeout: 5 * time.Minute,
SyncAttributes: map[string]string{
"k8s.pod.uid": "k8s.pod.uid",
"container.id": "container.id",
},
Config: correlations.Config{
MaxRequests: 20,
MaxBuffered: 10_000,
MaxRetries: 2,
LogUpdates: false,
RetryDelay: 30 * time.Second,
CleanupInterval: 1 * time.Minute,
},
},
})
}

Expand Down
24 changes: 6 additions & 18 deletions exporter/sapmexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,13 @@ import (

// sapmExporter is a wrapper struct of SAPM exporter
type sapmExporter struct {
client *sapmclient.Client
logger *zap.Logger
config *Config
tracker *Tracker
client *sapmclient.Client
logger *zap.Logger
config *Config
}

func (se *sapmExporter) Shutdown(context.Context) error {
se.client.Stop()
se.tracker.Shutdown()
return nil
}

Expand All @@ -54,17 +52,10 @@ func newSAPMExporter(cfg *Config, params component.ExporterCreateParams) (sapmEx
return sapmExporter{}, err
}

var tracker *Tracker

if cfg.Correlation.Enabled {
tracker = NewTracker(cfg, params)
}

return sapmExporter{
client: client,
logger: params.Logger,
config: cfg,
tracker: tracker,
client: client,
logger: params.Logger,
config: cfg,
}, err
}

Expand Down Expand Up @@ -144,9 +135,6 @@ func (se *sapmExporter) pushTraceData(ctx context.Context, td pdata.Traces) (dro
}
droppedSpansCount += trace.SpanCount()
}

// NOTE: Correlation does not currently support inline access token.
se.tracker.AddSpans(ctx, trace)
}
return
}
26 changes: 0 additions & 26 deletions exporter/sapmexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,32 +54,6 @@ func TestCreateTraceExporter(t *testing.T) {
assert.NoError(t, te.Shutdown(context.Background()), "trace exporter shutdown failed")
}

func TestCreateTraceExporterWithCorrelationEnabled(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Endpoint = "localhost:1234"
cfg.Correlation.Enabled = true
cfg.Correlation.Endpoint = "http:https://localhost"
params := component.ExporterCreateParams{Logger: zap.NewNop()}

te, err := newSAPMExporter(cfg, params)
assert.Nil(t, err)
assert.NotNil(t, te, "failed to create trace exporter")

assert.NotNil(t, te.tracker, "correlation tracker should have been set")
}

func TestCreateTraceExporterWithCorrelationDisabled(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Endpoint = "localhost:1234"
params := component.ExporterCreateParams{Logger: zap.NewNop()}

te, err := newSAPMExporter(cfg, params)
assert.Nil(t, err)
assert.NotNil(t, te, "failed to create trace exporter")

assert.Nil(t, te.tracker, "tracker correlation should not be created")
}

func TestCreateTraceExporterWithInvalidConfig(t *testing.T) {
config := &Config{}
params := component.ExporterCreateParams{Logger: zap.NewNop()}
Expand Down
19 changes: 0 additions & 19 deletions exporter/sapmexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@ package sapmexporter

import (
"context"
"time"

"github.com/signalfx/signalfx-agent/pkg/apm/correlations"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/translator/conventions"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/splunk"
)
Expand Down Expand Up @@ -53,22 +50,6 @@ func createDefaultConfig() configmodels.Exporter {
TimeoutSettings: exporterhelper.CreateDefaultTimeoutSettings(),
RetrySettings: exporterhelper.CreateDefaultRetrySettings(),
QueueSettings: exporterhelper.CreateDefaultQueueSettings(),
Correlation: CorrelationConfig{
Enabled: false,
StaleServiceTimeout: 5 * time.Minute,
SyncAttributes: map[string]string{
conventions.AttributeK8sPodUID: conventions.AttributeK8sPodUID,
conventions.AttributeContainerID: conventions.AttributeContainerID,
},
Config: correlations.Config{
MaxRequests: 20,
MaxBuffered: 10_000,
MaxRetries: 2,
LogUpdates: false,
RetryDelay: 30 * time.Second,
CleanupInterval: 1 * time.Minute,
},
},
}
}

Expand Down
12 changes: 0 additions & 12 deletions exporter/sapmexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func TestCreateExporter(t *testing.T) {
cfg := factory.CreateDefaultConfig()
eCfg := cfg.(*Config)
eCfg.Endpoint = "http:https://local"
eCfg.Correlation.Endpoint = "http:https://local"
params := component.ExporterCreateParams{Logger: zap.NewNop()}

te, err := factory.CreateTraceExporter(context.Background(), params, eCfg)
Expand All @@ -49,14 +48,3 @@ func TestCreateExporter(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, me)
}

func TestCreateExporterWithoutAPIEndpoint(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Endpoint = "http:https://local"
cfg.Correlation.Enabled = true
params := component.ExporterCreateParams{Logger: zap.NewNop()}
te, err := factory.CreateTraceExporter(context.Background(), params, cfg)
assert.Nil(t, te)
assert.EqualError(t, err, "`correlation.endpoint` must be set when `correlation.enabled` is true")
}
4 changes: 4 additions & 0 deletions exporter/sapmexporter/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,10 @@ github.com/signalfx/gomemcache v0.0.0-20180823214636-4f7ef64c72a9/go.mod h1:Ytb8
github.com/signalfx/sapm-proto v0.4.0/go.mod h1:x3gtwJ1GRejtkghB4nYpwixh2zqJrLbPU959ZNhM0Fk=
github.com/signalfx/sapm-proto v0.6.2 h1:2LtB8AUGVyP5lSlsaBjFTsHfZNK/zn+jzWl1tWwniRA=
github.com/signalfx/sapm-proto v0.6.2/go.mod h1:AHtWypa5paGVlvDjSZw9Bh5GLgS62ee2U0UcsrLlLhU=
github.com/signalfx/signalfx-agent/pkg/apm v0.0.0-20201005151249-ce1a2e0a25e7 h1:+KSSs1oE/YFmd487gpPk79OcFo51tEiFqadFoE3RVvg=
github.com/signalfx/signalfx-agent/pkg/apm v0.0.0-20201005151249-ce1a2e0a25e7/go.mod h1:pNaqfprM2bSCBhE8sTT2NtasSWEsIJbrmnIF0ap/Cvg=
github.com/signalfx/signalfx-agent/pkg/apm v0.0.0-20201009143858-d25fd073fb56 h1:XYBr6vxBtAufUs72S5LYkjCmCB7QM4kvX2jwufGCqhg=
github.com/signalfx/signalfx-agent/pkg/apm v0.0.0-20201009143858-d25fd073fb56/go.mod h1:pNaqfprM2bSCBhE8sTT2NtasSWEsIJbrmnIF0ap/Cvg=
github.com/signalfx/signalfx-agent/pkg/apm v0.0.0-20201015185032-52a4f97df2a4 h1:NpsZqpjpM3c0YokW6ozSF+VktmkOUIni3YQkCytHeto=
github.com/signalfx/signalfx-agent/pkg/apm v0.0.0-20201015185032-52a4f97df2a4/go.mod h1:pNaqfprM2bSCBhE8sTT2NtasSWEsIJbrmnIF0ap/Cvg=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package sapmexporter
package signalfxcorrelationexporter

import (
"context"
Expand Down Expand Up @@ -56,20 +56,20 @@ func NewTracker(cfg *Config, params component.ExporterCreateParams) *Tracker {
func newCorrelationClient(cfg *Config, params component.ExporterCreateParams) (
*correlationContext, error,
) {
corrURL, err := url.Parse(cfg.Correlation.Endpoint)
corrURL, err := url.Parse(cfg.Endpoint)
if err != nil {
return nil, fmt.Errorf("failed to parse correlation endpoint URL %q: %v", cfg.Correlation.Endpoint, err)
return nil, fmt.Errorf("failed to parse correlation endpoint URL %q: %v", cfg.Endpoint, err)
}

httpClient, err := cfg.Correlation.ToClient()
httpClient, err := cfg.ToClient()
if err != nil {
return nil, fmt.Errorf("failed to create correlation API client: %v", err)
}

ctx, cancel := context.WithCancel(context.Background())

client, err := correlations.NewCorrelationClient(newZapShim(params.Logger), ctx, httpClient, correlations.ClientConfig{
Config: cfg.Correlation.Config,
Config: cfg.Config,
AccessToken: cfg.AccessToken,
URL: corrURL,
})
Expand Down Expand Up @@ -112,14 +112,14 @@ func (cor *Tracker) AddSpans(ctx context.Context, traces pdata.Traces) {

cor.traceTracker = tracetracker.New(
newZapShim(cor.params.Logger),
cor.cfg.Correlation.StaleServiceTimeout,
cor.cfg.StaleServiceTimeout,
cor.correlation,
map[string]string{
string(hostID.Key): hostID.ID,
},
false,
nil,
cor.cfg.Correlation.SyncAttributes)
cor.cfg.SyncAttributes)
cor.Start()
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package sapmexporter
package signalfxcorrelationexporter

import (
"context"
Expand All @@ -25,14 +25,12 @@ import (
)

func TestTrackerAddSpans(t *testing.T) {
tracker := NewTracker(&Config{
Correlation: CorrelationConfig{
Enabled: true,
tracker := NewTracker(
&Config{},
component.ExporterCreateParams{
Logger: zap.NewNop(),
},
AccessToken: "",
}, component.ExporterCreateParams{
Logger: zap.NewNop(),
})
)

traces := pdata.NewTraces()
spans := pdata.NewResourceSpans()
Expand Down
13 changes: 9 additions & 4 deletions exporter/signalfxcorrelationexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ import (

// corrExporter is a wrapper struct of the correlation exporter
type corrExporter struct {
logger *zap.Logger
config *Config
logger *zap.Logger
config *Config
tracker *Tracker
}

func (se *corrExporter) Shutdown(context.Context) error {
se.tracker.Shutdown()
return nil
}

Expand All @@ -40,8 +42,9 @@ func newCorrExporter(cfg *Config, params component.ExporterCreateParams) (corrEx
}

return corrExporter{
logger: params.Logger,
config: cfg,
logger: params.Logger,
config: cfg,
tracker: NewTracker(cfg, params),
}, err
}

Expand All @@ -59,5 +62,7 @@ func newTraceExporter(cfg *Config, params component.ExporterCreateParams) (compo

// pushTraceData processes traces to extract services and environments to associate them to their emitting host/pods.
func (se *corrExporter) pushTraceData(ctx context.Context, td pdata.Traces) (droppedSpansCount int, err error) {
// NOTE: Correlation does not currently support inline access token.
se.tracker.AddSpans(ctx, td)
return 0, nil
}
Loading

0 comments on commit 1d4b8cf

Please sign in to comment.