Skip to content

Commit

Permalink
Merge pull request #123 from PierreF/cache-jitter
Browse files Browse the repository at this point in the history
Cache jitter
  • Loading branch information
PierreF committed Mar 7, 2021
2 parents 76e798c + a3a27a1 commit 82ca3fc
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 20 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,15 @@ auth_opt_cache_refresh true
auth_opt_auth_cache_seconds 30
auth_opt_acl_cache_seconds 30
auth_opt_auth_jitter_seconds 3
auth_opt_acl_jitter_seconds 3
```

`auth_jitter_seconds` and `acl_jitter_seconds` options allow to randomize cache expiration time by a given offset
The value used for expiring a cache record would then be `cache_seconds` +/- `jitter_seconds`. With above values (30 seconds for cache and 3 seconds for jitter), effective expiration would yield any value between 27 and 33 seconds.
Setting a `jitter` value is useful to reduce lookups storms that could occur every `auth/acl_cache_seconds` if lots of clients connected at the same time, e.g. after a server restart when all clients may reconnect immediately creating lots of entries expiring at the same time.
You may omit or set jitter options to 0 to disable this feature.

If `cache_reset` is set to false or omitted, cache won't be flushed upon service start.

When using Redis, the following defaults will be used if no values are given. Also, these are the available options for cache:
Expand Down
44 changes: 35 additions & 9 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
b64 "encoding/base64"
"fmt"
"hash"
"math/rand"
"strings"
"time"

Expand All @@ -19,6 +20,8 @@ import (
type redisStore struct {
authExpiration time.Duration
aclExpiration time.Duration
authJitter time.Duration
aclJitter time.Duration
refreshExpiration bool
client bes.RedisClient
h hash.Hash
Expand All @@ -27,6 +30,8 @@ type redisStore struct {
type goStore struct {
authExpiration time.Duration
aclExpiration time.Duration
authJitter time.Duration
aclJitter time.Duration
refreshExpiration bool
client *goCache.Cache
h hash.Hash
Expand All @@ -46,20 +51,22 @@ type Store interface {
}

// NewGoStore initializes a cache using go-cache as the store.
func NewGoStore(authExpiration, aclExpiration time.Duration, refreshExpiration bool) *goStore {
func NewGoStore(authExpiration, aclExpiration, authJitter, aclJitter time.Duration, refreshExpiration bool) *goStore {
// TODO: support hydrating the cache to retain previous values.

return &goStore{
authExpiration: authExpiration,
aclExpiration: aclExpiration,
authJitter: authJitter,
aclJitter: aclJitter,
refreshExpiration: refreshExpiration,
client: goCache.New(time.Second*defaultExpiration, time.Second*(defaultExpiration*2)),
h: sha1.New(),
}
}

// NewSingleRedisStore initializes a cache using a single Redis instance as the store.
func NewSingleRedisStore(host, port, password string, db int, authExpiration, aclExpiration time.Duration, refreshExpiration bool) *redisStore {
func NewSingleRedisStore(host, port, password string, db int, authExpiration, aclExpiration, authJitter, aclJitter time.Duration, refreshExpiration bool) *redisStore {
addr := fmt.Sprintf("%s:%s", host, port)
redisClient := goredis.NewClient(&goredis.Options{
Addr: addr,
Expand All @@ -70,14 +77,16 @@ func NewSingleRedisStore(host, port, password string, db int, authExpiration, ac
return &redisStore{
authExpiration: authExpiration,
aclExpiration: aclExpiration,
authJitter: authJitter,
aclJitter: aclJitter,
refreshExpiration: refreshExpiration,
client: bes.SingleRedisClient{redisClient},
h: sha1.New(),
}
}

// NewSingleRedisStore initializes a cache using a Redis Cluster as the store.
func NewRedisClusterStore(password string, addresses []string, authExpiration, aclExpiration time.Duration, refreshExpiration bool) *redisStore {
func NewRedisClusterStore(password string, addresses []string, authExpiration, aclExpiration, authJitter, aclJitter time.Duration, refreshExpiration bool) *redisStore {
clusterClient := goredis.NewClusterClient(
&goredis.ClusterOptions{
Addrs: addresses,
Expand All @@ -87,6 +96,8 @@ func NewRedisClusterStore(password string, addresses []string, authExpiration, a
return &redisStore{
authExpiration: authExpiration,
aclExpiration: aclExpiration,
authJitter: authJitter,
aclJitter: aclJitter,
refreshExpiration: refreshExpiration,
client: clusterClient,
h: sha1.New(),
Expand Down Expand Up @@ -115,6 +126,21 @@ func isMovedError(err error) bool {
return false
}

// Return an expiration duration with a jitter added, i.e the actual expiration is in the range [expiration - jitter, expiration + jitter].
// If no expiration was set or jitter > expiration, then any negative value will yield 0 instead.
func expirationWithJitter(expiration, jitter time.Duration) time.Duration {
if jitter == 0 {
return expiration
}

result := expiration + time.Duration(rand.Int63n(int64(jitter)*2)-int64(jitter))
if result < 0 {
return 0
}

return result
}

// Connect flushes the cache if reset is set.
func (s *goStore) Connect(ctx context.Context, reset bool) bool {
log.Infoln("started go-cache")
Expand Down Expand Up @@ -153,13 +179,13 @@ func (s *redisStore) Close() {
// CheckAuthRecord checks if the username/password pair is present in the cache. Return if it's present and, if so, if it was granted privileges
func (s *goStore) CheckAuthRecord(ctx context.Context, username, password string) (bool, bool) {
record := toAuthRecord(username, password, s.h)
return s.checkRecord(ctx, record, s.authExpiration)
return s.checkRecord(ctx, record, expirationWithJitter(s.authExpiration, s.authJitter))
}

//CheckAclCache checks if the username/topic/clientid/acc mix is present in the cache. Return if it's present and, if so, if it was granted privileges.
func (s *goStore) CheckACLRecord(ctx context.Context, username, topic, clientid string, acc int) (bool, bool) {
record := toACLRecord(username, topic, clientid, acc, s.h)
return s.checkRecord(ctx, record, s.aclExpiration)
return s.checkRecord(ctx, record, expirationWithJitter(s.aclExpiration, s.aclJitter))
}

func (s *goStore) checkRecord(ctx context.Context, record string, expirationTime time.Duration) (bool, bool) {
Expand Down Expand Up @@ -239,29 +265,29 @@ func (s *redisStore) getAndRefresh(ctx context.Context, record string, expiratio
// SetAuthRecord sets a pair, granted option and expiration time.
func (s *goStore) SetAuthRecord(ctx context.Context, username, password string, granted string) error {
record := toAuthRecord(username, password, s.h)
s.client.Set(record, granted, s.authExpiration)
s.client.Set(record, granted, expirationWithJitter(s.authExpiration, s.authJitter))

return nil
}

//SetAclCache sets a mix, granted option and expiration time.
func (s *goStore) SetACLRecord(ctx context.Context, username, topic, clientid string, acc int, granted string) error {
record := toACLRecord(username, topic, clientid, acc, s.h)
s.client.Set(record, granted, s.aclExpiration)
s.client.Set(record, granted, expirationWithJitter(s.aclExpiration, s.aclJitter))

return nil
}

// SetAuthRecord sets a pair, granted option and expiration time.
func (s *redisStore) SetAuthRecord(ctx context.Context, username, password string, granted string) error {
record := toAuthRecord(username, password, s.h)
return s.setRecord(ctx, record, granted, s.authExpiration)
return s.setRecord(ctx, record, granted, expirationWithJitter(s.authExpiration, s.authJitter))
}

//SetAclCache sets a mix, granted option and expiration time.
func (s *redisStore) SetACLRecord(ctx context.Context, username, topic, clientid string, acc int, granted string) error {
record := toACLRecord(username, topic, clientid, acc, s.h)
return s.setRecord(ctx, record, granted, s.aclExpiration)
return s.setRecord(ctx, record, granted, expirationWithJitter(s.aclExpiration, s.aclJitter))
}

func (s *redisStore) setRecord(ctx context.Context, record, granted string, expirationTime time.Duration) error {
Expand Down
64 changes: 53 additions & 11 deletions cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,51 @@ import (
"github.com/stretchr/testify/assert"
)

func TestExpirationWithJitter(t *testing.T) {
/* Since expirationWithJitter randomizes the expirtaion time, do test
multiple times and check that result is within expected range
*/
for n := 0; n < 1000; n++ {
expiration := 100 * time.Millisecond

jitter := 10 * time.Millisecond

got := expirationWithJitter(expiration, jitter)
assert.True(t, expiration-jitter <= got)
assert.True(t, got <= expiration+jitter)

jitter = 150 * time.Millisecond

got = expirationWithJitter(expiration, jitter)
assert.True(t, 0 <= got)
assert.True(t, got <= expiration+jitter)
}
}

func TestExpirationWithoutJitter(t *testing.T) {
// jitter to 0 disable randomization
jitter := 0 * time.Millisecond
expiration := 100 * time.Millisecond

got := expirationWithJitter(expiration, jitter)
assert.Equal(t, expiration, got)
}

func TestGoStore(t *testing.T) {
authExpiration := 100 * time.Millisecond
aclExpiration := 100 * time.Millisecond
authJitter := 10 * time.Millisecond
aclJitter := 10 * time.Millisecond
refreshExpiration := false

store := NewGoStore(authExpiration, aclExpiration, refreshExpiration)
store := NewGoStore(authExpiration, aclExpiration, authJitter, aclJitter, refreshExpiration)

ctx := context.Background()

assert.Equal(t, authExpiration, store.authExpiration)
assert.Equal(t, aclExpiration, store.aclExpiration)
assert.Equal(t, authJitter, store.authJitter)
assert.Equal(t, aclJitter, store.aclJitter)

assert.True(t, store.Connect(ctx, false))

Expand Down Expand Up @@ -94,7 +128,7 @@ func TestGoStore(t *testing.T) {
assert.False(t, granted)

// Check expiration is refreshed.
store = NewGoStore(authExpiration, aclExpiration, true)
store = NewGoStore(authExpiration, aclExpiration, authExpiration, aclJitter, true)

// Test granted access.
err = store.SetAuthRecord(ctx, username, password, "true")
Expand All @@ -114,7 +148,7 @@ func TestGoStore(t *testing.T) {
assert.True(t, granted)

// Expiration should have been refreshed.
time.Sleep(55 * time.Millisecond)
time.Sleep(65 * time.Millisecond)

present, granted = store.CheckAuthRecord(ctx, username, password)

Expand All @@ -125,14 +159,18 @@ func TestGoStore(t *testing.T) {
func TestRedisSingleStore(t *testing.T) {
authExpiration := 1000 * time.Millisecond
aclExpiration := 1000 * time.Millisecond
authJitter := 100 * time.Millisecond
aclJitter := 100 * time.Millisecond
refreshExpiration := false

store := NewSingleRedisStore("localhost", "6379", "", 3, authExpiration, aclExpiration, refreshExpiration)
store := NewSingleRedisStore("localhost", "6379", "", 3, authExpiration, aclExpiration, authJitter, aclJitter, refreshExpiration)

ctx := context.Background()

assert.Equal(t, authExpiration, store.authExpiration)
assert.Equal(t, aclExpiration, store.aclExpiration)
assert.Equal(t, authJitter, store.authJitter)
assert.Equal(t, aclJitter, store.aclJitter)

assert.True(t, store.Connect(ctx, false))

Expand All @@ -152,7 +190,7 @@ func TestRedisSingleStore(t *testing.T) {

// Wait for it to expire. For Redis we do this just once since the package used (or Redis itself, not sure) doesn't
// support less than 1s expiration times: "specified duration is 100ms, but minimal supported value is 1s"
time.Sleep(1050 * time.Millisecond)
time.Sleep(1150 * time.Millisecond)

present, granted = store.CheckAuthRecord(ctx, username, password)

Expand Down Expand Up @@ -185,7 +223,7 @@ func TestRedisSingleStore(t *testing.T) {
assert.False(t, granted)

// Check expiration is refreshed.
store = NewSingleRedisStore("localhost", "6379", "", 3, authExpiration, aclExpiration, true)
store = NewSingleRedisStore("localhost", "6379", "", 3, authExpiration, aclExpiration, authJitter, aclJitter, true)

// Test granted access.
err = store.SetAuthRecord(ctx, username, password, "true")
Expand All @@ -205,7 +243,7 @@ func TestRedisSingleStore(t *testing.T) {
assert.True(t, granted)

// Expiration should have been refreshed.
time.Sleep(700 * time.Millisecond)
time.Sleep(800 * time.Millisecond)

present, granted = store.CheckAuthRecord(ctx, username, password)

Expand All @@ -216,15 +254,19 @@ func TestRedisSingleStore(t *testing.T) {
func TestRedisClusterStore(t *testing.T) {
authExpiration := 1000 * time.Millisecond
aclExpiration := 1000 * time.Millisecond
authJitter := 100 * time.Millisecond
aclJitter := 100 * time.Millisecond
refreshExpiration := false

addresses := []string{"localhost:7000", "localhost:7001", "localhost:7002"}
store := NewRedisClusterStore("", addresses, authExpiration, aclExpiration, refreshExpiration)
store := NewRedisClusterStore("", addresses, authExpiration, aclExpiration, authJitter, aclJitter, refreshExpiration)

ctx := context.Background()

assert.Equal(t, authExpiration, store.authExpiration)
assert.Equal(t, aclExpiration, store.aclExpiration)
assert.Equal(t, authJitter, store.authJitter)
assert.Equal(t, aclJitter, store.aclJitter)

assert.True(t, store.Connect(ctx, false))

Expand All @@ -244,7 +286,7 @@ func TestRedisClusterStore(t *testing.T) {

// Wait for it to expire. For Redis we do this just once since the package used (or Redis itself, not sure) doesn't
// support less than 1s expiration times: "specified duration is 100ms, but minimal supported value is 1s"
time.Sleep(1050 * time.Millisecond)
time.Sleep(1150 * time.Millisecond)

present, granted = store.CheckAuthRecord(ctx, username, password)

Expand Down Expand Up @@ -276,7 +318,7 @@ func TestRedisClusterStore(t *testing.T) {
assert.True(t, present)
assert.False(t, granted)

store = NewRedisClusterStore("", addresses, authExpiration, aclExpiration, true)
store = NewRedisClusterStore("", addresses, authExpiration, aclExpiration, authJitter, aclJitter, true)

// Test granted access.
err = store.SetAuthRecord(ctx, username, password, "true")
Expand All @@ -296,7 +338,7 @@ func TestRedisClusterStore(t *testing.T) {
assert.True(t, granted)

// Expiration should have been refreshed.
time.Sleep(700 * time.Millisecond)
time.Sleep(800 * time.Millisecond)

present, granted = store.CheckAuthRecord(ctx, username, password)

Expand Down
Loading

0 comments on commit 82ca3fc

Please sign in to comment.