Skip to content

Commit

Permalink
[chore] spanmetricsprocessor: use generics for the cache implementati…
Browse files Browse the repository at this point in the history
…on (#16020)

Signed-off-by: Bogdan Drutu <[email protected]>

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Nov 2, 2022
1 parent 4a1c9fd commit 2f34357
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 44 deletions.
22 changes: 11 additions & 11 deletions processor/spanmetricsprocessor/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,46 +23,46 @@ import (
// map. In spanmetricsprocessor's use case, we need to hold all the items during the current processing step for
// building the metrics. The evicted items can/should be safely removed once the metrics are built from the current
// batch of spans.
type Cache struct {
type Cache[K comparable, V any] struct {
*lru.Cache
evictedItems map[interface{}]interface{}
evictedItems map[K]V
}

// NewCache creates a Cache.
func NewCache(size int) (*Cache, error) {
evictedItems := make(map[interface{}]interface{})
lruCache, err := lru.NewWithEvict(size, func(key interface{}, value interface{}) {
evictedItems[key] = value
func NewCache[K comparable, V any](size int) (*Cache[K, V], error) {
evictedItems := make(map[K]V)
lruCache, err := lru.NewWithEvict(size, func(key any, value any) {
evictedItems[key.(K)] = value.(V)
})
if err != nil {
return nil, err
}

return &Cache{
return &Cache[K, V]{
Cache: lruCache,
evictedItems: evictedItems,
}, nil
}

// RemoveEvictedItems cleans all the evicted items.
func (c *Cache) RemoveEvictedItems() {
func (c *Cache[K, V]) RemoveEvictedItems() {
// we need to keep the original pointer to evictedItems map as it is used in the closure of lru.NewWithEvict
for k := range c.evictedItems {
delete(c.evictedItems, k)
}
}

// Get retrieves an item from the LRU cache or evicted items.
func (c *Cache) Get(key interface{}) (interface{}, bool) {
func (c *Cache[K, V]) Get(key K) (V, bool) {
if val, ok := c.Cache.Get(key); ok {
return val, ok
return val.(V), ok
}
val, ok := c.evictedItems[key]
return val, ok
}

// Purge removes all the items from the LRU cache and evicted items.
func (c *Cache) Purge() {
func (c *Cache[K, V]) Purge() {
c.Cache.Purge()
c.RemoveEvictedItems()
}
48 changes: 24 additions & 24 deletions processor/spanmetricsprocessor/internal/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestNewCache(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
_, err := NewCache(tt.args.size)
_, err := NewCache[string, string](tt.args.size)
if tt.wantErr {
assert.Error(t, err)
return
Expand All @@ -68,16 +68,16 @@ func TestNewCache(t *testing.T) {
func TestCache_Get(t *testing.T) {
tests := []struct {
name string
lruCache func() *Cache
evictedItems map[interface{}]interface{}
key interface{}
wantValue interface{}
lruCache func() *Cache[string, string]
evictedItems map[string]string
key string
wantValue string
wantOk bool
}{
{
name: "if key is not found in LRUCache, will get key from evictedItems",
lruCache: func() *Cache {
cache, _ := NewCache(1)
lruCache: func() *Cache[string, string] {
cache, _ := NewCache[string, string](1)
cache.evictedItems["key"] = "val"
return cache
},
Expand All @@ -87,8 +87,8 @@ func TestCache_Get(t *testing.T) {
},
{
name: "if key is found in LRUCache, return the found item",
lruCache: func() *Cache {
cache, _ := NewCache(1)
lruCache: func() *Cache[string, string] {
cache, _ := NewCache[string, string](1)
cache.Add("key", "val_from_LRU")
cache.evictedItems["key"] = "val_from_evicted_items"
return cache
Expand All @@ -99,12 +99,12 @@ func TestCache_Get(t *testing.T) {
},
{
name: "if key is not found either in LRUCache or evicted items, return nothing",
lruCache: func() *Cache {
cache, _ := NewCache(1)
lruCache: func() *Cache[string, string] {
cache, _ := NewCache[string, string](1)
return cache
},
key: "key",
wantValue: nil,
wantValue: "",
wantOk: false,
},
}
Expand All @@ -127,18 +127,18 @@ func TestCache_Get(t *testing.T) {
func TestCache_RemoveEvictedItems(t *testing.T) {
tests := []struct {
name string
lruCache func() (*Cache, error)
lruCache func() (*Cache[string, string], error)
}{
{
name: "no panic when there is no evicted item to remove",
lruCache: func() (*Cache, error) {
return NewCache(1)
lruCache: func() (*Cache[string, string], error) {
return NewCache[string, string](1)
},
},
{
name: "evicted items should be removed",
lruCache: func() (*Cache, error) {
cache, err := NewCache(1)
lruCache: func() (*Cache[string, string], error) {
cache, err := NewCache[string, string](1)
if err != nil {
return nil, err
}
Expand All @@ -163,18 +163,18 @@ func TestCache_RemoveEvictedItems(t *testing.T) {
func TestCache_PurgeItems(t *testing.T) {
tests := []struct {
name string
lruCache func() (*Cache, error)
lruCache func() (*Cache[string, string], error)
}{
{
name: "no panic when there is no item to remove",
lruCache: func() (*Cache, error) {
return NewCache(1)
lruCache: func() (*Cache[string, string], error) {
return NewCache[string, string](1)
},
},
{
name: "remove items from the lru cache",
lruCache: func() (*Cache, error) {
cache, err := NewCache(1)
lruCache: func() (*Cache[string, string], error) {
cache, err := NewCache[string, string](1)
if err != nil {
return nil, err
}
Expand All @@ -185,8 +185,8 @@ func TestCache_PurgeItems(t *testing.T) {
},
{
name: "remove all the items from lru cache and the evicted items",
lruCache: func() (*Cache, error) {
cache, err := NewCache(10)
lruCache: func() (*Cache[string, string], error) {
cache, err := NewCache[string, string](10)
if err != nil {
return nil, err
}
Expand Down
12 changes: 4 additions & 8 deletions processor/spanmetricsprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type processorImp struct {

// An LRU cache of dimension key-value maps keyed by a unique identifier formed by a concatenation of its values:
// e.g. { "foo/barOK": { "serviceName": "foo", "operation": "/bar", "status_code": "OK" }}
metricKeyToDimensions *cache.Cache
metricKeyToDimensions *cache.Cache[metricKey, pcommon.Map]
}

type histogramData struct {
Expand Down Expand Up @@ -109,7 +109,7 @@ func newProcessor(logger *zap.Logger, config config.Processor, nextConsumer cons
pConfig.DimensionsCacheSize,
)
}
metricKeyToDimensionsCache, err := cache.NewCache(pConfig.DimensionsCacheSize)
metricKeyToDimensionsCache, err := cache.NewCache[metricKey, pcommon.Map](pConfig.DimensionsCacheSize)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -329,13 +329,9 @@ func (p *processorImp) collectCallMetrics(ilm pmetric.ScopeMetrics) error {

// getDimensionsByMetricKey gets dimensions from `metricKeyToDimensions` cache.
func (p *processorImp) getDimensionsByMetricKey(k metricKey) (pcommon.Map, error) {
if item, ok := p.metricKeyToDimensions.Get(k); ok {
if attributeMap, ok := item.(pcommon.Map); ok {
return attributeMap, nil
}
return pcommon.Map{}, fmt.Errorf("type assertion of metricKeyToDimensions attributes failed, the key is %q", k)
if attributeMap, ok := p.metricKeyToDimensions.Get(k); ok {
return attributeMap, nil
}

return pcommon.Map{}, fmt.Errorf("value not found in metricKeyToDimensions cache by key %q", k)
}

Expand Down
2 changes: 1 addition & 1 deletion processor/spanmetricsprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func BenchmarkProcessorConsumeTraces(b *testing.B) {
func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, defaultNullValue *string, temporality string, logger *zap.Logger) *processorImp {
defaultNotInSpanAttrVal := "defaultNotInSpanAttrVal"
// use size 2 for LRU cache for testing purpose
metricKeyToDimensions, err := cache.NewCache(DimensionsCacheSize)
metricKeyToDimensions, err := cache.NewCache[metricKey, pcommon.Map](DimensionsCacheSize)
if err != nil {
panic(err)
}
Expand Down

0 comments on commit 2f34357

Please sign in to comment.