Skip to content

Commit

Permalink
Initial.
Browse files Browse the repository at this point in the history
  • Loading branch information
udhos committed Feb 16, 2024
1 parent acb4a02 commit 8230e0d
Show file tree
Hide file tree
Showing 10 changed files with 1,044 additions and 0 deletions.
28 changes: 28 additions & 0 deletions build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/bash

go install golang.org/x/vuln/cmd/govulncheck@latest
go install golang.org/x/tools/cmd/deadcode@latest

gofmt -s -w .

revive ./...

gocyclo -over 15 .

go mod tidy

govulncheck ./...

deadcode ./cmd/*

go env -w CGO_ENABLED=1

go test -race ./...

#go test -bench=BenchmarkController ./cmd/gateboard

go env -w CGO_ENABLED=0

go install ./...

go env -u CGO_ENABLED
49 changes: 49 additions & 0 deletions cmd/kubecache/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package main

import (
"time"

"github.com/udhos/boilerplate/envconfig"
)

type config struct {
listenAddr string
backendURL string
backendTimeout time.Duration
cacheTTL time.Duration
healthAddr string
healthPath string
metricsAddr string
metricsPath string
metricsNamespace string
metricsBucketsLatencyHTTP []float64
groupcachePort string
groupcacheSizeBytes int64
kubegroupMetricsNamespace string
kubegroupDebug bool
kubegroupListerInterval time.Duration
}

func newConfig(roleSessionName string) config {

env := envconfig.NewSimple(roleSessionName)

return config{
listenAddr: env.String("LISTEN_ADDR", ":9000"),
backendURL: env.String("BACKEND_URL", "http:https://config-server:9000"),
cacheTTL: env.Duration("CACHE_TTL", 300*time.Second),
backendTimeout: env.Duration("BACKEND_TIMEOUT", 300*time.Second),
healthAddr: env.String("HEALTH_ADDR", ":8888"),
healthPath: env.String("HEALTH_PATH", "/health"),
metricsAddr: env.String("METRICS_ADDR", ":3000"),
metricsPath: env.String("METRICS_PATH", "/metrics"),
metricsNamespace: env.String("METRICS_NAMESPACE", ""),
metricsBucketsLatencyHTTP: env.Float64Slice("METRICS_BUCKETS_LATENCY_HTTP",
[]float64{0.00001, 0.000025, 0.00005, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5, 10, 25, 50, 100, 250, 500, 1000}),
groupcachePort: env.String("GROUPCACHE_PORT", ":5000"),
groupcacheSizeBytes: env.Int64("GROUPCACHE_SIZE_BYTES", 10_000_000),
kubegroupMetricsNamespace: env.String("KUBEGROUP_METRICS_NAMESPACE", ""),
kubegroupDebug: env.Bool("KUBEGROUP_DEBUG", true),
kubegroupListerInterval: env.Duration("KUBEGROUP_LISTER_INTERVAL", 20*time.Second),
}
}
36 changes: 36 additions & 0 deletions cmd/kubecache/fetch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package main

import (
"context"
"io"
"net/http"

"go.opentelemetry.io/otel/trace"
)

func fetch(c context.Context, client *http.Client, tracer trace.Tracer, method, uri string) ([]byte, int, error) {

const me = "fetch"
ctx, span := tracer.Start(c, me)
defer span.End()

req, errReq := http.NewRequestWithContext(ctx, method, uri, nil)
if errReq != nil {
return nil, 500, errReq
}

req.Header.Add("Content-Type", "application/x-www-form-urlencoded")

resp, errDo := client.Do(req)
if errDo != nil {
return nil, 500, errDo
}
defer resp.Body.Close()

body, errBody := io.ReadAll(resp.Body)
if errBody != nil {
return nil, 500, errBody
}

return body, 200, nil
}
144 changes: 144 additions & 0 deletions cmd/kubecache/groupcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package main

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
"time"

"github.com/modernprogram/groupcache/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog/log"
"github.com/udhos/groupcache_exporter"
"github.com/udhos/groupcache_exporter/groupcache/modernprogram"
"github.com/udhos/kubegroup/kubegroup"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

func startGroupcache(app *application) func() {

workspace := groupcache.NewWorkspace()

//
// create groupcache pool
//

myURL, errURL := kubegroup.FindMyURL(app.cfg.groupcachePort)
if errURL != nil {
log.Fatal().Msgf("my URL: %v", errURL)
}
log.Printf("groupcache my URL: %s", myURL)

pool := groupcache.NewHTTPPoolOptsWithWorkspace(workspace, myURL, &groupcache.HTTPPoolOptions{})

//
// start groupcache server
//

app.serverGroupCache = &http.Server{Addr: app.cfg.groupcachePort, Handler: pool}

go func() {
log.Info().Msgf("groupcache server: listening on %s", app.cfg.groupcachePort)
err := app.serverGroupCache.ListenAndServe()
log.Error().Msgf("groupcache server: exited: %v", err)
}()

//
// start watcher for addresses of peers
//

options := kubegroup.Options{
Pool: pool,
GroupCachePort: app.cfg.groupcachePort,
//PodLabelKey: "app", // default is "app"
//PodLabelValue: "my-app-name", // default is current PODs label value for label key
MetricsRegisterer: app.registry,
MetricsGatherer: app.registry,
MetricsNamespace: app.cfg.kubegroupMetricsNamespace,
Debug: app.cfg.kubegroupDebug,
ListerInterval: app.cfg.kubegroupListerInterval,
}

kg, errKg := kubegroup.UpdatePeers(options)
if errKg != nil {
log.Fatal().Msgf("kubegroup error: %v", errKg)
}

//
// create cache
//

httpClient := &http.Client{
Transport: otelhttp.NewTransport(http.DefaultTransport),
Timeout: app.cfg.backendTimeout,
}

getter := groupcache.GetterFunc(
func(c context.Context, key string, dest groupcache.Sink) error {

const me = "groupcache.getter"
ctx, span := app.tracer.Start(c, me)
defer span.End()

method, uri, found := strings.Cut(key, " ")
if !found {
return fmt.Errorf("getter: bad key: '%s'", key)
}

u, errURL := url.JoinPath(app.cfg.backendURL, uri)
if errURL != nil {
return fmt.Errorf("getter: bad URL: %v", errURL)
}

body, status, errFetch := fetch(ctx, httpClient, app.tracer, method, u)

traceID := span.SpanContext().TraceID().String()
log.Info().Str("traceID", traceID).Msgf("traceID=%s key='%s' status=%d error:%v",
traceID, key, status, errFetch)

if errFetch != nil {
return errFetch
}

resp := response{
Body: body,
Status: status,
}

data, errJ := json.Marshal(resp)
if errFetch != nil {
return errJ
}

expire := time.Now().Add(app.cfg.cacheTTL)

return dest.SetBytes(data, expire)
},
)

// https://talks.golang.org/2013/oscon-dl.slide#46
//
// 64 MB max per-node memory usage
app.cache = groupcache.NewGroupWithWorkspace(workspace, "path", app.cfg.groupcacheSizeBytes, getter)

//
// expose prometheus metrics for groupcache
//

g := modernprogram.New(app.cache)
labels := map[string]string{
//"app": appName,
}
namespace := ""
collector := groupcache_exporter.NewExporter(namespace, labels, g)
prometheus.MustRegister(collector)

stop := func() {
kg.Close()
}

return stop
}
Loading

0 comments on commit 8230e0d

Please sign in to comment.