The rte_distributor_return_pkt() function, which runs on worker cores,
must wait for the distributor core to clear the handshake on retptr64
before using those buffers. While the handshake is set, the distributor
core owns the buffers, and any write from the worker side might
overwrite returned packets that have not been read yet.
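
For context, a minimal worker-side usage sketch (not part of the patch;
worker_id and the quit flag are assumed to be provided by the
application):

    #include <rte_distributor.h>
    #include <rte_mbuf.h>

    #define WORKER_BURST 8  /* the burst API hands a worker up to 8 mbufs */

    static void
    worker_loop(struct rte_distributor *d, unsigned int worker_id,
                const volatile int *quit)
    {
            struct rte_mbuf *bufs[WORKER_BURST];
            unsigned int count = 0;

            while (!*quit) {
                    /* Return the previous burst and receive the next one. */
                    count = rte_distributor_get_pkt(d, worker_id, bufs,
                                    bufs, count);
                    /* ... process 'count' packets in bufs ... */
            }
            /*
             * Hand back any packets still held on shutdown. This is the
             * call that must first wait for the distributor to clear the
             * GET_BUF handshake on retptr64; otherwise it could overwrite
             * returns the distributor has not read yet.
             */
            rte_distributor_return_pkt(d, worker_id, bufs, count);
    }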

Fixes: 775003ad2f96 ("distributor: add new burst-capable library")
Cc: david.h...@intel.com
Cc: sta...@dpdk.org

Signed-off-by: Lukasz Wojciechowski <l.wojciec...@partner.samsung.com>
---
 lib/librte_distributor/rte_distributor.c | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
index 1c047f065..89493c331 100644
--- a/lib/librte_distributor/rte_distributor.c
+++ b/lib/librte_distributor/rte_distributor.c
@@ -160,6 +160,7 @@ rte_distributor_return_pkt(struct rte_distributor *d,
 {
        struct rte_distributor_buffer *buf = &d->bufs[worker_id];
        unsigned int i;
+       volatile int64_t *retptr64;
 
        if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
                if (num == 1)
@@ -169,6 +170,19 @@ rte_distributor_return_pkt(struct rte_distributor *d,
                        return -EINVAL;
        }
 
+       retptr64 = &(buf->retptr64[0]);
+       /* Spin while handshake bits are set (distributor clears them).
+        * Sync with distributor on GET_BUF flag.
+        */
+       while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)
+                       & RTE_DISTRIB_GET_BUF)) {
+               rte_pause();
+               uint64_t t = rte_rdtsc()+100;
+
+               while (rte_rdtsc() < t)
+                       rte_pause();
+       }
+
        /* Sync with distributor to acquire retptrs */
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
-- 
2.17.1
