Skip to content

Commit

Permalink
Initial support for daemon workflow steps (no termination yet)
Browse files Browse the repository at this point in the history
  • Loading branch information
jessesuen committed Nov 9, 2017
1 parent 738b02d commit 227c196
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 36 deletions.
4 changes: 4 additions & 0 deletions api/workflow/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type Template struct {
Inputs Inputs `json:"inputs,omitempty"`
Outputs Outputs `json:"outputs,omitempty"`

// Deamon indicates will allow a workflow to proceed to the next step if the container reaches readiness
Daemon *bool `json:"daemon,omitempty"`

// Workflow fields
Steps [][]WorkflowStep `json:"steps,omitempty"`

Expand Down Expand Up @@ -164,6 +167,7 @@ type NodeStatus struct {
ID string `json:"id"`
Name string `json:"name"`
Status string `json:"status"`
PodIP string `json:"podIP,omitempty"`
// Outputs captures output parameter values and artifact locations
Outputs *Outputs `json:"outputs,omitempty"`
//ReturnCode *int `json:"returnCode"`
Expand Down
65 changes: 65 additions & 0 deletions examples/daemon-step.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
apiVersion: argoproj.io/v1
kind: Workflow
metadata:
generateName: daemon-step-
spec:
entrypoint: daemon-example
templates:
- name: daemon-example
steps:
- - name: influx
template: influxdb

- - name: init-database
template: influxdb-client
arguments:
parameters:
- name: cmd
value: curl -XPOST 'http:https://{{steps.influx.ip}}:8086/query' --data-urlencode "q=CREATE DATABASE mydb"

- - name: producer-1
template: influxdb-client
arguments:
parameters:
- name: cmd
value: for i in $(seq 1 20); do curl -XPOST 'http:https://{{steps.influx.ip}}:8086/write?db=mydb' -d "cpu,host=server01,region=uswest load=$i" ; sleep .5 ; done
- name: producer-2
template: influxdb-client
arguments:
parameters:
- name: cmd
value: for i in $(seq 1 20); do curl -XPOST 'http:https://{{steps.influx.ip}}:8086/write?db=mydb' -d "cpu,host=server02,region=uswest load=$((RANDOM % 100))" ; sleep .5 ; done
- name: producer-3
template: influxdb-client
arguments:
parameters:
- name: cmd
value: curl -XPOST 'http:https://{{steps.influx.ip}}:8086/write?db=mydb' -d 'cpu,host=server03,region=useast load=15.4'

- - name: consumer
template: influxdb-client
arguments:
parameters:
- name: cmd
value: curl --silent -G http:https://{{steps.influx.ip}}:8086/query?pretty=true --data-urlencode "db=mydb" --data-urlencode "q=SELECT * FROM cpu"

- name: influxdb
daemon: true
container:
image: influxdb:1.2
restartPolicy: Always
readinessProbe:
httpGet:
path: /ping
port: 8086
initialDelaySeconds: 5
timeoutSeconds: 1

- name: influxdb-client
inputs:
parameters:
- name: cmd
container:
image: appropriate/curl:latest
command: ["sh", "-c"]
args: ["{{inputs.parameters.cmd}}"]
2 changes: 1 addition & 1 deletion examples/sidecar-dind.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ spec:
image: docker:17.10-dind
securityContext:
privileged: true
sidecarOptions:
options:
# volumeMirroring will mount the same volumes specified in the main container
# to the sidecar (including artifacts), at the same mountPaths. This enables
# dind daemon to partially see the same filesystem as the main container in
Expand Down
55 changes: 40 additions & 15 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,48 @@ func (wfc *WorkflowController) watchWorkflowPods(ctx context.Context) (cache.Con
}

func (wfc *WorkflowController) handlePodUpdate(pod *apiv1.Pod) {
if pod.Status.Phase != apiv1.PodSucceeded && pod.Status.Phase != apiv1.PodFailed {
// Ignore pod updates for running pods
return
}
workflowName, ok := pod.Labels[common.LabelKeyWorkflow]
if !ok {
// Ignore pods unrelated to workflow (this shouldn't happen unless the watch is setup incorrectly)
return
}
log.Infof("Processing completed pod: %v", pod.ObjectMeta.SelfLink)
var newStatus string
switch pod.Status.Phase {
case apiv1.PodPending:
return
case apiv1.PodSucceeded:
newStatus = wfv1.NodeStatusSucceeded
case apiv1.PodFailed:
newStatus = wfv1.NodeStatusFailed
case apiv1.PodRunning:
tmplStr, ok := pod.Annotations[common.AnnotationKeyTemplate]
if !ok {
log.Warnf("%s missing template annotation", pod.ObjectMeta.Name)
return
}
var tmpl wfv1.Template
err := json.Unmarshal([]byte(tmplStr), &tmpl)
if err != nil {
log.Warnf("%s template annotation unreadable: %v", pod.ObjectMeta.Name, err)
return
}
if tmpl.Daemon == nil || !*tmpl.Daemon {
return
}
// pod is running and template is marked daemon. check if everything is ready
for _, ctrStatus := range pod.Status.ContainerStatuses {
if !ctrStatus.Ready {
return
}
}
// proceed to mark node status as completed
newStatus = wfv1.NodeStatusSucceeded
log.Infof("Processing ready daemon pod: %v", pod.ObjectMeta.SelfLink)
default:
log.Infof("Unexpected pod phase for %s: %s", pod.ObjectMeta.Name, pod.Status.Phase)
newStatus = wfv1.NodeStatusError
}

wf, err := wfc.WorkflowClient.GetWorkflow(workflowName)
if err != nil {
log.Warnf("Failed to find workflow %s %+v", workflowName, err)
Expand All @@ -259,15 +292,6 @@ func (wfc *WorkflowController) handlePodUpdate(pod *apiv1.Pod) {
log.Infof("node %v already marked completed (%s)", node, node.Status)
return
}
var newStatus string
switch pod.Status.Phase {
case apiv1.PodSucceeded:
newStatus = wfv1.NodeStatusSucceeded
case apiv1.PodFailed:
newStatus = wfv1.NodeStatusFailed
default:
newStatus = wfv1.NodeStatusError
}
outputStr, ok := pod.Annotations[common.AnnotationKeyOutputs]
if ok {
var outputs wfv1.Outputs
Expand All @@ -279,8 +303,9 @@ func (wfc *WorkflowController) handlePodUpdate(pod *apiv1.Pod) {
node.Outputs = &outputs
}
}
log.Infof("Updating node %s status %s -> %s", node, node.Status, newStatus)
log.Infof("Updating node %s status %s -> %s (IP: %s)", node, node.Status, newStatus, pod.Status.PodIP)
node.Status = newStatus
node.PodIP = pod.Status.PodIP
wf.Status.Nodes[pod.Name] = node
_, err = wfc.WorkflowClient.UpdateWorkflow(wf)
if err != nil {
Expand Down
35 changes: 22 additions & 13 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"regexp"
"sort"
"strconv"
"strings"

wfv1 "github.com/argoproj/argo/api/workflow/v1"
Expand Down Expand Up @@ -332,20 +333,23 @@ func (woc *wfOperationCtx) executeSteps(nodeName string, tmpl *wfv1.Template) er
// are not easily referenceable by user.
continue
}
if childNode.Outputs == nil {
continue
}
if childNode.Outputs.Result != nil {
key := fmt.Sprintf("steps.%s.outputs.result", step.Name)
scope.addParamToScope(key, *childNode.Outputs.Result)
}
for _, outParam := range childNode.Outputs.Parameters {
key := fmt.Sprintf("steps.%s.outputs.parameters.%s", step.Name, outParam.Name)
scope.addParamToScope(key, *outParam.Value)
if childNode.PodIP != "" {
key := fmt.Sprintf("steps.%s.ip", step.Name)
scope.addParamToScope(key, childNode.PodIP)
}
for _, outArt := range childNode.Outputs.Artifacts {
key := fmt.Sprintf("steps.%s.outputs.artifacts.%s", step.Name, outArt.Name)
scope.addArtifactToScope(key, outArt)
if childNode.Outputs != nil {
if childNode.Outputs.Result != nil {
key := fmt.Sprintf("steps.%s.outputs.result", step.Name)
scope.addParamToScope(key, *childNode.Outputs.Result)
}
for _, outParam := range childNode.Outputs.Parameters {
key := fmt.Sprintf("steps.%s.outputs.parameters.%s", step.Name, outParam.Name)
scope.addParamToScope(key, *outParam.Value)
}
for _, outArt := range childNode.Outputs.Artifacts {
key := fmt.Sprintf("steps.%s.outputs.artifacts.%s", step.Name, outArt.Name)
scope.addArtifactToScope(key, outArt)
}
}
}
}
Expand Down Expand Up @@ -668,11 +672,16 @@ func replace(fstTmpl *fasttemplate.Template, replaceMap map[string]string, allow
replacement, ok := replaceMap[tag]
if !ok {
if allowUnresolved {
// just write the same string back
return w.Write([]byte(fmt.Sprintf("{{%s}}", tag)))
}
unresolvedErr = errors.Errorf(errors.CodeBadRequest, "Failed to resolve {{%s}}", tag)
return 0, nil
}
// The following escapes any special characters (e.g. newlines, tabs, etc...)
// in preparation for substitution
replacement = strconv.Quote(replacement)
replacement = replacement[1 : len(replacement)-1]
return w.Write([]byte(replacement))
})
if unresolvedErr != nil {
Expand Down
33 changes: 26 additions & 7 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package controller
import (
"encoding/json"
"fmt"
"io"

wfv1 "github.com/argoproj/argo/api/workflow/v1"
"github.com/argoproj/argo/errors"
"github.com/argoproj/argo/workflow/common"
"github.com/valyala/fasttemplate"
apiv1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -135,6 +137,19 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, tmpl *wfv1.Templat
},
}

// Set the container template JSON in pod annotations, which executor
// will examine for things like artifact location/path.
// Also ensures that all variables have been resolved
tmplBytes, err := json.Marshal(tmpl)
if err != nil {
return err
}
err = verifyResolvedVariables(string(tmplBytes))
if err != nil {
return err
}
pod.ObjectMeta.Annotations[common.AnnotationKeyTemplate] = string(tmplBytes)

// Add init container only if it needs input artifacts
// or if it is a script template (which needs to populate the script)
if len(tmpl.Inputs.Artifacts) > 0 || tmpl.Script != nil {
Expand Down Expand Up @@ -162,13 +177,6 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, tmpl *wfv1.Templat
return err
}

// Set the container template JSON in pod annotations, which executor will look to for artifact
tmplBytes, err := json.Marshal(tmpl)
if err != nil {
return err
}
pod.ObjectMeta.Annotations[common.AnnotationKeyTemplate] = string(tmplBytes)

created, err := woc.controller.podCl.Create(&pod)
if err != nil {
if apierr.IsAlreadyExists(err) {
Expand Down Expand Up @@ -497,3 +505,14 @@ func addSidecars(pod *apiv1.Pod, tmpl *wfv1.Template) error {
}
return nil
}

// verifyResolvedVariables is a helper to ensure all {{variables}} have been resolved
func verifyResolvedVariables(tmplStr string) error {
var unresolvedErr error
fstTmpl := fasttemplate.New(tmplStr, "{{", "}}")
fstTmpl.ExecuteFuncString(func(w io.Writer, tag string) (int, error) {
unresolvedErr = errors.Errorf(errors.CodeBadRequest, "Failed to resolve {{%s}}", tag)
return 0, nil
})
return unresolvedErr
}

0 comments on commit 227c196

Please sign in to comment.