From 7d871b8baabe93dcabe5510ea2dfc0207d23494b Mon Sep 17 00:00:00 2001 From: Simon Behar Date: Thu, 5 Nov 2020 11:30:41 -0600 Subject: [PATCH 1/2] fix: Ensure ContainerStatus in PNS is terminated before continuing Signed-off-by: Simon Behar --- workflow/executor/pns/pns.go | 42 +++++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/workflow/executor/pns/pns.go b/workflow/executor/pns/pns.go index 07c235cd2bf2..c971704b472c 100644 --- a/workflow/executor/pns/pns.go +++ b/workflow/executor/pns/pns.go @@ -16,14 +16,15 @@ import ( log "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "github.com/argoproj/argo/errors" "github.com/argoproj/argo/util/archive" "github.com/argoproj/argo/workflow/common" execcommon "github.com/argoproj/argo/workflow/executor/common" - "github.com/argoproj/argo/workflow/executor/common/wait" - os_specific "github.com/argoproj/argo/workflow/executor/os-specific" + argowait "github.com/argoproj/argo/workflow/executor/common/wait" + osspecific "github.com/argoproj/argo/workflow/executor/os-specific" ) type PNSExecutor struct { @@ -97,7 +98,7 @@ func (p *PNSExecutor) enterChroot() error { if err := p.mainFS.Chdir(); err != nil { return errors.InternalWrapErrorf(err, "failed to chdir to main filesystem: %v", err) } - err := os_specific.CallChroot() + err := osspecific.CallChroot() if err != nil { return errors.InternalWrapErrorf(err, "failed to chroot to main filesystem: %v", err) } @@ -109,7 +110,7 @@ func (p *PNSExecutor) exitChroot() error { if err := p.rootFS.Chdir(); err != nil { return errors.InternalWrapError(err) } - err := os_specific.CallChroot() + err := osspecific.CallChroot() if err != nil { return errors.InternalWrapError(err) } @@ -167,7 +168,7 @@ func (p *PNSExecutor) Wait(containerID string) error { log.Warnf("Ignoring wait failure: %v. Process assumed to have completed", err) return nil } - return wait.UntilTerminated(p.clientset, p.namespace, p.podName, containerID) + return argowait.UntilTerminated(p.clientset, p.namespace, p.podName, containerID) } log.Infof("Main pid identified as %d", mainPID) for pid, f := range p.pidFileHandles { @@ -228,7 +229,7 @@ func (p *PNSExecutor) GetOutputStream(containerID string, combinedOutput bool) ( func (p *PNSExecutor) GetExitCode(containerID string) (string, error) { log.Infof("Getting exit code of %s", containerID) - _, containerStatus, err := p.GetContainerStatus(containerID) + _, containerStatus, err := p.GetTerminatedContainerStatus(containerID) if err != nil { return "", fmt.Errorf("could not get container status: %s", err) } @@ -368,18 +369,25 @@ func (p *PNSExecutor) updateCtrIDMap() { } } -func (p *PNSExecutor) GetContainerStatus(containerID string) (*corev1.Pod, *corev1.ContainerStatus, error) { - pod, err := p.clientset.CoreV1().Pods(p.namespace).Get(p.podName, metav1.GetOptions{}) - if err != nil { - return nil, nil, fmt.Errorf("could not get pod: %s", err) - } - for _, containerStatus := range pod.Status.ContainerStatuses { - if execcommon.GetContainerID(&containerStatus) != containerID { - continue +func (p *PNSExecutor) GetTerminatedContainerStatus(containerID string) (*corev1.Pod, *corev1.ContainerStatus, error) { + var pod *corev1.Pod + var containerStatus *corev1.ContainerStatus + err := wait.Poll(500*time.Millisecond, 2*time.Second, func() (bool, error) { + podRes, err := p.clientset.CoreV1().Pods(p.namespace).Get(p.podName, metav1.GetOptions{}) + if err != nil { + return false, fmt.Errorf("could not get pod: %s", err) } - return pod, &containerStatus, nil - } - return nil, nil, errors.New(errors.CodeNotFound, fmt.Sprintf("containerID %q is not found in the pod %s", containerID, p.podName)) + for _, containerStatusRes := range podRes.Status.ContainerStatuses { + if execcommon.GetContainerID(&containerStatusRes) != containerID { + continue + } + pod = podRes + containerStatus = &containerStatusRes + return containerStatus.State.Terminated != nil, nil + } + return false, errors.New(errors.CodeNotFound, fmt.Sprintf("containerID %q is not found in the pod %s", containerID, p.podName)) + }) + return pod, containerStatus, err } // parseContainerID parses the containerID of a pid From b9fb9417fd1b80d8428456c5226b24ec80bba0f5 Mon Sep 17 00:00:00 2001 From: Simon Behar Date: Fri, 6 Nov 2020 07:37:38 -0600 Subject: [PATCH 2/2] test Signed-off-by: Simon Behar --- test/e2e/functional_test.go | 32 ++++++++++++++++++++++++++++++++ workflow/executor/pns/pns.go | 2 +- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go index 0569ba97a5f5..b8e109836fad 100644 --- a/test/e2e/functional_test.go +++ b/test/e2e/functional_test.go @@ -912,6 +912,38 @@ spec: DeleteMemoryQuota() } +func (s *FunctionalSuite) TestExitCodePNSSleep() { + s.Given(). + Workflow(`apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: cond + labels: + argo-e2e: true +spec: + entrypoint: conditional-example + templates: + - name: conditional-example + steps: + - - name: print-hello + template: whalesay + - name: whalesay + container: + image: argoproj/argosay:v2 + args: [sleep, 5s] +`). + When(). + SubmitWorkflow(). + Wait(10 * time.Second). + Then(). + ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { + node := status.Nodes.FindByDisplayName("print-hello") + if assert.NotNil(t, node) && assert.NotNil(t, node.Outputs) && assert.NotNil(t, node.Outputs.ExitCode) { + assert.Equal(t, "0", *node.Outputs.ExitCode) + } + }) +} + func TestFunctionalSuite(t *testing.T) { suite.Run(t, new(FunctionalSuite)) } diff --git a/workflow/executor/pns/pns.go b/workflow/executor/pns/pns.go index c971704b472c..7feeca49347d 100644 --- a/workflow/executor/pns/pns.go +++ b/workflow/executor/pns/pns.go @@ -372,7 +372,7 @@ func (p *PNSExecutor) updateCtrIDMap() { func (p *PNSExecutor) GetTerminatedContainerStatus(containerID string) (*corev1.Pod, *corev1.ContainerStatus, error) { var pod *corev1.Pod var containerStatus *corev1.ContainerStatus - err := wait.Poll(500*time.Millisecond, 2*time.Second, func() (bool, error) { + err := wait.Poll(1*time.Second, 3*time.Second, func() (bool, error) { podRes, err := p.clientset.CoreV1().Pods(p.namespace).Get(p.podName, metav1.GetOptions{}) if err != nil { return false, fmt.Errorf("could not get pod: %s", err)