Skip to content

Commit

Permalink
Merge pull request #3 from gianarb/feature/timeout
Browse files Browse the repository at this point in the history
Timeout and context cancellation
  • Loading branch information
gianarb committed Sep 6, 2019
2 parents 8427d56 + 676cb2d commit 74237fa
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 7 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.12

require (
github.com/google/uuid v1.1.1
github.com/opentracing/opentracing-go v1.1.0 // indirect
github.com/pkg/errors v0.8.1 // indirect
github.com/stretchr/testify v1.4.0 // indirect
go.uber.org/atomic v1.4.0 // indirect
Expand Down
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -17,6 +15,7 @@ go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
17 changes: 13 additions & 4 deletions scheduer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (s *Scheduler) Execute(ctx context.Context, p Plan) error {
}
logger.Info("Started execution plan " + p.Name())
s.stepCounter = 0

for {
steps, err := p.Create(ctx)
if err != nil {
Expand All @@ -62,15 +63,23 @@ func (s *Scheduler) Execute(ctx context.Context, p Plan) error {
// react is a recursive function that goes over all the steps and the one
// returned by previous steps until the plan does not return anymore steps
func (s *Scheduler) react(ctx context.Context, steps []Procedure, logger *zap.Logger) error {
var innerSteps []Procedure
for _, step := range steps {
var err error
s.stepCounter = s.stepCounter + 1
if loggableS, ok := step.(Loggable); ok {
loggableS.WithLogger(logger)
}
innerSteps, err := step.Do(ctx)
if err != nil {
logger.Error("Step failed.", zap.String("step", step.Name()), zap.Error(err))
return err
select {
case <-ctx.Done():
logger.Error("Step not executed.", zap.String("step", step.Name()), zap.Error(ctx.Err()))
return ctx.Err()
default:
innerSteps, err = step.Do(ctx)
if err != nil {
logger.Error("Step failed.", zap.String("step", step.Name()), zap.Error(err))
return err
}
}
if len(innerSteps) > 0 {
if err := s.react(ctx, innerSteps, logger); err != nil {
Expand Down
46 changes: 46 additions & 0 deletions sheduler_test.go → scheduer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"strings"
"testing"
"time"
)

const Seq = "seq"
Expand Down Expand Up @@ -62,6 +63,51 @@ func (o *One) Do(ctx context.Context) ([]Procedure, error) {
return nil, nil
}

type FakeStep struct {
do func(ctx context.Context) ([]Procedure, error)
name string
}

func (o *FakeStep) Name() string {
if o.name == "" {
return "fake"
}
return o.name
}

func (o *FakeStep) Do(ctx context.Context) ([]Procedure, error) {
return o.do(ctx)
}

func TestTriggerSchedulerTimeout(t *testing.T) {
ctx := context.Background()
ctx, cF := context.WithTimeout(ctx, 200*time.Millisecond)
defer cF()
p := &FakePlan{
P: []Procedure{},
Counter: 0,
}
p.P = append(p.P, &FakeStep{
name: "sleep",
do: func(ctx context.Context) ([]Procedure, error) {
time.Sleep(210 * time.Millisecond)
return nil, nil
},
})
p.P = append(p.P, &FakeStep{
name: "sleep",
do: func(ctx context.Context) ([]Procedure, error) {
time.Sleep(210 * time.Millisecond)
return nil, nil
},
})
s := NewScheduler()
err := s.Execute(ctx, p)
if err != context.DeadlineExceeded {
t.Fatalf("expected to get an deadline exceeded error. we got %s", err)
}
}

func TestExecutionSingleStep(t *testing.T) {
order := []string{}
seq := make(chan string, 1)
Expand Down

0 comments on commit 74237fa

Please sign in to comment.