Skip to content

Commit

Permalink
Groupcache 3.
Browse files Browse the repository at this point in the history
  • Loading branch information
udhos committed May 23, 2024
1 parent e9e86b6 commit f063d3f
Show file tree
Hide file tree
Showing 6 changed files with 396 additions and 25 deletions.
98 changes: 96 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,109 @@ kubegroup_peers: Gauge: Number of peer PODs discovered.
kubegroup_events: Counter: Number of events received.
```

# Usage
# Usage for groupcache3

Import the package `github.com/udhos/kubegroup/kubegroup`.

```go
import "github.com/udhos/kubegroup/kubegroup"
```

Spawn a goroutine for `kubegroup.UpdatePeers(pool, groupcachePort)`.
```go
groupcachePort := ":5000"

// 1. start groupcache3 daemon

myIP, errAddr := kubegroup.FindMyAddress()
if errAddr != nil {
log.Fatalf("find my address: %v", errAddr)
}

myAddr := myIP + groupCachePort

daemon, errDaemon := groupcache.ListenAndServe(context.TODO(), myAddr, groupcache.Options{})
if errDaemon != nil {
log.Fatalf("groupcache daemon: %v", errDaemon)
}

// 2. spawn peering autodiscovery

const debug = true

clientsetOpt := kubeclient.Options{DebugLog: debug}
clientset, errClientset := kubeclient.New(clientsetOpt)
if errClientset != nil {
log.Fatalf("kubeclient: %v", errClientset)
}

options := kubegroup.Options{
Client: clientset,
Peers: daemon,
LabelSelector: "app=miniapi",
GroupCachePort: groupCachePort,
Debug: debug,
MetricsRegisterer: prometheus.DefaultRegisterer,
MetricsGatherer: prometheus.DefaultGatherer,
ForceNamespaceDefault: true,
}

group, errDiscovery := kubegroup.UpdatePeers(options)
if errDiscovery != nil {
log.Fatalf("kubegroup: %v", errDiscovery)
}

// 3. create groupcache groups

ttl := time.Minute

getter := groupcache.GetterFunc(
func(_ context.Context, filePath string, dest transport.Sink) error {

log.Printf("cache miss, loading file: %s (ttl:%v)",
filePath, ttl)

data, errRead := os.ReadFile(filePath)
if errRead != nil {
return errRead
}

var expire time.Time // zero value for expire means no expiration
if app.groupCacheExpire != 0 {
expire = time.Now().Add(ttl)
}

return dest.SetBytes(data, expire)
},
)

cache, errGroup := daemon.NewGroup("files", 1_000_000, getter)
if errGroup != nil {
log.Fatalf("new group: %v", errGroup)
}

// 4. query cache

var data []byte

errGet := cache.Get(r.Context(), "filename.txt", transport.AllocatingByteSliceSink(&data))
if errGet != nil {
log.Printf("cache error: %v", errGet)
} else {
log.Printf("cache response: %s", string(data))
}
```

# Example for groupcache3

See [./examples/kubegroup-example3](./examples/kubegroup-example3)

# Usage for groupcache2

Import the package `github.com/udhos/kubegroup/kubegroup`.

```go
import "github.com/udhos/kubegroup/kubegroup"
```

```go
groupcachePort := ":5000"
Expand Down
93 changes: 93 additions & 0 deletions examples/kubegroup-example3/groupcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package main

import (
"context"
"log"
"os"
"time"

"github.com/groupcache/groupcache-go/v3"
"github.com/groupcache/groupcache-go/v3/transport"
"github.com/udhos/kube/kubeclient"
"github.com/udhos/kubegroup/kubegroup"
)

func startGroupcache(app *application) {

//
// create groupcache instance
//

myIP, errAddr := kubegroup.FindMyAddress()
if errAddr != nil {
log.Fatalf("find my address: %v", errAddr)
}

myAddr := myIP + app.groupCachePort

daemon, errDaemon := groupcache.ListenAndServe(context.TODO(), myAddr, groupcache.Options{})
if errDaemon != nil {
log.Fatalf("groupcache daemon: %v", errDaemon)
}

//
// start watcher for addresses of peers
//

const debug = true

clientsetOpt := kubeclient.Options{DebugLog: debug}
clientset, errClientset := kubeclient.New(clientsetOpt)
if errClientset != nil {
log.Fatalf("kubeclient: %v", errClientset)
}

options := kubegroup.Options{
Client: clientset,
Peers: daemon,
LabelSelector: "app=miniapi",
GroupCachePort: app.groupCachePort, // ":5000"
Debug: debug,
MetricsRegisterer: app.registry,
MetricsGatherer: app.registry,
ForceNamespaceDefault: true,
}

group, errGroup := kubegroup.UpdatePeers(options)
if errGroup != nil {
log.Fatalf("kubegroup: %v", errGroup)
}

app.group = group

//
// create cache
//

getter := groupcache.GetterFunc(
func(_ context.Context, filePath string, dest transport.Sink) error {

log.Printf("cache miss, loading file: %s (ttl:%v)",
filePath, app.groupCacheExpire)

data, errRead := os.ReadFile(filePath)
if errRead != nil {
return errRead
}

var expire time.Time // zero value for expire means no expiration
if app.groupCacheExpire != 0 {
expire = time.Now().Add(app.groupCacheExpire)
}

return dest.SetBytes(data, expire)
},
)

cache, errGroup := daemon.NewGroup("files", app.groupCacheSizeBytes, getter)
if errGroup != nil {
log.Fatalf("new group: %v", errGroup)
}

app.cache = cache
}
129 changes: 129 additions & 0 deletions examples/kubegroup-example3/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Package main implements the example.
package main

import (
"context"
"log"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"

"github.com/groupcache/groupcache-go/v3/transport"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/udhos/kubegroup/kubegroup"
)

type application struct {
listenAddr string
groupCachePort string
groupCacheSizeBytes int64
groupCacheExpire time.Duration

serverMain *http.Server
//serverGroupCache *http.Server
cache transport.Group
group *kubegroup.Group

registry *prometheus.Registry
}

func main() {

mux := http.NewServeMux()

app := &application{
listenAddr: ":8080",
groupCachePort: ":5000",
groupCacheSizeBytes: 1_000_000, // limit cache at 1 MB
groupCacheExpire: 60 * time.Second, // cache TTL at 60s
registry: prometheus.NewRegistry(),
}

//
// metrics
//
app.registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
app.registry.MustRegister(prometheus.NewGoCollector())

app.serverMain = &http.Server{Addr: app.listenAddr, Handler: mux}

startGroupcache(app)

mux.HandleFunc("/", func(w http.ResponseWriter,
r *http.Request) {
routeHandler(w, r, app)
})
mux.Handle("/metrics", app.metricsHandler())

go func() {
//
// start main http server
//
log.Printf("main server: listening on %s", app.listenAddr)
err := app.serverMain.ListenAndServe()
log.Printf("main server: exited: %v", err)
}()

shutdown(app)

log.Printf("exiting")
}

func (app *application) metricsHandler() http.Handler {
registerer := app.registry
gatherer := app.registry
return promhttp.InstrumentMetricHandler(
registerer, promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{}),
)
}

func routeHandler(w http.ResponseWriter, r *http.Request, app *application) {

filePath := r.URL.Path

filePath = strings.TrimPrefix(filePath, "/")

var data []byte
//errGet := app.cache.Get(r.Context(), filePath, groupcache.AllocatingByteSliceSink(&data))
errGet := app.cache.Get(r.Context(), filePath, transport.AllocatingByteSliceSink(&data))
if errGet != nil {
log.Printf("routeHandler: %s %s: cache error: %v", r.Method, r.URL.Path, errGet)
http.Error(w, errGet.Error(), 500)
return
}

if _, errWrite := w.Write(data); errWrite != nil {
log.Printf("routeHandler: %s %s: write error: %v", r.Method, r.URL.Path, errWrite)
}
}

func shutdown(app *application) {
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
sig := <-quit

log.Printf("received signal '%v', initiating shutdown", sig)

log.Printf("stopping kubegroup")

app.group.Close() // release kubegroup resources

time.Sleep(time.Second) // give kubegroup time to log debug messages about exiting

log.Printf("stopping http servers")

httpShutdown(app.serverMain)
//httpShutdown(app.serverGroupCache)
}

func httpShutdown(server *http.Server) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
log.Printf("http server shutdown error: %v", err)
}
}
11 changes: 6 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
module github.com/udhos/kubegroup

go 1.22.1
go 1.22.3

require (
github.com/groupcache/groupcache-go/v3 v3.0.0
github.com/modernprogram/groupcache/v2 v2.6.0
github.com/prometheus/client_golang v1.19.0
github.com/udhos/kube v0.0.1
Expand Down Expand Up @@ -38,11 +39,11 @@ require (
github.com/segmentio/fasthash v1.0.3 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/term v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/protobuf v1.33.0 // indirect
Expand Down
Loading

0 comments on commit f063d3f

Please sign in to comment.