Hi Honnappa,

> diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
> new file mode 100644
> index 000000000..0fc4515ea
> --- /dev/null
> +++ b/lib/librte_rcu/rte_rcu_qsbr.c
> @@ -0,0 +1,99 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2018 Arm Limited
> + */
> +
> +#include <stdio.h>
> +#include <string.h>
> +#include <stdint.h>
> +#include <errno.h>
> +
> +#include <rte_common.h>
> +#include <rte_log.h>
> +#include <rte_memory.h>
> +#include <rte_malloc.h>
> +#include <rte_eal.h>
> +#include <rte_eal_memconfig.h>
> +#include <rte_atomic.h>
> +#include <rte_per_lcore.h>
> +#include <rte_lcore.h>
> +#include <rte_errno.h>
> +
> +#include "rte_rcu_qsbr.h"
> +
> +/* Get the memory size of QSBR variable */
> +size_t __rte_experimental
> +rte_rcu_qsbr_get_memsize(uint32_t max_threads)
> +{
> +     size_t sz;
> +
> +     RTE_ASSERT(max_threads == 0);

Here and in all similar places:
assert() will abort when its condition evaluates to false,
so it should be max_threads != 0.
Also, this is a public, non-datapath function.
Calling assert() for an invalid input parameter seems way too extreme.
Why not just return an error to the caller?
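Something like this sketch, for example (the rte_errno/sentinel convention
here is just one option, since size_t is unsigned):

size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads)
{
	size_t sz;

	/* Reject invalid input instead of aborting. */
	if (max_threads == 0) {
		rte_errno = EINVAL;
		return 1; /* sentinel: any valid size is larger */
	}

	sz = sizeof(struct rte_rcu_qsbr);

	/* Add the size of the quiescent state counter array. */
	sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;

	return RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
}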

> +
> +     sz = sizeof(struct rte_rcu_qsbr);
> +
> +     /* Add the size of quiescent state counter array */
> +     sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
> +
> +     return RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> +}
> +
> +/* Initialize a quiescent state variable */
> +void __rte_experimental
> +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
> +{
> +     RTE_ASSERT(v == NULL);
> +
> +     memset(v, 0, rte_rcu_qsbr_get_memsize(max_threads));
> +     v->m_threads = max_threads;
> +     v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
> +                     RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
> +                     RTE_QSBR_THRID_ARRAY_ELM_SIZE;
> +     v->token = RTE_QSBR_CNT_INIT;
> +}
> +
> +/* Dump the details of a single quiescent state variable to a file. */
> +void __rte_experimental
> +rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
> +{
> +     uint64_t bmap;
> +     uint32_t i, t;
> +
> +     RTE_ASSERT(v == NULL || f == NULL);
> +
> +     fprintf(f, "\nQuiescent State Variable @%p\n", v);
> +
> +     fprintf(f, "  QS variable memory size = %lu\n",
> +                             rte_rcu_qsbr_get_memsize(v->m_threads));
> +     fprintf(f, "  Given # max threads = %u\n", v->m_threads);
> +
> +     fprintf(f, "  Registered thread ID mask = 0x");
> +     for (i = 0; i < v->num_elems; i++)
> +             fprintf(f, "%lx", __atomic_load_n(&v->reg_thread_id[i],
> +                                     __ATOMIC_ACQUIRE));
> +     fprintf(f, "\n");
> +
> +     fprintf(f, "  Token = %lu\n",
> +                     __atomic_load_n(&v->token, __ATOMIC_ACQUIRE));
> +
> +     fprintf(f, "Quiescent State Counts for readers:\n");
> +     for (i = 0; i < v->num_elems; i++) {
> +             bmap = __atomic_load_n(&v->reg_thread_id[i], __ATOMIC_ACQUIRE);
> +             while (bmap) {
> +                     t = __builtin_ctzl(bmap);
> +                     fprintf(f, "thread ID = %d, count = %lu\n", t,
> +                             __atomic_load_n(
> +                                     &RTE_QSBR_CNT_ARRAY_ELM(v, i)->cnt,
> +                                     __ATOMIC_RELAXED));
> +                     bmap &= ~(1UL << t);
> +             }
> +     }
> +}
> +
> +int rcu_log_type;
> +
> +RTE_INIT(rte_rcu_register)
> +{
> +     rcu_log_type = rte_log_register("lib.rcu");
> +     if (rcu_log_type >= 0)
> +             rte_log_set_level(rcu_log_type, RTE_LOG_ERR);
> +}
> diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
> new file mode 100644
> index 000000000..83943f751
> --- /dev/null
> +++ b/lib/librte_rcu/rte_rcu_qsbr.h
> @@ -0,0 +1,511 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright (c) 2018 Arm Limited
> + */
> +
> +#ifndef _RTE_RCU_QSBR_H_
> +#define _RTE_RCU_QSBR_H_
> +
> +/**
> + * @file
> + * RTE Quiescent State Based Reclamation (QSBR)
> + *
> + * Quiescent State (QS) is any point in the thread execution
> + * where the thread does not hold a reference to a data structure
> + * in shared memory. While using lock-less data structures, the writer
> + * can safely free memory once all the reader threads have entered
> + * quiescent state.
> + *
> + * This library provides the ability for the readers to report quiescent
> + * state and for the writers to identify when all the readers have
> + * entered quiescent state.
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdio.h>
> +#include <stdint.h>
> +#include <errno.h>
> +#include <rte_common.h>
> +#include <rte_memory.h>
> +#include <rte_lcore.h>
> +#include <rte_debug.h>
> +
> +extern int rcu_log_type;
> +
> +#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
> +#define RCU_DP_LOG(level, fmt, args...) \
> +     rte_log(RTE_LOG_ ## level, rcu_log_type, \
> +             "%s(): " fmt "\n", __func__, ## args)
> +#else
> +#define RCU_DP_LOG(level, fmt, args...)
> +#endif

Why do you need that?
Can't you use RTE_LOG_DP() instead?
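E.g. something like (USER1 and the message are just placeholders;
RTE_LOG_DP() is likewise compiled out when RTE_LOG_DP_LEVEL is below
the requested level):

	RTE_LOG_DP(DEBUG, USER1, "%s(): token = %lu\n",
		__func__, token);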

> +
> +/* Registered thread IDs are stored as a bitmap of 64b element array.
> + * Given thread id needs to be converted to index into the array and
> + * the id within the array element.
> + */
> +#define RTE_RCU_MAX_THREADS 1024
> +#define RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
> +#define RTE_QSBR_THRID_ARRAY_ELEMS \
> +     (RTE_ALIGN_MUL_CEIL(RTE_RCU_MAX_THREADS, \
> +      RTE_QSBR_THRID_ARRAY_ELM_SIZE) / RTE_QSBR_THRID_ARRAY_ELM_SIZE)
> +#define RTE_QSBR_THRID_INDEX_SHIFT 6
> +#define RTE_QSBR_THRID_MASK 0x3f
> +#define RTE_QSBR_THRID_INVALID 0xffffffff
> +
> +/* Worker thread counter */
> +struct rte_rcu_qsbr_cnt {
> +     uint64_t cnt;
> +     /**< Quiescent state counter. Value 0 indicates the thread is offline */
> +} __rte_cache_aligned;
> +
> +#define RTE_QSBR_CNT_ARRAY_ELM(v, i) (((struct rte_rcu_qsbr_cnt *)(v + 1)) + i)

You can probably add
	struct rte_rcu_qsbr_cnt cnt[0];
at the end of struct rte_rcu_qsbr; then you wouldn't need the macro above.
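I.e. something like this sketch (the fields are copied from the struct
defined just below, plus the trailing zero-length array):

struct rte_rcu_qsbr {
	uint64_t token __rte_cache_aligned;
	/**< Counter to allow for multiple simultaneous QS queries */
	uint32_t num_elems __rte_cache_aligned;
	/**< Number of elements in the thread ID array */
	uint32_t m_threads;
	/**< Maximum number of threads this RCU variable will use */
	uint64_t reg_thread_id[RTE_QSBR_THRID_ARRAY_ELEMS] __rte_cache_aligned;
	/**< Registered thread IDs are stored in a bitmap array */
	struct rte_rcu_qsbr_cnt cnt[0];
	/**< Per-thread QS counters, allocated past the end of the struct */
} __rte_cache_aligned;

Then RTE_QSBR_CNT_ARRAY_ELM(v, i) becomes simply &v->cnt[i].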

> +#define RTE_QSBR_CNT_THR_OFFLINE 0
> +#define RTE_QSBR_CNT_INIT 1
> +
> +/**
> + * RTE thread Quiescent State structure.
> + * Quiescent state counter array (array of 'struct rte_rcu_qsbr_cnt'),
> + * whose size is dependent on the maximum number of reader threads
> + * (m_threads) using this variable is stored immediately following
> + * this structure.
> + */
> +struct rte_rcu_qsbr {
> +     uint64_t token __rte_cache_aligned;
> +     /**< Counter to allow for multiple simultaneous QS queries */
> +
> +     uint32_t num_elems __rte_cache_aligned;
> +     /**< Number of elements in the thread ID array */
> +     uint32_t m_threads;
> +     /**< Maximum number of threads this RCU variable will use */
> +
> +     uint64_t reg_thread_id[RTE_QSBR_THRID_ARRAY_ELEMS] __rte_cache_aligned;
> +     /**< Registered thread IDs are stored in a bitmap array */


As I understand it, you ended up with a fixed-size array to avoid two
variable-size arrays in this struct?
Would it be a big penalty for register/unregister() to either store a
pointer to the bitmap, or calculate its location from the num_elems value?
As another thought - do we really need the bitmap at all?
Maybe it is possible to store a registration flag for each thread inside
its own rte_rcu_qsbr_cnt:
struct rte_rcu_qsbr_cnt {uint64_t cnt; uint32_t registered;} __rte_cache_aligned;
?
That would force check() to walk through all elements of the
rte_rcu_qsbr_cnt array, but on the other hand it would help to avoid
cache conflicts for register/unregister - see the sketch below.
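A rough sketch of what check() might look like with that layout (purely
illustrative: 'qsbr_check_sketch' is a made-up name, the trailing cnt[]
array is from my earlier comment, and offline handling is omitted):

static int
qsbr_check_sketch(struct rte_rcu_qsbr *v, uint64_t t)
{
	uint32_t i;

	for (i = 0; i < v->m_threads; i++) {
		/* Skip threads that never registered. */
		if (__atomic_load_n(&v->cnt[i].registered,
					__ATOMIC_ACQUIRE) == 0)
			continue;
		/* This reader has not yet passed a quiescent state. */
		if (__atomic_load_n(&v->cnt[i].cnt,
					__ATOMIC_ACQUIRE) < t)
			return 0;
	}
	return 1;
}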

> +} __rte_cache_aligned;
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Return the size of the memory occupied by a Quiescent State variable.
> + *
> + * @param max_threads
> + *   Maximum number of threads reporting quiescent state on this variable.
> + * @return
> + *   Size of memory in bytes required for this QS variable.
> + */
> +size_t __rte_experimental
> +rte_rcu_qsbr_get_memsize(uint32_t max_threads);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Initialize a Quiescent State (QS) variable.
> + *
> + * @param v
> + *   QS variable
> + * @param max_threads
> + *   Maximum number of threads reporting QS on this variable.
> + *
> + */
> +void __rte_experimental
> +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Register a reader thread to report its quiescent state
> + * on a QS variable.
> + *
> + * This is implemented as a lock-free function. It is multi-thread
> + * safe.
> + * Any reader thread that wants to report its quiescent state must
> + * call this API. This can be called during initialization or as part
> + * of the packet processing loop.
> + *
> + * Note that rte_rcu_qsbr_thread_online must be called before the
> + * thread updates its QS using rte_rcu_qsbr_update.
> + *
> + * @param v
> + *   QS variable
> + * @param thread_id
> + *   Reader thread with this thread ID will report its quiescent state on
> + *   the QS variable.
> + */
> +static __rte_always_inline void __rte_experimental
> +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
> +{
> +     unsigned int i, id;
> +
> +     RTE_ASSERT(v == NULL || thread_id >= v->max_threads);
> +
> +     id = thread_id & RTE_QSBR_THRID_MASK;
> +     i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
> +
> +     /* Release the new register thread ID to other threads
> +      * calling rte_rcu_qsbr_check.
> +      */
> +     __atomic_fetch_or(&v->reg_thread_id[i], 1UL << id, __ATOMIC_RELEASE);
> +}
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Remove a reader thread, from the list of threads reporting their
> + * quiescent state on a QS variable.
> + *
> + * This is implemented as a lock-free function. It is multi-thread safe.
> + * This API can be called from the reader threads during shutdown.
> + * Ongoing QS queries will stop waiting for the status from this
> + * unregistered reader thread.
> + *
> + * @param v
> + *   QS variable
> + * @param thread_id
> + *   Reader thread with this thread ID will stop reporting its quiescent
> + *   state on the QS variable.
> + */
> +static __rte_always_inline void __rte_experimental
> +rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
> +{
> +     unsigned int i, id;
> +
> +     RTE_ASSERT(v == NULL || thread_id >= v->max_threads);
> +
> +     id = thread_id & RTE_QSBR_THRID_MASK;
> +     i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
> +
> +     /* Make sure the removal of the thread from the list of
> +      * reporting threads is visible before the thread
> +      * does anything else.
> +      */
> +     __atomic_fetch_and(&v->reg_thread_id[i],
> +                             ~(1UL << id), __ATOMIC_RELEASE);
> +}
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Add a registered reader thread, to the list of threads reporting their
> + * quiescent state on a QS variable.
> + *
> + * This is implemented as a lock-free function. It is multi-thread
> + * safe.
> + *
> + * Any registered reader thread that wants to report its quiescent state must
> + * call this API before calling rte_rcu_qsbr_update. This can be called
> + * during initialization or as part of the packet processing loop.
> + *
> + * The reader thread must call rte_rcu_thread_offline API, before
> + * calling any functions that block, to ensure that rte_rcu_qsbr_check
> + * API does not wait indefinitely for the reader thread to update its QS.
> + *
> + * The reader thread must call rte_rcu_thread_online API, after the blocking
> + * function call returns, to ensure that rte_rcu_qsbr_check API
> + * waits for the reader thread to update its QS.
> + *
> + * @param v
> + *   QS variable
> + * @param thread_id
> + *   Reader thread with this thread ID will report its quiescent state on
> + *   the QS variable.
> + */
> +static __rte_always_inline void __rte_experimental
> +rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
> +{
> +     uint64_t t;
> +
> +     RTE_ASSERT(v == NULL || thread_id >= v->max_threads);
> +
> +     /* Copy the current value of token.
> +      * The fence at the end of the function will ensure that
> +      * the following will not move down after the load of any shared
> +      * data structure.
> +      */
> +     t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);
> +
> +     /* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
> +      * 'cnt' (64b) is accessed atomically.
> +      */
> +     __atomic_store_n(&RTE_QSBR_CNT_ARRAY_ELM(v, thread_id)->cnt,
> +             t, __ATOMIC_RELAXED);
> +
> +     /* The subsequent load of the data structure should not
> +      * move above the store. Hence a store-load barrier
> +      * is required.
> +      * If the load of the data structure moves above the store,
> +      * writer might not see that the reader is online, even though
> +      * the reader is referencing the shared data structure.
> +      */
> +     __atomic_thread_fence(__ATOMIC_SEQ_CST);

If it has to generate a proper memory barrier here anyway,
could it use rte_smp_mb() instead?
At least for IA that would generate a more lightweight one.
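I.e. the SEQ_CST fence would become:

	/* Store-load barrier: keep the subsequent loads of the shared
	 * data structure from moving above the counter store.
	 */
	rte_smp_mb();

(on IA rte_smp_mb() uses a lock-prefixed instruction, which is cheaper
than the mfence a SEQ_CST fence typically compiles to).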
Konstantin

> +}
> +
