forked from argoproj/argo-workflows
-
Notifications
You must be signed in to change notification settings - Fork 0
/
operator.go
165 lines (155 loc) · 5.37 KB
/
operator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package controller
import (
"fmt"
wfv1 "github.com/argoproj/argo/api/workflow/v1"
"github.com/argoproj/argo/errors"
log "github.com/sirupsen/logrus"
)
// operateWorkflow runs one evaluation pass over a workflow: it inspects the
// workflow's current state and advances it along its execution path.
//
// Workflows that report Completed() are ignored. All changes are made on a
// deep copy of wf, and that copy is submitted through
// wfc.WorkflowClient.UpdateWorkflow when the pass modified anything. Errors
// are logged rather than returned.
func (wfc *WorkflowController) operateWorkflow(wf *wfv1.Workflow) {
	if wf.Completed() {
		return
	}

	// Objects coming out of the store must NEVER be modified: the store is a
	// read-only, local cache. Mutate a deep copy instead (a hand-written copy
	// would also work and could be cheaper).
	working := wf.DeepCopyObject().(*wfv1.Workflow)
	dirty := false

	// Persist the copy on the way out, but only when this pass changed it.
	defer func() {
		if !dirty {
			return
		}
		if _, updateErr := wfc.WorkflowClient.UpdateWorkflow(working); updateErr != nil {
			log.Errorf("ERROR updating status: %v", updateErr)
			return
		}
		log.Infof("UPDATED %s: %#v", working.ObjectMeta.Name, working.Status)
	}()

	// Make sure the node map exists before any node status is written to it.
	if working.Status.Nodes == nil {
		working.Status.Nodes = make(map[string]wfv1.NodeStatus)
		dirty = true
	}

	// The root node is named after the workflow and runs the entrypoint
	// template without arguments.
	changed, err := wfc.executeTemplate(working, working.Spec.Entrypoint, nil, working.ObjectMeta.Name)
	if changed {
		dirty = true
	}
	if err != nil {
		log.Errorf("%s error: %+v", wf.ObjectMeta.Name, err)
	}
}
// executeTemplate evaluates the node named nodeName against the template
// named templateName and records the resulting state in wf.Status.Nodes.
//
// A node already recorded as completed is left untouched. Otherwise:
//   - an undefined template, or a template of unknown type, records the node
//     with NodeStatusError and returns an error;
//   - a container template creates the workflow pod the first time the node
//     is seen and records the node as running; if pod creation fails, the
//     node is recorded with NodeStatusError and the error is returned;
//   - a workflow template walks tmpl.Steps in order, stopping at the first
//     step group that errors, is not yet completed, or was not successful.
//
// wf.Status.Nodes must be non-nil, since node statuses are written into it.
//
// Returns tuple of: (workflow was updated, error)
func (wfc *WorkflowController) executeTemplate(wf *wfv1.Workflow, templateName string, args *wfv1.Arguments, nodeName string) (bool, error) {
	log.Infof("Evaluating node %s: %v, args: %#v", nodeName, templateName, args)
	nodeID := wf.NodeID(nodeName)
	node, ok := wf.Status.Nodes[nodeID]
	if ok && node.Completed() {
		log.Infof("Node %s already completed", nodeName)
		return false, nil
	}
	tmpl := wf.GetTemplate(templateName)
	if tmpl == nil {
		err := errors.Errorf(errors.CodeBadRequest, "Node %s error: template '%s' undefined", nodeName, templateName)
		wf.Status.Nodes[nodeID] = wfv1.NodeStatus{ID: nodeID, Name: nodeName, Status: wfv1.NodeStatusError}
		return true, err
	}
	switch tmpl.Type {
	case wfv1.TypeContainer:
		if ok {
			// The node is already recorded and not completed: nothing to do
			// in this pass.
			return false, nil
		}
		// We have not yet created the pod
		if err := wfc.createWorkflowPod(wf, nodeName, tmpl, args); err != nil {
			// TODO: may need to query pod status if we hit already exists error
			// Record the failure on the node (as the other error paths in
			// this function do) and report the workflow as updated so the
			// status is not lost.
			wf.Status.Nodes[nodeID] = wfv1.NodeStatus{ID: nodeID, Name: nodeName, Status: wfv1.NodeStatusError}
			return true, err
		}
		node = wfv1.NodeStatus{ID: nodeID, Name: nodeName, Status: wfv1.NodeStatusRunning}
		wf.Status.Nodes[nodeID] = node
		log.Infof("Initialized container node %v", node)
		return true, nil
	case wfv1.TypeWorkflow:
		updates := false
		if !ok {
			node = wfv1.NodeStatus{ID: nodeID, Name: nodeName, Status: wfv1.NodeStatusRunning}
			log.Infof("Initialized workflow node %v", node)
			wf.Status.Nodes[nodeID] = node
			updates = true
		}
		for i, stepGroup := range tmpl.Steps {
			// Each step group is tracked as its own node named "<parent>[<index>]".
			sgNodeName := fmt.Sprintf("%s[%d]", nodeName, i)
			sgUpdates, err := wfc.executeStepGroup(wf, stepGroup, sgNodeName)
			if err != nil {
				node.Status = wfv1.NodeStatusError
				wf.Status.Nodes[nodeID] = node
				return true, err
			}
			updates = updates || sgUpdates
			// Read the step group node after executeStepGroup had the chance
			// to write it.
			sgNode := wf.Status.Nodes[wf.NodeID(sgNodeName)]
			if !sgNode.Completed() {
				// Later groups must wait until this one has completed.
				log.Infof("Workflow step group node %v not yet completed", sgNode)
				return updates, nil
			}
			if !sgNode.Successful() {
				log.Infof("Workflow step group %v not successful", sgNode)
				node.Status = wfv1.NodeStatusFailed
				wf.Status.Nodes[nodeID] = node
				return true, nil
			}
		}
		// Every step group completed successfully.
		node.Status = wfv1.NodeStatusSucceeded
		wf.Status.Nodes[nodeID] = node
		return true, nil
	default:
		wf.Status.Nodes[nodeID] = wfv1.NodeStatus{ID: nodeID, Name: nodeName, Status: wfv1.NodeStatusError}
		return true, fmt.Errorf("Unknown type: %s", tmpl.Type)
	}
}
// executeStepGroup evaluates one group of steps, tracked as the node named
// nodeName, and records the group's state in wf.Status.Nodes.
//
// Every step in stepGroup is evaluated through executeTemplate as a child
// node named "<nodeName>.<stepName>". If a child returns an error, the group
// node is marked NodeStatusError and the error is returned immediately. Once
// all children report Completed(), the group node is marked
// NodeStatusSucceeded when every child was successful and NodeStatusFailed
// otherwise. A group already recorded as completed is left untouched.
//
// wf.Status.Nodes must be non-nil, since node statuses are written into it.
//
// Returns tuple of: (workflow was updated, error)
func (wfc *WorkflowController) executeStepGroup(wf *wfv1.Workflow, stepGroup map[string]wfv1.WorkflowStep, nodeName string) (bool, error) {
	nodeID := wf.NodeID(nodeName)
	node, ok := wf.Status.Nodes[nodeID]
	if ok && node.Completed() {
		log.Infof("Step group node %v already marked completed", node)
		return false, nil
	}
	updates := false
	if !ok {
		// Use the shared status constant, as executeTemplate does, rather
		// than a hand-written string literal.
		node = wfv1.NodeStatus{ID: nodeID, Name: nodeName, Status: wfv1.NodeStatusRunning}
		wf.Status.Nodes[nodeID] = node
		log.Infof("Initializing step group node %v", node)
		updates = true
	}
	childNodeIDs := make([]string, 0, len(stepGroup))
	// First kick off all parallel steps in the group
	for stepName, step := range stepGroup {
		// Per-iteration copy so the pointer passed below never aliases the
		// shared range variable (relevant for toolchains before Go 1.22).
		step := step
		childNodeName := fmt.Sprintf("%s.%s", nodeName, stepName)
		childNodeIDs = append(childNodeIDs, wf.NodeID(childNodeName))
		sUpdates, err := wfc.executeTemplate(wf, step.Template, &step.Arguments, childNodeName)
		updates = updates || sUpdates
		if err != nil {
			node.Status = wfv1.NodeStatusError
			wf.Status.Nodes[nodeID] = node
			return true, err
		}
	}
	// Return if not all children completed
	for _, childNodeID := range childNodeIDs {
		if !wf.Status.Nodes[childNodeID].Completed() {
			return updates, nil
		}
	}
	// All children completed. Determine status
	for _, childNodeID := range childNodeIDs {
		if !wf.Status.Nodes[childNodeID].Successful() {
			node.Status = wfv1.NodeStatusFailed
			wf.Status.Nodes[nodeID] = node
			log.Infof("Step group node %s deemed failed due to failure of %s", nodeID, childNodeID)
			return true, nil
		}
	}
	node.Status = wfv1.NodeStatusSucceeded
	wf.Status.Nodes[nodeID] = node
	log.Infof("Step group node %s successful", nodeID)
	return true, nil
}