Skip to content

Commit

Permalink
fix(CLI): Re-establish workflow watch on disconnect. Fixes argoproj#2796
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Apr 27, 2020
1 parent 31358d6 commit d7f8e0c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 30 deletions.
66 changes: 36 additions & 30 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,53 +155,59 @@ func (s *workflowServer) WatchWorkflows(req *workflowpkg.WatchWorkflowsRequest,
if req.ListOptions != nil {
opts = *req.ListOptions
}
watch, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Watch(s.withInstanceID(opts))
opts = s.withInstanceID(opts)
wfIf := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace)
watch, err := wfIf.Watch(opts)
if err != nil {
return err
}
defer watch.Stop()
ctx := ws.Context()

log.Debug("Piping events to channel")
defer log.Debug("Result channel done")

for next := range watch.ResultChan() {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
log.Debug("Received event")
wf, ok := next.Object.(*v1alpha1.Workflow)
if !ok {
return fmt.Errorf("watch object was not a workflow %v", reflect.TypeOf(next.Object))
}
logCtx := log.WithFields(log.Fields{"workflow": wf.Name, "type": next.Type, "phase": wf.Status.Phase})
err := packer.DecompressWorkflow(wf)
if err != nil {
return err
}
if wf.Status.IsOffloadNodeStatus() {
if s.offloadNodeStatusRepo.IsEnabled() {
offloadedNodes, err := s.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
case event, open := <-watch.ResultChan():
if !open {
log.Info("Re-establishing workflow watch")
watch, err = wfIf.Watch(opts)
if err != nil {
return err
}
wf.Status.Nodes = offloadedNodes
} else {
log.WithFields(log.Fields{"namespace": wf.Namespace, "name": wf.Name}).Warn(sqldb.OffloadNodeStatusDisabled)
continue
}
log.Debug("Received event")
wf, ok := event.Object.(*v1alpha1.Workflow)
if !ok {
return fmt.Errorf("watch object was not a workflow %v", reflect.TypeOf(event.Object))
}
logCtx := log.WithFields(log.Fields{"workflow": wf.Name, "type": event.Type, "phase": wf.Status.Phase})
err := packer.DecompressWorkflow(wf)
if err != nil {
return err
}
if wf.Status.IsOffloadNodeStatus() {
if s.offloadNodeStatusRepo.IsEnabled() {
offloadedNodes, err := s.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
if err != nil {
return err
}
wf.Status.Nodes = offloadedNodes
} else {
log.WithFields(log.Fields{"namespace": wf.Namespace, "name": wf.Name}).Warn(sqldb.OffloadNodeStatusDisabled)
}
}
logCtx.Debug("Sending event")
err = ws.Send(&workflowpkg.WorkflowWatchEvent{Type: string(event.Type), Object: wf})
if err != nil {
return err
}
}
logCtx.Debug("Sending event")
err = ws.Send(&workflowpkg.WorkflowWatchEvent{Type: string(next.Type), Object: wf})
if err != nil {
return err
}

}

log.Debug("Result channel done")

return nil
}

func (s *workflowServer) DeleteWorkflow(ctx context.Context, req *workflowpkg.WorkflowDeleteRequest) (*workflowpkg.WorkflowDeleteResponse, error) {
Expand Down
2 changes: 2 additions & 0 deletions util/logs/workflow-logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func WorkflowLogs(ctx context.Context, wfClient versioned.Interface, kubeClient
logCtx.Error(err)
return
}
continue
}
wf, ok := event.Object.(*wfv1.Workflow)
if !ok {
Expand Down Expand Up @@ -198,6 +199,7 @@ func WorkflowLogs(ctx context.Context, wfClient versioned.Interface, kubeClient
logCtx.Error(err)
return
}
continue
}
pod, ok := event.Object.(*corev1.Pod)
if !ok {
Expand Down

0 comments on commit d7f8e0c

Please sign in to comment.