With this commit, ovs will by default create one pmd thread for each numa
node and pin that pmd thread to an available cpu core on the numa node.
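(For readers unfamiliar with core pinning: it means fixing a thread's cpu
affinity to a single core so the poll loop never migrates.  A minimal
standalone sketch using plain pthreads follows; the patch itself pins
through pmd_thread_setaffinity_cpu() and lets ovs-numa pick an unpinned
core, so the helper below is only illustrative.)

/* Illustrative only: create a thread and pin it to 'core_id' with plain
 * pthreads.  Not the ovs-numa/DPDK path the patch uses. */
#define _GNU_SOURCE
#include <pthread.h>
#include <sched.h>

static void *
poll_loop(void *aux)
{
    /* A real pmd thread would poll its assigned rx queues here. */
    (void) aux;
    return NULL;
}

static int
create_pinned_thread(pthread_t *thread, int core_id)
{
    cpu_set_t cpuset;
    int err = pthread_create(thread, NULL, poll_loop, NULL);

    if (!err) {
        CPU_ZERO(&cpuset);
        CPU_SET(core_id, &cpuset);
        err = pthread_setaffinity_np(*thread, sizeof cpuset, &cpuset);
    }
    return err;
}

dp_netdev_set_pmds_on_numa() in the diff does the equivalent per numa
node, asking ovs-numa for an unpinned core instead of hard-coding one.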
NON_PMD_CORE_ID (currently 0) is used to reserve a particular cpu core
for the I/O of all non-pmd threads.  No pmd thread can be pinned to this
reserved core.

As side effects of this commit:

- No pmd thread is created if no dpdk interface from the corresponding
  numa node has been added to ovs.

- The exact-match cache for non-pmd threads is removed from
  'struct dp_netdev'.  Instead, all non-pmd threads use the exact-match
  cache defined in the 'struct dp_netdev_pmd_thread' for NON_PMD_CORE_ID.

- The rx packet processing functions are refactored to take
  'struct dp_netdev_pmd_thread' as input.

- 'netdev_send()' is now called with the proper queue id.

- Both pmd and non-pmd threads can call dpif_netdev_execute(), so a
  per-thread key is used to recognize the calling thread (see the sketch
  after the diffstat below).

Signed-off-by: Alex Wang <al...@nicira.com>
---
PATCH -> V2:
- Rebase and refactor the code.

V2 -> V3:
- Both pmd and non-pmd threads can call dpif_netdev_execute(), so use a
  per-thread variable to help recognize the calling thread.

V3 -> V4:
- Change the default number of pmd threads on a numa node to 1.
---
 lib/dpif-netdev.c |  406 +++++++++++++++++++++++++++++++++++++----------------
 lib/dpif-netdev.h |    3 +-
 lib/netdev-dpdk.c |    9 +-
 3 files changed, 287 insertions(+), 131 deletions(-)
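Before the diff, a minimal standalone sketch of the per-thread key
pattern that the last bullet and dpif_netdev_execute() rely on.  It uses
raw pthread TLS keys (ovsthread_key_t in the patch is a thin wrapper
around these); the 'ctx' names are hypothetical:

#include <pthread.h>
#include <stddef.h>

static pthread_key_t ctx_key;
static struct ctx { int core_id; } shared_non_pmd_ctx;

static void
ctx_key_init(void)
{
    pthread_key_create(&ctx_key, NULL);
}

/* Each pmd thread stores its own context once, at startup. */
static void
ctx_set_current(struct ctx *ctx)
{
    pthread_setspecific(ctx_key, ctx);
}

/* Callers that may run in either a pmd or a non-pmd thread look up
 * their context; a NULL result means "non-pmd thread", which falls
 * back to the shared, mutex-protected instance. */
static struct ctx *
ctx_get_current(void)
{
    struct ctx *ctx = pthread_getspecific(ctx_key);
    return ctx ? ctx : &shared_non_pmd_ctx;
}

The design point: pmd threads on the hot path get their private context
with a single TLS lookup and no locking, while the rare non-pmd callers
share one instance serialized by 'non_pmd_mutex'.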
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 12e3571..3f69219 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -159,7 +159,6 @@ struct emc_cache {
  *
  *    dp_netdev_mutex (global)
  *    port_mutex
- *    emc_mutex
  *    flow_mutex
  */
 struct dp_netdev {
@@ -196,17 +195,16 @@ struct dp_netdev {
     upcall_callback *upcall_cb;  /* Callback function for executing upcalls. */
     void *upcall_aux;

-    /* Forwarding threads. */
-    struct latch exit_latch;
-    struct pmd_thread *pmd_threads;
-    size_t n_pmd_threads;
-    int pmd_count;
-
-    /* Exact match cache for non-pmd devices.
-     * Pmd devices use instead each thread's flow_cache for this purpose.
-     * Protected by emc_mutex */
-    struct emc_cache flow_cache OVS_GUARDED;
-    struct ovs_mutex emc_mutex;
+    /* Stores all 'struct dp_netdev_pmd_thread's. */
+    struct cmap poll_threads;
+
+    /* Protects the access of the 'struct dp_netdev_pmd_thread'
+     * instance for non-pmd thread. */
+    struct ovs_mutex non_pmd_mutex;
+
+    /* Each pmd thread will store its pointer to
+     * 'struct dp_netdev_pmd_thread' in 'per_pmd_key'. */
+    ovsthread_key_t per_pmd_key;
 };

 static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
@@ -341,15 +339,25 @@ static void dp_netdev_actions_free(struct dp_netdev_actions *);
  *
  * DPDK used PMD for accessing NIC.
  *
- * A thread that receives packets from PMD ports, looks them up in the flow
- * table, and executes the actions it finds.
+ * Note, the instance with cpu core id NON_PMD_CORE_ID is reserved for
+ * I/O of all non-pmd threads.  There will be no actual thread created
+ * for the instance.
 **/
-struct pmd_thread {
+struct dp_netdev_pmd_thread {
     struct dp_netdev *dp;
+    struct cmap_node node;          /* In 'dp->poll_threads'. */
+    /* Per thread exact-match cache.  Note, the instance for cpu core
+     * NON_PMD_CORE_ID can be accessed by multiple threads, and thus
+     * needs to be protected (e.g. by 'dp_netdev_mutex').  All other
+     * instances will only be accessed by their own pmd thread. */
     struct emc_cache flow_cache;
+    struct latch exit_latch;        /* For terminating the pmd thread. */
+    atomic_uint change_seq;         /* For reloading pmd ports. */
     pthread_t thread;
-    int id;
-    atomic_uint change_seq;
+    int index;                      /* Idx of this pmd thread among pmd */
+                                    /* threads on same numa node. */
+    int core_id;                    /* CPU core id of this pmd thread. */
+    int numa_id;                    /* numa node id of this pmd thread. */
 };

 #define PMD_INITIAL_SEQ 1
@@ -375,18 +383,22 @@ static void do_del_port(struct dp_netdev *dp, struct dp_netdev_port *)
     OVS_REQUIRES(dp->port_mutex);
 static int dpif_netdev_open(const struct dpif_class *, const char *name,
                             bool create, struct dpif **);
-static void dp_netdev_execute_actions(struct dp_netdev *dp,
+static void dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
                                       struct dpif_packet **, int c,
                                       bool may_steal, struct pkt_metadata *,
-                                      struct emc_cache *flow_cache,
                                       const struct nlattr *actions,
                                       size_t actions_len);
-static void dp_netdev_input(struct dp_netdev *, struct emc_cache *,
+static void dp_netdev_input(struct dp_netdev_pmd_thread *,
                             struct dpif_packet **, int cnt,
                             struct pkt_metadata *);
-
-static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n);
 static void dp_netdev_disable_upcall(struct dp_netdev *);
+static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
+                                    struct dp_netdev *dp, int index,
+                                    int core_id, int numa_id);
+static struct dp_netdev_pmd_thread *dp_netdev_get_nonpmd(struct dp_netdev *dp);
+static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
+static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id);
+static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id);

 static void emc_clear_entry(struct emc_entry *ce);
@@ -525,6 +537,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     OVS_REQUIRES(dp_netdev_mutex)
 {
     struct dp_netdev *dp;
+    struct dp_netdev_pmd_thread *non_pmd;
     int error;

     dp = xzalloc(sizeof *dp);
@@ -544,7 +557,6 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     ovs_mutex_init(&dp->port_mutex);
     cmap_init(&dp->ports);
     dp->port_seq = seq_create();
-    latch_init(&dp->exit_latch);
     fat_rwlock_init(&dp->upcall_rwlock);

     /* Disable upcalls by default. */
@@ -552,6 +564,16 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     dp->upcall_aux = NULL;
     dp->upcall_cb = NULL;

+    cmap_init(&dp->poll_threads);
+    ovs_mutex_init_recursive(&dp->non_pmd_mutex);
+    ovsthread_key_create(&dp->per_pmd_key, NULL);
+
+    /* Reserves the core NON_PMD_CORE_ID for all non-pmd threads. */
+    ovs_numa_try_pin_core_specific(NON_PMD_CORE_ID);
+    non_pmd = xzalloc(sizeof *non_pmd);
+    dp_netdev_configure_pmd(non_pmd, dp, 0, NON_PMD_CORE_ID,
+                            OVS_NUMA_UNSPEC);
+
     ovs_mutex_lock(&dp->port_mutex);
     error = do_add_port(dp, name, "internal", ODPP_LOCAL);
     ovs_mutex_unlock(&dp->port_mutex);
@@ -560,9 +582,6 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
         return error;
     }

-    ovs_mutex_init_recursive(&dp->emc_mutex);
-    emc_cache_init(&dp->flow_cache);
-
     *dpp = dp;
     return 0;
 }
@@ -604,8 +623,9 @@ dp_netdev_free(struct dp_netdev *dp)

     shash_find_and_delete(&dp_netdevs, dp->name);

-    dp_netdev_set_pmd_threads(dp, 0);
-    free(dp->pmd_threads);
+    dp_netdev_destroy_all_pmds(dp);
+    ovs_mutex_destroy(&dp->non_pmd_mutex);
+    ovsthread_key_delete(dp->per_pmd_key);

     dp_netdev_flow_flush(dp);
     ovs_mutex_lock(&dp->port_mutex);
@@ -626,10 +646,6 @@ dp_netdev_free(struct dp_netdev *dp)
     seq_destroy(dp->port_seq);
     cmap_destroy(&dp->ports);
     fat_rwlock_destroy(&dp->upcall_rwlock);
-    latch_destroy(&dp->exit_latch);
-
-    emc_cache_uninit(&dp->flow_cache);
-    ovs_mutex_destroy(&dp->emc_mutex);

     free(CONST_CAST(char *, dp->name));
     free(dp);
@@ -697,15 +713,22 @@ dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
 }

 static void
-dp_netdev_reload_pmd_threads(struct dp_netdev *dp)
+dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
 {
-    int i;
+    int old_seq;
+
+    atomic_add_relaxed(&pmd->change_seq, 1, &old_seq);
+}

-    for (i = 0; i < dp->n_pmd_threads; i++) {
-        struct pmd_thread *f = &dp->pmd_threads[i];
-        int old_seq;
+/* Causes all pmd threads to reload their tx/rx devices.
+ * Must be called after adding/removing ports. */
+static void
+dp_netdev_reload_pmds(struct dp_netdev *dp)
+{
+    struct dp_netdev_pmd_thread *pmd;

-        atomic_add_relaxed(&f->change_seq, 1, &old_seq);
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        dp_netdev_reload_pmd__(pmd);
     }
 }
@@ -793,9 +816,8 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
     port->sf = sf;

     if (netdev_is_pmd(netdev)) {
-        dp->pmd_count++;
-        dp_netdev_set_pmd_threads(dp, NR_PMD_THREADS);
-        dp_netdev_reload_pmd_threads(dp);
+        dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev));
+        dp_netdev_reload_pmds(dp);
     }
     ovs_refcount_init(&port->ref_cnt);
@@ -946,6 +968,39 @@ get_port_by_name(struct dp_netdev *dp,
     return ENOENT;
 }

+static int
+get_n_pmd_threads_on_numa(struct dp_netdev *dp, int numa_id)
+{
+    struct dp_netdev_pmd_thread *pmd;
+    int n_pmds = 0;
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        if (pmd->numa_id == numa_id) {
+            n_pmds++;
+        }
+    }
+
+    return n_pmds;
+}
+
+/* Returns 'true' if there is a port with pmd netdev and the netdev
+ * is on numa node 'numa_id'. */
+static bool
+has_pmd_port_for_numa(struct dp_netdev *dp, int numa_id)
+{
+    struct dp_netdev_port *port;
+
+    CMAP_FOR_EACH (port, node, &dp->ports) {
+        if (netdev_is_pmd(port->netdev)
+            && netdev_get_numa_id(port->netdev) == numa_id) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
 static void
 do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
     OVS_REQUIRES(dp->port_mutex)
@@ -953,7 +1008,14 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
     cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no));
     seq_change(dp->port_seq);
     if (netdev_is_pmd(port->netdev)) {
-        dp_netdev_reload_pmd_threads(dp);
+        int numa_id = netdev_get_numa_id(port->netdev);
+
+        /* If there is no netdev on the numa node, deletes the pmd threads
+         * for that numa node.  Else, just reloads the queues. */
+        if (!has_pmd_port_for_numa(dp, numa_id)) {
+            dp_netdev_del_pmds_on_numa(dp, numa_id);
+        }
+        dp_netdev_reload_pmds(dp);
     }

     port_unref(port);
@@ -1710,8 +1772,10 @@ dpif_netdev_flow_dump_next(struct dpif_flow_dump_thread *thread_,

 static int
 dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct dp_netdev_pmd_thread *pmd;
     struct dpif_packet packet, *pp;
     struct pkt_metadata *md = &execute->md;

@@ -1723,11 +1787,24 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
     packet.ofpbuf = *execute->packet;
     pp = &packet;

-    ovs_mutex_lock(&dp->emc_mutex);
-    dp_netdev_execute_actions(dp, &pp, 1, false, md,
-                              &dp->flow_cache, execute->actions,
+    /* Tries finding the 'pmd'.  If NULL is returned, that means
+     * the current thread is a non-pmd thread and should use
+     * dp_netdev_get_nonpmd(). */
+    pmd = ovsthread_getspecific(dp->per_pmd_key);
+    if (!pmd) {
+        pmd = dp_netdev_get_nonpmd(dp);
+    }
+
+    /* If the current thread is a non-pmd thread, acquires
+     * the 'non_pmd_mutex'. */
+    if (pmd->core_id == NON_PMD_CORE_ID) {
+        ovs_mutex_lock(&dp->non_pmd_mutex);
+    }
+    dp_netdev_execute_actions(pmd, &pp, 1, false, md, execute->actions,
                               execute->actions_len);
-    ovs_mutex_unlock(&dp->emc_mutex);
+    if (pmd->core_id == NON_PMD_CORE_ID) {
+        ovs_mutex_unlock(&dp->non_pmd_mutex);
+    }

     /* Even though may_steal is set to false, some actions could modify or
      * reallocate the ofpbuf memory. We need to pass those changes to the
@@ -1804,8 +1881,7 @@ dp_netdev_actions_free(struct dp_netdev_actions *actions)


 static void
-dp_netdev_process_rxq_port(struct dp_netdev *dp,
-                           struct emc_cache *flow_cache,
+dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
                            struct dp_netdev_port *port,
                            struct netdev_rxq *rxq)
 {
@@ -1817,7 +1893,7 @@ dp_netdev_process_rxq_port(struct dp_netdev *dp,
         struct pkt_metadata md = PKT_METADATA_INITIALIZER(port->port_no);

         *recirc_depth_get() = 0;
-        dp_netdev_input(dp, flow_cache, packets, cnt, &md);
+        dp_netdev_input(pmd, packets, cnt, &md);
     } else if (error != EAGAIN && error != EOPNOTSUPP) {
         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);

@@ -1831,19 +1907,19 @@ dpif_netdev_run(struct dpif *dpif)
 {
     struct dp_netdev_port *port;
     struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct dp_netdev_pmd_thread *non_pmd = dp_netdev_get_nonpmd(dp);

-    ovs_mutex_lock(&dp->emc_mutex);
+    ovs_mutex_lock(&dp->non_pmd_mutex);
     CMAP_FOR_EACH (port, node, &dp->ports) {
         if (!netdev_is_pmd(port->netdev)) {
             int i;

             for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
-                dp_netdev_process_rxq_port(dp, &dp->flow_cache, port,
-                                           port->rxq[i]);
+                dp_netdev_process_rxq_port(non_pmd, port, port->rxq[i]);
             }
         }
     }
-    ovs_mutex_unlock(&dp->emc_mutex);
+    ovs_mutex_unlock(&dp->non_pmd_mutex);
 }

 static void
@@ -1871,33 +1947,32 @@ struct rxq_poll {
 };

 static int
-pmd_load_queues(struct pmd_thread *f,
+pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
                 struct rxq_poll **ppoll_list, int poll_cnt)
 {
-    struct dp_netdev *dp = f->dp;
     struct rxq_poll *poll_list = *ppoll_list;
     struct dp_netdev_port *port;
-    int id = f->id;
-    int index;
-    int i;
+    int n_pmds_on_numa, index, i;

     /* Simple scheduler for netdev rx polling. */
     for (i = 0; i < poll_cnt; i++) {
-         port_unref(poll_list[i].port);
+        port_unref(poll_list[i].port);
     }

     poll_cnt = 0;
+    n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id);
     index = 0;
-    CMAP_FOR_EACH (port, node, &f->dp->ports) {
+    CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
         /* Calls port_try_ref() to prevent the main thread
          * from deleting the port. */
         if (port_try_ref(port)) {
-            if (netdev_is_pmd(port->netdev)) {
+            if (netdev_is_pmd(port->netdev)
+                && netdev_get_numa_id(port->netdev) == pmd->numa_id) {
                 int i;

                 for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
-                    if ((index % dp->n_pmd_threads) == id) {
+                    if ((index % n_pmds_on_numa) == pmd->index) {
                         poll_list = xrealloc(poll_list,
                                         sizeof *poll_list * (poll_cnt + 1));
@@ -1921,8 +1996,7 @@ pmd_load_queues(struct pmd_thread *f,
 static void *
 pmd_thread_main(void *f_)
 {
-    struct pmd_thread *f = f_;
-    struct dp_netdev *dp = f->dp;
+    struct dp_netdev_pmd_thread *pmd = f_;
     unsigned int lc = 0;
     struct rxq_poll *poll_list;
     unsigned int port_seq = PMD_INITIAL_SEQ;
@@ -1932,17 +2006,18 @@ pmd_thread_main(void *f_)
     poll_cnt = 0;
     poll_list = NULL;

-    pmd_thread_setaffinity_cpu(f->id);
+    /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */
+    ovsthread_setspecific(pmd->dp->per_pmd_key, pmd);
+    pmd_thread_setaffinity_cpu(pmd->core_id);
 reload:
-    emc_cache_init(&f->flow_cache);
-    poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt);
+    emc_cache_init(&pmd->flow_cache);
+    poll_cnt = pmd_load_queues(pmd, &poll_list, poll_cnt);

     for (;;) {
         int i;

         for (i = 0; i < poll_cnt; i++) {
-            dp_netdev_process_rxq_port(dp, &f->flow_cache, poll_list[i].port,
-                                       poll_list[i].rx);
+            dp_netdev_process_rxq_port(pmd, poll_list[i].port, poll_list[i].rx);
         }

         if (lc++ > 1024) {
@@ -1952,7 +2027,7 @@ reload:

             ovsrcu_quiesce();

-            atomic_read_relaxed(&f->change_seq, &seq);
+            atomic_read_relaxed(&pmd->change_seq, &seq);
             if (seq != port_seq) {
                 port_seq = seq;
                 break;
@@ -1960,9 +2035,9 @@ reload:
         }
     }

-    emc_cache_uninit(&f->flow_cache);
+    emc_cache_uninit(&pmd->flow_cache);

-    if (!latch_is_set(&f->dp->exit_latch)){
+    if (!latch_is_set(&pmd->exit_latch)) {
         goto reload;
     }
@@ -2004,40 +2079,124 @@ dpif_netdev_enable_upcall(struct dpif *dpif)
     dp_netdev_enable_upcall(dp);
 }

+/* Returns the pointer to the dp_netdev_pmd_thread for non-pmd threads. */
+static struct dp_netdev_pmd_thread *
+dp_netdev_get_nonpmd(struct dp_netdev *dp)
+{
+    struct dp_netdev_pmd_thread *pmd;
+    struct cmap_node *pnode;
+
+    pnode = cmap_find(&dp->poll_threads, hash_int(NON_PMD_CORE_ID, 0));
+    ovs_assert(pnode);
+    pmd = CONTAINER_OF(pnode, struct dp_netdev_pmd_thread, node);
+
+    return pmd;
+}
+
+/* Configures the 'pmd' based on the input argument. */
+static void
+dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
+                        int index, int core_id, int numa_id)
+{
+    pmd->dp = dp;
+    pmd->index = index;
+    pmd->core_id = core_id;
+    pmd->numa_id = numa_id;
+    latch_init(&pmd->exit_latch);
+    atomic_init(&pmd->change_seq, PMD_INITIAL_SEQ);
+    /* Init the 'flow_cache' here since there is no
+     * actual thread created for NON_PMD_CORE_ID. */
+    if (core_id == NON_PMD_CORE_ID) {
+        emc_cache_init(&pmd->flow_cache);
+    }
+    cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
+                hash_int(core_id, 0));
+}
+
+/* Stops the pmd thread, removes it from the 'dp->poll_threads'
+ * and destroys the struct. */
 static void
-dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)
+dp_netdev_del_pmd(struct dp_netdev_pmd_thread *pmd)
 {
-    int i;
+    /* Uninit the 'flow_cache' here since there is
+     * no actual thread to uninit it. */
+    if (pmd->core_id == NON_PMD_CORE_ID) {
+        emc_cache_uninit(&pmd->flow_cache);
+    } else {
+        latch_set(&pmd->exit_latch);
+        dp_netdev_reload_pmd__(pmd);
+        ovs_numa_unpin_core(pmd->core_id);
+        xpthread_join(pmd->thread, NULL);
+    }
+    cmap_remove(&pmd->dp->poll_threads, &pmd->node, hash_int(pmd->core_id, 0));
+    latch_destroy(&pmd->exit_latch);
+    free(pmd);
+}

-    if (n == dp->n_pmd_threads) {
-        return;
+/* Destroys all pmd threads. */
+static void
+dp_netdev_destroy_all_pmds(struct dp_netdev *dp)
+{
+    struct dp_netdev_pmd_thread *pmd;
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        dp_netdev_del_pmd(pmd);
     }
+}

-    /* Stop existing threads. */
-    latch_set(&dp->exit_latch);
-    dp_netdev_reload_pmd_threads(dp);
-    for (i = 0; i < dp->n_pmd_threads; i++) {
-        struct pmd_thread *f = &dp->pmd_threads[i];
+/* Deletes all pmd threads on numa node 'numa_id'. */
+static void
+dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
+{
+    struct dp_netdev_pmd_thread *pmd;

-        xpthread_join(f->thread, NULL);
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        if (pmd->numa_id == numa_id) {
+            dp_netdev_del_pmd(pmd);
+        }
     }
-    latch_poll(&dp->exit_latch);
-    free(dp->pmd_threads);
+}

-    /* Start new threads. */
-    dp->pmd_threads = xmalloc(n * sizeof *dp->pmd_threads);
-    dp->n_pmd_threads = n;
+/* Checks the numa node id 'numa_id' and starts pmd threads for
+ * the numa node. */
+static void
+dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
+{
+    int n_pmds;

-    for (i = 0; i < n; i++) {
-        struct pmd_thread *f = &dp->pmd_threads[i];
+    if (!ovs_numa_numa_id_is_valid(numa_id)) {
+        VLOG_ERR("Cannot create pmd threads due to invalid numa id (%d)",
+                 numa_id);
+        return;
+    }
+
+    n_pmds = get_n_pmd_threads_on_numa(dp, numa_id);
+
+    /* If there are already pmd threads created for the numa node,
+     * do nothing.  Else, creates the pmd threads for the numa node. */
+    if (!n_pmds) {
+        int can_have, n_unpinned, i;
+
+        n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
+        if (!n_unpinned) {
+            VLOG_ERR("Cannot create pmd threads: no unpinned cores "
+                     "on numa node");
+            return;
+        }

-        f->dp = dp;
-        f->id = i;
-        atomic_init(&f->change_seq, PMD_INITIAL_SEQ);
+        /* Tries creating NR_PMD_THREADS pmd threads on the numa node. */
+        can_have = MIN(n_unpinned, NR_PMD_THREADS);
+        for (i = 0; i < can_have; i++) {
+            struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
+            int core_id = ovs_numa_get_unpinned_core_on_numa(numa_id);

-        /* Each thread will distribute all devices rx-queues among
-         * themselves. */
-        f->thread = ovs_thread_create("pmd", pmd_thread_main, f);
+            dp_netdev_configure_pmd(pmd, dp, i, core_id, numa_id);
+            /* Each thread will distribute all devices rx-queues among
+             * themselves. */
+            pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
+        }
+        VLOG_INFO("Created %d pmd threads on numa node %d", can_have, numa_id);
     }
 }
@@ -2177,8 +2336,8 @@ packet_batch_init(struct packet_batch *batch, struct dp_netdev_flow *flow,
 }

 static inline void
-packet_batch_execute(struct packet_batch *batch, struct dp_netdev *dp,
-                     struct emc_cache *flow_cache)
+packet_batch_execute(struct packet_batch *batch,
+                     struct dp_netdev_pmd_thread *pmd)
 {
     struct dp_netdev_actions *actions;
     struct dp_netdev_flow *flow = batch->flow;
@@ -2188,11 +2347,10 @@ packet_batch_execute(struct packet_batch *batch, struct dp_netdev *dp,

     actions = dp_netdev_flow_get_actions(flow);

-    dp_netdev_execute_actions(dp, batch->packets, batch->packet_count, true,
-                              &batch->md, flow_cache,
-                              actions->actions, actions->size);
+    dp_netdev_execute_actions(pmd, batch->packets, batch->packet_count, true,
+                              &batch->md, actions->actions, actions->size);

-    dp_netdev_count_packet(dp, DP_STAT_HIT, batch->packet_count);
+    dp_netdev_count_packet(pmd->dp, DP_STAT_HIT, batch->packet_count);
 }

 static inline bool
@@ -2247,12 +2405,13 @@ dpif_packet_swap(struct dpif_packet **a, struct dpif_packet **b)
  * 'packets' array (they have been moved to the beginning of the vector).
  */
 static inline size_t
-emc_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
-               struct dpif_packet **packets, size_t cnt,
-               struct pkt_metadata *md, struct netdev_flow_key *keys)
+emc_processing(struct dp_netdev_pmd_thread *pmd, struct dpif_packet **packets,
+               size_t cnt, struct pkt_metadata *md,
+               struct netdev_flow_key *keys)
 {
     struct netdev_flow_key key;
     struct packet_batch batches[4];
+    struct emc_cache *flow_cache = &pmd->flow_cache;
     size_t n_batches, i;
     size_t notfound_cnt = 0;

@@ -2285,14 +2444,14 @@ emc_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
     }

     for (i = 0; i < n_batches; i++) {
-        packet_batch_execute(&batches[i], dp, flow_cache);
+        packet_batch_execute(&batches[i], pmd);
     }

     return notfound_cnt;
 }

 static inline void
-fast_path_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
+fast_path_processing(struct dp_netdev_pmd_thread *pmd,
                      struct dpif_packet **packets, size_t cnt,
                      struct pkt_metadata *md, struct netdev_flow_key *keys)
 {
@@ -2305,6 +2464,8 @@ fast_path_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
     struct packet_batch batches[PKT_ARRAY_SIZE];
     const struct miniflow *mfs[PKT_ARRAY_SIZE]; /* NULL at bad packets. */
     struct cls_rule *rules[PKT_ARRAY_SIZE];
+    struct dp_netdev *dp = pmd->dp;
+    struct emc_cache *flow_cache = &pmd->flow_cache;
     size_t n_batches, i;
     bool any_miss;

@@ -2353,8 +2514,8 @@ fast_path_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
             /* We can't allow the packet batching in the next loop to execute
              * the actions.  Otherwise, if there are any slow path actions,
              * we'll send the packet up twice. */
-            dp_netdev_execute_actions(dp, &packets[i], 1, false, md,
-                                      flow_cache, ofpbuf_data(&actions),
+            dp_netdev_execute_actions(pmd, &packets[i], 1, false, md,
+                                      ofpbuf_data(&actions),
                                       ofpbuf_size(&actions));

             add_actions = ofpbuf_size(&put_actions)
@@ -2391,18 +2552,19 @@ fast_path_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
         }

         flow = dp_netdev_flow_cast(rules[i]);
-        emc_insert(flow_cache, mfs[i], dpif_packet_get_dp_hash(packet), flow);
+        emc_insert(flow_cache, mfs[i], dpif_packet_get_dp_hash(packet),
+                   flow);

         dp_netdev_queue_batches(packet, md, flow, mfs[i], batches, &n_batches,
                                 ARRAY_SIZE(batches));
     }

     for (i = 0; i < n_batches; i++) {
-        packet_batch_execute(&batches[i], dp, flow_cache);
+        packet_batch_execute(&batches[i], pmd);
     }
 }

 static void
-dp_netdev_input(struct dp_netdev *dp, struct emc_cache *flow_cache,
+dp_netdev_input(struct dp_netdev_pmd_thread *pmd,
                 struct dpif_packet **packets, int cnt,
                 struct pkt_metadata *md)
 {
 #if !defined(__CHECKER__) && !defined(_WIN32)
@@ -2414,15 +2576,14 @@ dp_netdev_input(struct dp_netdev *dp, struct emc_cache *flow_cache,
     struct netdev_flow_key keys[PKT_ARRAY_SIZE];
     size_t newcnt;

-    newcnt = emc_processing(dp, flow_cache, packets, cnt, md, keys);
+    newcnt = emc_processing(pmd, packets, cnt, md, keys);
     if (OVS_UNLIKELY(newcnt)) {
-        fast_path_processing(dp, flow_cache, packets, newcnt, md, keys);
+        fast_path_processing(pmd, packets, newcnt, md, keys);
     }
 }

 struct dp_netdev_execute_aux {
-    struct dp_netdev *dp;
-    struct emc_cache *flow_cache;
+    struct dp_netdev_pmd_thread *pmd;
 };

 static void
@@ -2442,7 +2603,8 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
 {
     struct dp_netdev_execute_aux *aux = aux_;
     uint32_t *depth = recirc_depth_get();
-    struct dp_netdev *dp = aux->dp;
+    struct dp_netdev_pmd_thread *pmd = aux->pmd;
+    struct dp_netdev *dp = pmd->dp;
     int type = nl_attr_type(a);
     struct dp_netdev_port *p;
     int i;
@@ -2451,7 +2613,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
     case OVS_ACTION_ATTR_OUTPUT:
         p = dp_netdev_lookup_port(dp, u32_to_odp(nl_attr_get_u32(a)));
         if (OVS_LIKELY(p)) {
-            netdev_send(p->netdev, NETDEV_QID_NONE, packets, cnt, may_steal);
+            netdev_send(p->netdev, pmd->core_id, packets, cnt, may_steal);
         } else if (may_steal) {
             for (i = 0; i < cnt; i++) {
                 dpif_packet_delete(packets[i]);
@@ -2478,8 +2640,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
                                      DPIF_UC_ACTION, userdata, &actions,
                                      NULL);
             if (!error || error == ENOSPC) {
-                dp_netdev_execute_actions(dp, &packets[i], 1, false, md,
-                                          aux->flow_cache,
+                dp_netdev_execute_actions(pmd, &packets[i], 1, false, md,
                                           ofpbuf_data(&actions),
                                           ofpbuf_size(&actions));
             }
@@ -2541,7 +2702,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
                 /* Hash is private to each packet */
                 recirc_md.dp_hash = dpif_packet_get_dp_hash(packets[i]);

-                dp_netdev_input(dp, aux->flow_cache, &recirc_pkt, 1,
+                dp_netdev_input(pmd, &recirc_pkt, 1,
                                 &recirc_md);
             }
             (*depth)--;
@@ -2571,13 +2732,12 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
 }

 static void
-dp_netdev_execute_actions(struct dp_netdev *dp,
+dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
                           struct dpif_packet **packets, int cnt,
                           bool may_steal, struct pkt_metadata *md,
-                          struct emc_cache *flow_cache,
                           const struct nlattr *actions, size_t actions_len)
 {
-    struct dp_netdev_execute_aux aux = {dp, flow_cache};
+    struct dp_netdev_execute_aux aux = {pmd};

     odp_execute_actions(&aux, packets, cnt, may_steal, md, actions,
                         actions_len, dp_execute_cb);
diff --git a/lib/dpif-netdev.h b/lib/dpif-netdev.h
index fb9d0e2..d811507 100644
--- a/lib/dpif-netdev.h
+++ b/lib/dpif-netdev.h
@@ -40,10 +40,9 @@ static inline void dp_packet_pad(struct ofpbuf *b)
     }
 }

-#define NETDEV_QID_NONE INT_MAX
-
 #define NR_QUEUE   1
 #define NR_PMD_THREADS 1
+#define NON_PMD_CORE_ID 0

 #ifdef  __cplusplus
 }
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index ef4fe5d..5b22a65 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -494,8 +494,7 @@ netdev_dpdk_init(struct netdev *netdev_, unsigned int port_no)

     ovs_mutex_lock(&netdev->mutex);

-    /* XXX: need to discover device node at run time. */
-    netdev->socket_id = SOCKET0;
+    netdev->socket_id = rte_eth_dev_socket_id(port_no);
     netdev_dpdk_set_txq(netdev, NR_QUEUE);
     netdev->port_id = port_no;
     netdev->flags = 0;
@@ -1528,7 +1527,8 @@ pmd_thread_setaffinity_cpu(int cpu)
         return err;
     }
     /* lcore_id 0 is reseved for use by non pmd threads. */
-    RTE_PER_LCORE(_lcore_id) = cpu + 1;
+    ovs_assert(cpu);
+    RTE_PER_LCORE(_lcore_id) = cpu;

     return 0;
 }
@@ -1536,9 +1536,6 @@ pmd_thread_setaffinity_cpu(int cpu)
 void
 thread_set_nonpmd(void)
 {
-    /* We cannot have RTE_MAX_LCORE pmd threads, because lcore_id 0 is reserved
-     * for non pmd threads */
-    BUILD_ASSERT(NR_PMD_THREADS < RTE_MAX_LCORE);
     /* We have to use 0 to allow non pmd threads to perform certain DPDK
      * operations, like rte_eth_dev_configure(). */
     RTE_PER_LCORE(_lcore_id) = 0;
-- 
1.7.9.5
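Postscript for reviewers: the queue-to-thread mapping in
pmd_load_queues() above is plain round-robin over the numa node's rx
queues.  A standalone restatement of the selection rule (hypothetical
helper name):

#include <stdbool.h>

/* Sketch of the round-robin selection in pmd_load_queues(): the pmd
 * thread with index 'pmd_index' on a numa node running 'n_pmds_on_numa'
 * pmd threads polls exactly the queues for which this returns true. */
static bool
queue_belongs_to_pmd(int queue_index, int pmd_index, int n_pmds_on_numa)
{
    return queue_index % n_pmds_on_numa == pmd_index;
}

With NR_PMD_THREADS at its default of 1, every queue on a numa node maps
to that node's single pmd thread, since k % 1 == 0 for all k.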