Hello!

This is an updated version of Oleg Nesterov's QRCU that avoids the
earlier lock acquisition on the synchronize_qrcu() fastpath.  It passes
rcutorture on x86 and on weakly ordered POWER.  As noted before, a
Promela model of the code passes for 2 readers and 3 updaters and for
3 readers and 2 updaters.  3 readers and 3 updaters runs every machine
I have access to out of memory -- nothing like a little combinatorial
explosion!  However, after some thought, the proof ended up being
simple enough:

1.      If synchronize_qrcu() exits too soon, then by definition
        there has been a reader present during synchronize_qrcu()'s
        full execution.

2.      The counter corresponding to this reader will be at least
        1 at all times.

3.      The synchronize_qrcu() code forces at least one of the counters
        to be at least one at all times -- if there is a reader, the
        sum will be at least two.  (Unfortunately, we cannot fetch
        the pair of counters atomically.)

4.      Therefore, the only way that synchronize_qrcu()'s fastpath can
        see a sum of 1 is if it races with another synchronize_qrcu() --
        the first synchronize_qrcu() must read one of the counters before
        the second synchronize_qrcu() increments it, and must read the
        other counter after the second synchronize_qrcu() decrements it.
        There can be at most one reader present through this entire
        operation -- otherwise, the first synchronize_qrcu() will see
        a sum of 2 or greater.  (One such interleaving is sketched
        after this list.)

5.      But the second synchronize_qrcu() will not release the mutex
        until after the reader is done.  During this time, the first
        synchronize_qrcu() will always see a sum of at least 2, and
        therefore cannot take the remainder of the fastpath until the
        reader is done.

6.      Because the second synchronize_qrcu() holds the mutex, no other
        synchronize_qrcu() can manipulate the counters until the reader
        is done.  A repeat of the race called out in #4 above therefore
        cannot happen until after the reader is done, in which case it
        is safe for the first synchronize_qrcu() to proceed.
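
To make the race in #4 concrete, here is one possible interleaving,
written in the notation of the patch below.  The starting state is
assumed for illustration: qp->completed is odd, so idx == 1 is active,
ctr[1] == 2 (the counter bias plus one reader), and ctr[0] == 0 -- the
true sum is 2 throughout:

        /*
         * first synchronize_qrcu()        second synchronize_qrcu()
         * ------------------------        -------------------------
         * atomic_read(&qp->ctr[0]) == 0
         *                                 atomic_inc(qp->ctr + 0);  0 -> 1
         *                                 qp->completed++;
         *                                 atomic_dec(qp->ctr + 1);  2 -> 1
         * atomic_read(&qp->ctr[1]) == 1
         *
         * The first synchronize_qrcu() computes 0 + 1 == 1 even though
         * a reader was present the whole time.  Per #5, its recheck
         * after smp_rmb() then sees a sum of at least 2 until that
         * reader is done.
         */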

Therefore, two summations of the counters separated by a memory barrier
suffice, as does the implementation shown below.

(And, yes, the fastpath -could- check for a sum of zero and exit
immediately, but this would help only in the case of a three-way race
between two synchronize_qrcu()s and a qrcu_read_unlock(), and it would
add another compare, so it is not worth it.)
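
For those wanting to try this out, here is a minimal usage sketch
against the API in the patch.  The struct foo, global_foo, foo_qrcu,
and the two functions are made up for illustration, and error handling
is omitted; only the qrcu_* calls come from the patch:

        struct foo {
                int a;
        };
        static struct foo *global_foo;
        static struct qrcu_struct foo_qrcu;  /* init_qrcu_struct() at init time. */

        /* Reader: pass the index from qrcu_read_lock() to the unlock. */
        int read_foo_a(void)
        {
                int idx, ret;

                idx = qrcu_read_lock(&foo_qrcu);
                ret = rcu_dereference(global_foo)->a;
                qrcu_read_unlock(&foo_qrcu, idx);
                return ret;
        }

        /* Updater: publish the new version, wait out old readers, free. */
        void update_foo_a(int new_a)
        {
                struct foo *newp = kmalloc(sizeof(*newp), GFP_KERNEL);
                struct foo *oldp = global_foo;

                newp->a = new_a;
                rcu_assign_pointer(global_foo, newp);
                synchronize_qrcu(&foo_qrcu);  /* Wait for pre-existing readers. */
                kfree(oldp);
        }

(Concurrent updaters would of course still need their own mutual
exclusion for the pointer swap, just as with SRCU.)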

Signed-off-by: Paul E. McKenney <[EMAIL PROTECTED]>
---

 include/linux/srcu.h |   22 +++++++++++++
 kernel/srcu.c        |   86 +++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 108 insertions(+)

diff -urpNa -X dontdiff linux-2.6.19/include/linux/srcu.h linux-2.6.19-qrcu-fp/include/linux/srcu.h
--- linux-2.6.19/include/linux/srcu.h   2006-11-29 13:57:37.000000000 -0800
+++ linux-2.6.19-qrcu-fp/include/linux/srcu.h   2007-02-05 16:00:18.000000000 -0800
@@ -27,6 +27,8 @@
 #ifndef _LINUX_SRCU_H
 #define _LINUX_SRCU_H
 
+#include <linux/wait.h>
+
 struct srcu_struct_array {
        int c[2];
 };
@@ -50,4 +52,24 @@ void srcu_read_unlock(struct srcu_struct
 void synchronize_srcu(struct srcu_struct *sp);
 long srcu_batches_completed(struct srcu_struct *sp);
 
+/*
+ * Fully compatible with SRCU, but optimized for writers.
+ */
+
+struct qrcu_struct {
+       int completed;
+       atomic_t ctr[2];
+       wait_queue_head_t wq;
+       struct mutex mutex;
+};
+
+int init_qrcu_struct(struct qrcu_struct *qp);
+int qrcu_read_lock(struct qrcu_struct *qp);
+void qrcu_read_unlock(struct qrcu_struct *qp, int idx);
+void synchronize_qrcu(struct qrcu_struct *qp);
+
+static inline void cleanup_qrcu_struct(struct qrcu_struct *qp)
+{
+}
+
 #endif
diff -urpNa -X dontdiff linux-2.6.19/kernel/srcu.c linux-2.6.19-qrcu-fp/kernel/srcu.c
--- linux-2.6.19/kernel/srcu.c  2006-11-29 13:57:37.000000000 -0800
+++ linux-2.6.19-qrcu-fp/kernel/srcu.c  2007-02-11 18:10:35.000000000 -0800
@@ -256,3 +256,89 @@ EXPORT_SYMBOL_GPL(srcu_read_unlock);
 EXPORT_SYMBOL_GPL(synchronize_srcu);
 EXPORT_SYMBOL_GPL(srcu_batches_completed);
 EXPORT_SYMBOL_GPL(srcu_readers_active);
+
+int init_qrcu_struct(struct qrcu_struct *qp)
+{
+       qp->completed = 0;
+       atomic_set(qp->ctr + 0, 1);
+       atomic_set(qp->ctr + 1, 0);
+       init_waitqueue_head(&qp->wq);
+       mutex_init(&qp->mutex);
+
+       return 0;
+}
+
+int qrcu_read_lock(struct qrcu_struct *qp)
+{
+       for (;;) {
+               int idx = qp->completed & 0x1;
+               if (likely(atomic_inc_not_zero(qp->ctr + idx)))
+                       return idx;
+       }
+}
+
+void qrcu_read_unlock(struct qrcu_struct *qp, int idx)
+{
+       if (atomic_dec_and_test(qp->ctr + idx))
+               wake_up(&qp->wq);
+}
+
+void synchronize_qrcu(struct qrcu_struct *qp)
+{
+       int idx;
+
+       smp_mb();  /* Force preceding change to happen before fastpath check. */
+
+       /*
+        * Fastpath: If the two counters sum to "1" at a given point in
+        * time, there are no readers.  However, it takes two separate
+        * loads to sample both counters, which won't occur simultaneously.
+        * We might therefore race with a counter switch: we might see
+        * ctr[0]==0, then the counters might switch, then we might see
+        * ctr[1]==1 (unbeknownst to us because there is a reader still
+        * there).  So we do a read memory barrier and recheck.  If the
+        * same race happens again, there must have been a second counter
+        * switch.  This second counter switch could not have happened
+        * until all preceding readers finished, so if the condition
+        * is true both times, we may safely proceed.
+        *
+        * This relies critically on the atomic increment and atomic
+        * decrement being seen as executing in order.
+        */
+
+       if (atomic_read(&qp->ctr[0]) + atomic_read(&qp->ctr[1]) <= 1) {
+               smp_rmb();  /* Keep two checks independent. */
+               if (atomic_read(&qp->ctr[0]) + atomic_read(&qp->ctr[1]) <= 1)
+                       goto out;
+       }
+
+       mutex_lock(&qp->mutex);
+
+       idx = qp->completed & 0x1;
+       if (atomic_read(qp->ctr + idx) == 1)
+               goto out_unlock;
+
+       atomic_inc(qp->ctr + (idx ^ 0x1));
+
+       /*
+        * Prevent subsequent decrement from being seen before previous
+        * increment -- such an inversion could cause the fastpath
+        * above to falsely conclude that there were no readers.  Also,
+        * reduce the likelihood that qrcu_read_lock() will loop.
+        */
+
+       smp_mb__after_atomic_inc();
+       qp->completed++;
+
+       atomic_dec(qp->ctr + idx);
+       __wait_event(qp->wq, !atomic_read(qp->ctr + idx));
+out_unlock:
+       mutex_unlock(&qp->mutex);
+out:
+       smp_mb(); /* Force subsequent free after qrcu_read_unlock(). */
+}
+
+EXPORT_SYMBOL_GPL(init_qrcu_struct);
+EXPORT_SYMBOL_GPL(qrcu_read_lock);
+EXPORT_SYMBOL_GPL(qrcu_read_unlock);
+EXPORT_SYMBOL_GPL(synchronize_qrcu);