Hey Bart and Prakash,

first of all: A happy New Year!

I have seen that the good old RFE 4017841, which implements the SIGEV_THREAD
notification type for timers, message queues and asynchronous I/O,
was successfully committed a couple of months ago
to our next version of Solaris. :-) (ca. build 40 or so).

There is a problem with the code in the area of POSIX
message queues that becomes obvious once you run my
(test) program on, e.g., an Ultra-10 with Sol 5.11 build 54
installed.

The program occasionally hangs:
$ cc -g -D_REENTRANT -o notify_thread notify_thread.c -lpthread -lposix4
$ notify_thread 10 10 1
Test is starting nmsg=10, nruns=10, delay=1.
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
->notify_thread 3 called with argument 42 and counter 0.
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   message queue is empty.
<-notify_thread 3 left with counter 20
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
<<<< Now it hangs >>>>>>>>>

When I examine the stack of the parent, I can see this:
$ pstack 2570   (<--- this is the child and is OK)
2570:   notify_thread 10 10 1
 ff2bf850 lwp_sema_timedwait (ff350098, 0, 1)
 ff2a9aac sem_wait (ff350098, 3, 4d534751, 20, fffffff0, 0) + 20
 ff2a752c __mq_timedsend (22940, ffbfec50, 9, 1, 0, 0) + b0
 00011814 main     (4, ffbfeeb4, ffbfeec8, 22000, ba394, ff390140) + 33c
 00010e18 _start   (0, 0, 0, 0, 0, 0) + 108

$ pstack 2569  (<--- this is the parent that shouldn't hang)
2569:   notify_thread 10 10 1
-----------------  lwp# 1 / thread# 1  --------------------
 ff2bfa54 waitid   (0, a0a, ffbfea98, 3)
 ff2b226c waitpid  (a0a, ffbfec40, 0, 0, ff392000, ff392000) + 58
 000119e4 main     (4, ffbfeeb4, ffbfeec8, 22000, ba394, ff390140) + 50c
 00010e18 _start   (0, 0, 0, 0, 0, 0) + 108
-----------------  lwp# 2 / thread# 2  --------------------
 ff2be5e0 lwp_park (0, 0, 0)
 ff2b8448 cond_wait_queue (ff390850, ff390838, 0, 0, 0, 0) + 28
 ff2b7ecc sig_cond_wait (ff390850, ff390838, ff17bfa0, ff17bf80, 6073c, ff390838) + 10
 ff2aa160 mqueue_spawner (ff390800, ff390838, 2a, ff30a5f0, 60510, 0) + 84
 ff2be564 _lwp_start (0, 0, 0, 0, 0, 0)
-----------------  lwp# 3 / thread# 3  --------------------
 ff2be5e0 lwp_park (0, 0, 0)
 ff2b8448 cond_wait_queue (ff3908b0, ff390888, 0, 0, 0, 0) + 28
 ff2b7ecc sig_cond_wait (ff3908b0, ff390888, 3, ff07bf88, ff1c0000, 3) + 10
 ff2aaed4 tpool_worker (ff390280, ff3902c0, 0, ff30a5f0, ff3908b0, ff390888) + 104
 ff2be564 _lwp_start (0, 0, 0, 0, 0, 0)
$


So the worker thread "tpool_worker" is missing its "wake up"
on the condition variable. Why is it missing it?

I think that the bug is in usr/src/lib/libc/port/rt/mqueue.c
and that it could be fixed with the following change (add the lines
marked >>> after line 711 and, in particular, the line after line 1014).

...
688     /*
689      * Now determine if we want to kick the notification.  POSIX
690      * requires that if a process has registered for notification,
691      * we must kick it when the queue makes an empty to non-empty
692      * transition, and there are no blocked receivers.  Note that
693      * this mechanism does _not_ guarantee that the kicked process
694      * will be able to receive a message without blocking;
695      * another receiver could intervene in the meantime.  Thus,
696      * the notification mechanism is inherently racy; all we can
697      * do is hope to minimize the window as much as possible.
698      * In general, we want to avoid kicking the notification when
699      * there are clearly receivers blocked.  We'll determine if
700      * we want to kick the notification before the mq_putmsg(),
701      * but the actual signotify() won't be done until the message
702      * is on the queue.
703      */
704     if (mqhp->mq_sigid.sn_pid != 0) {
705             int nmessages, nblocked;
706
707             (void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
708             (void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);
709
710             if (nmessages == 0 && nblocked == 0)
711                     notify = 1;
    >>>        /*
    >>>         * Force a kick of the receiver, if sender would block
    >>>         * because message queue would be full and receiver would
    >>>  * not continue receiving messages.
    >>>  */
    >>> if (nmessages == mqhp->mq_maxmsg - 1)
    >>>     notify = 1;
712     }
713
714     mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
715     (void) sem_post(&mqhp->mq_notempty);
716
717     if (notify) {
718             /* notify and also delete the registration */
719             (void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
720             if (mqhp->mq_ntype == SIGEV_THREAD ||
721                 mqhp->mq_ntype == SIGEV_PORT)
722                     (void) sem_post(&mqhp->mq_spawner);
723             mqhp->mq_ntype = 0;
724             mqhp->mq_des = 0;
725     }
...
1012            switch (ntype) {
1013            case SIGEV_THREAD:
1014            case SIGEV_PORT:                                        
        >>>            mqhp->mq_sigid.sn_pid = getpid();
1015                    tcdp->tcd_port = port;
1016                    tcdp->tcd_msg_object = mqdp;
1017                    tcdp->tcd_msg_userval = userval;
1018                    sig_mutex_lock(&tcdp->tcd_lock);
1019                    tcdp->tcd_msg_enabled = ntype;
1020                    sig_mutex_unlock(&tcdp->tcd_lock);
1021                    (void) cond_broadcast(&tcdp->tcd_cv);
1022                    break;
1023            }

Perhaps you can confirm my findings, and if you decide
to open a bug, could you please drop me a short note saying in
which build it will be fixed, so that I can update my box
at that time? As I said, it currently
runs build 54 (Community release).

Many greetings from Germany,
best regards,

Michael Schulte

--
Michael Schulte                                      [EMAIL PROTECTED]
OpenSolaris Kernel Development                       http://opensolaris.org/
/*
 * cc -g -D_REENTRANT -o notify_thread notify_thread.c -lpthread -lposix4
 *
 * purpose: Demonstrate usage of SIGEV_THREAD.
 *
 * Call mq_notify(mqd_t mqdes, *notification) to attach a notification
 * request to a valid mqdes. Call mq_send() from another process to
 * send messages. Whenever the message queue (in the parent) transitions
 * from EMPTY -> NONEMPTY, a thread is started that fetches the message(s)
 * currently in the message queue.
 * Demonstrate the use of non-async-signal-safe functions in notify_thread()
 * by doing some printing and a simple mutex/counter game.
 *
 * Author: Michael Schulte
 * 2004-02-11
 */

#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <mqueue.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <pthread.h>

/* Size in bytes of the message buffers; also used as the queue's mq_msgsize. */
#define	str_LEN 256


/* Message queue descriptor: opened in main(), read in notify_thread(). */
static mqd_t	mq;
/* mutex guards updates of counter; cond is signalled once counter >= nruns. */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
/* Number of messages received so far by notify_thread(). */
static int counter = 0;
/* Command-line argument 2; threshold that counter is compared against. */
static int nruns;
/* Command-line argument 3; sleep interval in seconds used by both processes. */
static int delay_in_secs;



/*
 * notify_thread(par_sigval)
 *
 * This is the notification function which is called in a new thread
 * when the message queue makes a transition from EMPTY -> NONEMPTY.
 */
static void
notify_thread(union sigval par_sigval)
{
	char msg[str_LEN];
	ssize_t size;
	int status;
	
	(void) printf("->notify_thread %d called with argument %d and "
	    "counter %d.\n", pthread_self(), par_sigval.sival_int, counter);
	    
	/*
	 * Now we can fetch the messages. As O_NONBLOCK is specified,
	 * all messages currently in the message queue are fetched.
	 */
	do {
		msg[0] = '\0';
		size = mq_receive(mq, msg, str_LEN, NULL);
		if (size > 0) {
			(void) printf("   thr %d message: <%s> with size %d\n",
			    pthread_self(), msg, size);
			/*
			 * Now we can do some processing and async signal
			 * unsafe stuff (as we are in a thread).
			 */
			status = pthread_mutex_lock (&mutex);
			if (status != 0) {
				perror ( "Lock mutex");
				exit (-1);
			}
			if (++counter >= nruns) {
				status = pthread_cond_signal (&cond);
				if (status != 0) {
					perror ( "Signal condition");
					exit (-1);
				}
			}
			status = pthread_mutex_unlock (&mutex);
			if (status != 0) {
				perror ( "Unlock mutex");
				exit (-1);
			}
			if ((counter % 7) == 0)
				(void) sleep(delay_in_secs);
		} else {
			if (errno == EAGAIN)
				(void) printf("   message queue is empty.\n");
			else
				(void) printf("   XXX <%s>\n", strerror(errno));
		}
	} while (size >= 0);
	(void) printf ("<-notify_thread %d left with counter %d\n",
	    pthread_self(), counter);
}



/*
 * main
 *
 * usage: notify_thread <nr of messages to send> <nr of experiments>
 *	<delay in secs>
 *
 * Creates a non-blocking message queue named after the parent's pid,
 * registers notify_thread() on it as a SIGEV_THREAD notification and
 * forks.  The child sends <nr of experiments> bursts of <nr of messages>
 * messages each, sleeping <delay in secs> after every burst.  The parent
 * waits until `counter` has reached nruns, reaps the child and removes
 * the queue.
 *
 * Exit status: 0 on success, non-zero on any failure.
 */
int
main(int argc, char *argv[])
{
	char	buf_uns[str_LEN];
	char	msg[str_LEN];
	char	*name = "notify";
	int	ret;
	int	saved_errno;
	int	exit_code = 1;	/* cleared only when everything succeeded */
	pid_t	pid;
	int	status;
	struct mq_attr	attrs;
	struct sigevent	notification;
	int	nmsg;


	if (argc != 4) {
		(void) printf("notify_thread <nr of messages to send> <nr of experiments> "
		    "<delay in secs>\n");
		exit(1);
	}

	nmsg = atoi(argv[1]);
	nruns = atoi(argv[2]);
	delay_in_secs = atoi(argv[3]);

	/* A negative delay would turn into a huge unsigned sleep(). */
	if (nmsg < 0 || nruns < 0 || delay_in_secs < 0) {
		(void) printf("notify_thread <nr of messages to send> <nr of experiments> "
		    "<delay in secs>\n");
		exit(1);
	}

	(void) printf("Test is starting nmsg=%d, nruns=%d, delay=%d.\n", nmsg, nruns,
	    delay_in_secs);

	(void) snprintf(buf_uns, sizeof (buf_uns), "/%s%ld", name,
	    (long)getpid());

	/*
	 * Set members of the mq_attr structure
	 * and create an open message queue description.
	 */
	(void) memset(&attrs, 0, sizeof (attrs));
	attrs.mq_msgsize = str_LEN;
	attrs.mq_maxmsg = 10;
	attrs.mq_flags = O_NONBLOCK;

	mq = mq_open(buf_uns, O_RDWR | O_CREAT | O_EXCL | O_NONBLOCK, 0666,
	    &attrs);
	if (mq == (mqd_t)-1) {
		(void) printf("mq_open() failed with errno %d.\n", errno);
		exit(1);
	}

	/*
	 * Set notification structure to start the thread, when mq
	 * transits from EMPTY -> NONEMPTY.
	 */
	(void) memset(&notification, 0, sizeof (notification));
	notification.sigev_notify = SIGEV_THREAD;
	notification.sigev_notify_function = notify_thread;
	notification.sigev_value.sival_int = 42;
	notification.sigev_notify_attributes = NULL;

	/* register the calling process for notification */
	if (mq_notify(mq, &notification) != 0) {
		(void) printf("mq_notify() failed. errno = %d\n", errno);
		goto fin;
	}

	/* Keep buffered output from being emitted a second time by the child. */
	(void) fflush(stdout);

	if ((pid = fork()) < 0) {
		(void) printf("fork() failed with errno %d.\n", errno);
		/* Go through fin so that the queue does not stay behind. */
		goto fin;
	}

	if (pid == 0) {
		/* In the child */
		int i, j;

		/*
		 * Open the message queue for writing.  buf_uns still holds
		 * the queue name built by the parent before the fork.  No
		 * O_NONBLOCK here, so mq_send() blocks while the queue is full.
		 */
		mq = mq_open(buf_uns, O_RDWR);
		if (mq == (mqd_t)-1) {
			(void) printf("mq_open() failed with errno %d.\n",
			    errno);
			exit(1);
		}

		/* the child sends a message to a message queue */
		(void) snprintf(msg, sizeof (msg), "test %ld", (long)getpid());
		for (j = 0; j < nruns; j++) {
			for (i = 0; i < nmsg; i++) {
				/* Report success only once it is known. */
				if (mq_send(mq, msg, strlen(msg), 1) != 0) {
					(void) printf("mq_send() failed in child with %d.\n",
					    errno);
					exit(99);
				}
				(void) printf("CHILD: msg sent\n");
			}
			(void) sleep(delay_in_secs);
		}
		(void) printf("Child exit.\n");
		exit(0);
	}

	/* In the parent */

	/*
	 * pthread functions return the error number instead of setting
	 * errno, hence strerror(ret) rather than perror().
	 */
	ret = pthread_mutex_lock(&mutex);
	if (ret != 0) {
		(void) fprintf(stderr, "Lock mutex: %s\n", strerror(ret));
		goto fin;
	}
	while (counter < nruns) {
		ret = pthread_cond_wait(&cond, &mutex);
		if (ret != 0) {
			(void) fprintf(stderr, "Wait on condition: %s\n",
			    strerror(ret));
			goto fin;
		}
	}
	ret = pthread_mutex_unlock(&mutex);
	if (ret != 0) {
		(void) fprintf(stderr, "Unlock mutex: %s\n", strerror(ret));
		goto fin;
	}

	ret = waitpid(pid, &status, 0);
	saved_errno = errno;	/* printf() below may overwrite errno */
	(void) printf("Parent awoken.\n");
	if (ret != pid) {
		(void) printf("waitpid() failed with errno %d.\n", saved_errno);
		goto fin;
	}
	if (!WIFEXITED(status)) {
		/* WEXITSTATUS() is only meaningful after a normal exit. */
		(void) printf("The child terminated abnormally.\n");
		goto fin;
	}
	if (WEXITSTATUS(status) != 0) {
		/* The child terminated with error */
		(void) printf("The child terminated with exit status %d.\n",
		    WEXITSTATUS(status));
		goto fin;
	}

	(void) printf("Parent got %d messages.\n", counter);
	exit_code = 0;
fin:
	(void) mq_close(mq);

	/* Cleanup must not overwrite a failure recorded in exit_code. */
	if (mq_unlink(buf_uns) != 0) {
		(void) printf("mq_unlink() failed with %d\n", errno);
		exit(exit_code != 0 ? exit_code : 1);
	}

	(void) printf("Test finished.\n");
	return (exit_code);
}
_______________________________________________
perf-discuss mailing list
perf-discuss@opensolaris.org

Reply via email to