Skip to content

Commit

Permalink
Move cache to its own package and add support for go-cache.
Browse files Browse the repository at this point in the history
  • Loading branch information
iegomez committed Jun 27, 2020
1 parent 1371452 commit e72e522
Show file tree
Hide file tree
Showing 6 changed files with 446 additions and 299 deletions.
18 changes: 12 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,34 +242,40 @@ auth_opt_backends files, postgres, jwt

#### Cache

Set cache option to true to use redis cache (defaults to false when missing). Also, set `cache_reset` to flush the redis DB on mosquitto startup:
There are 2 types of cache supported: an in memory one using [go-cache](https://github.com/patrickmn/go-cache), or a Redis backed one.
Set `cache` option to true to use a cache (defaults to false when missing) and `cache_type` to set the type of the cache. By default the plugin will use `go-cache` unless explicitly told to use Redis.
Set `cache_reset` to flush the cache on mosquitto startup (**hydrating `go-cache` on startup is not yet supported**).

Finally, set expiration times in seconds for authentication (`auth`) and authorization (`acl`) caches:

```
auth_opt_cache true
auth_opt_cache_type redis
auth_opt_cache_reset true
auth_opt_auth_cache_seconds 30
auth_opt_acl_cache_seconds 30
```

If `cache_reset` is set to false or omitted, cache won't be flushed upon service start.

Redis will use the following defaults if no values are given. Also, these are the available options for cache:
When using Redis, the following defaults will be used if no values are given. Also, these are the available options for cache:

```
auth_opt_cache_host localhost
auth_opt_cache_port 6379
auth_opt_cache_password pwd
auth_opt_cache_db 3
auth_opt_auth_cache_seconds 30
auth_opt_acl_cache_seconds 30
```

If you want to use a Redis cluster as your cache, you need to set `auth_opt_cache_mode` to `cluster` and provide the different addresses as a list of comma separated `host:port` strings with the `auth_opt_cache_addresses` options:
If you want to use a Redis cluster as your cache, you may omit previous Redis options and instead need to set `auth_opt_cache_mode` to `cluster` and provide the different addresses as a list of comma separated `host:port` strings with the `auth_opt_cache_addresses` options:

```
auth_opt_cache_mode cluster
auth_opt_cache_addresses host1:port1,host2:port2,host3:port3
```

Notice that if `cache_mode` is not provided or isn't equal to `cluster`, cache will default to use a single instance with the common options. If instead the mode is correctly set to `cluster` but no addresses are given, the plugin will default to not use a cache.
Notice that if `cache_mode` is not provided or isn't equal to `cluster`, cache will default to use a single instance with the common options. If instead the mode is set to `cluster` but no addresses are given, the plugin will default to not use a cache.

#### Logging

Expand Down
10 changes: 5 additions & 5 deletions backends/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func NewRedis(authOpts map[string]string, logLevel log.Level) (Redis, error) {
Password: redis.Password,
DB: int(redis.DB),
})
redis.conn = SingleRedisClient{redisClient}
redis.conn = &SingleRedisClient{redisClient}
}

for {
Expand All @@ -135,7 +135,7 @@ func NewRedis(authOpts map[string]string, logLevel log.Level) (Redis, error) {
}

// Checks if an error was caused by a moved record in a cluster.
func IsMovedError(err error) bool {
func isMovedError(err error) bool {
s := err.Error()
if strings.HasPrefix(s, "MOVED ") || strings.HasPrefix(s, "ASK ") {
return true
Expand All @@ -152,7 +152,7 @@ func (o Redis) GetUser(username, password, _ string) bool {
}

//If using Redis Cluster, reload state and attempt once more.
if IsMovedError(err) {
if isMovedError(err) {
err = o.conn.ReloadState(o.ctx)
if err != nil {
log.Debugf("redis reload state error: %s", err)
Expand Down Expand Up @@ -194,7 +194,7 @@ func (o Redis) GetSuperuser(username string) bool {
}

//If using Redis Cluster, reload state and attempt once more.
if IsMovedError(err) {
if isMovedError(err) {
err = o.conn.ReloadState(o.ctx)
if err != nil {
log.Debugf("redis reload state error: %s", err)
Expand Down Expand Up @@ -231,7 +231,7 @@ func (o Redis) CheckAcl(username, topic, clientid string, acc int32) bool {
}

//If using Redis Cluster, reload state and attempt once more.
if IsMovedError(err) {
if isMovedError(err) {
err = o.conn.ReloadState(o.ctx)
if err != nil {
log.Debugf("redis reload state error: %s", err)
Expand Down
271 changes: 271 additions & 0 deletions cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
package cache

import (
"context"
b64 "encoding/base64"
"fmt"
"strings"
"time"

goredis "github.com/go-redis/redis/v8"
bes "github.com/iegomez/mosquitto-go-auth/backends"
goCache "github.com/patrickmn/go-cache"
log "github.com/sirupsen/logrus"
)

// redisCache stores necessary values for Redis cache
type redisStore struct {
authExpiration int64
aclExpiration int64
client bes.RedisClient
}

type goStore struct {
authExpiration int64
aclExpiration int64
client *goCache.Cache
}

const (
defaultExpiration = 30
)

type Store interface {
SetAuthRecord(ctx context.Context, username, password, granted string) error
CheckAuthRecord(ctx context.Context, username, password string) (bool, bool)
SetACLRecord(ctx context.Context, username, topic, clientid string, acc int, granted string) error
CheckACLRecord(ctx context.Context, username, topic, clientid string, acc int) (bool, bool)
Connect(ctx context.Context, reset bool) bool
Close()
}

// NewGoStore initializes a cache using go-cache as the store.
func NewGoStore(authExpiration, aclExpiration int64) *goStore {
// TODO: support hydrating the cache to retain previous values.

return &goStore{
authExpiration: authExpiration,
aclExpiration: aclExpiration,
client: goCache.New(time.Second*defaultExpiration, time.Second*(defaultExpiration*2)),
}
}

// NewSingleRedisStore initializes a cache using a single Redis instance as the store.
func NewSingleRedisStore(host, port, password string, db int, authExpiration, aclExpiration int64) *redisStore {
addr := fmt.Sprintf("%s:%s", host, port)
redisClient := goredis.NewClient(&goredis.Options{
Addr: addr,
Password: password, // no password set
DB: db, // use default db
})
//If cache is on, try to start redis.
return &redisStore{
authExpiration: authExpiration,
aclExpiration: aclExpiration,
client: bes.SingleRedisClient{redisClient},
}
}

// NewSingleRedisStore initializes a cache using a Redis Cluster as the store.
func NewRedisClusterStore(password string, addresses []string, authExpiration, aclExpiration int64) *redisStore {
clusterClient := goredis.NewClusterClient(
&goredis.ClusterOptions{
Addrs: addresses,
Password: password,
})

return &redisStore{
authExpiration: authExpiration,
aclExpiration: aclExpiration,
client: clusterClient,
}
}

func toAuthRecord(username, password string) string {
return b64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("auth-%s-%s", username, password)))
}

func toACLRecord(username, topic, clientid string, acc int) string {
return b64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("acl-%s-%s-%s-%d", username, topic, clientid, acc)))
}

// Checks if an error was caused by a moved record in a Redis Cluster.
func isMovedError(err error) bool {
s := err.Error()
if strings.HasPrefix(s, "MOVED ") || strings.HasPrefix(s, "ASK ") {
return true
}

return false
}

// Connect flushes the cache if reset is set.
func (o *goStore) Connect(ctx context.Context, reset bool) bool {
log.Infoln("started go-cache")
if reset {
o.client.Flush()
log.Infoln("flushed go-cache")
}
return true
}

// Connect pings Redis and flushes the cache if reset is set.
func (o *redisStore) Connect(ctx context.Context, reset bool) bool {
_, err := o.client.Ping(ctx).Result()
if err != nil {
log.Errorf("couldn't start redis. error: %s", err)
return false
} else {
log.Infoln("started redis cachet")
//Check if cache must be reset
if reset {
o.client.FlushDB(ctx)
log.Infoln("flushed redis cache")
}
}
return true
}

func (o *goStore) Close() {
//TODO: support serializing cache for re hydration.
}

func (o *redisStore) Close() {
o.client.Close()
}

// CheckAuthRecord checks if the username/password pair is present in the cache. Return if it's present and, if so, if it was granted privileges
func (o *goStore) CheckAuthRecord(ctx context.Context, username, password string) (bool, bool) {
record := toAuthRecord(username, password)
return o.checkRecord(ctx, record, o.authExpiration)
}

//CheckAclCache checks if the username/topic/clientid/acc mix is present in the cache. Return if it's present and, if so, if it was granted privileges.
func (o *goStore) CheckACLRecord(ctx context.Context, username, topic, clientid string, acc int) (bool, bool) {
record := toACLRecord(username, topic, clientid, acc)
return o.checkRecord(ctx, record, o.aclExpiration)
}

func (o *goStore) checkRecord(ctx context.Context, record string, expirationTime int64) (bool, bool) {
granted := false
v, present := o.client.Get(record)

if present {
value, ok := v.(string)
if ok && value == "true" {
granted = true
}

o.client.Set(record, value, time.Duration(expirationTime))
}
return present, granted
}

// CheckAuthRecord checks if the username/password pair is present in the cache. Return if it's present and, if so, if it was granted privileges
func (o *redisStore) CheckAuthRecord(ctx context.Context, username, password string) (bool, bool) {
record := toAuthRecord(username, password)
return o.checkRecord(ctx, record, o.authExpiration)
}

//CheckAclCache checks if the username/topic/clientid/acc mix is present in the cache. Return if it's present and, if so, if it was granted privileges.
func (o *redisStore) CheckACLRecord(ctx context.Context, username, topic, clientid string, acc int) (bool, bool) {
record := toACLRecord(username, topic, clientid, acc)
return o.checkRecord(ctx, record, o.aclExpiration)
}

func (o *redisStore) checkRecord(ctx context.Context, record string, expirationTime int64) (bool, bool) {

present, granted, err := o.getAndRefresh(ctx, record, expirationTime)
if err == nil {
return present, granted
}

if isMovedError(err) {
err = o.client.ReloadState(ctx)
// This should not happen, ever!
if err == bes.SingleClientError {
return false, false
}

//Retry once.
present, granted, err = o.getAndRefresh(ctx, record, expirationTime)
}

if err != nil {
log.Debugf("set cache error: %s", err)
}

return present, granted
}

func (o *redisStore) getAndRefresh(ctx context.Context, record string, expirationTime int64) (bool, bool, error) {
val, err := o.client.Get(ctx, record).Result()
if err != nil {
return false, false, err
}

//refresh expiration
_, err = o.client.Expire(ctx, record, time.Duration(expirationTime)*time.Second).Result()
if err != nil {
return false, false, err
}

if val == "true" {
return true, true, nil
}

return true, false, nil
}

// SetAuthRecord sets a pair, granted option and expiration time.
func (o *goStore) SetAuthRecord(ctx context.Context, username, password string, granted string) error {
record := toAuthRecord(username, password)
o.client.Set(record, granted, time.Duration(o.authExpiration))

return nil
}

//SetAclCache sets a mix, granted option and expiration time.
func (o *goStore) SetACLRecord(ctx context.Context, username, topic, clientid string, acc int, granted string) error {
record := toACLRecord(username, topic, clientid, acc)
o.client.Set(record, granted, time.Duration(o.authExpiration))

return nil
}

// SetAuthRecord sets a pair, granted option and expiration time.
func (o *redisStore) SetAuthRecord(ctx context.Context, username, password string, granted string) error {
record := toAuthRecord(username, password)
return o.setRecord(ctx, record, granted, o.authExpiration)
}

//SetAclCache sets a mix, granted option and expiration time.
func (o *redisStore) SetACLRecord(ctx context.Context, username, topic, clientid string, acc int, granted string) error {
record := toACLRecord(username, topic, clientid, acc)
return o.setRecord(ctx, record, granted, o.authExpiration)
}

func (o *redisStore) setRecord(ctx context.Context, record, granted string, expirationTime int64) error {
err := o.set(ctx, record, granted, expirationTime)

if err == nil {
return nil
}

// If record was moved, reload and retry.
if isMovedError(err) {
err = o.client.ReloadState(ctx)
if err != nil {
return err
}

//Retry once.
err = o.set(ctx, record, granted, expirationTime)
}

return err
}

func (o *redisStore) set(ctx context.Context, record string, granted string, expirationTime int64) error {
return o.client.Set(ctx, record, granted, time.Duration(expirationTime)*time.Second).Err()
}
Loading

0 comments on commit e72e522

Please sign in to comment.