Skip to content

Commit

Permalink
Sidecars unable to reference volume claim templates (resolves argopro…
Browse files Browse the repository at this point in the history
  • Loading branch information
jessesuen committed Jan 26, 2018
1 parent 0b0b52c commit fdafbe2
Show file tree
Hide file tree
Showing 12 changed files with 242 additions and 78 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# Changelog

## 2.0.0-beta1 (2018-01-18)
## 2.0.0-beta2 (Unreleased)
- Fix issue preventing the referencing of artifacts in a container with retries
- Fix issue preventing the use of volumes in a sidecar

## 2.0.0-beta1 (2018-01-18)
+ Use and install minimal RBAC ClusterRoles for workflow-controller and argo-ui deployments
+ Introduce `retryStrategy` field to control retries for failed/errored containers
+ Introduce `raw` input artifacts
Expand Down
7 changes: 3 additions & 4 deletions cmd/argoexec/commands/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package commands

import (
"github.com/argoproj/argo/util/stats"
"github.com/argoproj/argo/workflow/common"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
Expand All @@ -19,18 +18,18 @@ var initCmd = &cobra.Command{

func loadArtifacts(cmd *cobra.Command, args []string) {
wfExecutor := initExecutor()
defer wfExecutor.AnnotatePanic()
defer wfExecutor.HandleError()
defer stats.LogStats()

// Download input artifacts
err := wfExecutor.StageFiles()
if err != nil {
_ = wfExecutor.AddAnnotation(common.AnnotationKeyNodeMessage, err.Error())
wfExecutor.AddError(err)
log.Fatalf("Error loading staging files: %+v", err)
}
err = wfExecutor.LoadArtifacts()
if err != nil {
_ = wfExecutor.AddAnnotation(common.AnnotationKeyNodeMessage, err.Error())
wfExecutor.AddError(err)
log.Fatalf("Error downloading input artifacts: %+v", err)
}
}
9 changes: 5 additions & 4 deletions cmd/argoexec/commands/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,25 @@ func execResource(cmd *cobra.Command, args []string) {
}

wfExecutor := initExecutor()
defer wfExecutor.HandleError()
err := wfExecutor.StageFiles()
if err != nil {
_ = wfExecutor.AddAnnotation(common.AnnotationKeyNodeMessage, err.Error())
wfExecutor.AddError(err)
log.Fatalf("Error staging resource: %+v", err)
}
resourceName, err := wfExecutor.ExecResource(args[0], common.ExecutorResourceManifestPath)
if err != nil {
_ = wfExecutor.AddAnnotation(common.AnnotationKeyNodeMessage, err.Error())
wfExecutor.AddError(err)
log.Fatalf("Error running %s resource: %+v", args[0], err)
}
err = wfExecutor.WaitResource(resourceName)
if err != nil {
_ = wfExecutor.AddAnnotation(common.AnnotationKeyNodeMessage, err.Error())
wfExecutor.AddError(err)
log.Fatalf("Error waiting for resource %s: %+v", resourceName, err)
}
err = wfExecutor.SaveResourceParameters(resourceName)
if err != nil {
_ = wfExecutor.AddAnnotation(common.AnnotationKeyNodeMessage, err.Error())
wfExecutor.AddError(err)
log.Fatalf("Error saving output parameters for resource %s: %+v", resourceName, err)
}
}
2 changes: 1 addition & 1 deletion cmd/argoexec/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (
)

var (
// Global CLI flags
// GlobalArgs holds global CLI flags
GlobalArgs globalFlags
)

Expand Down
15 changes: 7 additions & 8 deletions cmd/argoexec/commands/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"time"

"github.com/argoproj/argo/util/stats"
"github.com/argoproj/argo/workflow/common"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
Expand All @@ -21,36 +20,36 @@ var waitCmd = &cobra.Command{

func waitContainer(cmd *cobra.Command, args []string) {
wfExecutor := initExecutor()
defer wfExecutor.AnnotatePanic()
defer wfExecutor.HandleError()
defer stats.LogStats()
stats.StartStatsTicker(5 * time.Minute)

// Wait for main container to complete and kill sidecars
err := wfExecutor.Wait()
if err != nil {
_ = wfExecutor.AddAnnotation(common.AnnotationKeyNodeMessage, err.Error())
log.Errorf("Error waiting on main container to be ready, %+v", err)
wfExecutor.AddError(err)
log.Errorf("Error on wait, %+v", err)
}
err = wfExecutor.SaveArtifacts()
if err != nil {
_ = wfExecutor.AddAnnotation(common.AnnotationKeyNodeMessage, err.Error())
wfExecutor.AddError(err)
log.Fatalf("Error saving output artifacts, %+v", err)
}
// Saving output parameters
err = wfExecutor.SaveParameters()
if err != nil {
_ = wfExecutor.AddAnnotation(common.AnnotationKeyNodeMessage, err.Error())
wfExecutor.AddError(err)
log.Fatalf("Error saving output parameters, %+v", err)
}
// Capture output script result
err = wfExecutor.CaptureScriptResult()
if err != nil {
_ = wfExecutor.AddAnnotation(common.AnnotationKeyNodeMessage, err.Error())
wfExecutor.AddError(err)
log.Fatalf("Error capturing script output, %+v", err)
}
err = wfExecutor.AnnotateOutputs()
if err != nil {
_ = wfExecutor.AddAnnotation(common.AnnotationKeyNodeMessage, err.Error())
wfExecutor.AddError(err)
log.Fatalf("Error annotating outputs, %+v", err)
}
}
47 changes: 47 additions & 0 deletions test/e2e/functional/sidecar-volumes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Verifies sidecars can reference/use volumeClaimTemplates
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: sidecar-with-volumes-
spec:
entrypoint: sidecar-with-volumes
volumeClaimTemplates:
- metadata:
name: workdir
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 1Gi
templates:
- name: sidecar-with-volumes
steps:
- - name: generate
template: generate
- - name: verify
template: verify

- name: generate
script:
image: python:3.6
command: [python]
source: |
import time
time.sleep(5)
sidecars:
- name: sidevol
image: alpine:latest
command: [sh, -c]
args: ["echo 'it works' > /mnt/vol/test-art"]
volumeMounts:
- name: workdir
mountPath: /mnt/vol

- name: verify
container:
image: alpine:latest
command: [sh, -c]
args: ['[[ "$(cat /mnt/vol/test-art)" == "it works" ]]']
volumeMounts:
- name: workdir
mountPath: /mnt/vol
11 changes: 9 additions & 2 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,15 @@ func (wfc *WorkflowController) processNextItem() bool {
log.Warnf("Key '%s' in index is not a workflow", key)
return true
}
wfc.operateWorkflow(wf)
// TODO: operateWorkflow should return error if it was unable to operate properly

if wf.ObjectMeta.Labels[common.LabelKeyCompleted] == "true" {
// can get here if we already added the completed=true label,
// but we are still draining the controller's workflow workqueue
return true
}
woc := newWorkflowOperationCtx(wf, wfc)
woc.operate()
// TODO: operate should return error if it was unable to operate properly
// so we can requeue the work for a later time
// See: https://github.com/kubernetes/client-go/blob/master/examples/workqueue/main.go
//c.handleErr(err, key)
Expand Down
18 changes: 0 additions & 18 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ import (
"io"
"io/ioutil"
"net/http"
"testing"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
fakewfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned/fake"
"github.com/ghodss/yaml"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
)
Expand Down Expand Up @@ -62,19 +60,3 @@ func unmarshalWF(yamlStr string) *wfv1.Workflow {
}
return &wf
}

// TestOperateWorkflowPanicRecover ensures we can recover from unexpected panics
func TestOperateWorkflowPanicRecover(t *testing.T) {
defer func() {
if r := recover(); r != nil {
t.Fail()
}
}()
controller := newController()
// intentionally set clientset to nil to induce panic
controller.kubeclientset = nil
wf := unmarshalWF(helloWorldWf)
_, err := controller.wfclientset.ArgoprojV1alpha1().Workflows("").Create(wf)
assert.Nil(t, err)
controller.operateWorkflow(wf)
}
55 changes: 26 additions & 29 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ type wfScope struct {

// newWorkflowOperationCtx creates and initializes a new wfOperationCtx object.
func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOperationCtx {
// NEVER modify objects from the store. It's a read-only, local cache.
// You can use DeepCopy() to make a deep copy of original object and modify this copy
// Or create a copy manually for better performance
woc := wfOperationCtx{
wf: wf.DeepCopyObject().(*wfv1.Workflow),
orig: wf,
Expand All @@ -77,22 +80,12 @@ func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOper
return &woc
}

// operateWorkflow is the main operator logic of a workflow.
// operate is the main operator logic of a workflow.
// It evaluates the current state of the workflow, and its pods
// and decides how to proceed down the execution path.
// TODO: an error returned by this method should result in requeueing the
// workflow to be retried at a later time
func (wfc *WorkflowController) operateWorkflow(wf *wfv1.Workflow) {
if wf.ObjectMeta.Labels[common.LabelKeyCompleted] == "true" {
// can get here if we already added the completed=true label,
// but we are still draining the controller's workflow workqueue
return
}
var err error
// NEVER modify objects from the store. It's a read-only, local cache.
// You can use DeepCopy() to make a deep copy of original object and modify this copy
// Or create a copy manually for better performance
woc := newWorkflowOperationCtx(wf, wfc)
func (woc *wfOperationCtx) operate() {
defer woc.persistUpdates()
defer func() {
if r := recover(); r != nil {
Expand All @@ -114,26 +107,26 @@ func (wfc *WorkflowController) operateWorkflow(wf *wfv1.Workflow) {
return
}
} else {
err = woc.podReconciliation()
err := woc.podReconciliation()
if err != nil {
woc.log.Errorf("%s error: %+v", wf.ObjectMeta.Name, err)
woc.log.Errorf("%s error: %+v", woc.wf.ObjectMeta.Name, err)
// TODO: we need to re-add to the workqueue, but should happen in caller
return
}
}
woc.globalParams[common.GlobalVarWorkflowName] = wf.ObjectMeta.Name
woc.globalParams[common.GlobalVarWorkflowUID] = string(wf.ObjectMeta.UID)
for _, param := range wf.Spec.Arguments.Parameters {
woc.globalParams[common.GlobalVarWorkflowName] = woc.wf.ObjectMeta.Name
woc.globalParams[common.GlobalVarWorkflowUID] = string(woc.wf.ObjectMeta.UID)
for _, param := range woc.wf.Spec.Arguments.Parameters {
woc.globalParams["workflow.parameters."+param.Name] = *param.Value
}

err = woc.createPVCs()
err := woc.createPVCs()
if err != nil {
woc.log.Errorf("%s error: %+v", wf.ObjectMeta.Name, err)
woc.log.Errorf("%s error: %+v", woc.wf.ObjectMeta.Name, err)
woc.markWorkflowError(err, true)
return
}
err = woc.executeTemplate(wf.Spec.Entrypoint, wf.Spec.Arguments, wf.ObjectMeta.Name)
err = woc.executeTemplate(woc.wf.Spec.Entrypoint, woc.wf.Spec.Arguments, woc.wf.ObjectMeta.Name)
if err != nil {
if errors.IsCode(errors.CodeTimeout, err) {
// A timeout indicates we took too long operating on the workflow.
Expand All @@ -142,24 +135,24 @@ func (wfc *WorkflowController) operateWorkflow(wf *wfv1.Workflow) {
woc.requeue()
return
}
woc.log.Errorf("%s error: %+v", wf.ObjectMeta.Name, err)
woc.log.Errorf("%s error: %+v", woc.wf.ObjectMeta.Name, err)
}
node := woc.wf.Status.Nodes[woc.wf.NodeID(wf.ObjectMeta.Name)]
node := woc.wf.Status.Nodes[woc.wf.NodeID(woc.wf.ObjectMeta.Name)]
if !node.Completed() {
return
}

var onExitNode *wfv1.NodeStatus
if wf.Spec.OnExit != "" {
if woc.wf.Spec.OnExit != "" {
if node.Phase == wfv1.NodeSkipped {
// treat skipped the same as Succeeded for workflow.status
woc.globalParams[common.GlobalVarWorkflowStatus] = string(wfv1.NodeSucceeded)
} else {
woc.globalParams[common.GlobalVarWorkflowStatus] = string(node.Phase)
}
woc.log.Infof("Running OnExit handler: %s", wf.Spec.OnExit)
onExitNodeName := wf.ObjectMeta.Name + ".onExit"
err = woc.executeTemplate(wf.Spec.OnExit, wf.Spec.Arguments, onExitNodeName)
woc.log.Infof("Running OnExit handler: %s", woc.wf.Spec.OnExit)
onExitNodeName := woc.wf.ObjectMeta.Name + ".onExit"
err = woc.executeTemplate(woc.wf.Spec.OnExit, woc.wf.Spec.Arguments, onExitNodeName)
if err != nil {
if errors.IsCode(errors.CodeTimeout, err) {
woc.requeue()
Expand All @@ -176,7 +169,7 @@ func (wfc *WorkflowController) operateWorkflow(wf *wfv1.Workflow) {

err = woc.deletePVCs()
if err != nil {
woc.log.Errorf("%s error: %+v", wf.ObjectMeta.Name, err)
woc.log.Errorf("%s error: %+v", woc.wf.ObjectMeta.Name, err)
// Mark the workflow with an error message and return, but intentionally do not
// markCompletion so that we can retry PVC deletion (TODO: use workqueue.ReAdd())
// This error phase may be cleared if a subsequent delete attempt is successful.
Expand All @@ -203,7 +196,7 @@ func (wfc *WorkflowController) operateWorkflow(wf *wfv1.Workflow) {
default:
// NOTE: we should never make it here because if the node was 'Running'
// we should have returned earlier.
err = errors.InternalErrorf("Unexpected node phase %s: %+v", wf.ObjectMeta.Name, err)
err = errors.InternalErrorf("Unexpected node phase %s: %+v", woc.wf.ObjectMeta.Name, err)
woc.markWorkflowError(err, true)
}
}
Expand Down Expand Up @@ -602,7 +595,11 @@ func inferFailedReason(pod *apiv1.Pod) (wfv1.NodePhase, string) {
if ctr.State.Terminated.Message != "" {
failMessages[ctr.Name] = ctr.State.Terminated.Message
} else {
failMessages[ctr.Name] = fmt.Sprintf("failed with exit code %d", ctr.State.Terminated.ExitCode)
errMsg := fmt.Sprintf("failed with exit code %d", ctr.State.Terminated.ExitCode)
if ctr.Name != common.MainContainerName {
errMsg = fmt.Sprintf("sidecar '%s' %s", ctr.Name, errMsg)
}
failMessages[ctr.Name] = errMsg
}
}
}
Expand Down
Loading

0 comments on commit fdafbe2

Please sign in to comment.