Hi Mahipal,

> -----Original Message-----
> From: dev <dev-boun...@dpdk.org> On Behalf Of Mahipal Challa
> Sent: Friday, December 6, 2019 2:39 PM
> To: dev@dpdk.org
> Cc: jer...@marvell.com; pathr...@marvell.com; sni...@marvell.com; venk...@marvell.com
> Subject: [dpdk-dev] [PATCH v1 5/6] raw/octeontx2_ep: add dequeue operation
>
> Add rawdev dequeue operation for SDP VF devices.
>
> Signed-off-by: Mahipal Challa <mcha...@marvell.com>
> ---
>  drivers/raw/octeontx2_ep/otx2_ep_enqdeq.c | 199 ++++++++++++++++++++++++++++++
>  drivers/raw/octeontx2_ep/otx2_ep_enqdeq.h |   2 +
>  drivers/raw/octeontx2_ep/otx2_ep_rawdev.c |   1 +
>  drivers/raw/octeontx2_ep/otx2_ep_rawdev.h |  18 ++-
>  4 files changed, 219 insertions(+), 1 deletion(-)
>
> diff --git a/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.c b/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.c
> index ebbacfb..451fcc0 100644
> --- a/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.c
> +++ b/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.c
> @@ -260,6 +260,7 @@
>  		rte_mempool_get(sdpvf->enqdeq_mpool, &buf);
>  		if (buf == NULL) {
>  			otx2_err("OQ buffer alloc failed");
> +			droq->stats.rx_alloc_failure++;
>  			/* sdp_droq_destroy_ring_buffers(droq);*/
>  			return -ENOMEM;
>  		}
> @@ -645,3 +646,201 @@
>  	return SDP_IQ_SEND_FAILED;
>  }
>
> +static uint32_t
> +sdp_droq_refill(struct sdp_device *sdpvf, struct sdp_droq *droq)
> +{
> +	struct sdp_droq_desc *desc_ring;
> +	uint32_t desc_refilled = 0;
> +	void *buf = NULL;
> +
> +	desc_ring = droq->desc_ring;
> +
> +	while (droq->refill_count && (desc_refilled < droq->nb_desc)) {
> +		/* If a valid buffer exists (happens if there is no dispatch),
> +		 * reuse the buffer, else allocate.
> +		 */
> +		if (droq->recv_buf_list[droq->refill_idx].buffer != NULL)
> +			break;
> +
> +		rte_mempool_get(sdpvf->enqdeq_mpool, &buf);
> +		/* If a buffer could not be allocated, no point in
> +		 * continuing
> +		 */
> +		if (buf == NULL) {
> +			droq->stats.rx_alloc_failure++;
> +			break;
> +		}
> +
> +		droq->recv_buf_list[droq->refill_idx].buffer = buf;
> +		desc_ring[droq->refill_idx].buffer_ptr = rte_mem_virt2iova(buf);
> +
> +		/* Reset any previous values in the length field. */
> +		droq->info_list[droq->refill_idx].length = 0;
> +
> +		droq->refill_idx = sdp_incr_index(droq->refill_idx, 1,
> +						  droq->nb_desc);
> +
> +		desc_refilled++;
> +		droq->refill_count--;
> +
> +	}
> +
> +	return desc_refilled;
> +}
> +
> +static int
> +sdp_droq_read_packet(struct sdp_device *sdpvf __rte_unused,
> +		     struct sdp_droq *droq,
> +		     struct sdp_droq_pkt *droq_pkt)
> +{
> +	struct sdp_droq_info *info;
> +	uint32_t total_len = 0;
> +	uint32_t pkt_len = 0;
> +
> +	info = &droq->info_list[droq->read_idx];
> +	sdp_swap_8B_data((uint64_t *)&info->length, 1);
> +	if (!info->length) {
> +		otx2_err("OQ info_list->length[%ld]", (long)info->length);
> +		goto oq_read_fail;
> +	}
> +
> +	/* Deduce the actual data size */
> +	info->length -= SDP_RH_SIZE;
> +	total_len += (uint32_t)info->length;
> +
> +	otx2_sdp_dbg("OQ: pkt_len[%ld], buffer_size %d",
> +		     (long)info->length, droq->buffer_size);
> +	if (info->length > droq->buffer_size) {
> +		otx2_err("This mode is not supported: pkt_len > buffer_size");
> +		goto oq_read_fail;
> +	}
> +
> +	if (info->length <= droq->buffer_size) {
> +		pkt_len = (uint32_t)info->length;
> +		droq_pkt->data = droq->recv_buf_list[droq->read_idx].buffer;
> +		droq_pkt->len = pkt_len;
> +
> +		droq->recv_buf_list[droq->read_idx].buffer = NULL;
> +		droq->read_idx = sdp_incr_index(droq->read_idx, 1, /* count */
> +						droq->nb_desc /* max rd idx */);
> +		droq->refill_count++;
> +
> +	}
> +
> +	info->length = 0;
> +
> +	return SDP_OQ_RECV_SUCCESS;
> +
> +oq_read_fail:
> +	return SDP_OQ_RECV_FAILED;
> +}
> +
> +static inline uint32_t
> +sdp_check_droq_pkts(struct sdp_droq *droq, uint32_t burst_size)
> +{
> +	uint32_t min_pkts = 0;
> +	uint32_t new_pkts;
> +	uint32_t pkt_count;
> +
> +	/* Latest available OQ packets */
> +	pkt_count = rte_read32(droq->pkts_sent_reg);
> +
> +	/* Newly arrived packets */
> +	new_pkts = pkt_count - droq->last_pkt_count;
> +	otx2_sdp_dbg("Recvd [%d] new OQ pkts", new_pkts);
> +
> +	min_pkts = (new_pkts > burst_size) ? burst_size : new_pkts;
> +	if (min_pkts) {
> +		rte_atomic64_add(&droq->pkts_pending, min_pkts);
> +		/* Back up the aggregated packet count so far */
> +		droq->last_pkt_count += min_pkts;
> +	}
> +
> +	return min_pkts;
> +}
> +
> +/* Check for response arrival from OCTEON TX2
> + * returns number of requests completed
> + */
> +int
> +sdp_rawdev_dequeue(struct rte_rawdev *rawdev,
> +		   struct rte_rawdev_buf **buffers, unsigned int count,
> +		   rte_rawdev_obj_t context __rte_unused)
> +{
> +	struct sdp_droq_pkt *oq_pkt;
> +	struct sdp_device *sdpvf;
> +	struct sdp_droq *droq;
> +
> +	uint32_t q_no = 0, pkts;
> +	uint32_t new_pkts;
> +	uint32_t ret;
> +
> +	sdpvf = (struct sdp_device *)rawdev->dev_private;
> +
> +	droq = sdpvf->droq[q_no];
> +	if (!droq) {
> +		otx2_err("Invalid droq[%d]", q_no);
> +		goto deq_fail;
> +	}
> +
> +	/* Grab the lock */
> +	rte_spinlock_lock(&droq->lock);
> +
> +	new_pkts = sdp_check_droq_pkts(droq, count);
> +	if (!new_pkts) {
> +		otx2_sdp_dbg("Zero new_pkts:%d", new_pkts);
> +		goto deq_fail; /* No pkts at this moment */
> +	}
> +
> +	otx2_sdp_dbg("Received new_pkts = %d", new_pkts);
> +
> +	for (pkts = 0; pkts < new_pkts; pkts++) {
> +
> +		/* Push the received pkt to application */
> +		oq_pkt = (struct sdp_droq_pkt *)buffers[pkts];
> +
> +		ret = sdp_droq_read_packet(sdpvf, droq, oq_pkt);
> +		if (ret) {
> +			otx2_err("DROQ read pakt failed.");
> +			goto deq_fail;
> +		}
> +
> +		/* Stats */
> +		droq->stats.pkts_received++;
> +		droq->stats.bytes_received += oq_pkt->len;
> +	}
> +
> +	/* Ack the h/w with no# of pkts read by Host */
> +	rte_wmb();

The following rte_write32() call has a built-in rte_io_wmb(), which is
sufficient for the I/O device, so the rte_wmb() above can be dropped.

> +	rte_write32(pkts, droq->pkts_sent_reg);
> +	rte_wmb();

rte_cio_wmb() is sufficient to keep the ordering within the coherent memory
domain between the lcore and the I/O device:
https://code.dpdk.org/dpdk/latest/source/lib/librte_eal/common/include/generic/rte_atomic.h#L137
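
To illustrate, a rough sketch of what I have in mind for this ack path,
reusing the names from your patch (not a tested drop-in replacement):

	/* Ack the h/w with the number of pkts read by the host.  The data
	 * written so far lives in coherent memory, so ordering it against
	 * the doorbell write needs at most rte_cio_wmb(); and because
	 * rte_write32() already issues rte_io_wmb() internally, even that
	 * explicit barrier can be dropped.
	 */
	rte_cio_wmb();
	rte_write32(pkts, droq->pkts_sent_reg);

i.e. neither the rte_wmb() before nor the one after the doorbell write is
needed for correctness here.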
> +	droq->last_pkt_count -= pkts;
> +
> +	otx2_sdp_dbg("DROQ pkts[%d] pushed to application", pkts);
> +
> +	/* Refill DROQ buffers */
> +	if (droq->refill_count >= 2 /* droq->refill_threshold */) {
> +		int desc_refilled = sdp_droq_refill(sdpvf, droq);
> +
> +		/* Flush the droq descriptor data to memory to be sure
> +		 * that when we update the credits the data in memory is
> +		 * accurate.
> +		 */
> +		rte_wmb();

rte_cio_wmb() should be the correct barrier here, though the stronger
rte_io_wmb() built into the rte_write32() API also suffices (see the sketch
at the end of this mail).

> +		rte_write32(desc_refilled, droq->pkts_credit_reg);
> +
> +		/* Ensure mmio write completes */
> +		rte_wmb();

If the intent is to ensure completion of the above writes, this is OK;
otherwise it is overkill.

> +		otx2_sdp_dbg("Refilled count = %d", desc_refilled);
> +	}
> +
> +	/* Release the spin lock */
> +	rte_spinlock_unlock(&droq->lock);
> +
> +	return pkts;
> +
> +deq_fail:
> +	rte_spinlock_unlock(&droq->lock);
> +	return SDP_OQ_RECV_FAILED;
> +}
> diff --git a/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.h b/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.h
> index b9b7c0b..172fdc5 100644
> --- a/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.h
> +++ b/drivers/raw/octeontx2_ep/otx2_ep_enqdeq.h
> @@ -11,6 +11,8 @@
>  #define SDP_IQ_SEND_FAILED (-1)
>  #define SDP_IQ_SEND_SUCCESS (0)
>
> +#define SDP_OQ_RECV_FAILED (-1)
> +#define SDP_OQ_RECV_SUCCESS (0)
>
>  static inline uint64_t
>  sdp_endian_swap_8B(uint64_t _d)
> diff --git a/drivers/raw/octeontx2_ep/otx2_ep_rawdev.c b/drivers/raw/octeontx2_ep/otx2_ep_rawdev.c
> index 4ba8473..ddb208d 100644
> --- a/drivers/raw/octeontx2_ep/otx2_ep_rawdev.c
> +++ b/drivers/raw/octeontx2_ep/otx2_ep_rawdev.c
> @@ -252,6 +252,7 @@
>  	.dev_stop = sdp_rawdev_stop,
>  	.dev_close = sdp_rawdev_close,
>  	.enqueue_bufs = sdp_rawdev_enqueue,
> +	.dequeue_bufs = sdp_rawdev_dequeue,
>  };
>
>  static int
> diff --git a/drivers/raw/octeontx2_ep/otx2_ep_rawdev.h b/drivers/raw/octeontx2_ep/otx2_ep_rawdev.h
> index 8fd06fb..a77cbab 100644
> --- a/drivers/raw/octeontx2_ep/otx2_ep_rawdev.h
> +++ b/drivers/raw/octeontx2_ep/otx2_ep_rawdev.h
> @@ -279,6 +279,18 @@ struct sdp_recv_buffer {
>  };
>  #define SDP_DROQ_RECVBUF_SIZE (sizeof(struct sdp_recv_buffer))
>
> +/* DROQ statistics. Each output queue has four stats fields. */
> +struct sdp_droq_stats {
> +	/* Number of packets received in this queue. */
> +	uint64_t pkts_received;
> +
> +	/* Bytes received by this queue. */
> +	uint64_t bytes_received;
> +
> +	/* Num of failures of rte_pktmbuf_alloc() */
> +	uint64_t rx_alloc_failure;
> +};
> +
>  /* Structure to define the configuration attributes for each Output queue. */
>  struct sdp_oq_config {
>  	/* Max number of OQs available */
> @@ -345,6 +357,9 @@ struct sdp_droq {
>  	 */
>  	void *pkts_sent_reg;
>
> +	/* Statistics for this DROQ. */
> +	struct sdp_droq_stats stats;
> +
>  	/* DMA mapped address of the DROQ descriptor ring. */
>  	size_t desc_ring_dma;
>
> @@ -476,6 +491,7 @@ struct sdp_device {
>
>  int sdp_rawdev_enqueue(struct rte_rawdev *dev, struct rte_rawdev_buf **buffers,
>  		       unsigned int count, rte_rawdev_obj_t context);
> -
> +int sdp_rawdev_dequeue(struct rte_rawdev *dev, struct rte_rawdev_buf **buffers,
> +		       unsigned int count, rte_rawdev_obj_t context);
>
>  #endif /* _OTX2_EP_RAWDEV_H_ */
> --
> 1.8.3.1
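
To tie the barrier comments together, here is the refill/credit path as I
would sketch it (again only a sketch with your field names, assuming the
rte_cio_wmb() from rte_atomic.h linked above):

	/* Flush the refilled descriptors to coherent memory before
	 * advertising them to the device; rte_cio_wmb() is enough for the
	 * lcore <-> device ordering, and rte_write32() adds its own
	 * rte_io_wmb() in any case.
	 */
	rte_cio_wmb();
	rte_write32(desc_refilled, droq->pkts_credit_reg);

Keep the trailing rte_wmb() only if you really need to wait for the MMIO
write itself to complete before proceeding.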