Hi Pavan,

A few comments below.

On 8/31/2018 4:10 PM, Pavan Nikhilesh wrote:
Convert existing Tx service based pipeline to Tx adapter based APIs and
simplify worker functions.

Signed-off-by: Pavan Nikhilesh <pbhagavat...@caviumnetworks.com>
---
  app/test-eventdev/test_pipeline_atq.c    | 216 +++++++++++---------
  app/test-eventdev/test_pipeline_common.c | 193 ++++++------------
  app/test-eventdev/test_pipeline_common.h |  43 ++--
  app/test-eventdev/test_pipeline_queue.c  | 238 ++++++++++++-----------
  4 files changed, 322 insertions(+), 368 deletions(-)

diff --git a/app/test-eventdev/test_pipeline_atq.c 
b/app/test-eventdev/test_pipeline_atq.c
</snip>
-static int
+static __rte_noinline int
  pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
  {
        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
-       const uint8_t nb_stages = t->opt->nb_stages;
-       const uint8_t tx_queue = t->tx_service.queue_id;
+       const uint8_t *tx_queue = t->tx_evqueue_id;
while (t->done == false) {
                uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
@@ -253,9 +235,10 @@ pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
if (cq_id == last_queue) {
                                w->processed_pkts++;
-                               ev[i].queue_id = tx_queue;
+                               ev[i].queue_id = tx_queue[ev[i].mbuf->port];
                                pipeline_fwd_event(&ev[i],
                                                RTE_SCHED_TYPE_ATOMIC);
+
Unintentional newline?
                        } else {
                                ev[i].sub_event_type++;
                                pipeline_fwd_event(&ev[i],
  static int
@@ -317,23 +296,25 @@ pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
        int nb_ports;
        int nb_queues;
        uint8_t queue;
-       struct rte_event_dev_info info;
-       struct test_pipeline *t = evt_test_priv(test);
-       uint8_t tx_evqueue_id = 0;
+       uint8_t tx_evqueue_id[RTE_MAX_ETHPORTS] = {0};
        uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
        uint8_t nb_worker_queues = 0;
+       uint8_t tx_evport_id = 0;
+       uint16_t prod = 0;
+       struct rte_event_dev_info info;
+       struct test_pipeline *t = evt_test_priv(test);
nb_ports = evt_nr_active_lcores(opt->wlcores);
        nb_queues = rte_eth_dev_count_avail();
- /* One extra port and queueu for Tx service */
-       if (t->mt_unsafe) {
-               tx_evqueue_id = nb_queues;
-               nb_ports++;
-               nb_queues++;
+       /* One queue for Tx service */
+       if (!t->internal_port) {
See comment about struct test_pipeline::internal_port in the test_pipeline_common.h review below.

+               RTE_ETH_FOREACH_DEV(prod) {
+                       tx_evqueue_id[prod] = nb_queues;
+                       nb_queues++;
+               }
        }
@@ -388,14 +371,11 @@ pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
                        .new_event_threshold = info.max_num_events,
        };
- if (t->mt_unsafe) {
+       if (!t->internal_port) {
                ret = pipeline_event_port_setup(test, opt, queue_arr,
                                nb_worker_queues, p_conf);
                if (ret)
                        return ret;
-
-               ret = pipeline_event_tx_service_setup(test, opt, tx_evqueue_id,
-                               nb_ports - 1, p_conf);
        } else
                ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
                                p_conf);
@@ -424,14 +404,17 @@ pipeline_atq_eventdev_setup(struct evt_test *test, struct 
evt_options *opt)
         *      stride = 1
         *
         *      event queue pipelines:
-        *      eth0 -> q0
-        *                } (q3->tx) Tx service
-        *      eth1 -> q1
+        *      eth0 -> q0 \
+        *                  q3->tx
+        *      eth1 -> q1 /
         *
         *      q0,q1 are configured as stated above.
         *      q3 configured as SINGLE_LINK|ATOMIC.
         */
        ret = pipeline_event_rx_adapter_setup(opt, 1, p_conf);
+       if (ret)
+               return ret;
+       ret = pipeline_event_tx_adapter_setup(opt, p_conf);
pipeline_event_tx_adapter_setup() creates a Tx adapter per eth port, which doesn't match the preceding diagram.

diff --git a/app/test-eventdev/test_pipeline_common.c b/app/test-eventdev/test_pipeline_common.c
index a54068df3..7f858e23f 100644
--- a/app/test-eventdev/test_pipeline_common.c
+++ b/app/test-eventdev/test_pipeline_common.c
@@ -5,58 +5,6 @@

</snip>

@@ -215,7 +160,6 @@ pipeline_ethdev_setup(struct evt_test *test, struct 
evt_options *opt)
  {
        uint16_t i;
        uint8_t nb_queues = 1;
-       uint8_t mt_state = 0;
        struct test_pipeline *t = evt_test_priv(test);
        struct rte_eth_rxconf rx_conf;
        struct rte_eth_conf port_conf = {
@@ -238,13 +182,13 @@ pipeline_ethdev_setup(struct evt_test *test, struct 
evt_options *opt)
                return -ENODEV;
        }
+ t->internal_port = 0;
        RTE_ETH_FOREACH_DEV(i) {
                struct rte_eth_dev_info dev_info;
                struct rte_eth_conf local_port_conf = port_conf;
+               uint32_t caps = 0;
rte_eth_dev_info_get(i, &dev_info);
-               mt_state = !(dev_info.tx_offload_capa &
-                               DEV_TX_OFFLOAD_MT_LOCKFREE);
                rx_conf = dev_info.default_rxconf;
                rx_conf.offloads = port_conf.rxmode.offloads;
@@ -279,11 +223,9 @@ pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt)
                        return -EINVAL;
                }
- t->mt_unsafe |= mt_state;
-               t->tx_service.tx_buf[i] =
-                       rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(BURST_SIZE), 0);
-               if (t->tx_service.tx_buf[i] == NULL)
-                       rte_panic("Unable to allocate Tx buffer memory.");
+               rte_event_eth_tx_adapter_caps_get(opt->dev_id, i, &caps);
+               if ((caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT))
+                       t->internal_port = 1;
See comment about struct test_pipeline::internal_port below.
                rte_eth_promiscuous_enable(i);
        }
@@ -295,7 +237,6 @@ pipeline_event_port_setup(struct evt_test *test, struct evt_options *opt,
                uint8_t *queue_arr, uint8_t nb_queues,
                const struct rte_event_port_conf p_conf)
  {
-       int i;
        int ret;
        uint8_t port;
        struct test_pipeline *t = evt_test_priv(test);
@@ -321,10 +262,9 @@ pipeline_event_port_setup(struct evt_test *test, struct 
evt_options *opt,
                                                0) != nb_queues)
                                goto link_fail;
                } else {
-                       for (i = 0; i < nb_queues; i++) {
-                               if (rte_event_port_link(opt->dev_id, port,
-                                               &queue_arr[i], NULL, 1) != 1)
-                                       goto link_fail;
+                       if (rte_event_port_link(opt->dev_id, port, queue_arr,
+                                       NULL, nb_queues) != nb_queues) {
+                               goto link_fail;
                        }
Minor: isn't it possible to replace the if (queue_arr == NULL) {} else {} block with a single call to rte_event_port_link()?

diff --git a/app/test-eventdev/test_pipeline_common.h b/app/test-eventdev/test_pipeline_common.h
index 9cd6b905b..b8939db81 100644
--- a/app/test-eventdev/test_pipeline_common.h
+++ b/app/test-eventdev/test_pipeline_common.h
@@ -14,6 +14,7 @@
  #include <rte_ethdev.h>
  #include <rte_eventdev.h>
  #include <rte_event_eth_rx_adapter.h>
+#include <rte_event_eth_tx_adapter.h>
  #include <rte_lcore.h>
  #include <rte_malloc.h>
  #include <rte_mempool.h>
@@ -26,6 +27,9 @@
  #include "evt_options.h"
  #include "evt_test.h"
+#define PIPELINE_TX_ADPTR_ENQ 0x1
+#define PIPELINE_TX_ADPTR_FWD 0x2
+
I don't see a reference to these defines.

  struct test_pipeline;
struct worker_data {
@@ -35,30 +39,19 @@ struct worker_data {
        struct test_pipeline *t;
  } __rte_cache_aligned;
-struct tx_service_data {
-       uint8_t dev_id;
-       uint8_t queue_id;
-       uint8_t port_id;
-       uint32_t service_id;
-       uint64_t processed_pkts;
-       uint16_t nb_ethports;
-       struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
-       struct test_pipeline *t;
-} __rte_cache_aligned;
-
  struct test_pipeline {
        /* Don't change the offset of "done". Signal handler use this memory
         * to terminate all lcores work.
         */
        int done;
        uint8_t nb_workers;
-       uint8_t mt_unsafe;
Can we also replace references to mt_unsafe in comments?

+       uint8_t internal_port;
Shouldn't internal_port be a per-eth-device flag? Or is the assumption that the eventdev PMD's internal port capability will be set for every eth device?


diff --git a/app/test-eventdev/test_pipeline_queue.c 
b/app/test-eventdev/test_pipeline_queue.c
index 2e0d93d99..e1153573b 100644
--- a/app/test-eventdev/test_pipeline_queue.c
+++ b/app/test-eventdev/test_pipeline_queue.c
@@ -15,7 +15,7 @@ pipeline_queue_nb_event_queues(struct evt_options *opt)
        return (eth_count * opt->nb_stages) + eth_count;
  }
-static int
+static __rte_noinline int
  pipeline_queue_worker_single_stage_tx(void *arg)
  {
        PIPELINE_WORKER_SINGLE_STAGE_INIT;
@@ -29,9 +29,12 @@ pipeline_queue_worker_single_stage_tx(void *arg)
                }
if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-                       pipeline_tx_pkt(ev.mbuf);
+                       rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
+                       rte_event_eth_tx_adapter_enqueue(dev, port,
+                                       &ev, 1);
Do we need a retry loop for the enqueue above and for the other enqueue calls in this patch?

                        w->processed_pkts++;
                } else {
+
Is this new line intentional?
                        ev.queue_id++;
                        pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
                        pipeline_event_enqueue(dev, port, &ev);
@@ -41,11 +44,11 @@ pipeline_queue_worker_single_stage_tx(void *arg)
        return 0;
  }

Nikhil

Reply via email to