Skip to content

Commit

Permalink
pkg/trace/api: add support for Unix Domain Sockets
Browse files Browse the repository at this point in the history
This change adds support for Unix Domain Sockets by means of a
configuration setting. By default, this feature is disabled. To enable
it, simply point the yaml setting to the right socket path, for example:

    apm_config:
        receiver_socket: /tmp/trace.sock

This will enable the HTTP server to listen on unix:///tmp/trace.sock

The DD_APM_RECEIVER_SOCKET environment variable is an alias for the
above.
  • Loading branch information
gbbr committed May 27, 2019
1 parent a5569aa commit 5757e47
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 30 deletions.
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ func initConfig(config Config) {
config.SetKnown("apm_config.apm_dd_url")
config.SetKnown("apm_config.max_cpu_percent")
config.SetKnown("apm_config.receiver_port")
config.SetKnown("apm_config.receiver_socket")
config.SetKnown("apm_config.connection_limit")
config.SetKnown("apm_config.ignore_resources")
config.SetKnown("apm_config.replace_tags")
Expand Down
71 changes: 49 additions & 22 deletions pkg/trace/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net"
"net/http"
"net/http/pprof"
"os"
"runtime"
"sort"
"strconv"
Expand Down Expand Up @@ -134,11 +135,27 @@ func (r *HTTPReceiver) Start() {
Handler: mux,
}

// expvar implicitly publishes "/debug/vars" on the same port
addr := fmt.Sprintf("%s:%d", r.conf.ReceiverHost, r.conf.ReceiverPort)
if err := r.Listen(addr, ""); err != nil {
log.Criticalf("Error creating listener: %v", err)
killProcess(err.Error())
ln, err := r.listenTCP(addr)
if err != nil {
killProcess("Error creating tcp listener: %v", err)
}
go func() {
defer watchdog.LogOnPanic()
r.server.Serve(ln)
}()
log.Infof("Listening for traces at https://%s", addr)

if path := r.conf.ReceiverSocket; path != "" {
ln, err := r.listenUnix(path)
if err != nil {
killProcess("Error creating UDS listener: %v", err)
}
go func() {
defer watchdog.LogOnPanic()
r.server.Serve(ln)
}()
log.Infof("Listening for traces at unix:https://%s", path)
}

go r.PreSampler.Run()
Expand Down Expand Up @@ -180,29 +197,40 @@ func (r *HTTPReceiver) attachDebugHandlers(mux *http.ServeMux) {
})
}

// Listen creates a new HTTP server listening on the provided address.
func (r *HTTPReceiver) Listen(addr, logExtra string) error {
listener, err := net.Listen("tcp", addr)
if err != nil {
return fmt.Errorf("cannot listen on %s: %v", addr, err)
// listenUnix returns a *net.Listener listening on the given "unix" socket path.
func (r *HTTPReceiver) listenUnix(path string) (net.Listener, error) {
fi, err := os.Stat(path)
if err == nil {
// already exists
if fi.Mode()&os.ModeSocket == 0 {
return nil, fmt.Errorf("cannot reuse %q; not a unix socket", path)
}
if err := os.Remove(path); err != nil {
return nil, fmt.Errorf("unable to remove stale socket: %v", err)
}
}
ln, err := newRateLimitedListener(listener, r.conf.ConnectionLimit)
ln, err := net.Listen("unix", path)
if err != nil {
return fmt.Errorf("cannot create listener: %v", err)
return nil, err
}
if err := os.Chmod(path, 0722); err != nil {
return nil, fmt.Errorf("error setting socket permissions: %v", err)
}
return ln, err
}

log.Infof("Listening for traces at https://%s%s", addr, logExtra)

// listenTCP creates a new HTTP server listening on the provided TCP address.
func (r *HTTPReceiver) listenTCP(addr string) (net.Listener, error) {
tcpln, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
ln, err := newRateLimitedListener(tcpln, r.conf.ConnectionLimit)
go func() {
defer watchdog.LogOnPanic()
ln.Refresh(r.conf.ConnectionLimit)
}()
go func() {
defer watchdog.LogOnPanic()
r.server.Serve(ln)
}()

return nil
return ln, err
}

// Stop stops the receiver and shuts down the HTTP server.
Expand Down Expand Up @@ -409,7 +437,7 @@ func (r *HTTPReceiver) loop() {
}

// killProcess exits the process with the given msg; replaced in tests.
var killProcess = func(msg string) { osutil.Exitf(msg) }
var killProcess = func(format string, a ...interface{}) { osutil.Exitf(format, a...) }

func (r *HTTPReceiver) watchdog(now time.Time) {
wi := watchdog.Info{
Expand All @@ -422,8 +450,7 @@ func (r *HTTPReceiver) watchdog(now time.Time) {
// is likely a leak somewhere; we'll kill the process to avoid polluting host memory.
metrics.Count("datadog.trace_agent.receiver.suicide", 1, nil, 1)
metrics.Flush()
log.Criticalf("Killing process. Memory threshold exceeded: %.2fM / %.2fM", current/1024/1024, allowed/1024/1024)
killProcess("OOM")
killProcess("Killing process. Memory threshold exceeded: %.2fM / %.2fM", current/1024/1024, allowed/1024/1024)
}

rateCPU, err := sampler.CalcPreSampleRate(r.conf.MaxCPU, wi.CPU.UserAvg, r.PreSampler.RealRate())
Expand Down
65 changes: 65 additions & 0 deletions pkg/trace/api/api_nix_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// +build !windows

package api

import (
"bytes"
"context"
"net"
"net/http"
"testing"
"time"

"github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/trace/pb"
"github.com/DataDog/datadog-agent/pkg/trace/test/testutil"
)

// TestUDS checks that traces can be posted to the receiver over a Unix Domain
// Socket only when conf.ReceiverSocket is set.
func TestUDS(t *testing.T) {
	sockPath := "/tmp/test-trace.sock"
	payload := msgpTraces(t, pb.Traces{testutil.RandomTrace(10, 20)})
	client := http.Client{
		Transport: &http.Transport{
			Proxy: http.ProxyFromEnvironment,
			// Ignore the network and address derived from the request URL and
			// always dial the unix socket instead.
			DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
				return net.Dial("unix", sockPath)
			},
			MaxIdleConns:          100,
			IdleConnTimeout:       90 * time.Second,
			TLSHandshakeTimeout:   10 * time.Second,
			ExpectContinueTimeout: 1 * time.Second,
		},
	}
	// The receiver is started with a plain (non-TLS) Serve call, so the scheme
	// must be http; with https the client would attempt a TLS handshake over
	// the socket and every request would fail regardless of the configuration.
	const endpoint = "http://localhost:8126/v0.4/traces"

	t.Run("off", func(t *testing.T) {
		conf := config.New()
		conf.Endpoints[0].APIKey = "apikey_2"

		r := newTestReceiverFromConfig(conf)
		r.Start()
		defer r.Stop()

		// No socket is configured, so dialing sockPath is expected to fail.
		resp, err := client.Post(endpoint, "application/msgpack", bytes.NewReader(payload))
		if err == nil {
			resp.Body.Close()
			t.Fatalf("expected to fail, got response %#v", resp)
		}
	})

	t.Run("on", func(t *testing.T) {
		conf := config.New()
		conf.Endpoints[0].APIKey = "apikey_2"
		conf.ReceiverSocket = sockPath

		r := newTestReceiverFromConfig(conf)
		r.Start()
		defer r.Stop()

		resp, err := client.Post(endpoint, "application/msgpack", bytes.NewReader(payload))
		if err != nil {
			t.Fatal(err)
		}
		// Close the body so the transport can release the connection.
		defer resp.Body.Close()
		if resp.StatusCode != http.StatusOK {
			t.Fatalf("expected http.StatusOK, got response: %#v", resp)
		}
	})
}
12 changes: 6 additions & 6 deletions pkg/trace/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func newTestReceiverConfig() *config.AgentConfig {
func TestMain(m *testing.M) {
seelog.UseLogger(seelog.Disabled)

defer func(old func(string)) { killProcess = old }(killProcess)
killProcess = func(_ string) {}
defer func(old func(string, ...interface{})) { killProcess = old }(killProcess)
killProcess = func(_ string, _ ...interface{}) {}

m.Run()
}
Expand Down Expand Up @@ -812,10 +812,10 @@ func TestWatchdog(t *testing.T) {
func TestOOMKill(t *testing.T) {
var kills uint64

defer func(old func(string)) { killProcess = old }(killProcess)
killProcess = func(msg string) {
if msg != "OOM" {
t.Fatalf("wrong message: %s", msg)
defer func(old func(string, ...interface{})) { killProcess = old }(killProcess)
killProcess = func(format string, a ...interface{}) {
if !strings.Contains(format, "Memory threshold exceeded") {
t.Fatalf("wrong message: %s", fmt.Sprintf(format, a...))
}
atomic.AddUint64(&kills, 1)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/trace/config/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ func (c *AgentConfig) applyDatadogConfig() error {
if config.Datadog.IsSet("apm_config.receiver_port") {
c.ReceiverPort = config.Datadog.GetInt("apm_config.receiver_port")
}
if config.Datadog.IsSet("apm_config.receiver_socket") {
c.ReceiverSocket = config.Datadog.GetString("apm_config.receiver_socket")
}
if config.Datadog.IsSet("apm_config.connection_limit") {
c.ConnectionLimit = config.Datadog.GetInt("apm_config.connection_limit")
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/trace/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ type AgentConfig struct {
// Receiver
ReceiverHost string
ReceiverPort int
ConnectionLimit int // for rate-limiting, how many unique connections to allow in a lease period (30s)
ReceiverSocket string // if not empty, UDS will be enabled on unix://<receiver_socket>
ConnectionLimit int // for rate-limiting, how many unique connections to allow in a lease period (30s)
ReceiverTimeout int

// Writers
Expand Down
1 change: 1 addition & 0 deletions pkg/trace/config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func applyEnv() {
{"DD_APM_MAX_TPS", "apm_config.max_traces_per_second"},
{"DD_APM_MAX_MEMORY", "apm_config.max_memory"},
{"DD_APM_MAX_CPU_PERCENT", "apm_config.max_cpu_percent"},
{"DD_APM_RECEIVER_SOCKET", "apm_config.receiver_socket"},
} {
if v := os.Getenv(override.env); v != "" {
config.Datadog.Set(override.key, v)
Expand Down
2 changes: 1 addition & 1 deletion pkg/trace/osutil/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func Exitf(format string, args ...interface{}) {
fmt.Printf(format, args...)
fmt.Print("")
} else {
log.Errorf(format, args...)
log.Criticalf(format, args...)
log.Flush()
}
os.Exit(1)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Each section from every releasenote are combined when the
# CHANGELOG.rst is rendered. So the text needs to be worded so that
# it does not depend on any information only available in another
# section. This may mean repeating some details, but each section
# must be readable independently of the other.
#
# Each section note must be formatted as reStructuredText.
---
features:
- |
APM: add support for Unix Domain Sockets by means of the `apm_config.receiver_socket` configuration setting. It is off by default. When set,
it must point to a valid socket file path.

0 comments on commit 5757e47

Please sign in to comment.