【单例 & 定长 & 优先 & 动态线程池】 ( C++11 | 拒绝策略 | 动态任务分配 | 单例设计模式 )

发布时间:2024年01月17日

目录

简介

概念

优点

实现思路

1. 初始化线程池

2. 线程池工作流程

3. 任务执行

4. 线程生命周期管理

5. 关闭线程池

Include

CV & enums

内嵌类定义

私有成员属性

?私有辅助函数

静态成员类外初始化

?测试函数

完整代码


简介

概念

????????线程池(Thread Pool)是一种基于池化技术的多线程处理模式。目的是为了减少在创建和销毁线程上所花的时间以及系统资源的开销,提高系统的工作效率。线程池维护多个线程,这些线程处于等待状态,准备处理任务。

优点

  • 提高性能:

????????线程池预先创建了线程,因此当任务到达时,无需等待线程的创建过程就能立即开始执行。创建线程是个代价高昂的操作,因此线程池可以减少创建和销毁线程的开销,提高系统整体性能。

  • 控制并发线程数:

????????线程池可以限制系统中并发执行的线程数量,如果没有这个限制,大量的并发线程可能会导致系统负载过高,影响性能。

  • 复用线程:

????????在线程池中,一旦线程完成任务,这个线程可以被复用,用来执行另一项任务。这样可以减少线程的创建和销毁次数。

  • 管理线程生命周期:

????????线程池通常支持定时执行任务、周期执行任务等复杂的线程管理功能,比如Java的ScheduledThreadPoolExecutor。

  • 提高系统稳定性:

????????通过对线程进行统一管理和控制,可以避免因线程数量过多导致的系统崩溃风险。

  • 简化编程模型:

????????使用线程池模型,开发人员只需关心如何提交任务,不必关心如何创建、终止线程,降低了编程复杂性。

实现思路

1. 初始化线程池

  • 预创建线程:根据配置的线程数量预创建一定数量的线程。
  • 任务队列:创建一个任务队列来存放等待执行的任务。
  • 线程状态管理:为线程分配标记,如:工作中、空闲等。

2. 线程池工作流程

  • 线程任务循环:线程从任务队列中取任务执行,执行完后继续取下一个任务。
  • 任务分配:当一个新任务到来时,将它加入任务队列中,或直接分配给空闲线程。

3. 任务执行

  • 执行任务:线程执行取到的任务。
  • 异常处理:确保线程在执行任务时能够处理异常,保证线程可以继续执行后续任务。

4. 线程生命周期管理

  • 空闲时间管理:若线程长时间空闲,则可能将其从线程池中移除以节省系统资源。
  • 动态调整:根据任务的数量,动态增减线程数量。

5. 关闭线程池

  • 温和关闭:不接受新任务,但等待所有任务执行完毕。
  • 立即关闭:尝试停止所有正在执行的任务,并且不再处理队列中等待的任务。

Include

#include <iostream>
#include <cstdint>
#include <stdexcept>
#include <exception>

#include <thread>
#include <chrono>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>

#include <cstdlib>
#include <ctime>

#include <string>
#include <vector>
#include <queue>
#include <deque>
#include <initializer_list>
#include <memory>

CV & enums

#ifndef __DURAMS_T
#	define __DURAMS_T
typedef std::chrono::duration< double, std::milli > duraMS_t;
#endif 

static std::once_flag flag;
static double CmpFactor { 0.75 };

using ulong = unsigned long;
enum task_priority{ NORMAL = 0, MEDIUM, HIGH,};
enum thread_pool_status { HALT = 0, RUNNING = 1, };
enum rejected_execution_policy { AbortPolicy = 0, CallerRunsPolicy, DiscardLeasestPolicy, };

内嵌类定义

class ThreadPool {
	typedef struct task{ 
		mutable int 		   Priority; 
		std::function< void( void ) > f;
		inline bool operator<( const task& other) const {
             return Priority < other.Priority; }
		} task;
	class Detor {
		public:
			~Detor( void ){
				if( ThreadPool::pInst )delete ThreadPool::pInst;
				ThreadPool::pInst = nullptr;
			}
	};
	class DefExcept : public std::exception{
	public:
		const char * what () const throw () {
		    return "Too many task has been put";
		}
	};
    /*other...*/
};

私有成员属性

private:
	static ThreadPool* pInst;
	
	int Cores;
	int Maximum;
	int Policy;
	duraMS_t KeepAliveMS;
	
	
	std::atomic< int > Busy { 0 };
	
	std::priority_queue< task, std::deque< task >, std::less< task > > TaskQ;
	std::vector< std::thread > WorkerThreads;
	
	std::mutex Mtx;
	std::condition_variable Condv;
	
	int Status { thread_pool_status::RUNNING };

?私有辅助函数

private:
	inline void CreateInstance( const std::initializer_list< size_t >& init ){
		std::vector< size_t > args( init );
		pInst = new ThreadPool( ( int )args.at( 0 ), 
								( int )args.at( 1 ),
								( int )args.at( 2 ), 
								( ulong )args.at( 3 ) );
		static Detor d;
	}
	void Adjust( void ) noexcept{
		WorkerThreads.emplace_back( [ this ]( void )mutable->void {
				while( 1 ){
					std::unique_lock< std::mutex > lk( Mtx, std::try_to_lock );
					if( !lk.owns_lock() ) {
						std::cout << "Miss for one time...\n";
						continue;
					}
					Condv.wait( lk, [this]( void )->bool{ return !TaskQ.empty() || !(Status == thread_pool_status::HALT); } );
					++( this->Busy );
					( TaskQ.top() ).f();
					TaskQ.pop();
					--( this->Busy );
				}
			});
	}
	ThreadPool( int __Cores, 
				int __Maximum, 
				int __Policy,
				ulong __KeepAliveMS )
		:Cores( __Cores )
		,Maximum( __Maximum )
		,Policy( __Policy )
		,KeepAliveMS( std::chrono::milliseconds(__KeepAliveMS) )
	{
		do Adjust(); while( __Cores-- );
	}		

	void handler( std::packaged_task< void( void ) > &pkg ) throw ( std::exception, std::string ){
		switch( this->Policy ){
			case AbortPolicy:
				throw std::exception();
			case CallerRunsPolicy:
				std::thread( std::ref( pkg ) ).join();
				break;
			case DiscardLeasestPolicy:
				/* 待完善 */
				break;
		}				
	}
	
public:
	explicit ThreadPool( const ThreadPool& other) = delete;
	explicit ThreadPool( ThreadPool&& other) = delete;
	ThreadPool& operator=( const ThreadPool& other) = delete;
	ThreadPool& operator=( ThreadPool&& other) = delete;
	static ThreadPool& GetInstance( int __Cores, 
							 		int __Maximum = INT_MAX, 
							 		int __Policy = rejected_execution_policy::AbortPolicy,
							 		ulong __KeepAliveMS = LONG_MAX){
							 	static ThreadPool *pObj { nullptr };
							 	std::call_once( flag, &ThreadPool::CreateInstance,
								 			  ( std::initializer_list< size_t> )
											  { __Cores, __Maximum, __KeepAliveMS } );
							 }
	~ThreadPool( void ){
		{
			std::unique_lock< std::mutex > lk( Mtx );
			Status = thread_pool_status::HALT;
		}
		Condv.notify_all();
		for( auto& t : WorkerThreads ) if( t.joinable() ) t.join();
	}
	
	template< class F,class... Args>
	auto push( int priority, F && f, Args&&... args ) -> std::future< decltype( f( args... ) ) >{
		std::cout << "Number of args:\t" << sizeof...( args ) << '\n';
		std::function< decltype( f( args... ) )() > func = 
			std::bind( std::forward< F >( f ),
			std::forward< Args >( args )... );
		std::shared_ptr< std::packaged_task< decltype( f( args...) )( void ) > > ptr = 
			std::make_shared< std::packaged_task< decltype( f( args... ) )( void ) > >( func );
		std::function< void( void ) > wrapper_func = [ ptr ]( void ) -> void{ (*ptr)(); };
		{
			std::unique_lock< std::mutex > lk( Mtx, std::defer_lock );
			if( TaskQ.size() >= Maximum ){
				try {handler( ( *ptr ) );}
				catch( DefExcept& e ){
					std::cerr << e.what();
				}catch( ... ){
					std::cerr << "Unknown error";
				}
			}
			lk.lock();
			TaskQ.emplace( ( task ){ priority,std::move( wrapper_func ) } );
			
			if( Busy < ( int )( TaskQ.size() * CmpFactor ) 
			 && TaskQ.size() < Maximum)
				Adjust();			 	
		}
		Condv.notify_one();
		return ptr->get_future();
	}

静态成员类外初始化

ThreadPool* ThreadPool::pInst = { nullptr };

?测试函数

int main( void ){
	srand( (unsigned int)time( nullptr ) );
	constexpr int Cores { 5 };
	constexpr int Maximum { 15 };
	constexpr uint32_t KeepAliveMS { 1000 };
	
	register int counter { 10 },
				 taskID { 0 };
	
	auto& ref = ThreadPool::GetInstance(Cores, Maximum, 
										CallerRunsPolicy,
										KeepAliveMS );
	while( counter-- ){
		int randomTimeCost { ( int )( 1 + std::rand() / ( (RAND_MAX + 1u) / 6 ) ) };
		int randomPriority { ( int )( std::rand() / ( (RAND_MAX + 1u) / HIGH ) ) };
		
		std::thread::id thrdID;
		auto futr = ref.push( randomPriority, [ &taskID ] ( int _Cost, std::thread::id& id )mutable -> void{
			char buf[100];
			sprintf( buf, "Runing...(takes %d seconds)\n", ( ++_Cost ) );
			std::cout << buf;
			std::this_thread::sleep_for( std::chrono::seconds( _Cost ) );
//			p.set_value( std::this_thread::get_id() );
			id = std::this_thread::get_id();
			return ;
		}, randomTimeCost, std::ref( thrdID ) );
		
		std::future_status sts;
		bool halt { false };
		do{
			switch ( ( sts = futr.wait_for( std::chrono::seconds( 1 ) ) ) ){
				case std::future_status::deferred:
					std::cout << "deferred\n";
					break;
				case std::future_status::timeout:
					std::cout << "tick 1s timeout\n";
					break;
				case std::future_status::ready:
					std::cout << "ready\n";
					halt = true;
					break;
			}
		}while( !halt );
		
		std::cout << " task:\t" << taskID << " is done by thread:\t" 
				  << thrdID 
				  << '\n' << std::flush;
	} 
	
	return 0;
}

完整代码

#include <iostream>
#include <cstdint>
#include <stdexcept>
#include <exception>

#include <thread>
#include <chrono>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>

#include <cstdlib>
#include <ctime>

#include <string>
#include <vector>
#include <queue>
#include <deque>
#include <initializer_list>
#include <memory>

#ifndef __DURAMS_T
#	define __DURAMS_T
typedef std::chrono::duration< double, std::milli > duraMS_t;
#endif 

static std::once_flag flag;
static double CmpFactor { 0.75 };

using ulong = unsigned long;
enum task_priority{ NORMAL = 0, MEDIUM, HIGH,};
enum thread_pool_status { HALT = 0, RUNNING = 1, };
enum rejected_execution_policy { AbortPolicy = 0, CallerRunsPolicy, DiscardLeasestPolicy, };
class ThreadPool {
	typedef struct task{ 
		mutable int 		   Priority; 
		std::function< void( void ) > f;
		inline bool operator<( const task& other) const { return Priority < other.Priority; }
		} task;
	class Detor {
		public:
			~Detor( void ){
				if( ThreadPool::pInst )delete ThreadPool::pInst;
				ThreadPool::pInst = nullptr;
			}
	};
	class DefExcept : public std::exception{
	public:
		const char * what () const throw () {
		    return "Too many task has been put";
		}
	};                                  
private:
	static ThreadPool* pInst;
	
	int Cores;
	int Maximum;
	int Policy;
	duraMS_t KeepAliveMS;
	
	
	std::atomic< int > Busy { 0 };
	
	std::priority_queue< task, std::deque< task >, std::less< task > > TaskQ;
	std::vector< std::thread > WorkerThreads;
	
	std::mutex Mtx;
	std::condition_variable Condv;
	
	int Status { thread_pool_status::RUNNING };
private:
	inline void CreateInstance( const std::initializer_list< size_t >& init ){
		std::vector< size_t > args( init );
		pInst = new ThreadPool( ( int )args.at( 0 ), 
								( int )args.at( 1 ),
								( int )args.at( 2 ), 
								( ulong )args.at( 3 ) );
		static Detor d;
	}
	void Adjust( void ) noexcept{
		WorkerThreads.emplace_back( [ this ]( void )mutable->void {
				while( 1 ){
					std::unique_lock< std::mutex > lk( Mtx, std::try_to_lock );
					if( !lk.owns_lock() ) {
						std::cout << "Miss for one time...\n";
						continue;
					}
					Condv.wait( lk, [this]( void )->bool{ return !TaskQ.empty() || !(Status == thread_pool_status::HALT); } );
					++( this->Busy );
					( TaskQ.top() ).f();
					TaskQ.pop();
					--( this->Busy );
				}
			});
	}
	ThreadPool( int __Cores, 
				int __Maximum, 
				int __Policy,
				ulong __KeepAliveMS )
		:Cores( __Cores )
		,Maximum( __Maximum )
		,Policy( __Policy )
		,KeepAliveMS( std::chrono::milliseconds(__KeepAliveMS) )
	{
		do Adjust(); while( __Cores-- );
	}		

	void handler( std::packaged_task< void( void ) > &pkg ) throw ( std::exception, std::string ){
		switch( this->Policy ){
			case AbortPolicy:
				throw std::exception();
			case CallerRunsPolicy:
				std::thread( std::ref( pkg ) ).join();
				break;
			case DiscardLeasestPolicy:
				/* 待完善 */
				break;
		}				
	}
	
public:
	explicit ThreadPool( const ThreadPool& other) = delete;
	explicit ThreadPool( ThreadPool&& other) = delete;
	ThreadPool& operator=( const ThreadPool& other) = delete;
	ThreadPool& operator=( ThreadPool&& other) = delete;
	static ThreadPool& GetInstance( int __Cores, 
							 		int __Maximum = INT_MAX, 
							 		int __Policy = rejected_execution_policy::AbortPolicy,
							 		ulong __KeepAliveMS = LONG_MAX){
							 	static ThreadPool *pObj { nullptr };
							 	std::call_once( flag, &ThreadPool::CreateInstance,
								 			  ( std::initializer_list< size_t> )
											  { __Cores, __Maximum, __KeepAliveMS } );
							 }
	~ThreadPool( void ){
		{
			std::unique_lock< std::mutex > lk( Mtx );
			Status = thread_pool_status::HALT;
		}
		Condv.notify_all();
		for( auto& t : WorkerThreads ) if( t.joinable() ) t.join();
	}
	
	template< class F,class... Args>
	auto push( int priority, F && f, Args&&... args ) -> std::future< decltype( f( args... ) ) >{
		std::cout << "Number of args:\t" << sizeof...( args ) << '\n';
		std::function< decltype( f( args... ) )() > func = 
			std::bind( std::forward< F >( f ),
			std::forward< Args >( args )... );
		std::shared_ptr< std::packaged_task< decltype( f( args...) )( void ) > > ptr = 
			std::make_shared< std::packaged_task< decltype( f( args... ) )( void ) > >( func );
		std::function< void( void ) > wrapper_func = [ ptr ]( void ) -> void{ (*ptr)(); };
		{
			std::unique_lock< std::mutex > lk( Mtx, std::defer_lock );
			if( TaskQ.size() >= Maximum ){
				try {handler( ( *ptr ) );}
				catch( DefExcept& e ){
					std::cerr << e.what();
				}catch( ... ){
					std::cerr << "Unknown error";
				}
			}
			lk.lock();
			TaskQ.emplace( ( task ){ priority,std::move( wrapper_func ) } );
			
			if( Busy < ( int )( TaskQ.size() * CmpFactor ) 
			 && TaskQ.size() < Maximum)
				Adjust();			 	
		}
		Condv.notify_one();
		return ptr->get_future();
	}
	
};
ThreadPool* ThreadPool::pInst = { nullptr };

int main( void ){
	srand( (unsigned int)time( nullptr ) );
	constexpr int Cores { 5 };
	constexpr int Maximum { 15 };
	constexpr uint32_t KeepAliveMS { 1000 };
	
	register int counter { 10 },
				 taskID { 0 };
	
	auto& ref = ThreadPool::GetInstance(Cores, Maximum, 
										CallerRunsPolicy,
										KeepAliveMS );
	while( counter-- ){
		int randomTimeCost { ( int )( 1 + std::rand() / ( (RAND_MAX + 1u) / 6 ) ) };
		int randomPriority { ( int )( std::rand() / ( (RAND_MAX + 1u) / HIGH ) ) };
		
		std::thread::id thrdID;
		auto futr = ref.push( randomPriority, [ &taskID ] ( int _Cost, std::thread::id& id )mutable -> void{
			char buf[100];
			sprintf( buf, "Runing...(takes %d seconds)\n", ( ++_Cost ) );
			std::cout << buf;
			std::this_thread::sleep_for( std::chrono::seconds( _Cost ) );
//			p.set_value( std::this_thread::get_id() );
			id = std::this_thread::get_id();
			return ;
		}, randomTimeCost, std::ref( thrdID ) );
		
		std::future_status sts;
		bool halt { false };
		do{
			switch ( ( sts = futr.wait_for( std::chrono::seconds( 1 ) ) ) ){
				case std::future_status::deferred:
					std::cout << "deferred\n";
					break;
				case std::future_status::timeout:
					std::cout << "tick 1s timeout\n";
					break;
				case std::future_status::ready:
					std::cout << "ready\n";
					halt = true;
					break;
			}
		}while( !halt );
		
		std::cout << " task:\t" << taskID << " is done by thread:\t" 
				  << thrdID 
				  << '\n' << std::flush;
	} 
	
	return 0;
}

文章来源:https://blog.csdn.net/RFoer/article/details/135637973
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。