<snip>

>
> rte_distributor_return_pkt function which is run on worker cores must wait
> for distributor core to clear handshake on retptr64 before using those
> buffers. While the handshake is set distributor core controls buffers and any
> operations on worker side might overwrite buffers which are unread yet.
> Same situation appears in the legacy single distributor. Function
> rte_distributor_return_pkt_single shouldn't modify the bufptr64 until
> handshake on it is cleared by distributor lcore.
>
> Fixes: 775003ad2f96 ("distributor: add new burst-capable library")
> Cc: david.h...@intel.com
> Cc: sta...@dpdk.org
>
> Signed-off-by: Lukasz Wojciechowski <l.wojciec...@partner.samsung.com>
> Acked-by: David Hunt <david.h...@intel.com>
> ---
> lib/librte_distributor/rte_distributor.c | 14 ++++++++++++++
> lib/librte_distributor/rte_distributor_single.c | 4 ++++
> 2 files changed, 18 insertions(+)
>
> diff --git a/lib/librte_distributor/rte_distributor.c
> b/lib/librte_distributor/rte_distributor.c
> index 1c047f065..89493c331 100644
> --- a/lib/librte_distributor/rte_distributor.c
> +++ b/lib/librte_distributor/rte_distributor.c
> @@ -160,6 +160,7 @@ rte_distributor_return_pkt(struct rte_distributor *d,
> {
> struct rte_distributor_buffer *buf = &d->bufs[worker_id];
> unsigned int i;
> + volatile int64_t *retptr64;

The volatile qualifier is not needed here, as the use of __atomic_load_n inherently implies volatile semantics.

>
> if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
> if (num == 1)
> @@ -169,6 +170,19 @@ rte_distributor_return_pkt(struct rte_distributor *d,
> return -EINVAL;
> }
>
> + retptr64 = &(buf->retptr64[0]);
> + /* Spin while handshake bits are set (scheduler clears it).
> + * Sync with worker on GET_BUF flag.
> + */
> + while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)

Nit: we could avoid the temporary variable retptr64; you could use '&buf->retptr64[0]' directly.
RELAXED memory order should be sufficient here, as the __atomic_thread_fence below will ensure that this load does not sink. [1]

> + & RTE_DISTRIB_GET_BUF)) {
> + rte_pause();
> + uint64_t t = rte_rdtsc()+100;
> +
> + while (rte_rdtsc() < t)
> + rte_pause();
> + }
> +
> /* Sync with distributor to acquire retptrs */
> __atomic_thread_fence(__ATOMIC_ACQUIRE);
> for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
> diff --git a/lib/librte_distributor/rte_distributor_single.c
> b/lib/librte_distributor/rte_distributor_single.c
> index abaf7730c..f4725b1d0 100644
> --- a/lib/librte_distributor/rte_distributor_single.c
> +++ b/lib/librte_distributor/rte_distributor_single.c
> @@ -74,6 +74,10 @@ rte_distributor_return_pkt_single(struct
> rte_distributor_single *d,
> union rte_distributor_buffer_single *buf = &d->bufs[worker_id];
> uint64_t req = (((int64_t)(uintptr_t)oldpkt) <<
> RTE_DISTRIB_FLAG_BITS)
> | RTE_DISTRIB_RETURN_BUF;
> + while (unlikely(__atomic_load_n(&buf->bufptr64,
> __ATOMIC_RELAXED)
> + & RTE_DISTRIB_FLAGS_MASK))
> + rte_pause();
> +
> /* Sync with distributor on RETURN_BUF flag. */
> __atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
> return 0;
> --
> 2.17.1