The current rx queue management model is buggy and will not work properly without additional barriers and other synchronization between the PMD threads and the main thread.
This patch introduces a new model, in which the distribution of queues is performed by the main thread with minimal synchronization and without data races between pmd threads. This model should also be faster, because only the threads that actually need reconfiguration are interrupted and the total computational complexity of reconfiguration is lower.

Signed-off-by: Ilya Maximets <i.maxim...@samsung.com>
---
This patch supersedes "[PATCH 0/2] Per numa node barriers for pmd threads".

I will be on vacation until Jan 11, so *my other patches will be rebased on
top of this* after Jan 11.

An illustrative, standalone sketch of the pause/resume handshake used here is
appended after the patch.

 lib/dpif-netdev.c | 272 ++++++++++++++++++++++++++++++------------------------
 1 file changed, 151 insertions(+), 121 deletions(-)

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index cd72e62..fdd6f2a 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -372,6 +372,13 @@ struct dp_netdev_pmd_cycles {
     atomic_ullong n[PMD_N_CYCLES];
 };
 
+/* Contained by struct dp_netdev_pmd_thread's 'poll_list' member. */
+struct rxq_poll {
+    struct dp_netdev_port *port;
+    struct netdev_rxq *rx;
+    struct ovs_list node;
+};
+
 /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
  * the performance overhead of interrupt processing.  Therefore netdev can
  * not implement rx-wait for these devices.  dpif-netdev needs to poll
@@ -429,6 +436,9 @@ struct dp_netdev_pmd_thread {
     int numa_id;                    /* numa node id of this pmd thread. */
     int tx_qid;                     /* Queue id used by this pmd thread to
                                      * send packets on all netdevs */
+    /* List of rx queues to poll. */
+    struct ovs_list poll_list;
+    int poll_cnt;
 
     /* Only a pmd thread can write on its own 'cycles' and 'stats'.
      * The main thread keeps 'stats_zero' and 'cycles_zero' as base
@@ -469,7 +479,7 @@ static void dp_netdev_input(struct dp_netdev_pmd_thread *,
                             struct dp_packet **, int cnt);
 
 static void dp_netdev_disable_upcall(struct dp_netdev *);
-void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
+void dp_netdev_pmd_break_done(struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
                                     struct dp_netdev *dp, int index,
                                     unsigned core_id, int numa_id);
@@ -482,6 +492,11 @@ dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
 static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
 static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id);
 static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id);
+static void
+dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
+                         struct dp_netdev_port *port, struct netdev_rxq *rx);
+static struct dp_netdev_pmd_thread *
+dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id);
 static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp);
 static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd);
@@ -1011,7 +1026,7 @@ dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
 }
 
 static void
-dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
+dp_netdev_break_pmd__(struct dp_netdev_pmd_thread *pmd)
 {
     int old_seq;
 
@@ -1025,16 +1040,28 @@ dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
     ovs_mutex_unlock(&pmd->cond_mutex);
 }
 
-/* Causes all pmd threads to reload its tx/rx devices.
- * Must be called after adding/removing ports. */
 static void
-dp_netdev_reload_pmds(struct dp_netdev *dp)
+dp_netdev_pause_pmd__(struct dp_netdev_pmd_thread *pmd)
 {
-    struct dp_netdev_pmd_thread *pmd;
+    int old_seq;
 
-    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        dp_netdev_reload_pmd__(pmd);
+    if (pmd->core_id == NON_PMD_CORE_ID) {
+        return;
     }
+
+    ovs_mutex_lock(&pmd->cond_mutex);
+    atomic_add_relaxed(&pmd->change_seq, 1, &old_seq);
+}
+
+static void
+dp_netdev_resume_pmd__(struct dp_netdev_pmd_thread *pmd)
+{
+    if (pmd->core_id == NON_PMD_CORE_ID) {
+        return;
+    }
+
+    ovs_mutex_cond_wait(&pmd->cond, &pmd->cond_mutex);
+    ovs_mutex_unlock(&pmd->cond_mutex);
 }
 
 static uint32_t
@@ -1128,8 +1155,22 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
     cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
 
     if (netdev_is_pmd(netdev)) {
-        dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev));
-        dp_netdev_reload_pmds(dp);
+        int numa_id = netdev_get_numa_id(netdev);
+        struct dp_netdev_pmd_thread *pmd;
+
+        for (i = 0; i < netdev_n_rxq(netdev); i++) {
+            pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
+            if (!pmd) {
+                /* There is no pmd threads on this numa node. */
+                dp_netdev_set_pmds_on_numa(dp, numa_id);
+                /* Assigning of rx queues done. */
+                break;
+            }
+
+            dp_netdev_pause_pmd__(pmd);
+            dp_netdev_add_rxq_to_pmd(pmd, port, port->rxq[i]);
+            dp_netdev_resume_pmd__(pmd);
+        }
     }
     seq_change(dp->port_seq);
 
@@ -1226,16 +1267,6 @@ port_ref(struct dp_netdev_port *port)
     }
 }
 
-static bool
-port_try_ref(struct dp_netdev_port *port)
-{
-    if (port) {
-        return ovs_refcount_try_ref_rcu(&port->ref_cnt);
-    }
-
-    return false;
-}
-
 static void
 port_unref(struct dp_netdev_port *port)
 {
@@ -1314,11 +1345,36 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
         int numa_id = netdev_get_numa_id(port->netdev);
 
         /* If there is no netdev on the numa node, deletes the pmd threads
-         * for that numa.  Else, just reloads the queues. */
+         * for that numa.  Else, deletes the queues from polling lists. */
         if (!has_pmd_port_for_numa(dp, numa_id)) {
             dp_netdev_del_pmds_on_numa(dp, numa_id);
         }
-        dp_netdev_reload_pmds(dp);
+        else {
+            bool found;
+            struct dp_netdev_pmd_thread *pmd;
+            struct rxq_poll *poll, *next;
+
+            CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+                if (pmd->numa_id == numa_id) {
+                    found = false;
+                    dp_netdev_pause_pmd__(pmd);
+                    LIST_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
+                        if (poll->port == port) {
+                            port_unref(poll->port);
+                            list_remove(&poll->node);
+                            free(poll);
+                            found = true;
+                        }
+                    }
+                    if (found) {
+                        /* Clean up emc cache if poll_list modified. */
+                        emc_cache_uninit(&pmd->flow_cache);
+                        emc_cache_init(&pmd->flow_cache);
+                    }
+                    dp_netdev_resume_pmd__(pmd);
+                }
+            }
+        }
     }
 
     port_unref(port);
@@ -2583,92 +2639,27 @@ dpif_netdev_wait(struct dpif *dpif)
     seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
 }
 
-struct rxq_poll {
-    struct dp_netdev_port *port;
-    struct netdev_rxq *rx;
-};
-
-static int
-pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
-                struct rxq_poll **ppoll_list, int poll_cnt)
-{
-    struct rxq_poll *poll_list = *ppoll_list;
-    struct dp_netdev_port *port;
-    int n_pmds_on_numa, index, i;
-
-    /* Simple scheduler for netdev rx polling. */
-    for (i = 0; i < poll_cnt; i++) {
-        port_unref(poll_list[i].port);
-    }
-
-    poll_cnt = 0;
-    n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id);
-    index = 0;
-
-    CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
-        /* Calls port_try_ref() to prevent the main thread
-         * from deleting the port. */
-        if (port_try_ref(port)) {
-            if (netdev_is_pmd(port->netdev)
-                && netdev_get_numa_id(port->netdev) == pmd->numa_id) {
-                int i;
-
-                for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
-                    if ((index % n_pmds_on_numa) == pmd->index) {
-                        poll_list = xrealloc(poll_list,
-                                        sizeof *poll_list * (poll_cnt + 1));
-
-                        port_ref(port);
-                        poll_list[poll_cnt].port = port;
-                        poll_list[poll_cnt].rx = port->rxq[i];
-                        poll_cnt++;
-                    }
-                    index++;
-                }
-            }
-            /* Unrefs the port_try_ref(). */
-            port_unref(port);
-        }
-    }
-
-    *ppoll_list = poll_list;
-    return poll_cnt;
-}
-
 static void *
 pmd_thread_main(void *f_)
 {
     struct dp_netdev_pmd_thread *pmd = f_;
+    struct rxq_poll *poll;
     unsigned int lc = 0;
-    struct rxq_poll *poll_list;
     unsigned int port_seq = PMD_INITIAL_SEQ;
-    int poll_cnt;
-    int i;
-
-    poll_cnt = 0;
-    poll_list = NULL;
 
     /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */
     ovsthread_setspecific(pmd->dp->per_pmd_key, pmd);
     pmd_thread_setaffinity_cpu(pmd->core_id);
 reload:
-    emc_cache_init(&pmd->flow_cache);
-    poll_cnt = pmd_load_queues(pmd, &poll_list, poll_cnt);
-
     /* List port/core affinity */
-    for (i = 0; i < poll_cnt; i++) {
-        VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id, netdev_get_name(poll_list[i].port->netdev));
+    LIST_FOR_EACH (poll, node, &pmd->poll_list) {
+        VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id,
+                  netdev_get_name(poll->port->netdev));
     }
 
-    /* Signal here to make sure the pmd finishes
-     * reloading the updated configuration. */
-    dp_netdev_pmd_reload_done(pmd);
-
     for (;;) {
-        int i;
-
-        for (i = 0; i < poll_cnt; i++) {
-            dp_netdev_process_rxq_port(pmd, poll_list[i].port, poll_list[i].rx);
+        LIST_FOR_EACH (poll, node, &pmd->poll_list) {
+            dp_netdev_process_rxq_port(pmd, poll->port, poll->rx);
         }
 
         if (lc++ > 1024) {
@@ -2688,19 +2679,13 @@ reload:
         }
     }
 
-    emc_cache_uninit(&pmd->flow_cache);
+    /* Synchronize with breaker thread. */
+    dp_netdev_pmd_break_done(pmd);
 
-    if (!latch_is_set(&pmd->exit_latch)){
+    if (!latch_is_set(&pmd->exit_latch)) {
         goto reload;
     }
 
-    for (i = 0; i < poll_cnt; i++) {
-        port_unref(poll_list[i].port);
-    }
-
-    dp_netdev_pmd_reload_done(pmd);
-
-    free(poll_list);
     return NULL;
 }
 
@@ -2735,7 +2720,7 @@ dpif_netdev_enable_upcall(struct dpif *dpif)
 }
 
 void
-dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd)
+dp_netdev_pmd_break_done(struct dp_netdev_pmd_thread *pmd)
 {
     ovs_mutex_lock(&pmd->cond_mutex);
     xpthread_cond_signal(&pmd->cond);
@@ -2827,6 +2812,7 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     pmd->core_id = core_id;
     pmd->tx_qid = core_id_to_qid(core_id);
     pmd->numa_id = numa_id;
+    pmd->poll_cnt = 0;
 
     ovs_refcount_init(&pmd->ref_cnt);
     latch_init(&pmd->exit_latch);
@@ -2836,11 +2822,9 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     ovs_mutex_init(&pmd->flow_mutex);
     dpcls_init(&pmd->cls);
     cmap_init(&pmd->flow_table);
-    /* init the 'flow_cache' since there is no
-     * actual thread created for NON_PMD_CORE_ID. */
-    if (core_id == NON_PMD_CORE_ID) {
-        emc_cache_init(&pmd->flow_cache);
-    }
+    list_init(&pmd->poll_list);
+    emc_cache_init(&pmd->flow_cache);
+
     cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
                 hash_int(core_id, 0));
 }
@@ -2863,16 +2847,23 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
 static void
 dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
 {
-    /* Uninit the 'flow_cache' since there is
-     * no actual thread uninit it for NON_PMD_CORE_ID. */
-    if (pmd->core_id == NON_PMD_CORE_ID) {
-        emc_cache_uninit(&pmd->flow_cache);
-    } else {
+    struct rxq_poll *poll;
+
+    emc_cache_uninit(&pmd->flow_cache);
+
+    if (pmd->core_id != NON_PMD_CORE_ID) {
         latch_set(&pmd->exit_latch);
-        dp_netdev_reload_pmd__(pmd);
+        dp_netdev_break_pmd__(pmd);
         ovs_numa_unpin_core(pmd->core_id);
         xpthread_join(pmd->thread, NULL);
     }
+
+    /* Unref all ports and free poll_list. */
+    LIST_FOR_EACH_POP (poll, node, &pmd->poll_list) {
+        port_unref(poll->port);
+        free(poll);
+    }
+
     /* Purges the 'pmd''s flows after stopping the thread, but before
      * destroying the flows, so that the flow stats can be collected. */
     if (dp->dp_purge_cb) {
@@ -2906,6 +2897,37 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
     }
 }
 
+static struct dp_netdev_pmd_thread *
+dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
+{
+    int min_cnt = -1;
+    struct dp_netdev_pmd_thread *pmd, *res = NULL;
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        if (pmd->numa_id == numa_id
+            && (min_cnt > pmd->poll_cnt || res == NULL)) {
+            min_cnt = pmd->poll_cnt;
+            res = pmd;
+        }
+    }
+
+    return res;
+}
+
+static void
+dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
+                         struct dp_netdev_port *port, struct netdev_rxq *rx)
+{
+    struct rxq_poll *poll = xmalloc(sizeof *poll);
+
+    port_ref(port);
+    poll->port = port;
+    poll->rx = rx;
+
+    list_push_back(&pmd->poll_list, &poll->node);
+    pmd->poll_cnt++;
+}
+
 /* Checks the numa node id of 'netdev' and starts pmd threads for
  * the numa node. */
 static void
@@ -2925,8 +2947,9 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
      * in which 'netdev' is on, do nothing.  Else, creates the
      * pmd threads for the numa node. */
     if (!n_pmds) {
-        int can_have, n_unpinned, i;
+        int can_have, n_unpinned, i, index = 0;
         struct dp_netdev_pmd_thread **pmds;
+        struct dp_netdev_port *port;
 
         n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
         if (!n_unpinned) {
@@ -2944,13 +2967,20 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
             pmds[i] = xzalloc(sizeof **pmds);
             dp_netdev_configure_pmd(pmds[i], dp, i, core_id, numa_id);
         }
-        /* The pmd thread code needs to see all the others configured pmd
-         * threads on the same numa node.  That's why we call
-         * 'dp_netdev_configure_pmd()' on all the threads and then we actually
-         * start them. */
+
+        /* Distributes rx queues of this numa node between new pmd threads. */
+        CMAP_FOR_EACH (port, node, &dp->ports) {
+            if (netdev_is_pmd(port->netdev)
+                && netdev_get_numa_id(port->netdev) == numa_id) {
+                for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
+                    dp_netdev_add_rxq_to_pmd(pmds[index], port, port->rxq[i]);
+                    index = (index + 1) % can_have;
+                }
+            }
+        }
+
+        /* Actual start of pmd threads. */
         for (i = 0; i < can_have; i++) {
-            /* Each thread will distribute all devices rx-queues among
-             * themselves. */
             pmds[i]->thread = ovs_thread_create("pmd", pmd_thread_main, pmds[i]);
         }
         free(pmds);
-- 
2.5.0

_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev
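Appended for illustration only (not part of the patch): the pause/resume pair
above boils down to "bump an atomic sequence counter while holding the pmd's
cond_mutex, then wait on its condition variable until the interrupted thread
acknowledges that it left its poll loop".  The standalone sketch below models
that handshake with plain pthreads and C11 atomics.  All names here (struct
worker, pause_worker(), resume_worker(), ...) are invented for the sketch and
do not exist in OVS, and the 'broken' flag is an addition of the sketch to
guard against spurious wakeups; the real patch uses the ovs_mutex/atomic
wrappers shown in the diff.

/* Minimal model of the break/ack handshake.  Build: cc -std=c11 -pthread. */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

struct worker {
    pthread_t thread;
    pthread_mutex_t cond_mutex;
    pthread_cond_t cond;
    bool broken;            /* Worker left its poll loop; guarded by mutex. */
    atomic_uint change_seq; /* Bumped by main to interrupt the poll loop. */
    atomic_bool exiting;
    int poll_cnt;           /* Stands in for the pmd's 'poll_list'. */
};

static void *
worker_main(void *w_)
{
    struct worker *w = w_;
    unsigned int last_seq = atomic_load(&w->change_seq);

    for (;;) {
        unsigned int seq;

        if (atomic_load(&w->exiting)) {
            return NULL;
        }
        seq = atomic_load_explicit(&w->change_seq, memory_order_acquire);
        if (seq != last_seq) {
            last_seq = seq;
            /* Acknowledge the break.  The mutex cannot be taken before the
             * breaker parks itself in pthread_cond_wait(), so the ack is
             * never lost. */
            pthread_mutex_lock(&w->cond_mutex);
            w->broken = true;
            pthread_cond_signal(&w->cond);
            pthread_mutex_unlock(&w->cond_mutex);
            continue;
        }
        usleep(1000);       /* Pretend to poll the rx queues here. */
    }
}

/* Interrupt the worker: keep 'cond_mutex' held until resume_worker() so the
 * worker can neither acknowledge nor resume polling in between. */
static void
pause_worker(struct worker *w)
{
    pthread_mutex_lock(&w->cond_mutex);
    atomic_fetch_add_explicit(&w->change_seq, 1, memory_order_release);
}

/* Wait for the worker's acknowledgement, then let it poll again. */
static void
resume_worker(struct worker *w)
{
    while (!w->broken) {
        pthread_cond_wait(&w->cond, &w->cond_mutex);
    }
    w->broken = false;
    pthread_mutex_unlock(&w->cond_mutex);
}

int
main(void)
{
    struct worker w;

    w.broken = false;
    w.poll_cnt = 0;
    atomic_init(&w.change_seq, 0);
    atomic_init(&w.exiting, false);
    pthread_mutex_init(&w.cond_mutex, NULL);
    pthread_cond_init(&w.cond, NULL);
    pthread_create(&w.thread, NULL, worker_main, &w);

    /* "Add a port": only this worker is interrupted; it rescans its queue
     * list when it resumes polling. */
    pause_worker(&w);
    w.poll_cnt++;
    resume_worker(&w);
    printf("worker now polls %d queue(s)\n", w.poll_cnt);

    atomic_store(&w.exiting, true);
    pthread_join(w.thread, NULL);
    return 0;
}

Holding cond_mutex across the pause/resume pair is what makes the handshake
lossless: the acknowledging signal can only be sent while the breaker is
already waiting on the condition variable, and only the one interrupted
thread pays any cost.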