> From: Honnappa Nagarahalli [mailto:honnappa.nagaraha...@arm.com]
> Sent: Friday, 8 July 2022 21.22
> 
> <snip>
> >
> > The original reader/writer lock in DPDK can cause a stream of readers to
> > starve writers.
> >
> > The new version uses an additional bit to indicate that a writer is waiting,
> > which keeps readers from starving the writer.
> This addition makes sense.
> I am wondering if we should create a new lock. Is it possible that some
> applications are dependent on the current behavior?

Any reader risks having to wait a while for a writer to finish its work.

In my opinion, this implementation only increases the probability of that risk 
occurring, but it doesn't change the writer's impact on the readers. Therefore, 
I think this improved implementation can replace the old rwlock.

> 
> >
> > Signed-off-by: Stephen Hemminger <step...@networkplumber.org>
> > ---
> > Would like this to be in 22.11, but needs some more review
> >
> >  lib/eal/include/generic/rte_rwlock.h | 93 ++++++++++++++++++--------
> --
> >  1 file changed, 61 insertions(+), 32 deletions(-)
> >
> > diff --git a/lib/eal/include/generic/rte_rwlock.h
> > b/lib/eal/include/generic/rte_rwlock.h
> > index da9bc3e9c0e2..725cd19ffb27 100644
> > --- a/lib/eal/include/generic/rte_rwlock.h
> > +++ b/lib/eal/include/generic/rte_rwlock.h
> > @@ -13,7 +13,7 @@
> >   * This file defines an API for read-write locks. The lock is used
> to
> >   * protect data that allows multiple readers in parallel, but only
> >   * one writer. All readers are blocked until the writer is finished
> > - * writing.
> > + * writing. This version will not starve writers.
> >   *
> >   */
> >
> > @@ -28,10 +28,17 @@ extern "C" {
> >  /**
> >   * The rte_rwlock_t type.
> >   *
> > - * cnt is -1 when write lock is held, and > 0 when read locks are
> held.
> > + * Readers increment the counter by RW_READ (4)
> > + * Writers set the RWLOCK_WRITE bit when lock is held
> > + *     and set the RWLOCK_WAIT bit while waiting.
> >   */
> > +
> > +#define RTE_RWLOCK_WAIT     0x1    /* Writer is waiting */
> > +#define RTE_RWLOCK_WRITE 0x2       /* Writer has the lock */
> > +#define RTE_RWLOCK_READ     0x4    /* Reader increment */
> > +
> >  typedef struct {
> > -   volatile int32_t cnt; /**< -1 when W lock held, > 0 when R locks
> held.
> > */
> > +   volatile int32_t cnt;

Not signed anymore, so consider uint32_t. I also suggest renaming it to
cnt_state or similar, since it is not just a counter anymore.

> >  } rte_rwlock_t;
> >
> >  /**
> > @@ -61,17 +68,24 @@ static inline void
> >  rte_rwlock_read_lock(rte_rwlock_t *rwl)  {
> >     int32_t x;
> > -   int success = 0;
> >
> > -   while (success == 0) {
> > +   while (1) {
> >             x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
> >             /* write lock is held */

Comment: "held" -> "held or pending", since the check also covers a waiting
writer. Also add a question mark, or move the comment inside the if block.

> > -           if (x < 0) {
> > +           if (x & (RTE_RWLOCK_WAIT | RTE_RWLOCK_WRITE)) {
> >                     rte_pause();
> >                     continue;
> >             }
> > -           success = __atomic_compare_exchange_n(&rwl->cnt, &x, x
> > + 1, 1,
> > -                                   __ATOMIC_ACQUIRE,
> > __ATOMIC_RELAXED);
> > +
> > +           /* Try to get read lock */
> > +           x = __atomic_add_fetch(&rwl->cnt, RTE_RWLOCK_READ,
> > +                                  __ATOMIC_ACQUIRE);
> > +           if (!(x & (RTE_RWLOCK_WAIT | RTE_RWLOCK_WRITE)))
> > +                   return;
> > +
> > +           /* Undo */

Comment: "Undo" -> "Unable, so release the read lock".

> > +           __atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ,
> > +                              __ATOMIC_RELEASE);
> >     }
> >  }
> >
> > @@ -93,17 +107,23 @@ static inline int
> >  rte_rwlock_read_trylock(rte_rwlock_t *rwl)  {
> >     int32_t x;
> > -   int success = 0;
> >
> > -   while (success == 0) {
> > -           x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
> > -           /* write lock is held */
> > -           if (x < 0)
> > -                   return -EBUSY;
> > -           success = __atomic_compare_exchange_n(&rwl->cnt, &x, x
> > + 1, 1,
> > -                                   __ATOMIC_ACQUIRE,
> > __ATOMIC_RELAXED);
> > -   }
> > +   x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
> > +
> > +   /* write lock is held */

Same as above: "held" -> "held or pending"; add a question mark, or move the
comment inside the if block.

> > +   if (x & (RTE_RWLOCK_WAIT | RTE_RWLOCK_WRITE))
> > +           return -EBUSY;
> > +
> > +   /* Try to get read lock */
> > +   x = __atomic_add_fetch(&rwl->cnt, RTE_RWLOCK_READ,
> > +                          __ATOMIC_ACQUIRE);
> > +
> > +   if (x & (RTE_RWLOCK_WAIT | RTE_RWLOCK_WRITE)) {

Add a comment, e.g.: Unable, so release the read lock.

> > +           __atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ,
> > +                              __ATOMIC_RELEASE);
> >
> > +           return -EBUSY;
> > +   }
> >     return 0;
> >  }
> >
> > @@ -116,7 +136,7 @@ rte_rwlock_read_trylock(rte_rwlock_t *rwl)
> static
> > inline void  rte_rwlock_read_unlock(rte_rwlock_t *rwl)  {
> > -   __atomic_fetch_sub(&rwl->cnt, 1, __ATOMIC_RELEASE);
> > +   __atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_READ,
> > __ATOMIC_RELEASE);
> >  }
> >
> >  /**
> > @@ -139,11 +159,12 @@ rte_rwlock_write_trylock(rte_rwlock_t *rwl)
> >     int32_t x;
> >
> >     x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
> > -   if (x != 0 || __atomic_compare_exchange_n(&rwl->cnt, &x, -1, 1,
> > -                         __ATOMIC_ACQUIRE, __ATOMIC_RELAXED) == 0)
> > +   if (x < RTE_RWLOCK_WRITE &&
> > +       __atomic_compare_exchange_n(&rwl->cnt, &x, x +
> > RTE_RWLOCK_WRITE,
> > +                                   1, __ATOMIC_ACQUIRE,
> > __ATOMIC_RELAXED))
> > +           return 0;
> > +   else
> >             return -EBUSY;
> > -
> > -   return 0;
> >  }
> >
> >  /**
> > @@ -156,18 +177,26 @@ static inline void
> > rte_rwlock_write_lock(rte_rwlock_t *rwl)  {
> >     int32_t x;
> > -   int success = 0;
> >
> > -   while (success == 0) {
> > +   while (1) {
> >             x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
> > -           /* a lock is held */
> > -           if (x != 0) {
> > -                   rte_pause();
> > -                   continue;
> > +
> > +           /* No readers or writers */

Add a question mark, or move the comment inside the if block.

> > +           if (x < RTE_RWLOCK_WRITE) {
> > +                   /* Turn off RTE_RWLOCK_WAIT, turn on
> > RTE_RWLOCK_WRITE */
> > +                   if (__atomic_compare_exchange_n(&rwl->cnt, &x,
> > RTE_RWLOCK_WRITE, 1,
> > +
> >     __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
> > +                           return;
> >             }
> > -           success = __atomic_compare_exchange_n(&rwl->cnt, &x, -
> > 1, 1,
> > -                                   __ATOMIC_ACQUIRE,
> > __ATOMIC_RELAXED);
> > -   }
> > +
> > +           /* Turn on writer wait bit */
> > +           if (!(x & RTE_RWLOCK_WAIT))
> > +                   __atomic_fetch_or(&rwl->cnt, RTE_RWLOCK_WAIT,
> > __ATOMIC_RELAXED);
> > +
> > +           /* Wait until can try to take the lock */
> > +           while (__atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED) >
> > RTE_RWLOCK_WAIT)
> > +                   rte_pause();
> > +    }
> >  }
> >
> >  /**
> > @@ -179,7 +208,7 @@ rte_rwlock_write_lock(rte_rwlock_t *rwl)  static
> > inline void  rte_rwlock_write_unlock(rte_rwlock_t *rwl)  {
> > -   __atomic_store_n(&rwl->cnt, 0, __ATOMIC_RELEASE);
> > +   __atomic_fetch_sub(&rwl->cnt, RTE_RWLOCK_WRITE,
> > __ATOMIC_RELEASE);
> >  }
> >
> >  /**
> > --
> > 2.35.1
> 

Always the creative mind, Stephen. :-)

You might consider adding/updating even more comments.

Acked-by: Morten Brørup <m...@smartsharesystems.com>

Reply via email to