[Previous]
[Contents]
[Next]

POSIX.4 Message Queues

This chapter discusses:

Introduction

POSIX message queues allow for an efficient, priority-driven IPC mechanism with multiple readers and writers. For the experienced POSIX programmer, this description invokes an image of named pipes. However, there are some fundamental differences between pipes and message queues:

Of course, message queues in the QNX implementation still have all of the advantages of named pipes (FIFOs in QNX):

For more information, see "POSIX.4 Message Queues" in the Better Coordination: Messages, Shared Memory, and Synchronization chapter of Programming for the Real World - POSIX.4 by Bill O. Gallmeister (O'Reilly & Associates, Inc. ISBN: 1-56592-074-0).

Message-queue structures

The message-queue structures are found in the <mqueue.h> header file.

mqd_t

Like pipes and FIFOs, all message queue operations are performed based on message queue descriptors (an mqd_t). In the QNX implementation, an mqd_t is a file descriptor. Thus, in addition to the POSIX message queue API, the programmer may call almost any I/O routine that takes a file descriptor.

mq_attr

Every message queue has an attribute structure associated with it. This structure includes:

long mq_flags
The options set for this queue. This field may have been changed since queue creation with a call to mq_setattr() (see below for a description of each bit).
long mq_maxmsg
The maximum number of messages that can be stored on the queue. In the QNX implementation, an mq_maxmsg of 0 means that there is no maximum. This value was set when the queue was created.
long mq_msgsize
The maximum size of each message on the given message queue. This value was also set when the queue was created.
long mq_curmsgs
This field represents the number of messages currently on the given queue.
long mq_sendwait
The number of processes currently waiting to send a message. This field was eliminated from the POSIX standard after draft 9, but has been resurrected for the QNX implementation. A nonzero value in this field implies that the queue is full.
long mq_recvwait
The number of processes currently waiting to receive a message. Like mq_sendwait, this field was resurrected for the QNX implementation. A nonzero value in this field implies that the queue is empty.

Flags

The mq_flags field is the bitwise OR of zero or more of the constants described below.

The following constant is specified by POSIX 1003.4:

MQ_NONBLOCK
No mq_receive() or mq_send() will ever block on this queue. If the queue is in such a condition that the given operation cannot be performed without blocking, then an error is returned, and errno is set to EAGAIN.

The following constants are unique to the QNX implementation:

MQ_MULT_NOTIFY
When this flag is set, more than one process may be registered for notification; a call to mq_notify() never returns an EBUSY error. If multiple processes are registered for notification, then they are notified on a first come, first served basis. This flag may only be set on; it may never be set off. Any attempt to set this flag off once it is on causes errno to be set to EINVAL.
MQ_NOTIFY_ALWAYS
When this flag is set, the mq_notify() call causes the process to be notified, even if the queue currently contains messages.
MQ_PRIO_RESTRICT
The POSIX 1003.4 standard allows any process to send a message of arbitrary priority. This may allow low-priority processes to wield excessive power. If this flag is set, then no process may use mq_send() to send a message with a priority higher than its own. An attempt to do so causes errno to be set to EINVAL.
MQ_PRIO_BOOST
By default, any write() to a message queue sends a message with priority 0. If this option is in effect, then any time a zero-priority message is written, its priority is boosted to that of the calling process. This option only affects messages with a priority of 0.
MQ_READBUF_DYNAMIC
POSIX mandates that an mq_receive() performed on a queue with a buffer size less than that queues maximum message size must fail. However, it is very possible that a process could have a receive buffer of significantly smaller size than the maximum, and still never miss data. If this flag is set, processes may make calls to mq_receive() with arbitrary buffer sizes. The call only fails if a message that has a size larger than the buffer allotted is about to be received. In this case, an error is returned, and errno is set to EINVAL.

Functions for message queues

The POSIX message queue API is as follows:

Function Summary
mq_close() close a message queue
mq_getattr() get the current attributes of a message queue
mq_notify() notify the calling process when the queue becomes nonempty
mq_open() open or create a message queue
mq_receive() receive a message from a queue
mq_send() put a message into a message queue
mq_setattr() set the flags for a message queue
mq_unlink() unlink (i.e. delete) a message queue

In addition, the QNX implementation allows for the following standard I/O and file manipulation calls to be used on message queues:

Function Summary
access() check to see if the given queue exists and can be accessed
chmod() change the permissions for a message queue
chown() change the owner user ID and group ID of the specified message queue
close() close a message queue
creat() create a message queue
dup() duplicate a file descriptor, getting an unused descriptor number
dup2() duplicate a file descriptor, supplying a new descriptor number
fchmod() change the permissions for a message queue
fchown() change the user ID and group ID of a message queue
fstat() get message queue status
lstat() get message queue status
open() open a message queue
read() read (receive) from the given message queue
readv() read several messages and places them into several buffers
select() allow for synchronous multiplexing with standard I/O
stat() get status for a message queue - see below
unlink() remove the given message queue
utime() set the modification time for a message queue
write() write (send) to the given queue

Note: For stat(), most fields in the stat structure are filled in as would be expected, with a few exceptions:
  • The st_size field contains the number of messages currently on the queue.
  • The st_ino field contains the capacity of the message queue.

Compiling, linking and executing

To compile, a program must include the <mqueue.h> header file. This header file is found in the /usr/include directory.

In order to use any of the API specific to message queues, a program must be linked with the mqueue library in the /usr/lib directory. This library file contains the client-side stubs for these calls. Add the following to your compile command so that the compiler can find the libraries:

-l mqueue

For a program using message queues to run, the message queue server must first be running. This can be started by typing:

Mqueue &

If you wish to connect to an Mqueue server on a different node, set the MQ_NODE environment variable accordingly. The mq_open() stub checks this variable to determine on which node to seek out the server. For more information, see the QNX Utilities Reference.

Some sample source

Heres a little test program that fully demonstrates the POSIX message queue API. Be sure to see the descriptions of each of the calls to understand fully what's going on:

#include <mqueue.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

#define MSG_SIZE       4096

// This handler will be called when the queue 
// becomes non-empty.

void handler (int sig_num) {
    printf ("Received sig %d.\n", sig_num);
}
    
void main () {

  struct mq_attr attr, old_attr;   // To store queue attributes
  struct sigevent sigevent;        // For notification
  mqd_t mqdes, mqdes2;             // Message queue descriptors
  char buf[MSG_SIZE];              // A good-sized buffer
  unsigned int prio;               // Priority 
    
  // First we need to set up the attribute structure
  attr.mq_maxmsg = 300;
  attr.mq_msgsize = MSG_SIZE;
  attr.mq_flags = 0;

  // Open a queue with the attribute structure
  mqdes = mq_open ("sideshow-bob", O_RDWR | O_CREAT, 
                   0664, &attr);

  // Now open a queue with the default attribute structure
  mqdes2 = mq_open ("troy-mcclure", O_RDWR | O_CREAT, 
                    0664, 0);
    
  // This will now be a temporary queue...as soon as it's closed,
  // it will be removed
  mq_unlink ("troy-mcclure");

  // Get the attributes for Sideshow Bob
  mq_getattr (mqdes, &attr);
  printf ("%d messages are currently on the queue.\n", 
          attr.mq_curmsgs);

  if (attr.mq_curmsgs != 0) {
    
    // There are some messages on this queue....eat em
    
    // First set the queue to not block any calls    
    attr.mq_flags = MQ_NONBLOCK;
    mq_setattr (mqdes, &attr, &old_attr);    
        
    // Now eat all of the messages
    while (mq_receive (mqdes, &buf[0], MSG_SIZE, &prio) != -1) 
      printf ("Received a message with priority %d.\n", prio);
            
    // The call failed.  Make sure errno is EAGAIN
    if (errno != EAGAIN) { 
      perror ("mq_receive()");
      _exit (EXIT_FAILURE);
    }
        
    // Now restore the attributes
    mq_setattr (mqdes, &old_attr, 0);            
  }
    
  // We want to be notified when something is there 
  signal (SIGUSR1, handler);
  sigevent.sigev_signo = SIGUSR1;
    
  if (mq_notify (mqdes, &sigevent) == -1) {
    if (errno == EBUSY) 
      printf (
        "Another process has registered for notification.\n");
    _exit (EXIT_FAILURE);
  }
    
  for (prio = 0; prio <= MQ_PRIO_MAX; prio += 8) {
    printf ("Writing a message with priority %d.\n", prio);    
    if (mq_send (mqdes, "I8-)", 4, prio) == -1)
      perror ("mq_send()");
  }

  // Close all open message queue descriptors    
  mq_close (mqdes);
  mq_close (mqdes2);

}

The first time the example program is run, the output should be as follows:

$ mq_test
0 messages are currently on the queue.
Writing a message with priority 0.
Received sig 16.
Writing a message with priority 8.
Writing a message with priority 16.
Writing a message with priority 24.
Writing a message with priority 32.

After the program wrote to the empty queue, it was signalled that the queue had made the transition from empty to nonempty. The second time the example program is run, the following should be produced:

$ mq_test
5 messages are currently on the queue.
Received a message with priority 32.
Received a message with priority 24.
Received a message with priority 16.
Received a message with priority 8.
Received a message with priority 0.
Writing a message with priority 0.
Received sig 16.
Writing a message with priority 8.
Writing a message with priority 16.
Writing a message with priority 24.
Writing a message with priority 32.

Note that the first message received was the message with the highest priority, with all of the other priorities following in suit.

End-users perspective

Because all message queues exist in the pathname space, it's very easy for end users to determine the status of a queue. From the shell, the user must first change directories to the prefix owned by Mqueue:

$ cd /dev/mqueue

A simple ls may reveal the following:

/dev/mqueue $ ls
homer       ned        marge        otto

This tells us that there are currently four message queues in the system. To get more detailed information, try ls with the "long listing" option:

/dev/mqueue $ ls -l
total 0
nrw-rw-r--  1 bryan     techies           0 Jun 30 13:47 homer
nrw-rw-r--  1 bryan     techies           0 Jun 30 13:29 marge
nrw-rw-r--  1 bryan     techies           0 Jun 30 13:46 ned
nrw-rw-r--  1 bryan     techies           0 Jun 30 13:29 otto

This is a little more useful. Starting from the left, the n in the first column of each entry says that this is a special named file. This is so something like vi knows that it shouldn't attempt to edit it. Following the n, each entry has its permissions. These are identical to file permissions. In this case:

Following this we have a 1, which is meaningless. If you can think of a useful stat to go there (it's the st_status field) let QSSL know. Following the 1, we have the owner, the group, the number of messages currently on the queue (right now all of these queues are empty), the time of creation, and finally the name.

If you want a little more information, try the little-used ls -i (this specifies the "inode" option, and is useful for filesystems):

/dev/mqueue $ ls -i
1024 homer       1024 ned
1024 marge       1024 otto

Now we know the capacity of each of these queues (in this case, 1024 messages). Lets try sending something to our good friend Ned Flanders:

/dev/mqueue $ echo Hidelee-ho, Neighbor >> ned

Now if we do an ls -il we should see the following:

/dev/mqueue $ ls -il
total 1
1024 nrw-rw-r--  1 bryan     techies           0 Jun 30 13:47 homer
1024 nrw-rw-r--  1 bryan     techies           0 Jun 30 13:29 marge
1024 nrw-rw-r--  1 bryan     techies           1 Jun 30 13:46 ned
1024 nrw-rw-r--  1 bryan     techies           0 Jun 30 13:29 otto

There's a message on ned, so lets extract it:

/dev/mqueue $ cat < ned
Hidelee-ho, Neighbor
^C

[Previous]
[Contents]
[Next]