/**
 * Thin facade over a CThreadPool: job submission and shutdown requests are
 * forwarded to the pool held in m_Pool.
 *
 * The constructors allocate the pool and the destructor deletes it, so the
 * object owns the pointer. Copying is therefore disabled: a compiler-generated
 * copy would share m_Pool and the pool would be deleted twice.
 */
class CThreadManage
{
private:
    CThreadPool* m_Pool;        // pool owned by this manager
    int m_NumOfThread;          // thread count passed to the pool on construction

    // Non-copyable: declared but intentionally never defined.
    CThreadManage(const CThreadManage&);
    CThreadManage& operator=(const CThreadManage&);
protected:
public:
    CThreadManage();
    CThreadManage(int num);
    virtual ~CThreadManage();
    void SetParallelNum(int num);
    void Run(CJob* job,void* jobdata);
    void TerminateAll(void);
};
// Default construction: records a thread count of 10 and creates the pool
// with that count.
CThreadManage::CThreadManage()
    : m_Pool(NULL), m_NumOfThread(10)
{
    m_Pool = new CThreadPool(m_NumOfThread);
}
// Construct with an explicit thread count.
// @param num  thread count recorded in m_NumOfThread and passed to the pool.
CThreadManage::CThreadManage(int num)
    : m_Pool(new CThreadPool(num)), m_NumOfThread(num)
{
}
// Releases the pool created by the constructors.
CThreadManage::~CThreadManage()
{
    // delete on a null pointer is a no-op, so no explicit check is required.
    delete m_Pool;
}
void CThreadManage::SetParallelNum(int num)
{
m_NumOfThread = num;
}
// Hands a job to the pool.
// @param job      job object, forwarded unchanged to CThreadPool::Run.
// @param jobdata  opaque data pointer, forwarded unchanged.
void CThreadManage::Run(CJob* job,void* jobdata)
{
    CThreadPool* const pool = m_Pool;
    pool->Run(job,jobdata);
}
void CThreadManage::TerminateAll(void)
{
m_Pool->TerminateAll();
}
/**
 * Abstract base class for a thread object. Subclasses provide the thread
 * body by overriding Run(); the non-inline members are declared here and
 * defined elsewhere.
 */
class CThread
{
private:
    int m_ErrCode;                 // last error code, returned by GetLastError()
    Semaphore m_ThreadSemaphore;   // the inner semaphore
    unsigned long m_ThreadID;
    bool m_Detach;                 // the thread is detached
    bool m_CreateSuspended;        // whether to suspend after creating
    char* m_ThreadName;            // destination buffer used by SetThreadName()
    ThreadState m_ThreadState;     // the state of the thread
protected:
    void SetErrcode(int errcode){m_ErrCode = errcode;}
    static void* ThreadFunction(void*);
public:
    CThread();
    CThread(bool createsuspended,bool detach);
    virtual ~CThread();
    virtual void Run(void) = 0;    // thread body, supplied by subclasses
    void SetThreadState(ThreadState state){m_ThreadState = state;}
    bool Terminate(void);          // terminate the thread
    bool Start(void);              // start to execute the thread
    void Exit(void);
    bool Wakeup(void);
    ThreadState GetThreadState(void){return m_ThreadState;}
    int GetLastError(void){return m_ErrCode;}
    // Copies thrname into the existing name buffer. A NULL source or a NULL
    // destination is ignored rather than handed to strcpy (undefined
    // behaviour).
    // NOTE(review): the size of the m_ThreadName buffer is not visible here,
    // so the copy is still unbounded - confirm how the buffer is allocated.
    void SetThreadName(char* thrname)
    {
        if(thrname != NULL && m_ThreadName != NULL)
            strcpy(m_ThreadName,thrname);
    }
    char* GetThreadName(void){return m_ThreadName;}
    // m_ThreadID is an unsigned long; it is narrowed to int on return.
    int GetThreadID(void){return m_ThreadID;}
    bool SetPriority(int priority);
    int GetPriority(void);
    int GetConcurrency(void);
    void SetConcurrency(int num);
    bool Detach(void);
    bool Join(void);
    bool Yield(void);
    int Self(void);
};
/**
 * Thread pool that keeps worker threads in an idle list and a busy list.
 * Tuning values (maximum, low/high idle marks, initial count) are stored as
 * unsigned integers and exposed through int-based accessors.
 */
class CThreadPool
{
    friend class CWorkerThread;

private:
    unsigned int m_MaxNum;      // the max thread num that can be created at the same time
    unsigned int m_AvailLow;    // the min num of idle threads that should be kept
    unsigned int m_AvailHigh;   // the max num of idle threads kept at the same time
    unsigned int m_AvailNum;    // the normal num of idle threads
    unsigned int m_InitNum;     // normal thread num

protected:
    CWorkerThread* GetIdleThread(void);
    void AppendToIdleList(CWorkerThread* jobthread);
    void MoveToBusyList(CWorkerThread* idlethread);
    void MoveToIdleList(CWorkerThread* busythread);
    void DeleteIdleThread(int num);
    void CreateIdleThread(int num);

public:
    CThreadMutex m_BusyMutex;   // lock/unlock with this when visiting the busy list
    CThreadMutex m_IdleMutex;   // lock/unlock with this when visiting the idle list
    CThreadMutex m_JobMutex;    // lock/unlock with this when visiting the job list
    CThreadMutex m_VarMutex;
    CCondition m_BusyCond;      // used to sync the busy thread list
    CCondition m_IdleCond;      // used to sync the idle thread list
    CCondition m_IdleJobCond;   // used to sync the job list
    CCondition m_MaxNumCond;
    vector<CWorkerThread*> m_ThreadList;
    vector<CWorkerThread*> m_BusyList;   // busy thread list
    vector<CWorkerThread*> m_IdleList;   // idle thread list

    CThreadPool();
    CThreadPool(int initnum);
    virtual ~CThreadPool();

    void SetMaxNum(int maxnum)
    {
        m_MaxNum = static_cast<unsigned int>(maxnum);
    }
    int GetMaxNum(void)
    {
        return static_cast<int>(m_MaxNum);
    }
    void SetAvailLowNum(int minnum)
    {
        m_AvailLow = static_cast<unsigned int>(minnum);
    }
    int GetAvailLowNum(void)
    {
        return static_cast<int>(m_AvailLow);
    }
    void SetAvailHighNum(int highnum)
    {
        m_AvailHigh = static_cast<unsigned int>(highnum);
    }
    int GetAvailHighNum(void)
    {
        return static_cast<int>(m_AvailHigh);
    }
    int GetActualAvailNum(void)
    {
        return static_cast<int>(m_AvailNum);
    }
    // Number of entries in m_ThreadList.
    int GetAllNum(void)
    {
        return static_cast<int>(m_ThreadList.size());
    }
    // Number of entries in m_BusyList.
    int GetBusyNum(void)
    {
        return static_cast<int>(m_BusyList.size());
    }
    void SetInitNum(int initnum)
    {
        m_InitNum = static_cast<unsigned int>(initnum);
    }
    int GetInitNum(void)
    {
        return static_cast<int>(m_InitNum);
    }

    void TerminateAll(void);
    void Run(CJob* job,void* jobdata);
};
// Waits until the idle list is non-empty and returns its front entry.
// The entry is only read with front(); it is not removed from the list here.
// @return the front idle thread, or NULL if the list turned out to be empty
//         once m_IdleMutex was held.
CWorkerThread* CThreadPool::GetIdleThread(void)
{
    // Note: this emptiness check runs before m_IdleMutex is taken.
    while(m_IdleList.empty())
        m_IdleCond.Wait();

    CWorkerThread* picked = NULL;
    m_IdleMutex.Lock();
    if(!m_IdleList.empty())
    {
        picked = m_IdleList.front();
        printf("Get Idle thread %d\n",picked->GetThreadID());
    }
    m_IdleMutex.Unlock();
    return picked;
}
//create num idle thread and put them to idlelist
void CThreadPool::CreateIdleThread(int num)
{
for(int i=0;i<num;i++){
CWorkerThread* thr = new CWorkerThread();
thr->SetThreadPool(this);
AppendToIdleList(thr);
m_VarMutex.Lock();
m_AvailNum++;
m_VarMutex.Unlock();
thr->Start(); //begin the thread,the thread wait for job
}
}
// Assigns a job to an idle worker thread.
//  1. If the busy count equals m_MaxNum, wait once on m_MaxNumCond.
//  2. If fewer than m_AvailLow threads are idle, create more idle threads.
//  3. Take an idle thread, lock its m_WorkMutex, move it to the busy list
//     and hand it the job.
// @param job      job to run; must not be NULL (asserted).
// @param jobdata  opaque pointer passed to the worker with the job.
void CThreadPool::Run(CJob* job,void* jobdata)
{
    assert(job!=NULL);

    // The busy thread count has reached m_MaxNum, so wait.
    if(GetBusyNum() == m_MaxNum)
        m_MaxNumCond.Wait();

    if(m_IdleList.size()<m_AvailLow)
    {
        int toCreate;
        if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum )
            toCreate = m_InitNum-m_IdleList.size();
        else
            toCreate = m_MaxNum-GetAllNum();
        CreateIdleThread(toCreate);
    }

    CWorkerThread* worker = GetIdleThread();
    if(worker == NULL)
        return;

    // m_WorkMutex is locked here and not unlocked in this function.
    worker->m_WorkMutex.Lock();
    MoveToBusyList(worker);
    worker->SetThreadPool(this);
    job->SetWorkThread(worker);
    printf("Job is set to thread %d \n",worker->GetThreadID());
    worker->SetJob(job,jobdata);
}
// Excerpt: creates m_InitNum worker threads; each is appended to the idle
// list, attached to this pool and started.
// NOTE(review): this fragment appears outside any function in this listing;
// it presumably belongs to a CThreadPool constructor - confirm.
for(int i=0;i<m_InitNum;i++)
{
CWorkerThread* thr = new CWorkerThread();
AppendToIdleList(thr);
thr->SetThreadPool(this);
thr->Start(); //begin the thread,the thread wait for job
}
// Excerpt: chooses how many idle threads to create. If topping the idle list
// up to m_InitNum keeps the total below m_MaxNum, create that many; otherwise
// create only the difference between m_MaxNum and the current total.
// NOTE(review): free-standing fragment, the same statements appear inside
// CThreadPool::Run.
if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum )
CreateIdleThread(m_InitNum-m_IdleList.size());
else
CreateIdleThread(m_MaxNum-GetAllNum());
/**
 * Abstract unit of work. Subclasses implement Run(void*); the job also
 * carries a number, a name and a pointer to the thread associated with it.
 */
class CJob
{
private:
    int      m_JobNo;         // the number assigned to the job
    char*    m_JobName;       // the job name
    CThread* m_pWorkThread;   // the thread associated with the job

public:
    CJob( void );
    virtual ~CJob();

    int GetJobNo(void) const
    {
        return m_JobNo;
    }
    void SetJobNo(int jobno)
    {
        m_JobNo = jobno;
    }

    char* GetJobName(void) const
    {
        return m_JobName;
    }
    void SetJobName(char* jobname);

    CThread *GetWorkThread(void)
    {
        return m_pWorkThread;
    }
    void SetWorkThread ( CThread *pWorkThread )
    {
        m_pWorkThread = pWorkThread;
    }

    // Job body; receives the opaque data pointer given at submission.
    virtual void Run ( void *ptr ) = 0;
};
/**
 * Sample job: prints a message and then sleeps for two seconds.
 */
class CXJob:public CJob
{
public:
    // The original constructor assigned to an undeclared identifier `i`,
    // which does not compile; nothing needs initialising here.
    CXJob(){}
    ~CXJob(){}
    // @param jobdata  unused by this job.
    void Run(void* jobdata) {
        (void)jobdata;
        printf("The Job comes from CXJOB\n");
        sleep(2);
    }
};
/**
 * Sample job: prints a message and returns immediately.
 */
class CYJob:public CJob
{
public:
    // The original constructor assigned to an undeclared identifier `i`,
    // which does not compile; nothing needs initialising here.
    CYJob(){}
    ~CYJob(){}
    // @param jobdata  unused by this job.
    void Run(void* jobdata) {
        (void)jobdata;
        printf("The Job comes from CYJob\n");
    }
};
// Demo driver: submits 40 CXJob instances and one CYJob to a manager created
// with a thread count of 10, then terminates all threads.
// `main` must be declared with return type int in C++ (no implicit int).
int main()
{
    CThreadManage* manage = new CThreadManage(10);
    for(int i=0;i<40;i++)
    {
        CXJob* job = new CXJob();
        manage->Run(job,NULL);
    }
    sleep(2);
    CYJob* job = new CYJob();
    manage->Run(job,NULL);
    manage->TerminateAll();
    // NOTE(review): the manager and the job objects are never deleted here;
    // who owns a job after Run() is not visible in this listing - confirm
    // before adding deletes.
    return 0;
}
/**
 * Exception type thrown by TC_ThreadPool; both constructors forward their
 * arguments to TC_Exception.
 */
struct TC_ThreadPool_Exception : public TC_Exception
{
    TC_ThreadPool_Exception(const string &buffer)
        : TC_Exception(buffer)
    {
    }

    TC_ThreadPool_Exception(const string &buffer, int err)
        : TC_Exception(buffer, err)
    {
    }

    ~TC_ThreadPool_Exception() throw()
    {
    }
};
/**
* @brief 用通线程池类, 与tc_functor, tc_functorwrapper配合使用.
*
* 使用方式说明:
* 1 采用tc_functorwrapper封装一个调用
* 2 用tc_threadpool对调用进行执行
* 具体示例代码请参见:test/test_tc_thread_pool.cpp
*/
/**线程池本身继承自锁,可以帮助锁定**/
class TC_ThreadPool : public TC_ThreadLock
{
public:
/**
* @brief 构造函数
*
*/
TC_ThreadPool ();
/**
* @brief 析构, 会停止所有线程
*/
~TC_ThreadPool ();
/**
* @brief 初始化.
*
* @param num 工作线程个数
*/
void init(size_t num);
/**
* @brief 获取线程个数.
*
* @return size_t 线程个数
*/
size_t getThreadNum() { Lock sync(* this); return _jobthread. size(); }
/**
* @brief 获取线程池的任务数( exec添加进去的).
*
* @return size_t 线程池的任务数
*/
size_t getJobNum() { return _jobqueue. size(); }
/**
* @brief 停止所有线程
*/
void stop();
/**
* @brief 启动所有线程
*/
void start();
/**
* @brief 启动所有线程并, 执行初始化对象.
*
* @param ParentFunctor
* @param tf
*/
template<class ParentFunctor>
void start(const TC_FunctorWrapper< ParentFunctor> &tf)
{
for(size_t i = 0; i < _jobthread .size(); i++)
{
_startqueue. push_back(new TC_FunctorWrapper<ParentFunctor >(tf));
}
start();
}
/**
* @brief 添加对象到线程池执行,该函数马上返回,
* 线程池的线程执行对象
*/
template<class ParentFunctor>
void exec(const TC_FunctorWrapper< ParentFunctor> &tf)
{
_jobqueue.push_back(new TC_FunctorWrapper<ParentFunctor >(tf));
}
/**
* @brief 等待所有工作全部结束(队列无任务, 无空闲线程).
*
* @param millsecond 等待的时间( ms), -1:永远等待
* @return true, 所有工作都处理完毕
* false,超时退出
*/
bool waitForAllDone(int millsecond = -1);
public:
/**
* @brief 线程数据基类,所有线程的私有数据继承于该类
*/
class ThreadData
{
public:
/**
* @brief 构造
*/
ThreadData(){};
/**
* @brief 析够
*/
virtual ~ThreadData(){};
/**
* @brief 生成数据.
*
* @ param T
* @return ThreadData*
*/
template<typename T>
static T* makeThreadData()
{
return new T;
}
};
/**
* @brief 设置线程数据.
*
* @param p 线程数据
*/
static void setThreadData(ThreadData *p);
/**
* @brief 获取线程数据.
*
* @return ThreadData* 线程数据
*/
static ThreadData* getThreadData();
/**
* @brief 设置线程数据, key需要自己维护.
*
* @param pkey 线程私有数据key
* @param p 线程指针
*/
static void setThreadData(pthread_key_t pkey, ThreadData *p);
/**
* @brief 获取线程数据, key需要自己维护.
*
* @param pkey 线程私有数据key
* @return 指向线程的ThreadData*指针
*/
static ThreadData* getThreadData(pthread_key_t pkey);
protected:
/**
* @brief 释放资源.
*
* @param p
*/
static void destructor(void *p);
/**
* @brief 初始化key
*/
class KeyInitialize
{
public:
/**
* @brief 初始化key
*/
KeyInitialize()
{
int ret = pthread_key_create(&TC_ThreadPool::g_key, TC_ThreadPool::destructor);
if(ret != 0)
{
throw TC_ThreadPool_Exception("[TC_ThreadPool::KeyInitialize] pthread_key_create error", ret);
}
}
/**
* @brief 释放key
*/
~KeyInitialize()
{
pthread_key_delete(TC_ThreadPool::g_key);
}
};
/**
* @brief 初始化key的控制
*/
static KeyInitialize g_key_initialize;
/**
* @brief 数据key
*/
static pthread_key_t g_key;
protected:
/**
* @brief 线程池中的工作线程
*/
class ThreadWorker : public TC_Thread
{
public:
/**
* @brief 工作线程构造函数.
*
* @ param tpool
*/
ThreadWorker(TC_ThreadPool *tpool);
/**
* @brief 通知工作线程结束
*/
void terminate();
protected:
/**
* @brief 运行
*/
virtual void run();
protected:
/**
* 线程池指针
*/
TC_ThreadPool * _tpool;
/**
* 是否结束线程
*/
bool _bTerminate;
};
protected:
/**
* @brief 清除
*/
void clear();
/**
* @brief 获取任务, 如果没有任务, 则为NULL.
*
* @return TC_FunctorWrapperInterface*
*/
TC_FunctorWrapperInterface * get(ThreadWorker *ptw);
/**
* @brief 获取启动任务.
*
* @return TC_FunctorWrapperInterface*
*/
TC_FunctorWrapperInterface * get();
/**
* @brief 空闲了一个线程.
*
* @param ptw
*/
void idle(ThreadWorker *ptw);
/**
* @brief 通知等待在任务队列上的工作线程醒来
*/
void notifyT();
/**
* @brief 是否处理结束.
*
* @return bool
*/
bool finish();
/**
* @brief 线程退出时调用
*/
void exit();
friend class ThreadWorker;
protected:
/**
* 任务队列
*/
TC_ThreadQueue< TC_FunctorWrapperInterface*> _jobqueue;
/**
* 启动任务
*/
TC_ThreadQueue< TC_FunctorWrapperInterface*> _startqueue;
/**
* 工作线程
*/
std::vector<ThreadWorker *> _jobthread;
/**
* 繁忙线程
*/
std::set<ThreadWorker *> _busthread;
/**
* 任务队列的锁
*/
TC_ThreadLock _tmutex;
/**
* 是否所有任务都执行完毕
*/
bool _bAllDone;
};
// Binds the worker to its pool; the terminate flag starts out false.
TC_ThreadPool::ThreadWorker::ThreadWorker(TC_ThreadPool *tpool)
    : _tpool(tpool), _bTerminate(false)
{
}
void TC_ThreadPool ::ThreadWorker::terminate()
{
_bTerminate = true;
_tpool->notifyT();
}
void TC_ThreadPool ::ThreadWorker::run()
{
//调用初始化部分
TC_FunctorWrapperInterface *pst = _tpool->get();
if(pst)
{
try
{
(*pst)();
}
catch ( ... )
{
}
delete pst;
pst = NULL;
}
//调用处理部分
while (! _bTerminate)
{
TC_FunctorWrapperInterface *pfw = _tpool->get( this);
if(pfw != NULL)
{
auto_ptr< TC_FunctorWrapperInterface> apfw(pfw);
try
{
(*pfw)();
}
catch ( ... )
{
}
_tpool->idle( this);
}
}
//结束
_tpool->exit();
}
每个工作线程在刚开始时都会执行一次初始化操作,然后进入一个无限循环的处理部分:
//调用处理部分
// Excerpt of the worker's processing loop: until _bTerminate is set, fetch a
// job from the pool, run it (exceptions are swallowed) and report the worker
// idle. auto_ptr deletes the job at the end of the if-block.
// NOTE(review): free-standing fragment; the same loop appears inside
// TC_ThreadPool::ThreadWorker::run.
while (! _bTerminate)
{
TC_FunctorWrapperInterface *pfw = _tpool->get( this);
if(pfw != NULL)
{
auto_ptr< TC_FunctorWrapperInterface> apfw(pfw);
try
{
(*pfw)();
}
catch ( ... )
{
}
_tpool->idle( this);
}
}
// Fetches the next job for a worker.
// On success the worker is recorded in the busy set (under _tmutex).
// The second pop_front argument (1000) is presumably a timeout in
// milliseconds - confirm against TC_ThreadQueue.
// @param ptw  worker asking for a job.
// @return the job, or NULL when pop_front reports failure.
TC_FunctorWrapperInterface *TC_ThreadPool::get(ThreadWorker *ptw)
{
    TC_FunctorWrapperInterface *task = NULL;
    const bool popped = _jobqueue.pop_front(task, 1000);
    if(popped)
    {
        Lock guard(_tmutex);
        _busthread.insert(ptw);
        return task;
    }
    return NULL;
}
// Marks a worker as no longer busy. When the busy set becomes empty,
// _bAllDone is set and every thread waiting on _tmutex is notified.
// @param ptw  worker that finished its job.
void TC_ThreadPool::idle(ThreadWorker *ptw)
{
    Lock guard(_tmutex);
    _busthread.erase(ptw);
    if(!_busthread.empty())
    {
        return;
    }
    // No busy thread left: wake the threads waiting for the pool to finish.
    _bAllDone = true;
    _tmutex.notifyAll();
}
// Excerpt: when no busy thread remains, flag completion and notify every
// thread waiting on _tmutex.
// NOTE(review): free-standing fragment; the same statements appear inside
// TC_ThreadPool::idle.
if( _busthread. empty())
{
_bAllDone = true;
_tmutex.notifyAll();
}
// Waits, holding _tmutex, until finish() reports that all work is done.
// @param millsecond  maximum wait in ms; a negative value waits without limit
//                    (re-checking after every timedWait(1000)).
// @return true when finish() became true; false when timedWait returned
//         false before that happened.
bool TC_ThreadPool::waitForAllDone(int millsecond)
{
    Lock sync(_tmutex);

    // Job queue and busy threads are all empty
    if(finish())
    {
        return true;
    }

    // Wait forever
    while(millsecond < 0)
    {
        _tmutex.timedWait(1000);
        if(finish())
        {
            return true;
        }
    }

    int64_t iNow = TC_Common::now2ms();
    int m = millsecond;
    for(;;)
    {
        bool b = _tmutex.timedWait(millsecond);
        // Processing completed
        if(finish())
        {
            return true;
        }
        if(!b)
        {
            return false;
        }
        // Remaining budget: original timeout minus elapsed time, floored at 0.
        millsecond = max((int64_t)0, m - (TC_Common::now2ms() - iNow));
    }
}
等待的线程由 _tmutex.timedWait(millsecond) 方法唤醒,并反复判断所有的工作是否已经完成:
// Reports whether all work is done: the start queue, the job queue and the
// busy set must all be empty and _bAllDone must be set. Checks are made in
// that order and stop at the first one that fails.
bool TC_ThreadPool::finish()
{
    if(!_startqueue.empty())
    {
        return false;
    }
    if(!_jobqueue.empty())
    {
        return false;
    }
    if(!_busthread.empty())
    {
        return false;
    }
    return _bAllDone;
}
// Static helper object: its constructor creates the thread-specific-data key
// with pthread_key_create and its destructor deletes the key.
TC_ThreadPool ::KeyInitialize TC_ThreadPool::g_key_initialize;
// Thread-specific-data key used by the key-less setThreadData()/getThreadData().
pthread_key_t TC_ThreadPool::g_key ;
void TC_ThreadPool::destructor( void *p)
{
ThreadData *ttd = ( ThreadData*)p;
if(ttd)
{
delete ttd;
}
}
void TC_ThreadPool::exit()
{
TC_ThreadPool:: ThreadData *p = getThreadData();
if(p)
{
delete p;
int ret = pthread_setspecific( g_key, NULL );
if(ret != 0)
{
throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret);
}
}
_jobqueue. clear();
}
// Stores p as the calling thread's data under g_key and deletes the value
// previously stored there (unless it is the same object).
//
// The new value is installed first and the old one is deleted only after
// pthread_setspecific succeeded; on failure the key keeps pointing at the
// still-valid old object instead of freed memory.
// @param p  new thread data (may be NULL).
// @throws TC_ThreadPool_Exception when pthread_setspecific fails.
void TC_ThreadPool::setThreadData(TC_ThreadPool::ThreadData *p)
{
    TC_ThreadPool::ThreadData *pOld = getThreadData();
    int ret = pthread_setspecific(g_key, (void *)p);
    if(ret != 0)
    {
        throw TC_ThreadPool_Exception("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret);
    }
    if(pOld != NULL && pOld != p)
    {
        delete pOld;
    }
}
// Returns the calling thread's data stored under g_key (NULL if none).
TC_ThreadPool::ThreadData *TC_ThreadPool::getThreadData()
{
    void *stored = pthread_getspecific(g_key);
    return static_cast<ThreadData *>(stored);
}
void TC_ThreadPool::setThreadData( pthread_key_t pkey, ThreadData *p)
{
TC_ThreadPool:: ThreadData *pOld = getThreadData(pkey);
if(pOld != NULL && pOld != p)
{
delete pOld;
}
int ret = pthread_setspecific(pkey, ( void *)p);
if(ret != 0)
{
throw TC_ThreadPool_Exception ("[TC_ThreadPool::setThreadData] pthread_setspecific error", ret);
}
}
// Returns the calling thread's data stored under a caller-maintained key.
// @param pkey  thread-specific-data key.
TC_ThreadPool::ThreadData *TC_ThreadPool::getThreadData(pthread_key_t pkey)
{
    void *stored = pthread_getspecific(pkey);
    return static_cast<ThreadData *>(stored);
}
// A freshly constructed pool has no pending work, so _bAllDone starts true.
TC_ThreadPool::TC_ThreadPool() : _bAllDone(true)
{
}
// Stops every worker thread, then destroys the worker objects.
TC_ThreadPool::~TC_ThreadPool()
{
    this->stop();
    this->clear();
}
void TC_ThreadPool::clear()
{
std::vector< ThreadWorker *>::iterator it = _jobthread. begin();
while(it != _jobthread. end())
{
delete (*it);
++it;
}
_jobthread. clear();
_busthread. clear();
}
// (Re)initialises the pool: stops running workers, discards the old worker
// objects under the pool lock and creates `num` new, not yet started workers.
// @param num  number of worker threads to create.
void TC_ThreadPool::init(size_t num)
{
    stop();

    Lock sync(*this);
    clear();

    size_t remaining = num;
    while(remaining != 0)
    {
        _jobthread.push_back(new ThreadWorker(this));
        --remaining;
    }
}
void TC_ThreadPool::stop()
{
Lock sync(* this);
std::vector< ThreadWorker *>::iterator it = _jobthread. begin();
while(it != _jobthread. end())
{
if ((*it)-> isAlive())
{
(*it)-> terminate();
(*it)-> getThreadControl().join ();
}
++it;
}
_bAllDone = true;
}
// Starts every worker thread (under the pool lock).
//
// _bAllDone is cleared BEFORE the workers are started. idle() sets the flag
// to true (under _tmutex, a different lock) when the last busy worker
// finishes; clearing it after the start loop could overwrite that value and
// leave waitForAllDone() waiting although all work is already finished.
void TC_ThreadPool::start()
{
    Lock sync(*this);
    _bAllDone = false;
    std::vector<ThreadWorker *>::iterator it = _jobthread.begin();
    while(it != _jobthread.end())
    {
        (*it)->start();
        ++it;
    }
}
// True only when the start queue, the job queue and the busy set are all
// empty and _bAllDone is set; evaluation stops at the first failing check.
bool TC_ThreadPool::finish()
{
    if(!_startqueue.empty())
    {
        return false;
    }
    if(!_jobqueue.empty())
    {
        return false;
    }
    if(!_busthread.empty())
    {
        return false;
    }
    return _bAllDone;
}
// Blocks, holding _tmutex, until finish() reports completion.
// @param millsecond  maximum wait in ms; negative means no limit (finish()
//                    is re-checked after every timedWait(1000)).
// @return true once finish() is true; false if timedWait returned false
//         while work was still outstanding.
bool TC_ThreadPool::waitForAllDone(int millsecond)
{
    Lock sync(_tmutex);

    // Job queue and busy threads are all empty
    if(finish())
    {
        return true;
    }

    // Wait forever
    while(millsecond < 0)
    {
        _tmutex.timedWait(1000);
        if(finish())
        {
            return true;
        }
    }

    int64_t iNow = TC_Common::now2ms();
    int m = millsecond;
    for(;;)
    {
        bool b = _tmutex.timedWait(millsecond);
        // Processing completed
        if(finish())
        {
            return true;
        }
        if(!b)
        {
            return false;
        }
        // Remaining budget: original timeout minus elapsed time, floored at 0.
        millsecond = max((int64_t)0, m - (TC_Common::now2ms() - iNow));
    }
}
// Pops the next job for a worker; on success the worker is added to the busy
// set while _tmutex is held.
// The value 1000 passed to pop_front is presumably a timeout in
// milliseconds - confirm against TC_ThreadQueue.
// @param ptw  worker requesting a job.
// @return the job, or NULL when pop_front reports failure.
TC_FunctorWrapperInterface *TC_ThreadPool::get(ThreadWorker *ptw)
{
    TC_FunctorWrapperInterface *job = NULL;
    const bool gotJob = _jobqueue.pop_front(job, 1000);
    if(gotJob)
    {
        Lock guard(_tmutex);
        _busthread.insert(ptw);
        return job;
    }
    return NULL;
}
// Pops one start-up functor from the start queue.
// @return the functor, or NULL when pop_front reports failure.
TC_FunctorWrapperInterface *TC_ThreadPool::get()
{
    TC_FunctorWrapperInterface *startTask = NULL;
    if(_startqueue.pop_front(startTask))
    {
        return startTask;
    }
    return NULL;
}
// Removes a worker from the busy set. If that leaves the set empty,
// _bAllDone is set and all threads waiting on _tmutex are notified.
// @param ptw  worker that has finished its job.
void TC_ThreadPool::idle(ThreadWorker *ptw)
{
    Lock guard(_tmutex);
    _busthread.erase(ptw);
    const bool anyBusy = !_busthread.empty();
    if(anyBusy)
    {
        return;
    }
    // No busy thread left: wake the threads waiting for the pool to finish.
    _bAllDone = true;
    _tmutex.notifyAll();
}
void TC_ThreadPool::notifyT()
{
_jobqueue. notifyT();
}
[b]线程池使用后记[/b]
线程池适合场合
事实上,线程池并不是万能的。它有其特定的使用场合。线程池致力于减少线程本身的开销对应用所产生的影响,这是有前提的,前提就是线程本身开销与线程执行任务相比不可忽略。如果线程本身的开销相对于线程任务执行开销而言是可以忽略不计的,那么此时线程池所带来的好处是不明显的,比如对于FTP服务器以及Telnet服务器,通常传送文件的时间较长,开销较大,那么此时,我们采用线程池未必是理想的方法,我们可以选择“即时创建,即时销毁”的策略。
总之线程池通常适合下面的几个场合:
(1) 单位时间内处理任务频繁而且任务处理时间短
(2) 对实时性要求较高。如果接收到任务后再创建线程,可能满足不了实时要求,因此必须采用线程池进行预创建。
(3) 必须经常面对高突发性事件,比如Web服务器,如果有足球转播,则服务器将产生巨大的冲击。此时如果采取传统方法,则必须不停的大量产生线程,销毁线程。此时采用动态线程池可以避免这种情况的发生。
机械节能产品生产企业官网模板...
大气智能家居家具装修装饰类企业通用网站模板...
礼品公司网站模板
宽屏简约大气婚纱摄影影楼模板...
蓝白WAP手机综合医院类整站源码(独立后台)...苏ICP备2024110244号-2 苏公网安备32050702011978号 增值电信业务经营许可证编号:苏B2-20251499 | Copyright 2018 - 2025 源码网商城 (www.ymwmall.com) 版权所有