Reader-writer concurrency issue, caused by moving the keys
to their alternative locations during key insert, is solved
by introducing a global counter (tbl_chng_cnt) indicating a
change in table.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
Reviewed-by: Gavin Hu <gavin...@arm.com>
Reviewed-by: Ola Liljedahl <ola.liljed...@arm.com>
Reviewed-by: Steve Capper <steve.cap...@arm.com>
---
 lib/librte_hash/rte_cuckoo_hash.c | 307 +++++++++++++++++++++++++-------------
 lib/librte_hash/rte_cuckoo_hash.h |   2 +
 lib/librte_hash/rte_hash.h        |   8 +-
 3 files changed, 206 insertions(+), 111 deletions(-)

diff --git a/lib/librte_hash/rte_cuckoo_hash.c 
b/lib/librte_hash/rte_cuckoo_hash.c
index 2d89158..1e4a8d4 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -256,6 +256,7 @@ rte_hash_create(const struct rte_hash_parameters *params)
 #endif
        /* Setup hash context */
        snprintf(h->name, sizeof(h->name), "%s", params->name);
+       h->tbl_chng_cnt = 0;
        h->entries = params->entries;
        h->key_len = params->key_len;
        h->key_entry_size = key_entry_size;
@@ -588,7 +589,7 @@ rte_hash_cuckoo_insert_mw(const struct rte_hash *h,
  * return 0 if succeeds.
  */
 static inline int
-rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
+rte_hash_cuckoo_move_insert_mw(struct rte_hash *h,
                        struct rte_hash_bucket *bkt,
                        struct rte_hash_bucket *alt_bkt,
                        const struct rte_hash_key *key, void *data,
@@ -639,11 +640,27 @@ rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
                if (unlikely(&h->buckets[prev_alt_bkt_idx]
                                != curr_bkt)) {
                        /* revert it to empty, otherwise duplicated keys */
-                       curr_bkt->key_idx[curr_slot] = EMPTY_SLOT;
+                       __atomic_store_n(&curr_bkt->key_idx[curr_slot],
+                               EMPTY_SLOT,
+                               __ATOMIC_RELEASE);
                        __hash_rw_writer_unlock(h);
                        return -1;
                }
 
+               /* Inform the previous move. The current move need
+                * not be informed now as the current bucket entry
+                * is present in both primary and secondary.
+                * Since there is one writer, load acquires on
+                * tbl_chng_cnt are not required.
+                */
+               __atomic_store_n(&h->tbl_chng_cnt,
+                                h->tbl_chng_cnt + 1,
+                                __ATOMIC_RELEASE);
+               /* The stores to sig_alt and sig_current should not
+                * move above the store to tbl_chng_cnt.
+                */
+               __atomic_thread_fence(__ATOMIC_RELEASE);
+
                /* Need to swap current/alt sig to allow later
                 * Cuckoo insert to move elements back to its
                 * primary bucket if available
@@ -662,6 +679,20 @@ rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
                curr_bkt = curr_node->bkt;
        }
 
+       /* Inform the previous move. The current move need
+        * not be informed now as the current bucket entry
+        * is present in both primary and secondary.
+        * Since there is one writer, load acquires on
+        * tbl_chng_cnt are not required.
+        */
+       __atomic_store_n(&h->tbl_chng_cnt,
+                        h->tbl_chng_cnt + 1,
+                        __ATOMIC_RELEASE);
+       /* The stores to sig_alt and sig_current should not
+        * move above the store to tbl_chng_cnt.
+        */
+       __atomic_thread_fence(__ATOMIC_RELEASE);
+
        curr_bkt->sig_current[curr_slot] = sig;
        curr_bkt->sig_alt[curr_slot] = alt_hash;
        /* Release the new bucket entry */
@@ -680,7 +711,7 @@ rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
  * Cuckoo
  */
 static inline int
-rte_hash_cuckoo_make_space_mw(const struct rte_hash *h,
+rte_hash_cuckoo_make_space_mw(struct rte_hash *h,
                        struct rte_hash_bucket *bkt,
                        struct rte_hash_bucket *sec_bkt,
                        const struct rte_hash_key *key, void *data,
@@ -728,7 +759,7 @@ rte_hash_cuckoo_make_space_mw(const struct rte_hash *h,
 }
 
 static inline int32_t
-__rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
+__rte_hash_add_key_with_hash(struct rte_hash *h, const void *key,
                                                hash_sig_t sig, void *data)
 {
        hash_sig_t alt_hash;
@@ -844,7 +875,7 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, 
const void *key,
 }
 
 int32_t
-rte_hash_add_key_with_hash(const struct rte_hash *h,
+rte_hash_add_key_with_hash(struct rte_hash *h,
                        const void *key, hash_sig_t sig)
 {
        RETURN_IF_TRUE(((h == NULL) || (key == NULL)), -EINVAL);
@@ -852,14 +883,14 @@ rte_hash_add_key_with_hash(const struct rte_hash *h,
 }
 
 int32_t
-rte_hash_add_key(const struct rte_hash *h, const void *key)
+rte_hash_add_key(struct rte_hash *h, const void *key)
 {
        RETURN_IF_TRUE(((h == NULL) || (key == NULL)), -EINVAL);
        return __rte_hash_add_key_with_hash(h, key, rte_hash_hash(h, key), 0);
 }
 
 int
-rte_hash_add_key_with_hash_data(const struct rte_hash *h,
+rte_hash_add_key_with_hash_data(struct rte_hash *h,
                        const void *key, hash_sig_t sig, void *data)
 {
        int ret;
@@ -873,7 +904,7 @@ rte_hash_add_key_with_hash_data(const struct rte_hash *h,
 }
 
 int
-rte_hash_add_key_data(const struct rte_hash *h, const void *key, void *data)
+rte_hash_add_key_data(struct rte_hash *h, const void *key, void *data)
 {
        int ret;
 
@@ -926,30 +957,56 @@ __rte_hash_lookup_with_hash(const struct rte_hash *h, 
const void *key,
        uint32_t bucket_idx;
        hash_sig_t alt_hash;
        struct rte_hash_bucket *bkt;
+       uint32_t cnt_b, cnt_a;
        int ret;
 
-       bucket_idx = sig & h->bucket_bitmask;
-       bkt = &h->buckets[bucket_idx];
-
        __hash_rw_reader_lock(h);
 
-       /* Check if key is in primary location */
-       ret = search_one_bucket(h, key, sig, data, bkt);
-       if (ret != -1) {
-               __hash_rw_reader_unlock(h);
-               return ret;
-       }
-       /* Calculate secondary hash */
-       alt_hash = rte_hash_secondary_hash(sig);
-       bucket_idx = alt_hash & h->bucket_bitmask;
-       bkt = &h->buckets[bucket_idx];
+       do {
+               /* Load the table change counter before the lookup
+                * starts. Acquire semantics will make sure that
+                * loads in search_one_bucket are not hoisted.
+                */
+               cnt_b = __atomic_load_n(&h->tbl_chng_cnt,
+                               __ATOMIC_ACQUIRE);
+
+               bucket_idx = sig & h->bucket_bitmask;
+               bkt = &h->buckets[bucket_idx];
+
+               /* Check if key is in primary location */
+               ret = search_one_bucket(h, key, sig, data, bkt);
+               if (ret != -1) {
+                       __hash_rw_reader_unlock(h);
+                       return ret;
+               }
+               /* Calculate secondary hash */
+               alt_hash = rte_hash_secondary_hash(sig);
+               bucket_idx = alt_hash & h->bucket_bitmask;
+               bkt = &h->buckets[bucket_idx];
+
+               /* Check if key is in secondary location */
+               ret = search_one_bucket(h, key, alt_hash, data, bkt);
+               if (ret != -1) {
+                       __hash_rw_reader_unlock(h);
+                       return ret;
+               }
+
+               /* The loads of sig_current in search_one_bucket
+                * should not move below the load from tbl_chng_cnt.
+                */
+               __atomic_thread_fence(__ATOMIC_ACQUIRE);
+               /* Re-read the table change counter to check if the
+                * table has changed during search. If yes, re-do
+                * the search.
+                * This load should not get hoisted. The load
+                * acquires on cnt_b, key index in primary bucket
+                * and key index in secondary bucket will make sure
+                * that it does not get hoisted.
+                */
+               cnt_a = __atomic_load_n(&h->tbl_chng_cnt,
+                                       __ATOMIC_ACQUIRE);
+       } while (cnt_b != cnt_a);
 
-       /* Check if key is in secondary location */
-       ret = search_one_bucket(h, key, alt_hash, data, bkt);
-       if (ret != -1) {
-               __hash_rw_reader_unlock(h);
-               return ret;
-       }
        __hash_rw_reader_unlock(h);
        return -ENOENT;
 }
@@ -1190,6 +1247,7 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const 
void **keys,
        uint32_t prim_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0};
        uint32_t sec_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0};
        void *pdata[RTE_HASH_LOOKUP_BULK_MAX];
+       uint32_t cnt_b, cnt_a;
 
        /* Prefetch first keys */
        for (i = 0; i < PREFETCH_OFFSET && i < num_keys; i++)
@@ -1225,102 +1283,137 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, 
const void **keys,
        }
 
        __hash_rw_reader_lock(h);
-       /* Compare signatures and prefetch key slot of first hit */
-       for (i = 0; i < num_keys; i++) {
-               compare_signatures(&prim_hitmask[i], &sec_hitmask[i],
+       do {
+               /* Load the table change counter before the lookup
+                * starts. Acquire semantics will make sure that
+                * loads in compare_signatures are not hoisted.
+                */
+               cnt_b = __atomic_load_n(&h->tbl_chng_cnt,
+                                       __ATOMIC_ACQUIRE);
+
+               /* Compare signatures and prefetch key slot of first hit */
+               for (i = 0; i < num_keys; i++) {
+                       compare_signatures(&prim_hitmask[i], &sec_hitmask[i],
                                primary_bkt[i], secondary_bkt[i],
                                prim_hash[i], sec_hash[i], h->sig_cmp_fn);
 
-               if (prim_hitmask[i]) {
-                       uint32_t first_hit = __builtin_ctzl(prim_hitmask[i]);
-                       uint32_t key_idx = primary_bkt[i]->key_idx[first_hit];
-                       const struct rte_hash_key *key_slot =
-                               (const struct rte_hash_key *)(
-                               (const char *)h->key_store +
-                               key_idx * h->key_entry_size);
-                       rte_prefetch0(key_slot);
-                       continue;
-               }
+                       if (prim_hitmask[i]) {
+                               uint32_t first_hit =
+                                               __builtin_ctzl(prim_hitmask[i]);
+                               uint32_t key_idx =
+                                       primary_bkt[i]->key_idx[first_hit];
+                               const struct rte_hash_key *key_slot =
+                                       (const struct rte_hash_key *)(
+                                       (const char *)h->key_store +
+                                       key_idx * h->key_entry_size);
+                               rte_prefetch0(key_slot);
+                               continue;
+                       }
 
-               if (sec_hitmask[i]) {
-                       uint32_t first_hit = __builtin_ctzl(sec_hitmask[i]);
-                       uint32_t key_idx = secondary_bkt[i]->key_idx[first_hit];
-                       const struct rte_hash_key *key_slot =
-                               (const struct rte_hash_key *)(
-                               (const char *)h->key_store +
-                               key_idx * h->key_entry_size);
-                       rte_prefetch0(key_slot);
+                       if (sec_hitmask[i]) {
+                               uint32_t first_hit =
+                                               __builtin_ctzl(sec_hitmask[i]);
+                               uint32_t key_idx =
+                                       secondary_bkt[i]->key_idx[first_hit];
+                               const struct rte_hash_key *key_slot =
+                                       (const struct rte_hash_key *)(
+                                       (const char *)h->key_store +
+                                       key_idx * h->key_entry_size);
+                               rte_prefetch0(key_slot);
+                       }
                }
-       }
 
-       /* Compare keys, first hits in primary first */
-       for (i = 0; i < num_keys; i++) {
-               positions[i] = -ENOENT;
-               while (prim_hitmask[i]) {
-                       uint32_t hit_index = __builtin_ctzl(prim_hitmask[i]);
+               /* Compare keys, first hits in primary first */
+               for (i = 0; i < num_keys; i++) {
+                       positions[i] = -ENOENT;
+                       while (prim_hitmask[i]) {
+                               uint32_t hit_index =
+                                               __builtin_ctzl(prim_hitmask[i]);
 
-                       uint32_t key_idx =
-                       __atomic_load_n(
-                               &primary_bkt[i]->key_idx[hit_index],
-                               __ATOMIC_ACQUIRE);
-                       const struct rte_hash_key *key_slot =
-                               (const struct rte_hash_key *)(
-                               (const char *)h->key_store +
-                               key_idx * h->key_entry_size);
-
-                       if (key_idx != EMPTY_SLOT)
-                               pdata[i] = __atomic_load_n(&key_slot->pdata,
-                                               __ATOMIC_ACQUIRE);
-                       /*
-                        * If key index is 0, do not compare key,
-                        * as it is checking the dummy slot
-                        */
-                       if (!!key_idx & !rte_hash_cmp_eq(key_slot->key, 
keys[i], h)) {
-                               if (data != NULL)
-                                       data[i] = pdata[i];
+                               uint32_t key_idx =
+                               __atomic_load_n(
+                                       &primary_bkt[i]->key_idx[hit_index],
+                                       __ATOMIC_ACQUIRE);
+                               const struct rte_hash_key *key_slot =
+                                       (const struct rte_hash_key *)(
+                                       (const char *)h->key_store +
+                                       key_idx * h->key_entry_size);
 
-                               hits |= 1ULL << i;
-                               positions[i] = key_idx - 1;
-                               goto next_key;
+                               if (key_idx != EMPTY_SLOT)
+                                       pdata[i] = __atomic_load_n(
+                                                       &key_slot->pdata,
+                                                       __ATOMIC_ACQUIRE);
+                               /*
+                                * If key index is 0, do not compare key,
+                                * as it is checking the dummy slot
+                                */
+                               if (!!key_idx &
+                                       !rte_hash_cmp_eq(
+                                               key_slot->key, keys[i], h)) {
+                                       if (data != NULL)
+                                               data[i] = pdata[i];
+
+                                       hits |= 1ULL << i;
+                                       positions[i] = key_idx - 1;
+                                       goto next_key;
+                               }
+                               prim_hitmask[i] &= ~(1 << (hit_index));
                        }
-                       prim_hitmask[i] &= ~(1 << (hit_index));
-               }
 
-               while (sec_hitmask[i]) {
-                       uint32_t hit_index = __builtin_ctzl(sec_hitmask[i]);
+                       while (sec_hitmask[i]) {
+                               uint32_t hit_index =
+                                               __builtin_ctzl(sec_hitmask[i]);
 
-                       uint32_t key_idx =
-                       __atomic_load_n(
-                               &secondary_bkt[i]->key_idx[hit_index],
-                               __ATOMIC_ACQUIRE);
-                       const struct rte_hash_key *key_slot =
-                               (const struct rte_hash_key *)(
-                               (const char *)h->key_store +
-                               key_idx * h->key_entry_size);
-
-                       if (key_idx != EMPTY_SLOT)
-                               pdata[i] = __atomic_load_n(&key_slot->pdata,
-                                               __ATOMIC_ACQUIRE);
-
-                       /*
-                        * If key index is 0, do not compare key,
-                        * as it is checking the dummy slot
-                        */
+                               uint32_t key_idx =
+                               __atomic_load_n(
+                                       &secondary_bkt[i]->key_idx[hit_index],
+                                       __ATOMIC_ACQUIRE);
+                               const struct rte_hash_key *key_slot =
+                                       (const struct rte_hash_key *)(
+                                       (const char *)h->key_store +
+                                       key_idx * h->key_entry_size);
 
-                       if (!!key_idx & !rte_hash_cmp_eq(key_slot->key, 
keys[i], h)) {
-                               if (data != NULL)
-                                       data[i] = pdata[i];
+                               if (key_idx != EMPTY_SLOT)
+                                       pdata[i] = __atomic_load_n(
+                                                       &key_slot->pdata,
+                                                       __ATOMIC_ACQUIRE);
+                               /*
+                                * If key index is 0, do not compare key,
+                                * as it is checking the dummy slot
+                                */
 
-                               hits |= 1ULL << i;
-                               positions[i] = key_idx - 1;
-                               goto next_key;
+                               if (!!key_idx &
+                                       !rte_hash_cmp_eq(
+                                               key_slot->key, keys[i], h)) {
+                                       if (data != NULL)
+                                               data[i] = pdata[i];
+
+                                       hits |= 1ULL << i;
+                                       positions[i] = key_idx - 1;
+                                       goto next_key;
+                               }
+                               sec_hitmask[i] &= ~(1 << (hit_index));
                        }
-                       sec_hitmask[i] &= ~(1 << (hit_index));
-               }
 
 next_key:
-               continue;
-       }
+                       continue;
+               }
+
+               /* The loads of sig_current in compare_signatures
+                * should not move below the load from tbl_chng_cnt.
+                */
+               __atomic_thread_fence(__ATOMIC_ACQUIRE);
+               /* Re-read the table change counter to check if the
+                * table has changed during search. If yes, re-do
+                * the search.
+                * This load should not get hoisted. The load
+                * acquires on cnt_b, primary key index and secondary
+                * key index will make sure that it does not get
+                * hoisted.
+                */
+               cnt_a = __atomic_load_n(&h->tbl_chng_cnt,
+                                       __ATOMIC_ACQUIRE);
+       } while (cnt_b != cnt_a);
 
        __hash_rw_reader_unlock(h);
 
diff --git a/lib/librte_hash/rte_cuckoo_hash.h 
b/lib/librte_hash/rte_cuckoo_hash.h
index b0c7ef9..5ce864c 100644
--- a/lib/librte_hash/rte_cuckoo_hash.h
+++ b/lib/librte_hash/rte_cuckoo_hash.h
@@ -186,6 +186,8 @@ struct rte_hash {
         * to the key table.
         */
        rte_rwlock_t *readwrite_lock; /**< Read-write lock thread-safety. */
+       uint32_t tbl_chng_cnt;
+       /**< Indicates if the hash table changed from last read. */
 } __rte_cache_aligned;
 
 struct queue_node {
diff --git a/lib/librte_hash/rte_hash.h b/lib/librte_hash/rte_hash.h
index 9e7d931..05e024b 100644
--- a/lib/librte_hash/rte_hash.h
+++ b/lib/librte_hash/rte_hash.h
@@ -156,7 +156,7 @@ rte_hash_count(const struct rte_hash *h);
  *   - -ENOSPC if there is no space in the hash for this key.
  */
 int
-rte_hash_add_key_data(const struct rte_hash *h, const void *key, void *data);
+rte_hash_add_key_data(struct rte_hash *h, const void *key, void *data);
 
 /**
  * Add a key-value pair with a pre-computed hash value
@@ -180,7 +180,7 @@ rte_hash_add_key_data(const struct rte_hash *h, const void 
*key, void *data);
  *   - -ENOSPC if there is no space in the hash for this key.
  */
 int32_t
-rte_hash_add_key_with_hash_data(const struct rte_hash *h, const void *key,
+rte_hash_add_key_with_hash_data(struct rte_hash *h, const void *key,
                                                hash_sig_t sig, void *data);
 
 /**
@@ -200,7 +200,7 @@ rte_hash_add_key_with_hash_data(const struct rte_hash *h, 
const void *key,
  *     array of user data. This value is unique for this key.
  */
 int32_t
-rte_hash_add_key(const struct rte_hash *h, const void *key);
+rte_hash_add_key(struct rte_hash *h, const void *key);
 
 /**
  * Add a key to an existing hash table.
@@ -222,7 +222,7 @@ rte_hash_add_key(const struct rte_hash *h, const void *key);
  *     array of user data. This value is unique for this key.
  */
 int32_t
-rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, 
hash_sig_t sig);
+rte_hash_add_key_with_hash(struct rte_hash *h, const void *key, hash_sig_t 
sig);
 
 /**
  * Remove a key from an existing hash table.
-- 
2.7.4

Reply via email to