On Fri, Jun 13, 2014 at 5:00 PM, Daniele Di Proietto <ddiproie...@vmware.com> wrote: > The netdev_send function has been modified to accept multiple packets, to > allow netdev providers to amortize locking and queuing costs. > This is especially true for netdev-dpdk. > > Later commits exploit the new API. > > Signed-off-by: Daniele Di Proietto <ddiproie...@vmware.com> > --- > lib/dpif-netdev.c | 4 +- > lib/netdev-bsd.c | 56 ++++++++++-------- > lib/netdev-dpdk.c | 160 > ++++++++++++++++++++++++++++++++------------------ > lib/netdev-dummy.c | 69 +++++++++++++--------- > lib/netdev-linux.c | 54 ++++++++++------- > lib/netdev-provider.h | 18 +++--- > lib/netdev.c | 17 ++++-- > lib/netdev.h | 3 +- > 8 files changed, 236 insertions(+), 145 deletions(-) > > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c > index 332bbda..86e36bc 100644 > --- a/lib/dpif-netdev.c > +++ b/lib/dpif-netdev.c > @@ -2123,7 +2123,9 @@ dp_execute_cb(void *aux_, struct dpif_packet *packet, > case OVS_ACTION_ATTR_OUTPUT: > p = dp_netdev_lookup_port(aux->dp, u32_to_odp(nl_attr_get_u32(a))); > if (p) { > - netdev_send(p->netdev, &packet->ofpbuf, may_steal); > + struct ofpbuf * ofp = &packet->ofpbuf; > + Extra space.
> + netdev_send(p->netdev, &ofp, 1, may_steal); > } > break; > > diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c > index 27d90f0..b53de0d 100644 > --- a/lib/netdev-bsd.c > +++ b/lib/netdev-bsd.c > @@ -685,13 +685,13 @@ netdev_bsd_rxq_drain(struct netdev_rxq *rxq_) > * system or a tap device. > */ > static int > -netdev_bsd_send(struct netdev *netdev_, struct ofpbuf *pkt, bool may_steal) > +netdev_bsd_send(struct netdev *netdev_, struct ofpbuf **pkts, int cnt, > + bool may_steal) > { > struct netdev_bsd *dev = netdev_bsd_cast(netdev_); > const char *name = netdev_get_name(netdev_); > - const void *data = ofpbuf_data(pkt); > - size_t size = ofpbuf_size(pkt); > int error; > + int i; > > ovs_mutex_lock(&dev->mutex); > if (dev->tap_fd < 0 && !dev->pcap) { > @@ -700,35 +700,43 @@ netdev_bsd_send(struct netdev *netdev_, struct ofpbuf > *pkt, bool may_steal) > error = 0; > } > > - while (!error) { > - ssize_t retval; > - if (dev->tap_fd >= 0) { > - retval = write(dev->tap_fd, data, size); > - } else { > - retval = pcap_inject(dev->pcap, data, size); > - } > - if (retval < 0) { > - if (errno == EINTR) { > - continue; > + for (i = 0; i < cnt; i++) { > + const void *data = ofpbuf_data(pkts[i]); > + size_t size = ofpbuf_size(pkts[i]); > + > + while (!error) { > + ssize_t retval; > + if (dev->tap_fd >= 0) { > + retval = write(dev->tap_fd, data, size); > } else { > - error = errno; > - if (error != EAGAIN) { > - VLOG_WARN_RL(&rl, "error sending Ethernet packet on %s: " > - "%s", name, ovs_strerror(error)); > + retval = pcap_inject(dev->pcap, data, size); > + } > + if (retval < 0) { > + if (errno == EINTR) { > + continue; > + } else { > + error = errno; > + if (error != EAGAIN) { > + VLOG_WARN_RL(&rl, "error sending Ethernet packet on" > + " %s: %s", name, ovs_strerror(error)); > + } > } > + } else if (retval != size) { > + VLOG_WARN_RL(&rl, "sent partial Ethernet packet " > + "(%"PRIuSIZE" bytes of " > + "%"PRIuSIZE") on %s", retval, size, name); > + error = EMSGSIZE; > + } 
else { > + break; > } > - } else if (retval != size) { > - VLOG_WARN_RL(&rl, "sent partial Ethernet packet (%"PRIuSIZE" > bytes of " > - "%"PRIuSIZE") on %s", retval, size, name); > - error = EMSGSIZE; > - } else { > - break; > } > } > > ovs_mutex_unlock(&dev->mutex); > if (may_steal) { > - ofpbuf_delete(pkt); > + for (i = 0; i < cnt; i++) { > + ofpbuf_delete(pkt); > + } > } > > return error; > diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c > index ec6565a..74c8e57 100644 > --- a/lib/netdev-dpdk.c > +++ b/lib/netdev-dpdk.c > @@ -611,103 +611,149 @@ netdev_dpdk_rxq_recv(struct netdev_rxq *rxq_, struct > dpif_packet **packets, > } > > inline static void > -dpdk_queue_pkt(struct netdev_dpdk *dev, int qid, > - struct rte_mbuf *pkt) > +dpdk_queue_pkts(struct netdev_dpdk *dev, int qid, > + struct rte_mbuf **pkts, int cnt) > { > struct dpdk_tx_queue *txq = &dev->tx_q[qid]; > uint64_t diff_tsc; > uint64_t cur_tsc; > uint32_t nb_tx; > > + int i = 0; > + > rte_spinlock_lock(&txq->tx_lock); > - txq->burst_pkts[txq->count++] = pkt; > - if (txq->count == MAX_TX_QUEUE_LEN) { > - goto flush; > - } > - cur_tsc = rte_get_timer_cycles(); > - if (txq->count == 1) { > - txq->tsc = cur_tsc; > - } > - diff_tsc = cur_tsc - txq->tsc; > - if (diff_tsc >= DRAIN_TSC) { > - goto flush; > - } > - rte_spinlock_unlock(&txq->tx_lock); > - return; > + while (i < cnt) { > + int freeslots = MAX_TX_QUEUE_LEN - txq->count; > + int tocopy = MIN(freeslots, cnt-i); > > -flush: > - nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts, txq->count); > - if (nb_tx != txq->count) { > - /* free buffers if we couldn't transmit packets */ > - rte_mempool_put_bulk(dev->dpdk_mp->mp, > - (void **) &txq->burst_pkts[nb_tx], > - (txq->count - nb_tx)); > + memcpy(&txq->burst_pkts[txq->count], &pkts[i], > + tocopy * sizeof (struct rte_mbuf *)); > + > + txq->count += tocopy; > + i += tocopy; > + > + if (txq->count == MAX_TX_QUEUE_LEN) { > + goto flush; > + } > + cur_tsc = rte_get_timer_cycles(); > + if 
(txq->count == 1) { > + txq->tsc = cur_tsc; > + } > + diff_tsc = cur_tsc - txq->tsc; > + if (diff_tsc >= DRAIN_TSC) { > + goto flush; > + } > + continue; > + > + flush: > + nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts, > + txq->count); > + if (nb_tx != txq->count) { > + /* free buffers if we couldn't transmit packets */ > + rte_mempool_put_bulk(dev->dpdk_mp->mp, > + (void **) &txq->burst_pkts[nb_tx], > + (txq->count - nb_tx)); > + } > + txq->count = 0; > } > - txq->count = 0; > rte_spinlock_unlock(&txq->tx_lock); > } > > /* Tx function. Transmit packets indefinitely */ > static void > -dpdk_do_tx_copy(struct netdev *netdev, char *buf, int size) > +dpdk_do_tx_copy(struct netdev *netdev, struct ofpbuf ** ofpbufs, int cnt) > { > struct netdev_dpdk *dev = netdev_dpdk_cast(netdev); > - struct rte_mbuf *pkt; > + struct rte_mbuf *pkts[cnt]; > + int i; > > - pkt = rte_pktmbuf_alloc(dev->dpdk_mp->mp); > - if (!pkt) { > - ovs_mutex_lock(&dev->mutex); > - dev->stats.tx_dropped++; > - ovs_mutex_unlock(&dev->mutex); > - return; > - } > + for (i = 0; i < cnt; i++) { > + int size = ofpbuf_size(ofpbufs[i]); > + if (size > dev->max_packet_len) { > + VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d", > + (int)size , dev->max_packet_len); > > - /* We have to do a copy for now */ > - memcpy(pkt->pkt.data, buf, size); > + ovs_mutex_lock(&dev->mutex); > + dev->stats.tx_dropped++; > + ovs_mutex_unlock(&dev->mutex); > > - rte_pktmbuf_data_len(pkt) = size; > - rte_pktmbuf_pkt_len(pkt) = size; > + continue; > + } > + > + pkts[i] = rte_pktmbuf_alloc(dev->dpdk_mp->mp); > > - dpdk_queue_pkt(dev, NON_PMD_THREAD_TX_QUEUE, pkt); > + if (!pkts[i]) { > + ovs_mutex_lock(&dev->mutex); > + dev->stats.tx_dropped++; > + ovs_mutex_unlock(&dev->mutex); > + return; > + } > + > + /* We have to do a copy for now */ > + memcpy(pkts[i]->pkt.data, ofpbuf_data(ofpbufs[i]), size); > + > + rte_pktmbuf_data_len(pkts[i]) = size; > + rte_pktmbuf_pkt_len(pkts[i]) = size; > + } > + > + 
dpdk_queue_pkts(dev, NON_PMD_THREAD_TX_QUEUE, pkts, cnt); > dpdk_queue_flush(dev, NON_PMD_THREAD_TX_QUEUE); > } > > static int > -netdev_dpdk_send(struct netdev *netdev, > - struct ofpbuf *ofpbuf, bool may_steal) > +netdev_dpdk_send(struct netdev *netdev, struct ofpbuf **ofpbufs, int cnt, > + bool may_steal) > { > struct netdev_dpdk *dev = netdev_dpdk_cast(netdev); > int ret; > + int i; > > - if (ofpbuf_size(ofpbuf) > dev->max_packet_len) { > - VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d", > - (int)ofpbuf_size(ofpbuf) , dev->max_packet_len); > - > - ovs_mutex_lock(&dev->mutex); > - dev->stats.tx_dropped++; > - ovs_mutex_unlock(&dev->mutex); > - > - ret = E2BIG; > - goto out; > - } > - > - if (!may_steal || ofpbuf->source != OFPBUF_DPDK) { > - dpdk_do_tx_copy(netdev, (char *) ofpbuf_data(ofpbuf), > ofpbuf_size(ofpbuf)); > + if (!may_steal || ofpbufs[0]->source != OFPBUF_DPDK) { > + dpdk_do_tx_copy(netdev, ofpbufs, cnt); > > if (may_steal) { > - ofpbuf_delete(ofpbuf); > + for (i = 0; i < cnt; i++) { > + ofpbuf_delete(ofpbufs[i]); > + } > } > } else { > int qid; > + int next_tx_idx = 0; > + int dropped = 0; > > qid = rte_lcore_id() % NR_QUEUE; > > - dpdk_queue_pkt(dev, qid, (struct rte_mbuf *)ofpbuf); > + for (i = 0; i < cnt; i++) { > + int size = ofpbuf_size(ofpbufs[i]); > + if (OVS_UNLIKELY(size > dev->max_packet_len)) { > + if (next_tx_idx != i) { > + dpdk_queue_pkts(dev, qid, > + (struct rte_mbuf > **)&ofpbufs[next_tx_idx], > + i-next_tx_idx); > + > + VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d", > + (int)size , dev->max_packet_len); > + > + ofpbuf_delete(ofpbufs[i]); > + dropped++; > + } > + next_tx_idx = i + 1; > + } > + } > + if (next_tx_idx != cnt) { > + dpdk_queue_pkts(dev, qid, > + (struct rte_mbuf **)&ofpbufs[next_tx_idx], > + cnt-next_tx_idx); > + } > > + if (OVS_UNLIKELY(dropped)) { > + ovs_mutex_lock(&dev->mutex); > + dev->stats.tx_dropped += dropped; > + ovs_mutex_unlock(&dev->mutex); > + } > } > ret = 0; > > -out: > return ret; > } > 
> diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c > index 1464d29..ca82765 100644 > --- a/lib/netdev-dummy.c > +++ b/lib/netdev-dummy.c > @@ -845,50 +845,61 @@ netdev_dummy_rxq_drain(struct netdev_rxq *rxq_) > } > > static int > -netdev_dummy_send(struct netdev *netdev, struct ofpbuf *pkt, bool may_steal) > +netdev_dummy_send(struct netdev *netdev, struct ofpbuf **pkts, int cnt, > + bool may_steal) > { > struct netdev_dummy *dev = netdev_dummy_cast(netdev); > - const void *buffer = ofpbuf_data(pkt); > - size_t size = ofpbuf_size(pkt); > + int error = 0; > + int i; > > - if (size < ETH_HEADER_LEN) { > - return EMSGSIZE; > - } else { > - const struct eth_header *eth = buffer; > - int max_size; > + for (i = 0; i < cnt; i++) { > + const void *buffer = ofpbuf_data(pkts[i]); > + size_t size = ofpbuf_size(pkts[i]); > > - ovs_mutex_lock(&dev->mutex); > - max_size = dev->mtu + ETH_HEADER_LEN; > - ovs_mutex_unlock(&dev->mutex); > + if (size < ETH_HEADER_LEN) { > + error = EMSGSIZE; > + break; > + } else { > + const struct eth_header *eth = buffer; > + int max_size; > > - if (eth->eth_type == htons(ETH_TYPE_VLAN)) { > - max_size += VLAN_HEADER_LEN; > - } > - if (size > max_size) { > - return EMSGSIZE; > + ovs_mutex_lock(&dev->mutex); > + max_size = dev->mtu + ETH_HEADER_LEN; > + ovs_mutex_unlock(&dev->mutex); > + > + if (eth->eth_type == htons(ETH_TYPE_VLAN)) { > + max_size += VLAN_HEADER_LEN; > + } > + if (size > max_size) { > + error = EMSGSIZE; > + break; > + } > } > - } > > - ovs_mutex_lock(&dev->mutex); > - dev->stats.tx_packets++; > - dev->stats.tx_bytes += size; > + ovs_mutex_lock(&dev->mutex); > + dev->stats.tx_packets++; > + dev->stats.tx_bytes += size; > + > + dummy_packet_conn_send(&dev->conn, buffer, size); > > - dummy_packet_conn_send(&dev->conn, buffer, size); > + if (dev->tx_pcap) { > + struct ofpbuf packet; > > - if (dev->tx_pcap) { > - struct ofpbuf packet; > + ofpbuf_use_const(&packet, buffer, size); > + ovs_pcap_write(dev->tx_pcap, &packet); > + 
fflush(dev->tx_pcap); > + } > > - ofpbuf_use_const(&packet, buffer, size); > - ovs_pcap_write(dev->tx_pcap, &packet); > - fflush(dev->tx_pcap); > + ovs_mutex_unlock(&dev->mutex); > } > > - ovs_mutex_unlock(&dev->mutex); > if (may_steal) { > - ofpbuf_delete(pkt); > + for (i = 0; i < cnt; i++) { > + ofpbuf_delete(pkts[i]); > + } > } > > - return 0; > + return error; > } > > static int > diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c > index 074a061..21b6ea4 100644 > --- a/lib/netdev-linux.c > +++ b/lib/netdev-linux.c > @@ -1057,12 +1057,16 @@ netdev_linux_rxq_drain(struct netdev_rxq *rxq_) > * The kernel maintains a packet transmission queue, so the caller is not > * expected to do additional queuing of packets. */ > static int > -netdev_linux_send(struct netdev *netdev_, struct ofpbuf *pkt, bool may_steal) > +netdev_linux_send(struct netdev *netdev_, struct ofpbuf **pkts, int cnt, > + bool may_steal) > { > - const void *data = ofpbuf_data(pkt); > - size_t size = ofpbuf_size(pkt); > + int i; > + int error = 0; > > - for (;;) { > + /* 'i' is incremented only if there's no error */ > + for (i = 0; i < cnt;) { > + const void *data = ofpbuf_data(pkts[i]); > + size_t size = ofpbuf_size(pkts[i]); > ssize_t retval; > > if (!is_tap_netdev(netdev_)) { > @@ -1112,31 +1116,41 @@ netdev_linux_send(struct netdev *netdev_, struct > ofpbuf *pkt, bool may_steal) > retval = write(netdev->tap_fd, data, size); > } > > - if (may_steal) { > - ofpbuf_delete(pkt); > - } > - > if (retval < 0) { > /* The Linux AF_PACKET implementation never blocks waiting for > room > * for packets, instead returning ENOBUFS. Translate this into > * EAGAIN for the caller. */ > - if (errno == ENOBUFS) { > - return EAGAIN; > - } else if (errno == EINTR) { > + error = errno == ENOBUFS ? EAGAIN : errno; > + if (error == EINTR) { > + /* continue without incrementing 'i', i.e. 
retry this packet > */ > continue; > - } else if (errno != EAGAIN) { > - VLOG_WARN_RL(&rl, "error sending Ethernet packet on %s: %s", > - netdev_get_name(netdev_), ovs_strerror(errno)); > } > - return errno; > + break; > } else if (retval != size) { > - VLOG_WARN_RL(&rl, "sent partial Ethernet packet (%"PRIuSIZE" > bytes of " > - "%"PRIuSIZE") on %s", retval, size, > netdev_get_name(netdev_)); > - return EMSGSIZE; > - } else { > - return 0; > + VLOG_WARN_RL(&rl, "sent partial Ethernet packet (%"PRIuSIZE" > bytes" > + " of %"PRIuSIZE") on %s", retval, size, > + netdev_get_name(netdev_)); > + error = EMSGSIZE; > + break; > } > + > + /* Process the next packet in the batch */ > + i++; > } > + > + if (may_steal) { > + for (i = 0; i < cnt; i++) { > + ofpbuf_delete(pkts[i]); > + } > + } > + > + if (error && error != EAGAIN) { > + VLOG_WARN_RL(&rl, "error sending Ethernet packet on %s: %s", > + netdev_get_name(netdev_), ovs_strerror(error)); > + } > + > + return error; > + > } > > /* Registers with the poll loop to wake up from the next call to poll_block() > diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h > index 42c0012..33d5173 100644 > --- a/lib/netdev-provider.h > +++ b/lib/netdev-provider.h > @@ -250,13 +250,16 @@ struct netdev_class { > const struct netdev_tunnel_config * > (*get_tunnel_config)(const struct netdev *netdev); > > - /* Sends the buffer on 'netdev'. > - * Returns 0 if successful, otherwise a positive errno value. Returns > - * EAGAIN without blocking if the packet cannot be queued immediately. > - * Returns EMSGSIZE if a partial packet was transmitted or if the packet > - * is too big or too small to transmit on the device. > + /* Sends buffers on 'netdev'. > + * Returns 0 if successful (for every buffer), otherwise a positive > errno value. > + * Returns EAGAIN without blocking if one or more packets cannot be > + * queued immediately. 
Returns EMSGSIZE if a partial packet was > transmitted > + * or if a packet is too big or too small to transmit on the device. > * > - * To retain ownership of 'buffer' caller can set may_steal to false. > + * If the function returns a non-zero value, some of the packets might > have > + * been sent anyway. > + * > + * To retain ownership of 'buffers' caller can set may_steal to false. > * > * The network device is expected to maintain a packet transmission > queue, > * so that the caller does not ordinarily have to do additional queuing > of > @@ -268,7 +271,8 @@ struct netdev_class { > * network device from being usefully used by the netdev-based "userspace > * datapath". It will also prevent the OVS implementation of bonding > from > * working properly over 'netdev'.) */ > - int (*send)(struct netdev *netdev, struct ofpbuf *buffer, bool > may_steal); > + int (*send)(struct netdev *netdev, struct ofpbuf **buffers, int cnt, > + bool may_steal); > > /* Registers with the poll loop to wake up from the next call to > * poll_block() when the packet transmission queue for 'netdev' has > diff --git a/lib/netdev.c b/lib/netdev.c > index 6a2ad51..ea4405e 100644 > --- a/lib/netdev.c > +++ b/lib/netdev.c > @@ -650,10 +650,14 @@ netdev_rxq_drain(struct netdev_rxq *rx) > : 0); > } > > -/* Sends 'buffer' on 'netdev'. Returns 0 if successful, otherwise a positive > - * errno value. Returns EAGAIN without blocking if the packet cannot be > queued > - * immediately. Returns EMSGSIZE if a partial packet was transmitted or if > - * the packet is too big or too small to transmit on the device. > +/* Sends 'buffers' on 'netdev'. Returns 0 if successful (for every packet), > + * otherwise a positive errno value. Returns EAGAIN without blocking if > + * at least one of the packets cannot be queued immediately. Returns EMSGSIZE > + * if a partial packet was transmitted or if a packet is too big or too small > + * to transmit on the device.
> + * > + * If the function returns a non-zero value, some of the packets might have > + * been sent anyway. > * > * To retain ownership of 'buffer' caller can set may_steal to false. > * > @@ -663,12 +667,13 @@ netdev_rxq_drain(struct netdev_rxq *rx) > * Some network devices may not implement support for this function. In such > * cases this function will always return EOPNOTSUPP. */ > int > -netdev_send(struct netdev *netdev, struct ofpbuf *buffer, bool may_steal) > +netdev_send(struct netdev *netdev, struct ofpbuf **buffers, int cnt, > + bool may_steal) > { > int error; > > error = (netdev->netdev_class->send > - ? netdev->netdev_class->send(netdev, buffer, may_steal) > + ? netdev->netdev_class->send(netdev, buffers, cnt, may_steal) > : EOPNOTSUPP); > if (!error) { > COVERAGE_INC(netdev_sent); > diff --git a/lib/netdev.h b/lib/netdev.h > index c8880a4..4e9f96c 100644 > --- a/lib/netdev.h > +++ b/lib/netdev.h > @@ -173,7 +173,8 @@ void netdev_rxq_wait(struct netdev_rxq *); > int netdev_rxq_drain(struct netdev_rxq *); > > /* Packet transmission. */ > -int netdev_send(struct netdev *, struct ofpbuf *, bool may_steal); > +int netdev_send(struct netdev *, struct ofpbuf **buffers, int cnt, > + bool may_steal); > void netdev_send_wait(struct netdev *); > > /* Hardware address. */ > -- > 2.0.0 > Looks good. Acked-by: Pravin B Shelar <pshe...@nicira.com> > _______________________________________________ > dev mailing list > dev@openvswitch.org > http://openvswitch.org/mailman/listinfo/dev _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev