Currently, the worker uses burst dequeue and burst enqueue to forward events.
Add a non-burst mode that is selected based on the event device capabilities.

Signed-off-by: Pavan Nikhilesh <pbhagavat...@caviumnetworks.com>
---
 examples/eventdev_pipeline_sw_pmd/main.c           |  12 +-
 .../pipeline_worker_generic.c                      | 153 ++++++++++++++++++++-
 2 files changed, 160 insertions(+), 5 deletions(-)
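
For context, here is a minimal, self-contained sketch (not part of the patch)
of the capability-gated dispatch this change introduces: query the event
device, test RTE_EVENT_DEV_CAP_BURST_MODE, and pick either the burst or the
single-event loop. The helper and stub loop names (select_worker_loop,
worker_single, worker_burst) are illustrative only; the patch wires the real
worker_generic()/worker_generic_burst() loops in set_worker_generic_setup_data().

#include <string.h>
#include <rte_eventdev.h>

typedef int (*worker_loop_t)(void *arg);

/* Stub loops standing in for the real worker_generic()/worker_generic_burst(). */
static int worker_single(void *arg) { (void)arg; return 0; }
static int worker_burst(void *arg)  { (void)arg; return 0; }

static worker_loop_t
select_worker_loop(uint8_t eventdev_id)
{
	struct rte_event_dev_info info;

	memset(&info, 0, sizeof(info));
	rte_event_dev_info_get(eventdev_id, &info);

	/* Use single-event enqueue/dequeue unless the PMD advertises burst. */
	if (info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE)
		return worker_burst;
	return worker_single;
}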

diff --git a/examples/eventdev_pipeline_sw_pmd/main.c b/examples/eventdev_pipeline_sw_pmd/main.c
index 2e80841d0..153467893 100644
--- a/examples/eventdev_pipeline_sw_pmd/main.c
+++ b/examples/eventdev_pipeline_sw_pmd/main.c
@@ -383,8 +383,16 @@ static void
 do_capability_setup(uint16_t nb_ethdev, uint8_t eventdev_id)
 {
        RTE_SET_USED(nb_ethdev);
-       RTE_SET_USED(eventdev_id);
-       set_worker_generic_setup_data(&fdata->cap, 1);
+       uint8_t burst = 0;
+
+       struct rte_event_dev_info eventdev_info;
+       memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
+
+       rte_event_dev_info_get(eventdev_id, &eventdev_info);
+       burst = eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE ? 1 :
+               0;
+
+       set_worker_generic_setup_data(&fdata->cap, burst);
 }
 
 static void
diff --git a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c
index a72b7b2f9..5998aae95 100644
--- a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c
+++ b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c
@@ -1,5 +1,91 @@
+/*
+ *   BSD LICENSE
+ *
+ *   Copyright 2016 Intel Corporation.
+ *   Copyright 2016 Cavium, Inc.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Cavium, Inc nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
 #include "pipeline_common.h"
 
+static __rte_always_inline int
+worker_generic(void *arg)
+{
+       struct rte_event ev;
+
+       struct worker_data *data = (struct worker_data *)arg;
+       uint8_t dev_id = data->dev_id;
+       uint8_t port_id = data->port_id;
+       size_t sent = 0, received = 0;
+       unsigned int lcore_id = rte_lcore_id();
+
+       while (!fdata->done) {
+
+               if (fdata->cap.schedule_loop)
+                       fdata->cap.schedule_loop(lcore_id);
+
+               if (!fdata->worker_core[lcore_id]) {
+                       rte_pause();
+                       continue;
+               }
+
+               const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
+                               &ev, 1, 0);
+
+               if (nb_rx == 0) {
+                       rte_pause();
+                       continue;
+               }
+               received++;
+
+               /* The first worker stage does classification */
+               if (ev.queue_id == cdata.qid[0])
+                       ev.flow_id = ev.mbuf->hash.rss
+                                               % cdata.num_fids;
+
+               ev.queue_id = cdata.next_qid[ev.queue_id];
+               ev.op = RTE_EVENT_OP_FORWARD;
+               ev.sched_type = cdata.queue_type;
+
+               work(ev.mbuf);
+
+               while (rte_event_enqueue_burst(dev_id, port_id, &ev, 1) != 1)
+                       rte_pause();
+               sent++;
+       }
+
+       if (!cdata.quiet)
+               printf("  worker %u thread done. RX=%zu TX=%zu\n",
+                               rte_lcore_id(), received, sent);
+
+       return 0;
+}
+
 static int
 worker_generic_burst(void *arg)
 {
@@ -60,6 +146,63 @@ worker_generic_burst(void *arg)
        return 0;
 }
 
+static __rte_always_inline int
+consumer(void)
+{
+       const uint64_t freq_khz = rte_get_timer_hz() / 1000;
+       struct rte_event packet;
+
+       static uint64_t received;
+       static uint64_t last_pkts;
+       static uint64_t last_time;
+       static uint64_t start_time;
+       int i;
+       uint8_t dev_id = cons_data.dev_id;
+       uint8_t port_id = cons_data.port_id;
+
+       do {
+               uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
+                               &packet, 1, 0);
+
+               if (n == 0) {
+                       for (i = 0; i < rte_eth_dev_count(); i++)
+                               rte_eth_tx_buffer_flush(i, 0, fdata->tx_buf[i]);
+                       return 0;
+               }
+               if (start_time == 0)
+                       last_time = start_time = rte_get_timer_cycles();
+
+               received++;
+               uint8_t outport = packet.mbuf->port;
+               rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
+                               packet.mbuf);
+
+               /* Print out mpps every 1<<22 packets */
+               if (!cdata.quiet && received >= last_pkts + (1<<22)) {
+                       const uint64_t now = rte_get_timer_cycles();
+                       const uint64_t total_ms = (now - start_time) / freq_khz;
+                       const uint64_t delta_ms = (now - last_time) / freq_khz;
+                       uint64_t delta_pkts = received - last_pkts;
+
+                       printf("# consumer RX=%"PRIu64", time %"PRIu64 "ms, "
+                                       "avg %.3f mpps [current %.3f mpps]\n",
+                                       received,
+                                       total_ms,
+                                       received / (total_ms * 1000.0),
+                                       delta_pkts / (delta_ms * 1000.0));
+                       last_pkts = received;
+                       last_time = now;
+               }
+
+               cdata.num_packets--;
+               if (cdata.num_packets <= 0)
+                       fdata->done = 1;
+       /* Stay in this loop when a single dedicated core handles TX. */
+       } while (!fdata->done && fdata->tx_single);
+
+       return 0;
+}
+
 static __rte_always_inline int
 consumer_burst(void)
 {
@@ -412,9 +555,13 @@ generic_opt_check(void)
 void
 set_worker_generic_setup_data(struct setup_data *caps, bool burst)
 {
-       RTE_SET_USED(burst);
-       caps->consumer_loop = consumer_burst;
-       caps->worker_loop = worker_generic_burst;
+       if (burst) {
+               caps->consumer_loop = consumer_burst;
+               caps->worker_loop = worker_generic_burst;
+       } else {
+               caps->consumer_loop = consumer;
+               caps->worker_loop = worker_generic;
+       }
 
        caps->opt_check = generic_opt_check;
        caps->rx_adapter_setup = init_rx_adapter;
-- 
2.14.1
