I removed the call to rte_mempool_put_bulk() from dpdk_queue_flush__(). I also
removed the unnecessary dpdk_buf pointer from ofpbuf. Then pushed to master.
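
For reference, with that bulk put dropped, the error path in
dpdk_queue_flush__() should end up roughly like this (a sketch of the intent,
not the exact committed code): each untransmitted mbuf is freed individually,
because the mbufs queued on a tx queue may come from different mempools.

    nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts, txq->count);
    if (OVS_UNLIKELY(nb_tx != txq->count)) {
        int i;

        /* Free the untransmitted packets one at a time.  A bulk put back
         * into dev->dpdk_mp->mp would assume a single mempool and would
         * double free the mbufs released here. */
        for (i = nb_tx; i < txq->count; i++) {
            rte_pktmbuf_free_seg(txq->burst_pkts[i]);
        }
    }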

Thanks.

On Thu, Jul 17, 2014 at 2:29 PM, Daniele Di Proietto
<ddiproie...@vmware.com> wrote:
> DPDK mempools rely on rte_lcore_id() to implement a thread-local cache.
> Our non pmd threads had rte_lcore_id() == 0. This allowed concurrent access to
> the "thread-local" cache, causing crashes.
>
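> To make the failure mode concrete, this is roughly how the mempool fast path
> picks its cache (a simplified sketch, field names approximate): the cache
> slot is selected purely by the lcore id and is touched without any lock,
> because each lcore is assumed to be exactly one thread.
>
>     /* Simplified sketch of the rte_mempool_get() fast path. */
>     static void *
>     mempool_get_sketch(struct rte_mempool *mp)
>     {
>         struct rte_mempool_cache *cache = &mp->local_cache[rte_lcore_id()];
>
>         /* Two non pmd threads that both report lcore_id == 0 race on
>          * cache->len and cache->objs[], corrupting the cache. */
>         if (cache->len == 0) {
>             /* ... refill from the shared ring ... */
>         }
>         return cache->objs[--cache->len];
>     }
>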
> This commit resolves the issue with the following changes:
>
> - Every non pmd thread has the same lcore_id (0, for management reasons),
>   which is not shared with any pmd thread (lcore_ids for pmd threads now
>   start from 1).
> - DPDK mbufs must be allocated/freed in pmd threads. When a non pmd thread
>   needs to use mempools, as in dpdk_do_tx_copy(), it must hold a mutex (see
>   the sketch after this list).
> - The previous change no longer allows us to pass DPDK mbufs to handler
>   threads; therefore this commit partially reverts 143859ec63d45e. Packets
>   are now copied for upcall processing. The extra memcpy can be removed later
>   by processing upcalls in the pmd thread itself.
>
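> As a sketch of that locking convention (a hypothetical helper that mirrors
> dpdk_do_tx_copy() in the patch below; only the identifiers it uses are part
> of this patch):
>
>     static void
>     nonpmd_tx_sketch(struct netdev_dpdk *dev, struct rte_mbuf **mbufs,
>                      int cnt)
>     {
>         /* All non pmd threads share one mempool cache, so serialize them. */
>         if (!thread_is_pmd()) {
>             ovs_mutex_lock(&nonpmd_mempool_mutex);
>         }
>
>         dpdk_queue_pkts(dev, NON_PMD_THREAD_TX_QUEUE, mbufs, cnt);
>         dpdk_queue_flush(dev, NON_PMD_THREAD_TX_QUEUE);
>
>         if (!thread_is_pmd()) {
>             ovs_mutex_unlock(&nonpmd_mempool_mutex);
>         }
>     }
>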
> With the introduction of the extra locking, the packet throughput will be
> lower in the following cases:
>
> - When using internal (tap) devices with DPDK devices on the same datapath.
>   In any case, supporting internal devices efficiently will require DPDK KNI
>   devices, which will be proper pmd devices and will not need this locking.
> - When packets are processed in the slow path by non pmd threads. This
>   overhead can be avoided by handling the upcalls directly in pmd threads
>   (a change that has already been proposed by Ryan Wilson).
>
> Also, the following two fixes have been introduced:
> - In dpdk_free_buf() use rte_pktmbuf_free_seg() instead of rte_mempool_put().
>   This allows OVS to run properly with the CONFIG_RTE_LIBRTE_MBUF_DEBUG DPDK
>   option.
> - Do not bulk free mbufs in a transmission queue. They may belong to
>   different mempools.
>
> Signed-off-by: Daniele Di Proietto <ddiproie...@vmware.com>
> ---
>  lib/dpif-netdev.c | 25 +++++++++++++++----------
>  lib/dpif-netdev.h |  1 +
>  lib/netdev-dpdk.c | 54 +++++++++++++++++++++++++++++++++++++++++++++++++++---
>  lib/netdev-dpdk.h |  8 +++++++-
>  lib/ovs-thread.c  |  3 +++
>  5 files changed, 77 insertions(+), 14 deletions(-)
>
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 9270873..7ca929a 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -15,7 +15,7 @@
>   */
>
>  #include <config.h>
> -#include "dpif.h"
> +#include "dpif-netdev.h"
>
>  #include <ctype.h>
>  #include <errno.h>
> @@ -71,7 +71,6 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
>  #define NETDEV_RULE_PRIORITY 0x8000
>
>  #define FLOW_DUMP_MAX_BATCH 50
> -#define NR_THREADS 1
>  /* Use per thread recirc_depth to prevent recirculation loop. */
>  #define MAX_RECIRC_DEPTH 5
>  DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
> @@ -733,7 +732,7 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
>
>      if (netdev_is_pmd(netdev)) {
>          dp->pmd_count++;
> -        dp_netdev_set_pmd_threads(dp, NR_THREADS);
> +        dp_netdev_set_pmd_threads(dp, NR_PMD_THREADS);
>          dp_netdev_reload_pmd_threads(dp);
>      }
>      ovs_refcount_init(&port->ref_cnt);
> @@ -2095,6 +2094,7 @@ dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
>
>                  dp_netdev_output_userspace(dp, &buf, 1, hash % dp->n_handlers,
>                                             DPIF_UC_MISS, mfs[i], NULL);
> +                dpif_packet_delete(packets[i]);
>              } else {
>                  /* No upcall queue.  Freeing the packet */
>                  dpif_packet_delete(packets[i]);
> @@ -2161,6 +2161,7 @@ OVS_REQUIRES(q->mutex)
>          if (userdata) {
>              buf_size += NLA_ALIGN(userdata->nla_len);
>          }
> +        buf_size += ofpbuf_size(packet);
>          ofpbuf_init(buf, buf_size);
>
>          /* Put ODP flow. */
> @@ -2175,16 +2176,19 @@ OVS_REQUIRES(q->mutex)
>                      NLA_ALIGN(userdata->nla_len));
>          }
>
> -        upcall->packet = *packet;
> +        /* We have to perform a copy of the packet, because we cannot send DPDK
> +         * mbufs to a non pmd thread. When the upcall processing will be done
> +         * in the pmd thread, this copy can be avoided */
> +        ofpbuf_set_data(&upcall->packet, ofpbuf_put(buf, ofpbuf_data(packet),
> +                        ofpbuf_size(packet)));
> +        ofpbuf_set_size(&upcall->packet, ofpbuf_size(packet));
>
>          seq_change(q->seq);
>
>          return 0;
>      } else {
> -        ofpbuf_delete(packet);
>          return ENOBUFS;
>      }
> -
>  }
>
>  static int
> @@ -2252,19 +2256,20 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
>          miniflow_initialize(&key.flow, key.buf);
>
>          for (i = 0; i < cnt; i++) {
> -            struct ofpbuf *packet, *userspace_packet;
> +            struct ofpbuf *packet;
>
>              packet = &packets[i]->ofpbuf;
>
>              miniflow_extract(packet, md, &key.flow);
>
> -            userspace_packet = may_steal ? packet : ofpbuf_clone(packet);
> -
> -            dp_netdev_output_userspace(aux->dp, &userspace_packet, 1,
> +            dp_netdev_output_userspace(aux->dp, &packet, 1,
>                                         miniflow_hash_5tuple(&key.flow, 0)
>                                             % aux->dp->n_handlers,
>                                         DPIF_UC_ACTION, &key.flow,
>                                         userdata);
> +            if (may_steal) {
> +                dpif_packet_delete(packets[i]);
> +            }
>          }
>          break;
>      }
> diff --git a/lib/dpif-netdev.h b/lib/dpif-netdev.h
> index af4a969..0f42d7a 100644
> --- a/lib/dpif-netdev.h
> +++ b/lib/dpif-netdev.h
> @@ -40,6 +40,7 @@ static inline void dp_packet_pad(struct ofpbuf *b)
>  }
>
>  #define NR_QUEUE   1
> +#define NR_PMD_THREADS 1
>
>  #ifdef  __cplusplus
>  }
> diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
> index b925dd28..9315ed9 100644
> --- a/lib/netdev-dpdk.c
> +++ b/lib/netdev-dpdk.c
> @@ -138,6 +138,11 @@ static struct list dpdk_list OVS_GUARDED_BY(dpdk_mutex)
>  static struct list dpdk_mp_list OVS_GUARDED_BY(dpdk_mutex)
>      = LIST_INITIALIZER(&dpdk_mp_list);
>
> +/* This mutex must be used by non pmd threads when allocating or freeing
> + * mbufs through mempools. Since dpdk_queue_pkts() and dpdk_queue_flush() may
> + * use mempools, a non pmd thread should hold this mutex while calling them */
> +struct ovs_mutex nonpmd_mempool_mutex = OVS_MUTEX_INITIALIZER;
> +
>  struct dpdk_mp {
>      struct rte_mempool *mp;
>      int mtu;
> @@ -200,6 +205,8 @@ struct netdev_rxq_dpdk {
>      int port_id;
>  };
>
> +static bool thread_is_pmd(void);
> +
>  static int netdev_dpdk_construct(struct netdev *);
>
>  static bool
> @@ -223,13 +230,15 @@ dpdk_rte_mzalloc(size_t sz)
>      return ptr;
>  }
>
> +/* XXX this function should be called only by pmd threads (or by non pmd
> + * threads holding the nonpmd_mempool_mutex) */
>  void
>  free_dpdk_buf(struct dpif_packet *p)
>  {
>      struct ofpbuf *buf = &p->ofpbuf;
>      struct rte_mbuf *pkt = (struct rte_mbuf *) buf->dpdk_buf;
>
> -    rte_mempool_put(pkt->pool, pkt);
> +    rte_pktmbuf_free_seg(pkt);
>  }
>
>  static void
> @@ -617,7 +626,13 @@ dpdk_queue_flush__(struct netdev_dpdk *dev, int qid)
>
>      nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts, txq->count);
>      if (OVS_UNLIKELY(nb_tx != txq->count)) {
> -        /* free buffers if we couldn't transmit packets */
> +        /* free buffers, which we couldn't transmit, one at a time (each
> +         * packet could come from a different mempool) */
> +        int i;
> +
> +        for (i = nb_tx; i < txq->count; i++) {
> +            rte_pktmbuf_free_seg(txq->burst_pkts[i]);
> +        }
>          rte_mempool_put_bulk(dev->dpdk_mp->mp,
>                               (void **) &txq->burst_pkts[nb_tx],
>                               (txq->count - nb_tx));
> @@ -697,6 +712,7 @@ dpdk_queue_pkts(struct netdev_dpdk *dev, int qid,
>  /* Tx function. Transmit packets indefinitely */
>  static void
>  dpdk_do_tx_copy(struct netdev *netdev, struct dpif_packet ** pkts, int cnt)
> +    OVS_NO_THREAD_SAFETY_ANALYSIS
>  {
>      struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
>      struct rte_mbuf *mbufs[cnt];
> @@ -704,6 +720,13 @@ dpdk_do_tx_copy(struct netdev *netdev, struct dpif_packet ** pkts, int cnt)
>      int newcnt = 0;
>      int i;
>
> +    /* If we are on a non pmd thread we have to use the mempool mutex, because
> +     * every non pmd thread shares the same mempool cache */
> +
> +    if (!thread_is_pmd()) {
> +        ovs_mutex_lock(&nonpmd_mempool_mutex);
> +    }
> +
>      for (i = 0; i < cnt; i++) {
>          int size = ofpbuf_size(&pkts[i]->ofpbuf);
>
> @@ -739,6 +762,10 @@ dpdk_do_tx_copy(struct netdev *netdev, struct dpif_packet ** pkts, int cnt)
>
>      dpdk_queue_pkts(dev, NON_PMD_THREAD_TX_QUEUE, mbufs, newcnt);
>      dpdk_queue_flush(dev, NON_PMD_THREAD_TX_QUEUE);
> +
> +    if (!thread_is_pmd()) {
> +        ovs_mutex_unlock(&nonpmd_mempool_mutex);
> +    }
>  }
>
>  static int
> @@ -1384,6 +1411,9 @@ dpdk_init(int argc, char **argv)
>          argv[result] = argv[0];
>      }
>
> +    /* We are called from the main thread here */
> +    thread_set_nonpmd();
> +
>      return result + 1;
>  }
>
> @@ -1429,7 +1459,25 @@ pmd_thread_setaffinity_cpu(int cpu)
>          VLOG_ERR("Thread affinity error %d",err);
>          return err;
>      }
> -    RTE_PER_LCORE(_lcore_id) = cpu;
> +    /* lcore_id 0 is reserved for use by non pmd threads. */
> +    RTE_PER_LCORE(_lcore_id) = cpu + 1;
>
>      return 0;
>  }
> +
> +void
> +thread_set_nonpmd(void)
> +{
> +    /* We cannot have RTE_MAX_LCORE pmd threads, because lcore_id 0 is reserved
> +     * for non pmd threads */
> +    BUILD_ASSERT(NR_PMD_THREADS < RTE_MAX_LCORE);
> +    /* We have to use 0 to allow non pmd threads to perform certain DPDK
> +     * operations, like rte_eth_dev_configure(). */
> +    RTE_PER_LCORE(_lcore_id) = 0;
> +}
> +
> +static bool
> +thread_is_pmd(void)
> +{
> +    return rte_lcore_id() != 0;
> +}
> diff --git a/lib/netdev-dpdk.h b/lib/netdev-dpdk.h
> index b021aa5..e4ba6fc 100644
> --- a/lib/netdev-dpdk.h
> +++ b/lib/netdev-dpdk.h
> @@ -2,7 +2,6 @@
>  #define NETDEV_DPDK_H
>
>  #include <config.h>
> -#include "ofpbuf.h"
>
>  struct dpif_packet;
>
> @@ -25,6 +24,7 @@ int dpdk_init(int argc, char **argv);
>  void netdev_dpdk_register(void);
>  void free_dpdk_buf(struct dpif_packet *);
>  int pmd_thread_setaffinity_cpu(int cpu);
> +void thread_set_nonpmd(void);
>
>  #else
>
> @@ -52,5 +52,11 @@ pmd_thread_setaffinity_cpu(int cpu OVS_UNUSED)
>      return 0;
>  }
>
> +static inline void
> +thread_set_nonpmd(void)
> +{
> +    /* Nothing */
> +}
> +
>  #endif /* DPDK_NETDEV */
>  #endif
> diff --git a/lib/ovs-thread.c b/lib/ovs-thread.c
> index cbee582..fe6fb43 100644
> --- a/lib/ovs-thread.c
> +++ b/lib/ovs-thread.c
> @@ -25,6 +25,7 @@
>  #include <unistd.h>
>  #include "compiler.h"
>  #include "hash.h"
> +#include "netdev-dpdk.h"
>  #include "ovs-rcu.h"
>  #include "poll-loop.h"
>  #include "seq.h"
> @@ -326,6 +327,8 @@ ovsthread_wrapper(void *aux_)
>      set_subprogram_name("%s%u", aux.name, id);
>      ovsrcu_quiesce_end();
>
> +    thread_set_nonpmd();
> +
>      return aux.start(aux.arg);
>  }
>
> --
> 2.0.0
>
_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev
