0%

线程池代码解析

网上大佬用 C++11 写的线程池代码,和自己的阅读笔记

代码中有很多c11新增的高级操作, 配合笔记 一起理解

ThreadPool.h 头文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <queue>
#include <type_traits>
#include <utility>
#include <vector>

// C++11 版的 线程池
namespace zl
{
class ThreadsGuard
{
public:
ThreadsGuard(std::vector<std::thread>& v)
: threads_(v)
{
}
~ThreadsGuard()
{ // join 需要等到所有线程运行完毕,才能退出析构函数
for (size_t i = 0; i != threads_.size(); ++i)
{
if (threads_[i].joinable())
{
threads_[i].join();
}
}
}
private:
ThreadsGuard(ThreadsGuard&& tg) = delete;
ThreadsGuard& operator = (ThreadsGuard&& tg) = delete;

ThreadsGuard(const ThreadsGuard&) = delete;
ThreadsGuard& operator = (const ThreadsGuard&) = delete;
private:
std::vector<std::thread>& threads_;
};


class ThreadPool
{
public:
typedef std::function<void()> task_type;

public:
explicit ThreadPool(int n = 0);

~ThreadPool()
{
stop();
cond_.notify_all();
}

void stop(){
stop_.store(true, std::memory_order_release);
}

template<class Function, class... Args>
std::future<typename std::result_of<Function(Args...)>::type> add(Function&&, Args&&...);

private:
ThreadPool(ThreadPool&&) = delete;
ThreadPool& operator = (ThreadPool&&) = delete;
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator = (const ThreadPool&) = delete;

private:
std::atomic<bool> stop_; // 将值做了一遍封装,使得这个值的访问不会导致数据竞争
std::mutex mtx_; // 互斥锁用于对任务队列互斥访问
std::condition_variable cond_; // 条件变量 在测试条件的时候要先加锁 确保完全进入 wait 状态后再释放锁,然后再安心接收条件,免得还没wait呢,发了一个唤醒信号,就丢了。

std::queue<task_type> tasks_;
std::vector<std::thread> threads_;
zl::ThreadsGuard tg_;
};


inline ThreadPool::ThreadPool(int n)
: stop_(false)
, tg_(threads_)
{
int nthreads = n;
if (nthreads <= 0)
{
nthreads = std::thread::hardware_concurrency();
nthreads = (nthreads == 0 ? 2 : nthreads);
}
// 1. 构造函数,n 设置线程池中的线程数目。构造函数中,首先开启了n个线程,线程池其实就是 std::vector<std::thread> 的一个数组,每个线程的函数都在干同一件事:等待条件,如果条件成立,就从任务队列中取出一个任务执行。
for (int i = 0; i != nthreads; ++i)
{
threads_.push_back(std::thread([this]{
while (!stop_.load(std::memory_order_acquire))
{
task_type task;
{ // 这个 大括号至关重要,表明这是一个 操作临界区的部分代码,因为使用了 unique_lock,当退出这个临界区代码,unique_lock 会自动调用析构函数解锁。然后接着执行 task 任务,在执行task 任务期间并不需要上锁,只有从队列取出任务的过程需要上锁
// 使用 unique_lock 对互斥锁进行封装,有自动上锁和解锁的功能,析构函数中解锁。 task() 退出函数后自动解锁
std::unique_lock<std::mutex> ulk(this->mtx_);
// 当任务队列为空的时候的等待 或者不是stop_模式的时候等待,如果是stop模式,不等待直接退出
this->cond_.wait(ulk, [this]{ return stop_.load(std::memory_order_acquire) || !this->tasks_.empty(); });
if (stop_.load(std::memory_order_acquire))
return;
// 函数的返回值是一个临时右值,使用 move 直接高效的转移给 task (当调用的函数返回的是引用的时候,实际返回的是左值,当值返回时,返回的是右值。这里使用的move 无论 front() 返回的是左值还是右值都被转换成了右值)
task = std::move(this->tasks_.front());
this->tasks_.pop();
}
task();
}
}));
}
}
// 2. 增加任务的函数,任务队列 tasks_ 实际是一个 function 类型的队列。result_of 用于获取输入函数fcn的返回类型
// 3. future 是 创建packge_task后的返回值,用于异步获取调用结果
template<class Function, class... Args>
std::future<typename std::result_of<Function(Args...)>::type>
ThreadPool::add(Function&& fcn, Args&&... args)
{
// 4. std::result_of<Function(Args...)>::type 中的 type 实际是类的一个成员,前面的 typename 告诉编译器将这个成员当作类型名称。所以 最终 return_type 就是一个类型名称。
typedef typename std::result_of<Function(Args...)>::type return_type;
// 5. 加入 return_type 是 int 型 那么这句话相当于申明了一个 std::packaged_task<int()> 型的类,他就可以包装 返回值为 int 的可调用对象。进而使用它的返回值 future 实现异步读取执行结果的功能
typedef std::packaged_task<return_type()> task;
// 6. 由于前面申明的只是一个 int() 型的 package_task 类型,但是输入函数还有参数,因此先用 bind 绑定可执行对象和参数,形成一个不带输入参数的新的可调用对象,这里只是用智能指针管理。
// 将可调用函数 fcn 和 对应的参数 args 绑定成一个仿函数。会隐式的转换成函数指针 std::forward用于完美转发
// 将该函数指针用 share_ptr 智能指针管理
auto t = std::make_shared<task>(std::bind(std::forward<Function>(fcn), std::forward<Args>(args)...));
// 7. 获取package_task的返回值,是 future对象
auto ret = t->get_future();
{
// 8. 要向任务队列中添加任务的时候先上锁,这里用 lock_guard 来管理 互斥锁 在退出大括号的作用域后,可以自动解锁
std::lock_guard<std::mutex> lg(mtx_);
if (stop_.load(std::memory_order_acquire))
throw std::runtime_error("thread pool has stopped");
// 9. 这里之所以还要用 lambad函数封一层,是因为 t 是 package_task 类型的对象,可以执行,但是 不同的 输入fcn 可能有不同的 返回值类型,例如按照前面假设的这里就是 package_task<int()> 类型。因此 为了更通用,这里用 lambda 函数封一遍 统一都是 function<void(void)>类型了。但是这里不用 package_task封了,因为 这里的 lambda 没返回值,不用future来异步获取结果。
tasks_.emplace([t]{(*t)(); });
}
cond_.notify_one();
return ret;
}
}

#endif /* THREAD_POOL_H */

main文件使用例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <iostream>
#include <string>
#include "ThreadPool.h"

using namespace std;


int main(){
std::mutex mtx;
try{
zl::ThreadPool tp;
std::vector<std::future<int>> v;
std::vector<std::future<void>> v1;

for (int i = 0; i <= 10; ++i)
{
auto ans = tp.add([](int answer) { return answer; }, i);
v.push_back(std::move(ans));
}

for (int i = 0; i <= 5; ++i){
auto ans = tp.add([&mtx](const std::string& str1, const std::string& str2)
{
std::lock_guard<std::mutex> lg(mtx);
std::cout << (str1 + str2) << std::endl;
return;
}, "hello ", "world");
v1.push_back(std::move(ans));
}
for (size_t i = 0; i < v.size(); ++i) {
std::lock_guard<std::mutex> lg(mtx);
cout << v[i].get() << endl;
}

for (size_t i = 0; i < v1.size(); ++i){
v1[i].get();
}
}
catch (std::exception& e){
std::cout << e.what() << std::endl;
}

}