Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement concurrency limit template #32

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ If the file is not found, gowrap will look for the template [here](https://githu

List of available templates:
- [circuitbreaker](https://github.com/hexdigest/gowrap/tree/master/templates/circuitbreaker) stops executing methods of the wrapped interface after the specified number of consecutive errors and resumes execution after the specified delay
- [concurrencylimit](https://github.com/hexdigest/gowrap/tree/master/templates/concurrencylimit) limits amount of simultaneous calls
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"limits THE amount" please

- [fallback](https://github.com/hexdigest/gowrap/tree/master/templates/fallback) takes several implementations of the source interface and concurrently runs each implementation if the previous attempt didn't return the result in a specified period of time, it returns the first non-error result
- [log](https://github.com/hexdigest/gowrap/tree/master/templates/log) instruments the source interface with logging using standard logger from the "log" package
- [logrus](https://github.com/hexdigest/gowrap/tree/master/templates/logrus) instruments the source interface with logging using popular [sirupsen/logrus](https://github.com/sirupsen/logrus) logger
Expand Down
46 changes: 46 additions & 0 deletions templates/concurrencylimit
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{{ $decorator := (or .Vars.DecoratorName (printf "%sWithConcurrencyLimit" .Interface.Name)) }}

import (
"time"
)

// {{$decorator}} implements {{.Interface.Type}}
type {{$decorator}} struct {
_base {{.Interface.Type}}
_burst chan int
}

// New{{$decorator}} instruments an implementation of the {{.Interface.Type}} with concurrency limiting
func New{{$decorator}}(base {{.Interface.Type}}, concurrentCalls int) *{{$decorator}} {
d := &{{$decorator}}{
_base: base,
_burst: make(chan int, concurrentCalls),
}

return d
}

{{range $method := .Interface.Methods}}
// {{$method.Name}} implements {{$.Interface.Type}}
func (_d *{{$decorator}}) {{$method.Declaration}} {

{{- if (and $method.AcceptsContext $method.ReturnsError)}}
select {
case <-ctx.Done():
err = ctx.Err()
return
case _d._burst<-1:
defer func() {
<-_d._burst
}()
}
{{else}}
_d._burst<-1
defer func() {
<-_d._burst
}()
{{end}}

{{ $method.Pass "_d._base." }}
}
{{end}}
72 changes: 72 additions & 0 deletions templates_tests/interface_with_concurrency_limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package templatestests

import "context"

// DO NOT EDIT!
// This code is generated with http:https://github.com/hexdigest/gowrap tool
// using ../templates/concurrencylimit template

//go:generate gowrap gen -p github.com/hexdigest/gowrap/templates_tests -i TestInterface -t ../templates/concurrencylimit -o interface_with_concurrency_limit.go

// TestInterfaceWithConcurrencyLimit implements TestInterface
type TestInterfaceWithConcurrencyLimit struct {
_base TestInterface
_burst chan int
}

// NewTestInterfaceWithConcurrencyLimit instruments an implementation of the TestInterface with concurrency limiting
func NewTestInterfaceWithConcurrencyLimit(base TestInterface, concurrentCalls int) *TestInterfaceWithConcurrencyLimit {
d := &TestInterfaceWithConcurrencyLimit{
_base: base,
_burst: make(chan int, concurrentCalls),
}

return d
}

// Channels implements TestInterface
func (_d *TestInterfaceWithConcurrencyLimit) Channels(chA chan bool, chB chan<- bool, chanC <-chan bool) {
_d._burst <- 1
defer func() {
<-_d._burst
}()

_d._base.Channels(chA, chB, chanC)
return
}

// F implements TestInterface
func (_d *TestInterfaceWithConcurrencyLimit) F(ctx context.Context, a1 string, a2 ...string) (result1 string, result2 string, err error) {
select {
case <-ctx.Done():
err = ctx.Err()
return
case _d._burst <- 1:
defer func() {
<-_d._burst
}()
}

return _d._base.F(ctx, a1, a2...)
}

// NoError implements TestInterface
func (_d *TestInterfaceWithConcurrencyLimit) NoError(s1 string) (s2 string) {
_d._burst <- 1
defer func() {
<-_d._burst
}()

return _d._base.NoError(s1)
}

// NoParamsOrResults implements TestInterface
func (_d *TestInterfaceWithConcurrencyLimit) NoParamsOrResults() {
_d._burst <- 1
defer func() {
<-_d._burst
}()

_d._base.NoParamsOrResults()
return
}
46 changes: 46 additions & 0 deletions templates_tests/interface_with_concurrency_limit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package templatestests

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestTestInterfaceWithConcurrencyLimit_F(t *testing.T) {
impl := &testImpl{r1: "1", r2: "2", delay: 100 * time.Millisecond}

wrapped := NewTestInterfaceWithConcurrencyLimit(impl, 3)

for i := 0; i < 10; i++ {
go func() {
r1, r2, err := wrapped.F(context.Background(), "a1")
assert.NoError(t, err)
assert.Equal(t, "1", r1)
assert.Equal(t, "2", r2)

}()
}

<-time.After(10 * time.Millisecond)

counter := atomic.LoadUint64(&impl.callCounter)
assert.EqualValues(t, 3, counter) // the first burst

<-time.After(100 * time.Millisecond)

counter = atomic.LoadUint64(&impl.callCounter)
assert.EqualValues(t, 6, counter) // the second burst

<-time.After(100 * time.Millisecond)

counter = atomic.LoadUint64(&impl.callCounter)
assert.EqualValues(t, 9, counter) // the third burst

<-time.After(100 * time.Millisecond)

counter = atomic.LoadUint64(&impl.callCounter)
assert.EqualValues(t, 10, counter) // the 10th call
}