Issue argoproj#740 - System level workflow parallelism limits & priorities (argoproj#1065)

* Issue argoproj#740 - System level workflow parallelism limits & priorities

* Apply reviewer notes
alexmt authored Nov 7, 2018
1 parent a53a76e commit afdac9b
Showing 11 changed files with 331 additions and 11 deletions.
5 changes: 5 additions & 0 deletions api/openapi-spec/swagger.json
@@ -978,6 +978,11 @@
"type": "integer",
"format": "int64"
},
"priority": {
"description": "Priority is used if controller is configured to process limited number of workflows in parallel. Workflows with higher priority are processed first.",
"type": "integer",
"format": "int32"
},
"serviceAccountName": {
"description": "ServiceAccountName is the name of the ServiceAccount to run all pods of the workflow as.",
"type": "string"
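The `priority` field is surfaced in the Workflow spec itself, so clients can set it when building workflow objects programmatically. Below is a minimal sketch, assuming the repo's `wfv1` types and an illustrative `int32Ptr` helper (the helper and the surrounding plumbing are assumptions, not part of this commit):

```go
package main

import (
	"fmt"

	wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// int32Ptr is a small illustrative helper; Priority is a *int32 so that
// "unset" can be distinguished from an explicit 0.
func int32Ptr(i int32) *int32 { return &i }

func main() {
	wf := wfv1.Workflow{
		ObjectMeta: metav1.ObjectMeta{GenerateName: "hello-world-"},
		Spec: wfv1.WorkflowSpec{
			Entrypoint: "whalesay",
			// Higher values are processed first when the controller's
			// parallelism limit is reached.
			Priority: int32Ptr(10),
		},
	}
	fmt.Println(*wf.Spec.Priority)
}
```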
4 changes: 2 additions & 2 deletions cmd/argo/commands/list.go
@@ -108,7 +108,7 @@ func printTable(wfList []wfv1.Workflow, listArgs *listFlags) {
if listArgs.allNamespaces {
fmt.Fprint(w, "NAMESPACE\t")
}
fmt.Fprint(w, "NAME\tSTATUS\tAGE\tDURATION")
fmt.Fprint(w, "NAME\tSTATUS\tAGE\tDURATION\tPRIORITY")
if listArgs.output == "wide" {
fmt.Fprint(w, "\tP/R/C\tPARAMETERS")
}
@@ -119,7 +119,7 @@ func printTable(wfList []wfv1.Workflow, listArgs *listFlags) {
if listArgs.allNamespaces {
fmt.Fprintf(w, "%s\t", wf.ObjectMeta.Namespace)
}
fmt.Fprintf(w, "%s\t%s\t%s\t%s", wf.ObjectMeta.Name, worklowStatus(&wf), ageStr, durationStr)
fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%d", wf.ObjectMeta.Name, worklowStatus(&wf), ageStr, durationStr, wf.Spec.Priority)
if listArgs.output == "wide" {
pending, running, completed := countPendingRunningCompleted(&wf)
fmt.Fprintf(w, "\t%d/%d/%d", pending, running, completed)
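Because `Spec.Priority` is a pointer, a table printer generally needs a nil check before rendering the new PRIORITY column. The snippet below is an illustrative, self-contained sketch of one defensive way to format the value; the `formatPriority` helper is not part of this diff.

```go
package main

import (
	"fmt"
	"strconv"
)

// formatPriority renders a *int32 priority for display, falling back to "0"
// when the field is unset. Purely an illustrative helper.
func formatPriority(p *int32) string {
	if p == nil {
		return "0"
	}
	return strconv.Itoa(int(*p))
}

func main() {
	var unset *int32
	ten := int32(10)
	fmt.Println(formatPriority(unset)) // "0"
	fmt.Println(formatPriority(&ten))  // "10"
}
```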
16 changes: 12 additions & 4 deletions cmd/argo/commands/submit.go
@@ -18,16 +18,18 @@ import (

// cliSubmitOpts holds submission options specific to CLI submission (e.g. controlling output)
type cliSubmitOpts struct {
output string // --output
wait bool // --wait
watch bool // --watch
strict bool // --strict
output string // --output
wait bool // --wait
watch bool // --watch
strict bool // --strict
priority *int32 // --priority
}

func NewSubmitCommand() *cobra.Command {
var (
submitOpts util.SubmitOpts
cliSubmitOpts cliSubmitOpts
priority int32
)
var command = &cobra.Command{
Use: "submit FILE1 FILE2...",
@@ -37,6 +39,10 @@ func NewSubmitCommand() *cobra.Command {
cmd.HelpFunc()(cmd, args)
os.Exit(1)
}
if cmd.Flag("priority").Changed {
cliSubmitOpts.priority = &priority
}

SubmitWorkflows(args, &submitOpts, &cliSubmitOpts)
},
}
@@ -51,6 +57,7 @@ func NewSubmitCommand() *cobra.Command {
command.Flags().BoolVarP(&cliSubmitOpts.wait, "wait", "w", false, "wait for the workflow to complete")
command.Flags().BoolVar(&cliSubmitOpts.watch, "watch", false, "watch the workflow until it completes")
command.Flags().BoolVar(&cliSubmitOpts.strict, "strict", true, "perform strict workflow validation")
command.Flags().Int32Var(&priority, "priority", 0, "workflow priority")
return command
}

@@ -106,6 +113,7 @@ func SubmitWorkflows(filePaths []string, submitOpts *util.SubmitOpts, cliOpts *c

var workflowNames []string
for _, wf := range workflows {
wf.Spec.Priority = cliOpts.priority
created, err := util.SubmitWorkflow(wfClient, &wf, submitOpts)
if err != nil {
log.Fatalf("Failed to submit workflow: %v", err)
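The submit command only overrides `Spec.Priority` when the flag was actually passed, which is why it checks `cmd.Flag("priority").Changed` rather than the flag's value. A stripped-down sketch of that pattern outside of Argo's code (command and flag names here are only illustrative):

```go
package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	var priority int32
	var priorityPtr *int32 // stays nil unless --priority is given

	cmd := &cobra.Command{
		Use: "demo",
		Run: func(cmd *cobra.Command, args []string) {
			// Changed distinguishes "flag omitted" from "--priority 0".
			if cmd.Flag("priority").Changed {
				priorityPtr = &priority
			}
			if priorityPtr == nil {
				fmt.Println("priority not set; keep whatever the manifest says")
			} else {
				fmt.Printf("override priority to %d\n", *priorityPtr)
			}
		},
	}
	cmd.Flags().Int32Var(&priority, "priority", 0, "workflow priority")
	_ = cmd.Execute()
}
```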
5 changes: 5 additions & 0 deletions docs/workflow-controller-configmap.yaml
@@ -18,6 +18,11 @@ data:
# controller to run with namespace scope (role), instead of cluster scope (clusterrole).
namespace: argo
# Parallelism limits the max total parallel workflows that can execute at the same time
parallelism: 10
# artifactRepository defines the default location to be used as the artifact repository for
# container artifacts.
artifactRepository:
7 changes: 7 additions & 0 deletions pkg/apis/workflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default.

21 changes: 16 additions & 5 deletions pkg/apis/workflow/v1alpha1/types.go
@@ -133,6 +133,8 @@ type WorkflowSpec struct {
// allowed to run before the controller terminates the workflow. A value of zero is used to
// terminate a Running workflow
ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"`
// Priority is used if controller is configured to process limited number of workflows in parallel. Workflows with higher priority are processed first.
Priority *int32 `json:"priority,omitempty"`
}

// Template is a reusable and composable unit of execution in a workflow
@@ -530,12 +532,21 @@ func (n NodeStatus) String() string {
return fmt.Sprintf("%s (%s)", n.Name, n.ID)
}

// isCompletedPhase returns whether or not the given phase is a completed (terminal) phase
func isCompletedPhase(phase NodePhase) bool {
return phase == NodeSucceeded ||
phase == NodeFailed ||
phase == NodeError ||
phase == NodeSkipped
}

// Completed returns whether or not the workflow has completed execution
func (ws *WorkflowStatus) Completed() bool {
return isCompletedPhase(ws.Phase)
}

// Completed returns whether or not the node has completed execution
func (n NodeStatus) Completed() bool {
return n.Phase == NodeSucceeded ||
n.Phase == NodeFailed ||
n.Phase == NodeError ||
n.Phase == NodeSkipped
return isCompletedPhase(n.Phase)
}

// IsDaemoned returns whether or not the node is daemoned
5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions workflow/controller/config.go
@@ -57,6 +57,9 @@ type WorkflowControllerConfig struct {
MetricsConfig metrics.PrometheusConfig `json:"metricsConfig,omitempty"`

TelemetryConfig metrics.PrometheusConfig `json:"telemetryConfig,omitempty"`

// Parallelism limits the max total parallel workflows that can execute at the same time
Parallelism int `json:"parallelism,omitempty"`
}

// ArtifactRepository represents a artifact repository in which a controller will store its artifacts
@@ -114,6 +117,7 @@ func (wfc *WorkflowController) updateConfig(cm *apiv1.ConfigMap) error {
return errors.Errorf(errors.CodeBadRequest, "ConfigMap '%s' does not have executorImage", wfc.configMap)
}
wfc.Config = config
wfc.throttler.SetParallelism(config.Parallelism)
return nil
}

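On the controller side, the parallelism limit arrives through the workflow-controller ConfigMap and is pushed into the throttler whenever the config is reloaded. The sketch below shows roughly how that wiring behaves; the YAML library (`github.com/ghodss/yaml`) and the payload shape are assumptions, only the `Parallelism` field and the `SetParallelism` call come from the diff.

```go
package main

import (
	"fmt"

	"github.com/ghodss/yaml"
)

// controllerConfig mirrors the relevant part of WorkflowControllerConfig.
type controllerConfig struct {
	// Parallelism limits the max total parallel workflows that can execute at the same time.
	Parallelism int `json:"parallelism,omitempty"`
}

// throttler stands in for the controller's Throttler; only SetParallelism
// matters for this sketch.
type throttler struct{ parallelism int }

func (t *throttler) SetParallelism(n int) { t.parallelism = n }

func main() {
	// The kind of payload the ConfigMap data would carry.
	raw := "parallelism: 10\n"

	var cfg controllerConfig
	if err := yaml.Unmarshal([]byte(raw), &cfg); err != nil {
		panic(err)
	}

	t := &throttler{}
	t.SetParallelism(cfg.Parallelism) // 0 (the default) presumably means no limit
	fmt.Println("parallelism:", t.parallelism)
}
```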
36 changes: 36 additions & 0 deletions workflow/controller/controller.go
@@ -56,6 +56,7 @@ type WorkflowController struct {
wfQueue workqueue.RateLimitingInterface
podQueue workqueue.RateLimitingInterface
completedPods chan string
throttler Throttler
}

const (
@@ -86,6 +87,7 @@ func NewWorkflowController(
podQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
completedPods: make(chan string, 512),
}
wfc.throttler = NewThrottler(0, wfc.wfQueue)
return &wfc
}

@@ -217,22 +219,35 @@ func (wfc *WorkflowController) processNextItem() bool {
log.Warnf("Key '%s' in index is not an unstructured", key)
return true
}

if key, ok = wfc.throttler.Next(key); !ok {
log.Warnf("Workflow %s processing has been postponed due to max parallelism limit", key)
return true
}

wf, err := util.FromUnstructured(un)
if err != nil {
log.Warnf("Failed to unmarshal key '%s' to workflow object: %v", key, err)
woc := newWorkflowOperationCtx(wf, wfc)
woc.markWorkflowFailed(fmt.Sprintf("invalid spec: %s", err.Error()))
woc.persistUpdates()
wfc.throttler.Remove(key)
return true
}

if wf.ObjectMeta.Labels[common.LabelKeyCompleted] == "true" {
wfc.throttler.Remove(key)
// can get here if we already added the completed=true label,
// but we are still draining the controller's workflow workqueue
return true
}

woc := newWorkflowOperationCtx(wf, wfc)
woc.operate()
if woc.wf.Status.Completed() {
wfc.throttler.Remove(key)
}

// TODO: operate should return error if it was unable to operate properly
// so we can requeue the work for a later time
// See: https://github.com/kubernetes/client-go/blob/master/examples/workqueue/main.go
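The `Throttler` used above (its own implementation is among the changed files not rendered on this page) gates how many workflows the controller operates on at once: `Next` either admits a key or parks it, and `Remove` frees a slot when a workflow completes or is deleted. The following is a deliberately simplified, single-goroutine sketch of those semantics, using string keys and a plain sorted slice instead of the real data structures; it only illustrates the admit/park/release flow.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

type item struct {
	key      string
	priority int32
	creation time.Time
}

type simpleThrottler struct {
	parallelism int // 0 is treated as "no limit" (assumption)
	active      map[string]bool
	pending     []item
}

func newSimpleThrottler(parallelism int) *simpleThrottler {
	return &simpleThrottler{parallelism: parallelism, active: map[string]bool{}}
}

// Add registers a workflow key with its priority and creation time.
func (t *simpleThrottler) Add(key string, priority int32, creation time.Time) {
	if t.active[key] {
		return
	}
	for _, it := range t.pending {
		if it.key == key {
			return
		}
	}
	t.pending = append(t.pending, item{key, priority, creation})
	// Higher priority first; earlier creation breaks ties (matches getWfPriority's outputs).
	sort.Slice(t.pending, func(i, j int) bool {
		if t.pending[i].priority != t.pending[j].priority {
			return t.pending[i].priority > t.pending[j].priority
		}
		return t.pending[i].creation.Before(t.pending[j].creation)
	})
}

// Next reports whether the controller may operate on key right now.
func (t *simpleThrottler) Next(key string) bool {
	if t.active[key] {
		return true
	}
	if t.parallelism != 0 && len(t.active) >= t.parallelism {
		return false // postponed, like the warning logged in processNextItem
	}
	// In this sketch a key is only admitted when it is at the front of the queue.
	if len(t.pending) > 0 && t.pending[0].key == key {
		t.pending = t.pending[1:]
		t.active[key] = true
		return true
	}
	return false
}

// Remove frees the slot held by key (workflow completed or deleted).
func (t *simpleThrottler) Remove(key string) {
	delete(t.active, key)
}

func main() {
	tr := newSimpleThrottler(1)
	now := time.Now()
	tr.Add("low", 0, now)
	tr.Add("high", 10, now)

	fmt.Println(tr.Next("low"))  // false: "high" is ahead of it
	fmt.Println(tr.Next("high")) // true: admitted
	fmt.Println(tr.Next("low"))  // false: parallelism limit of 1 reached

	tr.Remove("high")           // "high" finished
	fmt.Println(tr.Next("low")) // true: slot freed, "low" is now the front
}
```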
@@ -307,19 +322,39 @@ func (wfc *WorkflowController) tweakWorkflowMetricslist(options *metav1.ListOpti
options.LabelSelector = labelSelector.String()
}

func getWfPriority(obj interface{}) (int32, time.Time) {
un, ok := obj.(*unstructured.Unstructured)
if !ok {
return 0, time.Now()
}
priority, hasPriority, err := unstructured.NestedInt64(un.Object, "spec", "priority")
if err != nil {
return 0, un.GetCreationTimestamp().Time
}
if !hasPriority {
priority = 0
}

return int32(priority), un.GetCreationTimestamp().Time
}

func (wfc *WorkflowController) addWorkflowInformerHandler() {
wfc.wfInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
wfc.wfQueue.Add(key)
priority, creation := getWfPriority(obj)
wfc.throttler.Add(key, priority, creation)
}
},
UpdateFunc: func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
wfc.wfQueue.Add(key)
priority, creation := getWfPriority(new)
wfc.throttler.Add(key, priority, creation)
}
},
DeleteFunc: func(obj interface{}) {
@@ -328,6 +363,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandler() {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
wfc.wfQueue.Add(key)
wfc.throttler.Remove(key)
}
},
},
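`getWfPriority` reads the priority straight out of the unstructured object delivered by the informer, defaulting to 0 and using the creation timestamp as the tie-breaker. A small standalone example of how `unstructured.NestedInt64` behaves for a workflow with and without `spec.priority` (the object contents here are made up for illustration):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

func main() {
	withPriority := &unstructured.Unstructured{Object: map[string]interface{}{
		"spec": map[string]interface{}{"priority": int64(10)},
	}}
	withoutPriority := &unstructured.Unstructured{Object: map[string]interface{}{
		"spec": map[string]interface{}{},
	}}

	for _, un := range []*unstructured.Unstructured{withPriority, withoutPriority} {
		priority, found, err := unstructured.NestedInt64(un.Object, "spec", "priority")
		if err != nil || !found {
			priority = 0 // same default getWfPriority applies
		}
		fmt.Println(priority) // prints 10, then 0
	}
}
```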