> Hi Honnappa,
>
> >
> > Add RCU library supporting quiescent state based memory reclamation method.
> > This library helps identify the quiescent state of the reader threads
> > so that the writers can free the memory associated with the lock less
> > data structures.
> >
> > Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
> > Reviewed-by: Steve Capper <steve.cap...@arm.com>
> > Reviewed-by: Gavin Hu <gavin...@arm.com>
> > ---
> ...
>
> > diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
> > new file mode 100644
> > index 000000000..c818e77fd
> > --- /dev/null
> > +++ b/lib/librte_rcu/rte_rcu_qsbr.h
> > @@ -0,0 +1,321 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + * Copyright (c) 2018 Arm Limited
> > + */
> > +
> > +#ifndef _RTE_RCU_QSBR_H_
> > +#define _RTE_RCU_QSBR_H_
> > +
> > +/**
> > + * @file
> > + * RTE Quiescent State Based Reclamation (QSBR)
> > + *
> > + * Quiescent State (QS) is any point in the thread execution
> > + * where the thread does not hold a reference to shared memory.
> > + * A critical section for a data structure can be a quiescent state for
> > + * another data structure.
> > + *
> > + * This library provides the ability to identify quiescent state.
> > + */
> > +
> > +#ifdef __cplusplus
> > +extern "C" {
> > +#endif
> > +
> > +#include <stdio.h>
> > +#include <stdint.h>
> > +#include <errno.h>
> > +#include <rte_common.h>
> > +#include <rte_memory.h>
> > +#include <rte_lcore.h>
> > +#include <rte_debug.h>
> > +
> > +/**< Maximum number of reader threads supported. */
> > +#define RTE_RCU_MAX_THREADS 128
> > +
> > +#if !RTE_IS_POWER_OF_2(RTE_RCU_MAX_THREADS)
> > +#error RTE_RCU_MAX_THREADS must be a power of 2
> > +#endif
> > +
> > +/**< Number of array elements required for the bit-map */
> > +#define RTE_QSBR_BIT_MAP_ELEMS (RTE_RCU_MAX_THREADS/(sizeof(uint64_t) * 8))
> > +
> > +/* Thread IDs are stored as a bitmap of 64b element array.
> > + * Given thread id needs to be converted to index into the array and the id within
> > + * the array element.
> > + */
> > +#define RTE_QSBR_THR_INDEX_SHIFT 6
> > +#define RTE_QSBR_THR_ID_MASK 0x3f
> > +
> > +/* Worker thread counter */
> > +struct rte_rcu_qsbr_cnt {
> > +	uint64_t cnt; /**< Quiescent state counter. */
> > +} __rte_cache_aligned;
> > +
> > +/**
> > + * RTE thread Quiescent State structure.
> > + */
> > +struct rte_rcu_qsbr {
> > +	uint64_t reg_thread_id[RTE_QSBR_BIT_MAP_ELEMS] __rte_cache_aligned;
> > +	/**< Registered reader thread IDs - reader threads reporting
> > +	 * on this QS variable represented in a bit map.
> > +	 */
> > +
> > +	uint64_t token __rte_cache_aligned;
> > +	/**< Counter to allow for multiple simultaneous QS queries */
> > +
> > +	struct rte_rcu_qsbr_cnt w[RTE_RCU_MAX_THREADS] __rte_cache_aligned;
> > +	/**< QS counter for each reader thread, counts upto
> > +	 * current value of token.
>
> As I understand you decided to stick with neutral thread_id and let user
> define what exactly thread_id is (lcore, system thread id, something else)?

Yes, that is correct. I will reply to the other thread to continue the discussion.
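
For reference, the thread ID to bitmap conversion described in the comment above can be sketched as below. This is only an illustration: the SKETCH_* macros and sketch_* helpers are hypothetical names that mirror RTE_QSBR_THR_INDEX_SHIFT and RTE_QSBR_THR_ID_MASK; they are not part of the patch.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-ins for the macros in the patch. */
#define SKETCH_MAX_THREADS     128
#define SKETCH_THR_INDEX_SHIFT 6    /* log2(64): selects the 64-bit word */
#define SKETCH_THR_ID_MASK     0x3f /* low 6 bits: bit position in the word */

/* Index of the 64-bit bitmap element that holds this thread's bit. */
static inline unsigned int
sketch_word_index(unsigned int thread_id)
{
    assert(thread_id < SKETCH_MAX_THREADS);
    return thread_id >> SKETCH_THR_INDEX_SHIFT;
}

/* Position of this thread's bit within its bitmap element. */
static inline unsigned int
sketch_bit_pos(unsigned int thread_id)
{
    return thread_id & SKETCH_THR_ID_MASK;
}

/* Mask with only this thread's bit set. */
static inline uint64_t
sketch_bit_mask(unsigned int thread_id)
{
    return UINT64_C(1) << sketch_bit_pos(thread_id);
}
```

With these helpers, thread ID 70 maps to bitmap element 1, bit 6.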
> If so, can you probably get rid of RTE_RCU_MAX_THREADS limitation?

I do not see this as a limitation; the user can change it if required. Maybe I should change it as follows:

#ifndef RTE_RCU_MAX_THREADS
#define RTE_RCU_MAX_THREADS 128
#endif

> I.E. struct rte_rcu_qsbr_cnt w[] and allow user at init time to define max
> number of threads allowed.
> Or something like:
> #define RTE_RCU_QSBR_DEF(name, max_thread) struct name { \
>	uint64_t reg_thread_id[ALIGN_CEIL(max_thread, 64) >> 6]; \
>	...
>	struct rte_rcu_qsbr_cnt w[max_thread]; \
> }

I am trying to understand this. I do not follow why 'name' is required. Would the user call 'RTE_RCU_QSBR_DEF' in the application header file?

> >
> > +	 */
> > +} __rte_cache_aligned;
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Initialize a Quiescent State (QS) variable.
> > + *
> > + * @param v
> > + *   QS variable
> > + *
> > + */
> > +void __rte_experimental
> > +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v);
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Add a reader thread, to the list of threads reporting their quiescent
> > + * state on a QS variable.
> > + *
> > + * This is implemented as a lock-free function. It is multi-thread
> > + * safe.
> > + * Any reader thread that wants to report its quiescent state must
> > + * call this API before calling rte_rcu_qsbr_update. This can be called
> > + * during initialization or as part of the packet processing loop.
> > + * Any ongoing QS queries may wait for the status from this registered
> > + * thread.
> > + *
> > + * @param v
> > + *   QS variable
> > + * @param thread_id
> > + *   Reader thread with this thread ID will report its quiescent state on
> > + *   the QS variable.
> > + */
> > +static __rte_always_inline void __rte_experimental
> > +rte_rcu_qsbr_register_thread(struct rte_rcu_qsbr *v, unsigned int thread_id)
> > +{
> > +	unsigned int i, id;
> > +
> > +	RTE_ASSERT(v == NULL || thread_id >= RTE_RCU_MAX_THREADS);
> > +
> > +	id = thread_id & RTE_QSBR_THR_ID_MASK;
> > +	i = thread_id >> RTE_QSBR_THR_INDEX_SHIFT;
> > +
> > +	/* Worker thread has to count the quiescent states
> > +	 * only from the current value of token.
> > +	 * __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
> > +	 * 'cnt' (64b) is accessed atomically.
> > +	 */
> > +	__atomic_store_n(&v->w[thread_id].cnt,
> > +		__atomic_load_n(&v->token, __ATOMIC_ACQUIRE),
> > +		__ATOMIC_RELAXED);
> > +
> > +	/* Release the store to initial TQS count so that readers
> > +	 * can use it immediately after this function returns.
> > +	 */
> > +	__atomic_fetch_or(&v->reg_thread_id[i], 1UL << id, __ATOMIC_RELEASE);
> > +}
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Remove a reader thread, from the list of threads reporting their
> > + * quiescent state on a QS variable.
> > + *
> > + * This is implemented as a lock-free function. It is multi-thread safe.
> > + * This API can be called from the reader threads during shutdown.
> > + * Ongoing QS queries will stop waiting for the status from this
> > + * unregistered reader thread.
> > + *
> > + * @param v
> > + *   QS variable
> > + * @param thread_id
> > + *   Reader thread with this thread ID will stop reporting its quiescent
> > + *   state on the QS variable.
> > + */
> > +static __rte_always_inline void __rte_experimental
> > +rte_rcu_qsbr_unregister_thread(struct rte_rcu_qsbr *v, unsigned int thread_id)
> > +{
> > +	unsigned int i, id;
> > +
> > +	RTE_ASSERT(v == NULL || thread_id >= RTE_RCU_MAX_THREADS);
> > +
> > +	id = thread_id & RTE_QSBR_THR_ID_MASK;
> > +	i = thread_id >> RTE_QSBR_THR_INDEX_SHIFT;
> > +
> > +	/* Make sure the removal of the thread from the list of
> > +	 * reporting threads is visible before the thread
> > +	 * does anything else.
> > +	 */
> > +	__atomic_fetch_and(&v->reg_thread_id[i],
> > +		~(1UL << id), __ATOMIC_RELEASE);
> > +}
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Trigger the reader threads to report the quiescent state
> > + * status.
> > + *
> > + * This is implemented as a lock-free function. It is multi-thread
> > + * safe and can be called from worker threads.
> > + *
> > + * @param v
> > + *   TQS variable
> > + * @param n
> > + *   Expected number of times the quiescent state is entered
> > + * @param t
> > + *   - If successful, this is the token for this call of the API.
> > + *     This should be passed to rte_rcu_qsbr_check API.
> > + */
> > +static __rte_always_inline void __rte_experimental
> > +rte_rcu_qsbr_start(struct rte_rcu_qsbr *v, unsigned int n, uint64_t *t)
> > +{
> > +	RTE_ASSERT(v == NULL || t == NULL);
> > +
> > +	/* This store release will ensure that changes to any data
> > +	 * structure are visible to the workers before the token
> > +	 * update is visible.
> > +	 */
> > +	*t = __atomic_add_fetch(&v->token, n, __ATOMIC_RELEASE);
> > +}
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Update quiescent state for a reader thread.
> > + *
> > + * This is implemented as a lock-free function. It is multi-thread safe.
> > + * All the reader threads registered to report their quiescent state
> > + * on the QS variable must call this API.
> > + *
> > + * @param v
> > + *   QS variable
> > + */
> > +static __rte_always_inline void __rte_experimental
> > +rte_rcu_qsbr_update(struct rte_rcu_qsbr *v, unsigned int thread_id)
> > +{
> > +	uint64_t t;
> > +
> > +	RTE_ASSERT(v == NULL || thread_id >= RTE_RCU_MAX_THREADS);
> > +
> > +	/* Load the token before the reader thread loads any other
> > +	 * (lock-free) data structure. This ensures that updates
> > +	 * to the data structures are visible if the update
> > +	 * to token is visible.
> > +	 */
> > +	t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);
> > +
> > +	/* Relaxed load/store on the counter is enough as we are
> > +	 * reporting an already completed quiescent state.
> > +	 * __atomic_load_n(cnt, __ATOMIC_RELAXED) is used as 'cnt' (64b)
> > +	 * is accessed atomically.
> > +	 * Copy the current token value. This will end grace period
> > +	 * of multiple concurrent writers.
> > +	 */
> > +	if (__atomic_load_n(&v->w[thread_id].cnt, __ATOMIC_RELAXED) != t)
> > +		__atomic_store_n(&v->w[thread_id].cnt, t, __ATOMIC_RELAXED);
> > +}
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Checks if all the reader threads have entered the quiescent state
> > + * 'n' number of times. 'n' is provided in rte_rcu_qsbr_start API.
> > + *
> > + * This is implemented as a lock-free function. It is multi-thread
> > + * safe and can be called from the worker threads as well.
> > + *
> > + * @param v
> > + *   QS variable
> > + * @param t
> > + *   Token returned by rte_rcu_qsbr_start API
> > + * @param wait
> > + *   If true, block till all the reader threads have completed entering
> > + *   the quiescent state 'n' number of times
> > + * @return
> > + *   - 0 if all reader threads have NOT passed through specified number
> > + *     of quiescent states.
> > + *   - 1 if all reader threads have passed through specified number
> > + *     of quiescent states.
> > + */
> > +static __rte_always_inline int __rte_experimental
> > +rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
> > +{
> > +	uint32_t i, j, id;
> > +	uint64_t bmap;
> > +
> > +	RTE_ASSERT(v == NULL);
> > +
> > +	i = 0;
> > +	do {
> > +		/* Load the current registered thread bit map before
> > +		 * loading the reader thread quiescent state counters.
> > +		 */
> > +		bmap = __atomic_load_n(&v->reg_thread_id[i], __ATOMIC_ACQUIRE);
> > +		id = i << RTE_QSBR_THR_INDEX_SHIFT;
> > +
> > +		while (bmap) {
> > +			j = __builtin_ctzl(bmap);
> > +
> > +			/* __atomic_store_n(cnt, __ATOMIC_RELAXED)
> > +			 * is used to ensure 'cnt' (64b) is accessed
> > +			 * atomically.
> > +			 */
> > +			if (unlikely(__atomic_load_n(&v->w[id + j].cnt,
> > +					__ATOMIC_RELAXED) < t)) {
> > +				/* This thread is not in QS */
> > +				if (!wait)
> > +					return 0;
> > +
> > +				/* Loop till this thread enters QS */
> > +				rte_pause();
> > +				continue;
> Shouldn't you re-read reg_thread_id[i] here?
> Konstantin

Yes, you are right. I will try to add a test case as well to address this.

> > +			}
> > +
> > +			bmap &= ~(1UL << j);
> > +		}
> > +
> > +		i++;
> > +	} while (i < RTE_QSBR_BIT_MAP_ELEMS);
> > +
> > +	return 1;
> > +}
> > +
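
To illustrate the re-read that Konstantin is asking about, here is a minimal, self-contained sketch of a check loop that reloads the registered-thread bitmap each time it has to wait, so a reader that unregisters in the meantime is no longer waited on. It is not the final patch: the sketch_* names and the simplified structure are hypothetical, the shift and mask values are hard-coded, the pause is left as a comment, and it assumes a compiler that provides the GCC/Clang __atomic builtins.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified stand-ins for the definitions in the patch. */
#define SKETCH_MAX_THREADS 128
#define SKETCH_BMAP_ELEMS  (SKETCH_MAX_THREADS / 64)

struct sketch_qsbr {
    uint64_t reg_thread_id[SKETCH_BMAP_ELEMS]; /* registered-reader bitmap */
    uint64_t token;                            /* current token */
    uint64_t cnt[SKETCH_MAX_THREADS];          /* per-reader QS counters */
};

static void
sketch_register(struct sketch_qsbr *v, unsigned int thread_id)
{
    assert(v != NULL && thread_id < SKETCH_MAX_THREADS);
    /* Start counting from the current token, then publish the bit. */
    __atomic_store_n(&v->cnt[thread_id],
                     __atomic_load_n(&v->token, __ATOMIC_ACQUIRE),
                     __ATOMIC_RELAXED);
    __atomic_fetch_or(&v->reg_thread_id[thread_id >> 6],
                      UINT64_C(1) << (thread_id & 0x3f), __ATOMIC_RELEASE);
}

static void
sketch_unregister(struct sketch_qsbr *v, unsigned int thread_id)
{
    assert(v != NULL && thread_id < SKETCH_MAX_THREADS);
    __atomic_fetch_and(&v->reg_thread_id[thread_id >> 6],
                       ~(UINT64_C(1) << (thread_id & 0x3f)), __ATOMIC_RELEASE);
}

/* Returns 1 if every registered reader has reached token t, else 0. */
static int
sketch_check(struct sketch_qsbr *v, uint64_t t, bool wait)
{
    assert(v != NULL);

    for (unsigned int i = 0; i < SKETCH_BMAP_ELEMS; i++) {
        uint64_t bmap = __atomic_load_n(&v->reg_thread_id[i],
                                        __ATOMIC_ACQUIRE);
        unsigned int id = i << 6;

        while (bmap) {
            unsigned int j = (unsigned int)__builtin_ctzll(bmap);

            if (__atomic_load_n(&v->cnt[id + j], __ATOMIC_RELAXED) < t) {
                /* This reader has not reported a quiescent state yet. */
                if (!wait)
                    return 0;

                /* The patch calls rte_pause() at this point. */

                /* The reader may have unregistered while we waited:
                 * re-read the bitmap instead of reusing the stale copy.
                 */
                bmap = __atomic_load_n(&v->reg_thread_id[i],
                                       __ATOMIC_ACQUIRE);
                continue;
            }

            bmap &= ~(UINT64_C(1) << j);
        }
    }

    return 1;
}
```

Reloading the whole element means bits that were already checked get checked again, which costs a few extra loads but keeps the loop simple. A test case for this could register a reader, start a blocking check from another thread, and then unregister the reader without it ever reporting a quiescent state.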