This app uses the shared memory pool and shared ethdev infrastructure
to act as a zero-copy IO proxy to other applications. It has been tested
and verified to work successfully when proxying data to testpmd instances
on the system, with each of those testpmd instances being passed a unix
socket to work with via the shared memory bus "-a sock:/path/to/sock..."
parameter.

Signed-off-by: Bruce Richardson <bruce.richard...@intel.com>
---
 app/io-proxy/command_fns.c | 160 ++++++++++
 app/io-proxy/commands.list |   6 +
 app/io-proxy/datapath.c    | 595 +++++++++++++++++++++++++++++++++++++
 app/io-proxy/datapath.h    |  37 +++
 app/io-proxy/datapath_mp.c |  78 +++++
 app/io-proxy/main.c        |  71 +++++
 app/io-proxy/meson.build   |  12 +
 app/meson.build            |   1 +
 8 files changed, 960 insertions(+)
 create mode 100644 app/io-proxy/command_fns.c
 create mode 100644 app/io-proxy/commands.list
 create mode 100644 app/io-proxy/datapath.c
 create mode 100644 app/io-proxy/datapath.h
 create mode 100644 app/io-proxy/datapath_mp.c
 create mode 100644 app/io-proxy/main.c
 create mode 100644 app/io-proxy/meson.build

diff --git a/app/io-proxy/command_fns.c b/app/io-proxy/command_fns.c
new file mode 100644
index 0000000000..f48921e005
--- /dev/null
+++ b/app/io-proxy/command_fns.c
@@ -0,0 +1,160 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdbool.h>
+
+#include <rte_ethdev.h>
+
+#include "datapath.h"
+#include "commands.h"
+
+extern volatile bool quit;
+extern volatile bool running_startup_script;
+
+/**
+ * Handler for the "add socket <path> <memsize> <port> <queue>" command.
+ *
+ * The memory size is a number (any base accepted by strtoull) optionally
+ * followed by exactly one K/M/G suffix (case-insensitive, powers of 1024).
+ * The size must be non-zero and fit in 64 bits. Port and queue ids are
+ * range-checked before the datapath is asked to listen on the socket.
+ *
+ * Every failure is reported on stderr. When the command comes from the
+ * startup script the global quit flag is also raised and this function
+ * spins until the main thread exits; interactively it just returns.
+ */
+void
+cmd_add_socket_parsed(void *parsed_result, struct cmdline *cl __rte_unused,
+               void *data __rte_unused)
+{
+       struct cmd_add_socket_result *res = parsed_result;
+       unsigned int shift = 0;
+       uint64_t maxmem;
+       char *endchar;
+
+       maxmem = strtoull(res->memsize, &endchar, 0);
+       if (endchar == res->memsize) {
+               /* strtoull consumed nothing: there was no number at all */
+               fprintf(stderr, "Invalid memory size '%s'\n", res->memsize);
+               goto err;
+       }
+       switch (*endchar) {
+       case 'G': case 'g':
+               shift = 30;
+               break;
+       case 'M': case 'm':
+               shift = 20;
+               break;
+       case 'K': case 'k':
+               shift = 10;
+               break;
+       default:
+               break;
+       }
+       if (shift != 0)
+               endchar++; /* step over the recognised suffix */
+       if (*endchar != '\0') {
+               /* unknown suffix, or trailing characters after a valid one */
+               fprintf(stderr, "Invalid memory size '%s'\n", res->memsize);
+               goto err;
+       }
+       if (maxmem == 0 || maxmem > (UINT64_MAX >> shift)) {
+               fprintf(stderr, "Memory size '%s' out of range\n", res->memsize);
+               goto err;
+       }
+       maxmem <<= shift;
+
+       if (res->port >= MAX_PORTS_SUPPORTED) {
+               fprintf(stderr, "Port id out of range. Must be <%u\n", MAX_PORTS_SUPPORTED);
+               goto err;
+       }
+       if (res->queue >= MAX_QUEUES_SUPPORTED) {
+               fprintf(stderr, "Queue id out of range. Must be <%u\n", MAX_QUEUES_SUPPORTED);
+               goto err;
+       }
+       if (listen_unix_socket(res->path, maxmem, res->port, res->queue) != 0) {
+               fprintf(stderr, "error initializing socket: %s\n", res->path);
+               goto err;
+       }
+
+       printf("Created socket = %s with memsize = %s using port = %u, queue = %u\n",
+                       res->path, res->memsize, res->port, res->queue);
+       return;
+
+err:
+       if (running_startup_script) {
+               quit = true;
+               /* wait for main thread to quit. Just spin here for a condition
+                * which will never actually come true, as main thread should
+                * just exit
+                */
+               while (quit)
+                       usleep(100);
+       }
+       /* if running interactively, do nothing on error except report it above */
+}
+
+/**
+ * Handler for the "list sockets" command.
+ *
+ * Walks every configured socket and prints its path, whether a client is
+ * currently connected, its memory size (using the largest of G/M/K that
+ * divides the size exactly, else plain bytes), and its port and queue.
+ */
+void
+cmd_list_sockets_parsed(__rte_unused void *parsed_result,
+               __rte_unused struct cmdline *cl,
+               __rte_unused void *data)
+{
+       const uint64_t gig_mask = (1UL << 30) - 1;
+       const uint64_t meg_mask = (1UL << 20) - 1;
+       const uint64_t kilo_mask = (1UL << 10) - 1;
+       uint16_t port, queue;
+       const char *path;
+       uint64_t maxmem;
+       bool connected;
+       int sock;
+       int idx;
+
+       idx = get_next_socket(0, &path, &sock, &maxmem, &port, &queue, &connected);
+       while (idx < MAX_SOCKETS) {
+               char memstr[32];
+
+               /* a zero remainder is the same as all low bits being clear */
+               if ((maxmem & gig_mask) == 0)
+                       snprintf(memstr, sizeof(memstr), "%" PRIu64 "G", maxmem >> 30);
+               else if ((maxmem & meg_mask) == 0)
+                       snprintf(memstr, sizeof(memstr), "%" PRIu64 "M", maxmem >> 20);
+               else if ((maxmem & kilo_mask) == 0)
+                       snprintf(memstr, sizeof(memstr), "%" PRIu64 "K", maxmem >> 10);
+               else
+                       snprintf(memstr, sizeof(memstr), "%" PRIu64, maxmem);
+
+               printf("Socket %s [%s]: mem=%s, port=%u, queue=%u\n",
+                               path, connected ? "connected" : "idle", memstr, port, queue);
+
+               idx = get_next_socket(idx + 1, &path, &sock, &maxmem, &port,
+                               &queue, &connected);
+       }
+}
+
+/**
+ * Handler for the "list ports" command: print the MAC address of each
+ * available ethdev port, or "MAC UNKNOWN" when it cannot be read.
+ */
+void
+cmd_list_ports_parsed(__rte_unused void *parsed_result,
+               __rte_unused struct cmdline *cl,
+               __rte_unused void *data)
+{
+       int port = 0;
+
+       while (port < rte_eth_dev_count_avail()) {
+               struct rte_ether_addr mac;
+
+               if (rte_eth_macaddr_get(port, &mac) == 0)
+                       printf("Port %d - "RTE_ETHER_ADDR_PRT_FMT"\n", port,
+                                       RTE_ETHER_ADDR_BYTES(&mac));
+               else
+                       printf("Port %d - MAC UNKNOWN\n", port);
+               port++;
+       }
+}
+
+/**
+ * Handler for the "show port stats" command: print the basic packet
+ * counters of each available ethdev port.
+ */
+void
+cmd_show_port_stats_parsed(__rte_unused void *parsed_result,
+               __rte_unused struct cmdline *cl,
+               __rte_unused void *data)
+{
+       int port = 0;
+
+       while (port < rte_eth_dev_count_avail()) {
+               struct rte_eth_stats counters = {0};
+
+               if (rte_eth_stats_get(port, &counters) == 0)
+                       printf("Port %d - ipkts: %"PRIu64", imissed: %"PRIu64
+                                       ", ierrors: %"PRIu64", opkts: %"PRIu64"\n",
+                                       port, counters.ipackets, counters.imissed,
+                                       counters.ierrors, counters.opackets);
+               else
+                       printf("Port %d - Cannot get stats\n", port);
+               port++;
+       }
+}
+
+/**
+ * Handler for the "show socket stats" command.
+ *
+ * Prints the datapath counters of every configured socket that is either
+ * connected right now or has seen traffic (non-zero rx or deq count).
+ * The port and queue shown are the ones reported by get_next_socket().
+ */
+void
+cmd_show_socket_stats_parsed(__rte_unused void *parsed_result,
+               __rte_unused struct cmdline *cl,
+               __rte_unused void *data)
+{
+       const char *path;
+       int sock;
+       uint64_t maxmem;
+       uint16_t port, queue;
+       bool connected;
+
+       for (int i = get_next_socket(0, &path, &sock, &maxmem, &port, &queue, &connected);
+                       i < MAX_SOCKETS;
+                       i = get_next_socket(i + 1, &path, &sock, &maxmem, &port,
+                                       &queue, &connected)) {
+               const struct rxtx_stats *st = &dp_stats[i];
+
+               if (!connected && st->rx == 0 && st->deq == 0)
+                       continue;
+
+               /* %u expects unsigned int, so convert the int/uint16_t values explicitly */
+               printf("Socket %u [port %u, q %u]: RX %" PRIu64 ", Enq_drops %" PRIu64
+                               ", Deq %" PRIu64 ", TX_drops %" PRIu64 "\n",
+                               (unsigned int)i, (unsigned int)port, (unsigned int)queue,
+                               st->rx, st->enq_drop, st->deq, st->tx_drop);
+       }
+}
+
+/** Handler for the "quit" command: ask the cmdline instance to stop. */
+void
+cmd_quit_parsed(void *parsed_result __rte_unused, struct cmdline *cl,
+               void *data __rte_unused)
+{
+       cmdline_quit(cl);
+}
diff --git a/app/io-proxy/commands.list b/app/io-proxy/commands.list
new file mode 100644
index 0000000000..9dab9bba28
--- /dev/null
+++ b/app/io-proxy/commands.list
@@ -0,0 +1,6 @@
+add socket <STRING>path <STRING>memsize <UINT16>port <UINT16>queue
+list sockets
+list ports
+show port stats
+show socket stats
+quit
diff --git a/app/io-proxy/datapath.c b/app/io-proxy/datapath.c
new file mode 100644
index 0000000000..1f7162de18
--- /dev/null
+++ b/app/io-proxy/datapath.c
@@ -0,0 +1,595 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#include <errno.h>
+#include <stdarg.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <libgen.h>
+#include <sys/un.h>
+#include <sys/stat.h>
+#include <sys/mman.h>
+#include <sys/socket.h>
+#include <linux/memfd.h>
+
+#include <rte_eal.h>
+#include <rte_dev.h>
+#include <rte_malloc.h>
+#include <rte_ethdev.h>
+#include <rte_common.h>
+#include <rte_config.h>
+#include <rte_mempool.h>
+#include <shared_mem_bus.h>
+
+#include "datapath.h"
+
+static int mempool_ops_index = -1; /* result of check_mempool_ops(), set in datapath_init() */
+static struct rte_mempool *default_mempool; /* pool used by port queues with no client attached */
+static volatile unsigned long long port_poll_mask; /* bit per socket index that should be polled */
+static volatile unsigned long long used_poll_mask; /* mask value last adopted by handle_forwarding() */
+
+struct listen_socket_params {
+       const char *path;   /* filesystem path of the unix socket */
+       int sock;           /* listening socket fd; 0 means the slot is unused */
+       uint16_t port_id;   /* ethdev port served through this socket */
+       uint16_t qid;       /* queue of that port served through this socket */
+       uint64_t maxmem;    /* size in bytes of the memory region shared with a client */
+};
+
+#define S_IDX(p, q) (((p) * MAX_QUEUES_SUPPORTED) + (q)) /* flatten (port, queue) to a table index */
+static struct rte_ring *rx_rings[MAX_SOCKETS]; /* ring that received packets are enqueued onto */
+static struct rte_ring *tx_rings[MAX_SOCKETS]; /* ring that packets to transmit are dequeued from */
+static uintptr_t base_addrs[MAX_SOCKETS]; /* local address of each client's shared mapping */
+static uint64_t lengths[MAX_SOCKETS]; /* size of each client's shared mapping */
+static struct rte_mempool *mps[MAX_SOCKETS]; /* local mempool struct set up for each client */
+static struct listen_socket_params sock_params[MAX_SOCKETS]; /* configured sockets, by S_IDX */
+struct rxtx_stats dp_stats[MAX_SOCKETS] = {0}; /* per-socket counters, reset on MSG_TYPE_START */
+
+/**
+ * Find the next configured socket at or after table index @p start.
+ *
+ * @param start      index to begin searching from; negative values are
+ *                   treated as 0
+ * @param path       out: path of the socket found
+ * @param sock       out: listening fd of the socket found
+ * @param maxmem     out: shared memory size of the socket found
+ * @param port       out: ethdev port of the socket found
+ * @param queue      out: queue id of the socket found
+ * @param connected  out: true if the socket's bit is set in the poll mask
+ * @return index of the socket found, or a value >= MAX_SOCKETS when there
+ *         are no more; the out parameters are only written on a match.
+ */
+int
+get_next_socket(int start, const char **path, int *sock, uint64_t *maxmem,
+               uint16_t *port, uint16_t *queue, bool *connected)
+{
+       /* never index before the start of the table */
+       int i = start < 0 ? 0 : start;
+
+       for (; i < MAX_SOCKETS; i++) {
+               const struct listen_socket_params *p = &sock_params[i];
+
+               if (p->sock <= 0)
+                       continue;
+
+               *path = p->path;
+               *sock = p->sock;
+               *maxmem = p->maxmem;
+               *port = p->port_id;
+               *queue = p->qid;
+               /* shift in the mask's own width (unsigned long long) */
+               *connected = (port_poll_mask & (1ULL << i)) != 0;
+               break;
+       }
+       return i;
+}
+
+/**
+ * Configure and start one ethdev port.
+ *
+ * The port gets MAX_QUEUES_SUPPORTED rx and tx queues with RSS on IP/UDP
+ * (limited to what the device reports), all rx queues drawing buffers from
+ * @p mbuf_pool. Promiscuous mode is requested; failing to enable it is only
+ * a warning.
+ *
+ * @return 0 on success, non-zero on the first failing step.
+ */
+static int
+init_port(uint16_t port_id, struct rte_mempool *mbuf_pool)
+{
+       const int numa_node = rte_socket_id();
+       struct rte_eth_dev_info dev_info;
+       struct rte_eth_conf port_conf = {
+               .rxmode = { .mq_mode = RTE_ETH_MQ_RX_RSS, },
+               .rx_adv_conf = {
+                       .rss_conf = { .rss_hf = RTE_ETH_RSS_IP | RTE_ETH_RSS_UDP, },
+               },
+       };
+       uint16_t q = 0;
+       int rc;
+
+       rc = rte_eth_dev_info_get(port_id, &dev_info);
+       if (rc != 0) {
+               printf("Error during getting device (port %u) info: %s\n",
+                               port_id, strerror(-rc));
+               return rc;
+       }
+
+       /* only request what the device says it can do */
+       if (dev_info.tx_offload_capa & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE)
+               port_conf.txmode.offloads |= RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE;
+       port_conf.rx_adv_conf.rss_conf.rss_hf &= dev_info.flow_type_rss_offloads;
+
+       rc = rte_eth_dev_configure(port_id, MAX_QUEUES_SUPPORTED, MAX_QUEUES_SUPPORTED,
+                       &port_conf);
+       if (rc < 0) {
+               printf("Error configuring port\n");
+               return -1;
+       }
+
+       while (q < MAX_QUEUES_SUPPORTED) {
+               rc = rte_eth_rx_queue_setup(port_id, q, 128, numa_node, NULL, mbuf_pool);
+               if (rc < 0) {
+                       printf("Error running rx_queue_setup\n");
+                       return rc;
+               }
+               rc = rte_eth_tx_queue_setup(port_id, q, 256, numa_node, NULL);
+               if (rc < 0) {
+                       printf("Error running tx_queue_setup\n");
+                       return rc;
+               }
+               q++;
+       }
+
+       rc = rte_eth_dev_start(port_id);
+       if (rc < 0) {
+               printf("Error running dev_start\n");
+               return rc;
+       }
+       printf("Port %u started ok\n", port_id);
+
+       /* best effort: the port is still usable without promiscuous mode */
+       if (rte_eth_promiscuous_enable(port_id) < 0)
+               printf("Warning: could not enable promisc mode on port %u\n", port_id);
+
+       return 0;
+}
+
+/**
+ * Bring up the datapath: initialise EAL on the given cores, look up the
+ * proxy mempool ops, create the default mbuf pool and start every
+ * available ethdev port (at most MAX_PORTS_SUPPORTED).
+ *
+ * @param corelist  core list string handed to EAL's "-l" option
+ * @return 0 on success, negative if the core list cannot be copied or EAL
+ *         init fails. Later failures (mempool ops lookup, pool creation,
+ *         port init) are fatal and panic.
+ */
+int
+datapath_init(const char *corelist)
+{
+       /* eal init requires non-const parameters, so copy */
+       char *cl = strdup(corelist);
+       if (cl == NULL) {
+               fprintf(stderr, "Cannot copy core list\n");
+               return -1;
+       }
+       char l_flag[] = "-l";
+       char in_mem[] = "--in-memory";
+       char use_avx512[] = "--force-max-simd-bitwidth=512";
+       char *argv[] = {
+                       program_invocation_short_name,
+                       l_flag, cl,
+                       in_mem,
+                       use_avx512,
+                       NULL,
+       };
+
+       /* every socket index needs its own bit in the poll masks */
+       RTE_BUILD_BUG_ON(sizeof(port_poll_mask) * CHAR_BIT < MAX_SOCKETS);
+
+       int ret = rte_eal_init(RTE_DIM(argv) - 1, argv);
+       if (ret < 0) {
+               free(cl);
+               return ret;
+       }
+       /* NOTE(review): cl is kept alive after a successful init in case EAL
+        * still refers to the argument strings - confirm, then free it here.
+        */
+
+       mempool_ops_index = check_mempool_ops();
+       if (mempool_ops_index == -1)
+               rte_panic("Cannot get mempool ops\n");
+       printf("Mempool ops index is %d\n", mempool_ops_index);
+
+       default_mempool = rte_pktmbuf_pool_create("proxy_def",
+                       MAX_SOCKETS * 200, 32, 0,
+                       RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
+       if (default_mempool == NULL)
+               rte_panic("Cannot create default mempool\n");
+
+       int nb_ethdevs = rte_eth_dev_count_avail();
+       if (nb_ethdevs > MAX_PORTS_SUPPORTED) {
+               fprintf(stderr, "More ports available than supported, some will be unused\n");
+               nb_ethdevs = MAX_PORTS_SUPPORTED;
+       }
+       for (int i = 0; i < nb_ethdevs; i++) {
+               if (init_port(i, default_mempool) != 0)
+                       rte_panic("Cannot init port %d\n", i);
+       }
+       return 0;
+}
+
+/**
+ * Send a file descriptor to a client over a unix socket.
+ *
+ * The fd travels as SCM_RIGHTS ancillary data; the regular payload is a
+ * small record holding the fd's size, its IOVA and the page size.
+ *
+ * @return 0 on success, -1 if the control buffer cannot be allocated or the
+ *         message is not sent in full.
+ */
+static int
+send_fd(int to, int fd, uint64_t fd_size, rte_iova_t iova, uint64_t pg_size)
+{
+       struct {
+               uint64_t fd_size;
+               rte_iova_t iova;
+               uint64_t pg_size;
+       } payload = {fd_size, iova, pg_size};
+       struct iovec iov = {
+               .iov_base = (void *)&payload,
+               .iov_len = sizeof(payload),
+       };
+       const size_t ctrl_len = CMSG_LEN(sizeof(fd));
+       struct cmsghdr *ctrl = malloc(ctrl_len);
+       int status = 0;
+
+       if (ctrl == NULL)
+               return -1;
+
+       /* single control message carrying the descriptor */
+       ctrl->cmsg_level = SOL_SOCKET;
+       ctrl->cmsg_type = SCM_RIGHTS;
+       ctrl->cmsg_len = ctrl_len;
+       *(int *)CMSG_DATA(ctrl) = fd;
+
+       struct msghdr msg = {
+               .msg_iov = &iov,
+               .msg_iovlen = 1,
+               .msg_control = ctrl,
+               .msg_controllen = ctrl_len,
+       };
+
+       if (sendmsg(to, &msg, 0) != (int)iov.iov_len) {
+               printf("Error sending message to client, %s\n", strerror(errno));
+               status = -1;
+       }
+       free(ctrl);
+       return status;
+}
+
+/**
+ * Switch one rx queue of a port over to mempool @p p.
+ *
+ * Stops the rx and tx queue, sets the rx queue up again with 1024
+ * descriptors on the new pool, then restarts tx followed by rx. The steps
+ * run in that order and stop at the first failure.
+ *
+ * @return 0 if every step succeeded, -1 otherwise (the failing step is
+ *         printed).
+ */
+static int
+reconfigure_queue(uint16_t port_id, uint16_t qid, struct rte_mempool *p)
+{
+       int status = -1;
+
+       if (rte_eth_dev_rx_queue_stop(port_id, qid) != 0)
+               printf("Error with rx_queue_stop\n");
+       else if (rte_eth_dev_tx_queue_stop(port_id, qid) != 0)
+               printf("Error with tx_queue_stop\n");
+       else if (rte_eth_rx_queue_setup(port_id, qid, 1024,
+                       rte_socket_id(), NULL, p) != 0)
+               printf("Error with rx_queue_setup\n");
+       else if (rte_eth_dev_tx_queue_start(port_id, qid) != 0)
+               printf("Error with tx_queue_start\n");
+       else if (rte_eth_dev_rx_queue_start(port_id, qid) != 0)
+               printf("Error with rx_queue_start\n");
+       else
+               status = 0;
+
+       return status;
+}
+
+/**
+ * Serve one connected client until it disconnects.
+ *
+ * Builds a local mempool structure whose objects live in the client's
+ * shared mapping, then processes control messages from the client:
+ * mapping base address, mempool / rx ring / tx ring offsets, start, and
+ * MAC address queries. Offsets supplied by the client are checked against
+ * the size of the mapping before being used.
+ *
+ * On start the queue is switched to the client's mempool and its bit is
+ * set in the poll mask. On exit (disconnect, read error or any failure)
+ * the bit is cleared, the queue is switched back to the default mempool
+ * and all local allocations are released.
+ *
+ * NOTE(review): the poll mask is updated with a plain read-modify-write
+ * while other connection threads may do the same - confirm whether this
+ * needs to be atomic.
+ *
+ * @param client      connected socket to the client
+ * @param client_mem  local address of the memory shared with the client
+ * @param memsize     size of that memory in bytes
+ * @param port_id     ethdev port assigned to this client
+ * @param qid         queue on that port assigned to this client
+ */
+static void
+handle_connection(int client, void *const client_mem, uint64_t memsize,
+               uint16_t port_id, uint16_t qid)
+{
+       const unsigned int idx = S_IDX(port_id, qid);
+       /* size of the receive buffer: message header plus room for extra data */
+       const size_t msg_bufsz = sizeof(struct eth_shared_mem_msg) + 1024;
+       struct eth_shared_mem_msg *msg = NULL;
+       uintptr_t client_mmap_addr = 0;
+       struct rte_mempool *local_mp;
+       size_t mempool_memsize = sizeof(*local_mp)
+                       + sizeof(local_mp->local_cache[0]) * RTE_MAX_LCORE
+                       + sizeof(struct rte_pktmbuf_pool_private);
+
+       local_mp = rte_malloc(NULL, mempool_memsize, 0);
+       if (local_mp == NULL) {
+               printf("Error allocating mempool struct\n");
+               return;
+       }
+       memset(local_mp, 0, mempool_memsize);
+       *local_mp = (struct rte_mempool){
+               .name = "proxy_mp",
+               .cache_size = 256,
+               .ops_index = mempool_ops_index,
+               .pool_config = client_mem,
+               .private_data_size = sizeof(struct rte_pktmbuf_pool_private),
+               .local_cache = RTE_PTR_ADD(local_mp, sizeof(*local_mp)),
+       };
+       for (uint i = 0; i < RTE_MAX_LCORE; i++) {
+               local_mp->local_cache[i].size = 256;
+               local_mp->local_cache[i].flushthresh = 300;
+       }
+
+       msg = malloc(msg_bufsz);
+       if (msg == NULL) {
+               printf("Error mallocing message buffer\n");
+               goto out;
+       }
+
+       for (;;) {
+               const ssize_t bytes_read = read(client, msg, msg_bufsz);
+
+               if (bytes_read < 0 && errno == EINTR)
+                       continue;
+               /* 0 is an orderly disconnect; a negative value is a read error */
+               if (bytes_read <= 0)
+                       break;
+
+               switch (msg->type) {
+               case MSG_TYPE_MMAP_BASE_ADDR:
+                       client_mmap_addr = msg->offset;
+                       printf("Got mmap base addr of %p\n", (void *)client_mmap_addr);
+                       break;
+               case MSG_TYPE_MEMPOOL_OFFSET: {
+                       struct rte_mempool *remote_pool;
+                       uintptr_t remote_pd_offset;
+
+                       if (msg->offset >= memsize) {
+                               printf("Error, mempool offset outside shared memory\n");
+                               goto out;
+                       }
+                       remote_pool = RTE_PTR_ADD(client_mem, msg->offset);
+                       /* pool_data holds a client-side address: rebase it onto our mapping */
+                       remote_pd_offset = (uintptr_t)remote_pool->pool_data - client_mmap_addr;
+                       if (remote_pd_offset >= memsize) {
+                               printf("Error, mempool data outside shared memory\n");
+                               goto out;
+                       }
+                       local_mp->pool_data = RTE_PTR_ADD(client_mem, remote_pd_offset);
+                       memcpy(rte_mempool_get_priv(local_mp), rte_mempool_get_priv(remote_pool),
+                                       sizeof(struct rte_pktmbuf_pool_private));
+
+                       printf("Got mempool offset of %p, stack name is %s\n",
+                                       (void *)msg->offset, (char *)local_mp->pool_data);
+                       /* sanity check: allocate one buffer and verify its layout */
+                       struct rte_mbuf *mb = rte_pktmbuf_alloc(local_mp);
+                       if (mb == NULL) {
+                               printf("Error allocating buffer\n");
+                               goto out;
+                       }
+                       if ((uintptr_t)mb->buf_addr != (uintptr_t)mb + 128)
+                               rte_panic("Error, bad buffer\n");
+                       rte_pktmbuf_free(mb);
+                       break;
+               }
+               case MSG_TYPE_RX_RING_OFFSET:
+                       printf("Got Rx ring offset of %p\n", (void *)msg->offset);
+                       if (msg->offset >= memsize) {
+                               printf("Error, Rx ring offset outside shared memory\n");
+                               goto out;
+                       }
+                       rx_rings[idx] = RTE_PTR_ADD(client_mem, msg->offset);
+                       break;
+               case MSG_TYPE_TX_RING_OFFSET:
+                       printf("Got Tx ring offset of %p\n", (void *)msg->offset);
+                       if (msg->offset >= memsize) {
+                               printf("Error, Tx ring offset outside shared memory\n");
+                               goto out;
+                       }
+                       tx_rings[idx] = RTE_PTR_ADD(client_mem, msg->offset);
+                       break;
+
+               case MSG_TYPE_START:
+                       base_addrs[idx] = (uintptr_t)client_mem;
+                       lengths[idx] = memsize;
+                       mps[idx] = local_mp;
+                       if (reconfigure_queue(port_id, qid, local_mp) < 0)
+                               goto out;
+
+                       /* publish the queue and wait for the forwarding loop to pick it up */
+                       port_poll_mask |= (1ULL << idx);
+                       while (used_poll_mask != port_poll_mask)
+                               usleep(10);
+
+                       *msg = (struct eth_shared_mem_msg){ .type = MSG_TYPE_ACK, };
+                       if (write(client, msg, sizeof(*msg)) < (int)sizeof(*msg))
+                               goto out;
+
+                       dp_stats[idx] = (struct rxtx_stats){0};
+                       break;
+
+               case MSG_TYPE_GET_MAC:
+                       *msg = (struct eth_shared_mem_msg){
+                               .type = MSG_TYPE_REPORT_MAC,
+                       };
+                       rte_eth_macaddr_get(port_id, &msg->ethaddr);
+                       if (write(client, msg, sizeof(*msg)) < (int)sizeof(*msg))
+                               goto out;
+                       break;
+               default:
+                       printf("Unknown message\n");
+               }
+       }
+out:
+       /* withdraw the queue and wait until the forwarding loop has stopped using it */
+       port_poll_mask &= ~(1ULL << idx);
+       while (used_poll_mask != port_poll_mask)
+               usleep(10);
+
+       reconfigure_queue(port_id, qid, default_mempool);
+
+       free(msg);
+       rte_free(local_mp);
+
+       printf("Client disconnect\n");
+}
+
+/**
+ * Accept one client on a listening socket and serve it to completion.
+ *
+ * Creates a hugepage-backed memfd of @p maxmem bytes (1G pages when the
+ * size is a multiple of 1G, else 2M pages), maps it, registers it with
+ * DPDK, DMA-maps it for the port's device, passes the fd to the client and
+ * then runs handle_connection() until the client goes away.
+ *
+ * Every resource acquired here is released before returning, and only the
+ * steps that actually succeeded are undone.
+ *
+ * @return 0 once a client has been served, non-zero on any setup failure.
+ */
+static int
+accept_client(const int sock, uint64_t maxmem, uint16_t port_id, uint16_t qid)
+{
+       struct rte_eth_dev_info info;
+       rte_iova_t *iovas = NULL;
+       void *client_mem = MAP_FAILED;
+       bool registered = false;
+       bool dma_mapped = false;
+       char filename[32];
+       int memfd = -1;
+       int nb_pages;
+       int ret = 0;
+
+       const int client = accept(sock, NULL, NULL);
+       if (client < 0) {
+               ret = errno; /* save before printf can change it */
+               printf("Error with accept\n");
+               return ret;
+       }
+       printf("Client connected\n");
+
+       int flags = MFD_HUGETLB;
+       uint32_t pgsize = (1 << 21);
+       if (maxmem % (1 << 30) == 0) {
+               flags |= MFD_HUGE_1GB;
+               pgsize = (1 << 30);
+       }
+       if (maxmem == 0 || maxmem % pgsize != 0) {
+               printf("Error, memory size must be a non-zero multiple of the page size\n");
+               ret = EINVAL;
+               goto out;
+       }
+       snprintf(filename, sizeof(filename), "client_memory_%d", client);
+
+       memfd = memfd_create(filename, flags);
+       if (memfd < 0) {
+               ret = errno;
+               printf("Error with memfd_create\n");
+               goto out;
+       }
+       if (ftruncate(memfd, maxmem) < 0) {
+               ret = errno;
+               printf("Error with ftruncate\n");
+               goto out;
+       }
+       client_mem = mmap(NULL, maxmem, PROT_READ | PROT_WRITE,
+                       MAP_SHARED, memfd, 0);
+       if (client_mem == MAP_FAILED) {
+               ret = errno;
+               printf("Error with mmap\n");
+               goto out;
+       }
+
+       nb_pages = maxmem / pgsize;
+       printf("Registering %d pages of memory with DPDK\n", nb_pages);
+       iovas = malloc(sizeof(*iovas) * nb_pages);
+       if (iovas == NULL) {
+               printf("Error with malloc for iovas\n");
+               ret = ENOMEM;
+               goto out;
+       }
+       /* assume vfio, VA = IOVA */
+       iovas[0] = (uintptr_t)client_mem;
+       for (int i = 1; i < nb_pages; i++)
+               iovas[i] = iovas[i - 1] + pgsize;
+
+       if (rte_extmem_register(client_mem, maxmem, iovas, nb_pages, pgsize) < 0) {
+               ret = rte_errno;
+               printf("Error registering memory with DPDK, %s\n", strerror(ret));
+               goto out;
+       }
+       registered = true;
+       printf("Registered memory: %" PRIu64 " @ %p in heap %s\n", maxmem, client_mem, filename);
+
+       if (rte_eth_dev_info_get(port_id, &info) < 0) {
+               printf("Error getting ethdev info\n");
+               ret = -1;
+               goto out;
+       }
+       if (rte_dev_dma_map(info.device, client_mem, iovas[0], maxmem) < 0) {
+               ret = rte_errno;
+               printf("Error mapping dma for device, %s\n", strerror(ret));
+               goto out;
+       }
+       dma_mapped = true;
+
+       if (send_fd(client, memfd, maxmem, iovas[0], pgsize) != 0) {
+               /* send_fd can fail without setting errno; never report that as success */
+               ret = errno != 0 ? errno : EIO;
+               printf("Error sending fd to client\n");
+               goto out;
+       }
+       printf("Sent FD to client for mapping\n");
+
+       handle_connection(client, client_mem, maxmem, port_id, qid);
+out:
+       if (dma_mapped)
+               rte_dev_dma_unmap(info.device, client_mem, iovas[0], maxmem);
+       if (registered) {
+               printf("Unregistering memory: %" PRIu64 " @ %p in heap %s\n", maxmem,
+                               client_mem, filename);
+               if (rte_extmem_unregister(client_mem, maxmem) < 0)
+                       printf("Error unregistering memory, %s\n", strerror(rte_errno));
+       }
+       free(iovas);
+       if (client_mem != MAP_FAILED)
+               munmap(client_mem, maxmem);
+       if (memfd >= 0)
+               close(memfd);
+       close(client);
+       return ret;
+}
+
+/**
+ * Thread body for one listening socket: accept and serve clients one
+ * after another until accept_client() reports a failure.
+ *
+ * @param param  pointer to this socket's struct listen_socket_params. It
+ *               points into the static sock_params table, so it is not
+ *               heap memory and must not be freed here.
+ * @return the failing accept_client() result, cast to a pointer.
+ *
+ * NOTE(review): the table slot and its listening fd stay allocated after
+ * this thread exits; releasing them needs coordination with readers of
+ * sock_params.
+ */
+static void *
+listen_fn(void *param)
+{
+       struct listen_socket_params *p = param;
+       int ret;
+
+       if (rte_thread_register() < 0)
+               printf("Warning: could not register listener thread with EAL\n");
+
+       do {
+               ret = accept_client(p->sock, p->maxmem, p->port_id, p->qid);
+       } while (ret == 0);
+
+       return (void *)(uintptr_t)ret;
+}
+
+/**
+ * Create a unix socket at @p path and start a detached thread that serves
+ * clients connecting to it for the given port and queue.
+ *
+ * The parent directory of @p path is created if missing (best effort). If
+ * the path is already bound but nobody answers on it, the stale socket
+ * file is unlinked and the bind retried.
+ *
+ * @param path     filesystem path for the socket
+ * @param maxmem   size in bytes of the memory to share with each client
+ * @param port_id  ethdev port to associate with the socket
+ * @param qid      queue on that port to associate with the socket
+ * @return 0 on success, otherwise a positive errno-style code: EINVAL for a
+ *         bad port/queue, EEXIST if the port/queue pair already has a
+ *         socket, ENAMETOOLONG if the path does not fit, EADDRINUSE if a
+ *         live socket already owns the path, or the error of the failing
+ *         system call.
+ */
+int
+listen_unix_socket(const char *path, const uint64_t maxmem, uint16_t port_id, uint16_t qid)
+{
+       int ret;
+
+       /* validate the ids before they are used to index the socket table */
+       if (port_id >= MAX_PORTS_SUPPORTED || qid >= MAX_QUEUES_SUPPORTED) {
+               printf("Error, port %u / queue %u out of range\n", port_id, qid);
+               return EINVAL;
+       }
+       if (port_id >= rte_eth_dev_count_avail()) {
+               printf("Error, port %u does not exist\n", port_id);
+               return EINVAL;
+       }
+
+       struct listen_socket_params *p = &sock_params[S_IDX(port_id, qid)];
+       if (p->sock != 0) {
+               printf("Error, port already in use\n");
+               return EEXIST;
+       }
+
+       printf("Opening and listening on socket: %s\n", path);
+       char *pathcp = strdup(path);
+       if (pathcp == NULL) {
+               printf("Error with strdup()\n");
+               return ENOMEM;
+       }
+       /* best effort: the directory may well exist already */
+       mkdir(dirname(pathcp), 0700);
+       free(pathcp);
+
+       struct sockaddr_un sun = {.sun_family = AF_UNIX};
+       if (strlcpy(sun.sun_path, path, sizeof(sun.sun_path)) >= sizeof(sun.sun_path)) {
+               printf("Error, socket path too long: %s\n", path);
+               return ENAMETOOLONG;
+       }
+
+       int sock = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+       if (sock < 0) {
+               ret = errno; /* save before printf can change it */
+               printf("Error creating socket\n");
+               return ret;
+       }
+
+       printf("Attempting socket bind to path '%s'\n", path);
+       printf("Associated parameters are: maxmem = %"PRIu64", port = %u, qid = %u\n",
+                       maxmem, port_id, qid);
+
+       if (bind(sock, (void *) &sun, sizeof(sun)) < 0) {
+               printf("Initial bind to socket '%s' failed.\n", path);
+
+               /* check if current socket is active */
+               if (connect(sock, (void *)&sun, sizeof(sun)) == 0) {
+                       close(sock);
+                       return EADDRINUSE;
+               }
+
+               /* socket is not active, delete and attempt rebind */
+               printf("Attempting unlink and retrying bind\n");
+               unlink(sun.sun_path);
+               if (bind(sock, (void *) &sun, sizeof(sun)) < 0) {
+                       ret = errno; /* if unlink failed, this will be EADDRINUSE as above */
+                       printf("Error binding socket: %s\n", strerror(ret));
+                       close(sock);
+                       return ret;
+               }
+       }
+
+       if (listen(sock, 1) < 0) {
+               ret = errno;
+               printf("Error calling listen for socket: %s\n", strerror(ret));
+               unlink(sun.sun_path);
+               close(sock);
+               return ret;
+       }
+       printf("Socket %s listening ok\n", path);
+
+       char *slot_path = strdup(path);
+       if (slot_path == NULL) {
+               printf("Error with strdup()\n");
+               unlink(sun.sun_path);
+               close(sock);
+               return ENOMEM;
+       }
+
+       pthread_t listen_thread;
+       *p = (struct listen_socket_params){slot_path, sock, port_id, qid, maxmem};
+       ret = pthread_create(&listen_thread, NULL, listen_fn, p);
+       if (ret != 0) {
+               printf("Error creating listener thread: %s\n", strerror(ret));
+               /* hand the slot back so the port/queue pair can be retried */
+               *p = (struct listen_socket_params){0};
+               free(slot_path);
+               unlink(sun.sun_path);
+               close(sock);
+               return ret;
+       }
+       pthread_detach(listen_thread);
+       return 0;
+}
+
+/**
+ * One iteration of the forwarding loop.
+ *
+ * Adopts the current poll mask (connection threads wait for this), then,
+ * for every socket whose bit is set:
+ *  - receives a burst from the NIC queue, converts mbuf and buffer
+ *    addresses to offsets from the client's mapping base and enqueues them
+ *    on the client's rx ring; packets that do not fit are converted back
+ *    and returned to the client's mempool;
+ *  - dequeues a burst of offsets from the client's tx ring, converts them
+ *    back to local addresses and transmits them; packets the NIC does not
+ *    accept are freed.
+ *
+ * Sleeps briefly and returns when no socket needs polling.
+ */
+void
+handle_forwarding(void)
+{
+       const typeof(port_poll_mask) to_poll = port_poll_mask;
+       if (used_poll_mask != to_poll) {
+               printf("Poll mask is now %#llx\n", to_poll);
+               used_poll_mask = to_poll;
+       }
+       if (to_poll == 0) {
+               usleep(100);
+               return;
+       }
+       /* only MAX_SOCKETS table entries exist, so only those bits can be valid */
+       for (uint16_t i = 0; i < MAX_SOCKETS; i++) {
+               struct rte_mbuf *mbs[32];
+               void *offsets[32];
+               if (((1ULL << i) & to_poll) == 0)
+                       continue;
+
+               uint16_t port_id = i / MAX_QUEUES_SUPPORTED;
+               uint16_t qid = i % MAX_QUEUES_SUPPORTED;
+               uint16_t nb_rx = rte_eth_rx_burst(port_id, qid, mbs, RTE_DIM(mbs));
+               if (nb_rx != 0) {
+                       dp_stats[i].rx += nb_rx;
+                       /* the client sees the memory at another address: pass offsets */
+                       for (uint pkt = 0; pkt < nb_rx; pkt++) {
+                               mbs[pkt]->buf_addr = RTE_PTR_SUB(mbs[pkt]->buf_addr, base_addrs[i]);
+                               offsets[pkt] = RTE_PTR_SUB(mbs[pkt], base_addrs[i]);
+                       }
+                       uint16_t nb_enq = rte_ring_enqueue_burst(rx_rings[i], offsets, nb_rx, NULL);
+                       if (nb_enq != nb_rx) {
+                               dp_stats[i].enq_drop += nb_rx - nb_enq;
+                               /* undo the conversion before handing the buffers back */
+                               for (uint pkt = nb_enq; pkt < nb_rx; pkt++) {
+                                       mbs[pkt]->buf_addr = RTE_PTR_ADD(mbs[pkt]->buf_addr,
+                                                       base_addrs[i]);
+                                       mbs[pkt]->pool = mps[i];
+                               }
+                               rte_mempool_put_bulk(mps[i], (void *)&mbs[nb_enq], nb_rx - nb_enq);
+                       }
+               }
+
+               uint16_t nb_deq = rte_ring_dequeue_burst(tx_rings[i], offsets,
+                               RTE_DIM(offsets), NULL);
+               if (nb_deq != 0) {
+                       dp_stats[i].deq += nb_deq;
+                       for (uint pkt = 0; pkt < nb_deq; pkt++) {
+                               mbs[pkt] = RTE_PTR_ADD(offsets[pkt], base_addrs[i]);
+                               rte_prefetch0_write(mbs[pkt]);
+                       }
+                       for (uint pkt = 0; pkt < nb_deq; pkt++) {
+                               mbs[pkt]->pool = mps[i];
+                               mbs[pkt]->buf_addr = RTE_PTR_ADD(mbs[pkt]->buf_addr, base_addrs[i]);
+                       }
+                       uint16_t nb_tx = rte_eth_tx_burst(port_id, qid, mbs, nb_deq);
+                       if (nb_tx != nb_deq) {
+                               dp_stats[i].tx_drop += (nb_deq - nb_tx);
+                               rte_pktmbuf_free_bulk(&mbs[nb_tx], nb_deq - nb_tx);
+                       }
+               }
+       }
+}
+
+/** Return the value of rte_lcore_id() for the calling thread. */
+unsigned int
+lcore_id(void)
+{
+       const unsigned int id = rte_lcore_id();
+
+       return id;
+}
diff --git a/app/io-proxy/datapath.h b/app/io-proxy/datapath.h
new file mode 100644
index 0000000000..ec5b395164
--- /dev/null
+++ b/app/io-proxy/datapath.h
@@ -0,0 +1,37 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#ifndef DATAPATH_H_INC
+#define DATAPATH_H_INC
+
+#include <stdbool.h>
+#include <stdint.h>
+
+#define MEMPOOL_OPS_NAME "proxy_mp"
+#define MAX_PORTS_SUPPORTED 8
+#define MAX_QUEUES_SUPPORTED 2
+#define MAX_SOCKETS (MAX_PORTS_SUPPORTED * MAX_QUEUES_SUPPORTED)
+
+struct rxtx_stats {
+       uint64_t rx;        /* packets received from the NIC queue */
+       uint64_t enq_drop;  /* received packets that did not fit on the rx ring */
+       uint64_t deq;       /* packets dequeued from the tx ring */
+       uint64_t tx_drop;   /* dequeued packets the NIC did not accept for transmit */
+};
+
+extern struct rxtx_stats dp_stats[MAX_SOCKETS];
+
+int check_mempool_ops(void);
+
+int datapath_init(const char *corelist);
+
+int listen_unix_socket(const char *path, uint64_t maxmem, uint16_t port, 
uint16_t qid);
+
+void handle_forwarding(void);
+
+unsigned int lcore_id(void);
+
+int get_next_socket(int start, const char **path, int *sock, uint64_t *maxmem,
+               uint16_t *port, uint16_t *queue, bool *connected);
+
+
+#endif
diff --git a/app/io-proxy/datapath_mp.c b/app/io-proxy/datapath_mp.c
new file mode 100644
index 0000000000..bba21a5b14
--- /dev/null
+++ b/app/io-proxy/datapath_mp.c
@@ -0,0 +1,78 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#include <sys/types.h>
+#include <rte_stack.h>
+#include <rte_mempool.h>
+#include <rte_mbuf.h>
+#include "datapath.h"
+
+/* The mempool's "pool_config" field holds the base address of this mapping. */
+/* No real alloc/free etc. functions are provided for this pool: we never
+ * create or destroy it, we only enqueue to and dequeue from it.
+ */
+
+/*
+ * Mempool "enqueue" op: push n objects onto the pool's backing stack.
+ *
+ * Each object is stored as its offset from the base address held in
+ * mp->pool_config rather than as an absolute pointer.
+ *
+ * Returns 0 on success, or -ENOBUFS if rte_stack_push() pushed nothing.
+ */
+static int
+proxy_mp_enqueue(struct rte_mempool *mp, void * const *obj_table,
+             unsigned int n)
+{
+       const uintptr_t base = (uintptr_t)mp->pool_config;
+       struct rte_stack *stack = mp->pool_data;
+       void *offsets[n];
+       unsigned int idx = 0;
+
+       while (idx < n) {
+               offsets[idx] = RTE_PTR_SUB(obj_table[idx], base);
+               idx++;
+       }
+
+       if (rte_stack_push(stack, offsets, n) == 0)
+               return -ENOBUFS;
+       return 0;
+}
+
+/*
+ * Mempool "dequeue" op: pop n objects from the pool's backing stack.
+ *
+ * The stack holds offsets, so each popped value is rebased onto the address
+ * held in mp->pool_config. Every resulting mbuf then has buf_addr pointed just
+ * past its own header and private area, and pool set to mp.
+ *
+ * Returns 0 on success, or -ENOBUFS if rte_stack_pop() popped nothing.
+ */
+static int
+proxy_mp_dequeue(struct rte_mempool *mp, void **obj_table,
+             unsigned int n)
+{
+       const uintptr_t base = (uintptr_t)mp->pool_config;
+       struct rte_stack *stack = mp->pool_data;
+
+       if (rte_stack_pop(stack, obj_table, n) == 0)
+               return -ENOBUFS;
+
+       for (unsigned int idx = 0; idx < n; idx++) {
+               struct rte_mbuf *mbuf = RTE_PTR_ADD(obj_table[idx], base);
+               const size_t hdr_len = sizeof(struct rte_mbuf) +
+                               rte_pktmbuf_priv_size(mp);
+
+               obj_table[idx] = mbuf;
+               mbuf->buf_addr = RTE_PTR_ADD(mbuf, hdr_len);
+               mbuf->pool = mp;
+       }
+       return 0;
+}
+
+static int
+proxy_mp_alloc(struct rte_mempool *mp __rte_unused)
+{
+       rte_panic("Should not be called\n");    /* stub used for the .alloc op; panics if ever invoked */
+}
+
+/*
+ * Mempool "get_count" op: number of objects currently on the backing stack.
+ *
+ * Generic mempool queries such as rte_mempool_avail_count() reach this op,
+ * so it reports the stack occupancy instead of panicking.
+ */
+static unsigned int
+proxy_mp_get_count(const struct rte_mempool *mp)
+{
+       struct rte_stack *s = mp->pool_data;
+
+       return rte_stack_count(s);
+}
+
+
+/*
+ * Ops table registered under MEMPOOL_OPS_NAME. It is never modified after
+ * initialisation, so it is const; registration takes a pointer to const.
+ */
+static const struct rte_mempool_ops ops_proxy_mp = {
+       .name = MEMPOOL_OPS_NAME,
+       .alloc = proxy_mp_alloc,
+       .enqueue = proxy_mp_enqueue,
+       .dequeue = proxy_mp_dequeue,
+       .get_count = proxy_mp_get_count,
+};
+
+RTE_MEMPOOL_REGISTER_OPS(ops_proxy_mp);
+
+/*
+ * Search the registered mempool ops table for MEMPOOL_OPS_NAME.
+ * Returns the index of the matching entry, or -1 when there is none.
+ */
+int
+check_mempool_ops(void)
+{
+       unsigned int idx = 0;
+
+       while (idx < rte_mempool_ops_table.num_ops) {
+               const char *name = rte_mempool_ops_table.ops[idx].name;
+
+               if (!strcmp(name, MEMPOOL_OPS_NAME))
+                       return idx;
+               idx++;
+       }
+       return -1;
+}
diff --git a/app/io-proxy/main.c b/app/io-proxy/main.c
new file mode 100644
index 0000000000..82eef81fb0
--- /dev/null
+++ b/app/io-proxy/main.c
@@ -0,0 +1,71 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2023 Intel Corporation
+ */
+#include <stdio.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include <rte_eal.h>
+#include <rte_common.h>
+#include <cmdline.h>
+#include <cmdline_socket.h>
+
+#include "datapath.h"
+#include "commands.h"
+
+volatile bool quit;    /* set by run_cmdline() when it finishes; ends main()'s forwarding loop */
+volatile bool running_startup_script;  /* true while run_cmdline() is processing startup_file */
+static const char *startup_file = "dpdk-io-proxy.cmds";        /* opened by run_cmdline() before the stdin prompt */
+
+/*
+ * Command-line thread body.
+ *
+ * If startup_file can be opened, its commands are run first, with
+ * running_startup_script set for the duration. An interactive prompt is then
+ * served on stdin. Once the prompt ends, or cannot be created, quit is set so
+ * that main() leaves its forwarding loop.
+ *
+ * Always returns NULL.
+ */
+static void *
+run_cmdline(void *arg __rte_unused)
+{
+       struct cmdline *cl;
+       int fd = open(startup_file, O_RDONLY);
+
+       if (fd >= 0) {
+               running_startup_script = true;
+               cl = cmdline_new(ctx, "\n# ", fd, STDOUT_FILENO);
+               if (cl == NULL) {
+                       fprintf(stderr, "Error processing %s\n", startup_file);
+                       close(fd);
+               } else {
+                       cmdline_interact(cl);
+                       cmdline_quit(cl);
+                       /* cmdline_free() releases cl and also closes the input
+                        * descriptor it was created with (it skips only the
+                        * standard descriptors 0-2), so fd must not be closed
+                        * again here.
+                        */
+                       cmdline_free(cl);
+               }
+               running_startup_script = false;
+       }
+
+       cl = cmdline_stdin_new(ctx, "\nProxy>> ");
+       if (cl != NULL) {
+               cmdline_interact(cl);
+               cmdline_stdin_exit(cl);
+       }
+
+       quit = true;
+       return NULL;
+}
+
+/*
+ * Entry point: expects exactly one argument, the core list handed to
+ * datapath_init(). Starts the detached command-line thread, then runs
+ * handle_forwarding() until that thread sets quit.
+ */
+int
+main(int argc, char *argv[])
+{
+       pthread_t cmdline_th;
+       int ret;
+
+       if (argc != 2 || datapath_init(argv[1]) < 0) {
+               fprintf(stderr, "Usage %s <corelist>\n", program_invocation_short_name);
+               rte_exit(EXIT_FAILURE, "Cannot init\n");
+       }
+
+       /* pthread_create() reports failure as a positive error number, never
+        * as a negative value, so the result is compared against zero.
+        */
+       ret = pthread_create(&cmdline_th, NULL, run_cmdline, NULL);
+       if (ret != 0)
+               rte_exit(EXIT_FAILURE, "Cannot spawn cmdline thread\n");
+       pthread_detach(cmdline_th);
+
+       while (!quit)
+               handle_forwarding();
+       return 0;
+}
diff --git a/app/io-proxy/meson.build b/app/io-proxy/meson.build
new file mode 100644
index 0000000000..f03783b68f
--- /dev/null
+++ b/app/io-proxy/meson.build
@@ -0,0 +1,12 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023 Intel Corporation
+
+cmd_h = custom_target('commands_hdr',  # generates commands.h from commands.list
+        output: 'commands.h',
+        input: files('commands.list'),
+        capture: true,  # the command's stdout becomes the output file
+        command: [cmdline_gen_cmd, '@INPUT@']
+)
+sources += files('datapath.c', 'datapath_mp.c', 'main.c', 'command_fns.c')
+sources += cmd_h  # adds the generated header to the app's sources
+deps += ['cmdline', 'ethdev', 'stack', 'bus_shared_mem']
diff --git a/app/meson.build b/app/meson.build
index e4bf5c531c..27f69d883e 100644
--- a/app/meson.build
+++ b/app/meson.build
@@ -18,6 +18,7 @@ apps = [
         'dumpcap',
         'pdump',
         'proc-info',
+        'io-proxy',
         'test-acl',
         'test-bbdev',
         'test-cmdline',
-- 
2.39.2

Reply via email to