Skip to content

Commit

Permalink
Introduce argo watch command to watch live workflows from terminal (r…
Browse files Browse the repository at this point in the history
…esolves argoproj#969)
  • Loading branch information
jessesuen committed Aug 25, 2018
1 parent 5739436 commit 5670bf5
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 116 deletions.
18 changes: 0 additions & 18 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 0 additions & 8 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,6 @@ required = [
name = "github.com/stretchr/testify"
version = "1.1.4"

[[constraint]]
name = "github.com/hashicorp/go-version"
branch = "master"

[[constraint]]
name = "github.com/jpillora/backoff"
branch = "master"

[[constraint]]
name = "github.com/spf13/cobra"
branch = "master"
Expand Down
1 change: 1 addition & 0 deletions cmd/argo/commands/resubmit.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func NewResubmitCommand() *cobra.Command {
}
command.Flags().StringVarP(&submitArgs.output, "output", "o", "", "Output format. One of: name|json|yaml|wide")
command.Flags().BoolVarP(&submitArgs.wait, "wait", "w", false, "wait for the workflow to complete")
command.Flags().BoolVar(&submitArgs.watch, "watch", false, "watch the workflow until it completes")
command.Flags().BoolVar(&memoized, "memoized", false, "re-use successful steps & outputs from the previous run (experimental)")
return command
}
6 changes: 4 additions & 2 deletions cmd/argo/commands/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ func NewRetryCommand() *cobra.Command {
}
printWorkflow(wf, submitArgs.output)
if submitArgs.wait {
wsp := NewWorkflowStatusPoller(wfClient, false, submitArgs.output == "json")
wsp.WaitWorkflows([]string{wf.ObjectMeta.Name})
WaitWorkflows([]string{wf.ObjectMeta.Name}, false, submitArgs.output == "json")
} else if submitArgs.watch {
watchWorkflow(wf.ObjectMeta.Name)
}
},
}
command.Flags().StringVarP(&submitArgs.output, "output", "o", "", "Output format. One of: name|json|yaml|wide")
command.Flags().BoolVarP(&submitArgs.wait, "wait", "w", false, "wait for the workflow to complete")
command.Flags().BoolVar(&submitArgs.watch, "watch", false, "watch the workflow until it completes")
return command
}
1 change: 1 addition & 0 deletions cmd/argo/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func NewCommand() *cobra.Command {
command.AddCommand(NewSubmitCommand())
command.AddCommand(NewSuspendCommand())
command.AddCommand(NewWaitCommand())
command.AddCommand(NewWatchCommand())
command.AddCommand(cmd.NewVersionCmd(CLIName))

addKubectlFlagsToCmd(command)
Expand Down
16 changes: 14 additions & 2 deletions cmd/argo/commands/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type submitFlags struct {
parameterFile string // --parameter-file
output string // --output
wait bool // --wait
watch bool // --watch
strict bool // --strict
serviceAccount string // --serviceaccount
}
Expand All @@ -54,6 +55,7 @@ func NewSubmitCommand() *cobra.Command {
command.Flags().StringVarP(&submitArgs.parameterFile, "parameter-file", "f", "", "pass a file containing all input parameters")
command.Flags().StringVarP(&submitArgs.output, "output", "o", "", "Output format. One of: name|json|yaml|wide")
command.Flags().BoolVarP(&submitArgs.wait, "wait", "w", false, "wait for the workflow to complete")
command.Flags().BoolVar(&submitArgs.watch, "watch", false, "watch the workflow until it completes")
command.Flags().BoolVar(&submitArgs.strict, "strict", true, "perform strict workflow validation")
command.Flags().StringVar(&submitArgs.serviceAccount, "serviceaccount", "", "run all pods in the workflow using specified serviceaccount")
command.Flags().StringVar(&submitArgs.instanceID, "instanceid", "", "submit with a specific controller's instance id label")
Expand Down Expand Up @@ -95,6 +97,15 @@ func SubmitWorkflows(filePaths []string, submitArgs *submitFlags) {
}
}

if submitArgs.watch {
if len(workflows) > 1 {
log.Fatalf("Cannot watch more than one workflow")
}
if submitArgs.wait {
log.Fatalf("--wait cannot be combined with --watch")
}
}

var workflowNames []string
for _, wf := range workflows {
wfName, err := submitWorkflow(&wf, submitArgs)
Expand All @@ -104,8 +115,9 @@ func SubmitWorkflows(filePaths []string, submitArgs *submitFlags) {
workflowNames = append(workflowNames, wfName)
}
if submitArgs.wait {
wsp := NewWorkflowStatusPoller(wfClient, false, submitArgs.output == "json")
wsp.WaitWorkflows(workflowNames)
WaitWorkflows(workflowNames, false, submitArgs.output == "json")
} else if submitArgs.watch {
watchWorkflow(workflowNames[0])
}
}

Expand Down
117 changes: 33 additions & 84 deletions cmd/argo/commands/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@ import (
"fmt"
"os"
"sync"
"time"

"github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
goversion "github.com/hashicorp/go-version"
"github.com/jpillora/backoff"
log "github.com/sirupsen/logrus"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/pkg/errors"
"github.com/spf13/cobra"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
)

func NewWaitCommand() *cobra.Command {
Expand All @@ -27,103 +25,54 @@ func NewWaitCommand() *cobra.Command {
cmd.HelpFunc()(cmd, args)
os.Exit(1)
}

wfc := InitWorkflowClient()
wsp := NewWorkflowStatusPoller(wfc, ignoreNotFound, false)
wsp.WaitWorkflows(args)
InitWorkflowClient()
WaitWorkflows(args, ignoreNotFound, false)
},
}
command.Flags().BoolVar(&ignoreNotFound, "ignore-not-found", false, "Ignore the wait if the workflow is not found")
return command
}

// VersionChecker checks the Kubernetes version and currently logs a message if wait should
// be implemented using watch instead of polling.
type VersionChecker struct{}

func (vc *VersionChecker) run() {
// Watch APIs on CRDs using fieldSelectors are only supported in Kubernetes v1.9.0 onwards.
// https://github.com/kubernetes/kubernetes/issues/51046.
versionInfo, err := clientset.ServerVersion()
if err != nil {
log.Fatalf("Failed to get Kubernetes version: %v", err)
// WaitWorkflows waits for the given workflowNames.
func WaitWorkflows(workflowNames []string, ignoreNotFound, quiet bool) {
var wg sync.WaitGroup
for _, workflowName := range workflowNames {
wg.Add(1)
go func(name string) {
waitOnOne(name, ignoreNotFound, quiet)
wg.Done()
}(workflowName)
}
wg.Wait()
}

serverVersion, err := goversion.NewVersion(versionInfo.String())
if err != nil {
log.Fatalf("Failed to create version: %v", err)
func waitOnOne(workflowName string, ignoreNotFound, quiet bool) {
fieldSelector := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", workflowName))
opts := metav1.ListOptions{
FieldSelector: fieldSelector.String(),
}

minVersion, err := goversion.NewVersion("1.9")
_, err := wfClient.Get(workflowName, metav1.GetOptions{})
if err != nil {
log.Fatalf("Failed to create minimum version: %v", err)
}

if serverVersion.Equal(minVersion) || serverVersion.GreaterThan(minVersion) {
fmt.Printf("This should be changed to use a \"watch\" based approach.\n")
if apierr.IsNotFound(err) && ignoreNotFound {
return
}
errors.CheckError(err)
}
}

// WorkflowStatusPoller exports methods to wait on workflows by periodically
// querying their status.
type WorkflowStatusPoller struct {
wfc v1alpha1.WorkflowInterface
ignoreNotFound bool
noOutput bool
}

// NewWorkflowStatusPoller creates a new WorkflowStatusPoller object.
func NewWorkflowStatusPoller(wfc v1alpha1.WorkflowInterface, ignoreNotFound bool, noOutput bool) *WorkflowStatusPoller {
return &WorkflowStatusPoller{wfc, ignoreNotFound, noOutput}
}

func (wsp *WorkflowStatusPoller) waitOnOne(workflowName string) {
b := &backoff.Backoff{
Min: 1 * time.Second,
Max: 1 * time.Minute,
Factor: 2,
}
for {
wf, err := wsp.wfc.Get(workflowName, metav1.GetOptions{})
if err != nil {
if wsp.ignoreNotFound && apierr.IsNotFound(err) {
if !wsp.noOutput {
fmt.Printf("%s not found. Ignoring...\n", workflowName)
}
return
}
panic(err)
watchIf, err := wfClient.Watch(opts)
errors.CheckError(err)
defer watchIf.Stop()
for next := range watchIf.ResultChan() {
wf, ok := next.Object.(*wfv1.Workflow)
if !ok {
continue
}

if !wf.Status.FinishedAt.IsZero() {
if !wsp.noOutput {
if !quiet {
fmt.Printf("%s completed at %v\n", workflowName, wf.Status.FinishedAt)
}
return
}

time.Sleep(b.Duration())
continue
}
}

func (wsp *WorkflowStatusPoller) waitUpdateWaitGroup(workflowName string, wg *sync.WaitGroup) {
defer wg.Done()
wsp.waitOnOne(workflowName)
}

// WaitWorkflows waits for the given workflowNames.
func (wsp *WorkflowStatusPoller) WaitWorkflows(workflowNames []string) {
// TODO(shri): When Kubernetes 1.9 support is added, this block should be executed
// only for versions 1.8 and for 1.9, a new "watch" based implmentation should be
// used.
var vc VersionChecker
vc.run()

var wg sync.WaitGroup
for _, workflowName := range workflowNames {
wg.Add(1)
go wsp.waitUpdateWaitGroup(workflowName, &wg)
}
wg.Wait()
}
62 changes: 62 additions & 0 deletions cmd/argo/commands/watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package commands

import (
"fmt"
"os"
"time"

"github.com/argoproj/pkg/errors"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)

func NewWatchCommand() *cobra.Command {
var command = &cobra.Command{
Use: "watch WORKFLOW",
Short: "watch a workflow until it completes",
Run: func(cmd *cobra.Command, args []string) {
if len(args) != 1 {
cmd.HelpFunc()(cmd, args)
os.Exit(1)
}
InitWorkflowClient()
watchWorkflow(args[0])
},
}
return command
}

func watchWorkflow(name string) {
fieldSelector := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", name))
opts := metav1.ListOptions{
FieldSelector: fieldSelector.String(),
}
wf, err := wfClient.Get(name, metav1.GetOptions{})
errors.CheckError(err)

watchIf, err := wfClient.Watch(opts)
errors.CheckError(err)
defer watchIf.Stop()
ticker := time.NewTicker(time.Second)

var ok bool
for {
select {
case next := <-watchIf.ResultChan():
wf, ok = next.Object.(*wfv1.Workflow)
if !ok {
continue
}
case <-ticker.C:
}
print("\033[H\033[2J")
print("\033[0;0H")
printWorkflowHelper(wf, "")
if !wf.Status.FinishedAt.IsZero() {
return
}
}
}
3 changes: 1 addition & 2 deletions test/e2e/wait_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ spec:
commands.SubmitWorkflows([]string{tmpfile.Name()}, nil)

wfClient := commands.InitWorkflowClient()
wsp := commands.NewWorkflowStatusPoller(wfClient, false, false)
wsp.WaitWorkflows([]string{workflowName})
commands.WaitWorkflows([]string{workflowName}, false, false)

wf, err := wfClient.Get(workflowName, metav1.GetOptions{})
if err != nil {
Expand Down

0 comments on commit 5670bf5

Please sign in to comment.