Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Retry with DAG. Fixes #7617 #7652

Merged
merged 2 commits into from
Jan 27, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix: Retry with DAG. Fixes #7617
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec committed Jan 26, 2022
commit f2627437dc0b821b82767a9316536f6e5d20a141
112 changes: 112 additions & 0 deletions tmp.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: retry-barebones
spec:
entrypoint: main
templates:
- name: main
inputs:
parameters:
- name: numbers
value: |
["one", "two", "three"]
dag:
tasks:
- name: whalesay-before-process
template: whalesay
arguments:
parameters:
- name: message
value: "HELLO ALL"

- name: process
template: process
withParam: "{{inputs.parameters.numbers}}"
dependencies:
- whalesay-before-process
arguments:
parameters:
- name: message
value: "{{item}}"

- name: whalesay-after-process
template: whalesay
dependencies:
- process
arguments:
parameters:
- name: message
value: "Root: {{tasks.process.outputs.parameters.output}}"
# On retry there are extra values in the output, duplicated multiple times based on retry count

- name: process
# this fixes the behaviour
inputs:
parameters:
- name: message
outputs:
parameters:
- name: output
valueFrom:
parameter: "{{tasks.step2.outputs.parameters.output}}"
dag:
tasks:
- name: step0
template: whalesay
arguments:
parameters:
- name: message
value: "{{inputs.parameters.message}}"

- name: step1
template: whalesay
dependencies:
- step0
arguments:
parameters:
- name: message
value: "{{inputs.parameters.message}}"
- name: always-pass
value: "false"

- name: step2
template: whalesay
dependencies:
- step1
arguments:
parameters:
- name: message
value: "{{tasks.step1.outputs.parameters.output}}"

- name: whalesay
inputs:
parameters:
- name: message
- name: always-pass
value: "true"
outputs:
parameters:
- name: output
valueFrom:
path: /tmp/output.txt
script:
command: [ python ]
image: python:alpine
imagePullPolicy: IfNotPresent
source: |
import random
import os
msg = '{{inputs.parameters.message}}'
print(msg)

if '{{inputs.parameters.always-pass}}' == 'true':
a = -1
else:
a = random.randint(1,10)

if a > 0 and a <= 5:
raise Exception(msg)

with open('/tmp/output.txt', 'w') as o:
o.write('random' + msg + ' : ' +str(a))
2 changes: 1 addition & 1 deletion workflow/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,7 @@ func retryWorkflow(ctx context.Context, kubeClient kubernetes.Interface, hydrato
continue
}
case wfv1.NodeError, wfv1.NodeFailed, wfv1.NodeOmitted:
if !strings.HasPrefix(node.Name, onExitNodeName) && (node.Type == wfv1.NodeTypeDAG || node.Type == wfv1.NodeTypeStepGroup) {
if !strings.HasPrefix(node.Name, onExitNodeName) && (node.Type == wfv1.NodeTypeDAG || node.Type == wfv1.NodeTypeTaskGroup || node.Type == wfv1.NodeTypeStepGroup) {
newNode := node.DeepCopy()
newNode.Phase = wfv1.NodeRunning
newNode.Message = ""
Expand Down
102 changes: 64 additions & 38 deletions workflow/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,50 +828,76 @@ func TestDeepDeleteNodes(t *testing.T) {
}

func TestRetryWorkflow(t *testing.T) {
ctx := context.Background()
kubeClient := kubefake.NewSimpleClientset()
wfClient := argofake.NewSimpleClientset().ArgoprojV1alpha1().Workflows("my-ns")
createdTime := metav1.Time{Time: time.Now().UTC()}
finishedTime := metav1.Time{Time: createdTime.Add(time.Second * 2)}
wf := &wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
common.LabelKeyCompleted: "true",
common.LabelKeyWorkflowArchivingStatus: "Pending",
}},
Status: wfv1.WorkflowStatus{
Phase: wfv1.WorkflowFailed,
StartedAt: createdTime,
FinishedAt: finishedTime,
Nodes: map[string]wfv1.NodeStatus{
"failed-node": {Name: "failed-node", StartedAt: createdTime, FinishedAt: finishedTime, Phase: wfv1.NodeFailed, Message: "failed"},
"succeeded-node": {Name: "succeeded-node", StartedAt: createdTime, FinishedAt: finishedTime, Phase: wfv1.NodeSucceeded, Message: "succeeded"}},
},
}

ctx := context.Background()
_, err := wfClient.Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(t, err)
wf, err = RetryWorkflow(ctx, kubeClient, hydratorfake.Always, wfClient, wf.Name, false, "")
if assert.NoError(t, err) {
assert.Equal(t, wfv1.WorkflowRunning, wf.Status.Phase)
assert.Equal(t, metav1.Time{}, wf.Status.FinishedAt)
assert.True(t, wf.Status.StartedAt.After(createdTime.Time))
assert.NotContains(t, wf.Labels, common.LabelKeyCompleted)
assert.NotContains(t, wf.Labels, common.LabelKeyWorkflowArchivingStatus)
for _, node := range wf.Status.Nodes {
switch node.Phase {
case wfv1.NodeSucceeded:
assert.Equal(t, "succeeded", node.Message)
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
assert.Equal(t, createdTime, node.StartedAt)
assert.Equal(t, finishedTime, node.FinishedAt)
case wfv1.NodeFailed:
assert.Equal(t, "", node.Message)
assert.Equal(t, wfv1.NodeRunning, node.Phase)
assert.Equal(t, metav1.Time{}, node.FinishedAt)
assert.True(t, node.StartedAt.After(createdTime.Time))
t.Run("Steps", func(t *testing.T) {
wf := &wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{
Name: "my-steps",
Labels: map[string]string{
common.LabelKeyCompleted: "true",
common.LabelKeyWorkflowArchivingStatus: "Pending",
},
},
Status: wfv1.WorkflowStatus{
Phase: wfv1.WorkflowFailed,
StartedAt: createdTime,
FinishedAt: finishedTime,
Nodes: map[string]wfv1.NodeStatus{
"failed-node": {Name: "failed-node", StartedAt: createdTime, FinishedAt: finishedTime, Phase: wfv1.NodeFailed, Message: "failed"},
"succeeded-node": {Name: "succeeded-node", StartedAt: createdTime, FinishedAt: finishedTime, Phase: wfv1.NodeSucceeded, Message: "succeeded"}},
},
}
_, err := wfClient.Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(t, err)
wf, err = RetryWorkflow(ctx, kubeClient, hydratorfake.Noop, wfClient, wf.Name, false, "")
if assert.NoError(t, err) {
assert.Equal(t, wfv1.WorkflowRunning, wf.Status.Phase)
assert.Equal(t, metav1.Time{}, wf.Status.FinishedAt)
assert.True(t, wf.Status.StartedAt.After(createdTime.Time))
assert.NotContains(t, wf.Labels, common.LabelKeyCompleted)
assert.NotContains(t, wf.Labels, common.LabelKeyWorkflowArchivingStatus)
for _, node := range wf.Status.Nodes {
switch node.Phase {
case wfv1.NodeSucceeded:
assert.Equal(t, "succeeded", node.Message)
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
assert.Equal(t, createdTime, node.StartedAt)
assert.Equal(t, finishedTime, node.FinishedAt)
case wfv1.NodeFailed:
assert.Equal(t, "", node.Message)
assert.Equal(t, wfv1.NodeRunning, node.Phase)
assert.Equal(t, metav1.Time{}, node.FinishedAt)
assert.True(t, node.StartedAt.After(createdTime.Time))
}
}
}
}
})
t.Run("DAG", func(t *testing.T) {
wf := &wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{
Name: "my-dag",
Labels: map[string]string{},
},
Status: wfv1.WorkflowStatus{
Phase: wfv1.WorkflowFailed,
Nodes: map[string]wfv1.NodeStatus{
"": {Phase: wfv1.NodeFailed, Type: wfv1.NodeTypeTaskGroup}},
},
}
_, err := wfClient.Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(t, err)
wf, err = RetryWorkflow(ctx, kubeClient, hydratorfake.Noop, wfClient, wf.Name, false, "")
if assert.NoError(t, err) {
if assert.Len(t, wf.Status.Nodes, 1) {
assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes[""].Phase)
}

}
})
}

func TestFromUnstructuredObj(t *testing.T) {
Expand Down