Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/trace/api: add support for Unix Domain Sockets #3556

Merged
merged 2 commits into from
Jul 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ func initConfig(config Config) {
config.SetKnown("apm_config.apm_dd_url")
config.SetKnown("apm_config.max_cpu_percent")
config.SetKnown("apm_config.receiver_port")
config.SetKnown("apm_config.receiver_socket")
config.SetKnown("apm_config.connection_limit")
config.SetKnown("apm_config.ignore_resources")
config.SetKnown("apm_config.replace_tags")
Expand Down
68 changes: 48 additions & 20 deletions pkg/trace/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"net"
"net/http"
"net/http/pprof"
"os"
"runtime"
"sort"
"strconv"
Expand Down Expand Up @@ -140,11 +141,27 @@ func (r *HTTPReceiver) Start() {
Handler: mux,
}

// expvar implicitly publishes "/debug/vars" on the same port
addr := fmt.Sprintf("%s:%d", r.conf.ReceiverHost, r.conf.ReceiverPort)
if err := r.Listen(addr, ""); err != nil {
log.Criticalf("Error creating listener: %v", err)
killProcess(err.Error())
ln, err := r.listenTCP(addr)
if err != nil {
killProcess("Error creating tcp listener: %v", err)
}
go func() {
defer watchdog.LogOnPanic()
r.server.Serve(ln)
}()
log.Infof("Listening for traces at https://%s", addr)

if path := r.conf.ReceiverSocket; path != "" {
ln, err := r.listenUnix(path)
if err != nil {
killProcess("Error creating UDS listener: %v", err)
}
go func() {
defer watchdog.LogOnPanic()
r.server.Serve(ln)
}()
erichgess marked this conversation as resolved.
Show resolved Hide resolved
log.Infof("Listening for traces at unix:https://%s", path)
}

go r.RateLimiter.Run()
Expand Down Expand Up @@ -188,29 +205,40 @@ func (r *HTTPReceiver) attachDebugHandlers(mux *http.ServeMux) {
mux.Handle("/debug/vars", expvar.Handler())
}

// Listen creates a new HTTP server listening on the provided address.
func (r *HTTPReceiver) Listen(addr, logExtra string) error {
listener, err := net.Listen("tcp", addr)
if err != nil {
return fmt.Errorf("cannot listen on %s: %v", addr, err)
// listenUnix returns a net.Listener listening on the given "unix" socket path.
func (r *HTTPReceiver) listenUnix(path string) (net.Listener, error) {
fi, err := os.Stat(path)
if err == nil {
// already exists
if fi.Mode()&os.ModeSocket == 0 {
return nil, fmt.Errorf("cannot reuse %q; not a unix socket", path)
}
if err := os.Remove(path); err != nil {
return nil, fmt.Errorf("unable to remove stale socket: %v", err)
}
}
ln, err := newRateLimitedListener(listener, r.conf.ConnectionLimit)
erichgess marked this conversation as resolved.
Show resolved Hide resolved
ln, err := net.Listen("unix", path)
if err != nil {
return fmt.Errorf("cannot create listener: %v", err)
return nil, err
}
if err := os.Chmod(path, 0722); err != nil {
return nil, fmt.Errorf("error setting socket permissions: %v", err)
}
return ln, err
}

log.Infof("Listening for traces at https://%s%s", addr, logExtra)

// listenTCP creates a new net.Listener on the provided TCP address. The TCP
// listener is wrapped using newRateLimitedListener with the configured
// connection limit, and a goroutine is started which calls Refresh on the
// wrapper. Serving on the returned listener is left to the caller.
func (r *HTTPReceiver) listenTCP(addr string) (net.Listener, error) {
	tcpln, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, err
	}
	ln, err := newRateLimitedListener(tcpln, r.conf.ConnectionLimit)
	if err != nil {
		// Do not start Refresh on a listener that failed to be created,
		// and do not leave the underlying TCP listener open.
		tcpln.Close()
		return nil, err
	}
	go func() {
		defer watchdog.LogOnPanic()
		ln.Refresh(r.conf.ConnectionLimit)
	}()
	return ln, nil
}

// Stop stops the receiver and shuts down the HTTP server.
Expand Down Expand Up @@ -433,7 +461,7 @@ func (r *HTTPReceiver) loop() {
}

// killProcess exits the process with the given msg; replaced in tests.
var killProcess = func(msg string) { osutil.Exitf(msg) }
var killProcess = func(format string, a ...interface{}) { osutil.Exitf(format, a...) }

// watchdog checks the trace-agent's heap and CPU usage and updates the rate limiter using a correct
// sampling rate to maintain resource usage within set thresholds. These thresholds are defined by
Expand Down
65 changes: 65 additions & 0 deletions pkg/trace/api/api_nix_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// +build !windows

package api

import (
"bytes"
"context"
"net"
"net/http"
"testing"
"time"

"github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/trace/pb"
"github.com/DataDog/datadog-agent/pkg/trace/test/testutil"
)

// TestUDS checks that traces can be posted over a Unix Domain Socket when
// conf.ReceiverSocket is set, and that they can not when it is left empty.
func TestUDS(t *testing.T) {
	sockPath := "/tmp/test-trace.sock"
	// The receiver is started with a plain Serve call, so requests must use
	// the http scheme; with https the client would attempt a TLS handshake
	// and fail in both sub-tests regardless of the socket configuration.
	const endpoint = "http://localhost:8126/v0.4/traces"
	payload := msgpTraces(t, pb.Traces{testutil.RandomTrace(10, 20)})
	client := http.Client{
		Transport: &http.Transport{
			Proxy: http.ProxyFromEnvironment,
			// Always dial the unix socket, ignoring the network and
			// address derived from the request URL.
			DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
				return net.Dial("unix", sockPath)
			},
			MaxIdleConns:          100,
			IdleConnTimeout:       90 * time.Second,
			TLSHandshakeTimeout:   10 * time.Second,
			ExpectContinueTimeout: 1 * time.Second,
		},
	}

	t.Run("off", func(t *testing.T) {
		conf := config.New()
		conf.Endpoints[0].APIKey = "apikey_2"

		r := newTestReceiverFromConfig(conf)
		r.Start()
		defer r.Stop()

		resp, err := client.Post(endpoint, "application/msgpack", bytes.NewReader(payload))
		if err == nil {
			resp.Body.Close()
			t.Fatalf("expected to fail, got response %#v", resp)
		}
	})

	t.Run("on", func(t *testing.T) {
		conf := config.New()
		conf.Endpoints[0].APIKey = "apikey_2"
		conf.ReceiverSocket = sockPath

		r := newTestReceiverFromConfig(conf)
		r.Start()
		defer r.Stop()

		resp, err := client.Post(endpoint, "application/msgpack", bytes.NewReader(payload))
		if err != nil {
			t.Fatal(err)
		}
		defer resp.Body.Close()
		if resp.StatusCode != http.StatusOK {
			t.Fatalf("expected http.StatusOK, got response: %#v", resp)
		}
	})
}
12 changes: 6 additions & 6 deletions pkg/trace/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func newTestReceiverConfig() *config.AgentConfig {
// TestMain disables logging and replaces killProcess with a no-op while the
// tests of this package run, then exits with the code reported by m.Run.
func TestMain(m *testing.M) {
	seelog.UseLogger(seelog.Disabled)

	old := killProcess
	killProcess = func(_ string, _ ...interface{}) {}

	code := m.Run()

	// os.Exit does not run deferred functions, so the original killProcess
	// is restored explicitly instead of with defer.
	killProcess = old
	os.Exit(code)
}
Expand Down Expand Up @@ -887,10 +887,10 @@ func TestWatchdog(t *testing.T) {
func TestOOMKill(t *testing.T) {
var kills uint64

defer func(old func(string)) { killProcess = old }(killProcess)
killProcess = func(msg string) {
if msg != "OOM" {
t.Fatalf("wrong message: %s", msg)
defer func(old func(string, ...interface{})) { killProcess = old }(killProcess)
killProcess = func(format string, a ...interface{}) {
if format != "OOM" {
t.Fatalf("wrong message: %s", fmt.Sprintf(format, a...))
}
atomic.AddUint64(&kills, 1)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/trace/config/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ func (c *AgentConfig) applyDatadogConfig() error {
if config.Datadog.IsSet("apm_config.receiver_port") {
c.ReceiverPort = config.Datadog.GetInt("apm_config.receiver_port")
}
if config.Datadog.IsSet("apm_config.receiver_socket") {
c.ReceiverSocket = config.Datadog.GetString("apm_config.receiver_socket")
}
if config.Datadog.IsSet("apm_config.connection_limit") {
c.ConnectionLimit = config.Datadog.GetInt("apm_config.connection_limit")
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/trace/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ type AgentConfig struct {
// Receiver
ReceiverHost string
ReceiverPort int
ConnectionLimit int // for rate-limiting, how many unique connections to allow in a lease period (30s)
ReceiverSocket string // if not empty, UDS will be enabled on unix:https://<receiver_socket>
ConnectionLimit int // for rate-limiting, how many unique connections to allow in a lease period (30s)
ReceiverTimeout int

// Writers
Expand Down
1 change: 1 addition & 0 deletions pkg/trace/config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func loadEnv() {
{"DD_APM_MAX_TPS", "apm_config.max_traces_per_second"},
{"DD_APM_MAX_MEMORY", "apm_config.max_memory"},
{"DD_APM_MAX_CPU_PERCENT", "apm_config.max_cpu_percent"},
{"DD_APM_RECEIVER_SOCKET", "apm_config.receiver_socket"},
} {
if v := os.Getenv(override.env); v != "" {
config.Datadog.Set(override.key, v)
Expand Down
2 changes: 1 addition & 1 deletion pkg/trace/osutil/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func Exitf(format string, args ...interface{}) {
fmt.Printf(format, args...)
fmt.Print("")
} else {
log.Errorf(format, args...)
log.Criticalf(format, args...)
log.Flush()
}
os.Exit(1)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Each section from every releasenote are combined when the
# CHANGELOG.rst is rendered. So the text needs to be worded so that
# it does not depend on any information only available in another
# section. This may mean repeating some details, but each section
# must be readable independently of the other.
#
# Each section note must be formatted as reStructuredText.
---
features:
- |
APM: add support for Unix Domain Sockets by means of the ``apm_config.receiver_socket`` configuration option. It is off by default. When set,
it must point to a valid socket file path.