Hi Adrien

> -----Original Message-----
> From: Adrien Mazarguil [mailto:adrien.mazarg...@6wind.com]
> Sent: Wednesday, December 6, 2017 12:59 PM
> To: Matan Azrad <ma...@mellanox.com>
> Cc: dev@dpdk.org
> Subject: Re: [PATCH 4/8] net/mlx4: optimize Tx multi-segment case
>
> On Tue, Nov 28, 2017 at 12:19:26PM +0000, Matan Azrad wrote:
> > mlx4 Tx block can handle up to 4 data segments or control segment + up
> > to 3 data segments. The first data segment in each not first Tx block
> > must validate Tx queue wraparound and must use IO memory barrier
> > before writing the byte count.
> >
> > The previous multi-segment code used "for" loop to iterate over all
> > packet segments and separated first Tx block data case by "if"
> > statments.
>
> statments => statements
>
> >
> > Use switch case and unconditional branches instead of "for" loop can
> > optimize the case and prevents the unnecessary checks for each data
> > segment; This hints to compiler to create opitimized jump table.
>
> opitimized => optimized
>
> >
> > Optimize this case by switch case and unconditional branches usage.
> >
> > Signed-off-by: Matan Azrad <ma...@mellanox.com>
> > ---
> > drivers/net/mlx4/mlx4_rxtx.c | 165 +++++++++++++++++++++++++------------------
> > drivers/net/mlx4/mlx4_rxtx.h | 33 +++++++++
> > 2 files changed, 128 insertions(+), 70 deletions(-)
> >
> > diff --git a/drivers/net/mlx4/mlx4_rxtx.c b/drivers/net/mlx4/mlx4_rxtx.c
> > index 1d8240a..b9cb2fc 100644
> > --- a/drivers/net/mlx4/mlx4_rxtx.c
> > +++ b/drivers/net/mlx4/mlx4_rxtx.c
> > @@ -432,15 +432,14 @@ struct pv {
> > uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
> > volatile struct mlx4_wqe_ctrl_seg *ctrl;
> > volatile struct mlx4_wqe_data_seg *dseg;
> > - struct rte_mbuf *sbuf;
> > + struct rte_mbuf *sbuf = buf;
> > uint32_t lkey;
> > - uintptr_t addr;
> > - uint32_t byte_count;
> > int pv_counter = 0;
> > + int nb_segs = buf->nb_segs;
> >
> > /* Calculate the needed work queue entry size for this packet. */
> > wqe_real_size = sizeof(volatile struct mlx4_wqe_ctrl_seg) +
> > - buf->nb_segs * sizeof(volatile struct mlx4_wqe_data_seg);
> > + nb_segs * sizeof(volatile struct mlx4_wqe_data_seg);
> > nr_txbbs = MLX4_SIZE_TO_TXBBS(wqe_real_size);
> > /*
> > * Check that there is room for this WQE in the send queue and that
> > @@ -457,67 +456,99 @@ struct pv {
> > dseg = (volatile struct mlx4_wqe_data_seg *)
> > ((uintptr_t)ctrl + sizeof(struct mlx4_wqe_ctrl_seg));
> > *pctrl = ctrl;
> > - /* Fill the data segments with buffer information. */
> > - for (sbuf = buf; sbuf != NULL; sbuf = sbuf->next, dseg++) {
> > - addr = rte_pktmbuf_mtod(sbuf, uintptr_t);
> > - rte_prefetch0((volatile void *)addr);
> > - /* Memory region key (big endian) for this memory pool. */
> > + /*
> > + * Fill the data segments with buffer information.
> > + * First WQE TXBB head segment is always control segment,
> > + * so jump to tail TXBB data segments code for the first
> > + * WQE data segments filling.
> > + */
> > + goto txbb_tail_segs;
> > +txbb_head_seg:
>
> I'm not fundamentally opposed to "goto" unlike a lot of people out there,
> but this doesn't look good. It's OK to use goto for error cases and to
> extricate yourself when trapped in an inner loop, also in some optimization
> scenarios where it sometimes make sense, but not when the same can be
> achieved through standard loop constructs and keywords.
>
> In this case I'm under the impression you could have managed with a
> do { ... } while (...) construct. You need to try harder to reorganize
> these changes or prove it can't be done without negatively impacting
> performance.
>
> Doing so should make this patch shorter as well.
>
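
For reference, the loop shape suggested above could look roughly like the sketch below. It is a simplified, self-contained model and not the driver code: `SLOTS_PER_TXBB`, `fill_slot()` and `fill_loop()` are made-up names, a WQE is modelled as an array of 32-bit slots, and lkey lookup, wraparound and barriers are left out. It only shows the control flow (a switch for the tail slots of a TXBB inside a do/while that handles each TXBB head slot) and says nothing about performance.

```c
#include <stddef.h>
#include <stdint.h>

#define SLOTS_PER_TXBB 4 /* assumed: one TXBB holds 4 segment-sized slots */

/* Hypothetical stand-in for the per-segment work done by the driver. */
static void fill_slot(uint32_t *wqe, size_t slot, uint32_t len)
{
	/* A zero-length segment is encoded as an inline segment marker. */
	wqe[slot] = len ? len : 0x80000000u;
}

/*
 * Loop form: the switch fills up to three tail slots of the current TXBB,
 * then the loop body fills the head slot of the next TXBB, if any.
 * Returns how many segments landed on a TXBB head slot, i.e. how many
 * byte_count writes the real code would postpone.
 */
static int fill_loop(uint32_t *wqe, const uint32_t *lens, int nb_segs)
{
	size_t slot = 1; /* slot 0 is the control segment */
	int postponed = 0;

	do {
		switch (nb_segs) {
		default: /* three or more segments remaining */
			fill_slot(wqe, slot++, *lens++);
			nb_segs--;
			/* fallthrough */
		case 2:
			fill_slot(wqe, slot++, *lens++);
			nb_segs--;
			/* fallthrough */
		case 1:
			fill_slot(wqe, slot++, *lens++);
			nb_segs--;
			/* fallthrough */
		case 0:
			break;
		}
		if (nb_segs == 0)
			break;
		/* Head slot of the next TXBB (slot % SLOTS_PER_TXBB == 0). */
		fill_slot(wqe, slot++, *lens++);
		postponed++;
		nb_segs--;
	} while (nb_segs != 0);
	return postponed;
}
```

With five segments, slots 1 to 3 are filled by the switch, slot 4 is the head of the second TXBB, and slot 5 is filled by the next pass through the switch.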

I noticed this could be done with a loop and without unconditional branches, but I checked it and found a nice performance improvement with the switch and goto version. With the loop, performance degraded.

> > + /* Memory region key (big endian) for this memory pool. */
> > + lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> > + if (unlikely(lkey == (uint32_t)-1)) {
> > + DEBUG("%p: unable to get MP <-> MR association",
> > + (void *)txq);
> > + return -1;
> > + }
> > + /* Handle WQE wraparound. */
> > + if (dseg >=
> > + (volatile struct mlx4_wqe_data_seg *)sq->eob)
> > + dseg = (volatile struct mlx4_wqe_data_seg *)
> > + sq->buf;
> > + dseg->addr = rte_cpu_to_be_64(rte_pktmbuf_mtod(sbuf, uintptr_t));
> > + dseg->lkey = rte_cpu_to_be_32(lkey);
> > + /*
> > + * This data segment starts at the beginning of a new
> > + * TXBB, so we need to postpone its byte_count writing
> > + * for later.
> > + */
> > + pv[pv_counter].dseg = dseg;
> > + /*
> > + * Zero length segment is treated as inline segment
> > + * with zero data.
> > + */
> > + pv[pv_counter++].val = rte_cpu_to_be_32(sbuf->data_len ?
> > + sbuf->data_len : 0x80000000);
> > + sbuf = sbuf->next;
> > + dseg++;
> > + nb_segs--;
> > +txbb_tail_segs:
> > + /* Jump to default if there are more than two segments remaining. */
> > + switch (nb_segs) {
> > + default:
> > lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> > - dseg->lkey = rte_cpu_to_be_32(lkey);
> > - /* Calculate the needed work queue entry size for this packet */
> > - if (unlikely(lkey == rte_cpu_to_be_32((uint32_t)-1))) {
> > - /* MR does not exist. */
> > + if (unlikely(lkey == (uint32_t)-1)) {
> > DEBUG("%p: unable to get MP <-> MR association",
> > (void *)txq);
> > return -1;
> > }
> > - if (likely(sbuf->data_len)) {
> > - byte_count = rte_cpu_to_be_32(sbuf->data_len);
> > - } else {
> > - /*
> > - * Zero length segment is treated as inline segment
> > - * with zero data.
> > - */
> > - byte_count = RTE_BE32(0x80000000);
> > + mlx4_fill_tx_data_seg(dseg, lkey,
> > + rte_pktmbuf_mtod(sbuf, uintptr_t),
> > + rte_cpu_to_be_32(sbuf->data_len ?
> > + sbuf->data_len :
> > + 0x80000000));
> > + sbuf = sbuf->next;
> > + dseg++;
> > + nb_segs--;
> > + /* fallthrough */
> > + case 2:
> > + lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> > + if (unlikely(lkey == (uint32_t)-1)) {
> > + DEBUG("%p: unable to get MP <-> MR association",
> > + (void *)txq);
> > + return -1;
> > }
> > - /*
> > - * If the data segment is not at the beginning of a
> > - * Tx basic block (TXBB) then write the byte count,
> > - * else postpone the writing to just before updating the
> > - * control segment.
> > - */
> > - if ((uintptr_t)dseg & (uintptr_t)(MLX4_TXBB_SIZE - 1)) {
> > - dseg->addr = rte_cpu_to_be_64(addr);
> > - dseg->lkey = rte_cpu_to_be_32(lkey);
> > -#if RTE_CACHE_LINE_SIZE < 64
> > - /*
> > - * Need a barrier here before writing the byte_count
> > - * fields to make sure that all the data is visible
> > - * before the byte_count field is set.
> > - * Otherwise, if the segment begins a new cacheline,
> > - * the HCA prefetcher could grab the 64-byte chunk and
> > - * get a valid (!= 0xffffffff) byte count but stale
> > - * data, and end up sending the wrong data.
> > - */
> > - rte_io_wmb();
> > -#endif /* RTE_CACHE_LINE_SIZE */
> > - dseg->byte_count = byte_count;
> > - } else {
> > - /*
> > - * This data segment starts at the beginning of a new
> > - * TXBB, so we need to postpone its byte_count writing
> > - * for later.
> > - */
> > - /* Handle WQE wraparound. */
> > - if (dseg >=
> > - (volatile struct mlx4_wqe_data_seg *)sq->eob)
> > - dseg = (volatile struct mlx4_wqe_data_seg *)
> > - sq->buf;
> > - dseg->addr = rte_cpu_to_be_64(addr);
> > - dseg->lkey = rte_cpu_to_be_32(lkey);
> > - pv[pv_counter].dseg = dseg;
> > - pv[pv_counter++].val = byte_count;
> > + mlx4_fill_tx_data_seg(dseg, lkey,
> > + rte_pktmbuf_mtod(sbuf, uintptr_t),
> > + rte_cpu_to_be_32(sbuf->data_len ?
> > + sbuf->data_len :
> > + 0x80000000));
> > + sbuf = sbuf->next;
> > + dseg++;
> > + nb_segs--;
> > + /* fallthrough */
> > + case 1:
> > + lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> > + if (unlikely(lkey == (uint32_t)-1)) {
> > + DEBUG("%p: unable to get MP <-> MR association",
> > + (void *)txq);
> > + return -1;
> > + }
> > + mlx4_fill_tx_data_seg(dseg, lkey,
> > + rte_pktmbuf_mtod(sbuf, uintptr_t),
> > + rte_cpu_to_be_32(sbuf->data_len ?
> > + sbuf->data_len :
> > + 0x80000000));
> > + nb_segs--;
> > + if (nb_segs) {
> > + sbuf = sbuf->next;
> > + dseg++;
> > + goto txbb_head_seg;
> > }
> > + /* fallthrough */
> > + case 0:
> > + break;
> > }
>
> I think this "switch (nb_segs)" idea is an interesting approach, but should
> occur inside a loop construct as previously described.
>

Same comment as above.

> > /* Write the first DWORD of each TXBB save earlier. */
> > if (pv_counter) {
> > @@ -583,7 +614,6 @@ struct pv {
> > } srcrb;
> > uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
> > uint32_t lkey;
> > - uintptr_t addr;
> >
> > /* Clean up old buffer. */
> > if (likely(elt->buf != NULL)) {
> > @@ -618,24 +648,19 @@ struct pv {
> > dseg = (volatile struct mlx4_wqe_data_seg *)
> > ((uintptr_t)ctrl +
> > sizeof(struct mlx4_wqe_ctrl_seg));
> > - addr = rte_pktmbuf_mtod(buf, uintptr_t);
> > - rte_prefetch0((volatile void *)addr);
> > - dseg->addr = rte_cpu_to_be_64(addr);
> > - /* Memory region key (big endian). */
> > +
> > + ctrl->fence_size = (WQE_ONE_DATA_SEG_SIZE >> 4) & 0x3f;
> > lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(buf));
> > - dseg->lkey = rte_cpu_to_be_32(lkey);
> > - if (unlikely(dseg->lkey ==
> > - rte_cpu_to_be_32((uint32_t)-1))) {
> > + if (unlikely(lkey == (uint32_t)-1)) {
> > /* MR does not exist. */
> > DEBUG("%p: unable to get MP <-> MR association",
> > (void *)txq);
> > elt->buf = NULL;
> > break;
> > }
> > - /* Never be TXBB aligned, no need compiler barrier. */
> > - dseg->byte_count = rte_cpu_to_be_32(buf->data_len);
> > - /* Fill the control parameters for this packet. */
> > - ctrl->fence_size = (WQE_ONE_DATA_SEG_SIZE >> 4) & 0x3f;
> > + mlx4_fill_tx_data_seg(dseg, lkey,
> > + rte_pktmbuf_mtod(buf, uintptr_t),
> > + rte_cpu_to_be_32(buf->data_len));
> > nr_txbbs = 1;
> > } else {
> > nr_txbbs = mlx4_tx_burst_segs(buf, txq, &ctrl);
> > diff --git a/drivers/net/mlx4/mlx4_rxtx.h b/drivers/net/mlx4/mlx4_rxtx.h
> > index 463df2b..8207232 100644
> > --- a/drivers/net/mlx4/mlx4_rxtx.h
> > +++ b/drivers/net/mlx4/mlx4_rxtx.h
> > @@ -212,4 +212,37 @@ int mlx4_tx_queue_setup(struct rte_eth_dev *dev, uint16_t idx,
> > return mlx4_txq_add_mr(txq, mp, i);
> > }
> >
> > +/**
> > + * Write Tx data segment to the SQ.
> > + *
> > + * @param dseg
> > + * Pointer to data segment in SQ.
> > + * @param lkey
> > + * Memory region lkey.
> > + * @param addr
> > + * Data address.
> > + * @param byte_count
> > + * Big Endian bytes count of the data to send.
>
> Big Endian => Big endian
>
> How about using the dedicated type to properly document it?
> See rte_be32_t from rte_byteorder.h.
>

Learned something new, thanks. I will check it.
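
If I understand the suggestion correctly, the point is to let the parameter type say that byte_count is already big endian. A rough sketch of that idea follows. It uses local stand-ins so it compiles on its own: `be32_t` and `be64_t` play the role of rte_be32_t and rte_be64_t, `cpu_to_be32()` and `cpu_to_be64()` replace the rte_cpu_to_be_*() helpers, and `struct data_seg` and `fill_data_seg()` are illustrative names rather than the driver's. The write barrier from the patch is left out.

```c
#include <stdint.h>
#include <string.h>

/* Stand-ins for rte_be32_t / rte_be64_t from rte_byteorder.h. */
typedef uint32_t be32_t;
typedef uint64_t be64_t;

/* Hypothetical data segment layout, for illustration only. */
struct data_seg {
	be32_t byte_count;
	be32_t lkey;
	be64_t addr;
};

/* Portable CPU-to-big-endian conversion, independent of host byte order. */
static be32_t cpu_to_be32(uint32_t v)
{
	uint8_t b[4] = {
		(uint8_t)(v >> 24), (uint8_t)(v >> 16),
		(uint8_t)(v >> 8), (uint8_t)v
	};
	be32_t out;

	memcpy(&out, b, sizeof(out));
	return out;
}

static be64_t cpu_to_be64(uint64_t v)
{
	uint8_t b[8];
	be64_t out;
	int i;

	for (i = 0; i < 8; i++)
		b[i] = (uint8_t)(v >> (56 - 8 * i));
	memcpy(&out, b, sizeof(out));
	return out;
}

/*
 * lkey and addr are passed in CPU byte order and converted here;
 * byte_count is typed be32_t because the caller has already converted it.
 */
static void fill_data_seg(volatile struct data_seg *dseg, uint32_t lkey,
			  uintptr_t addr, be32_t byte_count)
{
	dseg->addr = cpu_to_be64((uint64_t)addr);
	dseg->lkey = cpu_to_be32(lkey);
	/* The patch issues a write barrier here on some configurations. */
	dseg->byte_count = byte_count; /* stored as is, no second conversion */
}
```

The type is only documentation, since it is a plain integer typedef in this sketch, but it makes a double conversion easier to spot at the call site.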

> > + */
> > +static inline void
> > +mlx4_fill_tx_data_seg(volatile struct mlx4_wqe_data_seg *dseg,
> > + uint32_t lkey, uintptr_t addr, uint32_t byte_count)
> > +{
> > + dseg->addr = rte_cpu_to_be_64(addr);
> > + dseg->lkey = rte_cpu_to_be_32(lkey);
> > +#if RTE_CACHE_LINE_SIZE < 64
> > + /*
> > + * Need a barrier here before writing the byte_count
> > + * fields to make sure that all the data is visible
> > + * before the byte_count field is set.
> > + * Otherwise, if the segment begins a new cacheline,
> > + * the HCA prefetcher could grab the 64-byte chunk and
> > + * get a valid (!= 0xffffffff) byte count but stale
> > + * data, and end up sending the wrong data.
> > + */
> > + rte_io_wmb();
> > +#endif /* RTE_CACHE_LINE_SIZE */
> > + dseg->byte_count = byte_count;
> > +}
> > +
>
> No need to expose this function in a header file. Note that rte_cpu_*() and
> rte_io*() require the inclusion of rte_byteorder.h and rte_atomic.h
> respectively.
>

Shouldn't inline functions be in header files?

> > #endif /* MLX4_RXTX_H_ */
> > --
> > 1.8.3.1
> >
>
> --
> Adrien Mazarguil
> 6WIND