Skip to content

Commit

Permalink
feat: added retention controller. Fixes #5369 (#6854)
Browse files Browse the repository at this point in the history
Signed-off-by: NikeNano <[email protected]>
  • Loading branch information
NikeNano committed Dec 13, 2021
1 parent ae91861 commit 8d552fb
Show file tree
Hide file tree
Showing 20 changed files with 364 additions and 28 deletions.
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ type Config struct {
// The command/args for each image, needed when the command is not specified and the emissary executor is used.
// https://argoproj.github.io/argo-workflows/workflow-executors/#emissary-emissary
Images map[string]Image `json:"images,omitempty"`

RetentionPolicy *RetentionPolicy `json:"retentionPolicy,omitempty"`
}

func (c Config) GetContainerRuntimeExecutor(labels labels.Labels) (string, error) {
Expand Down
7 changes: 7 additions & 0 deletions config/retention_policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package config

// RetentionPolicy holds per-phase integer settings for workflow retention. It is
// exposed on Config under the "retentionPolicy" key and handed to the workflow
// garbage collection controller when that controller is constructed.
// NOTE(review): each value appears to be the maximum number of workflows kept for
// that phase before older ones are removed — confirm against the gccontroller package.
type RetentionPolicy struct {
// Completed is the setting applied to completed workflows.
Completed int `json:"completed,omitempty"`
// Failed is the setting applied to failed workflows.
Failed int `json:"failed,omitempty"`
// Errored is the setting applied to errored workflows.
Errored int `json:"errored,omitempty"`
}
4 changes: 4 additions & 0 deletions manifests/quick-start-minimal.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,10 @@ data:
path: /metrics
port: 9090
namespaceParallelism: "10"
retentionPolicy: |
completed: 10
failed: 3
errored: 3
kind: ConfigMap
metadata:
name: workflow-controller-configmap
Expand Down
4 changes: 4 additions & 0 deletions manifests/quick-start-mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,10 @@ data:
passwordSecret:
name: argo-mysql-config
key: password
retentionPolicy: |
completed: 10
failed: 3
errored: 3
kind: ConfigMap
metadata:
name: workflow-controller-configmap
Expand Down
4 changes: 4 additions & 0 deletions manifests/quick-start-postgres.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,10 @@ data:
passwordSecret:
name: argo-postgres-config
key: password
retentionPolicy: |
completed: 10
failed: 3
errored: 3
kind: ConfigMap
metadata:
name: workflow-controller-configmap
Expand Down
4 changes: 4 additions & 0 deletions manifests/quick-start/minimal/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@ kind: Kustomization

resources:
- ../base

patchesStrategicMerge:
- overlays/workflow-controller-configmap.yaml

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: v1
data:
retentionPolicy: |
completed: 10
failed: 3
errored: 3
kind: ConfigMap
metadata:
name: workflow-controller-configmap
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ data:
passwordSecret:
name: argo-mysql-config
key: password
retentionPolicy: |
completed: 10
failed: 3
errored: 3
kind: ConfigMap
metadata:
name: workflow-controller-configmap
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ data:
passwordSecret:
name: argo-postgres-config
key: password
retentionPolicy: |
completed: 10
failed: 3
errored: 3
kind: ConfigMap
metadata:
name: workflow-controller-configmap
26 changes: 26 additions & 0 deletions test/e2e/fixtures/when.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,32 @@ func (w *When) WaitForWorkflow(options ...interface{}) *When {
}
}

// WaitForWorkflowList repeatedly lists workflows matching listOptions until condition
// returns true for the listed items or defaultTimeout elapses.
//
// The list call is issued immediately and then re-issued once per poll interval, so the
// API server is not hit in a tight loop while waiting. A list error or a timeout is
// reported through the test handle; in every case the receiver is returned so that the
// fluent chain can continue.
func (w *When) WaitForWorkflowList(listOptions metav1.ListOptions, condition func(list []wfv1.Workflow) bool) *When {
	w.t.Helper()
	// Delay between consecutive list calls.
	const pollInterval = 500 * time.Millisecond
	timeout := defaultTimeout
	start := time.Now()
	_, _ = fmt.Println("Waiting", timeout.String(), "for workflows", listOptions)
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()
	for {
		wfList, err := w.client.List(ctx, listOptions)
		if err != nil {
			w.t.Error(err)
			return w
		}
		if condition(wfList.Items) {
			_, _ = fmt.Printf("Condition met after %s\n", time.Since(start).Truncate(time.Second))
			return w
		}
		// Block until the next poll is due, or give up once the deadline has passed.
		select {
		case <-ctx.Done():
			w.t.Errorf("timeout after %v waiting for condition", timeout)
			return w
		case <-ticker.C:
		}
	}
}

func (w *When) hydrateWorkflow(wf *wfv1.Workflow) {
w.t.Helper()
err := w.hydrator.Hydrate(wf)
Expand Down
22 changes: 22 additions & 0 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,28 @@ spec:
ExpectWorkflowDeleted()
}

// TestWorkflowRetention submits three workflows that fail, one after another, and then
// waits until listing workflows labelled with phase Failed returns exactly two items.
func (s *FunctionalSuite) TestWorkflowRetention() {
	failedOnly := metav1.ListOptions{LabelSelector: "workflows.argoproj.io/phase=Failed"}
	exactlyTwo := func(list []wfv1.Workflow) bool {
		return len(list) == 2
	}

	// First failing workflow.
	when := s.Given().
		Workflow("@testdata/exit-1.yaml").
		When().
		SubmitWorkflow().
		WaitForWorkflow(fixtures.ToBeFailed)

	// Second and third failing workflows, chained off the previous step.
	for i := 0; i < 2; i++ {
		when = when.
			Given().
			Workflow("@testdata/exit-1.yaml").
			When().
			SubmitWorkflow().
			WaitForWorkflow(fixtures.ToBeFailed)
	}

	when.WaitForWorkflowList(failedOnly, exactlyTwo)
}

// in this test we create a pod quota, and then we create a workflow that needs one more pod than the quota allows
// because we run them in parallel, the first node will run to completion, and then the second one
func (s *FunctionalSuite) TestResourceQuota() {
Expand Down
4 changes: 4 additions & 0 deletions test/e2e/manifests/mixins/workflow-controller-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,8 @@ data:
limits:
cpu: 0.5
memory: 128Mi
retentionPolicy: |
completed: 10
failed: 2
errored: 2
kubeletInsecure: "true"
7 changes: 7 additions & 0 deletions workflow/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
apiv1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -353,3 +354,9 @@ func GetTemplateHolderString(tmplHolder wfv1.TemplateReferenceHolder) string {
func GenerateOnExitNodeName(parentNodeName string) string {
return fmt.Sprintf("%s.onExit", parentNodeName)
}

// IsDone reports whether un has no deletion timestamp, carries the completed label
// with the value "true", and does not have its archiving-status label set to "Pending".
func IsDone(un *unstructured.Unstructured) bool {
	if un.GetDeletionTimestamp() != nil {
		return false
	}
	lbls := un.GetLabels()
	if lbls[LabelKeyCompleted] != "true" {
		return false
	}
	return lbls[LabelKeyWorkflowArchivingStatus] != "Pending"
}
20 changes: 20 additions & 0 deletions workflow/common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/kubernetes/fake"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
Expand Down Expand Up @@ -141,3 +142,22 @@ func TestGetTemplateHolderString(t *testing.T) {
ClusterScope: true,
}}))
}

func TestIsDone(t *testing.T) {
assert.False(t, IsDone(&unstructured.Unstructured{}))
assert.True(t, IsDone(&unstructured.Unstructured{Object: map[string]interface{}{
"metadata": map[string]interface{}{
"labels": map[string]interface{}{
LabelKeyCompleted: "true",
},
},
}}))
assert.False(t, IsDone(&unstructured.Unstructured{Object: map[string]interface{}{
"metadata": map[string]interface{}{
"labels": map[string]interface{}{
LabelKeyCompleted: "true",
LabelKeyWorkflowArchivingStatus: "Pending",
},
},
}}))
}
12 changes: 6 additions & 6 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ import (
"github.com/argoproj/argo-workflows/v3/workflow/controller/pod"
"github.com/argoproj/argo-workflows/v3/workflow/cron"
"github.com/argoproj/argo-workflows/v3/workflow/events"
"github.com/argoproj/argo-workflows/v3/workflow/gccontroller"
"github.com/argoproj/argo-workflows/v3/workflow/hydrator"
"github.com/argoproj/argo-workflows/v3/workflow/metrics"
"github.com/argoproj/argo-workflows/v3/workflow/signal"
"github.com/argoproj/argo-workflows/v3/workflow/sync"
"github.com/argoproj/argo-workflows/v3/workflow/ttlcontroller"
"github.com/argoproj/argo-workflows/v3/workflow/util"
)

Expand Down Expand Up @@ -184,12 +184,12 @@ func (wfc *WorkflowController) newThrottler() sync.Throttler {
}
}

// RunTTLController runs the workflow TTL controller
func (wfc *WorkflowController) runTTLController(ctx context.Context, workflowTTLWorkers int) {
// runGCcontroller runs the workflow garbage collector controller
func (wfc *WorkflowController) runGCcontroller(ctx context.Context, workflowTTLWorkers int) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

ttlCtrl := ttlcontroller.NewController(wfc.wfclientset, wfc.wfInformer, wfc.metrics)
err := ttlCtrl.Run(ctx.Done(), workflowTTLWorkers)
gcCtrl := gccontroller.NewController(wfc.wfclientset, wfc.wfInformer, wfc.metrics, wfc.Config.RetentionPolicy)
err := gcCtrl.Run(ctx.Done(), workflowTTLWorkers)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -321,7 +321,7 @@ func (wfc *WorkflowController) startLeading(ctx context.Context, logCtx *log.Ent
go wfc.workflowGarbageCollector(ctx.Done())
go wfc.archivedWorkflowGarbageCollector(ctx.Done())

go wfc.runTTLController(ctx, workflowTTLWorkers)
go wfc.runGCcontroller(ctx, workflowTTLWorkers)
go wfc.runCronController(ctx)
go wait.Until(wfc.syncWorkflowPhaseMetrics, 15*time.Second, ctx.Done())
go wait.Until(wfc.syncPodPhaseMetrics, 15*time.Second, ctx.Done())
Expand Down
2 changes: 2 additions & 0 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1760,6 +1760,8 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
// the container. The status of this node should be "Success" if any
// of the retries succeed. Otherwise, it is "Failed".
retryNodeName := ""

// Here it is needed to be updated
if woc.retryStrategy(processedTmpl) != nil {
retryNodeName = nodeName
retryParentNode := node
Expand Down
Loading

0 comments on commit 8d552fb

Please sign in to comment.