From 618b6dee4de973b3f3ef1d1164a44b9cb176355e Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Tue, 31 Mar 2020 10:20:19 -0700 Subject: [PATCH 1/2] fix: Fixes --kubeconfig flag. Fixes #2492 (#2553) --- cmd/argo/commands/client/conn.go | 27 ++++++++++++++------------- cmd/argo/commands/server.go | 2 +- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/cmd/argo/commands/client/conn.go b/cmd/argo/commands/client/conn.go index a724cca3760d..e72163967918 100644 --- a/cmd/argo/commands/client/conn.go +++ b/cmd/argo/commands/client/conn.go @@ -14,20 +14,21 @@ import ( var argoServer string -var Config clientcmd.ClientConfig +var overrides = clientcmd.ConfigOverrides{} -var ExplicitPath string +var explicitPath string func AddKubectlFlagsToCmd(cmd *cobra.Command) { - // The "usual" clientcmd/kubectl flags - loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() - loadingRules.DefaultClientConfig = &clientcmd.DefaultClientConfig - overrides := clientcmd.ConfigOverrides{} kflags := clientcmd.RecommendedConfigOverrideFlags("") - cmd.PersistentFlags().StringVar(&ExplicitPath, "kubeconfig", "", "Path to a kube config. Only required if out-of-cluster") - loadingRules.ExplicitPath = ExplicitPath + cmd.PersistentFlags().StringVar(&explicitPath, "kubeconfig", "", "Path to a kube config. Only required if out-of-cluster") clientcmd.BindOverrideFlags(&overrides, cmd.PersistentFlags(), kflags) - Config = clientcmd.NewInteractiveDeferredLoadingClientConfig(loadingRules, &overrides, os.Stdin) +} + +func GetConfig() clientcmd.ClientConfig { + loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() + loadingRules.DefaultClientConfig = &clientcmd.DefaultClientConfig + loadingRules.ExplicitPath = explicitPath + return clientcmd.NewInteractiveDeferredLoadingClientConfig(loadingRules, &overrides, os.Stdin) } func AddArgoServerFlagsToCmd(cmd *cobra.Command) { @@ -37,7 +38,7 @@ func AddArgoServerFlagsToCmd(cmd *cobra.Command) { func NewAPIClient() (context.Context, apiclient.Client) { ctx, client, err := apiclient.NewClient(argoServer, func() string { return GetAuthString() - }, Config) + }, GetConfig()) if err != nil { log.Fatal(err) } @@ -45,7 +46,7 @@ func NewAPIClient() (context.Context, apiclient.Client) { } func Namespace() string { - namespace, _, err := Config.Namespace() + namespace, _, err := GetConfig().Namespace() if err != nil { log.Fatal(err) } @@ -53,11 +54,11 @@ func Namespace() string { } func GetAuthString() string { - restConfig, err := Config.ClientConfig() + restConfig, err := GetConfig().ClientConfig() if err != nil { log.Fatal(err) } - authString, err := kubeconfig.GetAuthString(restConfig, ExplicitPath) + authString, err := kubeconfig.GetAuthString(restConfig, explicitPath) if err != nil { log.Fatal(err) } diff --git a/cmd/argo/commands/server.go b/cmd/argo/commands/server.go index 77f0d7c097c2..a85ba71b75e5 100644 --- a/cmd/argo/commands/server.go +++ b/cmd/argo/commands/server.go @@ -39,7 +39,7 @@ See %s`, help.ArgoSever), stats.RegisterStackDumper() stats.StartStatsTicker(5 * time.Minute) - config, err := client.Config.ClientConfig() + config, err := client.GetConfig().ClientConfig() if err != nil { return err } From fb74ba1ce27b96473411c2c5cfe9a86972af589e Mon Sep 17 00:00:00 2001 From: Simon Behar Date: Tue, 31 Mar 2020 11:11:25 -0700 Subject: [PATCH 2/2] fix: Separate global scope processing from local scope building (#2528) --- workflow/controller/dag.go | 7 ++++--- workflow/controller/operator.go | 24 +++++++++++++++++------- workflow/controller/steps.go | 6 ++++-- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index 0c1751987bac..799f9d4c28dc 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -259,7 +259,8 @@ func (woc *wfOperationCtx) executeDAG(nodeName string, tmplCtx *templateresoluti // Can happen when dag.target was specified continue } - woc.processNodeOutputs(&scope, fmt.Sprintf("tasks.%s", task.Name), taskNode) + woc.buildLocalScope(&scope, fmt.Sprintf("tasks.%s", task.Name), taskNode) + woc.addOutputsToGlobalScope(taskNode.Outputs) } outputs, err := getTemplateOutputsFromScope(tmpl, &scope) if err != nil { @@ -448,7 +449,7 @@ func (woc *wfOperationCtx) buildLocalScopeFromTask(dagCtx *dagContext, task *wfv tmpl: dagCtx.tmpl, scope: make(map[string]interface{}), } - woc.addOutputsToScope("workflow", woc.wf.Status.Outputs, &scope) + woc.addOutputsToLocalScope("workflow", woc.wf.Status.Outputs, &scope) ancestors := common.GetTaskAncestry(dagCtx, task.Name, dagCtx.tasks) for _, ancestor := range ancestors { @@ -473,7 +474,7 @@ func (woc *wfOperationCtx) buildLocalScopeFromTask(dagCtx *dagContext, task *wfv return nil, errors.InternalWrapError(err) } } else { - woc.processNodeOutputs(&scope, prefix, ancestorNode) + woc.buildLocalScope(&scope, prefix, ancestorNode) } } return &scope, nil diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 153fbde023d1..5654eda8ec60 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -739,7 +739,7 @@ func (woc *wfOperationCtx) podReconciliation() error { if node, ok := woc.wf.Status.Nodes[nodeID]; ok { if newState := woc.assessNodeStatus(pod, &node); newState != nil { woc.wf.Status.Nodes[nodeID] = *newState - woc.addOutputsToScope("workflow", node.Outputs, nil) + woc.addOutputsToGlobalScope(node.Outputs) woc.updated = true } node := woc.wf.Status.Nodes[pod.ObjectMeta.Name] @@ -1895,9 +1895,9 @@ func (woc *wfOperationCtx) executeScript(nodeName string, templateScope string, return node, err } -// processNodeOutputs adds all of a nodes outputs to the local scope with the given prefix, as well +// buildLocalScope adds all of a nodes outputs to the local scope with the given prefix, as well // as the global scope, if specified with a globalName -func (woc *wfOperationCtx) processNodeOutputs(scope *wfScope, prefix string, node *wfv1.NodeStatus) { +func (woc *wfOperationCtx) buildLocalScope(scope *wfScope, prefix string, node *wfv1.NodeStatus) { if node.PodIP != "" { key := fmt.Sprintf("%s.ip", prefix) scope.addParamToScope(key, node.PodIP) @@ -1906,10 +1906,10 @@ func (woc *wfOperationCtx) processNodeOutputs(scope *wfScope, prefix string, nod key := fmt.Sprintf("%s.status", prefix) scope.addParamToScope(key, string(node.Phase)) } - woc.addOutputsToScope(prefix, node.Outputs, scope) + woc.addOutputsToLocalScope(prefix, node.Outputs, scope) } -func (woc *wfOperationCtx) addOutputsToScope(prefix string, outputs *wfv1.Outputs, scope *wfScope) { +func (woc *wfOperationCtx) addOutputsToLocalScope(prefix string, outputs *wfv1.Outputs, scope *wfScope) { if outputs == nil { return } @@ -1924,14 +1924,24 @@ func (woc *wfOperationCtx) addOutputsToScope(prefix string, outputs *wfv1.Output if scope != nil { scope.addParamToScope(key, *param.Value) } - woc.addParamToGlobalScope(param) } for _, art := range outputs.Artifacts { key := fmt.Sprintf("%s.outputs.artifacts.%s", prefix, art.Name) if scope != nil { scope.addArtifactToScope(key, art) } - woc.addArtifactToGlobalScope(art, scope) + } +} + +func (woc *wfOperationCtx) addOutputsToGlobalScope(outputs *wfv1.Outputs) { + if outputs == nil { + return + } + for _, param := range outputs.Parameters { + woc.addParamToGlobalScope(param) + } + for _, art := range outputs.Artifacts { + woc.addArtifactToGlobalScope(art, nil) } } diff --git a/workflow/controller/steps.go b/workflow/controller/steps.go index c6d8e0776435..046bc7bfbd32 100644 --- a/workflow/controller/steps.go +++ b/workflow/controller/steps.go @@ -50,7 +50,7 @@ func (woc *wfOperationCtx) executeSteps(nodeName string, tmplCtx *templateresolu }, tmplCtx: tmplCtx, } - woc.addOutputsToScope("workflow", woc.wf.Status.Outputs, stepsCtx.scope) + woc.addOutputsToLocalScope("workflow", woc.wf.Status.Outputs, stepsCtx.scope) for i, stepGroup := range tmpl.Steps { sgNodeName := fmt.Sprintf("%s[%d]", nodeName, i) @@ -126,7 +126,7 @@ func (woc *wfOperationCtx) executeSteps(nodeName string, tmplCtx *templateresolu woc.log.Infof("Step '%s' has no expanded child nodes", childNode) } } else { - woc.processNodeOutputs(stepsCtx.scope, prefix, childNode) + woc.buildLocalScope(stepsCtx.scope, prefix, childNode) } } } @@ -259,6 +259,8 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod return node } + woc.addOutputsToGlobalScope(node.Outputs) + // All children completed. Determine step group status as a whole for _, childNodeID := range node.Children { childNode := woc.wf.Status.Nodes[childNodeID]