如果觉得写的可以,请给一个点赞+关注支持一下
观看之前请先看,往期的博客教程,否则这篇博客没办法看懂
上一篇博客中讲了串行任务流,有了串行,那必然也有并行,本篇博客讲解任务流并行执行
SubTask
是所有任务的基类
first
参数是这个任务流序列执行的首个任务callback
是这个序列执行完毕后执行的回调函数using series_callback_t = std::function<void (const SeriesWork *)>;
inline SeriesWork *
Workflow::create_series_work(SubTask *first, series_callback_t callback)
{
return new SeriesWork(first, std::move(callback));
}
callback
是设置并行任务流中所有任务执行完毕之后调用的回调函数ParallelWork *
并行任务指针using parallel_callback_t = std::function<void (const ParallelWork *)>;
inline ParallelWork *
Workflow::create_parallel_work(parallel_callback_t callback)
{
return new ParallelWork(std::move(callback));
}
class ParallelWork{
public:
void add_series(SeriesWork *series);
}
httpCallback
异步回调函数,httpCallback
执行完毕后调用所在序列的序列回调函数,当所有的序列回调函数执行完毕之后在执行并行任务流的parallelCallback
回调函数#include <vector>
#include <workflow/WFFacilities.h>
#include <workflow/Workflow.h>
#include <workflow/HttpUtil.h>
struct SeriesContext{
std::string url;
int state;
int error;
protocol::HttpResponse resp;//响应报文的完整内容
};
void parallelCallback(const ParallelWork *pwork){
fprintf(stderr,"pwork callback!\n");
SeriesContext *context;
for(size_t i = 0; i != pwork->size(); ++i){
context = static_cast<SeriesContext *>(pwork->series_at(i)->get_context());
fprintf(stderr,"url = %s\n", context->url.c_str());
if(context->state == WFT_STATE_SUCCESS){
const void *body;
size_t size;
context->resp.get_parsed_body(&body,&size);
fwrite(body,1,size,stderr);
fprintf(stderr,"\n");
}
else{
fprintf(stderr,"Error, state = %d, error = %d\n", context->state, context->error);
}
delete context;
}
}
void httpCallback(WFHttpTask *httpTask){
SeriesContext *context = static_cast<SeriesContext *>(series_of(httpTask)->get_context());
fprintf(stderr,"httpTask callback, url = %s\n", context->url.c_str());
context->state = httpTask->get_state();
context->error = httpTask->get_error();
context->resp = std::move(*httpTask->get_resp());
}
int main(){
//使用工厂函数,创建一个并行任务
ParallelWork *pwork = Workflow::create_parallel_work(parallelCallback);//Workflow::create_parallel_work
std::vector<std::string> urlVec ={"http://192.168.135.129:81", "http://192.168.135.129","http://47.94.147.94"};
for(size_t i = 0; i != urlVec.size() ; ++i){
//创建若干个任务
// WFTaskFactory::create_http_task
std::string url = urlVec[i];
auto httpTask = WFTaskFactory::create_http_task(url,0,5,httpCallback);
// 修改任务的属性
auto req = httpTask->get_req();
req->add_header_pair("Accept","*/*");
req->add_header_pair("User-Agent","myHttpTask");
req->set_header_pair("Connection", "Close");
//为响应的内容申请一片堆空间
SeriesContext *context = new SeriesContext;
context->url = std::move(url);
// 为每个任务创建一个序列
auto series = Workflow::create_series_work(httpTask,nullptr);
// 把存储响应内容的指针 拷贝到序列的context当中。
series->set_context(context);
//把序列加入到并行任务中
// add_series
pwork->add_series(series);
}
pwork->start();//启动并行任务
}