Skip to content

Commit

Permalink
Logzio exporter impl (#1161)
Browse files Browse the repository at this point in the history
Added a logz.io traces exporter

**Link to tracking Issue**: #686

**Testing**: Added test for each of the components in the new exporter

**Documentation**: Added a readme specifying how to use the exporter and its parameters with an example.
  • Loading branch information
yyyogev committed Oct 21, 2020
1 parent 6a410f0 commit fb3477d
Show file tree
Hide file tree
Showing 11 changed files with 804 additions and 8 deletions.
18 changes: 18 additions & 0 deletions exporter/logzioexporter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Logzio Exporter

This exporter supports sending trace data to [Logz.io](https://www.logz.io)

The following configuration options are supported:

* `account_token` (Required): Your logz.io account token for your tracing account.
* `region` (Optional): Your logz.io account [region code](https://docs.logz.io/user-guide/accounts/account-region.html#available-regions). Defaults to `us`. Required only if your logz.io region is different than US.
* `custom_listener_address` (Optional): Custom traces endpoint, for dev. This will override the region parameter.

Example:

```yaml
exporters:
logzio:
account_token: "youLOGZIOaccountTOKEN"
region: "eu"
```
11 changes: 10 additions & 1 deletion exporter/logzioexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package logzioexporter

import (
"errors"

"go.opentelemetry.io/collector/config/configmodels"
)

Expand All @@ -23,5 +25,12 @@ type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"`
Token string `mapstructure:"account_token"` // Your Logz.io Account Token, can be found at https://app.logz.io/#/dashboard/settings/general
Region string `mapstructure:"region"` // Your Logz.io 2-letter region code, can be found at https://docs.logz.io/user-guide/accounts/account-region.html#available-regions
CustomEndpoint string `mapstructure:"custom_endpoint"` // Custom endpoint to ship traces to. Use only for dev and tests. The will override the Region parameter
CustomEndpoint string `mapstructure:"custom_endpoint"` // Custom endpoint to ship traces to. Use only for dev and tests.
}

func (c *Config) validate() error {
if c.Token == "" {
return errors.New("`account_token` not specified")
}
return nil
}
50 changes: 50 additions & 0 deletions exporter/logzioexporter/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package logzioexporter

import (
"path"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtest"
)

func TestLoadConfig(tester *testing.T) {
factories, err := componenttest.ExampleComponents()
assert.Nil(tester, err)

factory := NewFactory()
factories.Exporters[configmodels.Type(typeStr)] = factory
cfg, err := configtest.LoadConfigFile(
tester, path.Join(".", "testdata", "config.yaml"), factories,
)

require.NoError(tester, err)
require.NotNil(tester, cfg)

assert.Equal(tester, 2, len(cfg.Exporters))

config := cfg.Exporters["logzio/2"].(*Config)
assert.Equal(tester, &Config{
ExporterSettings: configmodels.ExporterSettings{NameVal: "logzio/2"},
Token: "logzioTESTtoken",
Region: "eu",
CustomEndpoint: "https://some-url.com:8888",
}, config)
}
92 changes: 91 additions & 1 deletion exporter/logzioexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,99 @@
package logzioexporter

import (
"context"
"errors"
"fmt"

"github.com/hashicorp/go-hclog"
"github.com/jaegertracing/jaeger/model"
"github.com/logzio/jaeger-logzio/store"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/translator/trace/jaeger"
)

const (
loggerName = "logzio-exporter"
)

// logzioExporter implements an OpenTelemetry trace exporter that exports all spans to Logz.io
type logzioExporter struct {
accountToken string
writer *store.LogzioSpanWriter
logger hclog.Logger
WriteSpanFunc func(span *model.Span) error
InternalTracesToJaegerTraces func(td pdata.Traces) ([]*model.Batch, error)
}

//var WriteSpanFunc func(span *model.Span) error
//var InternalTracesToJaegerTraces = jaeger.InternalTracesToJaegerProto

func newLogzioExporter(config *Config, params component.ExporterCreateParams) (*logzioExporter, error) {
logger := Hclog2ZapLogger{
Zap: params.Logger,
name: loggerName,
}

if config == nil {
return nil, errors.New("exporter config can't be null")
}
writerConfig := store.LogzioConfig{
Region: config.Region,
AccountToken: config.Token,
CustomListenerURL: config.CustomEndpoint,
}

spanWriter, err := store.NewLogzioSpanWriter(writerConfig, logger)
if err != nil {
return nil, err
}

return &logzioExporter{
writer: spanWriter,
accountToken: config.Token,
logger: logger,
InternalTracesToJaegerTraces: jaeger.InternalTracesToJaegerProto,
WriteSpanFunc: spanWriter.WriteSpan,
}, nil
}

func newLogzioTraceExporter(config *Config, params component.ExporterCreateParams) (component.TraceExporter, error) {
return nil, nil
exporter, err := newLogzioExporter(config, params)
if err != nil {
return nil, err
}
if err := config.validate(); err != nil {
return nil, err
}

return exporterhelper.NewTraceExporter(
config,
exporter.pushTraceData,
exporterhelper.WithShutdown(exporter.Shutdown))
}

func (exporter *logzioExporter) pushTraceData(ctx context.Context, traces pdata.Traces) (droppedSpansCount int, err error) {
droppedSpans := 0
batches, err := exporter.InternalTracesToJaegerTraces(traces)
if err != nil {
return traces.SpanCount(), err
}
for _, batch := range batches {
for _, span := range batch.Spans {
span.Process = batch.Process
if err := exporter.WriteSpanFunc(span); err != nil {
exporter.logger.Debug(fmt.Sprintf("dropped bad span: %s", span.String()))
droppedSpans++
}
}
}
return droppedSpans, nil
}

func (exporter *logzioExporter) Shutdown(ctx context.Context) error {
exporter.logger.Info("Closing logzio exporter..")
exporter.writer.Close()
return nil
}
179 changes: 179 additions & 0 deletions exporter/logzioexporter/exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package logzioexporter

import (
"context"
"encoding/json"
"errors"
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"

commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/jaegertracing/jaeger/model"
"github.com/logzio/jaeger-logzio/store/objects"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/translator/internaldata"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/wrapperspb"
)

const (
testService = "testService"
testHost = "testHost"
testOperation = "testOperation"
)

var testSpans = []*tracepb.Span{
{
TraceId: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
SpanId: []byte{0, 0, 0, 0, 0, 0, 0, 2},
Name: &tracepb.TruncatableString{Value: testOperation},
Kind: tracepb.Span_SERVER,
SameProcessAsParentSpan: &wrapperspb.BoolValue{Value: true},
},
}

func testTraceExporter(td pdata.Traces, t *testing.T, cfg *Config) {
params := component.ExporterCreateParams{Logger: zap.NewNop()}
exporter, err := createTraceExporter(context.Background(), params, cfg)
require.NoError(t, err)

ctx := context.Background()
err = exporter.ConsumeTraces(ctx, td)
require.NoError(t, err)
err = exporter.Shutdown(ctx)
require.NoError(t, err)
}

func TestNullTraceExporterConfig(tester *testing.T) {
params := component.ExporterCreateParams{Logger: zap.NewNop()}
_, err := newLogzioTraceExporter(nil, params)
assert.Error(tester, err, "Null exporter config should produce error")
}

func TestNullExporterConfig(tester *testing.T) {
params := component.ExporterCreateParams{Logger: zap.NewNop()}
_, err := newLogzioExporter(nil, params)
assert.Error(tester, err, "Null exporter config should produce error")
}

func TestNullTokenConfig(tester *testing.T) {
cfg := Config{
Region: "eu",
}
params := component.ExporterCreateParams{Logger: zap.NewNop()}
_, err := createTraceExporter(context.Background(), params, &cfg)
assert.Error(tester, err, "Empty token should produce error")
}

func TestEmptyNode(tester *testing.T) {
cfg := Config{
Token: "test",
Region: "eu",
}
td := consumerdata.TraceData{
Node: nil,
Spans: nil,
}
testTraceExporter(internaldata.OCToTraceData(td), tester, &cfg)
}

func TestWriteSpanError(tester *testing.T) {
cfg := Config{
Token: "test",
Region: "eu",
}
td := consumerdata.TraceData{
Node: nil,
Spans: testSpans,
}
params := component.ExporterCreateParams{Logger: zap.NewNop()}
exporter, _ := newLogzioExporter(&cfg, params)
oldFunc := exporter.WriteSpanFunc
defer func() { exporter.WriteSpanFunc = oldFunc }()
exporter.WriteSpanFunc = func(*model.Span) error {
return errors.New("fail")
}
droppedSpans, _ := exporter.pushTraceData(context.Background(), internaldata.OCToTraceData(td))
assert.Equal(tester, 1, droppedSpans)
}

func TestConversionTraceError(tester *testing.T) {
cfg := Config{
Token: "test",
Region: "eu",
}
td := consumerdata.TraceData{
Node: nil,
Spans: testSpans,
}
params := component.ExporterCreateParams{Logger: zap.NewNop()}
exporter, _ := newLogzioExporter(&cfg, params)
oldFunc := exporter.InternalTracesToJaegerTraces
defer func() { exporter.InternalTracesToJaegerTraces = oldFunc }()
exporter.InternalTracesToJaegerTraces = func(td pdata.Traces) ([]*model.Batch, error) {
return nil, errors.New("fail")
}
_, err := exporter.pushTraceData(context.Background(), internaldata.OCToTraceData(td))
assert.Error(tester, err)
}

func TestPushTraceData(tester *testing.T) {
var recordedRequests []byte
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
recordedRequests, _ = ioutil.ReadAll(req.Body)
rw.WriteHeader(http.StatusOK)
}))
cfg := Config{
Token: "test",
Region: "eu",
CustomEndpoint: server.URL,
}
defer server.Close()

td := consumerdata.TraceData{
Node: &commonpb.Node{
ServiceInfo: &commonpb.ServiceInfo{
Name: testService,
},
Identifier: &commonpb.ProcessIdentifier{
HostName: testHost,
},
},
Spans: testSpans,
}
testTraceExporter(internaldata.OCToTraceData(td), tester, &cfg)
requests := strings.Split(string(recordedRequests), "\n")
var logzioSpan objects.LogzioSpan
assert.NoError(tester, json.Unmarshal([]byte(requests[0]), &logzioSpan))
assert.Equal(tester, testOperation, logzioSpan.OperationName)
assert.Equal(tester, testService, logzioSpan.Process.ServiceName)

var logzioService objects.LogzioService
assert.NoError(tester, json.Unmarshal([]byte(requests[1]), &logzioService))

assert.Equal(tester, testOperation, logzioService.OperationName)
assert.Equal(tester, testService, logzioService.ServiceName)

}
Loading

0 comments on commit fb3477d

Please sign in to comment.