diff --git a/cmd/argo/commands/wait.go b/cmd/argo/commands/wait.go index 137a4142bc83..0c1443215918 100644 --- a/cmd/argo/commands/wait.go +++ b/cmd/argo/commands/wait.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/argoproj/pkg/errors" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" apierr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -58,15 +59,16 @@ func WaitWorkflows(workflowNames []string, ignoreNotFound, quiet bool) { } } -func waitOnOne(client workflowpkg.WorkflowServiceClient, ctx context.Context, wfName, namespace string, ignoreNotFound, quiet bool) bool { +func waitOnOne(serviceClient workflowpkg.WorkflowServiceClient, ctx context.Context, wfName, namespace string, ignoreNotFound, quiet bool) bool { ctx, cancel := context.WithCancel(ctx) defer cancel() - stream, err := client.WatchWorkflows(ctx, &workflowpkg.WatchWorkflowsRequest{ + req := &workflowpkg.WatchWorkflowsRequest{ Namespace: namespace, ListOptions: &metav1.ListOptions{ FieldSelector: fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", wfName)).String(), }, - }) + } + stream, err := serviceClient.WatchWorkflows(ctx, req) if err != nil { if apierr.IsNotFound(err) && ignoreNotFound { return true @@ -82,6 +84,12 @@ func waitOnOne(client workflowpkg.WorkflowServiceClient, ctx context.Context, wf } wf := event.Object if wf == nil { + log.Debug("Re-establishing workflow watch") + stream, err = serviceClient.WatchWorkflows(ctx, req) + if err != nil { + errors.CheckError(err) + return false + } continue } if !wf.Status.FinishedAt.IsZero() { diff --git a/cmd/argo/commands/watch.go b/cmd/argo/commands/watch.go index a68eb7fd86f3..7146ede91aa8 100644 --- a/cmd/argo/commands/watch.go +++ b/cmd/argo/commands/watch.go @@ -6,6 +6,7 @@ import ( "os" "github.com/argoproj/pkg/errors" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -40,19 +41,27 @@ func watchWorkflow(wfName string) { namespace := client.Namespace() ctx, cancel := context.WithCancel(ctx) defer cancel() - stream, err := serviceClient.WatchWorkflows(ctx, &workflowpkg.WatchWorkflowsRequest{ + req := &workflowpkg.WatchWorkflowsRequest{ Namespace: namespace, ListOptions: &metav1.ListOptions{ FieldSelector: fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", wfName)).String(), }, - }) + } + stream, err := serviceClient.WatchWorkflows(ctx, req) errors.CheckError(err) for { event, err := stream.Recv() errors.CheckError(err) wf := event.Object if wf == nil { - break + log.Debug("Re-establishing workflow watch") + stream, err = serviceClient.WatchWorkflows(ctx, req) + if err != nil { + errors.CheckError(err) + return + } + continue + } printWorkflowStatus(wf) if !wf.Status.FinishedAt.IsZero() { diff --git a/pkg/apiclient/watch-intermediary.go b/pkg/apiclient/watch-intermediary.go index 92f0dd6b908f..95b9feeb754a 100644 --- a/pkg/apiclient/watch-intermediary.go +++ b/pkg/apiclient/watch-intermediary.go @@ -11,20 +11,20 @@ type watchIntermediary struct { events chan *workflowpkg.WorkflowWatchEvent } +func (w watchIntermediary) Send(e *workflowpkg.WorkflowWatchEvent) error { + w.events <- e + return nil +} + func (w watchIntermediary) Recv() (*workflowpkg.WorkflowWatchEvent, error) { select { case e := <-w.error: return nil, e - default: - return <-w.events, nil + case event := <-w.events: + return event, nil } } -func (w watchIntermediary) Send(e *workflowpkg.WorkflowWatchEvent) error { - w.events <- e - return nil -} - func newWatchIntermediary(ctx context.Context) *watchIntermediary { return &watchIntermediary{newAbstractIntermediary(ctx), make(chan *workflowpkg.WorkflowWatchEvent)} }