Skip to content

Commit

Permalink
Added CollectD JSON receiver
Browse files Browse the repository at this point in the history
Ported SignalFx CollectD receiver from SignalFX Gateway to OpenTelemetry Collector

Ported from: https://github.com/signalfx/gateway/tree/master/protocol/collectd
  • Loading branch information
owais committed Dec 3, 2019
1 parent d018721 commit dc833e8
Show file tree
Hide file tree
Showing 22 changed files with 2,632 additions and 3 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
local/

# GoLand IDEA
/.idea/

Expand Down
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ MISSPELL=misspell -error
MISSPELL_CORRECTION=misspell -w
STATICCHECK=staticcheck
IMPI=impi
RUN_CONFIG=local/config.yaml

GIT_SHA=$(shell git rev-parse --short HEAD)
BUILD_INFO_IMPORT_PATH=github.com/open-telemetry/opentelemetry-collector-contrib/internal/version
Expand Down Expand Up @@ -138,6 +139,7 @@ install-tools:
GO111MODULE=on go install \
github.com/google/addlicense \
golang.org/x/lint/golint \
github.com/golangci/golangci-lint/cmd/golangci-lint \
golang.org/x/tools/cmd/goimports \
github.com/client9/misspell/cmd/misspell \
honnef.co/go/tools/cmd/staticcheck \
Expand All @@ -147,6 +149,10 @@ install-tools:
otelcontribcol:
GO111MODULE=on CGO_ENABLED=0 go build -o ./bin/$(GOOS)/otelcontribcol $(BUILD_INFO) ./cmd/otelcontribcol

.PHONY: run
run:
GO111MODULE=on go run --race ./cmd/otelcontribcol/... --config ${RUN_CONFIG}

.PHONY: docker-component # Not intended to be used directly
docker-component: check-component
GOOS=linux $(MAKE) $(COMPONENT)
Expand Down
10 changes: 8 additions & 2 deletions cmd/otelcontribcol/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/receiver"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stackdriverexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/collectdreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinscribereceiver"
)

Expand All @@ -32,7 +33,10 @@ func components() (config.Factories, error) {
return config.Factories{}, err
}

receivers := []receiver.Factory{&zipkinscribereceiver.Factory{}}
receivers := []receiver.Factory{
&collectdreceiver.Factory{},
&zipkinscribereceiver.Factory{},
}
for _, rcv := range factories.Receivers {
receivers = append(receivers, rcv)
}
Expand All @@ -41,7 +45,9 @@ func components() (config.Factories, error) {
errs = append(errs, err)
}

exporters := []exporter.Factory{&stackdriverexporter.Factory{}}
exporters := []exporter.Factory{
&stackdriverexporter.Factory{},
}
for _, exp := range factories.Exporters {
exporters = append(exporters, exp)
}
Expand Down
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ go 1.12

require (
github.com/client9/misspell v0.3.4
github.com/golangci/golangci-lint v1.21.0 // indirect
github.com/google/addlicense v0.0.0-20190907113143-be125746c2c4
github.com/open-telemetry/opentelemetry-collector v0.2.1-0.20191126183205-e94dd19191e0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stackdriverexporter v0.0.0-20191126142441-b2a048090ad6
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/collectdreceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinscribereceiver v0.0.0-20191126142441-b2a048090ad6
github.com/pavius/impi v0.0.0-20180302134524-c1cbdcb8df2b
golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac
golang.org/x/tools v0.0.0-20190917162342-3b4f30a44f3b
golang.org/x/tools v0.0.0-20191010075000-0337d82405ff
honnef.co/go/tools v0.0.1-2019.2.3
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/collectdreceiver => ./receiver/collectdreceiver
146 changes: 146 additions & 0 deletions go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions receiver/collectdreceiver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
5 changes: 5 additions & 0 deletions receiver/collectdreceiver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CollectD `write_http` plugin JSON receiver

This receiver can receive data exported by the CollectD's `write_http` plugin. Only JSON format is supported. Authentication is not supported but support can be added later if needed.

This receiver was donated by SignalFx and ported from SignalFx's Gateway (https://github.com/signalfx/gateway/tree/master/protocol/collectd). As a result, this receiver has supports some additional faetures that are technically not compatible with stock CollectD's write_http plugin. That said, in practice such incompatibilities should never surface. For example, this receiver supports extracting labels from different fields. Given a field value `field[a=b, k=v]`, this receiver will extract `a=b` and `k=v` as label values.
292 changes: 292 additions & 0 deletions receiver/collectdreceiver/collectd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collectdreceiver

import (
"encoding/json"
"fmt"
"strings"
"time"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
)

const (
collectDMetricDerive = "derive"
collectDMetricGauge = "gauge"
collectDMetricCounter = "counter"
collectDMetricAbsolute = "absolute"
)

type collectDRecord struct {
Dsnames []*string `json:"dsnames"`
Dstypes []*string `json:"dstypes"`
Host *string `json:"host"`
Interval *float64 `json:"interval"`
Plugin *string `json:"plugin"`
PluginInstance *string `json:"plugin_instance"`
Time *float64 `json:"time"`
TypeS *string `json:"type"`
TypeInstance *string `json:"type_instance"`
Values []*json.Number `json:"values"`
Message *string `json:"message"`
Meta map[string]interface{} `json:"meta"`
Severity *string `json:"severity"`
}

func (r *collectDRecord) isEvent() bool {
return r.Time != nil && r.Severity != nil && r.Message != nil
}

func (r *collectDRecord) protoTime() *timestamp.Timestamp {
if r.Time == nil {
return nil
}
ts := time.Unix(0, int64(float64(time.Second)**r.Time))
tsp, err := ptypes.TimestampProto(ts)
if err != nil {
return nil
}
return tsp
}

func (r *collectDRecord) extractAndAppendMetrics(metrics []*metricspb.Metric, defaultLabels map[string]string) ([]*metricspb.Metric, error) {
// Ignore if record is an event instead of data point
if r.isEvent() {
recordEventsReceived()
return metrics, nil

}

recordMetricsReceived()
labels := make(map[string]string, len(defaultLabels))
for k, v := range defaultLabels {
labels[k] = v
}

for i := range r.Dsnames {
if i < len(r.Dstypes) && i < len(r.Values) && r.Values[i] != nil {
dsType, dsName, val := r.Dstypes[i], r.Dsnames[i], r.Values[i]
metricName, usedDsName := r.getReasonableMetricName(i, labels)

addIfNotNullOrEmpty(labels, "plugin", true, r.Plugin)
parseAndAddLabels(labels, r.PluginInstance, r.Host)
addIfNotNullOrEmpty(labels, "dsname", !usedDsName, dsName)

metric, _ := r.newMetric(metricName, dsType, val, labels)
metrics = append(metrics, metric)

}
}
return metrics, nil
}

func (r *collectDRecord) newMetric(name string, dsType *string, val *json.Number, labels map[string]string) (*metricspb.Metric, error) {
metric := &metricspb.Metric{}
point, isDouble, err := r.newPoint(val)
if err != nil {
return metric, err
}

lKeys, lValues := labelKeysAndValues(labels)
metric.MetricDescriptor = &metricspb.MetricDescriptor{
Name: name,
Type: r.metricType(dsType, isDouble),
LabelKeys: lKeys,
}
metric.Timeseries = []*metricspb.TimeSeries{
{
LabelValues: lValues,
Points: []*metricspb.Point{point},
},
}

return metric, nil
}

func (r *collectDRecord) metricType(dsType *string, isDouble bool) metricspb.MetricDescriptor_Type {
val := ""
if dsType != nil {
val = *dsType
}

switch val {
case collectDMetricCounter, collectDMetricDerive:
return metricCumulative(isDouble)

// Prometheus collectd exporter just ignores it. We use gauge for it as it seems the
// closes type. https://github.com/prometheus/collectd_exporter/blob/master/main.go#L109-L129
case collectDMetricGauge, collectDMetricAbsolute:
return metricGauge(isDouble)
}
return metricGauge(isDouble)
}

func (r *collectDRecord) newPoint(val *json.Number) (*metricspb.Point, bool, error) {
p := &metricspb.Point{
Timestamp: r.protoTime(),
}

isDouble := true
if v, err := val.Int64(); err == nil {
isDouble = false
p.Value = &metricspb.Point_Int64Value{Int64Value: v}
} else {
v, err := val.Float64()
if err != nil {
return nil, isDouble, fmt.Errorf("value could not be decoded: %v", err)
}
p.Value = &metricspb.Point_DoubleValue{DoubleValue: v}
}
return p, isDouble, nil
}

// getReasonableMetricName creates metrics names by joining them (if non empty) type.typeinstance
// if there are more than one dsname append .dsname for the particular uint. if there's only one it
// becomes a dimension.
func (r *collectDRecord) getReasonableMetricName(index int, attrs map[string]string) (string, bool) {
usedDsName := false
parts := make([]byte, 0, len(*r.TypeS)+len(*r.TypeInstance))

if !isNilOrEmpty(r.TypeS) {
parts = append(parts, *r.TypeS...)
}
parts = r.pointTypeInstance(attrs, parts)
if r.Dsnames != nil && !isNilOrEmpty(r.Dsnames[index]) && len(r.Dsnames) > 1 {
if len(parts) > 0 {
parts = append(parts, '.')
}
parts = append(parts, *r.Dsnames[index]...)
usedDsName = true
}
return string(parts), usedDsName
}

// pointTypeInstance extracts information from the TypeInstance field and appends to the metric name when possible.
func (r *collectDRecord) pointTypeInstance(attrs map[string]string, parts []byte) []byte {
if !isNilOrEmpty(r.TypeInstance) {
instanceName, extractedAttrs := labelsFromName(r.TypeInstance)
if instanceName != "" {
if len(parts) > 0 {
parts = append(parts, '.')
}
parts = append(parts, instanceName...)
}
for k, v := range extractedAttrs {
if _, exists := attrs[k]; !exists {
val := v
addIfNotNullOrEmpty(attrs, k, true, &val)
}
}
}
return parts
}

// labelsFromName tries to pull out dimensions out of name in the format name[k=v,f=x]-morename
// would return name-morename and extract dimensions (k,v) and (f,x)
// if we encounter something we don't expect use original
// this is a bit complicated to avoid allocations, string.split allocates, while slices
// inside same function, do not.
func labelsFromName(val *string) (instanceName string, toAddDims map[string]string) {
instanceName = *val
index := strings.Index(*val, "[")
if index > -1 {
left := (*val)[:index]
rest := (*val)[index+1:]
index = strings.Index(rest, "]")
if index > -1 {
working := make(map[string]string)
dimensions := rest[:index]
rest = rest[index+1:]
cindex := strings.Index(dimensions, ",")
prev := 0
for {
if cindex < prev {
cindex = len(dimensions)
}
piece := dimensions[prev:cindex]
tindex := strings.Index(piece, "=")
//lint:ignore S1003 staticcheck wants us to use strings.Conains instead of Index
if tindex == -1 || strings.Index(piece[tindex+1:], "=") > -1 {
return
}
working[piece[:tindex]] = piece[tindex+1:]
if cindex == len(dimensions) {
break
}
prev = cindex + 1
cindex = strings.Index(dimensions[prev:], ",") + prev
}
toAddDims = working
instanceName = left + rest
}
}
return
}

func isNilOrEmpty(str *string) bool {
return str == nil || *str == ""
}

func addIfNotNullOrEmpty(m map[string]string, key string, cond bool, val *string) {
if cond && val != nil && *val != "" {
m[key] = *val
}
}

func parseAndAddLabels(labels map[string]string, pluginInstance *string, host *string) {
parseNameForLabels(labels, "plugin_instance", pluginInstance)
parseNameForLabels(labels, "host", host)
}

func parseNameForLabels(labels map[string]string, key string, val *string) {
instanceName, toAddDims := labelsFromName(val)

for k, v := range toAddDims {
if _, exists := labels[k]; !exists {
val := v
addIfNotNullOrEmpty(labels, k, true, &val)
}
}
addIfNotNullOrEmpty(labels, key, true, &instanceName)
}

func labelKeysAndValues(labels map[string]string) ([]*metricspb.LabelKey, []*metricspb.LabelValue) {
keys := make([]*metricspb.LabelKey, len(labels))
values := make([]*metricspb.LabelValue, len(labels))
i := 0
for k, v := range labels {
keys[i] = &metricspb.LabelKey{Key: k}
values[i] = &metricspb.LabelValue{Value: v}
i++
}
return keys, values
}

func metricCumulative(isDouble bool) metricspb.MetricDescriptor_Type {
if isDouble {
return metricspb.MetricDescriptor_CUMULATIVE_DOUBLE
}
return metricspb.MetricDescriptor_CUMULATIVE_INT64
}

func metricGauge(isDouble bool) metricspb.MetricDescriptor_Type {
if isDouble {
return metricspb.MetricDescriptor_GAUGE_DOUBLE
}
return metricspb.MetricDescriptor_GAUGE_INT64
}
Loading

0 comments on commit dc833e8

Please sign in to comment.