C++并行版的std::accumulate

发布时间:2023年12月28日

原生并行版的std::accumulate

#include <thread>
#include <string>
#include <iostream>
#include <vector>
#include <stdlib.h>
#include <numeric>
#include <algorithm>
#include <functional>


template<typename Iterator,typename T>
struct accumulate_block
{
    void operator()(Iterator first,Iterator last,T& result)
    {
        result=std::accumulate(first,last,result);
    }
};

template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
    unsigned long const length=std::distance(first,last);
    if(!length) // 1
        return init;
    unsigned long const min_per_thread=25;
    unsigned long const max_threads=
        (length+min_per_thread-1)/min_per_thread; // 2
    unsigned long const hardware_threads=
        std::thread::hardware_concurrency();
    unsigned long const num_threads=  // 3
        std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
    unsigned long const block_size=length/num_threads; // 4
    std::vector<T> results(num_threads);
    std::vector<std::thread> threads(num_threads-1);  // 5
    Iterator block_start=first;
    for(unsigned long i=0; i < (num_threads-1); ++i)
    {
        Iterator block_end=block_start;
        std::advance(block_end,block_size);  // 6
        threads[i]=std::thread(     // 7
            accumulate_block<Iterator,T>(),
            block_start,block_end,std::ref(results[i]));
        block_start=block_end;  // 8
    }
    accumulate_block<Iterator,T>()(
        block_start,last,results[num_threads-1]); // 9
    std::for_each(threads.begin(),threads.end(),
                  std::mem_fn(&std::thread::join));  // 10
    return std::accumulate(results.begin(),results.end(),init); // 11
}


void printTime(){
    // 获取当前时间
    std::time_t currentTime = std::time(nullptr);

    // 将时间转换为本地时间结构
    std::tm* localTime = std::localtime(&currentTime);

    // 打印年月日时分秒
    std::cout << "Current Time: "
              << localTime->tm_year + 1900 << "-"  // 年份是从1900年开始的
              << localTime->tm_mon + 1 << "-"       // 月份是从0开始的
              << localTime->tm_mday << " "
              << localTime->tm_hour << ":"
              << localTime->tm_min << ":"
              << localTime->tm_sec << std::endl;
}

int main()
{
    std::vector<long> ve;
    for(long i=0;i<10000;i++)
        for(long j=0;j<1000;j++)
            for(long k=0;k<200;k++)
            ve.push_back(k);

    printTime();
    auto sum = std::accumulate(ve.begin(),ve.end(),0);
    printTime();
    std::cout<<sum<<std::endl;

    printTime();
    auto sum1 = parallel_accumulate(ve.begin(),ve.end(),0);
    //auto sum = std::accumulate(ve.begin(),ve.end(),0);
    printTime();
    std::cout<<sum1<<std::endl;
}

结果也是:速度基本就是一个4倍速度,还是很可以的。
Current Time: 2023-12-28 11:26:2
Current Time: 2023-12-28 11:26:11
1431504384
Current Time: 2023-12-28 11:26:11
Current Time: 2023-12-28 11:26:13
1431504384

程序的思路:
这段程序实现了一个并行化的累加操作,通过将给定范围内的元素分割成多个块,分别在不同的线程中进行累加,最后将每个线程的结果相加得到最终的累加结果。以下是对程序的主要思想:

  1. 计算范围长度: 计算给定迭代器范围 [first, last) 中的元素个数。

  2. 确定线程数: 根据范围长度和最小处理块大小(min_per_thread)确定所需的线程数。这里使用 std::thread::hardware_concurrency() 获取硬件支持的线程数,如果获取失败或为零,则使用默认值 2。

  3. 计算块大小: 根据线程数计算每个线程处理的块大小。

  4. 初始化结果向量: 创建一个向量 results 用于存储每个线程的累加结果。

  5. 创建线程向量: 创建一个向量 threads 用于存储线程对象。

  6. 划分块并启动线程: 将范围划分成块,每个块交由一个线程处理。通过迭代器 block_startblock_end 定义块的边界。

  7. 线程启动: 创建并启动线程,每个线程执行 accumulate_block 结构体的 operator() 操作符,对块中的元素进行累加,结果存储在 results[i] 中。

  8. 更新块起始迭代器: 将块的起始迭代器 block_start 更新为当前块的结束迭代器 block_end

  9. 处理最后一个块: 单独处理最后一个块,避免因为除法余数导致未处理的元素。

  10. 等待线程完成: 使用 std::for_eachstd::mem_fn 等待所有线程完成。

  11. 汇总结果: 使用 std::accumulate 将所有线程的结果相加,得到最终累加结果。

该程序通过将累加任务分割成多个块并在多个线程中并行处理,以提高累加的速度。需要注意的是,对于小规模的累加任务,多线程可能会带来额外的开销,因此这种并行化的方式更适用于大规模的累加操作。

文章来源:https://blog.csdn.net/simple_core/article/details/135265092
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。