Skip to content

Commit

Permalink
chore: Introduce WorkflowPhase (argoproj#4856)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec committed Jan 21, 2021
1 parent f872366 commit 957ef67
Show file tree
Hide file tree
Showing 34 changed files with 746 additions and 694 deletions.
2 changes: 1 addition & 1 deletion cmd/argo/commands/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func printWorkflowHelper(wf *wfv1.Workflow, getArgs getFlags) string {
if !wf.Status.StartedAt.IsZero() {
out += fmt.Sprintf(fmtStr, "Duration:", humanize.RelativeDuration(wf.Status.StartedAt.Time, wf.Status.FinishedAt.Time))
}
if wf.Status.Phase == wfv1.NodeRunning {
if wf.Status.Phase == wfv1.WorkflowRunning {
if wf.Status.EstimatedDuration > 0 {
out += fmt.Sprintf(fmtStr, "EstimatedDuration:", humanize.Duration(wf.Status.EstimatedDuration.ToDuration()))
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/argo/commands/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func waitOnOne(serviceClient workflowpkg.WorkflowServiceClient, ctx context.Cont
if !quiet {
fmt.Printf("%s %s at %v\n", wfName, wf.Status.Phase, wf.Status.FinishedAt)
}
if wf.Status.Phase == wfv1.NodeFailed || wf.Status.Phase == wfv1.NodeError {
if wf.Status.Phase == wfv1.WorkflowFailed || wf.Status.Phase == wfv1.WorkflowError {
return false
}
return true
Expand Down
16 changes: 8 additions & 8 deletions persist/sqldb/workflow_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ const archiveTableName = "argo_archived_workflows"
const archiveLabelsTableName = archiveTableName + "_labels"

type archivedWorkflowMetadata struct {
ClusterName string `db:"clustername"`
InstanceID string `db:"instanceid"`
UID string `db:"uid"`
Name string `db:"name"`
Namespace string `db:"namespace"`
Phase wfv1.NodePhase `db:"phase"`
StartedAt time.Time `db:"startedat"`
FinishedAt time.Time `db:"finishedat"`
ClusterName string `db:"clustername"`
InstanceID string `db:"instanceid"`
UID string `db:"uid"`
Name string `db:"name"`
Namespace string `db:"namespace"`
Phase wfv1.WorkflowPhase `db:"phase"`
StartedAt time.Time `db:"startedat"`
FinishedAt time.Time `db:"finishedat"`
}

type archivedWorkflowRecord struct {
Expand Down
949 changes: 475 additions & 474 deletions pkg/apis/workflow/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_phase.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package v1alpha1

// the workflow's phase
type WorkflowPhase string

const (
WorkflowUnknown WorkflowPhase = ""
WorkflowPending WorkflowPhase = "Pending" // pending some set-up - rarely used
WorkflowRunning WorkflowPhase = "Running" // any node has started; pods might not be running yet, the workflow maybe suspended too
WorkflowSucceeded WorkflowPhase = "Succeeded"
WorkflowFailed WorkflowPhase = "Failed" // it maybe that the the workflow was terminated
WorkflowError WorkflowPhase = "Error"
)

func (p WorkflowPhase) Completed() bool {
switch p {
case WorkflowSucceeded, WorkflowFailed, WorkflowError:
return true
default:
return false
}
}
16 changes: 16 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_phase_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package v1alpha1

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestWorkflowPhase_Completed(t *testing.T) {
assert.False(t, WorkflowUnknown.Completed())
assert.False(t, WorkflowPending.Completed())
assert.False(t, WorkflowRunning.Completed())
assert.True(t, WorkflowSucceeded.Completed())
assert.True(t, WorkflowFailed.Completed())
assert.True(t, WorkflowError.Completed())
}
14 changes: 7 additions & 7 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (
"strings"
"time"

"github.com/argoproj/argo/util/slice"

apiv1 "k8s.io/api/core/v1"
policyv1beta "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"

"github.com/argoproj/argo/util/slice"
)

// TemplateType is the type of a template
Expand Down Expand Up @@ -1291,7 +1291,7 @@ type UserContainer struct {
// WorkflowStatus contains overall status information about a workflow
type WorkflowStatus struct {
// Phase a simple, high-level summary of where the workflow is in its lifecycle.
Phase NodePhase `json:"phase,omitempty" protobuf:"bytes,1,opt,name=phase,casttype=NodePhase"`
Phase WorkflowPhase `json:"phase,omitempty" protobuf:"bytes,1,opt,name=phase,casttype=WorkflowPhase"`

// Time at which this workflow started
StartedAt metav1.Time `json:"startedAt,omitempty" protobuf:"bytes,2,opt,name=startedAt"`
Expand Down Expand Up @@ -1646,19 +1646,19 @@ func (phase NodePhase) FailedOrError() bool {
return phase == NodeFailed || phase == NodeError
}

// Fulfilled returns whether or not the workflow has fulfilled its execution, i.e. it completed execution or was skipped
// Fulfilled returns whether or not the workflow has fulfilled its execution
func (ws WorkflowStatus) Fulfilled() bool {
return ws.Phase.Fulfilled()
return ws.Phase.Completed()
}

// Successful return whether or not the workflow has succeeded
func (ws WorkflowStatus) Successful() bool {
return ws.Phase == NodeSucceeded
return ws.Phase == WorkflowSucceeded
}

// Failed return whether or not the workflow has failed
func (ws WorkflowStatus) Failed() bool {
return ws.Phase == NodeFailed
return ws.Phase == WorkflowFailed
}

func (ws WorkflowStatus) StartTime() *metav1.Time {
Expand Down
6 changes: 3 additions & 3 deletions server/workflow/workflow_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ func (t testWatchWorkflowServer) Send(*workflowpkg.WorkflowWatchEvent) error {
func TestWatchWorkflows(t *testing.T) {
server, ctx := getWorkflowServer()
wf := &v1alpha1.Workflow{
Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeSucceeded},
Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.WorkflowSucceeded},
}
assert.NoError(t, json.Unmarshal([]byte(wf1), &wf))
ctx, cancel := context.WithCancel(ctx)
Expand All @@ -645,7 +645,7 @@ func TestWatchWorkflows(t *testing.T) {
func TestWatchLatestWorkflow(t *testing.T) {
server, ctx := getWorkflowServer()
wf := &v1alpha1.Workflow{
Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeSucceeded},
Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.WorkflowSucceeded},
}
assert.NoError(t, json.Unmarshal([]byte(wf1), &wf))
ctx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -818,7 +818,7 @@ func TestStopWorkflow(t *testing.T) {
wf, err = server.StopWorkflow(ctx, &rsmWfReq)
if assert.NoError(t, err) {
assert.NotNil(t, wf)
assert.Equal(t, v1alpha1.NodeRunning, wf.Status.Phase)
assert.Equal(t, v1alpha1.WorkflowRunning, wf.Status.Phase)
}
}

Expand Down
18 changes: 9 additions & 9 deletions test/e2e/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (s *CLISuite) TestRoot() {
WaitForWorkflow(createdWorkflowName).
Then().
ExpectWorkflowName(createdWorkflowName, func(t *testing.T, metadata *corev1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
})
})
}
Expand All @@ -449,7 +449,7 @@ func (s *CLISuite) TestWorkflowSuspendResume() {
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, _ *corev1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
})
}

Expand All @@ -476,11 +476,11 @@ func (s *CLISuite) TestNodeSuspendResume() {
}
}).
WaitForWorkflow(fixtures.Condition(func(wf *wfv1.Workflow) bool {
return wf.Status.Phase == wfv1.NodeFailed
return wf.Status.Phase == wfv1.WorkflowFailed
}), "suspended node").
Then().
ExpectWorkflow(func(t *testing.T, _ *corev1.ObjectMeta, status *wfv1.WorkflowStatus) {
if assert.Equal(t, wfv1.NodeFailed, status.Phase) {
if assert.Equal(t, wfv1.WorkflowFailed, status.Phase) {
r := regexp.MustCompile(`child '(node-suspend-[0-9]+)' failed`)
res := r.FindStringSubmatch(status.Message)
assert.Equal(t, len(res), 2)
Expand Down Expand Up @@ -762,7 +762,7 @@ func (s *CLISuite) TestWorkflowRetry() {
}).
WaitForWorkflow(fixtures.Condition(func(wf *wfv1.Workflow) bool {
retryTime = wf.Status.FinishedAt
return wf.Status.Phase == wfv1.NodeFailed
return wf.Status.Phase == wfv1.WorkflowFailed
}), "is terminated", 20*time.Second).
Wait(3*time.Second).
RunCli([]string{"retry", "retry-test", "--restart-successful", "--node-field-selector", "templateName==steps-inner"}, func(t *testing.T, output string, err error) {
Expand Down Expand Up @@ -1184,13 +1184,13 @@ func (s *CLISuite) TestWorkflowLevelSemaphore() {
}).
SubmitWorkflow().
WaitForWorkflow(fixtures.Condition(func(wf *wfv1.Workflow) bool {
return wf.Status.Phase == ""
return wf.Status.Phase == wfv1.WorkflowUnknown
}), "Workflow is waiting for lock").
WaitForWorkflow().
DeleteConfigMap("my-config").
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
})
}

Expand All @@ -1206,7 +1206,7 @@ func (s *CLISuite) TestTemplateLevelSemaphore() {
CreateConfigMap("my-config", semaphoreData).
SubmitWorkflow().
WaitForWorkflow(fixtures.Condition(func(wf *wfv1.Workflow) bool {
return wf.Status.Phase == wfv1.NodeRunning
return wf.Status.Phase == wfv1.WorkflowRunning
}), "waiting for Workflow to run", 10*time.Second).
RunCli([]string{"get", "semaphore-tmpl-level"}, func(t *testing.T, output string, err error) {
assert.Contains(t, output, "Waiting for")
Expand Down Expand Up @@ -1446,7 +1446,7 @@ spec:
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
nodeStatus := status.Nodes.FindByDisplayName("release")
if assert.NotNil(t, nodeStatus) {
assert.Equal(t, "Hello, World!", nodeStatus.Inputs.Parameters[0].Value.String())
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/cluster_workflow_template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (s *ClusterWorkflowTemplateSuite) TestSubmitClusterWorkflowTemplate() {
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
assert.Equal(t, status.Phase, v1alpha1.NodeSucceeded)
assert.Equal(t, status.Phase, v1alpha1.WorkflowSucceeded)
})
}

Expand Down Expand Up @@ -68,7 +68,7 @@ spec:
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
assert.Equal(t, v1alpha1.WorkflowSucceeded, status.Phase)
})

}
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/estimated_duration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (s *EstimatedDurationSuite) TestWorkflowTemplate() {
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
assert.NotEmpty(t, status.EstimatedDuration)
assert.NotEmpty(t, status.Nodes[metadata.Name].EstimatedDuration)
})
Expand All @@ -45,7 +45,7 @@ func (s *EstimatedDurationSuite) TestClusterWorkflowTemplate() {
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
assert.NotEmpty(t, status.EstimatedDuration)
assert.NotEmpty(t, status.Nodes[metadata.Name].EstimatedDuration)
})
Expand All @@ -62,7 +62,7 @@ func (s *EstimatedDurationSuite) TestCronWorkflow() {
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
assert.NotEmpty(t, status.EstimatedDuration)
assert.NotEmpty(t, status.Nodes[metadata.Name].EstimatedDuration)
})
Expand Down
Loading

0 comments on commit 957ef67

Please sign in to comment.