From: Eric Kinzie <ekin...@brocade.com>

In the event that the bonding device has a greater number of tx and/or rx
queues than the slave being added, track the queue limits of the slave.
On receive, ignore queue identifiers beyond what the slave interface
can support.  During transmit, pick a different queue id to use if the
intended queue is not available on the slave.

Signed-off-by: Eric Kinzie <ekinzie at brocade.com>
Signed-off-by: Stephen Hemminger <stephen at networkplumber.org>
---
 drivers/net/bonding/rte_eth_bond_api.c     |   6 +-
 drivers/net/bonding/rte_eth_bond_pmd.c     | 141 +++++++++++++++++++++++++----
 drivers/net/bonding/rte_eth_bond_private.h |   5 +-
 3 files changed, 129 insertions(+), 23 deletions(-)

diff --git a/drivers/net/bonding/rte_eth_bond_api.c 
b/drivers/net/bonding/rte_eth_bond_api.c
index 630a461..64058ff 100644
--- a/drivers/net/bonding/rte_eth_bond_api.c
+++ b/drivers/net/bonding/rte_eth_bond_api.c
@@ -340,11 +340,11 @@ __eth_bond_slave_add_lock_free(uint8_t bonded_port_id, 
uint8_t slave_port_id)

        slave_eth_dev = &rte_eth_devices[slave_port_id];

-       /* Add slave details to bonded device */
-       slave_add(internals, slave_eth_dev);
-
        rte_eth_dev_info_get(slave_port_id, &dev_info);

+       /* Add slave details to bonded device */
+       slave_add(internals, slave_eth_dev, &dev_info);
+
        /* We need to store slaves reta_size to be able to synchronize RETA for 
all
         * slave devices even if its sizes are different.
         */
diff --git a/drivers/net/bonding/rte_eth_bond_pmd.c 
b/drivers/net/bonding/rte_eth_bond_pmd.c
index 77582dd..868e66b 100644
--- a/drivers/net/bonding/rte_eth_bond_pmd.c
+++ b/drivers/net/bonding/rte_eth_bond_pmd.c
@@ -76,6 +76,47 @@ get_vlan_offset(struct ether_hdr *eth_hdr, uint16_t *proto)
        return vlan_offset;
 }

+static uint8_t
+bond_active_slaves_by_rxqid(struct bond_dev_private *internals, int queue_id,
+               uint8_t slaves[])
+{
+       struct bond_slave_details *slave_details;
+       uint8_t num_of_slaves;
+       uint8_t i = 0;
+
+       num_of_slaves = internals->active_slave_count;
+       memcpy(slaves, internals->active_slaves,
+                       sizeof(internals->active_slaves[0]) * num_of_slaves);
+
+       if (num_of_slaves < 1 || internals->kvlist)
+               return num_of_slaves;
+
+       /* remove slaves that don't have a queue numbered "queue_id" */
+       while (i < num_of_slaves) {
+               slave_details = &internals->slaves[i];
+               if (unlikely(queue_id >= slave_details->nb_rx_queues)) {
+                       slaves[i] = slaves[num_of_slaves-1];
+                       num_of_slaves--;
+               } else
+                       i++;
+       }
+
+       return num_of_slaves;
+}
+
+static int
+bond_slave_txqid(struct bond_dev_private *internals, uint8_t slave_id,
+               int queue_id)
+{
+       struct bond_slave_details *slave_details;
+
+       if (internals->kvlist)
+               return queue_id;
+
+       slave_details = &internals->slaves[slave_id];
+       return queue_id % slave_details->nb_tx_queues;
+}
+
 static uint16_t
 bond_ethdev_rx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 {
@@ -83,6 +124,8 @@ bond_ethdev_rx_burst(void *queue, struct rte_mbuf **bufs, 
uint16_t nb_pkts)

        uint16_t num_rx_slave = 0;
        uint16_t num_rx_total = 0;
+       uint8_t slaves[RTE_MAX_ETHPORTS];
+       uint8_t num_of_slaves;

        int i;

@@ -91,11 +134,13 @@ bond_ethdev_rx_burst(void *queue, struct rte_mbuf **bufs, 
uint16_t nb_pkts)

        internals = bd_rx_q->dev_private;

+       num_of_slaves = bond_active_slaves_by_rxqid(internals, 
bd_rx_q->queue_id,
+                                                   slaves);

-       for (i = 0; i < internals->active_slave_count && nb_pkts; i++) {
+       for (i = 0; i < num_of_slaves && nb_pkts; i++) {
                /* Offset of pointer to *bufs increases as packets are received
                 * from other slaves */
-               num_rx_slave = rte_eth_rx_burst(internals->active_slaves[i],
+               num_rx_slave = rte_eth_rx_burst(slaves[i],
                                bd_rx_q->queue_id, bufs + num_rx_total, 
nb_pkts);
                if (num_rx_slave) {
                        num_rx_total += num_rx_slave;
@@ -117,8 +162,13 @@ bond_ethdev_rx_burst_active_backup(void *queue, struct 
rte_mbuf **bufs,

        internals = bd_rx_q->dev_private;

-       return rte_eth_rx_burst(internals->current_primary_port,
-                       bd_rx_q->queue_id, bufs, nb_pkts);
+       uint8_t active_slave = internals->current_primary_port;
+       struct rte_eth_dev *dev = &rte_eth_devices[active_slave];
+
+       if (bd_rx_q->queue_id >= dev->data->nb_rx_queues)
+               return 0;
+
+       return rte_eth_rx_burst(active_slave, bd_rx_q->queue_id, bufs, nb_pkts);
 }

 static uint16_t
@@ -144,9 +194,9 @@ bond_ethdev_rx_burst_8023ad(void *queue, struct rte_mbuf 
**bufs,
        rte_eth_macaddr_get(internals->port_id, &bond_mac);
        /* Copy slave list to protect against slave up/down changes during tx
         * bursting */
-       slave_count = internals->active_slave_count;
-       memcpy(slaves, internals->active_slaves,
-                       sizeof(internals->active_slaves[0]) * slave_count);
+
+       slave_count = bond_active_slaves_by_rxqid(internals, bd_rx_q->queue_id,
+                                                 slaves);

        for (i = 0; i < slave_count && num_rx_total < nb_pkts; i++) {
                j = num_rx_total;
@@ -401,6 +451,7 @@ bond_ethdev_tx_burst_round_robin(void *queue, struct 
rte_mbuf **bufs,

        static int slave_idx = 0;
        int i, cslave_idx = 0, tx_fail_total = 0;
+       int queue_id;

        bd_tx_q = (struct bond_tx_queue *)queue;
        internals = bd_tx_q->dev_private;
@@ -427,7 +478,9 @@ bond_ethdev_tx_burst_round_robin(void *queue, struct 
rte_mbuf **bufs,
        /* Send packet burst on each slave device */
        for (i = 0; i < num_of_slaves; i++) {
                if (slave_nb_pkts[i] > 0) {
-                       num_tx_slave = rte_eth_tx_burst(slaves[i], 
bd_tx_q->queue_id,
+                       queue_id = bond_slave_txqid(internals, i,
+                                                   bd_tx_q->queue_id);
+                       num_tx_slave = rte_eth_tx_burst(slaves[i], queue_id,
                                        slave_bufs[i], slave_nb_pkts[i]);

                        /* if tx burst fails move packets to end of bufs */
@@ -453,14 +506,27 @@ bond_ethdev_tx_burst_active_backup(void *queue,
 {
        struct bond_dev_private *internals;
        struct bond_tx_queue *bd_tx_q;
+       int queue_id;
+       int i;
+       uint8_t num_of_slaves;
+       uint8_t slaves[RTE_MAX_ETHPORTS];

        bd_tx_q = (struct bond_tx_queue *)queue;
        internals = bd_tx_q->dev_private;

-       if (internals->active_slave_count < 1)
+       num_of_slaves = internals->active_slave_count;
+       memcpy(slaves, internals->active_slaves,
+                       sizeof(internals->active_slaves[0]) * num_of_slaves);
+
+       if (num_of_slaves < 1)
                return 0;

-       return rte_eth_tx_burst(internals->current_primary_port, 
bd_tx_q->queue_id,
+       for (i = 0; i < num_of_slaves; i++)
+               if (slaves[i] == internals->current_primary_port)
+                       break;
+
+       queue_id = bond_slave_txqid(internals, i, bd_tx_q->queue_id);
+       return rte_eth_tx_burst(internals->current_primary_port, queue_id,
                        bufs, nb_pkts);
 }

@@ -696,6 +762,7 @@ bond_ethdev_tx_burst_tlb(void *queue, struct rte_mbuf 
**bufs, uint16_t nb_pkts)
        struct ether_hdr *ether_hdr;
        struct ether_addr primary_slave_addr;
        struct ether_addr active_slave_addr;
+       int queue_id;

        if (num_of_slaves < 1)
                return num_tx_total;
@@ -725,7 +792,8 @@ bond_ethdev_tx_burst_tlb(void *queue, struct rte_mbuf 
**bufs, uint16_t nb_pkts)
 #endif
                }

-               num_tx_total += rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
+               queue_id = bond_slave_txqid(internals, i, bd_tx_q->queue_id);
+               num_tx_total += rte_eth_tx_burst(slaves[i], queue_id,
                                bufs + num_tx_total, nb_pkts - num_tx_total);

                if (num_tx_total == nb_pkts)
@@ -903,6 +971,7 @@ bond_ethdev_tx_burst_balance(void *queue, struct rte_mbuf 
**bufs,
        uint16_t num_tx_total = 0, num_tx_slave = 0, tx_fail_total = 0;

        int i, op_slave_id;
+       int queue_id;

        struct rte_mbuf *slave_bufs[RTE_MAX_ETHPORTS][nb_pkts];
        uint16_t slave_nb_pkts[RTE_MAX_ETHPORTS] = { 0 };
@@ -931,7 +1000,9 @@ bond_ethdev_tx_burst_balance(void *queue, struct rte_mbuf 
**bufs,
        /* Send packet burst on each slave device */
        for (i = 0; i < num_of_slaves; i++) {
                if (slave_nb_pkts[i] > 0) {
-                       num_tx_slave = rte_eth_tx_burst(slaves[i], 
bd_tx_q->queue_id,
+                       queue_id = bond_slave_txqid(internals, i,
+                                                   bd_tx_q->queue_id);
+                       num_tx_slave = rte_eth_tx_burst(slaves[i], queue_id,
                                        slave_bufs[i], slave_nb_pkts[i]);

                        /* if tx burst fails move packets to end of bufs */
@@ -977,6 +1048,8 @@ bond_ethdev_tx_burst_8023ad(void *queue, struct rte_mbuf 
**bufs,
        /* Slow packets placed in each slave */
        uint8_t slave_slow_nb_pkts[RTE_MAX_ETHPORTS] = { 0 };

+       int queue_id;
+
        bd_tx_q = (struct bond_tx_queue *)queue;
        internals = bd_tx_q->dev_private;

@@ -1022,7 +1095,8 @@ bond_ethdev_tx_burst_8023ad(void *queue, struct rte_mbuf 
**bufs,
                if (slave_nb_pkts[i] == 0)
                        continue;

-               num_tx_slave = rte_eth_tx_burst(slaves[i], bd_tx_q->queue_id,
+               queue_id = bond_slave_txqid(internals, i, bd_tx_q->queue_id);
+               num_tx_slave = rte_eth_tx_burst(slaves[i], queue_id,
                                slave_bufs[i], slave_nb_pkts[i]);

                /* If tx burst fails drop slow packets */
@@ -1057,6 +1131,7 @@ bond_ethdev_tx_burst_broadcast(void *queue, struct 
rte_mbuf **bufs,

        int slave_tx_total[RTE_MAX_ETHPORTS];
        int i, most_successful_tx_slave = -1;
+       int queue_id;

        bd_tx_q = (struct bond_tx_queue *)queue;
        internals = bd_tx_q->dev_private;
@@ -1076,7 +1151,8 @@ bond_ethdev_tx_burst_broadcast(void *queue, struct 
rte_mbuf **bufs,

        /* Transmit burst on each active slave */
        for (i = 0; i < num_of_slaves; i++) {
-               slave_tx_total[i] = rte_eth_tx_burst(slaves[i], 
bd_tx_q->queue_id,
+               queue_id = bond_slave_txqid(internals, i, bd_tx_q->queue_id);
+               slave_tx_total[i] = rte_eth_tx_burst(slaves[i], queue_id,
                                        bufs, nb_pkts);

                if (unlikely(slave_tx_total[i] < nb_pkts))
@@ -1298,9 +1374,22 @@ int
 slave_configure(struct rte_eth_dev *bonded_eth_dev,
                struct rte_eth_dev *slave_eth_dev)
 {
+       struct bond_dev_private *internals;
        struct bond_rx_queue *bd_rx_q;
        struct bond_tx_queue *bd_tx_q;
+       int slave_id;
+
+       internals = bonded_eth_dev->data->dev_private;

+       for (slave_id = 0; slave_id < internals->slave_count; slave_id++)
+               if (internals->slaves[slave_id].port_id ==
+                   slave_eth_dev->data->port_id)
+                       break;
+
+       RTE_VERIFY(slave_id != internals->slave_count);
+
+       uint16_t nb_rx_queues = internals->slaves[slave_id].nb_rx_queues;
+       uint16_t nb_tx_queues = internals->slaves[slave_id].nb_tx_queues;
        int errval;
        uint16_t q_id;

@@ -1331,8 +1420,7 @@ slave_configure(struct rte_eth_dev *bonded_eth_dev,

        /* Configure device */
        errval = rte_eth_dev_configure(slave_eth_dev->data->port_id,
-                       bonded_eth_dev->data->nb_rx_queues,
-                       bonded_eth_dev->data->nb_tx_queues,
+                       nb_rx_queues, nb_tx_queues,
                        &(slave_eth_dev->data->dev_conf));
        if (errval != 0) {
                RTE_BOND_LOG(ERR, "Cannot configure slave device: port %u , err 
(%d)",
@@ -1343,7 +1431,7 @@ slave_configure(struct rte_eth_dev *bonded_eth_dev,
        /* Setup Rx Queues */
        /* Use existing queues, if any */
        for (q_id = slave_eth_dev->data->nb_rx_queues;
-            q_id < bonded_eth_dev->data->nb_rx_queues; q_id++) {
+            q_id < nb_rx_queues ; q_id++) {
                bd_rx_q = (struct bond_rx_queue 
*)bonded_eth_dev->data->rx_queues[q_id];

                errval = rte_eth_rx_queue_setup(slave_eth_dev->data->port_id, 
q_id,
@@ -1361,7 +1449,7 @@ slave_configure(struct rte_eth_dev *bonded_eth_dev,
        /* Setup Tx Queues */
        /* Use existing queues, if any */
        for (q_id = slave_eth_dev->data->nb_tx_queues;
-            q_id < bonded_eth_dev->data->nb_tx_queues; q_id++) {
+            q_id < nb_tx_queues ; q_id++) {
                bd_tx_q = (struct bond_tx_queue 
*)bonded_eth_dev->data->tx_queues[q_id];

                errval = rte_eth_tx_queue_setup(slave_eth_dev->data->port_id, 
q_id,
@@ -1440,7 +1528,8 @@ bond_ethdev_slave_link_status_change_monitor(void 
*cb_arg);

 void
 slave_add(struct bond_dev_private *internals,
-               struct rte_eth_dev *slave_eth_dev)
+               struct rte_eth_dev *slave_eth_dev,
+               const struct rte_eth_dev_info *slave_dev_info)
 {
        struct bond_slave_details *slave_details =
                        &internals->slaves[internals->slave_count];
@@ -1448,6 +1537,20 @@ slave_add(struct bond_dev_private *internals,
        slave_details->port_id = slave_eth_dev->data->port_id;
        slave_details->last_link_status = 0;

+       uint16_t bond_nb_rx_queues =
+               rte_eth_devices[internals->port_id].data->nb_rx_queues;
+       uint16_t bond_nb_tx_queues =
+               rte_eth_devices[internals->port_id].data->nb_tx_queues;
+
+       slave_details->nb_rx_queues =
+               bond_nb_rx_queues > slave_dev_info->max_rx_queues
+               ? slave_dev_info->max_rx_queues
+               : bond_nb_rx_queues;
+       slave_details->nb_tx_queues =
+               bond_nb_tx_queues > slave_dev_info->max_tx_queues
+               ? slave_dev_info->max_tx_queues
+               : bond_nb_tx_queues;
+
        /* If slave device doesn't support interrupts then we need to enabled
         * polling to monitor link status */
        if (!(slave_eth_dev->data->dev_flags & RTE_PCI_DRV_INTR_LSC)) {
diff --git a/drivers/net/bonding/rte_eth_bond_private.h 
b/drivers/net/bonding/rte_eth_bond_private.h
index 6c47a29..02f6de1 100644
--- a/drivers/net/bonding/rte_eth_bond_private.h
+++ b/drivers/net/bonding/rte_eth_bond_private.h
@@ -101,6 +101,8 @@ struct bond_slave_details {
        uint8_t link_status_poll_enabled;
        uint8_t link_status_wait_to_complete;
        uint8_t last_link_status;
+       uint16_t nb_rx_queues;
+       uint16_t nb_tx_queues;
        /**< Port Id of slave eth_dev */
        struct ether_addr persisted_mac_addr;

@@ -240,7 +242,8 @@ slave_remove(struct bond_dev_private *internals,

 void
 slave_add(struct bond_dev_private *internals,
-               struct rte_eth_dev *slave_eth_dev);
+               struct rte_eth_dev *slave_eth_dev,
+               const struct rte_eth_dev_info *slave_dev_info);

 uint16_t
 xmit_l2_hash(const struct rte_mbuf *buf, uint8_t slave_count);
-- 
2.1.4

Reply via email to