In this patch,
    * The running mode of the distributor can be selected with a
command line argument.
    * With the "-c" parameter, you can run RX and Distributor
on the same core.
    * Without the "-c" parameter, you can run RX and Distributor
on different cores.
    * The sequential termination of the lcores is fixed.
The termination order was wrong, and the application could not be
terminated while traffic was being captured. The order is now
RX -> Distributor -> TX -> Workers.
    * When the "-c" parameter is given, the distributor core that would
otherwise be wasted is no longer assigned or launched in the main function.

Signed-off-by: Abdullah Ömer Yamaç <omer.ya...@ceng.metu.edu.tr>

---
Cc: david.h...@intel.com
---
 doc/guides/sample_app_ug/dist_app.rst |   3 +-
 examples/distributor/main.c           | 222 ++++++++++++++++++--------
 2 files changed, 159 insertions(+), 66 deletions(-)

diff --git a/doc/guides/sample_app_ug/dist_app.rst b/doc/guides/sample_app_ug/dist_app.rst
index 3bd03905c3..5c80561187 100644
--- a/doc/guides/sample_app_ug/dist_app.rst
+++ b/doc/guides/sample_app_ug/dist_app.rst
@@ -42,11 +42,12 @@ Running the Application
 
    ..  code-block:: console
 
-       ./<build-dir>/examples/dpdk-distributor [EAL options] -- -p PORTMASK
+       ./<build-dir>/examples/dpdk-distributor [EAL options] -- -p PORTMASK [-c]
 
    where,
 
    *   -p PORTMASK: Hexadecimal bitmask of ports to configure
+   *   -c: Combines the RX core with distribution core
 
 #. To run the application in linux environment with 10 lcores, 4 ports,
    issue the command:
diff --git a/examples/distributor/main.c b/examples/distributor/main.c
index 8995806b4e..fc9a75f695 100644
--- a/examples/distributor/main.c
+++ b/examples/distributor/main.c
@@ -39,6 +39,8 @@ volatile uint8_t quit_signal_rx;
 volatile uint8_t quit_signal_dist;
 volatile uint8_t quit_signal_work;
 unsigned int power_lib_initialised;
+bool enable_lcore_rx_distributor;
+unsigned int num_workers;
 
 static volatile struct app_stats {
        struct {
@@ -256,14 +258,82 @@ lcore_rx(struct lcore_params *p)
                }
                app_stats.rx.rx_pkts += nb_rx;
 
-/*
- * You can run the distributor on the rx core with this code. Returned
- * packets are then send straight to the tx core.
- */
-#if 0
-               rte_distributor_process(p->d, bufs, nb_rx);
-               const uint16_t nb_ret = rte_distributor_returned_pkts(p->d,
-                       bufs, BURST_SIZE*2);
+               /*
+                * Swap the following two lines if you want the rx traffic
+                * to go directly to tx, no distribution.
+                */
+               struct rte_ring *out_ring = p->rx_dist_ring;
+               /* struct rte_ring *out_ring = p->dist_tx_ring; */
+
+               uint16_t sent = rte_ring_enqueue_burst(out_ring,
+                               (void *)bufs, nb_rx, NULL);
+
+               app_stats.rx.enqueued_pkts += sent;
+               if (unlikely(sent < nb_rx)) {
+                       app_stats.rx.enqdrop_pkts +=  nb_rx - sent;
+                       RTE_LOG_DP(DEBUG, DISTRAPP,
+                               "%s:Packet loss due to full ring\n", __func__);
+                       while (sent < nb_rx)
+                               rte_pktmbuf_free(bufs[sent++]);
+               }
+               if (++port == nb_ports)
+                       port = 0;
+       }
+       if (power_lib_initialised)
+               rte_power_exit(rte_lcore_id());
+       printf("\nCore %u exiting rx task.\n", rte_lcore_id());
+       /* set distributor threads quit flag */
+       quit_signal_dist = 1;
+       return 0;
+}
+
+static int
+lcore_rx_and_distributor(struct lcore_params *p)
+{
+       struct rte_distributor *d = p->d;
+       const uint16_t nb_ports = rte_eth_dev_count_avail();
+       const int socket_id = rte_socket_id();
+       uint16_t port;
+       struct rte_mbuf *bufs[BURST_SIZE*2];
+
+       RTE_ETH_FOREACH_DEV(port) {
+               /* skip ports that are not enabled */
+               if ((enabled_port_mask & (1 << port)) == 0)
+                       continue;
+
+               if (rte_eth_dev_socket_id(port) > 0 &&
+                               rte_eth_dev_socket_id(port) != socket_id)
+                       printf("WARNING, port %u is on remote NUMA node to "
+                                       "RX thread.\n\tPerformance will not "
+                                       "be optimal.\n", port);
+       }
+
+       printf("\nCore %u doing packet RX and Distributor.\n", rte_lcore_id());
+       port = 0;
+       while (!quit_signal_rx) {
+
+               /* skip ports that are not enabled */
+               if ((enabled_port_mask & (1 << port)) == 0) {
+                       if (++port == nb_ports)
+                               port = 0;
+                       continue;
+               }
+               const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
+                               BURST_SIZE);
+               if (unlikely(nb_rx == 0)) {
+                       if (++port == nb_ports)
+                               port = 0;
+                       continue;
+               }
+               app_stats.rx.rx_pkts += nb_rx;
+
+               /*
+                * Run the distributor on the rx core. Returned
+                * packets are then send straight to the tx core.
+                */
+               rte_distributor_process(d, bufs, nb_rx);
+               const uint16_t nb_ret = rte_distributor_returned_pkts(d,
+                               bufs, BURST_SIZE*2);
 
                app_stats.rx.returned_pkts += nb_ret;
                if (unlikely(nb_ret == 0)) {
@@ -275,18 +345,6 @@ lcore_rx(struct lcore_params *p)
                struct rte_ring *tx_ring = p->dist_tx_ring;
                uint16_t sent = rte_ring_enqueue_burst(tx_ring,
                                (void *)bufs, nb_ret, NULL);
-#else
-               uint16_t nb_ret = nb_rx;
-               /*
-                * Swap the following two lines if you want the rx traffic
-                * to go directly to tx, no distribution.
-                */
-               struct rte_ring *out_ring = p->rx_dist_ring;
-               /* struct rte_ring *out_ring = p->dist_tx_ring; */
-
-               uint16_t sent = rte_ring_enqueue_burst(out_ring,
-                               (void *)bufs, nb_ret, NULL);
-#endif
 
                app_stats.rx.enqueued_pkts += sent;
                if (unlikely(sent < nb_ret)) {
@@ -301,9 +359,14 @@ lcore_rx(struct lcore_params *p)
        }
        if (power_lib_initialised)
                rte_power_exit(rte_lcore_id());
-       /* set worker & tx threads quit flag */
        printf("\nCore %u exiting rx task.\n", rte_lcore_id());
+       /* set tx threads quit flag */
        quit_signal = 1;
+       /* set worker threads quit flag */
+       quit_signal_work = 1;
+       rte_distributor_flush(d);
+       /* Unblock any returns so workers can exit */
+       rte_distributor_clear_returns(d);
        return 0;
 }
 
@@ -381,14 +444,16 @@ lcore_distributor(struct lcore_params *p)
                        }
                }
        }
-       printf("\nCore %u exiting distributor task.\n", rte_lcore_id());
-       quit_signal_work = 1;
        if (power_lib_initialised)
                rte_power_exit(rte_lcore_id());
+       printf("\nCore %u exiting distributor task.\n", rte_lcore_id());
+       /* set tx threads quit flag */
+       quit_signal = 1;
+       /* set worker threads quit flag */
+       quit_signal_work = 1;
        rte_distributor_flush(d);
        /* Unblock any returns so workers can exit */
        rte_distributor_clear_returns(d);
-       quit_signal_rx = 1;
        return 0;
 }
 
@@ -467,7 +532,7 @@ int_handler(int sig_num)
 {
        printf("Exiting on signal %d\n", sig_num);
        /* set quit flag for rx thread to exit */
-       quit_signal_dist = 1;
+       quit_signal_rx = 1;
 }
 
 static void
@@ -475,7 +540,6 @@ print_stats(void)
 {
        struct rte_eth_stats eth_stats;
        unsigned int i, j;
-       const unsigned int num_workers = rte_lcore_count() - 4;
 
        RTE_ETH_FOREACH_DEV(i) {
                rte_eth_stats_get(i, &eth_stats);
@@ -504,20 +568,22 @@ print_stats(void)
                        prev_app_stats.rx.enqdrop_pkts)/1000000.0,
                        ANSI_COLOR_RESET);
 
-       printf("Distributor thread:\n");
-       printf(" - In:          %5.2f\n",
-                       (app_stats.dist.in_pkts -
-                       prev_app_stats.dist.in_pkts)/1000000.0);
-       printf(" - Returned:    %5.2f\n",
-                       (app_stats.dist.ret_pkts -
-                       prev_app_stats.dist.ret_pkts)/1000000.0);
-       printf(" - Sent:        %5.2f\n",
-                       (app_stats.dist.sent_pkts -
-                       prev_app_stats.dist.sent_pkts)/1000000.0);
-       printf(" - Dropped      %s%5.2f%s\n", ANSI_COLOR_RED,
-                       (app_stats.dist.enqdrop_pkts -
-                       prev_app_stats.dist.enqdrop_pkts)/1000000.0,
-                       ANSI_COLOR_RESET);
+       if (!enable_lcore_rx_distributor) {
+               printf("Distributor thread:\n");
+               printf(" - In:          %5.2f\n",
+                               (app_stats.dist.in_pkts -
+                               prev_app_stats.dist.in_pkts)/1000000.0);
+               printf(" - Returned:    %5.2f\n",
+                               (app_stats.dist.ret_pkts -
+                               prev_app_stats.dist.ret_pkts)/1000000.0);
+               printf(" - Sent:        %5.2f\n",
+                               (app_stats.dist.sent_pkts -
+                               prev_app_stats.dist.sent_pkts)/1000000.0);
+               printf(" - Dropped      %s%5.2f%s\n", ANSI_COLOR_RED,
+                               (app_stats.dist.enqdrop_pkts -
+                               prev_app_stats.dist.enqdrop_pkts)/1000000.0,
+                               ANSI_COLOR_RESET);
+       }
 
        printf("TX thread:\n");
        printf(" - Dequeued:    %5.2f\n",
@@ -629,8 +695,9 @@ init_power_library(void)
 static void
 print_usage(const char *prgname)
 {
-       printf("%s [EAL options] -- -p PORTMASK\n"
-                       "  -p PORTMASK: hexadecimal bitmask of ports to 
configure\n",
+       printf("%s [EAL options] -- -p PORTMASK [-c]\n"
+                       "  -p PORTMASK: hexadecimal bitmask of ports to 
configure\n"
+                       "  -c: Combines the RX core with the distribution 
core\n",
                        prgname);
 }
 
@@ -661,8 +728,8 @@ parse_args(int argc, char **argv)
        };
 
        argvopt = argv;
-
-       while ((opt = getopt_long(argc, argvopt, "p:",
+       enable_lcore_rx_distributor = false;
+       while ((opt = getopt_long(argc, argvopt, "cp:",
                        lgopts, &option_index)) != EOF) {
 
                switch (opt) {
@@ -676,6 +743,10 @@ parse_args(int argc, char **argv)
                        }
                        break;
 
+               case 'c':
+                       enable_lcore_rx_distributor = true;
+                       break;
+
                default:
                        print_usage(prgname);
                        return -1;
@@ -705,6 +776,7 @@ main(int argc, char *argv[])
        unsigned int lcore_id, worker_id = 0;
        int distr_core_id = -1, rx_core_id = -1, tx_core_id = -1;
        unsigned nb_ports;
+       unsigned int min_cores;
        uint16_t portid;
        uint16_t nb_ports_available;
        uint64_t t, freq;
@@ -724,12 +796,21 @@ main(int argc, char *argv[])
        if (ret < 0)
                rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");
 
-       if (rte_lcore_count() < 5)
+       if (enable_lcore_rx_distributor) {
+       /* RX and distributor combined, 3 fixed function cores (stat, TX, at 
least 1 worker) */
+               min_cores = 4;
+               num_workers = rte_lcore_count() - 3;
+       } else {
+       /* separate RX and distributor, 3 fixed function cores (stat, TX, at 
least 1 worker) */
+               min_cores = 5;
+               num_workers = rte_lcore_count() - 4;
+       }
+
+       if (rte_lcore_count() < min_cores)
                rte_exit(EXIT_FAILURE, "Error, This application needs at "
-                               "least 5 logical cores to run:\n"
+                               "least 4 logical cores to run:\n"
                                "1 lcore for stats (can be core 0)\n"
-                               "1 lcore for packet RX\n"
-                               "1 lcore for distribution\n"
+                               "1 or 2 lcore for packet RX and distribution\n"
                                "1 lcore for packet TX\n"
                                "and at least 1 lcore for worker threads\n");
 
@@ -772,7 +853,7 @@ main(int argc, char *argv[])
        }
 
        d = rte_distributor_create("PKT_DIST", rte_socket_id(),
-                       rte_lcore_count() - 4,
+                       num_workers,
                        RTE_DIST_ALG_BURST);
        if (d == NULL)
                rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
@@ -808,7 +889,7 @@ main(int argc, char *argv[])
                        if (lcore_cap.priority != 1)
                                continue;
 
-                       if (distr_core_id < 0) {
+                       if (distr_core_id < 0 && !enable_lcore_rx_distributor) {
                                distr_core_id = lcore_id;
                                printf("Distributor on priority core %d\n",
                                        lcore_id);
@@ -839,7 +920,7 @@ main(int argc, char *argv[])
                                lcore_id == (unsigned int)rx_core_id ||
                                lcore_id == (unsigned int)tx_core_id)
                        continue;
-               if (distr_core_id < 0) {
+               if (distr_core_id < 0 && !enable_lcore_rx_distributor) {
                        distr_core_id = lcore_id;
                        printf("Distributor on core %d\n", lcore_id);
                        continue;
@@ -856,7 +937,12 @@ main(int argc, char *argv[])
                }
        }
 
-       printf(" tx id %d, dist id %d, rx id %d\n",
+       if (enable_lcore_rx_distributor)
+               printf(" tx id %d, rx id %d\n",
+                       tx_core_id,
+                       rx_core_id);
+       else
+               printf(" tx id %d, dist id %d, rx id %d\n",
                        tx_core_id,
                        distr_core_id,
                        rx_core_id);
@@ -888,15 +974,16 @@ main(int argc, char *argv[])
                        dist_tx_ring, tx_core_id);
 
        /* Start distributor core */
-       struct lcore_params *pd =
-               rte_malloc(NULL, sizeof(*pd), 0);
-       if (!pd)
-               rte_panic("malloc failure\n");
-       *pd = (struct lcore_params){worker_id++, d,
-               rx_dist_ring, dist_tx_ring, mbuf_pool};
-       rte_eal_remote_launch(
-                       (lcore_function_t *)lcore_distributor,
-                       pd, distr_core_id);
+       struct lcore_params *pd = NULL;
+       if (!enable_lcore_rx_distributor) {
+               pd = rte_malloc(NULL, sizeof(*pd), 0);
+               if (!pd)
+                       rte_panic("malloc failure\n");
+               *pd = (struct lcore_params){worker_id++, d,
+                       rx_dist_ring, dist_tx_ring, mbuf_pool};
+               rte_eal_remote_launch((lcore_function_t *)lcore_distributor,
+                               pd, distr_core_id);
+       }
 
        /* Start rx core */
        struct lcore_params *pr =
@@ -905,12 +992,16 @@ main(int argc, char *argv[])
                rte_panic("malloc failure\n");
        *pr = (struct lcore_params){worker_id++, d, rx_dist_ring,
                dist_tx_ring, mbuf_pool};
-       rte_eal_remote_launch((lcore_function_t *)lcore_rx,
-                       pr, rx_core_id);
+       if (enable_lcore_rx_distributor)
+               rte_eal_remote_launch((lcore_function_t 
*)lcore_rx_and_distributor,
+                               pr, rx_core_id);
+       else
+               rte_eal_remote_launch((lcore_function_t *)lcore_rx,
+                               pr, rx_core_id);
 
        freq = rte_get_timer_hz();
        t = rte_rdtsc() + freq;
-       while (!quit_signal_dist) {
+       while (!quit_signal) {
                if (t < rte_rdtsc()) {
                        print_stats();
                        t = rte_rdtsc() + freq;
@@ -925,7 +1016,8 @@ main(int argc, char *argv[])
 
        print_stats();
 
-       rte_free(pd);
+       if (pd)
+               rte_free(pd);
        rte_free(pr);
 
        /* clean up the EAL */
-- 
2.27.0

Reply via email to