Adding burst no Tx internal-port eventmode worker

Signed-off-by: Anoob Joseph <ano...@marvell.com>
Signed-off-by: Lukasz Bartosik <lbarto...@marvell.com>
---
 examples/l2fwd-event/l2fwd_worker.c | 219 +++++++++++++++++++++++++++++++++++-
 1 file changed, 218 insertions(+), 1 deletion(-)

diff --git a/examples/l2fwd-event/l2fwd_worker.c 
b/examples/l2fwd-event/l2fwd_worker.c
index 0a413a4..251c546 100644
--- a/examples/l2fwd-event/l2fwd_worker.c
+++ b/examples/l2fwd-event/l2fwd_worker.c
@@ -315,7 +315,7 @@ l2fwd_poll_mode_worker(void)
  */
 
 /* Workers registered */
-#define L2FWD_EVENTMODE_WORKERS                2
+#define L2FWD_EVENTMODE_WORKERS                3
 
 /*
  * Event mode worker
@@ -639,6 +639,214 @@ l2fwd_eventmode_non_burst_tx_internal_port(void *args)
                rte_free(links);
 }
 
+/*
+ * Event mode worker
+ * Operating mode : burst no internal port (regular tx worker)
+ */
+static void
+l2fwd_eventmode_burst_no_internal_port(void *args)
+{
+       struct rte_event ev[MAX_PKT_BURST];
+       struct rte_mbuf *pkt;
+       struct rte_eventmode_helper_conf *mode_conf;
+       struct rte_eventmode_helper_event_link_info *links = NULL;
+       unsigned int lcore_nb_link = 0;
+       uint32_t lcore_id;
+       unsigned int i, j, left, nb_rx = 0;
+       unsigned int portid;
+       struct lcore_queue_conf *qconf;
+       int is_master_core;
+       struct rte_event_port_conf event_port_conf;
+       uint16_t deq_len = 0, enq_len = 0;
+       uint8_t tx_queue;
+       struct tsc_tracker tsc = {0};
+
+       /* Get core ID */
+       lcore_id = rte_lcore_id();
+
+       RTE_LOG(INFO, L2FWD,
+               "Launching event mode burst worker no internal port "
+               "(regular tx worker) on lcore %d\n", lcore_id);
+
+       /* Set the flag if master core */
+       is_master_core = (lcore_id == rte_get_master_lcore()) ? 1 : 0;
+
+       /* Get qconf for this core */
+       qconf = &lcore_queue_conf[lcore_id];
+
+       /* Set drain tsc */
+       tsc.drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) /
+                       US_PER_S * BURST_TX_DRAIN_US;
+
+       /* Mode conf will be passed as args */
+       mode_conf = (struct rte_eventmode_helper_conf *)args;
+
+       /* Get the links configured for this lcore */
+       lcore_nb_link = rte_eventmode_helper_get_event_lcore_links(lcore_id,
+                       mode_conf, &links);
+
+       /* Check if we have links registered for this lcore */
+       if (lcore_nb_link == 0) {
+               /* No links registered. The core could do periodic drains */
+               l2fwd_drain_loop(qconf, &tsc, is_master_core);
+               goto clean_and_exit;
+       }
+
+       /* We have valid links */
+
+       /* Reset stats before proceeding */
+       reset_eth_stats(is_master_core);
+
+       /*
+        * There is no internal port between ethdev and eventdev. So the worker
+        * thread needs to submit event to a designated tx queue. Internally
+        * eth core would receive events from multiple worker threads and send
+        * out packets on wire.
+        */
+       tx_queue = rte_eventmode_helper_get_tx_queue(mode_conf,
+                                                    links[0].eventdev_id);
+
+       /* Get the burst size of the event device */
+
+       /* Get the default conf of the first link */
+       rte_event_port_default_conf_get(links[0].eventdev_id,
+                       links[0].event_portid,
+                       &event_port_conf);
+
+       /* Save the burst size */
+       deq_len = event_port_conf.dequeue_depth;
+       enq_len = event_port_conf.enqueue_depth;
+
+       /* Dequeue and enqueue length should not exceed MAX_PKT_BURST */
+       if (deq_len > MAX_PKT_BURST)
+               deq_len = MAX_PKT_BURST;
+       if (enq_len > MAX_PKT_BURST)
+               enq_len = MAX_PKT_BURST;
+
+       /* See if it's single link */
+       if (lcore_nb_link == 1)
+               goto single_link_loop;
+       else
+               goto multi_link_loop;
+
+single_link_loop:
+
+       RTE_LOG(INFO, L2FWD, " -- lcoreid=%u event_port_id=%u\n", lcore_id,
+               links[0].event_portid);
+
+       while (!force_quit) {
+
+               /* Do periodic operations (buffer drain & stats monitor) */
+               l2fwd_periodic_drain_stats_monitor(qconf, &tsc, is_master_core);
+
+               /* Read packet from event queues */
+               nb_rx = rte_event_dequeue_burst(links[0].eventdev_id,
+                               links[0].event_portid,
+                               ev,             /* events */
+                               deq_len,        /* nb_events */
+                               0               /* timeout_ticks */);
+
+               if (nb_rx == 0)
+                       continue;
+
+               for (j = 0; j < nb_rx; j++) {
+
+                       portid = ev[j].queue_id;
+                       port_statistics[portid].rx++;
+                       pkt = ev[j].mbuf;
+
+                       rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
+
+                       /* Process packet */
+                       l2fwd_event_pre_forward(&(ev[j]), portid);
+
+                       /*
+                        * Internal port is not available, the packet needs
+                        * to be enqueued to the designated event queue.
+                        */
+
+                       /* Prepare event for submission to tx event queue */
+                       l2fwd_event_switch_to_tx_queue(&(ev[j]), tx_queue);
+               }
+
+               for (j = 0, left = nb_rx;
+                       j < (nb_rx + enq_len - 1)/enq_len; j++) {
+
+                       /* Submit the updated events for tx stage */
+                       left -= rte_event_enqueue_burst(links[0].eventdev_id,
+                                       links[0].event_portid,
+                                       &(ev[j*enq_len]), /* events */
+                                       left > enq_len ?
+                                       enq_len : left /* nb_events */);
+               }
+       }
+       goto clean_and_exit;
+
+multi_link_loop:
+
+       for (i = 0; i < lcore_nb_link; i++) {
+               RTE_LOG(INFO, L2FWD, " -- lcoreid=%u event_port_id=%u\n",
+                       lcore_id, links[i].event_portid);
+       }
+
+       while (!force_quit) {
+
+               /* Do periodic operations (buffer drain & stats monitor) */
+               l2fwd_periodic_drain_stats_monitor(qconf, &tsc, is_master_core);
+
+               for (i = 0; i < lcore_nb_link; i++) {
+                       /* Read packet from event queues */
+                       nb_rx = rte_event_dequeue_burst(links[i].eventdev_id,
+                                       links[i].event_portid,
+                                       ev,             /* events */
+                                       deq_len,        /* nb_events */
+                                       0               /* timeout_ticks */);
+
+                       if (nb_rx == 0)
+                               continue;
+
+                       for (j = 0; j < nb_rx; j++) {
+
+                               portid = ev[j].queue_id;
+                               port_statistics[portid].rx++;
+                               pkt = ev[j].mbuf;
+
+                               rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
+
+                               /* Process packet */
+                               l2fwd_event_pre_forward(&(ev[j]), portid);
+
+                               /*
+                                * Internal port is not available, the packet
+                                * needs to be enqueued to the designated event
+                                * queue.
+                                */
+
+                               /* Update the scheduling type for tx stage */
+                               l2fwd_event_switch_to_tx_queue(&(ev[j]),
+                                               tx_queue);
+                       }
+
+                       for (j = 0, left = nb_rx;
+                               j < (nb_rx + enq_len - 1)/enq_len; j++) {
+
+                               /* Submit the updated events for tx stage */
+                               left -= rte_event_enqueue_burst(
+                                       links[i].eventdev_id,
+                                       links[i].event_portid,
+                                       &(ev[j*enq_len]), /* events */
+                                       left > enq_len ?
+                                       enq_len : left /* nb_events */);
+                       }
+               }
+       }
+       goto clean_and_exit;
+
+clean_and_exit:
+       if (links != NULL)
+               rte_free(links);
+}
+
 static uint8_t
 l2fwd_eventmode_populate_wrkr_params(
                struct rte_eventmode_helper_app_worker_params *wrkrs)
@@ -665,6 +873,15 @@ l2fwd_eventmode_populate_wrkr_params(
        wrkr->worker_thread = l2fwd_eventmode_non_burst_tx_internal_port;
 
        nb_wrkr_param++;
+       wrkr++;
+
+       /* Burst no internal port (regular tx worker) */
+       wrkr->cap.burst = RTE_EVENTMODE_HELPER_RX_TYPE_BURST;
+       wrkr->cap.tx_internal_port =
+                       RTE_EVENTMODE_HELPER_TX_TYPE_NO_INTERNAL_PORT;
+       wrkr->worker_thread = l2fwd_eventmode_burst_no_internal_port;
+
+       nb_wrkr_param++;
        return nb_wrkr_param;
 }
 
-- 
2.7.4

Reply via email to