> From: Stephen Hemminger [mailto:step...@networkplumber.org]
> Sent: Tuesday, 19 July 2022 22.28
> 
> Modify reader/writer lock to avoid starvation of writer. The previous
> implementation would cause a writer to get starved if readers kept
> acquiring the lock. The new version uses an additional bit to indicate
> that a writer is waiting and which keeps readers from starving the
> writer.
> 
> Signed-off-by: Stephen Hemminger <step...@networkplumber.org>
> Acked-by: Morten Brørup <m...@smartsharesystems.com>
> ---
> v2 - incorporate feedback, change from RFC to PATCH
> 
>  lib/eal/include/generic/rte_rwlock.h | 119 +++++++++++++++++++--------
>  1 file changed, 83 insertions(+), 36 deletions(-)
> 
> diff --git a/lib/eal/include/generic/rte_rwlock.h
> b/lib/eal/include/generic/rte_rwlock.h
> index da9bc3e9c0e2..59ec54110444 100644
> --- a/lib/eal/include/generic/rte_rwlock.h
> +++ b/lib/eal/include/generic/rte_rwlock.h
> @@ -15,23 +15,46 @@
>   * one writer. All readers are blocked until the writer is finished
>   * writing.
>   *
> + * This version does not give preference to readers or writers
> + * and does not starve either readers or writers.
> + *
> + * See also:
> + *  https://locklessinc.com/articles/locks/
>   */
> 
>  #ifdef __cplusplus
>  extern "C" {
>  #endif
> 
> +#include <rte_branch_prediction.h>
>  #include <rte_common.h>
> -#include <rte_atomic.h>
>  #include <rte_pause.h>
> 
>  /**
>   * The rte_rwlock_t type.
>   *
> - * cnt is -1 when write lock is held, and > 0 when read locks are held.
> + * Readers increment the counter by RTE_RWLOCK_READ (4)
> + * Writers set the RTE_RWLOCK_WRITE bit when lock is held
> + * and set the RTE_RWLOCK_WAIT bit while waiting.
> + *
> + * 31                 2 1 0
> + * +-------------------+-+-+
> + * |  readers          | | |
> + * +-------------------+-+-+
> + *                      ^ ^
> + *                      | |
> + * WRITE: lock held ----/ |
> + * WAIT: writer pending --/
>   */
> 
> +#define RTE_RWLOCK_WAIT	 0x1	/* Writer is waiting */
> +#define RTE_RWLOCK_WRITE 0x2	/* Writer has the lock */
> +#define RTE_RWLOCK_MASK  (RTE_RWLOCK_WAIT | RTE_RWLOCK_WRITE)
> +				/* Writer is waiting or has lock */
> +#define RTE_RWLOCK_READ	 0x4	/* Reader increment */
> +
>  typedef struct {
> -	volatile int32_t cnt; /**< -1 when W lock held, > 0 when R locks held. */
> +	int32_t cnt;
>  } rte_rwlock_t;
> 
>  /**
> @@ -61,17 +84,24 @@ static inline void
>  rte_rwlock_read_lock(rte_rwlock_t *rwl)
>  {
>  	int32_t x;
> -	int success = 0;
> 
> -	while (success == 0) {
> -		x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
> -		/* write lock is held */
> -		if (x < 0) {
> +	while (1) {
> +		/* Wait while writer is present or pending */
> +		while (__atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED)
> +		       & RTE_RWLOCK_MASK)
>  			rte_pause();
> -			continue;
> -		}
> -		success = __atomic_compare_exchange_n(&rwl->cnt, &x, x + 1, 1,
> -					__ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
> +
> +		/* Try to get read lock */
> +		x = __atomic_add_fetch(&rwl->cnt, RTE_RWLOCK_READ,
> +				       __ATOMIC_ACQUIRE);
> +
> +		/* If no writer, then acquire was successful */
> +		if (likely(!(x & RTE_RWLOCK_MASK)))
> +			return;
> +
> +		/* Lost race with writer, backout the change. */
> +		__atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ,
> +				   __ATOMIC_RELAXED);
>  	}
>  }
> 
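As an aside for anyone following the new encoding: the reader count is
simply the counter divided by RTE_RWLOCK_READ, since it lives in the
bits above the two flag bits. A hypothetical debug helper (illustrative
only, not part of the patch; rwlock_dump is a name I made up) could
decode the lock word like this:

#include <stdio.h>

/* Hypothetical helper: decode the lock word per the layout above. */
static inline void
rwlock_dump(const rte_rwlock_t *rwl)
{
	int32_t x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);

	printf("readers=%d write=%d wait=%d\n",
	       (int)(x / RTE_RWLOCK_READ),	/* bits 2..31 */
	       !!(x & RTE_RWLOCK_WRITE),	/* bit 1 */
	       !!(x & RTE_RWLOCK_WAIT));	/* bit 0 */
}
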
> @@ -93,17 +123,24 @@ static inline int
>  rte_rwlock_read_trylock(rte_rwlock_t *rwl)
>  {
>  	int32_t x;
> -	int success = 0;
> 
> -	while (success == 0) {
> -		x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
> -		/* write lock is held */
> -		if (x < 0)
> -			return -EBUSY;
> -		success = __atomic_compare_exchange_n(&rwl->cnt, &x, x + 1, 1,
> -					__ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
> -	}
> +	x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
> +
> +	/* fail if write lock is held or writer is pending */
> +	if (x & RTE_RWLOCK_MASK)
> +		return -EBUSY;
> 
> +	/* Try to get read lock */
> +	x = __atomic_add_fetch(&rwl->cnt, RTE_RWLOCK_READ,
> +			       __ATOMIC_ACQUIRE);
> +
> +	/* Back out if writer raced in */
> +	if (unlikely(x & RTE_RWLOCK_MASK)) {
> +		__atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ,
> +				   __ATOMIC_RELEASE);
> +
> +		return -EBUSY;
> +	}
>  	return 0;
>  }
> 
> @@ -116,7 +153,7 @@ rte_rwlock_read_trylock(rte_rwlock_t *rwl)
>  static inline void
>  rte_rwlock_read_unlock(rte_rwlock_t *rwl)
>  {
> -	__atomic_fetch_sub(&rwl->cnt, 1, __ATOMIC_RELEASE);
> +	__atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ, __ATOMIC_RELEASE);
>  }
> 
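For reference, the caller-side contract of the trylock path is unchanged
by the patch: 0 on success, -EBUSY otherwise. A minimal usage sketch
(illustrative only; tbl_lock and lookup_nonblocking are names I made up,
RTE_RWLOCK_INITIALIZER is the existing static initializer):

#include <errno.h>
#include <rte_rwlock.h>

static rte_rwlock_t tbl_lock = RTE_RWLOCK_INITIALIZER;

static int
lookup_nonblocking(void)
{
	/* Fails with -EBUSY if a writer holds the lock or is pending. */
	if (rte_rwlock_read_trylock(&tbl_lock) != 0)
		return -EBUSY;

	/* ... read the shared table ... */

	rte_rwlock_read_unlock(&tbl_lock);
	return 0;
}
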
"x < RTE_RWLOCK_WRITE" will permit this writher thread to race a waiting writer thread, while the waiting writer thread is executing rte_pause(). Have you considered "!x" instead, giving priority to the waiting thread? I suppose your solution is better, because we know that this writer thread is actively running, while the waiting writer thread may have been put on hold by the O/S scheduler. > + __atomic_compare_exchange_n(&rwl->cnt, &x, x + > RTE_RWLOCK_WRITE, Only a matter of taste, but I would prefer "x | RTE_RWLOCK_WRITE" over "x + RTE_RWLOCK_WRITE". You can leave it as is. > + 1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) > + return 0; > + else > return -EBUSY; > - > - return 0; > } > > /** > @@ -156,18 +194,27 @@ static inline void > rte_rwlock_write_lock(rte_rwlock_t *rwl) > { > int32_t x; > - int success = 0; > > - while (success == 0) { > + while (1) { > x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED); > - /* a lock is held */ > - if (x != 0) { > - rte_pause(); > - continue; > + > + /* No readers or writers? */ > + if (likely(x < RTE_RWLOCK_WRITE)) { > + /* Turn off RTE_RWLOCK_WAIT, turn on RTE_RWLOCK_WRITE > */ > + if (__atomic_compare_exchange_n(&rwl->cnt, &x, > RTE_RWLOCK_WRITE, 1, > + __ATOMIC_ACQUIRE, > __ATOMIC_RELAXED)) > + return; See comment below; this is the point I refer to as the "next race". > } > - success = __atomic_compare_exchange_n(&rwl->cnt, &x, -1, 1, > - __ATOMIC_ACQUIRE, __ATOMIC_RELAXED); > - } > + > + /* Turn on writer wait bit */ > + if (!(x & RTE_RWLOCK_WAIT)) > + __atomic_fetch_or(&rwl->cnt, RTE_RWLOCK_WAIT, > __ATOMIC_RELAXED); Is there a risk of race with two writer threads at this location? If a reader is active, and two writer threads reach this point simultaneously, they will both set RTE_RWLOCK_WAIT here. And then, when the reader thread is done, one of the writer thread will win the next race and replace RTE_RWLOCK_WAIT by RTE_RWLOCK_WRITE. The winning thread will then do its job and afterwards clear RTE_RWLOCK_WRITE. This means that both RTE_RWLOCK_WAIT and RTE_RWLOCK_WRITE have been cleared, but RTE_RWLOCK_WAIT should remain set for the writer thread that lost the race. Did I miss something? It does work with only one writer thread, though. > + > + /* Wait until no readers befor trying again */ Typo: befor -> before. > + while (__atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED) > > RTE_RWLOCK_WAIT) > + rte_pause(); > + > + } > } > > /** > @@ -179,7 +226,7 @@ rte_rwlock_write_lock(rte_rwlock_t *rwl) > static inline void > rte_rwlock_write_unlock(rte_rwlock_t *rwl) > { > - __atomic_store_n(&rwl->cnt, 0, __ATOMIC_RELEASE); > + __atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_WRITE, > __ATOMIC_RELEASE); Yes. This is correct, regardless if another writer thread is waiting or not. (Reviewed for one writer thread using rte_rwlock_write_lock() and another using rte_rwlock_write_trylock().) > } > > /** > -- > 2.35.1 > The comments in this version are good too.