Skip to content

Commit

Permalink
Support sending a service tag for all integrations
Browse files Browse the repository at this point in the history
  • Loading branch information
ofek committed Feb 11, 2020
1 parent c33a2f0 commit acac344
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 7 deletions.
10 changes: 10 additions & 0 deletions pkg/aggregator/mocksender/mocked_methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ func (m *MockSender) SetCheckCustomTags(tags []string) {
m.Called(tags)
}

//SetCheckService enables the setting of check service mock call.
func (m *MockSender) SetCheckService(service string) {
m.Called(service)
}

//FinalizeCheckServiceTag enables the sending of check service tag mock call.
func (m *MockSender) FinalizeCheckServiceTag() {
m.Called()
}

//GetMetricStats enables the get metric stats mock call.
func (m *MockSender) GetMetricStats() map[string]int64 {
m.Called()
Expand Down
2 changes: 2 additions & 0 deletions pkg/aggregator/mocksender/mocksender.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func (m *MockSender) SetupAcceptAll() {
m.On("GetMetricStats", mock.AnythingOfType("map[string]int64")).Return()
m.On("DisableDefaultHostname", mock.AnythingOfType("bool")).Return()
m.On("SetCheckCustomTags", mock.AnythingOfType("[]string")).Return()
m.On("SetCheckService", mock.AnythingOfType("string")).Return()
m.On("FinalizeCheckServiceTag").Return()
m.On("Commit").Return()
}

Expand Down
16 changes: 16 additions & 0 deletions pkg/aggregator/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type Sender interface {
GetMetricStats() map[string]int64
DisableDefaultHostname(disable bool)
SetCheckCustomTags(tags []string)
SetCheckService(service string)
FinalizeCheckServiceTag()
}

type metricStats struct {
Expand Down Expand Up @@ -66,6 +68,7 @@ type checkSender struct {
eventOut chan<- metrics.Event
histogramBucketOut chan<- senderHistogramBucket
checkTags []string
service string
}

type senderMetricSample struct {
Expand Down Expand Up @@ -169,6 +172,19 @@ func (s *checkSender) SetCheckCustomTags(tags []string) {
s.checkTags = tags
}

// SetCheckService appends the service as a tag for metrics, events, and service checks
// This may be called any number of times, though the only the last call will have an effect
func (s *checkSender) SetCheckService(service string) {
s.service = service
}

// FinalizeCheckServiceTag appends the service as a tag for metrics, events, and service checks
func (s *checkSender) FinalizeCheckServiceTag() {
if s.service != "" {
s.checkTags = append(s.checkTags, fmt.Sprintf("service:%s", s.service))
}
}

// Commit commits the metric samples & histogram buckets that were added during a check run
// Should be called at the end of every check run
func (s *checkSender) Commit() {
Expand Down
89 changes: 89 additions & 0 deletions pkg/aggregator/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,95 @@ func TestGetSenderDefaultHostname(t *testing.T) {
assert.Equal(t, false, checksender.defaultHostnameDisabled)
}

func TestGetSenderServiceTagMetrics(t *testing.T) {
resetAggregator()
InitAggregator(nil, nil, "testhostname", "")

senderMetricSampleChan := make(chan senderMetricSample, 10)
serviceCheckChan := make(chan metrics.ServiceCheck, 10)
eventChan := make(chan metrics.Event, 10)
bucketChan := make(chan senderHistogramBucket, 10)
checkSender := newCheckSender(checkID1, "", senderMetricSampleChan, serviceCheckChan, eventChan, bucketChan)
checkTags := []string{"check:tag1", "check:tag2"}

// only tags added by the check
checkSender.SetCheckService("")
checkSender.FinalizeCheckServiceTag()
checkSender.sendMetricSample("metric.test", 42.0, "testhostname", checkTags, metrics.CounterType)
sms := <-senderMetricSampleChan
assert.Equal(t, checkTags, sms.metricSample.Tags)

// only last call is added as a tag
checkSender.SetCheckService("service1")
checkSender.SetCheckService("service2")
checkSender.FinalizeCheckServiceTag()
checkSender.sendMetricSample("metric.test", 42.0, "testhostname", checkTags, metrics.CounterType)
sms = <-senderMetricSampleChan
assert.Equal(t, append(checkTags, "service:service2"), sms.metricSample.Tags)
}

func TestGetSenderServiceTagServiceCheck(t *testing.T) {
resetAggregator()
InitAggregator(nil, nil, "testhostname", "")

senderMetricSampleChan := make(chan senderMetricSample, 10)
serviceCheckChan := make(chan metrics.ServiceCheck, 10)
eventChan := make(chan metrics.Event, 10)
bucketChan := make(chan senderHistogramBucket, 10)
checkSender := newCheckSender(checkID1, "", senderMetricSampleChan, serviceCheckChan, eventChan, bucketChan)
checkTags := []string{"check:tag1", "check:tag2"}

// only tags added by the check
checkSender.SetCheckService("")
checkSender.FinalizeCheckServiceTag()
checkSender.ServiceCheck("test", metrics.ServiceCheckOK, "testhostname", checkTags, "test message")
sc := <-serviceCheckChan
assert.Equal(t, checkTags, sc.Tags)

// only last call is added as a tag
checkSender.SetCheckService("service1")
checkSender.SetCheckService("service2")
checkSender.FinalizeCheckServiceTag()
checkSender.ServiceCheck("test", metrics.ServiceCheckOK, "testhostname", checkTags, "test message")
sc = <-serviceCheckChan
assert.Equal(t, append(checkTags, "service:service2"), sc.Tags)
}

func TestGetSenderServiceTagEvent(t *testing.T) {
resetAggregator()
InitAggregator(nil, nil, "testhostname", "")

senderMetricSampleChan := make(chan senderMetricSample, 10)
serviceCheckChan := make(chan metrics.ServiceCheck, 10)
eventChan := make(chan metrics.Event, 10)
bucketChan := make(chan senderHistogramBucket, 10)
checkSender := newCheckSender(checkID1, "", senderMetricSampleChan, serviceCheckChan, eventChan, bucketChan)
checkTags := []string{"check:tag1", "check:tag2"}

event := metrics.Event{
Title: "title",
Host: "testhostname",
Ts: time.Now().Unix(),
Text: "text",
Tags: checkTags,
}

// only tags added by the check
checkSender.SetCheckService("")
checkSender.FinalizeCheckServiceTag()
checkSender.Event(event)
e := <-eventChan
assert.Equal(t, checkTags, e.Tags)

// only last call is added as a tag
checkSender.SetCheckService("service1")
checkSender.SetCheckService("service2")
checkSender.FinalizeCheckServiceTag()
checkSender.Event(event)
e = <-eventChan
assert.Equal(t, append(checkTags, "service:service2"), e.Tags)
}

func TestGetSenderAddCheckCustomTagsMetrics(t *testing.T) {
resetAggregator()
InitAggregator(nil, nil, "testhostname", "")
Expand Down
6 changes: 6 additions & 0 deletions pkg/autodiscovery/integration/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,16 @@ type CommonInstanceConfig struct {
MinCollectionInterval int `yaml:"min_collection_interval"`
EmptyDefaultHostname bool `yaml:"empty_default_hostname"`
Tags []string `yaml:"tags"`
Service string `yaml:"service"`
Name string `yaml:"name"`
Namespace string `yaml:"namespace"`
}

// CommonGlobalConfig holds the reserved fields for the yaml init_config data
type CommonGlobalConfig struct {
Service string `yaml:"service"`
}

// Equal determines whether the passed config is the same
func (c *Config) Equal(cfg *Config) bool {
if cfg == nil {
Expand Down
42 changes: 41 additions & 1 deletion pkg/collector/corechecks/checkbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,37 @@ func (c *CheckBase) BuildID(instance, initConfig integration.Data) {
// Configure is provided for checks that require no config. If overridden,
// the call to CommonConfigure must be preserved.
func (c *CheckBase) Configure(data integration.Data, initConfig integration.Data, source string) error {
return c.CommonConfigure(data, source)
commonGlobalOptions := integration.CommonGlobalConfig{}
err := yaml.Unmarshal(initConfig, &commonGlobalOptions)
if err != nil {
log.Errorf("invalid init_config section for check %s: %s", string(c.ID()), err)
return err
}

// Set service for this check
if len(commonGlobalOptions.Service) > 0 {
s, err := aggregator.GetSender(c.checkID)
if err != nil {
log.Errorf("failed to retrieve a sender for check %s: %s", string(c.ID()), err)
return err
}
s.SetCheckService(commonGlobalOptions.Service)
}

err = c.CommonConfigure(data, source)
if err != nil {
return err
}

// Add the possibly configured service as a tag for this check
s, err := aggregator.GetSender(c.checkID)
if err != nil {
log.Errorf("failed to retrieve a sender for check %s: %s", string(c.ID()), err)
return err
}
s.FinalizeCheckServiceTag()

return nil
}

// CommonConfigure is called when checks implement their own Configure method,
Expand Down Expand Up @@ -108,6 +138,16 @@ func (c *CheckBase) CommonConfigure(instance integration.Data, source string) er
s.SetCheckCustomTags(commonOptions.Tags)
}

// Set configured service for this check, overriding the one possibly defined globally
if len(commonOptions.Service) > 0 {
s, err := aggregator.GetSender(c.checkID)
if err != nil {
log.Errorf("failed to retrieve a sender for check %s: %s", string(c.ID()), err)
return err
}
s.SetCheckService(commonOptions.Service)
}

c.source = source
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/collector/corechecks/system/file_handles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"

"github.com/DataDog/datadog-agent/pkg/aggregator/mocksender"
"github.com/DataDog/datadog-agent/pkg/autodiscovery/integration"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

Expand Down Expand Up @@ -46,7 +47,7 @@ func TestFhCheckLinux(t *testing.T) {
t.Logf("Testing from file %s", fileNrHandle) // To pass circle ci tests

fileHandleCheck := new(fhCheck)
fileHandleCheck.Configure(nil, nil, "test")
fileHandleCheck.Configure(integration.Data("{\"val\": 42}"), integration.Data("{\"val\": 42}"), "test")

mock := mocksender.NewMockSender(fileHandleCheck.ID())

Expand Down
34 changes: 34 additions & 0 deletions pkg/collector/python/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,22 @@ func (c *PythonCheck) Configure(data integration.Data, initConfig integration.Da
// Generate check ID
c.id = check.Identify(c, data, initConfig)

commonGlobalOptions := integration.CommonGlobalConfig{}
if err := yaml.Unmarshal(initConfig, &commonGlobalOptions); err != nil {
log.Errorf("invalid init_config section for check %s: %s", string(c.id), err)
return err
}

// Set service for this check
if len(commonGlobalOptions.Service) > 0 {
s, err := aggregator.GetSender(c.id)
if err != nil {
log.Errorf("failed to retrieve a sender for check %s: %s", string(c.id), err)
} else {
s.SetCheckService(commonGlobalOptions.Service)
}
}

commonOptions := integration.CommonInstanceConfig{}
if err := yaml.Unmarshal(data, &commonOptions); err != nil {
log.Errorf("invalid instance section for check %s: %s", string(c.id), err)
Expand All @@ -194,6 +210,16 @@ func (c *PythonCheck) Configure(data integration.Data, initConfig integration.Da
}
}

// Set configured service for this check, overriding the one possibly defined globally
if len(commonOptions.Service) > 0 {
s, err := aggregator.GetSender(c.id)
if err != nil {
log.Errorf("failed to retrieve a sender for check %s: %s", string(c.id), err)
} else {
s.SetCheckService(commonOptions.Service)
}
}

cInitConfig := TrackedCString(string(initConfig))
cInstance := TrackedCString(string(data))
cCheckID := TrackedCString(string(c.id))
Expand Down Expand Up @@ -232,6 +258,14 @@ func (c *PythonCheck) Configure(data integration.Data, initConfig integration.Da
c.instance = check
c.source = source

// Add the possibly configured service as a tag for this check
s, err := aggregator.GetSender(c.id)
if err != nil {
log.Errorf("failed to retrieve a sender for check %s: %s", string(c.id), err)
} else {
s.FinalizeCheckServiceTag()
}

log.Debugf("python check configure done %s", c.ModuleName)
return nil
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/collector/python/test_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,11 +295,11 @@ func testConfigure(t *testing.T) {

C.get_check_return = 1
C.get_check_check = &C.rtloader_pyobject_t{}
err := c.Configure(integration.Data("{\"val\": 21}"), integration.Data("aaa"), "test")
err := c.Configure(integration.Data("{\"val\": 21}"), integration.Data("{\"val\": 21}"), "test")
assert.Nil(t, err)

assert.Equal(t, c.class, C.get_check_py_class)
assert.Equal(t, "aaa", C.GoString(C.get_check_init_config))
assert.Equal(t, "{\"val\": 21}", C.GoString(C.get_check_init_config))
assert.Equal(t, "{\"val\": 21}", C.GoString(C.get_check_instance))
assert.Equal(t, string(c.id), C.GoString(C.get_check_check_id))
assert.Equal(t, "fake_check", C.GoString(C.get_check_check_name))
Expand All @@ -323,18 +323,18 @@ func testConfigureDeprecated(t *testing.T) {
C.get_check_return = 0
C.get_check_deprecated_check = &C.rtloader_pyobject_t{}
C.get_check_deprecated_return = 1
err := c.Configure(integration.Data("{\"val\": 21}"), integration.Data("aaa"), "test")
err := c.Configure(integration.Data("{\"val\": 21}"), integration.Data("{\"val\": 21}"), "test")
assert.Nil(t, err)

assert.Equal(t, c.class, C.get_check_py_class)
assert.Equal(t, "aaa", C.GoString(C.get_check_init_config))
assert.Equal(t, "{\"val\": 21}", C.GoString(C.get_check_init_config))
assert.Equal(t, "{\"val\": 21}", C.GoString(C.get_check_instance))
assert.Equal(t, string(c.id), C.GoString(C.get_check_check_id))
assert.Equal(t, "fake_check", C.GoString(C.get_check_check_name))
assert.Nil(t, C.get_check_check)

assert.Equal(t, c.class, C.get_check_deprecated_py_class)
assert.Equal(t, "aaa", C.GoString(C.get_check_deprecated_init_config))
assert.Equal(t, "{\"val\": 21}", C.GoString(C.get_check_deprecated_init_config))
assert.Equal(t, "{\"val\": 21}", C.GoString(C.get_check_deprecated_instance))
assert.Equal(t, string(c.id), C.GoString(C.get_check_deprecated_check_id))
assert.Equal(t, "fake_check", C.GoString(C.get_check_deprecated_check_name))
Expand Down
6 changes: 6 additions & 0 deletions releasenotes/notes/send-service-as-tag-b3ff43fc63b052bd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
features:
- |
Send a tag for any ``service`` defined in the ``init_config`` or
``instances`` section of integration configuration, with the latter
taking precedence. This applies to metrics, events, and service checks.

0 comments on commit acac344

Please sign in to comment.