Use io_uring for reading and writing packets on the TAP device.

Signed-off-by: Stephen Hemminger <step...@networkplumber.org>
---
 drivers/net/ioring/rte_eth_ioring.c | 364 +++++++++++++++++++++++++++-
 1 file changed, 363 insertions(+), 1 deletion(-)
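Illustrative usage only (the `net_ioring` vdev driver name is an assumption
based on this driver's directory; the `iface` argument comes from the
argument list earlier in this file):

    dpdk-testpmd --vdev=net_ioring0,iface=tap0 -- -i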
diff --git a/drivers/net/ioring/rte_eth_ioring.c b/drivers/net/ioring/rte_eth_ioring.c
index ddef57adfb..fa79bc5667 100644
--- a/drivers/net/ioring/rte_eth_ioring.c
+++ b/drivers/net/ioring/rte_eth_ioring.c
@@ -2,6 +2,7 @@
  * Copyright (c) Stephen Hemminger
  */
 
+#include <assert.h>
 #include <ctype.h>
 #include <errno.h>
 #include <fcntl.h>
@@ -9,8 +10,10 @@
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
+#include <liburing.h>
 #include <sys/ioctl.h>
 #include <sys/socket.h>
+#include <sys/uio.h>
 #include <net/if.h>
 #include <linux/if.h>
 #include <linux/if_arp.h>
@@ -27,6 +30,13 @@
 #include <rte_kvargs.h>
 #include <rte_log.h>
 
+#define IORING_DEFAULT_BURST	64
+#define IORING_NUM_BUFFERS	1024
+#define IORING_MAX_QUEUES	128
+
+static_assert(IORING_MAX_QUEUES <= RTE_MP_MAX_FD_NUM, "Max queues exceeds MP fd limit");
+
 #define IORING_DEFAULT_IFNAME	"enio%d"
 #define IORING_MP_KEY		"ioring_mp_send_fds"
 
@@ -34,6 +44,20 @@ RTE_LOG_REGISTER_DEFAULT(ioring_logtype, NOTICE);
 #define RTE_LOGTYPE_IORING ioring_logtype
 #define PMD_LOG(level, ...) RTE_LOG_LINE_PREFIX(level, IORING, "%s(): ", __func__, __VA_ARGS__)
 
+#ifdef RTE_ETHDEV_DEBUG_RX
+#define PMD_RX_LOG(level, ...) \
+	RTE_LOG_LINE_PREFIX(level, IORING, "%s() rx: ", __func__, __VA_ARGS__)
+#else
+#define PMD_RX_LOG(...) do { } while (0)
+#endif
+
+#ifdef RTE_ETHDEV_DEBUG_TX
+#define PMD_TX_LOG(level, ...) \
+	RTE_LOG_LINE_PREFIX(level, IORING, "%s() tx: ", __func__, __VA_ARGS__)
+#else
+#define PMD_TX_LOG(...) do { } while (0)
+#endif
+
 #define IORING_IFACE_ARG	"iface"
 #define IORING_PERSIST_ARG	"persist"
 
@@ -43,6 +67,30 @@ static const char * const valid_arguments[] = {
 	NULL
 };
 
+struct rx_queue {
+	struct rte_mempool *mb_pool;	/* rx buffer pool */
+	struct io_uring io_ring;	/* queue of posted reads */
+	uint16_t port_id;
+	uint16_t queue_id;
+
+	uint64_t rx_packets;
+	uint64_t rx_bytes;
+	uint64_t rx_nombuf;
+	uint64_t rx_errors;
+};
+
+struct tx_queue {
+	struct io_uring io_ring;
+
+	uint16_t port_id;
+	uint16_t queue_id;
+	uint16_t free_thresh;
+
+	uint64_t tx_packets;
+	uint64_t tx_bytes;
+	uint64_t tx_errors;
+};
+
 struct pmd_internals {
 	int keep_fd;			/* keep alive file descriptor */
 	char ifname[IFNAMSIZ];		/* name assigned by kernel */
@@ -300,6 +348,15 @@ eth_dev_info(struct rte_eth_dev *dev, struct rte_eth_dev_info *dev_info)
 	dev_info->if_index = if_nametoindex(pmd->ifname);
 	dev_info->max_mac_addrs = 1;
 	dev_info->max_rx_pktlen = RTE_ETHER_MAX_LEN;
+	dev_info->max_rx_queues = IORING_MAX_QUEUES;
+	dev_info->max_tx_queues = IORING_MAX_QUEUES;
+	dev_info->min_rx_bufsize = 0;
+
+	dev_info->default_rxportconf = (struct rte_eth_dev_portconf) {
+		.burst_size = IORING_DEFAULT_BURST,
+		.ring_size = IORING_NUM_BUFFERS,
+		.nb_queues = 1,
+	};
 
 	return 0;
 }
@@ -311,6 +368,14 @@ eth_dev_close(struct rte_eth_dev *dev)
 
 	PMD_LOG(INFO, "Closing %s", pmd->ifname);
 
+	/* the TAP fds are shared by the rx and tx queue with the same index */
+	int *fds = dev->process_private;
+	uint16_t nq = RTE_MAX(dev->data->nb_rx_queues, dev->data->nb_tx_queues);
+	for (uint16_t i = 0; i < nq; i++) {
+		if (fds[i] == -1)
+			continue;
+		close(fds[i]);
+		fds[i] = -1;
+	}
+
 	if (rte_eal_process_type() != RTE_PROC_PRIMARY)
 		return 0;
 
@@ -324,6 +389,296 @@ eth_dev_close(struct rte_eth_dev *dev)
 	return 0;
 }
 
+/* Open another fd to the TAP device for this queue */
+static int
+eth_queue_setup(struct rte_eth_dev *dev, const char *name, uint16_t queue_id)
+{
+	int *fds = dev->process_private;
+
+	if (fds[queue_id] != -1)
+		return 0;	/* already set up */
+
+	struct ifreq ifr = { };
+	int tap_fd = tap_open(name, &ifr, 0);
+	if (tap_fd < 0) {
+		PMD_LOG(ERR, "tap_open failed");
+		return -1;
+	}
+
+	PMD_LOG(DEBUG, "opened fd
%d for queue %u", tap_fd, queue_id); + fds[queue_id] = tap_fd; + return 0; +} + +static int +eth_queue_fd(uint16_t port_id, uint16_t queue_id) +{ + struct rte_eth_dev *dev = &rte_eth_devices[port_id]; + int *fds = dev->process_private; + + return fds[queue_id]; +} + +/* setup an submit queue to read mbuf */ +static inline void +eth_rx_submit(struct rx_queue *rxq, int fd, struct rte_mbuf *mb) +{ + struct io_uring_sqe *sqe = io_uring_get_sqe(&rxq->io_ring); + + if (unlikely(sqe == NULL)) { + PMD_LOG(DEBUG, "io_uring no rx sqe"); + rxq->rx_errors++; + } else { + void *base = rte_pktmbuf_mtod(mb, void *); + size_t len = mb->buf_len; + + io_uring_prep_read(sqe, fd, base, len, 0); + io_uring_sqe_set_data(sqe, mb); + } +} + +static uint16_t +eth_ioring_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) +{ + struct rx_queue *rxq = queue; + struct io_uring_cqe *cqe; + unsigned int head, num_cqe = 0; + uint16_t num_rx = 0; + uint32_t num_bytes = 0; + int fd = eth_queue_fd(rxq->port_id, rxq->queue_id); + + io_uring_for_each_cqe(&rxq->io_ring, head, cqe) { + struct rte_mbuf *mb = (void *)(uintptr_t)cqe->user_data; + ssize_t len = cqe->res; + + PMD_RX_LOG(DEBUG, "cqe %u len %zd", num_cqe, len); + num_cqe++; + + if (unlikely(len < RTE_ETHER_HDR_LEN)) { + if (len < 0) + PMD_LOG(ERR, "io_uring_read: %s", strerror(-len)); + else + PMD_LOG(ERR, "io_uring_read missing hdr"); + + rxq->rx_errors++; + goto resubmit; + } + + struct rte_mbuf *nmb = rte_pktmbuf_alloc(rxq->mb_pool); + if (unlikely(nmb == 0)) { + PMD_LOG(DEBUG, "Rx mbuf alloc failed"); + ++rxq->rx_nombuf; + goto resubmit; + } + + mb->pkt_len = len; + mb->data_len = len; + mb->port = rxq->port_id; + __rte_mbuf_sanity_check(mb, 1); + + num_bytes += len; + bufs[num_rx++] = mb; + + mb = nmb; +resubmit: + eth_rx_submit(rxq, fd, mb); + + if (num_rx == nb_pkts) + break; + } + io_uring_cq_advance(&rxq->io_ring, num_cqe); + + rxq->rx_packets += num_rx; + rxq->rx_bytes += num_bytes; + return num_rx; +} + +static int +eth_rx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_id, uint16_t nb_rx_desc, + unsigned int socket_id, + const struct rte_eth_rxconf *rx_conf __rte_unused, + struct rte_mempool *mb_pool) +{ + struct pmd_internals *pmd = dev->data->dev_private; + + PMD_LOG(DEBUG, "setup port %u queue %u rx_descriptors %u", + dev->data->port_id, queue_id, nb_rx_desc); + + /* open shared tap fd maybe already setup */ + if (eth_queue_setup(dev, pmd->ifname, queue_id) < 0) + return -1; + + struct rx_queue *rxq = rte_zmalloc_socket(NULL, sizeof(*rxq), + RTE_CACHE_LINE_SIZE, socket_id); + if (rxq == NULL) { + PMD_LOG(ERR, "rxq alloc failed"); + return -1; + } + + rxq->mb_pool = mb_pool; + rxq->port_id = dev->data->port_id; + rxq->queue_id = queue_id; + dev->data->rx_queues[queue_id] = rxq; + + struct rte_mbuf *mbufs[nb_rx_desc]; + if (rte_pktmbuf_alloc_bulk(mb_pool, mbufs, nb_rx_desc) < 0) { + PMD_LOG(ERR, "Rx mbuf alloc %u bufs failed", nb_rx_desc); + return -1; + } + + if (io_uring_queue_init(nb_rx_desc, &rxq->io_ring, 0) != 0) { + PMD_LOG(ERR, "io_uring_queue_init failed: %s", strerror(errno)); + goto error; + } + + int fd = eth_queue_fd(rxq->port_id, rxq->queue_id); + + for (uint16_t i = 0; i < nb_rx_desc; i++) { + struct rte_mbuf *mb = mbufs[i]; + + eth_rx_submit(rxq, fd, mb); + } + + io_uring_submit(&rxq->io_ring); + return 0; + +error: + rte_pktmbuf_free_bulk(mbufs, nb_rx_desc); + return -1; +} + +static void +eth_rx_queue_release(struct rte_eth_dev *dev, uint16_t queue_id) +{ + struct rx_queue *rxq = dev->data->rx_queues[queue_id]; + + struct 
 io_uring_sqe *sqe = io_uring_get_sqe(&rxq->io_ring);
+	if (sqe == NULL) {
+		PMD_LOG(ERR, "io_uring_get_sqe failed: %s", strerror(errno));
+	} else {
+		io_uring_prep_cancel(sqe, NULL, IORING_ASYNC_CANCEL_ANY);
+		io_uring_submit_and_wait(&rxq->io_ring, 1);
+	}
+
+	io_uring_queue_exit(&rxq->io_ring);
+}
+
+static int
+eth_tx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_id,
+		   uint16_t nb_tx_desc, unsigned int socket_id,
+		   const struct rte_eth_txconf *tx_conf)
+{
+	struct pmd_internals *pmd = dev->data->dev_private;
+
+	/* the TAP fd for this queue may already be open */
+	if (eth_queue_setup(dev, pmd->ifname, queue_id) < 0)
+		return -1;
+
+	struct tx_queue *txq = rte_zmalloc_socket(NULL, sizeof(*txq),
+						  RTE_CACHE_LINE_SIZE, socket_id);
+	if (txq == NULL) {
+		PMD_LOG(ERR, "txq alloc failed");
+		return -1;
+	}
+
+	txq->port_id = dev->data->port_id;
+	txq->queue_id = queue_id;
+	txq->free_thresh = tx_conf->tx_free_thresh;
+
+	if (io_uring_queue_init(nb_tx_desc, &txq->io_ring, 0) != 0) {
+		PMD_LOG(ERR, "io_uring_queue_init failed: %s", strerror(errno));
+		rte_free(txq);
+		return -1;
+	}
+
+	dev->data->tx_queues[queue_id] = txq;
+	return 0;
+}
+
+static void
+eth_tx_queue_release(struct rte_eth_dev *dev, uint16_t queue_id)
+{
+	struct tx_queue *txq = dev->data->tx_queues[queue_id];
+
+	struct io_uring_sqe *sqe = io_uring_get_sqe(&txq->io_ring);
+	if (sqe == NULL) {
+		PMD_LOG(ERR, "io_uring_get_sqe failed: %s", strerror(errno));
+	} else {
+		io_uring_prep_cancel(sqe, NULL, IORING_ASYNC_CANCEL_ANY);
+		io_uring_submit_and_wait(&txq->io_ring, 1);
+	}
+
+	io_uring_queue_exit(&txq->io_ring);
+}
+
+static void
+eth_ioring_tx_cleanup(struct tx_queue *txq)
+{
+	struct io_uring_cqe *cqe;
+	unsigned int head;
+	unsigned int num_cqe = 0;
+	unsigned int tx_done = 0;
+	uint64_t tx_bytes = 0;
+
+	io_uring_for_each_cqe(&txq->io_ring, head, cqe) {
+		struct rte_mbuf *mb = (void *)(uintptr_t)cqe->user_data;
+
+		num_cqe++;
+		PMD_TX_LOG(DEBUG, " mbuf len %u result: %d", mb->pkt_len, cqe->res);
+		if (unlikely(cqe->res < 0)) {
+			++txq->tx_errors;
+		} else {
+			++tx_done;
+			tx_bytes += mb->pkt_len;
+		}
+
+		rte_pktmbuf_free(mb);
+	}
+	/* advance past every completion seen, including the errored ones */
+	io_uring_cq_advance(&txq->io_ring, num_cqe);
+
+	txq->tx_packets += tx_done;
+	txq->tx_bytes += tx_bytes;
+}
+
+static uint16_t
+eth_ioring_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+{
+	struct tx_queue *txq = queue;
+	uint16_t num_tx;
+
+	if (unlikely(nb_pkts == 0))
+		return 0;
+
+	PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts);
+
+	if (io_uring_sq_space_left(&txq->io_ring) < txq->free_thresh)
+		eth_ioring_tx_cleanup(txq);
+
+	int fd = eth_queue_fd(txq->port_id, txq->queue_id);
+
+	for (num_tx = 0; num_tx < nb_pkts; num_tx++) {
+		struct rte_mbuf *mb = bufs[num_tx];
+
+		/* only contiguous, exclusively owned mbufs can be written in place */
+		if (!(rte_mbuf_refcnt_read(mb) == 1 &&
+		      RTE_MBUF_DIRECT(mb) && mb->nb_segs == 1)) {
+			PMD_LOG(ERR, "Can't do multi-segment or shared mbuf yet");
+			++txq->tx_errors;
+			rte_pktmbuf_free(mb);
+			continue;
+		}
+
+		struct io_uring_sqe *sqe = io_uring_get_sqe(&txq->io_ring);
+		if (sqe == NULL)
+			break;	/* submit ring is full */
+
+		void *base = rte_pktmbuf_mtod(mb, void *);
+		io_uring_prep_write(sqe, fd, base, mb->pkt_len, 0);
+		io_uring_sqe_set_data(sqe, mb);
+
+		PMD_TX_LOG(DEBUG, "tx mbuf: %p submit", mb);
+	}
+	if (num_tx > 0)
+		io_uring_submit(&txq->io_ring);
+
+	return num_tx;
+}
+
 static const struct eth_dev_ops ops = {
 	.dev_start = eth_dev_start,
 	.dev_stop = eth_dev_stop,
@@ -339,9 +694,12 @@ static const struct eth_dev_ops ops = {
 	.promiscuous_disable = eth_dev_promiscuous_disable,
 	.allmulticast_enable = eth_dev_allmulticast_enable,
 	.allmulticast_disable = eth_dev_allmulticast_disable,
+	.rx_queue_setup = eth_rx_queue_setup,
+	.rx_queue_release = eth_rx_queue_release,
+	.tx_queue_setup = eth_tx_queue_setup,
+	.tx_queue_release = eth_tx_queue_release,
 };
 
-
 static int
 ioring_create(struct rte_eth_dev *dev, const char *tap_name, uint8_t persist)
 {
@@ -379,6 +737,10 @@ ioring_create(struct rte_eth_dev *dev, const char *tap_name, uint8_t persist)
 	}
 
 	PMD_LOG(DEBUG, "%s setup", ifr.ifr_name);
+
+	dev->rx_pkt_burst = eth_ioring_rx;
+	dev->tx_pkt_burst = eth_ioring_tx;
+
 	return 0;
 
 error:
-- 
2.45.2
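
Aside (not part of the patch): a minimal standalone sketch of the
posted-read pattern the RX path above relies on. One read is kept
outstanding per buffer, completions are reaped from the CQ ring without a
syscall, and a syscall is only needed to flush newly posted requests.
Plain liburing API only; stdin stands in for the TAP fd, and buffer
recycling is trimmed for brevity.

#include <liburing.h>
#include <stdio.h>
#include <string.h>

#define RING_DEPTH 8
#define BUF_SIZE   2048

int main(void)
{
	struct io_uring ring;
	static char bufs[RING_DEPTH][BUF_SIZE];

	if (io_uring_queue_init(RING_DEPTH, &ring, 0) != 0) {
		perror("io_uring_queue_init");
		return 1;
	}

	/* post one read per buffer, as eth_rx_queue_setup() does per mbuf */
	for (int i = 0; i < RING_DEPTH; i++) {
		struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
		io_uring_prep_read(sqe, 0 /* stdin as stand-in for the tap fd */,
				   bufs[i], BUF_SIZE, 0);
		io_uring_sqe_set_data(sqe, bufs[i]);
	}
	/* flush the requests and block for at least one completion */
	io_uring_submit_and_wait(&ring, 1);

	/* reap whatever has completed, as eth_ioring_rx() does */
	struct io_uring_cqe *cqe;
	unsigned int head, num_cqe = 0;

	io_uring_for_each_cqe(&ring, head, cqe) {
		char *buf = io_uring_cqe_get_data(cqe);
		num_cqe++;
		if (cqe->res < 0)
			fprintf(stderr, "read: %s\n", strerror(-cqe->res));
		else
			printf("read %d bytes into %p\n", cqe->res, (void *)buf);
		/* a driver would hand the buffer up and post a fresh read here */
	}
	io_uring_cq_advance(&ring, num_cqe);

	io_uring_queue_exit(&ring);
	return 0;
}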