Skip to content

Commit

Permalink
Significantly increase efficiency of workflow control loop (resolves a…
Browse files Browse the repository at this point in the history
…rgoproj#505)

The following improvements were made to reduce the amount of processing in the controller:
1. use a field selector in the pod watch to ignore Pending pods
2. introduce a TTL cache to prevent redundant evaluation of completed pods
3. update PodIP only for daemoned steps
  • Loading branch information
jessesuen committed Dec 4, 2017
1 parent 4b2098e commit d5b06dc
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 25 deletions.
8 changes: 7 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cmd/workflow-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func Run(cmd *cobra.Command, args []string) {
if err != nil {
log.Fatalf("%+v", err)
}
common.RegisterStackDumper()

// initialize custom resource using a CustomResourceDefinition if it does not exist
log.Infof("Creating Workflow CRD")
Expand Down
45 changes: 45 additions & 0 deletions test/e2e/stress/pod-limits.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Stress test to test upper bounds of concurrent pods
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: pod-limits-
spec:
entrypoint: pod-limits
arguments:
parameters:
- name: limit
value: 1000

templates:
- name: pod-limits
inputs:
parameters:
- name: limit
steps:
- - name: gen-number-list
template: gen-number-list
arguments:
parameters:
- name: count
value: "{{inputs.parameters.limit}}"
- - name: run-pod
template: run-pod
withParam: "{{steps.gen-number-list.outputs.result}}"

- name: gen-number-list
inputs:
parameters:
- name: count
script:
image: python:3.6
command: [python]
source: |
import json
import sys
json.dump([i for i in range(1, {{inputs.parameters.count}}+1)], sys.stdout)
- name: run-pod
container:
image: "alpine:3.7"
command: [sh, -c]
args: ["echo sleeping 1s; sleep 1"]
23 changes: 23 additions & 0 deletions workflow/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ import (
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"os/signal"
"runtime"
"strconv"
"strings"
"syscall"
"time"

wfv1 "github.com/argoproj/argo/api/workflow/v1alpha1"
Expand Down Expand Up @@ -263,3 +267,22 @@ func addPodMetadata(c *kubernetes.Clientset, field, podName, namespace, key, val
}
return err
}

// RegisterStackDumper spawns a goroutine which dumps stack trace upon a SIGUSR1
func RegisterStackDumper() {
go func() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGUSR1)
for {
<-sigs
LogStack()
}
}()
}

// LogStack will log the current stack
func LogStack() {
buf := make([]byte, 1<<20)
stacklen := runtime.Stack(buf, true)
log.Printf("*** goroutine dump...\n%s\n*** end\n", buf[:stacklen])
}
100 changes: 81 additions & 19 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"encoding/json"
"fmt"
"os"
goruntime "runtime"
"time"

wfv1 "github.com/argoproj/argo/api/workflow/v1alpha1"
"github.com/argoproj/argo/errors"
workflowclient "github.com/argoproj/argo/workflow/client"
"github.com/argoproj/argo/workflow/common"
"github.com/ghodss/yaml"
gocache "github.com/patrickmn/go-cache"
log "github.com/sirupsen/logrus"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -37,6 +39,17 @@ type WorkflowController struct {
clientset *kubernetes.Clientset
wfUpdates chan *wfv1.Workflow
podUpdates chan *apiv1.Pod

// completedPodCache an in-memory cache of completed pods names.
// This is used to remember the fact that we marked a pod as completed.
// any future pod events from the watch can be ignored. This enables
// pod watch handler to quickly skip evaluation of duplicated pod entries
// in the pod channel.
// Ideally this would have been prevented using completed=true label
// which we apply on a pod, but somehow it is possible for the informer
// to enqueue pods which are missing the label (depite having added it),
// thus, we record these pods temporarily in a TTL cache.
completedPodCache *gocache.Cache
}

type WorkflowControllerConfig struct {
Expand Down Expand Up @@ -72,19 +85,21 @@ func NewWorkflowController(config *rest.Config, configMap string) *WorkflowContr
}

wfc := WorkflowController{
restClient: restClient,
restConfig: config,
clientset: clientset,
scheme: scheme,
ConfigMap: configMap,
wfUpdates: make(chan *wfv1.Workflow, 1024),
podUpdates: make(chan *apiv1.Pod, 1024),
restClient: restClient,
restConfig: config,
clientset: clientset,
scheme: scheme,
ConfigMap: configMap,
wfUpdates: make(chan *wfv1.Workflow, 10240),
podUpdates: make(chan *apiv1.Pod, 102400),
completedPodCache: gocache.New(1*time.Hour, 10*time.Minute),
}
return &wfc
}

// Run starts an Workflow resource controller
func (wfc *WorkflowController) Run(ctx context.Context) error {
wfc.StartStatsTicker(5 * time.Minute)

log.Info("Watch Workflow controller config map updates")
_, err := wfc.watchControllerConfigMap(ctx)
Expand All @@ -109,10 +124,15 @@ func (wfc *WorkflowController) Run(ctx context.Context) error {
return err
}

i := 0
for {
if i%100 == 0 {
// periodically print the channel sizes
i += 1
log.Infof("wfChan=%d/%d podChan=%d/%d", len(wfc.wfUpdates), cap(wfc.wfUpdates), len(wfc.podUpdates), cap(wfc.podUpdates))
}
select {
case wf := <-wfc.wfUpdates:
log.Infof("Processing wf: %v", wf.ObjectMeta.SelfLink)
wfc.operateWorkflow(wf)
case pod := <-wfc.podUpdates:
wfc.handlePodUpdate(pod)
Expand Down Expand Up @@ -309,6 +329,7 @@ func (wfc *WorkflowController) newWorkflowPodWatch() *cache.ListWatch {
Namespace(namespace).
Resource(resource).
Param("labelSelector", fmt.Sprintf("%s=false", common.LabelKeyCompleted)).
Param("fieldSelector", "status.phase!=Pending").
VersionedParams(&options, metav1.ParameterCodec)
req = wfc.addLabelSelectors(req)
return req.Do().Get()
Expand All @@ -320,6 +341,7 @@ func (wfc *WorkflowController) newWorkflowPodWatch() *cache.ListWatch {
Namespace(namespace).
Resource(resource).
Param("labelSelector", fmt.Sprintf("%s=false", common.LabelKeyCompleted)).
Param("fieldSelector", "status.phase!=Pending").
VersionedParams(&options, metav1.ParameterCodec)
req = wfc.addLabelSelectors(req)
return req.Watch()
Expand Down Expand Up @@ -368,16 +390,25 @@ func (wfc *WorkflowController) watchWorkflowPods(ctx context.Context) (cache.Con
// handlePodUpdate receives an update from a pod, and updates the status of the node in the workflow object accordingly
// It is also responsible for unsetting the deamoned flag from a node status when it notices that a daemoned pod terminated.
func (wfc *WorkflowController) handlePodUpdate(pod *apiv1.Pod) {
if _, ok := wfc.completedPodCache.Get(pod.ObjectMeta.Name); ok {
return
}
if pod.Labels[common.LabelKeyCompleted] == "true" {
return
}
workflowName, ok := pod.Labels[common.LabelKeyWorkflow]
if !ok {
// Ignore pods unrelated to workflow (this shouldn't happen unless the watch is setup incorrectly)
log.Warnf("watch returned pod unrelated to any workflow: %s", pod.ObjectMeta.Name)
return
}
var newPhase wfv1.NodePhase
var newDaemonStatus *bool
var message string
switch pod.Status.Phase {
case apiv1.PodPending:
// Should not get here unless the watch is setup incorrectly
log.Warnf("watch returned a Pending pod: %s", pod.ObjectMeta.Name)
return
case apiv1.PodSucceeded:
newPhase = wfv1.NodeSucceeded
Expand Down Expand Up @@ -430,7 +461,7 @@ func (wfc *WorkflowController) handlePodUpdate(pod *apiv1.Pod) {
}
updateNeeded := applyUpdates(pod, &node, newPhase, newDaemonStatus, message)
if !updateNeeded {
log.Infof("No workflow updated needed for node %s", node)
log.Infof("No workflow updated needed for node %s (pod phase: %s)", node, pod.Status.Phase)
} else {
wf.Status.Nodes[pod.Name] = node
_, err = wfClient.UpdateWorkflow(wf)
Expand All @@ -454,9 +485,10 @@ func (wfc *WorkflowController) handlePodUpdate(pod *apiv1.Pod) {
log.Errorf("Failed to label completed pod %s: %+v", node, err)
return
}
wfc.completedPodCache.SetDefault(pod.ObjectMeta.Name, true)
log.Infof("Set completed=true label to pod: %s", node)
} else {
log.Infof("Skipping completed labeling for daemoned pod: %s", node)
log.Infof("Skipping completed=true labeling for daemoned pod: %s", node)
}
}
}
Expand Down Expand Up @@ -556,11 +588,6 @@ func applyUpdates(pod *apiv1.Pod, node *wfv1.NodeStatus, newPhase wfv1.NodePhase
node.Phase = newPhase
}
}
if pod.Status.PodIP != node.PodIP {
log.Infof("Updating node %s IP %s -> %s", node, node.PodIP, pod.Status.PodIP)
updateNeeded = true
node.PodIP = pod.Status.PodIP
}
if newDaemonStatus != nil {
if *newDaemonStatus == false {
// if the daemon status switched to false, we prefer to just unset daemoned status field
Expand All @@ -571,6 +598,11 @@ func applyUpdates(pod *apiv1.Pod, node *wfv1.NodeStatus, newPhase wfv1.NodePhase
log.Infof("Setting node %v daemoned: %v -> %v", node, node.Daemoned, newDaemonStatus)
node.Daemoned = newDaemonStatus
updateNeeded = true
if pod.Status.PodIP != node.PodIP {
// only update Pod IP for daemoned nodes to reduce number of updates
log.Infof("Updating daemon node %s IP %s -> %s", node, node.PodIP, pod.Status.PodIP)
node.PodIP = pod.Status.PodIP
}
}
}
outputStr, ok := pod.Annotations[common.AnnotationKeyOutputs]
Expand All @@ -591,11 +623,41 @@ func applyUpdates(pod *apiv1.Pod, node *wfv1.NodeStatus, newPhase wfv1.NodePhase
node.Message = message
}
if node.Completed() && node.FinishedAt.IsZero() {
// TODO: rather than using time.Now(), we should use the latest container finishedAt
// timestamp to get a more accurate finished timestamp, in the event the controller is
// down or backlogged. But this would not work for daemoned containers.
node.FinishedAt = metav1.Time{Time: time.Now().UTC()}
if !node.IsDaemoned() {
// Use the latest container finishedAt timestamp, since the controller
// can get backlogged or become down.
for _, ctr := range pod.Status.InitContainerStatuses {
if ctr.State.Terminated != nil && ctr.State.Terminated.FinishedAt.After(node.FinishedAt.Time) {
node.FinishedAt = ctr.State.Terminated.FinishedAt
}
}
for _, ctr := range pod.Status.ContainerStatuses {
if ctr.State.Terminated != nil && ctr.State.Terminated.FinishedAt.After(node.FinishedAt.Time) {
node.FinishedAt = ctr.State.Terminated.FinishedAt
}
}
}
if node.FinishedAt.IsZero() {
// If we get here, the container is daemoned so the
// finishedAt might not have been set.
node.FinishedAt = metav1.Time{Time: time.Now().UTC()}
}
updateNeeded = true
}
return updateNeeded
}

// StartStatsTicker starts a goroutine which dumps stats at a specified interval
func (wfc *WorkflowController) StartStatsTicker(d time.Duration) {
ticker := time.NewTicker(d)
go func() {
for {
<-ticker.C
var m goruntime.MemStats
goruntime.ReadMemStats(&m)
log.Infof("Alloc=%v TotalAlloc=%v Sys=%v NumGC=%v Goroutines=%d wfChan=%d/%d podChan=%d/%d",
m.Alloc/1024, m.TotalAlloc/1024, m.Sys/1024, m.NumGC, goruntime.NumGoroutine(),
len(wfc.wfUpdates), cap(wfc.wfUpdates), len(wfc.podUpdates), cap(wfc.podUpdates))
}
}()
}
12 changes: 9 additions & 3 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ type wfScope struct {
// operateWorkflow is the operator logic of a workflow
// It evaluates the current state of the workflow and decides how to proceed down the execution path
func (wfc *WorkflowController) operateWorkflow(wf *wfv1.Workflow) {
if wf.ObjectMeta.Labels[common.LabelKeyCompleted] == "true" {
// can get here if we already added the completed=true label,
// but we are still draining the controller's workflow channel
return
}
log.Infof("Processing wf: %v", wf.ObjectMeta.SelfLink)
// NEVER modify objects from the store. It's a read-only, local cache.
// You can use DeepCopy() to make a deep copy of original object and modify this copy
// Or create a copy manually for better performance
Expand Down Expand Up @@ -211,11 +217,11 @@ func (woc *wfOperationCtx) deletePVCs() error {
}

func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Arguments, nodeName string) error {
woc.log.Infof("Evaluating node %s: template: %s", nodeName, templateName)
woc.log.Debugf("Evaluating node %s: template: %s", nodeName, templateName)
nodeID := woc.wf.NodeID(nodeName)
node, ok := woc.wf.Status.Nodes[nodeID]
if ok && node.Completed() {
woc.log.Infof("Node %s already completed", nodeName)
woc.log.Debugf("Node %s already completed", nodeName)
return nil
}
tmpl := woc.wf.GetTemplate(templateName)
Expand Down Expand Up @@ -422,7 +428,7 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod
nodeID := woc.wf.NodeID(sgNodeName)
node, ok := woc.wf.Status.Nodes[nodeID]
if ok && node.Completed() {
woc.log.Infof("Step group node %v already marked completed", node)
woc.log.Debugf("Step group node %v already marked completed", node)
return nil
}
if !ok {
Expand Down
4 changes: 2 additions & 2 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func envFromField(envVarName, fieldPath string) apiv1.EnvVar {
}

func (woc *wfOperationCtx) createWorkflowPod(nodeName string, tmpl *wfv1.Template) error {
woc.log.Infof("Creating Pod: %s", nodeName)
woc.log.Debugf("Creating Pod: %s", nodeName)
tmpl = tmpl.DeepCopy()
waitCtr, err := woc.newWaitContainer(tmpl)
if err != nil {
Expand Down Expand Up @@ -215,7 +215,7 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, tmpl *wfv1.Templat
if apierr.IsAlreadyExists(err) {
// workflow pod names are deterministic. We can get here if
// the controller fails to persist the workflow after creating the pod.
woc.log.Infof("pod %s already exists", nodeName)
woc.log.Infof("Skipped pod %s creation: already exists", nodeName)
return nil
}
woc.log.Infof("Failed to create pod %s: %v", nodeName, err)
Expand Down

0 comments on commit d5b06dc

Please sign in to comment.