Skip to content

Commit

Permalink
Get python script example to function
Browse files Browse the repository at this point in the history
  • Loading branch information
jessesuen committed Nov 3, 2017
1 parent 8973204 commit 065a8f7
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 39 deletions.
6 changes: 5 additions & 1 deletion Dockerfile-argoexec
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ FROM debian:9.1

RUN apt-get update && \
apt-get install -y curl jq procps && \
rm -rf /var/lib/apt/lists/*
rm -rf /var/lib/apt/lists/* && \
curl -LO https://storage.googleapis.com/kubernetes-release/release/$(curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt)/bin/linux/amd64/kubectl && \
chmod +x ./kubectl && \
mv ./kubectl /bin/


COPY dist/argoexec /bin/

Expand Down
4 changes: 2 additions & 2 deletions api/workflow/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ type Artifact struct {
type Outputs struct {
Parameters []Parameter `json:"parameters,omitempty"`
Artifacts []Artifact `json:"artifacts,omitempty"`
Result *string `json:"result,omitempty"`
// TODO:
// - Result (output value from a script template)
// - Logs (log artifact(s) from the container)
}

Expand Down Expand Up @@ -135,7 +135,7 @@ type NodeStatus struct {
Name string `json:"name"`
Status string `json:"status"`
// Outputs captures output parameter values and artifact locations
Outputs Outputs `json:"outputs,omitempty"`
Outputs *Outputs `json:"outputs,omitempty"`
//ReturnCode *int `json:"returnCode"`
}

Expand Down
2 changes: 1 addition & 1 deletion examples/scripts-python.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ spec:
container:
image: alpine:latest
command: [sh, -c]
args: ["echo {{inputs.parameters.MESSAGE}}"]
args: ["echo result was: {{inputs.parameters.MESSAGE}}"]
2 changes: 2 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (
AnnotationKeyNodeName = wfv1.CRDFullName + "/node-name"
// AnnotationKeyTemplate is the pod metadata annotation key containing the container template as JSON
AnnotationKeyTemplate = wfv1.CRDFullName + "/template"
// AnnotationKeyOutputs is the pod metadata annotation key containing the container outputs
AnnotationKeyOutputs = wfv1.CRDFullName + "/outputs"

// LabelKeyArgoWorkflow is the pod metadata label to indidcate this pod is part of a workflow
LabelKeyArgoWorkflow = wfv1.CRDFullName + "/argo-workflow"
Expand Down
18 changes: 10 additions & 8 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,14 +268,16 @@ func (wfc *WorkflowController) handlePodUpdate(pod *apiv1.Pod) {
default:
newStatus = wfv1.NodeStatusError
}
tmplStr := pod.Annotations[common.AnnotationKeyTemplate]
var tmpl wfv1.Template
err = json.Unmarshal([]byte(tmplStr), &tmpl)
if err != nil {
log.Errorf("Failed to unmarshal %s template from pod annotation: %v", pod.Name, err)
newStatus = wfv1.NodeStatusError
} else {
node.Outputs = tmpl.Outputs
outputStr, ok := pod.Annotations[common.AnnotationKeyOutputs]
if ok {
var outputs wfv1.Outputs
err = json.Unmarshal([]byte(outputStr), &outputs)
if err != nil {
log.Errorf("Failed to unmarshal %s outputs from pod annotation: %v", pod.Name, err)
newStatus = wfv1.NodeStatusError
} else {
node.Outputs = &outputs
}
}
log.Infof("Updating node %s status %s -> %s", node, node.Status, newStatus)
node.Status = newStatus
Expand Down
66 changes: 45 additions & 21 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controller
import (
"encoding/json"
"fmt"
"io"
"regexp"
"sort"
"strings"
Expand Down Expand Up @@ -179,7 +180,7 @@ func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Argume
}
tmpl := woc.wf.GetTemplate(templateName)
if tmpl == nil {
err := errors.Errorf(errors.CodeBadRequest, "Node %s error: template '%s' undefined", nodeName, templateName)
err := errors.Errorf(errors.CodeBadRequest, "Node %v error: template '%s' undefined", node, templateName)
woc.markNodeStatus(nodeName, wfv1.NodeStatusError)
return err
}
Expand Down Expand Up @@ -221,26 +222,23 @@ func processArgs(tmpl *wfv1.Template, args wfv1.Arguments) (*wfv1.Template, erro
// 1) check if was supplied as argument. if so use the supplied value from arg
// 2) if not, use default value.
// 3) if no default value, it is an error
newInputParameters := make([]wfv1.Parameter, len(tmpl.Inputs.Parameters))
tmpl = tmpl.DeepCopy()
for i, inParam := range tmpl.Inputs.Parameters {
if inParam.Default != nil {
// first set to default value
inParam.Value = inParam.Default
}
// overwrite value from argument (if supplied)
argParam := args.GetParameterByName(inParam.Name)
if argParam != nil {
if argParam.Value == nil {
return nil, errors.Errorf(errors.CodeBadRequest, "arguments.parameters.%s supplied no value", inParam.Name)
}
newInputParameters[i] = *argParam
continue
if argParam != nil && argParam.Value != nil {
newValue := *argParam.Value
inParam.Value = &newValue
}
if inParam.Default != nil {
newInputParameters[i] = wfv1.Parameter{
Name: inParam.Name,
Value: inParam.Default,
}
continue
if inParam.Value == nil {
return nil, errors.Errorf(errors.CodeBadRequest, "inputs.parameters.%s was not satisfied", inParam.Name)
}
return nil, errors.Errorf(errors.CodeBadRequest, "arguments.parameters.%s was not supplied", inParam.Name)
tmpl.Inputs.Parameters[i] = inParam
}
tmpl.Inputs.Parameters = newInputParameters
tmpl, err := substituteParams(tmpl)
if err != nil {
return nil, err
Expand All @@ -261,6 +259,7 @@ func processArgs(tmpl *wfv1.Template, args wfv1.Arguments) (*wfv1.Template, erro
if !argArt.HasLocation() {
return nil, errors.Errorf(errors.CodeBadRequest, "arguments.artifacts.%s missing location information", inArt.Name)
}
argArt.Path = inArt.Path
newInputArtifacts[i] = *argArt
}
tmpl.Inputs.Artifacts = newInputArtifacts
Expand Down Expand Up @@ -332,6 +331,13 @@ func (woc *wfOperationCtx) executeSteps(nodeName string, tmpl *wfv1.Template) er
// are not easily referenceable by user.
continue
}
if childNode.Outputs == nil {
continue
}
if childNode.Outputs.Result != nil {
key := fmt.Sprintf("steps.%s.outputs.result", stepName)
scope.addParamToScope(key, *childNode.Outputs.Result)
}
for _, outParam := range childNode.Outputs.Parameters {
key := fmt.Sprintf("steps.%s.outputs.parameters.%s", stepName, outParam.Name)
scope.addParamToScope(key, *outParam.Value)
Expand Down Expand Up @@ -444,12 +450,17 @@ func shouldExecute(when string) (bool, error) {
// resolveReferences replaces any references to outputs of previous steps, or artifacts in the inputs
// NOTE: by now, input parameters should have been substituted throughout the template, so we only
// are concerned with:
// 1) dereferencing parameters from previous steps
// 1) dereferencing output.parameters from previous steps
// 2) dereferencing output.result from previous steps
// 2) dereferencing artifacts from previous steps
// 3) dereferencing artifacts from inputs
func (woc *wfOperationCtx) resolveReferences(stepGroup map[string]wfv1.WorkflowStep, scope *wfScope) (map[string]wfv1.WorkflowStep, error) {
newStepGroup := make(map[string]wfv1.WorkflowStep)

if len(scope.scope) > 0 {
log.Printf("asdfsfd")
}

for stepName, step := range stepGroup {
newStep := step.DeepCopy()

Expand Down Expand Up @@ -516,7 +527,7 @@ func (woc *wfOperationCtx) expandStep(stepName string, step wfv1.WorkflowStep) (

expandedStep := make(map[string]wfv1.WorkflowStep)
for i, item := range step.WithItems {
replaceMap := make(map[string]interface{})
replaceMap := make(map[string]string)
var newStepName string
switch val := item.(type) {
case string:
Expand All @@ -543,7 +554,14 @@ func (woc *wfOperationCtx) expandStep(stepName string, step wfv1.WorkflowStep) (
default:
return nil, errors.Errorf(errors.CodeBadRequest, "withItems[%d] expected string or map. received: %s", i, val)
}
newStepStr := fstTmpl.ExecuteString(replaceMap)
newStepStr := fstTmpl.ExecuteFuncString(func(w io.Writer, tag string) (int, error) {
replacement, ok := replaceMap[tag]
if !ok {
return w.Write([]byte(fmt.Sprintf("{{%s}}", tag)))
}
return w.Write([]byte(replacement))
})

var newStep wfv1.WorkflowStep
err = json.Unmarshal([]byte(newStepStr), &newStep)
if err != nil {
Expand Down Expand Up @@ -573,15 +591,21 @@ func substituteParams(tmpl *wfv1.Template) (*wfv1.Template, error) {
return nil, errors.InternalWrapError(err)
}
fstTmpl := fasttemplate.New(string(tmplBytes), "{{", "}}")
replaceMap := make(map[string]interface{})
replaceMap := make(map[string]string)

for _, inParam := range tmpl.Inputs.Parameters {
if inParam.Value == nil {
return nil, errors.InternalErrorf("inputs.parameters.%s had no value", inParam.Name)
}
replaceMap["inputs.parameters."+inParam.Name] = *inParam.Value
}
s := fstTmpl.ExecuteString(replaceMap)
s := fstTmpl.ExecuteFuncString(func(w io.Writer, tag string) (int, error) {
replacement, ok := replaceMap[tag]
if !ok {
return w.Write([]byte(fmt.Sprintf("{{%s}}", tag)))
}
return w.Write([]byte(replacement))
})
var newTmpl wfv1.Template
err = json.Unmarshal([]byte(s), &newTmpl)
if err != nil {
Expand Down
14 changes: 8 additions & 6 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,11 +375,8 @@ func addScriptVolume(pod *corev1.Pod) {
MountPath: common.ScriptTemplateEmptyDir,
}
initCtr.VolumeMounts = append(initCtr.VolumeMounts, volMount)

// HACK: debug purposes. sleep to experiment with init container artifacts
initCtr.Command = []string{"sh", "-c"}
initCtr.Args = []string{"sleep 999999; echo done"}

initCtr.Args = []string{"grep template /argo/podmetadata/annotations | cut -d = -f 2- | jq -rM '.' | jq -rM '.script.source' > /argo/script/source"}
pod.Spec.InitContainers[i] = initCtr
break
}
Expand All @@ -401,9 +398,14 @@ func addScriptVolume(pod *corev1.Pod) {
break
}
if ctr.Name == common.WaitContainerName {
// HACK: debug purposes. sleep to experiment with wait container artifacts
ctr.Command = []string{"sh", "-c"}
ctr.Args = []string{"sleep 999999; echo done"}
ctr.Args = []string{`
while true ; do kubectl get pod $ARGO_POD_NAME -o custom-columns=status:status.containerStatuses[0].state.terminated 2>/dev/null; if [ $? -eq 0 ] ; then break; fi; echo waiting; sleep 5; done &&
container_id=$(kubectl get pod $ARGO_POD_NAME -o jsonpath='{.status.containerStatuses[0].containerID}' | cut -d / -f 3-) &&
output=$(grep stdout /var/lib/docker/containers/$container_id/*.log | jq -r '.log') &&
outputjson={\"result\":\"$output\"} &&
kubectl annotate pods $ARGO_POD_NAME --overwrite workflows.argoproj.io/outputs=${outputjson}
`}
pod.Spec.Containers[i] = ctr
}
}
Expand Down

0 comments on commit 065a8f7

Please sign in to comment.