diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go
index e58ee0e9011e..bf6ffa0b2245 100644
--- a/workflow/controller/controller.go
+++ b/workflow/controller/controller.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"os"
+	"strings"
 	"time"
 
 	"github.com/argoproj/argo"
@@ -43,10 +44,11 @@ type WorkflowController struct {
 	wfclientset wfclientset.Interface
 
 	// datastructures to support the processing of workflows and workflow pods
-	wfInformer  cache.SharedIndexInformer
-	podInformer cache.SharedIndexInformer
-	wfQueue     workqueue.RateLimitingInterface
-	podQueue    workqueue.RateLimitingInterface
+	wfInformer    cache.SharedIndexInformer
+	podInformer   cache.SharedIndexInformer
+	wfQueue       workqueue.RateLimitingInterface
+	podQueue      workqueue.RateLimitingInterface
+	completedPods chan string
 }
 
 // WorkflowControllerConfig contain the configuration settings for the workflow controller
@@ -105,6 +107,7 @@ func NewWorkflowController(restConfig *rest.Config, kubeclientset kubernetes.Int
 		ConfigMap:     configMap,
 		wfQueue:       workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 		podQueue:      workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+		completedPods: make(chan string, 128),
 	}
 	return &wfc
 }
@@ -126,6 +129,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in
 	wfc.podInformer = wfc.newPodInformer()
 	go wfc.wfInformer.Run(ctx.Done())
 	go wfc.podInformer.Run(ctx.Done())
+	go wfc.podLabeler(ctx.Done())
 
 	// Wait for all involved caches to be synced, before processing items from the queue is started
 	for _, informer := range []cache.SharedIndexInformer{wfc.wfInformer, wfc.podInformer} {
@@ -144,6 +148,30 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in
 	<-ctx.Done()
 }
 
+// podLabeler labels all pods received on the controller's completedPods channel as completed
+func (wfc *WorkflowController) podLabeler(stopCh <-chan struct{}) {
+	for {
+		select {
+		case <-stopCh:
+			return
+		case pod := <-wfc.completedPods:
+			parts := strings.Split(pod, "/")
+			if len(parts) != 2 {
+				log.Warnf("Unexpected item on completed pod channel: %s", pod)
+				continue
+			}
+			namespace := parts[0]
+			podName := parts[1]
+			err := common.AddPodLabel(wfc.kubeclientset, podName, namespace, common.LabelKeyCompleted, "true")
+			if err != nil {
+				log.Errorf("Failed to label pod %s completed: %+v", podName, err)
+			} else {
+				log.Infof("Labeled pod %s completed", podName)
+			}
+		}
+	}
+}
+
 func (wfc *WorkflowController) runWorker() {
 	for wfc.processNextItem() {
 	}
diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go
index 12fb9bd27546..c4d19c19936b 100644
--- a/workflow/controller/operator.go
+++ b/workflow/controller/operator.go
@@ -9,7 +9,10 @@ import (
 
 	"github.com/argoproj/argo/errors"
 	wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
+	"github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
+	"github.com/argoproj/argo/util/retry"
 	"github.com/argoproj/argo/workflow/common"
+	jsonpatch "github.com/evanphx/json-patch"
 	log "github.com/sirupsen/logrus"
 	apiv1 "k8s.io/api/core/v1"
 	apierr "k8s.io/apimachinery/pkg/api/errors"
@@ -212,28 +215,85 @@ func (woc *wfOperationCtx) persistUpdates() {
 	if !woc.updated {
 		return
 	}
-	// NOTE: a previous implementation of persistUpdates attempted to use JSON Merge Patch
-	// instead of Update. But it was discovered in issue #686 that a Patch against the
-	// resource would sometimes not have any affect, resulting in inconsistent state.
-	// TODO(jessesuen): investigate JSON Patch instead of Merge patch.
+	// When persisting workflow updates, we first attempt to Update() the workflow with
+	// our modifications. However, in highly parallelized workflows (seemingly relative to
+	// the number of pod ownership references), the resourceVersion of the workflow
+	// increments at a very rapid rate, resulting in the API conflict error:
+	//   Error updating workflow: Operation cannot be fulfilled on workflows.argoproj.io
+	//   \"pod-limits-2w8rl\": the object has been modified; please apply your changes to
+	//   the latest version and try again.
+	// When the conflict error occurs, we reapply the changes on the current version of
+	// the resource.
 	wfClient := woc.controller.wfclientset.ArgoprojV1alpha1().Workflows(woc.wf.ObjectMeta.Namespace)
 	_, err := wfClient.Update(woc.wf)
 	if err != nil {
-		woc.log.Errorf("Error updating workflow: %v. Requeueing...", err)
-		woc.requeue()
-		return
+		woc.log.Warnf("Error updating workflow: %v", err)
+		if !apierr.IsConflict(err) {
+			return
+		}
+		woc.log.Info("Re-applying updates on latest version and retrying update")
+		err = woc.reapplyUpdate(wfClient)
+		if err != nil {
+			woc.log.Infof("Failed to re-apply update: %+v", err)
+			return
+		}
 	}
 	woc.log.Info("Workflow update successful")
-
 	// It is important that we *never* label pods as completed until we successfully updated the workflow
 	// Failing to do so means we can have inconsistent state.
 	if len(woc.completedPods) > 0 {
-		woc.log.Infof("Labeling %d completed pods", len(woc.completedPods))
 		for podName := range woc.completedPods {
-			err = common.AddPodLabel(woc.controller.kubeclientset, podName, woc.wf.ObjectMeta.Namespace, common.LabelKeyCompleted, "true")
-			if err != nil {
-				woc.log.Errorf("Failed adding completed label to pod %s: %+v", podName, err)
-			}
-		}
+			woc.controller.completedPods <- fmt.Sprintf("%s/%s", woc.wf.ObjectMeta.Namespace, podName)
+		}
+	}
+}
+
+// reapplyUpdate GETs the latest version of the workflow, re-applies the updates and
+// retries the UPDATE multiple times. For reasoning behind this technique, see:
+// https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency
+func (woc *wfOperationCtx) reapplyUpdate(wfClient v1alpha1.WorkflowInterface) error {
+	// First generate the patch
+	oldData, err := json.Marshal(woc.orig)
+	if err != nil {
+		return errors.InternalWrapError(err)
+	}
+	newData, err := json.Marshal(woc.wf)
+	if err != nil {
+		return errors.InternalWrapError(err)
+	}
+	patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
+	if err != nil {
+		return errors.InternalWrapError(err)
+	}
+	// Next get the latest version of the workflow, apply the patch and retry the Update
+	attempt := 1
+	for {
+		currWf, err := wfClient.Get(woc.wf.ObjectMeta.Name, metav1.GetOptions{})
+		if !retry.IsRetryableKubeAPIError(err) {
+			return errors.InternalWrapError(err)
+		}
+		currWfBytes, err := json.Marshal(currWf)
+		if err != nil {
+			return errors.InternalWrapError(err)
+		}
+		newWfBytes, err := jsonpatch.MergePatch(currWfBytes, patchBytes)
+		if err != nil {
+			return errors.InternalWrapError(err)
+		}
+		var newWf wfv1.Workflow
+		err = json.Unmarshal(newWfBytes, &newWf)
+		if err != nil {
+			return errors.InternalWrapError(err)
+		}
+		_, err = wfClient.Update(&newWf)
+		if err == nil {
+			woc.log.Infof("Update retry attempt %d successful", attempt)
+			return nil
+		}
+		attempt++
+		woc.log.Warnf("Update retry attempt %d failed: %v", attempt, err)
+		if attempt > 5 {
+			return err
+		}
 	}
 }
@@ -889,7 +949,7 @@ func (woc *wfOperationCtx) markNodeError(nodeName string, err error) *wfv1.NodeS
 }
 
 func (woc *wfOperationCtx) executeContainer(nodeName string, tmpl *wfv1.Template) error {
-	woc.log.Infof("Executing node %s with container template: %v\n", nodeName, tmpl)
+	woc.log.Debugf("Executing node %s with container template: %v\n", nodeName, tmpl)
 	pod, err := woc.createWorkflowPod(nodeName, *tmpl.Container, tmpl)
 	if err != nil {
 		woc.markNodeError(nodeName, err)
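
Note: the conflict-recovery pattern introduced in reapplyUpdate above is generic: diff the original and modified copies of the object into an RFC 7386 merge patch, fetch the latest version of the object, apply the patch to it, and retry the Update. The following is a minimal, self-contained sketch of that idea and is not part of this diff; the reapplyOnConflict helper and the sample JSON documents are made up for illustration, while the real code builds the patch from woc.orig/woc.wf and applies it to the workflow returned by wfClient.Get before retrying wfClient.Update.

// Illustrative sketch (not Argo code): the generic form of the merge-patch
// conflict-recovery technique used by reapplyUpdate.
package main

import (
	"fmt"

	jsonpatch "github.com/evanphx/json-patch"
)

// reapplyOnConflict is a hypothetical helper: it diffs our original and
// modified copies of an object into a JSON merge patch, then applies that
// patch on top of the latest copy fetched after a conflict error.
func reapplyOnConflict(oldData, newData, latest []byte) ([]byte, error) {
	// The patch contains only the fields we actually changed.
	patch, err := jsonpatch.CreateMergePatch(oldData, newData)
	if err != nil {
		return nil, err
	}
	// Re-apply those fields onto the freshest version of the object.
	return jsonpatch.MergePatch(latest, patch)
}

func main() {
	// Made-up documents standing in for the workflow object.
	orig := []byte(`{"metadata":{"resourceVersion":"100"},"status":{"nodes":{"a":"Pending"}}}`)
	ours := []byte(`{"metadata":{"resourceVersion":"100"},"status":{"nodes":{"a":"Succeeded"}}}`)
	// Meanwhile another writer bumped the resourceVersion and added node "b".
	latest := []byte(`{"metadata":{"resourceVersion":"105"},"status":{"nodes":{"a":"Pending","b":"Running"}}}`)

	merged, err := reapplyOnConflict(orig, ours, latest)
	if err != nil {
		panic(err)
	}
	// Our change to node "a" lands on top of resourceVersion 105 without
	// clobbering the concurrently added node "b".
	fmt.Println(string(merged))
}

Because the patch carries only the controller's delta, unrelated fields changed concurrently by other writers survive the retry, which is the behavior recommended by the Kubernetes api-conventions document linked in the reapplyUpdate comment.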