On Fri, Jul 15, 2016 at 12:07:03PM +0200, Peter Zijlstra wrote:
> > So if we are kicked by the unlock_slowpath, and the lock is stealed by
> > someone else,  we need hash its node again and set l->locked to
> > _Q_SLOW_VAL, then enter pv_wait.
> 
> Right, let me go think about this a bit.

Urgh, brain hurt.

So I _think_ the below does for it but I could easily have missed yet
another case.

Waiman's patch has the problem that it can have two pv_hash() calls for
the same lock in progress and I'm thinking that means we can hit the
BUG() in pv_hash() due to that.

If we can't, it still has a problem because its not telling us either.



--- a/kernel/locking/qspinlock_paravirt.h
+++ b/kernel/locking/qspinlock_paravirt.h
@@ -20,7 +20,8 @@
  * native_queued_spin_unlock().
  */
 
-#define _Q_SLOW_VAL    (3U << _Q_LOCKED_OFFSET)
+#define _Q_HASH_VAL    (3U << _Q_LOCKED_OFFSET)
+#define _Q_SLOW_VAL    (7U << _Q_LOCKED_OFFSET)
 
 /*
  * Queue Node Adaptive Spinning
@@ -36,14 +37,11 @@
  */
 #define PV_PREV_CHECK_MASK     0xff
 
-/*
- * Queue node uses: vcpu_running & vcpu_halted.
- * Queue head uses: vcpu_running & vcpu_hashed.
- */
 enum vcpu_state {
-       vcpu_running = 0,
-       vcpu_halted,            /* Used only in pv_wait_node */
-       vcpu_hashed,            /* = pv_hash'ed + vcpu_halted */
+       vcpu_node_running = 0,
+       vcpu_node_halted,
+       vcpu_head_running,
+       vcpu_head_halted,
 };
 
 struct pv_node {
@@ -263,7 +261,7 @@ pv_wait_early(struct pv_node *prev, int
        if ((loop & PV_PREV_CHECK_MASK) != 0)
                return false;
 
-       return READ_ONCE(prev->state) != vcpu_running;
+       return READ_ONCE(prev->state) & 1;
 }
 
 /*
@@ -311,20 +309,19 @@ static void pv_wait_node(struct mcs_spin
                 *
                 * Matches the cmpxchg() from pv_kick_node().
                 */
-               smp_store_mb(pn->state, vcpu_halted);
+               smp_store_mb(pn->state, vcpu_node_halted);
 
-               if (!READ_ONCE(node->locked)) {
-                       qstat_inc(qstat_pv_wait_node, true);
-                       qstat_inc(qstat_pv_wait_early, wait_early);
-                       pv_wait(&pn->state, vcpu_halted);
-               }
+               if (READ_ONCE(node->locked))
+                       return;
+
+               qstat_inc(qstat_pv_wait_node, true);
+               qstat_inc(qstat_pv_wait_early, wait_early);
+               pv_wait(&pn->state, vcpu_node_halted);
 
                /*
-                * If pv_kick_node() changed us to vcpu_hashed, retain that
-                * value so that pv_wait_head_or_lock() knows to not also try
-                * to hash this lock.
+                * If pv_kick_node() advanced us, retain that state.
                 */
-               cmpxchg(&pn->state, vcpu_halted, vcpu_running);
+               cmpxchg(&pn->state, vcpu_node_halted, vcpu_node_running);
 
                /*
                 * If the locked flag is still not set after wakeup, it is a
@@ -362,18 +359,17 @@ static void pv_kick_node(struct qspinloc
         *
         * Matches with smp_store_mb() and cmpxchg() in pv_wait_node()
         */
-       if (cmpxchg(&pn->state, vcpu_halted, vcpu_hashed) != vcpu_halted)
+       if (cmpxchg(&pn->state, vcpu_node_halted, vcpu_head_running) != 
vcpu_node_halted)
                return;
 
        /*
-        * Put the lock into the hash table and set the _Q_SLOW_VAL.
-        *
-        * As this is the same vCPU that will check the _Q_SLOW_VAL value and
-        * the hash table later on at unlock time, no atomic instruction is
-        * needed.
+        * See pv_wait_head_or_lock(). We have to hash and force the unlock
+        * into the slow path to deliver the actual kick for waking.
         */
-       WRITE_ONCE(l->locked, _Q_SLOW_VAL);
-       (void)pv_hash(lock, pn);
+       if (cmpxchg(&l->locked, _Q_LOCKED_VAL, _Q_HASH_VAL) == _Q_LOCKED_VAL) {
+               (void)pv_hash(lock, pn);
+               smp_store_release(&l->locked, _Q_SLOW_VAL);
+       }
 }
 
 /*
@@ -388,28 +384,22 @@ pv_wait_head_or_lock(struct qspinlock *l
 {
        struct pv_node *pn = (struct pv_node *)node;
        struct __qspinlock *l = (void *)lock;
-       struct qspinlock **lp = NULL;
        int waitcnt = 0;
        int loop;
 
        /*
-        * If pv_kick_node() already advanced our state, we don't need to
-        * insert ourselves into the hash table anymore.
-        */
-       if (READ_ONCE(pn->state) == vcpu_hashed)
-               lp = (struct qspinlock **)1;
-
-       /*
         * Tracking # of slowpath locking operations
         */
        qstat_inc(qstat_pv_lock_slowpath, true);
 
        for (;; waitcnt++) {
+               u8 locked;
+
                /*
                 * Set correct vCPU state to be used by queue node wait-early
                 * mechanism.
                 */
-               WRITE_ONCE(pn->state, vcpu_running);
+               WRITE_ONCE(pn->state, vcpu_head_running);
 
                /*
                 * Set the pending bit in the active lock spinning loop to
@@ -423,33 +413,38 @@ pv_wait_head_or_lock(struct qspinlock *l
                }
                clear_pending(lock);
 
+               /*
+                * We want to go sleep; ensure we're hashed so that
+                * __pv_queued_spin_unlock_slow() can find us for a wakeup.
+                */
+               locked = cmpxchg(&l->locked, _Q_LOCKED_VAL, _Q_HASH_VAL);
+               switch (locked) {
+               /*
+                * We're not hashed yet, either we're fresh from pv_wait_node()
+                * or __pv_queued_spin_unlock_slow() unhashed us but we lost
+                * the trylock to a steal and have to re-hash.
+                */
+               case _Q_LOCKED_VAL:
+                       (void)pv_hash(lock, pn);
+                       smp_store_release(&l->locked, _Q_SLOW_VAL);
+                       break;
 
-               if (!lp) { /* ONCE */
-                       lp = pv_hash(lock, pn);
+               /*
+                * pv_kick_node() is hashing us, wait for it.
+                */
+               case _Q_HASH_VAL:
+                       while (READ_ONCE(l->locked) == _Q_HASH_VAL)
+                               cpu_relax();
+                       break;
 
-                       /*
-                        * We must hash before setting _Q_SLOW_VAL, such that
-                        * when we observe _Q_SLOW_VAL in 
__pv_queued_spin_unlock()
-                        * we'll be sure to be able to observe our hash entry.
-                        *
-                        *   [S] <hash>                 [Rmw] l->locked == 
_Q_SLOW_VAL
-                        *       MB                           RMB
-                        * [RmW] l->locked = _Q_SLOW_VAL  [L] <unhash>
-                        *
-                        * Matches the smp_rmb() in __pv_queued_spin_unlock().
-                        */
-                       if (xchg(&l->locked, _Q_SLOW_VAL) == 0) {
-                               /*
-                                * The lock was free and now we own the lock.
-                                * Change the lock value back to _Q_LOCKED_VAL
-                                * and unhash the table.
-                                */
-                               WRITE_ONCE(l->locked, _Q_LOCKED_VAL);
-                               WRITE_ONCE(*lp, NULL);
-                               goto gotlock;
-                       }
+               /*
+                * Ooh, unlocked, try and grab it.
+                */
+               case 0:
+                       continue;
                }
-               WRITE_ONCE(pn->state, vcpu_hashed);
+
+               WRITE_ONCE(pn->state, vcpu_head_halted);
                qstat_inc(qstat_pv_wait_head, true);
                qstat_inc(qstat_pv_wait_again, waitcnt);
                pv_wait(&l->locked, _Q_SLOW_VAL);
@@ -480,7 +475,7 @@ __pv_queued_spin_unlock_slowpath(struct
        struct __qspinlock *l = (void *)lock;
        struct pv_node *node;
 
-       if (unlikely(locked != _Q_SLOW_VAL)) {
+       if (unlikely(locked != _Q_SLOW_VAL && locked != _Q_HASH_VAL)) {
                WARN(!debug_locks_silent,
                     "pvqspinlock: lock 0x%lx has corrupted value 0x%x!\n",
                     (unsigned long)lock, atomic_read(&lock->val));
@@ -488,18 +483,17 @@ __pv_queued_spin_unlock_slowpath(struct
        }
 
        /*
-        * A failed cmpxchg doesn't provide any memory-ordering guarantees,
-        * so we need a barrier to order the read of the node data in
-        * pv_unhash *after* we've read the lock being _Q_SLOW_VAL.
-        *
-        * Matches the cmpxchg() in pv_wait_head_or_lock() setting _Q_SLOW_VAL.
+        * Wait until the hash-bucket is complete.
         */
-       smp_rmb();
+       while (READ_ONCE(l->locked) == _Q_HASH_VAL)
+               cpu_relax();
 
        /*
-        * Since the above failed to release, this must be the SLOW path.
-        * Therefore start by looking up the blocked node and unhashing it.
+        * Must first observe _Q_SLOW_VAL in order to observe
+        * consistent hash bucket.
         */
+       smp_rmb();
+
        node = pv_unhash(lock);
 
        /*

Reply via email to