On Thu, Jun 02, 2016 at 06:57:00PM +0100, Will Deacon wrote:
> > This 'replaces' commit:
> > 
> >   54cf809b9512 ("locking,qspinlock: Fix spin_is_locked() and spin_unlock_wait()")
> > 
> > and seems to still work with the test case from that thread while
> > getting rid of the extra barriers.

New version :-)

This one has moar comments; and also tries to address an issue with
xchg_tail(), which is its own consumer. Paul, Will said you'd love the
address dependency through union members :-)

(I should split this in at least 3 patches I suppose)

ARM64 and PPC should provide private versions of is_locked and
unlock_wait; because while this one deals with the unordered store as
per qspinlock construction, it still relies on cmpxchg_acquire()'s store
being visible.

---
 include/asm-generic/qspinlock.h |  40 ++++---------
 kernel/locking/qspinlock.c      | 126 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 137 insertions(+), 29 deletions(-)

diff --git a/include/asm-generic/qspinlock.h b/include/asm-generic/qspinlock.h
index 6bd05700d8c9..3020bdb7a43c 100644
--- a/include/asm-generic/qspinlock.h
+++ b/include/asm-generic/qspinlock.h
@@ -26,33 +26,18 @@
  * @lock: Pointer to queued spinlock structure
  * Return: 1 if it is locked, 0 otherwise
  */
+#ifndef queued_spin_is_locked
 static __always_inline int queued_spin_is_locked(struct qspinlock *lock)
 {
        /*
-        * queued_spin_lock_slowpath() can ACQUIRE the lock before
-        * issuing the unordered store that sets _Q_LOCKED_VAL.
+        * See queued_spin_unlock_wait().
         *
-        * See both smp_cond_acquire() sites for more detail.
-        *
-        * This however means that in code like:
-        *
-        *   spin_lock(A)               spin_lock(B)
-        *   spin_unlock_wait(B)        spin_is_locked(A)
-        *   do_something()             do_something()
-        *
-        * Both CPUs can end up running do_something() because the store
-        * setting _Q_LOCKED_VAL will pass through the loads in
-        * spin_unlock_wait() and/or spin_is_locked().
-        *
-        * Avoid this by issuing a full memory barrier between the spin_lock()
-        * and the loads in spin_unlock_wait() and spin_is_locked().
-        *
-        * Note that regular mutual exclusion doesn't care about this
-        * delayed store.
+        * Any !0 state indicates it is locked, even if _Q_LOCKED_VAL
+        * isn't immediately observable.
         */
-       smp_mb();
-       return atomic_read(&lock->val) & _Q_LOCKED_MASK;
+       return !!atomic_read(&lock->val);
 }
+#endif
 
 /**
  * queued_spin_value_unlocked - is the spinlock structure unlocked?
@@ -78,6 +63,7 @@ static __always_inline int queued_spin_is_contended(struct qspinlock *lock)
 {
        return atomic_read(&lock->val) & ~_Q_LOCKED_MASK;
 }
+
 /**
  * queued_spin_trylock - try to acquire the queued spinlock
  * @lock : Pointer to queued spinlock structure
@@ -123,19 +109,15 @@ static __always_inline void queued_spin_unlock(struct qspinlock *lock)
 #endif
 
 /**
- * queued_spin_unlock_wait - wait until current lock holder releases the lock
+ * queued_spin_unlock_wait - wait until the _current_ lock holder releases the lock
  * @lock : Pointer to queued spinlock structure
  *
  * There is a very slight possibility of live-lock if the lockers keep coming
  * and the waiter is just unfortunate enough to not see any unlock state.
  */
-static inline void queued_spin_unlock_wait(struct qspinlock *lock)
-{
-       /* See queued_spin_is_locked() */
-       smp_mb();
-       while (atomic_read(&lock->val) & _Q_LOCKED_MASK)
-               cpu_relax();
-}
+#ifndef queued_spin_unlock_wait
+void queued_spin_unlock_wait(struct qspinlock *lock);
+#endif
 
 #ifndef virt_spin_lock
 static __always_inline bool virt_spin_lock(struct qspinlock *lock)
diff --git a/kernel/locking/qspinlock.c b/kernel/locking/qspinlock.c
index ce2f75e32ae1..e1c29d352e0e 100644
--- a/kernel/locking/qspinlock.c
+++ b/kernel/locking/qspinlock.c
@@ -395,6 +395,8 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
         * pending stuff.
         *
         * p,*,* -> n,*,*
+        *
+        * RELEASE, such that the stores to @node must be complete.
         */
        old = xchg_tail(lock, tail);
        next = NULL;
@@ -405,6 +407,15 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
         */
        if (old & _Q_TAIL_MASK) {
                prev = decode_tail(old);
+               /*
+                * The above xchg_tail() is also a load of @lock which generates,
+                * through decode_tail(), a pointer.
+                *
+                * The address dependency matches the RELEASE of xchg_tail()
+                * such that the access to @prev must happen after.
+                */
+               smp_read_barrier_depends();
+
                WRITE_ONCE(prev->next, node);
 
                pv_wait_node(node, prev);
@@ -496,6 +507,121 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
 EXPORT_SYMBOL(queued_spin_lock_slowpath);
 
 /*
+ * PROBLEM: some architectures have an interesting issue with atomic ACQUIRE
+ * operations in that the ACQUIRE applies to the LOAD _not_ the STORE (ARM64,
+ * PPC). Also qspinlock has a similar issue per construction, the setting of
+ * the locked byte can be unordered with respect to acquiring the lock proper.
+ *
+ * This gets to be 'interesting' in the following cases, where the /should/s
+ * end up false because of this issue.
+ *
+ *
+ * CASE 1:
+ *
+ * So the spin_is_locked() correctness issue comes from something like:
+ *
+ *   CPU0                              CPU1
+ *
+ *   global_lock();                    local_lock(i)
+ *     spin_lock(&G)                     spin_lock(&L[i])
+ *     for (i)                           if (!spin_is_locked(&G)) {
+ *       spin_unlock_wait(&L[i]);          smp_acquire__after_ctrl_dep();
+ *                                         return;
+ *                                       }
+ *                                       // deal with fail
+ *
+ * Where it is important CPU1 sees G locked or CPU0 sees L[i] locked such
+ * that there is exclusion between the two critical sections.
+ *
+ * The load from spin_is_locked(&G) /should/ be constrained by the ACQUIRE from
+ * spin_lock(&L[i]), and similarly the load(s) from spin_unlock_wait(&L[i])
+ * /should/ be constrained by the ACQUIRE from spin_lock(&G).
+ *
+ * Similarly, later stuff is constrained by the ACQUIRE from CTRL+RMB.
+ *
+ *
+ * CASE 2:
+ *
+ * For spin_unlock_wait() there is a second correctness issue, namely:
+ *
+ *   CPU0                              CPU1
+ *
+ *   flag = set;
+ *   smp_mb();                         spin_lock(&l)
+ *   spin_unlock_wait(&l);             if (!flag)
+ *                                       // add to lockless list
+ *                                     spin_unlock(&l);
+ *   // iterate lockless list
+ *
+ * Which wants to ensure that CPU1 will stop adding bits to the list and CPU0
+ * will observe the last entry on the list (if spin_unlock_wait() had ACQUIRE
+ * semantics etc..)
+ *
+ * Where flag /should/ be ordered against the locked store of l.
+ */
+
+
+/*
+ * queued_spin_lock_slowpath() can (load-)ACQUIRE the lock before
+ * issuing an _unordered_ store to set _Q_LOCKED_VAL.
+ *
+ * This means that the store can be delayed, but no later than the
+ * store-release from the unlock. This means that simply observing
+ * _Q_LOCKED_VAL is not sufficient to determine if the lock is acquired.
+ *
+ * There are two paths that can issue the unordered store:
+ *
+ *  (1) clear_pending_set_locked():    *,1,0 -> *,0,1
+ *
+ *  (2) set_locked():                  t,0,0 -> t,0,1 ; t != 0
+ *      atomic_cmpxchg_relaxed():      t,0,0 -> 0,0,1
+ *
+ * However, in both cases we have other !0 state we've set before to queue
+ * ourselves:
+ *
+ * For (1) we have the atomic_cmpxchg_acquire() that set _Q_PENDING_VAL, our
+ * load is constrained by that ACQUIRE to not pass before that, and thus must
+ * observe the store.
+ *
+ * For (2) we have a more interesting scenario. We enqueue ourselves using
+ * xchg_tail(), which ends up being a RELEASE. This in itself is not
+ * sufficient, however that is followed by an smp_cond_acquire() on the same
+ * word, giving a RELEASE->ACQUIRE ordering. This again constrains our load and
+ * guarantees we must observe that store.
+ *
+ * Therefore both cases have other !0 state that is observable before the
+ * unordered locked byte store comes through. This means we can use that to
+ * wait for the lock store, and then wait for an unlock.
+ */
+#ifndef queued_spin_unlock_wait
+void queued_spin_unlock_wait(struct qspinlock *lock)
+{
+       u32 val;
+
+       for (;;) {
+               val = atomic_read(&lock->val);
+
+               if (!val) /* not locked, we're done */
+                       goto done;
+
+               if (val & _Q_LOCKED_MASK) /* locked, go wait for unlock */
+                       break;
+
+               /* not locked, but pending, wait until we observe the lock */
+               cpu_relax();
+       }
+
+       /* any unlock is good */
+       while (atomic_read(&lock->val) & _Q_LOCKED_MASK)
+               cpu_relax();
+
+done:
+       smp_rmb();
+}
+EXPORT_SYMBOL(queued_spin_unlock_wait);
+#endif
+
+/*
  * Generate the paravirt code for queued_spin_unlock_slowpath().
  */
 #if !defined(_GEN_PV_LOCK_SLOWPATH) && defined(CONFIG_PARAVIRT_SPINLOCKS)

Reply via email to