Skip to content

Commit

Permalink
[processor/websocket] Implementation of websocket processor
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Jun 4, 2023
1 parent 34d555c commit 6c212cf
Show file tree
Hide file tree
Showing 8 changed files with 430 additions and 10 deletions.
20 changes: 20 additions & 0 deletions .chloggen/websocket-processor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: websocketprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Implementation of websocket processor

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [19633]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
12 changes: 7 additions & 5 deletions processor/websocketprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,25 @@ package websocketprocessor // import "github.com/open-telemetry/opentelemetry-co

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"golang.org/x/time/rate"
)

const defaultPort = 12001
const defaultEndpoint = ":12001"

type Config struct {
// Port indicates the port used by the web socket listener started by this processor.
// Defaults to 12001.
Port int `mapstructure:"port"`
confighttp.HTTPServerSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct

// Limit is a float that indicates the maximum number of messages repeated
// through the websocket by this processor in messages per second. Defaults to 1.
Limit rate.Limit `mapstructure:"limit"`
}

func createDefaultConfig() component.Config {
return &Config{
Port: defaultPort,
HTTPServerSettings: confighttp.HTTPServerSettings{
Endpoint: defaultEndpoint,
},
Limit: 1,
}
}
2 changes: 1 addition & 1 deletion processor/websocketprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ import (

func TestCreateDefaultConfig(t *testing.T) {
cfg := createDefaultConfig().(*Config)
assert.Equal(t, 12001, cfg.Port)
assert.Equal(t, ":12001", cfg.Endpoint)
assert.EqualValues(t, 1, cfg.Limit)
}
37 changes: 37 additions & 0 deletions processor/websocketprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
package websocketprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/websocketprocessor"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/websocketprocessor/internal/metadata"
Expand All @@ -13,5 +17,38 @@ func NewFactory() processor.Factory {
return processor.NewFactory(
metadata.Type,
createDefaultConfig,
processor.WithTraces(createTraceProcessor, metadata.TracesStability),
processor.WithMetrics(createMetricsProcessor, metadata.MetricsStability),
processor.WithLogs(createLogsProcessor, metadata.LogsStability),
)
}

func createMetricsProcessor(_ context.Context, params processor.CreateSettings, cfg component.Config, consumer consumer.Metrics) (processor.Metrics, error) {
rCfg := cfg.(*Config)
p, err := newProcessor(params, rCfg)
if err != nil {
return nil, err
}
p.metricsSink = consumer
return p, nil
}

func createLogsProcessor(_ context.Context, params processor.CreateSettings, cfg component.Config, consumer consumer.Logs) (processor.Logs, error) {
rCfg := cfg.(*Config)
p, err := newProcessor(params, rCfg)
if err != nil {
return nil, err
}
p.logsSink = consumer
return p, nil
}

func createTraceProcessor(_ context.Context, params processor.CreateSettings, cfg component.Config, consumer consumer.Traces) (processor.Traces, error) {
rCfg := cfg.(*Config)
p, err := newProcessor(params, rCfg)
if err != nil {
return nil, err
}
p.tracesSink = consumer
return p, nil
}
19 changes: 15 additions & 4 deletions processor/websocketprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,43 @@ require (
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector v0.78.3-0.20230601234953-deffd4892002
go.opentelemetry.io/collector/component v0.78.3-0.20230601234953-deffd4892002
go.opentelemetry.io/collector/consumer v0.78.3-0.20230601234953-deffd4892002
go.opentelemetry.io/collector/pdata v1.0.0-rcv0012.0.20230601234953-deffd4892002
go.uber.org/zap v1.24.0
golang.org/x/net v0.10.0
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/knadh/koanf v1.5.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rs/cors v1.9.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector/confmap v0.78.3-0.20230601234953-deffd4892002 // indirect
go.opentelemetry.io/collector/consumer v0.78.3-0.20230601234953-deffd4892002 // indirect
go.opentelemetry.io/collector/exporter v0.78.3-0.20230601234953-deffd4892002 // indirect
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0012.0.20230601234953-deffd4892002 // indirect
go.opentelemetry.io/collector/pdata v1.0.0-rcv0012.0.20230601234953-deffd4892002 // indirect
go.opentelemetry.io/collector/receiver v0.78.3-0.20230601234953-deffd4892002 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 // indirect
go.opentelemetry.io/otel v1.16.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.opentelemetry.io/otel/trace v1.16.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
Expand Down
Loading

0 comments on commit 6c212cf

Please sign in to comment.