<snip>

> 
> rte_distributor_return_pkt function which is run on worker cores must wait
> for distributor core to clear handshake on retptr64 before using those
> buffers. While the handshake is set distributor core controls buffers and any
> operations on worker side might overwrite buffers which are unread yet.
> Same situation appears in the legacy single distributor. Function
> rte_distributor_return_pkt_single shouldn't modify the bufptr64 until
> handshake on it is cleared by distributor lcore.
> 
> Fixes: 775003ad2f96 ("distributor: add new burst-capable library")
> Cc: david.h...@intel.com
> Cc: sta...@dpdk.org
> 
> Signed-off-by: Lukasz Wojciechowski <l.wojciec...@partner.samsung.com>
> Acked-by: David Hunt <david.h...@intel.com>
> ---
>  lib/librte_distributor/rte_distributor.c        | 14 ++++++++++++++
>  lib/librte_distributor/rte_distributor_single.c |  4 ++++
>  2 files changed, 18 insertions(+)
> 
> diff --git a/lib/librte_distributor/rte_distributor.c
> b/lib/librte_distributor/rte_distributor.c
> index 1c047f065..89493c331 100644
> --- a/lib/librte_distributor/rte_distributor.c
> +++ b/lib/librte_distributor/rte_distributor.c
> @@ -160,6 +160,7 @@ rte_distributor_return_pkt(struct rte_distributor *d,
> {
>       struct rte_distributor_buffer *buf = &d->bufs[worker_id];
>       unsigned int i;
> +     volatile int64_t *retptr64;
volatile is not needed here as use of __atomic_load_n implies volatile 
inherently.

> 
>       if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
>               if (num == 1)
> @@ -169,6 +170,19 @@ rte_distributor_return_pkt(struct rte_distributor *d,
>                       return -EINVAL;
>       }
> 
> +     retptr64 = &(buf->retptr64[0]);
> +     /* Spin while handshake bits are set (scheduler clears it).
> +      * Sync with worker on GET_BUF flag.
> +      */
> +     while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)
nit. we could avoid using the temp variable retptr64, you could use 
'&buf->retptr64[0]' directly.
RELAXED memory order should be good as the thread_fence below will ensure that 
this load does not sink.

[1] 
> +                     & RTE_DISTRIB_GET_BUF)) {
> +             rte_pause();
> +             uint64_t t = rte_rdtsc()+100;
> +
> +             while (rte_rdtsc() < t)
> +                     rte_pause();
> +     }
> +
>       /* Sync with distributor to acquire retptrs */
>       __atomic_thread_fence(__ATOMIC_ACQUIRE);
>       for (i = 0; i < RTE_DIST_BURST_SIZE; i++) diff --git
> a/lib/librte_distributor/rte_distributor_single.c
> b/lib/librte_distributor/rte_distributor_single.c
> index abaf7730c..f4725b1d0 100644
> --- a/lib/librte_distributor/rte_distributor_single.c
> +++ b/lib/librte_distributor/rte_distributor_single.c
> @@ -74,6 +74,10 @@ rte_distributor_return_pkt_single(struct
> rte_distributor_single *d,
>       union rte_distributor_buffer_single *buf = &d->bufs[worker_id];
>       uint64_t req = (((int64_t)(uintptr_t)oldpkt) <<
> RTE_DISTRIB_FLAG_BITS)
>                       | RTE_DISTRIB_RETURN_BUF;
> +     while (unlikely(__atomic_load_n(&buf->bufptr64,
> __ATOMIC_RELAXED)
> +                     & RTE_DISTRIB_FLAGS_MASK))
> +             rte_pause();
> +
>       /* Sync with distributor on RETURN_BUF flag. */
>       __atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
>       return 0;
> --
> 2.17.1

Reply via email to