Added a DPDK ring as a TX pre-queue so that SMP input from multiple vSwitch
cores can be staged for NIC TX output. Eliminated the SMP TX output lock
inside the polling loop, since the multi-producer DPDK ring now handles SMP
enqueue. Added an SMP lock for non-polling operation so that a non-polling
thread can still do TX output when the interface is not being polled; this
lock is taken only when polling is not enabled. Added a new netdev subroutine
to control the polling lock and the polling enable/disable flag. Packets are
not discarded between the TX pre-queue and the NIC TX queue, so surges can be
absorbed.
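For illustration only (not part of this patch), a minimal standalone sketch of
the multi-producer / single-consumer TX pre-queue pattern described above,
built on a DPDK rte_ring with the same 3-argument ring burst calls the patch
uses. The names tx_preq_*, PRE_Q_SIZE and BURST are made up for the example,
and it assumes rte_eal_init() has already run and the port/queue are set up:

/* Illustrative sketch only, not part of this patch.  Any thread may enqueue
 * (multi-producer); only the polling core dequeues and pushes to the NIC
 * (single-consumer), so the fast path needs no lock. */
#include <errno.h>
#include <string.h>

#include <rte_ring.h>
#include <rte_mbuf.h>
#include <rte_ethdev.h>

#define PRE_Q_SIZE 4096           /* ring size, must be a power of two */
#define BURST      64             /* pre-queue to NIC transfer size */

static struct rte_ring *tx_preq;

/* One-time setup: MP enqueue (any vSwitch core), SC dequeue (poller only). */
static int
tx_preq_init(int socket_id)
{
    tx_preq = rte_ring_create("tx_preq_sketch", PRE_Q_SIZE, socket_id,
                              RING_F_SC_DEQ);
    return tx_preq ? 0 : -ENOMEM;
}

/* Any thread: stage packets lock-free; returns how many were accepted. */
static unsigned
tx_preq_enqueue(struct rte_mbuf **pkts, unsigned n)
{
    return rte_ring_mp_enqueue_burst(tx_preq, (void **) pkts, n);
}

/* Polling core only: drain staged packets into the NIC TX queue.  Unsent
 * packets are kept for the next call instead of being dropped, which is
 * what lets the pre-queue absorb surges. */
static void
tx_preq_drain(uint8_t port_id, uint16_t queue_id)
{
    static struct rte_mbuf *pkts[BURST];
    static unsigned count;

    if (count == 0) {
        count = rte_ring_sc_dequeue_burst(tx_preq, (void **) pkts, BURST);
    }
    if (count > 0) {
        unsigned sent = rte_eth_tx_burst(port_id, queue_id, pkts, count);

        count -= sent;
        if (count > 0 && sent > 0) {
            /* Move the unsent tail to the front for the next attempt. */
            memmove(&pkts[0], &pkts[sent], count * sizeof pkts[0]);
        }
    }
}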
Measured an improved average PMD port-to-port RFC 2544 zero-loss packet rate
of 268,000 for packets of 512 bytes and smaller; roughly double that is
predicted when using one CPU core per interface. Observed better persistence
in holding 100% of 10 GbE line rate for larger packets with the added DPDK
queue. This is consistent with other tests outside of OVS, where large surges
of larger packets sent from VMs over fast-path interfaces were absorbed by the
NIC TX pre-queue and TX queue and packet loss was suppressed.

Signed-off-by: Mike A. Polehn <mike.a.pol...@intel.com>

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 6c281fe..478a0d9 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -1873,6 +1873,10 @@ reload:
     poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt);
     atomic_read(&f->change_seq, &port_seq);
 
+    /* get poll ownership */
+    for (i = 0; i < poll_cnt; i++)
+        netdev_rxq_do_polling(poll_list[i].rx, true);
+
     for (;;) {
         unsigned int c_port_seq;
         int i;
@@ -1895,6 +1899,10 @@ reload:
         }
     }
 
+    /* release poll ownership */
+    for (i = 0; i < poll_cnt; i++)
+        netdev_rxq_do_polling(poll_list[i].rx, false);
+
     if (!latch_is_set(&f->dp->exit_latch)){
         goto reload;
     }
diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c
index 35a8da4..0f61777 100644
--- a/lib/netdev-bsd.c
+++ b/lib/netdev-bsd.c
@@ -1596,6 +1596,7 @@ netdev_bsd_update_flags(struct netdev *netdev_, enum netdev_flags off,
     netdev_bsd_rxq_recv,                         \
     netdev_bsd_rxq_wait,                         \
     netdev_bsd_rxq_drain,                        \
+    NULL,                  /* rxq_do_polling */  \
 }
 
 const struct netdev_class netdev_bsd_class =
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index d1bcc73..78f0329 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -73,6 +73,9 @@ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 20);
 #define NIC_PORT_RX_Q_SIZE 2048  /* Size of Physical NIC RX Queue */
 #define NIC_PORT_TX_Q_SIZE 2048  /* Size of Physical NIC TX Queue */
 
+#define NIC_TX_PRE_Q_SIZE 4096   /* Size of Physical NIC TX Pre Queue (2**n)*/
+#define NIC_TX_PRE_Q_TRANS 64    /* Pre Queue to Physical NIC Transfer */
+
 /* TODO: Needs per NIC value for these constants. */
 #define RX_PTHRESH 32 /* Default values of RX prefetch threshold reg. */
 #define RX_HTHRESH 32 /* Default values of RX host threshold reg. */
@@ -122,8 +125,6 @@ static const struct rte_eth_txconf tx_conf = {
 };
 
 enum { MAX_RX_QUEUE_LEN = 64 };
-enum { MAX_TX_QUEUE_LEN = 64 };
-enum { DRAIN_TSC = 200000ULL };
 
 static int rte_eal_init_ret = ENODEV;
 
@@ -145,10 +146,12 @@ struct dpdk_mp {
 };
 
 struct dpdk_tx_queue {
-    rte_spinlock_t tx_lock;
+    bool is_polled;
+    int port_id;
     int count;
-    uint64_t tsc;
-    struct rte_mbuf *burst_pkts[MAX_TX_QUEUE_LEN];
+    struct rte_mbuf *tx_trans[NIC_TX_PRE_Q_TRANS];
+    struct rte_ring *tx_preq;
+    rte_spinlock_t tx_lock;
 };
 
 struct netdev_dpdk {
@@ -360,6 +363,7 @@ dpdk_eth_dev_init(struct netdev_dpdk *dev) OVS_REQUIRES(dpdk_mutex)
     struct ether_addr eth_addr;
     int diag;
     int i;
+    char qname[32];
 
     if (dev->port_id < 0 || dev->port_id >= rte_eth_dev_count()) {
         return -ENODEV;
@@ -372,12 +376,21 @@ dpdk_eth_dev_init(struct netdev_dpdk *dev) OVS_REQUIRES(dpdk_mutex)
     }
 
     for (i = 0; i < NR_QUEUE; i++) {
+        dev->tx_q[i].port_id = dev->port_id;
         diag = rte_eth_tx_queue_setup(dev->port_id, i, NIC_PORT_TX_Q_SIZE,
                                       dev->socket_id, &tx_conf);
         if (diag) {
             VLOG_ERR("eth dev tx queue setup error %d",diag);
             return diag;
         }
+
+        snprintf(qname, sizeof(qname),"NIC_TX_Pre_Q_%u_%u", dev->port_id, i);
+        dev->tx_q[i].tx_preq = rte_ring_create(qname, NIC_TX_PRE_Q_SIZE,
+                                               dev->socket_id, RING_F_SC_DEQ);
+        if (NULL == dev->tx_q[i].tx_preq) {
+            VLOG_ERR("eth dev tx pre-queue alloc error");
+            return -ENOMEM;
+        }
     }
 
     for (i = 0; i < NR_QUEUE; i++) {
@@ -451,6 +464,7 @@ netdev_dpdk_construct(struct netdev *netdev_)
     port_no = strtol(cport, 0, 0); /* string must be null terminated */
 
     for (i = 0; i < NR_QUEUE; i++) {
+        netdev->tx_q[i].is_polled = false;
         rte_spinlock_init(&netdev->tx_q[i].tx_lock);
     }
 
@@ -568,24 +582,59 @@ netdev_dpdk_rxq_dealloc(struct netdev_rxq *rxq_)
 }
 
 inline static void
-dpdk_queue_flush(struct netdev_dpdk *dev, int qid)
+dpdk_port_out(struct dpdk_tx_queue *tx_q, int qid)
 {
-    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
-    uint32_t nb_tx;
+    /* get packets from NIC tx staging queue */
+    if (likely(tx_q->count == 0))
+        tx_q->count = rte_ring_sc_dequeue_burst(tx_q->tx_preq,
+                      (void **)&tx_q->tx_trans[0], NIC_TX_PRE_Q_TRANS);
+
+    /* send packets to NIC tx queue */
+    if (likely(tx_q->count != 0)) {
+        unsigned sent = rte_eth_tx_burst(tx_q->port_id, qid, tx_q->tx_trans,
+                                         tx_q->count);
+        tx_q->count -= sent;
+
+        if (unlikely((tx_q->count != 0) && (sent > 0)))
+            /* move unsent packets to front of list */
+            memmove(&tx_q->tx_trans[0], &tx_q->tx_trans[sent],
+                    (sizeof(struct rte_mbuf *) * tx_q->count));
+    }
+}
 
-    if (txq->count == 0) {
-        return;
+static void netdev_dpdk_do_poll(struct netdev_rxq *rxq_, unsigned enable)
+{
+    struct netdev_rxq_dpdk *rx = netdev_rxq_dpdk_cast(rxq_);
+    struct netdev *netdev = rx->up.netdev;
+    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
+    struct dpdk_tx_queue *tx_q = &dev->tx_q[rxq_->queue_id];
+
+    if (enable) {
+        tx_q->is_polled = true;
+        /* get polling ownership */
+        rte_spinlock_lock(&tx_q->tx_lock);
+    } else {
+        tx_q->is_polled = false;
+        /* clear queue after flag for race of the non-poll queuer */
+        dpdk_port_out(tx_q, rxq_->queue_id);
+        rte_spinlock_unlock(&tx_q->tx_lock);
     }
-    rte_spinlock_lock(&txq->tx_lock);
-    nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts, txq->count);
-    if (nb_tx != txq->count) {
-        /* free buffers if we couldn't transmit packets */
-        rte_mempool_put_bulk(dev->dpdk_mp->mp,
-                             (void **) &txq->burst_pkts[nb_tx],
-                             (txq->count - nb_tx));
+}
+
+static void netdev_dpdk_push_tx_to_nic(struct netdev_dpdk *dev, int qid)
+{
+    struct dpdk_tx_queue *tx_q = &dev->tx_q[qid];
+
+    for (;;) {
+        if (tx_q->is_polled)
+            break;
+
+        if (rte_spinlock_trylock(&tx_q->tx_lock)) {
+            dpdk_port_out(tx_q, qid);
+            rte_spinlock_unlock(&tx_q->tx_lock);
+            return;
+        }
     }
-    txq->count = 0;
-    rte_spinlock_unlock(&txq->tx_lock);
 }
 
 static int
@@ -594,9 +643,12 @@ netdev_dpdk_rxq_recv(struct netdev_rxq *rxq_, struct ofpbuf **packets, int *c)
     struct netdev_rxq_dpdk *rx = netdev_rxq_dpdk_cast(rxq_);
     struct netdev *netdev = rx->up.netdev;
     struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
+    struct dpdk_tx_queue *tx_q = &dev->tx_q[rxq_->queue_id];
     int nb_rx;
 
-    dpdk_queue_flush(dev, rxq_->queue_id);
+    /* if being polled, push tx out */
+    if (likely(tx_q->is_polled))
+        dpdk_port_out(tx_q, rxq_->queue_id);
 
     nb_rx = rte_eth_rx_burst(rx->port_id, rxq_->queue_id,
                              (struct rte_mbuf **) packets,
@@ -612,40 +664,30 @@ netdev_dpdk_rxq_recv(struct netdev_rxq *rxq_, struct ofpbuf **packets, int *c)
 }
 
 inline static void
-dpdk_queue_pkt(struct netdev_dpdk *dev, int qid,
-               struct rte_mbuf *pkt)
+dpdk_queue_pkts(struct netdev_dpdk *dev, int qid,
+                struct rte_mbuf **pkt, unsigned n_out)
 {
-    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
-    uint64_t diff_tsc;
-    uint64_t cur_tsc;
-    uint32_t nb_tx;
-
-    rte_spinlock_lock(&txq->tx_lock);
-    txq->burst_pkts[txq->count++] = pkt;
-    if (txq->count == MAX_TX_QUEUE_LEN) {
-        goto flush;
-    }
-    cur_tsc = rte_get_timer_cycles();
-    if (txq->count == 1) {
-        txq->tsc = cur_tsc;
-    }
-    diff_tsc = cur_tsc - txq->tsc;
-    if (diff_tsc >= DRAIN_TSC) {
-        goto flush;
-    }
-    rte_spinlock_unlock(&txq->tx_lock);
-    return;
-
-flush:
-    nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts, txq->count);
-    if (nb_tx != txq->count) {
-        /* free buffers if we couldn't transmit packets */
-        rte_mempool_put_bulk(dev->dpdk_mp->mp,
-                             (void **) &txq->burst_pkts[nb_tx],
-                             (txq->count - nb_tx));
+    struct dpdk_tx_queue *tx_q = &dev->tx_q[qid];
+    unsigned n_qed;
+
+    /* queuing can occur from any thread */
+    n_qed = rte_ring_mp_enqueue_burst(tx_q->tx_preq,
+                                      (void **) pkt, n_out);
+    if (unlikely(n_qed != n_out)) { /* discard at tx prequeue */
+
+        /* free buffers we couldn't queue */
+        unsigned dropped = n_out - n_qed;
+        rte_mempool_put_bulk(dev->dpdk_mp->mp, (void **) &pkt[n_qed],
+                             dropped);
+
+        ovs_mutex_lock(&dev->mutex);
+        dev->stats.tx_dropped += dropped;
+        ovs_mutex_unlock(&dev->mutex);
     }
-    txq->count = 0;
-    rte_spinlock_unlock(&txq->tx_lock);
+
+    /* if not being polled, do tx output */
+    if (unlikely(!tx_q->is_polled))
+        netdev_dpdk_push_tx_to_nic(dev, qid);
 }
 
 /* Tx function. Transmit packets indefinitely */
@@ -656,7 +698,7 @@ dpdk_do_tx_copy(struct netdev *netdev, char *buf, int size)
     struct rte_mbuf *pkt;
 
     pkt = rte_pktmbuf_alloc(dev->dpdk_mp->mp);
-    if (!pkt) {
+    if (unlikely(!pkt)) {
         ovs_mutex_lock(&dev->mutex);
         dev->stats.tx_dropped++;
         ovs_mutex_unlock(&dev->mutex);
@@ -669,8 +711,7 @@ dpdk_do_tx_copy(struct netdev *netdev, char *buf, int size)
     rte_pktmbuf_data_len(pkt) = size;
     rte_pktmbuf_pkt_len(pkt) = size;
 
-    dpdk_queue_pkt(dev, NON_PMD_THREAD_TX_QUEUE, pkt);
-    dpdk_queue_flush(dev, NON_PMD_THREAD_TX_QUEUE);
+    dpdk_queue_pkts(dev, NON_PMD_THREAD_TX_QUEUE, &pkt, 1);
 }
 
 static int
@@ -680,7 +721,7 @@ netdev_dpdk_send(struct netdev *netdev,
     struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
     int ret;
 
-    if (ofpbuf_size(ofpbuf) > dev->max_packet_len) {
+    if (unlikely(ofpbuf_size(ofpbuf) > dev->max_packet_len)) {
         VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d",
                      (int)ofpbuf_size(ofpbuf) , dev->max_packet_len);
 
@@ -703,8 +744,7 @@ netdev_dpdk_send(struct netdev *netdev,
 
         qid = rte_lcore_id() % NR_QUEUE;
 
-        dpdk_queue_pkt(dev, qid, (struct rte_mbuf *)ofpbuf);
-
+        dpdk_queue_pkts(dev, qid, (struct rte_mbuf **)&ofpbuf, 1);
     }
 
     ret = 0;
@@ -1175,6 +1215,7 @@ static struct netdev_class netdev_dpdk_class = {
     netdev_dpdk_rxq_recv,
     NULL,                  /* rxq_wait */
     NULL,                  /* rxq_drain */
+    netdev_dpdk_do_poll,
 };
 
 int
diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
index b087ed1..b615f98 100644
--- a/lib/netdev-dummy.c
+++ b/lib/netdev-dummy.c
@@ -1076,6 +1076,7 @@ static const struct netdev_class dummy_class = {
     netdev_dummy_rxq_recv,
     netdev_dummy_rxq_wait,
     netdev_dummy_rxq_drain,
+    NULL,                  /* rxq_do_polling */
 };
 
 static struct ofpbuf *
diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
index 840022d..7c08988 100644
--- a/lib/netdev-linux.c
+++ b/lib/netdev-linux.c
@@ -2782,6 +2782,7 @@ netdev_linux_update_flags(struct netdev *netdev_, enum netdev_flags off,
     netdev_linux_rxq_recv,                       \
     netdev_linux_rxq_wait,                       \
     netdev_linux_rxq_drain,                      \
+    NULL,                  /* rxq_do_polling */  \
 }
 
 const struct netdev_class netdev_linux_class =
diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
index 37b9da3..95fdf05 100644
--- a/lib/netdev-provider.h
+++ b/lib/netdev-provider.h
@@ -681,6 +681,10 @@ struct netdev_class {
 
     /* Discards all packets waiting to be received from 'rx'. */
     int (*rxq_drain)(struct netdev_rxq *rx);
+
+    /* Get poll ownership for PMD, enable before starting RX polling loop and
+     * disable after exiting the polling loop. NULL if not supported. */
+    void (*rxq_do_polling)(struct netdev_rxq *rx, bool enable);
 };
 
 int netdev_register_provider(const struct netdev_class *);
diff --git a/lib/netdev-vport.c b/lib/netdev-vport.c
index 835a98c..b2dad10 100644
--- a/lib/netdev-vport.c
+++ b/lib/netdev-vport.c
@@ -816,7 +816,8 @@ get_stats(const struct netdev *netdev, struct netdev_stats *stats)
     NULL,                   /* rx_dealloc */             \
     NULL,                   /* rx_recv */                \
     NULL,                   /* rx_wait */                \
-    NULL,                   /* rx_drain */
+    NULL,                   /* rx_drain */               \
+    NULL,                   /* rxq_do_polling */
 
 #define TUNNEL_CLASS(NAME, DPIF_PORT) \
     { DPIF_PORT,            \
diff --git a/lib/netdev.c b/lib/netdev.c
index 07cda42..39dc918 100644
--- a/lib/netdev.c
+++ b/lib/netdev.c
@@ -650,6 +650,14 @@ netdev_rxq_drain(struct netdev_rxq *rx)
             : 0);
 }
 
+/* Tell when entering and exiting polling mode for 'rx' interface */
+void
+netdev_rxq_do_polling(struct netdev_rxq *rx, bool enable)
+{
+    if (rx->netdev->netdev_class->rxq_do_polling)
+        rx->netdev->netdev_class->rxq_do_polling(rx, enable);
+}
+
 /* Sends 'buffer' on 'netdev'.  Returns 0 if successful, otherwise a positive
  * errno value.  Returns EAGAIN without blocking if the packet cannot be queued
  * immediately.  Returns EMSGSIZE if a partial packet was transmitted or if
diff --git a/lib/netdev.h b/lib/netdev.h
index a4bd01a..147999c 100644
--- a/lib/netdev.h
+++ b/lib/netdev.h
@@ -43,6 +43,7 @@ extern "C" {
  *      netdev_rxq_recv()
  *      netdev_rxq_wait()
  *      netdev_rxq_drain()
+ *      netdev_rxq_do_polling()
  *
  * These functions are conditionally thread-safe: they may be called from
  * different threads only on different netdev_rxq objects.  (The client may
@@ -169,6 +170,7 @@ const char *netdev_rxq_get_name(const struct netdev_rxq *);
 int netdev_rxq_recv(struct netdev_rxq *rx, struct ofpbuf **buffers, int *cnt);
 void netdev_rxq_wait(struct netdev_rxq *);
 int netdev_rxq_drain(struct netdev_rxq *);
+void netdev_rxq_do_polling(struct netdev_rxq *rx, bool enable);
 
 /* Packet transmission. */
 int netdev_send(struct netdev *, struct ofpbuf *, bool may_steal);
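
For reference, a minimal sketch (not part of the patch) of how a PMD-style
polling thread would bracket its receive loop with the new
netdev_rxq_do_polling() hook, mirroring the dpif-netdev.c change above. The
function pmd_poll_loop(), the keep_running flag, and the batch array size are
hypothetical placeholders, not OVS symbols:

/* Hypothetical polling loop bracketed by the new hook.  Ownership is taken
 * before polling starts and released after it stops, so non-polling threads
 * can drive TX output directly outside that window. */
#include <stdbool.h>

#include "netdev.h"

static void
pmd_poll_loop(struct netdev_rxq **rxqs, int n_rxq, volatile bool *keep_running)
{
    /* Must be at least the netdev's maximum RX batch size. */
    struct ofpbuf *packets[256];
    int cnt, i;

    /* Take poll (TX output) ownership before the receive loop starts. */
    for (i = 0; i < n_rxq; i++) {
        netdev_rxq_do_polling(rxqs[i], true);
    }

    while (*keep_running) {
        for (i = 0; i < n_rxq; i++) {
            /* For DPDK ports, rxq_recv also drains the TX pre-queue. */
            if (!netdev_rxq_recv(rxqs[i], packets, &cnt)) {
                /* ... process and forward 'cnt' packets ... */
            }
        }
    }

    /* Release ownership so non-polling threads can push TX directly again. */
    for (i = 0; i < n_rxq; i++) {
        netdev_rxq_do_polling(rxqs[i], false);
    }
}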