Skip to content

Commit

Permalink
[receiver/wavefront] wrap metrics receiver under carbon receiver inst…
Browse files Browse the repository at this point in the history
…ead of using export function (open-telemetry#27259)

**Description:** 
Wavefrontreceiver is very similar to carbonreceiver: it is TCP based in
which each received text line represents a single metric data point. In
order to avoid using exported function `carbonreceiver.New(...)`, we can
wrap metrics receiver under carbon receiver.

**Link to tracking Issue:** 

open-telemetry#27248

**Testing:** 
make chlog-validate
go test for wavefrontreceiver

**Documentation:**

---------

Co-authored-by: Pablo Baeyens <[email protected]>
  • Loading branch information
sakulali and mx-psi committed Nov 1, 2023
1 parent f530bc4 commit 1b8ccde
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 24 deletions.
27 changes: 27 additions & 0 deletions .chloggen/fix-wavefront-metric-factory.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: wavefrontreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Wrap metrics receiver under carbon receiver instead of using export function

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27248]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
29 changes: 5 additions & 24 deletions receiver/wavefrontreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ package wavefrontreceiver // import "github.com/open-telemetry/opentelemetry-col

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/wavefrontreceiver/internal/metadata"
)
Expand Down Expand Up @@ -42,27 +41,9 @@ func createMetricsReceiver(
cfg component.Config,
consumer consumer.Metrics,
) (receiver.Metrics, error) {

rCfg := cfg.(*Config)

// Wavefront is very similar to Carbon: it is TCP based in which each received
// text line represents a single metric data point. They differ on the format
// of their textual representation.
//
// The Wavefront receiver leverages the Carbon receiver code by implementing
// a dedicated parser for its format.
carbonCfg := carbonreceiver.Config{
NetAddr: confignet.NetAddr{
Endpoint: rCfg.Endpoint,
Transport: "tcp",
},
TCPIdleTimeout: rCfg.TCPIdleTimeout,
Parser: &protocol.Config{
Type: "plaintext", // TODO: update after other parsers are implemented for Carbon receiver.
Config: &WavefrontParser{
ExtractCollectdTags: rCfg.ExtractCollectdTags,
},
},
rCfg, ok := cfg.(*Config)
if !ok {
return nil, fmt.Errorf("a wavefront receiver config was expected by the receiver factory, but got %T", rCfg)
}
return carbonreceiver.New(params, carbonCfg, consumer)
return newMetricsReceiver(rCfg, params, consumer), nil
}
72 changes: 72 additions & 0 deletions receiver/wavefrontreceiver/receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package wavefrontreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/wavefrontreceiver"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol"
)

var _ receiver.Metrics = (*metricsReceiver)(nil)

type metricsReceiver struct {
cfg *Config
set receiver.CreateSettings
nextConsumer consumer.Metrics
carbonReceiver receiver.Metrics
}

func newMetricsReceiver(cfg *Config, set receiver.CreateSettings, nextConsumer consumer.Metrics) *metricsReceiver {
return &metricsReceiver{
cfg: cfg,
set: set,
nextConsumer: nextConsumer,
}
}

func (r *metricsReceiver) Start(ctx context.Context, host component.Host) error {
fact := carbonreceiver.NewFactory()

// Wavefront is very similar to Carbon: it is TCP based in which each received
// text line represents a single metric data point. They differ on the format
// of their textual representation.
//
// The Wavefront receiver leverages the Carbon receiver code by implementing
// a dedicated parser for its format.
carbonCfg := &carbonreceiver.Config{
NetAddr: confignet.NetAddr{
Endpoint: r.cfg.Endpoint,
Transport: "tcp",
},
TCPIdleTimeout: r.cfg.TCPIdleTimeout,
Parser: &protocol.Config{
Type: "plaintext", // TODO: update after other parsers are implemented for Carbon receiver.
Config: &WavefrontParser{
ExtractCollectdTags: r.cfg.ExtractCollectdTags,
},
},
}

carbonReceiver, err := fact.CreateMetricsReceiver(ctx, r.set, carbonCfg, r.nextConsumer)
if err != nil {
return err
}
r.carbonReceiver = carbonReceiver

return r.carbonReceiver.Start(ctx, host)
}

func (r *metricsReceiver) Shutdown(ctx context.Context) error {
if r.carbonReceiver != nil {
return r.carbonReceiver.Shutdown(ctx)
}
return nil
}

0 comments on commit 1b8ccde

Please sign in to comment.