From d5b06dcd4e52270a24f4f3b19497b9a9afaed4e9 Mon Sep 17 00:00:00 2001 From: Jesse Suen Date: Mon, 4 Dec 2017 01:41:47 -0800 Subject: [PATCH] Significantly increase efficiency of workflow control loop (resolves #505) The following improvements were made to reduce the amount of processing in the controller: 1. use a field selector in the pod watch to ignore Pending pods 2. introduce a TTL cache to prevent redundant evaluation of completed pods 3. update PodIP only for daemoned steps --- Gopkg.lock | 8 ++- cmd/workflow-controller/main.go | 1 + test/e2e/stress/pod-limits.yaml | 45 +++++++++++++ workflow/common/util.go | 23 +++++++ workflow/controller/controller.go | 100 +++++++++++++++++++++++------ workflow/controller/operator.go | 12 +++- workflow/controller/workflowpod.go | 4 +- 7 files changed, 168 insertions(+), 25 deletions(-) create mode 100644 test/e2e/stress/pod-limits.yaml diff --git a/Gopkg.lock b/Gopkg.lock index 1ab3ba0a1aeb..86820c557191 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -187,6 +187,12 @@ revision = "4e0f567303d4cc90ceb055a451959fb9fc391fb9" version = "3.0.3" +[[projects]] + name = "github.com/patrickmn/go-cache" + packages = ["."] + revision = "a3647f8e31d79543b2d0f0ae2fe5c379d72cedc0" + version = "v2.1.0" + [[projects]] branch = "master" name = "github.com/petar/GoLLRB" @@ -328,6 +334,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "09827c257bc95a09aa945e8a07525501c7e1a4bb44083e2ad435893f80d0b93f" + inputs-digest = "d88d04abf40902c38a8af161ff9af4b73f8ff14d96aeef5111972926f414f6d8" solver-name = "gps-cdcl" solver-version = 1 diff --git a/cmd/workflow-controller/main.go b/cmd/workflow-controller/main.go index f1b9be2ad4ba..d8b29e35c034 100644 --- a/cmd/workflow-controller/main.go +++ b/cmd/workflow-controller/main.go @@ -69,6 +69,7 @@ func Run(cmd *cobra.Command, args []string) { if err != nil { log.Fatalf("%+v", err) } + common.RegisterStackDumper() // initialize custom resource using a CustomResourceDefinition if it does not exist log.Infof("Creating Workflow CRD") diff --git a/test/e2e/stress/pod-limits.yaml b/test/e2e/stress/pod-limits.yaml new file mode 100644 index 000000000000..09f9efbf98ed --- /dev/null +++ b/test/e2e/stress/pod-limits.yaml @@ -0,0 +1,45 @@ +# Stress test to test upper bounds of concurrent pods +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: pod-limits- +spec: + entrypoint: pod-limits + arguments: + parameters: + - name: limit + value: 1000 + + templates: + - name: pod-limits + inputs: + parameters: + - name: limit + steps: + - - name: gen-number-list + template: gen-number-list + arguments: + parameters: + - name: count + value: "{{inputs.parameters.limit}}" + - - name: run-pod + template: run-pod + withParam: "{{steps.gen-number-list.outputs.result}}" + + - name: gen-number-list + inputs: + parameters: + - name: count + script: + image: python:3.6 + command: [python] + source: | + import json + import sys + json.dump([i for i in range(1, {{inputs.parameters.count}}+1)], sys.stdout) + + - name: run-pod + container: + image: "alpine:3.7" + command: [sh, -c] + args: ["echo sleeping 1s; sleep 1"] diff --git a/workflow/common/util.go b/workflow/common/util.go index 240b92159e63..5d59d12be81a 100644 --- a/workflow/common/util.go +++ b/workflow/common/util.go @@ -5,9 +5,13 @@ import ( "encoding/json" "fmt" "io" + "os" "os/exec" + "os/signal" + "runtime" "strconv" "strings" + "syscall" "time" wfv1 "github.com/argoproj/argo/api/workflow/v1alpha1" @@ -263,3 +267,22 @@ func addPodMetadata(c 
*kubernetes.Clientset, field, podName, namespace, key, val
 	}
 	return err
 }
+
+// RegisterStackDumper spawns a goroutine which dumps the stack trace upon receipt of SIGUSR1
+func RegisterStackDumper() {
+	go func() {
+		sigs := make(chan os.Signal, 1)
+		signal.Notify(sigs, syscall.SIGUSR1)
+		for {
+			<-sigs
+			LogStack()
+		}
+	}()
+}
+
+// LogStack will log the current stack
+func LogStack() {
+	buf := make([]byte, 1<<20)
+	stacklen := runtime.Stack(buf, true)
+	log.Printf("*** goroutine dump...\n%s\n*** end\n", buf[:stacklen])
+}
diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go
index b832d5636f64..e8b0a46b56bb 100644
--- a/workflow/controller/controller.go
+++ b/workflow/controller/controller.go
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"os"
+	goruntime "runtime"
 	"time"
 
 	wfv1 "github.com/argoproj/argo/api/workflow/v1alpha1"
@@ -12,6 +13,7 @@ import (
 	workflowclient "github.com/argoproj/argo/workflow/client"
 	"github.com/argoproj/argo/workflow/common"
 	"github.com/ghodss/yaml"
+	gocache "github.com/patrickmn/go-cache"
 	log "github.com/sirupsen/logrus"
 	apiv1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -37,6 +39,17 @@ type WorkflowController struct {
 	clientset  *kubernetes.Clientset
 	wfUpdates  chan *wfv1.Workflow
 	podUpdates chan *apiv1.Pod
+
+	// completedPodCache is an in-memory cache of completed pod names.
+	// It is used to remember that we marked a pod as completed, so that
+	// any future pod events from the watch can be ignored. This enables the
+	// pod watch handler to quickly skip evaluation of duplicate pod entries
+	// in the pod channel.
+	// Ideally this would have been prevented by the completed=true label
+	// which we apply to a pod, but somehow it is possible for the informer
+	// to enqueue pods which are missing the label (despite having added it),
+	// thus we record these pods temporarily in a TTL cache.
+	completedPodCache *gocache.Cache
 }
 
 type WorkflowControllerConfig struct {
@@ -72,19 +85,21 @@ func NewWorkflowController(config *rest.Config, configMap string) *WorkflowContr
 	}
 	wfc := WorkflowController{
-		restClient: restClient,
-		restConfig: config,
-		clientset:  clientset,
-		scheme:     scheme,
-		ConfigMap:  configMap,
-		wfUpdates:  make(chan *wfv1.Workflow, 1024),
-		podUpdates: make(chan *apiv1.Pod, 1024),
+		restClient:        restClient,
+		restConfig:        config,
+		clientset:         clientset,
+		scheme:            scheme,
+		ConfigMap:         configMap,
+		wfUpdates:         make(chan *wfv1.Workflow, 10240),
+		podUpdates:        make(chan *apiv1.Pod, 102400),
+		completedPodCache: gocache.New(1*time.Hour, 10*time.Minute),
 	}
 	return &wfc
 }
 
 // Run starts an Workflow resource controller
 func (wfc *WorkflowController) Run(ctx context.Context) error {
+	wfc.StartStatsTicker(5 * time.Minute)
 
 	log.Info("Watch Workflow controller config map updates")
 	_, err := wfc.watchControllerConfigMap(ctx)
@@ -109,10 +124,15 @@ func (wfc *WorkflowController) Run(ctx context.Context) error {
 		return err
 	}
 
+	i := 0
 	for {
+		if i%100 == 0 {
+			// periodically print the channel sizes
+			log.Infof("wfChan=%d/%d podChan=%d/%d", len(wfc.wfUpdates), cap(wfc.wfUpdates), len(wfc.podUpdates), cap(wfc.podUpdates))
+		}
+		i++
 		select {
 		case wf := <-wfc.wfUpdates:
-			log.Infof("Processing wf: %v", wf.ObjectMeta.SelfLink)
 			wfc.operateWorkflow(wf)
 		case pod := <-wfc.podUpdates:
 			wfc.handlePodUpdate(pod)
@@ -309,6 +329,7 @@ func (wfc *WorkflowController) newWorkflowPodWatch() *cache.ListWatch {
 			Namespace(namespace).
 			Resource(resource).
 			Param("labelSelector", fmt.Sprintf("%s=false", common.LabelKeyCompleted)).
+			Param("fieldSelector", "status.phase!=Pending").
 			VersionedParams(&options, metav1.ParameterCodec)
 		req = wfc.addLabelSelectors(req)
 		return req.Do().Get()
@@ -320,6 +341,7 @@ func (wfc *WorkflowController) newWorkflowPodWatch() *cache.ListWatch {
 			Namespace(namespace).
 			Resource(resource).
 			Param("labelSelector", fmt.Sprintf("%s=false", common.LabelKeyCompleted)).
+			Param("fieldSelector", "status.phase!=Pending").
 			VersionedParams(&options, metav1.ParameterCodec)
 		req = wfc.addLabelSelectors(req)
 		return req.Watch()
@@ -368,9 +390,16 @@ func (wfc *WorkflowController) watchWorkflowPods(ctx context.Context) (cache.Con
 // handlePodUpdate receives an update from a pod, and updates the status of the node in the workflow object accordingly
 // It is also responsible for unsetting the deamoned flag from a node status when it notices that a daemoned pod terminated.
 func (wfc *WorkflowController) handlePodUpdate(pod *apiv1.Pod) {
+	if _, ok := wfc.completedPodCache.Get(pod.ObjectMeta.Name); ok {
+		return
+	}
+	if pod.Labels[common.LabelKeyCompleted] == "true" {
+		return
+	}
 	workflowName, ok := pod.Labels[common.LabelKeyWorkflow]
 	if !ok {
 		// Ignore pods unrelated to workflow (this shouldn't happen unless the watch is setup incorrectly)
+		log.Warnf("watch returned pod unrelated to any workflow: %s", pod.ObjectMeta.Name)
 		return
 	}
 	var newPhase wfv1.NodePhase
@@ -378,6 +407,8 @@ func (wfc *WorkflowController) handlePodUpdate(pod *apiv1.Pod) {
 	var message string
 	switch pod.Status.Phase {
 	case apiv1.PodPending:
+		// Should not get here unless the watch is set up incorrectly
+		log.Warnf("watch returned a Pending pod: %s", pod.ObjectMeta.Name)
 		return
 	case apiv1.PodSucceeded:
 		newPhase = wfv1.NodeSucceeded
@@ -430,7 +461,7 @@ func (wfc *WorkflowController) handlePodUpdate(pod *apiv1.Pod) {
 	}
 	updateNeeded := applyUpdates(pod, &node, newPhase, newDaemonStatus, message)
 	if !updateNeeded {
-		log.Infof("No workflow updated needed for node %s", node)
+		log.Infof("No workflow update needed for node %s (pod phase: %s)", node, pod.Status.Phase)
 	} else {
 		wf.Status.Nodes[pod.Name] = node
 		_, err = wfClient.UpdateWorkflow(wf)
@@ -454,9 +485,10 @@ func (wfc *WorkflowController) handlePodUpdate(pod *apiv1.Pod) {
 				log.Errorf("Failed to label completed pod %s: %+v", node, err)
 				return
 			}
+			wfc.completedPodCache.SetDefault(pod.ObjectMeta.Name, true)
 			log.Infof("Set completed=true label to pod: %s", node)
 		} else {
-			log.Infof("Skipping completed labeling for daemoned pod: %s", node)
+			log.Infof("Skipping completed=true labeling for daemoned pod: %s", node)
 		}
 	}
 }
@@ -556,11 +588,6 @@ func applyUpdates(pod *apiv1.Pod, node *wfv1.NodeStatus, newPhase wfv1.NodePhase
 			node.Phase = newPhase
 		}
 	}
-	if pod.Status.PodIP != node.PodIP {
-		log.Infof("Updating node %s IP %s -> %s", node, node.PodIP, pod.Status.PodIP)
-		updateNeeded = true
-		node.PodIP = pod.Status.PodIP
-	}
 	if newDaemonStatus != nil {
 		if *newDaemonStatus == false {
 			// if the daemon status switched to false, we prefer to just unset daemoned status field
@@ -571,6 +598,11 @@ func applyUpdates(pod *apiv1.Pod, node *wfv1.NodeStatus, newPhase wfv1.NodePhase
 			log.Infof("Setting node %v daemoned: %v -> %v", node, node.Daemoned, newDaemonStatus)
 			node.Daemoned = newDaemonStatus
 			updateNeeded = true
+			if pod.Status.PodIP != node.PodIP {
+				// only update Pod IP for daemoned nodes to reduce number of updates
+				log.Infof("Updating daemon node %s IP %s -> %s", node, node.PodIP, pod.Status.PodIP)
+				node.PodIP = pod.Status.PodIP
+			}
 		}
 	}
 	outputStr, ok := pod.Annotations[common.AnnotationKeyOutputs]
@@ -591,11 +623,41 @@ func applyUpdates(pod *apiv1.Pod, node *wfv1.NodeStatus, newPhase wfv1.NodePhase
 		node.Message = message
 	}
 	if node.Completed() && node.FinishedAt.IsZero() {
-		// TODO: rather than using time.Now(), we should use the latest container finishedAt
-		// timestamp to get a more accurate finished timestamp, in the event the controller is
-		// down or backlogged. But this would not work for daemoned containers.
-		node.FinishedAt = metav1.Time{Time: time.Now().UTC()}
+		if !node.IsDaemoned() {
+			// Use the latest container finishedAt timestamp, since the controller
+			// may be backlogged or down.
+			for _, ctr := range pod.Status.InitContainerStatuses {
+				if ctr.State.Terminated != nil && ctr.State.Terminated.FinishedAt.After(node.FinishedAt.Time) {
+					node.FinishedAt = ctr.State.Terminated.FinishedAt
+				}
+			}
+			for _, ctr := range pod.Status.ContainerStatuses {
+				if ctr.State.Terminated != nil && ctr.State.Terminated.FinishedAt.After(node.FinishedAt.Time) {
+					node.FinishedAt = ctr.State.Terminated.FinishedAt
+				}
+			}
+		}
+		if node.FinishedAt.IsZero() {
+			// If finishedAt is still unset, the pod is daemoned (or had no
+			// terminated containers), so fall back to the current time.
+			node.FinishedAt = metav1.Time{Time: time.Now().UTC()}
+		}
 		updateNeeded = true
 	}
 	return updateNeeded
 }
+
+// StartStatsTicker starts a goroutine which dumps stats at a specified interval
+func (wfc *WorkflowController) StartStatsTicker(d time.Duration) {
+	ticker := time.NewTicker(d)
+	go func() {
+		for {
+			<-ticker.C
+			var m goruntime.MemStats
+			goruntime.ReadMemStats(&m)
+			log.Infof("Alloc=%v TotalAlloc=%v Sys=%v NumGC=%v Goroutines=%d wfChan=%d/%d podChan=%d/%d",
+				m.Alloc/1024, m.TotalAlloc/1024, m.Sys/1024, m.NumGC, goruntime.NumGoroutine(),
+				len(wfc.wfUpdates), cap(wfc.wfUpdates), len(wfc.podUpdates), cap(wfc.podUpdates))
+		}
+	}()
+}
diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go
index d764d3b4de34..afa3ff41f684 100644
--- a/workflow/controller/operator.go
+++ b/workflow/controller/operator.go
@@ -44,6 +44,12 @@ type wfScope struct {
 // operateWorkflow is the operator logic of a workflow
 // It evaluates the current state of the workflow and decides how to proceed down the execution path
 func (wfc *WorkflowController) operateWorkflow(wf *wfv1.Workflow) {
+	if wf.ObjectMeta.Labels[common.LabelKeyCompleted] == "true" {
+		// We can get here if we already added the completed=true label,
+		// but are still draining the controller's workflow channel.
+		return
+	}
+	log.Infof("Processing wf: %v", wf.ObjectMeta.SelfLink)
 	// NEVER modify objects from the store. It's a read-only, local cache.
// You can use DeepCopy() to make a deep copy of original object and modify this copy // Or create a copy manually for better performance @@ -211,11 +217,11 @@ func (woc *wfOperationCtx) deletePVCs() error { } func (woc *wfOperationCtx) executeTemplate(templateName string, args wfv1.Arguments, nodeName string) error { - woc.log.Infof("Evaluating node %s: template: %s", nodeName, templateName) + woc.log.Debugf("Evaluating node %s: template: %s", nodeName, templateName) nodeID := woc.wf.NodeID(nodeName) node, ok := woc.wf.Status.Nodes[nodeID] if ok && node.Completed() { - woc.log.Infof("Node %s already completed", nodeName) + woc.log.Debugf("Node %s already completed", nodeName) return nil } tmpl := woc.wf.GetTemplate(templateName) @@ -422,7 +428,7 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod nodeID := woc.wf.NodeID(sgNodeName) node, ok := woc.wf.Status.Nodes[nodeID] if ok && node.Completed() { - woc.log.Infof("Step group node %v already marked completed", node) + woc.log.Debugf("Step group node %v already marked completed", node) return nil } if !ok { diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index 8a2fe14b2b72..b15dae8e5cba 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -106,7 +106,7 @@ func envFromField(envVarName, fieldPath string) apiv1.EnvVar { } func (woc *wfOperationCtx) createWorkflowPod(nodeName string, tmpl *wfv1.Template) error { - woc.log.Infof("Creating Pod: %s", nodeName) + woc.log.Debugf("Creating Pod: %s", nodeName) tmpl = tmpl.DeepCopy() waitCtr, err := woc.newWaitContainer(tmpl) if err != nil { @@ -215,7 +215,7 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, tmpl *wfv1.Templat if apierr.IsAlreadyExists(err) { // workflow pod names are deterministic. We can get here if // the controller fails to persist the workflow after creating the pod. - woc.log.Infof("pod %s already exists", nodeName) + woc.log.Infof("Skipped pod %s creation: already exists", nodeName) return nil } woc.log.Infof("Failed to create pod %s: %v", nodeName, err)
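
Illustration (not part of the patch): the sketch below distills the TTL-cache dedup pattern that improvement 2 adds to handlePodUpdate, reduced to a standalone Go program under stated assumptions. The controller type, handlePod and markCompleted helpers, and the labelCompleted constant are hypothetical stand-ins for the real WorkflowController methods and common.LabelKeyCompleted; only the go-cache calls (gocache.New, Get, SetDefault) mirror what the patch uses.

package main

import (
	"fmt"
	"time"

	gocache "github.com/patrickmn/go-cache"
)

// labelCompleted stands in for common.LabelKeyCompleted in the real controller.
const labelCompleted = "completed"

type controller struct {
	// completed pod names; entries expire after 1h and are purged every 10m,
	// mirroring gocache.New(1*time.Hour, 10*time.Minute) in the patch.
	completedPodCache *gocache.Cache
}

// handlePod mimics the fast-path checks added to handlePodUpdate: duplicate
// events for pods we already marked completed are dropped before evaluation.
func (c *controller) handlePod(podName string, labels map[string]string) {
	if _, ok := c.completedPodCache.Get(podName); ok {
		return // already marked completed; ignore the duplicate event
	}
	if labels[labelCompleted] == "true" {
		return // completed label already visible through the watch; nothing to do
	}
	fmt.Println("evaluating pod:", podName)
	// ... evaluate the pod and update workflow status here ...
	c.markCompleted(podName)
}

// markCompleted records the pod with the cache's default TTL so later events
// drained from the pod channel are skipped even before the label is visible.
func (c *controller) markCompleted(podName string) {
	c.completedPodCache.SetDefault(podName, true)
}

func main() {
	c := &controller{completedPodCache: gocache.New(1*time.Hour, 10*time.Minute)}
	c.handlePod("pod-limits-abcde-1", nil)
	c.handlePod("pod-limits-abcde-1", nil) // skipped: found in the TTL cache
}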