Skip to content

Commit

Permalink
fix(controller): Workflow hangs indefinitely during ContainerCreating…
Browse files Browse the repository at this point in the history
… if the Pod or Node unexpectedly dies (#5585)
  • Loading branch information
sarabala1979 committed Apr 7, 2021
1 parent d0a0289 commit 0edd32b
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 1 deletion.
14 changes: 14 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ const (
VolumeClaimGCOnSuccess VolumeClaimGCStrategy = "OnWorkflowSuccess"
)

type NodeReason string

const (
WaitingForSyncLock NodeReason = "PendingForSyncLock"
)

// Workflow is the definition of a workflow resource
// +genclient
// +genclient:noStatus
Expand Down Expand Up @@ -1279,6 +1285,14 @@ func (n Nodes) Find(f func(NodeStatus) bool) *NodeStatus {
return nil
}

func (ns NodeStatus) GetReason() NodeReason {
// If node is waiting for synchronize lock, Pod will not be created in this scenario
if ns.SynchronizationStatus != nil && ns.SynchronizationStatus.Waiting != "" {
return WaitingForSyncLock
}
return ""
}

func NodeWithDisplayName(name string) func(n NodeStatus) bool {
return func(n NodeStatus) bool { return n.DisplayName == name }
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,3 +834,19 @@ func TestHasChild(t *testing.T) {
assert.False(t, node.HasChild("c"))
assert.False(t, node.HasChild(""))
}

func TestNodeStatusGetReason(t *testing.T) {
nodeStatus := NodeStatus{Phase: NodePending}
nodeStatusWithLock := NodeStatus{Phase: NodePending, SynchronizationStatus: &NodeSynchronizationStatus{Waiting: "test"}}
t.Run("WaitingForLockReason", func(t *testing.T) {
assert.Equal(t, NodeReason(""), nodeStatus.GetReason())
assert.Equal(t, WaitingForSyncLock, nodeStatusWithLock.GetReason())
})
t.Run("EmptyReason", func(t *testing.T) {
nodeStatus.Phase = NodeRunning
nodeStatusWithLock.Phase = NodeRunning
nodeStatusWithLock.SynchronizationStatus = nil
assert.Equal(t, NodeReason(""), nodeStatus.GetReason())
assert.Equal(t, NodeReason(""), nodeStatusWithLock.GetReason())
})
}
4 changes: 3 additions & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,9 +975,11 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error {

// If the node is pending and the pod does not exist, it could be the case that we want to try to submit it
// again instead of marking it as an error. Check if that's the case.
if node.Pending() {
// Node will be in pending state without Pod create if Node is waiting for Synchronize lock
if node.Pending() && node.GetReason() == wfv1.WaitingForSyncLock {
continue
}

if recentlyStarted {
// If the pod was deleted, then we it is possible that the controller never get another informer message about it.
// In this case, the workflow will only be requeued after the resync period (20m). This means
Expand Down
120 changes: 120 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/argoproj/argo-workflows/v3/workflow/common"
"github.com/argoproj/argo-workflows/v3/workflow/controller/cache"
hydratorfake "github.com/argoproj/argo-workflows/v3/workflow/hydrator/fake"
"github.com/argoproj/argo-workflows/v3/workflow/sync"
"github.com/argoproj/argo-workflows/v3/workflow/util"
)

Expand Down Expand Up @@ -6596,3 +6597,122 @@ func TestOnExitDAGStatusCompatibility(t *testing.T) {
nodeB := woc.wf.Status.Nodes.FindByDisplayName("B")
assert.Nil(t, nodeB)
}

var wfPending = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
creationTimestamp: "2021-04-05T21:50:18Z"
name: hello-world-4srt7
namespace: argo
spec:
activeDeadlineSeconds: 300
entrypoint: whalesay
podSpecPatch: |
terminationGracePeriodSeconds: 3
templates:
- container:
args:
- hello world
command:
- cowsay
image: docker/whalesay:latest
name: ""
name: whalesay
ttlStrategy:
secondsAfterCompletion: 600
status:
artifactRepositoryRef:
configMap: artifact-repositories
key: default-v1
namespace: argo
finishedAt: null
nodes:
hello-world-4srt7:
displayName: hello-world-4srt7
finishedAt: null
id: hello-world-4srt7
name: hello-world-4srt7
phase: Pending
progress: 0/1
startedAt: "2021-04-05T21:50:18Z"
templateName: whalesay
templateScope: local/hello-world-4srt7
type: Pod
phase: Running
progress: 0/1
startedAt: "2021-04-05T21:50:18Z"
`

func TestWfPendingWithNoPod(t *testing.T) {
wf := unmarshalWF(wfPending)
cancel, controller := newController(wf)
defer cancel()

ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
assert.Equal(t, wfv1.WorkflowError, woc.wf.Status.Phase)
}

var wfPendingWithSync = `apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: hello-world-mpdht
namespace: argo
spec:
entrypoint: whalesay
templates:
- container:
args:
- hello world
command:
- cowsay
image: docker/whalesay:latest
name: whalesay
synchronization:
mutex:
name: welcome
ttlStrategy:
secondsAfterCompletion: 600
status:
nodes:
hello-world-mpdht:
displayName: hello-world-mpdht
finishedAt: null
id: hello-world-mpdht
message: 'Waiting for argo/Mutex/welcome lock. Lock status: 0/1 '
name: hello-world-mpdht
phase: Pending
progress: 0/1
startedAt: "2021-04-05T22:11:21Z"
synchronizationStatus:
waiting: argo/Mutex/welcome
templateName: whalesay
templateScope: local/hello-world-mpdht
type: Pod
phase: Running
progress: 0/1
startedAt: "2021-04-05T22:11:21Z"
synchronization:
mutex:
waiting:
- holder: argo/hello-world-tmph8/hello-world-tmph8
mutex: argo/Mutex/welcome
`

func TestMutexWfPendingWithNoPod(t *testing.T) {
wf := unmarshalWF(wfPendingWithSync)
cancel, controller := newController(wf)
defer cancel()
ctx := context.Background()
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), func(key string) {
}, workflowExistenceFunc)
_, _, _, err := controller.syncManager.TryAcquire(wf, "test", &wfv1.Synchronization{Mutex: &wfv1.Mutex{Name: "welcome"}})
assert.NoError(t, err)
woc := newWorkflowOperationCtx(wf, controller)

woc.operate(ctx)
assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
assert.Equal(t, wfv1.NodePending, woc.wf.Status.Nodes.FindByDisplayName("hello-world-mpdht").Phase)
}

0 comments on commit 0edd32b

Please sign in to comment.