ptg10805159
632 Advanced IPC Chapter 17
{
int i, n, err;
int fd[2];
int qid[NQ];
struct pollfd pfd[NQ];
struct threadinfo ti[NQ];
pthread_t tid[NQ];
char buf[MAXMSZ];
for (i = 0; i < NQ; i++) {
if ((qid[i] = msgget((KEY+i), IPC_CREAT|0666)) < 0)
err_sys("msgget error");
printf("queue ID %d is %d\n", i, qid[i]);
if (socketpair(AF_UNIX, SOCK_DGRAM, 0, fd) < 0)
err_sys("socketpair error");
pfd[i].fd = fd[0];
pfd[i].events = POLLIN;
ti[i].qid = qid[i];
ti[i].fd = fd[1];
if ((err = pthread_create(&tid[i], NULL, helper, &ti[i])) != 0)
err_exit(err, "pthread_create error");
}
for (;;) {
if (poll(pfd, NQ, -1) < 0)
err_sys("poll error");
for (i = 0; i < NQ; i++) {
if (pfd[i].revents & POLLIN) {
if ((n = read(pfd[i].fd, buf, sizeof(buf))) < 0)
err_sys("read error");
buf[n] = 0;
printf("queue id %d, message %s\n", qid[i], buf);
}
}
}
exit(0);
}
Figure 17.3 Poll for XSI messages using UNIX domain sockets
Note that we use datagram (SOCK_DGRAM) sockets instead of stream sockets. This
allows us to retain message boundaries so when we read from the socket, we read only
one message at a time.
This technique allows us to use either poll or select (indirectly) with message
queues. As long as the costs of one thread per queue and copying each message two
extra times (once to write it to the socket and once to read it from the socket) are
acceptable, this technique will make it easier to use XSI message queues.
We'll use the program shown in Figure 17.4 to send messages to our test program
from Figure 17.3.