<snip>

> The original reader/writer lock in DPDK can cause a stream of readers to
> starve writers.
>
> The new version uses an additional bit to indicate that a writer is
> waiting, which keeps readers from starving the writer.

This addition makes sense. I am wondering if we should introduce this as a new lock type instead of changing the existing one. Is it possible that some applications depend on the current behavior?
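One concrete pattern that does depend on the current reader-preference behavior is nested (recursive) read locking: today a thread that already holds the read lock can take it again even while a writer is spinning. With the new wait bit, the inner rte_rwlock_read_lock() spins behind the waiting writer while the outer read lock is still held, and reader and writer deadlock. A contrived sketch (the lock, data and function names are invented for illustration, not taken from DPDK or the patch):

#include <rte_rwlock.h>

/* Illustrative only: hypothetical application code. */
static rte_rwlock_t tbl_lock = RTE_RWLOCK_INITIALIZER;

/* Helper that takes the read lock itself. */
static int
tbl_lookup(int key)
{
	int ret;

	rte_rwlock_read_lock(&tbl_lock);	/* nested acquisition */
	ret = key;				/* ... real lookup elided ... */
	rte_rwlock_read_unlock(&tbl_lock);
	return ret;
}

static int
tbl_lookup_under_lock(int key)
{
	int ret;

	rte_rwlock_read_lock(&tbl_lock);
	/* If a writer sets RTE_RWLOCK_WAIT at this point, the nested read
	 * lock inside tbl_lookup() never succeeds, this thread never drops
	 * the outer read lock, and the writer never gets in either. */
	ret = tbl_lookup(key);
	rte_rwlock_read_unlock(&tbl_lock);
	return ret;
}

Such code is arguably already buggy, but it does work with the lock as it is today.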
> Signed-off-by: Stephen Hemminger <step...@networkplumber.org>
> ---
> Would like this to be in 22.11, but needs some more review
>
>  lib/eal/include/generic/rte_rwlock.h | 93 ++++++++++++++++++----------
>  1 file changed, 61 insertions(+), 32 deletions(-)
>
> diff --git a/lib/eal/include/generic/rte_rwlock.h b/lib/eal/include/generic/rte_rwlock.h
> index da9bc3e9c0e2..725cd19ffb27 100644
> --- a/lib/eal/include/generic/rte_rwlock.h
> +++ b/lib/eal/include/generic/rte_rwlock.h
> @@ -13,7 +13,7 @@
>   * This file defines an API for read-write locks. The lock is used to
>   * protect data that allows multiple readers in parallel, but only
>   * one writer. All readers are blocked until the writer is finished
> - * writing.
> + * writing. This version will not starve writers.
>   *
>   */
>
> @@ -28,10 +28,17 @@ extern "C" {
>  /**
>   * The rte_rwlock_t type.
>   *
> - * cnt is -1 when write lock is held, and > 0 when read locks are held.
> + * Readers increment the counter by RTE_RWLOCK_READ (4)
> + * Writers set the RTE_RWLOCK_WRITE bit when lock is held
> + * and set the RTE_RWLOCK_WAIT bit while waiting.
>   */
> +
> +#define RTE_RWLOCK_WAIT  0x1	/* Writer is waiting */
> +#define RTE_RWLOCK_WRITE 0x2	/* Writer has the lock */
> +#define RTE_RWLOCK_READ  0x4	/* Reader increment */
> +
>  typedef struct {
> -	volatile int32_t cnt; /**< -1 when W lock held, > 0 when R locks held. */
> +	volatile int32_t cnt;
>  } rte_rwlock_t;
>
>  /**
> @@ -61,17 +68,24 @@ static inline void
>  rte_rwlock_read_lock(rte_rwlock_t *rwl)
>  {
>  	int32_t x;
> -	int success = 0;
>
> -	while (success == 0) {
> +	while (1) {
>  		x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
>  		/* write lock is held */
> -		if (x < 0) {
> +		if (x & (RTE_RWLOCK_WAIT | RTE_RWLOCK_WRITE)) {
>  			rte_pause();
>  			continue;
>  		}
> -		success = __atomic_compare_exchange_n(&rwl->cnt, &x, x + 1, 1,
> -					__ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
> +
> +		/* Try to get read lock */
> +		x = __atomic_add_fetch(&rwl->cnt, RTE_RWLOCK_READ,
> +				       __ATOMIC_ACQUIRE);
> +		if (!(x & (RTE_RWLOCK_WAIT | RTE_RWLOCK_WRITE)))
> +			return;
> +
> +		/* Undo */
> +		__atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ,
> +				   __ATOMIC_RELEASE);
>  	}
>  }
>
> @@ -93,17 +107,23 @@ static inline int
>  rte_rwlock_read_trylock(rte_rwlock_t *rwl)
>  {
>  	int32_t x;
> -	int success = 0;
>
> -	while (success == 0) {
> -		x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
> -		/* write lock is held */
> -		if (x < 0)
> -			return -EBUSY;
> -		success = __atomic_compare_exchange_n(&rwl->cnt, &x, x + 1, 1,
> -					__ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
> -	}
> +	x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
> +
> +	/* write lock is held */
> +	if (x & (RTE_RWLOCK_WAIT | RTE_RWLOCK_WRITE))
> +		return -EBUSY;
> +
> +	/* Try to get read lock */
> +	x = __atomic_add_fetch(&rwl->cnt, RTE_RWLOCK_READ,
> +			       __ATOMIC_ACQUIRE);
> +
> +	if (x & (RTE_RWLOCK_WAIT | RTE_RWLOCK_WRITE)) {
> +		__atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ,
> +				   __ATOMIC_RELEASE);
> +
> +		return -EBUSY;
> +	}
>  	return 0;
>  }
>
> @@ -116,7 +136,7 @@ rte_rwlock_read_trylock(rte_rwlock_t *rwl)
>  static inline void
>  rte_rwlock_read_unlock(rte_rwlock_t *rwl)
>  {
> -	__atomic_fetch_sub(&rwl->cnt, 1, __ATOMIC_RELEASE);
> +	__atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ, __ATOMIC_RELEASE);
>  }
>
>  /**
> @@ -139,11 +159,12 @@ rte_rwlock_write_trylock(rte_rwlock_t *rwl)
>  	int32_t x;
>
>  	x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
> -	if (x != 0 || __atomic_compare_exchange_n(&rwl->cnt, &x, -1, 1,
> -			      __ATOMIC_ACQUIRE, __ATOMIC_RELAXED) == 0)
> +	if (x < RTE_RWLOCK_WRITE &&
> +	    __atomic_compare_exchange_n(&rwl->cnt, &x, x + RTE_RWLOCK_WRITE,
> +					1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
> +		return 0;
> +	else
>  		return -EBUSY;
> -
> -	return 0;
>  }
>
>  /**
> @@ -156,18 +177,26 @@ static inline void
>  rte_rwlock_write_lock(rte_rwlock_t *rwl)
>  {
>  	int32_t x;
> -	int success = 0;
>
> -	while (success == 0) {
> +	while (1) {
>  		x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
> -		/* a lock is held */
> -		if (x != 0) {
> -			rte_pause();
> -			continue;
> +
> +		/* No readers or writers */
> +		if (x < RTE_RWLOCK_WRITE) {
> +			/* Turn off RTE_RWLOCK_WAIT, turn on RTE_RWLOCK_WRITE */
> +			if (__atomic_compare_exchange_n(&rwl->cnt, &x, RTE_RWLOCK_WRITE, 1,
> +							__ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
> +				return;
>  		}
> -		success = __atomic_compare_exchange_n(&rwl->cnt, &x, -1, 1,
> -					__ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
> -	}
> +
> +		/* Turn on writer wait bit */
> +		if (!(x & RTE_RWLOCK_WAIT))
> +			__atomic_fetch_or(&rwl->cnt, RTE_RWLOCK_WAIT, __ATOMIC_RELAXED);
> +
> +		/* Wait until can try to take the lock */
> +		while (__atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED) > RTE_RWLOCK_WAIT)
> +			rte_pause();
> +	}
>  }
>
>  /**
> @@ -179,7 +208,7 @@ rte_rwlock_write_lock(rte_rwlock_t *rwl)
>  static inline void
>  rte_rwlock_write_unlock(rte_rwlock_t *rwl)
>  {
> -	__atomic_store_n(&rwl->cnt, 0, __ATOMIC_RELEASE);
> +	__atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_WRITE, __ATOMIC_RELEASE);
>  }
>
>  /**
> --
> 2.35.1
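To make the new encoding concrete: with two readers the counter holds 8; a writer that arrives sets RTE_RWLOCK_WAIT, making it 9, which turns away new readers; as the existing readers drain the counter falls to 5 and then 1, and the writer's compare-exchange then swaps 1 for RTE_RWLOCK_WRITE (2), clearing the wait bit in the same step. The public API itself is unchanged, so callers look the same before and after the patch. A minimal usage sketch (the protected variable and function names are invented for illustration, not taken from DPDK):

#include <stdint.h>
#include <rte_rwlock.h>

/* Illustrative only: hypothetical application code. */
static rte_rwlock_t cfg_lock = RTE_RWLOCK_INITIALIZER;
static uint32_t cfg_mtu = 1500;

/* Data-path readers may still run in parallel. */
static uint32_t
get_mtu(void)
{
	uint32_t mtu;

	rte_rwlock_read_lock(&cfg_lock);
	mtu = cfg_mtu;
	rte_rwlock_read_unlock(&cfg_lock);
	return mtu;
}

/* A control-path writer now only has to outlast the readers that are
 * already inside; a steady stream of new readers can no longer starve it. */
static void
set_mtu(uint32_t mtu)
{
	rte_rwlock_write_lock(&cfg_lock);
	cfg_mtu = mtu;
	rte_rwlock_write_unlock(&cfg_lock);
}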