> 
> In use cases where the hash table capacity needs to be guaranteed, the
> extendable bucket feature can be used to contain extra keys in linked lists
> when collisions happen. This is a similar concept to the extendable bucket
> hash table in the packet framework.
> 
> This commit adds the extendable bucket feature. Users can turn it on or off
> through the extra_flag field at table creation time.
> 
> The extendable bucket table is composed of buckets that can be linked, as a
> list, to the buckets of the main table. When the extendable bucket feature is
> enabled, the hash table load can always reach 100%.
> In other words, the table can always accommodate the same number of keys as
> the specified table size. This provides a 100% table capacity guarantee.
> Although keys ending up in the ext buckets may have longer lookup times, such
> keys should be rare thanks to the cuckoo algorithm.
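For reference while reviewing, here is a minimal creation sketch of how the new
flag is meant to be used (the table name and size below are made up for
illustration, and error handling is trimmed):

#include <rte_eal.h>
#include <rte_hash.h>
#include <rte_jhash.h>

int main(int argc, char **argv)
{
        if (rte_eal_init(argc, argv) < 0)
                return -1;

        struct rte_hash_parameters params = {
                .name = "ext_tbl_demo",         /* hypothetical name */
                .entries = 1024,                /* hypothetical table size */
                .key_len = sizeof(uint32_t),
                .hash_func = rte_jhash,
                .hash_func_init_val = 0,
                .socket_id = 0,
                /* new flag from this patch: capacity guaranteed via ext buckets */
                .extra_flag = RTE_HASH_EXTRA_FLAGS_EXT_TABLE,
        };

        struct rte_hash *h = rte_hash_create(&params);
        if (h == NULL)
                return -1;

        /* with the flag set, adding up to .entries keys should never fail
         * for lack of bucket space */
        rte_hash_free(h);
        return 0;
}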
> 
> Signed-off-by: Yipeng Wang <yipeng1.w...@intel.com>
> ---
>  lib/librte_hash/rte_cuckoo_hash.c | 376 ++++++++++++++++++++++++++++++++------
>  lib/librte_hash/rte_cuckoo_hash.h |   5 +
>  lib/librte_hash/rte_hash.h        |   3 +
>  3 files changed, 331 insertions(+), 53 deletions(-)
> 
> diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
> index eba13e9..02650b9 100644
> --- a/lib/librte_hash/rte_cuckoo_hash.c
> +++ b/lib/librte_hash/rte_cuckoo_hash.c
> @@ -31,6 +31,10 @@
>  #include "rte_hash.h"
>  #include "rte_cuckoo_hash.h"
> 
> +#define FOR_EACH_BUCKET(CURRENT_BKT, START_BUCKET)                            \
> +     for (CURRENT_BKT = START_BUCKET;                                      \
> +             CURRENT_BKT != NULL;                                          \
> +             CURRENT_BKT = CURRENT_BKT->next)
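The macro is simply a NULL-terminated walk of a bucket chain (the main-table
bucket first, then any linked ext buckets). A small standalone equivalent with
a stand-in bucket type, just to show the traversal order:

#include <stdio.h>

struct bkt { int id; struct bkt *next; };

#define FOR_EACH_BUCKET(CURRENT_BKT, START_BUCKET)                            \
        for (CURRENT_BKT = START_BUCKET;                                      \
                CURRENT_BKT != NULL;                                          \
                CURRENT_BKT = CURRENT_BKT->next)

int main(void)
{
        /* main-table bucket b0 with two chained ext buckets b1 and b2 */
        struct bkt b2 = { 2, NULL }, b1 = { 1, &b2 }, b0 = { 0, &b1 };
        struct bkt *cur;

        FOR_EACH_BUCKET(cur, &b0)
                printf("visiting bucket %d\n", cur->id);  /* 0, then 1, then 2 */
        return 0;
}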
> 
>  TAILQ_HEAD(rte_hash_list, rte_tailq_entry);
> 
> @@ -63,6 +67,14 @@ rte_hash_find_existing(const char *name)
>       return h;
>  }
> 
> +static inline struct rte_hash_bucket *
> +rte_hash_get_last_bkt(struct rte_hash_bucket *lst_bkt) {
> +     while (lst_bkt->next != NULL)
> +             lst_bkt = lst_bkt->next;
> +     return lst_bkt;
> +}
> +
>  void rte_hash_set_cmp_func(struct rte_hash *h, rte_hash_cmp_eq_t func)
>  {
>       h->cmp_jump_table_idx = KEY_CUSTOM;
> @@ -85,13 +97,17 @@ rte_hash_create(const struct rte_hash_parameters *params)
>       struct rte_tailq_entry *te = NULL;
>       struct rte_hash_list *hash_list;
>       struct rte_ring *r = NULL;
> +     struct rte_ring *r_ext = NULL;
>       char hash_name[RTE_HASH_NAMESIZE];
>       void *k = NULL;
>       void *buckets = NULL;
> +     void *buckets_ext = NULL;
>       char ring_name[RTE_RING_NAMESIZE];
> +     char ext_ring_name[RTE_RING_NAMESIZE];
>       unsigned num_key_slots;
>       unsigned i;
>       unsigned int hw_trans_mem_support = 0, multi_writer_support = 0;
> +     unsigned int ext_table_support = 0;
>       unsigned int readwrite_concur_support = 0;
> 
>       rte_hash_function default_hash_func = (rte_hash_function)rte_jhash;
> @@ -124,6 +140,9 @@ rte_hash_create(const struct rte_hash_parameters *params)
>               multi_writer_support = 1;
>       }
> 
> +     if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_EXT_TABLE)
> +             ext_table_support = 1;
> +
>       /* Store all keys and leave the first entry as a dummy entry for lookup_bulk */
>       if (multi_writer_support)
>               /*
> @@ -145,6 +164,24 @@ rte_hash_create(const struct rte_hash_parameters *params)
>               goto err;
>       }
> 
> +     const uint32_t num_buckets = rte_align32pow2(params->entries) /
> +                                             RTE_HASH_BUCKET_ENTRIES;
> +
> +     /* Create ring for extendable buckets. */
> +     if (ext_table_support) {
> +             snprintf(ext_ring_name, sizeof(ext_ring_name), "HT_EXT_%s",
> +                                                             params->name);
> +             r_ext = rte_ring_create(ext_ring_name,
> +                             rte_align32pow2(num_buckets + 1),
> +                             params->socket_id, 0);
> +
> +             if (r_ext == NULL) {
> +                     RTE_LOG(ERR, HASH, "ext buckets memory allocation "
> +                                                             "failed\n");
> +                     goto err;
> +             }
> +     }
> +
>       snprintf(hash_name, sizeof(hash_name), "HT_%s", params->name);
> 
>       rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
> @@ -177,18 +214,34 @@ rte_hash_create(const struct rte_hash_parameters *params)
>               goto err_unlock;
>       }
> 
> -     const uint32_t num_buckets = rte_align32pow2(params->entries)
> -                                     / RTE_HASH_BUCKET_ENTRIES;
> -
>       buckets = rte_zmalloc_socket(NULL,
>                               num_buckets * sizeof(struct rte_hash_bucket),
>                               RTE_CACHE_LINE_SIZE, params->socket_id);
> 
>       if (buckets == NULL) {
> -             RTE_LOG(ERR, HASH, "memory allocation failed\n");
> +             RTE_LOG(ERR, HASH, "buckets memory allocation failed\n");
>               goto err_unlock;
>       }
> 
> +     /* Allocate same number of extendable buckets */
> +     if (ext_table_support) {
> +             buckets_ext = rte_zmalloc_socket(NULL,
> +                             num_buckets * sizeof(struct rte_hash_bucket),
> +                             RTE_CACHE_LINE_SIZE, params->socket_id);
> +             if (buckets_ext == NULL) {
> +                     RTE_LOG(ERR, HASH, "ext buckets memory allocation "
> +                                                     "failed\n");
> +                     goto err_unlock;
> +             }
> +             /* Populate ext bkt ring. We reserve 0 similar to the
> +              * key-data slot, just in case in future we want to
> +              * use bucket index for the linked list and 0 means NULL
> +              * for next bucket
> +              */
> +             for (i = 1; i <= num_buckets; i++)
> +                     rte_ring_sp_enqueue(r_ext, (void *)((uintptr_t) i));
> +     }
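To make the reserved-zero convention concrete: the ring hands out 1-based
bucket indexes, with 0 kept free so it can later stand for "no next bucket". A
small standalone sketch of the mapping (plain C, made-up names, no rte_ring):

#include <assert.h>
#include <stdint.h>
#include <stddef.h>

#define NUM_BUCKETS 8

struct bucket { int dummy; };

static struct bucket buckets_ext[NUM_BUCKETS];

/* ring values are 1..NUM_BUCKETS; 0 is reserved to mean "no bucket" */
static struct bucket *
bucket_from_ring_value(uintptr_t ring_val)
{
        if (ring_val == 0)
                return NULL;
        return &buckets_ext[ring_val - 1];
}

int main(void)
{
        /* the patch enqueues i = 1 .. num_buckets; value 3 maps to slot 2 */
        assert(bucket_from_ring_value(3) == &buckets_ext[2]);
        assert(bucket_from_ring_value(0) == NULL);
        return 0;
}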
> +
>       const uint32_t key_entry_size = sizeof(struct rte_hash_key) + params->key_len;
>       const uint64_t key_tbl_size = (uint64_t) key_entry_size * num_key_slots;
> 
> @@ -262,6 +315,8 @@ rte_hash_create(const struct rte_hash_parameters *params)
>       h->num_buckets = num_buckets;
>       h->bucket_bitmask = h->num_buckets - 1;
>       h->buckets = buckets;
> +     h->buckets_ext = buckets_ext;
> +     h->free_ext_bkts = r_ext;
>       h->hash_func = (params->hash_func == NULL) ?
>               default_hash_func : params->hash_func;
>       h->key_store = k;
> @@ -269,6 +324,7 @@ rte_hash_create(const struct rte_hash_parameters *params)
>       h->hw_trans_mem_support = hw_trans_mem_support;
>       h->multi_writer_support = multi_writer_support;
>       h->readwrite_concur_support = readwrite_concur_support;
> +     h->ext_table_support = ext_table_support;
> 
>  #if defined(RTE_ARCH_X86)
>       if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_AVX2))
> @@ -304,9 +360,11 @@ rte_hash_create(const struct rte_hash_parameters *params)
>       rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
>  err:
>       rte_ring_free(r);
> +     rte_ring_free(r_ext);
>       rte_free(te);
>       rte_free(h);
>       rte_free(buckets);
> +     rte_free(buckets_ext);
>       rte_free(k);
>       return NULL;
>  }
> @@ -344,8 +402,10 @@ rte_hash_free(struct rte_hash *h)
>               rte_free(h->readwrite_lock);
>       }
>       rte_ring_free(h->free_slots);
> +     rte_ring_free(h->free_ext_bkts);
>       rte_free(h->key_store);
>       rte_free(h->buckets);
> +     rte_free(h->buckets_ext);
>       rte_free(h);
>       rte_free(te);
>  }
> @@ -403,7 +463,6 @@ __hash_rw_writer_lock(const struct rte_hash *h)
>               rte_rwlock_write_lock(h->readwrite_lock);
>  }
> 
> -
>  static inline void
>  __hash_rw_reader_lock(const struct rte_hash *h)
>  {
> @@ -448,6 +507,14 @@ rte_hash_reset(struct rte_hash *h)
>       while (rte_ring_dequeue(h->free_slots, &ptr) == 0)
>               rte_pause();
> 
> +     /* clear free extendable bucket ring and memory */
> +     if (h->ext_table_support) {
> +             memset(h->buckets_ext, 0, h->num_buckets *
> +                                             sizeof(struct rte_hash_bucket));
> +             while (rte_ring_dequeue(h->free_ext_bkts, &ptr) == 0)
> +                     rte_pause();
> +     }
> +
>       /* Repopulate the free slots ring. Entry zero is reserved for key misses */
>       if (h->multi_writer_support)
>               tot_ring_cnt = h->entries + (RTE_MAX_LCORE - 1) *
> @@ -458,6 +525,13 @@ rte_hash_reset(struct rte_hash *h)
>       for (i = 1; i < tot_ring_cnt + 1; i++)
>               rte_ring_sp_enqueue(h->free_slots, (void *)((uintptr_t) i));
> 
> +     /* Repopulate the free ext bkt ring. */
> +     if (h->ext_table_support) {
> +             for (i = 1; i < h->num_buckets + 1; i++)
> +                     rte_ring_sp_enqueue(h->free_ext_bkts,
> +                                             (void *)((uintptr_t) i));
> +     }
> +
>       if (h->multi_writer_support) {
>               /* Reset local caches per lcore */
>               for (i = 0; i < RTE_MAX_LCORE; i++)
> @@ -524,24 +598,27 @@ rte_hash_cuckoo_insert_mw(const struct rte_hash *h,
>               int32_t *ret_val)
>  {
>       unsigned int i;
> -     struct rte_hash_bucket *cur_bkt = prim_bkt;
> +     struct rte_hash_bucket *cur_bkt;
>       int32_t ret;
> 
>       __hash_rw_writer_lock(h);
>       /* Check if key was inserted after last check but before this
>        * protected region in case of inserting duplicated keys.
>        */
> -     ret = search_and_update(h, data, key, cur_bkt, sig, alt_hash);
> +     ret = search_and_update(h, data, key, prim_bkt, sig, alt_hash);
>       if (ret != -1) {
>               __hash_rw_writer_unlock(h);
>               *ret_val = ret;
>               return 1;
>       }
> -     ret = search_and_update(h, data, key, sec_bkt, alt_hash, sig);
> -     if (ret != -1) {
> -             __hash_rw_writer_unlock(h);
> -             *ret_val = ret;
> -             return 1;
> +
> +     FOR_EACH_BUCKET(cur_bkt, sec_bkt) {
> +             ret = search_and_update(h, data, key, cur_bkt, alt_hash, sig);
> +             if (ret != -1) {
> +                     __hash_rw_writer_unlock(h);
> +                     *ret_val = ret;
> +                     return 1;
> +             }
>       }
> 
>       /* Insert new entry if there is room in the primary
> @@ -580,7 +657,7 @@ rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
>                       int32_t *ret_val)
>  {
>       uint32_t prev_alt_bkt_idx;
> -     struct rte_hash_bucket *cur_bkt = bkt;
> +     struct rte_hash_bucket *cur_bkt;
>       struct queue_node *prev_node, *curr_node = leaf;
>       struct rte_hash_bucket *prev_bkt, *curr_bkt = leaf->bkt;
>       uint32_t prev_slot, curr_slot = leaf_slot;
> @@ -597,18 +674,20 @@ rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
>       /* Check if key was inserted after last check but before this
>        * protected region.
>        */
> -     ret = search_and_update(h, data, key, cur_bkt, sig, alt_hash);
> +     ret = search_and_update(h, data, key, bkt, sig, alt_hash);
>       if (ret != -1) {
>               __hash_rw_writer_unlock(h);
>               *ret_val = ret;
>               return 1;
>       }
> 
> -     ret = search_and_update(h, data, key, alt_bkt, alt_hash, sig);
> -     if (ret != -1) {
> -             __hash_rw_writer_unlock(h);
> -             *ret_val = ret;
> -             return 1;
> +     FOR_EACH_BUCKET(cur_bkt, alt_bkt) {
> +             ret = search_and_update(h, data, key, cur_bkt, alt_hash, sig);
> +             if (ret != -1) {
> +                     __hash_rw_writer_unlock(h);
> +                     *ret_val = ret;
> +                     return 1;
> +             }
>       }
> 
>       while (likely(curr_node->prev != NULL)) {
> @@ -711,15 +790,18 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
>  {
>       hash_sig_t alt_hash;
>       uint32_t prim_bucket_idx, sec_bucket_idx;
> -     struct rte_hash_bucket *prim_bkt, *sec_bkt;
> +     struct rte_hash_bucket *prim_bkt, *sec_bkt, *cur_bkt;
>       struct rte_hash_key *new_k, *keys = h->key_store;
>       void *slot_id = NULL;
> -     uint32_t new_idx;
> +     void *ext_bkt_id = NULL;
> +     uint32_t new_idx, bkt_id;
>       int ret;
>       unsigned n_slots;
>       unsigned lcore_id;
> +     unsigned int i;
>       struct lcore_cache *cached_free_slots = NULL;
>       int32_t ret_val;
> +     struct rte_hash_bucket *last;
> 
>       prim_bucket_idx = sig & h->bucket_bitmask;
>       prim_bkt = &h->buckets[prim_bucket_idx];
> @@ -739,10 +821,12 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
>       }
> 
>       /* Check if key is already inserted in secondary location */
> -     ret = search_and_update(h, data, key, sec_bkt, alt_hash, sig);
> -     if (ret != -1) {
> -             __hash_rw_writer_unlock(h);
> -             return ret;
> +     FOR_EACH_BUCKET(cur_bkt, sec_bkt) {
> +             ret = search_and_update(h, data, key, cur_bkt, alt_hash, sig);
> +             if (ret != -1) {
> +                     __hash_rw_writer_unlock(h);
> +                     return ret;
> +             }
>       }
>       __hash_rw_writer_unlock(h);
> 
> @@ -808,10 +892,70 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
>       else if (ret == 1) {
>               enqueue_slot_back(h, cached_free_slots, slot_id);
>               return ret_val;
> -     } else {
> +     }
> +
> +     /* if ext table not enabled, we failed the insertion */
> +     if (!h->ext_table_support) {
>               enqueue_slot_back(h, cached_free_slots, slot_id);
>               return ret;
>       }
> +
> +     /* Now we need to go through the extendable bucket. Protection is needed
> +      * to protect all extendable bucket processes.
> +      */
> +     __hash_rw_writer_lock(h);
> +     /* We check for duplicates again since could be inserted before the lock */
> +     ret = search_and_update(h, data, key, prim_bkt, sig, alt_hash);
> +     if (ret != -1) {
> +             enqueue_slot_back(h, cached_free_slots, slot_id);
> +             goto failure;
> +     }
> +
> +     FOR_EACH_BUCKET(cur_bkt, sec_bkt) {
> +             ret = search_and_update(h, data, key, cur_bkt, alt_hash, sig);
> +             if (ret != -1) {
> +                     enqueue_slot_back(h, cached_free_slots, slot_id);
> +                     goto failure;
> +             }
> +     }
> +
> +     /* Search sec and ext buckets to find an empty entry to insert. */
> +     FOR_EACH_BUCKET(cur_bkt, sec_bkt) {
> +             for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
> +                     /* Check if slot is available */
> +                     if (likely(cur_bkt->key_idx[i] == EMPTY_SLOT)) {
> +                             cur_bkt->sig_current[i] = alt_hash;
> +                             cur_bkt->sig_alt[i] = sig;
> +                             cur_bkt->key_idx[i] = new_idx;
> +                             __hash_rw_writer_unlock(h);
> +                             return new_idx - 1;
> +                     }
> +             }
> +     }
> +
> +     /* Failed to get an empty entry from extendable buckets. Link a new
> +      * extendable bucket. We first get a free bucket from ring.
> +      */
> +     if (rte_ring_sc_dequeue(h->free_ext_bkts, &ext_bkt_id) != 0) {
> +             ret = -ENOSPC;
> +             goto failure;
> +     }
> +
> +     bkt_id = (uint32_t)((uintptr_t)ext_bkt_id) - 1;
> +     /* Use the first location of the new bucket */
> +     (h->buckets_ext[bkt_id]).sig_current[0] = alt_hash;
> +     (h->buckets_ext[bkt_id]).sig_alt[0] = sig;
> +     (h->buckets_ext[bkt_id]).key_idx[0] = new_idx;
> +     /* Link the new bucket to sec bucket linked list */
> +     last = rte_hash_get_last_bkt(sec_bkt);
> +     last->next = &h->buckets_ext[bkt_id];
> +     __hash_rw_writer_unlock(h);
> +     return new_idx - 1;
> +
> +failure:
> +     __hash_rw_writer_unlock(h);
> +     return ret;
> +
>  }
> 
>  int32_t
> @@ -890,7 +1034,7 @@ __rte_hash_lookup_with_hash(const struct rte_hash *h, const void *key,
>  {
>       uint32_t bucket_idx;
>       hash_sig_t alt_hash;
> -     struct rte_hash_bucket *bkt;
> +     struct rte_hash_bucket *bkt, *cur_bkt;
>       int ret;
> 
>       bucket_idx = sig & h->bucket_bitmask;
> @@ -910,10 +1054,12 @@ __rte_hash_lookup_with_hash(const struct rte_hash *h, const void *key,
>       bkt = &h->buckets[bucket_idx];
> 
>       /* Check if key is in secondary location */
> -     ret = search_one_bucket(h, key, alt_hash, data, bkt);
> -     if (ret != -1) {
> -             __hash_rw_reader_unlock(h);
> -             return ret;
> +     FOR_EACH_BUCKET(cur_bkt, bkt) {
> +             ret = search_one_bucket(h, key, alt_hash, data, cur_bkt);
> +             if (ret != -1) {
> +                     __hash_rw_reader_unlock(h);
> +                     return ret;
> +             }
>       }
>       __hash_rw_reader_unlock(h);
>       return -ENOENT;
> @@ -978,16 +1124,42 @@ remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
>       }
>  }
> 
> +/* Compact the linked list by moving key from last entry in linked list to the
> + * empty slot.
> + */
> +static inline void
> +__rte_hash_compact_ll(struct rte_hash_bucket *cur_bkt, int pos) {
> +     int i;
> +     struct rte_hash_bucket *last_bkt;
> +
> +     if (!cur_bkt->next)
> +             return;
> +
> +     last_bkt = rte_hash_get_last_bkt(cur_bkt);
> +
> +     for (i = RTE_HASH_BUCKET_ENTRIES - 1; i >= 0; i--) {
> +             if (last_bkt->key_idx[i] != EMPTY_SLOT) {
> +                     cur_bkt->key_idx[pos] = last_bkt->key_idx[i];
> +                     cur_bkt->sig_current[pos] = last_bkt->sig_current[i];
> +                     cur_bkt->sig_alt[pos] = last_bkt->sig_alt[i];
> +                     last_bkt->sig_current[i] = NULL_SIGNATURE;
> +                     last_bkt->sig_alt[i] = NULL_SIGNATURE;
> +                     last_bkt->key_idx[i] = EMPTY_SLOT;
> +                     return;
In the lock-free algorithm, this will require the global counter increment.

> +             }
> +     }
> +}
> +
>  /* Search one bucket and remove the matched key */
>  static inline int32_t
> search_and_remove(const struct rte_hash *h, const void *key,
> -                     struct rte_hash_bucket *bkt, hash_sig_t sig)
> +                     struct rte_hash_bucket *bkt, hash_sig_t sig, int *pos)
>  {
>       struct rte_hash_key *k, *keys = h->key_store;
>       unsigned int i;
>       int32_t ret;
> 
> -     /* Check if key is in primary location */
> +     /* Check if key is in bucket */
>       for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
>               if (bkt->sig_current[i] == sig &&
>                               bkt->key_idx[i] != EMPTY_SLOT) {
> @@ -996,12 +1168,12 @@ search_and_remove(const struct rte_hash *h, const void *key,
>                       if (rte_hash_cmp_eq(key, k->key, h) == 0) {
>                               remove_entry(h, bkt, i);
> 
> -                             /*
> -                              * Return index where key is stored,
> +                             /* Return index where key is stored,
>                                * subtracting the first dummy index
>                                */
>                               ret = bkt->key_idx[i] - 1;
>                               bkt->key_idx[i] = EMPTY_SLOT;
> +                             *pos = i;
>                               return ret;
>                       }
>               }
> @@ -1015,34 +1187,66 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
>  {
>       uint32_t bucket_idx;
>       hash_sig_t alt_hash;
> -     struct rte_hash_bucket *bkt;
> -     int32_t ret;
> +     struct rte_hash_bucket *prim_bkt, *sec_bkt, *prev_bkt, *last_bkt;
> +     struct rte_hash_bucket *cur_bkt;
> +     int pos;
> +     int32_t ret, i;
> 
>       bucket_idx = sig & h->bucket_bitmask;
> -     bkt = &h->buckets[bucket_idx];
> +     prim_bkt = &h->buckets[bucket_idx];
> 
>       __hash_rw_writer_lock(h);
>       /* look for key in primary bucket */
> -     ret = search_and_remove(h, key, bkt, sig);
> +     ret = search_and_remove(h, key, prim_bkt, sig, &pos);
>       if (ret != -1) {
> -             __hash_rw_writer_unlock(h);
> -             return ret;
> +             __rte_hash_compact_ll(prim_bkt, pos);
> +             last_bkt = prim_bkt->next;
> +             prev_bkt = prim_bkt;
> +             goto return_bkt;
>       }
> 
>       /* Calculate secondary hash */
>       alt_hash = rte_hash_secondary_hash(sig);
>       bucket_idx = alt_hash & h->bucket_bitmask;
> -     bkt = &h->buckets[bucket_idx];
> +     sec_bkt = &h->buckets[bucket_idx];
> +
> +     FOR_EACH_BUCKET(cur_bkt, sec_bkt) {
> +             ret = search_and_remove(h, key, cur_bkt, alt_hash, &pos);
> +             if (ret != -1) {
> +                     __rte_hash_compact_ll(cur_bkt, pos);
> +                     last_bkt = sec_bkt->next;
> +                     prev_bkt = sec_bkt;
> +                     goto return_bkt;
> +             }
> +     }
> 
> -     /* look for key in secondary bucket */
> -     ret = search_and_remove(h, key, bkt, alt_hash);
> -     if (ret != -1) {
> +     __hash_rw_writer_unlock(h);
> +     return -ENOENT;
> +
> +/* Search last bucket to see if empty to be recycled */
> +return_bkt:
> +     if (!last_bkt) {
>               __hash_rw_writer_unlock(h);
>               return ret;
>       }
> +     while (last_bkt->next) {
> +             prev_bkt = last_bkt;
> +             last_bkt = last_bkt->next;
> +     }
Minor: we are trying to find the last bucket here, along with its previous.
Maybe we can modify 'rte_hash_get_last_bkt' instead?

> +
> +     for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
> +             if (last_bkt->key_idx[i] != EMPTY_SLOT)
> +                     break;
> +     }
> +     /* found empty bucket and recycle */
> +     if (i == RTE_HASH_BUCKET_ENTRIES) {
> +             prev_bkt->next = last_bkt->next = NULL;
> +             uint32_t index = last_bkt - h->buckets_ext + 1;
> +             rte_ring_sp_enqueue(h->free_ext_bkts, (void *)(uintptr_t)index);
In the lock-free algorithm, the bucket cannot be freed immediately. I looked at
a couple of solutions. The bucket needs to be stored internally and should be
associated with the key-store index (or position). I am thinking that I will
add a field to 'struct rte_hash_key' to store the bucket pointer or index.

From the code, my understanding is that we will free only the last bucket; we
will never free a middle bucket. Please correct me if I am wrong. This keeps
it simple for the lock-free algorithm.

I could work through these issues, so I do not see any problems for the
lock-free algorithm (as of now :) ).

> +     }
> 
>       __hash_rw_writer_unlock(h);
> -     return -ENOENT;
> +     return ret;
>  }
> 
>  int32_t
> @@ -1143,12 +1347,14 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
>  {
>       uint64_t hits = 0;
>       int32_t i;
> +     int32_t ret;
>       uint32_t prim_hash[RTE_HASH_LOOKUP_BULK_MAX];
>       uint32_t sec_hash[RTE_HASH_LOOKUP_BULK_MAX];
>       const struct rte_hash_bucket *primary_bkt[RTE_HASH_LOOKUP_BULK_MAX];
>       const struct rte_hash_bucket *secondary_bkt[RTE_HASH_LOOKUP_BULK_MAX];
>       uint32_t prim_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0};
>       uint32_t sec_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0};
> +     struct rte_hash_bucket *cur_bkt, *next_bkt;
> 
>       /* Prefetch first keys */
>       for (i = 0; i < PREFETCH_OFFSET && i < num_keys; i++)
> @@ -1266,6 +1472,34 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
>               continue;
>       }
> 
> +     /* all found, do not need to go through ext bkt */
> +     if ((hits == ((1ULL << num_keys) - 1)) || !h->ext_table_support) {
> +             if (hit_mask != NULL)
> +                     *hit_mask = hits;
> +             __hash_rw_reader_unlock(h);
> +             return;
> +     }
> +
> +     /* need to check ext buckets for match */
> +     for (i = 0; i < num_keys; i++) {
> +             if ((hits & (1ULL << i)) != 0)
> +                     continue;
> +             next_bkt = secondary_bkt[i]->next;
> +             FOR_EACH_BUCKET(cur_bkt, next_bkt) {
> +                     if (data != NULL)
> +                             ret = search_one_bucket(h, keys[i],
> +                                             sec_hash[i], &data[i], cur_bkt);
> +                     else
> +                             ret = search_one_bucket(h, keys[i],
> +                                             sec_hash[i], NULL, cur_bkt);
> +                     if (ret != -1) {
> +                             positions[i] = ret;
> +                             hits |= 1ULL << i;
> +                             break;
> +                     }
> +             }
> +     }
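From the API side nothing changes for callers here: a bulk lookup still reports
keys that spilled into ext buckets through the hit mask. A hedged usage sketch
(the helper name is made up; 'h' and 'keys[]' are assumed to be set up
elsewhere, with num_keys <= RTE_HASH_LOOKUP_BULK_MAX):

#include <stdint.h>
#include <rte_hash.h>

static void
bulk_lookup_demo(const struct rte_hash *h, const void *keys[], uint32_t num_keys)
{
        uint64_t hit_mask = 0;
        void *data[RTE_HASH_LOOKUP_BULK_MAX];
        uint32_t i;

        if (rte_hash_lookup_bulk_data(h, keys, num_keys, &hit_mask, data) < 0)
                return;

        for (i = 0; i < num_keys; i++)
                if (hit_mask & (1ULL << i))
                        /* data[i] is the value stored with keys[i] */
                        (void)data[i];
}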
> +
>       __hash_rw_reader_unlock(h);
> 
>       if (hit_mask != NULL)
> @@ -1308,10 +1542,13 @@ rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32
> 
>       RETURN_IF_TRUE(((h == NULL) || (next == NULL)), -EINVAL);
> 
> -     const uint32_t total_entries = h->num_buckets * RTE_HASH_BUCKET_ENTRIES;
> -     /* Out of bounds */
> -     if (*next >= total_entries)
> -             return -ENOENT;
> +     const uint32_t total_entries_main = h->num_buckets *
> +                                             RTE_HASH_BUCKET_ENTRIES;
> +     const uint32_t total_entries = total_entries_main << 1;
> +
> +     /* Out of bounds of all buckets (both main table and ext table */
Typo: missing ')'

> +     if (*next >= total_entries_main)
> +             goto extend_table;
> 
>       /* Calculate bucket and index of current iterator */
>       bucket_idx = *next / RTE_HASH_BUCKET_ENTRIES;
> @@ -1322,14 +1559,13 @@ rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32
>       while (h->buckets[bucket_idx].key_idx[idx] == EMPTY_SLOT) {
>               (*next)++;
>               /* End of table */
> -             if (*next == total_entries) {
> +             if (*next == total_entries_main) {
>                       __hash_rw_reader_unlock(h);
> -                     return -ENOENT;
> +                     goto extend_table;
>               }
>               bucket_idx = *next / RTE_HASH_BUCKET_ENTRIES;
>               idx = *next % RTE_HASH_BUCKET_ENTRIES;
>       }
> -
>       /* Get position of entry in key table */
>       position = h->buckets[bucket_idx].key_idx[idx];
>       next_key = (struct rte_hash_key *) ((char *)h->key_store +
> @@ -1344,4 +1580,38 @@ rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32
>       (*next)++;
> 
>       return position - 1;
> +
> +/* Begin to iterate extendable buckets */
> +extend_table:
> +     /* Out of total bound or if ext bucket feature is not enabled */
> +     if (*next >= total_entries || !h->ext_table_support)
> +             return -ENOENT;
> +
> +     bucket_idx = (*next - total_entries_main) / RTE_HASH_BUCKET_ENTRIES;
> +     idx = (*next - total_entries_main) % RTE_HASH_BUCKET_ENTRIES;
> +
> +     __hash_rw_reader_lock(h);
> +     while (h->buckets_ext[bucket_idx].key_idx[idx] == EMPTY_SLOT) {
> +             (*next)++;
> +             if (*next == total_entries) {
> +                     __hash_rw_reader_unlock(h);
> +                     return -ENOENT;
> +             }
> +             bucket_idx = (*next - total_entries_main) /
> +                                             RTE_HASH_BUCKET_ENTRIES;
> +             idx = (*next - total_entries_main) % RTE_HASH_BUCKET_ENTRIES;
> +     }
> +     /* Get position of entry in key table */
> +     position = h->buckets_ext[bucket_idx].key_idx[idx];
> +     next_key = (struct rte_hash_key *) ((char *)h->key_store +
> +                             position * h->key_entry_size);
> +     /* Return key and data */
> +     *key = next_key->key;
> +     *data = next_key->pdata;
> +
> +     __hash_rw_reader_unlock(h);
> +
> +     /* Increment iterator */
> +     (*next)++;
> +     return position - 1;
>  }
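With the iterator now covering the ext buckets as a second pass, existing
callers need no change; a sketch like the one below (helper name made up,
table assumed already created with RTE_HASH_EXTRA_FLAGS_EXT_TABLE and
populated) still visits every stored entry:

#include <stdint.h>
#include <rte_hash.h>

/* count every entry reachable through the iterator, including entries
 * that live in extendable buckets */
static uint32_t
count_all_entries(const struct rte_hash *h)
{
        const void *key;
        void *data;
        uint32_t iter = 0;
        uint32_t cnt = 0;

        while (rte_hash_iterate(h, &key, &data, &iter) >= 0)
                cnt++;

        return cnt;
}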
> diff --git a/lib/librte_hash/rte_cuckoo_hash.h b/lib/librte_hash/rte_cuckoo_hash.h
> index fc0e5c2..e601520 100644
> --- a/lib/librte_hash/rte_cuckoo_hash.h
> +++ b/lib/librte_hash/rte_cuckoo_hash.h
> @@ -142,6 +142,8 @@ struct rte_hash_bucket {
>       hash_sig_t sig_alt[RTE_HASH_BUCKET_ENTRIES];
> 
>       uint8_t flag[RTE_HASH_BUCKET_ENTRIES];
> +
> +     void *next;
>  } __rte_cache_aligned;
> 
>  /** A hash table structure. */
> @@ -166,6 +168,7 @@ struct rte_hash {
>       /**< If multi-writer support is enabled. */
>       uint8_t readwrite_concur_support;
>       /**< If read-write concurrency support is enabled */
> +     uint8_t ext_table_support;     /**< Enable extendable bucket table */
>       rte_hash_function hash_func;    /**< Function used to calculate hash. */
>       uint32_t hash_func_init_val;    /**< Init value used by hash_func. */
>       rte_hash_cmp_eq_t rte_hash_custom_cmp_eq;
> @@ -184,6 +187,8 @@ struct rte_hash {
>        * to the key table.
>        */
>       rte_rwlock_t *readwrite_lock; /**< Read-write lock thread-safety. */
> +     struct rte_hash_bucket *buckets_ext; /**< Extra buckets array */
> +     struct rte_ring *free_ext_bkts; /**< Ring of indexes of free buckets */
>  } __rte_cache_aligned;
> 
>  struct queue_node {
> diff --git a/lib/librte_hash/rte_hash.h b/lib/librte_hash/rte_hash.h
> index 9e7d931..11d8e28 100644
> --- a/lib/librte_hash/rte_hash.h
> +++ b/lib/librte_hash/rte_hash.h
> @@ -37,6 +37,9 @@ extern "C" {
>  /** Flag to support reader writer concurrency */
>  #define RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY 0x04
> 
> +/** Flag to indicate the extendable bucket table feature should be used */
> +#define RTE_HASH_EXTRA_FLAGS_EXT_TABLE 0x08
> +
>  /** Signature of key that is stored internally. */
>  typedef uint32_t hash_sig_t;
> 
> --
> 2.7.4
