VAULT-1401 and 1402 - preliminary fair sharing (hashicorp#1701) (hashicorp#10917)

* basic pool and start testing

* refactor a bit for testing

* workFunc, start/stop safety, testing

* cleanup function for worker quit, more tests

* redo public/private members

* improve tests, export types, switch uuid package

* fix loop capture bug, cleanup

* cleanup tests

* update worker pool file name, other improvements

* add job manager prototype

* remove remnants

* add functions to wait for job manager and worker pool to stop, other fixes

* test job manager functionality, fix bugs

* encapsulate how jobs are distributed to workers

* make worker job channel read only

* add job interface, more testing, fixes

* set name for dispatcher

* fix test races

* dispatcher and job manager constructors don't return errors

* logger now dependency injected

* make some members private, test fcn to get worker pool size

* make GetNumWorkers public

* Update helper/fairshare/jobmanager_test.go

Co-authored-by: Brian Kassouf <[email protected]>

* make workerpool private

* remove custom worker names

* concurrency improvements

* remove worker pool cleanup function

* remove cleanup func from job manager, remove non blocking stop from fairshare

* stop fairshare when started in tests

* stop leaking job manager goroutine

* prototype channel for waking up to assign work

* fix typo/bug and add tests

* improve job manager wake up, fix test typo

* put channel drain back

* better start/pause test for job manager

* go mod vendor

Co-authored-by: Brian Kassouf <[email protected]>

swayne275 and briankassouf committed Feb 12, 2021
1 parent 8e1ca21 commit 111a66c
Showing 5 changed files with 1,363 additions and 0 deletions.
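
For orientation, here is a minimal usage sketch of the API this commit adds. It is hedged: the exampleJob type, queue IDs, and worker count are illustrative, and the Job interface is inferred from the testJob helper in the diff below.

package main

import (
	"fmt"

	"github.com/hashicorp/vault/helper/fairshare"
)

// exampleJob is an illustrative implementation of the fairshare Job interface.
type exampleJob struct{ id string }

func (e *exampleJob) GetID() string       { return e.id }
func (e *exampleJob) Execute() error      { fmt.Println("ran", e.id); return nil }
func (e *exampleJob) OnFailure(err error) { fmt.Println("failed:", e.id, err) }

func main() {
	// passing a nil logger is allowed; NewJobManager falls back to a discard logger
	jm := fairshare.NewJobManager("example", 4, nil)
	jm.Start()

	// jobs are queued per ID; the manager round-robins across queues
	// so one busy queue cannot starve the others
	jm.AddJob(&exampleJob{id: "job-a"}, "tenant-1")
	jm.AddJob(&exampleJob{id: "job-b"}, "tenant-2")

	// Stop waits for the worker pool and the assignment loop to exit
	jm.Stop()
}

Note that AddJob may also be called before Start; the buffered newWork channel (see jobmanager.go below) keeps that from deadlocking.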
65 changes: 65 additions & 0 deletions helper/fairshare/fairshare_testing_util.go
@@ -0,0 +1,65 @@
package fairshare

import (
	"fmt"
	"testing"

	log "github.com/hashicorp/go-hclog"
	uuid "github.com/hashicorp/go-uuid"
)

type testJob struct {
	id     string
	ex     func(id string) error
	onFail func(error)
}

// TODO maybe define these function types
func (t *testJob) GetID() string {
	return t.id
}

func (t *testJob) Execute() error {
	return t.ex(t.GetID())
}

func (t *testJob) OnFailure(err error) {
	t.onFail(err)
}

func newTestJob(t *testing.T, id string, ex func(string) error, onFail func(error)) testJob {
	t.Helper()
	if ex == nil {
		t.Errorf("ex cannot be nil")
	}
	if onFail == nil {
		t.Errorf("onFail cannot be nil")
	}

	return testJob{
		id:     id,
		ex:     ex,
		onFail: onFail,
	}
}

func newDefaultTestJob(t *testing.T, id string) testJob {
	ex := func(_ string) error { return nil }
	onFail := func(_ error) {}
	return newTestJob(t, id, ex, onFail)
}

func newTestLogger(name string) log.Logger {
	guid, err := uuid.GenerateUUID()
	if err != nil {
		guid = "no-guid"
	}
	return log.New(&log.LoggerOptions{
		Name:  fmt.Sprintf("%s-%s", name, guid),
		Level: log.LevelFromString("TRACE"),
	})
}

func GetNumWorkers(j *JobManager) int {
	return j.workerPool.numWorkers
}
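
The Job interface itself is defined in one of the five changed files not shown on this page. Inferred from the methods testJob implements, it presumably looks like this (a reconstruction, not the verbatim definition):

type Job interface {
	GetID() string
	Execute() error
	OnFailure(err error)
}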
241 changes: 241 additions & 0 deletions helper/fairshare/jobmanager.go
@@ -0,0 +1,241 @@
package fairshare

import (
	"container/list"
	"fmt"
	"io/ioutil"
	"sync"

	log "github.com/hashicorp/go-hclog"
	uuid "github.com/hashicorp/go-uuid"
	"github.com/hashicorp/vault/sdk/helper/logging"
)

/*
Future Work:
- track workers per queue. this will involve things like:
  - somehow wrap the Execute/OnFailure functions to increment counter when
    they start running, and decrement when they stop running
    -- put a queue.IncrementCounter() call at the beginning
    -- call the provided work function in the middle
    -- put a queue.DecrementCounter() call at the end
  - job has a queueID or reference to the queue
  - queue only removed when empty AND no workers
*/

type JobManager struct {
	name              string
	queues            map[string]*list.List
	queuesIndex       []string
	lastQueueAccessed int
	quit              chan struct{}
	newWork           chan struct{} // must be buffered
	workerPool        *dispatcher
	onceStart         sync.Once
	onceStop          sync.Once
	logger            log.Logger
	wg                sync.WaitGroup

	// protects `queues`, `queuesIndex`, `lastQueueAccessed`
	l sync.RWMutex
}

// NewJobManager creates a job manager, with an optional name
func NewJobManager(name string, numWorkers int, l log.Logger) *JobManager {
	if l == nil {
		l = logging.NewVaultLoggerWithWriter(ioutil.Discard, log.NoLevel)
	}
	if name == "" {
		guid, err := uuid.GenerateUUID()
		if err != nil {
			l.Warn("uuid generator failed, using 'no-uuid'", "err", err)
			guid = "no-uuid"
		}

		name = fmt.Sprintf("jobmanager-%s", guid)
	}

	wp := newDispatcher(fmt.Sprintf("%s-dispatcher", name), numWorkers, l)

	j := JobManager{
		name:              name,
		queues:            make(map[string]*list.List),
		queuesIndex:       make([]string, 0),
		lastQueueAccessed: -1,
		quit:              make(chan struct{}),
		newWork:           make(chan struct{}, 1),
		workerPool:        wp,
		logger:            l,
	}

	j.logger.Trace("created job manager", "name", name, "pool_size", numWorkers)
	return &j
}

// Start starts the job manager
// note: a given job manager cannot be restarted after it has been stopped
func (j *JobManager) Start() {
	j.onceStart.Do(func() {
		j.logger.Trace("starting job manager", "name", j.name)
		j.workerPool.start()
		j.assignWork()
	})
}

// Stop stops the job manager, and waits for the worker pool and
// job manager to quit gracefully
func (j *JobManager) Stop() {
	j.onceStop.Do(func() {
		j.logger.Trace("terminating job manager and waiting...")
		j.workerPool.stop()
		close(j.quit)
		j.wg.Wait()
	})
}

// AddJob adds a job to the given queue, creating the queue if it doesn't exist
func (j *JobManager) AddJob(job Job, queueID string) {
	j.l.Lock()
	if len(j.queues) == 0 {
		defer func() {
			// newWork must be buffered to avoid deadlocks if work is added
			// before the job manager is started
			j.newWork <- struct{}{}
		}()
	}
	defer j.l.Unlock()

	if _, ok := j.queues[queueID]; !ok {
		j.addQueue(queueID)
	}

	j.queues[queueID].PushBack(job)
}

// GetPendingJobCount returns the total number of pending jobs in the job manager
func (j *JobManager) GetPendingJobCount() int {
	j.l.RLock()
	defer j.l.RUnlock()

	cnt := 0
	for _, q := range j.queues {
		cnt += q.Len()
	}

	return cnt
}

// GetWorkerCounts returns a map of queue ID to number of active workers
func (j *JobManager) GetWorkerCounts() map[string]int {
	// TODO implement with VLT-145
	return nil
}

// GetWorkQueueLengths returns a map of queue ID to the number of jobs in that queue
func (j *JobManager) GetWorkQueueLengths() map[string]int {
	out := make(map[string]int)

	j.l.RLock()
	defer j.l.RUnlock()

	for k, v := range j.queues {
		out[k] = v.Len()
	}

	return out
}

// getNextJob grabs the next job to be processed and prunes empty queues
func (j *JobManager) getNextJob() Job {
	j.l.Lock()
	defer j.l.Unlock()

	if len(j.queues) == 0 {
		return nil
	}

	j.lastQueueAccessed = (j.lastQueueAccessed + 1) % len(j.queuesIndex)
	queueID := j.queuesIndex[j.lastQueueAccessed]

	jobElement := j.queues[queueID].Front()
	out := j.queues[queueID].Remove(jobElement)

	if j.queues[queueID].Len() == 0 {
		j.removeLastQueueAccessed()
	}

	return out.(Job)
}
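
// Illustration (not part of the original file): with queues
// {"q1": [a, b], "q2": [c]} and lastQueueAccessed == -1, successive
// getNextJob calls return a (from q1), then c (from q2, which empties
// and is pruned), then b (from q1). Rotating through queuesIndex this
// way is what gives each queue a fair share of the worker pool.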

// assignWork continually checks for new jobs and dispatches them to the
// worker pool
func (j *JobManager) assignWork() {
	j.wg.Add(1)

	go func() {
		for {
			for {
				// assign work while there are jobs to distribute
				select {
				case <-j.quit:
					j.wg.Done()
					return
				case <-j.newWork:
					// keep the channel empty since we're already processing work
				default:
				}

				job := j.getNextJob()
				if job != nil {
					j.workerPool.dispatch(job)
				} else {
					break
				}
			}

			// listen for a wake-up when an empty job manager has been given
			// new work
			select {
			case <-j.quit:
				j.wg.Done()
				return
			case <-j.newWork:
				break
			}
		}
	}()
}

// addQueue generates a new queue if a queue for `queueID` doesn't exist
// note: this must be called with l held for write
func (j *JobManager) addQueue(queueID string) {
	if _, ok := j.queues[queueID]; !ok {
		j.queues[queueID] = list.New()
		j.queuesIndex = append(j.queuesIndex, queueID)
	}
}

// removeLastQueueAccessed removes the queue and index map for the last queue
// accessed. It is to be used when the last queue accessed has emptied.
// note: this must be called with l held for write
func (j *JobManager) removeLastQueueAccessed() {
	if j.lastQueueAccessed == -1 || j.lastQueueAccessed > len(j.queuesIndex)-1 {
		j.logger.Warn("call to remove queue out of bounds", "idx", j.lastQueueAccessed)
		return
	}

	queueID := j.queuesIndex[j.lastQueueAccessed]

	// remove the queue
	delete(j.queues, queueID)

	// remove the index for the queue
	j.queuesIndex = append(j.queuesIndex[:j.lastQueueAccessed], j.queuesIndex[j.lastQueueAccessed+1:]...)

	// correct the last queue accessed for round robining
	if j.lastQueueAccessed > 0 {
		j.lastQueueAccessed--
	} else {
		j.lastQueueAccessed = len(j.queuesIndex) - 1
	}
}
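
The Future Work comment at the top of jobmanager.go proposes tracking active workers per queue by bracketing Execute with increment/decrement calls. A hedged sketch of one way to do that (the wrappedJob type and counter map are hypothetical, not part of this commit):

package fairshare

import (
	"sync"
	"sync/atomic"
)

// wrappedJob is a hypothetical decorator illustrating the Future Work note:
// it brackets Execute with increment/decrement calls so the manager could
// report live per-queue worker counts.
type wrappedJob struct {
	Job            // the job being wrapped; GetID and OnFailure are promoted
	queueID string
	counts  *sync.Map // queueID -> *int64 count of currently running jobs
}

func (w *wrappedJob) Execute() error {
	v, _ := w.counts.LoadOrStore(w.queueID, new(int64))
	ctr := v.(*int64)
	atomic.AddInt64(ctr, 1)        // the queue.IncrementCounter() step
	defer atomic.AddInt64(ctr, -1) // the queue.DecrementCounter() step
	return w.Job.Execute()
}

AddJob could wrap each incoming job this way, which would let GetWorkerCounts return real numbers instead of nil.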