Skip to content

Commit

Permalink
ext: ecsobserver Add docker label based matcher (open-telemetry#3276)
Browse files Browse the repository at this point in the history
Split from open-telemetry#2734

- Get port number from label value
- Check if the port number exists in container definition
  • Loading branch information
pingleig committed May 11, 2021
1 parent 71ecd34 commit c7591c8
Show file tree
Hide file tree
Showing 5 changed files with 313 additions and 2 deletions.
74 changes: 73 additions & 1 deletion extension/observer/ecsobserver/docker_label.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ package ecsobserver

import (
"fmt"
"strconv"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ecs"
"go.uber.org/zap"
)

// DockerLabelConfig matches all tasks based on their docker label.
Expand All @@ -28,7 +33,7 @@ import (
type DockerLabelConfig struct {
CommonExporterConfig `mapstructure:",squash" yaml:",inline"`

// PortLabel is mandetory, empty string means docker label based match is skipped.
// PortLabel is mandatory, empty string means docker label based match is skipped.
PortLabel string `mapstructure:"port_label" yaml:"port_label"`
JobNameLabel string `mapstructure:"job_name_label" yaml:"job_name_label"`
MetricsPathLabel string `mapstructure:"metrics_path_label" yaml:"metrics_path_label"`
Expand All @@ -45,3 +50,70 @@ func (d *DockerLabelConfig) Init() error {
}
return nil
}

func (d *DockerLabelConfig) NewMatcher(options MatcherOptions) (Matcher, error) {
return &dockerLabelMatcher{
logger: options.Logger,
cfg: *d,
}, nil
}

// dockerLabelMatcher implements Matcher interface.
// It checks PortLabel from config and only matches if the label value is a valid number.
type dockerLabelMatcher struct {
logger *zap.Logger
cfg DockerLabelConfig
}

func (d *dockerLabelMatcher) Type() MatcherType {
return MatcherTypeDockerLabel
}

// MatchTargets first checks the port label to find the expected port value.
// Then it checks if that port is specified in container definition.
// It only returns match target when both conditions are met.
func (d *dockerLabelMatcher) MatchTargets(t *Task, c *ecs.ContainerDefinition) ([]MatchedTarget, error) {
portLabel := d.cfg.PortLabel

// Only check port label
ps, ok := c.DockerLabels[portLabel]
if !ok {
return nil, errNotMatched
}

// Convert port
s := aws.StringValue(ps)
port, err := strconv.Atoi(s)
if err != nil {
return nil, fmt.Errorf("invalid port_label value, container=%s labelKey=%s labelValue=%s: %w",
aws.StringValue(c.Name), d.cfg.PortLabel, s, err)
}

// Checks if the task does have the container port
portExists := false
for _, portMapping := range c.PortMappings {
if aws.Int64Value(portMapping.ContainerPort) == int64(port) {
portExists = true
break
}
}
if !portExists {
return nil, errNotMatched
}

// Export only one target based on docker port label.
target := MatchedTarget{
Port: port,
}
if v, ok := c.DockerLabels[d.cfg.MetricsPathLabel]; ok {
target.MetricsPath = aws.StringValue(v)
}
if v, ok := c.DockerLabels[d.cfg.JobNameLabel]; ok {
target.Job = aws.StringValue(v)
}
// NOTE: we only override job name but keep port and metrics from docker label instead of using common export config.
if d.cfg.JobName != "" {
target.Job = d.cfg.JobName
}
return []MatchedTarget{target}, nil
}
163 changes: 163 additions & 0 deletions extension/observer/ecsobserver/docker_label_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ package ecsobserver
import (
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ecs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/multierr"
)

func TestDockerLabelMatcher_Match(t *testing.T) {
Expand All @@ -43,4 +46,164 @@ func TestDockerLabelMatcher_Match(t *testing.T) {
}
assert.NoError(t, cfg.Init())
})

portLabel := "MY_PROMETHEUS_PORT"
portLabelWithInvalidValue := "MY_PROMETHEUS_PORT_IS_INVALID"
portLabelWithoutMapping := "MY_PROMETHEUS_PORT_IS_NOT_THERE"
jobLabel := "MY_PROMETHEUS_JOB"
metricsPathLabel := "MY_METRICS_PATH"

genTasks := func() []*Task {
return []*Task{
{
Definition: &ecs.TaskDefinition{
ContainerDefinitions: []*ecs.ContainerDefinition{
{
DockerLabels: map[string]*string{
portLabel: aws.String("2112"),
jobLabel: aws.String("PROM_JOB_1"),
metricsPathLabel: aws.String("/new/metrics"),
},
PortMappings: []*ecs.PortMapping{
{
ContainerPort: aws.Int64(2112),
HostPort: aws.Int64(2113), // doesn't matter for matcher test
},
},
},
{
DockerLabels: map[string]*string{
"not" + portLabel: aws.String("bar"),
},
// no port mapping at all
},
{
// port value in label does not match container port.
// most likely a misconfiguration or the labels are attached by tools.
DockerLabels: map[string]*string{
portLabelWithoutMapping: aws.String("2113"),
},
PortMappings: []*ecs.PortMapping{
{
ContainerPort: aws.Int64(2113 + 1), // a different port from label value
},
},
},
},
},
},
{
Definition: &ecs.TaskDefinition{
ContainerDefinitions: []*ecs.ContainerDefinition{
{
DockerLabels: map[string]*string{
portLabelWithInvalidValue: aws.String("not a port number"),
},
},
},
},
},
}
}

t.Run("port label", func(t *testing.T) {
cfg := DockerLabelConfig{
PortLabel: portLabel,
JobNameLabel: jobLabel,
}
res := newMatcherAndMatch(t, &cfg, genTasks())
assert.Equal(t, &MatchResult{
Tasks: []int{0},
Containers: []MatchedContainer{
{
TaskIndex: 0,
ContainerIndex: 0,
Targets: []MatchedTarget{
{
MatcherType: MatcherTypeDockerLabel,
Port: 2112,
Job: "PROM_JOB_1",
},
},
},
},
}, res)
})

t.Run("port mapping not found", func(t *testing.T) {
cfg := DockerLabelConfig{
PortLabel: portLabelWithoutMapping,
}
m := newMatcher(t, &cfg)
// Direct match has error
_, err := m.MatchTargets(genTasks()[0], genTasks()[0].Definition.ContainerDefinitions[2])
require.Error(t, err)

// errNotMatched is ignored
_, err = matchContainers(genTasks(), m, 0)
require.NoError(t, err)
})

t.Run("invalid port label value", func(t *testing.T) {
cfg := DockerLabelConfig{
PortLabel: portLabelWithInvalidValue,
}
m := newMatcher(t, &cfg)
res, err := matchContainers(genTasks(), m, 0)
require.Error(t, err)
errs := multierr.Errors(err)
assert.Len(t, errs, 1)
assert.NotNil(t, res, "return non nil res even if there are some errors, don't drop all task because one invalid task")
})

t.Run("metrics path", func(t *testing.T) {
cfg := DockerLabelConfig{
PortLabel: portLabel,
MetricsPathLabel: metricsPathLabel,
}
res := newMatcherAndMatch(t, &cfg, genTasks())
assert.Equal(t, &MatchResult{
Tasks: []int{0},
Containers: []MatchedContainer{
{
TaskIndex: 0,
ContainerIndex: 0,
Targets: []MatchedTarget{
{
MatcherType: MatcherTypeDockerLabel,
Port: 2112,
MetricsPath: "/new/metrics",
},
},
},
},
}, res)
})

t.Run("override job label", func(t *testing.T) {
cfg := DockerLabelConfig{
PortLabel: portLabel,
JobNameLabel: jobLabel,
CommonExporterConfig: CommonExporterConfig{
JobName: "override docker label",
},
}
res := newMatcherAndMatch(t, &cfg, genTasks())
assert.Equal(t, &MatchResult{
Tasks: []int{0},
Containers: []MatchedContainer{
{
TaskIndex: 0,
ContainerIndex: 0,
Targets: []MatchedTarget{
{
MatcherType: MatcherTypeDockerLabel,
Port: 2112,
Job: "override docker label",
},
},
},
},
}, res)
})
}
2 changes: 1 addition & 1 deletion extension/observer/ecsobserver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ require (
github.com/aws/aws-sdk-go v1.38.36
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/collector v0.26.1-0.20210510162429-51281a719256
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/multierr v1.6.0
go.uber.org/zap v1.16.0
)
51 changes: 51 additions & 0 deletions extension/observer/ecsobserver/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
package ecsobserver

import (
"fmt"

"github.com/aws/aws-sdk-go/service/ecs"
"go.uber.org/multierr"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -84,3 +87,51 @@ type MatchedTarget struct {
MetricsPath string
Job string
}

// a global instance because it's expected and we don't care about why the container didn't match (for now).
// In the future we might add a debug flag for each matcher config and return typed error with more detail
// to help user debug. e.g. type ^ngix-*$ does not match nginx-service.
var errNotMatched = fmt.Errorf("container not matched")

// matchContainers apply one matcher to a list of tasks and returns MatchResult.
// It does not modify the task in place, the attaching match result logic is
// performed by TaskFilter at later stage.
func matchContainers(tasks []*Task, matcher Matcher, matcherIndex int) (*MatchResult, error) {
var (
matchedTasks []int
matchedContainers []MatchedContainer
)
var merr error
tpe := matcher.Type()
for tIndex, t := range tasks {
var matched []MatchedContainer
for cIndex, c := range t.Definition.ContainerDefinitions {
targets, err := matcher.MatchTargets(t, c)
// NOTE: we don't stop when there is an error because it could be one task having invalid docker label.
if err != nil {
// Keep track of unexpected error
if err != errNotMatched {
multierr.AppendInto(&merr, err)
}
continue
}
for i := range targets {
targets[i].MatcherType = tpe
targets[i].MatcherIndex = matcherIndex
}
matched = append(matched, MatchedContainer{
TaskIndex: tIndex,
ContainerIndex: cIndex,
Targets: targets,
})
}
if len(matched) > 0 {
matchedTasks = append(matchedTasks, tIndex)
matchedContainers = append(matchedContainers, matched...)
}
}
return &MatchResult{
Tasks: matchedTasks,
Containers: matchedContainers,
}, merr
}
25 changes: 25 additions & 0 deletions extension/observer/ecsobserver/sd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestNewDiscovery(t *testing.T) {
Expand All @@ -32,3 +33,27 @@ func TestNewDiscovery(t *testing.T) {
require.Error(t, err)
})
}

// Util Start

func newMatcher(t *testing.T, cfg MatcherConfig) Matcher {
require.NoError(t, cfg.Init())
m, err := cfg.NewMatcher(testMatcherOptions())
require.NoError(t, err)
return m
}

func newMatcherAndMatch(t *testing.T, cfg MatcherConfig, tasks []*Task) *MatchResult {
m := newMatcher(t, cfg)
res, err := matchContainers(tasks, m, 0)
require.NoError(t, err)
return res
}

func testMatcherOptions() MatcherOptions {
return MatcherOptions{
Logger: zap.NewExample(),
}
}

// Util End

0 comments on commit c7591c8

Please sign in to comment.