From: Konstantin Ananyev <konstantin.anan...@huawei.com>

Note upfront: this change shouldn't affect the rte_ring public API.
Though, as the layout of public structures has changed, it is an ABI
breakage.
This is an attempt to implement the rte_ring optimization that was
suggested by Morten and discussed on this mailing list a while ago.
The idea is to optimize MP/SP & MC/SC ring enqueue/dequeue ops by
storing, along with the head, its Cached Foreign Tail (CFT) value,
i.e. for the producer we cache the consumer tail value and vice versa.
To avoid races, head and CFT values are read/written using atomic
64-bit ops.
In theory that might help by reducing the number of times the producer
needs to access the consumer's cache-line and vice versa (a minimal
standalone sketch of the scheme is included after the patch below).
In practice, I didn't see any impressive boost so far:
- ring_perf_autotest micro-bench: results are a mixed bag; some are
  a bit better, some are worse.
- [so]ring_stress_autotest micro-benches: ~10-15% improvement
- l3fwd in wqorder/wqunorder (see previous patch for details):
  no real difference.

So far my testing scope was quite limited: I tried it only on x86
machines. So can I ask all interested parties, i.e. different platform
vendors (ARM, PPC, etc.) and people who use rte_ring extensively, to
give it a try and provide feedback.
If there are no real performance improvements on any platform we
support, or some problems are encountered, I am ok to drop this patch.

Signed-off-by: Konstantin Ananyev <konstantin.anan...@huawei.com>
---
 drivers/net/mlx5/mlx5_hws_cnt.h   |  5 ++--
 drivers/net/ring/rte_eth_ring.c   |  2 +-
 lib/ring/rte_ring.c               |  6 ++--
 lib/ring/rte_ring_core.h          | 12 +++++++-
 lib/ring/rte_ring_generic_pvt.h   | 46 +++++++++++++++++++++----------
 lib/ring/rte_ring_peek_elem_pvt.h |  4 +--
 lib/ring/soring.c                 | 31 +++++++++++++++------
 lib/ring/soring.h                 |  4 +--
 8 files changed, 77 insertions(+), 33 deletions(-)

diff --git a/drivers/net/mlx5/mlx5_hws_cnt.h b/drivers/net/mlx5/mlx5_hws_cnt.h
index 996ac8dd9a..663146563c 100644
--- a/drivers/net/mlx5/mlx5_hws_cnt.h
+++ b/drivers/net/mlx5/mlx5_hws_cnt.h
@@ -388,11 +388,12 @@ __mlx5_hws_cnt_pool_enqueue_revert(struct rte_ring *r, unsigned int n,

 	MLX5_ASSERT(r->prod.sync_type == RTE_RING_SYNC_ST);
 	MLX5_ASSERT(r->cons.sync_type == RTE_RING_SYNC_ST);

-	current_head = rte_atomic_load_explicit(&r->prod.head, rte_memory_order_relaxed);
+	current_head = rte_atomic_load_explicit(&r->prod.head.val.pos,
+			rte_memory_order_relaxed);
 	MLX5_ASSERT(n <= r->capacity);
 	MLX5_ASSERT(n <= rte_ring_count(r));
 	revert2head = current_head - n;
-	r->prod.head = revert2head; /* This ring should be SP. */
+	r->prod.head.val.pos = revert2head; /* This ring should be SP. */
 	__rte_ring_get_elem_addr(r, revert2head, sizeof(cnt_id_t), n,
 			&zcd->ptr1, &zcd->n1, &zcd->ptr2);
 	/* Update tail */
diff --git a/drivers/net/ring/rte_eth_ring.c b/drivers/net/ring/rte_eth_ring.c
index 1346a0dba3..31009e90d2 100644
--- a/drivers/net/ring/rte_eth_ring.c
+++ b/drivers/net/ring/rte_eth_ring.c
@@ -325,7 +325,7 @@ eth_get_monitor_addr(void *rx_queue, struct rte_power_monitor_cond *pmc)
 	 */
 	pmc->addr = &rng->prod.head;
 	pmc->size = sizeof(rng->prod.head);
-	pmc->opaque[0] = rng->prod.head;
+	pmc->opaque[0] = rng->prod.head.val.pos;
 	pmc->fn = ring_monitor_callback;
 	return 0;
 }
diff --git a/lib/ring/rte_ring.c b/lib/ring/rte_ring.c
index aebb6d6728..cb2c39c7ad 100644
--- a/lib/ring/rte_ring.c
+++ b/lib/ring/rte_ring.c
@@ -102,7 +102,7 @@ reset_headtail(void *p)
 	switch (ht->sync_type) {
 	case RTE_RING_SYNC_MT:
 	case RTE_RING_SYNC_ST:
-		ht->head = 0;
+		ht->head.raw = 0;
 		ht->tail = 0;
 		break;
 	case RTE_RING_SYNC_MT_RTS:
@@ -373,9 +373,9 @@ rte_ring_dump(FILE *f, const struct rte_ring *r)
 	fprintf(f, "  size=%"PRIu32"\n", r->size);
 	fprintf(f, "  capacity=%"PRIu32"\n", r->capacity);
 	fprintf(f, "  ct=%"PRIu32"\n", r->cons.tail);
-	fprintf(f, "  ch=%"PRIu32"\n", r->cons.head);
+	fprintf(f, "  ch=%"PRIu32"\n", r->cons.head.val.pos);
 	fprintf(f, "  pt=%"PRIu32"\n", r->prod.tail);
-	fprintf(f, "  ph=%"PRIu32"\n", r->prod.head);
+	fprintf(f, "  ph=%"PRIu32"\n", r->prod.head.val.pos);
 	fprintf(f, "  used=%u\n", rte_ring_count(r));
 	fprintf(f, "  avail=%u\n", rte_ring_free_count(r));
 }
diff --git a/lib/ring/rte_ring_core.h b/lib/ring/rte_ring_core.h
index 270869d214..b88a1bc352 100644
--- a/lib/ring/rte_ring_core.h
+++ b/lib/ring/rte_ring_core.h
@@ -66,8 +66,17 @@ enum rte_ring_sync_type {
  * Depending on sync_type format of that structure might be different,
  * but offset for *sync_type* and *tail* values should remain the same.
  */
+union __rte_ring_head_cft {
+	/** raw 8B value to read/write *cnt* and *pos* as one atomic op */
+	alignas(sizeof(uint64_t)) RTE_ATOMIC(uint64_t) raw;
+	struct {
+		uint32_t pos; /**< head position */
+		uint32_t cft; /**< cached foreign tail value*/
+	} val;
+};
+
 struct rte_ring_headtail {
-	volatile RTE_ATOMIC(uint32_t) head; /**< prod/consumer head. */
+	uint32_t __unused;
 	volatile RTE_ATOMIC(uint32_t) tail; /**< prod/consumer tail. */
 	union {
 		/** sync type of prod/cons */
@@ -75,6 +84,7 @@ struct rte_ring_headtail {
 		/** deprecated - True if single prod/cons */
 		uint32_t single;
 	};
+	union __rte_ring_head_cft head;
 };

 union __rte_ring_rts_poscnt {
diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_generic_pvt.h
index 12f3595926..e70f4ff32c 100644
--- a/lib/ring/rte_ring_generic_pvt.h
+++ b/lib/ring/rte_ring_generic_pvt.h
@@ -38,17 +38,18 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
 {
 	unsigned int max = n;
 	int success;
+	uint32_t tail;
+	union __rte_ring_head_cft nh, oh;
+
+	oh.raw = rte_atomic_load_explicit(&d->head.raw,
+			rte_memory_order_acquire);

 	do {
 		/* Reset n to the initial burst count */
 		n = max;

-		*old_head = d->head;
-
-		/* add rmb barrier to avoid load/load reorder in weak
-		 * memory model. It is noop on x86
-		 */
-		rte_smp_rmb();
+		*old_head = oh.val.pos;
+		tail = oh.val.cft;

 		/*
 		 * The subtraction is done between two unsigned 32bits value
@@ -56,24 +57,41 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
 		 * *old_head > s->tail). So 'free_entries' is always between 0
 		 * and capacity (which is < size).
 		 */
-		*entries = (capacity + s->tail - *old_head);
+		*entries = (capacity + tail - *old_head);

-		/* check that we have enough room in ring */
-		if (unlikely(n > *entries))
-			n = (behavior == RTE_RING_QUEUE_FIXED) ?
+		/* attempt #1: check that we have enough room with
+		 * cached-foreign-tail value.
+		 * Note that actual tail value can go forward till we cached
+		 * it, in that case we might have to update our cached value.
+		 */
+		if (unlikely(n > *entries)) {
+
+			tail = rte_atomic_load_explicit(&s->tail,
+					rte_memory_order_relaxed);
+			*entries = (capacity + tail - *old_head);
+
+			/* attempt #2: check that we have enough room in ring */
+			if (unlikely(n > *entries))
+				n = (behavior == RTE_RING_QUEUE_FIXED) ?
 					0 : *entries;
+		}

 		if (n == 0)
 			return 0;

 		*new_head = *old_head + n;
+		nh.val.pos = *new_head;
+		nh.val.cft = tail;
+
 		if (is_st) {
-			d->head = *new_head;
+			d->head.raw = nh.raw;
 			success = 1;
 		} else
-			success = rte_atomic32_cmpset(
-					(uint32_t *)(uintptr_t)&d->head,
-					*old_head, *new_head);
+			success = rte_atomic_compare_exchange_strong_explicit(
+					&d->head.raw, (uint64_t *)(uintptr_t)&oh.raw,
+					nh.raw, rte_memory_order_acquire,
+					rte_memory_order_acquire);
+
 	} while (unlikely(success == 0));

 	return n;
 }
diff --git a/lib/ring/rte_ring_peek_elem_pvt.h b/lib/ring/rte_ring_peek_elem_pvt.h
index b5f0822b7e..e4dd0ae094 100644
--- a/lib/ring/rte_ring_peek_elem_pvt.h
+++ b/lib/ring/rte_ring_peek_elem_pvt.h
@@ -33,7 +33,7 @@ __rte_ring_st_get_tail(struct rte_ring_headtail *ht, uint32_t *tail,
 {
 	uint32_t h, n, t;

-	h = ht->head;
+	h = ht->head.val.pos;
 	t = ht->tail;
 	n = h - t;

@@ -58,7 +58,7 @@ __rte_ring_st_set_head_tail(struct rte_ring_headtail *ht, uint32_t tail,
 	RTE_SET_USED(enqueue);

 	pos = tail + num;
-	ht->head = pos;
+	ht->head.val.pos = pos;
 	rte_atomic_store_explicit(&ht->tail, pos, rte_memory_order_release);
 }
diff --git a/lib/ring/soring.c b/lib/ring/soring.c
index 929bde9697..baa449c872 100644
--- a/lib/ring/soring.c
+++ b/lib/ring/soring.c
@@ -90,7 +90,8 @@ __rte_soring_stage_finalize(struct soring_stage_headtail *sht,
 	 * already finished.
 	 */

-	head = rte_atomic_load_explicit(&sht->head, rte_memory_order_relaxed);
+	head = rte_atomic_load_explicit(&sht->head.val.pos,
+			rte_memory_order_relaxed);
 	n = RTE_MIN(head - ot.pos, maxn);
 	for (i = 0, tail = ot.pos; i < n; i += k, tail += k) {
@@ -213,22 +214,36 @@ __rte_soring_stage_move_head(struct soring_stage_headtail *d,
 	uint32_t *old_head, uint32_t *new_head, uint32_t *avail)
 {
 	uint32_t n, tail;
+	union __rte_ring_head_cft nh, oh;

-	*old_head = rte_atomic_load_explicit(&d->head,
+	oh.raw = rte_atomic_load_explicit(&d->head.raw,
 			rte_memory_order_acquire);

 	do {
 		n = num;
-		tail = rte_atomic_load_explicit(&s->tail,
-				rte_memory_order_relaxed);
+		*old_head = oh.val.pos;
+		tail = oh.val.cft;

 		*avail = capacity + tail - *old_head;
-		if (n > *avail)
-			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *avail;
+
+		if (n > *avail) {
+			tail = rte_atomic_load_explicit(&s->tail,
+					rte_memory_order_relaxed);
+			*avail = capacity + tail - *old_head;
+
+			if (n > *avail)
+				n = (behavior == RTE_RING_QUEUE_FIXED) ?
+						0 : *avail;
+		}
+
 		if (n == 0)
 			return 0;
+
 		*new_head = *old_head + n;
-	} while (rte_atomic_compare_exchange_strong_explicit(&d->head,
-		old_head, *new_head, rte_memory_order_acquire,
+		nh.val.pos = *new_head;
+		nh.val.cft = tail;
+
+	} while (rte_atomic_compare_exchange_strong_explicit(&d->head.raw,
+		&oh.raw, nh.raw, rte_memory_order_acquire,
 		rte_memory_order_acquire) == 0);

 	return n;
diff --git a/lib/ring/soring.h b/lib/ring/soring.h
index 3a3f6efa76..0fb333aa71 100644
--- a/lib/ring/soring.h
+++ b/lib/ring/soring.h
@@ -60,8 +60,8 @@ union soring_stage_tail {

 struct soring_stage_headtail {
 	volatile union soring_stage_tail tail;
-	enum rte_ring_sync_type unused;  /**< unused */
-	volatile RTE_ATOMIC(uint32_t) head;
+	enum rte_ring_sync_type __unused;  /**< unused */
+	union __rte_ring_head_cft head;
 };

 /**
--
2.35.3
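
Below is a minimal, standalone sketch of the CFT scheme described
above, for readers who want the idea without digging through the diff.
It is illustrative only: it uses plain C11 atomics instead of DPDK's
RTE_ATOMIC()/rte_atomic_*() wrappers, the names (union cft_head,
struct headtail, move_head) are made up and are not the rte_ring API,
it always behaves like RTE_RING_QUEUE_VARIABLE, and it only moves the
head (the tail update that follows the actual copy is omitted).

#include <stdalign.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

/*
 * 64-bit head value: own head position plus a cached copy of the
 * opposite (foreign) tail, so both can be read/written as one atomic.
 */
union cft_head {
	alignas(sizeof(uint64_t)) uint64_t raw;
	struct {
		uint32_t pos;	/* own head position */
		uint32_t cft;	/* cached foreign tail */
	} val;
};

struct headtail {
	_Atomic uint64_t head;	/* interpreted as union cft_head */
	_Atomic uint32_t tail;
};

/*
 * MT head move: attempt #1 estimates free room from the cached foreign
 * tail (no access to the other side's cache-line); only when that is
 * not enough, attempt #2 re-reads the real foreign tail. The 8-byte
 * CAS then moves the head and refreshes the cache in one operation.
 */
static uint32_t
move_head(struct headtail *d, struct headtail *s, uint32_t capacity,
	uint32_t num, uint32_t *old_pos)
{
	union cft_head oh, nh;
	uint32_t n, tail, room;

	oh.raw = atomic_load_explicit(&d->head, memory_order_acquire);
	do {
		n = num;
		*old_pos = oh.val.pos;
		tail = oh.val.cft;

		/* attempt #1: free room based on the cached tail */
		room = capacity + tail - *old_pos;
		if (n > room) {
			/* attempt #2: refresh cache from the real tail */
			tail = atomic_load_explicit(&s->tail,
					memory_order_relaxed);
			room = capacity + tail - *old_pos;
			if (n > room)
				n = room;	/* "variable" behavior */
		}
		if (n == 0)
			return 0;

		nh.val.pos = *old_pos + n;
		nh.val.cft = tail;	/* publish refreshed cache too */
	} while (!atomic_compare_exchange_strong_explicit(&d->head,
			&oh.raw, nh.raw, memory_order_acquire,
			memory_order_acquire));
	return n;
}

int
main(void)
{
	struct headtail prod = {0}, cons = {0};
	uint32_t pos;

	/* capacity 8, consumer tail never moves: expect 5, then 3 */
	printf("first: %u\n", move_head(&prod, &cons, 8, 5, &pos));
	printf("second: %u\n", move_head(&prod, &cons, 8, 5, &pos));
	return 0;
}

The point is visible in move_head(): the fast path computes the free
room entirely from the 8-byte head snapshot, so the foreign cache-line
is touched only when the cached tail doesn't show enough room, and the
CAS publishes the new head position together with the refreshed cache
value in a single atomic operation.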