I removed call to rte_mempool_put_bulk() from dpdk_queue_flush__(). Also removed unnecessary dpdk_buf pointer from ofpbuf. Then Pushed to master.
Thanks. On Thu, Jul 17, 2014 at 2:29 PM, Daniele Di Proietto <ddiproie...@vmware.com> wrote: > DPDK mempools rely on rte_lcore_id() to implement a thread-local cache. > Our non pmd threads had rte_lcore_id() == 0. This allowed concurrent access to > the "thread-local" cache, causing crashes. > > This commit resolves the issue with the following changes: > > - Every non pmd thread has the same lcore_id (0, for management reasons), > which > is not shared with any pmd thread (lcore_id for pmd threads now start from > 1) > - DPDK mbufs must be allocated/freed in pmd threads. When there is the need to > use mempools in non pmd threads, like in dpdk_do_tx_copy(), a mutex must be > held. > - The previous change does not allow us anymore to pass DPDK mbufs to handler > threads: therefore this commit partially revert 143859ec63d45e. Now packets > are copied for upcall processing. We can remove the extra memcpy by > processing upcalls in the pmd thread itself. > > With the introduction of the extra locking, the packet throughput will be > lower > in the following cases: > > - When using internal (tap) devices with DPDK devices on the same datapath. > Anyway, to support internal devices efficiently, we needed DPDK KNI devices, > which will be proper pmd devices and will not need this locking. > - When packets are processed in the slow path by non pmd threads. This > overhead > can be avoided by handling the upcalls directly in pmd threads (a change > that > has already been proposed by Ryan Wilson) > > Also, the following two fixes have been introduced: > - In dpdk_free_buf() use rte_pktmbuf_free_seg() instead of rte_mempool_put(). > This allows OVS to run properly with CONFIG_RTE_LIBRTE_MBUF_DEBUG DPDK > option > - Do not bulk free mbufs in a transmission queue. They may belong to different > mempools > > Signed-off-by: Daniele Di Proietto <ddiproie...@vmware.com> > --- > lib/dpif-netdev.c | 25 +++++++++++++++---------- > lib/dpif-netdev.h | 1 + > lib/netdev-dpdk.c | 54 +++++++++++++++++++++++++++++++++++++++++++++++++++--- > lib/netdev-dpdk.h | 8 +++++++- > lib/ovs-thread.c | 3 +++ > 5 files changed, 77 insertions(+), 14 deletions(-) > > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c > index 9270873..7ca929a 100644 > --- a/lib/dpif-netdev.c > +++ b/lib/dpif-netdev.c > @@ -15,7 +15,7 @@ > */ > > #include <config.h> > -#include "dpif.h" > +#include "dpif-netdev.h" > > #include <ctype.h> > #include <errno.h> > @@ -71,7 +71,6 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev); > #define NETDEV_RULE_PRIORITY 0x8000 > > #define FLOW_DUMP_MAX_BATCH 50 > -#define NR_THREADS 1 > /* Use per thread recirc_depth to prevent recirculation loop. */ > #define MAX_RECIRC_DEPTH 5 > DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0) > @@ -733,7 +732,7 @@ do_add_port(struct dp_netdev *dp, const char *devname, > const char *type, > > if (netdev_is_pmd(netdev)) { > dp->pmd_count++; > - dp_netdev_set_pmd_threads(dp, NR_THREADS); > + dp_netdev_set_pmd_threads(dp, NR_PMD_THREADS); > dp_netdev_reload_pmd_threads(dp); > } > ovs_refcount_init(&port->ref_cnt); > @@ -2095,6 +2094,7 @@ dp_netdev_input(struct dp_netdev *dp, struct > dpif_packet **packets, int cnt, > > dp_netdev_output_userspace(dp, &buf, 1, hash % > dp->n_handlers, > DPIF_UC_MISS, mfs[i], NULL); > + dpif_packet_delete(packets[i]); > } else { > /* No upcall queue. Freeing the packet */ > dpif_packet_delete(packets[i]); > @@ -2161,6 +2161,7 @@ OVS_REQUIRES(q->mutex) > if (userdata) { > buf_size += NLA_ALIGN(userdata->nla_len); > } > + buf_size += ofpbuf_size(packet); > ofpbuf_init(buf, buf_size); > > /* Put ODP flow. */ > @@ -2175,16 +2176,19 @@ OVS_REQUIRES(q->mutex) > NLA_ALIGN(userdata->nla_len)); > } > > - upcall->packet = *packet; > + /* We have to perform a copy of the packet, because we cannot send > DPDK > + * mbufs to a non pmd thread. When the upcall processing will be done > + * in the pmd thread, this copy can be avoided */ > + ofpbuf_set_data(&upcall->packet, ofpbuf_put(buf, ofpbuf_data(packet), > + ofpbuf_size(packet))); > + ofpbuf_set_size(&upcall->packet, ofpbuf_size(packet)); > > seq_change(q->seq); > > return 0; > } else { > - ofpbuf_delete(packet); > return ENOBUFS; > } > - > } > > static int > @@ -2252,19 +2256,20 @@ dp_execute_cb(void *aux_, struct dpif_packet > **packets, int cnt, > miniflow_initialize(&key.flow, key.buf); > > for (i = 0; i < cnt; i++) { > - struct ofpbuf *packet, *userspace_packet; > + struct ofpbuf *packet; > > packet = &packets[i]->ofpbuf; > > miniflow_extract(packet, md, &key.flow); > > - userspace_packet = may_steal ? packet : ofpbuf_clone(packet); > - > - dp_netdev_output_userspace(aux->dp, &userspace_packet, 1, > + dp_netdev_output_userspace(aux->dp, &packet, 1, > miniflow_hash_5tuple(&key.flow, 0) > % aux->dp->n_handlers, > DPIF_UC_ACTION, &key.flow, > userdata); > + if (may_steal) { > + dpif_packet_delete(packets[i]); > + } > } > break; > } > diff --git a/lib/dpif-netdev.h b/lib/dpif-netdev.h > index af4a969..0f42d7a 100644 > --- a/lib/dpif-netdev.h > +++ b/lib/dpif-netdev.h > @@ -40,6 +40,7 @@ static inline void dp_packet_pad(struct ofpbuf *b) > } > > #define NR_QUEUE 1 > +#define NR_PMD_THREADS 1 > > #ifdef __cplusplus > } > diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c > index b925dd28..9315ed9 100644 > --- a/lib/netdev-dpdk.c > +++ b/lib/netdev-dpdk.c > @@ -138,6 +138,11 @@ static struct list dpdk_list OVS_GUARDED_BY(dpdk_mutex) > static struct list dpdk_mp_list OVS_GUARDED_BY(dpdk_mutex) > = LIST_INITIALIZER(&dpdk_mp_list); > > +/* This mutex must be used by non pmd threads when allocating or freeing > + * mbufs through mempools. Since dpdk_queue_pkts() and dpdk_queue_flush() may > + * use mempools, a non pmd thread should hold this mutex while calling them > */ > +struct ovs_mutex nonpmd_mempool_mutex = OVS_MUTEX_INITIALIZER; > + > struct dpdk_mp { > struct rte_mempool *mp; > int mtu; > @@ -200,6 +205,8 @@ struct netdev_rxq_dpdk { > int port_id; > }; > > +static bool thread_is_pmd(void); > + > static int netdev_dpdk_construct(struct netdev *); > > static bool > @@ -223,13 +230,15 @@ dpdk_rte_mzalloc(size_t sz) > return ptr; > } > > +/* XXX this function should be called only by pmd threads (or by non pmd > + * threads holding the nonpmd_mempool_mutex) */ > void > free_dpdk_buf(struct dpif_packet *p) > { > struct ofpbuf *buf = &p->ofpbuf; > struct rte_mbuf *pkt = (struct rte_mbuf *) buf->dpdk_buf; > > - rte_mempool_put(pkt->pool, pkt); > + rte_pktmbuf_free_seg(pkt); > } > > static void > @@ -617,7 +626,13 @@ dpdk_queue_flush__(struct netdev_dpdk *dev, int qid) > > nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts, txq->count); > if (OVS_UNLIKELY(nb_tx != txq->count)) { > - /* free buffers if we couldn't transmit packets */ > + /* free buffers, which we couldn't transmit, one at a time (each > + * packet could come from a different mempool) */ > + int i; > + > + for (i = nb_tx; i < txq->count; i++) { > + rte_pktmbuf_free_seg(txq->burst_pkts[i]); > + } > rte_mempool_put_bulk(dev->dpdk_mp->mp, > (void **) &txq->burst_pkts[nb_tx], > (txq->count - nb_tx)); > @@ -697,6 +712,7 @@ dpdk_queue_pkts(struct netdev_dpdk *dev, int qid, > /* Tx function. Transmit packets indefinitely */ > static void > dpdk_do_tx_copy(struct netdev *netdev, struct dpif_packet ** pkts, int cnt) > + OVS_NO_THREAD_SAFETY_ANALYSIS > { > struct netdev_dpdk *dev = netdev_dpdk_cast(netdev); > struct rte_mbuf *mbufs[cnt]; > @@ -704,6 +720,13 @@ dpdk_do_tx_copy(struct netdev *netdev, struct > dpif_packet ** pkts, int cnt) > int newcnt = 0; > int i; > > + /* If we are on a non pmd thread we have to use the mempool mutex, > because > + * every non pmd thread shares the same mempool cache */ > + > + if (!thread_is_pmd()) { > + ovs_mutex_lock(&nonpmd_mempool_mutex); > + } > + > for (i = 0; i < cnt; i++) { > int size = ofpbuf_size(&pkts[i]->ofpbuf); > > @@ -739,6 +762,10 @@ dpdk_do_tx_copy(struct netdev *netdev, struct > dpif_packet ** pkts, int cnt) > > dpdk_queue_pkts(dev, NON_PMD_THREAD_TX_QUEUE, mbufs, newcnt); > dpdk_queue_flush(dev, NON_PMD_THREAD_TX_QUEUE); > + > + if (!thread_is_pmd()) { > + ovs_mutex_unlock(&nonpmd_mempool_mutex); > + } > } > > static int > @@ -1384,6 +1411,9 @@ dpdk_init(int argc, char **argv) > argv[result] = argv[0]; > } > > + /* We are called from the main thread here */ > + thread_set_nonpmd(); > + > return result + 1; > } > > @@ -1429,7 +1459,25 @@ pmd_thread_setaffinity_cpu(int cpu) > VLOG_ERR("Thread affinity error %d",err); > return err; > } > - RTE_PER_LCORE(_lcore_id) = cpu; > + /* lcore_id 0 is reseved for use by non pmd threads. */ > + RTE_PER_LCORE(_lcore_id) = cpu + 1; > > return 0; > } > + > +void > +thread_set_nonpmd(void) > +{ > + /* We cannot have RTE_MAX_LCORE pmd threads, because lcore_id 0 is > reserved > + * for non pmd threads */ > + BUILD_ASSERT(NR_PMD_THREADS < RTE_MAX_LCORE); > + /* We have to use 0 to allow non pmd threads to perform certain DPDK > + * operations, like rte_eth_dev_configure(). */ > + RTE_PER_LCORE(_lcore_id) = 0; > +} > + > +static bool > +thread_is_pmd(void) > +{ > + return rte_lcore_id() != 0; > +} > diff --git a/lib/netdev-dpdk.h b/lib/netdev-dpdk.h > index b021aa5..e4ba6fc 100644 > --- a/lib/netdev-dpdk.h > +++ b/lib/netdev-dpdk.h > @@ -2,7 +2,6 @@ > #define NETDEV_DPDK_H > > #include <config.h> > -#include "ofpbuf.h" > > struct dpif_packet; > > @@ -25,6 +24,7 @@ int dpdk_init(int argc, char **argv); > void netdev_dpdk_register(void); > void free_dpdk_buf(struct dpif_packet *); > int pmd_thread_setaffinity_cpu(int cpu); > +void thread_set_nonpmd(void); > > #else > > @@ -52,5 +52,11 @@ pmd_thread_setaffinity_cpu(int cpu OVS_UNUSED) > return 0; > } > > +static inline void > +thread_set_nonpmd(void) > +{ > + /* Nothing */ > +} > + > #endif /* DPDK_NETDEV */ > #endif > diff --git a/lib/ovs-thread.c b/lib/ovs-thread.c > index cbee582..fe6fb43 100644 > --- a/lib/ovs-thread.c > +++ b/lib/ovs-thread.c > @@ -25,6 +25,7 @@ > #include <unistd.h> > #include "compiler.h" > #include "hash.h" > +#include "netdev-dpdk.h" > #include "ovs-rcu.h" > #include "poll-loop.h" > #include "seq.h" > @@ -326,6 +327,8 @@ ovsthread_wrapper(void *aux_) > set_subprogram_name("%s%u", aux.name, id); > ovsrcu_quiesce_end(); > > + thread_set_nonpmd(); > + > return aux.start(aux.arg); > } > > -- > 2.0.0 > > _______________________________________________ > dev mailing list > dev@openvswitch.org > http://openvswitch.org/mailman/listinfo/dev _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev