Skip to content

Commit

Permalink
[processor/websocket] Implementation of websocket processor (open-tel…
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme authored and Caleb-Hurshman committed Jul 6, 2023
1 parent 8b1bb31 commit c9c0162
Show file tree
Hide file tree
Showing 10 changed files with 449 additions and 10 deletions.
20 changes: 20 additions & 0 deletions .chloggen/websocket-processor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: websocketprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Implementation of websocket processor

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [19633]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
50 changes: 50 additions & 0 deletions processor/websocketprocessor/channelset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package websocketprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/websocketprocessor"

import "sync"

// channelSet is a collection of byte channels where adding, removing, and writing to
// the channels is synchronized.
type channelSet struct {
i int
mu sync.RWMutex
chanmap map[int]chan []byte
}

func newChannelSet() *channelSet {
return &channelSet{
chanmap: map[int]chan []byte{},
}
}

// add adds the channel to the channelSet and returns a key (just an int) used to
// remove the channel later.
func (c *channelSet) add(ch chan []byte) int {
c.mu.Lock()
idx := c.i
c.chanmap[idx] = ch
c.i++
c.mu.Unlock()
return idx
}

// writeBytes writes the passed in bytes to all of the channels in the
// channelSet.
func (c *channelSet) writeBytes(bytes []byte) {
c.mu.RLock()
for _, ch := range c.chanmap {
ch <- bytes
}
c.mu.RUnlock()
}

// closeAndRemove closes then removes the channel associated with the passed in
// key. Panics if an invalid key is passed in.
func (c *channelSet) closeAndRemove(key int) {
c.mu.Lock()
close(c.chanmap[key])
delete(c.chanmap, key)
c.mu.Unlock()
}
24 changes: 24 additions & 0 deletions processor/websocketprocessor/channelset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package websocketprocessor

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestChannelset(t *testing.T) {
cs := newChannelSet()
ch := make(chan []byte)
key := cs.add(ch)
go func() {
cs.writeBytes([]byte("hello"))
}()
assert.Eventually(t, func() bool {
return assert.Equal(t, []byte("hello"), <-ch)
}, time.Second, time.Millisecond*10)
cs.closeAndRemove(key)
}
12 changes: 7 additions & 5 deletions processor/websocketprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,25 @@ package websocketprocessor // import "github.com/open-telemetry/opentelemetry-co

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"golang.org/x/time/rate"
)

const defaultPort = 12001
const defaultEndpoint = ":12001"

type Config struct {
// Port indicates the port used by the web socket listener started by this processor.
// Defaults to 12001.
Port int `mapstructure:"port"`
confighttp.HTTPServerSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct

// Limit is a float that indicates the maximum number of messages repeated
// through the websocket by this processor in messages per second. Defaults to 1.
Limit rate.Limit `mapstructure:"limit"`
}

func createDefaultConfig() component.Config {
return &Config{
Port: defaultPort,
HTTPServerSettings: confighttp.HTTPServerSettings{
Endpoint: defaultEndpoint,
},
Limit: 1,
}
}
2 changes: 1 addition & 1 deletion processor/websocketprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ import (

func TestCreateDefaultConfig(t *testing.T) {
cfg := createDefaultConfig().(*Config)
assert.Equal(t, 12001, cfg.Port)
assert.Equal(t, ":12001", cfg.Endpoint)
assert.EqualValues(t, 1, cfg.Limit)
}
56 changes: 56 additions & 0 deletions processor/websocketprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,70 @@
package websocketprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/websocketprocessor"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/websocketprocessor/internal/metadata"
)

var processors = sharedcomponent.NewSharedComponents()

func NewFactory() processor.Factory {
return processor.NewFactory(
metadata.Type,
createDefaultConfig,
processor.WithTraces(createTraceProcessor, metadata.TracesStability),
processor.WithMetrics(createMetricsProcessor, metadata.MetricsStability),
processor.WithLogs(createLogsProcessor, metadata.LogsStability),
)
}

func createMetricsProcessor(ctx context.Context, params processor.CreateSettings, cfg component.Config, c consumer.Metrics) (processor.Metrics, error) {
rCfg := cfg.(*Config)
p := processors.GetOrAdd(cfg, func() component.Component {
return newProcessor(params, rCfg)
})
fn := p.Unwrap().(*wsprocessor).ConsumeMetrics
return processorhelper.NewMetricsProcessor(ctx, params, cfg, c,
fn,
processorhelper.WithCapabilities(consumer.Capabilities{
MutatesData: false,
}),
processorhelper.WithStart(p.Start),
processorhelper.WithShutdown(p.Shutdown))
}

func createLogsProcessor(ctx context.Context, params processor.CreateSettings, cfg component.Config, c consumer.Logs) (processor.Logs, error) {
rCfg := cfg.(*Config)
p := processors.GetOrAdd(cfg, func() component.Component {
return newProcessor(params, rCfg)
})
fn := p.Unwrap().(*wsprocessor).ConsumeLogs
return processorhelper.NewLogsProcessor(ctx, params, cfg, c,
fn,
processorhelper.WithCapabilities(consumer.Capabilities{
MutatesData: false,
}),
processorhelper.WithStart(p.Start),
processorhelper.WithShutdown(p.Shutdown))
}

func createTraceProcessor(ctx context.Context, params processor.CreateSettings, cfg component.Config, c consumer.Traces) (processor.Traces, error) {
rCfg := cfg.(*Config)
p := processors.GetOrAdd(cfg, func() component.Component {
return newProcessor(params, rCfg)
})
fn := p.Unwrap().(*wsprocessor).ConsumeTraces
return processorhelper.NewTracesProcessor(ctx, params, cfg, c,
fn,
processorhelper.WithCapabilities(consumer.Capabilities{
MutatesData: false,
}),
processorhelper.WithStart(p.Start),
processorhelper.WithShutdown(p.Shutdown))
}
20 changes: 16 additions & 4 deletions processor/websocketprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,51 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/webso
go 1.19

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.79.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector v0.79.0
go.opentelemetry.io/collector/component v0.79.0
go.opentelemetry.io/collector/consumer v0.79.0
go.opentelemetry.io/collector/pdata v1.0.0-rcv0012
go.uber.org/zap v1.24.0
golang.org/x/net v0.10.0
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/knadh/koanf v1.5.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rs/cors v1.9.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector/confmap v0.79.0 // indirect
go.opentelemetry.io/collector/consumer v0.79.0 // indirect
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0012 // indirect
go.opentelemetry.io/collector/pdata v1.0.0-rcv0012 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 // indirect
go.opentelemetry.io/otel v1.16.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.opentelemetry.io/otel/trace v1.16.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/grpc v1.55.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent
Loading

0 comments on commit c9c0162

Please sign in to comment.