rte_rcu_qsbr_get_memsize API is introduced. This will allow the user
to control the amount of memory used based on the maximum
number of threads present in the application.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
---
 lib/librte_rcu/rte_rcu_qsbr.c |  51 ++++++++++++---
 lib/librte_rcu/rte_rcu_qsbr.h | 118 +++++++++++++++++++++-------------
 2 files changed, 118 insertions(+), 51 deletions(-)

diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
index 3c2577ee2..02464fdba 100644
--- a/lib/librte_rcu/rte_rcu_qsbr.c
+++ b/lib/librte_rcu/rte_rcu_qsbr.c
@@ -21,11 +21,39 @@
 
 #include "rte_rcu_qsbr.h"
 
+/* Get the memory size of QSBR variable */
+unsigned int __rte_experimental
+rte_rcu_qsbr_get_memsize(uint32_t max_threads)
+{
+       int n;
+       ssize_t sz;
+
+       RTE_ASSERT(max_threads != 0);
+
+       sz = sizeof(struct rte_rcu_qsbr);
+
+       /* Add the size of the registered thread ID bitmap array */
+       n = RTE_ALIGN(max_threads, RTE_QSBR_THRID_ARRAY_ELM_SIZE);
+       sz += RTE_QSBR_THRID_ARRAY_SIZE(n);
+
+       /* Add the size of quiescent state counter array */
+       sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
+
+       return RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+}
+
 /* Initialize a quiescent state variable */
 void __rte_experimental
-rte_rcu_qsbr_init(struct rte_rcu_qsbr *v)
+rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
 {
-       memset(v, 0, sizeof(struct rte_rcu_qsbr));
+       RTE_ASSERT(v != NULL);
+
+       memset(v, 0, rte_rcu_qsbr_get_memsize(max_threads));
+       v->m_threads = max_threads;
+       v->ma_threads = RTE_ALIGN(max_threads, RTE_QSBR_THRID_ARRAY_ELM_SIZE);
+
+       v->num_elems = v->ma_threads/RTE_QSBR_THRID_ARRAY_ELM_SIZE;
+       v->thrid_array_size = RTE_QSBR_THRID_ARRAY_SIZE(v->ma_threads);
 }
 
 /* Dump the details of a single quiescent state variable to a file. */
@@ -39,9 +67,15 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
 
        fprintf(f, "\nQuiescent State Variable @%p\n", v);
 
+       fprintf(f, "  QS variable memory size = %u\n",
+                               rte_rcu_qsbr_get_memsize(v->m_threads));
+       fprintf(f, "  Given # max threads = %u\n", v->m_threads);
+       fprintf(f, "  Adjusted # max threads = %u\n", v->ma_threads);
+
        fprintf(f, "  Registered thread ID mask = 0x");
-       for (i = 0; i < RTE_QSBR_BIT_MAP_ELEMS; i++)
-               fprintf(f, "%lx", __atomic_load_n(&v->reg_thread_id[i],
+       for (i = 0; i < v->num_elems; i++)
+               fprintf(f, "%lx", __atomic_load_n(
+                                       RTE_QSBR_THRID_ARRAY_ELM(v, i),
                                        __ATOMIC_ACQUIRE));
        fprintf(f, "\n");
 
@@ -49,14 +83,15 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
                        __atomic_load_n(&v->token, __ATOMIC_ACQUIRE));
 
        fprintf(f, "Quiescent State Counts for readers:\n");
-       for (i = 0; i < RTE_QSBR_BIT_MAP_ELEMS; i++) {
-               bmap = __atomic_load_n(&v->reg_thread_id[i],
+       for (i = 0; i < v->num_elems; i++) {
+               bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),
                                        __ATOMIC_ACQUIRE);
                while (bmap) {
                        t = __builtin_ctzl(bmap);
                        fprintf(f, "thread ID = %d, count = %lu\n", t,
-                               __atomic_load_n(&v->w[i].cnt,
-                                               __ATOMIC_RELAXED));
+                               __atomic_load_n(
+                                       &RTE_QSBR_CNT_ARRAY_ELM(v, i)->cnt,
+                                       __ATOMIC_RELAXED));
                        bmap &= ~(1UL << t);
                }
        }
diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
index 53e00488b..21fa2c198 100644
--- a/lib/librte_rcu/rte_rcu_qsbr.h
+++ b/lib/librte_rcu/rte_rcu_qsbr.h
@@ -29,46 +29,71 @@ extern "C" {
 #include <rte_lcore.h>
 #include <rte_debug.h>
 
-/**< Maximum number of reader threads supported. */
-#define RTE_RCU_MAX_THREADS 128
-
-#if !RTE_IS_POWER_OF_2(RTE_RCU_MAX_THREADS)
-#error RTE_RCU_MAX_THREADS must be a power of 2
-#endif
-
-/**< Number of array elements required for the bit-map */
-#define RTE_QSBR_BIT_MAP_ELEMS (RTE_RCU_MAX_THREADS/(sizeof(uint64_t) * 8))
-
-/* Thread IDs are stored as a bitmap of 64b element array. Given thread id
- * needs to be converted to index into the array and the id within
- * the array element.
+/* Registered thread IDs are stored as a bitmap of 64b element array.
+ * Given thread id needs to be converted to index into the array and
+ * the id within the array element.
+ */
+/* Thread ID array size
+ * @param ma_threads
+ *   num of threads aligned to 64
  */
-#define RTE_QSBR_THR_INDEX_SHIFT 6
-#define RTE_QSBR_THR_ID_MASK 0x3f
+#define RTE_QSBR_THRID_ARRAY_SIZE(ma_threads) \
+       RTE_ALIGN((ma_threads) >> 3, RTE_CACHE_LINE_SIZE)
+#define RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
+#define RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *)(v + 1) + i)
+#define RTE_QSBR_THRID_INDEX_SHIFT 6
+#define RTE_QSBR_THRID_MASK 0x3f
 
 /* Worker thread counter */
 struct rte_rcu_qsbr_cnt {
        uint64_t cnt; /**< Quiescent state counter. */
 } __rte_cache_aligned;
 
+#define RTE_QSBR_CNT_ARRAY_ELM(v, i) ((struct rte_rcu_qsbr_cnt *) \
+       ((uint8_t *)(v + 1) + v->thrid_array_size) + i)
+
 /**
  * RTE thread Quiescent State structure.
+ * The following data, which is dependent on the maximum number of
+ * threads using this variable, is stored in memory immediately
+ * following this structure.
+ *
+ * 1) registered thread ID bitmap array
+ *    This is a uint64_t array enough to hold 'ma_threads' number
+ *    of thread IDs.
+ * 2) quiescent state counter array
+ *    This is an array of 'struct rte_rcu_qsbr_cnt' with
+ *    'm_threads' number of elements.
  */
 struct rte_rcu_qsbr {
-       uint64_t reg_thread_id[RTE_QSBR_BIT_MAP_ELEMS] __rte_cache_aligned;
-       /**< Registered reader thread IDs - reader threads reporting
-        * on this QS variable represented in a bit map.
-        */
-
        uint64_t token __rte_cache_aligned;
        /**< Counter to allow for multiple simultaneous QS queries */
 
-       struct rte_rcu_qsbr_cnt w[RTE_RCU_MAX_THREADS] __rte_cache_aligned;
-       /**< QS counter for each reader thread, counts upto
-        * current value of token.
-        */
+       uint32_t thrid_array_size __rte_cache_aligned;
+       /**< Registered thread ID bitmap array size in bytes */
+       uint32_t num_elems;
+       /**< Number of elements in the thread ID array */
+
+       uint32_t m_threads;
+       /**< Maximum number of threads this RCU variable will use */
+       uint32_t ma_threads;
+       /**< Maximum number of threads aligned to 64 */
 } __rte_cache_aligned;
 
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Return the size of the memory occupied by a Quiescent State (QS) variable.
+ *
+ * @param max_threads
+ *   Maximum number of threads reporting QS on this variable.
+ * @return
+ *   Size of memory in bytes required for this QS variable.
+ */
+unsigned int __rte_experimental
+rte_rcu_qsbr_get_memsize(uint32_t max_threads);
+
 /**
  * @warning
  * @b EXPERIMENTAL: this API may change without prior notice
@@ -77,10 +102,12 @@ struct rte_rcu_qsbr {
  *
  * @param v
  *   QS variable
+ * @param max_threads
+ *   Maximum number of threads reporting QS on this variable.
  *
  */
 void __rte_experimental
-rte_rcu_qsbr_init(struct rte_rcu_qsbr *v);
+rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
 
 /**
  * @warning
@@ -108,24 +135,25 @@ rte_rcu_qsbr_register_thread(struct rte_rcu_qsbr *v, 
unsigned int thread_id)
 {
        unsigned int i, id;
 
-       RTE_ASSERT(v == NULL || thread_id >= RTE_RCU_MAX_THREADS);
+       RTE_ASSERT(v != NULL && thread_id < v->m_threads);
 
-       id = thread_id & RTE_QSBR_THR_ID_MASK;
-       i = thread_id >> RTE_QSBR_THR_INDEX_SHIFT;
+       id = thread_id & RTE_QSBR_THRID_MASK;
+       i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
 
        /* Worker thread has to count the quiescent states
         * only from the current value of token.
         * __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
         * 'cnt' (64b) is accessed atomically.
         */
-       __atomic_store_n(&v->w[thread_id].cnt,
+       __atomic_store_n(&RTE_QSBR_CNT_ARRAY_ELM(v, thread_id)->cnt,
                __atomic_load_n(&v->token, __ATOMIC_ACQUIRE),
                __ATOMIC_RELAXED);
 
        /* Release the store to initial TQS count so that readers
         * can use it immediately after this function returns.
         */
-       __atomic_fetch_or(&v->reg_thread_id[i], 1UL << id, __ATOMIC_RELEASE);
+       __atomic_fetch_or(RTE_QSBR_THRID_ARRAY_ELM(v, i),
+               1UL << id, __ATOMIC_RELEASE);
 }
 
 /**
@@ -151,16 +179,16 @@ rte_rcu_qsbr_unregister_thread(struct rte_rcu_qsbr *v, 
unsigned int thread_id)
 {
        unsigned int i, id;
 
-       RTE_ASSERT(v == NULL || thread_id >= RTE_RCU_MAX_THREADS);
+       RTE_ASSERT(v != NULL && thread_id < v->m_threads);
 
-       id = thread_id & RTE_QSBR_THR_ID_MASK;
-       i = thread_id >> RTE_QSBR_THR_INDEX_SHIFT;
+       id = thread_id & RTE_QSBR_THRID_MASK;
+       i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
 
        /* Make sure the removal of the thread from the list of
         * reporting threads is visible before the thread
         * does anything else.
         */
-       __atomic_fetch_and(&v->reg_thread_id[i],
+       __atomic_fetch_and(RTE_QSBR_THRID_ARRAY_ELM(v, i),
                                ~(1UL << id), __ATOMIC_RELEASE);
 }
 
@@ -212,7 +240,7 @@ rte_rcu_qsbr_update(struct rte_rcu_qsbr *v, unsigned int 
thread_id)
 {
        uint64_t t;
 
-       RTE_ASSERT(v == NULL || thread_id >= RTE_RCU_MAX_THREADS);
+       RTE_ASSERT(v != NULL && thread_id < v->m_threads);
 
        /* Load the token before the reader thread loads any other
         * (lock-free) data structure. This ensures that updates
@@ -228,8 +256,10 @@ rte_rcu_qsbr_update(struct rte_rcu_qsbr *v, unsigned int 
thread_id)
         * Copy the current token value. This will end grace period
         * of multiple concurrent writers.
         */
-       if (__atomic_load_n(&v->w[thread_id].cnt, __ATOMIC_RELAXED) != t)
-               __atomic_store_n(&v->w[thread_id].cnt, t, __ATOMIC_RELAXED);
+       if (__atomic_load_n(&RTE_QSBR_CNT_ARRAY_ELM(v, thread_id)->cnt,
+                               __ATOMIC_RELAXED) != t)
+               __atomic_store_n(&RTE_QSBR_CNT_ARRAY_ELM(v, thread_id)->cnt,
+                                t, __ATOMIC_RELAXED);
 }
 
 /**
@@ -268,18 +298,20 @@ rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, 
bool wait)
                /* Load the current registered thread bit map before
                 * loading the reader thread quiescent state counters.
                 */
-               bmap = __atomic_load_n(&v->reg_thread_id[i], __ATOMIC_ACQUIRE);
-               id = i << RTE_QSBR_THR_INDEX_SHIFT;
+               bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),
+                               __ATOMIC_ACQUIRE);
+               id = i << RTE_QSBR_THRID_INDEX_SHIFT;
 
                while (bmap) {
                        j = __builtin_ctzl(bmap);
 
-/* printf ("Status check: token = %lu, wait = %d, Bit Map = 0x%x, Thread ID = 
%d\n", t, wait, bmap, id+j); */
+/* printf ("Status check: token = %lu, wait = %d, Bit Map = 0x%lx, Thread ID = 
%d\n", t, wait, bmap, id+j); */
                        /* __atomic_load_n(cnt, __ATOMIC_RELAXED)
                         * is used to ensure 'cnt' (64b) is accessed
                         * atomically.
                         */
-                       if (unlikely(__atomic_load_n(&v->w[id + j].cnt,
+                       if (unlikely(__atomic_load_n(
+                                       &RTE_QSBR_CNT_ARRAY_ELM(v, id + j)->cnt,
                                        __ATOMIC_RELAXED) < t)) {
 
 /* printf ("Status not in QS: token = %lu, Wait = %d, Thread QS cnt = %lu, 
Thread ID = %d\n", t, wait, RTE_QSBR_CNT_ARRAY_ELM(v, id + j)->cnt, id+j); */
@@ -292,7 +324,7 @@ rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool 
wait)
                                 * Re-read the bitmap.
                                 */
                                bmap = __atomic_load_n(
-                                               &v->reg_thread_id[i],
+                                               RTE_QSBR_THRID_ARRAY_ELM(v, i),
                                                __ATOMIC_ACQUIRE);
 
                                continue;
@@ -302,7 +334,7 @@ rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool 
wait)
                }
 
                i++;
-       } while (i < RTE_QSBR_BIT_MAP_ELEMS);
+       } while (i < v->num_elems);
 
        return 1;
 }
-- 
2.17.1

Reply via email to