源码网商城,靠谱的源码在线交易网站 我的订单 购物车 帮助

源码网商城

基于条件变量的消息队列 说明介绍

  • 时间:2020-07-19 09:30 编辑: 来源: 阅读:
  • 扫一扫,手机访问
摘要:基于条件变量的消息队列 说明介绍
条件变量是线程之前同步的另一种机制。条件变量给多线程提供了一种会和的场所。当条件变量和互斥锁一起使用时,允许线程以无竞争的方式等待特定的条件发生。这样大大减少了锁竞争引起的线程调度和线程等待。      消息队列是服务器端开发过程中绕不开的一道坎,前面,我已经实现了一个基于互斥锁和三队列的消息队列,性能很不错。博客园中的其他园主也实现了很多基于环形队列和lock-free的消息队列,很不错,今天我们将要实现一个基于双缓冲、互斥锁和条件变量的消息队列;这个大概也参考了一下java的blockingqueue,在前面一个博客中有简单介绍!!基于三缓冲的队列,虽然最大限度上解除了线程竞争,但是在玩家很少,消息很小的时候,需要添加一些buff去填充数据,这大概也是其一个缺陷吧!     [b]消息队列在服务器开发过程中主要用于什么对象呢?[/b]      1: 我想大概就是通信层和逻辑层之间的交互,通信层接受到的网络数据,验证封包之后,通过消息队列传递给逻辑层,逻辑层将处理结果封包再传递给通信层!      2:逻辑线程和数据库IO线程的分离;数据库IO线程负责对数据库的读写更新,逻辑层对数据库的操作,封装成消息去请求数据库IO线程,数据库IO线程处理完之后,再交回给逻辑层。      3:日志;处理模式与方式2 类似。不过日志大概是不需要返回的! [b]给出源代码:[/b] [b]BlockingQueue.h文件[/b]
[u]复制代码[/u] 代码如下:
/*  * BlockingQueue.h  *  *  Created on: Apr 19, 2013  *      Author: archy_yu  */ #ifndef BLOCKINGQUEUE_H_ #define BLOCKINGQUEUE_H_ #include <queue> #include <pthread.h> typedef void* CommonItem; class BlockingQueue { public:     BlockingQueue();     virtual ~BlockingQueue();     int peek(CommonItem &item);     int append(CommonItem item); private:     pthread_mutex_t _mutex;     pthread_cond_t _cond;     std::queue<CommonItem> _read_queue;     std::queue<CommonItem> _write_queue; };   #endif /* BLOCKINGQUEUE_H_ */
[b]BlockingQueue.cpp 文件代码 [/b]
[u]复制代码[/u] 代码如下:
/*  * BlockingQueue.cpp  *  *  Created on: Apr 19, 2013  *      Author: archy_yu  */ #include "BlockingQueue.h" BlockingQueue::BlockingQueue() {     pthread_mutex_init(&this->_mutex,NULL);     pthread_cond_init(&this->_cond,NULL); } BlockingQueue::~BlockingQueue() {     pthread_mutex_destroy(&this->_mutex);     pthread_cond_destroy(&this->_cond); } int BlockingQueue::peek(CommonItem &item) {     if( !this->_read_queue.empty() )     {         item = this->_read_queue.front();         this->_read_queue.pop();     }     else     {         pthread_mutex_lock(&this->_mutex);         while(this->_write_queue.empty())         {             pthread_cond_wait(&this->_cond,&this->_mutex);         }         while(!this->_write_queue.empty())         {             this->_read_queue.push(this->_write_queue.front());             this->_write_queue.pop();         }         pthread_mutex_unlock(&this->_mutex);     }       return 0; } int BlockingQueue::append(CommonItem item) {     pthread_mutex_lock(&this->_mutex);     this->_write_queue.push(item);     pthread_cond_signal(&this->_cond);     pthread_mutex_unlock(&this->_mutex);     return 0; }
[b]测试代码: [/b]
[u]复制代码[/u] 代码如下:
BlockingQueue _queue; void* process(void* arg) {     int i=0;     while(true)     {         int *j = new int();         *j = i;         _queue.append((void *)j);         i ++;     }     return NULL; } int main(int argc,char** argv) {     pthread_t pid;     pthread_create(&pid,0,process,0);     long long int start = get_os_system_time();     int i = 0;     while(true)     {         int* j = NULL;         _queue.peek((void* &)j);         i ++;         if(j != NULL && (*j) == 100000)         {             long long int end = get_os_system_time();             printf("consume %d\n",end - start);             break;         }     }     return 0; }
  • 全部评论(0)
联系客服
客服电话:
400-000-3129
微信版

扫一扫进微信版
返回顶部