Hi Honnappa,

> diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
> new file mode 100644
> index 000000000..0fc4515ea
> --- /dev/null
> +++ b/lib/librte_rcu/rte_rcu_qsbr.c
> @@ -0,0 +1,99 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2018 Arm Limited
> + */
> +
> +#include <stdio.h>
> +#include <string.h>
> +#include <stdint.h>
> +#include <errno.h>
> +
> +#include <rte_common.h>
> +#include <rte_log.h>
> +#include <rte_memory.h>
> +#include <rte_malloc.h>
> +#include <rte_eal.h>
> +#include <rte_eal_memconfig.h>
> +#include <rte_atomic.h>
> +#include <rte_per_lcore.h>
> +#include <rte_lcore.h>
> +#include <rte_errno.h>
> +
> +#include "rte_rcu_qsbr.h"
> +
> +/* Get the memory size of QSBR variable */
> +size_t __rte_experimental
> +rte_rcu_qsbr_get_memsize(uint32_t max_threads)
> +{
> +	size_t sz;
> +
> +	RTE_ASSERT(max_threads == 0);

Here and in all similar places: assert() will abort when its condition
evaluates to false, so the condition should be max_threads != 0.
Also, this is a public, non-datapath function; calling assert() for an
invalid input parameter seems way too extreme. Why not just return an
error to the caller?
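Something like this, for example (just a sketch; EINVAL is an arbitrary
choice, and rte_errno.h is already included above):

	size_t __rte_experimental
	rte_rcu_qsbr_get_memsize(uint32_t max_threads)
	{
		size_t sz;

		/* Report bad input to the caller instead of aborting. */
		if (max_threads == 0) {
			rte_errno = EINVAL;
			return 0;
		}

		sz = sizeof(struct rte_rcu_qsbr);

		/* Add the size of the quiescent state counter array. */
		sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;

		return RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
	}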
> +
> +	sz = sizeof(struct rte_rcu_qsbr);
> +
> +	/* Add the size of quiescent state counter array */
> +	sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
> +
> +	return RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> +}
> +
> +/* Initialize a quiescent state variable */
> +void __rte_experimental
> +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
> +{
> +	RTE_ASSERT(v == NULL);
> +
> +	memset(v, 0, rte_rcu_qsbr_get_memsize(max_threads));
> +	v->m_threads = max_threads;
> +	v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
> +			RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
> +			RTE_QSBR_THRID_ARRAY_ELM_SIZE;
> +	v->token = RTE_QSBR_CNT_INIT;
> +}
> +
> +/* Dump the details of a single quiescent state variable to a file. */
> +void __rte_experimental
> +rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
> +{
> +	uint64_t bmap;
> +	uint32_t i, t;
> +
> +	RTE_ASSERT(v == NULL || f == NULL);
> +
> +	fprintf(f, "\nQuiescent State Variable @%p\n", v);
> +
> +	fprintf(f, "  QS variable memory size = %lu\n",
> +			rte_rcu_qsbr_get_memsize(v->m_threads));
> +	fprintf(f, "  Given # max threads = %u\n", v->m_threads);
> +
> +	fprintf(f, "  Registered thread ID mask = 0x");
> +	for (i = 0; i < v->num_elems; i++)
> +		fprintf(f, "%lx", __atomic_load_n(&v->reg_thread_id[i],
> +					__ATOMIC_ACQUIRE));
> +	fprintf(f, "\n");
> +
> +	fprintf(f, "  Token = %lu\n",
> +			__atomic_load_n(&v->token, __ATOMIC_ACQUIRE));
> +
> +	fprintf(f, "Quiescent State Counts for readers:\n");
> +	for (i = 0; i < v->num_elems; i++) {
> +		bmap = __atomic_load_n(&v->reg_thread_id[i], __ATOMIC_ACQUIRE);
> +		while (bmap) {
> +			t = __builtin_ctzl(bmap);
> +			fprintf(f, "thread ID = %d, count = %lu\n", t,
> +				__atomic_load_n(
> +					&RTE_QSBR_CNT_ARRAY_ELM(v, i)->cnt,
> +					__ATOMIC_RELAXED));
> +			bmap &= ~(1UL << t);
> +		}
> +	}
> +}
> +
> +int rcu_log_type;
> +
> +RTE_INIT(rte_rcu_register)
> +{
> +	rcu_log_type = rte_log_register("lib.rcu");
> +	if (rcu_log_type >= 0)
> +		rte_log_set_level(rcu_log_type, RTE_LOG_ERR);
> +}
> diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
> new file mode 100644
> index 000000000..83943f751
> --- /dev/null
> +++ b/lib/librte_rcu/rte_rcu_qsbr.h
> @@ -0,0 +1,511 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright (c) 2018 Arm Limited
> + */
> +
> +#ifndef _RTE_RCU_QSBR_H_
> +#define _RTE_RCU_QSBR_H_
> +
> +/**
> + * @file
> + * RTE Quiescent State Based Reclamation (QSBR)
> + *
> + * Quiescent State (QS) is any point in the thread execution
> + * where the thread does not hold a reference to a data structure
> + * in shared memory. While using lock-less data structures, the writer
> + * can safely free memory once all the reader threads have entered
> + * quiescent state.
> + *
> + * This library provides the ability for the readers to report quiescent
> + * state and for the writers to identify when all the readers have
> + * entered quiescent state.
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdio.h>
> +#include <stdint.h>
> +#include <errno.h>
> +#include <rte_common.h>
> +#include <rte_memory.h>
> +#include <rte_lcore.h>
> +#include <rte_debug.h>
> +
> +extern int rcu_log_type;
> +
> +#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
> +#define RCU_DP_LOG(level, fmt, args...) \
> +	rte_log(RTE_LOG_ ## level, rcu_log_type, \
> +		"%s(): " fmt "\n", __func__, ## args)
> +#else
> +#define RCU_DP_LOG(level, fmt, args...)
> +#endif

Why do you need that?
Can't you use RTE_LOG_DP() instead?
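Just for reference, a call through the existing macro would be
something like (USER1 is only a placeholder here; note that
RTE_LOG_DP() pastes a compile-time RTE_LOGTYPE_ name, so as written it
can't take the dynamically registered rcu_log_type):

	/* Compiled out when RTE_LOG_DP_LEVEL < RTE_LOG_DEBUG. */
	RTE_LOG_DP(DEBUG, USER1, "%s(): thread online\n", __func__);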
> +
> +/* Registered thread IDs are stored as a bitmap of 64b element array.
> + * Given thread id needs to be converted to index into the array and
> + * the id within the array element.
> + */
> +#define RTE_RCU_MAX_THREADS 1024
> +#define RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
> +#define RTE_QSBR_THRID_ARRAY_ELEMS \
> +	(RTE_ALIGN_MUL_CEIL(RTE_RCU_MAX_THREADS, \
> +	 RTE_QSBR_THRID_ARRAY_ELM_SIZE) / RTE_QSBR_THRID_ARRAY_ELM_SIZE)
> +#define RTE_QSBR_THRID_INDEX_SHIFT 6
> +#define RTE_QSBR_THRID_MASK 0x3f
> +#define RTE_QSBR_THRID_INVALID 0xffffffff
> +
> +/* Worker thread counter */
> +struct rte_rcu_qsbr_cnt {
> +	uint64_t cnt;
> +	/**< Quiescent state counter. Value 0 indicates the thread is offline */
> +} __rte_cache_aligned;
> +
> +#define RTE_QSBR_CNT_ARRAY_ELM(v, i) (((struct rte_rcu_qsbr_cnt *)(v + 1)) + i)

You can probably add 'struct rte_rcu_qsbr_cnt cnt[0];' at the end of
struct rte_rcu_qsbr; then you wouldn't need the macro above.
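I.e. something like this (a sketch only; same memory layout, the
zero-length array just gives the trailing counters a proper name):

	struct rte_rcu_qsbr {
		uint64_t token __rte_cache_aligned;
		uint32_t num_elems __rte_cache_aligned;
		uint32_t m_threads;
		uint64_t reg_thread_id[RTE_QSBR_THRID_ARRAY_ELEMS]
				__rte_cache_aligned;
		struct rte_rcu_qsbr_cnt cnt[0];
		/**< QS counter array, allocated past the end of the
		 *   structure, one entry per thread.
		 */
	} __rte_cache_aligned;

Then RTE_QSBR_CNT_ARRAY_ELM(v, i) reduces to &v->cnt[i].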
> +#define RTE_QSBR_CNT_THR_OFFLINE 0
> +#define RTE_QSBR_CNT_INIT 1
> +
> +/**
> + * RTE thread Quiescent State structure.
> + * Quiescent state counter array (array of 'struct rte_rcu_qsbr_cnt'),
> + * whose size is dependent on the maximum number of reader threads
> + * (m_threads) using this variable is stored immediately following
> + * this structure.
> + */
> +struct rte_rcu_qsbr {
> +	uint64_t token __rte_cache_aligned;
> +	/**< Counter to allow for multiple simultaneous QS queries */
> +
> +	uint32_t num_elems __rte_cache_aligned;
> +	/**< Number of elements in the thread ID array */
> +	uint32_t m_threads;
> +	/**< Maximum number of threads this RCU variable will use */
> +
> +	uint64_t reg_thread_id[RTE_QSBR_THRID_ARRAY_ELEMS] __rte_cache_aligned;
> +	/**< Registered thread IDs are stored in a bitmap array */

As I understand it, you ended up with a fixed-size array here to avoid
two variable-size arrays in this struct?
Would it be a big penalty for register/unregister() to either store a
pointer to the bitmap, or calculate it from the num_elems value?
As another thought: do we really need the bitmap at all?
Might it be possible to store the registration state for each thread
inside its own rte_rcu_qsbr_cnt:
struct rte_rcu_qsbr_cnt {uint64_t cnt; uint32_t registered;}
__rte_cache_aligned;
That would cause check() to walk through all elements of the
rte_rcu_qsbr_cnt array, but on the other hand it would help to avoid
cache conflicts for register/unregister.
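A quick sketch of that idea (the field can't literally be called
'register', since that is a C keyword, hence 'registered'):

	/* Per-thread state: QS counter plus registration flag, cache
	 * aligned so register/unregister touch only the owner's line.
	 */
	struct rte_rcu_qsbr_cnt {
		uint64_t cnt;
		/**< Quiescent state counter. 0 == thread offline */
		uint32_t registered;
		/**< Non-zero if this thread reports QS on the variable */
	} __rte_cache_aligned;

	static __rte_always_inline void
	rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v,
			unsigned int thread_id)
	{
		struct rte_rcu_qsbr_cnt *c =
				RTE_QSBR_CNT_ARRAY_ELM(v, thread_id);

		/* A release store into the thread's own cache line,
		 * instead of an atomic or into a shared bitmap word.
		 */
		__atomic_store_n(&c->registered, 1, __ATOMIC_RELEASE);
	}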
> +} __rte_cache_aligned;
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Return the size of the memory occupied by a Quiescent State variable.
> + *
> + * @param max_threads
> + *   Maximum number of threads reporting quiescent state on this variable.
> + * @return
> + *   Size of memory in bytes required for this QS variable.
> + */
> +size_t __rte_experimental
> +rte_rcu_qsbr_get_memsize(uint32_t max_threads);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Initialize a Quiescent State (QS) variable.
> + *
> + * @param v
> + *   QS variable
> + * @param max_threads
> + *   Maximum number of threads reporting QS on this variable.
> + *
> + */
> +void __rte_experimental
> +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Register a reader thread to report its quiescent state
> + * on a QS variable.
> + *
> + * This is implemented as a lock-free function. It is multi-thread
> + * safe.
> + * Any reader thread that wants to report its quiescent state must
> + * call this API. This can be called during initialization or as part
> + * of the packet processing loop.
> + *
> + * Note that rte_rcu_qsbr_thread_online must be called before the
> + * thread updates its QS using rte_rcu_qsbr_update.
> + *
> + * @param v
> + *   QS variable
> + * @param thread_id
> + *   Reader thread with this thread ID will report its quiescent state on
> + *   the QS variable.
> + */
> +static __rte_always_inline void __rte_experimental
> +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
> +{
> +	unsigned int i, id;
> +
> +	RTE_ASSERT(v == NULL || thread_id >= v->max_threads);
> +
> +	id = thread_id & RTE_QSBR_THRID_MASK;
> +	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
> +
> +	/* Release the new register thread ID to other threads
> +	 * calling rte_rcu_qsbr_check.
> +	 */
> +	__atomic_fetch_or(&v->reg_thread_id[i], 1UL << id, __ATOMIC_RELEASE);
> +}
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Remove a reader thread, from the list of threads reporting their
> + * quiescent state on a QS variable.
> + *
> + * This is implemented as a lock-free function. It is multi-thread safe.
> + * This API can be called from the reader threads during shutdown.
> + * Ongoing QS queries will stop waiting for the status from this
> + * unregistered reader thread.
> + *
> + * @param v
> + *   QS variable
> + * @param thread_id
> + *   Reader thread with this thread ID will stop reporting its quiescent
> + *   state on the QS variable.
> + */
> +static __rte_always_inline void __rte_experimental
> +rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
> +{
> +	unsigned int i, id;
> +
> +	RTE_ASSERT(v == NULL || thread_id >= v->max_threads);
> +
> +	id = thread_id & RTE_QSBR_THRID_MASK;
> +	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
> +
> +	/* Make sure the removal of the thread from the list of
> +	 * reporting threads is visible before the thread
> +	 * does anything else.
> +	 */
> +	__atomic_fetch_and(&v->reg_thread_id[i],
> +				~(1UL << id), __ATOMIC_RELEASE);
> +}
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Add a registered reader thread, to the list of threads reporting their
> + * quiescent state on a QS variable.
> + *
> + * This is implemented as a lock-free function. It is multi-thread
> + * safe.
> + *
> + * Any registered reader thread that wants to report its quiescent state must
> + * call this API before calling rte_rcu_qsbr_update. This can be called
> + * during initialization or as part of the packet processing loop.
> + *
> + * The reader thread must call rte_rcu_thread_offline API, before
> + * calling any functions that block, to ensure that rte_rcu_qsbr_check
> + * API does not wait indefinitely for the reader thread to update its QS.
> + *
> + * The reader thread must call rte_rcu_thread_online API, after the blocking
> + * function call returns, to ensure that rte_rcu_qsbr_check API
> + * waits for the reader thread to update its QS.
> + *
> + * @param v
> + *   QS variable
> + * @param thread_id
> + *   Reader thread with this thread ID will report its quiescent state on
> + *   the QS variable.
> + */
> +static __rte_always_inline void __rte_experimental
> +rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
> +{
> +	uint64_t t;
> +
> +	RTE_ASSERT(v == NULL || thread_id >= v->max_threads);
> +
> +	/* Copy the current value of token.
> +	 * The fence at the end of the function will ensure that
> +	 * the following will not move down after the load of any shared
> +	 * data structure.
> +	 */
> +	t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);
> +
> +	/* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
> +	 * 'cnt' (64b) is accessed atomically.
> +	 */
> +	__atomic_store_n(&RTE_QSBR_CNT_ARRAY_ELM(v, thread_id)->cnt,
> +		t, __ATOMIC_RELAXED);
> +
> +	/* The subsequent load of the data structure should not
> +	 * move above the store. Hence a store-load barrier
> +	 * is required.
> +	 * If the load of the data structure moves above the store,
> +	 * writer might not see that the reader is online, even though
> +	 * the reader is referencing the shared data structure.
> +	 */
> +	__atomic_thread_fence(__ATOMIC_SEQ_CST);

If it has to generate a proper memory barrier here anyway,
could it use rte_smp_mb() here?
At least for IA it would generate a more lightweight one.
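I.e. just (a sketch; on IA rte_smp_mb() is implemented with a
lock-prefixed instruction, which is cheaper than the mfence the
compiler emits for a SEQ_CST fence, while still giving the same
store-load ordering):

	/* Full barrier: the arch picks its cheapest implementation. */
	rte_smp_mb();

Konstantin

> +}
> +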