Skip to content

Commit

Permalink
Improve with logging deep components
Browse files Browse the repository at this point in the history
Propogate context with logger to ToxicCollection, ToxicLink.
  • Loading branch information
miry committed Sep 6, 2022
1 parent 77dfc2f commit dc4b9e4
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 71 deletions.
38 changes: 20 additions & 18 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,20 @@ import (
"github.com/Shopify/toxiproxy/v2/toxics"
)

func stopBrowsersMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.HasPrefix(r.UserAgent(), "Mozilla/") {
http.Error(w, "User agent not allowed", 403)
} else {
next.ServeHTTP(w, r)
}
})
}

func timeoutMiddleware(next http.Handler) http.Handler {
return http.TimeoutHandler(next, 30*time.Second, "")
}

type ApiServer struct {
Collection *ProxyCollection
Metrics *metricsContainer
Expand Down Expand Up @@ -46,20 +60,6 @@ func (server *ApiServer) PopulateConfig(filename string) {
}
}

func stopBrowsersMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.HasPrefix(r.UserAgent(), "Mozilla/") {
http.Error(w, "User agent not allowed", 403)
} else {
next.ServeHTTP(w, r)
}
})
}

func timeoutMiddleware(next http.Handler) http.Handler {
return http.TimeoutHandler(next, 5*time.Second, "")
}

func (server *ApiServer) Listen(host string, port string) {
r := mux.NewRouter()
r.Use(hlog.NewHandler(*server.Logger))
Expand Down Expand Up @@ -153,6 +153,7 @@ func (server *ApiServer) ProxyIndex(response http.ResponseWriter, request *http.
}

func (server *ApiServer) ResetState(response http.ResponseWriter, request *http.Request) {
ctx := request.Context()
proxies := server.Collection.Proxies()

for _, proxy := range proxies {
Expand All @@ -161,13 +162,13 @@ func (server *ApiServer) ResetState(response http.ResponseWriter, request *http.
return
}

proxy.Toxics.ResetToxics()
proxy.Toxics.ResetToxics(ctx)
}

response.WriteHeader(http.StatusNoContent)
_, err := response.Write(nil)
if err != nil {
log := zerolog.Ctx(request.Context())
log := zerolog.Ctx(ctx)
log.Warn().Err(err).Msg("ResetState: Failed to write headers to client")
}
}
Expand Down Expand Up @@ -414,21 +415,22 @@ func (server *ApiServer) ToxicUpdate(response http.ResponseWriter, request *http

func (server *ApiServer) ToxicDelete(response http.ResponseWriter, request *http.Request) {
vars := mux.Vars(request)
ctx := request.Context()
log := zerolog.Ctx(ctx)

proxy, err := server.Collection.Get(vars["proxy"])
if server.apiError(response, err) {
return
}

err = proxy.Toxics.RemoveToxic(vars["toxic"])
err = proxy.Toxics.RemoveToxic(ctx, vars["toxic"])
if server.apiError(response, err) {
return
}

response.WriteHeader(http.StatusNoContent)
_, err = response.Write(nil)
if err != nil {
log := zerolog.Ctx(request.Context())
log.Warn().Err(err).Msg("ToxicDelete: Failed to write headers to client")
}
}
Expand Down
49 changes: 32 additions & 17 deletions link.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package toxiproxy

import (
"context"
"fmt"
"io"
"net"

Expand Down Expand Up @@ -183,23 +185,36 @@ func (link *ToxicLink) UpdateToxic(toxic *toxics.ToxicWrapper) {
}

// Remove an existing toxic from the chain.
func (link *ToxicLink) RemoveToxic(toxic *toxics.ToxicWrapper) {
i := toxic.Index
func (link *ToxicLink) RemoveToxic(ctx context.Context, toxic *toxics.ToxicWrapper) {
toxic_index := toxic.Index
log := zerolog.Ctx(ctx).
With().
Str("component", "ToxicLink").
Str("method", "RemoveToxic").
Str("toxic", toxic.Name).
Str("toxic_type", toxic.Type).
Int("toxic_index", toxic.Index).
Str("link_addr", fmt.Sprintf("%p", link)).
Str("toxic_stub_addr", fmt.Sprintf("%p", link.stubs[toxic_index])).
Str("prev_toxic_stub_addr", fmt.Sprintf("%p", link.stubs[toxic_index-1])).
Logger()

if link.stubs[i].InterruptToxic() {
if link.stubs[toxic_index].InterruptToxic() {
cleanup, ok := toxic.Toxic.(toxics.CleanupToxic)
if ok {
cleanup.Cleanup(link.stubs[i])
cleanup.Cleanup(link.stubs[toxic_index])
// Cleanup could have closed the stub.
if link.stubs[i].Closed() {
if link.stubs[toxic_index].Closed() {
log.Trace().Msg("Cleanup closed toxic and removed toxic")
// TODO: Check if cleanup happen would link.stubs recalculated?
return
}
}

log.Trace().Msg("Interrupt the previous toxic to update its output")
stop := make(chan bool)
// Interrupt the previous toxic to update its output
go func() {
stop <- link.stubs[i-1].InterruptToxic()
stop <- link.stubs[toxic_index-1].InterruptToxic()
}()

// Unblock the previous toxic if it is trying to flush
Expand All @@ -210,32 +225,32 @@ func (link *ToxicLink) RemoveToxic(toxic *toxics.ToxicWrapper) {
select {
case interrupted = <-stop:
stopped = true
case tmp := <-link.stubs[i].Input:
case tmp := <-link.stubs[toxic_index].Input:
if tmp == nil {
link.stubs[i].Close()
link.stubs[toxic_index].Close()
if !stopped {
<-stop
}
return
}
link.stubs[i].Output <- tmp
link.stubs[toxic_index].Output <- tmp
}
}

// Empty the toxic's buffer if necessary
for len(link.stubs[i].Input) > 0 {
tmp := <-link.stubs[i].Input
for len(link.stubs[toxic_index].Input) > 0 {
tmp := <-link.stubs[toxic_index].Input
if tmp == nil {
link.stubs[i].Close()
link.stubs[toxic_index].Close()
return
}
link.stubs[i].Output <- tmp
link.stubs[toxic_index].Output <- tmp
}

link.stubs[i-1].Output = link.stubs[i].Output
link.stubs = append(link.stubs[:i], link.stubs[i+1:]...)
link.stubs[toxic_index-1].Output = link.stubs[toxic_index].Output
link.stubs = append(link.stubs[:toxic_index], link.stubs[toxic_index+1:]...)

go link.stubs[i-1].Run(link.toxics.chain[link.direction][i-1])
go link.stubs[toxic_index-1].Run(link.toxics.chain[link.direction][toxic_index-1])
}
}

Expand Down
19 changes: 14 additions & 5 deletions link_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package toxiproxy

import (
"context"
"encoding/binary"
"flag"
"io"
"os"
"testing"
"time"

Expand Down Expand Up @@ -81,6 +84,7 @@ func TestStubInitializaationWithToxics(t *testing.T) {
}

func TestAddRemoveStubs(t *testing.T) {
ctx := context.Background()
collection := NewToxicCollection(nil)
link := NewToxicLink(nil, collection, stream.Downstream, zerolog.Nop())
go link.stubs[0].Run(collection.chain[stream.Downstream][0])
Expand Down Expand Up @@ -117,7 +121,7 @@ func TestAddRemoveStubs(t *testing.T) {
}

// Remove stubs
collection.chainRemoveToxic(toxic)
collection.chainRemoveToxic(ctx, toxic)
if cap(link.stubs[len(link.stubs)-1].Output) != 0 {
t.Fatalf("Link output buffer was not initialized as 0: %d", cap(link.stubs[0].Output))
}
Expand All @@ -134,6 +138,7 @@ func TestAddRemoveStubs(t *testing.T) {
}

func TestNoDataDropped(t *testing.T) {
ctx := context.Background()
collection := NewToxicCollection(nil)
link := NewToxicLink(nil, collection, stream.Downstream, zerolog.Nop())
go link.stubs[0].Run(collection.chain[stream.Downstream][0])
Expand All @@ -159,17 +164,17 @@ func TestNoDataDropped(t *testing.T) {
}
link.input.Close()
}()
go func() {
go func(ctx context.Context) {
for {
select {
case <-done:
return
default:
collection.chainAddToxic(toxic)
collection.chainRemoveToxic(toxic)
collection.chainRemoveToxic(ctx, toxic)
}
}
}()
}(ctx)

buf := make([]byte, 2)
for i := 0; i < 64*1024; i++ {
Expand Down Expand Up @@ -238,7 +243,11 @@ func TestToxicity(t *testing.T) {

func TestStateCreated(t *testing.T) {
collection := NewToxicCollection(nil)
link := NewToxicLink(nil, collection, stream.Downstream, zerolog.Nop())
log := zerolog.Nop()
if flag.Lookup("test.v").DefValue == "true" {
log = zerolog.New(os.Stdout).With().Caller().Timestamp().Logger()
}
link := NewToxicLink(nil, collection, stream.Downstream, log)
go link.stubs[0].Run(collection.chain[stream.Downstream][0])
collection.links["test"] = link

Expand Down
70 changes: 49 additions & 21 deletions toxic_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package toxiproxy

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -43,14 +44,14 @@ func NewToxicCollection(proxy *Proxy) *ToxicCollection {
return collection
}

func (c *ToxicCollection) ResetToxics() {
func (c *ToxicCollection) ResetToxics(ctx context.Context) {
c.Lock()
defer c.Unlock()

// Remove all but the first noop toxic
for dir := range c.chain {
for len(c.chain[dir]) > 1 {
c.chainRemoveToxic(c.chain[dir][1])
c.chainRemoveToxic(ctx, c.chain[dir][1])
}
}
}
Expand Down Expand Up @@ -158,16 +159,28 @@ func (c *ToxicCollection) UpdateToxicJson(
return nil, ErrToxicNotFound
}

func (c *ToxicCollection) RemoveToxic(name string) error {
func (c *ToxicCollection) RemoveToxic(ctx context.Context, name string) error {
log := zerolog.Ctx(ctx).
With().
Str("component", "ToxicCollection").
Str("method", "RemoveToxic").
Str("toxic", name).
Str("proxy", c.proxy.Name).
Logger()
log.Trace().Msg("Acquire locking...")
c.Lock()
defer c.Unlock()

log.Trace().Msg("Getting toxic by name...")
toxic := c.findToxicByName(name)
if toxic != nil {
c.chainRemoveToxic(toxic)
return nil
if toxic == nil {
log.Trace().Msg("Could not find toxic by name")
return ErrToxicNotFound
}
return ErrToxicNotFound

c.chainRemoveToxic(ctx, toxic)
log.Trace().Msg("Finished")
return nil
}

func (c *ToxicCollection) StartLink(
Expand Down Expand Up @@ -217,17 +230,17 @@ func (c *ToxicCollection) chainAddToxic(toxic *toxics.ToxicWrapper) {
c.chain[dir] = append(c.chain[dir], toxic)

// Asynchronously add the toxic to each link
group := sync.WaitGroup{}
wg := sync.WaitGroup{}
for _, link := range c.links {
if link.direction == dir {
group.Add(1)
go func(link *ToxicLink) {
defer group.Done()
wg.Add(1)
go func(link *ToxicLink, wg *sync.WaitGroup) {
defer wg.Done()
link.AddToxic(toxic)
}(link)
}(link, &wg)
}
}
group.Wait()
wg.Wait()
}

func (c *ToxicCollection) chainUpdateToxic(toxic *toxics.ToxicWrapper) {
Expand All @@ -247,25 +260,40 @@ func (c *ToxicCollection) chainUpdateToxic(toxic *toxics.ToxicWrapper) {
group.Wait()
}

func (c *ToxicCollection) chainRemoveToxic(toxic *toxics.ToxicWrapper) {
func (c *ToxicCollection) chainRemoveToxic(ctx context.Context, toxic *toxics.ToxicWrapper) {
log := zerolog.Ctx(ctx).
With().
Str("component", "ToxicCollection").
Str("method", "chainRemoveToxic").
Str("toxic", toxic.Name).
Str("direction", toxic.Direction.String()).
Logger()

dir := toxic.Direction
c.chain[dir] = append(c.chain[dir][:toxic.Index], c.chain[dir][toxic.Index+1:]...)
for i := toxic.Index; i < len(c.chain[dir]); i++ {
c.chain[dir][i].Index = i
}

// Asynchronously remove the toxic from each link
group := sync.WaitGroup{}
wg := sync.WaitGroup{}

event_array := zerolog.Arr()
for _, link := range c.links {
if link.direction == dir {
group.Add(1)
go func(link *ToxicLink) {
defer group.Done()
link.RemoveToxic(toxic)
}(link)
event_array = event_array.Str(fmt.Sprintf("Link[%p] %s", link, link.Direction()))
wg.Add(1)
go func(ctx context.Context, link *ToxicLink, log zerolog.Logger) {
defer wg.Done()
link.RemoveToxic(ctx, toxic)
}(ctx, link, log)
}
}
group.Wait()

log.Trace().
Array("links", event_array).
Msg("Waiting to update links")
wg.Wait()

toxic.Index = -1
}
Loading

0 comments on commit dc4b9e4

Please sign in to comment.