Skip to content

Commit

Permalink
chore(cli): Migrate argo wait to use API client. See argoproj#2116 (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Feb 25, 2020
1 parent baf03f6 commit 5c3d9cf
Show file tree
Hide file tree
Showing 13 changed files with 212 additions and 193 deletions.
100 changes: 23 additions & 77 deletions cmd/argo/commands/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,9 @@ func NewWaitCommand() *cobra.Command {
ignoreNotFound bool
)
var command = &cobra.Command{
Use: "wait WORKFLOW1 WORKFLOW2..,",
Short: "waits for a workflow to complete",
Use: "wait [WORKFLOW...]",
Short: "waits for workflows to complete",
Run: func(cmd *cobra.Command, args []string) {
if len(args) == 0 {
cmd.HelpFunc()(cmd, args)
os.Exit(1)
}

WaitWorkflows(args, ignoreNotFound, false)
},
}
Expand All @@ -41,34 +36,20 @@ func NewWaitCommand() *cobra.Command {
func WaitWorkflows(workflowNames []string, ignoreNotFound, quiet bool) {
var wg sync.WaitGroup
wfSuccessStatus := true
var apiClient workflowpkg.WorkflowServiceClient
var ctx context.Context
ns, _, _ := client.Config.Namespace()
if client.ArgoServer != "" {
conn := client.GetClientConn()
defer conn.Close()
apiClient, ctx = GetWFApiServerGRPCClient(conn)
} else {
InitWorkflowClient()
}

for _, workflowName := range workflowNames {
ctx, apiClient := client.NewAPIClient()
serviceClient := apiClient.NewWorkflowServiceClient()
namespace := client.Namespace()

for _, name := range workflowNames {
wg.Add(1)
if client.ArgoServer != "" {
go func(name string) {
if !apiServerWaitOnOne(apiClient, ctx, name, ns, ignoreNotFound, quiet) {
wfSuccessStatus = false
}
wg.Done()
}(workflowName)
} else {
go func(name string) {
if !waitOnOne(name, ignoreNotFound, quiet) {
wfSuccessStatus = false
}
wg.Done()
}(workflowName)
}
go func(name string) {
if !waitOnOne(serviceClient, ctx, name, namespace, ignoreNotFound, quiet) {
wfSuccessStatus = false
}
wg.Done()
}(name)

}
wg.Wait()

Expand All @@ -77,16 +58,19 @@ func WaitWorkflows(workflowNames []string, ignoreNotFound, quiet bool) {
}
}

func apiServerWaitOnOne(client workflowpkg.WorkflowServiceClient, ctx context.Context, wfName string, namespace string, ignoreNotFound, quiet bool) bool {
fieldSelector := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", wfName))
wfReq := workflowpkg.WatchWorkflowsRequest{
func waitOnOne(client workflowpkg.WorkflowServiceClient, ctx context.Context, wfName, namespace string, ignoreNotFound, quiet bool) bool {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := client.WatchWorkflows(ctx, &workflowpkg.WatchWorkflowsRequest{
Namespace: namespace,
ListOptions: &metav1.ListOptions{
FieldSelector: fieldSelector.String(),
FieldSelector: fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", wfName)).String(),
},
}
stream, err := client.WatchWorkflows(ctx, &wfReq)
})
if err != nil {
if apierr.IsNotFound(err) && ignoreNotFound {
return true
}
errors.CheckError(err)
return false
}
Expand All @@ -112,41 +96,3 @@ func apiServerWaitOnOne(client workflowpkg.WorkflowServiceClient, ctx context.Co
}
return true
}

func waitOnOne(workflowName string, ignoreNotFound, quiet bool) bool {
fieldSelector := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", workflowName))
opts := metav1.ListOptions{
FieldSelector: fieldSelector.String(),
}

_, err := wfClient.Get(workflowName, metav1.GetOptions{})
if err != nil {
if apierr.IsNotFound(err) && ignoreNotFound {
return true
}
errors.CheckError(err)
}

watchIf, err := wfClient.Watch(opts)
errors.CheckError(err)
defer watchIf.Stop()
for {
next := <-watchIf.ResultChan()
wf, _ := next.Object.(*wfv1.Workflow)
if wf == nil {
watchIf.Stop()
watchIf, err = wfClient.Watch(opts)
errors.CheckError(err)
continue
}
if !wf.Status.FinishedAt.IsZero() {
if !quiet {
fmt.Printf("%s %s at %v\n", workflowName, wf.Status.Phase, wf.Status.FinishedAt)
}
if wf.Status.Phase == wfv1.NodeFailed || wf.Status.Phase == wfv1.NodeError {
return false
}
return true
}
}
}
75 changes: 14 additions & 61 deletions cmd/argo/commands/watch.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package commands

import (
"context"
"fmt"
"os"
"time"

"github.com/argoproj/pkg/errors"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -33,79 +33,32 @@ func NewWatchCommand() *cobra.Command {
return command
}

func apiServerWatchWorkflow(wfName string) {
conn := client.GetClientConn()
defer conn.Close()
apiClient, ctx := GetWFApiServerGRPCClient(conn)
fieldSelector := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", wfName))
wfReq := workflowpkg.WatchWorkflowsRequest{
func watchWorkflow(wfName string) {

ctx, apiClient := client.NewAPIClient()
serviceClient := apiClient.NewWorkflowServiceClient()
namespace := client.Namespace()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := serviceClient.WatchWorkflows(ctx, &workflowpkg.WatchWorkflowsRequest{
Namespace: namespace,
ListOptions: &metav1.ListOptions{
FieldSelector: fieldSelector.String(),
FieldSelector: fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", wfName)).String(),
},
}
stream, err := apiClient.WatchWorkflows(ctx, &wfReq)
if err != nil {
errors.CheckError(err)
return
}
})
errors.CheckError(err)
for {
event, err := stream.Recv()
if err != nil {
errors.CheckError(err)
break
}
errors.CheckError(err)
wf := event.Object
if wf != nil {
printWorkflowStatus(wf)
if !wf.Status.FinishedAt.IsZero() {
break
}
} else {
break
}
}
}

func watchWorkflow(name string) {
if client.ArgoServer != "" {
apiServerWatchWorkflow(name)
} else {
InitWorkflowClient()
k8sApiWatchWorkflow(name)

}
}

func k8sApiWatchWorkflow(name string) {
fieldSelector := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", name))
opts := metav1.ListOptions{
FieldSelector: fieldSelector.String(),
}
wf, err := wfClient.Get(name, metav1.GetOptions{})
errors.CheckError(err)

watchIf, err := wfClient.Watch(opts)
errors.CheckError(err)
ticker := time.NewTicker(time.Second)
for {
select {
case next := <-watchIf.ResultChan():
wf, _ = next.Object.(*wfv1.Workflow)
case <-ticker.C:
}
if wf == nil {
watchIf.Stop()
watchIf, err = wfClient.Watch(opts)
errors.CheckError(err)
continue
break
}
printWorkflowStatus(wf)
if !wf.Status.FinishedAt.IsZero() {
break
}
}
watchIf.Stop()
}

func printWorkflowStatus(wf *wfv1.Workflow) {
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ require (
k8s.io/api v0.0.0-20191219150132-17cfeff5d095
k8s.io/apimachinery v0.16.7-beta.0
k8s.io/client-go v0.0.0-20191225075139-73fd2ddc9180
k8s.io/code-generator v0.16.7-beta.0 // indirect
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a // indirect
k8s.io/utils v0.0.0-20191218082557-f07c713de883
sigs.k8s.io/yaml v1.1.0
Expand Down
Loading

0 comments on commit 5c3d9cf

Please sign in to comment.