/* Appendix C, Chapter 17 Solutions (page 941) */
/*
 * helper -- thread start routine that services one message queue.
 *
 * arg: pointer to this thread's struct threadinfo.
 *
 * Loops forever.  On each pass it:
 *   1. clears the message buffer info->m;
 *   2. blocks in msgrcv() on info->qid for the next message of any type
 *      (MSG_NOERROR: a message longer than MAXMSZ is truncated rather
 *      than rejected) and stores the received byte count in info->len;
 *   3. with info->mutex held, writes one byte to info->fd and then waits
 *      on the condition variable info->ready before going around again.
 *
 * Failures of msgrcv() and write() are reported through err_sys().
 * There is no return statement; the loop has no exit.
 *
 * NOTE(review): the condition wait is issued once, with no predicate
 * re-check.  POSIX allows pthread_cond_wait() to wake spuriously, so the
 * buffer could be reused early -- confirm the signalling side tolerates
 * this, or add a flag to struct threadinfo.
 */
void *
helper(void *arg)
{
	struct threadinfo *info = arg;
	int nbytes;

	while (1) {
		/* Start every round with a zeroed message buffer. */
		memset(&info->m, 0, sizeof(struct mymsg));

		/* msgtyp 0 selects the first message on the queue. */
		nbytes = msgrcv(info->qid, &info->m, MAXMSZ, 0, MSG_NOERROR);
		if (nbytes < 0)
			err_sys("msgrcv error");
		info->len = nbytes;

		/*
		 * The mutex is taken before the byte is written, so the wait
		 * below begins while the lock is still held.
		 */
		pthread_mutex_lock(&info->mutex);
		if (write(info->fd, "a", sizeof(char)) < 0)
			err_sys("write error");
		pthread_cond_wait(&info->ready, &info->mutex);
		pthread_mutex_unlock(&info->mutex);
	}
}
int
main()
{
char c;
int i, n, err;
int fd[2];
int qid[NQ];
struct pollfd pfd[NQ];
struct threadinfo ti[NQ];
pthread_t tid[NQ];
for (i = 0; i < NQ; i++) {
if ((qid[i] = msgget((KEY+i), IPC_CREAT|0666)) < 0)
err_sys("msgget error");
printf("queue ID %d is %d\n", i, qid[i]);
if (socketpair(AF_UNIX, SOCK_DGRAM, 0, fd) < 0)
err_sys("socketpair error");
pfd[i].fd = fd[0];
pfd[i].events = POLLIN;
ti[i].qid = qid[i];
ti[i].fd = fd[1];
if (pthread_cond_init(&ti[i].ready, NULL) != 0)
err_sys("pthread_cond_init error");
if (pthread_mutex_init(&ti[i].mutex, NULL) != 0)
err_sys("pthread_mutex_init error");
if ((err = pthread_create(&tid[i], NULL, helper,
&ti[i])) != 0)
err_exit(err, "pthread_create error");
}