-
Notifications
You must be signed in to change notification settings - Fork 2
/
DC_ThreadPool.h
130 lines (110 loc) · 2.89 KB
/
DC_ThreadPool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#pragma once
#ifndef liuzianglib_ThreadPool
#define liuzianglib_ThreadPool
#include <string>
#include <thread>
#include <mutex>
#include <future>
#include <queue>
//Version 2.4.22V17
//20180301
namespace DC {
class ThreadPool {
public:
ThreadPool(std::size_t inputThreadNumber) {
ThreadNumber = inputThreadNumber;
RunningNumber = 0;
state_ = state::STOP;
WorkerThreads.reserve(ThreadNumber);
for (std::size_t i = 0; i < ThreadNumber; i++)
WorkerThreads.emplace_back(&ThreadPool::thread_func, this);
}
ThreadPool(const ThreadPool& input) = delete;
ThreadPool& operator=(const ThreadPool& input) = delete;
~ThreadPool() {
join();
}
private:
enum class state
{
RUNNING,
STOP,
EXPIRING,//将亡,意味着join已被调用
WAITING//wait已被调用
};
typedef std::function<void()> job_type;
public:
template <typename funtype, typename ...argstype> std::future<typename std::result_of<funtype(argstype...)>::type> async(funtype&& func, argstype&&... args) {
typedef std::packaged_task<std::result_of<funtype(argstype...)>::type(argstype...)> task_type;
auto task = std::make_shared<task_type>(std::forward<funtype>(func));
push_task(std::bind([task](argstype... args) mutable {
(*task)(std::forward<argstype>(args)...);
}, std::forward<argstype>(args)...));
return task->get_future();
}
inline void start() {
state_ = state::RUNNING;
thread_cv.notify_all();
}
inline void stop() {
state_ = state::STOP;
}
inline state GetState() {
return state_;
}
inline std::size_t GetJobsQueueSize() {
return jobs.size();
}
inline std::size_t GetRunningNumber() {
return RunningNumber;
}
//立即停止,不会完成工作队列
void join() {
state_ = state::EXPIRING;
thread_cv.notify_all();
for (std::thread& t : WorkerThreads)
if (t.joinable())
t.join();
}
//完成工作队列后停止
void wait() {
state_ = state::WAITING;
thread_cv.notify_all();
for (std::thread& t : WorkerThreads)
if (t.joinable())
t.join();
}
private:
void push_task(job_type job) {
{
std::lock_guard<std::mutex> LMwthread(Mwthread);
jobs.push(std::move(job));
}
thread_cv.notify_one();
}
void thread_func() {
while (true) {
std::unique_lock<std::mutex> lock(Mwthread);
thread_cv.wait(lock, [this] {
return (!jobs.empty() && state_ == state::RUNNING) || (state_ == state::EXPIRING|| state_ == state::WAITING);
});
if (state_ == state::EXPIRING || (state_ == state::WAITING&&jobs.empty()))
return;
job_type job = std::move(jobs.front());
jobs.pop();
lock.unlock();
RunningNumber++;
job();
RunningNumber--;
}
}
private:
std::vector<std::thread> WorkerThreads;
std::queue<job_type> jobs;
std::atomic<std::size_t> ThreadNumber, RunningNumber;
std::mutex Mwthread;
std::condition_variable thread_cv;
state state_;
};
}
#endif