Skip to content

Commit

Permalink
fix: Correct usage of wait.ExponentialBackoff (argoproj#4962)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec authored Jan 28, 2021
1 parent faa3363 commit a00aa32
Show file tree
Hide file tree
Showing 14 changed files with 92 additions and 86 deletions.
3 changes: 2 additions & 1 deletion server/event/dispatch/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

wfv1 "github.com/argoproj/argo/v2/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/v2/server/auth"
errorsutil "github.com/argoproj/argo/v2/util/errors"
"github.com/argoproj/argo/v2/util/instanceid"
"github.com/argoproj/argo/v2/util/labels"
"github.com/argoproj/argo/v2/workflow/common"
Expand Down Expand Up @@ -59,7 +60,7 @@ func (o *Operation) Dispatch(ctx context.Context) {
nameSuffix := fmt.Sprintf("%v", time.Now().Unix())
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
_, err := o.dispatch(ctx, event, nameSuffix)
return err == nil, err
return errorsutil.Done(err)
})
if err != nil {
log.WithError(err).WithFields(log.Fields{"namespace": event.Namespace, "event": event.Name}).Error("failed to dispatch from event")
Expand Down
12 changes: 12 additions & 0 deletions util/errors/done.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package errors

// this function is intended to be used with wait.ExponentialBackoff to retry transient errors
// * no error? we are done
// * transient error? we ignore and try again
// * non-transient error? we are done
func Done(err error) (bool, error) {
if IsTransientErr(err) {
return false, nil
}
return true, err
}
27 changes: 27 additions & 0 deletions util/errors/done_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package errors

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
apierr "k8s.io/apimachinery/pkg/api/errors"
)

func TestDone(t *testing.T) {
t.Run("NoError", func(t *testing.T) {
done, err := Done(nil)
assert.NoError(t, err)
assert.True(t, done)
})
t.Run("TransientError", func(t *testing.T) {
done, err := Done(apierr.NewTooManyRequests("", 0))
assert.NoError(t, err)
assert.False(t, done)
})
t.Run("NonTransientError", func(t *testing.T) {
done, err := Done(errors.New(""))
assert.Error(t, err)
assert.True(t, done)
})
}
2 changes: 1 addition & 1 deletion util/errors/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,4 @@ func TestIsTransientErr(t *testing.T) {

_ = os.Unsetenv(transientEnvVarKey)
})
}
}
14 changes: 3 additions & 11 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"k8s.io/apimachinery/pkg/fields"

log "github.com/sirupsen/logrus"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -37,17 +36,10 @@ func GetSecrets(ctx context.Context, clientSet kubernetes.Interface, namespace,

secretsIf := clientSet.CoreV1().Secrets(namespace)
var secret *apiv1.Secret
var err error
_ = wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
var err error
secret, err = secretsIf.Get(ctx, name, metav1.GetOptions{})
if err != nil {
log.Warnf("Failed to get secret '%s': %v", name, err)
if !errorsutil.IsTransientErr(err) {
return false, err
}
return false, nil
}
return true, nil
return errorsutil.Done(err)
})
if err != nil {
return []byte{}, errors.InternalWrapError(err)
Expand Down
2 changes: 1 addition & 1 deletion workflow/artifactrepositories/artifactrepositories.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s *artifactRepositories) get(ctx context.Context, ref *wfv1.ArtifactReposi
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
var err error
cm, err = s.kubernetesInterface.CoreV1().ConfigMaps(namespace).Get(ctx, configMap, metav1.GetOptions{})
return err == nil || !errorsutil.IsTransientErr(err), err
return errorsutil.Done(err)
})
if err != nil {
return nil, nil, err
Expand Down
5 changes: 3 additions & 2 deletions workflow/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/argoproj/argo/v2/pkg/apis/workflow"
wfv1 "github.com/argoproj/argo/v2/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/v2/util"
errorsutil "github.com/argoproj/argo/v2/util/errors"
)

// FindOverlappingVolume looks an artifact path, checks if it overlaps with any
Expand Down Expand Up @@ -477,8 +478,8 @@ func addPodMetadata(ctx context.Context, c kubernetes.Interface, field, podName,
return errors.InternalWrapError(err)
}
return wait.ExponentialBackoff(backoff, func() (bool, error) {
_, err = c.CoreV1().Pods(namespace).Patch(ctx, podName, types.MergePatchType, patch, metav1.PatchOptions{})
return err == nil, err
_, err := c.CoreV1().Pods(namespace).Patch(ctx, podName, types.MergePatchType, patch, metav1.PatchOptions{})
return errorsutil.Done(err)
})
}

Expand Down
9 changes: 3 additions & 6 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,12 +619,9 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {

err = wfc.hydrator.Hydrate(woc.wf)
if err != nil {
transientErr := errorsutil.IsTransientErr(err)
woc.log.WithField("transientErr", transientErr).Errorf("hydration failed: %v", err)
if !transientErr {
woc.markWorkflowError(ctx, err)
woc.persistUpdates(ctx)
}
woc.log.Errorf("hydration failed: %v", err)
woc.markWorkflowError(ctx, err)
woc.persistUpdates(ctx)
return true
}

Expand Down
11 changes: 4 additions & 7 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1272,6 +1272,7 @@ func inferFailedReason(pod *apiv1.Pod) (wfv1.NodePhase, string) {
return wfv1.NodeFailed, pod.Status.Message
}
annotatedMsg := pod.Annotations[common.AnnotationKeyNodeMessage]

// We only get one message to set for the overall node status.
// If multiple containers failed, in order of preference:
// init, main (annotated), main (exit code), wait, sidecars
Expand Down Expand Up @@ -3099,14 +3100,10 @@ func (woc *wfOperationCtx) deletePDBResource(ctx context.Context) error {
}
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
err := woc.controller.kubeclientset.PolicyV1beta1().PodDisruptionBudgets(woc.wf.Namespace).Delete(ctx, woc.wf.Name, metav1.DeleteOptions{})
if err != nil && !apierr.IsNotFound(err) {
woc.log.WithField("err", err).Warn("Failed to delete PDB.")
if !errorsutil.IsTransientErr(err) {
return false, err
}
return false, nil
if apierr.IsNotFound(err) {
return true, nil
}
return true, nil
return errorsutil.Done(err)
})
if err != nil {
woc.log.WithField("err", err).Error("Unable to delete PDB resource for workflow.")
Expand Down
7 changes: 2 additions & 5 deletions workflow/cron/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/argoproj/argo/v2/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/v2/pkg/client/clientset/versioned"
typed "github.com/argoproj/argo/v2/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
argoerr "github.com/argoproj/argo/v2/util/errors"
errorsutil "github.com/argoproj/argo/v2/util/errors"
"github.com/argoproj/argo/v2/workflow/common"
"github.com/argoproj/argo/v2/workflow/metrics"
"github.com/argoproj/argo/v2/workflow/templateresolution"
Expand Down Expand Up @@ -145,10 +145,7 @@ func (woc *cronWfOperationCtx) patch(ctx context.Context, patch map[string]inter
err = wait.ExponentialBackoff(retry.DefaultBackoff, func() (bool, error) {
cronWf, err := woc.cronWfIf.Patch(ctx, woc.cronWf.Name, types.MergePatchType, data, v1.PatchOptions{})
if err != nil {
if argoerr.IsTransientErr(err) {
return false, nil
}
return false, err
return errorsutil.Done(err)
}
woc.cronWf = cronWf
return true, nil
Expand Down
47 changes: 16 additions & 31 deletions workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
wfv1 "github.com/argoproj/argo/v2/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/v2/util"
"github.com/argoproj/argo/v2/util/archive"
errorsutil "github.com/argoproj/argo/v2/util/errors"
"github.com/argoproj/argo/v2/util/retry"
artifact "github.com/argoproj/argo/v2/workflow/artifacts"
"github.com/argoproj/argo/v2/workflow/common"
Expand Down Expand Up @@ -609,14 +610,10 @@ func (we *WorkflowExecutor) InitDriver(ctx context.Context, art *wfv1.Artifact)
func (we *WorkflowExecutor) getPod(ctx context.Context) (*apiv1.Pod, error) {
podsIf := we.ClientSet.CoreV1().Pods(we.Namespace)
var pod *apiv1.Pod
var err error
_ = wait.ExponentialBackoff(ExecutorRetry, func() (bool, error) {
err := wait.ExponentialBackoff(ExecutorRetry, func() (bool, error) {
var err error
pod, err = podsIf.Get(ctx, we.PodName, metav1.GetOptions{})
if err != nil {
log.Warnf("Failed to get pod '%s': %v", we.PodName, err)
return false, nil
}
return true, nil
return errorsutil.Done(err)
})
if err != nil {
return nil, errors.InternalWrapError(err)
Expand All @@ -633,14 +630,10 @@ func (we *WorkflowExecutor) GetConfigMapKey(ctx context.Context, name, key strin
}
configmapsIf := we.ClientSet.CoreV1().ConfigMaps(namespace)
var configmap *apiv1.ConfigMap
var err error
_ = wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
var err error
configmap, err = configmapsIf.Get(ctx, name, metav1.GetOptions{})
if err != nil {
log.Warnf("Failed to get configmap '%s': %v", name, err)
return false, nil
}
return true, nil
return errorsutil.Done(err)
})
if err != nil {
return "", errors.InternalWrapError(err)
Expand All @@ -665,14 +658,10 @@ func (we *WorkflowExecutor) GetSecrets(ctx context.Context, namespace, name, key
}
secretsIf := we.ClientSet.CoreV1().Secrets(namespace)
var secret *apiv1.Secret
var err error
_ = wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
var err error
secret, err = secretsIf.Get(ctx, name, metav1.GetOptions{})
if err != nil {
log.Warnf("Failed to get secret '%s': %v", name, err)
return false, nil
}
return true, nil
return errorsutil.Done(err)
})
if err != nil {
return []byte{}, errors.InternalWrapError(err)
Expand Down Expand Up @@ -1005,11 +994,11 @@ func (we *WorkflowExecutor) Wait(ctx context.Context) error {
annotationUpdatesCh := we.monitorAnnotations(ctx)
go we.monitorDeadline(ctx, annotationUpdatesCh)

_ = wait.ExponentialBackoff(ExecutorRetry, func() (bool, error) {
err = we.RuntimeExecutor.Wait(ctx, mainContainerID)
err = wait.ExponentialBackoff(ExecutorRetry, func() (bool, error) {
err := we.RuntimeExecutor.Wait(ctx, mainContainerID)
if err != nil {
log.Warnf("Failed to wait for container id '%s': %v", mainContainerID, err)
return false, err
return false, nil
}
return true, nil
})
Expand All @@ -1029,16 +1018,12 @@ func (we *WorkflowExecutor) waitMainContainerStart(ctx context.Context) (string,
FieldSelector: fieldSelector.String(),
}

var err error
var watchIf watch.Interface

err = wait.ExponentialBackoff(ExecutorRetry, func() (bool, error) {
err := wait.ExponentialBackoff(ExecutorRetry, func() (bool, error) {
var err error
watchIf, err = podsIf.Watch(ctx, opts)
if err != nil {
log.Debugf("Failed to establish watch, retrying: %v", err)
return false, nil
}
return true, nil
return errorsutil.Done(err)
})
if err != nil {
return "", errors.InternalWrapErrorf(err, "Failed to establish pod watch: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions workflow/executor/pns/pns.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func (p *PNSExecutor) GetTerminatedContainerStatus(ctx context.Context, containe
err := wait.ExponentialBackoff(backoffOver30s, func() (bool, error) {
podRes, err := p.clientset.CoreV1().Pods(p.namespace).Get(ctx, p.podName, metav1.GetOptions{})
if err != nil {
return !errorsutil.IsTransientErr(err), fmt.Errorf("could not get pod: %w", err)
return errorsutil.Done(err)
}
for _, containerStatusRes := range podRes.Status.ContainerStatuses {
if execcommon.GetContainerID(&containerStatusRes) != containerID {
Expand All @@ -395,7 +395,7 @@ func (p *PNSExecutor) GetTerminatedContainerStatus(ctx context.Context, containe
containerStatus = &containerStatusRes
return containerStatus.State.Terminated != nil, nil
}
return false, errors.New(errors.CodeNotFound, fmt.Sprintf("containerID %q is not found in the pod %s", containerID, p.podName))
return false, nil
})
return pod, containerStatus, err
}
Expand Down
5 changes: 3 additions & 2 deletions workflow/hydrator/hydrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/argoproj/argo/v2/persist/sqldb"
wfv1 "github.com/argoproj/argo/v2/pkg/apis/workflow/v1alpha1"
errorsutil "github.com/argoproj/argo/v2/util/errors"
"github.com/argoproj/argo/v2/workflow/packer"
)

Expand Down Expand Up @@ -75,7 +76,7 @@ func (h hydrator) Hydrate(wf *wfv1.Workflow) error {
var offloadedNodes wfv1.Nodes
err := wait.ExponentialBackoff(readRetry, func() (bool, error) {
offloadedNodes, err = h.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
return err == nil, err
return errorsutil.Done(err)
})
if err != nil {
return err
Expand All @@ -101,7 +102,7 @@ func (h hydrator) Dehydrate(wf *wfv1.Workflow) error {
var offloadVersion string
err := wait.ExponentialBackoff(writeRetry, func() (bool, error) {
offloadVersion, err = h.offloadNodeStatusRepo.Save(string(wf.UID), wf.Namespace, wf.Status.Nodes)
return err == nil, err
return errorsutil.Done(err)
})
if err != nil {
return err
Expand Down
Loading

0 comments on commit a00aa32

Please sign in to comment.