Skip to content

Commit

Permalink
*: add explicit method to join cluster (#2)
Browse files Browse the repository at this point in the history
- require explicit definition of scheme and port, or it will be inferred
- export memberlist and pool types
- allow groupcache context to be set
- move non package essentials to _example
- go mod tidy / updates

Signed-off-by: Bobby DeSimone <[email protected]>
  • Loading branch information
desimone authored Jan 14, 2020
1 parent c534169 commit 1ca1126
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 86 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@

## TL;DR

See `/_example/main.go` for usage.
See `/_example/` for usage.

### Run

`docker-compose up --scale autocache=5`
`docker-compose up -f _example/docker-compose.yaml --scale autocache=5`

### Client

Expand Down
2 changes: 1 addition & 1 deletion Dockerfile → _example/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ FROM golang:latest as build
WORKDIR /go/src/autocache
ADD . /go/src/autocache
RUN go get -d -v ./...
RUN go build -o /go/bin/autocache _example/main.go
RUN go build -o /go/bin/autocache main.go

FROM gcr.io/distroless/base
COPY --from=build /go/bin/autocache /
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yaml → _example/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ services:
- VIRTUAL_HOST=autocache.localhost
- VIRTUAL_PORT=80
- NODES=autocache
image: pomerium/autocache:latest
image: autocache
expose:
- 80
23 changes: 12 additions & 11 deletions _example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ const (
defaultAddr = ":http"
)

var exampleCache cache

func main() {
addr := os.Getenv("ADDR")
if addr == "" {
Expand All @@ -31,12 +29,15 @@ func main() {
existing = strings.Split(nodes, ",")
}

ac, err := autocache.New(&autocache.Options{SeedNodes: existing})
ac, err := autocache.New(&autocache.Options{})
if err != nil {
log.Fatal(err)
}

exampleCache.group = groupcache.NewGroup("bcrypt", 1<<20, groupcache.GetterFunc(bcryptKey))
if _, err := ac.Join(existing); err != nil {
log.Fatal(err)
}
var exampleCache cache
exampleCache.group = groupcache.NewGroup("bcrypt", 1<<20, exampleCache)

mux := http.NewServeMux()
mux.Handle("/get/", exampleCache)
Expand All @@ -45,10 +46,14 @@ func main() {

}

// bcryptKey is am arbitrary getter function. Bcrypt is nice here because, it:
type cache struct {
group *groupcache.Group
}

// Get is am arbitrary getter function. Bcrypt is nice here because, it:
// 1) takes a long time
// 2) uses a random seed so non-cache results for the same key are obvious
func bcryptKey(ctx context.Context, key string, dst groupcache.Sink) error {
func (ac cache) Get(ctx context.Context, key string, dst groupcache.Sink) error {
now := time.Now()
defer func() {
log.Printf("bcryptKey/key:%q\ttime:%v", key, time.Since(now))
Expand All @@ -63,10 +68,6 @@ func bcryptKey(ctx context.Context, key string, dst groupcache.Sink) error {
return nil
}

type cache struct {
group *groupcache.Group
}

func (ac cache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
Expand Down
120 changes: 62 additions & 58 deletions autocache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"log"
"net/http"
"net/url"
"os"

"github.com/golang/groupcache"
Expand All @@ -19,19 +18,23 @@ var _ memberlist.EventDelegate = &Autocache{}
type Options struct {
// Groupcache related
//
// Groupcache's pool is a HTTP handler. Scheme and port should be set
// such that group cache's internal http client, used to fetch, distributed
// keys, knows how to build the request URL.
PoolOptions *groupcache.HTTPPoolOptions
PoolScheme string
PoolPort int
// Transport optionally specifies an http.RoundTripper for the client
// to use when it makes a request to another groupcache node.
// If nil, the client uses http.DefaultTransport.
TransportFn func(context.Context) http.RoundTripper
PoolOptions *groupcache.HTTPPoolOptions
PoolTransportFn func(context.Context) http.RoundTripper
// Context optionally specifies a context for the server to use when it
// receives a request.
// If nil, the server uses the request's context
PoolContext func(*http.Request) context.Context

// Memberlist related
//
// SeedNodes is a slice of addresses we use to bootstrap peer discovery
// Seed nodes should contain a list of valid URLs including scheme and port
// if those are used to connect to your group cache cluster. (e.g. "https://example.net:8000")
// Memberlist will be bootstrapped using just the hostname of those seed URLS.
SeedNodes []string
// MemberlistConfig ist he memberlist configuration to use.
// If empty, `DefaultLANConfig` is used.
MemberlistConfig *memberlist.Config
Expand All @@ -40,24 +43,11 @@ type Options struct {
Logger *log.Logger
}

func (o *Options) validate() error {
if len(o.SeedNodes) == 0 {
return errors.New("must supply at least one seed node")
}
u, err := url.Parse(o.SeedNodes[0])
if err != nil {
return err
}
if u.Scheme == "" {
return fmt.Errorf("%s has no scheme", u.String())
}
return nil
}

// Autocache implements automatic, distributed membership for a cluster
// of cache pool peers.
type Autocache struct {
Pool *groupcache.HTTPPool
GroupcachePool *groupcache.HTTPPool
Memberlist *memberlist.Memberlist

self string
peers []string
Expand All @@ -71,65 +61,79 @@ type Autocache struct {
// invokes groupcache's peer pooling handlers. Note, by design a groupcache
// pool can only be made _once_.
func New(o *Options) (*Autocache, error) {
if err := o.validate(); err != nil {
return nil, err
var err error
ac := Autocache{
scheme: o.PoolScheme,
port: fmt.Sprintf("%d", o.PoolPort),
logger: o.Logger,
}
var ac Autocache

u, _ := url.Parse(o.SeedNodes[0]) // err checked in validate
ac.scheme = u.Scheme
ac.port = u.Port()

ac.logger = o.Logger
if ac.logger == nil {
ac.logger = log.New(os.Stderr, "", log.LstdFlags)
}
ac.logger.Printf("autocache: with options: %+v", o)

if ac.scheme == "" {
ac.logger.Printf("autocache: pool scheme not set, assuming https://")
ac.scheme = "http"
}
if ac.port == "0" {
ac.logger.Printf("autocache: pool port not set, assuming empty")
ac.port = ""
}

mlConfig := o.MemberlistConfig
if mlConfig == nil {
ac.logger.Println("defaulting to lan configuration")
ac.logger.Println("autocache: defaulting to lan configuration")
mlConfig = memberlist.DefaultLANConfig()
}
mlConfig.Events = &ac
mlConfig.Logger = ac.logger
memberlist, err := memberlist.Create(mlConfig)
if err != nil {
return nil, err
if ac.Memberlist, err = memberlist.Create(mlConfig); err != nil {
return nil, fmt.Errorf("autocache: can't create memberlist: %w", err)
}
// the only way memberlist would be empty here, following create is if
// the current node suddenly died. Still, we check to be safe.
if len(memberlist.Members()) == 0 {
if len(ac.Memberlist.Members()) == 0 {
return nil, errors.New("memberlist can't find self")
}
self := memberlist.Members()[0]
self := ac.Memberlist.Members()[0]
if self.Addr == nil {
return nil, errors.New("self addr cannot be nil")
}
ac.self = self.Addr.String()
ac.logger.Printf("autocache: self addr is: %s", ac.self)
poolOptions := &groupcache.HTTPPoolOptions{}
if o.PoolOptions != nil {
poolOptions = o.PoolOptions
}
ac.Pool = groupcache.NewHTTPPoolOpts(ac.groupcacheURL(ac.self), poolOptions)
if o.TransportFn != nil {
ac.Pool.Transport = o.TransportFn
gcSelf := ac.groupcacheURL(ac.self)
ac.logger.Printf("autocache groupcache self: %s options: %+v", gcSelf, poolOptions)
ac.GroupcachePool = groupcache.NewHTTPPoolOpts(gcSelf, poolOptions)
if o.PoolTransportFn != nil {
ac.GroupcachePool.Transport = o.PoolTransportFn
}

seeds := make([]string, len(o.SeedNodes))
for k, v := range o.SeedNodes {
u, err := url.Parse(v)
if err != nil {
return nil, err
}
seeds[k] = u.Hostname()
}

if _, err := memberlist.Join(seeds); err != nil {
return nil, fmt.Errorf("couldn't join memberlist cluster: %w", err)
if o.PoolContext != nil {
ac.GroupcachePool.Context = o.PoolContext
}
return &ac, nil
}

// Join is used to take an existing Memberlist and attempt to join a cluster
// by contacting all the given hosts and performing a state sync. Initially,
// the Memberlist only contains our own state, so doing this will cause
// remote nodes to become aware of the existence of this node, effectively
// joining the cluster.
//
// This returns the number of hosts successfully contacted and an error if
// none could be reached. If an error is returned, the node did not successfully
// join the cluster.
func (ac *Autocache) Join(existing []string) (int, error) {
if ac.Memberlist == nil {
return 0, errors.New("memberlist cannot be nil")
}
return ac.Memberlist.Join(existing)
}

// groupcacheURL builds a groupcache friendly RPC url from an address
func (ac *Autocache) groupcacheURL(addr string) string {
u := fmt.Sprintf("%s:https://%s", ac.scheme, addr)
Expand All @@ -146,8 +150,8 @@ func (ac *Autocache) NotifyJoin(node *memberlist.Node) {
uri := ac.groupcacheURL(node.Addr.String())
ac.removePeer(uri)
ac.peers = append(ac.peers, uri)
if ac.Pool != nil {
ac.Pool.Set(ac.peers...)
if ac.GroupcachePool != nil {
ac.GroupcachePool.Set(ac.peers...)
ac.logger.Printf("Autocache/NotifyJoin: %s peers: %v", uri, len(ac.peers))
}
}
Expand All @@ -158,7 +162,7 @@ func (ac *Autocache) NotifyJoin(node *memberlist.Node) {
func (ac *Autocache) NotifyLeave(node *memberlist.Node) {
uri := ac.groupcacheURL(node.Addr.String())
ac.removePeer(uri)
ac.Pool.Set(ac.peers...)
ac.GroupcachePool.Set(ac.peers...)
ac.logger.Printf("Autocache/NotifyLeave: %s peers: %v", uri, len(ac.peers))
}

Expand All @@ -179,9 +183,9 @@ func (ac *Autocache) removePeer(uri string) {
}

func (ac *Autocache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if ac.Pool == nil {
if ac.GroupcachePool == nil {
http.Error(w, "pool not initialized", http.StatusInternalServerError)
return
}
ac.Pool.ServeHTTP(w, r)
ac.GroupcachePool.ServeHTTP(w, r)
}
31 changes: 24 additions & 7 deletions autocache_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package autocache

import (
"context"
"net"
"net/http"
"net/http/httptest"
"testing"

Expand All @@ -16,16 +18,23 @@ func TestNew(t *testing.T) {
tests := []struct {
name string
o *Options
seed []string
path string
wantErr bool
wantStatus int
wantBody string
}{
{"complete config", &Options{PoolOptions: &groupcache.HTTPPoolOptions{BasePath: "/"}, SeedNodes: []string{"https://localhost:80", "https://127.0.0.1"}}, "/no_such_group/2/", false, 404, "no such group: no_such_group\n"},
{"reject empty seed", &Options{SeedNodes: []string{}}, "", true, 0, ""},
{"reject schemeless url", &Options{SeedNodes: []string{"localhost"}}, "", true, 0, ""},
{"reject malformed url", &Options{SeedNodes: []string{"%^notscheme-url:80"}}, "", true, 0, ""},
{"memberlist can't find self", &Options{SeedNodes: []string{"https://0.0.0.0"}}, "", true, 0, ""},
{"complete config",
&Options{
PoolContext: func(_ *http.Request) context.Context { return context.TODO() },
PoolTransportFn: func(_ context.Context) http.RoundTripper { return http.DefaultTransport },
PoolOptions: &groupcache.HTTPPoolOptions{BasePath: "/"},
},
[]string{"localhost"},
"/no_such_group/2/",
false,
404,
"no such group: no_such_group\n"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -34,10 +43,10 @@ func TestNew(t *testing.T) {
t.Errorf("New() error = %v, wantErr %v", err, tt.wantErr)
return
}
_, err = s.Join(tt.seed)
if err != nil {
return
t.Fatal(err)
}

r := httptest.NewRequest("GET", tt.path, nil)
w := httptest.NewRecorder()
s.ServeHTTP(w, r)
Expand Down Expand Up @@ -66,6 +75,14 @@ func TestNew(t *testing.T) {
if len(s.peers) != 1 {
t.Errorf("NotifyUpdate failed")
}
// check nill conditions
s.GroupcachePool = nil
r = httptest.NewRequest("GET", tt.path, nil)
w = httptest.NewRecorder()
s.ServeHTTP(w, r)
if status := w.Code; status != 500 {
t.Errorf("status code: got %v want %v", status, 500)
}
})
}
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/hashicorp/memberlist v0.1.5
github.com/miekg/dns v1.1.27 // indirect
golang.org/x/crypto v0.0.0-20191227163750-53104e6ec876 // indirect
golang.org/x/crypto v0.0.0-20200109152110-61a87790db17 // indirect
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553 // indirect
golang.org/x/sys v0.0.0-20200103143344-a1369afcdac7 // indirect
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1 // indirect
)
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqri
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191227163750-53104e6ec876 h1:sKJQZMuxjOAR/Uo2LBfU90onWEf1dF4C+0hPJCc9Mpc=
golang.org/x/crypto v0.0.0-20191227163750-53104e6ec876/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200109152110-61a87790db17 h1:nVJ3guKA9qdkEQ3TUdXI9QSINo2CUPM/cySEvw2w8I0=
golang.org/x/crypto v0.0.0-20200109152110-61a87790db17/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand All @@ -102,8 +102,8 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200103143344-a1369afcdac7 h1:/W9OPMnnpmFXHYkcp2rQsbFUbRlRzfECQjmAFiOyHE8=
golang.org/x/sys v0.0.0-20200103143344-a1369afcdac7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1 h1:gZpLHxUX5BdYLA08Lj4YCJNN/jk7KtquiArPoeX0WvA=
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

0 comments on commit 1ca1126

Please sign in to comment.