Ping v2
On 23.09.2015 15:22, Ilya Maximets wrote: > Ping. > > On 11.09.2015 14:38, Ilya Maximets wrote: >> Currently tx_qid is equal to pmd->core_id. This leads to unexpected >> behavior if pmd-cpu-mask is different from '/(0*)(1|3|7)?(f*)/', >> e.g. if core_ids are not sequential, or don't start from 0, or both. >> >> Example: >> starting 2 pmd threads with 1 port, 2 rxqs per port, >> pmd-cpu-mask = 00000014 and let dev->real_n_txq = 2 >> >> In that case pmd_1->tx_qid = 2, pmd_2->tx_qid = 4 and >> txq_needs_locking = true (if the device doesn't have ovs_numa_get_n_cores()+1 >> queues). >> >> In that case, after truncating in netdev_dpdk_send__(): >> 'qid = qid % dev->real_n_txq;' >> pmd_1: qid = 2 % 2 = 0 >> pmd_2: qid = 4 % 2 = 0 >> >> So, both threads will call dpdk_queue_pkts() with the same qid = 0. >> This is unexpected behavior if there are 2 tx queues in the device. >> Queue #1 will not be used and both threads will lock queue #0 >> on each send. >> >> Fix that by introducing a per pmd thread hash map 'tx_queues', where >> all available tx queues for that pmd thread will be stored with >> port_no as a key (hash). All tx_qid-s will be unique per port and >> sequential to prevent the described unexpected mapping after truncating. >> >> The implemented infrastructure can be used in the future to choose >> between all tx queues available for that pmd thread. >> >> Signed-off-by: Ilya Maximets <i.maxim...@samsung.com> >> --- >> version 5: >> * txqs 0 from ports of non-pmd netdevs added to all pmd threads >> >> version 4: >> * fixed distribution of tx queues if multiqueue is not supported >> >> version 3: >> * fixed failing unit tests by adding tx queues of non >> pmd devices to the non pmd thread. (they haven't been used by any thread) >> * pmd_flush_tx_queues --> dp_netdev_pmd_detach_tx_queues >> * function names changed to dp_netdev_* >> * dp_netdev_pmd_lookup_txq now looks up by port_no. >> * removed unnecessary dp_netdev_lookup_port in dp_execute_cb >> for OVS_ACTION_ATTR_OUTPUT. 
>> * refactoring >> >> lib/dpif-netdev.c | 173 >> ++++++++++++++++++++++++++++++++++++++++++++++-------- >> 1 file changed, 147 insertions(+), 26 deletions(-) >> >> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c >> index db76290..65cd533 100644 >> --- a/lib/dpif-netdev.c >> +++ b/lib/dpif-netdev.c >> @@ -372,6 +372,13 @@ struct dp_netdev_pmd_cycles { >> atomic_ullong n[PMD_N_CYCLES]; >> }; >> >> +struct dp_netdev_pmd_txq { >> + struct cmap_node node; /* In owning dp_netdev_pmd_thread's */ >> + /* 'tx_queues'. */ >> + struct dp_netdev_port *port; >> + int tx_qid; >> +}; >> + >> /* PMD: Poll modes drivers. PMD accesses devices via polling to eliminate >> * the performance overhead of interrupt processing. Therefore netdev can >> * not implement rx-wait for these devices. dpif-netdev needs to poll >> @@ -427,8 +434,8 @@ struct dp_netdev_pmd_thread { >> /* threads on same numa node. */ >> unsigned core_id; /* CPU core id of this pmd thread. */ >> int numa_id; /* numa node id of this pmd thread. */ >> - int tx_qid; /* Queue id used by this pmd thread to >> - * send packets on all netdevs */ >> + struct cmap tx_queues; /* Queue ids used by this pmd thread to >> + * send packets to ports */ >> >> /* Only a pmd thread can write on its own 'cycles' and 'stats'. 
>> * The main thread keeps 'stats_zero' and 'cycles_zero' as base >> @@ -470,6 +477,15 @@ static void dp_netdev_input(struct dp_netdev_pmd_thread >> *, >> >> static void dp_netdev_disable_upcall(struct dp_netdev *); >> void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd); >> +static void dp_netdev_configure_non_pmd_txqs(struct dp_netdev_pmd_thread >> *pmd); >> +static void dp_netdev_pmd_add_txq(struct dp_netdev_pmd_thread *pmd, >> + struct dp_netdev_port *port, int >> queue_id); >> +static void dp_netdev_pmd_del_txq(struct dp_netdev_pmd_thread *pmd, >> + struct dp_netdev_pmd_txq *txq); >> +static void dp_netdev_pmd_detach_tx_queues(struct dp_netdev_pmd_thread >> *pmd); >> +static struct dp_netdev_pmd_txq * >> +dp_netdev_pmd_lookup_txq(const struct dp_netdev_pmd_thread *pmd, >> + odp_port_t port_no); >> static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, >> struct dp_netdev *dp, int index, >> unsigned core_id, int numa_id); >> @@ -1051,6 +1067,7 @@ do_add_port(struct dp_netdev *dp, const char *devname, >> const char *type, >> struct netdev_saved_flags *sf; >> struct dp_netdev_port *port; >> struct netdev *netdev; >> + struct dp_netdev_pmd_thread *non_pmd; >> enum netdev_flags flags; >> const char *open_type; >> int error; >> @@ -1127,10 +1144,15 @@ do_add_port(struct dp_netdev *dp, const char >> *devname, const char *type, >> ovs_refcount_init(&port->ref_cnt); >> cmap_insert(&dp->ports, &port->node, hash_port_no(port_no)); >> >> - if (netdev_is_pmd(netdev)) { >> - dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev)); >> - dp_netdev_reload_pmds(dp); >> + non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID); >> + if (non_pmd) { >> + dp_netdev_pmd_add_txq(non_pmd, port, ovs_numa_get_n_cores()); >> + dp_netdev_pmd_unref(non_pmd); >> } >> + if (netdev_is_pmd(netdev)) >> + dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev)); >> + dp_netdev_reload_pmds(dp); >> + >> seq_change(dp->port_seq); >> >> return 0; >> @@ -1308,18 +1330,32 @@ 
static void >> do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port) >> OVS_REQUIRES(dp->port_mutex) >> { >> + struct dp_netdev_pmd_thread *non_pmd; >> + >> cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no)); >> seq_change(dp->port_seq); >> + >> + non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID); >> + if (non_pmd) { >> + /* There is only one txq for each port for non pmd thread */ >> + struct dp_netdev_pmd_txq *txq; >> + txq = dp_netdev_pmd_lookup_txq(non_pmd, port->port_no); >> + if (OVS_LIKELY(txq)) >> + dp_netdev_pmd_del_txq(non_pmd, txq); >> + dp_netdev_pmd_unref(non_pmd); >> + } >> + >> if (netdev_is_pmd(port->netdev)) { >> int numa_id = netdev_get_numa_id(port->netdev); >> >> /* If there is no netdev on the numa node, deletes the pmd threads >> - * for that numa. Else, just reloads the queues. */ >> + * for that numa. */ >> if (!has_pmd_port_for_numa(dp, numa_id)) { >> dp_netdev_del_pmds_on_numa(dp, numa_id); >> } >> - dp_netdev_reload_pmds(dp); >> } >> + /* Reload queues of pmd threads. */ >> + dp_netdev_reload_pmds(dp); >> >> port_unref(port); >> } >> @@ -2580,6 +2616,80 @@ dpif_netdev_wait(struct dpif *dpif) >> seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq); >> } >> >> +static void >> +dp_netdev_pmd_add_txq(struct dp_netdev_pmd_thread *pmd, >> + struct dp_netdev_port *port, int queue_id) >> +{ >> + if (port_try_ref(port)) { >> + struct dp_netdev_pmd_txq * txq = xmalloc(sizeof *txq); >> + txq->port = port; >> + txq->tx_qid = queue_id; >> + cmap_insert(&pmd->tx_queues, &txq->node, >> + hash_port_no(port->port_no)); >> + } >> +} >> + >> +/* Configures tx_queues for non pmd thread. 
*/ >> +static void >> +dp_netdev_configure_non_pmd_txqs(struct dp_netdev_pmd_thread *pmd) >> +{ >> + if (!cmap_is_empty(&pmd->tx_queues)) >> + dp_netdev_pmd_detach_tx_queues(pmd); >> + >> + struct dp_netdev_port *port; >> + CMAP_FOR_EACH (port, node, &pmd->dp->ports) { >> + dp_netdev_pmd_add_txq(pmd, port, ovs_numa_get_n_cores()); >> + } >> +} >> + >> +static void >> +dp_netdev_pmd_del_txq(struct dp_netdev_pmd_thread *pmd, >> + struct dp_netdev_pmd_txq *txq) >> +{ >> + cmap_remove(&pmd->tx_queues, &txq->node, >> + hash_port_no(txq->port->port_no)); >> + port_unref(txq->port); >> + free(txq); >> +} >> + >> +/* Removes all queues from 'tx_queues' of pmd thread. */ >> +static void >> +dp_netdev_pmd_detach_tx_queues(struct dp_netdev_pmd_thread *pmd) >> +{ >> + struct dp_netdev_pmd_txq *txq; >> + >> + CMAP_FOR_EACH (txq, node, &pmd->tx_queues) { >> + dp_netdev_pmd_del_txq(pmd, txq); >> + } >> +} >> + >> +static void OVS_UNUSED >> +dp_netdev_pmd_tx_queues_print(struct dp_netdev_pmd_thread *pmd) >> +{ >> + struct dp_netdev_pmd_txq *txq; >> + >> + CMAP_FOR_EACH (txq, node, &pmd->tx_queues) { >> + VLOG_INFO("Core_id: %d, Port: %s, tx_qid: %d\n", >> + pmd->core_id, netdev_get_name(txq->port->netdev), >> + txq->tx_qid); >> + } >> +} >> + >> +static struct dp_netdev_pmd_txq * >> +dp_netdev_pmd_lookup_txq(const struct dp_netdev_pmd_thread *pmd, >> + odp_port_t port_no) >> +{ >> + struct dp_netdev_pmd_txq *txq; >> + >> + CMAP_FOR_EACH_WITH_HASH (txq, node, hash_port_no(port_no), >> + &pmd->tx_queues) { >> + if (txq->port->port_no == port_no) { >> + return txq; >> + } >> + } >> + return NULL; >> +} >> + >> struct rxq_poll { >> struct dp_netdev_port *port; >> struct netdev_rxq *rx; >> @@ -2591,16 +2701,19 @@ pmd_load_queues(struct dp_netdev_pmd_thread *pmd, >> { >> struct rxq_poll *poll_list = *ppoll_list; >> struct dp_netdev_port *port; >> - int n_pmds_on_numa, index, i; >> + int n_pmds_on_numa, rx_index, tx_index, i, n_txq; >> >> /* Simple scheduler for netdev rx polling. 
*/ >> + dp_netdev_pmd_detach_tx_queues(pmd); >> + >> for (i = 0; i < poll_cnt; i++) { >> port_unref(poll_list[i].port); >> } >> >> poll_cnt = 0; >> n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id); >> - index = 0; >> + rx_index = 0; >> + tx_index = 0; >> >> CMAP_FOR_EACH (port, node, &pmd->dp->ports) { >> /* Calls port_try_ref() to prevent the main thread >> @@ -2611,7 +2724,7 @@ pmd_load_queues(struct dp_netdev_pmd_thread *pmd, >> int i; >> >> for (i = 0; i < netdev_n_rxq(port->netdev); i++) { >> - if ((index % n_pmds_on_numa) == pmd->index) { >> + if ((rx_index % n_pmds_on_numa) == pmd->index) { >> poll_list = xrealloc(poll_list, >> sizeof *poll_list * (poll_cnt + 1)); >> >> @@ -2620,9 +2733,20 @@ pmd_load_queues(struct dp_netdev_pmd_thread *pmd, >> poll_list[poll_cnt].rx = port->rxq[i]; >> poll_cnt++; >> } >> - index++; >> + rx_index++; >> } >> + >> + } >> + >> + n_txq = netdev_n_txq(port->netdev); >> + /* Last queue reserved for non pmd threads */ >> + n_txq = n_txq == 1 ? 1 : n_txq - 1; >> + for (i = 0; i < n_txq; i++) { >> + if ((tx_index % n_pmds_on_numa) == pmd->index || n_txq == 1) >> + dp_netdev_pmd_add_txq(pmd, port, i); >> + tx_index++; >> } >> + >> /* Unrefs the port_try_ref(). */ >> port_unref(port); >> } >> @@ -2691,6 +2815,8 @@ reload: >> goto reload; >> } >> >> + dp_netdev_pmd_detach_tx_queues(pmd); >> + >> for (i = 0; i < poll_cnt; i++) { >> port_unref(poll_list[i].port); >> } >> @@ -2804,16 +2930,6 @@ dp_netdev_pmd_get_next(struct dp_netdev *dp, struct >> cmap_position *pos) >> return next; >> } >> >> -static int >> -core_id_to_qid(unsigned core_id) >> -{ >> - if (core_id != NON_PMD_CORE_ID) { >> - return core_id; >> - } else { >> - return ovs_numa_get_n_cores(); >> - } >> -} >> - >> /* Configures the 'pmd' based on the input argument. 
*/ >> static void >> dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev >> *dp, >> @@ -2822,7 +2938,6 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread >> *pmd, struct dp_netdev *dp, >> pmd->dp = dp; >> pmd->index = index; >> pmd->core_id = core_id; >> - pmd->tx_qid = core_id_to_qid(core_id); >> pmd->numa_id = numa_id; >> >> ovs_refcount_init(&pmd->ref_cnt); >> @@ -2833,9 +2948,11 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread >> *pmd, struct dp_netdev *dp, >> ovs_mutex_init(&pmd->flow_mutex); >> dpcls_init(&pmd->cls); >> cmap_init(&pmd->flow_table); >> - /* init the 'flow_cache' since there is no >> + cmap_init(&pmd->tx_queues); >> + /* init the 'flow_cache' and 'tx_queues' since there is no >> * actual thread created for NON_PMD_CORE_ID. */ >> if (core_id == NON_PMD_CORE_ID) { >> + dp_netdev_configure_non_pmd_txqs(pmd); >> emc_cache_init(&pmd->flow_cache); >> } >> cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, >> &pmd->node), >> @@ -2848,6 +2965,7 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd) >> dp_netdev_pmd_flow_flush(pmd); >> dpcls_destroy(&pmd->cls); >> cmap_destroy(&pmd->flow_table); >> + cmap_destroy(&pmd->tx_queues); >> ovs_mutex_destroy(&pmd->flow_mutex); >> latch_destroy(&pmd->exit_latch); >> xpthread_cond_destroy(&pmd->cond); >> @@ -2864,6 +2982,7 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct >> dp_netdev_pmd_thread *pmd) >> * no actual thread uninit it for NON_PMD_CORE_ID. 
*/ >> if (pmd->core_id == NON_PMD_CORE_ID) { >> emc_cache_uninit(&pmd->flow_cache); >> + dp_netdev_pmd_detach_tx_queues(pmd); >> } else { >> latch_set(&pmd->exit_latch); >> dp_netdev_reload_pmd__(pmd); >> @@ -3473,13 +3592,15 @@ dp_execute_cb(void *aux_, struct dp_packet >> **packets, int cnt, >> struct dp_netdev *dp = pmd->dp; >> int type = nl_attr_type(a); >> struct dp_netdev_port *p; >> + struct dp_netdev_pmd_txq *txq; >> int i; >> >> switch ((enum ovs_action_attr)type) { >> case OVS_ACTION_ATTR_OUTPUT: >> - p = dp_netdev_lookup_port(dp, u32_to_odp(nl_attr_get_u32(a))); >> - if (OVS_LIKELY(p)) { >> - netdev_send(p->netdev, pmd->tx_qid, packets, cnt, may_steal); >> + txq = dp_netdev_pmd_lookup_txq(pmd, u32_to_odp(nl_attr_get_u32(a))); >> + if (OVS_LIKELY(txq)) { >> + netdev_send(txq->port->netdev, txq->tx_qid, >> + packets, cnt, may_steal); >> return; >> } >> break; >> _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev