feat(controller): Retry transient offload errors. Resolves argoproj#4464 (argoproj#4482)

Signed-off-by: Alex Collins <[email protected]>
alexec committed Nov 23, 2020
1 parent 2a3ab1a commit 1c62586
Showing 2 changed files with 29 additions and 6 deletions.
10 changes: 7 additions & 3 deletions workflow/controller/controller.go
@@ -39,6 +39,7 @@ import (
 	wfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned"
 	wfextvv1alpha1 "github.com/argoproj/argo/pkg/client/informers/externalversions/workflow/v1alpha1"
 	authutil "github.com/argoproj/argo/util/auth"
+	errorsutil "github.com/argoproj/argo/util/errors"
 	"github.com/argoproj/argo/workflow/common"
 	controllercache "github.com/argoproj/argo/workflow/controller/cache"
 	"github.com/argoproj/argo/workflow/controller/estimation"
@@ -580,9 +581,12 @@ func (wfc *WorkflowController) processNextItem() bool {

 	err = wfc.hydrator.Hydrate(woc.wf)
 	if err != nil {
-		woc.log.Errorf("hydration failed: %v", err)
-		woc.markWorkflowError(err)
-		woc.persistUpdates()
+		transientErr := errorsutil.IsTransientErr(err)
+		woc.log.WithField("transientErr", transientErr).Errorf("hydration failed: %v", err)
+		if !transientErr {
+			woc.markWorkflowError(err)
+			woc.persistUpdates()
+		}
 		return true
 	}

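Note on the controller hunk above: a hydration error is now classified with errorsutil.IsTransientErr before anything is persisted. A transient failure is only logged and processNextItem still returns true, leaving the workflow untouched so a later reconciliation can retry hydration; only non-transient failures mark the workflow as errored. A minimal stand-alone sketch of that decision, with a hypothetical isTransient classifier and markFailed callback standing in for errorsutil.IsTransientErr and woc.markWorkflowError + woc.persistUpdates (not the controller's actual code):

package main

import (
	"errors"
	"fmt"
)

// handleHydrationError isolates the retry-vs-fail decision from the diff above.
func handleHydrationError(err error, isTransient func(error) bool, markFailed func(error)) {
	if isTransient(err) {
		// Transient: persist nothing, so a later reconciliation can retry hydration.
		return
	}
	// Non-transient: record the failure so it surfaces on the workflow.
	markFailed(err)
}

func main() {
	transient := errors.New("connection reset by peer")
	isTransient := func(err error) bool { return errors.Is(err, transient) } // toy classifier
	markFailed := func(err error) { fmt.Println("workflow marked errored:", err) }

	handleHydrationError(transient, isTransient, markFailed)                      // retried later, nothing persisted
	handleHydrationError(errors.New("bad node status"), isTransient, markFailed)  // marked errored
}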
25 changes: 22 additions & 3 deletions workflow/hydrator/hydrator.go
@@ -2,10 +2,10 @@ package hydrator

 import (
 	"os"
+	"time"

 	log "github.com/sirupsen/logrus"
 	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/client-go/util/retry"

 	"github.com/argoproj/argo/persist/sqldb"
 	wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
@@ -47,14 +47,33 @@ func (h hydrator) HydrateWithNodes(wf *wfv1.Workflow, offloadedNodes wfv1.Nodes)
 		wf.Status.OffloadNodeStatusVersion = ""
 	}

+// should be <10s
+// Retry  Seconds
+//     1     0.10
+//     2     0.30
+//     3     0.70
+//     4     1.50
+//     5     3.10
+var readRetry = wait.Backoff{Steps: 5, Duration: 100 * time.Millisecond, Factor: 2}
+
+// needs to be long
+// https://backoffcalculator.com/?attempts=5&rate=2&interval=1
+// Retry  Seconds
+//     1     1.00
+//     2     3.00
+//     3     7.00
+//     4    15.00
+//     5    31.00
+var writeRetry = wait.Backoff{Steps: 5, Duration: 1 * time.Second, Factor: 2}
+
 func (h hydrator) Hydrate(wf *wfv1.Workflow) error {
 	err := packer.DecompressWorkflow(wf)
 	if err != nil {
 		return err
 	}
 	if wf.Status.IsOffloadNodeStatus() {
 		var offloadedNodes wfv1.Nodes
-		err := wait.ExponentialBackoff(retry.DefaultBackoff, func() (bool, error) {
+		err := wait.ExponentialBackoff(readRetry, func() (bool, error) {
 			offloadedNodes, err = h.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
 			return err == nil, err
 		})
@@ -80,7 +99,7 @@ func (h hydrator) Dehydrate(wf *wfv1.Workflow) error {
 	}
 	if packer.IsTooLargeError(err) || alwaysOffloadNodeStatus {
 		var offloadVersion string
-		err := wait.ExponentialBackoff(retry.DefaultBackoff, func() (bool, error) {
+		err := wait.ExponentialBackoff(writeRetry, func() (bool, error) {
 			offloadVersion, err = h.offloadNodeStatusRepo.Save(string(wf.UID), wf.Namespace, wf.Status.Nodes)
 			return err == nil, err
 		})
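For reference, the cumulative delays quoted in the readRetry and writeRetry comments follow from the Backoff parameters alone: the first delay is Duration, each subsequent delay is multiplied by Factor, and the table sums them over Steps retries. A small stand-alone calculation that reproduces both tables (it only mirrors the arithmetic, it is not a reimplementation of wait.ExponentialBackoff):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Parameters copied from the readRetry and writeRetry values in the diff above.
	for _, b := range []struct {
		name     string
		steps    int
		duration time.Duration
		factor   float64
	}{
		{"readRetry", 5, 100 * time.Millisecond, 2},
		{"writeRetry", 5, 1 * time.Second, 2},
	} {
		fmt.Println(b.name)
		cumulative, delay := time.Duration(0), b.duration
		for i := 1; i <= b.steps; i++ {
			cumulative += delay
			fmt.Printf("  retry %d at %6.2fs\n", i, cumulative.Seconds())
			delay = time.Duration(float64(delay) * b.factor) // double the next delay
		}
	}
}

Running this prints 0.10, 0.30, 0.70, 1.50, 3.10 seconds for readRetry (under the "should be <10s" budget) and 1, 3, 7, 15, 31 seconds for writeRetry, matching the comment tables.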
