Skip to content

Commit

Permalink
Added CollectD JSON receiver
Browse files Browse the repository at this point in the history
Ported CollectD receiver from SignalFX Gateway to OpenTelemetry Collector

Ported from: https://github.com/signalfx/gateway/tree/master/protocol/collectd
  • Loading branch information
owais committed Nov 27, 2019
1 parent b2a0480 commit fb26eb4
Show file tree
Hide file tree
Showing 21 changed files with 2,486 additions and 4 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
local/

# GoLand IDEA
/.idea/

Expand Down
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ MISSPELL=misspell -error
MISSPELL_CORRECTION=misspell -w
STATICCHECK=staticcheck
IMPI=impi
RUN_CONFIG=local/config.yaml

GIT_SHA=$(shell git rev-parse --short HEAD)
BUILD_INFO_IMPORT_PATH=github.com/open-telemetry/opentelemetry-collector-contrib/internal/version
Expand Down Expand Up @@ -138,6 +139,7 @@ install-tools:
GO111MODULE=on go install \
github.com/google/addlicense \
golang.org/x/lint/golint \
github.com/golangci/golangci-lint/cmd/golangci-lint \
golang.org/x/tools/cmd/goimports \
github.com/client9/misspell/cmd/misspell \
honnef.co/go/tools/cmd/staticcheck \
Expand All @@ -147,6 +149,10 @@ install-tools:
otelcontribcol:
GO111MODULE=on CGO_ENABLED=0 go build -o ./bin/$(GOOS)/otelcontribcol $(BUILD_INFO) ./cmd/otelcontribcol

.PHONY: run
run:
GO111MODULE=on go run --race ./cmd/otelcontribcol/... --config ${RUN_CONFIG}

.PHONY: docker-component # Not intended to be used directly
docker-component: check-component
GOOS=linux $(MAKE) $(COMPONENT)
Expand Down
10 changes: 8 additions & 2 deletions cmd/otelcontribcol/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/receiver"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stackdriverexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/collectdreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinscribereceiver"
)

Expand All @@ -32,7 +33,10 @@ func components() (config.Factories, error) {
return config.Factories{}, err
}

receivers := []receiver.Factory{&zipkinscribereceiver.Factory{}}
receivers := []receiver.Factory{
&collectdreceiver.Factory{},
&zipkinscribereceiver.Factory{},
}
for _, rcv := range factories.Receivers {
receivers = append(receivers, rcv)
}
Expand All @@ -41,7 +45,9 @@ func components() (config.Factories, error) {
errs = append(errs, err)
}

exporters := []exporter.Factory{&stackdriverexporter.Factory{}}
exporters := []exporter.Factory{
&stackdriverexporter.Factory{},
}
for _, exp := range factories.Exporters {
exporters = append(exporters, exp)
}
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ require (
github.com/google/addlicense v0.0.0-20190907113143-be125746c2c4
github.com/open-telemetry/opentelemetry-collector v0.2.1-0.20191016224815-dfabfb0c1d1e
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stackdriverexporter v0.0.0-20191021165924-bb954188ac10
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/collectdreceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinscribereceiver v0.0.0-20191021165924-bb954188ac10
github.com/pavius/impi v0.0.0-20180302134524-c1cbdcb8df2b
golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac
golang.org/x/tools v0.0.0-20190917162342-3b4f30a44f3b
honnef.co/go/tools v0.0.1-2019.2.3
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/collectdreceiver => ./receiver/collectdreceiver
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -559,8 +559,9 @@ golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvx
golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac h1:8R1esu+8QioDxo4E4mX6bFztO+dMTM49DNAaWfO5OeY=
golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e h1:JgcxKXxCjrA2tyDP/aNU9K0Ck5Czfk6C7e2tMw7+bSI=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
Expand Down Expand Up @@ -665,8 +666,9 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw
golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190813034749-528a2984e271/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20190906203814-12febf440ab1/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20190917162342-3b4f30a44f3b h1:5PDpbTpVmeVPIQOoxshLbs4ATaIDQrZN5z3nTUtm2+8=
golang.org/x/tools v0.0.0-20190917162342-3b4f30a44f3b/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
Expand Down
1 change: 1 addition & 0 deletions receiver/collectdreceiver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
292 changes: 292 additions & 0 deletions receiver/collectdreceiver/collectd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collectdreceiver

import (
"encoding/json"
"fmt"
"strings"
"time"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
)

const (
collectDMetricDerive = "derive"
collectDMetricGauge = "gauge"
collectDMetricCounter = "counter"
collectDMetricAbsolute = "absolute"
)

type collectDRecord struct {
Dsnames []*string `json:"dsnames"`
Dstypes []*string `json:"dstypes"`
Host *string `json:"host"`
Interval *float64 `json:"interval"`
Plugin *string `json:"plugin"`
PluginInstance *string `json:"plugin_instance"`
Time *float64 `json:"time"`
TypeS *string `json:"type"`
TypeInstance *string `json:"type_instance"`
Values []*json.Number `json:"values"`
Message *string `json:"message"`
Meta map[string]interface{} `json:"meta"`
Severity *string `json:"severity"`
}

func (r *collectDRecord) isEvent() bool {
return r.Time != nil && r.Severity != nil && r.Message != nil
}

func (r *collectDRecord) protoTime() *timestamp.Timestamp {
if r.Time == nil {
return nil
}
ts := time.Unix(0, int64(float64(time.Second)**r.Time))
tsp, err := ptypes.TimestampProto(ts)
if err != nil {
return nil
}
return tsp
}

func (r *collectDRecord) extractAndAppendMetrics(metrics []*metricspb.Metric, defaultLabels map[string]string) ([]*metricspb.Metric, error) {
// Ignore if record is an event instead of data point
if r.isEvent() {
recordEventsReceived()
return metrics, nil

}

recordMetricsReceived()
labels := make(map[string]string, len(defaultLabels))
for k, v := range defaultLabels {
labels[k] = v
}

for i := range r.Dsnames {
if i < len(r.Dstypes) && i < len(r.Values) && r.Values[i] != nil {
dsType, dsName, val := r.Dstypes[i], r.Dsnames[i], r.Values[i]
metricName, usedDsName := r.getReasonableMetricName(i, labels)

addIfNotNullOrEmpty(labels, "plugin", true, r.Plugin)
parseAndAddLabels(labels, r.PluginInstance, r.Host)
addIfNotNullOrEmpty(labels, "dsname", !usedDsName, dsName)

metric, _ := r.newMetric(metricName, dsType, val, labels)
metrics = append(metrics, metric)

}
}
return metrics, nil
}

func (r *collectDRecord) newMetric(name string, dsType *string, val *json.Number, labels map[string]string) (*metricspb.Metric, error) {
metric := &metricspb.Metric{}
point, isDouble, err := r.newPoint(val)
if err != nil {
return metric, err
}

lKeys, lValues := labelKeysAndValues(labels)
metric.MetricDescriptor = &metricspb.MetricDescriptor{
Name: name,
Type: r.metricType(dsType, isDouble),
LabelKeys: lKeys,
}
metric.Timeseries = []*metricspb.TimeSeries{
{
LabelValues: lValues,
Points: []*metricspb.Point{point},
},
}

return metric, nil
}

func (r *collectDRecord) metricType(dsType *string, isDouble bool) metricspb.MetricDescriptor_Type {
val := ""
if dsType != nil {
val = *dsType
}

switch val {
case collectDMetricCounter, collectDMetricDerive:
return metricCumulative(isDouble)

// Prometheus collectd exporter just ignores it. We use gauge for it as it seems the
// closes type. https://github.com/prometheus/collectd_exporter/blob/master/main.go#L109-L129
case collectDMetricGauge, collectDMetricAbsolute:
return metricGauge(isDouble)
}
return metricGauge(isDouble)
}

func (r *collectDRecord) newPoint(val *json.Number) (*metricspb.Point, bool, error) {
p := &metricspb.Point{
Timestamp: r.protoTime(),
}

isDouble := true
if v, err := val.Int64(); err == nil {
isDouble = false
p.Value = &metricspb.Point_Int64Value{Int64Value: v}
} else {
v, err := val.Float64()
if err != nil {
return nil, isDouble, fmt.Errorf("value could not be decoded: %v", err)
}
p.Value = &metricspb.Point_DoubleValue{DoubleValue: v}
}
return p, isDouble, nil
}

// getReasonableMetricName creates metrics names by joining them (if non empty) type.typeinstance
// if there are more than one dsname append .dsname for the particular uint. if there's only one it
// becomes a dimension.
func (r *collectDRecord) getReasonableMetricName(index int, attrs map[string]string) (string, bool) {
usedDsName := false
parts := make([]byte, 0, len(*r.TypeS)+len(*r.TypeInstance))

if !isNilOrEmpty(r.TypeS) {
parts = append(parts, *r.TypeS...)
}
parts = r.pointTypeInstance(attrs, parts)
if r.Dsnames != nil && !isNilOrEmpty(r.Dsnames[index]) && len(r.Dsnames) > 1 {
if len(parts) > 0 {
parts = append(parts, '.')
}
parts = append(parts, *r.Dsnames[index]...)
usedDsName = true
}
return string(parts), usedDsName
}

// pointTypeInstance extracts information from the TypeInstance field and appends to the metric name when possible.
func (r *collectDRecord) pointTypeInstance(attrs map[string]string, parts []byte) []byte {
if !isNilOrEmpty(r.TypeInstance) {
instanceName, extractedAttrs := labelsFromName(r.TypeInstance)
if instanceName != "" {
if len(parts) > 0 {
parts = append(parts, '.')
}
parts = append(parts, instanceName...)
}
for k, v := range extractedAttrs {
if _, exists := attrs[k]; !exists {
val := v
addIfNotNullOrEmpty(attrs, k, true, &val)
}
}
}
return parts
}

// labelsFromName tries to pull out dimensions out of name in the format name[k=v,f=x]-morename
// would return name-morename and extract dimensions (k,v) and (f,x)
// if we encounter something we don't expect use original
// this is a bit complicated to avoid allocations, string.split allocates, while slices
// inside same function, do not.
func labelsFromName(val *string) (instanceName string, toAddDims map[string]string) {
instanceName = *val
index := strings.Index(*val, "[")
if index > -1 {
left := (*val)[:index]
rest := (*val)[index+1:]
index = strings.Index(rest, "]")
if index > -1 {
working := make(map[string]string)
dimensions := rest[:index]
rest = rest[index+1:]
cindex := strings.Index(dimensions, ",")
prev := 0
for {
if cindex < prev {
cindex = len(dimensions)
}
piece := dimensions[prev:cindex]
tindex := strings.Index(piece, "=")
//lint:ignore S1003 staticcheck wants us to use strings.Conains instead of Index
if tindex == -1 || strings.Index(piece[tindex+1:], "=") > -1 {
return
}
working[piece[:tindex]] = piece[tindex+1:]
if cindex == len(dimensions) {
break
}
prev = cindex + 1
cindex = strings.Index(dimensions[prev:], ",") + prev
}
toAddDims = working
instanceName = left + rest
}
}
return
}

func isNilOrEmpty(str *string) bool {
return str == nil || *str == ""
}

func addIfNotNullOrEmpty(m map[string]string, key string, cond bool, val *string) {
if cond && val != nil && *val != "" {
m[key] = *val
}
}

func parseAndAddLabels(labels map[string]string, pluginInstance *string, host *string) {
parseNameForLabels(labels, "plugin_instance", pluginInstance)
parseNameForLabels(labels, "host", host)
}

func parseNameForLabels(labels map[string]string, key string, val *string) {
instanceName, toAddDims := labelsFromName(val)

for k, v := range toAddDims {
if _, exists := labels[k]; !exists {
val := v
addIfNotNullOrEmpty(labels, k, true, &val)
}
}
addIfNotNullOrEmpty(labels, key, true, &instanceName)
}

func labelKeysAndValues(labels map[string]string) ([]*metricspb.LabelKey, []*metricspb.LabelValue) {
keys := make([]*metricspb.LabelKey, len(labels))
values := make([]*metricspb.LabelValue, len(labels))
i := 0
for k, v := range labels {
keys[i] = &metricspb.LabelKey{Key: k}
values[i] = &metricspb.LabelValue{Value: v}
i++
}
return keys, values
}

func metricCumulative(isDouble bool) metricspb.MetricDescriptor_Type {
if isDouble {
return metricspb.MetricDescriptor_CUMULATIVE_DOUBLE
}
return metricspb.MetricDescriptor_CUMULATIVE_INT64
}

func metricGauge(isDouble bool) metricspb.MetricDescriptor_Type {
if isDouble {
return metricspb.MetricDescriptor_GAUGE_DOUBLE
}
return metricspb.MetricDescriptor_GAUGE_INT64
}
Loading

0 comments on commit fb26eb4

Please sign in to comment.