Skip to content

Commit

Permalink
fix(executor): Fix container set bugs (argoproj#5317)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec committed Mar 8, 2021
1 parent 9d2e961 commit ba949c3
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 84 deletions.
2 changes: 1 addition & 1 deletion cmd/argoexec/commands/emissary.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func NewEmissaryCommand() *cobra.Command {

// this may not be that important an optimisation, except for very long logs we don't want to capture
if includeScriptOutput {
logger.Info("capturing script output")
logger.Info("capturing logs")
stdout, err := os.Create(varRunArgo + "/ctr/" + containerName + "/stdout")
if err != nil {
return fmt.Errorf("failed to open stdout: %w", err)
Expand Down
28 changes: 0 additions & 28 deletions docs/fields.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,6 @@ Workflow is the definition of a workflow resource

- [`output-parameter.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-parameter.yaml)

- [`output-result-workflow.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-result-workflow.yaml)

- [`parallelism-limit.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-limit.yaml)

- [`parallelism-nested-dag.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-nested-dag.yaml)
Expand Down Expand Up @@ -567,8 +565,6 @@ WorkflowSpec is the specification of a Workflow.

- [`output-parameter.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-parameter.yaml)

- [`output-result-workflow.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-result-workflow.yaml)

- [`parallelism-limit.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-limit.yaml)

- [`parallelism-nested-dag.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-nested-dag.yaml)
Expand Down Expand Up @@ -958,8 +954,6 @@ CronWorkflowSpec is the specification of a CronWorkflow

- [`output-parameter.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-parameter.yaml)

- [`output-result-workflow.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-result-workflow.yaml)

- [`parallelism-limit.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-limit.yaml)

- [`parallelism-nested-dag.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-nested-dag.yaml)
Expand Down Expand Up @@ -1307,8 +1301,6 @@ WorkflowTemplateSpec is a spec of WorkflowTemplate.

- [`output-parameter.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-parameter.yaml)

- [`output-result-workflow.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-result-workflow.yaml)

- [`parallelism-limit.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-limit.yaml)

- [`parallelism-nested-dag.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-nested-dag.yaml)
Expand Down Expand Up @@ -1582,8 +1574,6 @@ Arguments to a template

- [`output-parameter.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-parameter.yaml)

- [`output-result-workflow.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-result-workflow.yaml)

- [`parallelism-nested-dag.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-nested-dag.yaml)

- [`parallelism-nested-workflow.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-nested-workflow.yaml)
Expand Down Expand Up @@ -2000,8 +1990,6 @@ Template is a reusable and composable unit of execution in a workflow

- [`output-parameter.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-parameter.yaml)

- [`output-result-workflow.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-result-workflow.yaml)

- [`parallelism-limit.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-limit.yaml)

- [`parallelism-nested-dag.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-nested-dag.yaml)
Expand Down Expand Up @@ -2587,8 +2575,6 @@ Parameter indicate a passed string parameter to a service template with an optio

- [`output-parameter.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-parameter.yaml)

- [`output-result-workflow.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-result-workflow.yaml)

- [`parallelism-nested-dag.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-nested-dag.yaml)

- [`parallelism-nested-workflow.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-nested-workflow.yaml)
Expand Down Expand Up @@ -2850,8 +2836,6 @@ DAGTemplate is a template subtype for directed acyclic graph templates

- [`map-reduce.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/map-reduce.yaml)

- [`output-result-workflow.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-result-workflow.yaml)

- [`parallelism-nested-dag.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-nested-dag.yaml)

- [`parameter-aggregation-dag.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parameter-aggregation-dag.yaml)
Expand Down Expand Up @@ -3055,8 +3039,6 @@ Inputs are the mechanism for passing parameters, artifacts, volumes from one tem

- [`output-parameter.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-parameter.yaml)

- [`output-result-workflow.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-result-workflow.yaml)

- [`parallelism-nested-dag.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-nested-dag.yaml)

- [`parallelism-nested-workflow.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-nested-workflow.yaml)
Expand Down Expand Up @@ -3904,8 +3886,6 @@ MetricLabel is a single label for a prometheus metric

- [`k8s-patch.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/k8s-patch.yaml)

- [`output-result-workflow.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-result-workflow.yaml)

- [`pod-gc-strategy-with-label-selector.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/pod-gc-strategy-with-label-selector.yaml)

- [`pod-metadata-wf-field.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/pod-metadata-wf-field.yaml)
Expand Down Expand Up @@ -4035,8 +4015,6 @@ DAGTask represents a node in the graph during DAG execution

- [`map-reduce.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/map-reduce.yaml)

- [`output-result-workflow.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-result-workflow.yaml)

- [`parallelism-nested-dag.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-nested-dag.yaml)

- [`parameter-aggregation-dag.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parameter-aggregation-dag.yaml)
Expand Down Expand Up @@ -4580,8 +4558,6 @@ ObjectMeta is metadata that all persisted resources must have, which includes al

- [`output-parameter.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-parameter.yaml)

- [`output-result-workflow.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-result-workflow.yaml)

- [`parallelism-limit.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-limit.yaml)

- [`parallelism-nested-dag.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-nested-dag.yaml)
Expand Down Expand Up @@ -5148,8 +5124,6 @@ A single application container that you want to run within a pod.

- [`output-parameter.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-parameter.yaml)

- [`output-result-workflow.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-result-workflow.yaml)

- [`parallelism-limit.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-limit.yaml)

- [`parallelism-nested-dag.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-nested-dag.yaml)
Expand Down Expand Up @@ -5811,8 +5785,6 @@ PersistentVolumeClaimSpec describes the common attributes of storage devices and

- [`output-parameter.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-parameter.yaml)

- [`output-result-workflow.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/output-result-workflow.yaml)

- [`parallelism-limit.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-limit.yaml)

- [`parallelism-nested-dag.yaml`](https://github.com/argoproj/argo-workflows/blob/master/examples/parallelism-nested-dag.yaml)
Expand Down
43 changes: 0 additions & 43 deletions examples/output-result-workflow.yaml

This file was deleted.

6 changes: 4 additions & 2 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2225,12 +2225,14 @@ func (tmpl *Template) GetVolumeMounts() []apiv1.VolumeMount {
return nil
}

// whether or not the template can and will have outputs (i.e. exit code and result)
func (tmpl *Template) HasOutput() bool {
return tmpl.Container != nil || tmpl.ContainerSet.HasContainerNamed("main") || tmpl.Script != nil || tmpl.Data != nil
}

func (tmpl *Template) HasLogs() bool {
return tmpl.HasOutput() || tmpl.Resource != nil
// if logs should be saved as an artifact
func (tmpl *Template) SaveLogsAsArtifact() bool {
return tmpl != nil && tmpl.ArchiveLocation.IsArchiveLogs() && (tmpl.ContainerSet == nil || tmpl.ContainerSet.HasContainerNamed("main"))
}

// DAGTemplate is a template subtype for directed acyclic graph templates
Expand Down
26 changes: 21 additions & 5 deletions pkg/apis/workflow/v1alpha1/workflow_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,23 +683,19 @@ func TestTemplate_HasOutputs(t *testing.T) {
t.Run("Default", func(t *testing.T) {
x := &Template{}
assert.False(t, x.HasOutput())
assert.False(t, x.HasLogs())
})
t.Run("Container", func(t *testing.T) {
x := &Template{Container: &corev1.Container{}}
assert.True(t, x.HasOutput())
assert.True(t, x.HasLogs())
})
t.Run("ContainerSet", func(t *testing.T) {
t.Run("NoMain", func(t *testing.T) {
x := &Template{ContainerSet: &ContainerSetTemplate{}}
assert.False(t, x.HasOutput())
assert.False(t, x.HasLogs())
})
t.Run("Main", func(t *testing.T) {
x := &Template{ContainerSet: &ContainerSetTemplate{Containers: []ContainerNode{{Container: corev1.Container{Name: "main"}}}}}
assert.True(t, x.HasOutput())
assert.True(t, x.HasLogs())
})
})
t.Run("Script", func(t *testing.T) {
Expand All @@ -713,6 +709,26 @@ func TestTemplate_HasOutputs(t *testing.T) {
t.Run("Resource", func(t *testing.T) {
x := &Template{Resource: &ResourceTemplate{}}
assert.False(t, x.HasOutput())
assert.True(t, x.HasLogs())
})
}

func TestTemplate_SaveLogsAsArtifact(t *testing.T) {
t.Run("Default", func(t *testing.T) {
x := &Template{}
assert.False(t, x.SaveLogsAsArtifact())
})
t.Run("IsArchiveLogs", func(t *testing.T) {
x := &Template{ArchiveLocation: &ArtifactLocation{ArchiveLogs: pointer.BoolPtr(true)}}
assert.True(t, x.SaveLogsAsArtifact())
})
t.Run("ContainerSet", func(t *testing.T) {
t.Run("NoMain", func(t *testing.T) {
x := &Template{ArchiveLocation: &ArtifactLocation{ArchiveLogs: pointer.BoolPtr(true)}, ContainerSet: &ContainerSetTemplate{}}
assert.False(t, x.SaveLogsAsArtifact())
})
t.Run("Main", func(t *testing.T) {
x := &Template{ArchiveLocation: &ArtifactLocation{ArchiveLogs: pointer.BoolPtr(true)}, ContainerSet: &ContainerSetTemplate{Containers: []ContainerNode{{Container: corev1.Container{Name: "main"}}}}}
assert.True(t, x.SaveLogsAsArtifact())
})
})
}
19 changes: 18 additions & 1 deletion test/e2e/artifacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *ArtifactsSuite) TestOutputOnMount() {
}

func (s *ArtifactsSuite) TestOutputOnInput() {
s.Need(fixtures.BaseLayerArtifacts) // I believe this would work on both K8S and Kubelet, not validation does not allow it
s.Need(fixtures.BaseLayerArtifacts) // I believe this would work on both K8S and Kubelet, but validation does not allow it
s.Given().
Workflow("@testdata/output-on-input-workflow.yaml").
When().
Expand Down Expand Up @@ -148,6 +148,23 @@ func (s *ArtifactsSuite) TestOutputArtifactS3BucketCreationEnabled() {
})
}

func (s *ArtifactsSuite) TestOutputResult() {
s.Given().
Workflow("@testdata/output-result-workflow.yaml").
When().
SubmitWorkflow().
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
n := status.Nodes.FindByDisplayName("a")
if assert.NotNil(t, n) {
assert.NotNil(t, n.Outputs.ExitCode)
assert.NotNil(t, n.Outputs.Result)
}
})
}

func TestArtifactsSuite(t *testing.T) {
suite.Run(t, new(ArtifactsSuite))
}
31 changes: 31 additions & 0 deletions test/e2e/testdata/output-result-workflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
kind: Workflow
apiVersion: argoproj.io/v1alpha1
metadata:
generateName: output-result-
spec:
entrypoint: main
templates:
- name: main
dag:
tasks:
- name: a
template: produce
- name: b
template: consume
dependencies:
- a
arguments:
parameters:
- name: text
value: "{{tasks.a.outputs.result}}"

- name: produce
container:
image: argoproj/argosay:v2

- name: consume
inputs:
parameters:
- name: text
container:
image: argoproj/argosay:v2
8 changes: 6 additions & 2 deletions workflow/controller/container_set_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName str
woc.markNodePhase(nodeName, wfv1.NodeFailed, fmt.Sprintf("template has sequenced containers, so you must use the emissary executor rather than %q, learn more: https://argoproj.github.io/argo-workflows/workflow-executors/#emissary-emissary", woc.getContainerRuntimeExecutor()))
return woc.wf.GetNodeByName(nodeName), nil
}
includeScriptOutput, err := woc.includeScriptOutput(nodeName, opts.boundaryID)
if err != nil {
return node, err
}

_, err := woc.createWorkflowPod(ctx, nodeName, tmpl.ContainerSet.GetContainers(), tmpl, &createWorkflowPodOpts{
includeScriptOutput: tmpl.HasOutput(),
_, err = woc.createWorkflowPod(ctx, nodeName, tmpl.ContainerSet.GetContainers(), tmpl, &createWorkflowPodOpts{
includeScriptOutput: includeScriptOutput,
onExitPod: opts.onExitTemplate,
executionDeadline: opts.executionDeadline,
})
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2524,7 +2524,7 @@ func (woc *wfOperationCtx) processAggregateNodeOutputs(tmpl *wfv1.Template, scop
resultsList = append(resultsList, item)
}
}
if tmpl.HasOutput() {
if tmpl.GetType() == wfv1.TemplateTypeScript {
resultsJSON, err := json.Marshal(resultsList)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ func (we *WorkflowExecutor) SaveParameters(ctx context.Context) error {

// SaveLogs saves logs
func (we *WorkflowExecutor) SaveLogs(ctx context.Context) (*wfv1.Artifact, error) {
if !we.Template.ArchiveLocation.IsArchiveLogs() || !we.Template.HasLogs() {
if !we.Template.SaveLogsAsArtifact() {
return nil, nil
}
log.Infof("Saving logs")
Expand Down

0 comments on commit ba949c3

Please sign in to comment.