Hey Bart and Prakash,
first of all: A happy New Year!
I have seen that the good old RFE 4017841, which implements the SIGEV_THREAD
notification type for timers, message queues and asynchronous I/O,
was successfully committed to our next version of Solaris a couple
of months ago (around build 40). :-)
There is a problem with the code in the area of POSIX
message queues that becomes obvious once you run my
test program on, e.g., an Ultra-10 with Solaris 5.11 build 54
installed.
The program occasionally hangs:
$ cc -g -D_REENTRANT -o notify_thread notify_thread.c -lpthread -lposix4
$ notify_thread 10 10 1
Test is starting nmsg=10, nruns=10, delay=1.
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
->notify_thread 3 called with argument 42 and counter 0.
thr 3 message: <test 2570> with size 9
thr 3 message: <test 2570> with size 9
thr 3 message: <test 2570> with size 9
thr 3 message: <test 2570> with size 9
thr 3 message: <test 2570> with size 9
thr 3 message: <test 2570> with size 9
thr 3 message: <test 2570> with size 9
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
thr 3 message: <test 2570> with size 9
thr 3 message: <test 2570> with size 9
thr 3 message: <test 2570> with size 9
thr 3 message: <test 2570> with size 9
thr 3 message: <test 2570> with size 9
thr 3 message: <test 2570> with size 9
thr 3 message: <test 2570> with size 9
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
thr 3 message: <test 2570> with size 9
thr 3 message: <test 2570> with size 9
thr 3 message: <test 2570> with size 9
thr 3 message: <test 2570> with size 9
thr 3 message: <test 2570> with size 9
thr 3 message: <test 2570> with size 9
message queue is empty.
<-notify_thread 3 left with counter 20
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
<<<< Now it hangs >>>>>>>>>
When I examine the stacks of the child and the parent, I see this:
$ pstack 2570 (<--- this is the child and is OK)
2570: notify_thread 10 10 1
ff2bf850 lwp_sema_timedwait (ff350098, 0, 1)
ff2a9aac sem_wait (ff350098, 3, 4d534751, 20, fffffff0, 0) + 20
ff2a752c __mq_timedsend (22940, ffbfec50, 9, 1, 0, 0) + b0
00011814 main (4, ffbfeeb4, ffbfeec8, 22000, ba394, ff390140) + 33c
00010e18 _start (0, 0, 0, 0, 0, 0) + 108
$ pstack 2569 (<--- this is the parent that shouldn't hang)
2569: notify_thread 10 10 1
----------------- lwp# 1 / thread# 1 --------------------
ff2bfa54 waitid (0, a0a, ffbfea98, 3)
ff2b226c waitpid (a0a, ffbfec40, 0, 0, ff392000, ff392000) + 58
000119e4 main (4, ffbfeeb4, ffbfeec8, 22000, ba394, ff390140) + 50c
00010e18 _start (0, 0, 0, 0, 0, 0) + 108
----------------- lwp# 2 / thread# 2 --------------------
ff2be5e0 lwp_park (0, 0, 0)
ff2b8448 cond_wait_queue (ff390850, ff390838, 0, 0, 0, 0) + 28
ff2b7ecc sig_cond_wait (ff390850, ff390838, ff17bfa0, ff17bf80,
6073c, ff390838) + 10
ff2aa160 mqueue_spawner (ff390800, ff390838, 2a, ff30a5f0, 60510, 0)
+ 84
ff2be564 _lwp_start (0, 0, 0, 0, 0, 0)
----------------- lwp# 3 / thread# 3 --------------------
ff2be5e0 lwp_park (0, 0, 0)
ff2b8448 cond_wait_queue (ff3908b0, ff390888, 0, 0, 0, 0) + 28
ff2b7ecc sig_cond_wait (ff3908b0, ff390888, 3, ff07bf88, ff1c0000, 3)
+ 10
ff2aaed4 tpool_worker (ff390280, ff3902c0, 0, ff30a5f0, ff3908b0,
ff390888) + 104
ff2be564 _lwp_start (0, 0, 0, 0, 0, 0)
$
So the worker thread "tpool_worker" is missing its wake-up
on the condition variable. Why does it miss it?
I think the bug is in usr/src/lib/libc/port/rt/mqueue.c
and that it could be fixed with the following change (add the lines
marked >>> after line 711 and, in particular, the line after line 1014).
...
688 /*
689 * Now determine if we want to kick the notification. POSIX
690 * requires that if a process has registered for notification,
691 * we must kick it when the queue makes an empty to non-empty
692 * transition, and there are no blocked receivers. Note that
693 * this mechanism does _not_ guarantee that the kicked process
694 * will be able to receive a message without blocking;
695 * another receiver could intervene in the meantime. Thus,
696 * the notification mechanism is inherently racy; all we can
697 * do is hope to minimize the window as much as possible.
698 * In general, we want to avoid kicking the notification when
699 * there are clearly receivers blocked. We'll determine if
700 * we want to kick the notification before the mq_putmsg(),
701 * but the actual signotify() won't be done until the message
702 * is on the queue.
703 */
704 if (mqhp->mq_sigid.sn_pid != 0) {
705 int nmessages, nblocked;
706
707 (void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
708 (void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);
709
710 if (nmessages == 0 && nblocked == 0)
711 notify = 1;
>>> /*
>>> * Force a kick of the receiver, if sender would block
>>> * because message queue would be full and receiver would
>>> * not continue receiving messages.
>>> */
>>> if (nmessages == mqhp->mq_maxmsg - 1)
>>> notify = 1;
712 }
713
714 mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
715 (void) sem_post(&mqhp->mq_notempty);
716
717 if (notify) {
718 /* notify and also delete the registration */
719 (void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
720 if (mqhp->mq_ntype == SIGEV_THREAD ||
721 mqhp->mq_ntype == SIGEV_PORT)
722 (void) sem_post(&mqhp->mq_spawner);
723 mqhp->mq_ntype = 0;
724 mqhp->mq_des = 0;
725 }
...
1012 switch (ntype) {
1013 case SIGEV_THREAD:
1014 case SIGEV_PORT:
>>> mqhp->mq_sigid.sn_pid = getpid();
1015 tcdp->tcd_port = port;
1016 tcdp->tcd_msg_object = mqdp;
1017 tcdp->tcd_msg_userval = userval;
1018 sig_mutex_lock(&tcdp->tcd_lock);
1019 tcdp->tcd_msg_enabled = ntype;
1020 sig_mutex_unlock(&tcdp->tcd_lock);
1021 (void) cond_broadcast(&tcdp->tcd_cv);
1022 break;
1023 }
Perhaps you can confirm my findings. If you decide
to open a bug, could you please drop me a short note saying in
which build it will be fixed, so that I can update my box
at that time? As I said, it currently
runs build 54 (Community release).
Many greetings from Germany,
best regards,
Michael Schulte
------------------------------------------------------------------------
/*
* cc -g -D_REENTRANT -o notify_thread notify_thread.c -lpthread -lposix4
*
* purpose: Demonstrate usage of SIGEV_THREAD.
*
* Call mq_notify(mqd_t mqdes, const struct sigevent *notification) to
* attach a notification request to a valid mqdes. Call mq_send() from
* another process to send messages. Whenever (in the parent) the message
* queue transits from EMPTY -> NONEMPTY, a thread is started that fetches
* the message(s) currently in the message queue.
* Demonstrate the use of async-signal-unsafe functions in notify_thread()
* by doing some printing and a simple mutex/count game.
*
* Author: Michael Schulte
* 2004-02-11
*/
#include <errno.h>
#include <limits.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <sys/types.h>
#include <sys/wait.h>
#include <mqueue.h>
#include <pthread.h>
#include <unistd.h>
/* Size of every message buffer; also used as the queue's mq_msgsize. */
enum { str_LEN = 256 };

/* The parent's message queue descriptor; notify_thread() receives from it. */
static mqd_t mq;

/*
 * `counter' is incremented by notify_thread() for every message it
 * receives.  It is guarded by `mutex'; `cond' is signalled once
 * counter reaches `nruns' so the waiting parent can proceed.
 */
static int counter = 0;
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

/* Command-line parameters: number of experiments and the delay in seconds. */
static int nruns;
static int delay_in_secs;
/*
* notify_thread(par_sigval)
*
* This is the notification function which is called in a new thread
* when the message queue makes a transition from EMPTY -> NONEMPTY.
*/
static void
notify_thread(union sigval par_sigval)
{
char msg[str_LEN];
ssize_t size;
int status;
(void) printf("->notify_thread %d called with argument %d and "
"counter %d.\n", pthread_self(), par_sigval.sival_int, counter);
/*
* Now we can fetch the messages. As O_NONBLOCK is specified,
* all messages currently in the message queue are fetched.
*/
do {
msg[0] = '\0';
size = mq_receive(mq, msg, str_LEN, NULL);
if (size > 0) {
(void) printf(" thr %d message: <%s> with size %d\n",
pthread_self(), msg, size);
/*
* Now we can do some processing and async signal
* unsafe stuff (as we are in a thread).
*/
status = pthread_mutex_lock (&mutex);
if (status != 0) {
perror ( "Lock mutex");
exit (-1);
}
if (++counter >= nruns) {
status = pthread_cond_signal (&cond);
if (status != 0) {
perror ( "Signal condition");
exit (-1);
}
}
status = pthread_mutex_unlock (&mutex);
if (status != 0) {
perror ( "Unlock mutex");
exit (-1);
}
if ((counter % 7) == 0)
(void) sleep(delay_in_secs);
} else {
if (errno == EAGAIN)
(void) printf(" message queue is empty.\n");
else
(void) printf(" XXX <%s>\n", strerror(errno));
}
} while (size >= 0);
(void) printf ("<-notify_thread %d left with counter %d\n",
pthread_self(), counter);
}
/*
 * parse_count(arg, out)
 *
 * Convert the decimal command-line argument `arg' to a non-negative int
 * and store it in *out.  Unlike atoi(), rejects empty strings, trailing
 * junk, negative values and values that do not fit in an int.
 * Returns 0 on success and -1 on invalid input (*out is then untouched).
 */
static int
parse_count(const char *arg, int *out)
{
	char *end;
	long val;

	errno = 0;
	val = strtol(arg, &end, 10);
	if (end == arg || *end != '\0' || errno == ERANGE ||
	    val < 0 || val > INT_MAX)
		return (-1);
	*out = (int)val;
	return (0);
}

/*
 * run_child(name, nmsg)
 *
 * Body of the forked sender process.  Opens the parent's queue
 * "/<name><parent pid>" without O_NONBLOCK, so mq_send() blocks while the
 * queue is full.  For each of the `nruns' runs, it sends `nmsg' copies of
 * "test <child pid>" and then sleeps `delay_in_secs' seconds.
 * Never returns: exits 0 on success, 1 if the queue cannot be opened and
 * 99 if mq_send() fails.
 */
static void
run_child(const char *name, int nmsg)
{
	char qname[str_LEN];
	char msg[str_LEN];
	mqd_t smq;
	int i, j;

	(void) snprintf(qname, sizeof (qname), "/%s%ld", name,
	    (long)getppid());
	/* Open the message queue for writing. */
	smq = mq_open(qname, O_RDWR);
	if (smq == (mqd_t)-1) {
		(void) printf("mq_open() failed with errno %d.\n", errno);
		exit(1);
	}

	/* the child sends messages to the message queue */
	(void) snprintf(msg, sizeof (msg), "test %ld", (long)getpid());
	for (j = 0; j < nruns; j++) {
		for (i = 0; i < nmsg; i++) {
			if (mq_send(smq, msg, strlen(msg), 1) != 0) {
				(void) printf("mq_send() failed in child "
				    "with %d.\n", errno);
				exit(99);
			}
			(void) printf("CHILD: msg sent\n");
		}
		(void) sleep(delay_in_secs);
	}
	(void) printf("Child exit.\n");
	exit(0);
}

/*
 * main(argc, argv)
 *
 * usage: notify_thread <nr of messages to send> <nr of experiments>
 *        <delay in secs>
 *
 * Creates the non-blocking queue "/notify<pid>" and registers
 * notify_thread() for SIGEV_THREAD notification.  It then forks a sender
 * (run_child()) and waits until the notification threads have counted
 * `nruns' messages.  Finally it reaps the child and removes the queue.
 * Returns 0 on success and non-zero on any failure, including a child
 * that did not exit with status 0.
 */
int
main(int argc, char *argv[])
{
	char buf_uns[str_LEN];
	const char *name = "notify";
	int ret;
	int rc = 0;
	int locked;
	int received;
	pid_t pid;
	int status;
	int nmsg;
	struct mq_attr attrs;
	struct sigevent notification;

	if (argc != 4 || parse_count(argv[1], &nmsg) != 0 ||
	    parse_count(argv[2], &nruns) != 0 ||
	    parse_count(argv[3], &delay_in_secs) != 0) {
		(void) printf("notify_thread <nr of messages to send> "
		    "<nr of experiments> <delay in secs>\n");
		exit(1);
	}
	(void) printf("Test is starting nmsg=%d, nruns=%d, delay=%d.\n",
	    nmsg, nruns, delay_in_secs);
	(void) snprintf(buf_uns, sizeof (buf_uns), "/%s%ld", name,
	    (long)getpid());

	/*
	 * Set members of the mq_attr structure
	 * and create an open message queue description.
	 */
	(void) memset(&attrs, 0, sizeof (attrs));
	attrs.mq_msgsize = str_LEN;
	attrs.mq_maxmsg = 10;
	attrs.mq_flags = O_NONBLOCK;
	mq = mq_open(buf_uns, O_RDWR | O_CREAT | O_EXCL | O_NONBLOCK, 0666,
	    &attrs);
	if (mq == (mqd_t)-1) {
		(void) printf("mq_open() failed with errno %d.\n", errno);
		exit(1);
	}

	/*
	 * Set notification structure to start the thread, when mq
	 * transits from EMPTY -> NONEMPTY.
	 */
	(void) memset(&notification, 0, sizeof (notification));
	notification.sigev_notify = SIGEV_THREAD;
	notification.sigev_notify_function = notify_thread;
	notification.sigev_value.sival_int = 42;
	notification.sigev_notify_attributes = NULL;
	/* register the calling process for notification */
	if (mq_notify(mq, &notification) != 0) {
		(void) printf("mq_notify() failed. errno = %d\n", errno);
		rc = 1;
		goto fin;
	}

	/* Flush so that buffered output is not written twice after fork(). */
	(void) fflush(stdout);
	if ((pid = fork()) < 0) {
		(void) printf("fork() failed with errno %d.\n", errno);
		rc = 1;
		goto fin;	/* still remove the queue */
	}
	if (pid == 0)
		run_child(name, nmsg);	/* does not return */

	/* In the parent: wait until nruns messages have been counted. */
	ret = pthread_mutex_lock(&mutex);
	if (ret != 0) {
		(void) fprintf(stderr, "Lock mutex: %s\n", strerror(ret));
		rc = 1;
		goto fin;
	}
	while (counter < nruns) {
		ret = pthread_cond_wait(&cond, &mutex);
		if (ret != 0) {
			(void) fprintf(stderr, "Wait on condition: %s\n",
			    strerror(ret));
			rc = 1;
			goto fin;
		}
	}
	ret = pthread_mutex_unlock(&mutex);
	if (ret != 0) {
		(void) fprintf(stderr, "Unlock mutex: %s\n", strerror(ret));
		rc = 1;
		goto fin;
	}

	ret = waitpid(pid, &status, 0);
	(void) printf("Parent awoken.\n");
	if (ret != pid) {
		(void) printf("waitpid() failed with errno %d.\n", errno);
		rc = 1;
		goto fin;
	}
	/* WEXITSTATUS() is only meaningful when WIFEXITED() is true. */
	if (!WIFEXITED(status)) {
		(void) printf("The child terminated abnormally.\n");
		rc = 1;
		goto fin;
	}
	if (WEXITSTATUS(status) != 0) {
		/* The child terminated with error */
		(void) printf("The child terminated with exit status %d.\n",
		    WEXITSTATUS(status));
		rc = 1;
		goto fin;
	}

	/* Notification threads may still be running; read under the lock. */
	locked = (pthread_mutex_lock(&mutex) == 0);
	received = counter;
	if (locked)
		(void) pthread_mutex_unlock(&mutex);
	(void) printf("Parent got %d messages.\n", received);

fin:
	(void) mq_close(mq);
	if (mq_unlink(buf_uns) != 0) {
		(void) printf("mq_unlink() failed with %d\n", errno);
		exit(1);
	}
	(void) printf("Test finished.\n");
	return (rc);
}
------------------------------------------------------------------------
_______________________________________________
perf-discuss mailing list
perf-discuss@opensolaris.org