源码网商城,靠谱的源码在线交易网站 我的订单 购物车 帮助

源码网商城

实现posix消息队列示例分享

  • 时间:2022-04-09 15:51 编辑: 来源: 阅读:
  • 扫一扫,手机访问
摘要:基于内存映射文件实现 POSIX 消息队列的示例代码分享(包含 mqueue.h、mqueue.c,以及多进程/多线程创建、属性读写、通知注册三个测试程序)
mqueue.h
[u]复制代码[/u] 代码如下:
//
//  mqueue.h
//  UNIX_C
//
//  Created by Zhou Kai on 2014-02-09.
//  Copyright (c) 2014 zk. All rights reserved.
//
//  Public interface of a user-space message queue modelled on the POSIX
//  mq_* API.  A queue descriptor (mqd_t) is a pointer to an opaque
//  struct mq_info; failure of mq_open() is reported as (mqd_t)-1.
//
#ifndef __PS_MQUEUE_H
#define __PS_MQUEUE_H

#include <unistd.h>
#include <sys/types.h>

/* Only pointers to struct sigevent are used here, so a forward declaration
 * is enough; callers that build one need the full definition themselves. */
struct sigevent;

typedef struct mq_info     *mqd_t;
/* NOTE(review): struct mq_attr is not defined in this header; its definition
 * is expected to come from another header included by the user. */
typedef struct mq_attr    mq_attr;

#ifdef __cplusplus
extern "C" {
#endif

    /* Open (and, with O_CREAT, possibly create) the queue backed by `name`.
     * When O_CREAT is set two extra arguments are read: a mode_t mode and a
     * mq_attr *attr (a null pointer selects the default attributes).
     * Returns a descriptor, or (mqd_t)-1 with errno set. */
    mqd_t   mq_open(const char *name, int flag, .../*mode_t mode, struct mq_attr *attr*/);

    /* Release a descriptor obtained from mq_open().  Returns 0 or -1. */
    int     mq_close(mqd_t mqdes);

    /* Remove the file that backs the queue `name`.  Returns 0 or -1. */
    int     mq_unlink(const char *name);

    /* Copy the queue attributes into *attr.  Returns 0 or -1. */
    int     mq_getattr(mqd_t mqdes,mq_attr *attr);

    /* Update the descriptor's O_NONBLOCK flag from attr->mq_flags; when
     * `old` is not null the previous attributes are stored there first.
     * Returns 0 or -1. */
    int     mq_setattr(mqd_t mqdes,const mq_attr *attr,mq_attr *old);

    /* Register (notification != NULL) or cancel (notification == NULL) the
     * calling process as the queue's notification target.
     * Returns 0 or -1 with errno set (EBUSY, EPERM, EBADF, ...). */
    int     mq_notify(mqd_t mqdes,const struct sigevent *notification);

    /* NOTE(review): mq_send()/mq_receive() are declared here but no
     * implementation accompanies this header in the article. */
    int     mq_send(mqd_t mqdes,const char *ptr,size_t len,unsigned int prio);
    int     mq_receive(mqd_t mqdes,char *ptr,size_t len,unsigned int *priop);

    /* Debug helper: dump the queue layout to the diagnostic output. */
    void    mq_info_test(mqd_t mqdes);

#ifdef __cplusplus
}
#endif

#endif
多进程,多线程创建同一个队列测试
[u]复制代码[/u] 代码如下:
#include <wrap_ext.h>
#include <mqueue.h>

/*
 * Thread/process entry point: open (creating if necessary) the queue whose
 * path is passed in `name`, dump its layout, and close it again.
 * Always returns NULL; failures are reported through err_ret().
 */
void *create_mq(void *name){
    const char *path = name;
    mqd_t mq;

    /* The attribute argument travels through "..." and is fetched with
     * va_arg(vp, mq_attr *), so it must be passed as a pointer, not as the
     * int constant 0. */
    mq = mq_open(path, O_CREAT, FILE_MODE, (mq_attr *)NULL);
    if (mq == (mqd_t) -1) {
        err_ret(errno, "mq_open() error");
        return NULL;
    }
    mq_info_test(mq);
    mq_close(mq);
    return NULL;
}

/*
 * Race one child process and two detached threads against each other, all
 * opening the same queue, to exercise the create/open synchronisation.
 */
int main(){
    mq_unlink("/tmp/mqfile");
    if (Fork() == 0) {
        create_mq("/tmp/mqfile");
        exit(0);
    }
    Create_detach_thread(create_mq, "/tmp/mqfile");
    Create_detach_thread(create_mq, "/tmp/mqfile");
    /* Give the detached threads time to finish before the process exits. */
    sleep(50);
    //mq_unlink("/tmp/mqfile");
    return 0;
}
测试结果
[u]复制代码[/u] 代码如下:
create,start create... create,start init... exists,wait get... exists,wait get... create,end init... mq_hdr.mqh_free:116 bytes msghdr size:268 bytesmap file size:3332 bytes next msg offset and msg length: [384,0];[652,0];[920,0];[1188,0];[1456,0]; [1724,0];[1992,0];[2260,0];[2528,0];exists,start get... [2796,0]; [3064,0];[0,0]; end,start get... exists,start get... mq_hdr.mqh_free:116 bytes msghdr size:268 bytesmap file size:3332 bytes next msg offset and msg length: [384,0];[652,0];[920,0];[1188,0];[1456,0]; [1724,0];[1992,0];[2260,0];[2528,0];[2796,0]; [3064,0];[0,0]; end,start get... mq_hdr.mqh_free:116 bytes msghdr size:268 bytesmap file size:3332 bytes next msg offset and msg length: [384,0];[652,0];[920,0];[1188,0];[1456,0]; [1724,0];[1992,0];[2260,0];[2528,0];[2796,0]; [3064,0];[0,0]; Program ended with exit code: 0
属性设置、获取测试
[u]复制代码[/u] 代码如下:
#include <wrap_ext.h>
#include <mqueue.h>

/*
 * Print the four attribute fields of *attr through err_msg().
 * The values are cast so that they match the conversion specifiers whatever
 * integer type the mq_attr fields are declared with.
 */
void print_attr(mq_attr *attr){
    assert(attr);
    err_msg(" mq_attr mq_flag:0x%0x"
            " mq_curmsgs:%ld"
            " mq_msgsize:%ld"
            " mq_maxmsg:%ld"
            ,(unsigned int)attr->mq_flags
            ,(long)attr->mq_curmsgs
            ,(long)attr->mq_msgsize
            ,(long)attr->mq_maxmsg);
}

/*
 * Thread/process entry point for the attribute test.
 * Threads whose id is odd open the queue with O_NONBLOCK and read the
 * attributes; the others open it blocking and then switch the descriptor to
 * O_NONBLOCK with mq_setattr(), printing the previous attributes.
 * `name` is not used: every caller works on "/tmp/mqfile".
 * Always returns NULL.
 */
void *create_mq(void *name){
    pthread_t tid;
    mq_attr attr = {0}, old = {0};
    mqd_t mq;
    int flag;
    int rc;

    (void)name;
    flag = O_CREAT;
    tid = pthread_self();
    if ((long)tid % 2 != 0) {
        flag |= O_NONBLOCK;
    }
    /* The attribute pointer is read with va_arg(vp, mq_attr *), so pass a
     * real null pointer rather than the int constant 0. */
    mq = mq_open("/tmp/mqfile", flag, FILE_MODE, (mq_attr *)NULL);
    if (mq == (mqd_t) -1) {
        err_ret(errno, "mq_open() error");
        return NULL;
    }
    if ((long)tid % 2 == 0) {
        attr.mq_flags = O_NONBLOCK;
        rc = mq_setattr(mq, &attr, &old);
    }
    else {
        rc = mq_getattr(mq, &old);
    }
    /* Only print `old` when the call actually filled it in. */
    if (rc == -1)
        err_ret(errno, "mq_setattr()/mq_getattr() error");
    else
        print_attr(&old);
    //mq_info_test(mq);
    mq_close(mq);
    return NULL;
}

/*
 * Run create_mq() from the main thread and two detached threads, in both the
 * parent and a forked child.
 */
int main(){
    pid_t pid;
    mq_unlink("/tmp/mqfile");
    if ((pid=Fork()) == 0) {
        create_mq("/tmp/mqfile3");
        Create_detach_thread(create_mq, "/tmp/mqfile1");
        Create_detach_thread(create_mq, "/tmp/mqfile2");
        sleep(1);
        exit(0);
    }
    Create_detach_thread(create_mq, "/tmp/mqfile1");
    Create_detach_thread(create_mq, "/tmp/mqfile2");
    create_mq("/tmp/mqfile3");
    wait(0);
    /* Let the detached threads finish before the process exits. */
    sleep(5);
    //mq_unlink("/tmp/mqfile");
    return 0;
}
测试注册通知规则
[u]复制代码[/u] 代码如下:
#include <wrap_ext.h>
#include <mqueue.h>

/*
 * Exercise the mq_notify() registration rules:
 *   1. the child registers itself;
 *   2. while the child is alive the parent tries to cancel and to register
 *      (both are expected to be refused by mq_notify());
 *   3. after the child has exited the parent tries to register again.
 * Init_wait/Tell_*/Wait_*/End_wait are the parent/child synchronisation
 * helpers supplied by wrap_ext.h.
 */
int main(){
    pid_t pid;
    Init_wait();
    mqd_t mq;
    /* mq_notify() copies the whole structure into the queue header, so it
     * must not contain indeterminate bytes. */
    sigevent_t sige = {0};

    mq_unlink("/tmp/mqfile");
    /* The attribute pointer is read with va_arg(vp, mq_attr *): pass a real
     * null pointer, not the int constant 0. */
    mq = mq_open("/tmp/mqfile", O_CREAT, FILE_MODE, (mq_attr *)NULL);
    /* Check immediately, before any other call can overwrite errno. */
    if (mq == (mqd_t) -1) {
        err_sys(errno, "mq_open() error");
    }
    Signal(SIGCHLD, SIG_DFL);
    if ((pid=Fork()) == 0) {
        if (mq_notify(mq, &sige) == -1)
            err_ret(errno, "mq_notify() error");
        Tell_parent();
        Wait_parent();
        End_wait();
        sleep(1);
        exit(0);
    }
    Wait_child();
    /* The child is registered: check whether cancelling and registering
     * from another process are possible. */
    if (mq_notify(mq, NULL) == -1)
        err_ret(errno, "mq_notify() error");
    if (mq_notify(mq, &sige) == -1)
        err_ret(errno, "mq_notify() error");
    Tell_child(pid);
    End_wait();
    wait(0);
    sleep(1);
    /* The child has exited: check whether registering is possible now. */
    if (mq_notify(mq, &sige) == -1)
        err_ret(errno, "mq_notify() error");
    //mq_unlink("/tmp/mqfile");
    return 0;
}
mqueue.c
[u]复制代码[/u] 代码如下:
// //  File.c //  UNIX_C // //  Created by 周凯 on 14-2-9. //  Copyright (c) 2014年 zk. All rights reserved. // #include "mqueue.h" #include <wrap_ext.h> #if !defined(_LINUX_) #define va_mode_t   int #else #define va_mode_t   mode_t #endif typedef struct mq_info  mq_info; typedef struct mq_hdr   mq_hdr; //typedef struct mq_attr  mq_attr; typedef struct mq_msg   mq_msg; struct mq_hdr{     mq_attr mqh_attr;     long    mqh_head;     long    mqh_free;     pthread_cond_t  mqh_conn;     pthread_mutex_t mqh_mutex;     sigevent_t      mqh_sigevent;     pid_t   mqh_pid; }; struct mq_msg{     long    msg_next;/*从映射内存的地址起,到下一个消息的偏移值*/     ssize_t msg_len;     int     msg_prio; }; struct mq_info{     mq_hdr *mqi_hdr;     long long   mqi_magic;     int     mqi_flag; }; #define MQ_MAXMSG   12 #define MQ_MSGSIZE  256 #define MQ_MAGIC    0x9235167840 /*  防止以下情况:     一个进程或线程以创建模式打开一个队列,     随后CPU切换当前进程或线程到另一个正     在打开此前创建的队列,但是该队列并未     初始化完毕,故使用一个记录锁加一个线     程锁,进行同步。  注:     该实现不是异步调用安全,即不能在信号处理函数中调用队列打开(创建)函数  */ #define MQ_LOCK_FILE    "/tmp/mq_lock_file" static struct mq_attr def_attr = {0,MQ_MAXMSG,MQ_MSGSIZE,0}; static pthread_once_t __mq_once = PTHREAD_ONCE_INIT; static pthread_mutex_t __mq_lock; static pthread_key_t __mq_key; static void __mq_once_init(); static int  __mq_get_filelock(); static void *__mq_mmap_file(int fd,mq_attr *attr); static int  __mq_init_mmap(void *ptr,mq_attr *attr); static void __mq_unmap(const char *name,void *ptr);   static void __mq_once_init(){     pthread_mutexattr_t mattr;     Pthread_mutexattr_init(&mattr);     Pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_RECURSIVE);     Pthread_mutex_init(&__mq_lock, &mattr);     Pthread_mutexattr_destroy(&mattr);     Pthread_key_create(&__mq_key, 0); } static int  __mq_get_filelock(){     int fd,tmp;     Pthread_mutex_lock(&__mq_lock);     if ((fd = (int)Pthread_getspecific(__mq_key)) == 0) {         fd = open(MQ_LOCK_FILE, O_CREAT | O_EXCL | O_WRONLY, FILE_MODE);         if (fd == -1 && errno != EEXIST)    
         err_sys(errno, "mq_open(),__mq_get_filelock() error");         else             fd =Open(MQ_LOCK_FILE, O_WRONLY, 0);         if (fd == 0) {             tmp = Open(MQ_LOCK_FILE, O_WRONLY, 0);             close(fd);             fd = tmp;         }         Pthread_setspecific(__mq_key, (void*)fd);     }     Pthread_mutex_unlock(&__mq_lock);     return fd; } static void *__mq_mmap_file(int fd,mq_attr *attr){     size_t filesize;     void *ptr;     if (attr == 0) {         attr = &def_attr;     }     if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {         errno = EINVAL;         return MAP_FAILED;     }     filesize = sizeof(mq_hdr)+(sizeof(mq_msg)+ALIGN_VAL(attr->mq_msgsize, sizeof(long)))*attr->mq_maxmsg;         if(lseek(fd, filesize - 1, SEEK_SET)<0)         return MAP_FAILED;     if(write(fd,"",1)!=1)         return MAP_FAILED;     ptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);     return ptr; } static void __mq_unmap(const char *name,void *ptr){     size_t filesize;     stat_t fstat;     assert(name);         if (stat(name, &fstat) == -1) {         return;     }     filesize = (size_t)fstat.st_size;     unlink(name);     if (ptr == MAP_FAILED) {         return;     }     munmap(ptr, filesize);     return; } static int  __mq_init_mmap(void *ptr,mq_attr *attr){     char *tmp;     size_t index,i;     int flag;     mq_hdr *mqhdr;     mq_msg *mqmsg;     pthread_condattr_t cattr;     pthread_mutexattr_t mattr;     assert(ptr);     if (attr == 0) {         attr = &def_attr;     }     if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {         errno = EINVAL;         return -1;     }     tmp = ptr;     mqhdr = (mq_hdr*)tmp;     mqhdr->mqh_attr.mq_flags = 0;     mqhdr->mqh_attr.mq_curmsgs = 0;     mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;     mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;     flag = pthread_condattr_init(&cattr);     if (flag) {         errno = flag;         return -1;     }     flag = 
pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);     if (flag) {         errno = flag;         return -1;     }     flag = pthread_cond_init(&mqhdr->mqh_conn, &cattr);     if (flag) {         errno = flag;         return -1;     }     flag = pthread_condattr_destroy(&cattr);     if (flag) {         errno = flag;         return -1;     }     flag = pthread_mutexattr_init(&mattr);     if (flag) {         errno = flag;         return -1;     }     flag = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);     if (flag) {         errno = flag;         return -1;     }     flag = pthread_mutex_init(&mqhdr->mqh_mutex, &mattr);     if (flag) {         errno = flag;         return -1;     }     flag = pthread_mutexattr_destroy(&mattr);     if (flag) {         errno = flag;         return -1;     }     index = mqhdr->mqh_free = sizeof(mq_hdr);     mqmsg = (mq_msg*)(tmp+index);     for (i = 0; i < attr->mq_maxmsg - 1; i++) {         mqmsg->msg_next = sizeof(mq_msg) + ALIGN_VAL(attr->mq_msgsize, sizeof(long)) + index;         index = mqmsg->msg_next;         mqmsg ++;         //mqmsg = (mq_msg*)(tmp+index);     }     mqmsg->msg_next = 0;     return 0; }   mqd_t   mq_open(const char *name,int flag,...){     int fd, nonblock, lockfile_fd, err;     void *ptr;     mq_attr *mqattr;     mqd_t mqdesc;     stat_t filestat;     debug_assert("Invalid pointer", "mq_open()", name);     Pthread_once(&__mq_once, __mq_once_init);     nonblock = flag & O_NONBLOCK;     mqattr = NULL;     mqdesc = NULL;     ptr = MAP_FAILED; __again:     if (flag & O_CREAT) {         va_list vp;         mode_t mode;         /*分析可变参数*/         va_start(vp, flag);         mode = va_arg(vp, va_mode_t);         mqattr = va_arg(vp, mq_attr *);         va_end(vp);         Pthread_mutex_lock(&__mq_lock);         lockfile_fd = __mq_get_filelock();         write_lock_wait(lockfile_fd, SEEK_SET, 0, 0);         fd = open(name, flag | O_CREAT | O_EXCL | O_RDWR, mode);         if (fd < 0) {            
 /*如果指定了O_EXCL,并且文件已存在,则等待其他进程或线程完成初始化*/             if (errno == EEXIST && (flag & O_EXCL) == 1) {                 return (mqd_t)-1;             }             goto __exists_wait_init;         }         /*初始化内存映射文件*/         err_msg("create,start init...");         /*初始化映射文件大小(注意必须使文件长度达到映射的大小),且映射文件到内存*/         ptr = __mq_mmap_file(fd, mqattr);         //sleep(1);         if (ptr == MAP_FAILED) {             goto __err;         }         /*初始化映射内存的内容*/         if (__mq_init_mmap(ptr, mqattr) < 0) {             goto __err;         }         mqdesc = (mqd_t)calloc(1, sizeof(mq_hdr));         if (mqdesc == 0) {             goto __err;         }         mqdesc->mqi_hdr = (mq_hdr*)ptr;         mqdesc->mqi_flag = nonblock;         mqdesc->mqi_magic = MQ_MAGIC;         err_msg("create,end init...");         file_unlock(lockfile_fd, SEEK_SET, 0, 0);         Pthread_mutex_unlock(&__mq_lock);         return mqdesc;     } __exists_wait_init:     fd = open(name, O_RDWR, 0);     if (fd < 0 ) {         if (errno == ENOENT && (flag & O_CREAT)) {             goto __again;         }         goto __err;     }     err_msg("exists,start get...");     if (stat(name, &filestat) == -1) {         if (errno == ENOENT && (flag & O_CREAT)) {             goto __again;         }         goto __err;     }     ptr = mmap(0, (size_t)filestat.st_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);     if (ptr == MAP_FAILED) {         goto __err;     }     mqdesc = (mqd_t)calloc(1, sizeof(mq_hdr));     if (mqdesc == 0) {         goto __err;     }     mqdesc->mqi_hdr = (mq_hdr*)ptr;     mqdesc->mqi_flag = nonblock;     mqdesc->mqi_magic = MQ_MAGIC;     close(fd);     file_unlock(lockfile_fd, SEEK_SET, 0, 0);     Pthread_mutex_unlock(&__mq_lock);     err_msg("end,start get...");     return mqdesc; __err:     file_unlock(lockfile_fd, SEEK_SET, 0, 0);     Pthread_mutex_unlock(&__mq_lock);     err = errno;     __mq_unmap(name, ptr);     close(fd);     if (mqdesc)         free(mqdesc);     errno = err;     
return (mqd_t)-1; } int     mq_close(mqd_t mqdes){     size_t filesize;     mq_attr *mattr;     int flag;     assert(mqdes);     if (mqdes->mqi_magic != MQ_MAGIC) {         errno = EBADF;         return -1;     }     mattr = &mqdes->mqi_hdr->mqh_attr;     filesize = mattr->mq_maxmsg * (sizeof(mq_msg)* ALIGN_VAL(mattr->mq_msgsize, sizeof(long))) + sizeof(mq_hdr);     flag = munmap((void*)mqdes->mqi_hdr, filesize);     mqdes->mqi_magic = 0;     free(mqdes);     return flag; } int     mq_unlink(char const *name){     assert(name);     return unlink(name); } int     mq_getattr(mqd_t mqdes,mq_attr *attr){     int flag;     mq_attr *tmp;     assert(mqdes);     assert(attr);     if (mqdes->mqi_magic != MQ_MAGIC) {         errno = EBADF;         return -1;     }     tmp = &mqdes->mqi_hdr->mqh_attr;     /*防止其他进程或线程在改变属性值*/     flag = pthread_mutex_lock(&mqdes->mqi_hdr->mqh_mutex);     if (flag > 0) {         errno = flag;         return -1;     }     bcopy(&mqdes->mqi_hdr->mqh_attr, attr, sizeof(mq_attr));     attr->mq_flags = mqdes->mqi_flag;     flag = pthread_mutex_unlock(&mqdes->mqi_hdr->mqh_mutex);     if (flag > 0) {         errno = flag;         return -1;     }     return 0; } int     mq_setattr(mqd_t mqdes,const mq_attr *attr,mq_attr *old){     int flag;     mq_attr *tmp;     assert(mqdes);     assert(attr);     if (mqdes->mqi_magic != MQ_MAGIC) {         errno = EBADF;         return -1;     }     tmp = &mqdes->mqi_hdr->mqh_attr;     /*防止其他进程或线程在读取属性值*/     flag = pthread_mutex_lock(&mqdes->mqi_hdr->mqh_mutex);     if (flag > 0) {         errno = flag;         return -1;     }     if (old != NULL) {         bcopy(&mqdes->mqi_hdr->mqh_attr, old, sizeof(mq_attr));         old->mq_flags = mqdes->mqi_flag;     }     /*创建后,只有文件标识可以改变*/     //bcopy(attr, &mqdes->mqi_hdr->mqh_attr, sizeof(mq_attr));     /*只有O_NONBLOCK标志可以存储*/     if (attr->mq_flags & O_NONBLOCK) {         mqdes->mqi_flag |= O_NONBLOCK;     }     else {         mqdes->mqi_flag &= ~O_NONBLOCK;     }     
flag = pthread_mutex_unlock(&mqdes->mqi_hdr->mqh_mutex);     if (flag > 0) {         errno = flag;         return -1;     }     return 0; } int     mq_notify(mqd_t mqdes,const struct sigevent *notification){     sigevent_t *old;     pid_t pid;     int flag;     assert(mqdes);     if (mqdes->mqi_magic != MQ_MAGIC) {         errno = EBADF;         return -1;     }     flag = pthread_mutex_lock(&mqdes->mqi_hdr->mqh_mutex);     if (flag > 0) {         errno = flag;         return -1;     }     pid = mqdes->mqi_hdr->mqh_pid;     /*已设置*/     if (pid != 0) {         /*发送一个0信号给注册的进程,如果能发送,或者不能发送但不是返回没有进程的错误(可能权限不够),则不能再次注册通知*/         /*有效进程*/         if (kill(pid, 0) != -1 || errno != ESRCH) {             if (notification == 0) {                 if (pid != getpid()) {                     errno = EPERM;                     flag = -1;                 }                 else {                     mqdes->mqi_hdr->mqh_pid = 0;                     flag = 0;                 }             }             else {                 errno = EBUSY;                 flag = -1;             }             goto __return;         }         /*无效进程*/     }     /*未设置*/     if (notification != 0) {         mqdes->mqi_hdr->mqh_pid = getpid();         old = &mqdes->mqi_hdr->mqh_sigevent;         bcopy(notification, old, sizeof(sigevent_t));     }     flag = 0; __return:     pthread_mutex_unlock(&mqdes->mqi_hdr->mqh_mutex);     return flag; } void    mq_info_test(mqd_t mqdes){     size_t i,msgsize,index;     mq_msg *msg;     mq_attr *mattr;     assert(mqdes);     mattr = &mqdes->mqi_hdr->mqh_attr;     msgsize = sizeof(mq_msg) + ALIGN_VAL(mattr->mq_msgsize, sizeof(long));     index = mqdes->mqi_hdr->mqh_free;     err_msg("mq_hdr.mqh_free:%ld bytes\n"             "msghdr size:%u bytes"             "map file size:%u bytes"             , index             , msgsize             , mattr->mq_maxmsg * msgsize + index);     err_msg("next msg offset and msg length:");     msg = 
(mq_msg*)&((char*)mqdes->mqi_hdr)[index];     for (i = 0; i < mattr->mq_maxmsg; i++) {         fprintf(stderr, "[%ld,%ld];", msg->msg_next, msg->msg_len);         if ((i+1)%5 == 0) {             fprintf(stderr,"\n");         }         msg ++ ;     }     if ((i+1)%5 != 0) {         fprintf(stderr,"\n");     }     return; }
  • 全部评论(0)
联系客服
客服电话:
400-000-3129
微信版

扫一扫进微信版
返回顶部