Skip to content

Commit

Permalink
feat: Add metric retention policy (argoproj#2836)
Browse files Browse the repository at this point in the history
  • Loading branch information
simster7 committed Apr 28, 2020
1 parent f03cda6 commit c0143d3
Show file tree
Hide file tree
Showing 15 changed files with 123 additions and 31 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ type HDFSArtifactRepository struct {
// PrometheusConfig holds the controller's metrics settings (the keys shown under
// `metricsConfig` in the manifests: disableLegacy, enabled, path, port, plus metricsTTL).
type PrometheusConfig struct {
// Enabled turns metrics on. The controller's metrics garbage collector does not
// start when this is false.
Enabled bool `json:"enabled,omitempty"`
// DisableLegacy: NOTE(review): presumably switches off the legacy metric set when
// the metrics registry is built — confirm against NewMetricsRegistry.
DisableLegacy bool `json:"disableLegacy"`
// MetricsTTL is the retention period for custom metrics. The controller uses it both
// as the garbage-collection interval and as the maximum age since a metric's last
// update; a zero value disables metrics garbage collection.
MetricsTTL TTL `json:"metricsTTL"`
// Path: NOTE(review): presumably the HTTP path metrics are served on (the manifests
// use /metrics) — confirm against the metrics server.
Path string `json:"path,omitempty"`
// Port: NOTE(review): presumably the port metrics are served on (the manifests
// use 9090) — confirm against the metrics server.
Port string `json:"port,omitempty"`
}
19 changes: 17 additions & 2 deletions config/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,23 @@ func (l *TTL) UnmarshalJSON(b []byte) error {
return nil
}
if strings.HasSuffix(value, "d") {
hours, err := strconv.Atoi(strings.TrimSuffix(value, "d"))
*l = TTL(time.Duration(hours) * 24 * time.Hour)
days, err := strconv.Atoi(strings.TrimSuffix(value, "d"))
*l = TTL(time.Duration(days) * 24 * time.Hour)
return err
}
if strings.HasSuffix(value, "h") {
hours, err := strconv.Atoi(strings.TrimSuffix(value, "h"))
*l = TTL(time.Duration(hours) * time.Hour)
return err
}
if strings.HasSuffix(value, "m") {
minutes, err := strconv.Atoi(strings.TrimSuffix(value, "m"))
*l = TTL(time.Duration(minutes) * time.Minute)
return err
}
if strings.HasSuffix(value, "s") {
seconds, err := strconv.Atoi(strings.TrimSuffix(value, "s"))
*l = TTL(time.Duration(seconds) * time.Second)
return err
}
d, err := time.ParseDuration(value)
Expand Down
1 change: 1 addition & 0 deletions manifests/quick-start-mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ data:
scope: pod
url: http://logging-facility?namespace=${metadata.namespace}&podName=${metadata.name}
metricsConfig: |
disableLegacy: true
enabled: true
path: /metrics
port: 9090
Expand Down
1 change: 1 addition & 0 deletions manifests/quick-start-no-db.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ data:
scope: pod
url: http://logging-facility?namespace=${metadata.namespace}&podName=${metadata.name}
metricsConfig: |
disableLegacy: true
enabled: true
path: /metrics
port: 9090
Expand Down
1 change: 1 addition & 0 deletions manifests/quick-start-postgres.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ data:
scope: pod
url: http://logging-facility?namespace=${metadata.namespace}&podName=${metadata.name}
metricsConfig: |
disableLegacy: true
enabled: true
path: /metrics
port: 9090
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ data:
featureFlags: |
resourcesDuration: true
metricsConfig: |
disableLegacy: true
enabled: true
path: /metrics
port: 9090
Expand All @@ -27,4 +28,4 @@ data:
url: http://logging-facility?namespace=${metadata.namespace}&podName=${metadata.name}
kind: ConfigMap
metadata:
name: workflow-controller-configmap
name: workflow-controller-configmap
20 changes: 10 additions & 10 deletions persist/sqldb/mocks/WorkflowArchive.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion persist/sqldb/null_workflow_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ func (r *nullWorkflowArchive) DeleteWorkflow(string) error {
return fmt.Errorf("deleting archived workflows not supported")
}

func (r *nullWorkflowArchive) DeleteWorkflows(time.Duration) error {
// DeleteExpiredWorkflows is a no-op on the null archive: the TTL argument is ignored
// and nil is always returned, so the archived-workflow garbage collector can call it
// without special-casing a missing archive.
func (r *nullWorkflowArchive) DeleteExpiredWorkflows(time.Duration) error {
return nil
}
6 changes: 3 additions & 3 deletions persist/sqldb/workflow_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type WorkflowArchive interface {
ListWorkflows(namespace string, minStartAt, maxStartAt time.Time, labelRequirements labels.Requirements, limit, offset int) (wfv1.Workflows, error)
GetWorkflow(uid string) (*wfv1.Workflow, error)
DeleteWorkflow(uid string) error
DeleteWorkflows(ttl time.Duration) error
DeleteExpiredWorkflows(ttl time.Duration) error
}

type workflowArchive struct {
Expand Down Expand Up @@ -220,7 +220,7 @@ func (r *workflowArchive) DeleteWorkflow(uid string) error {
return nil
}

func (r *workflowArchive) DeleteWorkflows(ttl time.Duration) error {
func (r *workflowArchive) DeleteExpiredWorkflows(ttl time.Duration) error {
rs, err := r.session.
DeleteFrom(archiveTableName).
Where(r.clusterManagedNamespaceAndInstanceID()).
Expand All @@ -235,4 +235,4 @@ func (r *workflowArchive) DeleteWorkflows(ttl time.Duration) error {
}
log.WithFields(log.Fields{"rowsAffected": rowsAffected}).Info("Deleted archived workflows")
return nil
}
}
12 changes: 12 additions & 0 deletions workflow/common/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package common

import (
"time"

"github.com/prometheus/client_golang/prometheus"
)

// Metric pairs a Prometheus metric with the time it was last written, so that
// entries which have not been updated for longer than a TTL can be expired.
type Metric struct {
// Metric is the wrapped Prometheus metric.
Metric prometheus.Metric
// LastUpdated is the time the metric was last stored; the controller sets it to
// time.Now() on every write and compares it against the TTL when expiring metrics.
LastUpdated time.Time
}
49 changes: 44 additions & 5 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type WorkflowController struct {
session sqlbuilder.Database
offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo
wfArchive sqldb.WorkflowArchive
Metrics map[string]prometheus.Metric
Metrics map[string]common.Metric
}

const (
Expand Down Expand Up @@ -120,7 +120,7 @@ func NewWorkflowController(
configController: config.NewController(namespace, configMap, kubeclientset),
completedPods: make(chan string, 512),
gcPods: make(chan string, 512),
Metrics: make(map[string]prometheus.Metric),
Metrics: make(map[string]common.Metric),
}
wfc.throttler = NewThrottler(0, wfc.wfQueue)
return &wfc
Expand Down Expand Up @@ -187,6 +187,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in
go wfc.podGarbageCollector(ctx.Done())
go wfc.workflowGarbageCollector(ctx.Done())
go wfc.archivedWorkflowGarbageCollector(ctx.Done())
go wfc.metricsGarbageCollector(ctx.Done())

wfc.createClusterWorkflowTemplateInformer(ctx)

Expand Down Expand Up @@ -382,14 +383,40 @@ func (wfc *WorkflowController) archivedWorkflowGarbageCollector(stopCh <-chan st
return
case <-ticker.C:
log.Info("Performing archived workflow GC")
err := wfc.wfArchive.DeleteWorkflows(time.Duration(ttl))
err := wfc.wfArchive.DeleteExpiredWorkflows(time.Duration(ttl))
if err != nil {
log.WithField("err", err).Error("Failed to delete archived workflows")
}
}
}
}

// metricsGarbageCollector periodically removes custom metrics that have not been
// updated within the configured metrics TTL. It blocks until stopCh is signalled
// (or closed), so it is meant to be run in its own goroutine.
//
// The collector does not start when metrics are disabled or when the TTL is not
// positive. The TTL is used both as the sweep interval and as the expiry
// threshold passed to DeleteExpiredMetrics.
//
// NOTE(review): the sweep deletes from wfc.Metrics on this goroutine and no
// locking is visible here — confirm that concurrent writers are synchronized.
func (wfc *WorkflowController) metricsGarbageCollector(stopCh <-chan struct{}) {
	if !wfc.Config.MetricsConfig.Enabled {
		log.Info("Cannot start metrics GC: metrics are disabled")
		return
	}
	ttl := wfc.Config.MetricsConfig.MetricsTTL
	duration := time.Duration(ttl)
	if duration == 0 {
		log.Info("Metrics TTL is zero, metrics GC is disabled")
		return
	}
	if duration < 0 {
		// time.NewTicker panics on a non-positive interval; a negative TTL must
		// not be allowed to take the whole controller down.
		log.WithFields(log.Fields{"ttl": ttl}).Warn("Metrics TTL is negative, metrics GC is disabled")
		return
	}
	log.WithFields(log.Fields{"ttl": ttl}).Info("Performing metrics GC")
	ticker := time.NewTicker(duration)
	defer ticker.Stop()
	for {
		select {
		case <-stopCh:
			log.Info("Stopping metrics GC")
			return
		case <-ticker.C:
			log.Info("Performing metrics GC")
			wfc.DeleteExpiredMetrics(duration)
		}
	}
}

func (wfc *WorkflowController) runWorker() {
for wfc.processNextItem() {
}
Expand Down Expand Up @@ -736,6 +763,18 @@ func (wfc *WorkflowController) GetContainerRuntimeExecutor() string {
return wfc.Config.ContainerRuntimeExecutor
}

func (wfc *WorkflowController) GetMetrics() map[string]prometheus.Metric {
return wfc.Metrics
// GetMetrics returns the Prometheus metrics currently held by the controller,
// without their last-updated bookkeeping. The order of the returned slice is
// unspecified because it follows map iteration order.
func (wfc *WorkflowController) GetMetrics() []prometheus.Metric {
	// The final length is known up front, so allocate once instead of growing
	// the slice on every append.
	out := make([]prometheus.Metric, 0, len(wfc.Metrics))
	for _, metric := range wfc.Metrics {
		out = append(out, metric.Metric)
	}
	return out
}

// DeleteExpiredMetrics drops every entry of wfc.Metrics whose LastUpdated
// timestamp lies more than ttl in the past. Entries exactly ttl old are kept.
func (wfc *WorkflowController) DeleteExpiredMetrics(ttl time.Duration) {
	for name, entry := range wfc.Metrics {
		age := time.Since(entry.LastUpdated)
		if age <= ttl {
			continue
		}
		// Removing the current key while ranging over a map is permitted in Go.
		delete(wfc.Metrics, name)
	}
}
23 changes: 21 additions & 2 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/argoproj/argo/workflow/common"

"github.com/stretchr/testify/assert"
authorizationv1 "k8s.io/api/authorization/v1"
apiv1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -125,7 +126,7 @@ func newController(objects ...runtime.Object) (context.CancelFunc, *WorkflowCont
wfQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
podQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
wfArchive: sqldb.NullWorkflowArchive,
Metrics: make(map[string]prometheus.Metric),
Metrics: make(map[string]common.Metric),
}
return cancel, controller
}
Expand Down Expand Up @@ -326,3 +327,21 @@ func TestWorkflowController_archivedWorkflowGarbageCollector(t *testing.T) {

controller.archivedWorkflowGarbageCollector(make(chan struct{}))
}

// TestWorkflowControllerMetricsGarbageCollector runs the metrics garbage collector
// with a one-second TTL for two seconds and checks that a metric last updated a
// minute ago is evicted while one stamped three seconds in the future survives.
func TestWorkflowControllerMetricsGarbageCollector(t *testing.T) {
	cancel, controller := newController()
	defer cancel()

	controller.Metrics["metric-1"] = common.Metric{Metric: nil, LastUpdated: time.Now().Add(-1 * time.Minute)}
	controller.Metrics["metric-2"] = common.Metric{Metric: nil, LastUpdated: time.Now().Add(3 * time.Second)}

	controller.Config.MetricsConfig.Enabled = true
	controller.Config.MetricsConfig.MetricsTTL = config.TTL(1 * time.Second)

	stop := make(chan struct{})
	go func() {
		time.Sleep(2 * time.Second)
		// Close instead of sending: a send on this unbuffered channel would block
		// forever (leaking the goroutine) if the collector had already returned.
		close(stop)
	}()
	controller.metricsGarbageCollector(stop)

	assert.Contains(t, controller.Metrics, "metric-2")
	assert.NotContains(t, controller.Metrics, "metric-1")
}
6 changes: 3 additions & 3 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2494,7 +2494,7 @@ func (woc *wfOperationCtx) computeMetrics(metricList []*wfv1.Prometheus, localSc
continue
}
updatedMetric := metrics.ConstructRealTimeGaugeMetric(metricTmpl, valueFunc)
woc.controller.Metrics[metricTmpl.GetDesc()] = updatedMetric
woc.controller.Metrics[metricTmpl.GetDesc()] = common.Metric{Metric: updatedMetric, LastUpdated: time.Now()}
continue
} else {
metricSpec := metricTmpl.DeepCopy()
Expand All @@ -2508,14 +2508,14 @@ func (woc *wfOperationCtx) computeMetrics(metricList []*wfv1.Prometheus, localSc
}
metricSpec.SetValueString(replacedValue)

metric := woc.controller.Metrics[metricSpec.GetDesc()]
metric := woc.controller.Metrics[metricSpec.GetDesc()].Metric
// It is valid to pass a nil metric to ConstructOrUpdateMetric, in that case the metric will be created for us
updatedMetric, err := metrics.ConstructOrUpdateMetric(metric, metricSpec)
if err != nil {
woc.reportMetricEmissionError(fmt.Sprintf("could not compute metric '%s': %s", metricSpec.Name, err))
continue
}
woc.controller.Metrics[metricSpec.GetDesc()] = updatedMetric
woc.controller.Metrics[metricSpec.GetDesc()] = common.Metric{Metric: updatedMetric, LastUpdated: time.Now()}
continue
}
}
Expand Down
6 changes: 3 additions & 3 deletions workflow/controller/operator_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestBasicMetric(t *testing.T) {

metricDesc := wf.Spec.Templates[0].Metrics.Prometheus[0].GetDesc()
assert.Contains(t, controller.Metrics, metricDesc)
metric := controller.Metrics[metricDesc].(prometheus.Gauge)
metric := controller.Metrics[metricDesc].Metric.(prometheus.Gauge)
metrtcString, err := getMetricStringValue(metric)
assert.NoError(t, err)
assert.Contains(t, metrtcString, `label:<name:"name" value:"random-int" > gauge:<value:`)
Expand Down Expand Up @@ -120,12 +120,12 @@ func TestCounterMetric(t *testing.T) {
metricErrorDesc := wf.Spec.Templates[0].Metrics.Prometheus[1].GetDesc()
assert.Contains(t, controller.Metrics, metricErrorDesc)

metricTotalCounter := controller.Metrics[metricTotalDesc].(prometheus.Counter)
metricTotalCounter := controller.Metrics[metricTotalDesc].Metric.(prometheus.Counter)
metricTotalCounterString, err := getMetricStringValue(metricTotalCounter)
assert.NoError(t, err)
assert.Contains(t, metricTotalCounterString, `label:<name:"name" value:"flakey" > counter:<value:1 >`)

metricErrorCounter := controller.Metrics[metricErrorDesc].(prometheus.Counter)
metricErrorCounter := controller.Metrics[metricErrorDesc].Metric.(prometheus.Counter)
metricErrorCounterString, err := getMetricStringValue(metricErrorCounter)
assert.NoError(t, err)
assert.Contains(t, metricErrorCounterString, `label:<name:"name" value:"flakey" > counter:<value:1 >`)
Expand Down
4 changes: 3 additions & 1 deletion workflow/metrics/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metrics

import (
"os"
"time"

"github.com/prometheus/client_golang/prometheus"
"k8s.io/client-go/tools/cache"
Expand All @@ -15,7 +16,8 @@ const (
)

type MetricsProvider interface {
GetMetrics() map[string]prometheus.Metric
GetMetrics() []prometheus.Metric
DeleteExpiredMetrics(ttl time.Duration)
}

func NewMetricsRegistry(metricsProvider MetricsProvider, informer cache.SharedIndexInformer, disableLegacyMetrics bool) *prometheus.Registry {
Expand Down

0 comments on commit c0143d3

Please sign in to comment.