Currently we must drop hb->lock in order to call
rt_mutex_timed_futex_lock(), such that it can block and wait for lock
acquisition.

The problem with this is that at this point (and after a failed
acquire but before re-acquiring hb->lock) the hb queue and rt_mutex
wait list disagree on who is waiting to acquire the futex. This leads
to a 'funny' state in wake_futex_pi() which we then need to deal with
in fixup_owner().

This inconsistent state would become even more of a problem when we
move rt_mutex_unlock() out from under hb->lock, which we want to do
because of PI inversion issues.

Avoid the funny state by reusing the
rt_mutex_{start,finish}_proxy_lock() primitives that were created for
requeue-pi.

The idea is to enqueue the rt_mutex_waiter on the rt_mutex wait list
before dropping the hb->lock, this is what rt_mutex_start_proxy_lock()
allows.

We must then wait for the lock with hb->lock dropped, _however_ we
must not do remove_waiter() in case the lock acquire fails, this means
we need to split rt_mutex_finish_proxy_lock() into two parts: wait and
cleanup.

With this done we can do something along the lines of:

        __queue_me();
        rt_mutex_start_proxy_lock();
        spin_unlock(q.lock_ptr);

        ret = rt_mutex_wait_proxy_lock();

        spin_lock(q.lock_ptr);
        if (ret)
                rt_mutex_cleanup_proxy_lock();

And the hb queue and rt_mutex wait lists are always in sync, with the
one exception of rt_mutex_futex_unlock(), which removes the new owner
from the rt_mutex wait list which will remain on the hb queue until it
returns from rt_mutex_wait_proxy_lock().

This one case is fine, since unlock has no concurrency with itself.
That is, the new owner must finish and return from futex_lock_pi()
before it can do another unlock (through futex_unlock_pi()), at which
point its state is coherent again.

Signed-off-by: Peter Zijlstra (Intel) <pet...@infradead.org>
---
 kernel/futex.c                  |   78 ++++++++++++++++++++++++++++------------
 kernel/locking/rtmutex.c        |   47 +++++++++++++-----------
 kernel/locking/rtmutex_common.h |   10 +++--
 3 files changed, 88 insertions(+), 47 deletions(-)

--- a/kernel/futex.c
+++ b/kernel/futex.c
@@ -2088,20 +2088,7 @@ queue_unlock(struct futex_hash_bucket *h
        hb_waiters_dec(hb);
 }
 
-/**
- * queue_me() - Enqueue the futex_q on the futex_hash_bucket
- * @q: The futex_q to enqueue
- * @hb:        The destination hash bucket
- *
- * The hb->lock must be held by the caller, and is released here. A call to
- * queue_me() is typically paired with exactly one call to unqueue_me().  The
- * exceptions involve the PI related operations, which may use unqueue_me_pi()
- * or nothing if the unqueue is done as part of the wake process and the unqueue
- * state is implicit in the state of woken task (see futex_wait_requeue_pi() for
- * an example).
- */
-static inline void queue_me(struct futex_q *q, struct futex_hash_bucket *hb)
-       __releases(&hb->lock)
+static inline void __queue_me(struct futex_q *q, struct futex_hash_bucket *hb)
 {
        int prio;
 
@@ -2118,6 +2105,24 @@ static inline void queue_me(struct futex
        plist_node_init(&q->list, prio);
        plist_add(&q->list, &hb->chain);
        q->task = current;
+}
+
+/**
+ * queue_me() - Enqueue the futex_q on the futex_hash_bucket
+ * @q: The futex_q to enqueue
+ * @hb:        The destination hash bucket
+ *
+ * The hb->lock must be held by the caller, and is released here. A call to
+ * queue_me() is typically paired with exactly one call to unqueue_me().  The
+ * exceptions involve the PI related operations, which may use unqueue_me_pi()
+ * or nothing if the unqueue is done as part of the wake process and the unqueue
+ * state is implicit in the state of woken task (see futex_wait_requeue_pi() for
+ * an example).
+ */
+static inline void queue_me(struct futex_q *q, struct futex_hash_bucket *hb)
+       __releases(&hb->lock)
+{
+       __queue_me(q, hb);
        spin_unlock(&hb->lock);
 }
 
@@ -2593,6 +2598,7 @@ static int futex_lock_pi(u32 __user *uad
                         ktime_t *time, int trylock)
 {
        struct hrtimer_sleeper timeout, *to = NULL;
+       struct rt_mutex_waiter rt_waiter;
        struct futex_hash_bucket *hb;
        struct futex_q q = futex_q_init;
        int res, ret;
@@ -2645,25 +2651,49 @@ static int futex_lock_pi(u32 __user *uad
                }
        }
 
+       WARN_ON(!q.pi_state);
+
        /*
         * Only actually queue now that the atomic ops are done:
         */
-       queue_me(&q, hb);
+       __queue_me(&q, hb);
 
-       WARN_ON(!q.pi_state);
-       /*
-        * Block on the PI mutex:
-        */
-       if (!trylock) {
-               ret = rt_mutex_timed_futex_lock(&q.pi_state->pi_mutex, to);
-       } else {
+       debug_rt_mutex_init_waiter(&rt_waiter);
+       RB_CLEAR_NODE(&rt_waiter.pi_tree_entry);
+       RB_CLEAR_NODE(&rt_waiter.tree_entry);
+       rt_waiter.task = NULL;
+
+       if (trylock) {
                ret = rt_mutex_futex_trylock(&q.pi_state->pi_mutex);
                /* Fixup the trylock return value: */
                ret = ret ? 0 : -EWOULDBLOCK;
+               goto did_trylock;
        }
 
+       /*
+        * We must add ourselves to the rt_mutex waitlist while holding hb->lock
+        * such that the hb and rt_mutex wait lists match.
+        */
+       rt_mutex_start_proxy_lock(&q.pi_state->pi_mutex, &rt_waiter, current);
+       spin_unlock(q.lock_ptr);
+
+       if (unlikely(to))
+               hrtimer_start_expires(&to->timer, HRTIMER_MODE_ABS);
+
+       ret = rt_mutex_wait_proxy_lock(&q.pi_state->pi_mutex, to, &rt_waiter);
+
        spin_lock(q.lock_ptr);
        /*
+        * If we failed to acquire the lock (signal/timeout), we must
+        * first acquire the hb->lock before removing the lock from the
+        * rt_mutex waitqueue, such that we can keep the hb and rt_mutex
+        * wait lists consistent.
+        */
+       if (ret)
+               rt_mutex_cleanup_proxy_lock(&q.pi_state->pi_mutex, &rt_waiter);
+
+did_trylock:
+       /*
         * Fixup the pi_state owner and possibly acquire the lock if we
         * haven't already.
         */
@@ -3005,10 +3035,12 @@ static int futex_wait_requeue_pi(u32 __u
                 */
                WARN_ON(!q.pi_state);
                pi_mutex = &q.pi_state->pi_mutex;
-               ret = rt_mutex_finish_proxy_lock(pi_mutex, to, &rt_waiter);
+               ret = rt_mutex_wait_proxy_lock(pi_mutex, to, &rt_waiter);
                debug_rt_mutex_free_waiter(&rt_waiter);
 
                spin_lock(q.lock_ptr);
+               if (ret)
+                       rt_mutex_cleanup_proxy_lock(pi_mutex, &rt_waiter);
                /*
                 * Fixup the pi_state owner and possibly acquire the lock if we
                 * haven't already.
--- a/kernel/locking/rtmutex.c
+++ b/kernel/locking/rtmutex.c
@@ -1485,19 +1485,6 @@ int __sched rt_mutex_lock_interruptible(
 EXPORT_SYMBOL_GPL(rt_mutex_lock_interruptible);
 
 /*
- * Futex variant with full deadlock detection.
- * Futex variants must not use the fast-path, see __rt_mutex_futex_unlock().
- */
-int __sched rt_mutex_timed_futex_lock(struct rt_mutex *lock,
-                             struct hrtimer_sleeper *timeout)
-{
-       might_sleep();
-
-       return rt_mutex_slowlock(lock, TASK_INTERRUPTIBLE,
-                                timeout, RT_MUTEX_FULL_CHAINWALK);
-}
-
-/*
  * Futex variant, must not use fastpath.
  */
 int __sched rt_mutex_futex_trylock(struct rt_mutex *lock)
@@ -1745,21 +1732,23 @@ struct task_struct *rt_mutex_next_owner(
 }
 
 /**
- * rt_mutex_finish_proxy_lock() - Complete lock acquisition
+ * rt_mutex_wait_proxy_lock() - Wait for lock acquisition
  * @lock:              the rt_mutex we were woken on
  * @to:                        the timeout, null if none. hrtimer should already have
  *                     been started.
  * @waiter:            the pre-initialized rt_mutex_waiter
  *
- * Complete the lock acquisition started our behalf by another thread.
+ * Wait for the lock acquisition started on our behalf by
+ * rt_mutex_start_proxy_lock(). Upon failure, the caller must call
+ * rt_mutex_cleanup_proxy_lock().
  *
  * Returns:
  *  0 - success
  * <0 - error, one of -EINTR, -ETIMEDOUT
  *
- * Special API call for PI-futex requeue support
+ * Special API call for PI-futex support
  */
-int rt_mutex_finish_proxy_lock(struct rt_mutex *lock,
+int rt_mutex_wait_proxy_lock(struct rt_mutex *lock,
                               struct hrtimer_sleeper *to,
                               struct rt_mutex_waiter *waiter)
 {
@@ -1772,9 +1761,6 @@ int rt_mutex_finish_proxy_lock(struct rt
        /* sleep on the mutex */
        ret = __rt_mutex_slowlock(lock, TASK_INTERRUPTIBLE, to, waiter);
 
-       if (unlikely(ret))
-               remove_waiter(lock, waiter);
-
        /*
         * try_to_take_rt_mutex() sets the waiter bit unconditionally. We might
         * have to fix that up.
@@ -1785,3 +1771,24 @@ int rt_mutex_finish_proxy_lock(struct rt
 
        return ret;
 }
+
+/**
+ * rt_mutex_cleanup_proxy_lock() - Cleanup failed lock acquisition
+ * @lock:              the rt_mutex we were woken on
+ * @waiter:            the pre-initialized rt_mutex_waiter
+ *
+ * Clean up the failed lock acquisition as per rt_mutex_wait_proxy_lock().
+ *
+ * Special API call for PI-futex support
+ */
+void rt_mutex_cleanup_proxy_lock(struct rt_mutex *lock,
+                                struct rt_mutex_waiter *waiter)
+{
+       raw_spin_lock_irq(&lock->wait_lock);
+
+       remove_waiter(lock, waiter);
+       fixup_rt_mutex_waiters(lock);
+
+       raw_spin_unlock_irq(&lock->wait_lock);
+}
+
--- a/kernel/locking/rtmutex_common.h
+++ b/kernel/locking/rtmutex_common.h
@@ -105,11 +105,13 @@ extern void rt_mutex_proxy_unlock(struct
 extern int rt_mutex_start_proxy_lock(struct rt_mutex *lock,
                                     struct rt_mutex_waiter *waiter,
                                     struct task_struct *task);
-extern int rt_mutex_finish_proxy_lock(struct rt_mutex *lock,
-                                     struct hrtimer_sleeper *to,
-                                     struct rt_mutex_waiter *waiter);
 
-extern int rt_mutex_timed_futex_lock(struct rt_mutex *l, struct hrtimer_sleeper *to);
+extern int rt_mutex_wait_proxy_lock(struct rt_mutex *lock,
+                              struct hrtimer_sleeper *to,
+                              struct rt_mutex_waiter *waiter);
+extern void rt_mutex_cleanup_proxy_lock(struct rt_mutex *lock,
+                                struct rt_mutex_waiter *waiter);
+
 extern int rt_mutex_futex_trylock(struct rt_mutex *l);
 
 extern void rt_mutex_futex_unlock(struct rt_mutex *lock);


Reply via email to