Introduce a separate data structure (rte_eth_queue_local)
to store local to given process (i.e. no-shareable) information
for each configured rx/tx queue.
Memory for that structure is allocated/freed dynamically during
rte_eth_dev_configure().
Put a placeholder pointers for queue specific (rx|tx)_pkt_burst(),
tx_pkt_prepare() functions inside that structure.
Move rx/tx callback related information inside that structure.
That introduces a change in current behavior: all callbacks for
un-configured queues will be automatically removed.
Let say: rte_eth_dev_configure(port, 0, 0, ...);
would wipe out all installed callbacks for that port.


Signed-off-by: Konstantin Ananyev <konstantin.anan...@intel.com>
---
 lib/librte_ether/rte_ethdev.c | 228 +++++++++++++++++++++++++++++-------------
 lib/librte_ether/rte_ethdev.h |  85 +++++++++++-----
 2 files changed, 216 insertions(+), 97 deletions(-)

diff --git a/lib/librte_ether/rte_ethdev.c b/lib/librte_ether/rte_ethdev.c
index 318af2869..ff8571d60 100644
--- a/lib/librte_ether/rte_ethdev.c
+++ b/lib/librte_ether/rte_ethdev.c
@@ -241,6 +241,14 @@ rte_eth_dev_allocate(const char *name)
        return eth_dev;
 }
 
+static struct rte_eth_queue_local *
+alloc_queue_local(uint16_t nb_queues)
+{
+       struct rte_eth_queue_local *ql;
+       ql = rte_zmalloc(NULL, sizeof(ql[0]) * nb_queues, RTE_CACHE_LINE_SIZE);
+       return ql;
+}
+
 /*
  * Attach to a port already registered by the primary process, which
  * makes sure that the same device would have the same port id both
@@ -269,6 +277,16 @@ rte_eth_dev_attach_secondary(const char *name)
        eth_dev = eth_dev_get(i);
        RTE_ASSERT(eth_dev->data->port_id == i);
 
+       /*
+        * obviously it is quite error prone:
+        * if the primary process will decide to (re)configure device later,
+        * there is no way for secondary one to notice that and update
+        * it's local data (queue_local, rx|tx_pkt_burst, etc.).
+        * We probably need an eth_dev_configure_secondary() or so.
+        */
+       eth_dev->rx_ql = alloc_queue_local(eth_dev->data->nb_rx_queues);
+       eth_dev->tx_ql = alloc_queue_local(eth_dev->data->nb_tx_queues);
+
        return eth_dev;
 }
 
@@ -444,52 +462,92 @@ rte_eth_dev_detach(uint16_t port_id, char *name)
        return ret;
 }
 
+/*
+ * Helper routine - removes all registered RX/TX queue callbacks
+ * from the FIFO list.
+ * It is a caller responsibility to make sure no actual RX/TX happens
+ * simultanesoulsy on that queue.
+ */
+static void
+free_rxtx_cbs(struct rte_eth_rxtx_callback *cbs)
+{
+       struct rte_eth_rxtx_callback *cb, *next;
+
+       for (cb = cbs; cb != NULL; cb = next) {
+               next = cb->next;
+               rte_free(cb);
+       }
+}
+
+static void
+reset_queue_local(struct rte_eth_queue_local *ql)
+{
+       free_rxtx_cbs(ql->cbs);
+       memset(ql, 0, sizeof(*ql));
+}
+
 static int
 rte_eth_dev_rx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
 {
-       uint16_t old_nb_queues = dev->data->nb_rx_queues;
+       uint32_t diff, i, old_nb_queues;
+       struct rte_eth_queue_local *rlq;
        void **rxq;
-       unsigned i;
 
-       if (dev->data->rx_queues == NULL && nb_queues != 0) { /* first time 
configuration */
-               dev->data->rx_queues = rte_zmalloc("ethdev->rx_queues",
-                               sizeof(dev->data->rx_queues[0]) * nb_queues,
-                               RTE_CACHE_LINE_SIZE);
-               if (dev->data->rx_queues == NULL) {
-                       dev->data->nb_rx_queues = 0;
-                       return -(ENOMEM);
-               }
-       } else if (dev->data->rx_queues != NULL && nb_queues != 0) { /* 
re-configure */
-               RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_release, 
-ENOTSUP);
+       old_nb_queues = dev->data->nb_rx_queues;
+
+       /* same # of queues nothing need to be done */
+       if (nb_queues == old_nb_queues)
+               return 0;
+
+       rxq = dev->data->rx_queues;
+       rlq = dev->rx_ql;
 
-               rxq = dev->data->rx_queues;
+       /* reduce # of queues */
+       if (old_nb_queues > nb_queues) {
 
-               for (i = nb_queues; i < old_nb_queues; i++)
+               RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_release,
+                       -ENOTSUP);
+
+               dev->data->nb_rx_queues = nb_queues;
+
+               /* free resources used by the queues we are going to remove */
+               for (i = nb_queues; i < old_nb_queues; i++) {
+                       reset_queue_local(rlq + i);
                        (*dev->dev_ops->rx_queue_release)(rxq[i]);
-               rxq = rte_realloc(rxq, sizeof(rxq[0]) * nb_queues,
-                               RTE_CACHE_LINE_SIZE);
-               if (rxq == NULL)
-                       return -(ENOMEM);
-               if (nb_queues > old_nb_queues) {
-                       uint16_t new_qs = nb_queues - old_nb_queues;
-
-                       memset(rxq + old_nb_queues, 0,
-                               sizeof(rxq[0]) * new_qs);
+                       rxq[i] = 0;
                }
 
-               dev->data->rx_queues = rxq;
+               if (nb_queues == 0) {
+                       dev->data->rx_queues = NULL;
+                       dev->rx_ql = NULL;
+                       rte_free(rxq);
+                       rte_free(rlq);
+               }
+
+               return 0;
+       }
 
-       } else if (dev->data->rx_queues != NULL && nb_queues == 0) {
-               RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->rx_queue_release, 
-ENOTSUP);
+       /* increase # of queues */
 
-               rxq = dev->data->rx_queues;
+       diff = nb_queues - old_nb_queues;
 
-               for (i = nb_queues; i < old_nb_queues; i++)
-                       (*dev->dev_ops->rx_queue_release)(rxq[i]);
+       rxq = rte_realloc(rxq, sizeof(rxq[0]) * nb_queues,
+                       RTE_CACHE_LINE_SIZE);
+       rlq = rte_realloc(rlq, sizeof(rlq[0]) * nb_queues,
+                       RTE_CACHE_LINE_SIZE);
 
-               rte_free(dev->data->rx_queues);
-               dev->data->rx_queues = NULL;
+       if (rxq != NULL) {
+               memset(rxq + old_nb_queues, 0, sizeof(rxq[0]) * diff);
+               dev->data->rx_queues = rxq;
+       }
+       if (rlq != NULL) {
+               memset(rlq + old_nb_queues, 0, sizeof(rlq[0]) * diff);
+               dev->rx_ql = rlq;
        }
+
+       if (rxq == NULL || rlq == NULL)
+               return -ENOMEM;
+
        dev->data->nb_rx_queues = nb_queues;
        return 0;
 }
@@ -601,49 +659,65 @@ rte_eth_dev_tx_queue_stop(uint16_t port_id, uint16_t 
tx_queue_id)
 static int
 rte_eth_dev_tx_queue_config(struct rte_eth_dev *dev, uint16_t nb_queues)
 {
-       uint16_t old_nb_queues = dev->data->nb_tx_queues;
+       uint32_t diff, i, old_nb_queues;
+       struct rte_eth_queue_local *tlq;
        void **txq;
-       unsigned i;
 
-       if (dev->data->tx_queues == NULL && nb_queues != 0) { /* first time 
configuration */
-               dev->data->tx_queues = rte_zmalloc("ethdev->tx_queues",
-                                                  
sizeof(dev->data->tx_queues[0]) * nb_queues,
-                                                  RTE_CACHE_LINE_SIZE);
-               if (dev->data->tx_queues == NULL) {
-                       dev->data->nb_tx_queues = 0;
-                       return -(ENOMEM);
-               }
-       } else if (dev->data->tx_queues != NULL && nb_queues != 0) { /* 
re-configure */
-               RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_release, 
-ENOTSUP);
+       old_nb_queues = dev->data->nb_tx_queues;
+
+       /* same # of queues nothing need to be done */
+       if (nb_queues == old_nb_queues)
+               return 0;
+
+       txq = dev->data->tx_queues;
+       tlq = dev->tx_ql;
+
+       /* reduce # of queues */
+       if (old_nb_queues > nb_queues) {
+
+               RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_release,
+                       -ENOTSUP);
 
-               txq = dev->data->tx_queues;
+               dev->data->nb_tx_queues = nb_queues;
 
-               for (i = nb_queues; i < old_nb_queues; i++)
+               /* free resources used by the queues we are going to remove */
+               for (i = nb_queues; i < old_nb_queues; i++) {
+                       reset_queue_local(tlq + i);
                        (*dev->dev_ops->tx_queue_release)(txq[i]);
-               txq = rte_realloc(txq, sizeof(txq[0]) * nb_queues,
-                                 RTE_CACHE_LINE_SIZE);
-               if (txq == NULL)
-                       return -ENOMEM;
-               if (nb_queues > old_nb_queues) {
-                       uint16_t new_qs = nb_queues - old_nb_queues;
-
-                       memset(txq + old_nb_queues, 0,
-                              sizeof(txq[0]) * new_qs);
+                       txq[i] = 0;
                }
 
-               dev->data->tx_queues = txq;
+               if (nb_queues == 0) {
+                       dev->data->tx_queues = NULL;
+                       dev->tx_ql = NULL;
+                       rte_free(txq);
+                       rte_free(tlq);
+               }
+
+               return 0;
+       }
 
-       } else if (dev->data->tx_queues != NULL && nb_queues == 0) {
-               RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->tx_queue_release, 
-ENOTSUP);
+       /* increase # of queues */
 
-               txq = dev->data->tx_queues;
+       diff = nb_queues - old_nb_queues;
 
-               for (i = nb_queues; i < old_nb_queues; i++)
-                       (*dev->dev_ops->tx_queue_release)(txq[i]);
+       txq = rte_realloc(txq, sizeof(txq[0]) * nb_queues,
+                       RTE_CACHE_LINE_SIZE);
+       tlq = rte_realloc(tlq, sizeof(tlq[0]) * nb_queues,
+                       RTE_CACHE_LINE_SIZE);
 
-               rte_free(dev->data->tx_queues);
-               dev->data->tx_queues = NULL;
+       if (txq != NULL) {
+               memset(txq + old_nb_queues, 0, sizeof(txq[0]) * diff);
+               dev->data->tx_queues = txq;
+       }
+       if (tlq != NULL) {
+               memset(tlq + old_nb_queues, 0, sizeof(tlq[0]) * diff);
+               dev->tx_ql = tlq;
        }
+
+       if (txq == NULL || tlq == NULL)
+               return -ENOMEM;
+
        dev->data->nb_tx_queues = nb_queues;
        return 0;
 }
@@ -1084,6 +1158,7 @@ void
 rte_eth_dev_close(uint16_t port_id)
 {
        struct rte_eth_dev *dev;
+       uint32_t i;
 
        RTE_ETH_VALID_PORTID_OR_RET(port_id);
        dev = &rte_eth_devices[port_id];
@@ -1092,12 +1167,25 @@ rte_eth_dev_close(uint16_t port_id)
        dev->data->dev_started = 0;
        (*dev->dev_ops->dev_close)(dev);
 
+       /* free local resources for RX/TX queues */
+
+       for (i = 0; i != dev->data->nb_rx_queues; i++)
+               reset_queue_local(dev->rx_ql + i);
+
+       for (i = 0; i != dev->data->nb_tx_queues; i++)
+               reset_queue_local(dev->tx_ql + i);
+
        dev->data->nb_rx_queues = 0;
        rte_free(dev->data->rx_queues);
        dev->data->rx_queues = NULL;
+       rte_free(dev->rx_ql);
+       dev->rx_ql = NULL;
+
        dev->data->nb_tx_queues = 0;
        rte_free(dev->data->tx_queues);
        dev->data->tx_queues = NULL;
+       rte_free(dev->tx_ql);
+       dev->tx_ql = NULL;
 }
 
 int
@@ -3111,10 +3199,10 @@ rte_eth_add_rx_callback(uint16_t port_id, uint16_t 
queue_id,
        rte_spinlock_lock(&rte_eth_rx_cb_lock);
        /* Add the callbacks in fifo order. */
        struct rte_eth_rxtx_callback *tail =
-               rte_eth_devices[port_id].post_rx_burst_cbs[queue_id];
+               rte_eth_devices[port_id].rx_ql[queue_id].cbs;
 
        if (!tail) {
-               rte_eth_devices[port_id].post_rx_burst_cbs[queue_id] = cb;
+               rte_eth_devices[port_id].rx_ql[queue_id].cbs = cb;
 
        } else {
                while (tail->next)
@@ -3153,9 +3241,9 @@ rte_eth_add_first_rx_callback(uint16_t port_id, uint16_t 
queue_id,
 
        rte_spinlock_lock(&rte_eth_rx_cb_lock);
        /* Add the callbacks at fisrt position*/
-       cb->next = rte_eth_devices[port_id].post_rx_burst_cbs[queue_id];
+       cb->next = rte_eth_devices[port_id].rx_ql[queue_id].cbs;
        rte_smp_wmb();
-       rte_eth_devices[port_id].post_rx_burst_cbs[queue_id] = cb;
+       rte_eth_devices[port_id].rx_ql[queue_id].cbs = cb;
        rte_spinlock_unlock(&rte_eth_rx_cb_lock);
 
        return cb;
@@ -3189,10 +3277,10 @@ rte_eth_add_tx_callback(uint16_t port_id, uint16_t 
queue_id,
        rte_spinlock_lock(&rte_eth_tx_cb_lock);
        /* Add the callbacks in fifo order. */
        struct rte_eth_rxtx_callback *tail =
-               rte_eth_devices[port_id].pre_tx_burst_cbs[queue_id];
+               rte_eth_devices[port_id].tx_ql[queue_id].cbs;
 
        if (!tail) {
-               rte_eth_devices[port_id].pre_tx_burst_cbs[queue_id] = cb;
+               rte_eth_devices[port_id].tx_ql[queue_id].cbs = cb;
 
        } else {
                while (tail->next)
@@ -3223,7 +3311,7 @@ rte_eth_remove_rx_callback(uint16_t port_id, uint16_t 
queue_id,
        int ret = -EINVAL;
 
        rte_spinlock_lock(&rte_eth_rx_cb_lock);
-       prev_cb = &dev->post_rx_burst_cbs[queue_id];
+       prev_cb = &dev->rx_ql[queue_id].cbs;
        for (; *prev_cb != NULL; prev_cb = &cb->next) {
                cb = *prev_cb;
                if (cb == user_cb) {
@@ -3257,7 +3345,7 @@ rte_eth_remove_tx_callback(uint16_t port_id, uint16_t 
queue_id,
        struct rte_eth_rxtx_callback **prev_cb;
 
        rte_spinlock_lock(&rte_eth_tx_cb_lock);
-       prev_cb = &dev->pre_tx_burst_cbs[queue_id];
+       prev_cb = &dev->tx_ql[queue_id].cbs;
        for (; *prev_cb != NULL; prev_cb = &cb->next) {
                cb = *prev_cb;
                if (cb == user_cb) {
diff --git a/lib/librte_ether/rte_ethdev.h b/lib/librte_ether/rte_ethdev.h
index 341c2d624..d62e1bcc3 100644
--- a/lib/librte_ether/rte_ethdev.h
+++ b/lib/librte_ether/rte_ethdev.h
@@ -1694,6 +1694,10 @@ typedef uint16_t (*rte_tx_callback_fn)(uint16_t port, 
uint16_t queue,
  * @internal
  * Structure used to hold information about the callbacks to be called for a
  * queue on RX and TX.
+ * On RX: user-supplied functions called from rte_eth_rx_burst to post-process
+ * received packets before passing them to the user
+ * On TX: user-supplied functions called from rte_eth_tx_burst to pre-process
+ * received packets before passing them to the driver for transmission.
  */
 struct rte_eth_rxtx_callback {
        struct rte_eth_rxtx_callback *next;
@@ -1715,6 +1719,24 @@ enum rte_eth_dev_state {
 
 /**
  * @internal
+ * Structure to hold process non-shareable information
+ * about RX/TX queue of the ethernet device.
+ */
+struct rte_eth_queue_local {
+       /**
+        * placeholder for queue specific rx/tx and tx_preapare
+        * functions pointers
+        */
+       eth_rx_burst_t rx_pkt_burst; /**< receive function pointer. */
+       eth_tx_burst_t tx_pkt_burst; /**< transmit function pointer. */
+       eth_tx_prep_t tx_pkt_prepare; /**< transmit prepare function pointer. */
+
+       struct rte_eth_rxtx_callback *cbs;
+       /**< list of user supplied callbacks */
+} __rte_cache_aligned;
+
+/**
+ * @internal
  * The generic data structure associated with each ethernet device.
  *
  * Pointers to burst-oriented packet receive and transmit functions are
@@ -1730,19 +1752,13 @@ struct rte_eth_dev {
        struct rte_eth_dev_data *data;  /**< Pointer to device data */
        const struct eth_dev_ops *dev_ops; /**< Functions exported by PMD */
        struct rte_device *device; /**< Backing device */
+
+       struct rte_eth_queue_local *rx_ql;  /**< RX queues local data */
+       struct rte_eth_queue_local *tx_ql;  /**< TX queues local data */
+
        struct rte_intr_handle *intr_handle; /**< Device interrupt handle */
        /** User application callbacks for NIC interrupts */
        struct rte_eth_dev_cb_list link_intr_cbs;
-       /**
-        * User-supplied functions called from rx_burst to post-process
-        * received packets before passing them to the user
-        */
-       struct rte_eth_rxtx_callback 
*post_rx_burst_cbs[RTE_MAX_QUEUES_PER_PORT];
-       /**
-        * User-supplied functions called from tx_burst to pre-process
-        * received packets before passing them to the driver for transmission.
-        */
-       struct rte_eth_rxtx_callback *pre_tx_burst_cbs[RTE_MAX_QUEUES_PER_PORT];
        enum rte_eth_dev_state state; /**< Flag indicating the port state */
        void *security_ctx; /**< Context for security ops */
 } __rte_cache_aligned;
@@ -2883,29 +2899,37 @@ static inline uint16_t
 rte_eth_rx_burst(uint16_t port_id, uint16_t queue_id,
                 struct rte_mbuf **rx_pkts, const uint16_t nb_pkts)
 {
+       uint16_t nb_rx;
        struct rte_eth_dev *dev = &rte_eth_devices[port_id];
 
 #ifdef RTE_LIBRTE_ETHDEV_DEBUG
        RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
        RTE_FUNC_PTR_OR_ERR_RET(*dev->rx_pkt_burst, 0);
 
-       if (queue_id >= dev->data->nb_rx_queues) {
+       if (queue_id >= dev->data->nb_rx_queues || dev->rx_ql == NULL) {
                RTE_PMD_DEBUG_TRACE("Invalid RX queue_id=%d\n", queue_id);
                return 0;
        }
 #endif
-       int16_t nb_rx = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
+       nb_rx = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id],
                        rx_pkts, nb_pkts);
 
 #ifdef RTE_ETHDEV_RXTX_CALLBACKS
-       struct rte_eth_rxtx_callback *cb = dev->post_rx_burst_cbs[queue_id];
+       {
+               struct rte_eth_queue_local *ql;
+               struct rte_eth_rxtx_callback *cb;
 
-       if (unlikely(cb != NULL)) {
-               do {
-                       nb_rx = cb->fn.rx(port_id, queue_id, rx_pkts, nb_rx,
+               ql = dev->rx_ql + queue_id;
+               cb = ql->cbs;
+
+               if (unlikely(cb != NULL)) {
+                       do {
+                               nb_rx = cb->fn.rx(port_id, queue_id,
+                                               rx_pkts, nb_rx,
                                                nb_pkts, cb->param);
-                       cb = cb->next;
-               } while (cb != NULL);
+                               cb = cb->next;
+                       } while (cb != NULL);
+               }
        }
 #endif
 
@@ -3151,21 +3175,28 @@ rte_eth_tx_burst(uint16_t port_id, uint16_t queue_id,
        RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, 0);
        RTE_FUNC_PTR_OR_ERR_RET(*dev->tx_pkt_burst, 0);
 
-       if (queue_id >= dev->data->nb_tx_queues) {
+       if (queue_id >= dev->data->nb_tx_queues || dev->tx_ql == NULL) {
                RTE_PMD_DEBUG_TRACE("Invalid TX queue_id=%d\n", queue_id);
                return 0;
        }
 #endif
 
 #ifdef RTE_ETHDEV_RXTX_CALLBACKS
-       struct rte_eth_rxtx_callback *cb = dev->pre_tx_burst_cbs[queue_id];
-
-       if (unlikely(cb != NULL)) {
-               do {
-                       nb_pkts = cb->fn.tx(port_id, queue_id, tx_pkts, nb_pkts,
-                                       cb->param);
-                       cb = cb->next;
-               } while (cb != NULL);
+       {
+               struct rte_eth_queue_local *ql;
+               struct rte_eth_rxtx_callback *cb;
+
+               ql = dev->tx_ql + queue_id;
+               cb = ql->cbs;
+
+               if (unlikely(cb != NULL)) {
+                       do {
+                               nb_pkts = cb->fn.tx(port_id, queue_id,
+                                               tx_pkts, nb_pkts,
+                                               cb->param);
+                               cb = cb->next;
+                       } while (cb != NULL);
+               }
        }
 #endif
 
-- 
2.13.5

Reply via email to