From: Konstantin Ananyev <konstantin.anan...@huawei.com>

Note upfront: this change shouldn't affect the rte_ring public API.
However, since the layout of public structures has changed, it is an ABI
breakage.

This is an attempt to implement the rte_ring optimization that was
suggested by Morten and discussed on this mailing list a while ago.
The idea is to optimize MP/SP & MC/SC ring enqueue/dequeue ops
by storing, along with the head, its Cached Foreign Tail (CFT) value,
i.e. for the producer we cache the consumer tail value and vice versa.
To avoid races, the head and CFT values are read/written using atomic
64-bit ops.
In theory that might help by reducing the number of times the producer
needs to access the consumer's cache-line and vice versa.
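
To illustrate the idea, below is a minimal standalone sketch of the
head+CFT packing and the two-attempt free-space check. Names and the
exact layout here are illustrative only; the actual rte_ring changes
are in the diff below.

#include <stdint.h>
#include <stdatomic.h>

/* head position and cached foreign tail packed into one 64-bit value */
union head_cft {
	_Atomic uint64_t raw;	/* read/write pos + cft as one atomic op */
	struct {
		uint32_t pos;	/* own head position */
		uint32_t cft;	/* cached foreign (opposite side) tail */
	} val;
};

/*
 * Number of free entries the producer sees when it needs room for n
 * objects; the consumer's tail cache-line is touched only when the
 * cached value is not sufficient.
 */
static uint32_t
prod_free_entries(union head_cft *prod_head, _Atomic uint32_t *cons_tail,
		uint32_t capacity, uint32_t n)
{
	union head_cft oh;
	uint32_t tail, avail;

	oh.raw = atomic_load_explicit(&prod_head->raw, memory_order_acquire);
	tail = oh.val.cft;

	/* attempt #1: use the cached consumer tail */
	avail = capacity + tail - oh.val.pos;
	if (n > avail) {
		/* attempt #2: cache was stale, read the real consumer tail */
		tail = atomic_load_explicit(cons_tail, memory_order_relaxed);
		avail = capacity + tail - oh.val.pos;
	}
	return avail;
}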
In practice, I didn't see any impressive boost so far:
 - ring_perf_autotest micro-bench: results are a mixed bag,
   some are a bit better, some are worse.
 - [so]ring_stress_autotest micro-benchmarks: ~10-15% improvement
 - l3fwd in wqorder/wqundorder modes (see previous patch for details):
   no real difference.

So far my testing scope has been quite limited: I tried it only on
x86 machines. So I would like to ask all interested parties,
i.e. different platform vendors (ARM, PPC, etc.) and people who use
rte_ring extensively, to give it a try and provide feedback.

If there are no real performance improvements on any platform we
support, or if any problems are encountered, I am OK to drop this
patch.

Signed-off-by: Konstantin Ananyev <konstantin.anan...@huawei.com>
---
 drivers/net/mlx5/mlx5_hws_cnt.h   |  5 ++--
 drivers/net/ring/rte_eth_ring.c   |  2 +-
 lib/ring/rte_ring.c               |  6 ++--
 lib/ring/rte_ring_core.h          | 12 +++++++-
 lib/ring/rte_ring_generic_pvt.h   | 46 +++++++++++++++++++++----------
 lib/ring/rte_ring_peek_elem_pvt.h |  4 +--
 lib/ring/soring.c                 | 31 +++++++++++++++------
 lib/ring/soring.h                 |  4 +--
 8 files changed, 77 insertions(+), 33 deletions(-)

diff --git a/drivers/net/mlx5/mlx5_hws_cnt.h b/drivers/net/mlx5/mlx5_hws_cnt.h
index 996ac8dd9a..663146563c 100644
--- a/drivers/net/mlx5/mlx5_hws_cnt.h
+++ b/drivers/net/mlx5/mlx5_hws_cnt.h
@@ -388,11 +388,12 @@ __mlx5_hws_cnt_pool_enqueue_revert(struct rte_ring *r, unsigned int n,
 
        MLX5_ASSERT(r->prod.sync_type == RTE_RING_SYNC_ST);
        MLX5_ASSERT(r->cons.sync_type == RTE_RING_SYNC_ST);
-       current_head = rte_atomic_load_explicit(&r->prod.head, rte_memory_order_relaxed);
+       current_head = rte_atomic_load_explicit(&r->prod.head.val.pos,
+                       rte_memory_order_relaxed);
        MLX5_ASSERT(n <= r->capacity);
        MLX5_ASSERT(n <= rte_ring_count(r));
        revert2head = current_head - n;
-       r->prod.head = revert2head; /* This ring should be SP. */
+       r->prod.head.val.pos = revert2head; /* This ring should be SP. */
        __rte_ring_get_elem_addr(r, revert2head, sizeof(cnt_id_t), n,
                        &zcd->ptr1, &zcd->n1, &zcd->ptr2);
        /* Update tail */
diff --git a/drivers/net/ring/rte_eth_ring.c b/drivers/net/ring/rte_eth_ring.c
index 1346a0dba3..31009e90d2 100644
--- a/drivers/net/ring/rte_eth_ring.c
+++ b/drivers/net/ring/rte_eth_ring.c
@@ -325,7 +325,7 @@ eth_get_monitor_addr(void *rx_queue, struct rte_power_monitor_cond *pmc)
         */
        pmc->addr = &rng->prod.head;
        pmc->size = sizeof(rng->prod.head);
-       pmc->opaque[0] = rng->prod.head;
+       pmc->opaque[0] = rng->prod.head.val.pos;
        pmc->fn = ring_monitor_callback;
        return 0;
 }
diff --git a/lib/ring/rte_ring.c b/lib/ring/rte_ring.c
index aebb6d6728..cb2c39c7ad 100644
--- a/lib/ring/rte_ring.c
+++ b/lib/ring/rte_ring.c
@@ -102,7 +102,7 @@ reset_headtail(void *p)
        switch (ht->sync_type) {
        case RTE_RING_SYNC_MT:
        case RTE_RING_SYNC_ST:
-               ht->head = 0;
+               ht->head.raw = 0;
                ht->tail = 0;
                break;
        case RTE_RING_SYNC_MT_RTS:
@@ -373,9 +373,9 @@ rte_ring_dump(FILE *f, const struct rte_ring *r)
        fprintf(f, "  size=%"PRIu32"\n", r->size);
        fprintf(f, "  capacity=%"PRIu32"\n", r->capacity);
        fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
-       fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
+       fprintf(f, "  ch=%"PRIu32"\n", r->cons.head.val.pos);
        fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
-       fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
+       fprintf(f, "  ph=%"PRIu32"\n", r->prod.head.val.pos);
        fprintf(f, "  used=%u\n", rte_ring_count(r));
        fprintf(f, "  avail=%u\n", rte_ring_free_count(r));
 }
diff --git a/lib/ring/rte_ring_core.h b/lib/ring/rte_ring_core.h
index 270869d214..b88a1bc352 100644
--- a/lib/ring/rte_ring_core.h
+++ b/lib/ring/rte_ring_core.h
@@ -66,8 +66,17 @@ enum rte_ring_sync_type {
  * Depending on sync_type format of that structure might be different,
  * but offset for *sync_type* and *tail* values should remain the same.
  */
+union __rte_ring_head_cft {
+       /** raw 8B value to read/write *pos* and *cft* as one atomic op */
+       alignas(sizeof(uint64_t)) RTE_ATOMIC(uint64_t) raw;
+       struct {
+               uint32_t pos; /**< head position */
+               uint32_t cft; /**< cached foreign tail value */
+       } val;
+};
+
 struct rte_ring_headtail {
-       volatile RTE_ATOMIC(uint32_t) head;      /**< prod/consumer head. */
+       uint32_t __unused;
        volatile RTE_ATOMIC(uint32_t) tail;      /**< prod/consumer tail. */
        union {
                /** sync type of prod/cons */
@@ -75,6 +84,7 @@ struct rte_ring_headtail {
                /** deprecated -  True if single prod/cons */
                uint32_t single;
        };
+       union __rte_ring_head_cft head;
 };
 
 union __rte_ring_rts_poscnt {
diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_generic_pvt.h
index 12f3595926..e70f4ff32c 100644
--- a/lib/ring/rte_ring_generic_pvt.h
+++ b/lib/ring/rte_ring_generic_pvt.h
@@ -38,17 +38,18 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
 {
        unsigned int max = n;
        int success;
+       uint32_t tail;
+       union __rte_ring_head_cft nh, oh;
+
+       oh.raw = rte_atomic_load_explicit(&d->head.raw,
+                       rte_memory_order_acquire);
 
        do {
                /* Reset n to the initial burst count */
                n = max;
 
-               *old_head = d->head;
-
-               /* add rmb barrier to avoid load/load reorder in weak
-                * memory model. It is noop on x86
-                */
-               rte_smp_rmb();
+               *old_head = oh.val.pos;
+               tail = oh.val.cft;
 
                /*
                 *  The subtraction is done between two unsigned 32bits value
@@ -56,24 +57,41 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
                 * *old_head > s->tail). So 'free_entries' is always between 0
                 * and capacity (which is < size).
                 */
-               *entries = (capacity + s->tail - *old_head);
+               *entries = (capacity + tail - *old_head);
 
-               /* check that we have enough room in ring */
-               if (unlikely(n > *entries))
-                       n = (behavior == RTE_RING_QUEUE_FIXED) ?
+               /* attempt #1: check that we have enough room with
+                * cached-foreign-tail value.
+                * Note that actual tail value can go forward till we cached
+                * it, in that case we might have to update our cached value.
+                */
+               if (unlikely(n > *entries)) {
+
+                       tail = rte_atomic_load_explicit(&s->tail,
+                               rte_memory_order_relaxed);
+                       *entries = (capacity + tail - *old_head);
+
+                       /* attempt #2: check that we have enough room in ring */
+                       if (unlikely(n > *entries))
+                               n = (behavior == RTE_RING_QUEUE_FIXED) ?
                                        0 : *entries;
+               }
 
                if (n == 0)
                        return 0;
 
                *new_head = *old_head + n;
+               nh.val.pos = *new_head;
+               nh.val.cft = tail;
+
                if (is_st) {
-                       d->head = *new_head;
+                       d->head.raw = nh.raw;
                        success = 1;
                } else
-                       success = rte_atomic32_cmpset(
-                                       (uint32_t *)(uintptr_t)&d->head,
-                                       *old_head, *new_head);
+                       success = rte_atomic_compare_exchange_strong_explicit(
+                               &d->head.raw, (uint64_t *)(uintptr_t)&oh.raw,
+                               nh.raw, rte_memory_order_acquire,
+                               rte_memory_order_acquire);
+
        } while (unlikely(success == 0));
        return n;
 }
diff --git a/lib/ring/rte_ring_peek_elem_pvt.h b/lib/ring/rte_ring_peek_elem_pvt.h
index b5f0822b7e..e4dd0ae094 100644
--- a/lib/ring/rte_ring_peek_elem_pvt.h
+++ b/lib/ring/rte_ring_peek_elem_pvt.h
@@ -33,7 +33,7 @@ __rte_ring_st_get_tail(struct rte_ring_headtail *ht, uint32_t *tail,
 {
        uint32_t h, n, t;
 
-       h = ht->head;
+       h = ht->head.val.pos;
        t = ht->tail;
        n = h - t;
 
@@ -58,7 +58,7 @@ __rte_ring_st_set_head_tail(struct rte_ring_headtail *ht, uint32_t tail,
        RTE_SET_USED(enqueue);
 
        pos = tail + num;
-       ht->head = pos;
+       ht->head.val.pos = pos;
        rte_atomic_store_explicit(&ht->tail, pos, rte_memory_order_release);
 }
 
diff --git a/lib/ring/soring.c b/lib/ring/soring.c
index 929bde9697..baa449c872 100644
--- a/lib/ring/soring.c
+++ b/lib/ring/soring.c
@@ -90,7 +90,8 @@ __rte_soring_stage_finalize(struct soring_stage_headtail *sht,
         * already finished.
         */
 
-       head = rte_atomic_load_explicit(&sht->head, rte_memory_order_relaxed);
+       head = rte_atomic_load_explicit(&sht->head.val.pos,
+                       rte_memory_order_relaxed);
        n = RTE_MIN(head - ot.pos, maxn);
        for (i = 0, tail = ot.pos; i < n; i += k, tail += k) {
 
@@ -213,22 +214,36 @@ __rte_soring_stage_move_head(struct soring_stage_headtail *d,
        uint32_t *old_head, uint32_t *new_head, uint32_t *avail)
 {
        uint32_t n, tail;
+       union __rte_ring_head_cft nh, oh;
 
-       *old_head = rte_atomic_load_explicit(&d->head,
+       oh.raw = rte_atomic_load_explicit(&d->head.raw,
                        rte_memory_order_acquire);
 
        do {
                n = num;
-               tail = rte_atomic_load_explicit(&s->tail,
-                               rte_memory_order_relaxed);
+               *old_head = oh.val.pos;
+               tail = oh.val.cft;
                *avail = capacity + tail - *old_head;
-               if (n > *avail)
-                       n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *avail;
+
+               if (n > *avail) {
+                       tail = rte_atomic_load_explicit(&s->tail,
+                               rte_memory_order_relaxed);
+                       *avail = capacity + tail - *old_head;
+
+                       if (n > *avail)
+                               n = (behavior == RTE_RING_QUEUE_FIXED) ?
+                                       0 : *avail;
+               }
+
                if (n == 0)
                        return 0;
+
                *new_head = *old_head + n;
-       } while (rte_atomic_compare_exchange_strong_explicit(&d->head,
-                       old_head, *new_head, rte_memory_order_acquire,
+               nh.val.pos = *new_head;
+               nh.val.cft = tail;
+
+       } while (rte_atomic_compare_exchange_strong_explicit(&d->head.raw,
+                       &oh.raw, nh.raw, rte_memory_order_acquire,
                        rte_memory_order_acquire) == 0);
 
        return n;
diff --git a/lib/ring/soring.h b/lib/ring/soring.h
index 3a3f6efa76..0fb333aa71 100644
--- a/lib/ring/soring.h
+++ b/lib/ring/soring.h
@@ -60,8 +60,8 @@ union soring_stage_tail {
 
 struct soring_stage_headtail {
        volatile union soring_stage_tail tail;
-       enum rte_ring_sync_type unused;  /**< unused */
-       volatile RTE_ATOMIC(uint32_t) head;
+       enum rte_ring_sync_type __unused;  /**< unused */
+       union __rte_ring_head_cft head;
 };
 
 /**
-- 
2.35.3
