Skip to content

Commit

Permalink
feat(cli): Add --latest flag for argo get command as per argoproj#3128 (
Browse files Browse the repository at this point in the history
  • Loading branch information
rbreeze committed Jun 12, 2020
1 parent 3460859 commit e5e6456
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 75 deletions.
4 changes: 2 additions & 2 deletions cmd/argo/commands/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"

"github.com/argoproj/argo/cmd/argo/commands/client"
workflowpkg "github.com/argoproj/argo/pkg/apiclient/workflow"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/util"
)

func NewWaitCommand() *cobra.Command {
Expand Down Expand Up @@ -67,7 +67,7 @@ func waitOnOne(serviceClient workflowpkg.WorkflowServiceClient, ctx context.Cont
req := &workflowpkg.WatchWorkflowsRequest{
Namespace: namespace,
ListOptions: &metav1.ListOptions{
FieldSelector: fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", wfName)).String(),
FieldSelector: util.GenerateFieldSelectorFromWorkflowName(wfName),
},
}
stream, err := serviceClient.WatchWorkflows(ctx, req)
Expand Down
4 changes: 2 additions & 2 deletions cmd/argo/commands/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"

"github.com/argoproj/argo/cmd/argo/commands/client"
workflowpkg "github.com/argoproj/argo/pkg/apiclient/workflow"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/util"
"github.com/argoproj/argo/workflow/packer"
)

Expand Down Expand Up @@ -51,7 +51,7 @@ func watchWorkflow(wfName string, getArgs getFlags) {
req := &workflowpkg.WatchWorkflowsRequest{
Namespace: namespace,
ListOptions: &metav1.ListOptions{
FieldSelector: fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", wfName)).String(),
FieldSelector: util.GenerateFieldSelectorFromWorkflowName(wfName),
},
}
stream, err := serviceClient.WatchWorkflows(ctx, req)
Expand Down
125 changes: 102 additions & 23 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"reflect"
"sort"

"github.com/argoproj/argo/pkg/client/clientset/versioned"

log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -15,6 +17,7 @@ import (
"github.com/argoproj/argo/pkg/apis/workflow"
"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/server/auth"
argoutil "github.com/argoproj/argo/util"
"github.com/argoproj/argo/util/instanceid"
"github.com/argoproj/argo/util/logs"
"github.com/argoproj/argo/workflow/common"
Expand All @@ -30,6 +33,8 @@ type workflowServer struct {
hydrator hydrator.Interface
}

const latestAlias = "@latest"

// NewWorkflowServer returns a new workflowServer
func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo) workflowpkg.WorkflowServiceServer {
return &workflowServer{instanceIDService, offloadNodeStatusRepo, hydrator.New(offloadNodeStatusRepo)}
Expand Down Expand Up @@ -80,7 +85,12 @@ func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.Workf
if req.GetOptions != nil {
wfGetOption = *req.GetOptions
}
wf, err := s.getWorkflowAndValidate(ctx, req.Namespace, req.Name, wfGetOption)
wfClient := auth.GetWfClient(ctx)
wf, err := s.getWorkflow(wfClient, req.Namespace, req.Name, wfGetOption)
if err != nil {
return nil, err
}
err = s.validateWorkflow(wf)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -131,6 +141,12 @@ func (s *workflowServer) WatchWorkflows(req *workflowpkg.WatchWorkflowsRequest,
opts := &metav1.ListOptions{}
if req.ListOptions != nil {
opts = req.ListOptions
wfName := argoutil.RecoverWorkflowNameFromSelectorString(opts.FieldSelector)
wf, err := s.getWorkflow(wfClient, req.Namespace, wfName, metav1.GetOptions{})
if err != nil {
return err
}
opts.FieldSelector = argoutil.GenerateFieldSelectorFromWorkflowName(wf.Name)
}
s.instanceIDService.With(opts)
wfIf := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace)
Expand Down Expand Up @@ -176,11 +192,16 @@ func (s *workflowServer) WatchWorkflows(req *workflowpkg.WatchWorkflowsRequest,
}

func (s *workflowServer) DeleteWorkflow(ctx context.Context, req *workflowpkg.WorkflowDeleteRequest) (*workflowpkg.WorkflowDeleteResponse, error) {
_, err := s.getWorkflowAndValidate(ctx, req.Namespace, req.Name, metav1.GetOptions{})
wfClient := auth.GetWfClient(ctx)
wf, err := s.getWorkflow(wfClient, req.Namespace, req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
err = s.validateWorkflow(wf)
if err != nil {
return nil, err
}
err = auth.GetWfClient(ctx).ArgoprojV1alpha1().Workflows(req.Namespace).Delete(req.Name, &metav1.DeleteOptions{})
err = auth.GetWfClient(ctx).ArgoprojV1alpha1().Workflows(wf.Namespace).Delete(wf.Name, &metav1.DeleteOptions{})
if err != nil {
return nil, err
}
Expand All @@ -191,7 +212,12 @@ func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.Wor
wfClient := auth.GetWfClient(ctx)
kubeClient := auth.GetKubeClient(ctx)

wf, err := s.getWorkflowAndValidate(ctx, req.Namespace, req.Name, metav1.GetOptions{})
wf, err := s.getWorkflow(wfClient, req.Namespace, req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}

err = s.validateWorkflow(wf)
if err != nil {
return nil, err
}
Expand All @@ -205,7 +231,12 @@ func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.Wor

func (s *workflowServer) ResubmitWorkflow(ctx context.Context, req *workflowpkg.WorkflowResubmitRequest) (*v1alpha1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)
wf, err := s.getWorkflowAndValidate(ctx, req.Namespace, req.Name, metav1.GetOptions{})
wf, err := s.getWorkflow(wfClient, req.Namespace, req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}

err = s.validateWorkflow(wf)
if err != nil {
return nil, err
}
Expand All @@ -224,18 +255,23 @@ func (s *workflowServer) ResubmitWorkflow(ctx context.Context, req *workflowpkg.

func (s *workflowServer) ResumeWorkflow(ctx context.Context, req *workflowpkg.WorkflowResumeRequest) (*v1alpha1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)
_, err := s.getWorkflowAndValidate(ctx, req.Namespace, req.Name, metav1.GetOptions{})
wf, err := s.getWorkflow(wfClient, req.Namespace, req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}

err = util.ResumeWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), s.hydrator, req.Name, req.NodeFieldSelector)
err = s.validateWorkflow(wf)
if err != nil {
log.Warnf("Failed to resume %s: %+v", req.Name, err)
return nil, err
}

wf, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(req.Name, metav1.GetOptions{})
err = util.ResumeWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), s.hydrator, wf.Name, req.NodeFieldSelector)
if err != nil {
log.Warnf("Failed to resume %s: %+v", wf.Name, err)
return nil, err
}

wf, err = wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(wf.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
Expand All @@ -246,17 +282,22 @@ func (s *workflowServer) ResumeWorkflow(ctx context.Context, req *workflowpkg.Wo
func (s *workflowServer) SuspendWorkflow(ctx context.Context, req *workflowpkg.WorkflowSuspendRequest) (*v1alpha1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)

_, err := s.getWorkflowAndValidate(ctx, req.Namespace, req.Name, metav1.GetOptions{})
wf, err := s.getWorkflow(wfClient, req.Namespace, req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}

err = s.validateWorkflow(wf)
if err != nil {
return nil, err
}

err = util.SuspendWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), req.Name)
err = util.SuspendWorkflow(wfClient.ArgoprojV1alpha1().Workflows(wf.Namespace), wf.Name)
if err != nil {
return nil, err
}

wf, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(req.Name, metav1.GetOptions{})
wf, err = wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(wf.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
Expand All @@ -267,17 +308,22 @@ func (s *workflowServer) SuspendWorkflow(ctx context.Context, req *workflowpkg.W
func (s *workflowServer) TerminateWorkflow(ctx context.Context, req *workflowpkg.WorkflowTerminateRequest) (*v1alpha1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)

_, err := s.getWorkflowAndValidate(ctx, req.Namespace, req.Name, metav1.GetOptions{})
wf, err := s.getWorkflow(wfClient, req.Namespace, req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}

err = util.TerminateWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), req.Name)
err = s.validateWorkflow(wf)
if err != nil {
return nil, err
}

wf, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(req.Name, metav1.GetOptions{})
err = util.TerminateWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), wf.Name)
if err != nil {
return nil, err
}

wf, err = wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(wf.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
Expand All @@ -286,16 +332,20 @@ func (s *workflowServer) TerminateWorkflow(ctx context.Context, req *workflowpkg

func (s *workflowServer) StopWorkflow(ctx context.Context, req *workflowpkg.WorkflowStopRequest) (*v1alpha1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)
_, err := s.getWorkflowAndValidate(ctx, req.Namespace, req.Name, metav1.GetOptions{})
wf, err := s.getWorkflow(wfClient, req.Namespace, req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
err = util.StopWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), s.hydrator, req.Name, req.NodeFieldSelector, req.Message)
err = s.validateWorkflow(wf)
if err != nil {
return nil, err
}
err = util.StopWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), s.hydrator, wf.Name, req.NodeFieldSelector, req.Message)
if err != nil {
return nil, err
}

wf, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(req.Name, metav1.GetOptions{})
wf, err = wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(wf.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
Expand All @@ -321,24 +371,53 @@ func (s *workflowServer) PodLogs(req *workflowpkg.WorkflowLogRequest, ws workflo
ctx := ws.Context()
wfClient := auth.GetWfClient(ctx)
kubeClient := auth.GetKubeClient(ctx)
_, err := s.getWorkflowAndValidate(ctx, req.Namespace, req.Name, metav1.GetOptions{})
wf, err := s.getWorkflow(wfClient, req.Namespace, req.Name, metav1.GetOptions{})
if err != nil {
return err
}
err = s.validateWorkflow(wf)
if err != nil {
return err
}
req.Name = wf.Name
return logs.WorkflowLogs(ctx, wfClient, kubeClient, req, ws)
}

func (s *workflowServer) getWorkflowAndValidate(ctx context.Context, namespace string, name string, options metav1.GetOptions) (*v1alpha1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)
func (s *workflowServer) getWorkflow(wfClient versioned.Interface, namespace string, name string, options metav1.GetOptions) (*v1alpha1.Workflow, error) {
if name == latestAlias {
latest, err := getLatestWorkflow(wfClient, namespace)
if err != nil {
return nil, err
}
log.Infof("Resolved alias %s to workflow %s.\n", latestAlias, latest.Name)
return latest, nil
}
wf, err := wfClient.ArgoprojV1alpha1().Workflows(namespace).Get(name, options)
if err != nil {
return nil, err
}
err = s.instanceIDService.Validate(wf)
return wf, nil
}

func (s *workflowServer) validateWorkflow(wf *v1alpha1.Workflow) error {
return s.instanceIDService.Validate(wf)
}

func getLatestWorkflow(wfClient versioned.Interface, namespace string) (*v1alpha1.Workflow, error) {
wfList, err := wfClient.ArgoprojV1alpha1().Workflows(namespace).List(metav1.ListOptions{})
if err != nil {
return nil, err
}
return wf, nil
if len(wfList.Items) < 1 {
return nil, fmt.Errorf("No workflows found.")
}
latest := wfList.Items[0]
for _, wf := range wfList.Items {
if latest.ObjectMeta.CreationTimestamp.Before(&wf.ObjectMeta.CreationTimestamp) {
latest = wf
}
}
return &latest, nil
}

func (s *workflowServer) SubmitWorkflow(ctx context.Context, req *workflowpkg.WorkflowSubmitRequest) (*v1alpha1.Workflow, error) {
Expand Down
Loading

0 comments on commit e5e6456

Please sign in to comment.