Hi Ilya,

Thank you very much for the patch.
I definitely like that the queue assignment is performed by the main
thread: not only is it less bug-prone, but the logic will also be more
easily customizable.  I absolutely welcome the changes to do_add_port
and do_del_port to keep the queues with their currently assigned
threads.

I think we can avoid pausing and resuming the threads each time and,
instead, leave the current reloading logic unaltered.  Here's a way:

* pmd_thread_main() would be identical to master.  pmd_load_queues(),
  instead, would return a poll_list by copying the struct rxq_poll
  elements from 'pmd->poll_list'.

* do_add_port() and do_del_port() would still write to the pmd-specific
  lists while the threads are running.  After updating a list for a pmd
  thread, they would call dp_netdev_reload_pmd__().

This behaviour should still fix the bugs, but it requires less
synchronization.  What do you think?  I don't think this should create
any problems for the following patch, right?

I've prepared an incremental on top of this patch to illustrate the
idea (a toy model of the locking scheme follows, then the incremental
itself), but other ideas/implementations/fixes are welcome.
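To make the scheme concrete, here is a stand-alone toy model of the
synchronization I have in mind (illustration only: the names 'worker',
'change_seq' and 'add_queue' are made up, and everything is much simpler
than the dpif-netdev code):

/* Toy model: the main thread owns each worker's queue list and edits it
 * under a per-worker mutex; the worker never touches the shared list
 * while polling, it only copies it into a private array on reload. */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

struct worker {
    pthread_mutex_t list_mutex; /* Protects 'queues' and 'n_queues'. */
    int queues[16];             /* Stand-in for pmd->poll_list. */
    int n_queues;
    atomic_int change_seq;      /* Bumped to request a reload. */
    atomic_bool exiting;
};

/* Main thread: edit the list under the worker's mutex, then ask it to
 * reload.  No pause/resume: the worker keeps polling its old private
 * copy until it notices the sequence change. */
static void
add_queue(struct worker *w, int q)
{
    pthread_mutex_lock(&w->list_mutex);
    w->queues[w->n_queues++] = q;
    pthread_mutex_unlock(&w->list_mutex);
    atomic_fetch_add(&w->change_seq, 1);
}

static void *
worker_main(void *w_)
{
    struct worker *w = w_;
    int copy[16], n;
    int seq = atomic_load(&w->change_seq);

reload:
    /* Equivalent of the proposed pmd_load_queues(): snapshot the shared
     * list under the mutex, then poll the snapshot without any lock. */
    pthread_mutex_lock(&w->list_mutex);
    n = w->n_queues;
    memcpy(copy, w->queues, n * sizeof copy[0]);
    pthread_mutex_unlock(&w->list_mutex);
    printf("worker reloaded: %d queue(s)\n", n);

    while (!atomic_load(&w->exiting)) {
        for (int i = 0; i < n; i++) {
            /* ... poll copy[i], lock-free ... */
        }
        if (atomic_load(&w->change_seq) != seq) {
            seq = atomic_load(&w->change_seq);
            goto reload;
        }
    }
    return NULL;
}

int
main(void)
{
    struct worker w = { .n_queues = 0 };
    pthread_t t;

    pthread_mutex_init(&w.list_mutex, NULL);
    atomic_init(&w.change_seq, 0);
    atomic_init(&w.exiting, false);

    pthread_create(&t, NULL, worker_main, &w);
    add_queue(&w, 0);           /* Like do_add_port() assigning an rxq. */
    add_queue(&w, 1);
    sleep(1);                   /* Let the worker pick the changes up. */

    atomic_store(&w.exiting, true);
    pthread_join(t, NULL);
    return 0;
}

The key property is that the worker only takes the mutex while it
snapshots the list, so the main thread can safely modify the list at any
time; the real code additionally waits for an acknowledgment through
'pmd->cond' (see the note after the incremental).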
Thanks,
Daniele

----------------------------------------

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index fd6ac48..3f5cf42 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -480,7 +480,7 @@ static void dp_netdev_input(struct dp_netdev_pmd_thread *,
                             struct dp_packet **, int cnt);
 
 static void dp_netdev_disable_upcall(struct dp_netdev *);
-void dp_netdev_pmd_break_done(struct dp_netdev_pmd_thread *pmd);
+void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
                                     struct dp_netdev *dp, int index,
                                     unsigned core_id, int numa_id);
@@ -1026,9 +1026,8 @@ dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
     return 0;
 }
 
-/* Causes pmd thread to break from infinite polling cycle. */
 static void
-dp_netdev_break_pmd__(struct dp_netdev_pmd_thread *pmd)
+dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
 {
     int old_seq;
 
@@ -1042,39 +1041,6 @@ dp_netdev_break_pmd__(struct dp_netdev_pmd_thread *pmd)
     ovs_mutex_unlock(&pmd->cond_mutex);
 }
 
-/* Causes pmd thread to break from infinite polling cycle and
- * lock on poll_mutex. Not applicable for non-PMD threads. */
-static void
-dp_netdev_pause_pmd__(struct dp_netdev_pmd_thread *pmd)
-    OVS_ACQUIRES(pmd->poll_mutex)
-{
-    int old_seq;
-
-    ovs_assert(pmd->core_id != NON_PMD_CORE_ID);
-
-    /* Wait until pmd thread starts polling cycle to
-     * avoid deadlock. */
-    while (!ovs_mutex_trylock(&pmd->poll_mutex)) {
-        ovs_mutex_unlock(&pmd->poll_mutex);
-    }
-
-    ovs_mutex_lock(&pmd->cond_mutex);
-    atomic_add_relaxed(&pmd->change_seq, 1, &old_seq);
-    ovs_mutex_lock(&pmd->poll_mutex);
-    ovs_mutex_cond_wait(&pmd->cond, &pmd->cond_mutex);
-    ovs_mutex_unlock(&pmd->cond_mutex);
-}
-
-/* Unlocks pmd thread by unlocking poll_mutex.
- * Not applicable for non-PMD threads. */
-static void
-dp_netdev_resume_pmd__(struct dp_netdev_pmd_thread *pmd)
-    OVS_RELEASES(pmd->poll_mutex)
-{
-    ovs_assert(pmd->core_id != NON_PMD_CORE_ID);
-    ovs_mutex_unlock(&pmd->poll_mutex);
-}
-
 static uint32_t
 hash_port_no(odp_port_t port_no)
 {
@@ -1181,9 +1147,10 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
                 break;
             }
 
-            dp_netdev_pause_pmd__(pmd);
+            ovs_mutex_lock(&pmd->poll_mutex);
             dp_netdev_add_rxq_to_pmd(pmd, port, port->rxq[i]);
-            dp_netdev_resume_pmd__(pmd);
+            ovs_mutex_unlock(&pmd->poll_mutex);
+            dp_netdev_reload_pmd__(pmd);
         }
     }
     seq_change(dp->port_seq);
@@ -1366,29 +1333,27 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
             dp_netdev_del_pmds_on_numa(dp, numa_id);
         }
         else {
-            bool found;
             struct dp_netdev_pmd_thread *pmd;
             struct rxq_poll *poll, *next;
 
             CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
                 if (pmd->numa_id == numa_id) {
-                    found = false;
-                    dp_netdev_pause_pmd__(pmd);
+                    bool found = false;
+
+                    ovs_mutex_lock(&pmd->poll_mutex);
                     LIST_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
                         if (poll->port == port) {
+                            found = true;
                             port_unref(poll->port);
                             list_remove(&poll->node);
                             pmd->poll_cnt--;
                             free(poll);
-                            found = true;
                         }
                     }
+                    ovs_mutex_unlock(&pmd->poll_mutex);
                     if (found) {
-                        /* Clean up emc cache if poll_list modified. */
-                        emc_cache_uninit(&pmd->flow_cache);
-                        emc_cache_init(&pmd->flow_cache);
+                        dp_netdev_reload_pmd__(pmd);
                     }
-                    dp_netdev_resume_pmd__(pmd);
                 }
             }
         }
@@ -2656,28 +2621,56 @@ dpif_netdev_wait(struct dpif *dpif)
     seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
 }
 
+static int
+pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
+                struct rxq_poll **ppoll_list)
+{
+    struct rxq_poll *poll_list = *ppoll_list;
+    struct rxq_poll *poll;
+    int i = 0;
+
+    poll_list = xrealloc(poll_list, pmd->poll_cnt * sizeof *poll_list);
+
+    LIST_FOR_EACH (poll, node, &pmd->poll_list) {
+        poll_list[i++] = *poll;
+    }
+
+    *ppoll_list = poll_list;
+    return pmd->poll_cnt;
+}
+
 static void *
 pmd_thread_main(void *f_)
 {
     struct dp_netdev_pmd_thread *pmd = f_;
-    struct rxq_poll *poll;
     unsigned int lc = 0;
+    struct rxq_poll *poll_list;
     unsigned int port_seq = PMD_INITIAL_SEQ;
+    int poll_cnt;
+    int i;
+
+    poll_cnt = 0;
+    poll_list = NULL;
 
     /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */
     ovsthread_setspecific(pmd->dp->per_pmd_key, pmd);
    pmd_thread_setaffinity_cpu(pmd->core_id);
 reload:
-    ovs_mutex_lock(&pmd->poll_mutex);
+    emc_cache_init(&pmd->flow_cache);
+    poll_cnt = pmd_load_queues(pmd, &poll_list);
+
     /* List port/core affinity */
-    LIST_FOR_EACH (poll, node, &pmd->poll_list) {
-        VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id,
-                  netdev_get_name(poll->port->netdev));
+    for (i = 0; i < poll_cnt; i++) {
+        VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id, netdev_get_name(poll_list[i].port->netdev));
     }
 
+    /* Signal here to make sure the pmd finishes
+     * reloading the updated configuration. */
+    dp_netdev_pmd_reload_done(pmd);
+
     for (;;) {
-        LIST_FOR_EACH (poll, node, &pmd->poll_list) {
-            dp_netdev_process_rxq_port(pmd, poll->port, poll->rx);
+        for (i = 0; i < poll_cnt; i++) {
+            dp_netdev_process_rxq_port(pmd, poll_list[i].port, poll_list[i].rx);
         }
 
         if (lc++ > 1024) {
@@ -2696,15 +2689,16 @@ reload:
             }
         }
     }
-    ovs_mutex_unlock(&pmd->poll_mutex);
 
-    /* Synchronize with breaker thread. */
-    dp_netdev_pmd_break_done(pmd);
+    emc_cache_uninit(&pmd->flow_cache);
 
-    if (!latch_is_set(&pmd->exit_latch)) {
+    if (!latch_is_set(&pmd->exit_latch)){
        goto reload;
     }
 
+    dp_netdev_pmd_reload_done(pmd);
+
+    free(poll_list);
     return NULL;
 }
 
@@ -2739,7 +2733,7 @@ dpif_netdev_enable_upcall(struct dpif *dpif)
 }
 
 void
-dp_netdev_pmd_break_done(struct dp_netdev_pmd_thread *pmd)
+dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd)
 {
     ovs_mutex_lock(&pmd->cond_mutex);
     xpthread_cond_signal(&pmd->cond);
@@ -2843,8 +2837,11 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     dpcls_init(&pmd->cls);
     cmap_init(&pmd->flow_table);
     list_init(&pmd->poll_list);
-    emc_cache_init(&pmd->flow_cache);
-
+    /* init the 'flow_cache' since there is no
+     * actual thread created for NON_PMD_CORE_ID. */
+    if (core_id == NON_PMD_CORE_ID) {
+        emc_cache_init(&pmd->flow_cache);
+    }
     cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
                 hash_int(core_id, 0));
 }
@@ -2870,11 +2867,13 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
 {
     struct rxq_poll *poll;
 
-    emc_cache_uninit(&pmd->flow_cache);
-
-    if (pmd->core_id != NON_PMD_CORE_ID) {
+    /* Uninit the 'flow_cache' since there is
+     * no actual thread uninit it for NON_PMD_CORE_ID. */
+    if (pmd->core_id == NON_PMD_CORE_ID) {
+        emc_cache_uninit(&pmd->flow_cache);
+    } else {
         latch_set(&pmd->exit_latch);
-        dp_netdev_break_pmd__(pmd);
+        dp_netdev_reload_pmd__(pmd);
         ovs_numa_unpin_core(pmd->core_id);
         xpthread_join(pmd->thread, NULL);
     }
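A note on why calling dp_netdev_reload_pmd__() after updating a
poll_list is sufficient: the function doesn't return until the pmd
thread has picked up the new configuration.  Roughly, it works like this
(a simplified sketch of the existing function, not a verbatim copy):

/* Simplified sketch of dp_netdev_reload_pmd__() (see lib/dpif-netdev.c
 * for the real code): bump 'change_seq' so that the pmd thread breaks
 * out of its polling loop, then wait on 'cond' until the pmd calls
 * dp_netdev_pmd_reload_done(), i.e. until the new poll_list is live. */
static void
dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
{
    int old_seq;

    ovs_mutex_lock(&pmd->cond_mutex);
    atomic_add_relaxed(&pmd->change_seq, 1, &old_seq);  /* Wake the pmd. */
    ovs_mutex_cond_wait(&pmd->cond, &pmd->cond_mutex);  /* Wait for ack. */
    ovs_mutex_unlock(&pmd->cond_mutex);
}

Since the reload path re-initializes the EMC (emc_cache_init() right
after the reload label), the explicit cache flush that do_del_port()
used to do is covered as well.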
On 14/01/2016 06:47, "Ilya Maximets" <i.maxim...@samsung.com> wrote:

>Current rx queue management model is buggy and will not work properly
>without additional barriers and other syncronization between PMD
>threads and main thread.
>
>Known BUGS of current model:
> * While reloading, two PMD threads, one already reloaded and
>   one not yet reloaded, can poll same queue of the same port.
>   This behavior may lead to dpdk driver failure, because they
>   are not thread-safe.
> * Same bug as fixed in commit e4e74c3a2b
>   ("dpif-netdev: Purge all ukeys when reconfigure pmd.") but
>   reproduced while only reconfiguring of pmd threads without
>   restarting, because addition may change the sequence of
>   other ports, which is important in time of reconfiguration.
>
>Introducing the new model, where distribution of queues made by main
>thread with minimal synchronizations and without data races between
>pmd threads. Also, this model should work faster, because only
>needed threads will be interrupted for reconfiguraition and total
>computational complexity of reconfiguration is less.
>
>Signed-off-by: Ilya Maximets <i.maxim...@samsung.com>
>---
> lib/dpif-netdev.c | 301 ++++++++++++++++++++++++++++++++----------------------
> 1 file changed, 180 insertions(+), 121 deletions(-)
>
>diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
>index cd72e62..fd6ac48 100644
>--- a/lib/dpif-netdev.c
>+++ b/lib/dpif-netdev.c
>@@ -372,6 +372,13 @@ struct dp_netdev_pmd_cycles {
>     atomic_ullong n[PMD_N_CYCLES];
> };
> 
>+/* Contained by struct dp_netdev_pmd_thread's 'poll_list' member. */
>+struct rxq_poll {
>+    struct dp_netdev_port *port;
>+    struct netdev_rxq *rx;
>+    struct ovs_list node;
>+};
>+
> /* PMD: Poll modes drivers. PMD accesses devices via polling to eliminate
>  * the performance overhead of interrupt processing. Therefore netdev can
>  * not implement rx-wait for these devices. dpif-netdev needs to poll
>@@ -430,6 +437,10 @@ struct dp_netdev_pmd_thread {
>     int tx_qid;                 /* Queue id used by this pmd thread to
>                                  * send packets on all netdevs */
> 
>+    struct ovs_list poll_list;  /* List of rx queues to poll. */
>+    int poll_cnt;               /* Number of elemints in poll_list. */
>+    struct ovs_mutex poll_mutex; /* Mutex for poll_list. */
>+
>     /* Only a pmd thread can write on its own 'cycles' and 'stats'.
>      * The main thread keeps 'stats_zero' and 'cycles_zero' as base
>      * values and subtracts them from 'stats' and 'cycles' before
>@@ -469,7 +480,7 @@ static void dp_netdev_input(struct dp_netdev_pmd_thread *,
>                             struct dp_packet **, int cnt);
> 
> static void dp_netdev_disable_upcall(struct dp_netdev *);
>-void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
>+void dp_netdev_pmd_break_done(struct dp_netdev_pmd_thread *pmd);
> static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
>                                     struct dp_netdev *dp, int index,
>                                     unsigned core_id, int numa_id);
>@@ -482,6 +493,11 @@ dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
> static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
> static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id);
> static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id);
>+static void
>+dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
>+                         struct dp_netdev_port *port, struct netdev_rxq *rx);
>+static struct dp_netdev_pmd_thread *
>+dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id);
> static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp);
> static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
> static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd);
>@@ -1010,8 +1026,9 @@ dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
>     return 0;
> }
> 
>+/* Causes pmd thread to break from infinite polling cycle. */
> static void
>-dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
>+dp_netdev_break_pmd__(struct dp_netdev_pmd_thread *pmd)
> {
>     int old_seq;
> 
>@@ -1025,16 +1042,37 @@ dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
>     ovs_mutex_unlock(&pmd->cond_mutex);
> }
> 
>-/* Causes all pmd threads to reload its tx/rx devices.
>- * Must be called after adding/removing ports. */
>+/* Causes pmd thread to break from infinite polling cycle and
>+ * lock on poll_mutex. Not applicable for non-PMD threads. */
> static void
>-dp_netdev_reload_pmds(struct dp_netdev *dp)
>+dp_netdev_pause_pmd__(struct dp_netdev_pmd_thread *pmd)
>+    OVS_ACQUIRES(pmd->poll_mutex)
> {
>-    struct dp_netdev_pmd_thread *pmd;
>+    int old_seq;
> 
>-    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
>-        dp_netdev_reload_pmd__(pmd);
>+    ovs_assert(pmd->core_id != NON_PMD_CORE_ID);
>+
>+    /* Wait until pmd thread starts polling cycle to
>+     * avoid deadlock. */
>+    while (!ovs_mutex_trylock(&pmd->poll_mutex)) {
>+        ovs_mutex_unlock(&pmd->poll_mutex);
>     }
>+
>+    ovs_mutex_lock(&pmd->cond_mutex);
>+    atomic_add_relaxed(&pmd->change_seq, 1, &old_seq);
>+    ovs_mutex_lock(&pmd->poll_mutex);
>+    ovs_mutex_cond_wait(&pmd->cond, &pmd->cond_mutex);
>+    ovs_mutex_unlock(&pmd->cond_mutex);
>+}
>+
>+/* Unlocks pmd thread by unlocking poll_mutex.
>+ * Not applicable for non-PMD threads. */
>+static void
>+dp_netdev_resume_pmd__(struct dp_netdev_pmd_thread *pmd)
>+    OVS_RELEASES(pmd->poll_mutex)
>+{
>+    ovs_assert(pmd->core_id != NON_PMD_CORE_ID);
>+    ovs_mutex_unlock(&pmd->poll_mutex);
> }
> 
> static uint32_t
>@@ -1128,8 +1166,25 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
>     cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
> 
>     if (netdev_is_pmd(netdev)) {
>-        dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev));
>-        dp_netdev_reload_pmds(dp);
>+        int numa_id = netdev_get_numa_id(netdev);
>+        struct dp_netdev_pmd_thread *pmd;
>+
>+        /* Cannot create pmd threads for invalid numa node. */
>+        ovs_assert(ovs_numa_numa_id_is_valid(numa_id));
>+
>+        for (i = 0; i < netdev_n_rxq(netdev); i++) {
>+            pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
>+            if (!pmd) {
>+                /* There is no pmd threads on this numa node. */
>+                dp_netdev_set_pmds_on_numa(dp, numa_id);
>+                /* Assigning of rx queues done. */
>+                break;
>+            }
>+
>+            dp_netdev_pause_pmd__(pmd);
>+            dp_netdev_add_rxq_to_pmd(pmd, port, port->rxq[i]);
>+            dp_netdev_resume_pmd__(pmd);
>+        }
>     }
>     seq_change(dp->port_seq);
> 
>@@ -1226,16 +1281,6 @@ port_ref(struct dp_netdev_port *port)
>     }
> }
> 
>-static bool
>-port_try_ref(struct dp_netdev_port *port)
>-{
>-    if (port) {
>-        return ovs_refcount_try_ref_rcu(&port->ref_cnt);
>-    }
>-
>-    return false;
>-}
>-
> static void
> port_unref(struct dp_netdev_port *port)
> {
>@@ -1313,12 +1358,40 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
>     if (netdev_is_pmd(port->netdev)) {
>         int numa_id = netdev_get_numa_id(port->netdev);
> 
>+        /* PMD threads can not be on invalid numa node. */
>+        ovs_assert(ovs_numa_numa_id_is_valid(numa_id));
>         /* If there is no netdev on the numa node, deletes the pmd threads
>-         * for that numa. Else, just reloads the queues. */
>+         * for that numa. Else, deletes the queues from polling lists. */
>         if (!has_pmd_port_for_numa(dp, numa_id)) {
>             dp_netdev_del_pmds_on_numa(dp, numa_id);
>         }
>-        dp_netdev_reload_pmds(dp);
>+        else {
>+            bool found;
>+            struct dp_netdev_pmd_thread *pmd;
>+            struct rxq_poll *poll, *next;
>+
>+            CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
>+                if (pmd->numa_id == numa_id) {
>+                    found = false;
>+                    dp_netdev_pause_pmd__(pmd);
>+                    LIST_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
>+                        if (poll->port == port) {
>+                            port_unref(poll->port);
>+                            list_remove(&poll->node);
>+                            pmd->poll_cnt--;
>+                            free(poll);
>+                            found = true;
>+                        }
>+                    }
>+                    if (found) {
>+                        /* Clean up emc cache if poll_list modified. */
>+                        emc_cache_uninit(&pmd->flow_cache);
>+                        emc_cache_init(&pmd->flow_cache);
>+                    }
>+                    dp_netdev_resume_pmd__(pmd);
>+                }
>+            }
>+        }
>     }
> 
>     port_unref(port);
>@@ -2583,92 +2656,28 @@ dpif_netdev_wait(struct dpif *dpif)
>     seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
> }
> 
>-struct rxq_poll {
>-    struct dp_netdev_port *port;
>-    struct netdev_rxq *rx;
>-};
>-
>-static int
>-pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
>-                struct rxq_poll **ppoll_list, int poll_cnt)
>-{
>-    struct rxq_poll *poll_list = *ppoll_list;
>-    struct dp_netdev_port *port;
>-    int n_pmds_on_numa, index, i;
>-
>-    /* Simple scheduler for netdev rx polling. */
>-    for (i = 0; i < poll_cnt; i++) {
>-        port_unref(poll_list[i].port);
>-    }
>-
>-    poll_cnt = 0;
>-    n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id);
>-    index = 0;
>-
>-    CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
>-        /* Calls port_try_ref() to prevent the main thread
>-         * from deleting the port. */
>-        if (port_try_ref(port)) {
>-            if (netdev_is_pmd(port->netdev)
>-                && netdev_get_numa_id(port->netdev) == pmd->numa_id) {
>-                int i;
>-
>-                for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
>-                    if ((index % n_pmds_on_numa) == pmd->index) {
>-                        poll_list = xrealloc(poll_list,
>-                                             sizeof *poll_list * (poll_cnt + 1));
>-
>-                        port_ref(port);
>-                        poll_list[poll_cnt].port = port;
>-                        poll_list[poll_cnt].rx = port->rxq[i];
>-                        poll_cnt++;
>-                    }
>-                    index++;
>-                }
>-            }
>-            /* Unrefs the port_try_ref(). */
>-            port_unref(port);
>-        }
>-    }
>-
>-    *ppoll_list = poll_list;
>-    return poll_cnt;
>-}
>-
> static void *
> pmd_thread_main(void *f_)
> {
>     struct dp_netdev_pmd_thread *pmd = f_;
>+    struct rxq_poll *poll;
>     unsigned int lc = 0;
>-    struct rxq_poll *poll_list;
>     unsigned int port_seq = PMD_INITIAL_SEQ;
>-    int poll_cnt;
>-    int i;
>-
>-    poll_cnt = 0;
>-    poll_list = NULL;
> 
>     /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */
>     ovsthread_setspecific(pmd->dp->per_pmd_key, pmd);
>     pmd_thread_setaffinity_cpu(pmd->core_id);
> reload:
>-    emc_cache_init(&pmd->flow_cache);
>-    poll_cnt = pmd_load_queues(pmd, &poll_list, poll_cnt);
>-
>+    ovs_mutex_lock(&pmd->poll_mutex);
>     /* List port/core affinity */
>-    for (i = 0; i < poll_cnt; i++) {
>-        VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id, netdev_get_name(poll_list[i].port->netdev));
>+    LIST_FOR_EACH (poll, node, &pmd->poll_list) {
>+        VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id,
>+                  netdev_get_name(poll->port->netdev));
>     }
> 
>-    /* Signal here to make sure the pmd finishes
>-     * reloading the updated configuration. */
>-    dp_netdev_pmd_reload_done(pmd);
>-
>     for (;;) {
>-        int i;
>-
>-        for (i = 0; i < poll_cnt; i++) {
>-            dp_netdev_process_rxq_port(pmd, poll_list[i].port, poll_list[i].rx);
>+        LIST_FOR_EACH (poll, node, &pmd->poll_list) {
>+            dp_netdev_process_rxq_port(pmd, poll->port, poll->rx);
>         }
> 
>         if (lc++ > 1024) {
>@@ -2687,20 +2696,15 @@ reload:
>             }
>         }
>     }
>+    ovs_mutex_unlock(&pmd->poll_mutex);
> 
>-    emc_cache_uninit(&pmd->flow_cache);
>+    /* Synchronize with breaker thread. */
>+    dp_netdev_pmd_break_done(pmd);
> 
>-    if (!latch_is_set(&pmd->exit_latch)){
>+    if (!latch_is_set(&pmd->exit_latch)) {
>         goto reload;
>     }
> 
>-    for (i = 0; i < poll_cnt; i++) {
>-        port_unref(poll_list[i].port);
>-    }
>-
>-    dp_netdev_pmd_reload_done(pmd);
>-
>-    free(poll_list);
>     return NULL;
> }
> 
>@@ -2735,7 +2739,7 @@ dpif_netdev_enable_upcall(struct dpif *dpif)
> }
> 
> void
>-dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd)
>+dp_netdev_pmd_break_done(struct dp_netdev_pmd_thread *pmd)
> {
>     ovs_mutex_lock(&pmd->cond_mutex);
>     xpthread_cond_signal(&pmd->cond);
>@@ -2827,6 +2831,7 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>     pmd->core_id = core_id;
>     pmd->tx_qid = core_id_to_qid(core_id);
>     pmd->numa_id = numa_id;
>+    pmd->poll_cnt = 0;
> 
>     ovs_refcount_init(&pmd->ref_cnt);
>     latch_init(&pmd->exit_latch);
>@@ -2834,13 +2839,12 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>     xpthread_cond_init(&pmd->cond, NULL);
>     ovs_mutex_init(&pmd->cond_mutex);
>     ovs_mutex_init(&pmd->flow_mutex);
>+    ovs_mutex_init(&pmd->poll_mutex);
>     dpcls_init(&pmd->cls);
>     cmap_init(&pmd->flow_table);
>-    /* init the 'flow_cache' since there is no
>-     * actual thread created for NON_PMD_CORE_ID. */
>-    if (core_id == NON_PMD_CORE_ID) {
>-        emc_cache_init(&pmd->flow_cache);
>-    }
>+    list_init(&pmd->poll_list);
>+    emc_cache_init(&pmd->flow_cache);
>+
>     cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
>                 hash_int(core_id, 0));
> }
>@@ -2855,6 +2859,7 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
>     latch_destroy(&pmd->exit_latch);
>     xpthread_cond_destroy(&pmd->cond);
>     ovs_mutex_destroy(&pmd->cond_mutex);
>+    ovs_mutex_destroy(&pmd->poll_mutex);
>     free(pmd);
> }
> 
>@@ -2863,16 +2868,23 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
> static void
> dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
> {
>-    /* Uninit the 'flow_cache' since there is
>-     * no actual thread uninit it for NON_PMD_CORE_ID. */
>-    if (pmd->core_id == NON_PMD_CORE_ID) {
>-        emc_cache_uninit(&pmd->flow_cache);
>-    } else {
>+    struct rxq_poll *poll;
>+
>+    emc_cache_uninit(&pmd->flow_cache);
>+
>+    if (pmd->core_id != NON_PMD_CORE_ID) {
>         latch_set(&pmd->exit_latch);
>-        dp_netdev_reload_pmd__(pmd);
>+        dp_netdev_break_pmd__(pmd);
>         ovs_numa_unpin_core(pmd->core_id);
>         xpthread_join(pmd->thread, NULL);
>     }
>+
>+    /* Unref all ports and free poll_list. */
>+    LIST_FOR_EACH_POP (poll, node, &pmd->poll_list) {
>+        port_unref(poll->port);
>+        free(poll);
>+    }
>+
>     /* Purges the 'pmd''s flows after stopping the thread, but before
>      * destroying the flows, so that the flow stats can be collected. */
>     if (dp->dp_purge_cb) {
>@@ -2906,6 +2918,42 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
>     }
> }
> 
>+/* Returns PMD thread from this numa node with fewer rx queues to poll.
>+ * Returns NULL if there is no PMD threads on this numa node. */
>+static struct dp_netdev_pmd_thread *
>+dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
>+{
>+    int min_cnt = -1;
>+    struct dp_netdev_pmd_thread *pmd, *res = NULL;
>+
>+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
>+        if (pmd->numa_id == numa_id
>+            && (min_cnt > pmd->poll_cnt || res == NULL)) {
>+            min_cnt = pmd->poll_cnt;
>+            res = pmd;
>+        }
>+    }
>+
>+    return res;
>+}
>+
>+/* Adds rx queue to poll_list of PMD thread. May be called only
>+ * when PMD thread paused or not started yet. */
>+static void
>+dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
>+                         struct dp_netdev_port *port, struct netdev_rxq *rx)
>+    OVS_REQUIRES(pmd->poll_mutex)
>+{
>+    struct rxq_poll *poll = xmalloc(sizeof *poll);
>+
>+    port_ref(port);
>+    poll->port = port;
>+    poll->rx = rx;
>+
>+    list_push_back(&pmd->poll_list, &poll->node);
>+    pmd->poll_cnt++;
>+}
>+
> /* Checks the numa node id of 'netdev' and starts pmd threads for
>  * the numa node. */
> static void
>@@ -2925,8 +2973,9 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
>      * in which 'netdev' is on, do nothing. Else, creates the
>      * pmd threads for the numa node. */
>     if (!n_pmds) {
>-        int can_have, n_unpinned, i;
>+        int can_have, n_unpinned, i, index = 0;
>         struct dp_netdev_pmd_thread **pmds;
>+        struct dp_netdev_port *port;
> 
>         n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
>         if (!n_unpinned) {
>@@ -2944,13 +2993,23 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
>             pmds[i] = xzalloc(sizeof **pmds);
>             dp_netdev_configure_pmd(pmds[i], dp, i, core_id, numa_id);
>         }
>-        /* The pmd thread code needs to see all the others configured pmd
>-         * threads on the same numa node. That's why we call
>-         * 'dp_netdev_configure_pmd()' on all the threads and then we actually
>-         * start them. */
>+
>+        /* Distributes rx queues of this numa node between new pmd threads. */
>+        CMAP_FOR_EACH (port, node, &dp->ports) {
>+            if (netdev_is_pmd(port->netdev)
>+                && netdev_get_numa_id(port->netdev) == numa_id) {
>+                for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
>+                    /* Make thread-safety analyser happy. */
>+                    ovs_mutex_lock(&pmds[index]->poll_mutex);
>+                    dp_netdev_add_rxq_to_pmd(pmds[index], port, port->rxq[i]);
>+                    ovs_mutex_unlock(&pmds[index]->poll_mutex);
>+                    index = (index + 1) % can_have;
>+                }
>+            }
>+        }
>+
>+        /* Actual start of pmd threads. */
>         for (i = 0; i < can_have; i++) {
>-            /* Each thread will distribute all devices rx-queues among
>-             * themselves. */
>             pmds[i]->thread = ovs_thread_create("pmd", pmd_thread_main, pmds[i]);
>         }
>         free(pmds);
>-- 
>2.5.0
>

_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev