Skip to content

Commit

Permalink
fix: Daemoned status stuck in Running (#6742)
Browse files Browse the repository at this point in the history
* fix: Daemoned status stuck in Running

Signed-off-by: Saravanan Balasubramanian <[email protected]>

* fixed test

Signed-off-by: Saravanan Balasubramanian <[email protected]>

* added test

Signed-off-by: Saravanan Balasubramanian <[email protected]>
  • Loading branch information
sarabala1979 committed Sep 16, 2021
1 parent 665c08d commit da5ce18
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 97 deletions.
56 changes: 0 additions & 56 deletions wft.yaml

This file was deleted.

82 changes: 41 additions & 41 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1146,56 +1146,56 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatu
}
}
}

if !node.Fulfilled() && newDaemonStatus != nil {
if !*newDaemonStatus {
// if the daemon status switched to false, we prefer to just unset daemoned status field
// (as opposed to setting it to false)
newDaemonStatus = nil
}
if (newDaemonStatus != nil && node.Daemoned == nil) || (newDaemonStatus == nil && node.Daemoned != nil) {
woc.log.Infof("Setting node %v daemoned: %v -> %v", node.ID, node.Daemoned, newDaemonStatus)
node.Daemoned = newDaemonStatus
updated = true
if pod.Status.PodIP != "" && pod.Status.PodIP != node.PodIP {
// only update Pod IP for daemoned nodes to reduce number of updates
woc.log.Infof("Updating daemon node %s IP %s -> %s", node.ID, node.PodIP, pod.Status.PodIP)
node.PodIP = pod.Status.PodIP
if !node.Completed() {
if newDaemonStatus != nil {
if !*newDaemonStatus {
// if the daemon status switched to false, we prefer to just unset daemoned status field
// (as opposed to setting it to false)
newDaemonStatus = nil
}
if (newDaemonStatus != nil && node.Daemoned == nil) || (newDaemonStatus == nil && node.Daemoned != nil) {
woc.log.Infof("Setting node %v daemoned: %v -> %v", node.ID, node.Daemoned, newDaemonStatus)
node.Daemoned = newDaemonStatus
updated = true
if pod.Status.PodIP != "" && pod.Status.PodIP != node.PodIP {
// only update Pod IP for daemoned nodes to reduce number of updates
woc.log.Infof("Updating daemon node %s IP %s -> %s", node.ID, node.PodIP, pod.Status.PodIP)
node.PodIP = pod.Status.PodIP
}
}
}
}

// we only need to update these values if the container transitions to complete
if !node.Phase.Fulfilled() && newPhase.Fulfilled() {
// outputs are mixed between the annotation (parameters, artifacts, and result) and the pod's status (exit code)
if exitCode := getExitCode(pod); exitCode != nil {
woc.log.Infof("Updating node %s exit code %d", node.ID, *exitCode)
node.Outputs = &wfv1.Outputs{ExitCode: pointer.StringPtr(fmt.Sprintf("%d", int(*exitCode)))}
if outputStr, ok := pod.Annotations[common.AnnotationKeyOutputs]; ok {
woc.log.Infof("Setting node %v outputs: %s", node.ID, outputStr)
if err := json.Unmarshal([]byte(outputStr), node.Outputs); err != nil { // I don't expect an error to ever happen in production
node.Phase = wfv1.NodeError
node.Message = err.Error()
// we only need to update these values if the container transitions to complete
if newPhase.Fulfilled() {
// outputs are mixed between the annotation (parameters, artifacts, and result) and the pod's status (exit code)
if exitCode := getExitCode(pod); exitCode != nil {
woc.log.Infof("Updating node %s exit code %d", node.ID, *exitCode)
node.Outputs = &wfv1.Outputs{ExitCode: pointer.StringPtr(fmt.Sprintf("%d", int(*exitCode)))}
if outputStr, ok := pod.Annotations[common.AnnotationKeyOutputs]; ok {
woc.log.Infof("Setting node %v outputs: %s", node.ID, outputStr)
if err := json.Unmarshal([]byte(outputStr), node.Outputs); err != nil { // I don't expect an error to ever happen in production
node.Phase = wfv1.NodeError
node.Message = err.Error()
}
}
}
}
}

if node.Phase != newPhase {
woc.log.Infof("Updating node %s status %s -> %s", node.ID, node.Phase, newPhase)
// if we are transitioning from Pending to a different state, clear out pending message
if node.Phase == wfv1.NodePending {
node.Message = ""
if node.Phase != newPhase {
woc.log.Infof("Updating node %s status %s -> %s", node.ID, node.Phase, newPhase)
// if we are transitioning from Pending to a different state, clear out pending message
if node.Phase == wfv1.NodePending {
node.Message = ""
}
updated = true
node.Phase = newPhase
}
if message != "" && node.Message != message {
woc.log.Infof("Updating node %s message: %s", node.ID, message)
updated = true
node.Message = message
}
updated = true
node.Phase = newPhase
}
if message != "" && node.Message != message {
woc.log.Infof("Updating node %s message: %s", node.ID, message)
updated = true
node.Message = message
}

if node.Fulfilled() && node.FinishedAt.IsZero() {
updated = true
node.FinishedAt = getLatestFinishedAt(pod)
Expand Down
27 changes: 27 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,33 @@ func TestAssessNodeStatus(t *testing.T) {
assert.Equal(t, tt.want, got.Phase)
})
}
t.Run("Daemon Step finished - Pod running", func(t *testing.T) {
cancel, controller := newController()
defer cancel()
pod := &apiv1.Pod{
Status: apiv1.PodStatus{
Phase: apiv1.PodRunning,
},
}
node := &wfv1.NodeStatus{Daemoned: &daemoned, Phase: wfv1.NodeFailed}
woc := newWorkflowOperationCtx(wf, controller)
got := woc.assessNodeStatus(pod, node)
assert.Nil(t, got)
})

t.Run("Daemon Step finished - Pod running", func(t *testing.T) {
cancel, controller := newController()
defer cancel()
pod := &apiv1.Pod{
Status: apiv1.PodStatus{
Phase: apiv1.PodRunning,
},
}
node := &wfv1.NodeStatus{Daemoned: &daemoned, Phase: wfv1.NodeSucceeded}
woc := newWorkflowOperationCtx(wf, controller)
got := woc.assessNodeStatus(pod, node)
assert.Nil(t, got)
})
}

var workflowStepRetry = `
Expand Down

0 comments on commit da5ce18

Please sign in to comment.