源码网商城,靠谱的源码在线交易网站 我的订单 购物车 帮助

源码网商城

c++版线程池和任务池示例

  • 时间:2020-03-30 09:15 编辑: 来源: 阅读:
  • 扫一扫,手机访问
摘要:c++版线程池和任务池示例
commondef.h
[u]复制代码[/u] 代码如下:
//单位秒,监测空闲列表时间间隔,在空闲队列中超过TASK_DESTROY_INTERVAL时间的任务将被自动销毁 const int CHECK_IDLE_TASK_INTERVAL = 300; //单位秒,任务自动销毁时间间隔 const int TASK_DESTROY_INTERVAL = 60; //监控线程池是否为空时间间隔,微秒 const int IDLE_CHECK_POLL_EMPTY = 500; //线程池线程空闲自动退出时间间隔 ,5分钟 const int  THREAD_WAIT_TIME_OUT = 300;
taskpool.cpp
[u]复制代码[/u] 代码如下:
#include "taskpool.h" #include <string.h> #include <stdio.h> #include <pthread.h>     TaskPool::TaskPool(const int & poolMaxSize)     : m_poolSize(poolMaxSize)       , m_taskListSize(0)       , m_bStop(false) {     pthread_mutex_init(&m_lock, NULL);     pthread_mutex_init(&m_idleMutex, NULL);     pthread_cond_init(&m_idleCond, NULL);     pthread_attr_t attr;     pthread_attr_init( &attr );     pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE ); // 让线程独立运行     pthread_create(&m_idleId, &attr, CheckIdleTask, this); //创建监测空闲任务进程     pthread_attr_destroy(&attr); } TaskPool::~TaskPool() {     if(!m_bStop)     {         StopPool();     }     if(!m_taskList.empty())     {         std::list<Task*>::iterator it = m_taskList.begin();         for(; it != m_taskList.end(); ++it)         {             if(*it != NULL)             {                 delete *it;                 *it = NULL;             }         }         m_taskList.clear();         m_taskListSize = 0;     }     if(!m_idleList.empty())     {         std::list<Task*>::iterator it = m_idleList.begin();         for(; it != m_idleList.end(); ++it)         {             if(*it != NULL)             {                 delete *it;                 *it = NULL;             }         }         m_idleList.clear();     }     pthread_mutex_destroy(&m_lock);     pthread_mutex_destroy(&m_idleMutex);     pthread_cond_destroy(&m_idleCond); } void * TaskPool::CheckIdleTask(void * arg) {     TaskPool * pool = (TaskPool*)arg;     while(1)     {         pool->LockIdle();         pool->RemoveIdleTask();         if(pool->GetStop())         {             pool->UnlockIdle();             break;         }         pool->CheckIdleWait();         pool->UnlockIdle();     } } void TaskPool::StopPool() {     m_bStop = true;     LockIdle();     pthread_cond_signal(&m_idleCond); //防止监控线程正在等待,而引起无法退出的问题     UnlockIdle();     pthread_join(m_idleId, NULL); } bool TaskPool::GetStop() {     return m_bStop; } void TaskPool::CheckIdleWait() {     struct timespec timeout;     memset(&timeout, 0, sizeof(timeout));     timeout.tv_sec = time(0) + CHECK_IDLE_TASK_INTERVAL;     timeout.tv_nsec = 0;     pthread_cond_timedwait(&m_idleCond, &m_idleMutex, &timeout); } int TaskPool::RemoveIdleTask() {     int iRet = 0;     std::list<Task*>::iterator it, next;     std::list<Task*>::reverse_iterator rit = m_idleList.rbegin();     time_t curTime = time(0);     for(; rit != m_idleList.rend(); )     {         it = --rit.base();         if(difftime(curTime,((*it)->last_time)) >= TASK_DESTROY_INTERVAL)         {             iRet++;             delete *it;             *it = NULL;             next = m_idleList.erase(it);             rit = std::list<Task*>::reverse_iterator(next);         }         else         {             break;            }     } } int TaskPool::AddTask(task_fun fun, void *arg) {     int iRet = 0;     if(0 != fun)     {         pthread_mutex_lock(&m_lock);         if(m_taskListSize >= m_poolSize)         {             pthread_mutex_unlock(&m_lock);             iRet = -1; //task pool is full;         }         else         {             pthread_mutex_unlock(&m_lock);             Task * task = GetIdleTask();             if(NULL == task)             {                 task = new Task;             }             if(NULL == task)             {                 iRet = -2; // new failed             }             else             {                 task->fun = fun;                 task->data = arg;                 pthread_mutex_lock(&m_lock);                 m_taskList.push_back(task);                 ++m_taskListSize;                 pthread_mutex_unlock(&m_lock);             }         }     }     return iRet; } Task* TaskPool::GetTask() {     Task *task = NULL;     pthread_mutex_lock(&m_lock);     if(!m_taskList.empty())     {         task =  m_taskList.front();         m_taskList.pop_front();         --m_taskListSize;     }     pthread_mutex_unlock(&m_lock);     return task; } void TaskPool::LockIdle() {     pthread_mutex_lock(&m_idleMutex); } void TaskPool::UnlockIdle() {     pthread_mutex_unlock(&m_idleMutex); } Task * TaskPool::GetIdleTask() {     LockIdle();     Task * task = NULL;     if(!m_idleList.empty())     {         task = m_idleList.front();         m_idleList.pop_front();     }     UnlockIdle();     return task; } void TaskPool::SaveIdleTask(Task*task) {     if(NULL != task)     {         task->fun = 0;         task->data = NULL;         task->last_time = time(0);         LockIdle();         m_idleList.push_front(task);         UnlockIdle();     } }
taskpool.h
[u]复制代码[/u] 代码如下:
#ifndef TASKPOOL_H #define TASKPOOL_H /* purpose @ 任务池,主要是缓冲外部高并发任务数,有manager负责调度任务  *          任务池可自动销毁长时间空闲的Task对象  *          可通过CHECK_IDLE_TASK_INTERVAL设置检查idle空闲进程轮训等待时间  *          TASK_DESTROY_INTERVAL 设置Task空闲时间,超过这个时间值将会被CheckIdleTask线程销毁  * date    @ 2013.12.23  * author  @ haibin.wang  */ #include <list> #include <pthread.h> #include "commondef.h" //所有的用户操作为一个task, typedef void (*task_fun)(void *); struct Task {     task_fun fun; //任务处理函数     void* data; //任务处理数据     time_t last_time; //加入空闲队列的时间,用于自动销毁 }; //任务池,所有任务会投递到任务池中,管理线程负责将任务投递给线程池 class TaskPool { public:  /* pur @ 初始化任务池,启动任务池空闲队列自动销毁线程      * para @ maxSize 最大任务数,大于0     */     TaskPool(const int & poolMaxSize);     ~TaskPool();     /* pur @ 添加任务到任务队列的尾部      * para @ task, 具体任务      * return @ 0 添加成功,负数 添加失败     */        int AddTask(task_fun fun, void* arg);     /* pur @ 从任务列表的头获取一个任务      * return @  如果列表中有任务则返回一个Task指针,否则返回一个NULL     */        Task* GetTask();     /* pur @ 保存空闲任务到空闲队列中      * para @ task 已被调用执行的任务      * return @     */     void SaveIdleTask(Task*task);     void StopPool(); public:     void LockIdle();     void UnlockIdle();     void CheckIdleWait();     int RemoveIdleTask();     bool GetStop(); private:     static void * CheckIdleTask(void *);     /* pur @ 获取空闲的task      * para @      * para @      * return @ NULL说明没有空闲的,否则从m_idleList中获取一个     */     Task* GetIdleTask();     int GetTaskSize(); private:     int m_poolSize; //任务池大小     int m_taskListSize; // 统计taskList的大小,因为当List的大小会随着数量的增多而耗时增加     bool m_bStop; //是否停止     std::list<Task*> m_taskList;//所有待处理任务列表     std::list<Task*> m_idleList;//所有空闲任务列表     pthread_mutex_t m_lock; //对任务列表进行加锁,保证每次只能取一个任务     pthread_mutex_t m_idleMutex; //空闲任务队列锁     pthread_cond_t m_idleCond; //空闲队列等待条件     pthread_t m_idleId;; }; #endif
threadpool.cpp
[u]复制代码[/u] 代码如下:
/* purpose @ 线程池类,负责线程的创建与销毁,实现线程超时自动退出功能(半驻留)  * date    @ 2014.01.03  * author  @ haibin.wang  */ #include "threadpool.h" #include <errno.h> #include <string.h> /* #include <iostream> #include <stdio.h> */ Thread::Thread(bool detach, ThreadPool * pool)     : m_pool(pool) {     pthread_attr_init(&m_attr);     if(detach)     {         pthread_attr_setdetachstate(&m_attr, PTHREAD_CREATE_DETACHED ); // 让线程独立运行     }     else     {          pthread_attr_setdetachstate(&m_attr, PTHREAD_CREATE_JOINABLE );     }     pthread_mutex_init(&m_mutex, NULL); //初始化互斥量     pthread_cond_init(&m_cond, NULL); //初始化条件变量     task.fun = 0;     task.data = NULL; } Thread::~Thread() {     pthread_cond_destroy(&m_cond);     pthread_mutex_destroy(&m_mutex);     pthread_attr_destroy(&m_attr); }     ThreadPool::ThreadPool()     : m_poolMax(0)     , m_idleNum(0)     , m_totalNum(0)       , m_bStop(false) {     pthread_mutex_init(&m_mutex, NULL);     pthread_mutex_init(&m_runMutex,NULL);     pthread_mutex_init(&m_terminalMutex, NULL);     pthread_cond_init(&m_terminalCond, NULL);     pthread_cond_init(&m_emptyCond, NULL); } ThreadPool::~ThreadPool() {     /*if(!m_threads.empty())     {         std::list<Thread*>::iterator it = m_threads.begin();         for(; it != m_threads.end(); ++it)         {             if(*it != NULL)             {                 pthread_cond_destroy( &((*it)->m_cond) );                 pthread_mutex_destroy( &((*it)->m_mutex) );                 delete *it;                 *it = NULL;             }         }         m_threads.clear();     }*/     pthread_mutex_destroy(&m_runMutex);     pthread_mutex_destroy(&m_terminalMutex);     pthread_mutex_destroy(&m_mutex);     pthread_cond_destroy(&m_terminalCond);     pthread_cond_destroy(&m_emptyCond); } int ThreadPool::InitPool(const int & poolMax, const int & poolPre) {     if(poolMax < poolPre             || poolPre < 0             || poolMax <= 0)     {         return -1;     }     m_poolMax = poolMax;     int iRet = 0;     for(int i=0; i<poolPre; ++i)     {         Thread * thread = CreateThread();         if(NULL == thread)         {             iRet = -2;         }     }     if(iRet < 0)     {          std::list<Thread*>::iterator it = m_threads.begin();         for(; it!= m_threads.end(); ++it)         {             if(NULL != (*it) )             {                 delete *it;                 *it = NULL;             }         }         m_threads.clear();         m_totalNum = 0;     }     return iRet; } void ThreadPool::GetThreadRun(task_fun fun, void* arg) {     //从线程池中获取一个线程     pthread_mutex_lock( &m_mutex);     if(m_threads.empty())     {         pthread_cond_wait(&m_emptyCond,&m_mutex); //阻塞等待有空闲线程     }     Thread * thread = m_threads.front();     m_threads.pop_front();     pthread_mutex_unlock( &m_mutex);     pthread_mutex_lock( &thread->m_mutex );     thread->task.fun = fun;     thread->task.data = arg;            pthread_cond_signal(&thread->m_cond); //触发线程WapperFun循环执行     pthread_mutex_unlock( &thread->m_mutex ); } int ThreadPool::Run(task_fun fun, void * arg) {     pthread_mutex_lock(&m_runMutex); //保证每次只能由一个线程执行     int iRet = 0;     if(m_totalNum <m_poolMax) //     {         if(m_threads.empty() && (NULL == CreateThread()) )         {             iRet = -1;//can not create new thread!         }         else         {             GetThreadRun(fun, arg);         }     }     else     {         GetThreadRun(fun, arg);     }     pthread_mutex_unlock(&m_runMutex);     return iRet; } void ThreadPool::StopPool(bool bStop) {     m_bStop = bStop;     if(bStop)     {         //启动监控所有空闲线程是否退出的线程         Thread thread(false, this);         pthread_create(&thread.m_threadId,&thread.m_attr, ThreadPool::TerminalCheck , &thread); //启动监控所有线程退出线程         //阻塞等待所有空闲线程退出         pthread_join(thread.m_threadId, NULL);     }     /*if(bStop)     {         pthread_mutex_lock(&m_terminalMutex);         //启动监控所有空闲线程是否退出的线程         Thread thread(true, this);         pthread_create(&thread.m_threadId,&thread.m_attr, ThreadPool::TerminalCheck , &thread); //启动监控所有线程退出线程         //阻塞等待所有空闲线程退出         pthread_cond_wait(&m_terminalCond, & m_terminalMutex);         pthread_mutex_unlock(&m_terminalMutex);     }*/ } bool ThreadPool::GetStop() {     return m_bStop; } Thread * ThreadPool::CreateThread() {     Thread * thread = NULL;     thread = new Thread(true, this);     if(NULL != thread)     {         int iret = pthread_create(&thread->m_threadId,&thread->m_attr, ThreadPool::WapperFun , thread); //通过WapperFun将线程加入到空闲队列中         if(0 != iret)         {             delete thread;             thread = NULL;         }     }     return thread; } void * ThreadPool::WapperFun(void*arg) {     Thread * thread = (Thread*)arg;     if(NULL == thread || NULL == thread->m_pool)     {         return NULL;     }     ThreadPool * pool = thread->m_pool;     pool->IncreaseTotalNum();     struct timespec abstime;     memset(&abstime, 0, sizeof(abstime));     while(1)     {         if(0 != thread->task.fun)         {             thread->task.fun(thread->task.data);         }         if( true == pool->GetStop() )          {             break; //确定当前任务执行完毕后再判定是否退出线程         }         pthread_mutex_lock( &thread->m_mutex );         pool->SaveIdleThread(thread); //将线程加入到空闲队列中         abstime.tv_sec = time(0) + THREAD_WAIT_TIME_OUT;         abstime.tv_nsec = 0;         if(ETIMEDOUT  == pthread_cond_timedwait( &thread->m_cond, &thread->m_mutex, &abstime )) //等待线程被唤醒 或超时自动退出         {             pthread_mutex_unlock( &thread->m_mutex );             break;         }         pthread_mutex_unlock( &thread->m_mutex );     }     pool->LockMutex();     pool->DecreaseTotalNum();     if(thread != NULL)     {         pool->RemoveThread(thread);         delete thread;         thread = NULL;     }     pool->UnlockMutex();     return 0; } void ThreadPool::SaveIdleThread(Thread * thread ) {     if(thread)     {         thread->task.fun = 0;         thread->task.data = NULL;         LockMutex();         if(m_threads.empty())         {             pthread_cond_broadcast(&m_emptyCond); //发送不空的信号,告诉run函数线程队列已经不空了         }         m_threads.push_front(thread);         UnlockMutex();     } } int ThreadPool::TotalThreads() {     return m_totalNum; } void ThreadPool::SendSignal() {     LockMutex();     std::list<Thread*>::iterator it = m_threads.begin();     for(; it!= m_threads.end(); ++it)     {         pthread_mutex_lock( &(*it)->m_mutex );         pthread_cond_signal(&((*it)->m_cond));         pthread_mutex_unlock( &(*it)->m_mutex );     }     UnlockMutex(); } void * ThreadPool::TerminalCheck(void* arg) {     Thread * thread = (Thread*)arg;     if(NULL == thread || NULL == thread->m_pool)     {         return NULL;     }     ThreadPool * pool = thread->m_pool;     while((false == pool->GetStop()) || pool->TotalThreads() >0 )     {         pool->SendSignal();         usleep(IDLE_CHECK_POLL_EMPTY);     }     //pool->TerminalCondSignal();     return 0; } void ThreadPool::TerminalCondSignal() {     pthread_cond_signal(&m_terminalCond); } void ThreadPool::RemoveThread(Thread* thread) {     m_threads.remove(thread); } void ThreadPool::LockMutex() {     pthread_mutex_lock( &m_mutex); } void ThreadPool::UnlockMutex() {     pthread_mutex_unlock( &m_mutex ); } void ThreadPool::IncreaseTotalNum() {     LockMutex();     m_totalNum++;     UnlockMutex(); } void ThreadPool::DecreaseTotalNum() {     m_totalNum--; }
threadpool.h
[u]复制代码[/u] 代码如下:
#ifndef THREADPOOL_H #define THREADPOOL_H /* purpose @ 线程池类,负责线程的创建与销毁,实现线程超时自动退出功能(半驻留)a  *          当线程池退出时创建TerminalCheck线程,负责监测线程池所有线程退出  * date    @ 2013.12.23  * author  @ haibin.wang  */ #include <list> #include <string> #include "taskpool.h" //通过threadmanager来控制任务调度进程 //threadpool的TerminalCheck线程负责监测线程池所有线程退出 class ThreadPool; class Thread { public:     Thread(bool detach, ThreadPool * pool);     ~Thread();     pthread_t  m_threadId; //线程id     pthread_mutex_t m_mutex; //互斥锁     pthread_cond_t m_cond; //条件变量     pthread_attr_t m_attr; //线程属性  Task  task; //     ThreadPool * m_pool; //所属线程池 }; //线程池,负责创建线程处理任务,处理完毕后会将线程加入到空闲队列中,从任务池中 class ThreadPool { public:     ThreadPool();     ~ThreadPool();     /* pur @ 初始化线程池      * para @ poolMax 线程池最大线程数      * para @ poolPre 预创建线程数      * return @ 0:成功      *          -1: parameter error, must poolMax > poolPre >=0      *          -2: 创建线程失败     */     int InitPool(const int & poolMax, const int & poolPre);     /* pur @ 执行一个任务      * para @ task 任务指针      * return @ 0任务分配成功,负值 任务分配失败,-1,创建新线程失败     */     int Run(task_fun fun, void* arg);  /* pur @ 设置是否停止线程池工作      * para @ bStop true停止,false不停止     */  void StopPool(bool bStop); public: //此公有函数主要用于静态函数调用     /* pur @ 获取进程池的启停状态      * return @     */     bool GetStop();     void SaveIdleThread(Thread * thread );     void LockMutex();     void UnlockMutex();     void DecreaseTotalNum();     void IncreaseTotalNum();     void RemoveThread(Thread* thread);     void TerminalCondSignal();     int TotalThreads();     void SendSignal(); private:  /* pur @ 创建线程      * return @ 非空 成功,NULL失败,     */  Thread * CreateThread();     /* pur @ 从线程池中获取一个一个线程运行任务      * para @ fun 函数指针      * para @ arg 函数参数      * return @     */     void GetThreadRun(task_fun fun, void* arg);  static void * WapperFun(void*);  static void * TerminalCheck(void*);//循环监测是否所有线程终止线程 private:     int m_poolMax;//线程池最大线程数     int m_idleNum; //空闲线程数     int m_totalNum; //当前线程总数 小于最大线程数   bool m_bStop; //是否停止线程池  pthread_mutex_t m_mutex; //线程列表锁  pthread_mutex_t m_runMutex; //run函数锁     pthread_mutex_t m_terminalMutex; //终止所有线程互斥量     pthread_cond_t  m_terminalCond; //终止所有线程条件变量     pthread_cond_t  m_emptyCond; //空闲线程不空条件变量     std::list<Thread*> m_threads; // 线程列表 }; #endif
threadpoolmanager.cpp
[u]复制代码[/u] 代码如下:
#include "threadpoolmanager.h" #include "threadpool.h" #include "taskpool.h" #include <errno.h> #include <string.h> /*#include <string.h> #include <sys/time.h> #include <stdio.h>*/  //   struct timeval time_beg, time_end; ThreadPoolManager::ThreadPoolManager()     : m_threadPool(NULL)     , m_taskPool(NULL)     , m_bStop(false) {     pthread_mutex_init(&m_mutex_task,NULL);     pthread_cond_init(&m_cond_task, NULL);    /* memset(&time_beg, 0, sizeof(struct timeval));     memset(&time_end, 0, sizeof(struct timeval));     gettimeofday(&time_beg, NULL);*/ } ThreadPoolManager::~ThreadPoolManager() {     StopAll();     if(NULL != m_threadPool)     {         delete m_threadPool;         m_threadPool = NULL;     }     if(NULL != m_taskPool)     {         delete m_taskPool;         m_taskPool = NULL;     }     pthread_cond_destroy( &m_cond_task);     pthread_mutex_destroy( &m_mutex_task );     /*gettimeofday(&time_end, NULL);     long total = (time_end.tv_sec - time_beg.tv_sec)*1000000 + (time_end.tv_usec - time_beg.tv_usec);     printf("manager total time = %d\n", total);     gettimeofday(&time_beg, NULL);*/ } int ThreadPoolManager::Init(         const int &tastPoolSize,         const int &threadPoolMax,         const int &threadPoolPre) {     m_threadPool = new ThreadPool();     if(NULL == m_threadPool)     {         return -1;     }     m_taskPool = new TaskPool(tastPoolSize);     if(NULL == m_taskPool)     {         return -2;     }     if(0>m_threadPool->InitPool(threadPoolMax, threadPoolPre))     {         return -3;     }     //启动线程池     //启动任务池     //启动任务获取线程,从任务池中不断拿任务到线程池中     pthread_attr_t attr;     pthread_attr_init( &attr );     pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE );     pthread_create(&m_taskThreadId, &attr, TaskThread, this); //创建获取任务进程     pthread_attr_destroy(&attr);     return 0; } void ThreadPoolManager::StopAll() {     m_bStop = true;     LockTask();     pthread_cond_signal(&m_cond_task);     UnlockTask();     pthread_join(m_taskThreadId, NULL);     //等待当前所有任务执行完毕     m_taskPool->StopPool();     m_threadPool->StopPool(true); // 停止线程池工作 } void ThreadPoolManager::LockTask() {     pthread_mutex_lock(&m_mutex_task); } void ThreadPoolManager::UnlockTask() {     pthread_mutex_unlock(&m_mutex_task); } void* ThreadPoolManager::TaskThread(void* arg) {     ThreadPoolManager * manager = (ThreadPoolManager*)arg;     while(1)     {         manager->LockTask(); //防止任务没有执行完毕发送了停止信号         while(1) //将任务队列中的任务执行完再退出         {             Task * task = manager->GetTaskPool()->GetTask();             if(NULL == task)             {                 break;             }             else             {                 manager->GetThreadPool()->Run(task->fun, task->data);                 manager->GetTaskPool()->SaveIdleTask(task);             }         }         if(manager->GetStop())         {             manager->UnlockTask();             break;         }         manager->TaskCondWait(); //等待有任务的时候执行         manager->UnlockTask();     }     return 0; } ThreadPool * ThreadPoolManager::GetThreadPool() {     return m_threadPool; } TaskPool * ThreadPoolManager::GetTaskPool() {     return m_taskPool; } int  ThreadPoolManager::Run(task_fun fun,void* arg) {     if(0 == fun)     {         return 0;     }     if(!m_bStop)     {           int iRet =  m_taskPool->AddTask(fun, arg);         if(iRet == 0 && (0 == pthread_mutex_trylock(&m_mutex_task)) )         {             pthread_cond_signal(&m_cond_task);             UnlockTask();         }         return iRet;     }     else     {         return -3;     } } bool ThreadPoolManager::GetStop() {     return m_bStop; } void ThreadPoolManager::TaskCondWait() {     struct timespec to;     memset(&to, 0, sizeof to);     to.tv_sec = time(0) + 60;     to.tv_nsec = 0;     pthread_cond_timedwait( &m_cond_task, &m_mutex_task, &to); //60秒超时 }
threadpoolmanager.h
[u]复制代码[/u] 代码如下:
#ifndef THREADPOOLMANAGER_H #define THREADPOOLMANAGER_H /* purpose @  *      基本流程:  *          管理线程池和任务池,先将任务加入任务池,然后由TaskThread负责从任务池中将任务取出放入到线程池中  *      基本功能:  *          1、工作线程可以在业务不忙的时候自动退出部分长时间不使用的线程  *          2、任务池可以在业务不忙的时候自动释放长时间不使用的资源(可通过commondef.h修改)  *          3、当程序退时不再向任务池中添加任务,当任务池中所有任务执行完毕后才退出相关程序(做到程序的安全退出)  *      线程资源:  *          如果不预分配任何处理线程的话,ThreadPool只有当有任务的时候才实际创建需要的线程,最大线程创建数为用户指定  *          当manager销毁的时候,manager会创建一个监控所有任务执行完毕的监控线程,只有当所有任务执行完毕后manager才销毁  *          线程最大数为:1个TaskPool线程 + 1个manager任务调度线程 + ThreadPool最大线程数 + 1个manager退出监控线程 + 1线程池所有线程退出监控线程  *          线程最小数为:1个TaskPool创建空闲任务资源销毁监控线程 + 1个manager创建任务调度线程  *      使用方法:  *          ThreadPoolManager manager;  *          manager.Init(100000, 50, 5);//初始化一个任务池为10000,线程池最大线程数50,预创建5个线程的管理器  *          manager.run(fun, data); //添加执行任务到manager中,fun为函数指针,data为fun需要传入的参数,data可以为NULL  *  * date    @ 2013.12.23  * author  @ haibin.wang  *  *  详细参数控制可以修改commondef.h中的相关变量值  */ #include <pthread.h> typedef void (*task_fun)(void *); class ThreadPool; class TaskPool; class ThreadPoolManager { public:     ThreadPoolManager();     ~ThreadPoolManager();     /* pur @ 初始化线程池与任务池,threadPoolMax > threadPoolPre > threadPoolMin >= 0      * para @ tastPoolSize 任务池大小      * para @ threadPoolMax 线程池最大线程数      * para @ threadPoolPre 预创建线程数      * return @ 0:初始化成功,负数 初始化失败      *          -1:创建线程池失败      *          -2:创建任务池失败      *          -3:线程池初始化失败     */     int Init(const int &tastPoolSize,             const int &threadPoolMax,             const int &threadPoolPre);     /* pur @ 执行一个任务      * para @ fun 需要执行的函数指针      * para @ arg fun需要的参数,默认为NULL      * return @ 0 任务分配成功,负数 任务分配失败      *          -1:任务池满      *          -2:任务池new失败      *          -3:manager已经发送停止信号,不再接收新任务     */     int Run(task_fun fun,void* arg=NULL); public: //以下public函数主要用于静态函数调用     bool GetStop();     void TaskCondWait();     TaskPool * GetTaskPool();     ThreadPool * GetThreadPool();     void LockTask();     void UnlockTask();     void LockFull(); private:  static void * TaskThread(void*); //任务处理线程  void StopAll(); private:     ThreadPool *m_threadPool; //线程池     TaskPool * m_taskPool; //任务池     bool m_bStop; // 是否终止管理器     pthread_t m_taskThreadId; // TaskThread线程id  pthread_mutex_t m_mutex_task;     pthread_cond_t m_cond_task; }; #endif
main.cpp
[u]复制代码[/u] 代码如下:
#include <iostream> #include <string> #include "threadpoolmanager.h" #include <sys/time.h> #include <string.h> #include <stdlib.h> #include <pthread.h> using namespace std; int seq = 0; int billNum =0; int inter = 1; pthread_mutex_t m_mutex; void myFunc(void*arg) {     pthread_mutex_lock(&m_mutex);     seq++;     if(seq%inter == 0 )     {         cout << "fun 1=" << seq << endl;     }     if(seq>=1000000000)     {         cout << "billion" << endl;         seq = 0;         billNum++;     }     pthread_mutex_unlock(&m_mutex);     //sleep(); } int main(int argc, char** argv) {     if(argc != 6)     {         cout << "必须有5个参数 任务执行次数 任务池大小 线程池大小 预创建线程数 输出间隔" << endl;         cout << "eg: ./test 999999 10000 100 10 20" << endl;         cout << "上例代表创建一个间隔20个任务输出,任务池大小为10000,线程池大小为100,预创建10个线程,执行任务次数为:999999" << endl;         return 0;     }     double loopSize = atof(argv[1]);     int taskSize = atoi(argv[2]);     int threadPoolSize = atoi(argv[3]);     int preSize = atoi(argv[4]);     inter = atoi(argv[5]);     pthread_mutex_init(&m_mutex,NULL);     ThreadPoolManager manager;     if(0>manager.Init(taskSize,  threadPoolSize, preSize))     {         cout << "初始化失败" << endl;         return 0;     }     cout << "*******************初始化完成*********************" << endl;     struct timeval time_beg, time_end;     memset(&time_beg, 0, sizeof(struct timeval));     memset(&time_end, 0, sizeof(struct timeval));     gettimeofday(&time_beg, NULL);     double i=0;     for(; i<loopSize; ++i)     {         while(0>manager.Run(myFunc,NULL))         {             usleep(100);         }     }     gettimeofday(&time_end, NULL);     long total = (time_end.tv_sec - time_beg.tv_sec)*1000000 + (time_end.tv_usec - time_beg.tv_usec);     cout << "total time =" << total << endl;     cout << "total num =" << i  << " billion num=" << billNum<< endl;     cout << __FILE__ << "将关闭所有线程" << endl;     //pthread_mutex_destroy(&m_mutex);     return 0; }
  • 全部评论(0)
联系客服
客服电话:
400-000-3129
微信版

扫一扫进微信版
返回顶部