Skip to content

Commit

Permalink
signalfxexporter: Group metrics by token (#813)
Browse files Browse the repository at this point in the history
Description: Group metrics by token to reduce number of network calls made by the exporter.

When the Partial error type extends support for metrics, the exporter should be updated to mark errors as partial wherever applicable so it can also leverage the queued retry mechanism available in new exporters.

Testing: Updated tests.
  • Loading branch information
asuresh4 committed Aug 26, 2020
1 parent eae636e commit 618cb4e
Show file tree
Hide file tree
Showing 7 changed files with 445 additions and 171 deletions.
81 changes: 71 additions & 10 deletions exporter/signalfxexporter/dpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ import (
"sync"

sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf/model"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/zap"

Expand All @@ -48,37 +51,81 @@ type sfxDPClient struct {

func (s *sfxDPClient) pushMetricsData(
ctx context.Context,
md consumerdata.MetricsData,
md pdata.Metrics,
) (droppedTimeSeries int, err error) {
accessToken := s.retrieveAccessToken(md)
metricsData := pdatautil.MetricsToMetricsData(md)
var numDroppedTimeseries int
var errs []error

var currentToken string
var batchStartIdx int
for i := range metricsData {
metricToken := s.retrieveAccessToken(metricsData[i])
if currentToken != metricToken {
if batchStartIdx < i {
// TODO:
// 1) To leverage the queued retry mechanism available in new exporters,
// look for error cases that are non-permanent and track the list of metric
// data that needs to be retried. This will be done once partial retry support
// is added for metrics type.
// 2) Also consider invoking in a goroutine to get some concurrency going, in case
// there are a lot of tokens.
droppedCount, err := s.pushMetricsDataForToken(metricsData[batchStartIdx:i], currentToken)
numDroppedTimeseries += droppedCount
if err != nil {
errs = append(errs, err)
}
batchStartIdx = i
}
currentToken = metricToken
}
}

// Ensure to get the last chunk of metrics.
if len(metricsData[batchStartIdx:]) > 0 {
droppedCount, err := s.pushMetricsDataForToken(metricsData[batchStartIdx:], currentToken)
numDroppedTimeseries += droppedCount
if err != nil {
errs = append(errs, err)
}
}

return numDroppedTimeseries, componenterror.CombineErrors(errs)
}

sfxDataPoints, numDroppedTimeseries := s.converter.MetricDataToSignalFxV2(md)
func (s *sfxDPClient) pushMetricsDataForToken(
metricsData []consumerdata.MetricsData, accessToken string) (int, error) {
numTimeseries := timeseriesCount(metricsData)
sfxDataPoints, numDroppedTimeseries := s.converter.MetricDataToSignalFxV2(metricsData)

body, compressed, err := s.encodeBody(sfxDataPoints)
if err != nil {
return exporterhelper.NumTimeSeries(md), consumererror.Permanent(err)
return numTimeseries, consumererror.Permanent(err)
}

req, err := http.NewRequest("POST", s.ingestURL.String(), body)
if err != nil {
return exporterhelper.NumTimeSeries(md), consumererror.Permanent(err)
return numTimeseries, consumererror.Permanent(err)
}

for k, v := range s.headers {
req.Header.Set(k, v)
}

if s.accessTokenPassthrough && accessToken != "" {
// Override access token in headers map if it's non empty.
if accessToken != "" {
req.Header.Set(splunk.SFxAccessTokenHeader, accessToken)
}

if compressed {
req.Header.Set("Content-Encoding", "gzip")
}

// TODO: Mark errors as partial errors wherever applicable when partial
// error for metrics is available.
resp, err := s.client.Do(req)
if err != nil {
return exporterhelper.NumTimeSeries(md), err
return numTimeseries, err
}

io.Copy(ioutil.Discard, resp.Body)
Expand All @@ -90,13 +137,22 @@ func (s *sfxDPClient) pushMetricsData(
"HTTP %d %q",
resp.StatusCode,
http.StatusText(resp.StatusCode))
return exporterhelper.NumTimeSeries(md), err
return numTimeseries, err
}

return numDroppedTimeseries, nil

}

func buildHeaders(config *Config) (map[string]string, error) {
// timeseriesCount returns the total number of time series contained in
// all entries of metricsData, as counted by exporterhelper.NumTimeSeries.
func timeseriesCount(metricsData []consumerdata.MetricsData) int {
	total := 0
	for i := range metricsData {
		total += exporterhelper.NumTimeSeries(metricsData[i])
	}
	return total
}

func buildHeaders(config *Config) map[string]string {
headers := map[string]string{
"Connection": "keep-alive",
"Content-Type": "application/x-protobuf",
Expand All @@ -114,7 +170,7 @@ func buildHeaders(config *Config) (map[string]string, error) {
headers[k] = v
}

return headers, nil
return headers
}

func (s *sfxDPClient) encodeBody(dps []*sfxpb.DataPoint) (bodyReader io.Reader, compressed bool, err error) {
Expand All @@ -129,6 +185,11 @@ func (s *sfxDPClient) encodeBody(dps []*sfxpb.DataPoint) (bodyReader io.Reader,
}

func (s *sfxDPClient) retrieveAccessToken(md consumerdata.MetricsData) string {
if !s.accessTokenPassthrough || md.Resource == nil {
// Nothing to do if token passthrough is not configured or resource is nil.
return ""
}

accessToken := ""
if labels := md.Resource.GetLabels(); labels != nil {
accessToken = labels[splunk.SFxAccessTokenLabel]
Expand Down
19 changes: 3 additions & 16 deletions exporter/signalfxexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/obsreport"
Expand All @@ -38,7 +37,7 @@ import (

type signalfxExporter struct {
logger *zap.Logger
pushMetricsData func(ctx context.Context, md consumerdata.MetricsData) (droppedTimeSeries int, err error)
pushMetricsData func(ctx context.Context, md pdata.Metrics) (droppedTimeSeries int, err error)
pushKubernetesMetadata func(metadata []*collection.KubernetesMetadataUpdate) error
}

Expand Down Expand Up @@ -67,10 +66,7 @@ func newSignalFxExporter(
fmt.Errorf("failed to process %q config: %v", config.Name(), err)
}

headers, err := buildHeaders(config)
if err != nil {
return nil, err
}
headers := buildHeaders(config)

dpClient := &sfxDPClient{
ingestURL: options.ingestURL,
Expand Down Expand Up @@ -123,16 +119,7 @@ func (se signalfxExporter) Shutdown(context.Context) error {

func (se signalfxExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
ctx = obsreport.StartMetricsExportOp(ctx, typeStr)
numDroppedTimeSeries := 0
var err error
for _, md := range pdatautil.MetricsToMetricsData(md) {
ndts, cErr := se.pushMetricsData(ctx, md)
if cErr != nil {
err = cErr
}
numDroppedTimeSeries += ndts
}

numDroppedTimeSeries, err := se.pushMetricsData(ctx, md)
numReceivedTimeSeries, numPoints := pdatautil.MetricAndDataPointCount(md)

obsreport.EndMetricsExportOp(ctx, numPoints, numReceivedTimeSeries, numDroppedTimeSeries, err)
Expand Down
Loading

0 comments on commit 618cb4e

Please sign in to comment.