Skip to content

Commit

Permalink
[receiver/statsd] add TCP support to statsdreceiver (#26700)
Browse files Browse the repository at this point in the history
**Description:**
Adds TCP support to statsdreceiver

**Link to tracking Issue:**
Fixes #23327

**Testing:**
Unit tests

**Documentation:**
No changes.

---------

Co-authored-by: Joshua MacDonald <[email protected]>
  • Loading branch information
atoulme and jmacd committed Sep 22, 2023
1 parent e7380a7 commit 5356349
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 18 deletions.
27 changes: 27 additions & 0 deletions .chloggen/add_tcp_support_statsd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: statsdreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add TCP support to statsdreceiver

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [23327]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
13 changes: 9 additions & 4 deletions receiver/statsdreceiver/internal/transport/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,21 @@ func NewStatsD(transport Transport, host string, port int) (*StatsD, error) {
// connect populates the StatsD.Conn
func (s *StatsD) connect(transport Transport) error {
if cl, ok := s.Conn.(io.Closer); ok {
cl.Close()
err := cl.Close()
if err != nil {
return err
}
}

address := fmt.Sprintf("%s:%d", s.Host, s.Port)

var err error
switch transport {
case TCP:
// TODO: implement TCP support
return fmt.Errorf("TCP unsupported")
s.Conn, err = net.Dial("tcp", address)
if err != nil {
return err
}
case UDP:
var udpAddr *net.UDPAddr
udpAddr, err = net.ResolveUDPAddr("udp", address)
Expand Down Expand Up @@ -99,5 +104,5 @@ type Metric struct {

// String formats a Metric into a StatsD message.
func (m Metric) String() string {
return fmt.Sprintf("%s:%s|%s", m.Name, m.Value, m.Type)
return fmt.Sprintf("%s:%s|%s\n", m.Name, m.Value, m.Type)
}
41 changes: 27 additions & 14 deletions receiver/statsdreceiver/internal/transport/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,33 +23,46 @@ import (
func Test_Server_ListenAndServe(t *testing.T) {
tests := []struct {
name string
transport string
buildServerFn func(addr string) (Server, error)
buildClientFn func(host string, port int) (*client.StatsD, error)
}{
{
name: "udp",
transport: "udp",
buildServerFn: NewUDPServer,
buildClientFn: func(host string, port int) (*client.StatsD, error) {
return client.NewStatsD(client.UDP, host, port)
},
},
{
name: "tcp",
transport: "tcp",
buildServerFn: NewTCPServer,
buildClientFn: func(host string, port int) (*client.StatsD, error) {
return client.NewStatsD(client.TCP, host, port)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
addr := testutil.GetAvailableLocalNetworkAddress(t, "udp")

// Endpoint should be free.
ln0, err := net.ListenPacket("udp", addr)
require.NoError(t, err)
require.NotNil(t, ln0)

// Ensure that the endpoint wasn't something like ":0" by checking that a second listener will fail.
ln1, err := net.ListenPacket("udp", addr)
require.Error(t, err)
require.Nil(t, ln1)

// Unbind the local address so the mock UDP service can use it
ln0.Close()
addr := testutil.GetAvailableLocalNetworkAddress(t, tt.transport)

if tt.transport == "udp" {
// Endpoint should be free.
ln0, err := net.ListenPacket("udp", addr)
require.NoError(t, err)
require.NotNil(t, ln0)

// Ensure that the endpoint wasn't something like ":0" by checking that a second listener will fail.
ln1, err := net.ListenPacket("udp", addr)
require.Error(t, err)
require.Nil(t, ln1)

// Unbind the local address so the mock UDP service can use it
err = ln0.Close()
require.NoError(t, err)
}

srv, err := tt.buildServerFn(addr)
require.NoError(t, err)
Expand Down
105 changes: 105 additions & 0 deletions receiver/statsdreceiver/internal/transport/tcp_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"

import (
"bytes"
"errors"
"io"
"net"
"strings"
"sync"

"go.opentelemetry.io/collector/consumer"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/protocol"
)

var errTCPServerDone = errors.New("server stopped")

type tcpServer struct {
listener net.Listener
reporter Reporter
wg sync.WaitGroup
stopChan chan struct{}
}

var _ Server = (*tcpServer)(nil)

// NewTCPServer creates a transport.Server using TCP as its transport.
func NewTCPServer(addr string) (Server, error) {
l, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}

t := tcpServer{
listener: l,
stopChan: make(chan struct{}),
}
return &t, nil
}

func (t *tcpServer) ListenAndServe(parser protocol.Parser, nextConsumer consumer.Metrics, reporter Reporter, transferChan chan<- Metric) error {
if parser == nil || nextConsumer == nil || reporter == nil {
return errNilListenAndServeParameters
}

t.reporter = reporter
LOOP:
for {
connChan := make(chan net.Conn, 1)
go func() {
c, err := t.listener.Accept()
if err != nil {
t.reporter.OnDebugf("TCP Transport - Accept error: %v",
err)
} else {
connChan <- c
}
}()

select {
case conn := <-connChan:
t.wg.Add(1)
go t.handleConn(conn, transferChan)
case <-t.stopChan:
break LOOP
}
}
return errTCPServerDone
}

func (t *tcpServer) handleConn(c net.Conn, transferChan chan<- Metric) {
payload := make([]byte, 4096)
var remainder []byte
for {
n, err := c.Read(payload)
if err != nil {
t.reporter.OnDebugf("TCP transport (%s) Error reading payload: %v", c.LocalAddr(), err)
t.wg.Done()
return
}
buf := bytes.NewBuffer(append(remainder, payload[0:n]...))
for {
bytes, err := buf.ReadBytes((byte)('\n'))
if errors.Is(err, io.EOF) {
if len(bytes) != 0 {
remainder = bytes
}
break
}
line := strings.TrimSpace(string(bytes))
if line != "" {
transferChan <- Metric{line, c.LocalAddr()}
}
}
}
}

func (t *tcpServer) Close() error {
close(t.stopChan)
t.wg.Wait()
return t.listener.Close()
}
2 changes: 2 additions & 0 deletions receiver/statsdreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func buildTransportServer(config Config) (transport.Server, error) {
switch strings.ToLower(config.NetAddr.Transport) {
case "", "udp":
return transport.NewUDPServer(config.NetAddr.Endpoint)
case "tcp":
return transport.NewTCPServer(config.NetAddr.Endpoint)
}

return nil, fmt.Errorf("unsupported transport %q", config.NetAddr.Transport)
Expand Down

0 comments on commit 5356349

Please sign in to comment.