> Date: Sat, 12 Mar 2022 10:42:18 +0000
> From: Taylor R Campbell <riastr...@netbsd.org>
> 
> - The one-word difference is immaterial for ordering atomic-r/m/w and
>   then load/store (or, equivalently, ll/sc and then load/store) -- so
>   the change doesn't affect mutex_enter-type operations implemented
>   with, e.g., atomic_cas.

It turns out this argument I made is wrong -- although not for any of
the reasons discussed up-thread.

The _conclusion_ is correct that the proposed change (w/rw to r/rw)
doesn't affect mutex_enter-type operations with atomic_cas -- doing
atomic-r/m/w then membar-r/rw makes a load-acquire operation which is
sufficient for locking (provided unlocking does store-release as
usual) to serialize the critical sections.

So r/rw suffices for the Solaris language of roughly `lock acquisition
before load/store', whether `lock acquisition' is an atomic
test-and-set, compare-and-swap, ll/sc, or even crazier schemes like
Dekker's algorithm -- noting that Dekker's algorithm requires an
_additional_ store-before-load barrier _inside_ `lock acquisition'.
And r/rw is often much cheaper than w/rw.

But the _premise_ I stated above is wrong, because atomic-r/m/w then
membar-r/rw is _sometimes_ different from atomic-r/m/w then
membar-w/rw.  This can be exhibited through Dekker's algorithm, which
(for two CPUs) is:

        int waiting[2];
        int turn;

        lock()
        {
        top:    waiting[curcpu()] = 1;
                membar(w/r);
                while (waiting[1 - curcpu()]) {
                        if (turn != curcpu()) {
                                waiting[curcpu()] = 0;
                                while (turn != curcpu())
                                        continue;
                                goto top;
                        }
                }
                membar(r/rw);   /* `lock acquisition then load/store' */
        }

        unlock()
        {
                membar(rw/w);   /* `load/store then lock release' */
                turn = 1 - curcpu();
                waiting[curcpu()] = 0;
        }

For the first two lines of lock(), we can obviously use an atomic_swap
instead of a simple store, and the stronger membar(w/rw) instead of
membar(w/r):

        lock()
        {
        top:    (void)atomic_swap(&waiting[curcpu()], 1);
                membar(w/rw);
                ...

This is an atomic-r/m/w then membar(w/rw).  Dekker's algorithm
notoriously _requires_ store-before-load ordering here.  So replacing
the membar(w/rw) by membar(r/rw) can't be correct, even though it
immediately follows an atomic-r/m/w.

The difference is exhibited by the attached program when run on an
rpi4 -- it should print 40000000 if lock/unlock are correct, but it
sometimes prints a slightly lower count because Dekker's algorithm
fails to achieve mutual exclusion with  DMB ISHLD  (that is, r/rw)
instead of  DMB ISH  (that is, rw/rw).
#include <sys/atomic.h>

#include <assert.h>
#include <err.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

/*
 * BROKEN selects the deliberately too-weak barrier for membar_dekker().
 * Delete the #define below (the test is #ifdef, so setting it to 0 is
 * not enough) to build the variant with the full barrier instead.
 */
#undef  BROKEN
#define BROKEN  1

/*
 * Barriers and spin hint used by lock(), unlock() and thread():
 *
 *      membar_acquire()  lock acquisition, then load/store (r/rw)
 *      membar_release()  load/store, then lock release (rw/w); the
 *                        aarch64 version uses the full `dmb ish'
 *      membar_dekker()   barrier after the atomic swap in lock(); Dekker's
 *                        algorithm needs store-before-load ordering here,
 *                        which the BROKEN aarch64 variant (`dmb ishld')
 *                        does not provide
 *      noop()            CPU hint instruction (`pause' / `yield')
 *
 * The asm statements are spelled __asm__ __volatile__ so that they are
 * also accepted in strict ISO C modes, where the plain `asm' keyword is
 * not available.
 */
#if defined(__amd64__) || defined(__x86_64__)
#define membar_acquire()        __asm__ __volatile__("" ::: "memory")
#define membar_release()        __asm__ __volatile__("" ::: "memory")
#ifdef BROKEN   /* not really broken because atomic_swap implies seq_cst */
#define membar_dekker()         __asm__ __volatile__("" ::: "memory")
#else
#define membar_dekker()         __asm__ __volatile__("mfence" ::: "memory")
#endif
#define noop()                  __asm__ __volatile__("pause" ::: "memory")
#elif defined(__aarch64__)
#define membar_acquire()        __asm__ __volatile__("dmb ishld" ::: "memory")
#define membar_release()        __asm__ __volatile__("dmb ish" ::: "memory")
#ifdef BROKEN
#define membar_dekker()         __asm__ __volatile__("dmb ishld" ::: "memory")
#else
#define membar_dekker()         __asm__ __volatile__("dmb ish" ::: "memory")
#endif
#define noop()                  __asm__ __volatile__("yield" ::: "memory")
#else
/* Fail early and clearly instead of leaving the macros undefined. */
#error "membar_acquire/membar_release/membar_dekker/noop are only defined for amd64 and aarch64"
#endif

/*
 * Shared state for the two-CPU Dekker lock.
 *
 * waiting[i].v is set to 1 by CPU i when it wants the lock and cleared
 * when it backs off or unlocks.  Each element is padded out to 128 bytes
 * and the array is 128-byte aligned -- presumably so that the two flags
 * never share a cache line (TODO confirm intent).
 */
volatile struct {
        unsigned v;
        uint8_t pad[128 - sizeof(unsigned)];
} waiting[2] __aligned((128));
/* CPU that keeps trying when both contend; the other one backs off. */
volatile unsigned turn;
/* Incremented twice per iteration inside the critical section by thread(). */
volatile unsigned counter;

/*
 * lock(me)
 *
 *      Acquire the two-CPU Dekker lock on behalf of CPU `me', which
 *      must be 0 or 1 (it indexes waiting[] and 1 - me names the other
 *      CPU).  Spins until the lock is acquired.
 *
 *      The first step is an atomic r/m/w followed by membar_dekker().
 *      Dekker's algorithm requires store-before-load ordering at that
 *      point; whether membar_dekker() actually provides it depends on
 *      how the macro was built (see BROKEN).
 */
static void
lock(unsigned me)
{

        /* Announce that we want the lock. */
top:    atomic_swap_uint(&waiting[me].v, 1);
        /* Must order the store above before the load of the peer's flag. */
        membar_dekker();
        while (waiting[1 - me].v) {
                if (turn != me) {
                        /*
                         * The other CPU also wants the lock and it is
                         * not our turn: withdraw our claim, wait until
                         * the turn is handed to us, then start over.
                         */
                        waiting[me].v = 0;
                        while (turn != me)
                                continue;
                        goto top;
                }
        }
        /* `lock acquisition then load/store' */
        membar_acquire();
}

/*
 * unlock(me)
 *
 *      Release the Dekker lock held by CPU `me' (0 or 1): hand the turn
 *      to the other CPU and clear our waiting flag.
 */
static void
unlock(unsigned me)
{

        /* `load/store then lock release' */
        membar_release();
        /* Give the other CPU priority the next time both contend. */
        turn = 1 - me;
        waiting[me].v = 0;
}

/*
 * thread(cookie)
 *
 *      Worker body.  The cookie carries this worker's CPU index for the
 *      Dekker lock.  Performs 10000000 lock/unlock rounds, bumping the
 *      shared counter twice per round while holding the lock, with a
 *      noop() between the two increments.  Always returns NULL.
 */
static void *
thread(void *cookie)
{
        const unsigned me = (uintptr_t)cookie;
        unsigned remaining = 10000000;

        while (remaining-- > 0) {
                lock(me);
                counter++;
                noop();
                counter++;
                unlock(me);
        }

        return NULL;
}

/*
 * Start two workers (indices 0 and 1), wait for both, then print the
 * final counter value.  With a correct lock the output is 40000000
 * (2 workers x 10000000 rounds x 2 increments).  The exit status is
 * nonzero if stdout is in an error state.
 */
int
main(void)
{
        pthread_t workers[2];
        unsigned cpu;
        int rv;

        /* Launch each worker with its index smuggled through the cookie. */
        for (cpu = 0; cpu < 2; cpu++) {
                rv = pthread_create(&workers[cpu], NULL, &thread,
                    (void *)(uintptr_t)cpu);
                if (rv != 0)
                        errc(1, rv, "pthread_create");
        }

        /* Wait for both workers before reading the counter. */
        for (cpu = 0; cpu < 2; cpu++) {
                rv = pthread_join(workers[cpu], NULL);
                if (rv != 0)
                        errc(1, rv, "pthread_join");
        }

        printf("%u\n", counter);
        fflush(stdout);
        return ferror(stdout);
}

Reply via email to