Hi Gavin, Please see the response inline. > -----Original Message----- > From: Gavin Hu <gavin...@arm.com> > Sent: Tuesday, January 7, 2020 11:13 AM > To: Mahipal Challa <mcha...@marvell.com>; dev@dpdk.org > Cc: Jerin Jacob Kollanukkaran <jer...@marvell.com>; Narayana Prasad Raju > Athreya <pathr...@marvell.com>; Subrahmanyam Nilla > <sni...@marvell.com>; Venkateshwarlu Nalla <venk...@marvell.com>; nd > <n...@arm.com> > Subject: [EXT] RE: [dpdk-dev] [PATCH v3 5/6] raw/octeontx2_ep: add > dequeue operation > > External Email > > ---------------------------------------------------------------------- > Hi Mahipal, Jerin, > > > -----Original Message----- > > From: Mahipal Challa <mcha...@marvell.com> > > Sent: Monday, January 6, 2020 8:07 PM > > To: dev@dpdk.org > > Cc: jer...@marvell.com; pathr...@marvell.com; sni...@marvell.com; > > venk...@marvell.com; Gavin Hu <gavin...@arm.com> > > Subject: [dpdk-dev] [PATCH v3 5/6] raw/octeontx2_ep: add dequeue > > operation > > > > Add rawdev dequeue operation for SDP VF devices. 
> > > > Signed-off-by: Mahipal Challa <mcha...@marvell.com> > > --- > > drivers/raw/octeontx2_ep/otx2_ep_enqdeq.c | 197 > > ++++++++++++++++++++++++++++++ > > drivers/raw/octeontx2_ep/otx2_ep_enqdeq.h | 2 + > > drivers/raw/octeontx2_ep/otx2_ep_rawdev.c | 1 + > > drivers/raw/octeontx2_ep/otx2_ep_rawdev.h | 18 ++- > > 4 files changed, 217 insertions(+), 1 deletion(-) > > > > diff --git a/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.c > > b/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.c > > index 6910f08..dd270b5 100644 > > --- a/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.c > > +++ b/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.c > > @@ -260,6 +260,7 @@ > > rte_mempool_get(sdpvf->enqdeq_mpool, &buf); > > if (buf == NULL) { > > otx2_err("OQ buffer alloc failed"); > > + droq->stats.rx_alloc_failure++; > > /* sdp_droq_destroy_ring_buffers(droq);*/ > > return -ENOMEM; > > } > > @@ -645,3 +646,199 @@ > > return SDP_IQ_SEND_FAILED; > > } > > > > +static uint32_t > > +sdp_droq_refill(struct sdp_device *sdpvf, struct sdp_droq *droq) { > > + struct sdp_droq_desc *desc_ring; > > + uint32_t desc_refilled = 0; > > + void *buf = NULL; > > + > > + desc_ring = droq->desc_ring; > > + > > + while (droq->refill_count && (desc_refilled < droq->nb_desc)) { > > + /* If a valid buffer exists (happens if there is no dispatch), > > + * reuse the buffer, else allocate. > > + */ > > + if (droq->recv_buf_list[droq->refill_idx].buffer != NULL) > > + break; > > + > > + rte_mempool_get(sdpvf->enqdeq_mpool, &buf); > > + /* If a buffer could not be allocated, no point in > > + * continuing > > + */ > > + if (buf == NULL) { > > + droq->stats.rx_alloc_failure++; > > + break; > > + } > > + > > + droq->recv_buf_list[droq->refill_idx].buffer = buf; > > + desc_ring[droq->refill_idx].buffer_ptr = > > rte_mem_virt2iova(buf); > > + > > + /* Reset any previous values in the length field. 
*/ > > + droq->info_list[droq->refill_idx].length = 0; > > + > > + droq->refill_idx = sdp_incr_index(droq->refill_idx, 1, > > + droq->nb_desc); > > + > > + desc_refilled++; > > + droq->refill_count--; > > + > > + } > > + > > + return desc_refilled; > > +} > > + > > +static int > > +sdp_droq_read_packet(struct sdp_device *sdpvf __rte_unused, > > + struct sdp_droq *droq, > > + struct sdp_droq_pkt *droq_pkt) { > > + struct sdp_droq_info *info; > > + uint32_t total_len = 0; > > + uint32_t pkt_len = 0; > > + > > + info = &droq->info_list[droq->read_idx]; > > + sdp_swap_8B_data((uint64_t *)&info->length, 1); > > + if (!info->length) { > > + otx2_err("OQ info_list->length[%ld]", (long)info->length); > > + goto oq_read_fail; > > + } > > + > > + /* Deduce the actual data size */ > > + info->length -= SDP_RH_SIZE; > > + total_len += (uint32_t)info->length; > > + > > + otx2_sdp_dbg("OQ: pkt_len[%ld], buffer_size %d", > > + (long)info->length, droq->buffer_size); > > + if (info->length > droq->buffer_size) { > > + otx2_err("This mode is not supported: pkt_len > > > buffer_size"); > > + goto oq_read_fail; > > + } > > + > > + if (info->length <= droq->buffer_size) { > > + pkt_len = (uint32_t)info->length; > > + droq_pkt->data = droq->recv_buf_list[droq- > > >read_idx].buffer; > > + droq_pkt->len = pkt_len; > > + > > + droq->recv_buf_list[droq->read_idx].buffer = NULL; > > + droq->read_idx = sdp_incr_index(droq->read_idx, 1,/* > > count */ > > + droq->nb_desc /* max rd > > idx */); > > + droq->refill_count++; > > + > > + } > > + > > + info->length = 0; > > + > > + return SDP_OQ_RECV_SUCCESS; > > + > > +oq_read_fail: > > + return SDP_OQ_RECV_FAILED; > > +} > > + > > +static inline uint32_t > > +sdp_check_droq_pkts(struct sdp_droq *droq, uint32_t burst_size) { > > + uint32_t min_pkts = 0; > > + uint32_t new_pkts; > > + uint32_t pkt_count; > > + > > + /* Latest available OQ packets */ > > + pkt_count = rte_read32(droq->pkts_sent_reg); > > + > > + /* Newly arrived packets */ > > + 
new_pkts = pkt_count - droq->last_pkt_count; > > + otx2_sdp_dbg("Recvd [%d] new OQ pkts", new_pkts); > > + > > + min_pkts = (new_pkts > burst_size) ? burst_size : new_pkts; > > + if (min_pkts) { > > + rte_atomic64_add(&droq->pkts_pending, min_pkts); > > + /* Back up the aggregated packet count so far */ > > + droq->last_pkt_count += min_pkts; > > + } > > + > > + return min_pkts; > > +} > > + > > +/* Check for response arrival from OCTEON TX2 > > + * returns number of requests completed */ int > > +sdp_rawdev_dequeue(struct rte_rawdev *rawdev, > > + struct rte_rawdev_buf **buffers, unsigned int count, > > + rte_rawdev_obj_t context __rte_unused) { > > + struct sdp_droq_pkt *oq_pkt; > > + struct sdp_device *sdpvf; > > + struct sdp_droq *droq; > > + > > + uint32_t q_no = 0, pkts; > > + uint32_t new_pkts; > > + uint32_t ret; > > + > > + sdpvf = (struct sdp_device *)rawdev->dev_private; > > + > > + droq = sdpvf->droq[q_no]; > > + if (!droq) { > > + otx2_err("Invalid droq[%d]", q_no); > > + goto deq_fail; > > + } > > + > > + /* Grab the lock */ > > + rte_spinlock_lock(&droq->lock); > > + > > + new_pkts = sdp_check_droq_pkts(droq, count); > > + if (!new_pkts) { > > + otx2_sdp_dbg("Zero new_pkts:%d", new_pkts); > > + goto deq_fail; /* No pkts at this moment */ > > + } > > + > > + otx2_sdp_dbg("Received new_pkts = %d", new_pkts); > > + > > + for (pkts = 0; pkts < new_pkts; pkts++) { > > + > > + /* Push the received pkt to application */ > > + oq_pkt = (struct sdp_droq_pkt *)buffers[pkts]; > > + > > + ret = sdp_droq_read_packet(sdpvf, droq, oq_pkt); > > + if (ret) { > > + otx2_err("DROQ read pakt failed."); > > + goto deq_fail; > > + } > > + > > + /* Stats */ > > + droq->stats.pkts_received++; > > + droq->stats.bytes_received += oq_pkt->len; > > + } > > + > > + /* Ack the h/w with no# of pkts read by Host */ > > + rte_write32(pkts, droq->pkts_sent_reg); > Rte_write32 has a built-in rte_io_wmb, aka "dsb" for aarch64, inside, this > hurts performance, this is on the fast data 
path? > > + rte_cio_wmb(); > This barrier should be hoisted up, between the dequeue operation and > the register write, to preserve the ordering of the two. > I am surprised that rte_cio_mb is not defined within DPDK; it is required here > to keep the read/write ordering, because neither rmb nor wmb can provide the ordering > required.
[Mahipal]: We would like to go with this now, and separately handle performance improvement optimizations in the future. > > + > > + droq->last_pkt_count -= pkts; > > + > > + otx2_sdp_dbg("DROQ pkts[%d] pushed to application", pkts); > > + > > + /* Refill DROQ buffers */ > > + if (droq->refill_count >= 2 /* droq->refill_threshold */) { > > + int desc_refilled = sdp_droq_refill(sdpvf, droq); > > + > > + /* Flush the droq descriptor data to memory to be sure > > + * that when we update the credits the data in memory is > > + * accurate. > > + */ > > + rte_write32(desc_refilled, droq->pkts_credit_reg); > Rte_cio_wmb should be used instead of the built-in rte_io_wmb to improve > the performance. > But if performance is not a concern here, rte_io_wmb rescued the situation. [Mahipal]: As mentioned above we would like to go with this now, and separately handle performance improvement optimizations in the future. > > + > > + /* Ensure mmio write completes */ > > + rte_wmb(); > > + otx2_sdp_dbg("Refilled count = %d", desc_refilled); > > + } > > + > > + /* Release the spin lock */ > > + rte_spinlock_unlock(&droq->lock); > > + > > + return pkts; > > + > > +deq_fail: > > + rte_spinlock_unlock(&droq->lock); > > + return SDP_OQ_RECV_FAILED; > > +} > > diff --git a/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.h > > b/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.h > > index b9b7c0b..172fdc5 100644 > > --- a/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.h > > +++ b/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.h > > @@ -11,6 +11,8 @@ > > #define SDP_IQ_SEND_FAILED (-1) > > #define SDP_IQ_SEND_SUCCESS (0) > > > > +#define SDP_OQ_RECV_FAILED (-1) > > +#define SDP_OQ_RECV_SUCCESS (0) > > > > static inline uint64_t > > sdp_endian_swap_8B(uint64_t _d) > > diff --git a/drivers/raw/octeontx2_ep/otx2_ep_rawdev.c > > b/drivers/raw/octeontx2_ep/otx2_ep_rawdev.c > > index 22a6beb..7158b97 100644 > > --- a/drivers/raw/octeontx2_ep/otx2_ep_rawdev.c > > +++ b/drivers/raw/octeontx2_ep/otx2_ep_rawdev.c > > @@ -252,6 
+252,7 @@ > > .dev_stop = sdp_rawdev_stop, > > .dev_close = sdp_rawdev_close, > > .enqueue_bufs = sdp_rawdev_enqueue, > > + .dequeue_bufs = sdp_rawdev_dequeue, > > }; > > > > static int > > diff --git a/drivers/raw/octeontx2_ep/otx2_ep_rawdev.h > > b/drivers/raw/octeontx2_ep/otx2_ep_rawdev.h > > index 8fd06fb..a77cbab 100644 > > --- a/drivers/raw/octeontx2_ep/otx2_ep_rawdev.h > > +++ b/drivers/raw/octeontx2_ep/otx2_ep_rawdev.h > > @@ -279,6 +279,18 @@ struct sdp_recv_buffer { }; > > #define SDP_DROQ_RECVBUF_SIZE (sizeof(struct sdp_recv_buffer)) > > > > +/* DROQ statistics. Each output queue has four stats fields. */ > > +struct sdp_droq_stats { > > + /* Number of packets received in this queue. */ > > + uint64_t pkts_received; > > + > > + /* Bytes received by this queue. */ > > + uint64_t bytes_received; > > + > > + /* Num of failures of rte_pktmbuf_alloc() */ > > + uint64_t rx_alloc_failure; > > +}; > > + > > /* Structure to define the configuration attributes for each Output queue. > > */ > > struct sdp_oq_config { > > /* Max number of OQs available */ > > @@ -345,6 +357,9 @@ struct sdp_droq { > > */ > > void *pkts_sent_reg; > > > > + /* Statistics for this DROQ. */ > > + struct sdp_droq_stats stats; > > + > > /* DMA mapped address of the DROQ descriptor ring. */ > > size_t desc_ring_dma; > > > > @@ -476,6 +491,7 @@ struct sdp_device { > > > > int sdp_rawdev_enqueue(struct rte_rawdev *dev, struct rte_rawdev_buf > > **buffers, > > unsigned int count, rte_rawdev_obj_t context); > > - > > +int sdp_rawdev_dequeue(struct rte_rawdev *dev, struct rte_rawdev_buf > > **buffers, > > + unsigned int count, rte_rawdev_obj_t context); > > > > #endif /* _OTX2_EP_RAWDEV_H_ */ > > -- > > 1.8.3.1 Thanks, Mahipal