Skip to content

Commit

Permalink
feat(controller): Add last hit timestamp to memoization caches (#5487)
Browse files Browse the repository at this point in the history
Signed-off-by: terrytangyuan <[email protected]>
  • Loading branch information
terrytangyuan committed Mar 26, 2021
1 parent a61d84c commit 76b6a0e
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 2 deletions.
3 changes: 3 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ const (
// LabelKeyOnExit is a label applied to Pods that are run from onExit nodes, so that they are not shut down when stopping a Workflow
LabelKeyOnExit = workflow.WorkflowFullName + "/on-exit"

// LabelKeyCacheLastHitTimestamp is the timestamp when the memoization cache is last hit.
LabelKeyCacheLastHitTimestamp = "last-hit-timestamp"

// ExecutorArtifactBaseDir is the base directory in the init container in which artifacts will be copied to.
// Each artifact will be named according to its input name (e.g: /argo/inputs/artifacts/CODE)
ExecutorArtifactBaseDir = "/argo/inputs/artifacts"
Expand Down
1 change: 1 addition & 0 deletions workflow/controller/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Entry struct {
NodeID string `json:"nodeID"`
Outputs *wfv1.Outputs `json:"outputs"`
CreationTimestamp metav1.Time `json:"creationTimestamp"`
LastHitTimestamp metav1.Time `json:"lastHitTimestamp"`
}

func (e *Entry) Hit() bool {
Expand Down
24 changes: 22 additions & 2 deletions workflow/controller/cache/configmap_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/client-go/kubernetes"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/workflow/common"
)

type configMapCache struct {
Expand Down Expand Up @@ -60,7 +61,8 @@ func (c *configMapCache) Load(ctx context.Context, key string) (*Entry, error) {
}

c.logInfo(log.Fields{}, "config map cache loaded")

hitTime := time.Now()
cm.SetLabels(map[string]string{common.LabelKeyCacheLastHitTimestamp: hitTime.Format(time.RFC3339)})
rawEntry, ok := cm.Data[key]
if !ok || rawEntry == "" {
c.logInfo(log.Fields{}, "config map cache miss: entry does not exist")
Expand All @@ -73,6 +75,20 @@ func (c *configMapCache) Load(ctx context.Context, key string) (*Entry, error) {
return nil, fmt.Errorf("malformed cache entry: could not unmarshal JSON; unable to parse: %w", err)
}

entry.LastHitTimestamp = metav1.Time{Time: hitTime}
entryJSON, err := json.Marshal(entry)
if err != nil {
c.logError(err, log.Fields{"key": key}, "Unable to marshal cache entry with last hit timestamp")
return nil, fmt.Errorf("unable to marshal cache entry with last hit timestamp: %w", err)
}
cm.Data[key] = string(entryJSON)

_, err = c.kubeClient.CoreV1().ConfigMaps(c.namespace).Update(ctx, cm, metav1.UpdateOptions{})
if err != nil {
c.logError(err, log.Fields{}, "Error updating last hit timestamp on cache")
return nil, fmt.Errorf("error updating last hit timestamp on cache: %w", err)
}

return &entry, nil
}

Expand Down Expand Up @@ -102,10 +118,14 @@ func (c *configMapCache) Save(ctx context.Context, key string, nodeId string, va
}
}

creationTime := time.Now()
cache.SetLabels(map[string]string{common.LabelKeyCacheLastHitTimestamp: creationTime.Format(time.RFC3339)})

newEntry := Entry{
NodeID: nodeId,
Outputs: value,
CreationTimestamp: metav1.Time{Time: time.Now()},
CreationTimestamp: metav1.Time{Time: creationTime},
LastHitTimestamp: metav1.Time{Time: creationTime},
}

entryJSON, err := json.Marshal(newEntry)
Expand Down
23 changes: 23 additions & 0 deletions workflow/controller/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package controller

import (
"context"
"encoding/json"
"testing"
"time"

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/workflow/common"
"github.com/argoproj/argo-workflows/v3/workflow/controller/cache"
)

Expand Down Expand Up @@ -45,8 +48,21 @@ func TestConfigMapCacheLoadHit(t *testing.T) {
_, err := controller.kubeclientset.CoreV1().ConfigMaps("default").Create(ctx, &sampleConfigMapCacheEntry, metav1.CreateOptions{})
assert.NoError(t, err)
c := cache.NewConfigMapCache("default", controller.kubeclientset, "whalesay-cache")

cm, err := controller.kubeclientset.CoreV1().ConfigMaps("default").Get(ctx, sampleConfigMapCacheEntry.Name, metav1.GetOptions{})
assert.NoError(t, err)
assert.Nil(t, cm.Labels)

entry, err := c.Load(ctx, "hi-there-world")
assert.NoError(t, err)

cm, err = controller.kubeclientset.CoreV1().ConfigMaps("default").Get(ctx, sampleConfigMapCacheEntry.Name, metav1.GetOptions{})
assert.NoError(t, err)
lastHitTimestampLabel, err := time.Parse(time.RFC3339, cm.Labels[common.LabelKeyCacheLastHitTimestamp])
assert.NoError(t, err)
assert.True(t, lastHitTimestampLabel.After(entry.CreationTimestamp.Time))
assert.Equal(t, lastHitTimestampLabel.Format(time.RFC3339), entry.LastHitTimestamp.Time.Format(time.RFC3339))

outputs := entry.Outputs
assert.NoError(t, err)
if assert.Len(t, outputs.Parameters, 1) {
Expand Down Expand Up @@ -87,4 +103,11 @@ func TestConfigMapCacheSave(t *testing.T) {
cm, err := controller.kubeclientset.CoreV1().ConfigMaps("default").Get(ctx, "whalesay-cache", metav1.GetOptions{})
assert.NoError(t, err)
assert.NotNil(t, cm)
lastHitTimestampLabel, err := time.Parse(time.RFC3339, cm.Labels[common.LabelKeyCacheLastHitTimestamp])
assert.NoError(t, err)
var entry cache.Entry
err = json.Unmarshal([]byte(cm.Data["hi-there-world"]), &entry)
assert.NoError(t, err)
assert.Equal(t, lastHitTimestampLabel.Format(time.RFC3339), entry.LastHitTimestamp.Time.Format(time.RFC3339))
assert.Equal(t, lastHitTimestampLabel.Format(time.RFC3339), entry.CreationTimestamp.Time.Format(time.RFC3339))
}

0 comments on commit 76b6a0e

Please sign in to comment.