Many sample apps include internal buffering for single-packet-at-a-time
operation. Since this is such a common paradigm, this functionality is
better suited to being inside the core ethdev API.
The new API consists of three functions (combined in the usage sketch below):
* rte_eth_tx_buffer - buffer up a single packet for future transmission
* rte_eth_tx_buffer_flush - flush any unsent buffered packets
* rte_eth_tx_buffer_set_err_callback - set up a callback to be called when
  transmitting a buffered burst fails. By default, the unsent packets are
  simply freed.
---
 config/common_bsdapp          |  1 +
 config/common_linuxapp        |  1 +
 lib/librte_ether/rte_ethdev.c | 55 +++++++++++++++++++++++++++++++++++--
 lib/librte_ether/rte_ethdev.h | 63 +++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 118 insertions(+), 2 deletions(-)

diff --git a/config/common_bsdapp b/config/common_bsdapp
index 989e1da..98da1f5 100644
--- a/config/common_bsdapp
+++ b/config/common_bsdapp
@@ -138,6 +138,7 @@ CONFIG_RTE_LIBRTE_ETHDEV_DEBUG=n
 CONFIG_RTE_MAX_ETHPORTS=32
 CONFIG_RTE_LIBRTE_IEEE1588=n
 CONFIG_RTE_ETHDEV_QUEUE_STAT_CNTRS=16
+CONFIG_RTE_ETHDEV_TX_BUFSIZE=32

 #
 # Compile burst-oriented IGB & EM PMD drivers
diff --git a/config/common_linuxapp b/config/common_linuxapp
index 5b896c3..0f509d0 100644
--- a/config/common_linuxapp
+++ b/config/common_linuxapp
@@ -161,6 +161,7 @@ CONFIG_RTE_LIBRTE_ETHDEV_DEBUG=n
 CONFIG_RTE_MAX_ETHPORTS=32
 CONFIG_RTE_LIBRTE_IEEE1588=n
 CONFIG_RTE_ETHDEV_QUEUE_STAT_CNTRS=16
+CONFIG_RTE_ETHDEV_TX_BUFSIZE=32

 #
 # Support NIC bypass logic
diff --git a/lib/librte_ether/rte_ethdev.c b/lib/librte_ether/rte_ethdev.c
index 7256841..68d4d22 100644
--- a/lib/librte_ether/rte_ethdev.c
+++ b/lib/librte_ether/rte_ethdev.c
@@ -397,11 +397,32 @@ rte_eth_dev_tx_queue_stop(uint8_t port_id, uint16_t tx_queue_id)

 }

+static void
+free_unsent_pkts(struct rte_mbuf **pkts, uint16_t unsent,
+               void *userdata __rte_unused)
+{
+       unsigned i;
+       for (i = 0; i < unsent; i++)
+               rte_pktmbuf_free(pkts[i]);
+}
+
+void
+rte_eth_tx_buffer_set_err_callback(uint8_t port_id, uint16_t queue_id,
+               buffer_tx_error_fn cbfn, void *userdata)
+{
+       struct rte_eth_dev_data *dev_data = rte_eth_devices[port_id].data;
+       struct rte_eth_dev_tx_buffer *buf = dev_data->tx_queues[queue_id];
+
+       buf->userdata = userdata;
+       buf->flush_cb = cbfn;
+}
+
 static int
 rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
 {
        uint16_t old_nb_queues = dev->data->nb_tx_queues;
        void **txq;
+       struct rte_eth_dev_tx_buffer *new_bufs;
        unsigned i;

        if (dev->data->tx_queues == NULL) { /* first time configuration */
@@ -412,24 +433,54 @@ rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
                        dev->data->nb_tx_queues = 0;
                        return -(ENOMEM);
                }
+
+               dev->data->txq_bufs = rte_zmalloc("ethdev->txq_bufs",
+                               sizeof(*dev->data->txq_bufs) * nb_queues, 0);
+               if (dev->data->txq_bufs == NULL) {
+                       dev->data->nb_tx_queues = 0;
+                       rte_free(dev->data->tx_queues);
+                       dev->data->tx_queues = NULL;
+                       return -(ENOMEM);
+               }
+               for (i = 0; i < nb_queues; i++)
+                       dev->data->txq_bufs[i].flush_cb = free_unsent_pkts;
        } else { /* re-configure */
+
+               /* flush the packets queued on all queues */
+               for (i = 0; i < old_nb_queues; i++)
+                       rte_eth_tx_buffer_flush(dev->data->port_id, i);
+
                FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_release, -ENOTSUP);

+               /* get new buffer space first, but keep old space around */
+               new_bufs = rte_zmalloc("ethdev->txq_bufs",
+                               sizeof(*dev->data->txq_bufs) * nb_queues, 0);
+               if (new_bufs == NULL)
+                       return -(ENOMEM);
+
                txq = dev->data->tx_queues;

                for (i = nb_queues; i < old_nb_queues; i++)
                        (*dev->dev_ops->tx_queue_release)(txq[i]);
                txq = rte_realloc(txq, sizeof(txq[0]) * nb_queues,
                                CACHE_LINE_SIZE);
-               if (txq == NULL)
+               if (txq == NULL) {
+                       rte_free(new_bufs);
                        return -(ENOMEM);
+               }

                if (nb_queues > old_nb_queues)
                        memset(txq + old_nb_queues, 0,
                                sizeof(txq[0]) * (nb_queues - old_nb_queues));
+
+               /* install the default callback on every queue: the new
+                * array is zeroed, so flush_cb would otherwise be left
+                * NULL, and writing into the old (possibly smaller)
+                * array here would be out of bounds */
+               for (i = 0; i < nb_queues; i++)
+                       new_bufs[i].flush_cb = free_unsent_pkts;

                dev->data->tx_queues = txq;

+               /* now replace old buffers with new */
+               rte_free(dev->data->txq_bufs);
+               dev->data->txq_bufs = new_bufs;
        }
        dev->data->nb_tx_queues = nb_queues;
        return (0);
diff --git a/lib/librte_ether/rte_ethdev.h b/lib/librte_ether/rte_ethdev.h
index 2406e45..0ec7076 100644
--- a/lib/librte_ether/rte_ethdev.h
+++ b/lib/librte_ether/rte_ethdev.h
@@ -176,6 +176,7 @@ extern "C" {
 #include <rte_interrupts.h>
 #include <rte_pci.h>
 #include <rte_mbuf.h>
+#include <rte_branch_prediction.h>
 #include "rte_ether.h"

 /**
@@ -1497,6 +1498,16 @@ struct rte_eth_dev_sriov {
 };
 #define RTE_ETH_DEV_SRIOV(dev)         ((dev)->data->sriov)

+typedef void (*buffer_tx_error_fn)(struct rte_mbuf **unsent, uint16_t count,
+               void *userdata);
+
+struct rte_eth_dev_tx_buffer {
+       struct rte_mbuf *pkts[RTE_ETHDEV_TX_BUFSIZE];
+       unsigned nb_pkts;
+       buffer_tx_error_fn flush_cb; /* callback for when tx_burst fails */
+       void *userdata;              /* userdata for callback */
+};
+
 /**
  * @internal
  * The data part, with no function pointers, associated with each ethernet device.
@@ -1534,6 +1545,9 @@ struct rte_eth_dev_data {
                scattered_rx : 1,  /**< RX of scattered packets is ON(1) / OFF(0) */
                all_multicast : 1, /**< RX all multicast mode ON(1) / OFF(0). */
                dev_started : 1;   /**< Device state: STARTED(1) / STOPPED(0). */
+
+       struct rte_eth_dev_tx_buffer
+                       *txq_bufs; /**< space to allow buffered transmits */
 };

 /**
@@ -2426,6 +2440,55 @@ rte_eth_tx_burst(uint8_t port_id, uint16_t queue_id,
 }
 #endif

+static inline uint16_t
+rte_eth_tx_buffer(uint8_t port_id, uint16_t queue_id, struct rte_mbuf *tx_pkt)
+{
+       struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+       struct rte_eth_dev_tx_buffer *qbuf = &dev->data->txq_bufs[queue_id];
+
+       qbuf->pkts[qbuf->nb_pkts++] = tx_pkt;
+       if (qbuf->nb_pkts < RTE_ETHDEV_TX_BUFSIZE)
+               return 0;
+
+       const uint16_t sent = (*dev->tx_pkt_burst)(
+                       dev->data->tx_queues[queue_id], qbuf->pkts,
+                       RTE_ETHDEV_TX_BUFSIZE);
+
+       qbuf->nb_pkts = 0; /* all packets sent, or to be dealt with by
+                           * callback below */
+       if (unlikely(sent != RTE_ETHDEV_TX_BUFSIZE))
+               qbuf->flush_cb(&qbuf->pkts[sent], RTE_ETHDEV_TX_BUFSIZE - sent,
+                               qbuf->userdata);
+
+       return sent;
+}
+
+static inline uint16_t
+rte_eth_tx_buffer_flush(uint8_t port_id, uint16_t queue_id)
+{
+       struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+       struct rte_eth_dev_tx_buffer *qbuf = &dev->data->txq_bufs[queue_id];
+
+       const uint16_t to_send = qbuf->nb_pkts;
+
+       /* nothing buffered, nothing to do */
+       if (to_send == 0)
+               return 0;
+
+       const uint16_t sent = (*dev->tx_pkt_burst)(
+                       dev->data->tx_queues[queue_id], qbuf->pkts, to_send);
+
+       qbuf->nb_pkts = 0; /* all packets sent, or to be dealt with by
+                           * callback below */
+       if (unlikely(sent != to_send))
+               qbuf->flush_cb(&qbuf->pkts[sent], to_send - sent,
+                               qbuf->userdata);
+
+       return sent;
+}
+
+void
+rte_eth_tx_buffer_set_err_callback(uint8_t port_id, uint16_t queue_id,
+               buffer_tx_error_fn cbfn, void *userdata);
+
 /**
  * Setup a new signature filter rule on an Ethernet device
  *
-- 
1.9.3
