原生并行版的std::accumulate
#include <thread>
#include <string>
#include <iostream>
#include <vector>
#include <stdlib.h>
#include <numeric>
#include <algorithm>
#include <functional>
template<typename Iterator,typename T>
struct accumulate_block
{
void operator()(Iterator first,Iterator last,T& result)
{
result=std::accumulate(first,last,result);
}
};
template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
unsigned long const length=std::distance(first,last);
if(!length) // 1
return init;
unsigned long const min_per_thread=25;
unsigned long const max_threads=
(length+min_per_thread-1)/min_per_thread; // 2
unsigned long const hardware_threads=
std::thread::hardware_concurrency();
unsigned long const num_threads= // 3
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size=length/num_threads; // 4
std::vector<T> results(num_threads);
std::vector<std::thread> threads(num_threads-1); // 5
Iterator block_start=first;
for(unsigned long i=0; i < (num_threads-1); ++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size); // 6
threads[i]=std::thread( // 7
accumulate_block<Iterator,T>(),
block_start,block_end,std::ref(results[i]));
block_start=block_end; // 8
}
accumulate_block<Iterator,T>()(
block_start,last,results[num_threads-1]); // 9
std::for_each(threads.begin(),threads.end(),
std::mem_fn(&std::thread::join)); // 10
return std::accumulate(results.begin(),results.end(),init); // 11
}
void printTime(){
// 获取当前时间
std::time_t currentTime = std::time(nullptr);
// 将时间转换为本地时间结构
std::tm* localTime = std::localtime(¤tTime);
// 打印年月日时分秒
std::cout << "Current Time: "
<< localTime->tm_year + 1900 << "-" // 年份是从1900年开始的
<< localTime->tm_mon + 1 << "-" // 月份是从0开始的
<< localTime->tm_mday << " "
<< localTime->tm_hour << ":"
<< localTime->tm_min << ":"
<< localTime->tm_sec << std::endl;
}
int main()
{
std::vector<long> ve;
for(long i=0;i<10000;i++)
for(long j=0;j<1000;j++)
for(long k=0;k<200;k++)
ve.push_back(k);
printTime();
auto sum = std::accumulate(ve.begin(),ve.end(),0);
printTime();
std::cout<<sum<<std::endl;
printTime();
auto sum1 = parallel_accumulate(ve.begin(),ve.end(),0);
//auto sum = std::accumulate(ve.begin(),ve.end(),0);
printTime();
std::cout<<sum1<<std::endl;
}
结果也是:速度基本就是一个4倍速度,还是很可以的。
Current Time: 2023-12-28 11:26:2
Current Time: 2023-12-28 11:26:11
1431504384
Current Time: 2023-12-28 11:26:11
Current Time: 2023-12-28 11:26:13
1431504384
程序的思路:
这段程序实现了一个并行化的累加操作,通过将给定范围内的元素分割成多个块,分别在不同的线程中进行累加,最后将每个线程的结果相加得到最终的累加结果。以下是对程序的主要思想:
计算范围长度: 计算给定迭代器范围 [first, last)
中的元素个数。
确定线程数: 根据范围长度和最小处理块大小(min_per_thread
)确定所需的线程数。这里使用 std::thread::hardware_concurrency()
获取硬件支持的线程数,如果获取失败或为零,则使用默认值 2。
计算块大小: 根据线程数计算每个线程处理的块大小。
初始化结果向量: 创建一个向量 results
用于存储每个线程的累加结果。
创建线程向量: 创建一个向量 threads
用于存储线程对象。
划分块并启动线程: 将范围划分成块,每个块交由一个线程处理。通过迭代器 block_start
和 block_end
定义块的边界。
线程启动: 创建并启动线程,每个线程执行 accumulate_block
结构体的 operator()
操作符,对块中的元素进行累加,结果存储在 results[i]
中。
更新块起始迭代器: 将块的起始迭代器 block_start
更新为当前块的结束迭代器 block_end
。
处理最后一个块: 单独处理最后一个块,避免因为除法余数导致未处理的元素。
等待线程完成: 使用 std::for_each
和 std::mem_fn
等待所有线程完成。
汇总结果: 使用 std::accumulate
将所有线程的结果相加,得到最终累加结果。
该程序通过将累加任务分割成多个块并在多个线程中并行处理,以提高累加的速度。需要注意的是,对于小规模的累加任务,多线程可能会带来额外的开销,因此这种并行化的方式更适用于大规模的累加操作。