Skip to content

Commit

Permalink
refactor event interface (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
viney-shih committed May 30, 2022
1 parent 453f7ed commit dad4acb
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 68 deletions.
47 changes: 26 additions & 21 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,19 @@ package cache

import (
"context"
"encoding/json"
"reflect"
"time"

"golang.org/x/sync/singleflight"
)

type cache struct {
fid string // id from factory
configs map[string]*config
onCacheHit func(prefix string, key string, count int)
onCacheMiss func(prefix string, key string, count int)
onLCCostAdd func(key string, cost int)
onLCCostEvict func(key string, cost int)
pubsub Pubsub
mb *messageBroker

singleflight singleflight.Group
}
Expand Down Expand Up @@ -303,7 +301,7 @@ func (c *cache) load(ctx context.Context, cfg *config, keys ...string) ([]Value,
WithOnCostEvictFunc(c.onLCCostEvict),
)

c.evictRemoteKeyBytes(ctx, m)
c.evictRemoteKeyMap(ctx, m)
}
}

Expand All @@ -328,7 +326,7 @@ func (c *cache) refill(ctx context.Context, cfg *config, keyBytes map[string][]b
return nil
}

c.evictRemoteKeyBytes(ctx, keyBytes)
c.evictRemoteKeyMap(ctx, keyBytes)
}

return nil
Expand All @@ -346,34 +344,41 @@ func (c *cache) del(ctx context.Context, cfg *config, keys ...string) error {
return err
}

c.publishEvictEvents(ctx, keys...)
c.evictRemoteKeys(ctx, keys...)
}

return nil
}

func (c *cache) evictRemoteKeyBytes(ctx context.Context, keyBytes map[string][]byte) error {
keys := []string{}
for k := range keyBytes {
keys = append(keys, k)
func (c *cache) evictRemoteKeyMap(ctx context.Context, keyM map[string][]byte) error {
if !c.mb.registered() {
// no pubsub, do nothing
return nil
}

keys := make([]string, len(keyM))
i := 0
for k := range keyM {
keys[i] = k
i++
}

return c.publishEvictEvents(ctx, keys...)
return c.evictRemoteKeys(ctx, keys...)
}

func (c *cache) publishEvictEvents(ctx context.Context, keys ...string) error {
if c.pubsub == nil {
// do nothing
func (c *cache) evictRemoteKeys(ctx context.Context, keys ...string) error {
if !c.mb.registered() {
// no pubsub, do nothing
return nil
}

event := evictEvent{ID: c.fid, Keys: keys}
bs, err := json.Marshal(event)
if err != nil {
return err
}

return c.pubsub.Pub(ctx, evictTopic, bs)
return c.mb.send(ctx, event{
Type: EventTypeEvict,
Body: eventBody{
FID: c.mb.fid,
Keys: keys,
},
})
}

type result struct {
Expand Down
132 changes: 129 additions & 3 deletions event.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,136 @@
//go:generate go-enum -f=$GOFILE --nocase

package cache

import (
"context"
"encoding/json"
"errors"
"sync"
)

var (
evictTopic = customKey(topicDelim, packageKey, topicKey, "evict")
// ErrSelfEvent indicates event triggered by itself.
ErrSelfEvent = errors.New("event triggered by itself")
)

// EventType is an enumeration of events used to communicate with each other via Pubsub.
/*
ENUM(
None // Not registered Event by default.
Evict // Evict presents eviction event.
)
*/
type EventType int32

var regTopicEventMap map[string]EventType

func init() {
regTopicEventMap = map[string]EventType{}

for typ := range _EventTypeMap {
if typ == EventTypeNone {
continue
}

regTopicEventMap[typ.Topic()] = typ
}
}

// Topic generates the topic for specified event.
func (x EventType) Topic() string {
return customKey(topicDelim, packageKey, topicKey, x.String())
}

type event struct {
Type EventType
Body eventBody
}

type evictEvent struct {
ID string
type eventBody struct {
FID string
Keys []string
}

type messageBroker struct {
pubsub Pubsub
fid string
wg sync.WaitGroup
}

func newMessageBroker(fid string, pb Pubsub) *messageBroker {
return &messageBroker{
fid: fid,
pubsub: pb,
}
}

func (mb *messageBroker) registered() bool {
return mb.pubsub != nil
}

func (mb *messageBroker) close() {
if !mb.registered() {
return
}

// close s
mb.pubsub.Close()
mb.wg.Wait()
}

func (mb *messageBroker) send(ctx context.Context, e event) error {
if !mb.registered() {
return nil
}

bs, err := json.Marshal(e.Body)
if err != nil {
return err
}

return mb.pubsub.Pub(ctx, e.Type.Topic(), bs)
}

func (mb *messageBroker) listen(
ctx context.Context, types []EventType, cb func(context.Context, *event, error),
) {
if !mb.registered() {
return
}

if len(types) == 0 {
return
}

topics := make([]string, len(types))
for i := 0; i < len(types); i++ {
topics[i] = types[i].Topic()
}

mb.wg.Add(1)
go func() {
defer mb.wg.Done()

for mess := range mb.pubsub.Sub(ctx, topics...) {
typ, ok := regTopicEventMap[mess.Topic()]
if !ok {
cb(ctx, nil, errors.New("no such topic registered"))
continue
}

e := event{Type: typ}
if err := json.Unmarshal(mess.Content(), &e.Body); err != nil {
cb(ctx, nil, err)
continue
}

if e.Body.FID == mb.fid {
cb(ctx, &e, ErrSelfEvent)
continue
}

cb(ctx, &e, nil)
}
}()
}
55 changes: 55 additions & 0 deletions event_enum.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit dad4acb

Please sign in to comment.