Modify reader/writer lock to avoid starvation of writer.  The previous
implementation would cause a writer to get starved if readers kept
acquiring the lock.  The new version uses an additional bit to indicate
that a writer is waiting, which keeps readers from starving the
writer.

Signed-off-by: Stephen Hemminger <step...@networkplumber.org>
Acked-by: Morten Brørup <m...@smartsharesystems.com>
---
v2 - incorporate feedback, change from RFC to PATCH

 lib/eal/include/generic/rte_rwlock.h | 119 +++++++++++++++++++--------
 1 file changed, 83 insertions(+), 36 deletions(-)

diff --git a/lib/eal/include/generic/rte_rwlock.h b/lib/eal/include/generic/rte_rwlock.h
index da9bc3e9c0e2..59ec54110444 100644
--- a/lib/eal/include/generic/rte_rwlock.h
+++ b/lib/eal/include/generic/rte_rwlock.h
@@ -15,23 +15,46 @@
  * one writer. All readers are blocked until the writer is finished
  * writing.
  *
+ * This version does not give preference to readers or writers
+ * and does not starve either readers or writers.
+ *
+ * See also:
+ *  https://locklessinc.com/articles/locks/
  */
 
 #ifdef __cplusplus
 extern "C" {
 #endif
 
+#include <rte_branch_prediction.h>
 #include <rte_common.h>
-#include <rte_atomic.h>
 #include <rte_pause.h>
 
 /**
  * The rte_rwlock_t type.
  *
- * cnt is -1 when write lock is held, and > 0 when read locks are held.
+ * Readers increment the counter by RTE_RWLOCK_READ (4)
+ * Writers set the RTE_RWLOCK_WRITE bit when lock is held
+ *     and set the RTE_RWLOCK_WAIT bit while waiting.
+ *
+ * 31                 2 1 0
+ * +-------------------+-+-+
+ * |  readers          | | |
+ * +-------------------+-+-+
+ *                      ^ ^
+ *                      | |
+ * WRITE: lock held ----/ |
+ * WAIT: writer pending --/
  */
+
+#define RTE_RWLOCK_WAIT         0x1    /* Writer is waiting */
+#define RTE_RWLOCK_WRITE 0x2   /* Writer has the lock */
+#define RTE_RWLOCK_MASK  (RTE_RWLOCK_WAIT | RTE_RWLOCK_WRITE)
+                               /* Writer is waiting or has lock */
+#define RTE_RWLOCK_READ         0x4    /* Reader increment */
+
 typedef struct {
-       volatile int32_t cnt; /**< -1 when W lock held, > 0 when R locks held. */
+       int32_t cnt;
 } rte_rwlock_t;
 
 /**
@@ -61,17 +84,24 @@ static inline void
 rte_rwlock_read_lock(rte_rwlock_t *rwl)
 {
        int32_t x;
-       int success = 0;
 
-       while (success == 0) {
-               x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
-               /* write lock is held */
-               if (x < 0) {
+       while (1) {
+               /* Wait while writer is present or pending */
+               while (__atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED)
+                      & RTE_RWLOCK_MASK)
                        rte_pause();
-                       continue;
-               }
-               success = __atomic_compare_exchange_n(&rwl->cnt, &x, x + 1, 1,
-                                       __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
+
+               /* Try to get read lock */
+               x = __atomic_add_fetch(&rwl->cnt, RTE_RWLOCK_READ,
+                                      __ATOMIC_ACQUIRE);
+
+               /* If no writer, then acquire was successful */
+               if (likely(!(x & RTE_RWLOCK_MASK)))
+                       return;
+
+               /* Lost race with writer, backout the change. */
+               __atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ,
+                                  __ATOMIC_RELAXED);
        }
 }
 
@@ -93,17 +123,24 @@ static inline int
 rte_rwlock_read_trylock(rte_rwlock_t *rwl)
 {
        int32_t x;
-       int success = 0;
 
-       while (success == 0) {
-               x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
-               /* write lock is held */
-               if (x < 0)
-                       return -EBUSY;
-               success = __atomic_compare_exchange_n(&rwl->cnt, &x, x + 1, 1,
-                                       __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
-       }
+       x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
+
+       /* fail if write lock is held or writer is pending */
+       if (x & RTE_RWLOCK_MASK)
+               return -EBUSY;
 
+       /* Try to get read lock */
+       x = __atomic_add_fetch(&rwl->cnt, RTE_RWLOCK_READ,
+                              __ATOMIC_ACQUIRE);
+
+       /* Back out if writer raced in */
+       if (unlikely(x & RTE_RWLOCK_MASK)) {
+               __atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ,
+                                  __ATOMIC_RELEASE);
+
+               return -EBUSY;
+       }
        return 0;
 }
 
@@ -116,7 +153,7 @@ rte_rwlock_read_trylock(rte_rwlock_t *rwl)
 static inline void
 rte_rwlock_read_unlock(rte_rwlock_t *rwl)
 {
-       __atomic_fetch_sub(&rwl->cnt, 1, __ATOMIC_RELEASE);
+       __atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ, __ATOMIC_RELEASE);
 }
 
 /**
@@ -139,11 +176,12 @@ rte_rwlock_write_trylock(rte_rwlock_t *rwl)
        int32_t x;
 
        x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
-       if (x != 0 || __atomic_compare_exchange_n(&rwl->cnt, &x, -1, 1,
-                             __ATOMIC_ACQUIRE, __ATOMIC_RELAXED) == 0)
+       if (x < RTE_RWLOCK_WRITE &&
+           __atomic_compare_exchange_n(&rwl->cnt, &x, x + RTE_RWLOCK_WRITE,
+                                       1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
+               return 0;
+       else
                return -EBUSY;
-
-       return 0;
 }
 
 /**
@@ -156,18 +194,27 @@ static inline void
 rte_rwlock_write_lock(rte_rwlock_t *rwl)
 {
        int32_t x;
-       int success = 0;
 
-       while (success == 0) {
+       while (1) {
                x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
-               /* a lock is held */
-               if (x != 0) {
-                       rte_pause();
-                       continue;
+
+               /* No readers or writers? */
+               if (likely(x < RTE_RWLOCK_WRITE)) {
+                       /* Turn off RTE_RWLOCK_WAIT, turn on RTE_RWLOCK_WRITE */
+                       if (__atomic_compare_exchange_n(&rwl->cnt, &x, RTE_RWLOCK_WRITE, 1,
+                                                       __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
+                               return;
                }
-               success = __atomic_compare_exchange_n(&rwl->cnt, &x, -1, 1,
-                                       __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
-       }
+
+               /* Turn on writer wait bit */
+               if (!(x & RTE_RWLOCK_WAIT))
+                       __atomic_fetch_or(&rwl->cnt, RTE_RWLOCK_WAIT, __ATOMIC_RELAXED);
+
+               /* Wait until no readers before trying again */
+               while (__atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED) > RTE_RWLOCK_WAIT)
+                       rte_pause();
+
+    }
 }
 
 /**
@@ -179,7 +226,7 @@ rte_rwlock_write_lock(rte_rwlock_t *rwl)
 static inline void
 rte_rwlock_write_unlock(rte_rwlock_t *rwl)
 {
-       __atomic_store_n(&rwl->cnt, 0, __ATOMIC_RELEASE);
+       __atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_WRITE, __ATOMIC_RELEASE);
 }
 
 /**
-- 
2.35.1

Reply via email to