Skip to content

Commit

Permalink
fix: Report container, plugin and HTTP progress. Fixes #7918 (#7960)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec committed Feb 22, 2022
1 parent 4e80438 commit d57fd0f
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 39 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ git-ask-pass.sh
/go-diagrams/
/.run/
sdks/python/client/dist/*
/cmd/argoexec/commands/test.txt
6 changes: 6 additions & 0 deletions pkg/apis/workflow/v1alpha1/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ import (
// Progress is a progress value in N/M format. N is the number of tasks completed. M is the total number of tasks.
type Progress string

// Well-known Progress values.
const (
// ProgressUndefined is the empty string, i.e. no progress has been set.
ProgressUndefined = Progress("")
ProgressZero = Progress("0/0") // zero value (not the same as "no progress")
// ProgressDefault is "0/1", i.e. nothing completed out of one task.
ProgressDefault = Progress("0/1")
)

// NewProgress renders n and m as the string "n/m" and passes it to
// ParseProgress, returning that function's results unchanged.
func NewProgress(n, m int64) (Progress, bool) {
	text := fmt.Sprintf("%d/%d", n, m)
	return ParseProgress(text)
}
Expand Down
55 changes: 42 additions & 13 deletions workflow/progress/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
// PodProgress reads the progress annotation of a pod and ensures it's valid and synced
// with the node status.
func PodProgress(pod *apiv1.Pod, node *wfv1.NodeStatus) wfv1.Progress {
progress := wfv1.Progress("0/1")
progress := wfv1.ProgressDefault
if node.Progress.IsValid() {
progress = node.Progress
}
Expand All @@ -21,37 +21,66 @@ func PodProgress(pod *apiv1.Pod, node *wfv1.NodeStatus) wfv1.Progress {
progress = v
}
}
if node.Fulfilled() {
progress = progress.Complete()
}
return progress
}

// UpdateProgress ensures the workflow's progress is updated with the individual node progress.
// This func can perform any repair work needed
func UpdateProgress(wf *wfv1.Workflow) {
wf.Status.Progress = "0/0"
for _, node := range wf.Status.Nodes {
if node.Type != wfv1.NodeTypePod && node.Type != wfv1.NodeTypeHTTP {
wf.Status.Progress = wfv1.ProgressZero
// We loop over all executable nodes first, otherwise sum will be wrong.
for nodeID, node := range wf.Status.Nodes {
if !executable(node.Type) {
continue
}
// all executable nodes should have progress defined; if not, we just set it to the default value.
if node.Progress == wfv1.ProgressUndefined {
node.Progress = wfv1.ProgressDefault
wf.Status.Nodes[nodeID] = node
}
// corruption could result in invalid progress; we just ignore invalid progress
if !node.Progress.IsValid() {
continue
}
if node.Progress.IsValid() {
wf.Status.Progress = wf.Status.Progress.Add(node.Progress)
// if the node has finished successfully, then we can just set progress complete
switch node.Phase {
case wfv1.NodeSucceeded, wfv1.NodeSkipped, wfv1.NodeOmitted:
node.Progress = node.Progress.Complete()
wf.Status.Nodes[nodeID] = node
}
// the total should only contain nodes that are valid
wf.Status.Progress = wf.Status.Progress.Add(node.Progress)
}
// For non-executable nodes, we sum up the children.
// It is quite possible for a succeeded node to contain failed children (e.g. when the continueOn failed flag is set)
// so it is possible for the sum progress to be "1/2" (for example)
for nodeID, node := range wf.Status.Nodes {
if node.Type == wfv1.NodeTypePod {
if executable(node.Type) {
continue
}
progress := sumProgress(wf, node, make(map[string]bool))
if progress.IsValid() && node.Progress != progress {
if progress.IsValid() {
node.Progress = progress
wf.Status.Nodes[nodeID] = node
}
}
// we could check an invariant here, wf.Status.Nodes[wf.Name].Progress == wf.Status.Progress, but I think there's
// always the chance that the nodes get corrupted, so I think we leave it
}

// executable reports whether nodeType is one whose progress is updated by
// other code (pod, HTTP, plugin, container and suspend nodes) and therefore
// should not be computed by summing. It may be that a node of such a type
// never gets progress.
func executable(nodeType wfv1.NodeType) bool {
	return nodeType == wfv1.NodeTypePod ||
		nodeType == wfv1.NodeTypeHTTP ||
		nodeType == wfv1.NodeTypePlugin ||
		nodeType == wfv1.NodeTypeContainer ||
		nodeType == wfv1.NodeTypeSuspend
}

func sumProgress(wf *wfv1.Workflow, node wfv1.NodeStatus, visited map[string]bool) wfv1.Progress {
progress := wfv1.Progress("0/0")
progress := wfv1.ProgressZero
for _, childNodeID := range node.Children {
if visited[childNodeID] {
continue
Expand All @@ -60,7 +89,7 @@ func sumProgress(wf *wfv1.Workflow, node wfv1.NodeStatus, visited map[string]boo
// this will tolerate missing child (will be "") and therefore ignored
child := wf.Status.Nodes[childNodeID]
progress = progress.Add(sumProgress(wf, child, visited))
if child.Type == wfv1.NodeTypePod || child.Type == wfv1.NodeTypeHTTP {
if executable(child.Type) {
v := child.Progress
if v.IsValid() {
progress = progress.Add(v)
Expand Down
49 changes: 23 additions & 26 deletions workflow/progress/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,35 @@ func TestUpdater(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: "wf"},
Status: wfv1.WorkflowStatus{
Nodes: wfv1.Nodes{
"pod-1": wfv1.NodeStatus{Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypePod, Progress: wfv1.Progress("50/50")},
"pod-2": wfv1.NodeStatus{Type: wfv1.NodeTypePod, Progress: wfv1.Progress("50/150")},
"pod-3": wfv1.NodeStatus{Type: wfv1.NodeTypePod, Progress: wfv1.Progress("50/100")},
"wf": wfv1.NodeStatus{Children: []string{"pod-1", "pod-2", "pod-3"}},
"pod-1": wfv1.NodeStatus{Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypePod, Progress: wfv1.Progress("25/50")},
"pod-2": wfv1.NodeStatus{Phase: wfv1.NodeRunning, Type: wfv1.NodeTypePod, Progress: wfv1.Progress("50/150")},
"http": wfv1.NodeStatus{Phase: wfv1.NodeFailed, Type: wfv1.NodeTypeHTTP},
"plug": wfv1.NodeStatus{Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypePlugin},
"dag": wfv1.NodeStatus{Children: []string{"pod-1", "pod-2", "http", "plug"}},
},
},
}

UpdateProgress(wf)

assert.Equal(t, wfv1.Progress("50/50"), wf.Status.Nodes["pod-1"].Progress)
assert.Equal(t, wfv1.Progress("50/150"), wf.Status.Nodes["pod-2"].Progress)
assert.Equal(t, wfv1.Progress("50/100"), wf.Status.Nodes["pod-3"].Progress)
assert.Equal(t, wfv1.Progress("150/300"), wf.Status.Nodes["wf"].Progress)
assert.Equal(t, wfv1.Progress("150/300"), wf.Status.Progress)
nodes := wf.Status.Nodes
assert.Equal(t, wfv1.Progress("50/50"), nodes["pod-1"].Progress, "succeeded pod is completed")
assert.Equal(t, wfv1.Progress("50/150"), nodes["pod-2"].Progress, "running pod is unchanged")
assert.Equal(t, wfv1.Progress("0/1"), nodes["http"].Progress, "failed http is unchanged")
assert.Equal(t, wfv1.Progress("1/1"), nodes["plug"].Progress, "succeeded plug is completed")
assert.Equal(t, wfv1.Progress("101/202"), nodes["dag"].Progress, "dag is summed up")
assert.Equal(t, wfv1.Progress("101/202"), wf.Status.Progress, "wf is sum total")
}

func TestUpdaterWithHTTPNode(t *testing.T) {
ns := "my-ns"
wf := &wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: "wf"},
Status: wfv1.WorkflowStatus{
Nodes: wfv1.Nodes{
"http": wfv1.NodeStatus{Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypeHTTP, Progress: wfv1.Progress("1/1")},
"wf": wfv1.NodeStatus{Children: []string{"http"}},
},
},
}

UpdateProgress(wf)

assert.Equal(t, wfv1.Progress("1/1"), wf.Status.Nodes["http"].Progress)
assert.Equal(t, wfv1.Progress("1/1"), wf.Status.Nodes["wf"].Progress)
assert.Equal(t, wfv1.Progress("1/1"), wf.Status.Progress)
func Test_executes(t *testing.T) {
assert.False(t, executable(""))
assert.True(t, executable(wfv1.NodeTypePod))
assert.True(t, executable(wfv1.NodeTypeContainer))
assert.False(t, executable(wfv1.NodeTypeSteps))
assert.False(t, executable(wfv1.NodeTypeStepGroup))
assert.False(t, executable(wfv1.NodeTypeDAG))
assert.False(t, executable(wfv1.NodeTypeTaskGroup))
assert.True(t, executable(wfv1.NodeTypeSuspend))
assert.True(t, executable(wfv1.NodeTypeHTTP))
assert.True(t, executable(wfv1.NodeTypePlugin))
}

0 comments on commit d57fd0f

Please sign in to comment.