Hi Adrien,

> -----Original Message-----
> From: Adrien Mazarguil [mailto:adrien.mazarg...@6wind.com]
> Sent: Wednesday, December 6, 2017 12:59 PM
> To: Matan Azrad <ma...@mellanox.com>
> Cc: dev@dpdk.org
> Subject: Re: [PATCH 4/8] net/mlx4: optimize Tx multi-segment case
> 
> On Tue, Nov 28, 2017 at 12:19:26PM +0000, Matan Azrad wrote:
> > mlx4 Tx block can handle up to 4 data segments or control segment + up
> > to 3 data segments. The first data segment in each not first Tx block
> > must validate Tx queue wraparound and must use IO memory barrier
> > before writing the byte count.
> >
> > The previous multi-segment code used "for" loop to iterate over all
> > packet segments and separated first Tx block data case by "if"
> > statments.
> 
> statments => statements
> 
> >
> > Use switch case and unconditional branches instead of "for" loop can
> > optimize the case and prevents the unnecessary checks for each data
> > segment; This hints to compiler to create opitimized jump table.
> 
> opitimized => optimized
> 
> >
> > Optimize this case by switch case and unconditional branches usage.
> >
> > Signed-off-by: Matan Azrad <ma...@mellanox.com>
> > ---
> >  drivers/net/mlx4/mlx4_rxtx.c | 165 +++++++++++++++++++++++++------------------
> >  drivers/net/mlx4/mlx4_rxtx.h |  33 +++++++++
> >  2 files changed, 128 insertions(+), 70 deletions(-)
> >
> > diff --git a/drivers/net/mlx4/mlx4_rxtx.c b/drivers/net/mlx4/mlx4_rxtx.c
> > index 1d8240a..b9cb2fc 100644
> > --- a/drivers/net/mlx4/mlx4_rxtx.c
> > +++ b/drivers/net/mlx4/mlx4_rxtx.c
> > @@ -432,15 +432,14 @@ struct pv {
> >     uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
> >     volatile struct mlx4_wqe_ctrl_seg *ctrl;
> >     volatile struct mlx4_wqe_data_seg *dseg;
> > -   struct rte_mbuf *sbuf;
> > +   struct rte_mbuf *sbuf = buf;
> >     uint32_t lkey;
> > -   uintptr_t addr;
> > -   uint32_t byte_count;
> >     int pv_counter = 0;
> > +   int nb_segs = buf->nb_segs;
> >
> >     /* Calculate the needed work queue entry size for this packet. */
> >     wqe_real_size = sizeof(volatile struct mlx4_wqe_ctrl_seg) +
> > -           buf->nb_segs * sizeof(volatile struct mlx4_wqe_data_seg);
> > +           nb_segs * sizeof(volatile struct mlx4_wqe_data_seg);
> >     nr_txbbs = MLX4_SIZE_TO_TXBBS(wqe_real_size);
> >     /*
> >      * Check that there is room for this WQE in the send queue and that
> > @@ -457,67 +456,99 @@ struct pv {
> >     dseg = (volatile struct mlx4_wqe_data_seg *)
> >                     ((uintptr_t)ctrl + sizeof(struct mlx4_wqe_ctrl_seg));
> >     *pctrl = ctrl;
> > -   /* Fill the data segments with buffer information. */
> > -   for (sbuf = buf; sbuf != NULL; sbuf = sbuf->next, dseg++) {
> > -           addr = rte_pktmbuf_mtod(sbuf, uintptr_t);
> > -           rte_prefetch0((volatile void *)addr);
> > -           /* Memory region key (big endian) for this memory pool. */
> > +   /*
> > +    * Fill the data segments with buffer information.
> > +    * First WQE TXBB head segment is always control segment,
> > +    * so jump to tail TXBB data segments code for the first
> > +    * WQE data segments filling.
> > +    */
> > +   goto txbb_tail_segs;
> > +txbb_head_seg:
> 
> I'm not fundamentally opposed to "goto" unlike a lot of people out there,
> but this doesn't look good. It's OK to use goto for error cases and to
> extricate yourself when trapped in an inner loop, also in some optimization
> scenarios where it sometimes makes sense, but not when the same can be
> achieved through standard loop constructs and keywords.
> 
> In this case I'm under the impression you could have managed with a do { ... }
> while (...) construct. You need to try harder to reorganize these changes or
> prove it can't be done without negatively impacting performance.
> 
> Doing so should make this patch shorter as well.
> 

I noticed this could be done with a loop and without unconditional branches,
and I benchmarked both approaches: this way showed a nice performance
improvement, while the loop version degraded performance. A sketch of the
loop variant I tried follows.
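For reference, it looked roughly like this (a sketch from memory, not the
exact code I benchmarked; all names are the ones used in the patch). It
folds the TXBB-head special case into a per-segment alignment check:

	do {
		lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
		if (unlikely(lkey == (uint32_t)-1)) {
			DEBUG("%p: unable to get MP <-> MR association",
			      (void *)txq);
			return -1;
		}
		if (!((uintptr_t)dseg & (uintptr_t)(MLX4_TXBB_SIZE - 1))) {
			/* TXBB head segment: handle WQE wraparound and
			 * postpone the byte_count writing for later.
			 */
			if (dseg >=
			    (volatile struct mlx4_wqe_data_seg *)sq->eob)
				dseg = (volatile struct mlx4_wqe_data_seg *)
					sq->buf;
			dseg->addr = rte_cpu_to_be_64(rte_pktmbuf_mtod(sbuf,
								uintptr_t));
			dseg->lkey = rte_cpu_to_be_32(lkey);
			pv[pv_counter].dseg = dseg;
			pv[pv_counter++].val =
				rte_cpu_to_be_32(sbuf->data_len ?
						 sbuf->data_len :
						 0x80000000);
		} else {
			mlx4_fill_tx_data_seg(dseg, lkey,
					      rte_pktmbuf_mtod(sbuf,
							       uintptr_t),
					      rte_cpu_to_be_32(
						      sbuf->data_len ?
						      sbuf->data_len :
						      0x80000000));
		}
		sbuf = sbuf->next;
		dseg++;
	} while (--nb_segs);

The alignment check on dseg in every iteration is exactly the per-segment
branch the switch/goto version avoids.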
 
> > +   /* Memory region key (big endian) for this memory pool. */
> > +   lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> > +   if (unlikely(lkey == (uint32_t)-1)) {
> > +           DEBUG("%p: unable to get MP <-> MR association",
> > +                 (void *)txq);
> > +           return -1;
> > +   }
> > +   /* Handle WQE wraparound. */
> > +   if (dseg >=
> > +           (volatile struct mlx4_wqe_data_seg *)sq->eob)
> > +           dseg = (volatile struct mlx4_wqe_data_seg *)
> > +                   sq->buf;
> > +   dseg->addr = rte_cpu_to_be_64(rte_pktmbuf_mtod(sbuf, uintptr_t));
> > +   dseg->lkey = rte_cpu_to_be_32(lkey);
> > +   /*
> > +    * This data segment starts at the beginning of a new
> > +    * TXBB, so we need to postpone its byte_count writing
> > +    * for later.
> > +    */
> > +   pv[pv_counter].dseg = dseg;
> > +   /*
> > +    * Zero length segment is treated as inline segment
> > +    * with zero data.
> > +    */
> > +   pv[pv_counter++].val = rte_cpu_to_be_32(sbuf->data_len ?
> > +                                           sbuf->data_len :
> > +                                           0x80000000);
> > +   sbuf = sbuf->next;
> > +   dseg++;
> > +   nb_segs--;
> > +txbb_tail_segs:
> > +   /* Jump to default if there are more than two segments remaining. */
> > +   switch (nb_segs) {
> > +   default:
> >             lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> > -           dseg->lkey = rte_cpu_to_be_32(lkey);
> > -           /* Calculate the needed work queue entry size for this packet */
> > -           if (unlikely(lkey == rte_cpu_to_be_32((uint32_t)-1))) {
> > -                   /* MR does not exist. */
> > +           if (unlikely(lkey == (uint32_t)-1)) {
> >                     DEBUG("%p: unable to get MP <-> MR association",
> >                           (void *)txq);
> >                     return -1;
> >             }
> > -           if (likely(sbuf->data_len)) {
> > -                   byte_count = rte_cpu_to_be_32(sbuf->data_len);
> > -           } else {
> > -                   /*
> > -                    * Zero length segment is treated as inline segment
> > -                    * with zero data.
> > -                    */
> > -                   byte_count = RTE_BE32(0x80000000);
> > +           mlx4_fill_tx_data_seg(dseg, lkey,
> > +                                 rte_pktmbuf_mtod(sbuf, uintptr_t),
> > +                                 rte_cpu_to_be_32(sbuf->data_len ?
> > +                                                  sbuf->data_len :
> > +                                                  0x80000000));
> > +           sbuf = sbuf->next;
> > +           dseg++;
> > +           nb_segs--;
> > +           /* fallthrough */
> > +   case 2:
> > +           lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> > +           if (unlikely(lkey == (uint32_t)-1)) {
> > +                   DEBUG("%p: unable to get MP <-> MR association",
> > +                         (void *)txq);
> > +                   return -1;
> >             }
> > -           /*
> > -            * If the data segment is not at the beginning of a
> > -            * Tx basic block (TXBB) then write the byte count,
> > -            * else postpone the writing to just before updating the
> > -            * control segment.
> > -            */
> > -           if ((uintptr_t)dseg & (uintptr_t)(MLX4_TXBB_SIZE - 1)) {
> > -                   dseg->addr = rte_cpu_to_be_64(addr);
> > -                   dseg->lkey = rte_cpu_to_be_32(lkey);
> > -#if RTE_CACHE_LINE_SIZE < 64
> > -                   /*
> > -                    * Need a barrier here before writing the byte_count
> > -                    * fields to make sure that all the data is visible
> > -                    * before the byte_count field is set.
> > -                    * Otherwise, if the segment begins a new cacheline,
> > -                    * the HCA prefetcher could grab the 64-byte chunk and
> > -                    * get a valid (!= 0xffffffff) byte count but stale
> > -                    * data, and end up sending the wrong data.
> > -                    */
> > -                   rte_io_wmb();
> > -#endif /* RTE_CACHE_LINE_SIZE */
> > -                   dseg->byte_count = byte_count;
> > -           } else {
> > -                   /*
> > -                    * This data segment starts at the beginning of a new
> > -                    * TXBB, so we need to postpone its byte_count writing
> > -                    * for later.
> > -                    */
> > -                   /* Handle WQE wraparound. */
> > -                   if (dseg >=
> > -                       (volatile struct mlx4_wqe_data_seg *)sq->eob)
> > -                           dseg = (volatile struct mlx4_wqe_data_seg *)
> > -                                   sq->buf;
> > -                   dseg->addr = rte_cpu_to_be_64(addr);
> > -                   dseg->lkey = rte_cpu_to_be_32(lkey);
> > -                   pv[pv_counter].dseg = dseg;
> > -                   pv[pv_counter++].val = byte_count;
> > +           mlx4_fill_tx_data_seg(dseg, lkey,
> > +                                 rte_pktmbuf_mtod(sbuf, uintptr_t),
> > +                                 rte_cpu_to_be_32(sbuf->data_len ?
> > +                                                  sbuf->data_len :
> > +                                                  0x80000000));
> > +           sbuf = sbuf->next;
> > +           dseg++;
> > +           nb_segs--;
> > +           /* fallthrough */
> > +   case 1:
> > +           lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> > +           if (unlikely(lkey == (uint32_t)-1)) {
> > +                   DEBUG("%p: unable to get MP <-> MR association",
> > +                         (void *)txq);
> > +                   return -1;
> > +           }
> > +           mlx4_fill_tx_data_seg(dseg, lkey,
> > +                                 rte_pktmbuf_mtod(sbuf, uintptr_t),
> > +                                 rte_cpu_to_be_32(sbuf->data_len ?
> > +                                                  sbuf->data_len :
> > +                                                  0x80000000));
> > +           nb_segs--;
> > +           if (nb_segs) {
> > +                   sbuf = sbuf->next;
> > +                   dseg++;
> > +                   goto txbb_head_seg;
> >             }
> > +           /* fallthrough */
> > +   case 0:
> > +           break;
> >     }
> 
> I think this "switch (nb_segs)" idea is an interesting approach, but it
> should occur inside a loop construct as previously described.
> 

Same comment as above.

> >     /* Write the first DWORD of each TXBB save earlier. */
> >     if (pv_counter) {
> > @@ -583,7 +614,6 @@ struct pv {
> >             } srcrb;
> >             uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
> >             uint32_t lkey;
> > -           uintptr_t addr;
> >
> >             /* Clean up old buffer. */
> >             if (likely(elt->buf != NULL)) {
> > @@ -618,24 +648,19 @@ struct pv {
> >                     dseg = (volatile struct mlx4_wqe_data_seg *)
> >                                     ((uintptr_t)ctrl +
> >                                     sizeof(struct mlx4_wqe_ctrl_seg));
> > -                   addr = rte_pktmbuf_mtod(buf, uintptr_t);
> > -                   rte_prefetch0((volatile void *)addr);
> > -                   dseg->addr = rte_cpu_to_be_64(addr);
> > -                   /* Memory region key (big endian). */
> > +
> > +                   ctrl->fence_size = (WQE_ONE_DATA_SEG_SIZE >> 4) & 0x3f;
> >                     lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(buf));
> > -                   dseg->lkey = rte_cpu_to_be_32(lkey);
> > -                   if (unlikely(dseg->lkey ==
> > -                           rte_cpu_to_be_32((uint32_t)-1))) {
> > +                   if (unlikely(lkey == (uint32_t)-1)) {
> >                             /* MR does not exist. */
> >                             DEBUG("%p: unable to get MP <-> MR association",
> >                                   (void *)txq);
> >                             elt->buf = NULL;
> >                             break;
> >                     }
> > -                   /* Never be TXBB aligned, no need compiler barrier. */
> > -                   dseg->byte_count = rte_cpu_to_be_32(buf->data_len);
> > -                   /* Fill the control parameters for this packet. */
> > -                   ctrl->fence_size = (WQE_ONE_DATA_SEG_SIZE >> 4) & 0x3f;
> > +                   mlx4_fill_tx_data_seg(dseg, lkey,
> > +                                         rte_pktmbuf_mtod(buf, uintptr_t),
> > +                                         rte_cpu_to_be_32(buf->data_len));
> >                     nr_txbbs = 1;
> >             } else {
> >                     nr_txbbs = mlx4_tx_burst_segs(buf, txq, &ctrl); diff -
> -git
> > a/drivers/net/mlx4/mlx4_rxtx.h b/drivers/net/mlx4/mlx4_rxtx.h index
> > 463df2b..8207232 100644
> > --- a/drivers/net/mlx4/mlx4_rxtx.h
> > +++ b/drivers/net/mlx4/mlx4_rxtx.h
> > @@ -212,4 +212,37 @@ int mlx4_tx_queue_setup(struct rte_eth_dev *dev, uint16_t idx,
> >     return mlx4_txq_add_mr(txq, mp, i);
> >  }
> >
> > +/**
> > + * Write Tx data segment to the SQ.
> > + *
> > + * @param dseg
> > + *   Pointer to data segment in SQ.
> > + * @param lkey
> > + *   Memory region lkey.
> > + * @param addr
> > + *   Data address.
> > + * @param byte_count
> > + *   Big Endian bytes count of the data to send.
> 
> Big Endian => Big endian
> 
> How about using the dedicated type to properly document it?
> See rte_be32_t from rte_byteorder.h.
> 
Learned something new, thanks, I will check it.
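If I read rte_byteorder.h correctly, the prototype would then carry the
endianness in the type itself, something like this (a sketch):

static inline void
mlx4_fill_tx_data_seg(volatile struct mlx4_wqe_data_seg *dseg,
		      uint32_t lkey, uintptr_t addr, rte_be32_t byte_count);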

> > + */
> > +static inline void
> > +mlx4_fill_tx_data_seg(volatile struct mlx4_wqe_data_seg *dseg,
> > +                  uint32_t lkey, uintptr_t addr, uint32_t byte_count)
> > +{
> > +   dseg->addr = rte_cpu_to_be_64(addr);
> > +   dseg->lkey = rte_cpu_to_be_32(lkey);
> > +#if RTE_CACHE_LINE_SIZE < 64
> > +   /*
> > +    * Need a barrier here before writing the byte_count
> > +    * fields to make sure that all the data is visible
> > +    * before the byte_count field is set.
> > +    * Otherwise, if the segment begins a new cacheline,
> > +    * the HCA prefetcher could grab the 64-byte chunk and
> > +    * get a valid (!= 0xffffffff) byte count but stale
> > +    * data, and end up sending the wrong data.
> > +    */
> > +   rte_io_wmb();
> > +#endif /* RTE_CACHE_LINE_SIZE */
> > +   dseg->byte_count = byte_count;
> > +}
> > +
> 
> No need to expose this function in a header file. Note that rte_cpu_*() and
> rte_io*() require the inclusion of rte_byteorder.h and rte_atomic.h
> respectively.
> 

Shouldn't inline functions be in header files?
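Just to make sure I understand the suggestion: you mean moving it as-is into
mlx4_rxtx.c, since that is its only caller? A sketch of what I have in mind
(body unchanged from the patch, with the includes you mention at the top of
the file):

#include <rte_atomic.h>     /* rte_io_wmb() */
#include <rte_byteorder.h>  /* rte_cpu_to_be_32(), rte_cpu_to_be_64() */

static inline void
mlx4_fill_tx_data_seg(volatile struct mlx4_wqe_data_seg *dseg,
		      uint32_t lkey, uintptr_t addr, uint32_t byte_count)
{
	dseg->addr = rte_cpu_to_be_64(addr);
	dseg->lkey = rte_cpu_to_be_32(lkey);
#if RTE_CACHE_LINE_SIZE < 64
	/* Barrier so the HCA cannot see a valid byte_count with stale data. */
	rte_io_wmb();
#endif /* RTE_CACHE_LINE_SIZE */
	dseg->byte_count = byte_count;
}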

> >  #endif /* MLX4_RXTX_H_ */
> > --
> > 1.8.3.1
> >
> 
> --
> Adrien Mazarguil
> 6WIND
