SPDK blobfs encountered a crash around rte_ring dequeues on ppc64.
It uses a single consumer and multiple producers for an rte_ring.
The cause was a load-load reorder in rte_ring_sc_dequeue_bulk().

The reordered loads are the load of r->prod.tail in
__rte_ring_move_cons_head() (rte_ring_generic.h) and the load of
ring[idx] in DEQUEUE_PTRS() (rte_ring.h). The second load is only
control-dependent on the first, and a control dependency does not
order two loads on ppc64, so the code needs an explicit read barrier
that it does not have. Note that the loads are not reordered when
__rte_ring_move_cons_head() is called with is_sc != 1, because cmpset
invokes a read barrier.

The stores paired with these loads are in ENQUEUE_PTRS() and
update_tail(). The simplified sequence around the reorder is as follows.

Consumer             Producer
load ring[idx]
                     store ring[idx]
                     store r->prod.tail
load r->prod.tail

In this case, the consumer loads the old ring[idx] and then treats
that stale value as valid because it sees the new r->prod.tail.

This patch adds a read barrier for the case where __IS_SC is passed
to __rte_ring_move_cons_head(). It also fixes
__rte_ring_move_prod_head() to avoid a similar problem with a single
producer.
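
For illustration only, the ordering requirement can be sketched with
C11 atomics. This is not the DPDK code: struct sketch_ring,
sketch_enqueue() and sketch_dequeue() are hypothetical names, the ring
is reduced to one producer and one consumer, and
atomic_thread_fence(memory_order_acquire) is assumed to play the role
that rte_smp_rmb() plays in the patch.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_RING_SIZE 8u                    /* hypothetical size */
#define SKETCH_RING_MASK (SKETCH_RING_SIZE - 1u)

static_assert((SKETCH_RING_SIZE & SKETCH_RING_MASK) == 0,
	      "ring size must be a power of two");

/* Hypothetical, heavily simplified stand-in for struct rte_ring. */
struct sketch_ring {
	_Atomic uint32_t prod_tail;   /* published by the producer */
	_Atomic uint32_t cons_tail;   /* published by the consumer */
	void *slots[SKETCH_RING_SIZE];
};

/* Producer: store ring[idx] first, then publish the new tail. */
static int sketch_enqueue(struct sketch_ring *r, void *obj)
{
	uint32_t tail = atomic_load_explicit(&r->prod_tail,
					     memory_order_relaxed);
	uint32_t cons = atomic_load_explicit(&r->cons_tail,
					     memory_order_acquire);

	if (tail - cons == SKETCH_RING_SIZE)
		return -1;                          /* ring is full */

	r->slots[tail & SKETCH_RING_MASK] = obj;    /* store ring[idx] */
	/* Keep the slot store ahead of the tail store. */
	atomic_thread_fence(memory_order_release);
	atomic_store_explicit(&r->prod_tail, tail + 1,
			      memory_order_relaxed);
	return 0;
}

/* Consumer: load the tail first, then load ring[idx]. */
static int sketch_dequeue(struct sketch_ring *r, void **obj)
{
	uint32_t head = atomic_load_explicit(&r->cons_tail,
					     memory_order_relaxed);
	uint32_t tail = atomic_load_explicit(&r->prod_tail,
					     memory_order_relaxed);

	if (tail == head)
		return -1;                          /* ring is empty */

	/*
	 * Read barrier: without it the slot load below may be satisfied
	 * before the tail load above, which is the reorder described in
	 * this message.
	 */
	atomic_thread_fence(memory_order_acquire);
	*obj = r->slots[head & SKETCH_RING_MASK];   /* load ring[idx] */
	atomic_store_explicit(&r->cons_tail, head + 1,
			      memory_order_release);
	return 0;
}
```

Removing the acquire fence in sketch_dequeue() leaves only the branch
on tail == head between the two loads, which mirrors the single
consumer path before this patch.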

Cc: sta...@dpdk.org

Signed-off-by: Takeshi Yoshimura <t...@jp.ibm.com>
---
 lib/librte_ring/rte_ring_generic.h | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
index ea7dbe5b9..477326180 100644
--- a/lib/librte_ring/rte_ring_generic.h
+++ b/lib/librte_ring/rte_ring_generic.h
@@ -90,9 +90,10 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
                        return 0;
 
                *new_head = *old_head + n;
-               if (is_sp)
+               if (is_sp) {
+                       rte_smp_rmb();
                        r->prod.head = *new_head, success = 1;
-               else
+               } else
                        success = rte_atomic32_cmpset(&r->prod.head,
                                        *old_head, *new_head);
        } while (unlikely(success == 0));
@@ -158,9 +159,10 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
                        return 0;
 
                *new_head = *old_head + n;
-               if (is_sc)
+               if (is_sc) {
+                       rte_smp_rmb();
                        r->cons.head = *new_head, success = 1;
-               else
+               } else
                        success = rte_atomic32_cmpset(&r->cons.head, *old_head,
                                        *new_head);
        } while (unlikely(success == 0));
-- 
2.17.1
