Skip to content

Commit

Permalink
add metrics (#588)
Browse files Browse the repository at this point in the history
  • Loading branch information
localvar committed Apr 19, 2022
1 parent 0e791f0 commit d88df64
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 115 deletions.
35 changes: 30 additions & 5 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package context

import (
"bytes"
"fmt"
"runtime/debug"

"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/protocols"
"github.com/megaease/easegress/pkg/tracing"
)
Expand Down Expand Up @@ -220,24 +223,46 @@ func (ctx *Context) GetKV(key interface{}) interface{} {
return ctx.kv[key]
}

// Tags joins all tags into a string and returns it.
func (ctx *Context) Tags() string {
buf := bytes.Buffer{}

for i, fn := range ctx.lazyTags {
if i > 0 {
buf.WriteString(" | ")
}
buf.WriteString(fn())
}

return buf.String()
}

// OnFinish registers a function to be called in Finish.
func (ctx *Context) OnFinish(fn func()) {
ctx.finishFuncs = append(ctx.finishFuncs, fn)
}

// Finish calls all finish functions.
func (ctx *Context) Finish() {
const msgFmt = "failed to execute finish action: %v, stack trace: \n%s\n"

for _, req := range ctx.requests {
req.Close()
}

for _, resp := range ctx.responses {
resp.Close()
}

for _, fn := range ctx.finishFuncs {
fn()
}
for _, fn := range ctx.lazyTags {
// TODO: add tags here
fn()
func() {
defer func() {
if err := recover(); err != nil {
logger.Errorf(msgFmt, err, debug.Stack())
}
}()

fn()
}()
}
}
8 changes: 6 additions & 2 deletions pkg/filters/proxy/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,10 @@ func (sp *ServerPool) collectMetrics(spCtx *serverPoolContext) {
metric.Duration = duration

metric.ReqSize = uint64(spCtx.req.MetaSize())
metric.ReqSize += uint64(len(spCtx.req.RawPayload()))
metric.ReqSize += uint64(spCtx.req.PayloadLength())

metric.RespSize = uint64(spCtx.resp.MetaSize())
metric.RespSize += uint64(len(spCtx.resp.RawPayload()))
metric.RespSize += uint64(spCtx.resp.PayloadLength())

sp.httpStat.Stat(metric)
}
Expand Down Expand Up @@ -521,6 +521,10 @@ func (sp *ServerPool) buildResponse(spCtx *serverPoolContext) error {
return err
}

if _, err = resp.FetchPayload(); err != nil {
return err
}

spCtx.resp = resp
spCtx.SetResponse(spCtx.TargetResponseID(), resp)
return nil
Expand Down
7 changes: 3 additions & 4 deletions pkg/object/globalfilter/globalfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,17 @@ func (gf *GlobalFilter) Inherit(superSpec *supervisor.Spec, previousGeneration s
gf.reload(previousGeneration.(*GlobalFilter))
}

// Handle `beforePipeline` and `afterPipeline` before and after the httpHandler is executed.
func (gf *GlobalFilter) Handle(ctx *context.Context, httpHandle context.Handler) {
// Handle `beforePipeline` and `afterPipeline` before and after the handler is executed.
func (gf *GlobalFilter) Handle(ctx *context.Context, handler context.Handler) {
result := gf.beforeHandle(ctx)
if result == pipeline.BuiltInFilterEnd {
return
}
result = httpHandle.Handle(ctx)
result = handler.Handle(ctx)
if result == pipeline.BuiltInFilterEnd {
return
}
gf.afterHandle(ctx)
return
}

// BeforeHandle before handler logic for beforePipeline spec.
Expand Down
145 changes: 91 additions & 54 deletions pkg/object/httpserver/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
lru "github.com/hashicorp/golang-lru"
"github.com/megaease/easegress/pkg/object/globalfilter"
"github.com/megaease/easegress/pkg/protocols/httpprot"
"github.com/tomasen/realip"

"github.com/megaease/easegress/pkg/context"
"github.com/megaease/easegress/pkg/logger"
Expand Down Expand Up @@ -142,19 +141,19 @@ func allowIP(ipFilter *ipfilter.IPFilter, ip string) bool {
return ipFilter.Allow(ip)
}

func (mi *muxInstance) getCacheRoute(req *http.Request) *route {
func (mi *muxInstance) getCacheRoute(req *httpprot.Request) *route {
if mi.cache != nil {
key := stringtool.Cat(req.Host, req.Method, req.URL.Path)
key := stringtool.Cat(req.Host(), req.Method(), req.Path())
if value, ok := mi.cache.Get(key); ok {
return value.(*route)
}
}
return nil
}

func (mi *muxInstance) putRouteToCache(req *http.Request, r *route) {
func (mi *muxInstance) putRouteToCache(req *httpprot.Request, r *route) {
if mi.cache != nil {
key := stringtool.Cat(req.Host, req.Method, req.URL.Path)
key := stringtool.Cat(req.Host(), req.Method(), req.Path())
mi.cache.Add(key, r)
}
}
Expand Down Expand Up @@ -183,12 +182,12 @@ func newMuxRule(parentIPFilters *ipfilter.IPFilters, rule *Rule, paths []*MuxPat
}
}

func (mr *muxRule) match(r *http.Request) bool {
func (mr *muxRule) match(r *httpprot.Request) bool {
if mr.host == "" && mr.hostRE == nil {
return true
}

host := r.Host
host := r.Host()
if h, _, err := net.SplitHostPort(host); err == nil {
host = h
}
Expand Down Expand Up @@ -234,12 +233,12 @@ func newMuxPath(parentIPFilters *ipfilter.IPFilters, path *Path) *MuxPath {
}
}

func (mp *MuxPath) matchPath(r *http.Request) bool {
func (mp *MuxPath) matchPath(r *httpprot.Request) bool {
if mp.path == "" && mp.pathPrefix == "" && mp.pathRE == nil {
return true
}

path := r.URL.Path
path := r.Path()
if mp.path != "" && mp.path == path {
return true
}
Expand All @@ -253,17 +252,17 @@ func (mp *MuxPath) matchPath(r *http.Request) bool {
return false
}

func (mp *MuxPath) matchMethod(r *http.Request) bool {
func (mp *MuxPath) matchMethod(r *httpprot.Request) bool {
if len(mp.methods) == 0 {
return true
}

return stringtool.StrInSlice(r.Method, mp.methods)
return stringtool.StrInSlice(r.Method(), mp.methods)
}

func (mp *MuxPath) matchHeaders(r *http.Request) bool {
func (mp *MuxPath) matchHeaders(r *httpprot.Request) bool {
for _, h := range mp.headers {
v := r.Header.Get(h.Key)
v := r.HTTPHeader().Get(h.Key)
if stringtool.StrInSlice(v, h.Values) {
return true
}
Expand Down Expand Up @@ -366,42 +365,97 @@ func (m *mux) ServeHTTP(stdw http.ResponseWriter, stdr *http.Request) {
m.inst.Load().(*muxInstance).serveHTTP(stdw, stdr)
}

func buildFailureResponse(ctx *context.Context, statusCode int) *httpprot.Response {
resp, _ := httpprot.NewResponse(nil)
resp.SetStatusCode(statusCode)
ctx.SetResponse(context.DefaultResponseID, resp)
return resp
}

func (mi *muxInstance) serveHTTP(stdw http.ResponseWriter, stdr *http.Request) {
// The body of the original request maybe changed by handlers, we
// need to restore it before the return of this funtion to make
// sure it can be correctly closed by the standard Go HTTP package.
originalBody := stdr.Body
bodySize := -1

now := fasttime.Now()
span := tracing.NewSpanWithStart(mi.tracer, mi.superSpec.Name(), now)
startAt := fasttime.Now()
span := tracing.NewSpanWithStart(mi.tracer, mi.superSpec.Name(), startAt)
ctx := context.New(span)

// httpprot.NewRequest never returns an error.
req, _ := httpprot.NewRequest(stdr)
ctx.SetRequest(context.InitialRequestID, req)

// get topN here, as the path could be modified later.
topNStat := mi.topN.Stat(stdr.URL.Path)
topN := mi.topN.Stat(req.Path())

defer func() {
span.Finish()
// If FetchPayload is not called yet.
if bodySize == -1 {
written, _ := io.Copy(io.Discard, originalBody)
bodySize = int(written)
}

var resp *httpprot.Response
if v := ctx.Response(); v != nil {
resp = v.(*httpprot.Response)
} else {
resp = buildFailureResponse(ctx, http.StatusInternalServerError)
}

// Send the response.
header := stdw.Header()
for k, v := range resp.HTTPHeader() {
header[k] = v
}
stdw.WriteHeader(resp.StatusCode())
io.Copy(stdw, resp.GetPayload())

ctx.Finish()
// TODO
// topNStat.Stat(ctx.StatMetric())
_ = topNStat
// TODO:
// mi.httpStat.Stat(ctx.StatMetric())
// restore the body of the origin request.

metric := httpstat.Metric{
StatusCode: resp.StatusCode(),
Duration: fasttime.Since(startAt),
ReqSize: uint64(bodySize + req.MetaSize()),
RespSize: uint64(resp.MetaSize() + resp.PayloadLength()),
}
topN.Stat(&metric)
mi.httpStat.Stat(&metric)

span.Finish()

// Restore the body of the origin request.
stdr.Body = originalBody

// Write access log.
logger.LazyHTTPAccess(func() string {
// log format:
//
// [$startTime]
// [$remoteAddr $realIP $method $requestURL $proto $statusCode]
// [$contextDuration $readBytes $writeBytes]
// [$tags]
const logFmt = "[%s] [%s %s %s %s %s %d] [%v rx:%dB tx:%dB] [%s]"
return fmt.Sprintf(logFmt,
fasttime.Format(startAt, fasttime.RFC3339Milli),
stdr.RemoteAddr, req.RealIP(), stdr.Method, stdr.RequestURI,
stdr.Proto, resp.StatusCode(), metric.Duration, metric.ReqSize,
metric.RespSize, ctx.Tags())
})
}()

route := mi.search(stdr)
if route.code != http.StatusOK {
route := mi.search(req)
if route.code != 0 {
ctx.AddTag(fmt.Sprintf("status code: %d", route.code))
stdw.WriteHeader(route.code)
buildFailureResponse(ctx, route.code)
return
}

handler, ok := mi.muxMapper.GetHandler(route.path.backend)
if !ok {
ctx.AddTag(stringtool.Cat("backend ", route.path.backend, " not found"))
stdw.WriteHeader(http.StatusServiceUnavailable)
buildFailureResponse(ctx, http.StatusServiceUnavailable)
return
}

Expand All @@ -411,33 +465,16 @@ func (mi *muxInstance) serveHTTP(stdw http.ResponseWriter, stdr *http.Request) {
stdr.URL.Path = path
}

req, err := httpprot.NewRequest(stdr)
if err != nil {
ctx.AddTag(fmt.Sprintf("failed to wrap request: %v", err))
stdw.WriteHeader(http.StatusBadRequest)
return
}
if mi.spec.XForwardedFor {
mi.appendXForwardedFor(req)
}
ctx.SetRequest(context.InitialRequestID, req)

defer func() {
var resp *httpprot.Response
if v := ctx.Response(); v != nil {
resp = v.(*httpprot.Response)
}
if resp == nil {
stdw.WriteHeader(http.StatusInternalServerError)
return
}
header := stdw.Header()
for k, v := range resp.HTTPHeader() {
header[k] = v
}
stdw.WriteHeader(resp.StatusCode())
io.Copy(stdw, resp.GetPayload())
}()
bodySize, err := req.FetchPayload()
if err != nil {
ctx.AddTag(fmt.Sprintf("failed to read request body: %v", err))
buildFailureResponse(ctx, http.StatusBadRequest)
return
}

// global filter
globalFilter := mi.getGlobalFilter()
Expand All @@ -448,17 +485,17 @@ func (mi *muxInstance) serveHTTP(stdw http.ResponseWriter, stdr *http.Request) {
}
}

func (mi *muxInstance) search(req *http.Request) *route {
func (mi *muxInstance) search(req *httpprot.Request) *route {
headerMismatch, methodMismatch := false, false

ip := realip.FromRequest(req)
ip := req.RealIP()

// The key of the cache is req.Host + req.Method + req.URL.Path,
// and if a path is cached, we are sure it does not contain any
// headers.
r := mi.getCacheRoute(req)
if r != nil {
if r.code != http.StatusOK {
if r.code != 0 {
return r
}
if r.path.ipFilterChain == nil {
Expand Down Expand Up @@ -495,7 +532,7 @@ func (mi *muxInstance) search(req *http.Request) *route {

// The path can be put into the cache if it has no headers.
if len(path.headers) == 0 {
r = &route{code: http.StatusOK, path: path}
r = &route{code: 0, path: path}
mi.putRouteToCache(req, r)
} else if !path.matchHeaders(req) {
headerMismatch = true
Expand Down
4 changes: 3 additions & 1 deletion pkg/object/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,9 @@ func (p *Pipeline) Handle(ctx *context.Context) string {
}
}

ctx.AddTag(serializeStats(stats))
ctx.LazyAddTag(func() string {
return serializeStats(stats)
})
return result
}

Expand Down
Loading

0 comments on commit d88df64

Please sign in to comment.