Add optimized eventdev pipeline workers for the case where the ethdev
supports thread-safe Tx and the number of configured stages is one.

Signed-off-by: Pavan Nikhilesh <pbhagavat...@caviumnetworks.com>
---
 .../eventdev_pipeline_sw_pmd/pipeline_worker_tx.c  | 109 +++++++++++++++++++--
 1 file changed, 101 insertions(+), 8 deletions(-)

diff --git a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c
index e25a06027..15df21b7e 100644
--- a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c
+++ b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c
@@ -68,6 +68,91 @@ worker_tx_pkt(struct rte_mbuf *mbuf)
                rte_pause();
 }
 
+/* Single stage pipeline workers */
+
+/*
+ * Single-stage pipeline worker handling one event per dequeue.
+ *
+ * @arg points to the struct worker_data naming the event device and event
+ * port to poll. Until fdata->done is set, each dequeued event is either
+ * passed to worker_tx_pkt() (ATOMIC events) or run through work(), moved to
+ * the next queue id, handed to worker_fwd_event() with
+ * RTE_SCHED_TYPE_ATOMIC and enqueued again (all other events).
+ *
+ * Always returns 0; prints its RX/FWD/TX counters on exit unless cdata.quiet.
+ */
+static int
+worker_do_tx_single(void *arg)
+{
+       struct worker_data *wdata = (struct worker_data *)arg;
+       const uint8_t dev_id = wdata->dev_id;
+       const uint8_t port_id = wdata->port_id;
+       size_t nb_fwd = 0, nb_rx = 0, nb_tx = 0;
+       struct rte_event evt;
+
+       while (!fdata->done) {
+               /* Nothing dequeued: pause briefly and poll again. */
+               if (rte_event_dequeue_burst(dev_id, port_id, &evt, 1, 0) == 0) {
+                       rte_pause();
+                       continue;
+               }
+               nb_rx++;
+
+               if (evt.sched_type != RTE_SCHED_TYPE_ATOMIC) {
+                       /* Work on the packet, then pass it to the next queue. */
+                       work(evt.mbuf);
+                       evt.queue_id++;
+                       worker_fwd_event(&evt, RTE_SCHED_TYPE_ATOMIC);
+                       worker_event_enqueue(dev_id, port_id, &evt);
+                       nb_fwd++;
+               } else {
+                       /* ATOMIC events are sent out directly by this worker. */
+                       worker_tx_pkt(evt.mbuf);
+                       nb_tx++;
+               }
+       }
+
+       if (!cdata.quiet)
+               printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+                               rte_lcore_id(), nb_rx, nb_fwd, nb_tx);
+       return 0;
+}
+
+/*
+ * Single-stage pipeline worker handling up to BATCH_SIZE events per dequeue.
+ *
+ * @arg points to the struct worker_data naming the event device and event
+ * port to poll. Until fdata->done is set, every event of a dequeued burst is
+ * either passed to worker_tx_pkt() and marked RTE_EVENT_OP_RELEASE (ATOMIC
+ * events), or run through work(), moved to the next queue id and handed to
+ * worker_fwd_event() with RTE_SCHED_TYPE_ATOMIC (all other events). The
+ * whole burst is then enqueued back in a single call.
+ *
+ * Always returns 0; prints its RX/FWD/TX counters on exit unless cdata.quiet.
+ * FWD counts every event enqueued back, released ones included.
+ */
+static int
+worker_do_tx_single_burst(void *arg)
+{
+       /* One spare slot so the ev[i + 1] prefetch below stays in bounds. */
+       struct rte_event ev[BATCH_SIZE + 1];
+
+       struct worker_data *data = (struct worker_data *)arg;
+       const uint8_t dev = data->dev_id;
+       const uint8_t port = data->port_id;
+       size_t fwd = 0, received = 0, tx = 0;
+
+       while (!fdata->done) {
+               uint16_t i;
+               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
+                               BATCH_SIZE, 0);
+
+               if (!nb_rx) {
+                       rte_pause();
+                       continue;
+               }
+               received += nb_rx;
+
+               for (i = 0; i < nb_rx; i++) {
+                       rte_prefetch0(ev[i + 1].mbuf);
+                       if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+                               /*
+                                * The mbuf is handed to worker_tx_pkt() here,
+                                * so it is not passed to work() afterwards.
+                                */
+                               worker_tx_pkt(ev[i].mbuf);
+                               ev[i].op = RTE_EVENT_OP_RELEASE;
+                               tx++;
+                       } else {
+                               /*
+                                * Work on the packet before forwarding it,
+                                * in the same order as worker_do_tx_single().
+                                */
+                               work(ev[i].mbuf);
+                               ev[i].queue_id++;
+                               worker_fwd_event(&ev[i],
+                                               RTE_SCHED_TYPE_ATOMIC);
+                       }
+               }
+
+               worker_event_enqueue_burst(dev, port, ev, nb_rx);
+               fwd += nb_rx;
+       }
+
+       if (!cdata.quiet)
+               printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+                               rte_lcore_id(), received, fwd, tx);
+       return 0;
+}
+
 /* Multi stage Pipeline Workers */
 
 static int
@@ -265,6 +350,7 @@ worker_do_tx_burst_atq(void *arg)
                                worker_fwd_event(&ev[i],
                                                cdata.queue_type);
                        }
+                       work(ev[i].mbuf);
                }
 
                worker_event_enqueue_burst(dev, port, ev, nb_rx);
@@ -610,14 +696,21 @@ set_worker_tx_setup_data(struct setup_data *caps, bool burst)
 {
        uint8_t atq = cdata.all_type_queues ? 1 : 0;
 
-       if (burst && atq)
-               caps->worker_loop = worker_do_tx_burst_atq;
-       if (burst && !atq)
-               caps->worker_loop = worker_do_tx_burst;
-       if (!burst && atq)
-               caps->worker_loop = worker_do_tx_atq;
-       if (!burst && !atq)
-               caps->worker_loop = worker_do_tx;
+       if (cdata.num_stages == 1) {
+               if (burst)
+                       caps->worker_loop = worker_do_tx_single_burst;
+               if (!burst)
+                       caps->worker_loop = worker_do_tx_single;
+       } else {
+               if (burst && atq)
+                       caps->worker_loop = worker_do_tx_burst_atq;
+               if (burst && !atq)
+                       caps->worker_loop = worker_do_tx_burst;
+               if (!burst && atq)
+                       caps->worker_loop = worker_do_tx_atq;
+               if (!burst && !atq)
+                       caps->worker_loop = worker_do_tx;
+       }
 
        caps->opt_check = opt_check;
        caps->consumer_loop = NULL;
-- 
2.14.1

Reply via email to