A reader-writer concurrency issue, caused by keys being moved
to their alternative locations during key insertion, is solved
by introducing a global counter (tbl_chng_cnt) that indicates a
change in the table.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
Reviewed-by: Gavin Hu <gavin...@arm.com>
Reviewed-by: Ola Liljedahl <ola.liljed...@arm.com>
Reviewed-by: Steve Capper <steve.cap...@arm.com>
Reviewed-by: Yipeng Wang <yipeng1.w...@intel.com>
---
 lib/librte_hash/rte_cuckoo_hash.c | 305 +++++++++++++++++++++++++-------------
 lib/librte_hash/rte_cuckoo_hash.h |   3 +
 2 files changed, 208 insertions(+), 100 deletions(-)

diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index e55725a..262162c 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -142,6 +142,7 @@ rte_hash_create(const struct rte_hash_parameters *params)
        unsigned int readwrite_concur_support = 0;
        unsigned int writer_takes_lock = 0;
        unsigned int recycle_on_del = 1;
+       uint32_t *tbl_chng_cnt = NULL;
 
        rte_hash_function default_hash_func = (rte_hash_function)rte_jhash;
 
@@ -293,6 +294,14 @@ rte_hash_create(const struct rte_hash_parameters *params)
                goto err_unlock;
        }
 
+       tbl_chng_cnt = rte_zmalloc_socket(NULL, sizeof(uint32_t),
+                       RTE_CACHE_LINE_SIZE, params->socket_id);
+
+       if (tbl_chng_cnt == NULL) {
+               RTE_LOG(ERR, HASH, "memory allocation failed\n");
+               goto err_unlock;
+       }
+
 /*
  * If x86 architecture is used, select appropriate compare function,
  * which may use x86 intrinsics, otherwise use memcmp
@@ -361,6 +370,8 @@ rte_hash_create(const struct rte_hash_parameters *params)
                default_hash_func : params->hash_func;
        h->key_store = k;
        h->free_slots = r;
+       h->tbl_chng_cnt = tbl_chng_cnt;
+       *h->tbl_chng_cnt = 0;
        h->hw_trans_mem_support = hw_trans_mem_support;
        h->multi_writer_support = multi_writer_support;
        h->readwrite_concur_support = readwrite_concur_support;
@@ -407,6 +418,7 @@ rte_hash_create(const struct rte_hash_parameters *params)
        rte_free(buckets);
        rte_free(buckets_ext);
        rte_free(k);
+       rte_free(tbl_chng_cnt);
        return NULL;
 }
 
@@ -447,6 +459,7 @@ rte_hash_free(struct rte_hash *h)
        rte_free(h->key_store);
        rte_free(h->buckets);
        rte_free(h->buckets_ext);
+       rte_free(h->tbl_chng_cnt);
        rte_free(h);
        rte_free(te);
 }
@@ -531,6 +544,7 @@ rte_hash_reset(struct rte_hash *h)
        __hash_rw_writer_lock(h);
        memset(h->buckets, 0, h->num_buckets * sizeof(struct rte_hash_bucket));
        memset(h->key_store, 0, h->key_entry_size * (h->entries + 1));
+       *h->tbl_chng_cnt = 0;
 
        /* clear the free ring */
        while (rte_ring_dequeue(h->free_slots, &ptr) == 0)
@@ -744,11 +758,27 @@ rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
                if (unlikely(&h->buckets[prev_alt_bkt_idx]
                                != curr_bkt)) {
                        /* revert it to empty, otherwise duplicated keys */
-                       curr_bkt->key_idx[curr_slot] = EMPTY_SLOT;
+                       __atomic_store_n(&curr_bkt->key_idx[curr_slot],
+                               EMPTY_SLOT,
+                               __ATOMIC_RELEASE);
                        __hash_rw_writer_unlock(h);
                        return -1;
                }
 
+               /* Inform the previous move. The current move need
+                * not be informed now as the current bucket entry
+                * is present in both primary and secondary.
+                * Since there is one writer, load acquires on
+                * tbl_chng_cnt are not required.
+                */
+               __atomic_store_n(h->tbl_chng_cnt,
+                                *h->tbl_chng_cnt + 1,
+                                __ATOMIC_RELEASE);
+               /* The stores to sig_alt and sig_current should not
+                * move above the store to tbl_chng_cnt.
+                */
+               __atomic_thread_fence(__ATOMIC_RELEASE);
+
                /* Need to swap current/alt sig to allow later
                 * Cuckoo insert to move elements back to its
                 * primary bucket if available
@@ -765,6 +795,20 @@ rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
                curr_bkt = curr_node->bkt;
        }
 
+       /* Inform the previous move. The current move need
+        * not be informed now as the current bucket entry
+        * is present in both primary and secondary.
+        * Since there is one writer, load acquires on
+        * tbl_chng_cnt are not required.
+        */
+       __atomic_store_n(h->tbl_chng_cnt,
+                        *h->tbl_chng_cnt + 1,
+                        __ATOMIC_RELEASE);
+       /* The stores to sig_alt and sig_current should not
+        * move above the store to tbl_chng_cnt.
+        */
+       __atomic_thread_fence(__ATOMIC_RELEASE);
+
        curr_bkt->sig_current[curr_slot] = sig;
        /* Release the new bucket entry */
        __atomic_store_n(&curr_bkt->key_idx[curr_slot],
@@ -1095,34 +1139,62 @@ __rte_hash_lookup_with_hash(const struct rte_hash *h, const void *key,
 {
        uint32_t prim_bucket_idx, sec_bucket_idx;
        struct rte_hash_bucket *bkt, *cur_bkt;
+       uint32_t cnt_b, cnt_a;
        int ret;
        uint16_t short_sig;
 
        short_sig = get_short_sig(sig);
        prim_bucket_idx = get_prim_bucket_index(h, sig);
        sec_bucket_idx = get_alt_bucket_index(h, prim_bucket_idx, short_sig);
-       bkt = &h->buckets[prim_bucket_idx];
 
        __hash_rw_reader_lock(h);
 
-       /* Check if key is in primary location */
-       ret = search_one_bucket(h, key, short_sig, data, bkt);
-       if (ret != -1) {
-               __hash_rw_reader_unlock(h);
-               return ret;
-       }
-       /* Calculate secondary hash */
-       bkt = &h->buckets[sec_bucket_idx];
+       do {
+               /* Load the table change counter before the lookup
+                * starts. Acquire semantics will make sure that
+                * loads in search_one_bucket are not hoisted.
+                */
+               cnt_b = __atomic_load_n(h->tbl_chng_cnt,
+                               __ATOMIC_ACQUIRE);
 
-       /* Check if key is in secondary location */
-       FOR_EACH_BUCKET(cur_bkt, bkt) {
-               ret = search_one_bucket(h, key, short_sig, data, cur_bkt);
+               /* Check if key is in primary location */
+               bkt = &h->buckets[prim_bucket_idx];
+               ret = search_one_bucket(h, key, short_sig, data, bkt);
                if (ret != -1) {
                        __hash_rw_reader_unlock(h);
                        return ret;
                }
-       }
+               /* Calculate secondary hash */
+               bkt = &h->buckets[sec_bucket_idx];
+
+               /* Check if key is in secondary location */
+               FOR_EACH_BUCKET(cur_bkt, bkt) {
+                       ret = search_one_bucket(h, key, short_sig,
+                                               data, cur_bkt);
+                       if (ret != -1) {
+                               __hash_rw_reader_unlock(h);
+                               return ret;
+                       }
+               }
+
+               /* The loads of sig_current in search_one_bucket
+                * should not move below the load from tbl_chng_cnt.
+                */
+               __atomic_thread_fence(__ATOMIC_ACQUIRE);
+               /* Re-read the table change counter to check if the
+                * table has changed during search. If yes, re-do
+                * the search.
+                * This load should not get hoisted. The load
+                * acquires on cnt_b, key index in primary bucket
+                * and key index in secondary bucket will make sure
+                * that it does not get hoisted.
+                */
+               cnt_a = __atomic_load_n(h->tbl_chng_cnt,
+                                       __ATOMIC_ACQUIRE);
+       } while (cnt_b != cnt_a);
+
        __hash_rw_reader_unlock(h);
+
        return -ENOENT;
 }
 
@@ -1444,6 +1516,7 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
        uint32_t sec_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0};
        struct rte_hash_bucket *cur_bkt, *next_bkt;
        void *pdata[RTE_HASH_LOOKUP_BULK_MAX];
+       uint32_t cnt_b, cnt_a;
 
        /* Prefetch first keys */
        for (i = 0; i < PREFETCH_OFFSET && i < num_keys; i++)
@@ -1485,106 +1558,138 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
        }
 
        __hash_rw_reader_lock(h);
-       /* Compare signatures and prefetch key slot of first hit */
-       for (i = 0; i < num_keys; i++) {
-               compare_signatures(&prim_hitmask[i], &sec_hitmask[i],
+       do {
+               /* Load the table change counter before the lookup
+                * starts. Acquire semantics will make sure that
+                * loads in compare_signatures are not hoisted.
+                */
+               cnt_b = __atomic_load_n(h->tbl_chng_cnt,
+                                       __ATOMIC_ACQUIRE);
+
+               /* Compare signatures and prefetch key slot of first hit */
+               for (i = 0; i < num_keys; i++) {
+                       compare_signatures(&prim_hitmask[i], &sec_hitmask[i],
                                primary_bkt[i], secondary_bkt[i],
                                sig[i], h->sig_cmp_fn);
 
-               if (prim_hitmask[i]) {
-                       uint32_t first_hit =
-                                       __builtin_ctzl(prim_hitmask[i]) >> 1;
-                       uint32_t key_idx = primary_bkt[i]->key_idx[first_hit];
-                       const struct rte_hash_key *key_slot =
-                               (const struct rte_hash_key *)(
-                               (const char *)h->key_store +
-                               key_idx * h->key_entry_size);
-                       rte_prefetch0(key_slot);
-                       continue;
-               }
+                       if (prim_hitmask[i]) {
+                               uint32_t first_hit =
+                                               __builtin_ctzl(prim_hitmask[i])
+                                               >> 1;
+                               uint32_t key_idx =
+                                       primary_bkt[i]->key_idx[first_hit];
+                               const struct rte_hash_key *key_slot =
+                                       (const struct rte_hash_key *)(
+                                       (const char *)h->key_store +
+                                       key_idx * h->key_entry_size);
+                               rte_prefetch0(key_slot);
+                               continue;
+                       }
 
-               if (sec_hitmask[i]) {
-                       uint32_t first_hit =
-                                       __builtin_ctzl(sec_hitmask[i]) >> 1;
-                       uint32_t key_idx = secondary_bkt[i]->key_idx[first_hit];
-                       const struct rte_hash_key *key_slot =
-                               (const struct rte_hash_key *)(
-                               (const char *)h->key_store +
-                               key_idx * h->key_entry_size);
-                       rte_prefetch0(key_slot);
+                       if (sec_hitmask[i]) {
+                               uint32_t first_hit =
+                                               __builtin_ctzl(sec_hitmask[i])
+                                               >> 1;
+                               uint32_t key_idx =
+                                       secondary_bkt[i]->key_idx[first_hit];
+                               const struct rte_hash_key *key_slot =
+                                       (const struct rte_hash_key *)(
+                                       (const char *)h->key_store +
+                                       key_idx * h->key_entry_size);
+                               rte_prefetch0(key_slot);
+                       }
                }
-       }
 
-       /* Compare keys, first hits in primary first */
-       for (i = 0; i < num_keys; i++) {
-               positions[i] = -ENOENT;
-               while (prim_hitmask[i]) {
-                       uint32_t hit_index =
-                                       __builtin_ctzl(prim_hitmask[i]) >> 1;
-
-                       uint32_t key_idx =
-                       __atomic_load_n(
-                               &primary_bkt[i]->key_idx[hit_index],
-                               __ATOMIC_ACQUIRE);
-                       const struct rte_hash_key *key_slot =
-                               (const struct rte_hash_key *)(
-                               (const char *)h->key_store +
-                               key_idx * h->key_entry_size);
-
-                       if (key_idx != EMPTY_SLOT)
-                               pdata[i] = __atomic_load_n(&key_slot->pdata,
-                                               __ATOMIC_ACQUIRE);
-                       /*
-                        * If key index is 0, do not compare key,
-                        * as it is checking the dummy slot
-                        */
-                       if (!!key_idx & !rte_hash_cmp_eq(key_slot->key, 
keys[i], h)) {
-                               if (data != NULL)
-                                       data[i] = pdata[i];
+               /* Compare keys, first hits in primary first */
+               for (i = 0; i < num_keys; i++) {
+                       positions[i] = -ENOENT;
+                       while (prim_hitmask[i]) {
+                               uint32_t hit_index =
+                                               __builtin_ctzl(prim_hitmask[i])
+                                               >> 1;
+                               uint32_t key_idx =
+                               __atomic_load_n(
+                                       &primary_bkt[i]->key_idx[hit_index],
+                                       __ATOMIC_ACQUIRE);
+                               const struct rte_hash_key *key_slot =
+                                       (const struct rte_hash_key *)(
+                                       (const char *)h->key_store +
+                                       key_idx * h->key_entry_size);
 
-                               hits |= 1ULL << i;
-                               positions[i] = key_idx - 1;
-                               goto next_key;
+                               if (key_idx != EMPTY_SLOT)
+                                       pdata[i] = __atomic_load_n(
+                                                       &key_slot->pdata,
+                                                       __ATOMIC_ACQUIRE);
+                               /*
+                                * If key index is 0, do not compare key,
+                                * as it is checking the dummy slot
+                                */
+                               if (!!key_idx &
+                                       !rte_hash_cmp_eq(
+                                               key_slot->key, keys[i], h)) {
+                                       if (data != NULL)
+                                               data[i] = pdata[i];
+
+                                       hits |= 1ULL << i;
+                                       positions[i] = key_idx - 1;
+                                       goto next_key;
+                               }
+                               prim_hitmask[i] &= ~(3ULL << (hit_index << 1));
                        }
-                       prim_hitmask[i] &= ~(3ULL << (hit_index << 1));
-               }
-
-               while (sec_hitmask[i]) {
-                       uint32_t hit_index =
-                                       __builtin_ctzl(sec_hitmask[i]) >> 1;
 
-                       uint32_t key_idx =
-                       __atomic_load_n(
-                               &secondary_bkt[i]->key_idx[hit_index],
-                               __ATOMIC_ACQUIRE);
-                       const struct rte_hash_key *key_slot =
-                               (const struct rte_hash_key *)(
-                               (const char *)h->key_store +
-                               key_idx * h->key_entry_size);
-
-                       if (key_idx != EMPTY_SLOT)
-                               pdata[i] = __atomic_load_n(&key_slot->pdata,
-                                               __ATOMIC_ACQUIRE);
-
-                       /*
-                        * If key index is 0, do not compare key,
-                        * as it is checking the dummy slot
-                        */
+                       while (sec_hitmask[i]) {
+                               uint32_t hit_index =
+                                               __builtin_ctzl(sec_hitmask[i])
+                                               >> 1;
+                               uint32_t key_idx =
+                               __atomic_load_n(
+                                       &secondary_bkt[i]->key_idx[hit_index],
+                                       __ATOMIC_ACQUIRE);
+                               const struct rte_hash_key *key_slot =
+                                       (const struct rte_hash_key *)(
+                                       (const char *)h->key_store +
+                                       key_idx * h->key_entry_size);
 
-                       if (!!key_idx & !rte_hash_cmp_eq(key_slot->key, 
keys[i], h)) {
-                               if (data != NULL)
-                                       data[i] = pdata[i];
+                               if (key_idx != EMPTY_SLOT)
+                                       pdata[i] = __atomic_load_n(
+                                                       &key_slot->pdata,
+                                                       __ATOMIC_ACQUIRE);
+                               /*
+                                * If key index is 0, do not compare key,
+                                * as it is checking the dummy slot
+                                */
 
-                               hits |= 1ULL << i;
-                               positions[i] = key_idx - 1;
-                               goto next_key;
+                               if (!!key_idx &
+                                       !rte_hash_cmp_eq(
+                                               key_slot->key, keys[i], h)) {
+                                       if (data != NULL)
+                                               data[i] = pdata[i];
+
+                                       hits |= 1ULL << i;
+                                       positions[i] = key_idx - 1;
+                                       goto next_key;
+                               }
+                               sec_hitmask[i] &= ~(3ULL << (hit_index << 1));
                        }
-                       sec_hitmask[i] &= ~(3ULL << (hit_index << 1));
+next_key:
+                       continue;
                }
 
-next_key:
-               continue;
-       }
+               /* The loads of sig_current in compare_signatures
+                * should not move below the load from tbl_chng_cnt.
+                */
+               __atomic_thread_fence(__ATOMIC_ACQUIRE);
+               /* Re-read the table change counter to check if the
+                * table has changed during search. If yes, re-do
+                * the search.
+                * This load should not get hoisted. The load
+                * acquires on cnt_b, primary key index and secondary
+                * key index will make sure that it does not get
+                * hoisted.
+                */
+               cnt_a = __atomic_load_n(h->tbl_chng_cnt,
+                                       __ATOMIC_ACQUIRE);
+       } while (cnt_b != cnt_a);
 
        /* all found, do not need to go through ext bkt */
        if ((hits == ((1ULL << num_keys) - 1)) || !h->ext_table_support) {
diff --git a/lib/librte_hash/rte_cuckoo_hash.h b/lib/librte_hash/rte_cuckoo_hash.h
index b823d37..728cd17 100644
--- a/lib/librte_hash/rte_cuckoo_hash.h
+++ b/lib/librte_hash/rte_cuckoo_hash.h
@@ -1,5 +1,6 @@
 /* SPDX-License-Identifier: BSD-3-Clause
  * Copyright(c) 2016 Intel Corporation
+ * Copyright(c) 2018 Arm Limited
  */
 
 /* rte_cuckoo_hash.h
@@ -196,6 +197,8 @@ struct rte_hash {
        rte_rwlock_t *readwrite_lock; /**< Read-write lock thread-safety. */
        struct rte_hash_bucket *buckets_ext; /**< Extra buckets array */
        struct rte_ring *free_ext_bkts; /**< Ring of indexes of free buckets */
+       uint32_t *tbl_chng_cnt;
+       /**< Indicates if the hash table changed from last read. */
 } __rte_cache_aligned;
 
 struct queue_node {
-- 
2.7.4

Reply via email to