Re-apply workflow changes and reattempt update on resource conflicts. Make completed pod labeling asynchronous
jessesuen committed Jan 17, 2018
1 parent 81bd6d3 commit 672542d
Showing 2 changed files with 106 additions and 18 deletions.
36 changes: 32 additions & 4 deletions workflow/controller/controller.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/argoproj/argo"
@@ -43,10 +44,11 @@ type WorkflowController struct {
wfclientset wfclientset.Interface

	// data structures to support the processing of workflows and workflow pods
-	wfInformer  cache.SharedIndexInformer
-	podInformer cache.SharedIndexInformer
-	wfQueue     workqueue.RateLimitingInterface
-	podQueue    workqueue.RateLimitingInterface
+	wfInformer    cache.SharedIndexInformer
+	podInformer   cache.SharedIndexInformer
+	wfQueue       workqueue.RateLimitingInterface
+	podQueue      workqueue.RateLimitingInterface
+	completedPods chan string
}

// WorkflowControllerConfig contain the configuration settings for the workflow controller
@@ -105,6 +107,7 @@ func NewWorkflowController(restConfig *rest.Config, kubeclientset kubernetes.Int
ConfigMap: configMap,
wfQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
podQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+		completedPods: make(chan string, 128),
}
return &wfc
}
@@ -126,6 +129,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in
wfc.podInformer = wfc.newPodInformer()
go wfc.wfInformer.Run(ctx.Done())
go wfc.podInformer.Run(ctx.Done())
+	go wfc.podLabeler(ctx.Done())

// Wait for all involved caches to be synced, before processing items from the queue is started
for _, informer := range []cache.SharedIndexInformer{wfc.wfInformer, wfc.podInformer} {
@@ -144,6 +148,30 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in
<-ctx.Done()
}

+// podLabeler will label all pods on the controller's completedPods channel as completed
+func (wfc *WorkflowController) podLabeler(stopCh <-chan struct{}) {
+	for {
+		select {
+		case <-stopCh:
+			return
+		case pod := <-wfc.completedPods:
+			parts := strings.Split(pod, "/")
+			if len(parts) != 2 {
+				log.Warnf("Unexpected item on completed pod channel: %s", pod)
+				continue
+			}
+			namespace := parts[0]
+			podName := parts[1]
+			err := common.AddPodLabel(wfc.kubeclientset, podName, namespace, common.LabelKeyCompleted, "true")
+			if err != nil {
+				log.Errorf("Failed to label pod %s completed: %+v", podName, err)
+			} else {
+				log.Infof("Labeled pod %s completed", podName)
+			}
+		}
+	}
+}
+
func (wfc *WorkflowController) runWorker() {
for wfc.processNextItem() {
}
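The pattern added above decouples workflow reconciliation from pod mutation: the operator only enqueues "namespace/podName" keys, and a single goroutine drains the channel and performs the actual API writes. Below is a minimal, self-contained sketch of that producer/consumer shape. The `labelPod` helper is hypothetical; it stands in for `common.AddPodLabel`, which in the real controller adds the `completed=true` label to the pod.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// labelPod is a hypothetical stand-in for common.AddPodLabel: in the real
// controller this would issue an API call to label the pod as completed.
func labelPod(namespace, podName string) error {
	fmt.Printf("labeled %s/%s completed\n", namespace, podName)
	return nil
}

// podLabeler drains the channel until stopCh is closed, mirroring the shape
// of the controller's new goroutine.
func podLabeler(completedPods <-chan string, stopCh <-chan struct{}) {
	for {
		select {
		case <-stopCh:
			return
		case key := <-completedPods:
			// Keys are produced as "namespace/podName"; anything else is skipped.
			parts := strings.SplitN(key, "/", 2)
			if len(parts) != 2 {
				continue
			}
			if err := labelPod(parts[0], parts[1]); err != nil {
				fmt.Printf("failed to label %s: %v\n", key, err)
			}
		}
	}
}

func main() {
	// Buffered like the controller's channel (capacity 128), so producers
	// only block if the labeler falls far behind.
	completedPods := make(chan string, 128)
	stopCh := make(chan struct{})
	go podLabeler(completedPods, stopCh)

	completedPods <- "default/pod-limits-2w8rl-1264982169" // example key
	time.Sleep(100 * time.Millisecond)                     // let the worker drain
	close(stopCh)
}
```

Because the send happens only after the workflow update succeeds (see persistUpdates below), a crash between the update and the send merely re-labels some pods on the next resync, which is idempotent.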
88 changes: 74 additions & 14 deletions workflow/controller/operator.go
@@ -9,7 +9,10 @@ import (

"github.com/argoproj/argo/errors"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/argoproj/argo/util/retry"
"github.com/argoproj/argo/workflow/common"
jsonpatch "github.com/evanphx/json-patch"
log "github.com/sirupsen/logrus"
apiv1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
@@ -212,28 +215,85 @@ func (woc *wfOperationCtx) persistUpdates() {
if !woc.updated {
return
}
-	// NOTE: a previous implementation of persistUpdates attempted to use JSON Merge Patch
-	// instead of Update. But it was discovered in issue #686 that a Patch against the
-	// resource would sometimes not have any effect, resulting in inconsistent state.
-	// TODO(jessesuen): investigate JSON Patch instead of Merge patch.
+	// When persisting workflow updates, we first attempt to Update() the workflow with
+	// our modifications. However, in highly parallelized workflows (seemingly relative to
+	// the number of pod ownership references), the resourceVersion of the workflow
+	// increments at a very rapid rate, resulting in the API conflict error:
+	//  Error updating workflow: Operation cannot be fulfilled on workflows.argoproj.io
+	//  \"pod-limits-2w8rl\": the object has been modified; please apply your changes to
+	//  the latest version and try again.
+	// When the conflict error occurs, we reapply the changes on the current version of
+	// the resource.
wfClient := woc.controller.wfclientset.ArgoprojV1alpha1().Workflows(woc.wf.ObjectMeta.Namespace)
_, err := wfClient.Update(woc.wf)
if err != nil {
woc.log.Errorf("Error updating workflow: %v. Requeueing...", err)
woc.requeue()
return
woc.log.Warnf("Error updating workflow: %v", err)
if !apierr.IsConflict(err) {
return
}
woc.log.Info("Re-appying updates on latest version and retrying update")
err = woc.reapplyUpdate(wfClient)
if err != nil {
woc.log.Infof("Failed to re-apply update: %+v", err)
return
}
}
woc.log.Info("Workflow update successful")

	// It is important that we *never* label pods as completed until we have successfully updated the workflow.
// Failing to do so means we can have inconsistent state.
if len(woc.completedPods) > 0 {
woc.log.Infof("Labeling %d completed pods", len(woc.completedPods))
for podName := range woc.completedPods {
-			err = common.AddPodLabel(woc.controller.kubeclientset, podName, woc.wf.ObjectMeta.Namespace, common.LabelKeyCompleted, "true")
-			if err != nil {
-				woc.log.Errorf("Failed adding completed label to pod %s: %+v", podName, err)
-			}
+			woc.controller.completedPods <- fmt.Sprintf("%s/%s", woc.wf.ObjectMeta.Namespace, podName)
}
}
}

+// reapplyUpdate GETs the latest version of the workflow, re-applies the updates, and
+// retries the UPDATE multiple times. For the reasoning behind this technique, see:
+// https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency
+func (woc *wfOperationCtx) reapplyUpdate(wfClient v1alpha1.WorkflowInterface) error {
+	// First generate the patch
+	oldData, err := json.Marshal(woc.orig)
+	if err != nil {
+		return errors.InternalWrapError(err)
+	}
+	newData, err := json.Marshal(woc.wf)
+	if err != nil {
+		return errors.InternalWrapError(err)
+	}
+	patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
+	if err != nil {
+		return errors.InternalWrapError(err)
+	}
+	// Next, get the latest version of the workflow, apply the patch, and retry the Update
+	attempt := 1
+	for {
+		currWf, err := wfClient.Get(woc.wf.ObjectMeta.Name, metav1.GetOptions{})
+		if !retry.IsRetryableKubeAPIError(err) {
+			return errors.InternalWrapError(err)
+		}
+		currWfBytes, err := json.Marshal(currWf)
+		if err != nil {
+			return errors.InternalWrapError(err)
+		}
+		newWfBytes, err := jsonpatch.MergePatch(currWfBytes, patchBytes)
+		if err != nil {
+			return errors.InternalWrapError(err)
+		}
+		var newWf wfv1.Workflow
+		err = json.Unmarshal(newWfBytes, &newWf)
+		if err != nil {
+			return errors.InternalWrapError(err)
+		}
+		_, err = wfClient.Update(&newWf)
+		if err == nil {
+			woc.log.Infof("Update retry attempt %d successful", attempt)
+			return nil
+		}
+		woc.log.Warnf("Update retry attempt %d failed: %v", attempt, err)
+		attempt++
+		if attempt > 5 {
+			return err
+		}
+	}
+}
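reapplyUpdate is a three-way merge expressed as a JSON Merge Patch (RFC 7386): diff the workflow as originally fetched (woc.orig) against the locally mutated copy (woc.wf), then lay that diff over whatever version the API server holds now. Here is a small sketch of the same two evanphx/json-patch calls on plain JSON documents, outside any Kubernetes machinery:

```go
package main

import (
	"fmt"

	jsonpatch "github.com/evanphx/json-patch"
)

func main() {
	// What we originally fetched, and our locally modified copy.
	orig := []byte(`{"status":{"phase":"Running","nodes":{"a":"Running"}}}`)
	modified := []byte(`{"status":{"phase":"Running","nodes":{"a":"Succeeded"}}}`)

	// Someone else updated the server copy in the meantime (new node "b").
	latest := []byte(`{"status":{"phase":"Running","nodes":{"a":"Running","b":"Running"}}}`)

	// CreateMergePatch captures only what *we* changed...
	patch, err := jsonpatch.CreateMergePatch(orig, modified)
	if err != nil {
		panic(err)
	}
	fmt.Printf("patch: %s\n", patch) // {"status":{"nodes":{"a":"Succeeded"}}}

	// ...and MergePatch lays that change over the latest server version,
	// preserving the concurrent addition of node "b".
	merged, err := jsonpatch.MergePatch(latest, patch)
	if err != nil {
		panic(err)
	}
	fmt.Printf("merged: %s\n", merged)
}
```

The usual merge-patch caveat applies: concurrent writes to the very same field are silently overwritten by whichever patch lands last, so this technique is safe only because each workflow is operated on by a single controller worker at a time.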
@@ -889,7 +949,7 @@ func (woc *wfOperationCtx) markNodeError(nodeName string, err error) *wfv1.NodeS
}

func (woc *wfOperationCtx) executeContainer(nodeName string, tmpl *wfv1.Template) error {
woc.log.Infof("Executing node %s with container template: %v\n", nodeName, tmpl)
woc.log.Debugf("Executing node %s with container template: %v\n", nodeName, tmpl)
pod, err := woc.createWorkflowPod(nodeName, *tmpl.Container, tmpl)
if err != nil {
woc.markNodeError(nodeName, err)
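Both halves of this commit follow the standard Kubernetes optimistic-concurrency recipe: detect the 409 with apierr.IsConflict, then get, re-apply, and update in a bounded loop. For comparison, here is a sketch of the same shape built on client-go's stock helper retry.RetryOnConflict instead of a hand-rolled loop. updateWorkflowStatus, the mutate callback, and the package name are illustrative, and the Get/Update calls assume the 2018-era typed client signatures used in this diff (no context argument).

```go
package workflowutil

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/util/retry"

	wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
	"github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
)

// updateWorkflowStatus re-fetches the workflow and re-applies mutate on every
// attempt, so the Update is always issued against the freshest resourceVersion.
func updateWorkflowStatus(wfClient v1alpha1.WorkflowInterface, name string, mutate func(*wfv1.Workflow)) error {
	// RetryOnConflict re-runs the closure with backoff whenever it returns a
	// 409 Conflict; any other error aborts immediately.
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		wf, err := wfClient.Get(name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		mutate(wf) // re-apply our changes on the latest copy
		_, err = wfClient.Update(wf)
		return err
	})
}
```

The commit's merge-patch approach differs from this helper in one useful way: it replays only the delta computed from woc.orig rather than re-running the whole operation, which is cheaper when the mutation is expensive to recompute.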
