linux網絡編程之POSIX 消息隊列 和 系列函數

一、在前面介紹了system v 消息隊列的相關知識,現在來稍微看看posix 消息隊列。

posix消息隊列的一個可能實現如下圖:

linux網絡編程之POSIX 消息隊列 和 系列函數

其實消息隊列就是一個可以讓進程間交換數據的場所,而兩個標準的消息隊列最大的不同可能只是api 函數的不同,如system v 的系列函數是msgxxx,而posix 是mq_xxx。posix 消息隊列也有一些對消息長度等的限制,man 7 mq_overview:

simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ cat /proc/sys/fs/mqueue/msg_max

10

simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ cat /proc/sys/fs/mqueue/msgsize_max

8192

simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ cat /proc/sys/fs/mqueue/queues_max

256

即一個消息隊列最多能有10條消息,每條消息的最大長度為8192字節,一個系統最多能有256個消息隊列。

還有一點是,在Linux上,posix 消息隊列是以虛擬文件系統實現的,必須將其掛載到某個目錄才能看見,如

# mkdir /dev/mqueue

# mount -t mqueue none /dev/mqueue

通過cat 命令查看消息隊列的狀態,假設mymq 是創建的一條消息隊列的名字

$ cat /dev/mqueue/mymq

QSIZE:129 NOTIFY:2 SIGNO:0 NOTIFY_PID:8260

QSIZE:消息隊列所有消息的數據長度

NOTIFY_PID:某個進程使用mq_notify 註冊了消息到達異步通知事件,即此進程的pid

NOTIFY:通知方式: 0 is SIGEV_SIGNAL; 1 is SIGEV_NONE; and 2 is SIGEV_THREAD.

SIGNO:當以信號方式通知的時候,表示信號的編號.

二、系列函數,編譯時候加上 -lrt 選項,即連接librt 庫 (實時庫)

#include <fcntl.h> /* For O_* constants *//<fcntl.h>

#include /* For mode constants */

#include <mqueue.h>

功能:用來創建和訪問一個消息隊列

原型

mqd_t mq_open(const char *name, int oflag);

mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);

參數

name: 某個消息隊列的名字,必須以/打頭,並且後續不能有其它/ ,形如/somename長度不能超過NAME_MAX(255)

oflag:與open函數類似,可以是O_RDONLY、O_WRONLY、O_RDWR,還可以按位或上O_CREAT、O_EXCL、O_NONBLOCK;

mode:如果oflag指定了O_CREAT,需要設置mode。

返回值:成功返回消息隊列文件描述符;失敗返回-1

功能:關閉消息隊列

原型

mqd_t mq_close(mqd_t mqdes);

參數

mqdes : 消息隊列描述符

返回值:成功返回0;失敗返回-1

功能:刪除消息隊列

原型

mqd_t mq_unlink(const char *name);

參數

name: 消息隊列的名字

返回值:成功返回0;失敗返回-1

功能:獲取/設置消息隊列屬性

原型

mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);

mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);

返回值:成功返回0;失敗返回-1

struct mq_attr {

long mq_flags; /* Flags: 0 or O_NONBLOCK */

long mq_maxmsg; /* Max. # of messages on queue */

long mq_msgsize; /* Max. message size (bytes) */

long mq_curmsgs; /* # of messages currently in queue */

};

mq_flags 是標誌;mq_maxmsg 即一個消息隊列消息個數上限;mq_msgsize即一條消息的數據上限;mq_curmsgs即當前消息個數

功能:發送消息

原型

mqd_t mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);

參數

mqdes:消息隊列描述符

msg_ptr:指向消息的指針

msg_len:消息長度

msg_prio:消息優先級

返回值:成功返回0;失敗返回-1

功能:接收消息

原型

ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);

參數

mqdes:消息隊列描述符

msg_ptr:返回接收到的消息

msg_len:消息長度,一般需要指定為msgsize_max

msg_prio:返回接收到的消息優先級

返回值:成功返回接收到的消息字節數;失敗返回-1

注意:返回指定消息隊列中最高優先級的最早消息,優先級最低為0

功能:建立或者刪除消息到達通知事件

原型

mqd_t mq_notify(mqd_t mqdes, const struct sigevent *notification);

參數

mqdes:消息隊列描述符

notification:

非空表示當消息到達且消息隊列先前為空,那麼將得到通知;

NULL表示撤消已註冊的通知

返回值:成功返回0;失敗返回-1

通知方式:

產生一個信號

創建一個線程執行一個指定的函數

union sigval { /* Data passed with notification */

int sival_int;/* Integer value */

void *sival_ptr;/* Pointer value */

};

struct sigevent {

int sigev_notify; /* Notification method */

int sigev_signo; /* Notification signal */

union sigval sigev_value; /* Data passed with notification */

void (*sigev_notify_function) (union sigval);

/* Function for thread notification */

void *sigev_notify_attributes;

/* Thread function attributes */

};

sigev_notify 的取值有3個:

SIGEV_NONE:消息到達不會發出通知

SIGEV_SIGNAL:以信號方式發送通知,當設置此選項時,sigev_signo 設置信號的編號,且只有當信號為實時信號時才可以通過sigev_value順帶數據,參考這裡。

SIGEV_THREAD:以線程方式通知,當設置此選項時,sigev_notify_function 即一個函數指針,sigev_notify_attributes 即線程的屬性

mq_notify 函數注意點:

1、任何時刻只能有一個進程可以被註冊為接收某個給定隊列的通知

2、當有一個消息到達某個先前為空的隊列,而且已有一個進程被註冊為接收該隊列的通知時,只有在沒有任何線程阻塞在該隊列的mq_receive調用的前提下,通知才會發出。

3、當通知被髮送給它的註冊進程時,其註冊被撤消。進程必須再次調用mq_notify以重新註冊(如果需要的話),重新註冊要放在從消息隊列讀出消息之前而不是之後。

下面寫兩個小程序測試一下:

mq_send.c

#include<stdio.h>

#include<stdlib.h>

#include

#include

#include

#include<unistd.h>

#include<errno.h>

#include<mqueue.h>

#include<fcntl.h>

#include

#include<string.h>

#define ERR_EXIT(m) \\

do { \\

perror(m); \\

exit(EXIT_FAILURE); \\

} while(0)

typedef struct stu

{

char name[32];

int age;

} STU;

int main(int argc, char *argv[])

{

if (argc != 2)

{

fprintf(stderr, "Usage: %s <prio>\\n", argv[0]);/<prio>

exit(EXIT_FAILURE);

}

mqd_t mqid;

mqid = mq_open("/abc", O_WRONLY);

if (mqid == (mqd_t) - 1)

ERR_EXIT("mq_open");

printf("mq_open succ\\n");

STU stu;

strcpy(stu.name, "test");

stu.age = 20;

unsigned int prio = atoi(argv[1]);

mq_send(mqid, (const char *)&stu, sizeof(stu), prio);

mq_close(mqid);

return 0;

}

mq_notify.c

#include<stdio.h>

#include<stdlib.h>

#include

#include

#include

#include<unistd.h>

#include<errno.h>

#include<mqueue.h>

#include<fcntl.h>

#include

#include<string.h>

#include<signal.h>

#define ERR_EXIT(m) \\

do { \\

perror(m); \\

exit(EXIT_FAILURE); \\

} while(0)

typedef struct stu

{

char name[32];

int age;

} STU;

size_t size;

mqd_t mqid;

struct sigevent sigev;

void handle(int sig)

{

mq_notify(mqid, &sigev);

STU stu;

unsigned int prio;

mq_receive(mqid, (char *)&stu, size, &prio);

printf("name=%s, age=%d, prio=%d\\n", stu.name, stu.age, prio);

}

int main(int argc, char *argv[])

{

mqid = mq_open("/abc", O_RDONLY);

if (mqid == (mqd_t) - 1)

ERR_EXIT("mq_open");

printf("mq_open succ\\n");

struct mq_attr attr;

mq_getattr(mqid, &attr);

size = attr.mq_msgsize;

signal(SIGUSR1, handle);

sigev.sigev_notify = SIGEV_SIGNAL;

sigev.sigev_signo = SIGUSR1;

mq_notify(mqid, &sigev);

for (; ;)

pause();

mq_close(mqid);

return 0;

}

mq_open.c

#include<stdio.h>

#include<stdlib.h>

#include

#include

#include

#include<unistd.h>

#include<errno.h>

#include<mqueue.h>

#include<fcntl.h>

#include

#define ERR_EXIT(m) \\

do { \\

perror(m); \\

exit(EXIT_FAILURE); \\

} while(0)

int main(void)

{

mqd_t mqid;

mqid = mq_open("/abc", O_CREAT | O_RDWR, 0666, NULL);

if (mqid == (mqd_t) - 1)

ERR_EXIT("mq_open");

mq_close(mqid);

return 0;

}

先運行./mq_open 用mq_open 創建了一個消息隊列並mount 到/dev/mqueue 上,可以查看狀態:

simba@ubuntu:/dev/mqueue$ cat /dev/mqueue/abc

QSIZE:0 NOTIFY:0 SIGNO:0 NOTIFY_PID:0

接著運行./mq_notify 再查看狀態:

simba@ubuntu:/dev/mqueue$ cat /dev/mqueue/abc

QSIZE:0 NOTIFY:0 SIGNO:10 NOTIFY_PID:3572

準備通知的信號是10號,即SIGUSR1,等待的進程pid即./mq_notify ,此時./mq_notify 進程是阻塞的,正在等待其他進程往消息隊列寫入消息,現在運行./mq_send

simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ ./mq_send

Usage: ./mq_send <prio>

simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ ./mq_send 1

mq_open succ

simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ ./mq_send 1

mq_open succ

simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$

回頭看./mq_notify 的輸出:

simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ ./mq_notify

mq_open succ

name=test, age=20, prio=1

name=test, age=20, prio=1

...........................................

即./mq_send 發送的兩條消息都接收到了,需要注意的是雖然通知是一次性的,但我們在消息處理函數再次註冊了通知,故能多次接收到通知,但通知只發生在消息隊列由空到不空的時候,如果先運行./mq_send 先往消息隊列發送了n條消息,那麼執行./mq_notify 是不會接收到通知的,一直阻塞著。

需要C/C++ Linux服務器開發學習資料私信“資料”(資料包括C/C++,Linux,golang技術,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK,ffmpeg等),免費分享


分享到:


相關文章: