Superseded by the version in the patch set "dpif-netdev: Rework of queue management." http://openvswitch.org/pipermail/dev/2016-January/064478.html
On 31.12.2015 14:29, Ilya Maximets wrote:
> The current rx queue management model is buggy and
> will not work properly without additional barriers
> and other synchronization between PMD threads and
> the main thread.
>
> Introduce a new model in which the main thread
> distributes the queues, with minimal synchronization
> and without data races between PMD threads. This
> model should also be faster, because only the
> affected threads are interrupted for reconfiguration
> and the total computational complexity of
> reconfiguration is lower.
>
> Signed-off-by: Ilya Maximets <i.maxim...@samsung.com>
> ---
>
> This patch supersedes "[PATCH 0/2] Per numa node barriers for pmd threads".
> I will be on vacation until Jan 11, so *my other patches will be
> rebased on top of this* after Jan 11.
>
>  lib/dpif-netdev.c | 272 ++++++++++++++++++++++++++++++------------------
>  1 file changed, 151 insertions(+), 121 deletions(-)
>
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index cd72e62..fdd6f2a 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -372,6 +372,13 @@ struct dp_netdev_pmd_cycles {
>      atomic_ullong n[PMD_N_CYCLES];
>  };
>
> +/* Contained by struct dp_netdev_pmd_thread's 'poll_list' member. */
> +struct rxq_poll {
> +    struct dp_netdev_port *port;
> +    struct netdev_rxq *rx;
> +    struct ovs_list node;
> +};
> +
>  /* PMD: Poll modes drivers. PMD accesses devices via polling to eliminate
>   * the performance overhead of interrupt processing. Therefore netdev can
>   * not implement rx-wait for these devices. dpif-netdev needs to poll
> @@ -429,6 +436,9 @@ struct dp_netdev_pmd_thread {
>      int numa_id;                    /* numa node id of this pmd thread. */
>      int tx_qid;                     /* Queue id used by this pmd thread to
>                                       * send packets on all netdevs */
> +    /* List of rx queues to poll. */
> +    struct ovs_list poll_list;
> +    int poll_cnt;
>
>      /* Only a pmd thread can write on its own 'cycles' and 'stats'.
>       * The main thread keeps 'stats_zero' and 'cycles_zero' as base
> @@ -469,7 +479,7 @@ static void dp_netdev_input(struct dp_netdev_pmd_thread *,
>                              struct dp_packet **, int cnt);
>
>  static void dp_netdev_disable_upcall(struct dp_netdev *);
> -void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
> +void dp_netdev_pmd_break_done(struct dp_netdev_pmd_thread *pmd);
>  static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
>                                      struct dp_netdev *dp, int index,
>                                      unsigned core_id, int numa_id);
> @@ -482,6 +492,11 @@ dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
>  static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
>  static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id);
>  static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id);
> +static void
> +dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
> +                         struct dp_netdev_port *port, struct netdev_rxq *rx);
> +static struct dp_netdev_pmd_thread *
> +dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id);
>  static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp);
>  static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
>  static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd);
> @@ -1011,7 +1026,7 @@ dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
>  }
>
>  static void
> -dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
> +dp_netdev_break_pmd__(struct dp_netdev_pmd_thread *pmd)
>  {
>      int old_seq;
>
> @@ -1025,16 +1040,28 @@ dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
>      ovs_mutex_unlock(&pmd->cond_mutex);
>  }
>
> -/* Causes all pmd threads to reload its tx/rx devices.
> - * Must be called after adding/removing ports. */
>  static void
> -dp_netdev_reload_pmds(struct dp_netdev *dp)
> +dp_netdev_pause_pmd__(struct dp_netdev_pmd_thread *pmd)
>  {
> -    struct dp_netdev_pmd_thread *pmd;
> +    int old_seq;
>
> -    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
> -        dp_netdev_reload_pmd__(pmd);
> +    if (pmd->core_id == NON_PMD_CORE_ID) {
> +        return;
>      }
> +
> +    ovs_mutex_lock(&pmd->cond_mutex);
> +    atomic_add_relaxed(&pmd->change_seq, 1, &old_seq);
> +}
> +
> +static void
> +dp_netdev_resume_pmd__(struct dp_netdev_pmd_thread *pmd)
> +{
> +    if (pmd->core_id == NON_PMD_CORE_ID) {
> +        return;
> +    }
> +
> +    ovs_mutex_cond_wait(&pmd->cond, &pmd->cond_mutex);
> +    ovs_mutex_unlock(&pmd->cond_mutex);
>  }
>
>  static uint32_t
> @@ -1128,8 +1155,22 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
>      cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
>
>      if (netdev_is_pmd(netdev)) {
> -        dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev));
> -        dp_netdev_reload_pmds(dp);
> +        int numa_id = netdev_get_numa_id(netdev);
> +        struct dp_netdev_pmd_thread *pmd;
> +
> +        for (i = 0; i < netdev_n_rxq(netdev); i++) {
> +            pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
> +            if (!pmd) {
> +                /* There is no pmd threads on this numa node. */
> +                dp_netdev_set_pmds_on_numa(dp, numa_id);
> +                /* Assigning of rx queues done. */
> +                break;
> +            }
> +
> +            dp_netdev_pause_pmd__(pmd);
> +            dp_netdev_add_rxq_to_pmd(pmd, port, port->rxq[i]);
> +            dp_netdev_resume_pmd__(pmd);
> +        }
> +    }
>      seq_change(dp->port_seq);
>
> @@ -1226,16 +1267,6 @@ port_ref(struct dp_netdev_port *port)
>      }
>  }
>
> -static bool
> -port_try_ref(struct dp_netdev_port *port)
> -{
> -    if (port) {
> -        return ovs_refcount_try_ref_rcu(&port->ref_cnt);
> -    }
> -
> -    return false;
> -}
> -
>  static void
>  port_unref(struct dp_netdev_port *port)
>  {
> @@ -1314,11 +1345,36 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
>          int numa_id = netdev_get_numa_id(port->netdev);
>
>          /* If there is no netdev on the numa node, deletes the pmd threads
> -         * for that numa. Else, just reloads the queues. */
> +         * for that numa. Else, deletes the queues from polling lists. */
>          if (!has_pmd_port_for_numa(dp, numa_id)) {
>              dp_netdev_del_pmds_on_numa(dp, numa_id);
>          }
> -        dp_netdev_reload_pmds(dp);
> +        else {
> +            bool found;
> +            struct dp_netdev_pmd_thread *pmd;
> +            struct rxq_poll *poll, *next;
> +
> +            CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
> +                if (pmd->numa_id == numa_id) {
> +                    found = false;
> +                    dp_netdev_pause_pmd__(pmd);
> +                    LIST_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
> +                        if (poll->port == port) {
> +                            port_unref(poll->port);
> +                            list_remove(&poll->node);
> +                            free(poll);
> +                            found = true;
> +                        }
> +                    }
> +                    if (found) {
> +                        /* Clean up emc cache if poll_list modified. */
> +                        emc_cache_uninit(&pmd->flow_cache);
> +                        emc_cache_init(&pmd->flow_cache);
> +                    }
> +                    dp_netdev_resume_pmd__(pmd);
> +                }
> +            }
> +        }
>      }
>
>      port_unref(port);
> @@ -2583,92 +2639,27 @@ dpif_netdev_wait(struct dpif *dpif)
>      seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
>  }
>
> -struct rxq_poll {
> -    struct dp_netdev_port *port;
> -    struct netdev_rxq *rx;
> -};
> -
> -static int
> -pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
> -                struct rxq_poll **ppoll_list, int poll_cnt)
> -{
> -    struct rxq_poll *poll_list = *ppoll_list;
> -    struct dp_netdev_port *port;
> -    int n_pmds_on_numa, index, i;
> -
> -    /* Simple scheduler for netdev rx polling. */
> -    for (i = 0; i < poll_cnt; i++) {
> -        port_unref(poll_list[i].port);
> -    }
> -
> -    poll_cnt = 0;
> -    n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id);
> -    index = 0;
> -
> -    CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
> -        /* Calls port_try_ref() to prevent the main thread
> -         * from deleting the port. */
> -        if (port_try_ref(port)) {
> -            if (netdev_is_pmd(port->netdev)
> -                && netdev_get_numa_id(port->netdev) == pmd->numa_id) {
> -                int i;
> -
> -                for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
> -                    if ((index % n_pmds_on_numa) == pmd->index) {
> -                        poll_list = xrealloc(poll_list,
> -                                        sizeof *poll_list * (poll_cnt + 1));
> -
> -                        port_ref(port);
> -                        poll_list[poll_cnt].port = port;
> -                        poll_list[poll_cnt].rx = port->rxq[i];
> -                        poll_cnt++;
> -                    }
> -                    index++;
> -                }
> -            }
> -            /* Unrefs the port_try_ref(). */
> -            port_unref(port);
> -        }
> -    }
> -
> -    *ppoll_list = poll_list;
> -    return poll_cnt;
> -}
> -
>  static void *
>  pmd_thread_main(void *f_)
>  {
>      struct dp_netdev_pmd_thread *pmd = f_;
> +    struct rxq_poll *poll;
>      unsigned int lc = 0;
> -    struct rxq_poll *poll_list;
>      unsigned int port_seq = PMD_INITIAL_SEQ;
> -    int poll_cnt;
> -    int i;
> -
> -    poll_cnt = 0;
> -    poll_list = NULL;
>
>      /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */
>      ovsthread_setspecific(pmd->dp->per_pmd_key, pmd);
>      pmd_thread_setaffinity_cpu(pmd->core_id);
>  reload:
> -    emc_cache_init(&pmd->flow_cache);
> -    poll_cnt = pmd_load_queues(pmd, &poll_list, poll_cnt);
> -
>      /* List port/core affinity */
> -    for (i = 0; i < poll_cnt; i++) {
> -        VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id,
> -                  netdev_get_name(poll_list[i].port->netdev));
> +    LIST_FOR_EACH (poll, node, &pmd->poll_list) {
> +        VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id,
> +                  netdev_get_name(poll->port->netdev));
>      }
>
> -    /* Signal here to make sure the pmd finishes
> -     * reloading the updated configuration. */
> -    dp_netdev_pmd_reload_done(pmd);
> -
>      for (;;) {
> -        int i;
> -
> -        for (i = 0; i < poll_cnt; i++) {
> -            dp_netdev_process_rxq_port(pmd, poll_list[i].port,
> -                                       poll_list[i].rx);
> +        LIST_FOR_EACH (poll, node, &pmd->poll_list) {
> +            dp_netdev_process_rxq_port(pmd, poll->port, poll->rx);
>          }
>
>          if (lc++ > 1024) {
> @@ -2688,19 +2679,13 @@ reload:
>          }
>      }
>
> -    emc_cache_uninit(&pmd->flow_cache);
> +    /* Synchronize with breaker thread. */
> +    dp_netdev_pmd_break_done(pmd);
>
> -    if (!latch_is_set(&pmd->exit_latch)){
> +    if (!latch_is_set(&pmd->exit_latch)) {
>          goto reload;
>      }
>
> -    for (i = 0; i < poll_cnt; i++) {
> -        port_unref(poll_list[i].port);
> -    }
> -
> -    dp_netdev_pmd_reload_done(pmd);
> -
> -    free(poll_list);
>      return NULL;
>  }
>
> @@ -2735,7 +2720,7 @@ dpif_netdev_enable_upcall(struct dpif *dpif)
>  }
>
>  void
> -dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd)
> +dp_netdev_pmd_break_done(struct dp_netdev_pmd_thread *pmd)
>  {
>      ovs_mutex_lock(&pmd->cond_mutex);
>      xpthread_cond_signal(&pmd->cond);
> @@ -2827,6 +2812,7 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>      pmd->core_id = core_id;
>      pmd->tx_qid = core_id_to_qid(core_id);
>      pmd->numa_id = numa_id;
> +    pmd->poll_cnt = 0;
>
>      ovs_refcount_init(&pmd->ref_cnt);
>      latch_init(&pmd->exit_latch);
> @@ -2836,11 +2822,9 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>      ovs_mutex_init(&pmd->flow_mutex);
>      dpcls_init(&pmd->cls);
>      cmap_init(&pmd->flow_table);
> -    /* init the 'flow_cache' since there is no
> -     * actual thread created for NON_PMD_CORE_ID. */
> -    if (core_id == NON_PMD_CORE_ID) {
> -        emc_cache_init(&pmd->flow_cache);
> -    }
> +    list_init(&pmd->poll_list);
> +    emc_cache_init(&pmd->flow_cache);
> +
>      cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
>                  hash_int(core_id, 0));
>  }
> @@ -2863,16 +2847,23 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
>  static void
>  dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
>  {
> -    /* Uninit the 'flow_cache' since there is
> -     * no actual thread uninit it for NON_PMD_CORE_ID. */
> -    if (pmd->core_id == NON_PMD_CORE_ID) {
> -        emc_cache_uninit(&pmd->flow_cache);
> -    } else {
> +    struct rxq_poll *poll;
> +
> +    emc_cache_uninit(&pmd->flow_cache);
> +
> +    if (pmd->core_id != NON_PMD_CORE_ID) {
>          latch_set(&pmd->exit_latch);
> -        dp_netdev_reload_pmd__(pmd);
> +        dp_netdev_break_pmd__(pmd);
>          ovs_numa_unpin_core(pmd->core_id);
>          xpthread_join(pmd->thread, NULL);
>      }
> +
> +    /* Unref all ports and free poll_list. */
> +    LIST_FOR_EACH_POP (poll, node, &pmd->poll_list) {
> +        port_unref(poll->port);
> +        free(poll);
> +    }
> +
>      /* Purges the 'pmd''s flows after stopping the thread, but before
>       * destroying the flows, so that the flow stats can be collected. */
>      if (dp->dp_purge_cb) {
> @@ -2906,6 +2897,37 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
>      }
>  }
>
> +static struct dp_netdev_pmd_thread *
> +dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
> +{
> +    int min_cnt = -1;
> +    struct dp_netdev_pmd_thread *pmd, *res = NULL;
> +
> +    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
> +        if (pmd->numa_id == numa_id
> +            && (min_cnt > pmd->poll_cnt || res == NULL)) {
> +            min_cnt = pmd->poll_cnt;
> +            res = pmd;
> +        }
> +    }
> +
> +    return res;
> +}
> +
> +static void
> +dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
> +                         struct dp_netdev_port *port, struct netdev_rxq *rx)
> +{
> +    struct rxq_poll *poll = xmalloc(sizeof *poll);
> +
> +    port_ref(port);
> +    poll->port = port;
> +    poll->rx = rx;
> +
> +    list_push_back(&pmd->poll_list, &poll->node);
> +    pmd->poll_cnt++;
> +}
> +
>  /* Checks the numa node id of 'netdev' and starts pmd threads for
>   * the numa node. */
>  static void
> @@ -2925,8 +2947,9 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
>       * in which 'netdev' is on, do nothing. Else, creates the
>       * pmd threads for the numa node. */
>      if (!n_pmds) {
> -        int can_have, n_unpinned, i;
> +        int can_have, n_unpinned, i, index = 0;
>          struct dp_netdev_pmd_thread **pmds;
> +        struct dp_netdev_port *port;
>
>          n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
>          if (!n_unpinned) {
> @@ -2944,13 +2967,20 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
>              pmds[i] = xzalloc(sizeof **pmds);
>              dp_netdev_configure_pmd(pmds[i], dp, i, core_id, numa_id);
>          }
> -        /* The pmd thread code needs to see all the others configured pmd
> -         * threads on the same numa node. That's why we call
> -         * 'dp_netdev_configure_pmd()' on all the threads and then we actually
> -         * start them. */
> +
> +        /* Distributes rx queues of this numa node between new pmd threads. */
> +        CMAP_FOR_EACH (port, node, &dp->ports) {
> +            if (netdev_is_pmd(port->netdev)
> +                && netdev_get_numa_id(port->netdev) == numa_id) {
> +                for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
> +                    dp_netdev_add_rxq_to_pmd(pmds[index], port, port->rxq[i]);
> +                    index = (index + 1) % can_have;
> +                }
> +            }
> +        }
> +
> +        /* Actual start of pmd threads. */
>          for (i = 0; i < can_have; i++) {
> -            /* Each thread will distribute all devices rx-queues among
> -             * themselves. */
>              pmds[i]->thread = ovs_thread_create("pmd", pmd_thread_main,
>                                                  pmds[i]);
>          }
>          free(pmds);
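
[Editor's note] The pause/break/resume handshake in the patch is compact but easy to misread. Below is a minimal, self-contained model of it, using plain pthreads and C11 atomics in place of the OVS wrappers (ovs_mutex_lock(), ovs_mutex_cond_wait(), atomic_add_relaxed()). The names mirror the patch, but this is an illustrative sketch, not OVS code. The key ordering property: pause takes 'cond_mutex' before bumping 'change_seq', so the pmd's acknowledgment can only be delivered once the pauser is parked inside resume's cond_wait, and resume cannot return before the pmd has left its poll loop.

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

static pthread_mutex_t cond_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static atomic_uint change_seq = 1;      /* Plays the role of PMD_INITIAL_SEQ. */
static atomic_bool exit_latch = false;
static int poll_list_len;               /* Stands in for the pmd's poll_list. */

static void *
pmd_main(void *aux_)
{
    unsigned int port_seq = 1;
    (void) aux_;

    for (;;) {
        unsigned int lc = 0;

        /* Poll loop: re-check 'change_seq' only now and then, like the real
         * pmd thread does once per 1024 iterations. */
        for (;;) {
            /* ... dp_netdev_process_rxq_port() on each rxq_poll entry ... */
            if (lc++ > 1024) {
                unsigned int seq;

                lc = 0;
                seq = atomic_load_explicit(&change_seq, memory_order_relaxed);
                if (seq != port_seq) {
                    port_seq = seq;
                    break;              /* Leave the poll loop. */
                }
            }
        }

        /* dp_netdev_pmd_break_done(): acknowledge the break.  This blocks on
         * 'cond_mutex' until the pauser reaches pthread_cond_wait(), because
         * pause_pmd() took the mutex before bumping the sequence. */
        pthread_mutex_lock(&cond_mutex);
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&cond_mutex);

        if (atomic_load(&exit_latch)) {
            return NULL;
        }
    }
}

/* dp_netdev_pause_pmd__(): take the mutex, then ask the pmd to break. */
static void
pause_pmd(void)
{
    pthread_mutex_lock(&cond_mutex);
    atomic_fetch_add_explicit(&change_seq, 1, memory_order_relaxed);
}

/* dp_netdev_resume_pmd__(): wait for the pmd's acknowledgment, then let it
 * run again.  NB: like the patch, this trusts the wakeup to come from the
 * signal; production code would loop on a guarded predicate. */
static void
resume_pmd(void)
{
    pthread_cond_wait(&cond, &cond_mutex);
    pthread_mutex_unlock(&cond_mutex);
}

int
main(void)
{
    pthread_t pmd;

    pthread_create(&pmd, NULL, pmd_main, NULL);

    pause_pmd();
    poll_list_len++;    /* Modify the "poll list", as do_add_port() does. */
    resume_pmd();

    atomic_store(&exit_latch, true);
    pause_pmd();
    resume_pmd();

    pthread_join(pmd, NULL);
    printf("modified poll list length: %d\n", poll_list_len);
    return 0;
}

Note that, as in the patch, the shared state is modified between pause_pmd() and resume_pmd(); resume_pmd() only guarantees that the pmd has broken out of its poll loop by the time it returns.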
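[Editor's note] Queue placement now happens in two places: a round-robin pass when pmd threads are first created for a NUMA node (dp_netdev_set_pmds_on_numa()) and a least-loaded pick when a port is added to a node that already has pmds (do_add_port() via dp_netdev_less_loaded_pmd_on_numa()). The toy model below illustrates the resulting placement; plain arrays stand in for the cmap of pmd threads, and the counts (3 pmds, one 5-rxq port at startup, then a hot-added 2-rxq port) are made up for the example.

#include <stdio.h>

#define N_PMDS 3

static int poll_cnt[N_PMDS];

/* Mirrors dp_netdev_less_loaded_pmd_on_numa(): the pmd with the fewest
 * polled queues. */
static int
less_loaded_pmd(void)
{
    int i, res = 0;

    for (i = 1; i < N_PMDS; i++) {
        if (poll_cnt[i] < poll_cnt[res]) {
            res = i;
        }
    }
    return res;
}

int
main(void)
{
    int i, index = 0;

    /* Initial start: 5 rxqs of one port dealt round-robin, as in
     * dp_netdev_set_pmds_on_numa(). */
    for (i = 0; i < 5; i++) {
        poll_cnt[index]++;
        index = (index + 1) % N_PMDS;
    }

    /* Hot-added port with 2 rxqs: each queue goes to the currently
     * least-loaded pmd, as in do_add_port(). */
    for (i = 0; i < 2; i++) {
        poll_cnt[less_loaded_pmd()]++;
    }

    for (i = 0; i < N_PMDS; i++) {
        printf("pmd %d polls %d queues\n", i, poll_cnt[i]);
    }
    return 0;   /* Prints 3, 2, 2: round-robin gave 2/2/1, then the
                 * hot-added queues went to pmd 2 and pmd 0. */
}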