Skip to content

Commit

Permalink
code review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Jun 8, 2023
1 parent 02048e5 commit 49d6711
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 60 deletions.
50 changes: 50 additions & 0 deletions processor/websocketprocessor/channelset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package websocketprocessor

import "sync"

// channelSet is a collection of byte channels where adding, removing, and writing to
// the channels is synchronized.
type channelSet struct {
i int
mu sync.RWMutex
chanmap map[int]chan []byte
}

func newChannelSet() *channelSet {
return &channelSet{
chanmap: map[int]chan []byte{},
}
}

// add adds the channel to the channelSet and returns a key (just an int) used to
// remove the channel later.
func (c *channelSet) add(ch chan []byte) int {
c.mu.Lock()
idx := c.i
c.chanmap[idx] = ch
c.i++
c.mu.Unlock()
return idx
}

// writeBytes writes the passed in bytes to all of the channels in the
// channelSet.
func (c *channelSet) writeBytes(bytes []byte) {
c.mu.RLock()
for _, ch := range c.chanmap {
ch <- bytes
}
c.mu.RUnlock()
}

// closeAndRemove closes then removes the channel associated with the passed in
// key. Panics if an invalid key is passed in.
func (c *channelSet) closeAndRemove(key int) {
c.mu.Lock()
close(c.chanmap[key])
delete(c.chanmap, key)
c.mu.Unlock()
}
24 changes: 24 additions & 0 deletions processor/websocketprocessor/channelset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package websocketprocessor

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestChannelset(t *testing.T) {
cs := newChannelSet()
ch := make(chan []byte)
key := cs.add(ch)
go func() {
cs.writeBytes([]byte("hello"))
}()
assert.Eventually(t, func() bool {
return assert.Equal(t, []byte("hello"), <-ch)
}, time.Second, time.Millisecond*10)
cs.closeAndRemove(key)
}
92 changes: 32 additions & 60 deletions processor/websocketprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ type wsprocessor struct {
tracesSink consumer.Traces
server *http.Server
shutdownWG sync.WaitGroup
connections map[string]chan []byte
connLock sync.RWMutex
cs *channelSet
}

var processors = map[*Config]*wsprocessor{}

var logMarshaler = &plog.JSONMarshaler{}
var metricMarshaler = &pmetric.JSONMarshaler{}
var traceMarshaler = &ptrace.JSONMarshaler{}

func newProcessor(settings processor.CreateSettings, config *Config) (*wsprocessor, error) {
if p, ok := processors[config]; ok {
return p, nil
Expand All @@ -49,12 +52,11 @@ func newProcessor(settings processor.CreateSettings, config *Config) (*wsprocess
if err != nil {
return nil, err
}
conns := make(map[string]chan []byte)
p := &wsprocessor{
config: config,
obsproc: obsproc,
telemetrySettings: settings.TelemetrySettings,
connections: conns,
cs: newChannelSet(),
}
processors[config] = p

Expand Down Expand Up @@ -88,35 +90,21 @@ func (w *wsprocessor) handleConn(conn *websocket.Conn) {
w.telemetrySettings.Logger.Debug("Error setting deadline", zap.Error(err))
return
}
sendChan := make(chan []byte)
key := conn.Request().RequestURI
w.connLock.Lock()
w.connections[key] = sendChan
w.connLock.Unlock()
for {
msg := <-sendChan
if len(msg) == 0 {
break
}
_, err := conn.Write(msg)
ch := make(chan []byte)
idx := w.cs.add(ch)
for bytes := range ch {
_, err := conn.Write(bytes)
if err != nil {
w.telemetrySettings.Logger.Debug("websocket write error: %w", zap.Error(err))
w.cs.closeAndRemove(idx)
break
}
}
w.connLock.Lock()
delete(w.connections, key)
w.connLock.Unlock()
}

func (w *wsprocessor) Shutdown(_ context.Context) error {
func (w *wsprocessor) Shutdown(ctx context.Context) error {
if w.server != nil {
w.connLock.RLock()
defer w.connLock.RUnlock()
for _, c := range w.connections {
close(c)
}
err := w.server.Close()
w.shutdownWG.Wait()
err := w.server.Shutdown(ctx)
return err
}
return nil
Expand All @@ -129,47 +117,31 @@ func (w *wsprocessor) Capabilities() consumer.Capabilities {
}

func (w *wsprocessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
go func() {
b, err := (&pmetric.JSONMarshaler{}).MarshalMetrics(md)
if err != nil {
w.telemetrySettings.Logger.Debug("Error serializing to JSON", zap.Error(err))
} else {
w.sendToConnections(b)
}
}()
b, err := metricMarshaler.MarshalMetrics(md)
if err != nil {
w.telemetrySettings.Logger.Debug("Error serializing to JSON", zap.Error(err))
} else {
w.cs.writeBytes(b)
}
return w.metricsSink.ConsumeMetrics(ctx, md)
}

func (w *wsprocessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
go func() {
b, err := (&plog.JSONMarshaler{}).MarshalLogs(ld)
if err != nil {
w.telemetrySettings.Logger.Debug("Error serializing to JSON", zap.Error(err))
} else {
w.sendToConnections(b)
}
}()
b, err := logMarshaler.MarshalLogs(ld)
if err != nil {
w.telemetrySettings.Logger.Debug("Error serializing to JSON", zap.Error(err))
} else {
w.cs.writeBytes(b)
}
return w.logsSink.ConsumeLogs(ctx, ld)
}

func (w *wsprocessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {

go func() {
b, err := (&ptrace.JSONMarshaler{}).MarshalTraces(td)
if err != nil {
w.telemetrySettings.Logger.Debug("Error serializing to JSON", zap.Error(err))
} else {
w.sendToConnections(b)
}

}()
return w.tracesSink.ConsumeTraces(ctx, td)
}

func (w *wsprocessor) sendToConnections(payload []byte) {
w.connLock.RLock()
defer w.connLock.RUnlock()
for _, c := range w.connections {
c <- payload
b, err := traceMarshaler.MarshalTraces(td)
if err != nil {
w.telemetrySettings.Logger.Debug("Error serializing to JSON", zap.Error(err))
} else {
w.cs.writeBytes(b)
}
return w.tracesSink.ConsumeTraces(ctx, td)
}

0 comments on commit 49d6711

Please sign in to comment.