Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
sarabala1979 committed Mar 31, 2020
2 parents b123157 + fb74ba1 commit 8c62154
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 26 deletions.
27 changes: 14 additions & 13 deletions cmd/argo/commands/client/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,21 @@ import (

var argoServer string

var Config clientcmd.ClientConfig
var overrides = clientcmd.ConfigOverrides{}

var ExplicitPath string
var explicitPath string

func AddKubectlFlagsToCmd(cmd *cobra.Command) {
// The "usual" clientcmd/kubectl flags
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
loadingRules.DefaultClientConfig = &clientcmd.DefaultClientConfig
overrides := clientcmd.ConfigOverrides{}
kflags := clientcmd.RecommendedConfigOverrideFlags("")
cmd.PersistentFlags().StringVar(&ExplicitPath, "kubeconfig", "", "Path to a kube config. Only required if out-of-cluster")
loadingRules.ExplicitPath = ExplicitPath
cmd.PersistentFlags().StringVar(&explicitPath, "kubeconfig", "", "Path to a kube config. Only required if out-of-cluster")
clientcmd.BindOverrideFlags(&overrides, cmd.PersistentFlags(), kflags)
Config = clientcmd.NewInteractiveDeferredLoadingClientConfig(loadingRules, &overrides, os.Stdin)
}

func GetConfig() clientcmd.ClientConfig {
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
loadingRules.DefaultClientConfig = &clientcmd.DefaultClientConfig
loadingRules.ExplicitPath = explicitPath
return clientcmd.NewInteractiveDeferredLoadingClientConfig(loadingRules, &overrides, os.Stdin)
}

func AddArgoServerFlagsToCmd(cmd *cobra.Command) {
Expand All @@ -37,27 +38,27 @@ func AddArgoServerFlagsToCmd(cmd *cobra.Command) {
func NewAPIClient() (context.Context, apiclient.Client) {
ctx, client, err := apiclient.NewClient(argoServer, func() string {
return GetAuthString()
}, Config)
}, GetConfig())
if err != nil {
log.Fatal(err)
}
return ctx, client
}

func Namespace() string {
namespace, _, err := Config.Namespace()
namespace, _, err := GetConfig().Namespace()
if err != nil {
log.Fatal(err)
}
return namespace
}

func GetAuthString() string {
restConfig, err := Config.ClientConfig()
restConfig, err := GetConfig().ClientConfig()
if err != nil {
log.Fatal(err)
}
authString, err := kubeconfig.GetAuthString(restConfig, ExplicitPath)
authString, err := kubeconfig.GetAuthString(restConfig, explicitPath)
if err != nil {
log.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/argo/commands/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ See %s`, help.ArgoSever),
stats.RegisterStackDumper()
stats.StartStatsTicker(5 * time.Minute)

config, err := client.Config.ClientConfig()
config, err := client.GetConfig().ClientConfig()
if err != nil {
return err
}
Expand Down
7 changes: 4 additions & 3 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ func (woc *wfOperationCtx) executeDAG(nodeName string, tmplCtx *templateresoluti
// Can happen when dag.target was specified
continue
}
woc.processNodeOutputs(&scope, fmt.Sprintf("tasks.%s", task.Name), taskNode)
woc.buildLocalScope(&scope, fmt.Sprintf("tasks.%s", task.Name), taskNode)
woc.addOutputsToGlobalScope(taskNode.Outputs)
}
outputs, err := getTemplateOutputsFromScope(tmpl, &scope)
if err != nil {
Expand Down Expand Up @@ -448,7 +449,7 @@ func (woc *wfOperationCtx) buildLocalScopeFromTask(dagCtx *dagContext, task *wfv
tmpl: dagCtx.tmpl,
scope: make(map[string]interface{}),
}
woc.addOutputsToScope("workflow", woc.wf.Status.Outputs, &scope)
woc.addOutputsToLocalScope("workflow", woc.wf.Status.Outputs, &scope)

ancestors := common.GetTaskAncestry(dagCtx, task.Name, dagCtx.tasks)
for _, ancestor := range ancestors {
Expand All @@ -473,7 +474,7 @@ func (woc *wfOperationCtx) buildLocalScopeFromTask(dagCtx *dagContext, task *wfv
return nil, errors.InternalWrapError(err)
}
} else {
woc.processNodeOutputs(&scope, prefix, ancestorNode)
woc.buildLocalScope(&scope, prefix, ancestorNode)
}
}
return &scope, nil
Expand Down
24 changes: 17 additions & 7 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ func (woc *wfOperationCtx) podReconciliation() error {
if node, ok := woc.wf.Status.Nodes[nodeID]; ok {
if newState := woc.assessNodeStatus(pod, &node); newState != nil {
woc.wf.Status.Nodes[nodeID] = *newState
woc.addOutputsToScope("workflow", node.Outputs, nil)
woc.addOutputsToGlobalScope(node.Outputs)
woc.updated = true
}
node := woc.wf.Status.Nodes[pod.ObjectMeta.Name]
Expand Down Expand Up @@ -1895,9 +1895,9 @@ func (woc *wfOperationCtx) executeScript(nodeName string, templateScope string,
return node, err
}

// processNodeOutputs adds all of a nodes outputs to the local scope with the given prefix, as well
// buildLocalScope adds all of a nodes outputs to the local scope with the given prefix, as well
// as the global scope, if specified with a globalName
func (woc *wfOperationCtx) processNodeOutputs(scope *wfScope, prefix string, node *wfv1.NodeStatus) {
func (woc *wfOperationCtx) buildLocalScope(scope *wfScope, prefix string, node *wfv1.NodeStatus) {
if node.PodIP != "" {
key := fmt.Sprintf("%s.ip", prefix)
scope.addParamToScope(key, node.PodIP)
Expand All @@ -1906,10 +1906,10 @@ func (woc *wfOperationCtx) processNodeOutputs(scope *wfScope, prefix string, nod
key := fmt.Sprintf("%s.status", prefix)
scope.addParamToScope(key, string(node.Phase))
}
woc.addOutputsToScope(prefix, node.Outputs, scope)
woc.addOutputsToLocalScope(prefix, node.Outputs, scope)
}

func (woc *wfOperationCtx) addOutputsToScope(prefix string, outputs *wfv1.Outputs, scope *wfScope) {
func (woc *wfOperationCtx) addOutputsToLocalScope(prefix string, outputs *wfv1.Outputs, scope *wfScope) {
if outputs == nil {
return
}
Expand All @@ -1924,14 +1924,24 @@ func (woc *wfOperationCtx) addOutputsToScope(prefix string, outputs *wfv1.Output
if scope != nil {
scope.addParamToScope(key, *param.Value)
}
woc.addParamToGlobalScope(param)
}
for _, art := range outputs.Artifacts {
key := fmt.Sprintf("%s.outputs.artifacts.%s", prefix, art.Name)
if scope != nil {
scope.addArtifactToScope(key, art)
}
woc.addArtifactToGlobalScope(art, scope)
}
}

func (woc *wfOperationCtx) addOutputsToGlobalScope(outputs *wfv1.Outputs) {
if outputs == nil {
return
}
for _, param := range outputs.Parameters {
woc.addParamToGlobalScope(param)
}
for _, art := range outputs.Artifacts {
woc.addArtifactToGlobalScope(art, nil)
}
}

Expand Down
6 changes: 4 additions & 2 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (woc *wfOperationCtx) executeSteps(nodeName string, tmplCtx *templateresolu
},
tmplCtx: tmplCtx,
}
woc.addOutputsToScope("workflow", woc.wf.Status.Outputs, stepsCtx.scope)
woc.addOutputsToLocalScope("workflow", woc.wf.Status.Outputs, stepsCtx.scope)

for i, stepGroup := range tmpl.Steps {
sgNodeName := fmt.Sprintf("%s[%d]", nodeName, i)
Expand Down Expand Up @@ -126,7 +126,7 @@ func (woc *wfOperationCtx) executeSteps(nodeName string, tmplCtx *templateresolu
woc.log.Infof("Step '%s' has no expanded child nodes", childNode)
}
} else {
woc.processNodeOutputs(stepsCtx.scope, prefix, childNode)
woc.buildLocalScope(stepsCtx.scope, prefix, childNode)
}
}
}
Expand Down Expand Up @@ -259,6 +259,8 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod
return node
}

woc.addOutputsToGlobalScope(node.Outputs)

// All children completed. Determine step group status as a whole
for _, childNodeID := range node.Children {
childNode := woc.wf.Status.Nodes[childNodeID]
Expand Down

0 comments on commit 8c62154

Please sign in to comment.