Skip to content

Commit

Permalink
refactor: Use polling model for workflow phase metric (argoproj#4557)
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Behar <[email protected]>
  • Loading branch information
simster7 committed Nov 24, 2020
1 parent 99240ad commit 4531d79
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 139 deletions.
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@ github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs=
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/aliyun/aliyun-oss-go-sdk v2.0.6+incompatible h1:ZDgadcjGIrbHMBLSqQVHkMOdNd/jF6bsSRJd/Ysxlos=
github.com/aliyun/aliyun-oss-go-sdk v2.0.6+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
Expand Down Expand Up @@ -1047,6 +1049,7 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc h1:TnonUr8u3himcMY0vSh23jFOXA+cnucl1gB6EQTReBI=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
64 changes: 19 additions & 45 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@ import (
"github.com/argoproj/argo/workflow/util"
)

const enoughTimeForInformerSync = 1 * time.Second

const semaphoreConfigIndexName = "bySemaphoreConfigMap"

// WorkflowController is the controller for workflow resources
type WorkflowController struct {
// namespace of the workflow controller
Expand Down Expand Up @@ -110,6 +106,7 @@ const (
workflowTemplateResyncPeriod = 20 * time.Minute
podResyncPeriod = 30 * time.Minute
clusterWorkflowTemplateResyncPeriod = 20 * time.Minute
enoughTimeForInformerSync = 1 * time.Second
)

// NewWorkflowController instantiates a new WorkflowController
Expand Down Expand Up @@ -171,7 +168,8 @@ var indexers = cache.Indexers{
indexes.ClusterWorkflowTemplateIndex: indexes.MetaNamespaceLabelIndexFunc(common.LabelKeyClusterWorkflowTemplate),
indexes.CronWorkflowIndex: indexes.MetaNamespaceLabelIndexFunc(common.LabelKeyCronWorkflow),
indexes.WorkflowTemplateIndex: indexes.MetaNamespaceLabelIndexFunc(common.LabelKeyWorkflowTemplate),
semaphoreConfigIndexName: workflowIndexerBySemaphoreKeys,
indexes.SemaphoreConfigIndexName: indexes.WorkflowSemaphoreKeysIndexFunc(),
indexes.WorkflowPhaseIndex: indexes.MetaLabelIndexFunc(common.LabelKeyPhase),
}

// Run starts an Workflow resource controller
Expand Down Expand Up @@ -201,7 +199,9 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in

go wfc.runTTLController(ctx)
go wfc.runCronController(ctx)

go wfc.metrics.RunServer(ctx)
go wait.Until(wfc.syncWorkflowPhaseMetrics, 15*time.Second, ctx.Done())

wfc.createClusterWorkflowTemplateInformer(ctx)
wfc.waitForCacheSync(ctx)
Expand All @@ -221,20 +221,6 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in
<-ctx.Done()
}

func workflowIndexerBySemaphoreKeys(obj interface{}) ([]string, error) {
un, ok := obj.(*unstructured.Unstructured)
if !ok {
log.Warnf("cannot convert obj into unstructured.Unstructured in Indexer %s", semaphoreConfigIndexName)
return []string{}, nil
}
wf, err := util.FromUnstructured(un)
if err != nil {
log.Warnf("failed to convert to workflow from unstructured: %v", err)
return []string{}, nil
}
return wf.GetSemaphoreKeys(), nil
}

func (wfc *WorkflowController) waitForCacheSync(ctx context.Context) {
// Wait for all involved caches to be synced, before processing items from the queue is started
if !cache.WaitForCacheSync(ctx.Done(), wfc.wfInformer.HasSynced, wfc.wftmplInformer.Informer().HasSynced, wfc.podInformer.HasSynced) {
Expand Down Expand Up @@ -318,15 +304,15 @@ func (wfc *WorkflowController) runConfigMapWatcher(stopCh <-chan struct{}) {

// notifySemaphoreConfigUpdate will notify semaphore config update to pending workflows
func (wfc *WorkflowController) notifySemaphoreConfigUpdate(cm *apiv1.ConfigMap) {
wfs, err := wfc.wfInformer.GetIndexer().ByIndex(semaphoreConfigIndexName, fmt.Sprintf("%s/%s", cm.Namespace, cm.Name))
wfs, err := wfc.wfInformer.GetIndexer().ByIndex(indexes.SemaphoreConfigIndexName, fmt.Sprintf("%s/%s", cm.Namespace, cm.Name))
if err != nil {
log.Errorf("failed get the workflow from informer. %v", err)
}

for _, obj := range wfs {
un, ok := obj.(*unstructured.Unstructured)
if !ok {
log.Warnf("received object from indexer %s is not an unstructured", semaphoreConfigIndexName)
log.Warnf("received object from indexer %s is not an unstructured", indexes.SemaphoreConfigIndexName)
continue
}
wf, err := util.FromUnstructured(un)
Expand Down Expand Up @@ -698,21 +684,6 @@ func getWfPriority(obj interface{}) (int32, time.Time) {
return int32(priority), un.GetCreationTimestamp().Time
}

func getWfPhase(obj interface{}) wfv1.NodePhase {
un, ok := obj.(*unstructured.Unstructured)
if !ok {
return ""
}
phase, hasPhase, err := unstructured.NestedString(un.Object, "status", "phase")
if err != nil {
return ""
}
if !hasPhase {
return wfv1.NodePending
}
return wfv1.NodePhase(phase)
}

func (wfc *WorkflowController) addWorkflowInformerHandlers() {
wfc.wfInformer.AddEventHandler(
cache.FilteringResourceEventHandler{
Expand Down Expand Up @@ -769,18 +740,10 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers() {
},
)
wfc.wfInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
wf := obj.(*unstructured.Unstructured)
wfc.metrics.WorkflowAdded(string(wf.GetUID()), getWfPhase(obj))
},
UpdateFunc: func(old, new interface{}) {
wf := new.(*unstructured.Unstructured)
wfc.metrics.WorkflowUpdated(string(wf.GetUID()), getWfPhase(old), getWfPhase(new))
},
DeleteFunc: func(obj interface{}) {
wf, ok := obj.(*unstructured.Unstructured)
if ok { // maybe cache.DeletedFinalStateUnknown
wfc.metrics.WorkflowDeleted(string(wf.GetUID()), getWfPhase(obj))
wfc.metrics.StopRealtimeMetricsForKey(string(wf.GetUID()))
}
},
})
Expand Down Expand Up @@ -991,3 +954,14 @@ func (wfc *WorkflowController) releaseAllWorkflowLocks(obj interface{}) {
func (wfc *WorkflowController) isArchivable(wf *wfv1.Workflow) bool {
return wfc.archiveLabelSelector.Matches(labels.Set(wf.Labels))
}

func (wfc *WorkflowController) syncWorkflowPhaseMetrics() {
for _, phase := range []wfv1.NodePhase{wfv1.NodePending, wfv1.NodeRunning, wfv1.NodeSucceeded, wfv1.NodeFailed, wfv1.NodeError} {
objs, err := wfc.wfInformer.GetIndexer().ByIndex(indexes.WorkflowPhaseIndex, string(phase))
if err != nil {
log.WithError(err).Errorf("failed to list workflows by '%s'", phase)
continue
}
wfc.metrics.SetWorkflowPhaseGauge(phase, len(objs))
}
}
3 changes: 3 additions & 0 deletions workflow/controller/indexes/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,8 @@ package indexes
const (
ClusterWorkflowTemplateIndex = "clusterworkflowtemplate"
CronWorkflowIndex = "cronworkflow"
WorkflowIndex = "workflow"
WorkflowTemplateIndex = "workflowtemplate"
WorkflowPhaseIndex = "workflow.phase"
SemaphoreConfigIndexName = "bySemaphoreConfigMap"
)
14 changes: 14 additions & 0 deletions workflow/controller/indexes/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,20 @@ func MetaNamespaceLabelIndex(namespace, label string) string {
return namespace + "/" + label
}

func MetaLabelIndexFunc(label string) cache.IndexFunc {
return func(obj interface{}) ([]string, error) {
v, err := meta.Accessor(obj)
if err != nil {
return []string{}, fmt.Errorf("object has no meta: %v", err)
}
if value, exists := v.GetLabels()[label]; exists {
return []string{value}, nil
} else {
return []string{}, nil
}
}
}

func MetaNamespaceLabelIndexFunc(label string) cache.IndexFunc {
return func(obj interface{}) ([]string, error) {
v, err := meta.Accessor(obj)
Expand Down
10 changes: 10 additions & 0 deletions workflow/controller/indexes/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,14 @@ func TestMetaNamespaceLabelIndexFunc(t *testing.T) {
assert.NoError(t, err)
assert.ElementsMatch(t, values, []string{"my-ns/my-value"})
})
t.Run("Labelled No Namespace", func(t *testing.T) {
values, err := MetaLabelIndexFunc("my-label")(&wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{
Namespace: "my-ns",
Labels: map[string]string{"my-label": "my-value"},
},
})
assert.NoError(t, err)
assert.ElementsMatch(t, values, []string{"my-value"})
})
}
22 changes: 20 additions & 2 deletions workflow/controller/indexes/workflow_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package indexes
import (
"fmt"

"github.com/prometheus/common/log"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/tools/cache"

"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/util"
)

const WorkflowIndex = "workflow"

func MetaWorkflowIndexFunc(obj interface{}) ([]string, error) {
m, err := meta.Accessor(obj)
if err != nil {
Expand All @@ -25,3 +27,19 @@ func MetaWorkflowIndexFunc(obj interface{}) ([]string, error) {
func WorkflowIndexValue(namespace, name string) string {
return namespace + "/" + name
}

func WorkflowSemaphoreKeysIndexFunc() cache.IndexFunc {
return func(obj interface{}) ([]string, error) {
un, ok := obj.(*unstructured.Unstructured)
if !ok {
log.Warnf("cannot convert obj into unstructured.Unstructured in Indexer %s", SemaphoreConfigIndexName)
return []string{}, nil
}
wf, err := util.FromUnstructured(un)
if err != nil {
log.Warnf("failed to convert to workflow from unstructured: %v", err)
return []string{}, nil
}
return wf.GetSemaphoreKeys(), nil
}
}
46 changes: 10 additions & 36 deletions workflow/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,53 +122,20 @@ func (m *Metrics) allMetrics() []prometheus.Metric {
return allMetrics
}

func (m *Metrics) WorkflowAdded(key string, phase v1alpha1.NodePhase) {
m.mutex.Lock()
defer m.mutex.Unlock()

if _, exists := m.workflows[key]; exists {
return
}
m.workflows[key] = []string{}
if _, ok := m.workflowsByPhase[phase]; ok {
m.workflowsByPhase[phase].Inc()
}
}

func (m *Metrics) WorkflowUpdated(key string, fromPhase, toPhase v1alpha1.NodePhase) {
m.mutex.Lock()
defer m.mutex.Unlock()

if _, exists := m.workflows[key]; !exists || fromPhase == toPhase {
return
}
if _, ok := m.workflowsByPhase[fromPhase]; ok {
m.workflowsByPhase[fromPhase].Dec()
}
if _, ok := m.workflowsByPhase[toPhase]; ok {
m.workflowsByPhase[toPhase].Inc()
}
}

func (m *Metrics) WorkflowDeleted(key string, phase v1alpha1.NodePhase) {
func (m *Metrics) StopRealtimeMetricsForKey(key string) {
m.mutex.Lock()
defer m.mutex.Unlock()

if _, exists := m.workflows[key]; !exists {
return
}
m.StopRealtimeMetricsForKey(key)
delete(m.workflows, key)
if _, ok := m.workflowsByPhase[phase]; ok {
m.workflowsByPhase[phase].Dec()
}
}

func (m *Metrics) StopRealtimeMetricsForKey(key string) {
realtimeMetrics := m.workflows[key]
for _, metric := range realtimeMetrics {
delete(m.customMetrics, metric)
}

delete(m.workflows, key)
}

func (m *Metrics) OperationCompleted(durationSeconds float64) {
Expand Down Expand Up @@ -210,6 +177,13 @@ func (m *Metrics) UpsertCustomMetric(key string, ownerKey string, newMetric prom
return nil
}

func (m *Metrics) SetWorkflowPhaseGauge(phase v1alpha1.NodePhase, num int) {
m.mutex.Lock()
defer m.mutex.Unlock()

m.workflowsByPhase[phase].Set(float64(num))
}

type ErrorCause string

const (
Expand Down
Loading

0 comments on commit 4531d79

Please sign in to comment.