Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Unique Queue infrastructure and move TestPullRequests to this #9856

Merged
merged 23 commits into from
Feb 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8eb4ce3
Upgrade levelqueue to version 0.2.0
zeripath Jan 26, 2020
ca14f21
Add UniqueQueue interface and functions to create them
zeripath Jan 26, 2020
22c4563
Add UniqueQueue implementations
zeripath Jan 26, 2020
92c2207
Move TestPullRequests over to use UniqueQueue
zeripath Jan 26, 2020
e6bd896
fix revive
zeripath Jan 29, 2020
7b967e0
Reduce code duplication
zeripath Jan 31, 2020
4b278ce
Merge remote-tracking branch 'origin/master' into unique-queues
zeripath Jan 31, 2020
486a8e0
Merge remote-tracking branch 'origin/master' into unique-queues
zeripath Jan 31, 2020
307f18a
Add bytefifos
zeripath Jan 31, 2020
028872e
fix locking in persistablechanneluniquequeue shutdown
zeripath Feb 1, 2020
8f6f779
rename queue pr_patch_checker
zeripath Feb 1, 2020
2d88ad7
Ensure invalid types are logged
zeripath Feb 1, 2020
23e7017
Move body of ByteFIFOQueue terminate out of select
zeripath Feb 1, 2020
a2fb188
Fix close race in PersistableChannelQueue Shutdown
zeripath Feb 1, 2020
f53c675
Fix double lock in unique wrapped
zeripath Feb 1, 2020
c54e2e4
Simplify PushFunc in Unique wrapped slightly
zeripath Feb 1, 2020
d82abd9
rename q fifo in queue_disk and handle not found err in unique_queue_…
zeripath Feb 1, 2020
47d94a1
Lock for Empty and readToChan
zeripath Feb 1, 2020
343cded
Merge branch 'master' into unique-queues
zeripath Feb 1, 2020
5ab601b
Merge branch 'master' into unique-queues
zeripath Feb 1, 2020
25c1427
Add some more comments
zeripath Feb 2, 2020
111a87e
Merge branch 'master' into unique-queues
zeripath Feb 2, 2020
6b368d6
Merge branch 'master' into unique-queues
zeripath Feb 2, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/content/doc/advanced/config-cheat-sheet.en-us.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ relation to port exhaustion.
- `BATCH_LENGTH`: **20**: Batch data before passing to the handler
- `CONN_STR`: **addrs=127.0.0.1:6379 db=0**: Connection string for the redis queue type.
- `QUEUE_NAME`: **_queue**: The suffix for default redis queue name. Individual queues will default to **`name`**`QUEUE_NAME` but can be overriden in the specific `queue.name` section.
- `SET_NAME`: **_unique**: The suffix that will added to the default redis
set name for unique queues. Individual queues will default to
**`name`**`QUEUE_NAME`_`SET_NAME`_ but can be overridden in the specific
`queue.name` section.
- `WRAP_IF_NECESSARY`: **true**: Will wrap queues with a timeoutable queue if the selected queue is not ready to be created - (Only relevant for the level queue.)
- `MAX_ATTEMPTS`: **10**: Maximum number of attempts to create the wrapped queue
- `TIMEOUT`: **GRACEFUL_HAMMER_TIME + 30s**: Timeout the creation of the wrapped queue if it takes longer than this to create.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.13

require (
cloud.google.com/go v0.45.0 // indirect
gitea.com/lunny/levelqueue v0.1.0
gitea.com/lunny/levelqueue v0.2.0
gitea.com/macaron/binding v0.0.0-20190822013154-a5f53841ed2b
gitea.com/macaron/cache v0.0.0-20190822004001-a6e7fee4ee76
gitea.com/macaron/captcha v0.0.0-20190822015246-daa973478bae
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbf
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
gitea.com/lunny/levelqueue v0.1.0 h1:7wMk0VH6mvKN6vZEZCy9nUDgRmdPLgeNrm1NkW8EHNk=
gitea.com/lunny/levelqueue v0.1.0/go.mod h1:G7hVb908t0Bl0uk7zGSg14fyzNtxgtD9Shf04wkMK7s=
gitea.com/lunny/levelqueue v0.2.0 h1:lR/5EAwQtFcn5YvPEkNMw0p9pAy2/O2nSP5ImECLA2E=
gitea.com/lunny/levelqueue v0.2.0/go.mod h1:G7hVb908t0Bl0uk7zGSg14fyzNtxgtD9Shf04wkMK7s=
gitea.com/macaron/binding v0.0.0-20190822013154-a5f53841ed2b h1:vXt85uYV17KURaUlhU7v4GbCShkqRZDSfo0TkC0YCjQ=
gitea.com/macaron/binding v0.0.0-20190822013154-a5f53841ed2b/go.mod h1:Cxadig6POWpPYYSfg23E7jo35Yf0yvsdC1lifoKWmPo=
gitea.com/macaron/cache v0.0.0-20190822004001-a6e7fee4ee76 h1:mMsMEg90c5KXQgRWsH8D6GHXfZIW1RAe5S9VYIb12lM=
Expand Down
61 changes: 61 additions & 0 deletions modules/queue/bytefifo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package queue

// ByteFIFO defines a FIFO that takes a byte array
type ByteFIFO interface {
// Len returns the length of the fifo
Len() int64
// PushFunc pushes data to the end of the fifo and calls the callback if it is added
PushFunc(data []byte, fn func() error) error
// Pop pops data from the start of the fifo
Pop() ([]byte, error)
// Close this fifo
Close() error
}

// UniqueByteFIFO defines a FIFO that Uniques its contents
type UniqueByteFIFO interface {
ByteFIFO
// Has returns whether the fifo contains this data
Has(data []byte) (bool, error)
}

var _ (ByteFIFO) = &DummyByteFIFO{}

// DummyByteFIFO represents a dummy fifo
type DummyByteFIFO struct{}

// PushFunc returns nil
func (*DummyByteFIFO) PushFunc(data []byte, fn func() error) error {
return nil
}

// Pop returns nil
func (*DummyByteFIFO) Pop() ([]byte, error) {
return []byte{}, nil
}

// Close returns nil
func (*DummyByteFIFO) Close() error {
return nil
}

// Len is always 0
func (*DummyByteFIFO) Len() int64 {
return 0
}

var _ (UniqueByteFIFO) = &DummyUniqueByteFIFO{}

// DummyUniqueByteFIFO represents a dummy unique fifo
type DummyUniqueByteFIFO struct {
DummyByteFIFO
}

// Has always returns false
func (*DummyUniqueByteFIFO) Has([]byte) (bool, error) {
return false, nil
}
20 changes: 15 additions & 5 deletions modules/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,25 +74,35 @@ type DummyQueue struct {
}

// Run does nothing
func (b *DummyQueue) Run(_, _ func(context.Context, func())) {}
func (*DummyQueue) Run(_, _ func(context.Context, func())) {}

// Push fakes a push of data to the queue
func (b *DummyQueue) Push(Data) error {
func (*DummyQueue) Push(Data) error {
return nil
}

// PushFunc fakes a push of data to the queue with a function. The function is never run.
func (*DummyQueue) PushFunc(Data, func() error) error {
return nil
}

// Has always returns false as this queue never does anything
func (*DummyQueue) Has(Data) (bool, error) {
return false, nil
}

// Flush always returns nil
func (b *DummyQueue) Flush(time.Duration) error {
func (*DummyQueue) Flush(time.Duration) error {
return nil
}

// FlushWithContext always returns nil
func (b *DummyQueue) FlushWithContext(context.Context) error {
func (*DummyQueue) FlushWithContext(context.Context) error {
return nil
}

// IsEmpty asserts that the queue is empty
func (b *DummyQueue) IsEmpty() bool {
func (*DummyQueue) IsEmpty() bool {
return true
}

Expand Down
227 changes: 227 additions & 0 deletions modules/queue/queue_bytefifo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package queue

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"code.gitea.io/gitea/modules/log"
)

// ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue
type ByteFIFOQueueConfiguration struct {
WorkerPoolConfiguration
Workers int
Name string
}

var _ (Queue) = &ByteFIFOQueue{}

// ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool
type ByteFIFOQueue struct {
*WorkerPool
byteFIFO ByteFIFO
typ Type
closed chan struct{}
terminated chan struct{}
exemplar interface{}
workers int
name string
lock sync.Mutex
}

// NewByteFIFOQueue creates a new ByteFIFOQueue
func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOQueue, error) {
configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg)
if err != nil {
return nil, err
}
config := configInterface.(ByteFIFOQueueConfiguration)

return &ByteFIFOQueue{
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
byteFIFO: byteFIFO,
typ: typ,
closed: make(chan struct{}),
terminated: make(chan struct{}),
exemplar: exemplar,
workers: config.Workers,
name: config.Name,
}, nil
}

// Name returns the name of this queue
func (q *ByteFIFOQueue) Name() string {
return q.name
}

// Push pushes data to the fifo
func (q *ByteFIFOQueue) Push(data Data) error {
return q.PushFunc(data, nil)
}

// PushFunc pushes data to the fifo
func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
if !assignableTo(data, q.exemplar) {
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
}
bs, err := json.Marshal(data)
if err != nil {
return err
}
return q.byteFIFO.PushFunc(bs, fn)
}

// IsEmpty checks if the queue is empty
func (q *ByteFIFOQueue) IsEmpty() bool {
q.lock.Lock()
defer q.lock.Unlock()
if !q.WorkerPool.IsEmpty() {
return false
}
return q.byteFIFO.Len() == 0
}

// Run runs the bytefifo queue
func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
atShutdown(context.Background(), q.Shutdown)
atTerminate(context.Background(), q.Terminate)
log.Debug("%s: %s Starting", q.typ, q.name)

go func() {
_ = q.AddWorkers(q.workers, 0)
}()

go q.readToChan()

log.Trace("%s: %s Waiting til closed", q.typ, q.name)
<-q.closed
log.Trace("%s: %s Waiting til done", q.typ, q.name)
q.Wait()

log.Trace("%s: %s Waiting til cleaned", q.typ, q.name)
ctx, cancel := context.WithCancel(context.Background())
atTerminate(ctx, cancel)
q.CleanUp(ctx)
cancel()
}

func (q *ByteFIFOQueue) readToChan() {
for {
select {
case <-q.closed:
// tell the pool to shutdown.
q.cancel()
return
default:
q.lock.Lock()
bs, err := q.byteFIFO.Pop()
zeripath marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
q.lock.Unlock()
log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err)
time.Sleep(time.Millisecond * 100)
continue
}

if len(bs) == 0 {
q.lock.Unlock()
time.Sleep(time.Millisecond * 100)
continue
}

data, err := unmarshalAs(bs, q.exemplar)
if err != nil {
log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err)
q.lock.Unlock()
time.Sleep(time.Millisecond * 100)
continue
}

log.Trace("%s %s: Task found: %#v", q.typ, q.name, data)
q.WorkerPool.Push(data)
q.lock.Unlock()
}
}
}

// Shutdown processing from this queue
func (q *ByteFIFOQueue) Shutdown() {
log.Trace("%s: %s Shutting down", q.typ, q.name)
q.lock.Lock()
select {
case <-q.closed:
default:
close(q.closed)
}
q.lock.Unlock()
log.Debug("%s: %s Shutdown", q.typ, q.name)
}

// Terminate this queue and close the queue
func (q *ByteFIFOQueue) Terminate() {
log.Trace("%s: %s Terminating", q.typ, q.name)
q.Shutdown()
q.lock.Lock()
select {
case <-q.terminated:
q.lock.Unlock()
return
default:
}
close(q.terminated)
q.lock.Unlock()
if log.IsDebug() {
log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len())
}
if err := q.byteFIFO.Close(); err != nil {
log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err)
}
log.Debug("%s: %s Terminated", q.typ, q.name)
}

var _ (UniqueQueue) = &ByteFIFOUniqueQueue{}

// ByteFIFOUniqueQueue represents a UniqueQueue formed from a UniqueByteFifo
type ByteFIFOUniqueQueue struct {
ByteFIFOQueue
}

// NewByteFIFOUniqueQueue creates a new ByteFIFOUniqueQueue
func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOUniqueQueue, error) {
configInterface, err := toConfig(ByteFIFOQueueConfiguration{}, cfg)
if err != nil {
return nil, err
}
config := configInterface.(ByteFIFOQueueConfiguration)

return &ByteFIFOUniqueQueue{
ByteFIFOQueue: ByteFIFOQueue{
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
byteFIFO: byteFIFO,
typ: typ,
closed: make(chan struct{}),
terminated: make(chan struct{}),
exemplar: exemplar,
workers: config.Workers,
name: config.Name,
},
}, nil
}

// Has checks if the provided data is in the queue
func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) {
if !assignableTo(data, q.exemplar) {
return false, fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
}
bs, err := json.Marshal(data)
if err != nil {
return false, err
}
return q.byteFIFO.(UniqueByteFIFO).Has(bs)
}
Loading