Integrate the RCU QSBR mechanism into the hash library.
(Refer to the RCU documentation to understand the various aspects of
integrating the RCU library into other libraries.)
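
A minimal usage sketch (not part of this patch) of the new
rte_hash_rcu_qsbr_add() API: the application creates and initializes an RCU
QSBR variable and registers it, together with a callback that frees the data
stored with a key, so that deleted key indexes are reclaimed internally. The
thread count, function names and the callback body below are illustrative
assumptions, not defined by this patch:

    #include <rte_hash.h>
    #include <rte_rcu_qsbr.h>
    #include <rte_malloc.h>

    #define MAX_READER_THREADS 4    /* illustrative value */

    /* Called by the hash library once a deleted key's index is safe to reuse. */
    static void
    free_key_data(void *key_data)
    {
            rte_free(key_data);
    }

    static struct rte_rcu_qsbr *
    attach_rcu(struct rte_hash *h)
    {
            size_t sz = rte_rcu_qsbr_get_memsize(MAX_READER_THREADS);
            struct rte_rcu_qsbr *qsv = rte_zmalloc(NULL, sz, 0);

            if (qsv == NULL || rte_rcu_qsbr_init(qsv, MAX_READER_THREADS) != 0) {
                    rte_free(qsv);
                    return NULL;
            }

            /* Hand the QSBR variable to the hash library; deleted key indexes
             * are then reclaimed once readers have reported a quiescent state,
             * with free_key_data() freeing the stored pdata.
             */
            if (rte_hash_rcu_qsbr_add(h, qsv, free_key_data) != 0) {
                    rte_free(qsv);
                    return NULL;
            }
            return qsv;
    }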

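Reader threads still follow the usual QSBR pattern so that the writer-side
reclaim added in this patch (rte_rcu_qsbr_check() on the FIFO'd tokens) can
make progress. A hedged reader-side sketch, reusing the headers and names from
the sketch above; thread id 0 and the lookup key are placeholders:

    static void
    reader_loop(struct rte_hash *h, struct rte_rcu_qsbr *qsv)
    {
            void *data;
            int key = 42;   /* placeholder key */
            int i;

            rte_rcu_qsbr_thread_register(qsv, 0);
            rte_rcu_qsbr_thread_online(qsv, 0);

            for (i = 0; i < 1000; i++) {
                    if (rte_hash_lookup_data(h, &key, &data) >= 0) {
                            /* Use 'data'; it remains valid until this thread
                             * reports a quiescent state.
                             */
                    }
                    /* Report that this reader holds no more references. */
                    rte_rcu_qsbr_quiescent(qsv, 0);
            }

            rte_rcu_qsbr_thread_offline(qsv, 0);
            rte_rcu_qsbr_thread_unregister(qsv, 0);
    }
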
Suggested-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
Signed-off-by: Dharmik Thakkar <dharmik.thak...@arm.com>
---
 lib/librte_hash/Makefile             |   2 +-
 lib/librte_hash/meson.build          |   2 +
 lib/librte_hash/rte_cuckoo_hash.c    | 352 +++++++++++++++++++++++++--
 lib/librte_hash/rte_cuckoo_hash.h    |   3 +
 lib/librte_hash/rte_hash.h           |  38 ++-
 lib/librte_hash/rte_hash_version.map |   2 +-
 6 files changed, 363 insertions(+), 36 deletions(-)

diff --git a/lib/librte_hash/Makefile b/lib/librte_hash/Makefile
index 5669d83f454f..3edc9c96beaa 100644
--- a/lib/librte_hash/Makefile
+++ b/lib/librte_hash/Makefile
@@ -8,7 +8,7 @@ LIB = librte_hash.a
 
 CFLAGS += -O3 -DALLOW_EXPERIMENTAL_API
 CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
-LDLIBS += -lrte_eal -lrte_ring
+LDLIBS += -lrte_eal -lrte_ring -lrte_rcu
 
 EXPORT_MAP := rte_hash_version.map
 
diff --git a/lib/librte_hash/meson.build b/lib/librte_hash/meson.build
index ebf70de89014..468668ea2488 100644
--- a/lib/librte_hash/meson.build
+++ b/lib/librte_hash/meson.build
@@ -2,6 +2,7 @@
 # Copyright(c) 2017 Intel Corporation
 
 version = 2
+allow_experimental_apis = true
 headers = files('rte_cmp_arm64.h',
        'rte_cmp_x86.h',
        'rte_crc_arm64.h',
@@ -14,6 +15,7 @@ headers = files('rte_cmp_arm64.h',
 
 sources = files('rte_cuckoo_hash.c', 'rte_fbk_hash.c')
 deps += ['ring']
+deps += ['rcu']
 
 # rte ring reset is not yet part of stable API
 allow_experimental_apis = true
diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index 87a4c01f2f9e..851c73328c8f 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -28,6 +28,7 @@
 #include <rte_compat.h>
 #include <rte_vect.h>
 #include <rte_tailq.h>
+#include <rte_rcu_qsbr.h>
 
 #include "rte_hash.h"
 #include "rte_cuckoo_hash.h"
@@ -44,6 +45,11 @@ static struct rte_tailq_elem rte_hash_tailq = {
 };
 EAL_REGISTER_TAILQ(rte_hash_tailq)
 
+struct __rte_hash_qs_item {
+       uint64_t token; /**< QSBR token.*/
+       uint64_t index; /**< key_idx and ext_bkt_idx pair*/
+};
+
 struct rte_hash *
 rte_hash_find_existing(const char *name)
 {
@@ -121,6 +127,68 @@ get_alt_bucket_index(const struct rte_hash *h,
        return (cur_bkt_idx ^ sig) & h->bucket_bitmask;
 }
 
+/* Add an item into the FIFO.
+ * return: 0 - success, 1 - failure (rte_errno set to ENOSPC)
+ */
+static int
+__rte_hash_rcu_qsbr_fifo_push(struct rte_ring *fifo,
+       struct __rte_hash_qs_item *item)
+{
+       if (rte_ring_sp_enqueue(fifo, (void *)(uintptr_t)item->token) != 0) {
+               rte_errno = ENOSPC;
+               return 1;
+       }
+       if (rte_ring_sp_enqueue(fifo, (void *)(uintptr_t)item->index) != 0) {
+               void *obj;
+               /* token needs to be dequeued when index enqueue fails */
+               rte_ring_sc_dequeue(fifo, &obj);
+               rte_errno = ENOSPC;
+               return 1;
+       }
+
+       return 0;
+}
+
+/* Remove an item from the FIFO.
+ * Used after the data has been observed via rte_ring_peek.
+ */
+static void
+__rte_hash_rcu_qsbr_fifo_pop(struct rte_ring *fifo,
+       struct __rte_hash_qs_item *item)
+{
+       void *obj_token = NULL;
+       void *obj_index = NULL;
+
+       (void)rte_ring_sc_dequeue(fifo, &obj_token);
+       (void)rte_ring_sc_dequeue(fifo, &obj_index);
+
+       if (item) {
+               item->token = (uint64_t)((uintptr_t)obj_token);
+               item->index = (uint64_t)((uintptr_t)obj_index);
+       }
+}
+
+/* RCU clean up
+ * This only cleans up data associated with keys (pdata)
+ * Assumption: No reader is using the hash table
+ */
+static void
+__rte_hash_rcu_qsbr_clean_up(const struct rte_hash *h)
+{
+       struct __rte_hash_qs_item qs_item;
+       uint32_t key_to_free;
+       void *hash_data = NULL;
+       struct rte_hash_key *k = NULL;
+       while (!rte_ring_empty(h->qs_fifo)) {
+               __rte_hash_rcu_qsbr_fifo_pop(h->qs_fifo, &qs_item);
+               key_to_free = (uint32_t)(qs_item.index);
+               k = (struct rte_hash_key *) ((char *)h->key_store +
+                       key_to_free * h->key_entry_size);
+               hash_data = k->pdata;
+               h->free_key_data_func(hash_data);
+       }
+}
+
 struct rte_hash *
 rte_hash_create(const struct rte_hash_parameters *params)
 {
@@ -193,11 +261,8 @@ rte_hash_create(const struct rte_hash_parameters *params)
        if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL)
                no_free_on_del = 1;
 
-       if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF) {
+       if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF)
                readwrite_concur_lf_support = 1;
-               /* Enable not freeing internal memory/index on delete */
-               no_free_on_del = 1;
-       }
 
        /* Store all keys and leave the first entry as a dummy entry for lookup_bulk */
        if (use_local_cache)
@@ -484,6 +549,13 @@ rte_hash_free(struct rte_hash *h)
 
        rte_mcfg_tailq_write_unlock();
 
+       /* RCU clean up
+        * This only cleans up data associated with keys (pdata)
+        * Assumption: No reader is using the hash table
+        */
+       if (h->qsv && h->free_key_data_func)
+               __rte_hash_rcu_qsbr_clean_up(h);
+
        if (h->use_local_cache)
                rte_free(h->local_free_slots);
        if (h->writer_takes_lock)
@@ -495,6 +567,7 @@ rte_hash_free(struct rte_hash *h)
        rte_free(h->buckets_ext);
        rte_free(h->tbl_chng_cnt);
        rte_free(h->ext_bkt_to_free);
+       rte_ring_free(h->qs_fifo);
        rte_free(h);
        rte_free(te);
 }
@@ -567,6 +640,106 @@ __hash_rw_reader_unlock(const struct rte_hash *h)
                rte_rwlock_read_unlock(h->readwrite_lock);
 }
 
+/* Max number of indexes to reclaim at one time. */
+#define RCU_QSBR_RECLAIM_SIZE  8
+
+/* When RCU QSBR FIFO usage is above 1/(2^RCU_QSBR_RECLAIM_LEVEL),
+ * reclaim will be triggered.
+ */
+#define RCU_QSBR_RECLAIM_LEVEL 3
+
+/* Reclaim some key/ext-bkt indexes based on the quiescent state check.
+ * At most RCU_QSBR_RECLAIM_SIZE indexes will be reclaimed per call.
+ * return: 0 - success, -ENOSPC - no index reclaimed.
+ */
+static int
+__rte_hash_rcu_qsbr_reclaim_chunk(const struct rte_hash *h, uint32_t *index)
+{
+       struct __rte_hash_qs_item qs_item;
+       void *obj_token;
+       uint32_t cnt = 0;
+       uint32_t key_to_free;
+       uint32_t ext_bkt_to_free;
+       void *hash_data = NULL;
+       struct rte_hash_key *keys, *k;
+       if (h->qsv == NULL)
+               return -ENOSPC;
+
+       unsigned int lcore_id, n_slots;
+       struct lcore_cache *cached_free_slots;
+       keys = h->key_store;
+
+       /* Check reader threads quiescent state and
+        * reclaim as much as possible.
+        */
+       while ((cnt < RCU_QSBR_RECLAIM_SIZE) &&
+               (rte_ring_peek(h->qs_fifo, &obj_token) == 0) &&
+               (rte_rcu_qsbr_check(h->qsv, (uint64_t)((uintptr_t)obj_token),
+                                       false) == 1)) {
+               __rte_hash_rcu_qsbr_fifo_pop(h->qs_fifo, &qs_item);
+
+               key_to_free = (uint32_t)(qs_item.index);
+               ext_bkt_to_free = (uint32_t)(qs_item.index >> 32);
+               k = (struct rte_hash_key *) ((char *)keys +
+                                       key_to_free * h->key_entry_size);
+               hash_data = k->pdata;
+               h->free_key_data_func(hash_data);
+               if (h->ext_table_support) {
+                       if (ext_bkt_to_free)
+                               /* Recycle empty ext bkt to free list. */
+                               rte_ring_sp_enqueue(h->free_ext_bkts,
+                                       (void *)(uintptr_t)ext_bkt_to_free);
+               }
+               /* Store key_idx for the new key in *index. */
+               if (index != NULL && *index != EMPTY_SLOT)
+                       *index = key_to_free;
+
+               /* Return rest of the key indexes to free slot ring */
+               else if (h->use_local_cache) {
+                       lcore_id = rte_lcore_id();
+                       cached_free_slots = &h->local_free_slots[lcore_id];
+                       /* Cache full, need to free it. */
+                       if (cached_free_slots->len == LCORE_CACHE_SIZE) {
+                               /* Need to enqueue the free slots in global ring. */
+                               n_slots = rte_ring_mp_enqueue_burst(h->free_slots,
+                                                       cached_free_slots->objs,
+                                                       LCORE_CACHE_SIZE, NULL);
+                               RETURN_IF_TRUE((n_slots == 0), -EFAULT);
+                               cached_free_slots->len -= n_slots;
+                       }
+                       /* Put index of new free slot in cache. */
+                       cached_free_slots->objs[cached_free_slots->len] =
+                                               (void *)((uintptr_t)key_to_free);
+                       cached_free_slots->len++;
+               } else {
+                       rte_ring_sp_enqueue(h->free_slots,
+                                       (void *)((uintptr_t)key_to_free));
+               }
+
+               cnt++;
+       }
+
+       if (cnt)
+               return 0;
+       return -ENOSPC;
+}
+
+/* Trigger reclaim when necessary.
+ * Reclaim happens when RCU QSBR queue usage is over 12.5%.
+ */
+static void
+__rte_hash_rcu_qsbr_try_reclaim(const struct rte_hash *h)
+{
+       if (h->qsv == NULL)
+               return;
+
+       if (rte_ring_count(h->qs_fifo) <
+               (rte_ring_get_capacity(h->qs_fifo) >> RCU_QSBR_RECLAIM_LEVEL))
+               return;
+
+       (void)__rte_hash_rcu_qsbr_reclaim_chunk(h, NULL);
+}
+
 void
 rte_hash_reset(struct rte_hash *h)
 {
@@ -576,6 +749,14 @@ rte_hash_reset(struct rte_hash *h)
                return;
 
        __hash_rw_writer_lock(h);
+
+       /* RCU clean up
+        * This only cleans up data associated with keys (pdata)
+        * Assumption: No reader is using the hash table
+        */
+       if (h->qsv && h->free_key_data_func)
+               __rte_hash_rcu_qsbr_clean_up(h);
+
        memset(h->buckets, 0, h->num_buckets * sizeof(struct rte_hash_bucket));
        memset(h->key_store, 0, h->key_entry_size * (h->entries + 1));
        *h->tbl_chng_cnt = 0;
@@ -612,6 +793,10 @@ rte_hash_reset(struct rte_hash *h)
                for (i = 0; i < RTE_MAX_LCORE; i++)
                        h->local_free_slots[i].len = 0;
        }
+
+       if (h->qs_fifo)
+               rte_ring_reset(h->qs_fifo);
+
        __hash_rw_writer_unlock(h);
 }
 
@@ -915,6 +1100,19 @@ rte_hash_cuckoo_make_space_mw(const struct rte_hash *h,
        return -ENOSPC;
 }
 
+static inline int32_t
+get_free_slot(const struct rte_hash *h)
+{
+       uint32_t key_to_free;
+       __hash_rw_writer_lock(h);
+       if (__rte_hash_rcu_qsbr_reclaim_chunk(h, &key_to_free) < 0) {
+               __hash_rw_writer_unlock(h);
+               return -ENOSPC;
+       }
+       __hash_rw_writer_unlock(h);
+       return key_to_free;
+}
+
 static inline int32_t
 __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
                                                hash_sig_t sig, void *data)
@@ -972,7 +1170,7 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
                                        cached_free_slots->objs,
                                        LCORE_CACHE_SIZE, NULL);
                        if (n_slots == 0) {
-                               return -ENOSPC;
+                               goto rcu_check;
                        }
 
                        cached_free_slots->len += n_slots;
@@ -982,11 +1180,19 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
                cached_free_slots->len--;
                slot_id = cached_free_slots->objs[cached_free_slots->len];
        } else {
-               if (rte_ring_sc_dequeue(h->free_slots, &slot_id) != 0) {
-                       return -ENOSPC;
-               }
+               if (rte_ring_sc_dequeue(h->free_slots, &slot_id) != 0)
+                       goto rcu_check;
        }
+       goto new_slot_found;
 
+rcu_check:
+       ret = get_free_slot(h);
+       if (ret < 0)
+               return -ENOSPC;
+       else
+               slot_id = (void *)((uintptr_t)ret);
+
+new_slot_found:
        new_k = RTE_PTR_ADD(keys, (uintptr_t)slot_id * h->key_entry_size);
        new_idx = (uint32_t)((uintptr_t) slot_id);
        /* The store to application data (by the application) at *data should
@@ -1454,10 +1660,11 @@ search_and_remove(const struct rte_hash *h, const void *key,
                                        key_idx * h->key_entry_size);
                        if (rte_hash_cmp_eq(key, k->key, h) == 0) {
                                bkt->sig_current[i] = NULL_SIGNATURE;
-                               /* Free the key store index if
-                                * no_free_on_del is disabled.
+                               /* Free the key store index if no_free_on_del
+                                * and readwrite_concur_lf_support is disabled.
                                 */
-                               if (!h->no_free_on_del)
+                               if (!h->readwrite_concur_lf_support &&
+                                       !h->no_free_on_del)
                                        remove_entry(h, bkt, i);
 
                                __atomic_store_n(&bkt->key_idx[i],
@@ -1486,6 +1693,9 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
        int pos;
        int32_t ret, i;
        uint16_t short_sig;
+       uint32_t ext_bkt_to_free = 0;
+       uint32_t key_to_free = 0;
+       struct __rte_hash_qs_item qs_item;
 
        short_sig = get_short_sig(sig);
        prim_bucket_idx = get_prim_bucket_index(h, sig);
@@ -1520,10 +1730,9 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 
 /* Search last bucket to see if empty to be recycled */
 return_bkt:
-       if (!last_bkt) {
-               __hash_rw_writer_unlock(h);
-               return ret;
-       }
+       if (!last_bkt)
+               goto return_key;
+
        while (last_bkt->next) {
                prev_bkt = last_bkt;
                last_bkt = last_bkt->next;
@@ -1538,9 +1747,9 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
                prev_bkt->next = NULL;
                uint32_t index = last_bkt - h->buckets_ext + 1;
                /* Recycle the empty bkt if
-                * no_free_on_del is disabled.
+                * readwrite_concur_lf_support is disabled.
                 */
-               if (h->no_free_on_del)
+               if (h->readwrite_concur_lf_support) {
                        /* Store index of an empty ext bkt to be recycled
                         * on calling rte_hash_del_xxx APIs.
                         * When lock free read-write concurrency is enabled,
@@ -1548,10 +1757,34 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
                         * immediately (as readers might be using it still).
                         * Hence freeing of the ext bkt is piggy-backed to
                         * freeing of the key index.
+                        * If using external RCU, store this index in an array.
                         */
-                       h->ext_bkt_to_free[ret] = index;
-               else
-                       rte_ring_sp_enqueue(h->free_ext_bkts, (void *)(uintptr_t)index);
+                       if (h->qsv == NULL)
+                               h->ext_bkt_to_free[ret] = index;
+                       /* Enqueue this index to RCU fifo if using internal RCU
+                        * and no_free_on_del is disabled.
+                        */
+                       else if (!h->no_free_on_del)
+                               ext_bkt_to_free = index;
+               } else
+                       rte_ring_sp_enqueue(h->free_ext_bkts,
+                                       (void *)(uintptr_t)index);
+       }
+
+return_key:
+       if (h->readwrite_concur_lf_support && h->qsv && !h->no_free_on_del) {
+               qs_item.token = rte_rcu_qsbr_start(h->qsv);
+               /* Key index where key is stored, adding the first dummy index */
+               key_to_free = ret + 1;
+               qs_item.index = ((uint64_t)ext_bkt_to_free << 32) | key_to_free;
+               /* Push into QSBR FIFO. */
+               if (__rte_hash_rcu_qsbr_fifo_push(h->qs_fifo, &qs_item) != 0)
+                       RTE_LOG(ERR, HASH, "Failed to push QSBR FIFO\n");
+
+               /* Speculatively reclaim 'deleted' indexes.
+                * Help spread the reclaim work load across multiple calls.
+                */
+               __rte_hash_rcu_qsbr_try_reclaim(h);
        }
        __hash_rw_writer_unlock(h);
        return ret;
@@ -1610,12 +1843,42 @@ rte_hash_free_key_with_position(const struct rte_hash *h,
        /* Out of bounds */
        if (key_idx >= total_entries)
                return -EINVAL;
-       if (h->ext_table_support && h->readwrite_concur_lf_support) {
-               uint32_t index = h->ext_bkt_to_free[position];
-               if (index) {
-                       /* Recycle empty ext bkt to free list. */
-                       rte_ring_sp_enqueue(h->free_ext_bkts, (void *)(uintptr_t)index);
-                       h->ext_bkt_to_free[position] = 0;
+       if (h->readwrite_concur_lf_support) {
+               if (!h->qsv) {
+                       if (h->ext_table_support) {
+                               __hash_rw_writer_lock(h);
+                               uint32_t index = h->ext_bkt_to_free[position];
+                               if (index) {
+                                       /* Recycle empty ext bkt to free list. */
+                                       rte_ring_sp_enqueue(h->free_ext_bkts,
+                                               (void *)(uintptr_t)index);
+                                       h->ext_bkt_to_free[position] = 0;
+                               }
+                               __hash_rw_writer_unlock(h);
+                       }
+               } else if (h->no_free_on_del) {
+                       /* Push into QSBR FIFO */
+                       struct __rte_hash_qs_item qs_item;
+                       uint32_t ext_bkt_to_free = 0;
+                       uint32_t key_to_free = 0;
+                       qs_item.token = rte_rcu_qsbr_start(h->qsv);
+                       key_to_free = key_idx;
+                       if (h->ext_table_support)
+                               ext_bkt_to_free = h->ext_bkt_to_free[position];
+                       qs_item.index = ((uint64_t)ext_bkt_to_free << 32) |
+                                               key_to_free;
+                       __hash_rw_writer_lock(h);
+                       /* Push into QSBR FIFO. */
+                       if (__rte_hash_rcu_qsbr_fifo_push(h->qs_fifo, &qs_item)
+                               != 0)
+                               RTE_LOG(ERR, HASH, "Failed to push QSBR FIFO\n");
+
+                       /* Speculatively reclaim 'deleted' indexes.
+                        * Help spread the reclaim work load across multiple calls.
+                        */
+                       __rte_hash_rcu_qsbr_try_reclaim(h);
+                       __hash_rw_writer_unlock(h);
+                       return 0;
                }
        }
 
@@ -2225,3 +2488,40 @@ rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32
        (*next)++;
        return position - 1;
 }
+
+__rte_experimental
+int rte_hash_rcu_qsbr_add(struct rte_hash *h, struct rte_rcu_qsbr *v,
+                               rte_hash_free_key_data free_key_data_func)
+{
+       uint32_t qs_fifo_size;
+       char rcu_ring_name[RTE_RING_NAMESIZE];
+
+       if ((h == NULL) || (v == NULL)) {
+               rte_errno = EINVAL;
+               return 1;
+       }
+
+       if (h->qsv) {
+               rte_errno = EEXIST;
+               return 1;
+       }
+
+       /* Each reclaim item stores a 'token' and an 'index', so size the FIFO
+        * to twice the number of hash table entries, rounded up to a power of two.
+        */
+       qs_fifo_size = 2 * rte_align32pow2(h->entries);
+
+       /* Init QSBR reclaiming FIFO. */
+       snprintf(rcu_ring_name, sizeof(rcu_ring_name), "HT_RCU_%s", h->name);
+       h->qs_fifo = rte_ring_create(rcu_ring_name, rte_align32pow2(qs_fifo_size),
+                       SOCKET_ID_ANY, 0);
+       if (h->qs_fifo == NULL) {
+               RTE_LOG(ERR, HASH, "Hash QS FIFO memory allocation failed\n");
+               rte_errno = ENOMEM;
+               return 1;
+       }
+       h->qsv = v;
+       h->free_key_data_func = free_key_data_func;
+
+       return 0;
+}
diff --git a/lib/librte_hash/rte_cuckoo_hash.h b/lib/librte_hash/rte_cuckoo_hash.h
index fb19bb27dfef..c64494ee8b85 100644
--- a/lib/librte_hash/rte_cuckoo_hash.h
+++ b/lib/librte_hash/rte_cuckoo_hash.h
@@ -168,6 +168,9 @@ struct rte_hash {
        struct lcore_cache *local_free_slots;
        /**< Local cache per lcore, storing some indexes of the free slots */
 
+       struct rte_rcu_qsbr *qsv;
+       struct rte_ring *qs_fifo;
+       rte_hash_free_key_data free_key_data_func;
        /* Fields used in lookup */
 
        uint32_t key_len __rte_cache_aligned;
diff --git a/lib/librte_hash/rte_hash.h b/lib/librte_hash/rte_hash.h
index 0d73370dc483..20db00784d0b 100644
--- a/lib/librte_hash/rte_hash.h
+++ b/lib/librte_hash/rte_hash.h
@@ -15,6 +15,7 @@
 #include <stddef.h>
 
 #include <rte_compat.h>
+#include <rte_rcu_qsbr.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -44,8 +45,6 @@ extern "C" {
 
 /** Flag to disable freeing of key index on hash delete.
  * Refer to rte_hash_del_xxx APIs for more details.
- * This is enabled by default when RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF
- * is enabled.
  */
 #define RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL 0x10
 
@@ -69,6 +68,13 @@ typedef uint32_t (*rte_hash_function)(const void *key, uint32_t key_len,
 /** Type of function used to compare the hash key. */
 typedef int (*rte_hash_cmp_eq_t)(const void *key1, const void *key2, size_t key_len);
 
+/**
+ * Type of function used to free data stored in the key.
+ * Required when using internal RCU to allow the application to free key-data
+ * once the key is returned to the ring of free key-slots.
+ */
+typedef void (*rte_hash_free_key_data)(void *key_data);
+
 /**
  * Parameters used when creating the hash table.
  */
@@ -267,8 +273,7 @@ rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, hash_sig_t
  * and should only be called from one thread by default.
  * Thread safety can be enabled by setting flag during
  * table creation.
- * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
- * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
+ * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL is enabled,
  * the key index returned by rte_hash_add_key_xxx APIs will not be
  * freed by this API. rte_hash_free_key_with_position API must be called
  * additionally to free the index associated with the key.
@@ -296,8 +301,7 @@ rte_hash_del_key(const struct rte_hash *h, const void *key);
  * and should only be called from one thread by default.
  * Thread safety can be enabled by setting flag during
  * table creation.
- * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
- * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
+ * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL is enabled,
  * the key index returned by rte_hash_add_key_xxx APIs will not be
  * freed by this API. rte_hash_free_key_with_position API must be called
  * additionally to free the index associated with the key.
@@ -350,8 +354,7 @@ rte_hash_get_key_with_position(const struct rte_hash *h, const int32_t position,
  * of the key. This operation is not multi-thread safe and should
  * only be called from one thread by default. Thread safety
  * can be enabled by setting flag during table creation.
- * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
- * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
+ * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL is enabled,
  * the key index returned by rte_hash_del_key_xxx APIs must be freed
  * using this API. This API should be called after all the readers
  * have stopped referencing the entry corresponding to this key.
@@ -545,6 +548,25 @@ rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
  */
 int32_t
 rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32_t *next);
+
+/**
+ * Configure the RCU QSBR variable to be used by the hash table.
+ *
+ * @param h
+ *   Hash table
+ * @param v
+ *   RCU QSBR variable
+ * @return
+ *   On success - 0
+ *   On error - 1 with error code set in rte_errno.
+ *   Possible rte_errno codes are:
+ *   - EINVAL if invalid pointer
+ *   - EEXIST if already added QSBR
+ *   - ENOMEM if memory allocation failure
+ */
+__rte_experimental
+int rte_hash_rcu_qsbr_add(struct rte_hash *h, struct rte_rcu_qsbr *v,
+                       rte_hash_free_key_data free_key_data_func);
 #ifdef __cplusplus
 }
 #endif
diff --git a/lib/librte_hash/rte_hash_version.map b/lib/librte_hash/rte_hash_version.map
index 734ae28b0408..8c3b21891de0 100644
--- a/lib/librte_hash/rte_hash_version.map
+++ b/lib/librte_hash/rte_hash_version.map
@@ -58,5 +58,5 @@ EXPERIMENTAL {
        global:
 
        rte_hash_free_key_with_position;
-
+       rte_hash_rcu_qsbr_add;
 };
-- 
2.17.1
