On Wed, Sep 26, 2018 at 01:01:20PM +0200, Peter Zijlstra wrote:
> On x86 we cannot do fetch_or with a single instruction and end up
> using a cmpxchg loop, this reduces determinism. Replace the fetch_or
> with a very tricky composite xchg8 + load.
> 
> The basic idea is that we use xchg8 to test-and-set the pending bit
> (when it is a byte) and then a load to fetch the whole word. Using
> two instructions of course opens a window we previously did not have.
> In particular the ordering between pending and tail is of interest,
> because that is where the split happens.
> 
> The claim is that if we order them, it all works out just fine. There
> are two specific cases where the pending,tail state changes:
> 
>  - when the 3rd lock(er) comes in and finds pending set, it'll queue
>    and set tail; since we set tail while pending is set, the ordering
>    of the split is not important (and not fundamentally different from
>    fetch_or). [*]
> 
>  - when the last queued lock holder acquires the lock (uncontended),
>    we clear the tail and set the lock byte. By first setting the
>    pending bit this cmpxchg will fail and the later load must then
>    see the remaining tail.
> 
> Another interesting scenario is where there are only 2 threads:
> 
> 	lock := (0,0,0)
> 
> 	CPU 0				CPU 1
> 
> 	lock()				lock()
> 	  trylock (-> 0,0,1)		  trylock() /* fail */
> 	  return;			  xchg_relaxed(pending, 1) (-> 0,1,1)
> 					  mb()
> 					  val = smp_load_acquire(*lock);
> 
> Where, without the mb(), the load would've been allowed to return 0 for
> the locked byte.
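
[ As an aside, the triples used above read as (tail, pending, locked).
  Below is a minimal standalone sketch of how they map onto the 32-bit
  lock word when _Q_PENDING_BITS == 8; the offsets mirror
  qspinlock_types.h, but qval() is only an illustrative helper, not a
  kernel function.

  #include <stdint.h>
  #include <stdio.h>

  /* Layout for _Q_PENDING_BITS == 8: locked byte, pending byte, 16-bit tail. */
  #define _Q_LOCKED_OFFSET	 0
  #define _Q_PENDING_OFFSET	 8
  #define _Q_TAIL_OFFSET	16

  /* Illustrative helper: build a lock word from a (tail, pending, locked) triple. */
  static uint32_t qval(uint32_t tail, uint32_t pending, uint32_t locked)
  {
  	return (tail << _Q_TAIL_OFFSET) |
  	       (pending << _Q_PENDING_OFFSET) |
  	       (locked << _Q_LOCKED_OFFSET);
  }

  int main(void)
  {
  	printf("(0,0,1) = 0x%08x\n", (unsigned)qval(0, 0, 1)); /* CPU0 owns the lock */
  	printf("(0,1,1) = 0x%08x\n", (unsigned)qval(0, 1, 1)); /* CPU1 set pending   */
  	return 0;
  }
]
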
If this were true (i.e., if the load really were allowed to return 0 for
the locked byte), we would have a violation of "coherence":

C peterz

{}

P0(atomic_t *val)
{
	int r0;

	/* in queued_spin_trylock() */
	r0 = atomic_cmpxchg_acquire(val, 0, 1);
}

P1(atomic_t *val)
{
	int r0;
	int r1;

	/* in queued_spin_trylock() */
	r0 = atomic_cmpxchg_acquire(val, 0, 1); /* or atomic_read(val) */

	/* in queued_spin_lock_slowpath() -- set_pending_fetch_acquire() */
	r1 = atomic_read_acquire(val);
}

exists (0:r0=0 /\ ~1:r0=0 /\ 1:r1=0)

  Andrea


> 
> [*] there is a fun scenario where the 3rd locker observes the pending
> bit, but before it sets the tail, the 1st lock (owner) unlocks and the
> 2nd locker acquires the lock, leaving us with a temporary 0,0,1 state.
> But this is not different between fetch_or or this new method.
> 
> Suggested-by: Will Deacon <will.dea...@arm.com>
> Signed-off-by: Peter Zijlstra (Intel) <pet...@infradead.org>
> ---
>  arch/x86/include/asm/qspinlock.h |    2 +
>  kernel/locking/qspinlock.c       |   56 ++++++++++++++++++++++++++++++++++++++-
>  2 files changed, 57 insertions(+), 1 deletion(-)
> 
> --- a/arch/x86/include/asm/qspinlock.h
> +++ b/arch/x86/include/asm/qspinlock.h
> @@ -9,6 +9,8 @@
>  
>  #define _Q_PENDING_LOOPS	(1 << 9)
>  
> +#define _Q_NO_FETCH_OR
> +
>  #ifdef CONFIG_PARAVIRT_SPINLOCKS
>  extern void native_queued_spin_lock_slowpath(struct qspinlock *lock, u32 val);
>  extern void __pv_init_lock_hash(void);
> --- a/kernel/locking/qspinlock.c
> +++ b/kernel/locking/qspinlock.c
> @@ -232,6 +232,60 @@ static __always_inline u32 xchg_tail(str
>  #endif /* _Q_PENDING_BITS == 8 */
>  
>  /**
> + * set_pending_fetch_acquire - fetch the whole lock value and set pending and locked
> + * @lock : Pointer to queued spinlock structure
> + * Return: The previous lock value
> + *
> + * *,*,* -> *,1,*
> + */
> +static __always_inline u32 set_pending_fetch_acquire(struct qspinlock *lock)
> +{
> +#if defined(_Q_NO_FETCH_OR) && _Q_PENDING_BITS == 8
> +	u32 val;
> +
> +	/*
> +	 * For the !LL/SC archs that do not have a native atomic_fetch_or
> +	 * but do have a native xchg (x86) we can do trickery and avoid the
> +	 * cmpxchg-loop based fetch-or to improve determinism.
> +	 *
> +	 * We first xchg the pending byte to set PENDING, and then issue a load
> +	 * for the rest of the word and mask out the pending byte to compute a
> +	 * 'full' value.
> +	 */
> +	val = xchg_relaxed(&lock->pending, 1) << _Q_PENDING_OFFSET;
> +	/*
> +	 * Ensures the tail load happens after the xchg().
> +	 *
> +	 *          lock  unlock    (a)
> +	 *   xchg ------------------.
> +	 *    (b)   lock  unlock    +----- fetch_or
> +	 *   load ------------------'
> +	 *          lock  unlock    (c)
> +	 *
> +	 * For both lock and unlock, (a) and (c) are the same as fetch_or(),
> +	 * since either are fully before or after. The only new case is (b),
> +	 * where a concurrent lock or unlock can interleave with the composite
> +	 * operation.
> +	 *
> +	 * In either case, it is the queueing case that is of interest, otherwise
> +	 * the whole operation is covered by the xchg() and the tail will be 0.
> +	 *
> +	 * For lock-(b); we only care if we set the PENDING bit or not. If we lost
> +	 * the PENDING race, we queue. Otherwise we'd observe the tail anyway.
> +	 *
> +	 * For unlock-(b); since we'll have set PENDING, the uncontended claim
> +	 * will fail and we'll observe a !0 tail.
> +	 */
> +	smp_mb__after_atomic();
> +	val |= atomic_read_acquire(&lock->val) & ~_Q_PENDING_MASK;
> +
> +	return val;
> +#else
> +	return atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val);
> +#endif
> +}
> +
> +/**
>   * set_locked - Set the lock bit and own the lock
>   * @lock: Pointer to queued spinlock structure
>   *
> @@ -328,7 +382,7 @@ void queued_spin_lock_slowpath(struct qs
>  	 *
>  	 * 0,0,* -> 0,1,* -> 0,0,1 pending, trylock
>  	 */
> -	val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val);
> +	val = set_pending_fetch_acquire(lock);
>  
>  	/*
>  	 * If we observe contention, there was a concurrent lock.
> 
> 
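
[ For completeness, here is a standalone C11 sketch of the composite
  xchg-byte + load-acquire that the patch describes, assuming a
  little-endian word with the locked byte at offset 0 and the pending
  byte at offset 1 (_Q_PENDING_BITS == 8). It is only an illustration
  of the technique: mixed-size atomics on the same word fall outside
  the C11 model, and smp_mb__after_atomic() is conservatively modeled
  as a seq_cst fence.

  #include <stdatomic.h>
  #include <stdint.h>
  #include <stdio.h>

  #define PENDING_OFFSET	8
  #define PENDING_MASK	(0xffu << PENDING_OFFSET)

  /* locked byte @0, pending byte @1, 16-bit tail @2 (little-endian assumption) */
  struct lockword {
  	union {
  		_Atomic uint32_t val;
  		struct {
  			_Atomic uint8_t  locked;
  			_Atomic uint8_t  pending;
  			_Atomic uint16_t tail;
  		};
  	};
  };

  /* xchg the pending byte, order the full-word load after it, merge the two. */
  static uint32_t set_pending_fetch_acquire(struct lockword *lock)
  {
  	uint32_t val;

  	/* ~ xchg_relaxed(&lock->pending, 1) << _Q_PENDING_OFFSET */
  	val = (uint32_t)atomic_exchange_explicit(&lock->pending, 1,
  						 memory_order_relaxed)
  			<< PENDING_OFFSET;

  	/* ~ smp_mb__after_atomic(): keep the word load after the xchg */
  	atomic_thread_fence(memory_order_seq_cst);

  	/* ~ atomic_read_acquire(&lock->val) & ~_Q_PENDING_MASK */
  	val |= atomic_load_explicit(&lock->val, memory_order_acquire)
  			& ~PENDING_MASK;

  	return val;
  }

  int main(void)
  {
  	struct lockword lock = { .val = 1 };	/* (0,0,1): lock is held */
  	uint32_t old = set_pending_fetch_acquire(&lock);

  	printf("old = 0x%08x, new = 0x%08x\n",
  	       (unsigned)old, (unsigned)atomic_load(&lock.val));
  	return 0;
  }

  Run on an already-locked word this returns 0x00000001 (the
  pre-pending value, as fetch_or would) and leaves the word at
  0x00000101. ]
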