Skip to content

Commit

Permalink
feat(controller): Add per-namespace parallelism limits. Closes argopr…
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed May 27, 2021
1 parent 73539fa commit e262b3a
Show file tree
Hide file tree
Showing 11 changed files with 240 additions and 55 deletions.
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ type Config struct {
// Parallelism limits the max total parallel workflows that can execute at the same time
Parallelism int `json:"parallelism,omitempty"`

// NamespaceParallelism limits the max workflows that can execute at the same time in a namespace
NamespaceParallelism int `json:"namespaceParallelism,omitempty"`

// ResourceRateLimit limits the rate at which pods are created
ResourceRateLimit *ResourceRateLimit `json:"resourceRateLimit,omitempty"`

Expand Down
6 changes: 6 additions & 0 deletions docs/workflow-controller-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ data:
# (available since Argo v2.3). Controller must be restarted to take effect.
parallelism: 10

# Limit the maximum number of incomplete workflows in a namespace.
# Intended for cluster installs in multi-tenant environments, to prevent too many workflows in one
# namespace from impacting others.
# >= v3.2
namespaceParallelism: "10"

# Globally limits the rate at which pods are created.
# This is intended to mitigate flooding of the Kubernetes API server by workflows with a large amount of
# parallel nodes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ data:
enabled: true
path: /metrics
port: 9090
namespaceParallelism: "10"
links: |
- name: Workflow Link
scope: workflow
Expand Down
20 changes: 14 additions & 6 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,11 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli
}

// newThrottler builds the workflow admission throttler for the controller.
// Two limits are chained together: a global cap across all workflows
// (Config.Parallelism, bucketed into a single bucket) and a per-namespace
// cap (Config.NamespaceParallelism, bucketed by namespace). A workflow is
// admitted only when every throttler in the chain admits it. When capacity
// frees up, each throttler re-queues pending keys via the rate-limited
// work queue so they are reconsidered.
func (wfc *WorkflowController) newThrottler() sync.Throttler {
	f := func(key string) { wfc.wfQueue.AddRateLimited(key) }
	return sync.ChainThrottler{
		sync.NewThrottler(wfc.Config.Parallelism, sync.SingleBucket, f),
		sync.NewThrottler(wfc.Config.NamespaceParallelism, sync.NamespaceBucket, f),
	}
}

// RunTTLController runs the workflow TTL controller
Expand Down Expand Up @@ -649,11 +653,6 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {
return true
}

if !wfc.throttler.Admit(key.(string)) {
log.WithFields(log.Fields{"key": key}).Info("Workflow processing has been postponed due to max parallelism limit")
return true
}

wf, err := util.FromUnstructured(un)
if err != nil {
log.WithFields(log.Fields{"key": key, "error": err}).Warn("Failed to unmarshal key to workflow object")
Expand All @@ -673,6 +672,15 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {

woc := newWorkflowOperationCtx(wf, wfc)

if !wfc.throttler.Admit(key.(string)) {
log.WithField("key", key).Info("Workflow processing has been postponed due to max parallelism limit")
if woc.wf.Status.Phase == wfv1.WorkflowUnknown {
woc.markWorkflowPhase(ctx, wfv1.WorkflowPending, "Workflow processing has been postponed because too many workflows are already running")
woc.persistUpdates(ctx)
}
return true
}

// make sure this key is removed from the throttler when the workflow is complete
defer func() {
// must be done with woc
Expand Down
52 changes: 32 additions & 20 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,17 @@ func TestClusterController(t *testing.T) {
}

func TestParallelism(t *testing.T) {
cancel, controller := newController(
wfv1.MustUnmarshalWorkflow(`
for tt, f := range map[string]func(controller *WorkflowController){
"Parallelism": func(x *WorkflowController) {
x.Config.Parallelism = 1
},
"NamespaceParallelism": func(x *WorkflowController) {
x.Config.NamespaceParallelism = 1
},
} {
t.Run(tt, func(t *testing.T) {
cancel, controller := newController(
wfv1.MustUnmarshalWorkflow(`
metadata:
name: my-wf-0
spec:
Expand All @@ -471,7 +480,7 @@ spec:
container:
image: my-image
`),
wfv1.MustUnmarshalWorkflow(`
wfv1.MustUnmarshalWorkflow(`
metadata:
name: my-wf-1
spec:
Expand All @@ -481,23 +490,26 @@ spec:
container:
image: my-image
`),
func(controller *WorkflowController) { controller.Config.Parallelism = 1 },
)
defer cancel()
ctx := context.Background()
assert.True(t, controller.processNextItem(ctx))
assert.True(t, controller.processNextItem(ctx))

expectWorkflow(ctx, controller, "my-wf-0", func(wf *wfv1.Workflow) {
if assert.NotNil(t, wf) {
assert.Equal(t, wfv1.WorkflowRunning, wf.Status.Phase)
}
})
expectWorkflow(ctx, controller, "my-wf-1", func(wf *wfv1.Workflow) {
if assert.NotNil(t, wf) {
assert.Empty(t, wf.Status.Phase)
}
})
f,
)
defer cancel()
ctx := context.Background()
assert.True(t, controller.processNextItem(ctx))
assert.True(t, controller.processNextItem(ctx))

expectWorkflow(ctx, controller, "my-wf-0", func(wf *wfv1.Workflow) {
if assert.NotNil(t, wf) {
assert.Equal(t, wfv1.WorkflowRunning, wf.Status.Phase)
}
})
expectWorkflow(ctx, controller, "my-wf-1", func(wf *wfv1.Workflow) {
if assert.NotNil(t, wf) {
assert.Equal(t, wfv1.WorkflowPending, wf.Status.Phase)
assert.Equal(t, "Workflow processing has been postponed because too many workflows are already running", wf.Status.Message)
}
})
})
}
}

func TestWorkflowController_archivedWorkflowGarbageCollector(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1854,7 +1854,7 @@ func (woc *wfOperationCtx) markWorkflowPhase(ctx context.Context, phase wfv1.Wor
}
markCompleted = phase.Completed()
}
if woc.wf.Status.StartedAt.IsZero() {
if woc.wf.Status.StartedAt.IsZero() && phase != wfv1.WorkflowPending {
woc.updated = true
woc.wf.Status.StartedAt = metav1.Time{Time: time.Now().UTC()}
woc.wf.Status.EstimatedDuration = woc.estimateWorkflowDuration()
Expand Down
30 changes: 30 additions & 0 deletions workflow/sync/chain_throttler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package sync

import (
"time"
)

// ChainThrottler composes several Throttlers into a single one.
// Add and Remove fan out to every member; Admit requires unanimous
// approval and short-circuits at the first member that refuses.
type ChainThrottler []Throttler

// Add registers the key (with its priority and creation time) with every
// throttler in the chain.
func (c ChainThrottler) Add(key Key, priority int32, creationTime time.Time) {
	for i := range c {
		c[i].Add(key, priority, creationTime)
	}
}

// Admit reports whether every member of the chain admits the key.
// An empty chain admits everything.
func (c ChainThrottler) Admit(key Key) bool {
	for i := range c {
		if !c[i].Admit(key) {
			return false
		}
	}
	return true
}

// Remove deletes the key from every throttler in the chain.
func (c ChainThrottler) Remove(key Key) {
	for i := range c {
		c[i].Remove(key)
	}
}

// compile-time assertion that ChainThrottler satisfies the Throttler interface
var _ Throttler = ChainThrottler{}
24 changes: 24 additions & 0 deletions workflow/sync/chain_throttler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package sync

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/argoproj/argo-workflows/v3/workflow/sync/mocks"
)

// TestChainThrottler verifies that a chain fans calls out to its members,
// that Admit returns false when a member refuses, and that an empty chain
// admits every key.
func TestChainThrottler(t *testing.T) {
	throttler := &mocks.Throttler{}
	throttler.On("Add", "foo", int32(1), time.Time{}).Return()
	throttler.On("Admit", "foo").Return(false)
	throttler.On("Remove", "foo").Return()

	chain := ChainThrottler{throttler}
	chain.Add("foo", 1, time.Time{})
	assert.False(t, chain.Admit("foo"))
	chain.Remove("foo")

	// a chain with no members never throttles
	assert.True(t, ChainThrottler{}.Admit("foo"))
}
38 changes: 38 additions & 0 deletions workflow/sync/mocks/Throttler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit e262b3a

Please sign in to comment.