Skip to content

Commit

Permalink
fix: Don't double-count metric events (#3350)
Browse files Browse the repository at this point in the history
  • Loading branch information
simster7 committed Jul 1, 2020
1 parent 7bd3e72 commit 2ab9495
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 29 deletions.
19 changes: 14 additions & 5 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type WorkflowController struct {
offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo
hydrator hydrator.Interface
wfArchive sqldb.WorkflowArchive
metrics metrics.Metrics
metrics *metrics.Metrics
eventRecorderManager EventRecorderManager
archiveLabelSelector labels.Selector
}
Expand Down Expand Up @@ -127,7 +127,7 @@ func (wfc *WorkflowController) runTTLController(ctx context.Context) {
}

// runCronController starts the CronWorkflow controller and blocks until ctx is
// cancelled. wfc.metrics is already a *metrics.Metrics, so it is passed
// directly — the cron and workflow controllers share one set of collectors.
func (wfc *WorkflowController) runCronController(ctx context.Context) {
	cronController := cron.NewCronController(wfc.wfclientset, wfc.restConfig, wfc.namespace, wfc.GetManagedNamespace(), wfc.Config.InstanceID, wfc.metrics)
	cronController.Run(ctx)
}

Expand Down Expand Up @@ -587,13 +587,22 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers() {
)
// Register handlers that keep the per-phase workflow gauges in sync with
// informer events. Each handler derives the workflow's "namespace/name" key so
// the metrics package can deduplicate repeated events for the same workflow
// (#3350: don't double-count metric events). Objects whose key cannot be
// computed are deliberately not counted.
wfc.wfInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		key, err := cache.MetaNamespaceKeyFunc(obj)
		if err == nil {
			wfc.metrics.WorkflowAdded(key, getWfPhase(obj))
		}
	},
	UpdateFunc: func(old, new interface{}) {
		key, err := cache.MetaNamespaceKeyFunc(new)
		if err == nil {
			wfc.metrics.WorkflowUpdated(key, getWfPhase(old), getWfPhase(new))
		}
	},
	DeleteFunc: func(obj interface{}) {
		key, err := cache.MetaNamespaceKeyFunc(obj)
		if err == nil {
			wfc.metrics.WorkflowDeleted(key, getWfPhase(obj))
		}
	},
})
}
Expand Down
6 changes: 3 additions & 3 deletions workflow/cron/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestCronWorkflowConditionSubmissionError(t *testing.T) {
wfLister: &fakeLister{},
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
metrics: &testMetrics,
metrics: testMetrics,
}
woc.Run()

Expand Down Expand Up @@ -232,7 +232,7 @@ func TestSpecError(t *testing.T) {
wfLister: &fakeLister{},
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
metrics: &testMetrics,
metrics: testMetrics,
}

err = woc.validateCronWorkflow()
Expand Down Expand Up @@ -261,7 +261,7 @@ func TestReapplyUpdate(t *testing.T) {
origCronWf: cronWf.DeepCopy(),
name: cronWf.Name,
log: logrus.WithFields(logrus.Fields{}),
metrics: &testMetrics,
metrics: testMetrics,
}

cronWf.Spec.Schedule = "1 * * * *"
Expand Down
41 changes: 27 additions & 14 deletions workflow/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Metrics struct {

workflowsProcessed prometheus.Counter
workflowsByPhase map[v1alpha1.NodePhase]prometheus.Gauge
workflows map[string]bool
operationDurations prometheus.Histogram
errors map[ErrorCause]prometheus.Counter
customMetrics map[string]metric
Expand All @@ -47,14 +48,15 @@ type Metrics struct {
defaultMetricDescs map[string]bool
}

// Compile-time assertion that *Metrics satisfies prometheus.Collector
// (methods now use pointer receivers so the workflows map is shared).
var _ prometheus.Collector = &Metrics{}

func New(metricsConfig, telemetryConfig ServerConfig) Metrics {
metrics := Metrics{
func New(metricsConfig, telemetryConfig ServerConfig) *Metrics {
metrics := &Metrics{
metricsConfig: metricsConfig,
telemetryConfig: telemetryConfig,
workflowsProcessed: newCounter("workflows_processed_count", "Number of workflow updates processed", nil),
workflowsByPhase: getWorkflowPhaseGauges(),
workflows: make(map[string]bool),
operationDurations: newHistogram("operation_duration_seconds", "Histogram of durations of operations", nil, []float64{0.1, 0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 1.75, 2.0, 2.5, 3.0}),
errors: getErrorCounters(),
customMetrics: make(map[string]metric),
Expand All @@ -68,7 +70,7 @@ func New(metricsConfig, telemetryConfig ServerConfig) Metrics {
return metrics
}

func (m Metrics) allMetrics() []prometheus.Metric {
func (m *Metrics) allMetrics() []prometheus.Metric {
allMetrics := []prometheus.Metric{
m.workflowsProcessed,
m.operationDurations,
Expand All @@ -86,33 +88,44 @@ func (m Metrics) allMetrics() []prometheus.Metric {
return allMetrics
}

func (m Metrics) WorkflowAdded(phase v1alpha1.NodePhase) {
func (m *Metrics) WorkflowAdded(key string, phase v1alpha1.NodePhase) {
if m.workflows[key] {
return
}
m.workflows[key] = true
if _, ok := m.workflowsByPhase[phase]; ok {
m.workflowsByPhase[phase].Inc()
}
}

func (m Metrics) WorkflowUpdated(fromPhase, toPhase v1alpha1.NodePhase) {
m.WorkflowDeleted(fromPhase)
m.WorkflowAdded(toPhase)
func (m *Metrics) WorkflowUpdated(key string, fromPhase, toPhase v1alpha1.NodePhase) {
if fromPhase == toPhase || !m.workflows[key] {
return
}
m.WorkflowDeleted(key, fromPhase)
m.WorkflowAdded(key, toPhase)
}

func (m Metrics) WorkflowDeleted(phase v1alpha1.NodePhase) {
func (m *Metrics) WorkflowDeleted(key string, phase v1alpha1.NodePhase) {
if !m.workflows[key] {
return
}
delete(m.workflows, key)
if _, ok := m.workflowsByPhase[phase]; ok {
m.workflowsByPhase[phase].Dec()
}
}

func (m Metrics) OperationCompleted(durationSeconds float64) {
func (m *Metrics) OperationCompleted(durationSeconds float64) {
m.operationDurations.Observe(durationSeconds)
}

func (m Metrics) GetCustomMetric(key string) prometheus.Metric {
func (m *Metrics) GetCustomMetric(key string) prometheus.Metric {
// It's okay to return nil metrics in this function
return m.customMetrics[key].metric
}

func (m Metrics) UpsertCustomMetric(key string, newMetric prometheus.Metric) error {
func (m *Metrics) UpsertCustomMetric(key string, newMetric prometheus.Metric) error {
if _, inUse := m.defaultMetricDescs[newMetric.Desc().String()]; inUse {
return fmt.Errorf("metric '%s' is already in use by the system, please use a different name", newMetric.Desc())
}
Expand All @@ -127,10 +140,10 @@ const (
ErrorCauseCronWorkflowSubmissionError ErrorCause = "CronWorkflowSubmissionError"
)

func (m Metrics) OperationPanic() {
func (m *Metrics) OperationPanic() {
m.errors[ErrorCauseOperationPanic].Inc()
}

func (m Metrics) CronWorkflowSubmissionError() {
func (m *Metrics) CronWorkflowSubmissionError() {
m.errors[ErrorCauseCronWorkflowSubmissionError].Inc()
}
31 changes: 28 additions & 3 deletions workflow/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,21 @@ func TestMetrics(t *testing.T) {
}
m := New(config, config)

m.WorkflowAdded(v1alpha1.NodeRunning)
m.WorkflowAdded("wf", v1alpha1.NodeRunning)
var metric dto.Metric
err := m.workflowsByPhase[v1alpha1.NodeRunning].Write(&metric)
if assert.NoError(t, err) {
assert.Equal(t, float64(1), *metric.Gauge.Value)
}

m.WorkflowUpdated(v1alpha1.NodeRunning, v1alpha1.NodeSucceeded)
// Test that we don't double add
m.WorkflowAdded("wf", v1alpha1.NodeRunning)
err = m.workflowsByPhase[v1alpha1.NodeRunning].Write(&metric)
if assert.NoError(t, err) {
assert.Equal(t, float64(1), *metric.Gauge.Value)
}

m.WorkflowUpdated("wf", v1alpha1.NodeRunning, v1alpha1.NodeSucceeded)
err = m.workflowsByPhase[v1alpha1.NodeRunning].Write(&metric)
if assert.NoError(t, err) {
assert.Equal(t, float64(0), *metric.Gauge.Value)
Expand All @@ -64,12 +71,30 @@ func TestMetrics(t *testing.T) {
assert.Equal(t, float64(1), *metric.Gauge.Value)
}

m.WorkflowDeleted(v1alpha1.NodeSucceeded)
m.WorkflowDeleted("wf", v1alpha1.NodeSucceeded)
err = m.workflowsByPhase[v1alpha1.NodeRunning].Write(&metric)
if assert.NoError(t, err) {
assert.Equal(t, float64(0), *metric.Gauge.Value)
}

// Test that we don't double delete
m.WorkflowDeleted("wf", v1alpha1.NodeSucceeded)
err = m.workflowsByPhase[v1alpha1.NodeRunning].Write(&metric)
if assert.NoError(t, err) {
assert.Equal(t, float64(0), *metric.Gauge.Value)
}

// Test that we don't update workflows that we're not tracking
m.WorkflowUpdated("does-not-exist", v1alpha1.NodeRunning, v1alpha1.NodeSucceeded)
err = m.workflowsByPhase[v1alpha1.NodeRunning].Write(&metric)
if assert.NoError(t, err) {
assert.Equal(t, float64(0), *metric.Gauge.Value)
}
err = m.workflowsByPhase[v1alpha1.NodeSucceeded].Write(&metric)
if assert.NoError(t, err) {
assert.Equal(t, float64(0), *metric.Gauge.Value)
}

m.OperationCompleted(0.05)
err = m.operationDurations.Write(&metric)
if assert.NoError(t, err) {
Expand Down
8 changes: 4 additions & 4 deletions workflow/metrics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

// RunServer starts a metrics server
func (m Metrics) RunServer(ctx context.Context) {
func (m *Metrics) RunServer(ctx context.Context) {
if !m.metricsConfig.Enabled {
// If metrics aren't enabled, return
return
Expand Down Expand Up @@ -66,19 +66,19 @@ func runServer(config ServerConfig, registry *prometheus.Registry, ctx context.C
}
}

func (m Metrics) Describe(ch chan<- *prometheus.Desc) {
func (m *Metrics) Describe(ch chan<- *prometheus.Desc) {
for _, metric := range m.allMetrics() {
ch <- metric.Desc()
}
}

func (m Metrics) Collect(ch chan<- prometheus.Metric) {
func (m *Metrics) Collect(ch chan<- prometheus.Metric) {
for _, metric := range m.allMetrics() {
ch <- metric
}
}

func (m Metrics) garbageCollector(ctx context.Context) {
func (m *Metrics) garbageCollector(ctx context.Context) {
if m.metricsConfig.TTL == 0 {
return
}
Expand Down

0 comments on commit 2ab9495

Please sign in to comment.