Skip to content

Commit

Permalink
feat(controller): Workflow retention PoC. Closes argoproj#5369
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec committed Mar 12, 2021
1 parent 2d331f3 commit 3006b73
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Procfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
controller: PNS_PRIVILEGED=true DEFAULT_REQUEUE_TIME=${DEFAULT_REQUEUE_TIME} LEADER_ELECTION_IDENTITY=local ALWAYS_OFFLOAD_NODE_STATUS=${ALWAYS_OFFLOAD_NODE_STATUS} OFFLOAD_NODE_STATUS_TTL=30s WORKFLOW_GC_PERIOD=30s UPPERIO_DB_DEBUG=${UPPERIO_DB_DEBUG} ARCHIVED_WORKFLOW_GC_PERIOD=30s ./dist/workflow-controller --executor-image ${IMAGE_NAMESPACE}/argoexec:${VERSION} --namespaced=${NAMESPACED} --namespace ${NAMESPACE} --loglevel ${LOG_LEVEL}
controller: RETENTION_GC_PERIOD=10s PNS_PRIVILEGED=true DEFAULT_REQUEUE_TIME=${DEFAULT_REQUEUE_TIME} LEADER_ELECTION_IDENTITY=local ALWAYS_OFFLOAD_NODE_STATUS=${ALWAYS_OFFLOAD_NODE_STATUS} OFFLOAD_NODE_STATUS_TTL=30s WORKFLOW_GC_PERIOD=30s UPPERIO_DB_DEBUG=${UPPERIO_DB_DEBUG} ARCHIVED_WORKFLOW_GC_PERIOD=30s ./dist/workflow-controller --executor-image ${IMAGE_NAMESPACE}/argoexec:${VERSION} --namespaced=${NAMESPACED} --namespace ${NAMESPACE} --loglevel ${LOG_LEVEL}
argo-server: UPPERIO_DB_DEBUG=${UPPERIO_DB_DEBUG} ./dist/argo --loglevel ${LOG_LEVEL} server --namespaced=${NAMESPACED} --namespace ${NAMESPACE} --auth-mode ${AUTH_MODE} --secure=$SECURE --x-frame-options=SAMEORIGIN
ui: yarn --cwd ui install && yarn --cwd ui start
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ type Config struct {
// The command/args for each image, needed when the command is not specified and the emissary executor is used.
// https://argoproj.github.io/argo-workflows/workflow-executors/#emissary-emissary
Images map[string]Image `json:"images,omitempty"`

RetentionPolicy RetentionPolicy `json:"retentionPolicy,omitempty"`
}

func (c Config) GetContainerRuntimeExecutor(labels labels.Labels) (string, error) {
Expand Down
7 changes: 7 additions & 0 deletions config/retention.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package config

// RetentionPolicy configures how many completed workflows the retention
// controller keeps; completed workflows that are not selected for retention
// are deleted.
type RetentionPolicy struct {
// Completed is the target total number of completed workflows to keep.
// Workflows already kept because of Failed and Errored count towards this
// total; the newest remaining workflows are kept until it is reached.
Completed int `json:"completed,omitempty"`
// Failed is the maximum number of the newest workflows in the Failed phase
// that are kept in preference to other completed workflows.
Failed int `json:"failed,omitempty"`
// Errored is the maximum number of the newest workflows in the Error phase
// that are kept in preference to other completed workflows.
Errored int `json:"errored,omitempty"`
}
3 changes: 2 additions & 1 deletion docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ Note that these environment variables may be removed at any time.
| `WF_DEL_PROPAGATION_POLICY` | `string` | The deletion propagation policy for workflows. |
| `WORKFLOW_GC_PERIOD` | `time.Duration` | The periodicity for GC of workflows. |
| `BUBBLE_ENTRY_TEMPLATE_ERR` | `bool` | Whether to bubble up template errors to workflow. Default true |
| `INFORMER_WRITE_BACK` | `bool` | Whether to write back to informer instead of catching up. Deafult true |
| `INFORMER_WRITE_BACK` | `bool` | Whether to write back to informer instead of catching up. Default true |
| `RETENTION_GC_PERIOD` | `time.Duration` | How often to perform retention GC. Default `1m` |

## Executor

Expand Down
4 changes: 4 additions & 0 deletions manifests/quick-start-minimal.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,10 @@ data:
enabled: true
path: /metrics
port: 9090
retentionPolicy: |
completed: 10
failed: 3
errored: 3
kind: ConfigMap
metadata:
name: workflow-controller-configmap
Expand Down
4 changes: 4 additions & 0 deletions manifests/quick-start-mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,10 @@ data:
passwordSecret:
name: argo-mysql-config
key: password
retentionPolicy: |
completed: 10
failed: 3
errored: 3
kind: ConfigMap
metadata:
name: workflow-controller-configmap
Expand Down
4 changes: 4 additions & 0 deletions manifests/quick-start-postgres.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,10 @@ data:
passwordSecret:
name: argo-postgres-config
key: password
retentionPolicy: |
completed: 10
failed: 3
errored: 3
kind: ConfigMap
metadata:
name: workflow-controller-configmap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ data:
secretKeySecret:
name: my-minio-cred
key: secretkey
retentionPolicy: |
completed: 10
failed: 3
errored: 3
metricsConfig: |
enabled: true
path: /metrics
Expand Down
7 changes: 6 additions & 1 deletion workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/argoproj/argo-workflows/v3/workflow/controller/indexes"
"github.com/argoproj/argo-workflows/v3/workflow/controller/informer"
"github.com/argoproj/argo-workflows/v3/workflow/controller/pod"
"github.com/argoproj/argo-workflows/v3/workflow/controller/retention"
"github.com/argoproj/argo-workflows/v3/workflow/cron"
"github.com/argoproj/argo-workflows/v3/workflow/events"
"github.com/argoproj/argo-workflows/v3/workflow/hydrator"
Expand All @@ -65,6 +66,8 @@ type WorkflowController struct {
managedNamespace string

configController config.Controller
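// retentionController deletes completed workflows that are not retained by the configured retention policy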
retentionController retention.Interface
// Config is the workflow controller's configuration
Config config.Config
// get the artifact repository
Expand Down Expand Up @@ -139,7 +142,6 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli
wfc.UpdateConfig(ctx)

wfc.metrics = metrics.New(wfc.getMetricsServerConfig())

workqueue.SetProvider(wfc.metrics) // must execute SetProvider before we created the queues
wfc.wfQueue = wfc.metrics.RateLimiterWithBusyWorkers(&fixedItemIntervalRateLimiter{}, "workflow_queue")
wfc.throttler = wfc.newThrottler()
Expand Down Expand Up @@ -172,6 +174,7 @@ var indexers = cache.Indexers{
indexes.CronWorkflowIndex: indexes.MetaNamespaceLabelIndexFunc(common.LabelKeyCronWorkflow),
indexes.WorkflowTemplateIndex: indexes.MetaNamespaceLabelIndexFunc(common.LabelKeyWorkflowTemplate),
indexes.SemaphoreConfigIndexName: indexes.WorkflowSemaphoreKeysIndexFunc(),
indexes.WorkflowCompletedIndex: indexes.MetaLabelIndexFunc(common.LabelKeyCompleted),
indexes.WorkflowPhaseIndex: indexes.MetaWorkflowPhaseIndexFunc(),
indexes.ConditionsIndex: indexes.ConditionsIndexFunc,
}
Expand All @@ -191,6 +194,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
wfc.addWorkflowInformerHandlers(ctx)
wfc.podInformer = wfc.newPodInformer(ctx)
wfc.updateEstimatorFactory()
wfc.retentionController = retention.New(wfc.Config.RetentionPolicy, wfc.wfInformer, wfc.wfclientset)

go wfc.runConfigMapWatcher(ctx.Done())
go wfc.configController.Run(ctx.Done(), wfc.updateConfig)
Expand Down Expand Up @@ -246,6 +250,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
go wfc.runTTLController(ctx, workflowTTLWorkers)
go wfc.runCronController(ctx)
go wfc.metrics.RunServer(ctx)
go wfc.retentionController.Run(ctx)
go wait.Until(wfc.syncWorkflowPhaseMetrics, 15*time.Second, ctx.Done())
go wait.Until(wfc.syncPodPhaseMetrics, 15*time.Second, ctx.Done())

Expand Down
1 change: 1 addition & 0 deletions workflow/controller/indexes/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
CronWorkflowIndex = "cronworkflow"
WorkflowIndex = "workflow"
WorkflowTemplateIndex = "workflowtemplate"
WorkflowCompletedIndex = "workflow.completed"
WorkflowPhaseIndex = "workflow.phase"
PodPhaseIndex = "pod.phase"
ConditionsIndex = "status.conditions"
Expand Down
14 changes: 14 additions & 0 deletions workflow/controller/indexes/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@ func MetaWorkflowPhaseIndexFunc() cache.IndexFunc {
}
}

// MetaLabelIndexFunc returns an index function that keys an object by the
// value of the given label alone (the namespace is not part of the key).
//
// Objects whose metadata cannot be accessed, or that do not carry the label,
// are simply left out of the index; neither case is reported as an error.
func MetaLabelIndexFunc(label string) cache.IndexFunc {
	return func(obj interface{}) ([]string, error) {
		accessor, err := meta.Accessor(obj)
		if err != nil {
			// Best effort: an object without accessible metadata is not indexed.
			return nil, nil
		}
		labelValue, ok := accessor.GetLabels()[label]
		if !ok {
			return nil, nil
		}
		return []string{labelValue}, nil
	}
}

func MetaNamespaceLabelIndexFunc(label string) cache.IndexFunc {
return func(obj interface{}) ([]string, error) {
v, err := meta.Accessor(obj)
Expand Down
110 changes: 110 additions & 0 deletions workflow/controller/retention/retention.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package retention

import (
"context"
"sort"
"time"

log "github.com/sirupsen/logrus"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"

"github.com/argoproj/argo-workflows/v3/config"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
workflow "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
"github.com/argoproj/argo-workflows/v3/util"
envutil "github.com/argoproj/argo-workflows/v3/util/env"
"github.com/argoproj/argo-workflows/v3/workflow/common"
"github.com/argoproj/argo-workflows/v3/workflow/controller/indexes"
)

// retentionGCPeriod is how long the retention controller waits between GC
// passes. It is looked up from the RETENTION_GC_PERIOD environment variable,
// with time.Minute passed as the fallback value.
var retentionGCPeriod = envutil.LookupEnvDurationOr("RETENTION_GC_PERIOD", time.Minute)

// Interface is the retention controller as seen by its caller.
type Interface interface {
// Run runs the retention controller. The implementation in this package
// keeps performing retention GC passes until ctx is done, so callers are
// expected to invoke it in its own goroutine.
Run(ctx context.Context)
}

// retention is the implementation of Interface returned by New.
type retention struct {
// policy holds the limits on how many completed workflows are kept.
policy config.RetentionPolicy
// wfInformer supplies the completed workflows via its
// indexes.WorkflowCompletedIndex index.
wfInformer cache.SharedIndexInformer
// workflowInterface is the client used to delete workflows that are not retained.
workflowInterface workflow.Interface
}

// Run performs a retention GC pass immediately and then again after every
// retentionGCPeriod, until ctx is done. It blocks, so callers run it in its
// own goroutine.
//
// If the policy has no positive limit at all (which is what an unconfigured
// policy looks like), retention GC is disabled and Run returns straight away;
// otherwise nothing would be retained and every completed workflow would be
// deleted.
func (r retention) Run(ctx context.Context) {
	if r.policy.Completed <= 0 && r.policy.Failed <= 0 && r.policy.Errored <= 0 {
		log.Info("Retention policy is not configured, retention GC is disabled")
		return
	}
	for {
		if ctx.Err() != nil {
			return
		}
		r.gc(ctx)
		// Wait with a timer rather than time.Sleep so that cancellation is
		// noticed immediately instead of after a full period.
		timer := time.NewTimer(retentionGCPeriod)
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}

// gc runs a single retention pass: it lists the workflows indexed as
// completed, works out which ones the policy retains and deletes the rest.
// Problems are logged and left for the next pass instead of panicking, so that
// a retention failure cannot take down the whole controller.
func (r retention) gc(ctx context.Context) {
	objs, err := r.wfInformer.GetIndexer().ByIndex(indexes.WorkflowCompletedIndex, "true")
	if err != nil {
		log.WithError(err).Error("failed to list completed workflows for retention")
		return
	}
	uns := make([]*unstructured.Unstructured, 0, len(objs))
	for _, obj := range objs {
		un, ok := obj.(*unstructured.Unstructured)
		if !ok {
			// Skipped objects are neither retained nor deleted.
			log.Warnf("skipping unexpected object of type %T during retention GC", obj)
			continue
		}
		uns = append(uns, un)
	}
	// Sort youngest ... oldest - we prioritise keeping newer workflows.
	// Namespace and name break ties so that the outcome is deterministic when
	// several workflows share a creation timestamp.
	sort.Slice(uns, func(i, j int) bool {
		ti, tj := uns[i].GetCreationTimestamp().Time, uns[j].GetCreationTimestamp().Time
		if !ti.Equal(tj) {
			return ti.After(tj)
		}
		if uns[i].GetNamespace() != uns[j].GetNamespace() {
			return uns[i].GetNamespace() < uns[j].GetNamespace()
		}
		return uns[i].GetName() < uns[j].GetName()
	})

	retain, failed, errored := r.selectRetained(uns)
	log.WithFields(log.Fields{
		"policy":    log.Fields{"failed": r.policy.Failed, "errored": r.policy.Errored, "completed": r.policy.Completed},
		"retention": log.Fields{"failed": failed, "errored": errored, "completed": len(retain)},
		"total":     len(uns),
	}).Info("Performing retention GC")

	for _, un := range uns {
		if retain[un.GetUID()] {
			continue
		}
		if ctx.Err() != nil {
			// Shutting down: the remaining deletes would only fail.
			return
		}
		err := r.workflowInterface.ArgoprojV1alpha1().Workflows(un.GetNamespace()).Delete(ctx, un.GetName(), metav1.DeleteOptions{
			PropagationPolicy: util.GetDeletePropagation(),
		})
		// A workflow that is already gone is exactly what we wanted.
		if err != nil && !apierr.IsNotFound(err) {
			log.WithError(err).
				WithFields(log.Fields{"namespace": un.GetNamespace(), "workflow": un.GetName()}).
				Warn("failed to delete workflow for retention")
		}
	}
}

// selectRetained decides which of uns (sorted youngest first) are retained.
// It returns the UIDs to retain together with the number of failed and
// errored workflows that were retained because of their phase.
func (r retention) selectRetained(uns []*unstructured.Unstructured) (retain map[types.UID]bool, failed, errored int) {
	retain = make(map[types.UID]bool)
	// Firstly, we prioritise keeping errored and failed workflows.
	for _, un := range uns {
		if failed >= r.policy.Failed && errored >= r.policy.Errored {
			break
		}
		switch wfv1.WorkflowPhase(un.GetLabels()[common.LabelKeyPhase]) {
		case wfv1.WorkflowError:
			if errored < r.policy.Errored {
				errored++
				retain[un.GetUID()] = true
			}
		case wfv1.WorkflowFailed:
			if failed < r.policy.Failed {
				failed++
				retain[un.GetUID()] = true
			}
		}
	}
	// Then we add the newest completed workflows until we have enough.
	for _, un := range uns {
		if len(retain) >= r.policy.Completed {
			break
		}
		retain[un.GetUID()] = true
	}
	return retain, failed, errored
}

// New builds a retention controller that applies policy f to the workflows
// held by informer i and deletes workflows through client wi.
func New(f config.RetentionPolicy, i cache.SharedIndexInformer, wi workflow.Interface) Interface {
	controller := &retention{
		policy:            f,
		wfInformer:        i,
		workflowInterface: wi,
	}
	return controller
}

0 comments on commit 3006b73

Please sign in to comment.