diff --git a/.github/workflows/golangci.lint.yml b/.github/workflows/golangci.lint.yml new file mode 100644 index 0000000000..d0c3f2cd06 --- /dev/null +++ b/.github/workflows/golangci.lint.yml @@ -0,0 +1,32 @@ +name: CI Lint + +on: + pull_request: + branches: + - main + paths: + - "**/*.go" + - ".github/workflows/golangci.lint.yml" + +env: + GO_VERSION: "1.20" + +jobs: + + analysis: + runs-on: ubuntu-latest + steps: + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: ${{ env.GO_VERSION }} + + - name: Check out code into the Go module directory + uses: actions/checkout@v3 + + - name: golangci-lint + uses: golangci/golangci-lint-action@v3 + with: + version: v1.54 + args: --timeout=30m --disable=errcheck diff --git a/cmd/server.go b/cmd/server.go index 37ecf45972..7800b14c7c 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -20,10 +20,8 @@ package cmd import ( "log" - "math/rand" "os" "sync" - "time" "github.com/megaease/easegress/v2/pkg/api" "github.com/megaease/easegress/v2/pkg/cluster" @@ -40,8 +38,6 @@ import ( // RunServer runs Easegress server. func RunServer() { - rand.Seed(time.Now().UnixNano()) - opt := option.New() if err := opt.Parse(); err != nil { common.Exit(1, err.Error()) diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index 082ddd816d..e97fa055d0 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -18,8 +18,8 @@ package cluster import ( + "crypto/rand" "fmt" - "math/rand" "os" "path" "path/filepath" @@ -48,7 +48,6 @@ func getRandomString(n int) string { } func TestMain(m *testing.M) { - rand.Seed(time.Now().UnixNano()) logger.InitNop() // logger.InitMock() tempDir = path.Join(tempDir, getRandomString(6)) diff --git a/pkg/filters/headerlookup/headerlookup_test.go b/pkg/filters/headerlookup/headerlookup_test.go index 943a7f5d30..d48ee54e1f 100644 --- a/pkg/filters/headerlookup/headerlookup_test.go +++ b/pkg/filters/headerlookup/headerlookup_test.go @@ -21,7 +21,6 @@ import ( "net/http" "os" "sort" - "sync" "testing" "time" @@ -79,9 +78,8 @@ func TestValidate(t *testing.T) { assert := assert.New(t) clusterInstance, _ := createClusterAndSyncer() - var mockMap sync.Map supervisor := supervisor.NewMock( - nil, clusterInstance, mockMap, mockMap, nil, nil, false, nil, nil) + nil, clusterInstance, nil, nil, false, nil, nil) const validYaml = ` name: headerLookup @@ -191,9 +189,8 @@ headerSetters: clusterInstance, syncerChannel := createClusterAndSyncer() - var mockMap sync.Map supervisor := supervisor.NewMock( - nil, clusterInstance, mockMap, mockMap, nil, nil, false, nil, nil) + nil, clusterInstance, nil, nil, false, nil, nil) // let's put data to 'foobar' foobar := ` @@ -225,6 +222,7 @@ extra-entry: "extra" assert.Equal("123456789", hdr1) hl, err = createHeaderLookup(config, hl, supervisor) + assert.Nil(err) ctx, header = prepareCtxAndHeader(t) // update key-value store @@ -245,6 +243,7 @@ extra-entry: "extra" assert.Equal("77341", header.Get("user-ext-id")) hl, err = createHeaderLookup(config, hl, supervisor) + assert.Nil(err) ctx, header = prepareCtxAndHeader(t) header.Set("X-AUTH-USER", "foobar") // delete foobar completely @@ -274,9 +273,8 @@ headerSetters: headerKey: "user-ext-id" ` clusterInstance, _ := createClusterAndSyncer() - var mockMap sync.Map supervisor := supervisor.NewMock( - nil, clusterInstance, mockMap, mockMap, nil, nil, false, nil, nil) + nil, clusterInstance, nil, nil, false, nil, nil) bobbanana := ` ext-id: "333" extra-entry: "extra" diff --git a/pkg/filters/meshadaptor/meshadaptor.go b/pkg/filters/meshadaptor/meshadaptor.go index 1a83d715fd..0113fdd571 100644 --- a/pkg/filters/meshadaptor/meshadaptor.go +++ b/pkg/filters/meshadaptor/meshadaptor.go @@ -24,7 +24,6 @@ import ( proxy "github.com/megaease/easegress/v2/pkg/filters/proxies/httpproxy" "github.com/megaease/easegress/v2/pkg/protocols/httpprot" "github.com/megaease/easegress/v2/pkg/protocols/httpprot/httpheader" - "github.com/megaease/easegress/v2/pkg/util/pathadaptor" ) const ( @@ -52,8 +51,6 @@ type ( // MeshAdaptor is filter MeshAdaptor. MeshAdaptor struct { spec *Spec - - pa *pathadaptor.PathAdaptor } // Spec is HTTPAdaptor Spec. diff --git a/pkg/filters/oidcadaptor/oidcadaptor.go b/pkg/filters/oidcadaptor/oidcadaptor.go index 86c78883c5..b9b5ca3293 100644 --- a/pkg/filters/oidcadaptor/oidcadaptor.go +++ b/pkg/filters/oidcadaptor/oidcadaptor.go @@ -253,7 +253,7 @@ func (o *OIDCAdaptor) handleOIDCCallback(ctx *context.Context) string { if err != nil { return filterResp(rw, http.StatusForbidden, err.Error()) } - oidcToken, err := o.fetchOIDCToken(authCode, state, spec, err, rw, req) + oidcToken, err := o.fetchOIDCToken(authCode, state, spec, rw, req) if err != nil { return errorResp(rw, "fetch OIDC token error: "+err.Error()) } @@ -293,7 +293,7 @@ func (o *OIDCAdaptor) handleOIDCCallback(ctx *context.Context) string { return "" } -func (o *OIDCAdaptor) fetchOIDCToken(authCode string, state string, spec *Spec, err error, rw *httpprot.Response, req *httpprot.Request) (*oidcIDToken, error) { +func (o *OIDCAdaptor) fetchOIDCToken(authCode string, state string, spec *Spec, rw *httpprot.Response, req *httpprot.Request) (*oidcIDToken, error) { // client_secret_post || client_secret_basic tokenFormData := url.Values{ "client_id": {o.spec.ClientID}, diff --git a/pkg/filters/opafilter/opafilter_test.go b/pkg/filters/opafilter/opafilter_test.go index ee3498d542..7b0e7ef762 100644 --- a/pkg/filters/opafilter/opafilter_test.go +++ b/pkg/filters/opafilter/opafilter_test.go @@ -55,14 +55,13 @@ func createOPAFilter(yamlConfig string, prev *OPAFilter, supervisor *supervisor. } type testCase struct { - req func() *http.Request - status int - shouldHandlerError bool - shouldRegoError bool - readBody bool - policy string - defaultStatus int - includedHeaders string + req func() *http.Request + status int + shouldRegoError bool + readBody bool + policy string + defaultStatus int + includedHeaders string } func TestOpaPolicyInFilter(t *testing.T) { diff --git a/pkg/filters/proxies/grpcproxy/codec.go b/pkg/filters/proxies/grpcproxy/codec.go index 501598ef18..174935369a 100644 --- a/pkg/filters/proxies/grpcproxy/codec.go +++ b/pkg/filters/proxies/grpcproxy/codec.go @@ -52,7 +52,7 @@ func (GrpcCodec) Unmarshal(data []byte, v interface{}) error { return proto.Unmarshal(data, v.(proto.Message)) } -// String return codec name -func (GrpcCodec) String() string { +// Name return codec name +func (GrpcCodec) Name() string { return codecName } diff --git a/pkg/filters/proxies/grpcproxy/codec_test.go b/pkg/filters/proxies/grpcproxy/codec_test.go index 9a4f9b22f7..b42e8535b9 100644 --- a/pkg/filters/proxies/grpcproxy/codec_test.go +++ b/pkg/filters/proxies/grpcproxy/codec_test.go @@ -25,7 +25,7 @@ import ( func TestGrpcCodecString(t *testing.T) { grpcCodec := GrpcCodec{} - assert.Equal(t, codecName, grpcCodec.String()) + assert.Equal(t, codecName, grpcCodec.Name()) } func TestCodecMarshalUnmarshal(t *testing.T) { diff --git a/pkg/filters/proxies/grpcproxy/pool.go b/pkg/filters/proxies/grpcproxy/pool.go index a9c34aabb5..1802e63c95 100644 --- a/pkg/filters/proxies/grpcproxy/pool.go +++ b/pkg/filters/proxies/grpcproxy/pool.go @@ -345,8 +345,8 @@ func (sp *ServerPool) biTransport(ctx *serverPoolContext, proxyAsClientStream gr // Explicitly *do not Close* c2sErrChan and c2sErrChan, otherwise the select below will not terminate. // Channels do not have to be closed, it is just a control flow mechanism, see // https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ - c2sErrChan := sp.forwardE2E(ctx.stdr, proxyAsClientStream, nil) - s2cErrChan := sp.forwardE2E(proxyAsClientStream, ctx.stdw, ctx.resp.RawHeader()) + c2sErrChan := sp.forwardS2C(ctx.stdr, proxyAsClientStream, nil) + s2cErrChan := sp.forwardC2S(proxyAsClientStream, ctx.stdw, ctx.resp.RawHeader()) // We don't know which side is going to stop sending first, so we need a select between the two. for { select { @@ -381,7 +381,7 @@ func (sp *ServerPool) buildOutputResponse(spCtx *serverPoolContext, s *status.St spCtx.SetOutputResponse(spCtx.resp) } -func (sp *ServerPool) forwardE2E(src grpc.Stream, dst grpc.Stream, header *grpcprot.Header) chan error { +func (sp *ServerPool) forwardC2S(src grpc.ClientStream, dst grpc.ServerStream, header *grpcprot.Header) chan error { ret := make(chan error, 1) go func() { f := &emptypb.Empty{} @@ -391,24 +391,40 @@ func (sp *ServerPool) forwardE2E(src grpc.Stream, dst grpc.Stream, header *grpcp return } if i == 0 { - if cs, ok := src.(grpc.ClientStream); ok { - // This is a bit of a hack, but client to server headers are only readable after first client msg is - // received but must be written to server stream before the first msg is flushed. - // This is the only place to do it nicely. - md, err := cs.Header() - if err != nil { - ret <- err - return - } - if header != nil { - md = metadata.Join(header.GetMD(), md) - } - - if err = dst.(grpc.ServerStream).SendHeader(md); err != nil { - ret <- err - return - } + // This is a bit of a hack, but client to server headers are only readable after first client msg is + // received but must be written to server stream before the first msg is flushed. + // This is the only place to do it nicely. + md, err := src.Header() + if err != nil { + ret <- err + return } + if header != nil { + md = metadata.Join(header.GetMD(), md) + } + + if err = dst.SendHeader(md); err != nil { + ret <- err + return + } + } + if err := dst.SendMsg(f); err != nil { + ret <- err + return + } + } + }() + return ret +} + +func (sp *ServerPool) forwardS2C(src grpc.ServerStream, dst grpc.ClientStream, header *grpcprot.Header) chan error { + ret := make(chan error, 1) + go func() { + f := &emptypb.Empty{} + for i := 0; ; i++ { + if err := src.RecvMsg(f); err != nil { + ret <- err // this can be io.EOF which is happy case + return } if err := dst.SendMsg(f); err != nil { ret <- err diff --git a/pkg/filters/proxies/grpcproxy/proxy.go b/pkg/filters/proxies/grpcproxy/proxy.go index fadbaa49b8..25d3f3d73a 100644 --- a/pkg/filters/proxies/grpcproxy/proxy.go +++ b/pkg/filters/proxies/grpcproxy/proxy.go @@ -70,8 +70,9 @@ var ( } defaultDialOpts = []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithCodec(&GrpcCodec{}), - grpc.WithBlock()} + grpc.WithDefaultCallOptions(grpc.ForceCodec(&GrpcCodec{})), + grpc.WithBlock(), + } ) var _ filters.Filter = (*Proxy)(nil) diff --git a/pkg/filters/proxies/httpproxy/pool_test.go b/pkg/filters/proxies/httpproxy/pool_test.go index f4e27f9510..125e5516fe 100644 --- a/pkg/filters/proxies/httpproxy/pool_test.go +++ b/pkg/filters/proxies/httpproxy/pool_test.go @@ -19,7 +19,6 @@ package httpproxy import ( "net/http" - "sync" "testing" "github.com/megaease/easegress/v2/pkg/context" @@ -93,7 +92,7 @@ servers: assert.NoError(spec.Validate()) p := &Proxy{} - p.super = supervisor.NewMock(option.New(), nil, sync.Map{}, sync.Map{}, nil, + p.super = supervisor.NewMock(option.New(), nil, nil, nil, false, nil, nil) sp := NewServerPool(p, spec, "test") policies := map[string]resilience.Policy{} @@ -135,7 +134,7 @@ servers: assert.NoError(spec.Validate()) p := &Proxy{} - p.super = supervisor.NewMock(option.New(), nil, sync.Map{}, sync.Map{}, nil, + p.super = supervisor.NewMock(option.New(), nil, nil, nil, false, nil, nil) sp := NewServerPool(p, spec, "test") spCtx := &serverPoolContext{ @@ -178,7 +177,7 @@ func TestCopyCORSHeaders(t *testing.T) { dst.Add("X-Dst", "dst") p := &Proxy{} - p.super = supervisor.NewMock(option.New(), nil, sync.Map{}, sync.Map{}, nil, + p.super = supervisor.NewMock(option.New(), nil, nil, nil, false, nil, nil) sp := NewServerPool(p, &ServerPoolSpec{}, "test") dst = sp.mergeResponseHeader(dst, src) diff --git a/pkg/filters/proxies/httpproxy/proxy_test.go b/pkg/filters/proxies/httpproxy/proxy_test.go index 230bd56627..5ff5a1554b 100644 --- a/pkg/filters/proxies/httpproxy/proxy_test.go +++ b/pkg/filters/proxies/httpproxy/proxy_test.go @@ -24,7 +24,6 @@ import ( "net/http/httptest" "os" "strings" - "sync" "sync/atomic" "testing" "time" @@ -58,7 +57,7 @@ func newTestProxy(yamlConfig string, assert *assert.Assertions) *Proxy { proxy := kind.CreateInstance(spec).(*Proxy) - proxy.super = supervisor.NewMock(option.New(), nil, sync.Map{}, sync.Map{}, nil, + proxy.super = supervisor.NewMock(option.New(), nil, nil, nil, false, nil, nil) proxy.Init() diff --git a/pkg/filters/proxies/loadbalance_test.go b/pkg/filters/proxies/loadbalance_test.go index 3e2d4d0614..27fafaad56 100644 --- a/pkg/filters/proxies/loadbalance_test.go +++ b/pkg/filters/proxies/loadbalance_test.go @@ -19,7 +19,6 @@ package proxies import ( "fmt" - "math/rand" "net/http" "os" "sync" @@ -85,8 +84,6 @@ func TestGeneralLoadBalancer(t *testing.T) { } func TestRandomLoadBalancePolicy(t *testing.T) { - rand.Seed(0) - counter := [10]int{} servers := prepareServers(10) @@ -126,15 +123,13 @@ func TestRoundRobinLoadBalancePolicy(t *testing.T) { } func TestWeightedRandomLoadBalancePolicy(t *testing.T) { - rand.Seed(0) - counter := [10]int{} servers := prepareServers(10) lb := NewGeneralLoadBalancer(&LoadBalanceSpec{Policy: LoadBalancePolicyWeightedRandom}, servers) lb.Init(nil, nil, nil) - for i := 0; i < 1000; i++ { + for i := 0; i < 50000; i++ { svr := lb.ChooseServer(nil) counter[svr.Weight-1]++ } @@ -142,7 +137,7 @@ func TestWeightedRandomLoadBalancePolicy(t *testing.T) { v := 0 for i := 0; i < 10; i++ { if v >= counter[i] { - t.Error("possibility is not weighted even") + t.Errorf("possibility is not weighted even %v", counter) } v = counter[i] } diff --git a/pkg/filters/proxies/requestmatch_test.go b/pkg/filters/proxies/requestmatch_test.go index 65e9d0f42f..d7d01a3eaf 100644 --- a/pkg/filters/proxies/requestmatch_test.go +++ b/pkg/filters/proxies/requestmatch_test.go @@ -19,7 +19,6 @@ package proxies import ( "fmt" - "math/rand" "net/http" "strconv" "testing" @@ -53,8 +52,6 @@ func TestRequestMatcherBaseSpecValidate(t *testing.T) { } func TestRandomMatcher(t *testing.T) { - rand.Seed(0) - rm := NewRequestMatcher(&RequestMatcherBaseSpec{ Policy: "random", Permil: 100, diff --git a/pkg/filters/validator/basicauth.go b/pkg/filters/validator/basicauth.go index 6c628b17c3..c737013608 100644 --- a/pkg/filters/validator/basicauth.go +++ b/pkg/filters/validator/basicauth.go @@ -206,7 +206,6 @@ func (huc *htpasswdUserCache) WatchChanges() { if err != nil { logger.Errorf(err.Error()) } - return } func (huc *htpasswdUserCache) Close() { @@ -320,7 +319,6 @@ func (euc *etcdUserCache) WatchChanges() { } } }() - return } func (euc *etcdUserCache) Close() { diff --git a/pkg/filters/validator/validator_test.go b/pkg/filters/validator/validator_test.go index 48eab21e98..c1944d80c7 100644 --- a/pkg/filters/validator/validator_test.go +++ b/pkg/filters/validator/validator_test.go @@ -24,7 +24,6 @@ import ( "net/http" "os" "strings" - "sync" "testing" "time" @@ -535,9 +534,8 @@ basicAuth: return kvs, nil } - var mockMap sync.Map supervisor := supervisor.NewMock( - nil, clusterInstance, mockMap, mockMap, nil, nil, false, nil, nil) + nil, clusterInstance, nil, nil, false, nil, nil) yamlConfig := ` kind: Validator diff --git a/pkg/graceupdate/graceupdate.go b/pkg/graceupdate/graceupdate.go index ab96842a58..ea2f808733 100644 --- a/pkg/graceupdate/graceupdate.go +++ b/pkg/graceupdate/graceupdate.go @@ -71,7 +71,6 @@ func NotifySigUsr2(closeCls func(), restartCls func()) error { // Reset signal usr2 notify NotifySigUsr2(closeCls, restartCls) } else { - childdone := make(chan error, 1) go func() { process, err := os.FindProcess(pid) if err != nil { @@ -79,13 +78,9 @@ func NotifySigUsr2(closeCls func(), restartCls func()) error { NotifySigUsr2(closeCls, restartCls) } else { _, werr := process.Wait() - childdone <- werr - select { - case err := <-childdone: - logger.Errorf("child proc exited: %v", err) - restartCls() - NotifySigUsr2(closeCls, restartCls) - } + logger.Errorf("child proc exited: %v", werr) + restartCls() + NotifySigUsr2(closeCls, restartCls) } }() } diff --git a/pkg/object/autocertmanager/autocertmanager_test.go b/pkg/object/autocertmanager/autocertmanager_test.go index 673d88b29a..158cd12840 100644 --- a/pkg/object/autocertmanager/autocertmanager_test.go +++ b/pkg/object/autocertmanager/autocertmanager_test.go @@ -20,8 +20,8 @@ package autocertmanager import ( "context" "crypto" - "crypto/dsa" "crypto/ecdsa" + "crypto/ed25519" "crypto/elliptic" "crypto/rand" "crypto/rsa" @@ -775,9 +775,8 @@ func TestCertificateHelpers(t *testing.T) { keys[0] = rsaKey ecdsaKey, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) keys[1] = ecdsaKey - dsaKey := &dsa.PrivateKey{} - _ = dsa.GenerateKey(dsaKey, rand.Reader) - keys[2] = dsaKey + _, edKey, _ := ed25519.GenerateKey(rand.Reader) + keys[2] = edKey rsaKey2, _ := rsa.GenerateKey(rand.Reader, 2048) rsaKey2.D = nil keys[3] = rsaKey2 @@ -855,8 +854,7 @@ func TestDomain(t *testing.T) { }) t.Run("renewCert", func(t *testing.T) { - var ca *httptest.Server - ca = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ca := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Replay-Nonce", "nonce") w.WriteHeader(http.StatusForbidden) w.Write([]byte("{}")) diff --git a/pkg/object/function/worker/worker.go b/pkg/object/function/worker/worker.go index a376352624..74d7a62379 100644 --- a/pkg/object/function/worker/worker.go +++ b/pkg/object/function/worker/worker.go @@ -34,7 +34,6 @@ type ( // Worker stores the worker information Worker struct { mutex sync.RWMutex - super *supervisor.Supervisor superSpec *supervisor.Spec name string diff --git a/pkg/object/gatewaycontroller/k8s.go b/pkg/object/gatewaycontroller/k8s.go index aab52a4a86..5323e14bd4 100644 --- a/pkg/object/gatewaycontroller/k8s.go +++ b/pkg/object/gatewaycontroller/k8s.go @@ -462,14 +462,6 @@ func (c *k8sClient) getService(namespace, name string) (*apicorev1.Service, erro return service, err } -func (c *k8sClient) getEndpoints(namespace, name string) (*apicorev1.Endpoints, error) { - endpoint, err := c.kFactory.Core().V1().Endpoints().Lister().Endpoints(namespace).Get(name) - if errors.IsNotFound(err) { - err = nil - } - return endpoint, err -} - func (c *k8sClient) getSecret(namespace, name string) (*apicorev1.Secret, error) { secret, err := c.kFactory.Core().V1().Secrets().Lister().Secrets(namespace).Get(name) if errors.IsNotFound(err) { diff --git a/pkg/object/globalfilter/globalfilter.go b/pkg/object/globalfilter/globalfilter.go index f81fa683d4..33776c0289 100644 --- a/pkg/object/globalfilter/globalfilter.go +++ b/pkg/object/globalfilter/globalfilter.go @@ -126,6 +126,7 @@ func (gf *GlobalFilter) Status() *supervisor.Status { // Init initializes GlobalFilter. func (gf *GlobalFilter) Init(superSpec *supervisor.Spec) { gf.superSpec, gf.spec = superSpec, superSpec.ObjectSpec().(*Spec) + gf.super = superSpec.Super() gf.reload(nil) } diff --git a/pkg/object/grpcserver/runtime.go b/pkg/object/grpcserver/runtime.go index d7622e4e53..acf08fe41e 100644 --- a/pkg/object/grpcserver/runtime.go +++ b/pkg/object/grpcserver/runtime.go @@ -30,9 +30,14 @@ import ( "github.com/megaease/easegress/v2/pkg/supervisor" "github.com/megaease/easegress/v2/pkg/util/limitlistener" "google.golang.org/grpc" + "google.golang.org/grpc/encoding" "google.golang.org/grpc/keepalive" ) +func init() { + encoding.RegisterCodec(&grpcproxy.GrpcCodec{}) +} + const ( stateNil stateType = "nil" stateFailed stateType = "failed" @@ -219,7 +224,7 @@ func (r *runtime) startServer() { r.setError(err) return } - opts := []grpc.ServerOption{grpc.UnknownServiceHandler(r.mux.handler), grpc.CustomCodec(&grpcproxy.GrpcCodec{})} + opts := []grpc.ServerOption{grpc.UnknownServiceHandler(r.mux.handler)} keepaliveOpts := r.buildServerKeepaliveOpt() if len(keepaliveOpts) != 0 { diff --git a/pkg/object/httpserver/httpserver_test.go b/pkg/object/httpserver/httpserver_test.go index ffe74d814a..ea1d67a758 100644 --- a/pkg/object/httpserver/httpserver_test.go +++ b/pkg/object/httpserver/httpserver_test.go @@ -19,7 +19,6 @@ package httpserver import ( "os" - "sync" "testing" "time" @@ -46,7 +45,7 @@ port: 38081 keepAlive: true https: false ` - super := supervisor.NewMock(option.New(), nil, sync.Map{}, sync.Map{}, nil, + super := supervisor.NewMock(option.New(), nil, nil, nil, false, nil, nil) superSpec, err := super.NewSpec(yamlConfig) assert.NoError(err) diff --git a/pkg/object/httpserver/routers/radixtree/router.go b/pkg/object/httpserver/routers/radixtree/router.go index 40db7ec560..83f3cfd5d9 100644 --- a/pkg/object/httpserver/routers/radixtree/router.go +++ b/pkg/object/httpserver/routers/radixtree/router.go @@ -514,11 +514,7 @@ func newMuxRule(rule *routers.Rule) *muxRule { if seg.nodeType == ntStatic { p := path.Path - if _, ok := mr.pathCache[p]; ok { - mr.pathCache[p] = append(mr.pathCache[p], newMuxPath(path)) - } else { - mr.pathCache[p] = []*muxPath{newMuxPath(path)} - } + mr.pathCache[p] = append(mr.pathCache[p], newMuxPath(path)) } else { mr.root.insert(path) } diff --git a/pkg/object/httpserver/runtime_test.go b/pkg/object/httpserver/runtime_test.go index 6107ffcf94..23bb93eb7d 100644 --- a/pkg/object/httpserver/runtime_test.go +++ b/pkg/object/httpserver/runtime_test.go @@ -18,7 +18,6 @@ package httpserver import ( - "sync" "testing" "time" @@ -38,7 +37,7 @@ port: 38081 keepAlive: true https: false ` - super := supervisor.NewMock(option.New(), nil, sync.Map{}, sync.Map{}, nil, + super := supervisor.NewMock(option.New(), nil, nil, nil, false, nil, nil) superSpec, err := super.NewSpec(yamlConfig) assert.NoError(err) diff --git a/pkg/object/httpserver/spec_test.go b/pkg/object/httpserver/spec_test.go index ac5ce97098..9810239b50 100644 --- a/pkg/object/httpserver/spec_test.go +++ b/pkg/object/httpserver/spec_test.go @@ -48,7 +48,7 @@ rules: - pathPrefix: /api ` - superSpec, err := supervisor.NewSpec(yamlConfig) + _, err := supervisor.NewSpec(yamlConfig) assert.Error(err) yamlConfig = ` @@ -62,7 +62,7 @@ rules: - pathPrefix: /api ` - superSpec, err = supervisor.NewSpec(yamlConfig) + _, err = supervisor.NewSpec(yamlConfig) assert.Error(err) yamlConfig = ` @@ -75,7 +75,7 @@ rules: - pathPrefix: /api ` - superSpec, err = supervisor.NewSpec(yamlConfig) + superSpec, err := supervisor.NewSpec(yamlConfig) assert.NoError(err) assert.NotNil(superSpec.ObjectSpec()) diff --git a/pkg/object/ingresscontroller/k8s.go b/pkg/object/ingresscontroller/k8s.go index c3037d64f2..76bb06c89a 100644 --- a/pkg/object/ingresscontroller/k8s.go +++ b/pkg/object/ingresscontroller/k8s.go @@ -180,6 +180,9 @@ func checkKubernetesVersion(cfg *rest.Config) (err error) { } info, err := cli.ServerVersion() + if err != nil { + return + } if info.Major != "1" { err = fmt.Errorf("unknown kubernetes major version: %v", info.Major) return @@ -187,7 +190,7 @@ func checkKubernetesVersion(cfg *rest.Config) (err error) { minor, err := strconv.Atoi(info.Minor) if err != nil { - err = fmt.Errorf("unknown kubernetes minor version: %v", info.Minor) + err = fmt.Errorf("unknown kubernetes minor version: %v, %s", info.Minor, err.Error()) return } diff --git a/pkg/object/meshcontroller/api/api_service.go b/pkg/object/meshcontroller/api/api_service.go index 6985e193f8..7e2658d000 100644 --- a/pkg/object/meshcontroller/api/api_service.go +++ b/pkg/object/meshcontroller/api/api_service.go @@ -258,7 +258,7 @@ func (a *API) getServiceDeployment(w http.ResponseWriter, r *http.Request) { } } - statefulsets, err := a.k8sClient.AppsV1().StatefulSets("").List(context.Background(), metav1.ListOptions{}) + statefulsets, _ := a.k8sClient.AppsV1().StatefulSets("").List(context.Background(), metav1.ListOptions{}) for _, statefulset := range statefulsets.Items { if statefulset.Annotations[annotationServiceNameKey] == serviceName { serviceDeployment.App = statefulset diff --git a/pkg/object/meshcontroller/master/master.go b/pkg/object/meshcontroller/master/master.go index daf6c17459..b6d23bc3d3 100644 --- a/pkg/object/meshcontroller/master/master.go +++ b/pkg/object/meshcontroller/master/master.go @@ -46,9 +46,8 @@ type ( heartbeatInterval time.Duration certManager *certmanager.CertManager - registrySyncer *registrySyncer - store storage.Storage - service *service.Service + store storage.Storage + service *service.Service done chan struct{} } @@ -63,12 +62,12 @@ func New(superSpec *supervisor.Spec) *Master { adminSpec := superSpec.ObjectSpec().(*spec.Admin) m := &Master{ + super: superSpec.Super(), superSpec: superSpec, spec: adminSpec, - store: store, - service: service.New(superSpec), - registrySyncer: newRegistrySyncer(superSpec), + store: store, + service: service.New(superSpec), done: make(chan struct{}), } diff --git a/pkg/object/meshcontroller/master/registrysyncer.go b/pkg/object/meshcontroller/master/registrysyncer.go deleted file mode 100644 index 2e6f06de74..0000000000 --- a/pkg/object/meshcontroller/master/registrysyncer.go +++ /dev/null @@ -1,353 +0,0 @@ -/* - * Copyright (c) 2017, MegaEase - * All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package master - -import ( - "sort" - "time" - - "github.com/megaease/easegress/v2/pkg/logger" - "github.com/megaease/easegress/v2/pkg/object/meshcontroller/informer" - "github.com/megaease/easegress/v2/pkg/object/meshcontroller/service" - "github.com/megaease/easegress/v2/pkg/object/meshcontroller/spec" - "github.com/megaease/easegress/v2/pkg/object/meshcontroller/storage" - "github.com/megaease/easegress/v2/pkg/object/serviceregistry" - "github.com/megaease/easegress/v2/pkg/supervisor" - "github.com/megaease/easegress/v2/pkg/util/stringtool" -) - -const ( - syncInterval = 10 * time.Second -) - -type ( - registrySyncer struct { - superSpec *supervisor.Spec - spec *spec.Admin - - externalInstances map[string]*serviceregistry.ServiceInstanceSpec - service *service.Service - informer informer.Informer - serviceRegistry *serviceregistry.ServiceRegistry - registryWatcher serviceregistry.RegistryWatcher - - done chan struct{} - } -) - -func newRegistrySyncer(superSpec *supervisor.Spec) *registrySyncer { - spec := superSpec.ObjectSpec().(*spec.Admin) - - rs := ®istrySyncer{ - superSpec: superSpec, - spec: spec, - - done: make(chan struct{}), - } - - store := storage.New(superSpec.Name(), superSpec.Super().Cluster()) - rs.service = service.New(superSpec) - - if spec.CleanExternalRegistry { - go rs.cleanExternalRegistry() - } - - if spec.ExternalServiceRegistry == "" { - return rs - } - - rs.informer = informer.NewInformer(store, "") - rs.informer.OnAllServiceInstanceSpecs(rs.serviceInstanceSpecsFunc) - - rs.serviceRegistry = superSpec.Super().MustGetSystemController(serviceregistry.Kind).Instance().(*serviceregistry.ServiceRegistry) - rs.registryWatcher = rs.serviceRegistry.NewRegistryWatcher(spec.ExternalServiceRegistry) - - go rs.run() - - return rs -} - -// cleanExternalRegistry removes external service specs and service instances. -func (rs *registrySyncer) cleanExternalRegistry() { - if !rs.needSync() { - return - } - - instances := rs.service.ListAllServiceInstanceSpecs() - meshRegistryName := rs.meshRegistryName() - externalRegistryName := rs.externalRegistryName() - - for _, instance := range instances { - switch instance.RegistryName { - case "", meshRegistryName, externalRegistryName: - continue - } - - rs.service.DeleteServiceInstanceSpec(instance.ServiceName, instance.InstanceID) - logger.Infof("clean service instance: %s/%s", instance.ServiceName, instance.InstanceID) - } - - specs := rs.service.ListServiceSpecs() - for _, _spec := range specs { - switch _spec.RegistryName { - case "", meshRegistryName, externalRegistryName: - continue - } - - rs.service.DeleteServiceSpec(_spec.Name) - logger.Infof("clean service: %s", _spec.Name) - } -} - -func (rs *registrySyncer) run() { - for { - select { - case <-rs.done: - return - case event := <-rs.registryWatcher.Watch(): - if !rs.needSync() { - continue - } - - rs.handleEvent(event) - rs.updateServiceSpec() - } - } -} - -func (rs *registrySyncer) handleEvent(event *serviceregistry.RegistryEvent) { - defer func() { - if r := recover(); r != nil { - logger.Errorf("recover from %v", r) - } - }() - - if event.UseReplace { - event.Replace = rs.filterExternalInstances(event.Replace, rs.externalRegistryName()) - - oldInstances := rs.service.ListAllServiceInstanceSpecs() - for _, oldInstance := range oldInstances { - if oldInstance.RegistryName != rs.externalRegistryName() { - continue - } - - instance := &serviceregistry.ServiceInstanceSpec{ - RegistryName: oldInstance.RegistryName, - ServiceName: oldInstance.ServiceName, - InstanceID: oldInstance.InstanceID, - } - _, exists := event.Replace[instance.Key()] - if !exists { - rs.service.DeleteServiceInstanceSpec(oldInstance.ServiceName, oldInstance.InstanceID) - } - } - - for _, instance := range event.Replace { - rs.service.PutServiceInstanceSpec(rs.externalToMeshInstance(instance)) - } - - return - } - - event.Delete = rs.filterExternalInstances(event.Delete, rs.externalRegistryName()) - event.Apply = rs.filterExternalInstances(event.Apply, rs.externalRegistryName()) - - for _, instance := range event.Delete { - rs.service.DeleteServiceInstanceSpec(instance.ServiceName, instance.InstanceID) - } - for _, instance := range event.Apply { - rs.service.PutServiceInstanceSpec(rs.externalToMeshInstance(instance)) - } -} - -func (rs *registrySyncer) updateServiceSpec() { - defer func() { - if r := recover(); r != nil { - logger.Errorf("recover from %v", r) - } - }() - - allInstances := rs.service.ListAllServiceInstanceSpecs() - externalServices := make(map[string]struct{}) - for _, instance := range allInstances { - if instance.RegistryName != rs.externalRegistryName() { - continue - } - externalServices[instance.ServiceName] = struct{}{} - } - - allServices := rs.service.ListServiceSpecs() - globalServices := make(map[string]struct{}) - for _, service := range allServices { - if service.RegisterTenant == spec.GlobalTenant { - globalServices[service.Name] = struct{}{} - } - - if service.RegistryName != rs.externalRegistryName() { - continue - } - - _, exists := externalServices[service.Name] - if !exists { - logger.Infof("delete external service spec %s", service.Name) - rs.service.DeleteServiceSpec(service.Name) - delete(globalServices, service.Name) - } - } - - globalServicesSlice := make([]string, 0) - for service := range externalServices { - globalServicesSlice = append(globalServicesSlice, service) - } - sort.Strings(globalServicesSlice) - rs.service.PutTenantSpec(&spec.Tenant{ - Name: spec.GlobalTenant, - Services: globalServicesSlice, - CreatedAt: time.Now().Format(time.RFC3339), - Description: "Generated by registry syncer", - }) - - for service := range externalServices { - serviceSpec := &spec.Service{ - RegistryName: rs.externalRegistryName(), - Name: service, - RegisterTenant: spec.GlobalTenant, - Sidecar: &spec.Sidecar{ - DiscoveryType: rs.spec.RegistryType, - Address: "127.0.0.1", - IngressPort: 13001, - IngressProtocol: "http", - EgressPort: 13002, - EgressProtocol: "http", - }, - } - rs.service.PutServiceSpec(serviceSpec) - } -} - -func (rs *registrySyncer) serviceInstanceSpecsFunc(meshInstances map[string]*spec.ServiceInstanceSpec) bool { - if !rs.needSync() { - return true - } - - oldInstances, err := rs.serviceRegistry.ListAllServiceInstances(rs.spec.ExternalServiceRegistry) - if err != nil { - logger.Errorf("list all service instances of %s: %v", rs.spec.ExternalServiceRegistry, err) - return true - } - - oldInstances = rs.filterExternalInstances(oldInstances, rs.meshRegistryName()) - - meshInstances = rs.filterMeshInstances(meshInstances, rs.meshRegistryName()) - newInstances := rs.meshToExternalInstances(meshInstances) - - event := serviceregistry.NewRegistryEventFromDiff(rs.meshRegistryName(), oldInstances, newInstances) - - if len(event.Apply) != 0 { - err := rs.serviceRegistry.ApplyServiceInstances(rs.externalRegistryName(), event.Apply) - if err != nil { - logger.Errorf("apply service instances failed: %v", err) - return true - } - } - if len(event.Delete) != 0 { - err := rs.serviceRegistry.DeleteServiceInstances(rs.externalRegistryName(), event.Delete) - if err != nil { - logger.Errorf("delete service instances failed: %v", err) - } - } - - return true -} - -func (rs *registrySyncer) needSync() bool { - // NOTE: Only need one member in the cluster to do sync. - return rs.superSpec.Super().Cluster().IsLeader() -} - -func (rs *registrySyncer) meshRegistryName() string { - return rs.superSpec.Name() -} - -func (rs *registrySyncer) externalRegistryName() string { - return rs.spec.ExternalServiceRegistry -} - -func (rs *registrySyncer) filterExternalInstances(instances map[string]*serviceregistry.ServiceInstanceSpec, registryNames ...string) map[string]*serviceregistry.ServiceInstanceSpec { - result := make(map[string]*serviceregistry.ServiceInstanceSpec) - - for _, instance := range instances { - if stringtool.StrInSlice(instance.RegistryName, registryNames) { - result[instance.Key()] = instance - } - } - - return result -} - -func (rs *registrySyncer) filterMeshInstances(instances map[string]*spec.ServiceInstanceSpec, registryNames ...string) map[string]*spec.ServiceInstanceSpec { - result := make(map[string]*spec.ServiceInstanceSpec) - - for _, instance := range instances { - if instance.Status != spec.ServiceStatusUp { - continue - } - if stringtool.StrInSlice(instance.RegistryName, registryNames) { - result[instance.Key()] = instance - } - } - - return result -} - -func (rs *registrySyncer) meshToExternalInstances(instances map[string]*spec.ServiceInstanceSpec) map[string]*serviceregistry.ServiceInstanceSpec { - result := make(map[string]*serviceregistry.ServiceInstanceSpec) - - for _, instance := range instances { - externalInstance := &serviceregistry.ServiceInstanceSpec{ - RegistryName: instance.RegistryName, - ServiceName: instance.ServiceName, - InstanceID: instance.InstanceID, - - Address: instance.IP, - Port: uint16(instance.Port), - } - result[externalInstance.Key()] = externalInstance - } - - return result -} - -func (rs *registrySyncer) externalToMeshInstance(instance *serviceregistry.ServiceInstanceSpec) *spec.ServiceInstanceSpec { - return &spec.ServiceInstanceSpec{ - RegistryName: instance.RegistryName, - ServiceName: instance.ServiceName, - InstanceID: instance.InstanceID, - IP: instance.Address, - Port: uint32(instance.Port), - Status: spec.ServiceStatusUp, - } -} - -func (rs *registrySyncer) close() { - close(rs.done) - - if rs.informer != nil { - rs.informer.Close() - } -} diff --git a/pkg/object/meshcontroller/worker/observability.go b/pkg/object/meshcontroller/worker/observability.go index f25dfa6760..6d12903885 100644 --- a/pkg/object/meshcontroller/worker/observability.go +++ b/pkg/object/meshcontroller/worker/observability.go @@ -21,10 +21,6 @@ import ( "github.com/megaease/easegress/v2/pkg/util/jmxtool" ) -const ( - easeAgentConfigManager = "com.megaease.easeagent:type=ConfigManager" -) - type ( // ObservabilityManager is the manager for observability. ObservabilityManager struct { diff --git a/pkg/object/meshcontroller/worker/server.go b/pkg/object/meshcontroller/worker/server.go index fb7ad0b8af..c754d4a5c3 100644 --- a/pkg/object/meshcontroller/worker/server.go +++ b/pkg/object/meshcontroller/worker/server.go @@ -29,7 +29,6 @@ import ( "go.uber.org/zap" "github.com/megaease/easegress/v2/pkg/logger" - "github.com/megaease/easegress/v2/pkg/option" "github.com/megaease/easegress/v2/pkg/util/codectool" ) @@ -39,12 +38,10 @@ const ( type ( apiServer struct { - opt option.Options srv http.Server router *chi.Mux apisMutex sync.RWMutex apis []*apiEntry - port int } apiEntry struct { diff --git a/pkg/object/meshcontroller/worker/worker.go b/pkg/object/meshcontroller/worker/worker.go index 1e9d0827eb..8c5a7dddbb 100644 --- a/pkg/object/meshcontroller/worker/worker.go +++ b/pkg/object/meshcontroller/worker/worker.go @@ -44,10 +44,6 @@ import ( "github.com/megaease/easegress/v2/pkg/util/stringtool" ) -const ( - agentConfigUpdateInterval = 5 * time.Second -) - type ( // Worker is a sidecar in service mesh. Worker struct { diff --git a/pkg/object/mqttproxy/mqtt_test.go b/pkg/object/mqttproxy/mqtt_test.go index f32845db61..d302bd365c 100644 --- a/pkg/object/mqttproxy/mqtt_test.go +++ b/pkg/object/mqttproxy/mqtt_test.go @@ -751,10 +751,9 @@ type testServer struct { func newServer(addr string) *testServer { mux := http.NewServeMux() - srv := http.Server{Addr: addr, Handler: mux} ts := &testServer{ mux: mux, - srv: srv, + srv: http.Server{Addr: addr, Handler: mux}, addr: addr, } return ts @@ -1389,7 +1388,7 @@ func TestMQTTProxy(t *testing.T) { name: mqtt-proxy kind: MQTTProxy ` - super := supervisor.NewMock(option.New(), nil, sync.Map{}, sync.Map{}, nil, nil, false, nil, nil) + super := supervisor.NewMock(option.New(), nil, nil, nil, false, nil, nil) super.Options() superSpec, err := super.NewSpec(yamlStr) assert.Nil(err) diff --git a/pkg/object/pipeline/pipeline.go b/pkg/object/pipeline/pipeline.go index 0d5c419d90..bf5b34b902 100644 --- a/pkg/object/pipeline/pipeline.go +++ b/pkg/object/pipeline/pipeline.go @@ -335,7 +335,7 @@ func (p *Pipeline) HandleWithBeforeAfter(ctx *context.Context, before, after *Pi } if !sawEnd && after != nil { - result, stats, sawEnd = p.doHandle(ctx, after.flow, stats) + result, stats, _ = p.doHandle(ctx, after.flow, stats) } ctx.LazyAddTag(func() string { diff --git a/pkg/object/zookeeperserviceregistry/zookeeperserviceregistry.go b/pkg/object/zookeeperserviceregistry/zookeeperserviceregistry.go index 096f41ec3e..ba455b363c 100644 --- a/pkg/object/zookeeperserviceregistry/zookeeperserviceregistry.go +++ b/pkg/object/zookeeperserviceregistry/zookeeperserviceregistry.go @@ -40,8 +40,6 @@ const ( // Kind is the kind of ZookeeperServiceRegistry. Kind = "ZookeeperServiceRegistry" - - requestTimeout = 5 * time.Second ) func init() { diff --git a/pkg/protocols/grpcprot/header.go b/pkg/protocols/grpcprot/header.go index 93140ec124..c89b070853 100644 --- a/pkg/protocols/grpcprot/header.go +++ b/pkg/protocols/grpcprot/header.go @@ -59,11 +59,11 @@ func (h *Header) RawAdd(key string, values ...string) { // Add adds the key, value pair to the header. func (h *Header) Add(key string, value interface{}) { - switch value.(type) { + switch v := value.(type) { case string: - h.md.Append(key, value.(string)) + h.md.Append(key, v) case []string: - h.md.Append(key, value.([]string)...) + h.md.Append(key, v...) default: panic(fmt.Sprintf("append grpc header value type %T is not string or []string", value)) } @@ -76,11 +76,11 @@ func (h *Header) RawSet(key string, values ...string) { // Set sets the header entries associated with key to the given value. func (h *Header) Set(key string, value interface{}) { - switch value.(type) { + switch v := value.(type) { case string: - h.md.Set(key, value.(string)) + h.md.Set(key, v) case []string: - h.md.Set(key, value.([]string)...) + h.md.Set(key, v...) default: panic(fmt.Sprintf("set grpc header value type %T is not string or []string", value)) } diff --git a/pkg/protocols/httpprot/httpheader/validator.go b/pkg/protocols/httpprot/httpheader/validator.go index f91c526d9f..82df807d97 100644 --- a/pkg/protocols/httpprot/httpheader/validator.go +++ b/pkg/protocols/httpprot/httpheader/validator.go @@ -75,20 +75,22 @@ func NewValidator(spec *ValidatorSpec) *Validator { // Validate validates HTTPHeader by the Validator. func (v Validator) Validate(h *HTTPHeader) error { -LOOP: for key, vv := range *v.spec { values := h.GetAll(key) + valid := false for _, value := range values { if stringtool.StrInSlice(value, vv.Values) { - continue LOOP + valid = true + break } if vv.re != nil && vv.re.MatchString(value) { - continue LOOP + valid = true + break } - return fmt.Errorf("header %s:%s is invalid", key, value) } - return fmt.Errorf("header %s not found", key) + if !valid { + return fmt.Errorf("header %s is invalid", key) + } } - return nil } diff --git a/pkg/protocols/httpprot/httpstat/httpstat.go b/pkg/protocols/httpprot/httpstat/httpstat.go index c2a9ae4c5f..ee55d1dba4 100644 --- a/pkg/protocols/httpprot/httpstat/httpstat.go +++ b/pkg/protocols/httpprot/httpstat/httpstat.go @@ -46,10 +46,6 @@ type ( errRate5 metrics.EWMA errRate15 metrics.EWMA - m1ErrPercent float64 - m5ErrPercent float64 - m15ErrPercent float64 - total uint64 min uint64 max uint64 diff --git a/pkg/protocols/httpprot/response.go b/pkg/protocols/httpprot/response.go index 851fef3d58..fb02a9ed01 100644 --- a/pkg/protocols/httpprot/response.go +++ b/pkg/protocols/httpprot/response.go @@ -40,9 +40,8 @@ type Response struct { // TODO: we only need StatusCode, Header and Body, that's can avoid // using the big http.Response object. *http.Response - stream *readers.ByteCountReader - payload []byte - payloadSize int64 + stream *readers.ByteCountReader + payload []byte } // ErrResponseEntityTooLarge means the request entity is too large. diff --git a/pkg/supervisor/mock.go b/pkg/supervisor/mock.go index 5b9884ef09..3893bc1f37 100644 --- a/pkg/supervisor/mock.go +++ b/pkg/supervisor/mock.go @@ -18,26 +18,21 @@ package supervisor import ( - "sync" - "github.com/megaease/easegress/v2/pkg/cluster" "github.com/megaease/easegress/v2/pkg/option" ) // NewMock return a mock supervisor for testing purpose -func NewMock(options *option.Options, cls cluster.Cluster, businessControllers sync.Map, - systemControllers sync.Map, objectRegistry *ObjectRegistry, watcher *ObjectEntityWatcher, +func NewMock(options *option.Options, cls cluster.Cluster, objectRegistry *ObjectRegistry, watcher *ObjectEntityWatcher, firstHandle bool, firstHandleDone chan struct{}, done chan struct{}) *Supervisor { return &Supervisor{ - options: options, - cls: cls, - businessControllers: businessControllers, - systemControllers: systemControllers, - objectRegistry: objectRegistry, - watcher: watcher, - firstHandle: firstHandle, - firstHandleDone: firstHandleDone, - done: done, + options: options, + cls: cls, + objectRegistry: objectRegistry, + watcher: watcher, + firstHandle: firstHandle, + firstHandleDone: firstHandleDone, + done: done, } } diff --git a/pkg/util/jmxtool/common.go b/pkg/util/jmxtool/common.go index af056d956c..7788c45af0 100644 --- a/pkg/util/jmxtool/common.go +++ b/pkg/util/jmxtool/common.go @@ -52,25 +52,24 @@ func JSONToKVMap(jsonStr string) (map[string]string, error) { func extractKVs(prefix string, obj interface{}) []map[string]string { var rst []map[string]string - switch obj.(type) { + switch o := obj.(type) { case map[string]interface{}: - for k, v := range obj.(map[string]interface{}) { + for k, v := range o { current := k rst = append(rst, extractKVs(join(prefix, current), v)...) } case []interface{}: - o := obj.([]interface{}) length := len(o) for i := 0; i < length; i++ { rst = append(rst, extractKVs(join(prefix, strconv.Itoa(i)), o[i])...) } case bool: - rst = append(rst, map[string]string{prefix: strconv.FormatBool(obj.(bool))}) + rst = append(rst, map[string]string{prefix: strconv.FormatBool(o)}) case int: - rst = append(rst, map[string]string{prefix: strconv.Itoa(obj.(int))}) + rst = append(rst, map[string]string{prefix: strconv.Itoa(o)}) case float64: - rst = append(rst, map[string]string{prefix: strconv.FormatFloat(obj.(float64), 'f', 0, 64)}) + rst = append(rst, map[string]string{prefix: strconv.FormatFloat(o, 'f', 0, 64)}) default: if obj == nil { rst = append(rst, map[string]string{prefix: ""}) diff --git a/pkg/util/prometheushelper/helper.go b/pkg/util/prometheushelper/helper.go index 9854f47854..825aed8b07 100644 --- a/pkg/util/prometheushelper/helper.go +++ b/pkg/util/prometheushelper/helper.go @@ -150,13 +150,13 @@ func NewSummary(opt prometheus.SummaryOpts, labels []string) *prometheus.Summary } func getAndValidate(metricName string, labels []string) (string, error) { - if ValidateMetricName(metricName) == false { - return "", fmt.Errorf("Invalid metric name: %s", metricName) + if !ValidateMetricName(metricName) { + return "", fmt.Errorf("invalid metric name: %s", metricName) } for _, l := range labels { - if ValidateLabelName(l) == false { - return "", fmt.Errorf("Invalid label name: %s", l) + if !ValidateLabelName(l) { + return "", fmt.Errorf("invalid label name: %s", l) } } return metricName, nil diff --git a/pkg/util/sem/semaphore_test.go b/pkg/util/sem/semaphore_test.go index 07fdf73356..862d7145a5 100644 --- a/pkg/util/sem/semaphore_test.go +++ b/pkg/util/sem/semaphore_test.go @@ -68,7 +68,8 @@ func runCase(s *Semaphore, maxCount int64, t *testing.T) { wg.Wait() // step 2: try to acquire one more, should timeout - ctx, _ := context.WithTimeout(context.Background(), 100*time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() err := s.AcquireWithContext(ctx) if err == nil { t.Fatalf("sema count exceeds the maxCount: %d", maxCount) diff --git a/pkg/util/signer/signer.go b/pkg/util/signer/signer.go index cb502806f6..360ffd0a6f 100644 --- a/pkg/util/signer/signer.go +++ b/pkg/util/signer/signer.go @@ -50,43 +50,43 @@ type ( Literal struct { // ScopeSuffix is the last part when build the credential scope. // Default: megaease_request - ScopeSuffix string `json:"scopeSuffix" json:"scopeSuffix" jsonschema:"required"` + ScopeSuffix string `json:"scopeSuffix" jsonschema:"required"` // AlgorithmName is the query name of the signature algorithm // Default: X-Me-Algorithm - AlgorithmName string `json:"algorithmName" json:"algorithmName" jsonschema:"required"` + AlgorithmName string `json:"algorithmName" jsonschema:"required"` // AlgorithmName is the header/query value of the signature algorithm // Default: ME-HMAC-SHA256 - AlgorithmValue string `json:"algorithmValue" json:"alrithmValue" jsonschema:"required"` + AlgorithmValue string `json:"algorithmValue" jsonschema:"required"` // SignedHeaders is the header/query headers of the signed headers // Default: X-Me-SignedHeaders - SignedHeaders string `json:"signedHeaders" json:"signedHeaders" jsonschema:"required"` + SignedHeaders string `json:"signedHeaders" jsonschema:"required"` // Signature is the query name of the signature // Default: X-Me-Signature - Signature string `json:"signature" json:"signature" jsonschema:"required"` + Signature string `json:"signature" jsonschema:"required"` // Date is the header/query name of request time // Default: X-Me-Date - Date string `json:"date" json:"date" jsonschema:"required"` + Date string `json:"date" jsonschema:"required"` // Expires is the query name of expire duration // Default: X-Me-Expires - Expires string `json:"expires" json:"expires" jsonschema:"required"` + Expires string `json:"expires" jsonschema:"required"` // Credential is the query name of credential // Default: X-Me-Credential - Credential string `json:"credential" json:"credential" jsonschema:"required"` + Credential string `json:"credential" jsonschema:"required"` // ContentSHA256 is the header name of body/payload hash // Default: X-Me-Content-Sha256 - ContentSHA256 string `json:"contentSha256" json:"contentSha256" jsonschema:"required"` + ContentSHA256 string `json:"contentSha256" jsonschema:"required"` // SigningKeyPrefix is prepend to access key secret when derive the signing key // Default: ME - SigningKeyPrefix string `json:"signingKeyPrefix" json:"signingKeyPrefix" jsonschema:"omitempty"` + SigningKeyPrefix string `json:"signingKeyPrefix" jsonschema:"omitempty"` } // HeaderHoisting defines which headers are allowed to be moved from header to query @@ -94,9 +94,9 @@ type ( // disallowed prefixes and doesn't match any of disallowed names are allowed to be // hoisted HeaderHoisting struct { - AllowedPrefix []string `json:"allowedPrefix" json:"allowedPrefix" jsonschema:"omitempty,uniqueItems=true"` - DisallowedPrefix []string `json:"disallowedPrefix" json:"disallowedPrefix" jsonschema:"omitempty,uniqueItems=true"` - Disallowed []string `json:"disallowed" json:"disallowed" jsonschema:"omitempty,uniqueItems=true"` + AllowedPrefix []string `json:"allowedPrefix" jsonschema:"omitempty,uniqueItems=true"` + DisallowedPrefix []string `json:"disallowedPrefix" jsonschema:"omitempty,uniqueItems=true"` + Disallowed []string `json:"disallowed" jsonschema:"omitempty,uniqueItems=true"` disallowed map[string]bool } @@ -781,7 +781,7 @@ func (ctx *Context) Verify(req *http.Request, getBody func() io.Reader) error { return e } - age := time.Now().Sub(ctx.Time) + age := time.Since(ctx.Time) if ctx.signer.ttl > 0 { if age < -ctx.signer.ttl || age > ctx.signer.ttl { return fmt.Errorf("signature expired") diff --git a/pkg/util/signer/signer_test.go b/pkg/util/signer/signer_test.go index d013130cb8..1c38c348fc 100644 --- a/pkg/util/signer/signer_test.go +++ b/pkg/util/signer/signer_test.go @@ -93,8 +93,7 @@ var awsSpec = &Spec{ } func buildRequest(serviceName, region, payload string) *http.Request { - var body io.Reader - body = strings.NewReader(payload) + var body io.Reader = strings.NewReader(payload) var bodyLen int diff --git a/pkg/util/signer/spec.go b/pkg/util/signer/spec.go index 3433f9d305..3553f18c6e 100644 --- a/pkg/util/signer/spec.go +++ b/pkg/util/signer/spec.go @@ -21,14 +21,14 @@ import "time" // Spec defines the configuration of a Signer type Spec struct { - Literal *Literal `json:"literal,omitempty" json:"literal,omitempty" jsonschema:"omitempty"` - HeaderHoisting *HeaderHoisting `json:"headerHoisting,omitempty" json:"headerHoisting,omitempty" jsonschema:"omitempty"` - IgnoredHeaders []string `json:"ignoredHeaders" json:"ignoredHeaders" jsonschema:"omitempty,uniqueItems=true"` - ExcludeBody bool `json:"excludeBody" json:"excludeBody" jsonschema:"omitempty"` - TTL string `json:"ttl" json:"ttl" jsonschema:"omitempty,format=duration"` - AccessKeyID string `json:"accessKeyId" json:"accessKeyId" jsonschema:"omitempty"` - AccessKeySecret string `json:"accessKeySecret" json:"accessKeySecret" jsonschema:"omitempty"` - AccessKeys map[string]string `json:"accessKeys" json:"accessKeys" jsonschema:"omitempty"` + Literal *Literal `json:"literal,omitempty" jsonschema:"omitempty"` + HeaderHoisting *HeaderHoisting `json:"headerHoisting,omitempty" jsonschema:"omitempty"` + IgnoredHeaders []string `json:"ignoredHeaders" jsonschema:"omitempty,uniqueItems=true"` + ExcludeBody bool `json:"excludeBody" jsonschema:"omitempty"` + TTL string `json:"ttl" jsonschema:"omitempty,format=duration"` + AccessKeyID string `json:"accessKeyId" jsonschema:"omitempty"` + AccessKeySecret string `json:"accessKeySecret" jsonschema:"omitempty"` + AccessKeys map[string]string `json:"accessKeys" jsonschema:"omitempty"` // TODO: AccessKeys is used as an internal access key store, but an external store is also needed } diff --git a/pkg/util/urlclusteranalyzer/urlclusteranalyzer.go b/pkg/util/urlclusteranalyzer/urlclusteranalyzer.go index e81975fd0c..f0a5ccf7bb 100644 --- a/pkg/util/urlclusteranalyzer/urlclusteranalyzer.go +++ b/pkg/util/urlclusteranalyzer/urlclusteranalyzer.go @@ -32,17 +32,17 @@ const ( // URLClusterAnalyzer is url cluster analyzer. type URLClusterAnalyzer struct { - slots []*field `json:"slots"` + slots []*field mutex *sync.Mutex cache *lru.Cache } type field struct { - constant string `json:"constant"` - subFields []*field `json:"subFields"` - variableField *field `json:"variableField"` - isVariableField bool `json:"isVariableField"` - pattern string `json:"pattern"` + constant string + subFields []*field + variableField *field + isVariableField bool + pattern string } func newField(name string) *field { diff --git a/pkg/util/urlclusteranalyzer/urlclusteranalyzer_test.go b/pkg/util/urlclusteranalyzer/urlclusteranalyzer_test.go index 7d1482df4a..ad1e1e27e1 100644 --- a/pkg/util/urlclusteranalyzer/urlclusteranalyzer_test.go +++ b/pkg/util/urlclusteranalyzer/urlclusteranalyzer_test.go @@ -22,56 +22,42 @@ import ( "sync" "testing" "time" + + "github.com/stretchr/testify/assert" ) var wg sync.WaitGroup func TestURLClusterAnalyzer(t *testing.T) { + assert := assert.New(t) + urlClusterAnalyzer := New() p := urlClusterAnalyzer.GetPattern("") - fmt.Println(p) - if p != "" { - t.Fatal("") - } + assert.Equal("", p) p = urlClusterAnalyzer.GetPattern("city/address/1/order/3") - fmt.Println(p) - if p != "/city/address/1/order/3" { - t.Fatal("") - } + assert.Equal("/city/address/1/order/3", p) for i := 0; i < maxValues; i++ { p = urlClusterAnalyzer.GetPattern(fmt.Sprintf("/%d", i)) } - fmt.Println(p) - if p != fmt.Sprintf("/%d", maxValues-1) { - t.Fatal("") - } + assert.Equal(fmt.Sprintf("/%d", maxValues-1), p) for i := 0; i < maxValues+1; i++ { p = urlClusterAnalyzer.GetPattern(fmt.Sprintf("/%d", i)) } - fmt.Println(p) - if p != "/*" { - t.Fatal("") - } + assert.Equal("/*", p) for i := 0; i < maxValues+10; i++ { p = urlClusterAnalyzer.GetPattern(fmt.Sprintf("/orders/%d", i)) } - fmt.Println(p) - if p != "/orders/*" { - t.Fatal("") - } + assert.Equal("/orders/*", p) for i := 0; i < maxValues+10; i++ { p = urlClusterAnalyzer.GetPattern(fmt.Sprintf("/com/megaease/users/%d/orders/%d/details", 1, i)) } - fmt.Println(p) - if p != "/com/megaease/users/1/orders/*/details" { - t.Fatal("") - } + assert.Equal("/com/megaease/users/1/orders/*/details", p) begin := time.Now() for i := 0; i < 2; i++ { @@ -80,17 +66,14 @@ func TestURLClusterAnalyzer(t *testing.T) { defer wg.Done() pat := "" for i := 0; i < 10000; i++ { - pat = urlClusterAnalyzer.GetPattern(fmt.Sprintf("/com%d/abc/users/%d/orders/%d/details", i%10, i, i)) - pat = urlClusterAnalyzer.GetPattern(fmt.Sprintf("/abc/com%d/merchant/%d/sail2/%d/details", i%10, i, i)) - pat = urlClusterAnalyzer.GetPattern(fmt.Sprintf("/abc/com/merchant%d/%d/sail3/%d/details", i%10, i, i)) - pat = urlClusterAnalyzer.GetPattern(fmt.Sprintf("/abc/com/users/%d/orders/%d/details%d", i, i, i%10)) - pat = urlClusterAnalyzer.GetPattern(fmt.Sprintf("/abc/com/merchant/%d/sail/50/details", i)) + urlClusterAnalyzer.GetPattern(fmt.Sprintf("/com%d/abc/users/%d/orders/%d/details", i%10, i, i)) + urlClusterAnalyzer.GetPattern(fmt.Sprintf("/abc/com%d/merchant/%d/sail2/%d/details", i%10, i, i)) + urlClusterAnalyzer.GetPattern(fmt.Sprintf("/abc/com/merchant%d/%d/sail3/%d/details", i%10, i, i)) + urlClusterAnalyzer.GetPattern(fmt.Sprintf("/abc/com/users/%d/orders/%d/details%d", i, i, i%10)) + urlClusterAnalyzer.GetPattern(fmt.Sprintf("/abc/com/merchant/%d/sail/50/details", i)) pat = urlClusterAnalyzer.GetPattern(fmt.Sprintf("prefix%d/abc/com/merchant/%d/sail15/%d/details", i%10, i, i)) } - fmt.Println(pat) - if pat != "/prefix9/abc/com/merchant/*/sail15/*/details" { - t.Fatal("") - } + assert.Equal("/prefix9/abc/com/merchant/*/sail15/*/details", pat) }() } wg.Wait() @@ -98,8 +81,5 @@ func TestURLClusterAnalyzer(t *testing.T) { fmt.Println(duration) p = urlClusterAnalyzer.GetPattern(fmt.Sprintf("/abc/com/merchant/other/other/%d/details", 30)) - fmt.Println(p) - if p != "/abc/com/merchant/*/other/30/details" { - t.Fatal("") - } + assert.Equal("/abc/com/merchant/*/other/30/details", p) }