Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Delete realtime metrics of running Workflows that are deleted #3993

Merged
merged 2 commits into from
Sep 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,22 +665,16 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers() {
)
wfc.wfInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
wfc.metrics.WorkflowAdded(key, getWfPhase(obj))
}
wf := obj.(*unstructured.Unstructured)
wfc.metrics.WorkflowAdded(string(wf.GetUID()), getWfPhase(obj))
},
UpdateFunc: func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
wfc.metrics.WorkflowUpdated(key, getWfPhase(old), getWfPhase(new))
}
wf := new.(*unstructured.Unstructured)
wfc.metrics.WorkflowUpdated(string(wf.GetUID()), getWfPhase(old), getWfPhase(new))
},
DeleteFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
wfc.metrics.WorkflowDeleted(key, getWfPhase(obj))
}
wf := obj.(*unstructured.Unstructured)
wfc.metrics.WorkflowDeleted(string(wf.GetUID()), getWfPhase(obj))
},
})
}
Expand Down
4 changes: 2 additions & 2 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2805,7 +2805,7 @@ func (woc *wfOperationCtx) computeMetrics(metricList []*wfv1.Prometheus, localSc
woc.reportMetricEmissionError(fmt.Sprintf("could not construct metric '%s': %s", metricTmpl.Name, err))
continue
}
err = woc.controller.metrics.UpsertCustomMetric(metricTmpl.GetDesc(), updatedMetric)
err = woc.controller.metrics.UpsertCustomMetric(metricTmpl.GetDesc(), string(woc.wf.UID), updatedMetric, true)
if err != nil {
woc.reportMetricEmissionError(fmt.Sprintf("could not construct metric '%s': %s", metricTmpl.Name, err))
continue
Expand Down Expand Up @@ -2834,7 +2834,7 @@ func (woc *wfOperationCtx) computeMetrics(metricList []*wfv1.Prometheus, localSc
woc.reportMetricEmissionError(fmt.Sprintf("could not construct metric '%s': %s", metricSpec.Name, err))
continue
}
err = woc.controller.metrics.UpsertCustomMetric(metricSpec.GetDesc(), updatedMetric)
err = woc.controller.metrics.UpsertCustomMetric(metricSpec.GetDesc(), string(woc.wf.UID), updatedMetric, false)
if err != nil {
woc.reportMetricEmissionError(fmt.Sprintf("could not construct metric '%s': %s", metricSpec.Name, err))
continue
Expand Down
28 changes: 21 additions & 7 deletions workflow/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type Metrics struct {

workflowsProcessed prometheus.Counter
workflowsByPhase map[v1alpha1.NodePhase]prometheus.Gauge
workflows map[string]bool
workflows map[string][]string
operationDurations prometheus.Histogram
errors map[ErrorCause]prometheus.Counter
customMetrics map[string]metric
Expand Down Expand Up @@ -73,7 +73,7 @@ func New(metricsConfig, telemetryConfig ServerConfig) *Metrics {
telemetryConfig: telemetryConfig,
workflowsProcessed: newCounter("workflows_processed_count", "Number of workflow updates processed", nil),
workflowsByPhase: getWorkflowPhaseGauges(),
workflows: make(map[string]bool),
workflows: make(map[string][]string),
operationDurations: newHistogram("operation_duration_seconds", "Histogram of durations of operations", nil, []float64{0.1, 0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 1.75, 2.0, 2.5, 3.0}),
errors: getErrorCounters(),
customMetrics: make(map[string]metric),
Expand Down Expand Up @@ -126,10 +126,10 @@ func (m *Metrics) WorkflowAdded(key string, phase v1alpha1.NodePhase) {
m.mutex.Lock()
defer m.mutex.Unlock()

if m.workflows[key] {
if _, exists := m.workflows[key]; exists {
return
}
m.workflows[key] = true
m.workflows[key] = []string{}
if _, ok := m.workflowsByPhase[phase]; ok {
m.workflowsByPhase[phase].Inc()
}
Expand All @@ -139,7 +139,7 @@ func (m *Metrics) WorkflowUpdated(key string, fromPhase, toPhase v1alpha1.NodePh
m.mutex.Lock()
defer m.mutex.Unlock()

if fromPhase == toPhase || !m.workflows[key] {
if _, exists := m.workflows[key]; !exists || fromPhase == toPhase {
return
}
if _, ok := m.workflowsByPhase[fromPhase]; ok {
Expand All @@ -154,15 +154,23 @@ func (m *Metrics) WorkflowDeleted(key string, phase v1alpha1.NodePhase) {
m.mutex.Lock()
defer m.mutex.Unlock()

if !m.workflows[key] {
if _, exists := m.workflows[key]; !exists {
return
}
m.StopRealtimeMetricsForKey(key)
delete(m.workflows, key)
if _, ok := m.workflowsByPhase[phase]; ok {
m.workflowsByPhase[phase].Dec()
}
}

func (m *Metrics) StopRealtimeMetricsForKey(key string) {
realtimeMetrics := m.workflows[key]
for _, metric := range realtimeMetrics {
delete(m.customMetrics, metric)
}
}

func (m *Metrics) OperationCompleted(durationSeconds float64) {
m.mutex.Lock()
defer m.mutex.Unlock()
Expand All @@ -178,7 +186,7 @@ func (m *Metrics) GetCustomMetric(key string) prometheus.Metric {
return m.customMetrics[key].metric
}

func (m *Metrics) UpsertCustomMetric(key string, newMetric prometheus.Metric) error {
func (m *Metrics) UpsertCustomMetric(key string, ownerKey string, newMetric prometheus.Metric, realtime bool) error {
m.mutex.Lock()
defer m.mutex.Unlock()

Expand All @@ -193,6 +201,12 @@ func (m *Metrics) UpsertCustomMetric(key string, newMetric prometheus.Metric) er
m.metricNameHelps[name] = help
}
m.customMetrics[key] = metric{metric: newMetric, lastUpdated: time.Now()}

// If this is a realtime metric, track it
if realtime {
m.workflows[ownerKey] = append(m.workflows[ownerKey], key)
}

return nil
}

Expand Down
47 changes: 43 additions & 4 deletions workflow/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ func TestMetrics(t *testing.T) {

assert.Nil(t, m.GetCustomMetric("does-not-exist"))

err = m.UpsertCustomMetric("metric", newCounter("test", "test", nil))
err = m.UpsertCustomMetric("metric", "", newCounter("test", "test", nil), false)
if assert.NoError(t, err) {
assert.NotNil(t, m.GetCustomMetric("metric"))
}

err = m.UpsertCustomMetric("metric2", newCounter("test", "new test", nil))
err = m.UpsertCustomMetric("metric2", "", newCounter("test", "new test", nil), false)
assert.Error(t, err)

badMetric, err := constructOrUpdateGaugeMetric(nil, &v1alpha1.Prometheus{
Expand All @@ -121,7 +121,7 @@ func TestMetrics(t *testing.T) {
},
})
if assert.NoError(t, err) {
err = m.UpsertCustomMetric("asdf", badMetric)
err = m.UpsertCustomMetric("asdf", "", badMetric, false)
assert.Error(t, err)
}
}
Expand All @@ -141,7 +141,7 @@ func TestMetricGC(t *testing.T) {
m := New(config, config)
assert.Len(t, m.customMetrics, 0)

err := m.UpsertCustomMetric("metric", newCounter("test", "test", nil))
err := m.UpsertCustomMetric("metric", "", newCounter("test", "test", nil), false)
if assert.NoError(t, err) {
assert.Len(t, m.customMetrics, 1)
}
Expand Down Expand Up @@ -178,6 +178,45 @@ func TestWorkflowQueueMetrics(t *testing.T) {
if assert.NoError(t, err) {
assert.Equal(t, 1.0, *metric.Counter.Value)
}
}
}

func TestRealTimeMetricDeletion(t *testing.T) {
config := ServerConfig{
Enabled: true,
Path: DefaultMetricsServerPath,
Port: DefaultMetricsServerPort,
TTL: 1 * time.Second,
}
m := New(config, config)

m.WorkflowAdded("123", v1alpha1.NodeRunning)
rtMetric, err := ConstructRealTimeGaugeMetric(&v1alpha1.Prometheus{Name: "name", Help: "hello"}, func() float64 { return 0.0 })
assert.NoError(t, err)
assert.Empty(t, m.workflows["123"])
assert.Len(t, m.customMetrics, 0)

err = m.UpsertCustomMetric("metrickey", "123", rtMetric, true)
assert.NoError(t, err)
assert.NotEmpty(t, m.workflows["123"])
assert.Len(t, m.customMetrics, 1)

m.WorkflowDeleted("123", v1alpha1.NodeRunning)
assert.Empty(t, m.workflows["123"])
assert.Len(t, m.customMetrics, 0)

m.WorkflowAdded("456", v1alpha1.NodeRunning)
metric, err := ConstructOrUpdateMetric(nil, &v1alpha1.Prometheus{Name: "name", Help: "hello", Gauge: &v1alpha1.Gauge{Value: "1"}})
assert.NoError(t, err)
assert.Empty(t, m.workflows["456"])
assert.Len(t, m.customMetrics, 0)

err = m.UpsertCustomMetric("metrickey", "456", metric, false)
assert.NoError(t, err)
assert.Empty(t, m.workflows["456"])
assert.Len(t, m.customMetrics, 1)

m.WorkflowDeleted("456", v1alpha1.NodeRunning)
assert.Empty(t, m.workflows["456"])
assert.Len(t, m.customMetrics, 1)
}