Add an API to flush the instruction queue. It checks how many packets have
reached the device and frees the associated host buffers using the request list.

Signed-off-by: Shijith Thotton <shijith.thot...@caviumnetworks.com>
Signed-off-by: Jerin Jacob <jerin.ja...@caviumnetworks.com>
Signed-off-by: Derek Chickles <derek.chick...@caviumnetworks.com>
Signed-off-by: Venkat Koppula <venkat.kopp...@caviumnetworks.com>
Signed-off-by: Srisivasubramanian S <ssriniva...@caviumnetworks.com>
Signed-off-by: Mallesham Jatharakonda <mjatharako...@oneconvergence.com>
 drivers/net/liquidio/lio_ethdev.c |   1 +
 drivers/net/liquidio/lio_rxtx.c   | 187 +++++++++++++++++++++++++++++++++++++-
 drivers/net/liquidio/lio_rxtx.h   |   1 +
 3 files changed, 188 insertions(+), 1 deletion(-)

diff --git a/drivers/net/liquidio/lio_ethdev.c b/drivers/net/liquidio/lio_ethdev.c
index 5c5aade..b8baa4f 100644
--- a/drivers/net/liquidio/lio_ethdev.c
+++ b/drivers/net/liquidio/lio_ethdev.c
@@ -281,6 +281,7 @@ static int lio_dev_configure(struct rte_eth_dev *eth_dev)
         * response arrived or timed-out.
        while ((*sc->status_word == LIO_COMPLETION_WORD_INIT) && --timeout) {
+               lio_flush_iq(lio_dev, lio_dev->instr_queue[sc->iq_no]);
diff --git a/drivers/net/liquidio/lio_rxtx.c b/drivers/net/liquidio/lio_rxtx.c
index 75ecbd5..64c0385 100644
--- a/drivers/net/liquidio/lio_rxtx.c
+++ b/drivers/net/liquidio/lio_rxtx.c
@@ -41,6 +41,9 @@
 #include "lio_rxtx.h"
 #define LIO_MAX_SG 12
+/* Flush iq if available tx_desc fall below LIO_FLUSH_WM */
+#define LIO_FLUSH_WM(_iq) ((_iq)->max_count / 2)
 static void
 lio_droq_compute_max_packet_bufs(struct lio_droq *droq)
@@ -977,6 +980,146 @@
        iq->request_list[idx].reqtype = reqtype;
+static inline void
+lio_free_netsgbuf(void *buf)
+       struct lio_buf_free_info *finfo = buf;
+       struct lio_device *lio_dev = finfo->lio_dev;
+       struct rte_mbuf *m = finfo->mbuf;
+       struct lio_gather *g = finfo->g;
+       uint8_t iq = finfo->iq_no;
+       /* This will take care of multiple segments also */
+       rte_pktmbuf_free(m);
+       rte_spinlock_lock(&lio_dev->glist_lock[iq]);
+       STAILQ_INSERT_TAIL(&lio_dev->glist_head[iq], &g->list, entries);
+       rte_spinlock_unlock(&lio_dev->glist_lock[iq]);
+       rte_free(finfo);
+/* Can only run in process context */
+static int
+lio_process_iq_request_list(struct lio_device *lio_dev,
+                           struct lio_instr_queue *iq)
+       struct octeon_instr_irh *irh = NULL;
+       uint32_t old = iq->flush_index;
+       struct lio_soft_command *sc;
+       uint32_t inst_count = 0;
+       int reqtype;
+       void *buf;
+       while (old != iq->lio_read_index) {
+               reqtype = iq->request_list[old].reqtype;
+               buf     = iq->request_list[old].buf;
+               if (reqtype == LIO_REQTYPE_NONE)
+                       goto skip_this;
+               switch (reqtype) {
+               case LIO_REQTYPE_NORESP_NET:
+                       rte_pktmbuf_free((struct rte_mbuf *)buf);
+                       break;
+               case LIO_REQTYPE_NORESP_NET_SG:
+                       lio_free_netsgbuf(buf);
+                       break;
+               case LIO_REQTYPE_SOFT_COMMAND:
+                       sc = buf;
+                       irh = (struct octeon_instr_irh *)&sc->cmd.cmd3.irh;
+                       if (irh->rflag) {
+                               /* We're expecting a response from Octeon.
+                                * It's up to lio_process_ordered_list() to
+                                * process sc. Add sc to the ordered soft
+                                * command response list because we expect
+                                * a response from Octeon.
+                                */
+                               rte_spinlock_lock(&lio_dev->response_list.lock);
+                               rte_atomic64_inc(
+                                   &lio_dev->response_list.pending_req_count);
+                               STAILQ_INSERT_TAIL(
+                                       &lio_dev->response_list.head,
+                                       &sc->node, entries);
+                               rte_spinlock_unlock(
+                                               &lio_dev->response_list.lock);
+                       } else {
+                               if (sc->callback) {
+                                       /* This callback must not sleep */
+                                       sc->callback(LIO_REQUEST_DONE,
+                                                    sc->callback_arg);
+                               }
+                       }
+                       break;
+               default:
+                       lio_dev_err(lio_dev,
+                                   "Unknown reqtype: %d buf: %p at idx %d\n",
+                                   reqtype, buf, old);
+               }
+               iq->request_list[old].buf = NULL;
+               iq->request_list[old].reqtype = 0;
+               inst_count++;
+               old = lio_incr_index(old, 1, iq->max_count);
+       }
+       iq->flush_index = old;
+       return inst_count;
+static void
+lio_update_read_index(struct lio_instr_queue *iq)
+       uint32_t pkt_in_done = rte_read32(iq->inst_cnt_reg);
+       uint32_t last_done;
+       last_done = pkt_in_done - iq->pkt_in_done;
+       iq->pkt_in_done = pkt_in_done;
+       /* Add last_done and modulo with the IQ size to get new index */
+       iq->lio_read_index = (iq->lio_read_index +
+                       (uint32_t)(last_done & LIO_PKT_IN_DONE_CNT_MASK)) %
+                       iq->max_count;
+lio_flush_iq(struct lio_device *lio_dev, struct lio_instr_queue *iq)
+       uint32_t tot_inst_processed = 0;
+       uint32_t inst_processed = 0;
+       int tx_done = 1;
+       if (rte_atomic64_test_and_set(&iq->iq_flush_running) == 0)
+               return tx_done;
+       rte_spinlock_lock(&iq->lock);
+       lio_update_read_index(iq);
+       do {
+               /* Process any outstanding IQ packets. */
+               if (iq->flush_index == iq->lio_read_index)
+                       break;
+               inst_processed = lio_process_iq_request_list(lio_dev, iq);
+               if (inst_processed)
+                       rte_atomic64_sub(&iq->instr_pending, inst_processed);
+               tot_inst_processed += inst_processed;
+               inst_processed = 0;
+       } while (1);
+       rte_spinlock_unlock(&iq->lock);
+       rte_atomic64_clear(&iq->iq_flush_running);
+       return tx_done;
 static int
 lio_send_command(struct lio_device *lio_dev, uint32_t iq_no, void *cmd,
                 void *buf, uint32_t datasize __rte_unused, uint32_t reqtype)
@@ -1385,6 +1528,35 @@ struct lio_soft_command *
+static inline uint32_t
+lio_iq_get_available(struct lio_device *lio_dev, uint32_t q_no)
+       return ((lio_dev->instr_queue[q_no]->max_count - 1) -
+               (uint32_t)rte_atomic64_read(
+                               &lio_dev->instr_queue[q_no]->instr_pending));
+static inline int
+lio_iq_is_full(struct lio_device *lio_dev, uint32_t q_no)
+       return ((uint32_t)rte_atomic64_read(
+                               &lio_dev->instr_queue[q_no]->instr_pending) >=
+                               (lio_dev->instr_queue[q_no]->max_count - 2));
+static int
+lio_dev_cleanup_iq(struct lio_device *lio_dev, int iq_no)
+       struct lio_instr_queue *iq = lio_dev->instr_queue[iq_no];
+       uint32_t count = 10000;
+       while ((lio_iq_get_available(lio_dev, iq_no) < LIO_FLUSH_WM(iq)) &&
+                       --count)
+               lio_flush_iq(lio_dev, iq);
+       return count ? 0 : 1;
 /** Send data packet to the device
  *  @param lio_dev - lio device pointer
  *  @param ndata   - control structure with queueing, and buffer information
@@ -1421,6 +1593,8 @@ struct lio_soft_command *
                goto xmit_failed;
+       lio_dev_cleanup_iq(lio_dev, iq_no);
        for (i = 0; i < nb_pkts; i++) {
                uint32_t pkt_len = 0;
@@ -1432,6 +1606,14 @@ struct lio_soft_command *
                ndata.buf = m;
                ndata.q_no = iq_no;
+               if (lio_iq_is_full(lio_dev, ndata.q_no)) {
+                       if (lio_dev_cleanup_iq(lio_dev, iq_no)) {
+                               PMD_TX_LOG(lio_dev, ERR,
+                                          "Transmit failed iq:%d full\n",
+                                          ndata.q_no);
+                               break;
+                       }
+               }
                cmdsetup.cmd_setup64 = 0;
                cmdsetup.s.iq_no = iq_no;
@@ -1524,8 +1706,11 @@ struct lio_soft_command *
-               if (unlikely(status == LIO_IQ_SEND_STOP))
+               if (unlikely(status == LIO_IQ_SEND_STOP)) {
                        PMD_TX_LOG(lio_dev, DEBUG, "iq full\n");
+                       /* create space as iq is full */
+                       lio_dev_cleanup_iq(lio_dev, iq_no);
+               }
diff --git a/drivers/net/liquidio/lio_rxtx.h b/drivers/net/liquidio/lio_rxtx.h
index b555bde..0a4cc2b 100644
--- a/drivers/net/liquidio/lio_rxtx.h
+++ b/drivers/net/liquidio/lio_rxtx.h
@@ -706,6 +706,7 @@ uint16_t lio_dev_xmit_pkts(void *tx_queue, struct rte_mbuf 
 int lio_setup_iq(struct lio_device *lio_dev, int q_index,
                 union octeon_txpciq iq_no, uint32_t num_descs, void *app_ctx,
                 unsigned int socket_id);
+int lio_flush_iq(struct lio_device *lio_dev, struct lio_instr_queue *iq);
 void lio_delete_instruction_queue(struct lio_device *lio_dev, int iq_no);
 /** Setup instruction queue zero for the device
  *  @param lio_dev which lio device to setup

Reply via email to