Add telemetry support to the generator example: register the telemetry
callback functions and use them to report the measured values.

Signed-off-by: Ronan Randles <ronan.rand...@intel.com>
---
 examples/generator/main.c | 142 +++++++++++++++++++++++++++++++-------
 1 file changed, 118 insertions(+), 24 deletions(-)

diff --git a/examples/generator/main.c b/examples/generator/main.c
index 1ddf4c1603..1ac3caafcc 100644
--- a/examples/generator/main.c
+++ b/examples/generator/main.c
@@ -13,6 +13,7 @@
 #include <rte_lcore.h>
 #include <rte_mbuf.h>
 #include <rte_gen.h>
+#include <rte_telemetry.h>
 
 #define RX_RING_SIZE 1024
 #define TX_RING_SIZE 1024
@@ -32,6 +33,27 @@ static volatile int done;
 static struct rte_mempool *mbuf_pool;
 struct rte_gen *gen;
 
+struct gen_args { /* per-lcore launch argument and statistics block */
+       /* Inputs */
+       struct rte_gen *gen; /* generator instance used by the lcore */
+
+       /* Outputs */
+       uint64_t tx_total_packets; /* sum of rte_eth_tx_burst() returns */
+       uint64_t rx_total_packets; /* sum of rte_eth_rx_burst() returns */
+       uint64_t rx_missed_total; /* NOTE(review): not updated in visible code */
+       uint64_t tx_failed; /* mbufs generated but not sent (freed) */
+       uint64_t last_tx_total; /* NOTE(review): not used in visible code */
+       uint64_t measured_tx_pps; /* TX delta over the last >1 s TSC window */
+} __rte_cache_aligned;
+/* Global handle that lets the telemetry callbacks reach the per-lcore
+ * statistics; main() fills it in when it launches the worker lcores.
+ */
+struct telemetry_userdata {
+       struct gen_args *prod; /* producer's args; NULL until launched */
+       struct gen_args *cons; /* consumer's args; NULL until launched */
+};
+static struct telemetry_userdata telemetry_userdata;
+
 static void handle_sigint(int sig);
 
 /* Initializes a given port using global settings and with the RX buffers
@@ -123,10 +145,11 @@ port_init(uint16_t port, struct rte_mempool *mbuf_pool)
  * an input port and writing to an output port.
  */
 static int
-lcore_producer(__rte_unused void *arg)
+lcore_producer(void *arg)
 {
+       struct gen_args *args = arg;
+       struct rte_gen *gen = args->gen;
        uint16_t port;
-
        /* Check that the port is on the same NUMA node as the polling thread
         * for best performance.
         */
@@ -138,25 +161,34 @@ lcore_producer(__rte_unused void *arg)
                                        "polling thread.\n\tPerformance will "
                                        "not be optimal.\n", port);
 
+       uint64_t tsc_hz = rte_get_tsc_hz();
+       uint64_t last_tsc_reading = 0;
+       uint64_t last_tx_total = 0;
+
        /* Run until the application is quit or killed. */
        while (!done) {
                struct rte_mbuf *bufs[BURST_SIZE];
-               int i;
+               uint16_t nb_tx = 0;
                /* Receive packets from gen and then tx them over port */
                RTE_ETH_FOREACH_DEV(port) {
-                       int nb_recieved = rte_gen_rx_burst(gen, bufs,
+                       int nb_generated = rte_gen_rx_burst(gen, bufs,
                                                        BURST_SIZE);
-                       for (i = 0; i < nb_recieved; i++) {
-                               bufs[i]->pkt_len = 64;
-                               bufs[i]->data_len = 64;
-                       }
 
-                       uint16_t nb_tx = rte_eth_tx_burst(port, 0, bufs,
-                                                       nb_recieved);
-                       if (nb_tx != nb_recieved)
-                               rte_pktmbuf_free_bulk(&bufs[nb_tx],
-                                                       (nb_recieved - nb_tx));
+                       uint64_t start_tsc = rte_rdtsc();
+                       if (start_tsc > last_tsc_reading + tsc_hz) {
+                               args->measured_tx_pps = args->tx_total_packets -
+                                                               last_tx_total;
+                               last_tx_total = args->tx_total_packets;
+                               last_tsc_reading = start_tsc;
+                       }
+                       nb_tx = rte_eth_tx_burst(port, 0, bufs, nb_generated);
+                       args->tx_total_packets += nb_tx;
 
+                       uint64_t tx_failed = nb_generated - nb_tx;
+                       if (nb_tx != nb_generated) {
+                               rte_pktmbuf_free_bulk(&bufs[nb_tx], tx_failed);
+                               args->tx_failed += tx_failed;
+                       }
                        if (unlikely(nb_tx == 0))
                                continue;
 
@@ -169,10 +201,11 @@ lcore_producer(__rte_unused void *arg)
  * an input port and writing to an output port.
  */
 static int
-lcore_consumer(__rte_unused void *arg)
+lcore_consumer(void *arg)
 {
+       struct gen_args *args = arg;
+       struct rte_gen *gen = args->gen;
        uint16_t port;
-
        /* Check that the port is on the same NUMA node as the polling thread
         * for best performance.
         */
@@ -195,16 +228,16 @@ lcore_consumer(__rte_unused void *arg)
                        uint64_t latency[BURST_SIZE];
                        uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
                                                        BURST_SIZE);
-                       rte_gen_tx_burst(gen, bufs, latency, nb_rx);
+                       if (unlikely(nb_rx == 0))
+                               continue;
+
+                       args->rx_total_packets += nb_rx;
 
                        int nb_sent = rte_gen_tx_burst(gen, bufs,
                                                        latency, nb_rx);
                        if (nb_sent != nb_rx)
                                rte_panic("invalid tx quantity\n");
 
-                       if (unlikely(nb_rx == 0))
-                               continue;
-
                }
        }
        return 0;
@@ -217,6 +250,45 @@ void handle_sigint(int sig)
        done = 1;
 }
 
+/* Telemetry "/gen/mpps": report the producer's measured pps as a u64. */
+static int
+tele_gen_mpps(const char *cmd, const char *params, struct rte_tel_data *d)
+{
+       const struct gen_args *prod = telemetry_userdata.prod;
+       RTE_SET_USED(cmd);
+       RTE_SET_USED(params);
+       if (prod == NULL) /* no producer lcore was launched by main() */
+               return -1;
+       rte_tel_data_start_dict(d);
+       return rte_tel_data_add_dict_u64(d, "pps", prod->measured_tx_pps);
+}
+
+/* Telemetry "/gen/stats": export TX/RX totals and measured pps as u64s. */
+static int
+tele_gen_stats(const char *cmd, const char *params, struct rte_tel_data *d)
+{
+       const struct gen_args *prod = telemetry_userdata.prod;
+       const struct gen_args *cons = telemetry_userdata.cons;
+
+       RTE_SET_USED(cmd);
+       RTE_SET_USED(params);
+
+       /* Either pointer stays NULL when main() launched fewer than two
+        * worker lcores; refuse the request rather than dereference it.
+        */
+       if (prod == NULL || cons == NULL)
+               return -1;
+
+       rte_tel_data_start_dict(d);
+       /* The counters are uint64_t, so use the u64 dict adder. */
+       rte_tel_data_add_dict_u64(d, "tx_total_packets",
+                                 prod->tx_total_packets);
+       rte_tel_data_add_dict_u64(d, "rx_total_packets",
+                                 cons->rx_total_packets);
+       rte_tel_data_add_dict_u64(d, "measured_tx_pps", prod->measured_tx_pps);
+
+       return 0;
+}
 /* The main function, which does initialization and calls the per-lcore
  * functions.
  */
@@ -224,6 +296,10 @@ int
 main(int argc, char *argv[])
 {
        signal(SIGINT, handle_sigint);
+
+       #define CORE_COUNT 2
+       struct gen_args core_launch_args[CORE_COUNT];
+
        unsigned int nb_ports;
        uint16_t portid;
 
@@ -253,7 +329,7 @@ main(int argc, char *argv[])
                        rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n",
                                        portid);
 
-       gen = rte_gen_create(mbuf_pool);
+       struct rte_gen *gen = rte_gen_create(mbuf_pool);
        if (!gen)
                rte_panic("Gen failed to initialize\n");
 
@@ -261,19 +337,37 @@ main(int argc, char *argv[])
        if (err)
                rte_panic("Failed to parse input args");
 
+       memset(core_launch_args, 0, sizeof(struct gen_args) * CORE_COUNT);
        /* launch lcore functions */
        uint32_t lcore_count = 0;
        uint32_t lcore_id = 0;
        RTE_LCORE_FOREACH_WORKER(lcore_id) {
-               if (lcore_count == 0)
-                       rte_eal_remote_launch(lcore_producer, NULL, lcore_id);
-               else if (lcore_count == 1)
-                       rte_eal_remote_launch(lcore_consumer, NULL, lcore_id);
+               core_launch_args[lcore_count].gen = gen;
+               if (lcore_count == 0) {
+                       telemetry_userdata.prod =
+                                               &core_launch_args[lcore_count];
+                       rte_eal_remote_launch(lcore_producer,
+                                             telemetry_userdata.prod,
+                                             lcore_id);
+               } else if (lcore_count == 1) {
+                       telemetry_userdata.cons =
+                                               &core_launch_args[lcore_count];
+                       rte_eal_remote_launch(lcore_consumer,
+                                             telemetry_userdata.cons,
+                                             lcore_id);
+               }
                else
                        break;
 
                lcore_count++;
        }
+
+       /* Export stats via Telemetry */
+       rte_telemetry_register_cmd("/gen/stats", tele_gen_stats,
+                       "Return statistics of the Gen instance.");
+       rte_telemetry_register_cmd("/gen/mpps", tele_gen_mpps,
+                       "Get/Set the mpps rate");
+
        /* Stall the main thread until all other threads have returned. */
        rte_eal_mp_wait_lcore();
 
-- 
2.25.1

Reply via email to