The netdev_send() function has been modified to accept multiple packets at once, so that netdev providers can amortize their locking and queuing costs over a whole batch. This is especially beneficial for netdev-dpdk.
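To illustrate the new call shape (this sketch is not part of the patch; 'BATCH_SIZE' and 'collect_packets()' are hypothetical names used only for the example), a caller now hands netdev_send() an array of packets and a count:

    struct dpif_packet *pkts[BATCH_SIZE];          /* hypothetical batch buffer */
    int cnt = collect_packets(pkts, BATCH_SIZE);   /* hypothetical helper */

    /* One call per batch: the provider can take its TX lock and queue all
     * 'cnt' packets at once, instead of locking once per packet. */
    int error = netdev_send(netdev, pkts, cnt, true);   /* may_steal == true */

With may_steal set to true the provider is free to consume ("steal") the packets, so the caller must not reuse them afterwards.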
Later commits exploit the new API.

Signed-off-by: Daniele Di Proietto <ddiproie...@vmware.com>
---
 lib/dpif-netdev.c     |   2 +-
 lib/netdev-bsd.c      |  55 +++++++++--------
 lib/netdev-dpdk.c     | 167 ++++++++++++++++++++++++++++++++------------------
 lib/netdev-dummy.c    |  68 +++++++++++---------
 lib/netdev-linux.c    |  55 ++++++++++-------
 lib/netdev-provider.h |  17 ++---
 lib/netdev.c          |  17 +++--
 lib/netdev.h          |   3 +-
 8 files changed, 235 insertions(+), 149 deletions(-)

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 32ef969..0f15e52 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -2123,7 +2123,7 @@ dp_execute_cb(void *aux_, struct dpif_packet *packet,
     case OVS_ACTION_ATTR_OUTPUT:
         p = dp_netdev_lookup_port(aux->dp, u32_to_odp(nl_attr_get_u32(a)));
         if (p) {
-            netdev_send(p->netdev, packet, may_steal);
+            netdev_send(p->netdev, &packet, 1, may_steal);
         }
         break;

diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c
index 92838a5..65ae9f9 100644
--- a/lib/netdev-bsd.c
+++ b/lib/netdev-bsd.c
@@ -686,14 +686,13 @@ netdev_bsd_rxq_drain(struct netdev_rxq *rxq_)
  * system or a tap device.
  */
 static int
-netdev_bsd_send(struct netdev *netdev_, struct dpif_packet *pkt,
+netdev_bsd_send(struct netdev *netdev_, struct dpif_packet **pkts, int cnt,
                 bool may_steal)
 {
     struct netdev_bsd *dev = netdev_bsd_cast(netdev_);
     const char *name = netdev_get_name(netdev_);
-    const void *data = ofpbuf_data(&pkt->ofpbuf);
-    size_t size = ofpbuf_size(&pkt->ofpbuf);
     int error;
+    int i;
 
     ovs_mutex_lock(&dev->mutex);
     if (dev->tap_fd < 0 && !dev->pcap) {
@@ -702,35 +701,43 @@ netdev_bsd_send(struct netdev *netdev_, struct dpif_packet *pkt,
         error = 0;
     }
 
-    while (!error) {
-        ssize_t retval;
-        if (dev->tap_fd >= 0) {
-            retval = write(dev->tap_fd, data, size);
-        } else {
-            retval = pcap_inject(dev->pcap, data, size);
-        }
-        if (retval < 0) {
-            if (errno == EINTR) {
-                continue;
+    for (i = 0; i < cnt; i++) {
+        const void *data = ofpbuf_data(&pkts[i]->ofpbuf);
+        size_t size = ofpbuf_size(&pkts[i]->ofpbuf);
+
+        while (!error) {
+            ssize_t retval;
+            if (dev->tap_fd >= 0) {
+                retval = write(dev->tap_fd, data, size);
             } else {
-                error = errno;
-                if (error != EAGAIN) {
-                    VLOG_WARN_RL(&rl, "error sending Ethernet packet on %s: "
-                                 "%s", name, ovs_strerror(error));
+                retval = pcap_inject(dev->pcap, data, size);
+            }
+            if (retval < 0) {
+                if (errno == EINTR) {
+                    continue;
+                } else {
+                    error = errno;
+                    if (error != EAGAIN) {
+                        VLOG_WARN_RL(&rl, "error sending Ethernet packet on"
+                                     " %s: %s", name, ovs_strerror(error));
+                    }
                 }
+            } else if (retval != size) {
+                VLOG_WARN_RL(&rl, "sent partial Ethernet packet "
+                                  "(%"PRIuSIZE" bytes of "
+                                  "%"PRIuSIZE") on %s", retval, size, name);
+                error = EMSGSIZE;
+            } else {
+                break;
             }
-        } else if (retval != size) {
-            VLOG_WARN_RL(&rl, "sent partial Ethernet packet (%"PRIuSIZE" bytes of "
-                         "%"PRIuSIZE") on %s", retval, size, name);
-            error = EMSGSIZE;
-        } else {
-            break;
-        }
         }
     }
 
     ovs_mutex_unlock(&dev->mutex);
     if (may_steal) {
-        dpif_packet_delete(pkt);
+        for (i = 0; i < cnt; i++) {
+            dpif_packet_delete(pkts[i]);
+        }
     }
 
     return error;
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index 17e01dd..c43049a 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -206,8 +206,8 @@ dpdk_rte_mzalloc(size_t sz)
 void
 free_dpdk_buf(struct dpif_packet *p)
 {
-    struct ofpbuf *b = &p->ofpbuf;
-    struct rte_mbuf *pkt = (struct rte_mbuf *) b->dpdk_buf;
+    struct ofpbuf *ofp = &p->ofpbuf;
+    struct rte_mbuf *pkt = (struct rte_mbuf *) ofp->dpdk_buf;
 
     rte_mempool_put(pkt->pool, pkt);
 }
@@ -612,104 +612,151 @@ netdev_dpdk_rxq_recv(struct netdev_rxq *rxq_, struct dpif_packet **packets,
 }
 
 inline static void
-dpdk_queue_pkt(struct netdev_dpdk *dev, int qid,
-               struct rte_mbuf *pkt)
+dpdk_queue_pkts(struct netdev_dpdk *dev, int qid,
+                struct rte_mbuf **pkts, int cnt)
 {
     struct dpdk_tx_queue *txq = &dev->tx_q[qid];
     uint64_t diff_tsc;
     uint64_t cur_tsc;
     uint32_t nb_tx;
+    int i = 0;
 
     rte_spinlock_lock(&txq->tx_lock);
-    txq->burst_pkts[txq->count++] = pkt;
-    if (txq->count == MAX_TX_QUEUE_LEN) {
-        goto flush;
-    }
-    cur_tsc = rte_get_timer_cycles();
-    if (txq->count == 1) {
-        txq->tsc = cur_tsc;
-    }
-    diff_tsc = cur_tsc - txq->tsc;
-    if (diff_tsc >= DRAIN_TSC) {
-        goto flush;
-    }
-    rte_spinlock_unlock(&txq->tx_lock);
-    return;
+    while (i < cnt) {
+        int freeslots = MAX_TX_QUEUE_LEN - txq->count;
+        int tocopy = MIN(freeslots, cnt-i);
 
-flush:
-    nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts, txq->count);
-    if (nb_tx != txq->count) {
-        /* free buffers if we couldn't transmit packets */
-        rte_mempool_put_bulk(dev->dpdk_mp->mp,
-                             (void **) &txq->burst_pkts[nb_tx],
-                             (txq->count - nb_tx));
+        memcpy(&txq->burst_pkts[txq->count], &pkts[i],
+               tocopy * sizeof (struct rte_mbuf *));
+
+        txq->count += tocopy;
+        i += tocopy;
+
+        if (txq->count == MAX_TX_QUEUE_LEN) {
+            goto flush;
+        }
+        cur_tsc = rte_get_timer_cycles();
+        if (txq->count == 1) {
+            txq->tsc = cur_tsc;
+        }
+        diff_tsc = cur_tsc - txq->tsc;
+        if (diff_tsc >= DRAIN_TSC) {
+            goto flush;
+        }
+        continue;
+
+    flush:
+        nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts,
+                                 txq->count);
+        if (nb_tx != txq->count) {
+            /* free buffers if we couldn't transmit packets */
+            rte_mempool_put_bulk(dev->dpdk_mp->mp,
+                                 (void **) &txq->burst_pkts[nb_tx],
+                                 (txq->count - nb_tx));
+        }
+        txq->count = 0;
     }
-    txq->count = 0;
     rte_spinlock_unlock(&txq->tx_lock);
 }
 
 /* Tx function. Transmit packets indefinitely */
 static void
-dpdk_do_tx_copy(struct netdev *netdev, char *buf, int size)
+dpdk_do_tx_copy(struct netdev *netdev, struct dpif_packet **pkts, int cnt)
 {
     struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
-    struct rte_mbuf *pkt;
+    struct rte_mbuf *mbufs[cnt];
+    int i, newcnt = 0;
 
-    pkt = rte_pktmbuf_alloc(dev->dpdk_mp->mp);
-    if (!pkt) {
-        ovs_mutex_lock(&dev->mutex);
-        dev->stats.tx_dropped++;
-        ovs_mutex_unlock(&dev->mutex);
-        return;
-    }
+    for (i = 0; i < cnt; i++) {
+        int size = ofpbuf_size(&pkts[i]->ofpbuf);
 
-    /* We have to do a copy for now */
-    memcpy(pkt->pkt.data, buf, size);
+        if (size > dev->max_packet_len) {
+            VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d",
+                         (int)size, dev->max_packet_len);
 
-    rte_pktmbuf_data_len(pkt) = size;
-    rte_pktmbuf_pkt_len(pkt) = size;
+            ovs_mutex_lock(&dev->mutex);
+            dev->stats.tx_dropped++;
+            ovs_mutex_unlock(&dev->mutex);
 
-    dpdk_queue_pkt(dev, NON_PMD_THREAD_TX_QUEUE, pkt);
+            continue;
+        }
+
+        mbufs[newcnt] = rte_pktmbuf_alloc(dev->dpdk_mp->mp);
+
+        if (!mbufs[newcnt]) {
+            ovs_mutex_lock(&dev->mutex);
+            dev->stats.tx_dropped++;
+            ovs_mutex_unlock(&dev->mutex);
+            return;
+        }
+
+        /* We have to do a copy for now */
+        memcpy(mbufs[newcnt]->pkt.data, ofpbuf_data(&pkts[i]->ofpbuf), size);
+
+        rte_pktmbuf_data_len(mbufs[newcnt]) = size;
+        rte_pktmbuf_pkt_len(mbufs[newcnt]) = size;
+
+        newcnt++;
+    }
+
+    dpdk_queue_pkts(dev, NON_PMD_THREAD_TX_QUEUE, mbufs, newcnt);
     dpdk_queue_flush(dev, NON_PMD_THREAD_TX_QUEUE);
 }
 
 static int
-netdev_dpdk_send(struct netdev *netdev,
-                 struct dpif_packet *packet, bool may_steal)
+netdev_dpdk_send(struct netdev *netdev, struct dpif_packet **pkts, int cnt,
+                 bool may_steal)
 {
     struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
-    struct ofpbuf *ofpbuf = &packet->ofpbuf;
     int ret;
+    int i;
 
-    if (ofpbuf_size(ofpbuf) > dev->max_packet_len) {
-        VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d",
-                     (int)ofpbuf_size(ofpbuf), dev->max_packet_len);
-
-        ovs_mutex_lock(&dev->mutex);
-        dev->stats.tx_dropped++;
-        ovs_mutex_unlock(&dev->mutex);
-
-        ret = E2BIG;
-        goto out;
-    }
-
-    if (!may_steal || ofpbuf->source != OFPBUF_DPDK) {
-        dpdk_do_tx_copy(netdev, (char *) ofpbuf_data(ofpbuf), ofpbuf_size(ofpbuf));
+    if (!may_steal || pkts[0]->ofpbuf.source != OFPBUF_DPDK) {
+        dpdk_do_tx_copy(netdev, pkts, cnt);
 
         if (may_steal) {
-            dpif_packet_delete(packet);
+            for (i = 0; i < cnt; i++) {
+                dpif_packet_delete(pkts[i]);
+            }
         }
     } else {
         int qid;
+        int next_tx_idx = 0;
+        int dropped = 0;
 
         qid = rte_lcore_id() % NR_QUEUE;
 
-        dpdk_queue_pkt(dev, qid, (struct rte_mbuf *)ofpbuf);
+        for (i = 0; i < cnt; i++) {
+            int size = ofpbuf_size(&pkts[i]->ofpbuf);
+
+            if (OVS_UNLIKELY(size > dev->max_packet_len)) {
+                if (next_tx_idx != i) {
+                    dpdk_queue_pkts(dev, qid,
+                                    (struct rte_mbuf **)&pkts[next_tx_idx],
+                                    i-next_tx_idx);
+                }
+
+                VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d",
+                             (int)size, dev->max_packet_len);
+
+                dpif_packet_delete(pkts[i]);
+                dropped++;
+                next_tx_idx = i + 1;
+            }
+        }
+        if (next_tx_idx != cnt) {
+            dpdk_queue_pkts(dev, qid,
+                            (struct rte_mbuf **)&pkts[next_tx_idx],
+                            cnt-next_tx_idx);
+        }
+        if (OVS_UNLIKELY(dropped)) {
+            ovs_mutex_lock(&dev->mutex);
+            dev->stats.tx_dropped += dropped;
+            ovs_mutex_unlock(&dev->mutex);
+        }
     }
     ret = 0;
 
-out:
     return ret;
 }
diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
index 1a51533..8d1c298 100644
--- a/lib/netdev-dummy.c
+++ b/lib/netdev-dummy.c
@@ -846,51 +846,61 @@ netdev_dummy_rxq_drain(struct netdev_rxq *rxq_)
 }
 
 static int
-netdev_dummy_send(struct netdev *netdev, struct dpif_packet *pkt,
+netdev_dummy_send(struct netdev *netdev, struct dpif_packet **pkts, int cnt,
                   bool may_steal)
 {
     struct netdev_dummy *dev = netdev_dummy_cast(netdev);
-    const void *buffer = ofpbuf_data(&pkt->ofpbuf);
-    size_t size = ofpbuf_size(&pkt->ofpbuf);
+    int error = 0;
+    int i;
 
-    if (size < ETH_HEADER_LEN) {
-        return EMSGSIZE;
-    } else {
-        const struct eth_header *eth = buffer;
-        int max_size;
+    for (i = 0; i < cnt; i++) {
+        const void *buffer = ofpbuf_data(&pkts[i]->ofpbuf);
+        size_t size = ofpbuf_size(&pkts[i]->ofpbuf);
 
-        ovs_mutex_lock(&dev->mutex);
-        max_size = dev->mtu + ETH_HEADER_LEN;
-        ovs_mutex_unlock(&dev->mutex);
+        if (size < ETH_HEADER_LEN) {
+            error = EMSGSIZE;
+            break;
+        } else {
+            const struct eth_header *eth = buffer;
+            int max_size;
 
-        if (eth->eth_type == htons(ETH_TYPE_VLAN)) {
-            max_size += VLAN_HEADER_LEN;
-        }
-        if (size > max_size) {
-            return EMSGSIZE;
+            ovs_mutex_lock(&dev->mutex);
+            max_size = dev->mtu + ETH_HEADER_LEN;
+            ovs_mutex_unlock(&dev->mutex);
+
+            if (eth->eth_type == htons(ETH_TYPE_VLAN)) {
+                max_size += VLAN_HEADER_LEN;
+            }
+            if (size > max_size) {
+                error = EMSGSIZE;
+                break;
+            }
         }
-    }
 
-    ovs_mutex_lock(&dev->mutex);
-    dev->stats.tx_packets++;
-    dev->stats.tx_bytes += size;
+        ovs_mutex_lock(&dev->mutex);
+        dev->stats.tx_packets++;
+        dev->stats.tx_bytes += size;
+
+        dummy_packet_conn_send(&dev->conn, buffer, size);
 
-    dummy_packet_conn_send(&dev->conn, buffer, size);
+        if (dev->tx_pcap) {
+            struct ofpbuf packet;
 
-    if (dev->tx_pcap) {
-        struct ofpbuf packet;
+            ofpbuf_use_const(&packet, buffer, size);
+            ovs_pcap_write(dev->tx_pcap, &packet);
+            fflush(dev->tx_pcap);
+        }
 
-        ofpbuf_use_const(&packet, buffer, size);
-        ovs_pcap_write(dev->tx_pcap, &packet);
-        fflush(dev->tx_pcap);
+        ovs_mutex_unlock(&dev->mutex);
     }
-    ovs_mutex_unlock(&dev->mutex);
 
     if (may_steal) {
-        dpif_packet_delete(pkt);
+        for (i = 0; i < cnt; i++) {
+            dpif_packet_delete(pkts[i]);
+        }
     }
 
-    return 0;
+    return error;
 }
 
 static int
diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
index 77a0299..1780639 100644
--- a/lib/netdev-linux.c
+++ b/lib/netdev-linux.c
@@ -1012,7 +1012,7 @@ netdev_linux_rxq_recv(struct netdev_rxq *rxq_, struct dpif_packet **packets,
             VLOG_WARN_RL(&rl, "error receiving Ethernet packet on %s: %s",
                          ovs_strerror(errno), netdev_rxq_get_name(rxq_));
         }
-        ofpbuf_delete(buffer);
+        dpif_packet_delete(packet);
     } else {
         dp_packet_pad(buffer);
         packets[0] = packet;
@@ -1057,13 +1057,16 @@ netdev_linux_rxq_drain(struct netdev_rxq *rxq_)
  * The kernel maintains a packet transmission queue, so the caller is not
  * expected to do additional queuing of packets. */
 static int
-netdev_linux_send(struct netdev *netdev_, struct dpif_packet *pkt,
+netdev_linux_send(struct netdev *netdev_, struct dpif_packet **pkts, int cnt,
                   bool may_steal)
 {
-    const void *data = ofpbuf_data(&pkt->ofpbuf);
-    size_t size = ofpbuf_size(&pkt->ofpbuf);
+    int i;
+    int error = 0;
 
-    for (;;) {
+    /* 'i' is incremented only if there's no error. */
+    for (i = 0; i < cnt;) {
+        const void *data = ofpbuf_data(&pkts[i]->ofpbuf);
+        size_t size = ofpbuf_size(&pkts[i]->ofpbuf);
         ssize_t retval;
 
         if (!is_tap_netdev(netdev_)) {
@@ -1113,31 +1116,41 @@ netdev_linux_send(struct netdev *netdev_, struct dpif_packet *pkt,
             retval = write(netdev->tap_fd, data, size);
         }
 
-        if (may_steal) {
-            dpif_packet_delete(pkt);
-        }
-
         if (retval < 0) {
             /* The Linux AF_PACKET implementation never blocks waiting for room
              * for packets, instead returning ENOBUFS.  Translate this into
              * EAGAIN for the caller. */
-            if (errno == ENOBUFS) {
-                return EAGAIN;
-            } else if (errno == EINTR) {
+            error = errno == ENOBUFS ? EAGAIN : errno;
+            if (error == EINTR) {
+                /* Continue without incrementing 'i', i.e. retry this packet. */
                 continue;
-            } else if (errno != EAGAIN) {
-                VLOG_WARN_RL(&rl, "error sending Ethernet packet on %s: %s",
-                             netdev_get_name(netdev_), ovs_strerror(errno));
             }
-            return errno;
+            break;
         } else if (retval != size) {
-            VLOG_WARN_RL(&rl, "sent partial Ethernet packet (%"PRIuSIZE" bytes of "
-                         "%"PRIuSIZE") on %s", retval, size, netdev_get_name(netdev_));
-            return EMSGSIZE;
-        } else {
-            return 0;
+            VLOG_WARN_RL(&rl, "sent partial Ethernet packet (%"PRIuSIZE" bytes"
+                              " of %"PRIuSIZE") on %s", retval, size,
+                         netdev_get_name(netdev_));
+            error = EMSGSIZE;
+            break;
         }
+
+        /* Process the next packet in the batch. */
+        i++;
     }
+
+    if (may_steal) {
+        for (i = 0; i < cnt; i++) {
+            dpif_packet_delete(pkts[i]);
+        }
+    }
+
+    if (error && error != EAGAIN) {
+        VLOG_WARN_RL(&rl, "error sending Ethernet packet on %s: %s",
+                     netdev_get_name(netdev_), ovs_strerror(error));
+    }
+
+    return error;
 }
 
 /* Registers with the poll loop to wake up from the next call to poll_block()
diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
index 17109f7..6b8160d 100644
--- a/lib/netdev-provider.h
+++ b/lib/netdev-provider.h
@@ -250,13 +250,16 @@ struct netdev_class {
     const struct netdev_tunnel_config *
         (*get_tunnel_config)(const struct netdev *netdev);
 
-    /* Sends the buffer on 'netdev'.
-     * Returns 0 if successful, otherwise a positive errno value.  Returns
-     * EAGAIN without blocking if the packet cannot be queued immediately.
-     * Returns EMSGSIZE if a partial packet was transmitted or if the packet
-     * is too big or too small to transmit on the device.
+    /* Sends buffers on 'netdev'.
+     * Returns 0 if successful (for every buffer), otherwise a positive errno value.
+     * Returns EAGAIN without blocking if one or more packets cannot be
+     * queued immediately.  Returns EMSGSIZE if a partial packet was transmitted
+     * or if a packet is too big or too small to transmit on the device.
      *
-     * To retain ownership of 'buffer' caller can set may_steal to false.
+     * If the function returns a non-zero value, some of the packets might have
+     * been sent anyway.
+     *
+     * To retain ownership of 'buffers' caller can set may_steal to false.
      *
      * The network device is expected to maintain a packet transmission queue,
      * so that the caller does not ordinarily have to do additional queuing of
@@ -268,7 +271,7 @@ struct netdev_class {
      * network device from being usefully used by the netdev-based "userspace
      * datapath".  It will also prevent the OVS implementation of bonding from
      * working properly over 'netdev'.) */
-    int (*send)(struct netdev *netdev, struct dpif_packet *buffer,
+    int (*send)(struct netdev *netdev, struct dpif_packet **buffers, int cnt,
                 bool may_steal);
 
     /* Registers with the poll loop to wake up from the next call to
diff --git a/lib/netdev.c b/lib/netdev.c
index aaafb11..25edc16 100644
--- a/lib/netdev.c
+++ b/lib/netdev.c
@@ -650,10 +650,14 @@ netdev_rxq_drain(struct netdev_rxq *rx)
             : 0);
 }
 
-/* Sends 'buffer' on 'netdev'.  Returns 0 if successful, otherwise a positive
- * errno value.  Returns EAGAIN without blocking if the packet cannot be queued
- * immediately.  Returns EMSGSIZE if a partial packet was transmitted or if
- * the packet is too big or too small to transmit on the device.
+/* Sends 'buffers' on 'netdev'.  Returns 0 if successful (for every packet),
+ * otherwise a positive errno value.  Returns EAGAIN without blocking if
+ * at least one of the packets cannot be queued immediately.  Returns EMSGSIZE
+ * if a partial packet was transmitted or if a packet is too big or too small
+ * to transmit on the device.
+ *
+ * If the function returns a non-zero value, some of the packets might have
+ * been sent anyway.
  *
  * To retain ownership of 'buffer' caller can set may_steal to false.
  *
@@ -663,12 +667,13 @@ netdev_rxq_drain(struct netdev_rxq *rx)
 * Some network devices may not implement support for this function.  In such
 * cases this function will always return EOPNOTSUPP. */
 int
-netdev_send(struct netdev *netdev, struct dpif_packet *buffer, bool may_steal)
+netdev_send(struct netdev *netdev, struct dpif_packet **buffers, int cnt,
+            bool may_steal)
 {
     int error;
 
     error = (netdev->netdev_class->send
-             ? netdev->netdev_class->send(netdev, buffer, may_steal)
+             ? netdev->netdev_class->send(netdev, buffers, cnt, may_steal)
              : EOPNOTSUPP);
     if (!error) {
         COVERAGE_INC(netdev_sent);
diff --git a/lib/netdev.h b/lib/netdev.h
index 3a0b0d9..53415b2 100644
--- a/lib/netdev.h
+++ b/lib/netdev.h
@@ -173,7 +173,8 @@ void netdev_rxq_wait(struct netdev_rxq *);
 int netdev_rxq_drain(struct netdev_rxq *);
 
 /* Packet transmission. */
-int netdev_send(struct netdev *, struct dpif_packet *, bool may_steal);
+int netdev_send(struct netdev *, struct dpif_packet **, int cnt,
+                bool may_steal);
 void netdev_send_wait(struct netdev *);
 
 /* Hardware address. */
-- 
2.0.0