Skip to content

Commit

Permalink
Add configurable task queue size to GoPool
Browse files Browse the repository at this point in the history
- Updated README.md and README_zh.md to reflect the new feature of configurable task queue size.
- Added a new field 'taskQueueSize' in the 'goPool' struct to store the size of the task queue.
- Modified the 'NewGoPool' function to initialize the task queue with the specified size.
- Added a new function 'GetTaskQueueSize' to return the size of the task queue.
- Updated the test file 'gopool_test.go' to include tests for the new feature.
- Added a new option 'WithTaskQueueSize' in 'option.go' to allow users to set the size of the task queue when creating the pool.

Signed-off-by: Daniel Hu <[email protected]>
  • Loading branch information
daniel-hutao committed Aug 17, 2023
1 parent 3e13b06 commit bf1ff45
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 3 deletions.
31 changes: 30 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ ok github.com/devchat-ai/gopool 3.946s
<img src="./logo/gopool.png" width="750">
</div>

- [x] **Task Queue**: GoPool uses a thread-safe task queue to store tasks waiting to be processed. Multiple workers can simultaneously fetch tasks from this queue.
- [x] **Task Queue**: GoPool uses a thread-safe task queue to store tasks waiting to be processed. Multiple workers can simultaneously fetch tasks from this queue. The size of the task queue can be configured.

- [x] **Concurrency Control**: GoPool can control the number of concurrent tasks to prevent system overload.

Expand Down Expand Up @@ -163,6 +163,35 @@ func main() {
}
```

## Configurable Task Queue Size

GoPool uses a thread-safe task queue to store tasks waiting to be processed. Multiple workers can simultaneously fetch tasks from this queue. The size of the task queue can be configured when creating the pool using the `WithTaskQueueSize` option.

Here is an example of how to use GoPool with a configurable task queue size:

```go
package main

import (
"time"

"github.com/devchat-ai/gopool"
)

func main() {
pool := gopool.NewGoPool(100, gopool.WithTaskQueueSize(5000))
defer pool.Release()

for i := 0; i < 1000; i++ {
pool.AddTask(func() (interface{}, error){
time.Sleep(10 * time.Millisecond)
return nil, nil
})
}
pool.Wait()
}
```

## Dynamic Worker Adjustment

GoPool supports dynamic worker adjustment. This means that the number of workers in the pool can increase or decrease based on the number of tasks in the queue. This feature can be enabled by setting the MinWorkers option when creating the pool.
Expand Down
31 changes: 30 additions & 1 deletion README_zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ ok github.com/devchat-ai/gopool 3.946s
<img src="./logo/gopool.png" width="750">
</div>

- [x] **任务队列**:GoPool 使用一个线程安全的任务队列来存储等待处理的任务。多个工作器可以同时从这个队列中获取任务。
- [x] **任务队列**:GoPool 使用一个线程安全的任务队列来存储等待处理的任务。多个工作器可以同时从这个队列中获取任务。任务队列的大小可配置。

- [x] **并发控制**:GoPool 可以控制并发任务的数量,防止系统过载。

Expand Down Expand Up @@ -163,6 +163,35 @@ func main() {
}
```

## 配置任务队列大小

GoPool 使用一个线程安全的任务队列来存储等待处理的任务。多个工作器可以同时从这个队列中获取任务。任务队列的大小可配置。可以通过在创建池时设置 `WithQueueSize` 选项来配置任务队列的大小。

这是一个如何配置 GoPool 任务队列大小的示例:

```go
package main

import (
"time"

"github.com/devchat-ai/gopool"
)

func main() {
pool := gopool.NewGoPool(100, gopool.WithTaskQueueSize(5000))
defer pool.Release()

for i := 0; i < 1000; i++ {
pool.AddTask(func() (interface{}, error){
time.Sleep(10 * time.Millisecond)
return nil, nil
})
}
pool.Wait()
}
```

## 动态工作器调整

GoPool 支持动态工作器调整。这意味着池中的工作器数量可以根据队列中的任务数量增加或减少。可以通过在创建池时设置 MinWorkers 选项来启用此功能。
Expand Down
15 changes: 14 additions & 1 deletion gopool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type GoPool interface {
Running() int
// GetWorkerCount returns the number of workers.
GetWorkerCount() int
// GetTaskQueueSize returns the size of the task queue.
GetTaskQueueSize() int
}

// task represents a function that will be executed by a worker.
Expand All @@ -33,6 +35,8 @@ type goPool struct {
minWorkers int
// tasks are added to this channel first, then dispatched to workers. Default buffer size is 1 million.
taskQueue chan task
// Set by WithTaskQueueSize(), used to set the size of the task queue. Default is 1e6.
taskQueueSize int
// Set by WithRetryCount(), used to retry a task when it fails. Default is 0.
retryCount int
lock sync.Locker
Expand Down Expand Up @@ -60,7 +64,8 @@ func NewGoPool(maxWorkers int, opts ...Option) GoPool {
// workers and workerStack should be initialized after WithMinWorkers() is called
workers: nil,
workerStack: nil,
taskQueue: make(chan task, 1e6),
taskQueue: nil,
taskQueueSize: 1e6,
retryCount: 0,
lock: new(sync.Mutex),
timeout: 0,
Expand All @@ -73,6 +78,7 @@ func NewGoPool(maxWorkers int, opts ...Option) GoPool {
opt(pool)
}

pool.taskQueue = make(chan task, pool.taskQueueSize)
pool.workers = make([]*worker, pool.minWorkers)
pool.workerStack = make([]int, pool.minWorkers)

Expand Down Expand Up @@ -213,3 +219,10 @@ func (p *goPool) GetWorkerCount() int {
defer p.lock.Unlock()
return len(p.workers)
}

// GetTaskQueueSize returns the size of the task queue.
func (p *goPool) GetTaskQueueSize() int {
p.lock.Lock()
defer p.lock.Unlock()
return p.taskQueueSize
}
10 changes: 10 additions & 0 deletions gopool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,14 @@ var _ = Describe("Gopool", func() {
Expect(pool.GetWorkerCount()).To(Equal(minWorkers))
})
})

Describe("With TaskQueueSize", func() {
It("should work correctly", func() {
size := 5000
pool := gopool.NewGoPool(100, gopool.WithTaskQueueSize(size))
defer pool.Release()

Expect(pool.GetTaskQueueSize()).To(Equal(size))
})
})
})
7 changes: 7 additions & 0 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,10 @@ func WithRetryCount(retryCount int) Option {
p.retryCount = retryCount
}
}

// WithTaskQueueSize sets the size of the task queue for the pool.
func WithTaskQueueSize(size int) Option {
return func(p *goPool) {
p.taskQueueSize = size
}
}

0 comments on commit bf1ff45

Please sign in to comment.