Skip to content

Commit

Permalink
support zipkin b3 format
Browse files Browse the repository at this point in the history
  • Loading branch information
localvar committed Apr 22, 2022
1 parent 02972f1 commit 97efc21
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 248 deletions.
39 changes: 10 additions & 29 deletions pkg/filters/proxy/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"sync/atomic"
"time"

"github.com/opentracing/opentracing-go"
gohttpstat "github.com/tcnksm/go-httpstat"

"github.com/megaease/easegress/pkg/context"
Expand Down Expand Up @@ -68,7 +67,6 @@ type serverPoolContext struct {
*context.Context
span tracing.Span
startTime time.Time
endTime time.Time

req *httpprot.Request
stdReq *http.Request
Expand All @@ -91,33 +89,17 @@ func (spCtx *serverPoolContext) prepareRequest(svr *Server, ctx stdcontext.Conte
return err
}

stdr.Header = spCtx.req.HTTPHeader()
stdr.Header = spCtx.req.HTTPHeader().Clone()
if !svr.addrIsHostName {
stdr.Host = spCtx.req.Host()
}

spCtx.stdReq = stdr
return nil
}

func (spCtx *serverPoolContext) start(spanName string) {
spCtx.startTime = fasttime.Now()
span := spCtx.Span().NewChildWithStart(spanName, spCtx.startTime)
carrier := opentracing.HTTPHeadersCarrier(spCtx.req.HTTPHeader())
span.Tracer().Inject(span.Context(), opentracing.HTTPHeaders, carrier)
spCtx.span = span
}

func (spCtx *serverPoolContext) finish() {
if spCtx.endTime.IsZero() {
now := fasttime.Now()
spCtx.endTime = now
spCtx.span.Finish()
if spCtx.span != nil {
spCtx.span.InjectHTTP(stdr)
}
}

func (spCtx *serverPoolContext) duration() time.Duration {
return spCtx.endTime.Sub(spCtx.startTime)
spCtx.stdReq = stdr
return nil
}

// ServerPool defines a server pool.
Expand Down Expand Up @@ -324,9 +306,8 @@ func (sp *ServerPool) InjectResiliencePolicy(policies map[string]resilience.Poli
}

func (sp *ServerPool) collectMetrics(spCtx *serverPoolContext) {
spCtx.finish()
duration := fasttime.Since(spCtx.startTime)

duration := spCtx.duration()
spCtx.LazyAddTag(func() string {
return stringtool.Cat(sp.name, "#duration: ", duration.String())
})
Expand Down Expand Up @@ -375,7 +356,7 @@ func (sp *ServerPool) handle(ctx *context.Context, mirror bool) string {
return ""
}

spCtx.start(sp.spec.SpanName)
spCtx.startTime = fasttime.Now()
defer sp.collectMetrics(spCtx)

if sp.buildResponseFromCache(spCtx) {
Expand All @@ -397,6 +378,9 @@ func (sp *ServerPool) handle(ctx *context.Context, mirror bool) string {
spCtx.resp = nil
spCtx.stdResp = nil

spCtx.span = ctx.Span().NewChild(sp.spec.SpanName)
defer spCtx.span.Finish()

return sp.doHandle(stdctx, spCtx)
}

Expand Down Expand Up @@ -460,9 +444,6 @@ func (sp *ServerPool) doHandle(stdctx stdcontext.Context, spCtx *serverPoolConte

resp, err := fnSendRequest(spCtx.stdReq, sp.proxy.client)
if err != nil {
// NOTE: May add option to cancel the tracing if failed here.
// spCtx.Span().Cancel()

spCtx.LazyAddTag(func() string {
return fmt.Sprintf("send request error: %v", err)
})
Expand Down
32 changes: 0 additions & 32 deletions pkg/object/httpserver/context.go

This file was deleted.

8 changes: 4 additions & 4 deletions pkg/object/httpserver/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type (

cache *lru.ARCCache

tracer *tracing.Tracing
tracer *tracing.Tracer
ipFilter *ipfilter.IPFilter
ipFilterChan *ipfilter.IPFilters

Expand Down Expand Up @@ -306,7 +306,7 @@ func newMux(httpStat *httpstat.HTTPStat, topN *httpstat.TopN, mapper context.Mux

m.inst.Store(&muxInstance{
spec: &Spec{},
tracer: tracing.NoopTracing,
tracer: tracing.NoopTracer,
muxMapper: mapper,
httpStat: httpStat,
topN: topN,
Expand All @@ -318,7 +318,7 @@ func newMux(httpStat *httpstat.HTTPStat, topN *httpstat.TopN, mapper context.Mux
func (m *mux) reload(superSpec *supervisor.Spec, muxMapper context.MuxMapper) {
spec := superSpec.ObjectSpec().(*Spec)

tracer := tracing.NoopTracing
tracer := tracing.NoopTracer
oldInst := m.inst.Load().(*muxInstance)
if !reflect.DeepEqual(oldInst.spec.Tracing, spec.Tracing) {
defer func() {
Expand Down Expand Up @@ -403,7 +403,7 @@ func (mi *muxInstance) serveHTTP(stdw http.ResponseWriter, stdr *http.Request) {
bodySize := -1

startAt := fasttime.Now()
span := tracing.NewSpanWithStart(mi.tracer, mi.superSpec.Name(), startAt)
span := mi.tracer.NewSpanWithStart(mi.superSpec.Name(), startAt)
ctx := context.New(span)

// httpprot.NewRequest never returns an error.
Expand Down
5 changes: 3 additions & 2 deletions pkg/object/httpserver/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package httpserver

import (
"bytes"
stdcontext "context"
"fmt"
"log"
"net/http"
Expand Down Expand Up @@ -320,8 +321,8 @@ func (r *runtime) closeServer() {

if r.server != nil {
// NOTE: It's safe to shutdown serve failed server.
ctx, cancelFunc := serverShutdownContext()
defer cancelFunc()
ctx, cancel := stdcontext.WithTimeout(stdcontext.Background(), 30*time.Second)
defer cancel()
err := r.server.Shutdown(ctx)
if err != nil {
logger.Warnf("shutdown http1/2 server %s failed: %v",
Expand Down
25 changes: 0 additions & 25 deletions pkg/tracing/base/base.go

This file was deleted.

138 changes: 30 additions & 108 deletions pkg/tracing/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,159 +18,81 @@
package tracing

import (
"sync"
"net/http"
"time"

opentracing "github.com/opentracing/opentracing-go"
zipkingo "github.com/openzipkin/zipkin-go"
"github.com/openzipkin/zipkin-go/propagation/b3"

"github.com/megaease/easegress/pkg/tracing/base"
"github.com/megaease/easegress/pkg/util/fasttime"
)

type (
// Span is the span of the Tracing.
Span interface {
// Tracer returns the Tracer that created this Span.
Tracer() opentracing.Tracer

// Context yields the SpanContext for this Span
Context() opentracing.SpanContext
zipkingo.Span

// Finish finishes the span.
Finish()
// Cancel cancels the span, it should be called before Finish called.
// It will cancel all descendent spans.
Cancel()
// Tracer returns the Tracer that created this Span.
Tracer() *Tracer

// NewChild creates a child span.
NewChild(name string) Span

// NewChildWithStart creates a child span with start time.
NewChildWithStart(name string, startAt time.Time) Span

// SetName changes the span name.
SetName(name string)

// LogKV logs key:value for the span.
//
// The keys must all be strings. The values may be strings, numeric types,
// bools, Go error instances, or arbitrary structs.
//
// Example:
//
// span.LogKV(
// "event", "soft error",
// "type", "cache timeout",
// "waited.millis", 1500)
LogKV(kvs ...interface{})

// SetTag sets tag key and value.
SetTag(key string, value string)
// IsNoopSpan returns true if span is NoopSpan.
IsNoopSpan() bool
// InjectHTTP injects span context into an HTTP request.
InjectHTTP(r *http.Request)
}

span struct {
mutex sync.Mutex
tracer *Tracing
span opentracing.Span
children []*span
zipkingo.Span
tracer *Tracer
}
)

// NoopSpan does nothing.
var NoopSpan = &span{
tracer: NoopTracing,
span: NoopTracing.StartSpan(""), // will return opentracing.defaultNoopSpan
}

// NewSpan creates a span.
func NewSpan(tracer *Tracing, name string) Span {
if tracer.IsNoopTracer() {
return NoopSpan
}
return newSpanWithStart(tracer, name, fasttime.Now())
}

// NewSpanWithStart creates a span with specify start time.
func NewSpanWithStart(tracer *Tracing, name string, startAt time.Time) Span {
if tracer.IsNoopTracer() {
return NoopSpan
}
return newSpanWithStart(tracer, name, startAt)
}

func newSpanWithStart(tracer *Tracing, name string, startAt time.Time) Span {
newSpan := tracer.StartSpan(name, opentracing.StartTime(startAt))
for tagKey, tagValue := range tracer.tags {
newSpan.SetTag(tagKey, tagValue)
}
return &span{
tracer: tracer,
span: newSpan,
}
}
var NoopSpan = &span{tracer: NoopTracer, Span: NoopTracer.tracer.StartSpan("")}

func (s *span) IsNoopSpan() bool {
// IsNoop returns whether the span is a noop span.
func (s *span) IsNoop() bool {
return s == NoopSpan
}

func (s *span) Tracer() opentracing.Tracer {
// Tracer returns the tracer of the span.
func (s *span) Tracer() *Tracer {
return s.tracer
}

func (s *span) Context() opentracing.SpanContext {
return s.span.Context()
}

func (s *span) Finish() {
s.span.Finish()
}

func (s *span) Cancel() {
s.span.SetTag(base.CancelTagKey, "yes")
for _, child := range s.children {
child.Cancel()
}
}

// NewChild creates a new child span.
func (s *span) NewChild(name string) Span {
if s.IsNoopSpan() {
if s.IsNoop() {
return s
}
return s.newChildWithStart(name, fasttime.Now())
}

// NewChildWithStart creates a new child span with specified start time.
func (s *span) NewChildWithStart(name string, startAt time.Time) Span {
if s.IsNoopSpan() {
if s.IsNoop() {
return s
}
return s.newChildWithStart(name, startAt)
}

func (s *span) newChildWithStart(name string, startAt time.Time) Span {
childSpan := s.tracer.StartSpan(name,
opentracing.ChildOf(s.span.Context()),
opentracing.StartTime(startAt))
child := &span{
child := s.tracer.tracer.StartSpan(name,
zipkingo.Parent(s.Context()),
zipkingo.StartTime(startAt))

return &span{
tracer: s.tracer,
span: childSpan,
Span: child,
}

s.mutex.Lock()
defer s.mutex.Unlock()
s.children = append(s.children, child)

return child
}

func (s *span) SetName(name string) {
s.span.SetOperationName(name)
}

func (s *span) LogKV(kv ...interface{}) {
s.span.LogKV(kv...)
}

func (s *span) SetTag(key string, value string) {
s.span.SetTag(key, value)
// InjectHTTP injects span context into an HTTP request.
func (s *span) InjectHTTP(r *http.Request) {
inject := b3.InjectHTTP(r, b3.WithSingleHeaderOnly())
inject(s.Context())
}
Loading

0 comments on commit 97efc21

Please sign in to comment.