Skip to content

Commit

Permalink
Merge pull request harness#3344 from harness/CI-8597
Browse files Browse the repository at this point in the history
(fix) prevent scheduler deadlock
  • Loading branch information
TP Honey committed Aug 9, 2023
2 parents 5075f84 + bc8a37d commit 480790f
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .drone.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ steps:
- name: test
image: golang:1.14.15
commands:
- go test ./...
- go test -race ./...
- go build -o /dev/null github.com/drone/drone/cmd/drone-server
- go build -o /dev/null -tags "oss nolimit" github.com/drone/drone/cmd/drone-server

Expand Down
24 changes: 20 additions & 4 deletions scheduler/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ type queue struct {
}

// newQueue returns a new Queue backed by the build datastore.
func newQueue(store core.StageStore) *queue {
func newQueue(ctx context.Context, store core.StageStore) *queue {
q := &queue{
store: store,
globMx: redisdb.LockErrNoOp{},
ready: make(chan struct{}, 1),
workers: map[*worker]struct{}{},
interval: time.Minute,
ctx: context.Background(),
ctx: ctx,
}
go q.start()
return q
Expand Down Expand Up @@ -87,6 +87,8 @@ func (q *queue) Resume(ctx context.Context) error {
}

func (q *queue) Request(ctx context.Context, params core.Filter) (*core.Stage, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
w := &worker{
kind: params.Kind,
typ: params.Type,
Expand All @@ -96,6 +98,7 @@ func (q *queue) Request(ctx context.Context, params core.Filter) (*core.Stage, e
variant: params.Variant,
labels: params.Labels,
channel: make(chan *core.Stage),
done: ctx.Done(),
}
q.Lock()
q.workers[w] = struct{}{}
Expand Down Expand Up @@ -209,8 +212,20 @@ func (q *queue) signal(ctx context.Context) error {
// Msg("cannot update queue item")
// continue
// }
select {
case w.channel <- item:

// TODO: refactor to its own unexported method
sendWork := func() bool {
select {
case w.channel <- item:
return true
case <-w.done:
// Worker will exit when we call the deferred q.Unlock()
case <-time.After(q.interval):
// Worker failed to ack before timeout
}
return false
}
if sendWork() {
delete(q.workers, w)
break loop
}
Expand Down Expand Up @@ -241,6 +256,7 @@ type worker struct {
variant string
labels map[string]string
channel chan *core.Stage
done <-chan struct{}
}

type counter struct {
Expand Down
66 changes: 63 additions & 3 deletions scheduler/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package queue

import (
"context"
"math/rand"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -33,7 +34,7 @@ func TestQueue(t *testing.T) {
store.EXPECT().ListIncomplete(ctx).Return(items[1:], nil).Times(1)
store.EXPECT().ListIncomplete(ctx).Return(items[2:], nil).Times(1)

q := newQueue(store)
q := newQueue(ctx, store)
for _, item := range items {
next, err := q.Request(ctx, core.Filter{OS: "linux", Arch: "amd64"})
if err != nil {
Expand All @@ -54,8 +55,7 @@ func TestQueueCancel(t *testing.T) {
store := mock.NewMockStageStore(controller)
store.EXPECT().ListIncomplete(ctx).Return(nil, nil)

q := newQueue(store)
q.ctx = ctx
q := newQueue(ctx, store)

var wg sync.WaitGroup
wg.Add(1)
Expand Down Expand Up @@ -356,3 +356,63 @@ func TestWithinLimits_Old(t *testing.T) {
}
}
}

func incomplete(n int) ([]*core.Stage, error) {
ret := make([]*core.Stage, n)
for i := range ret {
ret[i] = &core.Stage{
OS: "linux/amd64",
Arch: "amd64",
}
}
return ret, nil
}

func TestQueueDeadlock(t *testing.T) {
controller := gomock.NewController(t)
defer controller.Finish()

n := 10
donechan := make(chan struct{}, n)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
store := mock.NewMockStageStore(controller)
store.EXPECT().ListIncomplete(ctx).Return(incomplete(n)).AnyTimes()

q := newQueue(ctx, store)
doWork := func(i int) bool {
select {
case <-ctx.Done():
return false
default:
}
ctx, cancel := context.WithTimeout(ctx,
time.Duration(i+rand.Intn(1000/n))*time.Millisecond)
defer cancel()
if i%3 == 0 {
// Randomly cancel some contexts to simulate timeouts
cancel()
}
_, err := q.Request(ctx, core.Filter{OS: "linux/amd64", Arch: "amd64"})
if err != nil && err != context.Canceled && err !=
context.DeadlineExceeded {
t.Errorf("Expected context.Canceled or context.DeadlineExceeded error, got %s", err)
}
select {
case donechan <- struct{}{}:
case <-ctx.Done():
}
return true
}
for i := 0; i < n; i++ {
go func(i int) {
// Spawn n workers, doing work until the parent context is canceled
for doWork(i) {
}
}(i)
}
// Wait for n * 10 tasks to complete, then exit and cancel all the workers.
for seen := 0; seen < n*10; seen++ {
<-donechan
}
}
6 changes: 4 additions & 2 deletions scheduler/queue/scheduler_non_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !oss
// +build !oss

package queue

import (
"context"
"time"

"github.com/drone/drone/core"
Expand All @@ -27,13 +29,13 @@ import (
func New(store core.StageStore, r redisdb.RedisDB) core.Scheduler {
if r == nil {
return scheduler{
queue: newQueue(store),
queue: newQueue(context.Background(), store),
canceller: newCanceller(),
}
}

sched := schedulerRedis{
queue: newQueue(store),
queue: newQueue(context.Background(), store),
cancellerRedis: newCancellerRedis(r),
}

Expand Down
5 changes: 4 additions & 1 deletion scheduler/queue/scheduler_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build oss
// +build oss

package queue

import (
"context"

"github.com/drone/drone/core"
"github.com/drone/drone/service/redisdb"
)

// New creates a new scheduler.
func New(store core.StageStore, r redisdb.RedisDB) core.Scheduler {
return scheduler{
queue: newQueue(store),
queue: newQueue(context.Background(), store),
canceller: newCanceller(),
}
}

0 comments on commit 480790f

Please sign in to comment.