? ? ? ?
目录
???????? 前面的示例中,通过创建一组线程并通过线程ID与线程数来人为的定义每个线程需要处理的数据,这是一个常规的多线程任务分配的技巧;但是在openmp中通过#pragma omp for指令,可以通过简单的并行化循环构造即可快速实现前述的任务分配环节。
?????????需要注意for构造指令需要在parallel构造的并行区域内才可以多线程运行。
#include <iostream>
#include <omp.h>
#include <vector>
//#define NTHREADS 4
void task_manual() {
std::vector<double> a{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
std::vector<double> b{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
size_t N = a.size();
double start_time = omp_get_wtime();
#pragma omp parallel
{
//手动分配每个线程需要执行哪个任务
int id, i, Nthrds, istart, iend;
id = omp_get_thread_num();
Nthrds = omp_get_num_threads();
istart = id * N / Nthrds;
iend = (id + 1) * N / Nthrds;
if (id == Nthrds - 1) iend = N;
for (i = istart; i < iend; i++) {
a[i] = a[i] + b[i];
}
}
std::cout << "cost time: " << omp_get_wtime() - start_time << "s" << std::endl;
for (double &i: a) {
std::cout << i << " ";
}
}
void task_omp() {
std::vector<double> a{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
std::vector<double> b{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
size_t N = a.size();
double start_time = omp_get_wtime();
#pragma omp parallel default(shared)
{
//通过omp for指令让线程组自动迭代处理
//请注意循环中不要共享循环控制索引i,否则会导致线程间的数据竞争
#pragma omp for
for (int i = 0; i < N; i++) {
a[i] = a[i] + b[i];
}//此处存在一个隐式栅栏,所有线程都需要在这里等待与同步
}
std::cout << "cost time: " << omp_get_wtime() - start_time << "s" << std::endl;
for (double &i: a) {
std::cout << i << " ";
}
}
int main() {
task_omp();
}
cost time: 0.00808858s
2 4 6 8 10 12 14 16 18 20 22 24 26 28 30
Process finished with exit code 0
另外,可以将OMP构造指令组合在一起使用,如下示例所示:
void task_omp_simple() {
std::vector<double> a{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
std::vector<double> b{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
size_t N = a.size();
double start_time = omp_get_wtime();
//合并写在一起
#pragma omp parallel for
{
for (int i = 0; i < N; i++) {
a[i] = a[i] + b[i];
}
}
std::cout << "cost time: " << omp_get_wtime() - start_time << "s" << std::endl;
for (double &i: a) {
std::cout << i << " ";
}
}
? ? ? ? 规约可以通过多线程加速实现实现数组的累加、累乘、查找等工作。
openmp的规约通过指令reduction(op:list)实现,其中op定义了本次规约运算的操作符,如+、-、*、min、max、逻辑运算符、位运算符等。同时注意omp中的规约子句中一次只能指定一个运算符。
其中个运算符线程的私有变量初值如下:
运算符 | 初始值 |
+ | 0 |
- | 0 |
* | 1 |
min | 变量类型最大正数 |
max | 变量类型最大负数 |
#include <iostream>
#include <omp.h>
#include <vector>
//串行规约的实现
void plain(std::vector<double> &a) {
double ave = 0;
double start_time = omp_get_wtime();
for (double i: a) {
ave += i;
}
ave = ave / a.size();
std::cout << "result: " << ave << ",cost time: " << omp_get_wtime() - start_time << std::endl;
}
//omp 规约的实现
void reduce(std::vector<double> &a) {
double ave = 0;
double start_time = omp_get_wtime();
/*
omp的规约实现中,将会为每个线程创建一个同名的ave变量的私有副本,
并根据规约运算符完成私有副本的初始化,待每个线程完成各自部分的规约计算后,
在结尾的隐式栅栏处完成变量的合并计算。
*/
#pragma omp parallel for reduction(+:ave)
for (double i: a) {
ave += i;
}//此处包含隐式栅栏
ave = ave / a.size();
std::cout << "result: " << ave << ",cost time: " << omp_get_wtime() - start_time << std::endl;
}
int main() {
std::vector<double> a;
int vectorSize = 1e8;
a.reserve(vectorSize);
for (int i = 0; i < vectorSize; ++i) {
a.emplace_back(1);
}
plain(a);
reduce(a);
}
result: 1,cost time: 0.0952761
result: 1,cost time: 0.0465072
Process finished with exit code 0
? ? ? ? 在OpenMP显示API中包含两个常用schedule子句,分别为static、dynamic。使用方法如下:
schedule (static [,chunk]),chunk默认为1。
schedule (dynamic [,chunk]),chunk默认为1。
? ? ? ? 静态调度在编译的过程中将会将共享工作循环迭代地映射到线程上。如果没有指定每一个分块chunk的大小,编译器会默认将循环迭代分解成与可见线程数量相等的分块,并每个线程赋予一个分块,就像【OpenMP】 2.1 简单示例-CSDN博客中multi_block函数的实现方式一样。
? ? ? ? 如果手动指定了分块大小,那么OMP会将循环分成连续的指定大小的分块,并通过轮询调度的方式分配给每个线程。通过静态调度对复杂度相近的代码段可以实现较高的并行性能;同时需要确保每个线程与各自缓存层次之间的相互配合,增加数据从缓存中重用的机会,减少因为数据移动与内存带宽等导致的计算等待。
? ? ? ? 示例如下,代码中每个线程独立处理一部分连续的数据块:
#include <stdio.h>
#include <math.h>
#include <omp.h>
#include <iostream>
#define ITER 20
int main() {
int i;
double A[ITER];
for (i = 0; i < ITER; ++i)
A[i] = 2.0 * i;
#pragma omp parallel default(none) shared(A, std::cout)
{
int i;
int id = omp_get_thread_num();
double tdata = omp_get_wtime();
/*
使用静态调度,每次每个线程独立处理4个连续的数据
*/
#pragma omp for schedule(static, 5)
for (i = 1; i < ITER; i++) {
//避免输出错乱
# pragma omp critical
std::cout << "当前线程id: " << omp_get_thread_num() << "正在处理的数据的索引: " << i << std::endl;
A[i] = A[i] * sqrt(i) / pow(sin(i), tan(i));
}
tdata = omp_get_wtime() - tdata;
if (id == 0) printf("Time spent is %f sec \n", tdata);
}
}
? ? ? ? 结果:
当前线程id: 1正在处理的数据的索引: 6
当前线程id: 3正在处理的数据的索引: 16
当前线程id: 0正在处理的数据的索引: 1
当前线程id: 0正在处理的数据的索引: 2
当前线程id: 1正在处理的数据的索引: 7
当前线程id: 0正在处理的数据的索引: 3
当前线程id: 3正在处理的数据的索引: 17
当前线程id: 0正在处理的数据的索引: 4
当前线程id: 3正在处理的数据的索引: 18
当前线程id: 0正在处理的数据的索引: 5
当前线程id: 3正在处理的数据的索引: 19
当前线程id: 1正在处理的数据的索引: 8
当前线程id: 1正在处理的数据的索引: 9
当前线程id: 1正在处理的数据的索引: 10
当前线程id: 2正在处理的数据的索引: 11
当前线程id: 2正在处理的数据的索引: 12
当前线程id: 2正在处理的数据的索引: 13
当前线程id: 2正在处理的数据的索引: 14
当前线程id: 2正在处理的数据的索引: 15
Time spent is 0.000420 sec
Process finished with exit code 0
? ? ? ? 当每次迭代中代码的运行时间大致一样时,静态调度可以发挥最大的性能;但是当每次迭代中运行的时间不能确定时,比如粒子模拟的代码,有的粒子需要大量的计算,有的粒子则不需要;这样的迭代在静态调度的轮询算法中,可能把所有都需要大量计算的部分块都分给了一个线程执行,那么其余线程早早执行完自己的部分就会在原地等待直到最后一个线程处理完成。也有可能在当前的多核心异构系统中,比如Intel12代及以上的CPU中包含大小核架构的处理器中,核心以不同的频率运行,使得有些线程的运行速度会更快的完成工作;如果使用静态调度器则无法考虑到这种差异,都会使得部分线程等待,降低并行性能。
? ? ? ? 上述两种情况都需要在运行时才可以知道每个线程的工作量,因此omp中提供了动态调度dynamic。
????????下述代码中,通过do_sometng函数来模型不同任务的处理时长变化的情况,并通过动态调度,每个线程每次分配一个独立的任务让其处理。
#include <omp.h>
#include <iostream>
#include <chrono> // std::chrono::seconds
#include <thread> // std::this_thread::sleep_for
#define ITER 20
void do_someting(int seconds) {
std::this_thread::sleep_for(std::chrono::seconds(seconds));
}
int main() {
srand((unsigned) time(0));
#pragma omp parallel default(none) shared( std::cout)
{
int i;
int id = omp_get_thread_num();
double tdata = omp_get_wtime();
/*
使用动态调度,每次每个线程独立处理1个的数据
*/
#pragma omp for schedule(dynamic, 1)
for (i = 1; i < ITER; i++) {
int sleep_seconds = int(rand()) % 3;
//避免输出错乱
# pragma omp critical
std::cout << "当前线程id: " << omp_get_thread_num()
<< "当前数据将要处理: " << sleep_seconds << "秒。" << std::endl;
do_someting(sleep_seconds);
}
tdata = omp_get_wtime() - tdata;
if (id == 0) printf("Time spent is %f sec \n", tdata);
}
}
????????结果可以看到,线程3因为每次都要处理较长的时间,所以只处理了20个任务中的3个任务;而线程0因为任务较轻,所以处理了6个任务;可以看出动态调度根据每个线程的耗时进行任务的动态分配;避免了某些线程执行完后等待其他线程的情况。
当前线程id: 2当前数据将要处理: 1秒。
当前线程id: 3当前数据将要处理: 1秒。
当前线程id: 1当前数据将要处理: 0秒。
当前线程id: 1当前数据将要处理: 2秒。
当前线程id: 0当前数据将要处理: 1秒。
当前线程id: 2当前数据将要处理: 0秒。
当前线程id: 2当前数据将要处理: 1秒。
当前线程id: 3当前数据将要处理: 1秒。
当前线程id: 0当前数据将要处理: 0秒。
当前线程id: 0当前数据将要处理: 0秒。
当前线程id: 0当前数据将要处理: 0秒。
当前线程id: 0当前数据将要处理: 2秒。
当前线程id: 2当前数据将要处理: 2秒。
当前线程id: 1当前数据将要处理: 1秒。
当前线程id: 3当前数据将要处理: 2秒。
当前线程id: 1当前数据将要处理: 0秒。
当前线程id: 1当前数据将要处理: 2秒。
当前线程id: 0当前数据将要处理: 1秒。
当前线程id: 2当前数据将要处理: 2秒。
Time spent is 6.000432 sec
如果是静态调度,则每个线程都要独立处理5任务(总共20个任务,4个可用线程);修改动态调度代码为:
#pragma omp for schedule(static, 5)
那么结果如下,可以看到部分线程完成任务后需要等待其他线程执行完后才能一起退出。
当前线程id: 3当前数据将要处理: 0秒。
当前线程id: 2当前数据将要处理: 2秒。
当前线程id: 3当前数据将要处理: 1秒。
当前线程id: 1当前数据将要处理: 2秒。
当前线程id: 0当前数据将要处理: 2秒。
当前线程id: 3当前数据将要处理: 1秒。
当前线程id: 2当前数据将要处理: 0秒。
当前线程id: 1当前数据将要处理: 1秒。
当前线程id: 2当前数据将要处理: 0秒。
当前线程id: 2当前数据将要处理: 0秒。
当前线程id: 2当前数据将要处理: 2秒。
当前线程id: 0当前数据将要处理: 2秒。
当前线程id: 3当前数据将要处理: 1秒。
当前线程id: 1当前数据将要处理: 1秒。
当前线程id: 0当前数据将要处理: 2秒。
当前线程id: 1当前数据将要处理: 0秒。
当前线程id: 1当前数据将要处理: 1秒。
当前线程id: 0当前数据将要处理: 1秒。
当前线程id: 0当前数据将要处理: 1秒。
Time spent is 8.000411 sec
Process finished with exit code 0
schedule子句 | 静态调度(static) | 动态调度(dynamic) |
默认块大小 | 1 | 1 |
何时使用 | 每次迭代运行时间相近 | 每次迭代运行时间方差大 |
调度器时间开销 | 少 | 多 |
????????虽然动态调度看上去很不错,但是需要注意的是动态调度提供了线程间的自动负载均衡,其调度器的开销是巨大的;如果可以将不同复杂度的任务均匀的分配到各个线程中,则最好使用多个chunk为一组的静态调度器。
? ? ? ? 如下是对均匀复杂度的数据使用静态调度与动态调度的时间对比: