Add support for copying packets using the software copy mode, together
with MAC address updating. Packets are received, copied and transmitted
by a single worker lcore.

Signed-off-by: Marcin Baran <marcinx.ba...@intel.com>
Signed-off-by: Pawel Modrak <pawelx.mod...@intel.com>
---
 examples/ioat/ioatfwd.c | 211 +++++++++++++++++++++++++++++++++++-----
 1 file changed, 188 insertions(+), 23 deletions(-)
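A minimal way to exercise the new software copy path, assuming two ports
bound to DPDK and the example binary built as ./build/ioatfwd (the binary
path, core list and ring size below are illustrative only, not taken from
this patch):

    ./build/ioatfwd -l 0-1 -- -p 0x3 -c sw -s 2048

Here -p selects the port mask, -c sw selects the software copy mode added
by this patch, and -s sets the size of the rte_ring used between the rx
and tx sides.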

diff --git a/examples/ioat/ioatfwd.c b/examples/ioat/ioatfwd.c
index 977ea6a61c..3a092c6cfb 100644
--- a/examples/ioat/ioatfwd.c
+++ b/examples/ioat/ioatfwd.c
@@ -13,7 +13,7 @@
 #include <rte_rawdev.h>
 #include <rte_ioat_rawdev.h>
 
-/* size of ring used for software copying between rx and tx. */
+ /* size of ring used for software copying between rx and tx. */
 #define RTE_LOGTYPE_IOAT RTE_LOGTYPE_USER1
 #define MAX_PKT_BURST 32
 #define MEMPOOL_CACHE_SIZE 512
@@ -89,6 +89,142 @@ static struct rte_ether_addr ioat_ports_eth_addr[RTE_MAX_ETHPORTS];
 static struct rte_eth_dev_tx_buffer *tx_buffer[RTE_MAX_ETHPORTS];
 struct rte_mempool *ioat_pktmbuf_pool;
 
+static void
+update_mac_addrs(struct rte_mbuf *m, uint32_t dest_portid)
+{
+       struct rte_ether_hdr *eth;
+       void *tmp;
+
+       eth = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
+
+       /* 02:00:00:00:00:xx - overwriting 2 bytes of source address but
+        * it's acceptable because it gets overwritten by rte_ether_addr_copy
+        */
+       tmp = &eth->d_addr.addr_bytes[0];
+       *((uint64_t *)tmp) = 0x000000000002 + ((uint64_t)dest_portid << 40);
+
+       /* src addr */
+       rte_ether_addr_copy(&ioat_ports_eth_addr[dest_portid], &eth->s_addr);
+}
+
+static inline void
+pktmbuf_sw_copy(struct rte_mbuf *src, struct rte_mbuf *dst)
+{
+       /* Copy packet metadata */
+       rte_memcpy(&dst->rearm_data,
+               &src->rearm_data,
+               offsetof(struct rte_mbuf, cacheline1)
+               - offsetof(struct rte_mbuf, rearm_data));
+
+       /* Copy packet data */
+       rte_memcpy(rte_pktmbuf_mtod(dst, char *),
+               rte_pktmbuf_mtod(src, char *), src->data_len);
+}
+
+/* Receive packets on one port and enqueue to IOAT rawdev or rte_ring. */
+static void
+ioat_rx_port(struct rxtx_port_config *rx_config)
+{
+       uint32_t nb_rx, nb_enq, i, j;
+       struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
+
+       for (i = 0; i < rx_config->nb_queues; i++) {
+
+               nb_rx = rte_eth_rx_burst(rx_config->rxtx_port, i,
+                       pkts_burst, MAX_PKT_BURST);
+
+               if (nb_rx == 0)
+                       continue;
+
+               /* Perform packet software copy, free source packets */
+               int ret;
+               struct rte_mbuf *pkts_burst_copy[MAX_PKT_BURST];
+
+               ret = rte_mempool_get_bulk(ioat_pktmbuf_pool,
+                       (void *)pkts_burst_copy, nb_rx);
+
+               if (unlikely(ret < 0))
+                       rte_exit(EXIT_FAILURE,
+                               "Unable to allocate memory.\n");
+
+               for (j = 0; j < nb_rx; j++)
+                       pktmbuf_sw_copy(pkts_burst[j],
+                               pkts_burst_copy[j]);
+
+               rte_mempool_put_bulk(ioat_pktmbuf_pool,
+                       (void *)pkts_burst, nb_rx);
+
+               nb_enq = rte_ring_enqueue_burst(
+                       rx_config->rx_to_tx_ring,
+                       (void *)pkts_burst_copy, nb_rx, NULL);
+
+               /* Free any packets that could not be enqueued. */
+               rte_mempool_put_bulk(ioat_pktmbuf_pool,
+                       (void *)&pkts_burst_copy[nb_enq],
+                       nb_rx - nb_enq);
+       }
+}
+
+/* Transmit packets from IOAT rawdev/rte_ring for one port. */
+static void
+ioat_tx_port(struct rxtx_port_config *tx_config)
+{
+       uint32_t i, nb_dq = 0;
+       struct rte_mbuf *mbufs_dst[MAX_PKT_BURST];
+
+       /* Dequeue the mbufs from rx_to_tx_ring. */
+       nb_dq = rte_ring_dequeue_burst(tx_config->rx_to_tx_ring,
+               (void *)mbufs_dst, MAX_PKT_BURST, NULL);
+
+       if (nb_dq == 0)
+               return;
+
+       /* Update macs if enabled */
+       if (mac_updating) {
+               for (i = 0; i < nb_dq; i++)
+                       update_mac_addrs(mbufs_dst[i],
+                               tx_config->rxtx_port);
+       }
+
+       const uint16_t nb_tx = rte_eth_tx_burst(tx_config->rxtx_port,
+               0, (void *)mbufs_dst, nb_dq);
+
+       /* Free any unsent packets. */
+       if (unlikely(nb_tx < nb_dq))
+               rte_mempool_put_bulk(ioat_pktmbuf_pool,
+               (void *)&mbufs_dst[nb_tx],
+                       nb_dq - nb_tx);
+}
+
+/* Main rx and tx loop if only one slave lcore is available */
+static void
+rxtx_main_loop(void)
+{
+       uint16_t i;
+       uint16_t nb_ports = cfg.nb_ports;
+
+       RTE_LOG(INFO, IOAT, "Entering main rx and tx loop for copy on"
+               " lcore %u\n", rte_lcore_id());
+
+       while (!force_quit)
+               for (i = 0; i < nb_ports; i++) {
+                       ioat_rx_port(&cfg.ports[i]);
+                       ioat_tx_port(&cfg.ports[i]);
+               }
+}
+
+static void start_forwarding_cores(void)
+{
+       uint32_t lcore_id = rte_lcore_id();
+
+       RTE_LOG(INFO, IOAT, "Entering %s on lcore %u\n",
+               __func__, rte_lcore_id());
+
+       lcore_id = rte_get_next_lcore(lcore_id, true, true);
+       rte_eal_remote_launch((lcore_function_t *)rxtx_main_loop,
+               NULL, lcore_id);
+}
+
 /* Display usage */
 static void
 ioat_usage(const char *prgname)
@@ -102,7 +238,7 @@ ioat_usage(const char *prgname)
                "       - The destination MAC address is replaced by 
02:00:00:00:00:TX_PORT_ID\n"
                "  -c --copy-type CT: type of copy: sw|rawdev\n"
                "  -s --ring-size RS: size of IOAT rawdev ring for hardware 
copy mode or rte_ring for software copy mode\n",
-              prgname);
+               prgname);
 }
 
 static int
@@ -161,16 +297,16 @@ ioat_parse_args(int argc, char **argv, unsigned int nb_ports)
        argvopt = argv;
 
        while ((opt = getopt_long(argc, argvopt, short_options,
-                                 lgopts, &option_index)) != EOF) {
+               lgopts, &option_index)) != EOF) {
 
                switch (opt) {
-               /* portmask */
+                       /* portmask */
                case 'p':
                        ioat_enabled_port_mask = ioat_parse_portmask(optarg);
                        if (ioat_enabled_port_mask & ~default_port_mask ||
-                                       ioat_enabled_port_mask <= 0) {
+                               ioat_enabled_port_mask <= 0) {
                                printf("Invalid portmask, %s, suggest 0x%x\n",
-                                               optarg, default_port_mask);
+                                       optarg, default_port_mask);
                                ioat_usage(prgname);
                                return -1;
                        }
@@ -204,7 +340,7 @@ ioat_parse_args(int argc, char **argv, unsigned int nb_ports)
                        }
                        break;
 
-               /* long options */
+                       /* long options */
                case 0:
                        break;
 
@@ -216,9 +352,9 @@ ioat_parse_args(int argc, char **argv, unsigned int nb_ports)
 
        printf("MAC updating %s\n", mac_updating ? "enabled" : "disabled");
        if (optind >= 0)
-               argv[optind-1] = prgname;
+               argv[optind - 1] = prgname;
 
-       ret = optind-1;
+       ret = optind - 1;
        optind = 1; /* reset getopt lib */
        return ret;
 }
@@ -253,6 +389,26 @@ check_link_status(uint32_t port_mask)
        return retval;
 }
 
+static void
+assign_rings(void)
+{
+       uint32_t i;
+
+       for (i = 0; i < cfg.nb_ports; i++) {
+               char ring_name[RTE_RING_NAMESIZE];
+
+               snprintf(ring_name, sizeof(ring_name), "rx_to_tx_ring_%u", i);
+               /* Create ring for inter core communication */
+               cfg.ports[i].rx_to_tx_ring = rte_ring_create(
+                       ring_name, ring_size,
+                       rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
+
+               if (cfg.ports[i].rx_to_tx_ring == NULL)
+                       rte_exit(EXIT_FAILURE, "Ring create failed: %s\n",
+                               rte_strerror(rte_errno));
+       }
+}
+
 /*
  * Initializes a given port using global settings and with the RX buffers
  * coming from the mbuf_pool passed as a parameter.
@@ -263,7 +419,7 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool, uint16_t nb_queues)
        /* configuring port to use RSS for multiple RX queues */
        static const struct rte_eth_conf port_conf = {
                .rxmode = {
-                       .mq_mode        = ETH_MQ_RX_RSS,
+                       .mq_mode = ETH_MQ_RX_RSS,
                        .max_rx_pkt_len = RTE_ETHER_MAX_LEN
                },
                .rx_adv_conf = {
@@ -294,14 +450,14 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool, uint16_t nb_queues)
                dev_info.flow_type_rss_offloads;
        if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
                local_port_conf.txmode.offloads |=
-                       DEV_TX_OFFLOAD_MBUF_FAST_FREE;
+               DEV_TX_OFFLOAD_MBUF_FAST_FREE;
        ret = rte_eth_dev_configure(portid, nb_queues, 1, &local_port_conf);
        if (ret < 0)
                rte_exit(EXIT_FAILURE, "Cannot configure device:"
                        " err=%d, port=%u\n", ret, portid);
 
        ret = rte_eth_dev_adjust_nb_rx_tx_desc(portid, &nb_rxd,
-                                               &nb_txd);
+               &nb_txd);
        if (ret < 0)
                rte_exit(EXIT_FAILURE,
                        "Cannot adjust number of descriptors: err=%d, 
port=%u\n",
@@ -326,8 +482,8 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool, uint16_t nb_queues)
        txq_conf = dev_info.default_txconf;
        txq_conf.offloads = local_port_conf.txmode.offloads;
        ret = rte_eth_tx_queue_setup(portid, 0, nb_txd,
-                       rte_eth_dev_socket_id(portid),
-                       &txq_conf);
+               rte_eth_dev_socket_id(portid),
+               &txq_conf);
        if (ret < 0)
                rte_exit(EXIT_FAILURE,
                        "rte_eth_tx_queue_setup:err=%d,port=%u\n",
@@ -335,8 +491,8 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool, uint16_t nb_queues)
 
        /* Initialize TX buffers */
        tx_buffer[portid] = rte_zmalloc_socket("tx_buffer",
-                       RTE_ETH_TX_BUFFER_SIZE(MAX_PKT_BURST), 0,
-                       rte_eth_dev_socket_id(portid));
+               RTE_ETH_TX_BUFFER_SIZE(MAX_PKT_BURST), 0,
+               rte_eth_dev_socket_id(portid));
        if (tx_buffer[portid] == NULL)
                rte_exit(EXIT_FAILURE,
                        "Cannot allocate buffer for tx on port %u\n",
@@ -354,13 +510,13 @@ port_init(uint16_t portid, struct rte_mempool *mbuf_pool, uint16_t nb_queues)
        rte_eth_promiscuous_enable(portid);
 
        printf("Port %u, MAC address: %02X:%02X:%02X:%02X:%02X:%02X\n\n",
-                       portid,
-                       ioat_ports_eth_addr[portid].addr_bytes[0],
-                       ioat_ports_eth_addr[portid].addr_bytes[1],
-                       ioat_ports_eth_addr[portid].addr_bytes[2],
-                       ioat_ports_eth_addr[portid].addr_bytes[3],
-                       ioat_ports_eth_addr[portid].addr_bytes[4],
-                       ioat_ports_eth_addr[portid].addr_bytes[5]);
+               portid,
+               ioat_ports_eth_addr[portid].addr_bytes[0],
+               ioat_ports_eth_addr[portid].addr_bytes[1],
+               ioat_ports_eth_addr[portid].addr_bytes[2],
+               ioat_ports_eth_addr[portid].addr_bytes[3],
+               ioat_ports_eth_addr[portid].addr_bytes[4],
+               ioat_ports_eth_addr[portid].addr_bytes[5]);
 
        cfg.ports[cfg.nb_ports].rxtx_port = portid;
        cfg.ports[cfg.nb_ports++].nb_queues = nb_queues;
@@ -428,10 +584,19 @@ main(int argc, char **argv)
        if (cfg.nb_lcores < 1)
                rte_exit(EXIT_FAILURE,
                        "There should be at least one slave lcore.\n");
+
+       assign_rings();
+
+       start_forwarding_cores();
+
+       /* wait for the worker lcores to exit; they return once force_quit is set */
+       rte_eal_mp_wait_lcore();
+
        for (i = 0; i < cfg.nb_ports; i++) {
                printf("Closing port %d\n", cfg.ports[i].rxtx_port);
                rte_eth_dev_stop(cfg.ports[i].rxtx_port);
                rte_eth_dev_close(cfg.ports[i].rxtx_port);
+               rte_ring_free(cfg.ports[i].rx_to_tx_ring);
        }
 
        printf("Bye...\n");
-- 
2.22.0.windows.1
