> Hi Honnappa,
> 
> 
> > Add RCU library supporting quiescent state based memory reclamation
> method.
> > This library helps identify the quiescent state of the reader threads
> > so that the writers can free the memory associated with the lock less
> > data structures.
> >
> > Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
> > Reviewed-by: Steve Capper <steve.cap...@arm.com>
> > Reviewed-by: Gavin Hu <gavin...@arm.com>
> > ---
> ...
> 
> > diff --git a/lib/librte_rcu/rte_rcu_qsbr.h
> > b/lib/librte_rcu/rte_rcu_qsbr.h new file mode 100644 index
> > 000000000..c818e77fd
> > --- /dev/null
> > +++ b/lib/librte_rcu/rte_rcu_qsbr.h
> > @@ -0,0 +1,321 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + * Copyright (c) 2018 Arm Limited
> > + */
> > +
> > +#ifndef _RTE_RCU_QSBR_H_
> > +#define _RTE_RCU_QSBR_H_
> > +
> > +/**
> > + * @file
> > + * RTE Quiescent State Based Reclamation (QSBR)
> > + *
> > + * Quiescent State (QS) is any point in the thread execution
> > + * where the thread does not hold a reference to shared memory.
> > + * A critical section for a data structure can be a quiescent state
> > +for
> > + * another data structure.
> > + *
> > + * This library provides the ability to identify quiescent state.
> > + */
> > +
> > +#ifdef __cplusplus
> > +extern "C" {
> > +#endif
> > +
> > +#include <stdio.h>
> > +#include <stdint.h>
> > +#include <errno.h>
> > +#include <rte_common.h>
> > +#include <rte_memory.h>
> > +#include <rte_lcore.h>
> > +#include <rte_debug.h>
> > +
> > +/**< Maximum number of reader threads supported. */ #define
> > +RTE_RCU_MAX_THREADS 128
> > +
> > +#if !RTE_IS_POWER_OF_2(RTE_RCU_MAX_THREADS)
> > +#error RTE_RCU_MAX_THREADS must be a power of 2 #endif
> > +
> > +/**< Number of array elements required for the bit-map */ #define
> > +RTE_QSBR_BIT_MAP_ELEMS (RTE_RCU_MAX_THREADS/(sizeof(uint64_t)
> * 8))
> > +
> > +/* Thread IDs are stored as a bitmap of 64b element array. Given
> > +thread id
> > + * needs to be converted to index into the array and the id within
> > + * the array element.
> > + */
> > +#define RTE_QSBR_THR_INDEX_SHIFT 6
> > +#define RTE_QSBR_THR_ID_MASK 0x3f
> > +
> > +/* Worker thread counter */
> > +struct rte_rcu_qsbr_cnt {
> > +   uint64_t cnt; /**< Quiescent state counter. */ }
> > +__rte_cache_aligned;
> > +
> > +/**
> > + * RTE thread Quiescent State structure.
> > + */
> > +struct rte_rcu_qsbr {
> > +   uint64_t reg_thread_id[RTE_QSBR_BIT_MAP_ELEMS]
> __rte_cache_aligned;
> > +   /**< Registered reader thread IDs - reader threads reporting
> > +    * on this QS variable represented in a bit map.
> > +    */
> > +
> > +   uint64_t token __rte_cache_aligned;
> > +   /**< Counter to allow for multiple simultaneous QS queries */
> > +
> > +   struct rte_rcu_qsbr_cnt w[RTE_RCU_MAX_THREADS]
> __rte_cache_aligned;
> > +   /**< QS counter for each reader thread, counts upto
> > +    * current value of token.
> 
> As I understand you decided to stick with neutral thread_id and let user
> define what exactly thread_id is (lcore, syste, thread id, something else)?
Yes, that is correct. I will reply to the other thread to continue the 
discussion.

> If so, can you probably get rid of RTE_RCU_MAX_THREADS limitation?
I am not seeing this as a limitation. The user can change this if required. May 
be I should change it as follows:
#ifndef RTE_RCU_MAX_THREADS
#define RTE_RCU_MAX_THREADS 128
#endif

> I.E. struct rte_rcu_qsbr_cnt w[] and allow user at init time to define max
> number of threads allowed.
> Or something like:
> #define RTE_RCU_QSBR_DEF(name, max_thread) struct name { \
>   uint64_t reg_thread_id[ALIGN_CEIL(max_thread, 64)  >> 6]; \
>    ...
>    struct rte_rcu_qsbr_cnt w[max_thread]; \ }
I am trying to understand this. I am not following why 'name' is required? 
Would the user call 'RTE_RCU_QSBR_DEF' in the application header file?

> 
> 
> > +    */
> > +} __rte_cache_aligned;
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Initialize a Quiescent State (QS) variable.
> > + *
> > + * @param v
> > + *   QS variable
> > + *
> > + */
> > +void __rte_experimental
> > +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v);
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Add a reader thread, to the list of threads reporting their
> > +quiescent
> > + * state on a QS variable.
> > + *
> > + * This is implemented as a lock-free function. It is multi-thread
> > + * safe.
> > + * Any reader thread that wants to report its quiescent state must
> > + * call this API before calling rte_rcu_qsbr_update. This can be
> > +called
> > + * during initialization or as part of the packet processing loop.
> > + * Any ongoing QS queries may wait for the status from this
> > +registered
> > + * thread.
> > + *
> > + * @param v
> > + *   QS variable
> > + * @param thread_id
> > + *   Reader thread with this thread ID will report its quiescent state on
> > + *   the QS variable.
> > + */
> > +static __rte_always_inline void __rte_experimental
> > +rte_rcu_qsbr_register_thread(struct rte_rcu_qsbr *v, unsigned int
> > +thread_id) {
> > +   unsigned int i, id;
> > +
> > +   RTE_ASSERT(v == NULL || thread_id >= RTE_RCU_MAX_THREADS);
> > +
> > +   id = thread_id & RTE_QSBR_THR_ID_MASK;
> > +   i = thread_id >> RTE_QSBR_THR_INDEX_SHIFT;
> > +
> > +   /* Worker thread has to count the quiescent states
> > +    * only from the current value of token.
> > +    * __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
> > +    * 'cnt' (64b) is accessed atomically.
> > +    */
> > +   __atomic_store_n(&v->w[thread_id].cnt,
> > +           __atomic_load_n(&v->token, __ATOMIC_ACQUIRE),
> > +           __ATOMIC_RELAXED);
> > +
> > +   /* Release the store to initial TQS count so that readers
> > +    * can use it immediately after this function returns.
> > +    */
> > +   __atomic_fetch_or(&v->reg_thread_id[i], 1UL << id,
> > +__ATOMIC_RELEASE); }
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Remove a reader thread, from the list of threads reporting their
> > + * quiescent state on a QS variable.
> > + *
> > + * This is implemented as a lock-free function. It is multi-thread safe.
> > + * This API can be called from the reader threads during shutdown.
> > + * Ongoing QS queries will stop waiting for the status from this
> > + * unregistered reader thread.
> > + *
> > + * @param v
> > + *   QS variable
> > + * @param thread_id
> > + *   Reader thread with this thread ID will stop reporting its quiescent
> > + *   state on the QS variable.
> > + */
> > +static __rte_always_inline void __rte_experimental
> > +rte_rcu_qsbr_unregister_thread(struct rte_rcu_qsbr *v, unsigned int
> > +thread_id) {
> > +   unsigned int i, id;
> > +
> > +   RTE_ASSERT(v == NULL || thread_id >= RTE_RCU_MAX_THREADS);
> > +
> > +   id = thread_id & RTE_QSBR_THR_ID_MASK;
> > +   i = thread_id >> RTE_QSBR_THR_INDEX_SHIFT;
> > +
> > +   /* Make sure the removal of the thread from the list of
> > +    * reporting threads is visible before the thread
> > +    * does anything else.
> > +    */
> > +   __atomic_fetch_and(&v->reg_thread_id[i],
> > +                           ~(1UL << id), __ATOMIC_RELEASE);
> > +}
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Trigger the reader threads to report the quiescent state
> > + * status.
> > + *
> > + * This is implemented as a lock-free function. It is multi-thread
> > + * safe and can be called from worker threads.
> > + *
> > + * @param v
> > + *   TQS variable
> > + * @param n
> > + *   Expected number of times the quiescent state is entered
> > + * @param t
> > + *   - If successful, this is the token for this call of the API.
> > + *     This should be passed to rte_rcu_qsbr_check API.
> > + */
> > +static __rte_always_inline void __rte_experimental
> > +rte_rcu_qsbr_start(struct rte_rcu_qsbr *v, unsigned int n, uint64_t
> > +*t) {
> > +   RTE_ASSERT(v == NULL || t == NULL);
> > +
> > +   /* This store release will ensure that changes to any data
> > +    * structure are visible to the workers before the token
> > +    * update is visible.
> > +    */
> > +   *t = __atomic_add_fetch(&v->token, n, __ATOMIC_RELEASE); }
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Update quiescent state for a reader thread.
> > + *
> > + * This is implemented as a lock-free function. It is multi-thread safe.
> > + * All the reader threads registered to report their quiescent state
> > + * on the QS variable must call this API.
> > + *
> > + * @param v
> > + *   QS variable
> > + */
> > +static __rte_always_inline void __rte_experimental
> > +rte_rcu_qsbr_update(struct rte_rcu_qsbr *v, unsigned int thread_id) {
> > +   uint64_t t;
> > +
> > +   RTE_ASSERT(v == NULL || thread_id >= RTE_RCU_MAX_THREADS);
> > +
> > +   /* Load the token before the reader thread loads any other
> > +    * (lock-free) data structure. This ensures that updates
> > +    * to the data structures are visible if the update
> > +    * to token is visible.
> > +    */
> > +   t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);
> > +
> > +   /* Relaxed load/store on the counter is enough as we are
> > +    * reporting an already completed quiescent state.
> > +    * __atomic_load_n(cnt, __ATOMIC_RELAXED) is used as 'cnt' (64b)
> > +    * is accessed atomically.
> > +    * Copy the current token value. This will end grace period
> > +    * of multiple concurrent writers.
> > +    */
> > +   if (__atomic_load_n(&v->w[thread_id].cnt, __ATOMIC_RELAXED) != t)
> > +           __atomic_store_n(&v->w[thread_id].cnt, t,
> __ATOMIC_RELAXED); }
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Checks if all the reader threads have entered the quiescent state
> > + * 'n' number of times. 'n' is provided in rte_rcu_qsbr_start API.
> > + *
> > + * This is implemented as a lock-free function. It is multi-thread
> > + * safe and can be called from the worker threads as well.
> > + *
> > + * @param v
> > + *   QS variable
> > + * @param t
> > + *   Token returned by rte_rcu_qsbr_start API
> > + * @param wait
> > + *   If true, block till all the reader threads have completed entering
> > + *   the quiescent state 'n' number of times
> > + * @return
> > + *   - 0 if all reader threads have NOT passed through specified number
> > + *     of quiescent states.
> > + *   - 1 if all reader threads have passed through specified number
> > + *     of quiescent states.
> > + */
> > +static __rte_always_inline int __rte_experimental
> > +rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait) {
> > +   uint32_t i, j, id;
> > +   uint64_t bmap;
> > +
> > +   RTE_ASSERT(v == NULL);
> > +
> > +   i = 0;
> > +   do {
> > +           /* Load the current registered thread bit map before
> > +            * loading the reader thread quiescent state counters.
> > +            */
> > +           bmap = __atomic_load_n(&v->reg_thread_id[i],
> __ATOMIC_ACQUIRE);
> > +           id = i << RTE_QSBR_THR_INDEX_SHIFT;
> > +
> > +           while (bmap) {
> > +                   j = __builtin_ctzl(bmap);
> > +
> > +                   /* __atomic_store_n(cnt, __ATOMIC_RELAXED)
> > +                    * is used to ensure 'cnt' (64b) is accessed
> > +                    * atomically.
> > +                    */
> > +                   if (unlikely(__atomic_load_n(&v->w[id + j].cnt,
> > +                                   __ATOMIC_RELAXED) < t)) {
> > +                           /* This thread is not in QS */
> > +                           if (!wait)
> > +                                   return 0;
> > +
> > +                           /* Loop till this thread enters QS */
> > +                           rte_pause();
> > +                           continue;
> Shouldn't you re-read reg_thread_id[i] here?
> Konstantin
Yes, you are right. I will try to add a test case as well to address this.

> > +                   }
> > +
> > +                   bmap &= ~(1UL << j);
> > +           }
> > +
> > +           i++;
> > +   } while (i < RTE_QSBR_BIT_MAP_ELEMS);
> > +
> > +   return 1;
> > +}
> > +

Reply via email to