Hi Ophir,

On Mon, Oct 23, 2017 at 02:22:00PM +0000, Ophir Munk wrote:
> This commit optimizes handling of one segment and calls a
> dedicated function for handling multi segments
> 
> Signed-off-by: Ophir Munk <ophi...@mellanox.com>
While it indeed moves the code to a separate function, I'm not sure by how
much it improves performance. Is it noticeably better? Can you provide a
short performance summary with and without this patch? Is that the case for
both single and multi-segment scenarios, or was this improvement at the cost
of a degradation in the latter case?

If it splits a large function in two smaller ones for readability and no
performance validation was done on this specific patch alone, please do not
label it as a performance improvement. I'm fine with readability improvements
when properly identified as such.

A few additional comments below.

> ---
>  drivers/net/mlx4/mlx4_rxtx.c | 284 +++++++++++++++++++++++--------------------
>  1 file changed, 154 insertions(+), 130 deletions(-)
> 
> diff --git a/drivers/net/mlx4/mlx4_rxtx.c b/drivers/net/mlx4/mlx4_rxtx.c
> index 3236552..9596859 100644
> --- a/drivers/net/mlx4/mlx4_rxtx.c
> +++ b/drivers/net/mlx4/mlx4_rxtx.c
> @@ -62,6 +62,9 @@
>  #include "mlx4_rxtx.h"
>  #include "mlx4_utils.h"
>  
> +#define WQE_ONE_DATA_SEG_SIZE \
> +        (sizeof(struct mlx4_wqe_ctrl_seg) + sizeof(struct mlx4_wqe_data_seg))
> +
>  /**
>   * Pointer-value pair structure used in tx_post_send for saving the first
>   * DWORD (32 byte) of a TXBB.
> @@ -140,22 +143,19 @@ mlx4_txq_stamp_freed_wqe(struct mlx4_sq *sq, uint16_t index, uint8_t owner)
>   * @return
>   *   0 on success, -1 on failure.
>   */
> -static int
> -mlx4_txq_complete(struct txq *txq)
> +static inline int __attribute__((always_inline))

Should be static only, leave the rest to the compiler. This function is
large enough that it shouldn't make much of a difference anyway (unless
proved otherwise).
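To make that concrete, here is a tiny self-contained sketch (toy names only,
nothing below is driver code) of what I mean by declaring the helper plain
"static" and leaving the inlining decision to the compiler:

  #include <inttypes.h>
  #include <stdint.h>
  #include <stdio.h>

  /* Hypothetical stand-in for the real queue structure. */
  struct toy_queue {
          uint32_t head;
          uint32_t tail;
  };

  /*
   * Plain "static" helper: at -O2 the compiler is free to inline it into
   * its caller(s) whenever that pays off; no
   * __attribute__((always_inline)) is needed for that.
   */
  static uint32_t
  toy_pending(const struct toy_queue *q)
  {
          return q->head - q->tail;
  }

  int
  main(void)
  {
          struct toy_queue q = { .head = 10, .tail = 4 };

          printf("pending entries: %" PRIu32 "\n", toy_pending(&q));
          return 0;
  }

Forcing always_inline on a function the size of mlx4_txq_complete() mostly
grows code size unless measurements show otherwise.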
> +mlx4_txq_complete(struct txq *txq, const unsigned int elts_n,
> +                  struct mlx4_sq *sq)
>  {
>          unsigned int elts_comp = txq->elts_comp;
>          unsigned int elts_tail = txq->elts_tail;
> -        const unsigned int elts_n = txq->elts_n;
>          struct mlx4_cq *cq = &txq->mcq;
> -        struct mlx4_sq *sq = &txq->msq;
>          struct mlx4_cqe *cqe;
>          uint32_t cons_index = cq->cons_index;
>          uint16_t new_index;
>          uint16_t nr_txbbs = 0;
>          int pkts = 0;
>  
> -        if (unlikely(elts_comp == 0))
> -                return 0;
>          /*
>           * Traverse over all CQ entries reported and handle each WQ entry
>           * reported by them.
> @@ -266,6 +266,120 @@ rte_be32_t mlx4_txq_add_mr(struct txq *txq, struct rte_mempool *mp, uint32_t i)
>          return txq->mp2mr[i].lkey;
>  }
>  
> +static int handle_multi_segs(struct rte_mbuf *buf,
> +                        struct txq *txq,
> +                        struct mlx4_wqe_ctrl_seg **pctrl)
> +{
> +        int wqe_real_size;
> +        int nr_txbbs;
> +        struct pv *pv = (struct pv *)txq->bounce_buf;
> +        struct mlx4_sq *sq = &txq->msq;
> +        uint32_t head_idx = sq->head & sq->txbb_cnt_mask;
> +        struct mlx4_wqe_ctrl_seg *ctrl;
> +        struct mlx4_wqe_data_seg *dseg;
> +        uintptr_t addr;
> +        uint32_t byte_count;
> +        int pv_counter = 0;
> +
> +        /* Calculate the needed work queue entry size for this packet. */
> +        wqe_real_size = sizeof(struct mlx4_wqe_ctrl_seg) +
> +                buf->nb_segs * sizeof(struct mlx4_wqe_data_seg);
> +        nr_txbbs = MLX4_SIZE_TO_TXBBS(wqe_real_size);
> +        /*
> +         * Check that there is room for this WQE in the send queue and that
> +         * the WQE size is legal.
> +         */
> +        if (((sq->head - sq->tail) + nr_txbbs +
> +             sq->headroom_txbbs) >= sq->txbb_cnt ||
> +            nr_txbbs > MLX4_MAX_WQE_TXBBS) {
> +                return -1;
> +        }
> +
> +        /* Get the control and data entries of the WQE. */
> +        ctrl = (struct mlx4_wqe_ctrl_seg *)mlx4_get_send_wqe(sq, head_idx);
> +        dseg = (struct mlx4_wqe_data_seg *)((uintptr_t)ctrl +
> +                sizeof(struct mlx4_wqe_ctrl_seg));
> +        *pctrl = ctrl;
> +        /* Fill the data segments with buffer information. */
> +        struct rte_mbuf *sbuf;
> +
> +        for (sbuf = buf; sbuf != NULL; sbuf = sbuf->next, dseg++) {
> +                addr = rte_pktmbuf_mtod(sbuf, uintptr_t);
> +                rte_prefetch0((volatile void *)addr);
> +                /* Handle WQE wraparound. */
> +                if (unlikely(dseg >= (struct mlx4_wqe_data_seg *)sq->eob))
> +                        dseg = (struct mlx4_wqe_data_seg *)sq->buf;
> +                dseg->addr = rte_cpu_to_be_64(addr);
> +                /* Memory region key (big endian) for this memory pool. */
> +                dseg->lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf));
> +#ifndef NDEBUG
> +                /* Calculate the needed work queue entry size for this packet */
> +                if (unlikely(dseg->lkey == rte_cpu_to_be_32((uint32_t)-1))) {
> +                        /* MR does not exist. */
> +                        DEBUG("%p: unable to get MP <-> MR association",
> +                                (void *)txq);
> +                        /*
> +                         * Restamp entry in case of failure.
> +                         * Make sure that size is written correctly
> +                         * Note that we give ownership to the SW, not the HW.
> +                         */
> +                        wqe_real_size = sizeof(struct mlx4_wqe_ctrl_seg) +
> +                                buf->nb_segs * sizeof(struct mlx4_wqe_data_seg);
> +                        ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
> +                        mlx4_txq_stamp_freed_wqe(sq, head_idx,
> +                                (sq->head & sq->txbb_cnt) ? 0 : 1);
> +                        return -1;
> +                }
> +#endif /* NDEBUG */
> +                if (likely(sbuf->data_len)) {
> +                        byte_count = rte_cpu_to_be_32(sbuf->data_len);
> +                } else {
> +                        /*
> +                         * Zero length segment is treated as inline segment
> +                         * with zero data.
> +                         */
> +                        byte_count = RTE_BE32(0x80000000);
> +                }
> +                /*
> +                 * If the data segment is not at the beginning of a
> +                 * Tx basic block (TXBB) then write the byte count,
> +                 * else postpone the writing to just before updating the
> +                 * control segment.
> +                 */
> +                if ((uintptr_t)dseg & (uintptr_t)(MLX4_TXBB_SIZE - 1)) {
> +                        /*
> +                         * Need a barrier here before writing the byte_count
> +                         * fields to make sure that all the data is visible
> +                         * before the byte_count field is set.
> +                         * Otherwise, if the segment begins a new cacheline,
> +                         * the HCA prefetcher could grab the 64-byte chunk and
> +                         * get a valid (!= 0xffffffff) byte count but stale
> +                         * data, and end up sending the wrong data.
> +                         */
> +                        rte_io_wmb();
> +                        dseg->byte_count = byte_count;
> +                } else {
> +                        /*
> +                         * This data segment starts at the beginning of a new
> +                         * TXBB, so we need to postpone its byte_count writing
> +                         * for later.
> +                         */
> +                        pv[pv_counter].dseg = dseg;
> +                        pv[pv_counter++].val = byte_count;
> +                }
> +        }
> +        /* Write the first DWORD of each TXBB save earlier. */
> +        if (pv_counter) {
> +                /* Need a barrier here before writing the byte_count. */
> +                rte_io_wmb();
> +                for (--pv_counter; pv_counter >= 0; pv_counter--)
> +                        pv[pv_counter].dseg->byte_count = pv[pv_counter].val;
> +        }
> +        /* Fill the control parameters for this packet. */
> +        ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
> +
> +        return nr_txbbs;
> +}
>  /**
>   * DPDK callback for Tx.
> * > @@ -288,10 +402,11 @@ mlx4_tx_burst(void *dpdk_txq, struct rte_mbuf **pkts, > uint16_t pkts_n) > unsigned int i; > unsigned int max; > struct mlx4_sq *sq = &txq->msq; > - struct pv *pv = (struct pv *)txq->bounce_buf; > + int nr_txbbs; > > assert(txq->elts_comp_cd != 0); > - mlx4_txq_complete(txq); > + if (likely(txq->elts_comp != 0)) > + mlx4_txq_complete(txq, elts_n, sq); > max = (elts_n - (elts_head - txq->elts_tail)); > if (max > elts_n) > max -= elts_n; > @@ -316,10 +431,6 @@ mlx4_tx_burst(void *dpdk_txq, struct rte_mbuf **pkts, > uint16_t pkts_n) > } srcrb; > uint32_t head_idx = sq->head & sq->txbb_cnt_mask; > uintptr_t addr; > - uint32_t byte_count; > - int wqe_real_size; > - int nr_txbbs; > - int pv_counter = 0; > > /* Clean up old buffer. */ > if (likely(elt->buf != NULL)) { > @@ -338,31 +449,22 @@ mlx4_tx_burst(void *dpdk_txq, struct rte_mbuf **pkts, > uint16_t pkts_n) > } while (tmp != NULL); > } > RTE_MBUF_PREFETCH_TO_FREE(elt_next->buf); > - > - /* > - * Calculate the needed work queue entry size > - * for this packet. > - */ > - wqe_real_size = sizeof(struct mlx4_wqe_ctrl_seg) + > - buf->nb_segs * sizeof(struct mlx4_wqe_data_seg); > - nr_txbbs = MLX4_SIZE_TO_TXBBS(wqe_real_size); > - /* > - * Check that there is room for this WQE in the send > - * queue and that the WQE size is legal. > - */ > - if (((sq->head - sq->tail) + nr_txbbs + > - sq->headroom_txbbs) >= sq->txbb_cnt || > - nr_txbbs > MLX4_MAX_WQE_TXBBS) { > - elt->buf = NULL; > - break; > - } > - /* Get the control and data entries of the WQE. */ > - ctrl = (struct mlx4_wqe_ctrl_seg *) > - mlx4_get_send_wqe(sq, head_idx); > - dseg = (struct mlx4_wqe_data_seg *)((uintptr_t)ctrl + > - sizeof(struct mlx4_wqe_ctrl_seg)); > - /* Fill the data segments with buffer information. */ > if (likely(buf->nb_segs == 1)) { > + /* > + * Check that there is room for this WQE in the send > + * queue and that the WQE size is legal > + */ > + if (((sq->head - sq->tail) + 1 + sq->headroom_txbbs) > + >= sq->txbb_cnt || > + 1 > MLX4_MAX_WQE_TXBBS) { > + elt->buf = NULL; > + break; > + } > + /* Get the control and data entries of the WQE. */ > + ctrl = (struct mlx4_wqe_ctrl_seg *) > + mlx4_get_send_wqe(sq, head_idx); > + dseg = (struct mlx4_wqe_data_seg *)((uintptr_t)ctrl + > + sizeof(struct mlx4_wqe_ctrl_seg)); > addr = rte_pktmbuf_mtod(buf, uintptr_t); > rte_prefetch0((volatile void *)addr); > /* Handle WQE wraparound. */ > @@ -371,120 +473,42 @@ mlx4_tx_burst(void *dpdk_txq, struct rte_mbuf **pkts, > uint16_t pkts_n) > dseg = (struct mlx4_wqe_data_seg *)sq->buf; > dseg->addr = rte_cpu_to_be_64(addr); > /* Memory region key (big endian). */ > - dseg->lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(sbuf)); > - #ifndef NDEBUG > + dseg->lkey = mlx4_txq_mp2mr(txq, mlx4_txq_mb2mp(buf)); > +#ifndef NDEBUG > if (unlikely(dseg->lkey == > rte_cpu_to_be_32((uint32_t)-1))) { > /* MR does not exist. */ > DEBUG("%p: unable to get MP <-> MR association", > - (void *)txq); > + (void *)txq); > /* > * Restamp entry in case of failure. > * Make sure that size is written correctly > * Note that we give ownership to the SW, > * not the HW. > */ > - ctrl->fence_size = (wqe_real_size >> 4) & 0x3f; > + ctrl->fence_size = (WQE_ONE_DATA_SEG_SIZE >> 4) > + & 0x3f; > mlx4_txq_stamp_freed_wqe(sq, head_idx, > - (sq->head & sq->txbb_cnt) ? 0 : 1); > + (sq->head & sq->txbb_cnt) ? 0 : 1); > elt->buf = NULL; > break; > } > - #endif /* NDEBUG */ > +#endif /* NDEBUG */ > /* Need a barrier here before writing the byte_count. 
>                          rte_io_wmb();
>                          dseg->byte_count = rte_cpu_to_be_32(buf->data_len);
> +
> +                        /* Fill the control parameters for this packet. */
> +                        ctrl->fence_size = (WQE_ONE_DATA_SEG_SIZE >> 4) & 0x3f;
> +                        nr_txbbs = 1;
>                  } else {
> -                        /* Fill the data segments with buffer information. */
> -                        struct rte_mbuf *sbuf;
> -
> -                        for (sbuf = buf;
> -                             sbuf != NULL;
> -                             sbuf = sbuf->next, dseg++) {
> -                                addr = rte_pktmbuf_mtod(sbuf, uintptr_t);
> -                                rte_prefetch0((volatile void *)addr);
> -                                /* Handle WQE wraparound. */
> -                                if (unlikely(dseg >=
> -                                    (struct mlx4_wqe_data_seg *)sq->eob))
> -                                        dseg = (struct mlx4_wqe_data_seg *)
> -                                                sq->buf;
> -                                dseg->addr = rte_cpu_to_be_64(addr);
> -                                /* Memory region key (big endian). */
> -                                dseg->lkey = mlx4_txq_mp2mr(txq,
> -                                        mlx4_txq_mb2mp(sbuf));
> -                        #ifndef NDEBUG
> -                                if (unlikely(dseg->lkey ==
> -                                    rte_cpu_to_be_32((uint32_t)-1))) {
> -                                        /* MR does not exist. */
> -                                        DEBUG("%p: unable to get MP <-> MR association",
> -                                              (void *)txq);
> -                                        /*
> -                                         * Restamp entry in case of failure.
> -                                         * Make sure that size is written
> -                                         * correctly, note that we give
> -                                         * ownership to the SW, not the HW.
> -                                         */
> -                                        ctrl->fence_size =
> -                                                (wqe_real_size >> 4) & 0x3f;
> -                                        mlx4_txq_stamp_freed_wqe(sq, head_idx,
> -                                                (sq->head & sq->txbb_cnt) ? 0 : 1);
> -                                        elt->buf = NULL;
> -                                        break;
> -                                }
> -                        #endif /* NDEBUG */
> -                                if (likely(sbuf->data_len)) {
> -                                        byte_count =
> -                                                rte_cpu_to_be_32(sbuf->data_len);
> -                                } else {
> -                                        /*
> -                                         * Zero length segment is treated as
> -                                         * inline segment with zero data.
> -                                         */
> -                                        byte_count = RTE_BE32(0x80000000);
> -                                }
> -                                /*
> -                                 * If the data segment is not at the beginning
> -                                 * of a Tx basic block (TXBB) then write the
> -                                 * byte count, else postpone the writing to
> -                                 * just before updating the control segment.
> -                                 */
> -                                if ((uintptr_t)dseg &
> -                                    (uintptr_t)(MLX4_TXBB_SIZE - 1)) {
> -                                        /*
> -                                         * Need a barrier here before writing
> -                                         * the byte_count fields to make sure
> -                                         * that all the data is visible before
> -                                         * the byte_count field is set.
> -                                         * Otherwise, if the segment begins a
> -                                         * new cacheline, the HCA prefetcher
> -                                         * could grab the 64-byte chunk and get
> -                                         * a valid (!= 0xffffffff) byte count
> -                                         * but stale data, and end up sending
> -                                         * the wrong data.
> -                                         */
> -                                        rte_io_wmb();
> -                                        dseg->byte_count = byte_count;
> -                                } else {
> -                                        /*
> -                                         * This data segment starts at the
> -                                         * beginning of a new TXBB, so we
> -                                         * need to postpone its byte_count
> -                                         * writing for later.
> -                                         */
> -                                        pv[pv_counter].dseg = dseg;
> -                                        pv[pv_counter++].val = byte_count;
> -                                }
> +                        nr_txbbs = handle_multi_segs(buf, txq, &ctrl);

Having all this part non-inline could degrade multi-segment performance; is
that OK?

> +                        if (nr_txbbs < 0) {
> +                                elt->buf = NULL;
> +                                break;
>                          }
> -                        /* Write the first DWORD of each TXBB save earlier. */
> -                        if (pv_counter) {
> -                                /* Need a barrier before writing the byte_count. */
> -                                rte_io_wmb();
> -                                for (--pv_counter; pv_counter >= 0; pv_counter--)
> -                                        pv[pv_counter].dseg->byte_count =
> -                                                pv[pv_counter].val;
> -                        }
> -                        /* Fill the control parameters for this packet. */
> -                        ctrl->fence_size = (wqe_real_size >> 4) & 0x3f;
> +
>                  /*
>                   * For raw Ethernet, the SOLICIT flag is used to indicate
>                   * that no ICRC should be calculated.
> -- 
> 2.7.4

-- 
Adrien Mazarguil
6WIND