feat: implement pubsub model to support evicting keys in localcache (#2)
* feat: implement pubsub model to support evicting keys in localcache

* adjust eviction algorithm

* fix README
viney-shih committed May 25, 2022
1 parent a08f49c commit 2a16815
Showing 18 changed files with 387 additions and 110 deletions.
51 changes: 49 additions & 2 deletions README.md
@@ -18,10 +18,57 @@ Considering the flexibility, efficiency and consistency, we starts to build up o

## Data flow
### Load the cache with `Cache-Aside` strategy
![load](./doc/img/get.png)
```mermaid
sequenceDiagram
participant APP as Application
participant M as go-cache
participant L as Local Cache
participant S as Shared Cache
participant R as Resource (Microservice / DB)
APP ->> M: Cache.Get() / Cache.MGet()
alt Local Cache hit
M ->> L: Adapter.MGet()
L -->> M: {[]Value, error}
M -->> APP: return
else Local Cache miss but Shared Cache hit
M ->> L: Adapter.MGet()
L -->> M: cache miss
M ->> S: Adapter.MGet()
S -->> M: {[]Value, error}
M ->> L: Adapter.MSet()
M -->> APP: return
else All miss
M ->> L: Adapter.MGet()
L -->> M: cache miss
M ->> S: Adapter.MGet()
S -->> M: cache miss
M ->> R: OneTimeGetterFunc() / MGetterFunc()
R -->> M: return from getter
M ->> S: Adapter.MSet()
M ->> L: Adapter.MSet()
M -->> APP: return
end
```
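
In code, the read path above looks roughly like the following sketch. It is illustrative only: `NewTinyLFU`, `NewRedis`, `NewCache`, `Setting`, `CacheAttributes` and `GetByFunc` all appear in this repository, but the exported constructor name (`cache.NewFactory`), the `cache.Type` / `cache.Attribute` names, the exact `GetByFunc` signature and the go-redis v8 import path are assumptions here.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8" // import path is an assumption
	cache "github.com/viney-shih/go-cache"
)

type Object struct {
	Str string
	Num int
}

func main() {
	// Local cache (TinyLFU) in front of a shared cache (Redis).
	tinyLfu := cache.NewTinyLFU(10000)
	rds := cache.NewRedis(redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{"server1": ":6379"},
	}))

	// Constructor name is an assumption; this diff only shows the unexported newFactory.
	factory := cache.NewFactory(rds, tinyLfu)

	c := factory.NewCache([]cache.Setting{
		{
			Prefix: "example",
			CacheAttributes: map[cache.Type]cache.Attribute{
				cache.SharedCacheType: {TTL: time.Hour},
				cache.LocalCacheType:  {TTL: 10 * time.Minute},
			},
		},
	})

	obj := &Object{}
	// GetByFunc follows the diagram above: local cache, then shared cache,
	// then the one-time getter, refilling the caches on the way back.
	if err := c.GetByFunc(context.Background(), "example", "key-1", obj, func() (interface{}, error) {
		// both layers missed: load from the real resource here
		return &Object{Str: "value-1", Num: 1}, nil
	}); err != nil {
		panic(err)
	}
	fmt.Println(obj) // &{value-1 1}
}
```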

### Evict the cache
![evict](./doc/img/del.png)
```mermaid
sequenceDiagram
participant APP as Application
participant M as go-cache
participant L as Local Cache
participant S as Shared Cache
participant PS as PubSub
APP ->> M: Cache.Del()
M ->> S: Adapter.Del()
S -->> M: return error if necessary
M ->> L: Adapter.Del()
L -->> M: return error if necessary
M ->> PS: Pubsub.Pub() (broadcast key eviction)
M -->> APP: return nil or error
```
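
Eviction in code, again as a hedged sketch: `Del` and `Close` come from this change, while the option name `cache.WithPubSub`, the exact `Del` signature (prefix plus keys) and the idea of reusing the Redis adapter as the `Pubsub` implementation are assumptions.

```go
package main

import (
	"context"
	"time"

	"github.com/go-redis/redis/v8" // import path is an assumption
	cache "github.com/viney-shih/go-cache"
)

func main() {
	rds := cache.NewRedis(redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{"server1": ":6379"},
	}))

	// WithPubSub is an assumed option name; this commit only shows that the
	// factory may hold a Pubsub used to broadcast evicted keys.
	factory := cache.NewFactory(rds, cache.NewTinyLFU(10000), cache.WithPubSub(rds))
	defer factory.Close() // stops the eviction-subscription goroutine started by the factory

	c := factory.NewCache([]cache.Setting{
		{
			Prefix: "example",
			CacheAttributes: map[cache.Type]cache.Attribute{
				cache.SharedCacheType: {TTL: time.Hour},
				cache.LocalCacheType:  {TTL: 10 * time.Minute},
			},
		},
	})

	// Del clears the shared cache, then the local cache, then publishes an
	// evict event so other processes drop the same keys from their local caches.
	if err := c.Del(context.Background(), "example", "key-1"); err != nil {
		panic(err)
	}
}
```

Any process built from a factory wired with the same Pubsub drops the broadcast keys from its local cache, which keeps local copies from serving stale data after a delete.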

## Installation
```sh
2 changes: 1 addition & 1 deletion adapter.go
@@ -5,7 +5,7 @@ import (
"time"
)

// Adapter is the interface when communicating with shared/local caches.
// Adapter is the interface communicating with shared/local caches.
type Adapter interface {
	MGet(context context.Context, keys []string) ([]Value, error)
	MSet(context context.Context, keyVals map[string][]byte, ttl time.Duration, options ...MSetOptions) error
59 changes: 22 additions & 37 deletions cache.go
@@ -2,19 +2,21 @@ package cache

import (
"context"
"encoding/json"
"reflect"
"strings"
"time"

"golang.org/x/sync/singleflight"
)

type cache struct {
	fid string // id from factory
	configs map[string]*config
	onCacheHit CallbackFunc
	onCacheMiss CallbackFunc
	onCacheHit func(prefix string, key string, count int)
	onCacheMiss func(prefix string, key string, count int)
	onLCCostAdd func(key string, cost int)
	onLCCostEvict func(key string, cost int)
	pubsub Pubsub

	singleflight singleflight.Group
}
@@ -213,40 +215,6 @@ func (c *cache) MSet(ctx context.Context, prefix string, keyValues map[string]in
	return c.refill(ctx, cfg, m)
}

func customKey(delimiter string, components ...string) string {
	return strings.Join(components, delimiter)
}

func getCacheKey(pfx, key string) string {
	return customKey(delimiter, packageKey, pfx, key)
}

func getCacheKeys(pfx string, keys []string) []string {
	cacheKeys := make([]string, len(keys))
	for i, k := range keys {
		cacheKeys[i] = getCacheKey(pfx, k)
	}

	return cacheKeys
}

func getPrefixAndKey(cacheKey string) (string, string) {
	// cacheKey = packageKey + prefix + key
	idx := strings.Index(cacheKey, delimiter)
	if idx < 0 {
		return cacheKey, ""
	}

	// mixedKey = prefix + key
	mixedKey := cacheKey[idx+len(delimiter):]
	idx = strings.Index(mixedKey, delimiter)
	if idx < 0 {
		return mixedKey, ""
	}

	return mixedKey[:idx], mixedKey[idx+len(delimiter):]
}

func getKeyIndex(keys []string) map[string]int {
	keyIdx := map[string]int{}
	for i, k := range keys {
@@ -373,11 +341,28 @@ func (c *cache) del(ctx context.Context, cfg *config, keys ...string) error {
		if err := cfg.local.Del(ctx, keys...); err != nil {
			return err
		}

		c.publishEvictEvents(ctx, keys...)
	}

	return nil
}

func (c *cache) publishEvictEvents(ctx context.Context, keys ...string) error {
	if c.pubsub == nil {
		// do nothing
		return nil
	}

	event := evictEvent{ID: c.fid, Keys: keys}
	bs, err := json.Marshal(event)
	if err != nil {
		return err
	}

	return c.pubsub.Pub(ctx, evictTopic, bs)
}

type result struct {
	internalIdx map[int]int
	vals [][]byte
Binary file removed doc/img/del.png
Binary file removed doc/img/get.png
10 changes: 10 additions & 0 deletions event.go
@@ -0,0 +1,10 @@
package cache

var (
	evictTopic = customKey(topicDelim, packageKey, topicKey, "evict")
)

type evictEvent struct {
	ID string
	Keys []string
}
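
The message that travels over the eviction topic is simply this struct serialized with `encoding/json` (see `publishEvictEvents` in `cache.go` above). A small in-package sketch of the payload; the field values are invented, and the exact topic string depends on `topicDelim` / `topicKey`, which are defined outside this diff:

```go
package cache

import (
	"encoding/json"
	"fmt"
)

// printEvictEventPayload is an illustrative, in-package sketch of what a
// broadcast eviction payload looks like on the wire. Field values are made up.
func printEvictEventPayload() {
	event := evictEvent{
		ID:   "11111111-2222-3333-4444-555555555555", // the publishing factory's uuid, used to skip self-delivery
		Keys: []string{"ca:example:key-1"},           // full cache keys: packageKey + prefix + key
	}
	bs, _ := json.Marshal(event)
	fmt.Println(string(bs)) // {"ID":"11111111-2222-3333-4444-555555555555","Keys":["ca:example:key-1"]}
}
```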
2 changes: 1 addition & 1 deletion example_readthrough_test.go
@@ -48,7 +48,7 @@ func ExampleCache_GetByFunc() {
	// &{value2 2}
}

func ExampleService_Create_mGetter() {
func ExampleFactory_NewCache_mGetter() {
	tinyLfu := cache.NewTinyLFU(10000)
	rds := cache.NewRedis(redis.NewRing(&redis.RingOptions{
		Addrs: map[string]string{
104 changes: 78 additions & 26 deletions factory.go
@@ -1,13 +1,12 @@
package cache

import (
"context"
"encoding/json"
"errors"
)
"sync"

const (
packageKey = "ca"
delimiter = ":"
"github.com/google/uuid"
)

var (
@@ -37,31 +36,43 @@ func newFactory(sharedCache Adapter, localCache Adapter, options ...ServiceOptio
		unmarshalFunc = o.unmarshalFunc
	}

	return &factory{
	f := &factory{
		id: uuid.New().String(),
		sharedCache: sharedCache,
		localCache: localCache,
		pubsub: o.pubsub,
		marshal: marshalFunc,
		unmarshal: unmarshalFunc,
		onCacheHit: o.onCacheHit,
		onCacheMiss: o.onCacheMiss,
		onLCCostAdd: o.onLCCostAdd,
		onLCCostEvict: o.onLCCostEvict,
	}

	// subscribing if necessary
	f.subscribeEvictEvents(context.TODO())

	return f
}

type factory struct {
	sharedCache Adapter
	localCache Adapter
	pubsub Pubsub

	marshal MarshalFunc
	unmarshal UnmarshalFunc
	onCacheHit CallbackFunc
	onCacheMiss CallbackFunc
	onLCCostAdd CallbackFunc
	onLCCostEvict CallbackFunc
	onCacheHit func(prefix string, key string, count int)
	onCacheMiss func(prefix string, key string, count int)
	onLCCostAdd func(prefix string, key string, cost int)
	onLCCostEvict func(prefix string, key string, cost int)

	id string
	closeOnce sync.Once
	wg sync.WaitGroup
}

func (s *factory) NewCache(settings []Setting) Cache {
func (f *factory) NewCache(settings []Setting) Cache {
	m := map[string]*config{}
	for _, setting := range settings {
		// check prefix
@@ -75,8 +86,8 @@ func (s *factory) NewCache(settings []Setting) Cache {

		cfg := &config{
			mGetter: setting.MGetter,
			marshal: s.marshal,
			unmarshal: s.unmarshal,
			marshal: f.marshal,
			unmarshal: f.unmarshal,
		}

		// need to specify marshalFunc and unmarshalFunc at the same time
@@ -95,10 +106,10 @@

		for typ, attr := range setting.CacheAttributes {
			if typ == SharedCacheType {
				cfg.shared = s.sharedCache
				cfg.shared = f.sharedCache
				cfg.sharedTTL = attr.TTL
			} else if typ == LocalCacheType {
				cfg.local = s.localCache
				cfg.local = f.localCache
				cfg.localTTL = attr.TTL
			}
		}
@@ -112,32 +123,73 @@
	}

	return &cache{
		fid: f.id,
		configs: m,
		onCacheHit: func(prefix string, key string, value interface{}) {
		pubsub: f.pubsub,
		onCacheHit: func(prefix string, key string, count int) {
			// trigger the callback on cache hit if necessary
			if s.onCacheHit != nil {
				s.onCacheHit(prefix, key, value)
			if f.onCacheHit != nil {
				f.onCacheHit(prefix, key, count)
			}
		},
		onCacheMiss: func(prefix string, key string, value interface{}) {
		onCacheMiss: func(prefix string, key string, count int) {
			// trigger the callback on cache miss if necessary
			if s.onCacheMiss != nil {
				s.onCacheMiss(prefix, key, value)
			if f.onCacheMiss != nil {
				f.onCacheMiss(prefix, key, count)
			}
		},
		onLCCostAdd: func(cKey string, value int) {
		onLCCostAdd: func(cKey string, cost int) {
			// trigger the callback on local cache added if necessary
			if s.onLCCostAdd != nil {
			if f.onLCCostAdd != nil {
				pfx, key := getPrefixAndKey(cKey)
				s.onLCCostAdd(pfx, key, value)
				f.onLCCostAdd(pfx, key, cost)
			}
		},
		onLCCostEvict: func(cKey string, value int) {
		onLCCostEvict: func(cKey string, cost int) {
			// trigger the callback on local cache evicted if necessary
			if s.onLCCostEvict != nil {
			if f.onLCCostEvict != nil {
				pfx, key := getPrefixAndKey(cKey)
				s.onLCCostEvict(pfx, key, value)
				f.onLCCostEvict(pfx, key, cost)
			}
		},
	}
}

func (f *factory) Close() {
	f.closeOnce.Do(func() {
		// stop subscribing
		f.pubsub.Close()
		// wait for all goroutines to stop
		f.wg.Wait()
	})
}

func (f *factory) subscribeEvictEvents(ctx context.Context) {
	if f.pubsub == nil || f.localCache == nil {
		// do nothing
		return
	}

	f.wg.Add(1)
	go func() {
		defer f.wg.Done()

		// listen for key-eviction events
		for mess := range f.pubsub.Sub(ctx, evictTopic) {
			event := evictEvent{}
			err := json.Unmarshal(mess.Content(), &event)
			if err != nil {
				// TODO: forward error messages outside
				continue
			}

			// skip events published by this factory itself
			if event.ID == f.id {
				continue
			}

			// evict the keys from the local cache
			f.localCache.Del(ctx, event.Keys...)
		}
	}()
}
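
The `Pubsub` interface itself is not part of this diff, but its shape can be inferred from the calls above: a `Pub(ctx, topic, payload)`, a `Sub(ctx, topic)` that yields messages exposing `Content() []byte`, and a `Close()`. The toy in-process broker below exists only to reason about that subscription loop; the inferred method set and the library's real implementation may differ.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Message / Pubsub shapes are inferred from the calls in factory.go and
// cache.go; the library's real definitions may differ.
type Message interface{ Content() []byte }

type memMessage struct{ body []byte }

func (m memMessage) Content() []byte { return m.body }

// memPubsub is a toy in-process broker: Pub fans a payload out to every
// channel registered for the topic, Close ends all subscriptions.
type memPubsub struct {
	mu   sync.Mutex
	subs map[string][]chan Message
}

func newMemPubsub() *memPubsub { return &memPubsub{subs: map[string][]chan Message{}} }

func (p *memPubsub) Pub(_ context.Context, topic string, payload []byte) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	for _, ch := range p.subs[topic] {
		ch <- memMessage{body: payload} // may block on a slow subscriber; acceptable for a sketch
	}
	return nil
}

func (p *memPubsub) Sub(_ context.Context, topic string) <-chan Message {
	p.mu.Lock()
	defer p.mu.Unlock()
	ch := make(chan Message, 16)
	p.subs[topic] = append(p.subs[topic], ch)
	return ch
}

func (p *memPubsub) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()
	for _, chs := range p.subs {
		for _, ch := range chs {
			close(ch) // ends the `for mess := range ...` loop in subscribeEvictEvents
		}
	}
	p.subs = map[string][]chan Message{}
}

func main() {
	ps := newMemPubsub()
	msgs := ps.Sub(context.Background(), "evict")
	_ = ps.Pub(context.Background(), "evict", []byte(`{"ID":"other-factory","Keys":["ca:example:key-1"]}`))
	fmt.Println(string((<-msgs).Content()))
	ps.Close()
}
```

With two factories sharing one broker, a `Del` on the first publishes the keys, and the second factory's loop above unmarshals the event, sees a foreign `ID`, and calls `localCache.Del` on the same keys.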
