Skip to content

Commit

Permalink
Fix template.parallelism limiting parallelism of entire workflow (res…
Browse files Browse the repository at this point in the history
…olves argoproj#772)

Refactor operator to make template execution method signatures consistent
  • Loading branch information
jessesuen committed Mar 5, 2018
1 parent 7d7b74f commit cd73a9c
Show file tree
Hide file tree
Showing 13 changed files with 260 additions and 164 deletions.
7 changes: 7 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,13 @@
"suspend": {
"description": "Suspend template subtype which can suspend a workflow when reaching the step",
"$ref": "#/definitions/io.argoproj.workflow.v1alpha1.SuspendTemplate"
},
"tolerations": {
"description": "Tolerations to apply to workflow pods.",
"type": "array",
"items": {
"$ref": "#/definitions/io.k8s.api.core.v1.Toleration"
}
}
}
},
Expand Down
2 changes: 1 addition & 1 deletion examples/parallelism-limit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
generateName: parallelism-limit-
spec:
entrypoint: parallelism-limit
parallelism: 100
parallelism: 2
templates:
- name: parallelism-limit
steps:
Expand Down
63 changes: 63 additions & 0 deletions examples/parallelism-nested.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Example with vertical and horizontal scalability
#
# Imagine we have 'M' workers which work in parallel,
# each worker should performs 'N' loops sequentially
#
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: parallelism-nested-
spec:
arguments:
parameters:
- name: seq-list
value: |
["a","b","c","d"]
- name: parallel-list
value: |
[1,2,3,4]
entrypoint: parallel-worker
templates:
- name: parallel-worker
inputs:
parameters:
- name: seq-list
- name: parallel-list
steps:
- - name: parallel-worker
template: seq-worker
arguments:
parameters:
- name: seq-list
value: "{{inputs.parameters.seq-list}}"
- name: parallel-id
value: "{{item}}"
withParam: "{{inputs.parameters.parallel-list}}"

- name: seq-worker
parallelism: 1
inputs:
parameters:
- name: seq-list
- name: parallel-id
steps:
- - name: seq-step
template: one-job
arguments:
parameters:
- name: parallel-id
value: "{{inputs.parameters.parallel-id}}"
- name: seq-id
value: "{{item}}"
withParam: "{{inputs.parameters.seq-list}}"

- name: one-job
inputs:
parameters:
- name: seq-id
- name: parallel-id
container:
image: alpine
command: ['/bin/sh', '-c']
args: ["echo {{inputs.parameters.parallel-id}} {{inputs.parameters.seq-id}}; sleep 10"]
2 changes: 2 additions & 0 deletions examples/steps.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# This template demonstrates a steps template and how to control sequential vs. parallel steps.
# In this example, the hello1 completes before the hello2a, and hello2b steps, which run in parallel.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
Expand Down
6 changes: 3 additions & 3 deletions examples/suspend-template.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# This example demonstrates the use of a suspend template. Suspend templates allow a workflow to
# enter a suspended state at a predetermiend point in time in the workflow. Some use cases for this
# enter a suspended state at a predetermined point in time in the workflow. Some use cases for this
# might include: human approval during release process, performing asynchronous/long soak tests,
# manual judgement of a staging environment before deploying to production. To run this example,
# manual judgment of a staging environment before deploying to production. To run this example,
# submit the workflow and wait until the workflow reaches the second, "approve" step, at which point
# the workflow will be suspended. To resume the workflw, run:
# the workflow will be suspended. To resume the workflow, run:
# argo resume <workflowname>
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand Down
14 changes: 0 additions & 14 deletions pkg/apis/workflow/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,17 +379,6 @@ type WorkflowStatus struct {
PersistentVolumeClaims []apiv1.Volume `json:"persistentVolumeClaims,omitempty"`
}

// GetNodesWithRetries returns a list of nodes that have retries.
func (wfs *WorkflowStatus) GetNodesWithRetries() []NodeStatus {
var nodesWithRetries []NodeStatus
for _, node := range wfs.Nodes {
if node.RetryStrategy != nil {
nodesWithRetries = append(nodesWithRetries, node)
}
}
return nodesWithRetries
}

// RetryStrategy provides controls on how to retry a workflow step
type RetryStrategy struct {
// Limit is the maximum number of attempts when retrying a container
Expand Down Expand Up @@ -437,9 +426,6 @@ type NodeStatus struct {
// Daemoned tracks whether or not this node was daemoned and need to be terminated
Daemoned *bool `json:"daemoned,omitempty"`

// RetryStrategy contains retry information about the node
RetryStrategy *RetryStrategy `json:"retryStrategy,omitempty"`

// Inputs captures input parameter values and artifact locations supplied to this template invocation
Inputs *Inputs `json:"inputs,omitempty"`

Expand Down
9 changes: 0 additions & 9 deletions pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,15 +326,6 @@ func (in *NodeStatus) DeepCopyInto(out *NodeStatus) {
**out = **in
}
}
if in.RetryStrategy != nil {
in, out := &in.RetryStrategy, &out.RetryStrategy
if *in == nil {
*out = nil
} else {
*out = new(RetryStrategy)
(*in).DeepCopyInto(*out)
}
}
if in.Inputs != nil {
in, out := &in.Inputs, &out.Inputs
if *in == nil {
Expand Down
1 change: 1 addition & 0 deletions test/e2e/functional/parallelism-nested.yaml
2 changes: 1 addition & 1 deletion workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, task.Template, dagCtx.boundaryID, wfv1.NodeError, err.Error())
return
}
_ = woc.executeTemplate(newTask.Template, newTask.Arguments, nodeName, dagCtx.boundaryID)
_, _ = woc.executeTemplate(newTask.Template, newTask.Arguments, nodeName, dagCtx.boundaryID)
}

// resolveDependencyReferences replaces any references to outputs of task dependencies, or artifacts in the inputs
Expand Down
Loading

0 comments on commit cd73a9c

Please sign in to comment.