Skip to content

Commit

Permalink
gRPC server doc (#1036)
Browse files Browse the repository at this point in the history
* grpc server

* - add grpc-server.md
- fix bug: gRPC only support ip/hostname:port

* fix loadbalance unit test

* gRPC func Host will return port, some cases only need host without port

* fix unit test

* follow up the suggestions
  • Loading branch information
sodaRyCN committed Jul 10, 2023
1 parent d9c5364 commit 601cf96
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 58 deletions.
36 changes: 0 additions & 36 deletions doc/cookbook/grpc-server.md

This file was deleted.

38 changes: 38 additions & 0 deletions doc/reference/controllers.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,44 @@ data:
| resilience | []map[string]interface{} | Defines resilience policies, please refer [Resilience Policy](#resiliencepolicy) for details of a specific resilience policy. | No |
| data | map[string]interface{} | Static user data of the pipeline. | No |


### gRRC Service Proxy

Easegress gRPC servers is a proxy server base on gRPC protocol.

#### Simple Proxy Configuration
Below is one of the simplest gRPC Proxy Server, and it will listen on port `8080` and handle the requests that includes kv `content-type:application/grpc` in headers by use backend `pipeline-grpc-forward`
``` yaml
kind: GRPCServer
port: 8080
name: server-grpc
#路由规则
rules:
- methods:
- backend: pipeline-grpc
headers:
- key: "Content-Type"
values:
- "application/grpc"
xForwardedFor: true
```

#### Configuration
The below parameters will help manage connections better

| Name | Type | Description | Required |
|------|------|-------------|----------|
| maxConnections | uint32 | The maximum number of connections allowed by gRPC Server , default value 10240, min is 1 | No |
| minTimeClientSendPing | duration | The minimum amount of time a client should wait before sending a keepalive ping, default value is 5 minutes | No |
| permitClintSendPingWithoutStream | duration | If true, server allows keepalive pings even when there are no active streams(RPCs). If false, and client sends ping when there are no active streams, server will send GOAWAY and close the connection. default false | No |
| maxConnectionIdle | duration | A duration for the amount of time after which an idle connection would be closed by sending a GoAway. Idleness duration is defined since the most recent time the number of outstanding RPCs became zero or the connection establishment. default value is infinity | No |
| maxConnectionAge | duration | A duration for the maximum amount of time a connection may exist before it will be closed by sending a GoAway. A random jitter of ±10% will be added to MaxConnectionAge to prevent connection storms. default value is infinity | No |
| maxConnectionAgeGrace | duration | An additive period after MaxConnectionAge after which the connection will be forcibly closed. default value is infinity | No |
| keepaliveTime | duration | After a duration of this time if the server doesn't see any activity it pings the client to see if the transport is still alive. If set below 1s, a minimum value of 1s will be used instead. default value is 2 hours. | No |
| keepaliveTimeout | duration | After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. default value is 20 seconds |No |



### StatusSyncController

No config.
Expand Down
3 changes: 3 additions & 0 deletions doc/reference/filters.md
Original file line number Diff line number Diff line change
Expand Up @@ -1378,6 +1378,9 @@ Same as the `Proxy` filter:
* the servers of a pool can be configured dynamically via service discovery.
* when there are multiple servers in a pool, the pool can do a load balance between them.

Because gRPC does not support the http `Connect` method, it does not support tunneling mode,
we provide a new [load balancer](#proxyloadbalancespec) `policy.forward` to achieve a similar effect.

Note that each gRPC client establishes a connection with Easegress. However,
Easegress may utilize a single connection when forwarding requests from various
clients to a gRPC server, due to its use of HTTP2. This action could potentially
Expand Down
6 changes: 0 additions & 6 deletions pkg/filters/proxies/grpcproxy/loadbalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package grpcproxy

import (
"net"
"sync"

"github.com/megaease/easegress/pkg/logger"
Expand Down Expand Up @@ -60,11 +59,6 @@ func (f *forwardLoadBalancer) ChooseServer(req protocols.Request) *Server {
return nil
}

if _, _, err := net.SplitHostPort(target); err != nil {
logger.Debugf("request %v from %v context target address %s invalid", grpcreq.FullMethod(), grpcreq.RealIP(), target)
return nil
}

if s, ok := f.servers.Get().(*Server); ok {
s.URL = target
return s
Expand Down
6 changes: 1 addition & 5 deletions pkg/filters/proxies/grpcproxy/loadbalance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,7 @@ func TestForwardLB(t *testing.T) {
req := grpcprot.NewRequestWithServerStream(sm)
assert.Nil(t, lb.ChooseServer(req))

target := "%127.0.0.1%8849"
req.Header().Set(key, target)
assert.Nil(t, lb.ChooseServer(req))

target = "127.0.0.1:8849"
target := "127.0.0.1:8849"
req.Header().Set(key, target)
svr := lb.ChooseServer(req)
assert.Equal(t, target, svr.URL)
Expand Down
23 changes: 17 additions & 6 deletions pkg/filters/proxies/grpcproxy/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"io"
"net"
"net/url"
"sync"

Expand Down Expand Up @@ -266,6 +267,18 @@ func (sp *ServerPool) handle(ctx *context.Context) string {
panic(fmt.Errorf("should not reach here"))
}

func (sp *ServerPool) getTarget(rawTarget string) string {
target := rawTarget
// gRPC only support ip:port, but proxies.Server.URL include scheme and forward.Key too
if parse, err := url.Parse(target); err == nil && parse.Host != "" {
target = parse.Host
}
if _, _, err := net.SplitHostPort(target); err != nil {
return ""
}
return target
}

func (sp *ServerPool) doHandle(ctx stdcontext.Context, spCtx *serverPoolContext) error {
lb := sp.LoadBalancer()
svr := lb.ChooseServer(spCtx.req)
Expand All @@ -274,14 +287,12 @@ func (sp *ServerPool) doHandle(ctx stdcontext.Context, spCtx *serverPoolContext)
logger.Debugf("%s: no available server", sp.Name)
return serverPoolError{status.New(codes.InvalidArgument, "no available server"), resultClientError}
}
defer lb.ReturnServer(svr, spCtx.req, spCtx.resp)
// gRPC only support ip:port
parse, err := url.Parse(svr.URL)
if err != nil {
logger.Debugf("%s: server url %s invalid", sp.Name, svr.URL)
target := sp.getTarget(svr.URL)
lb.ReturnServer(svr, spCtx.req, spCtx.resp)
if target == "" {
logger.Debugf("request %v from %v context target address %s invalid", spCtx.req.FullMethod(), spCtx.req.RealIP(), target)
return serverPoolError{status.New(codes.Internal, "server url invalid"), resultInternalError}
}
target := parse.Host

// maybe be rewritten by grpcserver.MuxPath#rewrite
fullMethodName := spCtx.req.FullMethod()
Expand Down
48 changes: 48 additions & 0 deletions pkg/filters/proxies/grpcproxy/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package grpcproxy

import (
"context"
"github.com/megaease/easegress/pkg/protocols/grpcprot"
"math/rand"
"sync"
"testing"
Expand Down Expand Up @@ -167,3 +168,50 @@ func TestServerPoolSpecValidate(t *testing.T) {
sps.Servers[0].Weight = 1
assert.Error(t, sps.Validate())
}

func TestGetTarget(t *testing.T) {
at := assert.New(t)

s := `
kind: GRPCProxy
pools:
- loadBalance:
policy: roundRobin
servers:
- url: http:https://192.168.1.1:80
- url: http:https://192.168.1.2:80
serviceName: easegress
maxIdleConnsPerHost: 2
connectTimeout: 100ms
borrowTimeout: 100ms
name: grpcforwardproxy
`
proxy := newTestProxy(s, at)

server := proxy.mainPool.LoadBalancer().ChooseServer(nil)

at.NotEqual("", proxy.mainPool.getTarget(server.URL))

proxy.Close()

s = `
kind: GRPCProxy
pools:
- loadBalance:
policy: forward
forwardKey: targetAddress
serviceName: easegress
maxIdleConnsPerHost: 2
connectTimeout: 100ms
borrowTimeout: 100ms
name: grpcforwardproxy
`
proxy = newTestProxy(s, at)
request := grpcprot.NewRequestWithContext(context.Background())
request.Header().Add("targetAddress", "192.168.1.1:8080")

at.Equal("192.168.1.1:8080", proxy.mainPool.getTarget(proxy.mainPool.LoadBalancer().ChooseServer(request).URL))

request.Header().Set("targetAddress", "192.168.1.1")
at.Equal("", proxy.mainPool.getTarget(proxy.mainPool.LoadBalancer().ChooseServer(request).URL))
}
8 changes: 4 additions & 4 deletions pkg/object/grpcserver/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (mi *muxInstance) search(request *grpcprot.Request) *route {
// The key of the cache is grpc server address + called method
// and if a method is cached, we are sure it does not contain any
// headers.
r := mi.getRouteFromCache(request.Host(), fullMethod)
r := mi.getRouteFromCache(request.OnlyHost(), fullMethod)
if r != nil {
if r.code != 0 {
return r
Expand All @@ -412,7 +412,7 @@ func (mi *muxInstance) search(request *grpcprot.Request) *route {
}

for _, rs := range mi.rules {
if !rs.match(request.Host()) {
if !rs.match(request.OnlyHost()) {
continue
}

Expand All @@ -431,7 +431,7 @@ func (mi *muxInstance) search(request *grpcprot.Request) *route {
// The method can be put into the cache if it has no headers.
if len(method.headers) == 0 {
r = &route{code: 0, method: method}
mi.putRouteToCache(request.Host(), fullMethod, r)
mi.putRouteToCache(request.OnlyHost(), fullMethod, r)
} else if !method.matchHeaders(request) {
headerMismatch = true
continue
Expand Down Expand Up @@ -459,7 +459,7 @@ func (mi *muxInstance) search(request *grpcprot.Request) *route {
code: codes.NotFound,
message: "grpc stream miss match any conditions",
}
mi.putRouteToCache(request.Host(), fullMethod, notFound)
mi.putRouteToCache(request.OnlyHost(), fullMethod, notFound)
return notFound
}

Expand Down
21 changes: 20 additions & 1 deletion pkg/protocols/grpcprot/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type (
realIP string
sts *serverTransportStream
peer *peer.Peer
host string
}

serverTransportStream struct {
Expand Down Expand Up @@ -272,14 +273,32 @@ func (r *Request) SetSourceHost(sourceHost string) {
}

// Host refer to google.golang.org\[email protected]\internal\transport\http2_server.go#operateHeaders
// ensure only one value
// It may be of the form "host:port" or contain an international domain name.
func (r *Request) Host() string {
return r.header.GetFirst(Authority)
}

// OnlyHost return host or hostname without port
func (r *Request) OnlyHost() string {
if r.host != "" {
return r.host
}
host := r.Host()
if h, _, err := net.SplitHostPort(host); err == nil {
host = h
}
r.host = host
return host
}

// SetHost set the host of the request.
func (r *Request) SetHost(host string) {
r.header.Set(Authority, host)
if h, _, err := net.SplitHostPort(host); err == nil {
r.host = h
} else {
r.host = host
}
}

// Header returns the header of the request in type protocols.Header.
Expand Down

0 comments on commit 601cf96

Please sign in to comment.