Hi Michael,

Yes, there is a problem here.  Looking at the code and your test output,
we can see that there is a race condition between __mq_timedsend()
and __mq_timedreceive().  Note that __mq_timedreceive() does not hold the
mq_exclusive mutex when calling sem_trywait(&mqhp->mq_notempty).

The sender can see a non-empty queue and decide not to notify. But before it
can put the message and update the mq_notempty semaphore count, non-blocking
receiver(s) can come by, pick up the remaining messages, subsequently
see an empty queue and return an error (EAGAIN). The message that the
sender then queues is left in the queue, and nobody is notified about it.

The fix will be to hold the mq_exclusive lock in __mq_timedreceive() when calling
sem_trywait(&mqhp->mq_notempty).

The blocking receive cases work fine as is.

__mq_timedsend()
..

        if (nmessages == 0 && nblocked == 0)   <-- can see a non-empty queue (nmessages != 0) here.
                notify = 1;
        }

        mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
        (void) sem_post(&mqhp->mq_notempty);

..
}

I will file a bug and let you know.
-Prakash.



Michael Schulte wrote:
Hey Bart and Prakash,

First of all, a happy New Year!

I have seen that the good old RFE 4017841, to implement the SIGEV_THREAD
notification type for timers, message queues and asynchronous I/O,
was successfully committed to our next version of Solaris a couple of
months ago (around build 40). :-)

There is a problem with the code in the area of POSIX
message queues that becomes obvious once you run my
test program on, e.g., an Ultra-10 with Sol 5.11 build 54
installed.

The program occasionally hangs:
$ cc -g -D_REENTRANT -o notify_thread notify_thread.c -lpthread -lposix4
$ notify_thread 10 10 1
Test is starting nmsg=10, nruns=10, delay=1.
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
->notify_thread 3 called with argument 42 and counter 0.
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   thr 3 message: <test 2570> with size 9
   message queue is empty.
<-notify_thread 3 left with counter 20
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
CHILD: msg sent
<<<< Now it hangs >>>>>>>>>

When I examine the stacks of the child and the parent, I can see this:
$ pstack 2570   (<--- this is the child and is OK)
2570:   notify_thread 10 10 1
 ff2bf850 lwp_sema_timedwait (ff350098, 0, 1)
 ff2a9aac sem_wait (ff350098, 3, 4d534751, 20, fffffff0, 0) + 20
 ff2a752c __mq_timedsend (22940, ffbfec50, 9, 1, 0, 0) + b0
 00011814 main     (4, ffbfeeb4, ffbfeec8, 22000, ba394, ff390140) + 33c
 00010e18 _start   (0, 0, 0, 0, 0, 0) + 108

$ pstack 2569  (<--- this is the parent that shouldn't hang)
2569:   notify_thread 10 10 1
-----------------  lwp# 1 / thread# 1  --------------------
 ff2bfa54 waitid   (0, a0a, ffbfea98, 3)
 ff2b226c waitpid  (a0a, ffbfec40, 0, 0, ff392000, ff392000) + 58
 000119e4 main     (4, ffbfeeb4, ffbfeec8, 22000, ba394, ff390140) + 50c
 00010e18 _start   (0, 0, 0, 0, 0, 0) + 108
-----------------  lwp# 2 / thread# 2  --------------------
 ff2be5e0 lwp_park (0, 0, 0)
 ff2b8448 cond_wait_queue (ff390850, ff390838, 0, 0, 0, 0) + 28
 ff2b7ecc sig_cond_wait (ff390850, ff390838, ff17bfa0, ff17bf80, 6073c, ff390838) + 10
 ff2aa160 mqueue_spawner (ff390800, ff390838, 2a, ff30a5f0, 60510, 0) + 84
 ff2be564 _lwp_start (0, 0, 0, 0, 0, 0)
-----------------  lwp# 3 / thread# 3  --------------------
 ff2be5e0 lwp_park (0, 0, 0)
 ff2b8448 cond_wait_queue (ff3908b0, ff390888, 0, 0, 0, 0) + 28
 ff2b7ecc sig_cond_wait (ff3908b0, ff390888, 3, ff07bf88, ff1c0000, 3) + 10
 ff2aaed4 tpool_worker (ff390280, ff3902c0, 0, ff30a5f0, ff3908b0, ff390888) + 104
 ff2be564 _lwp_start (0, 0, 0, 0, 0, 0)
$


So the worker thread "tpool_worker" is missing its "wake up"
on the condition variable. Why is it missing it?

I think that the bug is in usr/src/lib/libc/port/rt/mqueue.c
and that it could be fixed as follows (add the lines marked >>>
after line 711 and, in particular, the line after 1014).

...
688     /*
689      * Now determine if we want to kick the notification.  POSIX
690      * requires that if a process has registered for notification,
691      * we must kick it when the queue makes an empty to non-empty
692      * transition, and there are no blocked receivers.  Note that
693      * this mechanism does _not_ guarantee that the kicked process
694      * will be able to receive a message without blocking;
695      * another receiver could intervene in the meantime.  Thus,
696      * the notification mechanism is inherently racy; all we can
697      * do is hope to minimize the window as much as possible.
698      * In general, we want to avoid kicking the notification when
699      * there are clearly receivers blocked.  We'll determine if
700      * we want to kick the notification before the mq_putmsg(),
701      * but the actual signotify() won't be done until the message
702      * is on the queue.
703      */
704     if (mqhp->mq_sigid.sn_pid != 0) {
705         int nmessages, nblocked;
706
707         (void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
708         (void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);
709
710         if (nmessages == 0 && nblocked == 0)
711             notify = 1;
>>>         /*
>>>          * Force a kick of the receiver if the next sender would
>>>          * block because this message fills the queue and the
>>>          * receiver would not continue receiving messages.
>>>          */
>>>         if (nmessages == mqhp->mq_maxmsg - 1)
>>>             notify = 1;
712     }
713
714     mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
715     (void) sem_post(&mqhp->mq_notempty);
716
717     if (notify) {
718         /* notify and also delete the registration */
719         (void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
720         if (mqhp->mq_ntype == SIGEV_THREAD ||
721             mqhp->mq_ntype == SIGEV_PORT)
722             (void) sem_post(&mqhp->mq_spawner);
723         mqhp->mq_ntype = 0;
724         mqhp->mq_des = 0;
725     }
...
1012         switch (ntype) {
1013         case SIGEV_THREAD:
1014         case SIGEV_PORT:
>>>              mqhp->mq_sigid.sn_pid = getpid();
1015             tcdp->tcd_port = port;
1016             tcdp->tcd_msg_object = mqdp;
1017             tcdp->tcd_msg_userval = userval;
1018             sig_mutex_lock(&tcdp->tcd_lock);
1019             tcdp->tcd_msg_enabled = ntype;
1020             sig_mutex_unlock(&tcdp->tcd_lock);
1021             (void) cond_broadcast(&tcdp->tcd_cv);
1022             break;
1023         }

Perhaps you can confirm my findings. If you decide to open a bug,
could you please drop me a short note saying in which build it will
be fixed, so that I can update my box then? As I said, it currently
runs build 54 (Community release).

Many greetings from Germany,
best regards,

Michael Schulte

------------------------------------------------------------------------

/*
 * cc -g -D_REENTRANT -o notify_thread notify_thread.c -lpthread -lposix4
 *
 * purpose: Demonstrate usage of SIGEV_THREAD.
 *
 * Call mq_notify(mqd_t mqdes, *notification) to attach a notification
 * request to a valid mqdes. Call mq_send() from another process and
 * send messages. Whenever (in the parent) the message queue transitions
 * from EMPTY -> NONEMPTY, start a thread that fetches the message(s)
 * currently in the message queue.
 * Demonstrate usage of non-async-signal-safe functions in notify_thread()
 * by doing some printing and a simple mutex/count game.
 *
 * Author: Michael Schulte
 * 2004-02-11
 */

#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <mqueue.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <pthread.h>

#define str_LEN 256


static mqd_t    mq;
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int counter = 0;
static int nruns;
static int delay_in_secs;



/*
 * notify_thread(par_sigval)
 *
 * This is the notification function which is called in a new thread
 * when the message queue makes a transition from EMPTY -> NONEMPTY.
 */
static void
notify_thread(union sigval par_sigval)
{
        char msg[str_LEN];
        ssize_t size;
        int status;
        
        (void) printf("->notify_thread %d called with argument %d and "
            "counter %d.\n", (int)pthread_self(), par_sigval.sival_int,
            counter);
        /*
         * Now we can fetch the messages. As O_NONBLOCK is specified,
         * all messages currently in the message queue are fetched.
         */
        do {
                msg[0] = '\0';
                size = mq_receive(mq, msg, str_LEN, NULL);
                if (size > 0) {
                        (void) printf("   thr %d message: <%s> with size %ld\n",
                            (int)pthread_self(), msg, (long)size);
                        /*
                         * Now we can do some processing and async signal
                         * unsafe stuff (as we are in a thread).
                         */
                        status = pthread_mutex_lock (&mutex);
                        if (status != 0) {
                                perror ( "Lock mutex");
                                exit (-1);
                        }
                        if (++counter >= nruns) {
                                status = pthread_cond_signal (&cond);
                                if (status != 0) {
                                        perror ( "Signal condition");
                                        exit (-1);
                                }
                        }
                        status = pthread_mutex_unlock (&mutex);
                        if (status != 0) {
                                perror ( "Unlock mutex");
                                exit (-1);
                        }
                        if ((counter % 7) == 0)
                                (void) sleep(delay_in_secs);
                } else {
                        if (errno == EAGAIN)
                                (void) printf("   message queue is empty.\n");
                        else
                                (void) printf("   XXX <%s>\n", strerror(errno));
                }
        } while (size >= 0);
        (void) printf("<-notify_thread %d left with counter %d\n",
            (int)pthread_self(), counter);
}



int main(int argc, char *argv[])
{
        char    buf_uns[str_LEN];
        char    msg[str_LEN];
        char    *name = "notify";
        int     ret = 0;
        pid_t   pid;
        int     status;
        struct mq_attr  attrs;
        struct sigevent notification;
        int     nmsg;


        if (argc != 4) {
                (void) printf("notify_thread <nr of messages to send> "
                    "<nr of experiments> <delay in secs>\n");
                exit(1);
        }
        nmsg = atoi(argv[1]);
        nruns = atoi(argv[2]);
        delay_in_secs = atoi(argv[3]);

        (void) printf("Test is starting nmsg=%d, nruns=%d, delay=%d.\n",
            nmsg, nruns, delay_in_secs);
        
        (void) sprintf(buf_uns, "/%s%ld", name, (long)getpid());

        /*
         * Set members of the mq_attr structure
         * and create an open message queue description.
         */
        attrs.mq_msgsize = str_LEN;
        attrs.mq_maxmsg = 10;
        attrs.mq_flags = O_NONBLOCK;

        mq = mq_open(buf_uns, O_RDWR | O_CREAT | O_EXCL | O_NONBLOCK, 0666,
            &attrs);
        if (mq == (mqd_t)-1) {
                (void) printf("mq_open() failed with errno %d.\n", errno);
                exit(1);
        }

        /*
         * Set up the notification structure to start the thread when
         * the mq transitions from EMPTY -> NONEMPTY.
         */
        notification.sigev_notify = SIGEV_THREAD;
        notification.sigev_notify_function = notify_thread;
        notification.sigev_value.sival_int = 42;
        notification.sigev_notify_attributes = NULL;
        
        /* register the calling process for notification */
        ret = mq_notify(mq, &notification);
        if (ret != 0) {
                (void) printf("mq_notify() failed. errno = %d\n", errno);
                goto fin;
        }

        if ((pid = fork()) < 0) {
                (void) printf("fork() failed with errno %d.\n", errno);
                exit(1);
        }

        if (pid == 0) {
                /* In the child */
                int i,j;
                                
                (void) sprintf(buf_uns, "/%s%ld", name, (long)getppid());

                /* Open the message queue for writing. */
                attrs.mq_msgsize = str_LEN;
                attrs.mq_maxmsg = 10;
                attrs.mq_flags = 0;

                mq = mq_open(buf_uns, O_RDWR, 0666, &attrs);
                if (mq == (mqd_t)-1) {
                        (void) printf("mq_open() failed with errno %d.\n",
                            errno);
                        exit(1);
                }

                /* the child sends a message to a message queue */
                sprintf(msg, "test %d", getpid());
                j = 0;
                while (j < nruns) {
                        for (i = 0; i < nmsg; i++) {
                                ret = mq_send(mq, msg, strlen(msg), 1);
                                (void) printf("CHILD: msg sent\n");
                                if (ret != 0) {
                                        (void) printf("mq_send() failed in "
                                            "child with %d.\n", errno);
                                        nruns = 0;
                                        exit(99);
                                }
                        }
                        sleep(delay_in_secs);
                        j++;
                }
                (void) printf("Child exit.\n");
                exit(0);
        }

        /* In the parent */

        ret = pthread_mutex_lock (&mutex);
        if (ret != 0) {
                perror ( "Lock mutex");
                goto fin;
        }
        while (counter < nruns) {
                ret = pthread_cond_wait (&cond, &mutex);
                if (ret != 0) {
                        perror ( "Wait on condition");
                        goto fin;
                }
        }
        ret = pthread_mutex_unlock (&mutex);
        if (ret != 0) {
                perror ( "Unlock mutex");
                goto fin;
        }

        ret = waitpid(pid, &status, 0);
        (void) printf("Parent awoken.\n");
        if (ret == pid) {
                if (WEXITSTATUS(status) != 0) {
                        /* The child terminated with error */
                        (void) printf("The child terminated with exit "
                            "status %d.\n", WEXITSTATUS(status));
                        goto fin;
                }
        }

        (void) printf("Parent got %d messages.\n", counter);
fin:
        (void) mq_close(mq);

        ret = mq_unlink(buf_uns);
        if (ret != 0) {
                (void) printf("mq_unlink() failed with %d\n", errno);
                exit(ret);
        }

        (void) printf("Test finished.\n");
        exit(ret);
}
------------------------------------------------------------------------

_______________________________________________
perf-discuss mailing list
perf-discuss@opensolaris.org
