One example is when STP config packets are received from a dpdk port: while the packets are being processed in xlate's process_special(), the STP state machine may need to send a config packet immediately via the dpdk interface. In that case the state machine directly invokes the stp->send() callback, which sends the packet right away (by calling dpif_netdev_execute()), so dpif_netdev_execute() runs in the pmd thread itself.
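
The two cases are told apart by the 'per_pmd_key' thread-specific value: each pmd thread stores its own 'struct dp_netdev_pmd_thread' there at the top of pmd_thread_main(), while a thread that never set the key gets NULL back and falls through to dp_netdev_get_nonpmd() (and takes 'non_pmd_mutex'). A minimal standalone sketch of that pattern, using plain pthreads instead of the ovsthread_* wrappers (the names below are illustrative, not the OVS code; error checking omitted):

    #include <pthread.h>
    #include <stdio.h>

    static pthread_key_t per_pmd_key;

    static void *
    pmd_thread_main(void *aux)
    {
        /* A pmd thread registers itself, as the patch does with
         * ovsthread_setspecific(pmd->dp->per_pmd_key, pmd). */
        pthread_setspecific(per_pmd_key, aux);

        /* Re-entering the execute path from this thread (e.g. the STP
         * case above) now finds a non-NULL value, so the callee knows
         * the caller is a pmd thread. */
        printf("pmd thread: key = %s\n",
               (char *) pthread_getspecific(per_pmd_key));
        return NULL;
    }

    int
    main(void)
    {
        pthread_t tid;

        pthread_key_create(&per_pmd_key, NULL);
        pthread_create(&tid, NULL, pmd_thread_main, "pmd-0");
        pthread_join(tid, NULL);

        /* The main (non-pmd) thread never called setspecific(), so it
         * gets NULL back -- the cue to use dp_netdev_get_nonpmd() and
         * take 'non_pmd_mutex'. */
        printf("non-pmd thread: key = %p\n",
               pthread_getspecific(per_pmd_key));
        return 0;
    }
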
On Thu, Sep 11, 2014 at 10:39 PM, Pravin Shelar <pshe...@nicira.com> wrote: > On Tue, Sep 9, 2014 at 5:00 PM, Alex Wang <al...@nicira.com> wrote: > > With this commit, ovs by default will try creating 'number of > > dpdk interfaces on numa node' pmd threads for each numa node > > and pin the pmd threads to available cpu cores on the numa node. > > > > NON_PMD_CORE_ID (currently 0) is used to reserve a particular > > cpu core for the I/O of all non-pmd threads. No pmd thread > > can be pinned to this reserved core. > > > > As side-effects of this commit: > > > > - the exact-match cache for non-pmd threads is removed from > > 'struct dp_netdev'. Instead, all non-pmd threads will use > > the exact-match cache defined in the 'struct dp_netdev_pmd_thread' > > for NON_PMD_CORE_ID. > > > > - the received packet processing functions are refactored to use > > 'struct dp_netdev_pmd_thread' as input. > > > > - the 'netdev_send()' function will be called with the proper > > queue id. > > > > Signed-off-by: Alex Wang <al...@nicira.com> > > > > --- > > PATCH -> V2 > > - rebase and refactor the code. > > > > V2 -> V3: > > - both pmd and non-pmd threads can call the dpif_netdev_execute(). > > so, use a per-thread variable to help recognize the calling thread. > > --- > > lib/dpif-netdev.c | 407 > +++++++++++++++++++++++++++++++++++++---------------- > > lib/dpif-netdev.h | 4 +- > > lib/netdev-dpdk.c | 17 ++- > > lib/netdev-dpdk.h | 7 + > > 4 files changed, 302 insertions(+), 133 deletions(-) > > > > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c > > index dcce02e..29a92b3 100644 > > --- a/lib/dpif-netdev.c > > +++ b/lib/dpif-netdev.c > > @@ -52,6 +52,7 @@ > > #include "odp-util.h" > > #include "ofp-print.h" > > #include "ofpbuf.h" > > +#include "ovs-numa.h" > > #include "ovs-rcu.h" > > #include "packet-dpif.h" > > #include "packets.h" > > @@ -158,7 +159,6 @@ struct emc_cache { > > * > > * dp_netdev_mutex (global) > > * port_mutex > > - * emc_mutex > > * flow_mutex > > */ > > struct dp_netdev { > > @@ -195,17 +195,16 @@ struct dp_netdev { > > upcall_callback *upcall_cb; /* Callback function for executing > upcalls. */ > > void *upcall_aux; > > > > - /* Forwarding threads. */ > > - struct latch exit_latch; > > - struct pmd_thread *pmd_threads; > > - size_t n_pmd_threads; > > - int pmd_count; > > - > > - /* Exact match cache for non-pmd devices. > > - * Pmd devices use instead each thread's flow_cache for this > purpose. > > - * Protected by emc_mutex */ > > - struct emc_cache flow_cache OVS_GUARDED; > > - struct ovs_mutex emc_mutex; > > + /* Stores all 'struct dp_netdev_pmd_thread's. */ > > + struct cmap poll_threads; > > + > > + /* Protects the access of the 'struct dp_netdev_pmd_thread' > > + * instance for non-pmd thread. */ > > + struct ovs_mutex non_pmd_mutex; > > + > > + /* Each pmd thread will store its pointer to > > + * 'struct dp_netdev_pmd_thread' in 'per_pmd_key'. */ > > + ovsthread_key_t per_pmd_key; > > }; > > > > static struct dp_netdev_port *dp_netdev_lookup_port(const struct > dp_netdev *dp, > > @@ -340,15 +339,25 @@ static void dp_netdev_actions_free(struct > dp_netdev_actions *); > > * > > * DPDK used PMD for accessing NIC. > > * > > - * A thread that receives packets from PMD ports, looks them up in the > flow > > - * table, and executes the actions it finds. > > + * Note, instance with cpu core id NON_PMD_CORE_ID will be reserved for > > + * I/O of all non-pmd threads. There will be no actual thread created > > + * for the instance. 
> > **/ > > -struct pmd_thread { > > +struct dp_netdev_pmd_thread { > > struct dp_netdev *dp; > > + struct cmap_node node; /* In 'dp->poll_threads'. */ > > + /* Per thread exact-match cache. Note, the instance for cpu core > > + * NON_PMD_CORE_ID can be accessed by multiple threads, and thusly > > + * need to be protected (e.g. by 'dp_netdev_mutex'). All other > > + * instances will only be accessed by its own pmd thread. */ > > struct emc_cache flow_cache; > > + struct latch exit_latch; /* For terminating the pmd thread. > */ > > + atomic_uint change_seq; /* For reloading pmd ports. */ > > pthread_t thread; > > - int id; > > - atomic_uint change_seq; > > + int index; /* Idx of this pmd thread among > pmd*/ > > + /* threads on same numa node. */ > > + int core_id; /* CPU core id of this pmd thread. > */ > > + int numa_id; /* numa node id of this pmd thread. > */ > > }; > > > > #define PMD_INITIAL_SEQ 1 > > @@ -374,18 +383,22 @@ static void do_del_port(struct dp_netdev *dp, > struct dp_netdev_port *) > > OVS_REQUIRES(dp->port_mutex); > > static int dpif_netdev_open(const struct dpif_class *, const char *name, > > bool create, struct dpif **); > > -static void dp_netdev_execute_actions(struct dp_netdev *dp, > > +static void dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd, > > struct dpif_packet **, int c, > > bool may_steal, struct > pkt_metadata *, > > - struct emc_cache *flow_cache, > > const struct nlattr *actions, > > size_t actions_len); > > -static void dp_netdev_input(struct dp_netdev *, struct emc_cache *, > > +static void dp_netdev_input(struct dp_netdev_pmd_thread *, > > struct dpif_packet **, int cnt, > > struct pkt_metadata *); > > - > > -static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n); > > static void dp_netdev_disable_upcall(struct dp_netdev *); > > +static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, > > + struct dp_netdev *dp, int index, > > + int core_id, int numa_id); > > +static struct dp_netdev_pmd_thread *dp_netdev_get_nonpmd(struct > dp_netdev *dp); > > +static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp); > > +static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int > numa_id); > > +static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int > numa_id); > > > > static void emc_clear_entry(struct emc_entry *ce); > > > > @@ -524,6 +537,7 @@ create_dp_netdev(const char *name, const struct > dpif_class *class, > > OVS_REQUIRES(dp_netdev_mutex) > > { > > struct dp_netdev *dp; > > + struct dp_netdev_pmd_thread *non_pmd; > > int error; > > > > dp = xzalloc(sizeof *dp); > > @@ -543,7 +557,6 @@ create_dp_netdev(const char *name, const struct > dpif_class *class, > > ovs_mutex_init(&dp->port_mutex); > > cmap_init(&dp->ports); > > dp->port_seq = seq_create(); > > - latch_init(&dp->exit_latch); > > fat_rwlock_init(&dp->upcall_rwlock); > > > > /* Disable upcalls by default. */ > > @@ -551,6 +564,16 @@ create_dp_netdev(const char *name, const struct > dpif_class *class, > > dp->upcall_aux = NULL; > > dp->upcall_cb = NULL; > > > > + cmap_init(&dp->poll_threads); > > + ovs_mutex_init_recursive(&dp->non_pmd_mutex); > > + ovsthread_key_create(&dp->per_pmd_key, NULL); > > + > > + /* Reserves the core NON_PMD_CORE_ID for all non-pmd threads. 
*/ > > + ovs_numa_try_pin_core_specific(NON_PMD_CORE_ID); > > + non_pmd = xzalloc(sizeof *non_pmd); > > + dp_netdev_configure_pmd(non_pmd, dp, 0, NON_PMD_CORE_ID, > > + OVS_NUMA_UNSPEC); > > + > > ovs_mutex_lock(&dp->port_mutex); > > error = do_add_port(dp, name, "internal", ODPP_LOCAL); > > ovs_mutex_unlock(&dp->port_mutex); > > @@ -559,9 +582,6 @@ create_dp_netdev(const char *name, const struct > dpif_class *class, > > return error; > > } > > > > - ovs_mutex_init_recursive(&dp->emc_mutex); > > - emc_cache_init(&dp->flow_cache); > > - > > *dpp = dp; > > return 0; > > } > > @@ -603,8 +623,9 @@ dp_netdev_free(struct dp_netdev *dp) > > > > shash_find_and_delete(&dp_netdevs, dp->name); > > > > - dp_netdev_set_pmd_threads(dp, 0); > > - free(dp->pmd_threads); > > + dp_netdev_destroy_all_pmds(dp); > > + ovs_mutex_destroy(&dp->non_pmd_mutex); > > + ovsthread_key_delete(dp->per_pmd_key); > > > > dp_netdev_flow_flush(dp); > > ovs_mutex_lock(&dp->port_mutex); > > @@ -625,10 +646,6 @@ dp_netdev_free(struct dp_netdev *dp) > > seq_destroy(dp->port_seq); > > cmap_destroy(&dp->ports); > > fat_rwlock_destroy(&dp->upcall_rwlock); > > - latch_destroy(&dp->exit_latch); > > - > > - emc_cache_uninit(&dp->flow_cache); > > - ovs_mutex_destroy(&dp->emc_mutex); > > > > free(CONST_CAST(char *, dp->name)); > > free(dp); > > @@ -696,15 +713,22 @@ dpif_netdev_get_stats(const struct dpif *dpif, > struct dpif_dp_stats *stats) > > } > > > > static void > > -dp_netdev_reload_pmd_threads(struct dp_netdev *dp) > > +dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd) > > { > > - int i; > > + int old_seq; > > + > > + atomic_add_relaxed(&pmd->change_seq, 1, &old_seq); > > +} > > > > - for (i = 0; i < dp->n_pmd_threads; i++) { > > - struct pmd_thread *f = &dp->pmd_threads[i]; > > - int old_seq; > > +/* Causes all pmd threads to reload its tx/rx devices. > > + * Must be called after adding/removing ports. */ > > +static void > > +dp_netdev_reload_pmds(struct dp_netdev *dp) > > +{ > > + struct dp_netdev_pmd_thread *pmd; > > > > - atomic_add_relaxed(&f->change_seq, 1, &old_seq); > > + CMAP_FOR_EACH (pmd, node, &dp->poll_threads) { > > + dp_netdev_reload_pmd__(pmd); > > } > > } > > > > @@ -777,9 +801,8 @@ do_add_port(struct dp_netdev *dp, const char > *devname, const char *type, > > port->sf = sf; > > > > if (netdev_is_pmd(netdev)) { > > - dp->pmd_count++; > > - dp_netdev_set_pmd_threads(dp, NR_PMD_THREADS); > > - dp_netdev_reload_pmd_threads(dp); > > + dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev)); > > + dp_netdev_reload_pmds(dp); > > } > > ovs_refcount_init(&port->ref_cnt); > > > > @@ -930,6 +953,39 @@ get_port_by_name(struct dp_netdev *dp, > > return ENOENT; > > } > > > > +static int > > +get_n_pmd_threads_on_numa(struct dp_netdev *dp, int numa_id) > > +{ > > + struct dp_netdev_pmd_thread *pmd; > > + int n_pmds = 0; > > + > > + CMAP_FOR_EACH (pmd, node, &dp->poll_threads) { > > + if (pmd->numa_id == numa_id) { > > + n_pmds++; > > + } > > + } > > + > > + return n_pmds; > > +} > > + > > +/* Returns 'true' if there is a port with pmd netdev and the netdev > > + * is on numa node 'numa_id'. 
*/ > > +static bool > > +has_pmd_port_for_numa(struct dp_netdev *dp, int numa_id) > > +{ > > + struct dp_netdev_port *port; > > + > > + CMAP_FOR_EACH (port, node, &dp->ports) { > > + if (netdev_is_pmd(port->netdev) > > + && netdev_get_numa_id(port->netdev) == numa_id) { > > + return true; > > + } > > + } > > + > > + return false; > > +} > > + > > + > > static void > > do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port) > > OVS_REQUIRES(dp->port_mutex) > > @@ -937,7 +993,14 @@ do_del_port(struct dp_netdev *dp, struct > dp_netdev_port *port) > > cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no)); > > seq_change(dp->port_seq); > > if (netdev_is_pmd(port->netdev)) { > > - dp_netdev_reload_pmd_threads(dp); > > + int numa_id = netdev_get_numa_id(port->netdev); > > + > > + /* If there is no netdev on the numa node, deletes the pmd > threads > > + * for that numa. Else, just reloads the queues. */ > > + if (!has_pmd_port_for_numa(dp, numa_id)) { > > + dp_netdev_del_pmds_on_numa(dp, numa_id); > > + } > > + dp_netdev_reload_pmds(dp); > > } > > > > port_unref(port); > > @@ -1694,8 +1757,10 @@ dpif_netdev_flow_dump_next(struct > dpif_flow_dump_thread *thread_, > > > > static int > > dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute) > > + OVS_NO_THREAD_SAFETY_ANALYSIS > > { > > struct dp_netdev *dp = get_dp_netdev(dpif); > > + struct dp_netdev_pmd_thread *pmd; > > struct dpif_packet packet, *pp; > > struct pkt_metadata *md = &execute->md; > > > > @@ -1707,11 +1772,24 @@ dpif_netdev_execute(struct dpif *dpif, struct > dpif_execute *execute) > > packet.ofpbuf = *execute->packet; > > pp = &packet; > > > > - ovs_mutex_lock(&dp->emc_mutex); > > - dp_netdev_execute_actions(dp, &pp, 1, false, md, > > - &dp->flow_cache, execute->actions, > > + /* Tries finding the 'pmd'. If NULL is returned, that means > > + * the current thread is a non-pmd thread and should use > > + * dp_netdev_get_nonpmd(). */ > > + pmd = ovsthread_getspecific(dp->per_pmd_key); > > + if (!pmd) { > > + pmd = dp_netdev_get_nonpmd(dp); > > + } > > + > In which case execute is called in PMD thread? > > > + /* If the current thread is non-pmd thread, acquires > > + * the 'non_pmd_mutex'. */ > > + if (pmd->core_id == NON_PMD_CORE_ID) { > > + ovs_mutex_lock(&dp->non_pmd_mutex); > > + } > > + dp_netdev_execute_actions(pmd, &pp, 1, false, md, execute->actions, > > execute->actions_len); > > - ovs_mutex_unlock(&dp->emc_mutex); > > + if (pmd->core_id == NON_PMD_CORE_ID) { > > + ovs_mutex_unlock(&dp->non_pmd_mutex); > > + } > > > > /* Even though may_steal is set to false, some actions could modify > or > > * reallocate the ofpbuf memory. 
We need to pass those changes to > the > > @@ -1788,8 +1866,7 @@ dp_netdev_actions_free(struct dp_netdev_actions > *actions) > > > > > > static void > > -dp_netdev_process_rxq_port(struct dp_netdev *dp, > > - struct emc_cache *flow_cache, > > +dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd, > > struct dp_netdev_port *port, > > struct netdev_rxq *rxq) > > { > > @@ -1801,7 +1878,7 @@ dp_netdev_process_rxq_port(struct dp_netdev *dp, > > struct pkt_metadata md = > PKT_METADATA_INITIALIZER(port->port_no); > > > > *recirc_depth_get() = 0; > > - dp_netdev_input(dp, flow_cache, packets, cnt, &md); > > + dp_netdev_input(pmd, packets, cnt, &md); > > } else if (error != EAGAIN && error != EOPNOTSUPP) { > > static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); > > > > @@ -1815,19 +1892,19 @@ dpif_netdev_run(struct dpif *dpif) > > { > > struct dp_netdev_port *port; > > struct dp_netdev *dp = get_dp_netdev(dpif); > > + struct dp_netdev_pmd_thread *non_pmd = dp_netdev_get_nonpmd(dp); > > > > - ovs_mutex_lock(&dp->emc_mutex); > > + ovs_mutex_lock(&dp->non_pmd_mutex); > > CMAP_FOR_EACH (port, node, &dp->ports) { > > if (!netdev_is_pmd(port->netdev)) { > > int i; > > > > for (i = 0; i < netdev_n_rxq(port->netdev); i++) { > > - dp_netdev_process_rxq_port(dp, &dp->flow_cache, port, > > - port->rxq[i]); > > + dp_netdev_process_rxq_port(non_pmd, port, port->rxq[i]); > > } > > } > > } > > - ovs_mutex_unlock(&dp->emc_mutex); > > + ovs_mutex_unlock(&dp->non_pmd_mutex); > > } > > > > static void > > @@ -1855,33 +1932,32 @@ struct rxq_poll { > > }; > > > > static int > > -pmd_load_queues(struct pmd_thread *f, > > +pmd_load_queues(struct dp_netdev_pmd_thread *pmd, > > struct rxq_poll **ppoll_list, int poll_cnt) > > { > > - struct dp_netdev *dp = f->dp; > > struct rxq_poll *poll_list = *ppoll_list; > > struct dp_netdev_port *port; > > - int id = f->id; > > - int index; > > - int i; > > + int n_pmds_on_numa, index, i; > > > > /* Simple scheduler for netdev rx polling. */ > > for (i = 0; i < poll_cnt; i++) { > > - port_unref(poll_list[i].port); > > + port_unref(poll_list[i].port); > > } > > > > poll_cnt = 0; > > + n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id); > > index = 0; > > > > - CMAP_FOR_EACH (port, node, &f->dp->ports) { > > + CMAP_FOR_EACH (port, node, &pmd->dp->ports) { > > /* Calls port_try_ref() to prevent the main thread > > * from deleting the port. */ > > if (port_try_ref(port)) { > > - if (netdev_is_pmd(port->netdev)) { > > + if (netdev_is_pmd(port->netdev) > > + && netdev_get_numa_id(port->netdev) == pmd->numa_id) { > > int i; > > > > for (i = 0; i < netdev_n_rxq(port->netdev); i++) { > > - if ((index % dp->n_pmd_threads) == id) { > > + if ((index % n_pmds_on_numa) == pmd->index) { > > poll_list = xrealloc(poll_list, > > sizeof *poll_list * (poll_cnt + > 1)); > > > > @@ -1905,8 +1981,7 @@ pmd_load_queues(struct pmd_thread *f, > > static void * > > pmd_thread_main(void *f_) > > { > > - struct pmd_thread *f = f_; > > - struct dp_netdev *dp = f->dp; > > + struct dp_netdev_pmd_thread *pmd = f_; > > unsigned int lc = 0; > > struct rxq_poll *poll_list; > > unsigned int port_seq = PMD_INITIAL_SEQ; > > @@ -1916,17 +1991,18 @@ pmd_thread_main(void *f_) > > poll_cnt = 0; > > poll_list = NULL; > > > > - pmd_thread_setaffinity_cpu(f->id); > > + /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. 
*/ > > + ovsthread_setspecific(pmd->dp->per_pmd_key, pmd); > > + pmd_thread_setaffinity_cpu(pmd->core_id); > > reload: > > - emc_cache_init(&f->flow_cache); > > - poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt); > > + emc_cache_init(&pmd->flow_cache); > > + poll_cnt = pmd_load_queues(pmd, &poll_list, poll_cnt); > > > > for (;;) { > > int i; > > > > for (i = 0; i < poll_cnt; i++) { > > - dp_netdev_process_rxq_port(dp, &f->flow_cache, > poll_list[i].port, > > - poll_list[i].rx); > > + dp_netdev_process_rxq_port(pmd, poll_list[i].port, > poll_list[i].rx); > > } > > > > if (lc++ > 1024) { > > @@ -1936,7 +2012,7 @@ reload: > > > > ovsrcu_quiesce(); > > > > - atomic_read_relaxed(&f->change_seq, &seq); > > + atomic_read_relaxed(&pmd->change_seq, &seq); > > if (seq != port_seq) { > > port_seq = seq; > > break; > > @@ -1944,9 +2020,9 @@ reload: > > } > > } > > > > - emc_cache_uninit(&f->flow_cache); > > + emc_cache_uninit(&pmd->flow_cache); > > > > - if (!latch_is_set(&f->dp->exit_latch)){ > > + if (!latch_is_set(&pmd->exit_latch)){ > > goto reload; > > } > > > > @@ -1988,40 +2064,124 @@ dpif_netdev_enable_upcall(struct dpif *dpif) > > dp_netdev_enable_upcall(dp); > > } > > > > +/* Returns the pointer to the dp_netdev_pmd_thread for non-pmd threads. > */ > > +static struct dp_netdev_pmd_thread * > > +dp_netdev_get_nonpmd(struct dp_netdev *dp) > > +{ > > + struct dp_netdev_pmd_thread *pmd; > > + struct cmap_node *pnode; > > + > > + pnode = cmap_find(&dp->poll_threads, hash_int(NON_PMD_CORE_ID, 0)); > > + ovs_assert(pnode); > > + pmd = CONTAINER_OF(pnode, struct dp_netdev_pmd_thread, node); > > + > > + return pmd; > > +} > > + > > +/* Configures the 'pmd' based on the input argument. */ > > +static void > > +dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct > dp_netdev *dp, > > + int index, int core_id, int numa_id) > > +{ > > + pmd->dp = dp; > > + pmd->index = index; > > + pmd->core_id = core_id; > > + pmd->numa_id = numa_id; > > + latch_init(&pmd->exit_latch); > > + atomic_init(&pmd->change_seq, PMD_INITIAL_SEQ); > > + /* init the 'flow_cache' since there is no > > + * actual thread created for NON_PMD_CORE_ID. */ > > + if (core_id == NON_PMD_CORE_ID) { > > + emc_cache_init(&pmd->flow_cache); > > + } > > + cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, > &pmd->node), > > + hash_int(core_id, 0)); > > +} > > + > > +/* Stops the pmd thread, removes it from the 'dp->poll_threads' > > + * and destroys the struct. */ > > static void > > -dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n) > > +dp_netdev_del_pmd(struct dp_netdev_pmd_thread *pmd) > > { > > - int i; > > + /* Uninit the 'flow_cache' since there is > > + * no actual thread uninit it. */ > > + if (pmd->core_id == NON_PMD_CORE_ID) { > > + emc_cache_uninit(&pmd->flow_cache); > > + } else { > > + latch_set(&pmd->exit_latch); > > + dp_netdev_reload_pmd__(pmd); > > + ovs_numa_unpin_core(pmd->core_id); > > + xpthread_join(pmd->thread, NULL); > > + } > > + cmap_remove(&pmd->dp->poll_threads, &pmd->node, > hash_int(pmd->core_id, 0)); > > + latch_destroy(&pmd->exit_latch); > > + free(pmd); > > +} > > > > - if (n == dp->n_pmd_threads) { > > - return; > > +/* Destroys all pmd threads. */ > > +static void > > +dp_netdev_destroy_all_pmds(struct dp_netdev *dp) > > +{ > > + struct dp_netdev_pmd_thread *pmd; > > + > > + CMAP_FOR_EACH (pmd, node, &dp->poll_threads) { > > + dp_netdev_del_pmd(pmd); > > } > > +} > > > > - /* Stop existing threads. 
*/ > > - latch_set(&dp->exit_latch); > > - dp_netdev_reload_pmd_threads(dp); > > - for (i = 0; i < dp->n_pmd_threads; i++) { > > - struct pmd_thread *f = &dp->pmd_threads[i]; > > +/* Deletes all pmd threads on numa node 'numa_id'. */ > > +static void > > +dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id) > > +{ > > + struct dp_netdev_pmd_thread *pmd; > > > > - xpthread_join(f->thread, NULL); > > + CMAP_FOR_EACH (pmd, node, &dp->poll_threads) { > > + if (pmd->numa_id == numa_id) { > > + dp_netdev_del_pmd(pmd); > > + } > > } > > - latch_poll(&dp->exit_latch); > > - free(dp->pmd_threads); > > +} > > > > - /* Start new threads. */ > > - dp->pmd_threads = xmalloc(n * sizeof *dp->pmd_threads); > > - dp->n_pmd_threads = n; > > +/* Checks the numa node id of 'netdev' and starts pmd threads for > > + * the numa node. */ > > +static void > > +dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id) > > +{ > > + int n_pmds; > > > > - for (i = 0; i < n; i++) { > > - struct pmd_thread *f = &dp->pmd_threads[i]; > > + if (!ovs_numa_numa_id_is_valid(numa_id)) { > > + VLOG_ERR("Cannot create pmd threads due to numa id (%d)" > > + "invalid", numa_id); > > + return ; > > + } > > + > > + n_pmds = get_n_pmd_threads_on_numa(dp, numa_id); > > + > > + /* If there are already pmd threads created for the numa node > > + * in which 'netdev' is on, do nothing. Else, creates the > > + * pmd threads for the numa node. */ > > + if (!n_pmds) { > > + int can_have, n_unpinned, i; > > + > > + n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id); > > + if (!n_unpinned) { > > + VLOG_ERR("Cannot create pmd threads due to out of unpinned " > > + "cores on numa node"); > > + return; > > + } > > > > - f->dp = dp; > > - f->id = i; > > - atomic_init(&f->change_seq, PMD_INITIAL_SEQ); > > + /* Tries creating 'number of dpdk ifaces on numa node' pmd > threads. */ > > + can_have = MIN(n_unpinned, netdev_dpdk_n_devs_on_numa(numa_id)); > > + for (i = 0; i < can_have; i++) { > > + struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd); > > + int core_id = ovs_numa_get_unpinned_core_on_numa(numa_id); > > > > - /* Each thread will distribute all devices rx-queues among > > - * themselves. */ > > - f->thread = ovs_thread_create("pmd", pmd_thread_main, f); > > + dp_netdev_configure_pmd(pmd, dp, i, core_id, numa_id); > > + /* Each thread will distribute all devices rx-queues among > > + * themselves. 
*/ > > + pmd->thread = ovs_thread_create("pmd", pmd_thread_main, > pmd); > > + } > > + VLOG_INFO("Created %d pmd threads on numa node %d", can_have, > numa_id); > > } > > } > > > > @@ -2161,8 +2321,8 @@ packet_batch_init(struct packet_batch *batch, > struct dp_netdev_flow *flow, > > } > > > > static inline void > > -packet_batch_execute(struct packet_batch *batch, struct dp_netdev *dp, > > - struct emc_cache *flow_cache) > > +packet_batch_execute(struct packet_batch *batch, > > + struct dp_netdev_pmd_thread *pmd) > > { > > struct dp_netdev_actions *actions; > > struct dp_netdev_flow *flow = batch->flow; > > @@ -2172,11 +2332,10 @@ packet_batch_execute(struct packet_batch *batch, > struct dp_netdev *dp, > > > > actions = dp_netdev_flow_get_actions(flow); > > > > - dp_netdev_execute_actions(dp, batch->packets, batch->packet_count, > true, > > - &batch->md, flow_cache, > > - actions->actions, actions->size); > > + dp_netdev_execute_actions(pmd, batch->packets, batch->packet_count, > true, > > + &batch->md, actions->actions, > actions->size); > > > > - dp_netdev_count_packet(dp, DP_STAT_HIT, batch->packet_count); > > + dp_netdev_count_packet(pmd->dp, DP_STAT_HIT, batch->packet_count); > > } > > > > static inline bool > > @@ -2231,12 +2390,13 @@ dpif_packet_swap(struct dpif_packet **a, struct > dpif_packet **b) > > * 'packets' array (they have been moved to the beginning of the > vector). > > */ > > static inline size_t > > -emc_processing(struct dp_netdev *dp, struct emc_cache *flow_cache, > > - struct dpif_packet **packets, size_t cnt, > > - struct pkt_metadata *md, struct netdev_flow_key *keys) > > +emc_processing(struct dp_netdev_pmd_thread *pmd, struct dpif_packet > **packets, > > + size_t cnt, struct pkt_metadata *md, > > + struct netdev_flow_key *keys) > > { > > struct netdev_flow_key key; > > struct packet_batch batches[4]; > > + struct emc_cache *flow_cache = &pmd->flow_cache; > > size_t n_batches, i; > > size_t notfound_cnt = 0; > > > > @@ -2269,14 +2429,14 @@ emc_processing(struct dp_netdev *dp, struct > emc_cache *flow_cache, > > } > > > > for (i = 0; i < n_batches; i++) { > > - packet_batch_execute(&batches[i], dp, flow_cache); > > + packet_batch_execute(&batches[i], pmd); > > } > > > > return notfound_cnt; > > } > > > > static inline void > > -fast_path_processing(struct dp_netdev *dp, struct emc_cache *flow_cache, > > +fast_path_processing(struct dp_netdev_pmd_thread *pmd, > > struct dpif_packet **packets, size_t cnt, > > struct pkt_metadata *md, struct netdev_flow_key > *keys) > > { > > @@ -2289,6 +2449,8 @@ fast_path_processing(struct dp_netdev *dp, struct > emc_cache *flow_cache, > > struct packet_batch batches[PKT_ARRAY_SIZE]; > > const struct miniflow *mfs[PKT_ARRAY_SIZE]; /* NULL at bad packets. > */ > > struct cls_rule *rules[PKT_ARRAY_SIZE]; > > + struct dp_netdev *dp = pmd->dp; > > + struct emc_cache *flow_cache = &pmd->flow_cache; > > size_t n_batches, i; > > bool any_miss; > > > > @@ -2337,8 +2499,8 @@ fast_path_processing(struct dp_netdev *dp, struct > emc_cache *flow_cache, > > /* We can't allow the packet batching in the next loop to > execute > > * the actions. Otherwise, if there are any slow path > actions, > > * we'll send the packet up twice. 
*/ > > - dp_netdev_execute_actions(dp, &packets[i], 1, false, md, > > - flow_cache, ofpbuf_data(&actions), > > + dp_netdev_execute_actions(pmd, &packets[i], 1, false, md, > > + ofpbuf_data(&actions), > > ofpbuf_size(&actions)); > > > > add_actions = ofpbuf_size(&put_actions) > > @@ -2375,18 +2537,19 @@ fast_path_processing(struct dp_netdev *dp, > struct emc_cache *flow_cache, > > } > > > > flow = dp_netdev_flow_cast(rules[i]); > > - emc_insert(flow_cache, mfs[i], dpif_packet_get_dp_hash(packet), > flow); > > + emc_insert(flow_cache, mfs[i], dpif_packet_get_dp_hash(packet), > > + flow); > > dp_netdev_queue_batches(packet, md, flow, mfs[i], batches, > &n_batches, > > ARRAY_SIZE(batches)); > > } > > > > for (i = 0; i < n_batches; i++) { > > - packet_batch_execute(&batches[i], dp, flow_cache); > > + packet_batch_execute(&batches[i], pmd); > > } > > } > > > > static void > > -dp_netdev_input(struct dp_netdev *dp, struct emc_cache *flow_cache, > > +dp_netdev_input(struct dp_netdev_pmd_thread *pmd, > > struct dpif_packet **packets, int cnt, struct > pkt_metadata *md) > > { > > #if !defined(__CHECKER__) && !defined(_WIN32) > > @@ -2398,15 +2561,14 @@ dp_netdev_input(struct dp_netdev *dp, struct > emc_cache *flow_cache, > > struct netdev_flow_key keys[PKT_ARRAY_SIZE]; > > size_t newcnt; > > > > - newcnt = emc_processing(dp, flow_cache, packets, cnt, md, keys); > > + newcnt = emc_processing(pmd, packets, cnt, md, keys); > > if (OVS_UNLIKELY(newcnt)) { > > - fast_path_processing(dp, flow_cache, packets, newcnt, md, keys); > > + fast_path_processing(pmd, packets, newcnt, md, keys); > > } > > } > > > > struct dp_netdev_execute_aux { > > - struct dp_netdev *dp; > > - struct emc_cache *flow_cache; > > + struct dp_netdev_pmd_thread *pmd; > > }; > > > > static void > > @@ -2426,7 +2588,8 @@ dp_execute_cb(void *aux_, struct dpif_packet > **packets, int cnt, > > { > > struct dp_netdev_execute_aux *aux = aux_; > > uint32_t *depth = recirc_depth_get(); > > - struct dp_netdev *dp = aux->dp; > > + struct dp_netdev_pmd_thread *pmd= aux->pmd; > > + struct dp_netdev *dp= pmd->dp; > > int type = nl_attr_type(a); > > struct dp_netdev_port *p; > > int i; > > @@ -2435,7 +2598,7 @@ dp_execute_cb(void *aux_, struct dpif_packet > **packets, int cnt, > > case OVS_ACTION_ATTR_OUTPUT: > > p = dp_netdev_lookup_port(dp, u32_to_odp(nl_attr_get_u32(a))); > > if (OVS_LIKELY(p)) { > > - netdev_send(p->netdev, NETDEV_QID_NONE, packets, cnt, > may_steal); > > + netdev_send(p->netdev, pmd->core_id, packets, cnt, > may_steal); > > } else if (may_steal) { > > for (i = 0; i < cnt; i++) { > > dpif_packet_delete(packets[i]); > > @@ -2462,8 +2625,7 @@ dp_execute_cb(void *aux_, struct dpif_packet > **packets, int cnt, > > DPIF_UC_ACTION, userdata, > &actions, > > NULL); > > if (!error || error == ENOSPC) { > > - dp_netdev_execute_actions(dp, &packets[i], 1, > false, md, > > - aux->flow_cache, > > + dp_netdev_execute_actions(pmd, &packets[i], 1, > false, md, > > ofpbuf_data(&actions), > > ofpbuf_size(&actions)); > > } > > @@ -2525,7 +2687,7 @@ dp_execute_cb(void *aux_, struct dpif_packet > **packets, int cnt, > > /* Hash is private to each packet */ > > recirc_md.dp_hash = dpif_packet_get_dp_hash(packets[i]); > > > > - dp_netdev_input(dp, aux->flow_cache, &recirc_pkt, 1, > > + dp_netdev_input(pmd, &recirc_pkt, 1, > > &recirc_md); > > } > > (*depth)--; > > @@ -2555,13 +2717,12 @@ dp_execute_cb(void *aux_, struct dpif_packet > **packets, int cnt, > > } > > > > static void > > -dp_netdev_execute_actions(struct dp_netdev *dp, > > 
+dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd, > > struct dpif_packet **packets, int cnt, > > bool may_steal, struct pkt_metadata *md, > > - struct emc_cache *flow_cache, > > const struct nlattr *actions, size_t > actions_len) > > { > > - struct dp_netdev_execute_aux aux = {dp, flow_cache}; > > + struct dp_netdev_execute_aux aux = {pmd}; > > > > odp_execute_actions(&aux, packets, cnt, may_steal, md, actions, > > actions_len, dp_execute_cb); > > diff --git a/lib/dpif-netdev.h b/lib/dpif-netdev.h > > index adbbf87..f501f7c 100644 > > --- a/lib/dpif-netdev.h > > +++ b/lib/dpif-netdev.h > > @@ -40,9 +40,7 @@ static inline void dp_packet_pad(struct ofpbuf *b) > > } > > } > > > > -#define NETDEV_QID_NONE INT_MAX > > - > > -#define NR_PMD_THREADS 1 > > +#define NON_PMD_CORE_ID 0 > > > > #ifdef __cplusplus > > } > > diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c > > index 26b1591..928ca3f 100644 > > --- a/lib/netdev-dpdk.c > > +++ b/lib/netdev-dpdk.c > > @@ -404,7 +404,6 @@ dpdk_get_n_devs(int numa_id) > > count++; > > } > > } > > - ovs_assert(count); > > > > return count; > > } > > @@ -495,8 +494,7 @@ netdev_dpdk_init(struct netdev *netdev_, unsigned > int port_no) OVS_REQUIRES(dpdk > > > > ovs_mutex_lock(&netdev->mutex); > > > > - /* XXX: need to discover device node at run time. */ > > - netdev->socket_id = SOCKET0; > > + netdev->socket_id = rte_eth_dev_socket_id(port_no); > > > > /* There can only be ovs_numa_get_n_cores() pmd threads, so creates > a tx_q > > * for each of them. */ > > @@ -1503,6 +1501,13 @@ netdev_dpdk_register(void) > > } > > } > > > > +/* Returns the number of dpdk interfaces on numa node 'numa_id'. */ > > +int > > +netdev_dpdk_n_devs_on_numa(int numa_id) > > +{ > > + return dpdk_get_n_devs(numa_id); > > +} > > + > > int > > pmd_thread_setaffinity_cpu(int cpu) > > { > > @@ -1517,7 +1522,8 @@ pmd_thread_setaffinity_cpu(int cpu) > > return err; > > } > > /* lcore_id 0 is reseved for use by non pmd threads. */ > > - RTE_PER_LCORE(_lcore_id) = cpu + 1; > > + ovs_assert(cpu); > > + RTE_PER_LCORE(_lcore_id) = cpu; > > > > return 0; > > } > > @@ -1525,9 +1531,6 @@ pmd_thread_setaffinity_cpu(int cpu) > > void > > thread_set_nonpmd(void) > > { > > - /* We cannot have RTE_MAX_LCORE pmd threads, because lcore_id 0 is > reserved > > - * for non pmd threads */ > > - BUILD_ASSERT(NR_PMD_THREADS < RTE_MAX_LCORE); > > /* We have to use 0 to allow non pmd threads to perform certain DPDK > > * operations, like rte_eth_dev_configure(). */ > > RTE_PER_LCORE(_lcore_id) = 0; > > diff --git a/lib/netdev-dpdk.h b/lib/netdev-dpdk.h > > index e4ba6fc..8fe5a42 100644 > > --- a/lib/netdev-dpdk.h > > +++ b/lib/netdev-dpdk.h > > @@ -22,6 +22,7 @@ struct dpif_packet; > > > > int dpdk_init(int argc, char **argv); > > void netdev_dpdk_register(void); > > +int netdev_dpdk_n_devs_on_numa(int numa_id); > > void free_dpdk_buf(struct dpif_packet *); > > int pmd_thread_setaffinity_cpu(int cpu); > > void thread_set_nonpmd(void); > > @@ -40,6 +41,12 @@ netdev_dpdk_register(void) > > /* Nothing */ > > } > > > > +static inline int > > +netdev_dpdk_n_devs_on_numa(int numa_id OVS_UNUSED) > > +{ > > + return -1; > > +} > > + > > static inline void > > free_dpdk_buf(struct dpif_packet *buf OVS_UNUSED) > > { > > -- > > 1.7.9.5 > > > _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev