diff --git a/README.md b/README.md index bea54ae..592edb5 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ kubegroup_peers: Gauge: Number of peer PODs discovered. kubegroup_events: Counter: Number of events received. ``` -# Usage +# Usage for groupcache3 Import the package `github.com/udhos/kubegroup/kubegroup`. @@ -23,7 +23,101 @@ Import the package `github.com/udhos/kubegroup/kubegroup`. import "github.com/udhos/kubegroup/kubegroup" ``` -Spawn a goroutine for `kubegroup.UpdatePeers(pool, groupcachePort)`. +```go +groupcachePort := ":5000" + +// 1. start groupcache3 daemon + +myIP, errAddr := kubegroup.FindMyAddress() +if errAddr != nil { + log.Fatalf("find my address: %v", errAddr) +} + +myAddr := myIP + groupCachePort + +daemon, errDaemon := groupcache.ListenAndServe(context.TODO(), myAddr, groupcache.Options{}) +if errDaemon != nil { + log.Fatalf("groupcache daemon: %v", errDaemon) +} + +// 2. spawn peering autodiscovery + +const debug = true + +clientsetOpt := kubeclient.Options{DebugLog: debug} +clientset, errClientset := kubeclient.New(clientsetOpt) +if errClientset != nil { + log.Fatalf("kubeclient: %v", errClientset) +} + +options := kubegroup.Options{ + Client: clientset, + Peers: daemon, + LabelSelector: "app=miniapi", + GroupCachePort: groupCachePort, + Debug: debug, + MetricsRegisterer: prometheus.DefaultRegisterer, + MetricsGatherer: prometheus.DefaultGatherer, + ForceNamespaceDefault: true, +} + +group, errDiscovery := kubegroup.UpdatePeers(options) +if errDiscovery != nil { + log.Fatalf("kubegroup: %v", errDiscovery) +} + +// 3. create groupcache groups + +ttl := time.Minute + +getter := groupcache.GetterFunc( + func(_ context.Context, filePath string, dest transport.Sink) error { + + log.Printf("cache miss, loading file: %s (ttl:%v)", + filePath, ttl) + + data, errRead := os.ReadFile(filePath) + if errRead != nil { + return errRead + } + + var expire time.Time // zero value for expire means no expiration + if app.groupCacheExpire != 0 { + expire = time.Now().Add(ttl) + } + + return dest.SetBytes(data, expire) + }, +) + +cache, errGroup := daemon.NewGroup("files", 1_000_000, getter) +if errGroup != nil { + log.Fatalf("new group: %v", errGroup) +} + +// 4. query cache + +var data []byte + +errGet := cache.Get(r.Context(), "filename.txt", transport.AllocatingByteSliceSink(&data)) +if errGet != nil { + log.Printf("cache error: %v", errGet) +} else { + log.Printf("cache response: %s", string(data)) +} +``` + +# Example for groupcache3 + +See [./examples/kubegroup-example3](./examples/kubegroup-example3) + +# Usage for groupcache2 + +Import the package `github.com/udhos/kubegroup/kubegroup`. + +```go +import "github.com/udhos/kubegroup/kubegroup" +``` ```go groupcachePort := ":5000" diff --git a/examples/kubegroup-example3/groupcache.go b/examples/kubegroup-example3/groupcache.go new file mode 100644 index 0000000..d5c4f73 --- /dev/null +++ b/examples/kubegroup-example3/groupcache.go @@ -0,0 +1,93 @@ +package main + +import ( + "context" + "log" + "os" + "time" + + "github.com/groupcache/groupcache-go/v3" + "github.com/groupcache/groupcache-go/v3/transport" + "github.com/udhos/kube/kubeclient" + "github.com/udhos/kubegroup/kubegroup" +) + +func startGroupcache(app *application) { + + // + // create groupcache instance + // + + myIP, errAddr := kubegroup.FindMyAddress() + if errAddr != nil { + log.Fatalf("find my address: %v", errAddr) + } + + myAddr := myIP + app.groupCachePort + + daemon, errDaemon := groupcache.ListenAndServe(context.TODO(), myAddr, groupcache.Options{}) + if errDaemon != nil { + log.Fatalf("groupcache daemon: %v", errDaemon) + } + + // + // start watcher for addresses of peers + // + + const debug = true + + clientsetOpt := kubeclient.Options{DebugLog: debug} + clientset, errClientset := kubeclient.New(clientsetOpt) + if errClientset != nil { + log.Fatalf("kubeclient: %v", errClientset) + } + + options := kubegroup.Options{ + Client: clientset, + Peers: daemon, + LabelSelector: "app=miniapi", + GroupCachePort: app.groupCachePort, // ":5000" + Debug: debug, + MetricsRegisterer: app.registry, + MetricsGatherer: app.registry, + ForceNamespaceDefault: true, + } + + group, errGroup := kubegroup.UpdatePeers(options) + if errGroup != nil { + log.Fatalf("kubegroup: %v", errGroup) + } + + app.group = group + + // + // create cache + // + + getter := groupcache.GetterFunc( + func(_ context.Context, filePath string, dest transport.Sink) error { + + log.Printf("cache miss, loading file: %s (ttl:%v)", + filePath, app.groupCacheExpire) + + data, errRead := os.ReadFile(filePath) + if errRead != nil { + return errRead + } + + var expire time.Time // zero value for expire means no expiration + if app.groupCacheExpire != 0 { + expire = time.Now().Add(app.groupCacheExpire) + } + + return dest.SetBytes(data, expire) + }, + ) + + cache, errGroup := daemon.NewGroup("files", app.groupCacheSizeBytes, getter) + if errGroup != nil { + log.Fatalf("new group: %v", errGroup) + } + + app.cache = cache +} diff --git a/examples/kubegroup-example3/main.go b/examples/kubegroup-example3/main.go new file mode 100644 index 0000000..d342995 --- /dev/null +++ b/examples/kubegroup-example3/main.go @@ -0,0 +1,129 @@ +// Package main implements the example. +package main + +import ( + "context" + "log" + "net/http" + "os" + "os/signal" + "strings" + "syscall" + "time" + + "github.com/groupcache/groupcache-go/v3/transport" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/udhos/kubegroup/kubegroup" +) + +type application struct { + listenAddr string + groupCachePort string + groupCacheSizeBytes int64 + groupCacheExpire time.Duration + + serverMain *http.Server + //serverGroupCache *http.Server + cache transport.Group + group *kubegroup.Group + + registry *prometheus.Registry +} + +func main() { + + mux := http.NewServeMux() + + app := &application{ + listenAddr: ":8080", + groupCachePort: ":5000", + groupCacheSizeBytes: 1_000_000, // limit cache at 1 MB + groupCacheExpire: 60 * time.Second, // cache TTL at 60s + registry: prometheus.NewRegistry(), + } + + // + // metrics + // + app.registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) + app.registry.MustRegister(prometheus.NewGoCollector()) + + app.serverMain = &http.Server{Addr: app.listenAddr, Handler: mux} + + startGroupcache(app) + + mux.HandleFunc("/", func(w http.ResponseWriter, + r *http.Request) { + routeHandler(w, r, app) + }) + mux.Handle("/metrics", app.metricsHandler()) + + go func() { + // + // start main http server + // + log.Printf("main server: listening on %s", app.listenAddr) + err := app.serverMain.ListenAndServe() + log.Printf("main server: exited: %v", err) + }() + + shutdown(app) + + log.Printf("exiting") +} + +func (app *application) metricsHandler() http.Handler { + registerer := app.registry + gatherer := app.registry + return promhttp.InstrumentMetricHandler( + registerer, promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{}), + ) +} + +func routeHandler(w http.ResponseWriter, r *http.Request, app *application) { + + filePath := r.URL.Path + + filePath = strings.TrimPrefix(filePath, "/") + + var data []byte + //errGet := app.cache.Get(r.Context(), filePath, groupcache.AllocatingByteSliceSink(&data)) + errGet := app.cache.Get(r.Context(), filePath, transport.AllocatingByteSliceSink(&data)) + if errGet != nil { + log.Printf("routeHandler: %s %s: cache error: %v", r.Method, r.URL.Path, errGet) + http.Error(w, errGet.Error(), 500) + return + } + + if _, errWrite := w.Write(data); errWrite != nil { + log.Printf("routeHandler: %s %s: write error: %v", r.Method, r.URL.Path, errWrite) + } +} + +func shutdown(app *application) { + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + sig := <-quit + + log.Printf("received signal '%v', initiating shutdown", sig) + + log.Printf("stopping kubegroup") + + app.group.Close() // release kubegroup resources + + time.Sleep(time.Second) // give kubegroup time to log debug messages about exiting + + log.Printf("stopping http servers") + + httpShutdown(app.serverMain) + //httpShutdown(app.serverGroupCache) +} + +func httpShutdown(server *http.Server) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + if err := server.Shutdown(ctx); err != nil { + log.Printf("http server shutdown error: %v", err) + } +} diff --git a/go.mod b/go.mod index a9380a8..aac0ed1 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,9 @@ module github.com/udhos/kubegroup -go 1.22.1 +go 1.22.3 require ( + github.com/groupcache/groupcache-go/v3 v3.0.0 github.com/modernprogram/groupcache/v2 v2.6.0 github.com/prometheus/client_golang v1.19.0 github.com/udhos/kube v0.0.1 @@ -38,11 +39,11 @@ require ( github.com/segmentio/fasthash v1.0.3 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/spf13/pflag v1.0.5 // indirect - golang.org/x/net v0.22.0 // indirect + golang.org/x/net v0.25.0 // indirect golang.org/x/oauth2 v0.18.0 // indirect - golang.org/x/sys v0.18.0 // indirect - golang.org/x/term v0.18.0 // indirect - golang.org/x/text v0.14.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/term v0.20.0 // indirect + golang.org/x/text v0.15.0 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/protobuf v1.33.0 // indirect diff --git a/go.sum b/go.sum index 5653991..b7c6d64 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,8 @@ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJY github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/groupcache/groupcache-go/v3 v3.0.0 h1:R8HqV4DuXkwevmPAP/YyVFkxCA6z9tEt8O//UuRucDc= +github.com/groupcache/groupcache-go/v3 v3.0.0/go.mod h1:xN6Zm5kE5pbD8UEmJZLnEPslDWSMV3unosIl/g5W3U0= github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= @@ -106,8 +108,8 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= -golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/oauth2 v0.18.0 h1:09qnuIAgzdx1XplqJvW6CQqMCtGZykZWcXzPMPUusvI= golang.org/x/oauth2 v0.18.0/go.mod h1:Wf7knwG0MPoWIMMBgFlEaSUDaKskp0dCfrlJRJXbBi8= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -122,18 +124,18 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= -golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= +golang.org/x/term v0.20.0 h1:VnkxpohqXaOBYJtBmEppKUG6mXpi+4O6purfc2+sMhw= +golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/kubegroup/kubegroup.go b/kubegroup/kubegroup.go index eebad93..3346315 100644 --- a/kubegroup/kubegroup.go +++ b/kubegroup/kubegroup.go @@ -2,11 +2,13 @@ package kubegroup import ( + "context" "fmt" "log" "net" "os" + "github.com/groupcache/groupcache-go/v3/transport/peer" "github.com/prometheus/client_golang/prometheus" "github.com/udhos/kubepodinformer/podinformer" "k8s.io/client-go/kubernetes" @@ -24,6 +26,11 @@ func FindMyURL(groupcachePort string) (string, error) { return url, nil } +// FindMyAddress returns my adress. +func FindMyAddress() (string, error) { + return findMyAddr() +} + func findMyAddr() (string, error) { host, errHost := os.Hostname() if errHost != nil { @@ -59,6 +66,13 @@ type PeerGroup interface { Set(peers ...string) } +// PeerSet is an interface to plug in a target for delivering peering +// updates. *groupcache.daemon, created with +// groupcache.ListenAndServe(), implements this interface. +type PeerSet interface { + SetPeers(ctx context.Context, peers []peer.Info) error +} + // Options specifies options for UpdatePeers. type Options struct { // Pool is an interface to plug in a target for delivering peering @@ -66,6 +80,11 @@ type Options struct { // groupcache.NewHTTPPoolOpts(), implements this interface. Pool PeerGroup + // PeerSet is an interface to plug in a target for delivering peering + // updates. *groupcache.daemon, created with + // groupcache.ListenAndServe(), implements this interface. + Peers PeerSet + // Client provides kubernetes client. Client *kubernetes.Clientset @@ -100,6 +119,7 @@ type Group struct { options Options informer *podinformer.PodInformer m *metrics + myAddr string } func (g *Group) debugf(format string, v ...any) { @@ -124,8 +144,8 @@ func UpdatePeers(options Options) (*Group, error) { // // Required fields. // - if options.Pool == nil { - panic("Pool is nil") + if options.Pool == nil && options.Peers == nil { + panic("Pool and Peers are both nil") } if options.Client == nil { panic("Client is nil") @@ -158,9 +178,15 @@ func UpdatePeers(options Options) (*Group, error) { namespace = ns } + myAddr, errAddr := findMyAddr() + if errAddr != nil { + return nil, errAddr + } + group := &Group{ options: options, m: newMetrics(options.MetricsNamespace, options.MetricsRegisterer), + myAddr: myAddr, } optionsInformer := podinformer.Options{ @@ -187,18 +213,44 @@ func (g *Group) onUpdate(pods []podinformer.Pod) { size := len(pods) g.debugf("%s: %d", me, size) - peers := make([]string, 0, size) + if g.options.Peers != nil { + + peers := make([]peer.Info, 0, size) + + for i, p := range pods { + hostPort := p.IP + g.options.GroupCachePort + isSelf := g.myAddr == p.IP - for i, p := range pods { - g.debugf("%s: %d/%d: namespace=%s pod=%s ip=%s ready=%t", - me, i+1, size, p.Namespace, p.Name, p.IP, p.Ready) + g.debugf("%s: %d/%d: namespace=%s pod=%s ip=%s ready=%t host_port=%s is_self=%t", + me, i+1, size, p.Namespace, p.Name, p.IP, p.Ready, hostPort, isSelf) - if p.Ready { - peers = append(peers, buildURL(p.IP, g.options.GroupCachePort)) + if p.Ready { + peers = append(peers, peer.Info{ + Address: hostPort, + IsSelf: isSelf, + }) + } } - } - g.options.Pool.Set(peers...) + err := g.options.Peers.SetPeers(context.TODO(), peers) + if err != nil { + g.errorf("set peers: error: %v", err) + } + + } else { + peers := make([]string, 0, size) + + for i, p := range pods { + g.debugf("%s: %d/%d: namespace=%s pod=%s ip=%s ready=%t", + me, i+1, size, p.Namespace, p.Name, p.IP, p.Ready) + + if p.Ready { + peers = append(peers, buildURL(p.IP, g.options.GroupCachePort)) + } + } + + g.options.Pool.Set(peers...) + } g.m.events.Inc() g.m.peers.Set(float64(size))