Skip to content

Commit

Permalink
fix: ContainerSet termination during pending Pod #7635 (#7681)
Browse files Browse the repository at this point in the history
Signed-off-by: Denis <[email protected]>
  • Loading branch information
denis-codefresh authored Jan 31, 2022
1 parent 2ba6172 commit bf3b58b
Show file tree
Hide file tree
Showing 5 changed files with 340 additions and 19 deletions.
10 changes: 5 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ ifneq ($(GIT_TAG),)
override LDFLAGS += -X github.com/argoproj/argo-workflows/v3.gitTag=${GIT_TAG}
endif

# Default GOPATH to the Go toolchain's value only when it is not already set.
# ifndef takes a variable *name*; writing `ifndef $(GOPATH)` would expand
# GOPATH first and test a variable named after its value instead.
ifndef GOPATH
GOPATH=$(shell go env GOPATH)
export GOPATH
endif

ARGOEXEC_PKGS := $(shell echo cmd/argoexec && go list -f '{{ join .Deps "\n" }}' ./cmd/argoexec/ | grep 'argoproj/argo-workflows/v3/' | cut -c 39-)
CLI_PKGS := $(shell echo cmd/argo && go list -f '{{ join .Deps "\n" }}' ./cmd/argo/ | grep 'argoproj/argo-workflows/v3/' | cut -c 39-)
CONTROLLER_PKGS := $(shell echo cmd/workflow-controller && go list -f '{{ join .Deps "\n" }}' ./cmd/workflow-controller/ | grep 'argoproj/argo-workflows/v3/' | cut -c 39-)
Expand Down Expand Up @@ -130,11 +135,6 @@ define protoc

endef

# Default GOPATH to the Go toolchain's value only when it is not already set.
# ifndef takes a variable *name*; writing `ifndef $(GOPATH)` would expand
# GOPATH first and test a variable named after its value instead.
ifndef GOPATH
GOPATH=$(shell go env GOPATH)
export GOPATH
endif

.PHONY: build
build: clis images

Expand Down
10 changes: 9 additions & 1 deletion workflow/controller/exec_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,17 @@ func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1
// handleExecutionControlError marks the node identified by nodeID as failed
// with errorMsg, and then marks each of that node's children as failed with
// the same message. Failing the children matters when the node is a pod
// created from a ContainerSet template: otherwise the child nodes would hang
// in Pending after the pod is deleted.
//
// wfNodesLock is taken for writing for the whole update and released exactly
// once, by the deferred Unlock. Unlocking an RWMutex that is not locked is a
// run-time error, so there must be no additional explicit Unlock here.
func (woc *wfOperationCtx) handleExecutionControlError(nodeID string, wfNodesLock *sync.RWMutex, errorMsg string) {
	wfNodesLock.Lock()
	defer wfNodesLock.Unlock()

	node := woc.wf.Status.Nodes[nodeID]
	woc.markNodePhase(node.Name, wfv1.NodeFailed, errorMsg)

	for _, childID := range node.Children {
		child, ok := woc.wf.Status.Nodes[childID]
		if !ok {
			// A child ID without an entry would yield a zero-value node with
			// an empty name; skip it instead of passing an empty name on.
			continue
		}
		woc.markNodePhase(child.Name, wfv1.NodeFailed, errorMsg)
	}
}

// killDaemonedChildren kill any daemoned pods of a steps or DAG template node.
Expand Down
137 changes: 137 additions & 0 deletions workflow/controller/exec_control_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controller

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -30,3 +31,139 @@ func TestKillDaemonChildrenUnmarkPod(t *testing.T) {
woc.killDaemonedChildren("a")
assert.Nil(t, woc.wf.Status.Nodes["a"].Daemoned)
}

// workflowWithContainerSetPodInPending is a Workflow manifest used as a
// fixture by the handleExecutionControlError tests. Its spec requests
// shutdown (Terminate) and runs a DAG task backed by a containerSet template
// with two containers, step-1 and step-2. Its status records the Pod node for
// that task and both Container nodes in phase Pending.
var workflowWithContainerSetPodInPending = `apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
annotations:
workflows.argoproj.io/pod-name-format: v1
generateName: container-set-termination-demo
generation: 10
labels:
workflows.argoproj.io/phase: Running
workflows.argoproj.io/resubmitted-from-workflow: container-set-termination-demob7c6c
name: container-set-termination-demopw5vv
namespace: argo
resourceVersion: "88102"
uid: 2a5a4c10-3a5c-4fb4-8931-20ac78cabfee
spec:
entrypoint: main
shutdown: Terminate
templates:
- name: main
dag:
tasks:
- name: using-container-set-template
template: problematic-container-set
- name: problematic-container-set
containerSet:
containers:
- command:
- sh
- -c
- sleep 10
image: alpine
name: step-1
- command:
- sh
- -c
- sleep 10
image: alpine
name: step-2
status:
phase: Running
conditions:
- status: "True"
type: PodRunning
finishedAt: null
nodes:
container-set-termination-demopw5vv:
children:
- container-set-termination-demopw5vv-2652912851
displayName: container-set-termination-demopw5vv
finishedAt: null
id: container-set-termination-demopw5vv
name: container-set-termination-demopw5vv
phase: Running
progress: 2/2
startedAt: "2022-01-27T17:45:59Z"
templateName: main
templateScope: local/container-set-termination-demopw5vv
type: DAG
container-set-termination-demopw5vv-842041608:
boundaryID: container-set-termination-demopw5vv
children:
- container-set-termination-demopw5vv-893664226
- container-set-termination-demopw5vv-876886607
displayName: using-container-set-template
finishedAt: "2022-01-27T17:46:16Z"
hostNodeName: k3d-argo-workflow-server-0
id: container-set-termination-demopw5vv-842041608
name: container-set-termination-demopw5vv.using-container-set-template
phase: Pending
progress: 1/1
startedAt: "2022-01-27T17:46:14Z"
templateName: problematic-container-set
templateScope: local/container-set-termination-demopw5vv
type: Pod
container-set-termination-demopw5vv-876886607:
boundaryID: container-set-termination-demopw5vv-842041608
displayName: step-2
finishedAt: null
id: container-set-termination-demopw5vv-876886607
name: container-set-termination-demopw5vv.using-container-set-template.step-2
phase: Pending
startedAt: "2022-01-27T17:46:14Z"
templateName: problematic-container-set
templateScope: local/container-set-termination-demopw5vv
type: Container
container-set-termination-demopw5vv-893664226:
boundaryID: container-set-termination-demopw5vv-842041608
displayName: step-1
finishedAt: null
id: container-set-termination-demopw5vv-893664226
name: container-set-termination-demopw5vv.using-container-set-template.step-1
phase: Pending
startedAt: "2022-01-27T17:46:14Z"
templateName: problematic-container-set
templateScope: local/container-set-termination-demopw5vv
type: Container
`

// TestHandleExecutionControlErrorMarksProvidedNode checks that the node whose
// ID is passed to handleExecutionControlError moves from Pending to Failed.
func TestHandleExecutionControlErrorMarksProvidedNode(t *testing.T) {
	cancel, controller := newController()
	defer cancel()

	woc := newWorkflowOperationCtx(
		v1alpha1.MustUnmarshalWorkflow(workflowWithContainerSetPodInPending),
		controller,
	)

	// ID of the Pod node backing the containerSet template in the fixture.
	const podNodeID = "container-set-termination-demopw5vv-842041608"

	// Precondition: the fixture starts with the pod node in Pending.
	assert.Equal(t, v1alpha1.NodePending, woc.wf.Status.Nodes[podNodeID].Phase)

	var nodesLock sync.RWMutex
	woc.handleExecutionControlError(podNodeID, &nodesLock, "terminated")

	assert.Equal(t, v1alpha1.NodeFailed, woc.wf.Status.Nodes[podNodeID].Phase)
}

// TestHandleExecutionControlErrorMarksChildNodes checks that failing the pod
// node of a containerSet template also moves both of its container child
// nodes from Pending to Failed.
func TestHandleExecutionControlErrorMarksChildNodes(t *testing.T) {
	cancel, controller := newController()
	defer cancel()

	woc := newWorkflowOperationCtx(
		v1alpha1.MustUnmarshalWorkflow(workflowWithContainerSetPodInPending),
		controller,
	)

	const podNodeID = "container-set-termination-demopw5vv-842041608"
	// step-1 and step-2 container nodes, in that order.
	childNodeIDs := []string{
		"container-set-termination-demopw5vv-893664226",
		"container-set-termination-demopw5vv-876886607",
	}

	// Precondition: both children start in Pending.
	for _, id := range childNodeIDs {
		assert.Equal(t, v1alpha1.NodePending, woc.wf.Status.Nodes[id].Phase)
	}

	var nodesLock sync.RWMutex
	woc.handleExecutionControlError(podNodeID, &nodesLock, "terminated")

	for _, id := range childNodeIDs {
		assert.Equal(t, v1alpha1.NodeFailed, woc.wf.Status.Nodes[id].Phase)
	}
}
30 changes: 17 additions & 13 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,20 +1073,21 @@ func (woc *wfOperationCtx) shouldPrintPodSpec(node wfv1.NodeStatus) bool {
(woc.controller.Config.PodSpecLogStrategy.FailedPod && node.FailedOrError())
}

// fails any suspended and pending nodes if the workflow deadline has passed
func (woc *wfOperationCtx) failSuspendedAndPendingNodesAfterDeadlineOrShutdown() {
deadlineExceeded := woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)
if woc.GetShutdownStrategy().Enabled() || deadlineExceeded {
for _, node := range woc.wf.Status.Nodes {
if node.IsActiveSuspendNode() || (node.Phase == wfv1.NodePending && deadlineExceeded) {
var message string
if woc.GetShutdownStrategy().Enabled() {
message = fmt.Sprintf("Stopped with strategy '%s'", woc.GetShutdownStrategy())
} else {
message = "Step exceeded its deadline"
}
woc.markNodePhase(node.Name, wfv1.NodeFailed, message)
}
for _, node := range woc.wf.Status.Nodes {
// fail suspended nodes when shuting down
if woc.GetShutdownStrategy().Enabled() && node.IsActiveSuspendNode() {
message := fmt.Sprintf("Stopped with strategy '%s'", woc.GetShutdownStrategy())
woc.markNodePhase(node.Name, wfv1.NodeFailed, message)
continue
}

// fail all pending and suspended nodes when exceeding deadline
deadlineExceeded := woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)
if deadlineExceeded && (node.Phase == wfv1.NodePending || node.IsActiveSuspendNode()) {
message := "Step exceeded its deadline"
woc.markNodePhase(node.Name, wfv1.NodeFailed, message)
continue
}
}
}
Expand Down Expand Up @@ -1175,6 +1176,8 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatu
WithField("pod", pod.Name).Error(message)
}

// If this is a ContainerSet template pod, its container names should correspond to node names;
// in that case, update those nodes according to the container statuses.
for _, c := range pod.Status.ContainerStatuses {
ctrNodeName := fmt.Sprintf("%s.%s", node.Name, c.Name)
if woc.wf.GetNodeByName(ctrNodeName) == nil {
Expand All @@ -1200,6 +1203,7 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatu
}
}
}

if !node.Completed() {
if newDaemonStatus != nil {
if !*newDaemonStatus {
Expand Down
Loading

0 comments on commit bf3b58b

Please sign in to comment.