Skip to content

Commit

Permalink
feat(controller): Add config for potential CPU hogs (argoproj#5853)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed May 6, 2021
1 parent 7ec262a commit d66954f
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 1 deletion.
2 changes: 2 additions & 0 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ Note that these environment variables may be removed at any time.
| `ALWAYS_OFFLOAD_NODE_STATUS` | `bool` | Whether to always offload the node status. |
| `ARCHIVED_WORKFLOW_GC_PERIOD` | `time.Duration` | The periodicity for GC of archived workflows. |
| `ARGO_TRACE` | `bool` | Whether to enable tracing statements in Argo components. |
| `CRON_SYNC_PERIOD` | `time.Duration` | How ofen to sync cron workflows. Default `10s` |
| `DEFAULT_REQUEUE_TIME` | `time.Duration` | The requeue time for the rate limiter of the workflow queue. |
| `EXPRESSION_TEMPLATES` | `bool` | Escape hatch to disable expression templates. Default `true`. |
| `GZIP_IMPLEMENTATION` | `string` | The implementation of compression/decompression. Currently only "PGZip" and "GZip" are supported. Defaults to "PGZip". |
| `INDEX_WORKFLOW_SEMAPHORE_KEYS` | `bool` | Whether or not to index semaphores. Defaults to `true`. |
| `LEADER_ELECTION_IDENTITY` | `string` | The ID used for workflow controllers to elect a leader. |
| `LEADER_ELECTION_DISABLE` | `bool` | Whether leader election should be disabled. |
| `LEADER_ELECTION_LEASE_DURATION` | `time.Duration` | The duration that non-leader candidates will wait to force acquire leadership. |
Expand Down
16 changes: 16 additions & 0 deletions workflow/controller/indexes/workflow_index.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package indexes

import (
"os"

log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/tools/cache"
Expand All @@ -9,6 +12,14 @@ import (
"github.com/argoproj/argo-workflows/v3/workflow/util"
)

var (
indexWorkflowSemaphoreKeys = os.Getenv("INDEX_WORKFLOW_SEMAPHORE_KEYS") != "false"
)

func init() {
log.WithField("indexWorkflowSemaphoreKeys", indexWorkflowSemaphoreKeys).Info("index config")
}

func MetaWorkflowIndexFunc(obj interface{}) ([]string, error) {
m, err := meta.Accessor(obj)
if err != nil {
Expand All @@ -26,6 +37,11 @@ func WorkflowIndexValue(namespace, name string) string {
}

func WorkflowSemaphoreKeysIndexFunc() cache.IndexFunc {
if !indexWorkflowSemaphoreKeys {
return func(obj interface{}) ([]string, error) {
return nil, nil
}
}
return func(obj interface{}) ([]string, error) {
un, ok := obj.(*unstructured.Unstructured)
if !ok {
Expand Down
10 changes: 9 additions & 1 deletion workflow/cron/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"reflect"
"time"

"github.com/argoproj/argo-workflows/v3/util/env"

"github.com/argoproj/pkg/sync"
log "github.com/sirupsen/logrus"
apiv1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -53,12 +55,17 @@ const (
cronWorkflowWorkers = 8
)

var (
cronSyncPeriod = env.LookupEnvDurationOr("CRON_SYNC_PERIOD", 10*time.Second)
)

func init() {
// this make sure we support timezones
_, err := time.Parse(time.RFC822, "17 Oct 07 14:03 PST")
if err != nil {
log.Fatal(err)
}
log.WithField("cronSyncPeriod", cronSyncPeriod).Info("cron config")
}

func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic.Interface, wfInformer cache.SharedIndexInformer, namespace string, managedNamespace string, instanceId string, metrics *metrics.Metrics, eventRecorderManager events.EventRecorderManager) *Controller {
Expand Down Expand Up @@ -96,7 +103,8 @@ func (cc *Controller) Run(ctx context.Context) {
defer cc.cron.Stop()

go cc.cronWfInformer.Informer().Run(ctx.Done())
go wait.UntilWithContext(ctx, cc.syncAll, 10*time.Second)

go wait.UntilWithContext(ctx, cc.syncAll, cronSyncPeriod)

for i := 0; i < cronWorkflowWorkers; i++ {
go wait.Until(cc.runCronWorker, time.Second, ctx.Done())
Expand Down

0 comments on commit d66954f

Please sign in to comment.