Hey Bart and Prakash, first of all: A happy New Year!
I have seen that the good old RFE 4017841, which implements the SIGEV_THREAD notification type for timers, message queues and asynchronous I/O, was successfully committed to our next version of Solaris a couple of months ago (ca. build 40 or so). :-) There is a problem with the code in the area of POSIX message queues that becomes obvious once you run my test program on, e.g., an Ultra-10 with Sol 5.11 build 54 installed. The program occasionally hangs: $ cc -g -D_REENTRANT -o notify_thread notify_thread.c -lpthread -lposix4 $ notify_thread 10 10 1 Test is starting nmsg=10, nruns=10, delay=1. CHILD: msg sent CHILD: msg sent CHILD: msg sent CHILD: msg sent CHILD: msg sent CHILD: msg sent CHILD: msg sent CHILD: msg sent CHILD: msg sent CHILD: msg sent ->notify_thread 3 called with argument 42 and counter 0. thr 3 message: <test 2570> with size 9 thr 3 message: <test 2570> with size 9 thr 3 message: <test 2570> with size 9 thr 3 message: <test 2570> with size 9 thr 3 message: <test 2570> with size 9 thr 3 message: <test 2570> with size 9 thr 3 message: <test 2570> with size 9 CHILD: msg sent CHILD: msg sent CHILD: msg sent CHILD: msg sent CHILD: msg sent CHILD: msg sent CHILD: msg sent thr 3 message: <test 2570> with size 9 thr 3 message: <test 2570> with size 9 thr 3 message: <test 2570> with size 9 thr 3 message: <test 2570> with size 9 thr 3 message: <test 2570> with size 9 thr 3 message: <test 2570> with size 9 thr 3 message: <test 2570> with size 9 CHILD: msg sent CHILD: msg sent CHILD: msg sent thr 3 message: <test 2570> with size 9 thr 3 message: <test 2570> with size 9 thr 3 message: <test 2570> with size 9 thr 3 message: <test 2570> with size 9 thr 3 message: <test 2570> with size 9 thr 3 message: <test 2570> with size 9 message queue is empty. 
<-notify_thread 3 left with counter 20 CHILD: msg sent CHILD: msg sent CHILD: msg sent CHILD: msg sent CHILD: msg sent CHILD: msg sent CHILD: msg sent CHILD: msg sent CHILD: msg sent CHILD: msg sent <<<< Now it hangs >>>>>>>>> When I examine the stacks of the child and the parent, I can see this: $ pstack 2570 (<--- this is the child and is OK) 2570: notify_thread 10 10 1 ff2bf850 lwp_sema_timedwait (ff350098, 0, 1) ff2a9aac sem_wait (ff350098, 3, 4d534751, 20, fffffff0, 0) + 20 ff2a752c __mq_timedsend (22940, ffbfec50, 9, 1, 0, 0) + b0 00011814 main (4, ffbfeeb4, ffbfeec8, 22000, ba394, ff390140) + 33c 00010e18 _start (0, 0, 0, 0, 0, 0) + 108 $ pstack 2569 (<--- this is the parent that shouldn't hang) 2569: notify_thread 10 10 1 ----------------- lwp# 1 / thread# 1 -------------------- ff2bfa54 waitid (0, a0a, ffbfea98, 3) ff2b226c waitpid (a0a, ffbfec40, 0, 0, ff392000, ff392000) + 58 000119e4 main (4, ffbfeeb4, ffbfeec8, 22000, ba394, ff390140) + 50c 00010e18 _start (0, 0, 0, 0, 0, 0) + 108 ----------------- lwp# 2 / thread# 2 -------------------- ff2be5e0 lwp_park (0, 0, 0) ff2b8448 cond_wait_queue (ff390850, ff390838, 0, 0, 0, 0) + 28 ff2b7ecc sig_cond_wait (ff390850, ff390838, ff17bfa0, ff17bf80, 6073c, ff390838) + 10 ff2aa160 mqueue_spawner (ff390800, ff390838, 2a, ff30a5f0, 60510, 0) + 84 ff2be564 _lwp_start (0, 0, 0, 0, 0, 0) ----------------- lwp# 3 / thread# 3 -------------------- ff2be5e0 lwp_park (0, 0, 0) ff2b8448 cond_wait_queue (ff3908b0, ff390888, 0, 0, 0, 0) + 28 ff2b7ecc sig_cond_wait (ff3908b0, ff390888, 3, ff07bf88, ff1c0000, 3) + 10 ff2aaed4 tpool_worker (ff390280, ff3902c0, 0, ff30a5f0, ff3908b0, ff390888) + 104 ff2be564 _lwp_start (0, 0, 0, 0, 0, 0) $ So the worker thread "tpool_worker" is missing its wake-up on the condition variable. Why is it missing it? I think that the bug is in usr/src/lib/libc/port/rt/mqueue.c and that it could be fixed as follows (add the lines marked >>> after line 711 and, in particular, the line after line 1014). ... 
688 /* 689 * Now determine if we want to kick the notification. POSIX 690 * requires that if a process has registered for notification, 691 * we must kick it when the queue makes an empty to non-empty 692 * transition, and there are no blocked receivers. Note that 693 * this mechanism does _not_ guarantee that the kicked process 694 * will be able to receive a message without blocking; 695 * another receiver could intervene in the meantime. Thus, 696 * the notification mechanism is inherently racy; all we can 697 * do is hope to minimize the window as much as possible. 698 * In general, we want to avoid kicking the notification when 699 * there are clearly receivers blocked. We'll determine if 700 * we want to kick the notification before the mq_putmsg(), 701 * but the actual signotify() won't be done until the message 702 * is on the queue. 703 */ 704 if (mqhp->mq_sigid.sn_pid != 0) { 705 int nmessages, nblocked; 706 707 (void) sem_getvalue(&mqhp->mq_notempty, &nmessages); 708 (void) sem_getvalue(&mqhp->mq_rblocked, &nblocked); 709 710 if (nmessages == 0 && nblocked == 0) 711 notify = 1; >>> /* >>> * Force a kick of the receiver, if sender would block >>> * because message queue would be full and receiver would >>> * not continue receiving messages. >>> */ >>> if (nmessages == mqhp->mq_maxmsg - 1) >>> notify = 1; 712 } 713 714 mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio); 715 (void) sem_post(&mqhp->mq_notempty); 716 717 if (notify) { 718 /* notify and also delete the registration */ 719 (void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid); 720 if (mqhp->mq_ntype == SIGEV_THREAD || 721 mqhp->mq_ntype == SIGEV_PORT) 722 (void) sem_post(&mqhp->mq_spawner); 723 mqhp->mq_ntype = 0; 724 mqhp->mq_des = 0; 725 } ... 
1012 switch (ntype) { 1013 case SIGEV_THREAD: 1014 case SIGEV_PORT: >>> mqhp->mq_sigid.sn_pid = getpid(); 1015 tcdp->tcd_port = port; 1016 tcdp->tcd_msg_object = mqdp; 1017 tcdp->tcd_msg_userval = userval; 1018 sig_mutex_lock(&tcdp->tcd_lock); 1019 tcdp->tcd_msg_enabled = ntype; 1020 sig_mutex_unlock(&tcdp->tcd_lock); 1021 (void) cond_broadcast(&tcdp->tcd_cv); 1022 break; 1023 } Perhaps you can confirm my findings. If you decide to open a bug, could you please drop me a short note telling me in which build it will be fixed? Then I can update my box at that time; as I said, it currently runs build 54 (Community release). Many greetings from Germany, best regards, Michael Schulte -- Michael Schulte [EMAIL PROTECTED] OpenSolaris Kernel Development http://opensolaris.org/
/* * cc -g -D_REENTRANT -o notify_thread notify_thread.c -lpthread -lposix4 * * purpose: Demonstrate usage of SIGEV_THREAD. * * Call mq_notify(mqd_t mqdes, *notification) to attach a notification * request to a valid mqdes. Call mq_send() from another process and * send messages. Whenever (in the parent) the message queue transits * from EMPTY -> NONEMPTY, start a thread that fetches the message(s) * being in the message queue. * Demonstrate usage of non-async-signal safe functions in notify_thread() * by doing some printing and a simple mutex/count game. * * Author: Michael Schulte * 2004-02-11 */ #include <stdio.h> #include <stdlib.h> #include <signal.h> #include <mqueue.h> #include <errno.h> #include <unistd.h> #include <string.h> #include <sys/types.h> #include <sys/wait.h> #include <pthread.h> #define str_LEN 256 static mqd_t mq; static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; static int counter = 0; static int nruns; static int delay_in_secs; /* * notify_thread(par_sigval) * * This is the notification function which is called in a new thread * when the message queue makes a transition from EMPTY -> NONEMPTY. */ static void notify_thread(union sigval par_sigval) { char msg[str_LEN]; ssize_t size; int status; (void) printf("->notify_thread %d called with argument %d and " "counter %d.\n", pthread_self(), par_sigval.sival_int, counter); /* * Now we can fetch the messages. As O_NONBLOCK is specified, * all messages currently in the message queue are fetched. */ do { msg[0] = '\0'; size = mq_receive(mq, msg, str_LEN, NULL); if (size > 0) { (void) printf(" thr %d message: <%s> with size %d\n", pthread_self(), msg, size); /* * Now we can do some processing and async signal * unsafe stuff (as we are in a thread). 
*/ status = pthread_mutex_lock (&mutex); if (status != 0) { perror ( "Lock mutex"); exit (-1); } if (++counter >= nruns) { status = pthread_cond_signal (&cond); if (status != 0) { perror ( "Signal condition"); exit (-1); } } status = pthread_mutex_unlock (&mutex); if (status != 0) { perror ( "Unlock mutex"); exit (-1); } if ((counter % 7) == 0) (void) sleep(delay_in_secs); } else { if (errno == EAGAIN) (void) printf(" message queue is empty.\n"); else (void) printf(" XXX <%s>\n", strerror(errno)); } } while (size >= 0); (void) printf ("<-notify_thread %d left with counter %d\n", pthread_self(), counter); } void main(int argc, char *argv[]) { char buf_uns[str_LEN]; char msg[str_LEN]; char *name = "notify"; int ret = 0; pid_t pid; int status; struct mq_attr attrs; struct sigevent notification; int nmsg; if (argc != 4) { (void) printf("notify_thread <nr of messages to send> <nr of experiments> " "<delay in secs>\n"); exit(1); } nmsg = atoi(argv[1]); nruns = atoi(argv[2]); delay_in_secs = atoi(argv[3]); (void) printf("Test is starting nmsg=%d, nruns=%d, delay=%d.\n", nmsg, nruns, delay_in_secs); (void) sprintf(buf_uns, "/%s%ld", name, (long)getpid()); /* * Set members of the mq_attr structure * and create an open message queue description. */ attrs.mq_msgsize = str_LEN; attrs.mq_maxmsg = 10; attrs.mq_flags = O_NONBLOCK; mq = mq_open(buf_uns, O_RDWR | O_CREAT | O_EXCL | O_NONBLOCK, 0666, &attrs); if (mq == (mqd_t)-1) { (void) printf("mq_open() failed with errno %d.\n", errno); exit(1); } /* * Set notification structure to start the thread, when mq * transits from EMPTY -> NONEMPTY. */ notification.sigev_notify = SIGEV_THREAD; notification.sigev_notify_function = notify_thread; notification.sigev_value.sival_int = 42; notification.sigev_notify_attributes = NULL; /* register the calling process for notification */ ret = mq_notify(mq, ¬ification); if (ret != 0) { (void) printf("mq_notify() failed. 
errno = %d\n", errno); goto fin; } if ((pid = fork()) < 0) { (void) printf("fork() failed with errno %d.\n", errno); exit(1); } if (pid == 0) { /* In the child */ int i,j; (void) sprintf(buf_uns, "/%s%ld", name, (long)getppid()); /* Open the message queue for writing. */ attrs.mq_msgsize = str_LEN; attrs.mq_maxmsg = 10; attrs.mq_flags = 0; mq = mq_open(buf_uns, O_RDWR, 0666, &attrs); if (mq == (mqd_t)-1) { (void) printf("mq_open() failed with errno %d.\n", errno); exit(1); } /* the child sends a message to a message queue */ sprintf(msg, "test %d", getpid()); j = 0; while (j < nruns) { for (i = 0; i < nmsg; i++) { ret = mq_send(mq, msg, strlen(msg), 1); (void) printf("CHILD: msg sent\n"); if (ret != 0) { (void) printf("mq_send() failed in child with %d.\n", errno); nruns = 0; exit(99); } } sleep(delay_in_secs); j++; } (void) printf("Child exit.\n"); exit(0); } /* In the parent */ ret = pthread_mutex_lock (&mutex); if (ret != 0) { perror ( "Lock mutex"); goto fin; } while (counter < nruns) { ret = pthread_cond_wait (&cond, &mutex); if (ret != 0) { perror ( "Wait on condition"); goto fin; } } ret = pthread_mutex_unlock (&mutex); if (ret != 0) { perror ( "Unlock mutex"); goto fin; } ret = waitpid(pid, &status, 0); (void) printf("Parent awoken.\n"); if (ret == pid) { if (WEXITSTATUS(status) != 0) { /* The child terminated with error */ (void) printf("The child terminated with exit status %d.\n", WEXITSTATUS(status)); goto fin; } } (void) printf("Parent got %d messages.\n", counter); fin: (void) mq_close(mq); ret = mq_unlink(buf_uns); if (ret != 0) { (void) printf("mq_unlink() failed with %d\n", errno); exit(ret); } (void) printf("Test finished.\n"); exit(ret); }
_______________________________________________ perf-discuss mailing list perf-discuss@opensolaris.org