This will need to be implemented for some of the other RX burst methods at some point for other modes to see this performance improvement (with the exception of active-backup).
On Thu, Aug 16, 2018 at 9:32 AM Luca Boccassi <bl...@debian.org> wrote: > During bond 802.3ad receive, a burst of packets is fetched from > each slave into a local array and appended to per-slave ring buffer. > Packets are taken from the head of the ring buffer and returned to > the caller. The number of mbufs provided to each slave is sufficient > to meet the requirements of the ixgbe vector receive. > > This small change improves performances of the bonding driver > considerably. Vyatta has been using it for years in production. > > Cc: sta...@dpdk.org > > Signed-off-by: Eric Kinzie <ekin...@brocade.com> > Signed-off-by: Luca Boccassi <bl...@debian.org> > Acked-by: Chas Williams <ch...@att.com> > --- > v2 and v3: fix checkpatch warnings > v4: add Eric's original signed-off-by from the Vyatta internal repo > > drivers/net/bonding/rte_eth_bond_api.c | 13 +++ > drivers/net/bonding/rte_eth_bond_pmd.c | 98 +++++++++++++++++----- > drivers/net/bonding/rte_eth_bond_private.h | 4 + > 3 files changed, 95 insertions(+), 20 deletions(-) > > diff --git a/drivers/net/bonding/rte_eth_bond_api.c > b/drivers/net/bonding/rte_eth_bond_api.c > index 8bc04cfd11..3d22203e91 100644 > --- a/drivers/net/bonding/rte_eth_bond_api.c > +++ b/drivers/net/bonding/rte_eth_bond_api.c > @@ -524,6 +524,10 @@ __eth_bond_slave_remove_lock_free(uint16_t > bonded_port_id, > > sizeof(*(rte_eth_devices[bonded_port_id].data->mac_addrs))); > } > if (internals->slave_count == 0) { > + /* Remove any remaining packets in the receive ring */ > + struct rte_mbuf *bufs[PMD_BOND_RECV_PKTS_PER_SLAVE]; > + unsigned int j, count; > + > internals->rx_offload_capa = 0; > internals->tx_offload_capa = 0; > internals->rx_queue_offload_capa = 0; > @@ -532,6 +536,15 @@ __eth_bond_slave_remove_lock_free(uint16_t > bonded_port_id, > internals->reta_size = 0; > internals->candidate_max_rx_pktlen = 0; > internals->max_rx_pktlen = 0; > + > + do { > + count = rte_ring_dequeue_burst(internals->rx_ring, > + (void **)bufs, > + PMD_BOND_RECV_PKTS_PER_SLAVE, > + NULL); > + for (j = 0; j < count; j++) > + rte_pktmbuf_free(bufs[j]); > + } while (count > 0); > } > return 0; > } > diff --git a/drivers/net/bonding/rte_eth_bond_pmd.c > b/drivers/net/bonding/rte_eth_bond_pmd.c > index 58f7377c60..ae756c4e3a 100644 > --- a/drivers/net/bonding/rte_eth_bond_pmd.c > +++ b/drivers/net/bonding/rte_eth_bond_pmd.c > @@ -18,6 +18,8 @@ > #include <rte_alarm.h> > #include <rte_cycles.h> > #include <rte_string_fns.h> > +#include <rte_errno.h> > +#include <rte_lcore.h> > > #include "rte_eth_bond.h" > #include "rte_eth_bond_private.h" > @@ -402,10 +404,15 @@ bond_ethdev_rx_burst_8023ad(void *queue, struct > rte_mbuf **bufs, > struct bond_dev_private *internals = bd_rx_q->dev_private; > struct ether_addr bond_mac; > > + unsigned int rx_ring_avail = > rte_ring_free_count(internals->rx_ring); > + struct rte_mbuf *mbuf_bounce[PMD_BOND_RECV_PKTS_PER_SLAVE]; > + > struct ether_hdr *hdr; > > const uint16_t ether_type_slow_be = > rte_be_to_cpu_16(ETHER_TYPE_SLOW); > uint16_t num_rx_total = 0; /* Total number of received > packets */ > + uint16_t num_rx_slave; > + uint16_t num_enq_slave; > uint16_t slaves[RTE_MAX_ETHPORTS]; > uint16_t slave_count, idx; > > @@ -414,6 +421,9 @@ bond_ethdev_rx_burst_8023ad(void *queue, struct > rte_mbuf **bufs, > uint8_t i, j, k; > uint8_t subtype; > > + if (rx_ring_avail < PMD_BOND_RECV_PKTS_PER_SLAVE) > + goto dequeue; > + > rte_eth_macaddr_get(internals->port_id, &bond_mac); > /* Copy slave list to protect against slave up/down changes during > tx > * bursting */ > @@ -426,62 +436,96 @@ bond_ethdev_rx_burst_8023ad(void *queue, struct > rte_mbuf **bufs, > internals->active_slave = 0; > idx = 0; > } > - for (i = 0; i < slave_count && num_rx_total < nb_pkts; i++) { > - j = num_rx_total; > + for (i = 0; i < slave_count && num_rx_total < rx_ring_avail; i++) { > + j = 0; > collecting = ACTOR_STATE(&mode_8023ad_ports[slaves[idx]], > COLLECTING); > > /* Read packets from this slave */ > - num_rx_total += rte_eth_rx_burst(slaves[idx], > bd_rx_q->queue_id, > - &bufs[num_rx_total], nb_pkts - > num_rx_total); > + if (unlikely(rx_ring_avail - num_rx_total < > + PMD_BOND_RECV_PKTS_PER_SLAVE)) > + continue; > + num_rx_slave = rte_eth_rx_burst(slaves[idx], > bd_rx_q->queue_id, > + mbuf_bounce, PMD_BOND_RECV_PKTS_PER_SLAVE); > > - for (k = j; k < 2 && k < num_rx_total; k++) > - rte_prefetch0(rte_pktmbuf_mtod(bufs[k], void *)); > + for (k = j; k < 2 && k < num_rx_slave; k++) > + rte_prefetch0(rte_pktmbuf_mtod(mbuf_bounce[k], > void *)); > > /* Handle slow protocol packets. */ > - while (j < num_rx_total) { > + while (j < num_rx_slave) { > > /* If packet is not pure L2 and is known, skip it > */ > - if ((bufs[j]->packet_type & ~RTE_PTYPE_L2_ETHER) > != 0) { > + if ((mbuf_bounce[j]->packet_type & > ~RTE_PTYPE_L2_ETHER) != 0) { > j++; > continue; > } > > - if (j + 3 < num_rx_total) > - rte_prefetch0(rte_pktmbuf_mtod(bufs[j + > 3], void *)); > + if (j + 3 < num_rx_slave) > + > rte_prefetch0(rte_pktmbuf_mtod(mbuf_bounce[j + 3], > + void *)); > > - hdr = rte_pktmbuf_mtod(bufs[j], struct ether_hdr > *); > + hdr = rte_pktmbuf_mtod(mbuf_bounce[j], > + struct ether_hdr *); > subtype = ((struct slow_protocol_frame > *)hdr)->slow_protocol.subtype; > > /* Remove packet from array if it is slow packet > or slave is not > * in collecting state or bonding interface is not > in promiscuous > * mode and packet address does not match. */ > - if (unlikely(is_lacp_packets(hdr->ether_type, > subtype, bufs[j]) || > + if (unlikely(is_lacp_packets(hdr->ether_type, > + subtype, > mbuf_bounce[j]) || > !collecting || (!promisc && > > !is_multicast_ether_addr(&hdr->d_addr) && > !is_same_ether_addr(&bond_mac, > &hdr->d_addr)))) { > > if (hdr->ether_type == ether_type_slow_be) > { > bond_mode_8023ad_handle_slow_pkt( > - internals, slaves[idx], > bufs[j]); > + internals, slaves[idx], > + mbuf_bounce[j]); > } else > - rte_pktmbuf_free(bufs[j]); > + rte_pktmbuf_free(mbuf_bounce[j]); > > /* Packet is managed by mode 4 or dropped, > shift the array */ > - num_rx_total--; > - if (j < num_rx_total) { > - memmove(&bufs[j], &bufs[j + 1], > sizeof(bufs[0]) * > - (num_rx_total - j)); > + num_rx_slave--; > + if (j < num_rx_slave) { > + memmove(&mbuf_bounce[j], > + &mbuf_bounce[j + 1], > + sizeof(mbuf_bounce[0]) * > + (num_rx_slave - j)); > } > - } else > + } else { > j++; > + } > } > + > + if (num_rx_slave > 0) { > + if (mbuf_bounce[0] == NULL) > + RTE_LOG(ERR, PMD, "%s: Enqueue a NULL??\n", > + __func__); > + > + num_enq_slave = > rte_ring_enqueue_burst(internals->rx_ring, > + (void > **)mbuf_bounce, > + > num_rx_slave, > + NULL); > + > + if (num_enq_slave < num_rx_slave) { > + RTE_LOG(ERR, PMD, > + "%s: failed to enqueue %u packets", > + __func__, > + (num_rx_slave - num_enq_slave)); > + for (j = num_enq_slave; j < num_rx_slave; > j++) > + rte_pktmbuf_free(mbuf_bounce[j]); > + } > + num_rx_total += num_enq_slave; > + } > + > if (unlikely(++idx == slave_count)) > idx = 0; > } > > internals->active_slave = idx; > - return num_rx_total; > +dequeue: > + return rte_ring_dequeue_burst(internals->rx_ring, (void **)bufs, > + nb_pkts, NULL); > } > > #if defined(RTE_LIBRTE_BOND_DEBUG_ALB) || > defined(RTE_LIBRTE_BOND_DEBUG_ALB_L1) > @@ -3065,6 +3109,7 @@ bond_alloc(struct rte_vdev_device *dev, uint8_t mode) > struct bond_dev_private *internals = NULL; > struct rte_eth_dev *eth_dev = NULL; > uint32_t vlan_filter_bmp_size; > + char mem_name[RTE_ETH_NAME_MAX_LEN]; > > /* now do all data allocation - for eth_dev structure, dummy pci > driver > * and internal (private) data > @@ -3129,6 +3174,19 @@ bond_alloc(struct rte_vdev_device *dev, uint8_t > mode) > TAILQ_INIT(&internals->flow_list); > internals->flow_isolated_valid = 0; > > + snprintf(mem_name, RTE_DIM(mem_name), "bond_%u_rx", > internals->port_id); > + internals->rx_ring = rte_ring_lookup(mem_name); > + if (internals->rx_ring == NULL) { > + internals->rx_ring = rte_ring_create(mem_name, > + > rte_align32pow2(PMD_BOND_RECV_RING_PKTS * > + rte_lcore_count()), > + socket_id, 0); > + if (internals->rx_ring == NULL) > + rte_panic("%s: Failed to create rx ring '%s': > %s\n", name, > + mem_name, rte_strerror(rte_errno)); > + } > + > + > /* Set mode 4 default configuration */ > bond_mode_8023ad_setup(eth_dev, NULL); > if (bond_ethdev_mode_set(eth_dev, mode)) { > diff --git a/drivers/net/bonding/rte_eth_bond_private.h > b/drivers/net/bonding/rte_eth_bond_private.h > index 43e0e448df..80261c6b14 100644 > --- a/drivers/net/bonding/rte_eth_bond_private.h > +++ b/drivers/net/bonding/rte_eth_bond_private.h > @@ -26,6 +26,8 @@ > #define PMD_BOND_LSC_POLL_PERIOD_KVARG ("lsc_poll_period_ms") > #define PMD_BOND_LINK_UP_PROP_DELAY_KVARG ("up_delay") > #define PMD_BOND_LINK_DOWN_PROP_DELAY_KVARG ("down_delay") > +#define PMD_BOND_RECV_RING_PKTS 512 > +#define PMD_BOND_RECV_PKTS_PER_SLAVE 32 > > #define PMD_BOND_XMIT_POLICY_LAYER2_KVARG ("l2") > #define PMD_BOND_XMIT_POLICY_LAYER23_KVARG ("l23") > @@ -175,6 +177,8 @@ struct bond_dev_private { > > void *vlan_filter_bmpmem; /* enabled vlan filter > bitmap */ > struct rte_bitmap *vlan_filter_bmp; > + > + struct rte_ring *rx_ring; > }; > > extern const struct eth_dev_ops default_dev_ops; > -- > 2.18.0 > >