Skip to content

Commit

Permalink
Add Carbon receiver (open-telemetry#109)
Browse files Browse the repository at this point in the history
* Add Carbon receiver

Add a receiver supporting the Carbon plaintext protocol over TCP or UDP.
This also sets the stage to other more complex parsers that can make
various transformations over metrics sent to the endpoints.

* PR Feedback first round

* PR feedback on transport.Client
  • Loading branch information
Paulo Janotti committed Jan 31, 2020
1 parent b633954 commit b64ac15
Show file tree
Hide file tree
Showing 29 changed files with 3,244 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cmd/otelcontribcol/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stackdriverexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/collectdreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sapmreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/signalfxreceiver"
Expand All @@ -47,6 +48,7 @@ func components() (config.Factories, error) {
&sapmreceiver.Factory{},
&zipkinscribereceiver.Factory{},
&signalfxreceiver.Factory{},
&carbonreceiver.Factory{},
}
for _, rcv := range factories.Receivers {
receivers = append(receivers, rcv)
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stackdriverexporter v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor v0.0.0-20200116182905-41c032071dce
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/collectdreceiver v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sapmreceiver v0.0.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/signalfxreceiver v0.0.0
Expand Down Expand Up @@ -40,6 +41,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signa

replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stackdriverexporter => ./exporter/stackdriverexporter

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver => ./receiver/carbonreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/collectdreceiver => ./receiver/collectdreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sapmreceiver => ./receiver/sapmreceiver
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdko
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
Expand Down Expand Up @@ -535,6 +536,7 @@ github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/open-telemetry/opentelemetry-collector v0.2.4-0.20200118022335-34b2459597f1/go.mod h1:WxiK9mcisb/hM6M6+2BRV/VIU2c8VzlCRJED2S1MWns=
github.com/open-telemetry/opentelemetry-collector v0.2.4 h1:KLDYeGaDz9nVwFgZSBaw3ACg3ykuWUEDvgLG1eZ9lvs=
github.com/open-telemetry/opentelemetry-collector v0.2.4/go.mod h1:ccg/kg89vmkW1VsSg74QKoVOsgSwH7bvEBvo5LeN+V8=
github.com/opentracing-contrib/go-stdlib v0.0.0-20190519235532-cf7a6c988dc9/go.mod h1:PLldrQSroqzH70Xl+1DQcGnefIbqsKR7UDaiux3zV+w=
Expand Down Expand Up @@ -653,6 +655,7 @@ github.com/smartystreets/assertions v0.0.0-20190215210624-980c5ac6f3ac h1:wbW+By
github.com/smartystreets/assertions v0.0.0-20190215210624-980c5ac6f3ac/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4-0.20190306220146-200a235640ff h1:JcVn27VGCEwd33jyNj+3IqEbOmzAX9f9LILt3SoGPHU=
github.com/smartystreets/goconvey v1.6.4-0.20190306220146-200a235640ff/go.mod h1:KSQcGKpxUMHk3nbYzs/tIBAM2iDooCn0BmttHOJEbLs=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/sourcegraph/go-diff v0.5.1 h1:gO6i5zugwzo1RVTvgvfwCOSVegNuvnNi6bAD1QCmkHs=
Expand All @@ -679,6 +682,7 @@ github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DM
github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
github.com/spf13/viper v1.4.1-0.20190911140308-99520c81d86e h1:D4jmJ9BqzeMv7BvAqjooNtXH2PXG7m+pcYRnj2Ojlrk=
github.com/spf13/viper v1.4.1-0.20190911140308-99520c81d86e/go.mod h1:jUyf+v/KTOnRyUy2/AsjF537WfJWVv3AnlcKSNd+AIg=
github.com/spf13/viper v1.6.2/go.mod h1:t3iDnF5Jlj76alVNuyFBk5oUMCvsrkbvZK0WQdfDi5k=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25 h1:7z3LSn867ex6VSaahyKadf4WtSsJIgne6A1WLOAGM8A=
github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25/go.mod h1:lbP8tGiBjZ5YWIc2fzuRpTaz0b/53vT6PEs3QuAWzuU=
Expand Down Expand Up @@ -923,6 +927,7 @@ golang.org/x/tools v0.0.0-20190311215038-5c2858a9cfe5/go.mod h1:LCzVGOaR6xXOjkQ3
golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190322203728-c1a832b0ad89/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190521203540-521d6ed310dd/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
Expand Down Expand Up @@ -1018,6 +1023,7 @@ gopkg.in/fsnotify/fsnotify.v1 v1.4.7/go.mod h1:Fyux9zXlo4rWoMSIzpn9fDAYjalPqJ/K1
gopkg.in/inf.v0 v0.9.0/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw=
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
Expand Down
1 change: 1 addition & 0 deletions receiver/carbonreceiver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
46 changes: 46 additions & 0 deletions receiver/carbonreceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package carbonreceiver

import (
"time"

"github.com/open-telemetry/opentelemetry-collector/config/configmodels"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol"
)

const (
// parserConfigSection is the name that must be used for the parser settings
// in the configuration struct. The metadata mapstructure for the parser
// should use the same string.
parserConfigSection = "parser"
)

// Config defines configuration for the Carbon receiver.
type Config struct {
configmodels.ReceiverSettings `mapstructure:",squash"`

// Transport is either "tcp" or "udp".
Transport string `mapstructure:"transport"`

// TCPIdleTimeout is the timout for idle TCP connections, it is ignored
// if transport being used is UDP.
TCPIdleTimeout time.Duration `mapstructure:"tcp_idle_timeout"`

// Parser specifies a parser and the respective configuration to be used
// by the receiver.
Parser *protocol.Config `mapstructure:"parser"`
}
66 changes: 66 additions & 0 deletions receiver/carbonreceiver/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package carbonreceiver

import (
"path"
"testing"
"time"

"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol"
)

func TestLoadConfig(t *testing.T) {
factories, err := config.ExampleComponents()
assert.Nil(t, err)

factory := &Factory{}
factories.Receivers[typeStr] = factory
cfg, err := config.LoadConfigFile(
t, path.Join(".", "testdata", "config.yaml"), factories,
)

require.NoError(t, err)
require.NotNil(t, cfg)

assert.Equal(t, len(cfg.Receivers), 2)

r0 := cfg.Receivers["carbon"]
assert.Equal(t, factory.CreateDefaultConfig(), r0)

r1 := cfg.Receivers["carbon/allsettings"].(*Config)
assert.Equal(t,
&Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: "carbon/allsettings",
Endpoint: "localhost:8080",
},
Transport: "udp",
TCPIdleTimeout: 5 * time.Second,
Parser: &protocol.Config{
Type: "delimiter",
Config: &protocol.DelimiterParser{
OrDemiliter: "|",
},
},
},
r1)
}
19 changes: 19 additions & 0 deletions receiver/carbonreceiver/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package carbonreceiver implements a receiver that can be used by the
// OpenTelemetry collector to receive data in the Carbon supported formats.
// Carbon is the backend used by Graphite, see
// https://graphite.readthedocs.io/en/latest/carbon-daemons.html
package carbonreceiver
125 changes: 125 additions & 0 deletions receiver/carbonreceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package carbonreceiver

import (
"context"
"fmt"

"github.com/open-telemetry/opentelemetry-collector/config/configerror"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/receiver"
"github.com/spf13/viper"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport"
)

// This file implements factory for Carbon receiver.

const (
// The value of "type" key in configuration.
typeStr = "carbon"
)

// Factory is the factory for carbon receiver.
type Factory struct {
}

var _ receiver.Factory = (*Factory)(nil)

// Type gets the type of the Receiver config created by this factory.
func (f *Factory) Type() string {
return typeStr
}

// CustomUnmarshaler returns the custom function to handle the special settings
// used on the receiver.
func (f *Factory) CustomUnmarshaler() receiver.CustomUnmarshaler {
return func(v *viper.Viper, viperKey string, sourceViperSection *viper.Viper, intoCfg interface{}) error {
if sourceViperSection == nil {
// The section is empty nothing to do, using the default config.
return nil
}

// Unmarshal but not exact yet so the different keys under config do not
// trigger errors, this is needed so that the types of protocol and transport
// are read.
if err := sourceViperSection.Unmarshal(intoCfg); err != nil {
return err
}

// Unmarshal the protocol, so the type of config can be properly set.
rCfg := intoCfg.(*Config)
vParserCfg := sourceViperSection.Sub(parserConfigSection)
if vParserCfg != nil {
if err := protocol.LoadParserConfig(vParserCfg, rCfg.Parser); err != nil {
return fmt.Errorf(
"error on %q section for %s: %v",
parserConfigSection,
rCfg.Name(),
err)
}
}

// Unmarshal exact to validate the config keys.
if err := sourceViperSection.UnmarshalExact(intoCfg); err != nil {
return err
}

return nil
}
}

// CreateDefaultConfig creates the default configuration for Carbon receiver.
func (f *Factory) CreateDefaultConfig() configmodels.Receiver {
return &Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: typeStr,
Endpoint: "localhost:2003",
},
Transport: "tcp",
TCPIdleTimeout: transport.TCPIdleTimeoutDefault,
Parser: &protocol.Config{
Type: "plaintext",
Config: &protocol.PlaintextParser{},
},
}
}

// CreateTraceReceiver creates a trace receiver based on provided config.
func (f *Factory) CreateTraceReceiver(
ctx context.Context,
logger *zap.Logger,
cfg configmodels.Receiver,
consumer consumer.TraceConsumer,
) (receiver.TraceReceiver, error) {

return nil, configerror.ErrDataTypeIsNotSupported
}

// CreateMetricsReceiver creates a metrics receiver based on provided config.
func (f *Factory) CreateMetricsReceiver(
logger *zap.Logger,
cfg configmodels.Receiver,
consumer consumer.MetricsConsumer,
) (receiver.MetricsReceiver, error) {

rCfg := cfg.(*Config)
return New(logger, *rCfg, consumer)
}
Loading

0 comments on commit b64ac15

Please sign in to comment.