0%

多缓冲多线程代码阅读笔记

详细阅读了别人项目中的图像预处理pip line 的工程实现,使用到了多缓冲,多线程等知识点。

注意: 代码中所有的 拷贝构造和 赋值构造都没实现,因此应该避免使用它的场合。例如多线程类,不能使用值传递的方式 (重载 () 的方法)。

algorithm.h 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#ifndef ALGORITH_H
#define ALGORITH_H
#include <ctime>
#include <vector>
#include <iostream>

using namespace std;

////////////////////////////////////////预处理方法////////////////////////////////////////////
/*
预处理方法使用 工厂模式,当要新增预处理方法时,
1. 需要 继承 AbstractAlgorithm 类,并实现 run 函数,
2. 在 AlgorithmFactory 中新增算法 id 和 实例化代码
图像预处理的算法参数在 创建对应算法类对象的时候传入类,由 param 保存
模板 T 代表 图像数据类型 例如 cv::Mat 测试的时候使用简单的 string 测试的。
P 为算法参数的类型 默认 void* 指针。
*/
template <typename T, typename P>
class AbstractAlgorithm{
public:
AbstractAlgorithm(): time(0){};
AbstractAlgorithm(P p): param(p), time(0){};
AbstractAlgorithm(P p, int t): param(p), time(t){};
AbstractAlgorithm(const AbstractAlgorithm& alg) = delete;
AbstractAlgorithm& operator=(const AbstractAlgorithm& alg) = delete;

int getRunTime(){return time;};
virtual T run(T& input) = 0;

protected:
void Delay(){
clock_t now = clock();
while(clock()-now < time);
}
int time;
P param;
};

template <typename T, typename P>
class SeqReadAlgorithm: public AbstractAlgorithm<T, P>{
private:
int frame_cnt = 0;
vector<T> data_buff;
public:
SeqReadAlgorithm(P p): AbstractAlgorithm<T, P>(p, 5){
frame_cnt = 0;
for(int i = 0; i < 500; i++)
data_buff.push_back("frame: " + std::to_string(i) + " param: " + (char*)this->param);
};
T run(T& input) override{
if(frame_cnt >= 500) return NULL;
this->Delay();
T ans = data_buff[frame_cnt++];
cout << ans << endl;
return ans;
};
};

template <typename T, typename P>
class EnhanceAlgorithm: public AbstractAlgorithm<T,P>{
public:
EnhanceAlgorithm(P p): AbstractAlgorithm<T,P>(p, 5){};
T run(T& input) override{
this->Delay();
T ans = input + " Enhancement" + " param: " + to_string(*(int*)this->param);
cout << ans << endl;
return ans;
};
};

template <typename T, typename P>
class DenoiseAlgorithm: public AbstractAlgorithm<T,P>{
public:
DenoiseAlgorithm(P p): AbstractAlgorithm<T,P>(p,5){};
T run(T& input) override{
this->Delay();
T ans = input + " Denoise" + " param: " + to_string(*(float*)this->param);
cout << ans << endl;
return ans;
};
};

template <typename T, typename P>
class AlgorithmFactory{
public:
AlgorithmFactory(){};
AbstractAlgorithm<T, P>* createAlgorim(int type, P parma){
switch (type)
{
case 0:
return new SeqReadAlgorithm<T,P>(parma);
case 1:
return new EnhanceAlgorithm<T,P>(parma);
case 2:
return new DenoiseAlgorithm<T,P>(parma);
default:
return nullptr;
}
};
};
#endif

safeQueue.h 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#ifndef SAFE_QUEUE_H
#define SAFE_QUEUE_H
#include <queue>
#include <mutex>
#include <iostream>

using namespace std;
// ////////////////////////// 线程安全的数据队列 /////////////////////////////
// 只是简单的实现了线程安全的队列,可以优化的地方很多 例如缓冲区数据满后 自动休眠,取数据空后自动休眠
// T 是队列数据类型
template <typename T>
class SafeMatList{
public:
SafeMatList(int sz): max_sz(sz){};
T getItem(){
std::lock_guard<std::mutex> lg(mtx_);
T ans = m_queue.front();
m_queue.pop();
return ans;
}
void pushItem(T& data){
std::lock_guard<std::mutex> lg(mtx_);
m_queue.push(data);
}
bool full(){
std::lock_guard<std::mutex> lg(mtx_);
return m_queue.size() >= max_sz;
}
bool empty(){
std::lock_guard<std::mutex> lg(mtx_);
return m_queue.size() <= 0;
}

private:
int max_sz;
std::queue<T> m_queue;
std::mutex mtx_;
};
#endif

prepocessThread.h 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168

#ifndef PREPOCESS_THREAD_H
#define PREPOCESS_THREAD_H
#include <iostream>
#include <mutex>
#include <thread>

#include "safeQueue.h"
#include "algorithm.h"

using namespace std;

////////////////////////////////////////使用多线程类对预处理算法包装////////////////////////////////////////////
/*
将多个图像预处理算法封装在一个线程类中,按照穿行的方式执行这些算法。新增算法也无须修改该类的代码
类中有三个主要的数据结构,一个是 封装的预处理算法 基类指针 数组,一个是待处理图像队列,另一个是处理就绪的队列指针;
在 run 函数中首先从 待处理队列中获取一个图像,然后串行调用预处理算法,将最后一个算法生成的结果保存在就绪队列中。
由于一个系统中可能有多个多种预处理算法,例如图像去噪,增强,矫正等,每种算法耗时不同,如果每个算法一个线程,对于耗时很短的算法,
会常常在休眠和运行态切换,带来不必要的系统开销,因此我们采用分组多线的形式,即使用一个线程类包装一个或多个算法,尽可能使得每个
线程组的算法运行总耗时接近。
由于线程类对象的构建采用一个单例的生成器构建,使用这个单例生成器首先将算法id列表划分为多个算法组,并实例化线程类,每个线程类包装
一个算法组,实例化的所有线程类都由这个生成器统一管理。
*/

template <typename T, typename P>
class AbstrctThreadAlg{
public:
vector<AbstractAlgorithm<T, P>*> list_algs;
SafeMatList<T>* m_pFinishQueue;
// 构造函数 给数据缓冲区分分配内存
AbstrctThreadAlg(int sz, SafeMatList<T>* pre_queue): m_pReadyQueue(pre_queue){
m_pFinishQueue = nullptr;
if(sz > 0)
m_pFinishQueue = new SafeMatList<T>(sz);
}
~AbstrctThreadAlg(){
for(int i = 0; i < list_algs.size(); i++) delete list_algs[i];
if(m_pFinishQueue != nullptr) delete m_pFinishQueue;
m_pFinishQueue = nullptr;
}
AbstrctThreadAlg(const AbstrctThreadAlg& threadAlg) = delete;
AbstrctThreadAlg& operator=(const AbstrctThreadAlg& threadAlg) = delete;

void run() {
// 如果 输入数据缓冲队列指针为 空,则不读数据,这就需要列表的第一个 alg 为读图的程序
if(!m_pReadyQueue) cout << "This thread does't have input data queue!" << endl;
// 如果 输出数据缓冲队列指针为 空,则不往缓冲区中存储数据
if(!m_pFinishQueue) cout << "This thread does't have out data queue!" << endl;
while(1){
T input_data;
// 输入缓冲区数据空,就等待,直到取出数据
if(m_pReadyQueue){
while(m_pReadyQueue->empty());
input_data = std::move(m_pReadyQueue->getItem());
}
// 串行执行该线程中的算法
for(auto &it : list_algs)
input_data = std::move(it->run(input_data));

// 输出缓冲区数据满了 就等待,直到有存储的位置
if(m_pFinishQueue){
while(m_pFinishQueue->full());
m_pFinishQueue->pushItem(input_data);
}
}
};
protected:
SafeMatList<T>* m_pReadyQueue;
};


/* 线程类的生成器 新增算法时 不需要修改该部分代码,不过可以考虑。这个类默认的算法参数是 void* 类型,以下方面可以优化
1. 优化build函数,使用均衡分组方法,保证每个线程上的任务耗时接近
2. 优化 模板类 使得 算法参数类型 不一定非得是 void* 类型 (现在的做法是在 算法函数中 将 void* 指针强转)
3. ...
*/
template <typename T>
class PrepocessTaskSingleton{
public:
~PrepocessTaskSingleton(){
for(auto &alg : m_pThreadAlgs) delete alg;
};
/* 增加 用到的预处理算法 */
void add_Algorm(int alg_id, void* p){
std::lock_guard<std::mutex> lg(mtx_);
alg_pairs.push_back({alg_id, (void*)(p)});
};
/* 设置 某个线程的队列大小 */
void setDataSize(int i, int n){
std::lock_guard<std::mutex> lg(mtx_);
if(i >= thread_num) return;
data_size[i] = n;
};
/* 获取 某个线程的队列大小 */
int getDataSize(int i){
std::lock_guard<std::mutex> lg(mtx_);
if(i >= thread_num) return -1;
return data_size[i];
};
/* 获取 某个线程的结果队列地址 */
SafeMatList<T>* getResultQueue(int i){
std::lock_guard<std::mutex> lg(mtx_);
if(i >= m_pThreadAlgs.size()) return nullptr;
return m_pThreadAlgs[i];
};
/*构建多线程对象 默认第一个线程的读取数据队列指针为 nullptr, 该线程的第一个算法需要是读取序列图像类似的算法
也可以设置 输入数据缓冲队列地址。采用等数量划分算法组的方法, 不好 */
void build(SafeMatList<T>* q_in = nullptr){
std::lock_guard<std::mutex> lg(mtx_);

SafeMatList<T>* pQueue = q_in;
int N = alg_pairs.size();
int per = N / thread_num + (N%thread_num ? 1 :0);

for(int i = 0; i < thread_num; i++){
vector<pair<int, void*>> p;

for(int j = i*per; j < min((i+1)*per, N); j++)
p.push_back(alg_pairs[j]);

pQueue = per_threadContrust(p, pQueue, data_size[i]);
}
};
/* 开启多线程 运行*/
void run(){
std::lock_guard<std::mutex> lg(mtx_);
vector<std::thread> tasks;
for(auto& algs : m_pThreadAlgs)
tasks.push_back(std::thread(AbstrctThreadAlg<T, void*>::run, algs));
for(auto& t : tasks)
t.join();
};
/*获取 构造器*/
static PrepocessTaskSingleton<T>* getPrepocessBuilder(int nth){
if(singleBuilder != nullptr)
return singleBuilder;
std::lock_guard<std::mutex> lg(mtx_);
if(singleBuilder == nullptr)
singleBuilder = new PrepocessTaskSingleton<T>(nth);
return singleBuilder;
};

private:
PrepocessTaskSingleton(int n): thread_num(n) {
/* 初始化的时候 默认 每个线程的数据缓冲队列大小为 3 最后一个线程的缓冲队列大小为 0,大小为 0 表明不存储生成的结果*/
for(int i=0; i < thread_num-1; i++)
data_size.push_back(3);
data_size.push_back(0);
};
SafeMatList<T>* per_threadContrust(vector<pair<int, void*>>& aps, SafeMatList<T>* data_queu, int sz){
auto p = new AbstrctThreadAlg<T, void*>(sz, data_queu);
m_pThreadAlgs.push_back(p);
AlgorithmFactory<T, void*> algorithm_creator;
for(auto &ap : aps)
p->list_algs.push_back(algorithm_creator.createAlgorim(ap.first, ap.second));

return p->m_pFinishQueue;
};
static std::mutex mtx_; // 保证线程安全
static PrepocessTaskSingleton<T>* singleBuilder; // 单例模式 返回实例化的对象
vector<AbstrctThreadAlg<T, void*>*> m_pThreadAlgs; // 用于管理 实例化的所有线程类
vector<pair<int, void*>> alg_pairs; // 用于存储输入的算法 id 和 对应的参数地址
vector<int> data_size; // 用于存储 每个线程 的缓冲数据队列的 大小
int thread_num; // 生成的线程数量


};
#endif

main.cpp 测试文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include "algorithm.h"
#include "prepocessThread.h"

// 静态成员变量 这个初始化 必须有,因为静态成员变量在初始化的时候分配内存
template<typename T>
PrepocessTaskSingleton<T>* PrepocessTaskSingleton<T>::singleBuilder = nullptr;
template<typename T>
std::mutex PrepocessTaskSingleton<T>::mtx_;


int main(){

auto builder= PrepocessTaskSingleton<string>::getPrepocessBuilder(2);

char* alg0_param = const_cast<char*>("dir");
builder->add_Algorm(0, alg0_param); //增加一个 预处理方法

int alg1_param = 0;
builder->add_Algorm(1, &alg1_param); //增加一个 预处理方法

float alg2_param = 2.1;
builder->add_Algorm(2, &alg2_param); //增加一个 预处理方法

builder->build(); // 构建 算法分组,并使用多个线程类包装
builder->run(); // 运行

return 0;
}