A future commit will stop using RCU for 'dp->ports' and use a mutex for reading/writing them. To avoid taking a mutex in dp_execute_cb(), which is called in the fast path, this commit introduces a pmd-thread-local cache of ports.
The downside is that every port add/remove now needs to synchronize with every pmd thread. Among the advantages, keeping a per-thread port mapping could allow greater control over the txq assignment. Signed-off-by: Daniele Di Proietto <diproiet...@vmware.com> --- lib/dpif-netdev.c | 249 +++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 179 insertions(+), 70 deletions(-) diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index cedaf39..bd2249e 100644 --- a/lib/dpif-netdev.c +++ b/lib/dpif-netdev.c @@ -184,6 +184,7 @@ static bool dpcls_lookup(const struct dpcls *cls, * * dp_netdev_mutex (global) * port_mutex + * non_pmd_mutex */ struct dp_netdev { const struct dpif_class *const class; @@ -379,6 +380,13 @@ struct rxq_poll { struct ovs_list node; }; +/* Contained by struct dp_netdev_pmd_thread's 'port_cache' or 'tx_ports'. */ +struct tx_port { + odp_port_t port_no; + struct netdev *netdev; + struct hmap_node node; +}; + /* PMD: Poll modes drivers. PMD accesses devices via polling to eliminate * the performance overhead of interrupt processing. Therefore netdev can * not implement rx-wait for these devices. dpif-netdev needs to poll @@ -405,8 +413,8 @@ struct dp_netdev_pmd_thread { /* Per thread exact-match cache. Note, the instance for cpu core * NON_PMD_CORE_ID can be accessed by multiple threads, and thusly - * need to be protected (e.g. by 'dp_netdev_mutex'). All other - * instances will only be accessed by its own pmd thread. */ + * need to be protected by 'non_pmd_mutex'. Every other instance + * will only be accessed by its own pmd thread. */ struct emc_cache flow_cache; /* Classifier and Flow-Table. @@ -435,10 +443,20 @@ struct dp_netdev_pmd_thread { atomic_int tx_qid; /* Queue id used by this pmd thread to * send packets on all netdevs */ - struct ovs_mutex poll_mutex; /* Mutex for poll_list. */ + struct ovs_mutex port_mutex; /* Mutex for 'poll_list' and 'tx_ports'. */ /* List of rx queues to poll. 
*/ struct ovs_list poll_list OVS_GUARDED; - int poll_cnt; /* Number of elemints in poll_list. */ + /* Number of elements in 'poll_list' */ + int poll_cnt; + /* Map of 'tx_port's used for transmission. Written by the main thread, + * read by the pmd thread. */ + struct hmap tx_ports OVS_GUARDED; + + /* Map of 'tx_port' used in the fast path. This is a thread-local copy of + * 'tx_ports'. The instance for cpu core NON_PMD_CORE_ID can be accessed + * by multiple threads, and thusly need to be protected by 'non_pmd_mutex'. + * Every other instance will only be accessed by its own pmd thread. */ + struct hmap port_cache; /* Only a pmd thread can write on its own 'cycles' and 'stats'. * The main thread keeps 'stats_zero' and 'cycles_zero' as base @@ -494,7 +512,7 @@ dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos); static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp); static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id); static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id); -static void dp_netdev_pmd_clear_poll_list(struct dp_netdev_pmd_thread *pmd); +static void dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd); static void dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp, struct dp_netdev_port *port); static void @@ -508,6 +526,8 @@ static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp); static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd); static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd); static void dp_netdev_pmd_flow_flush(struct dp_netdev_pmd_thread *pmd); +static void pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd) + OVS_REQUIRES(pmd->port_mutex); static inline bool emc_entry_alive(struct emc_entry *ce); static void emc_clear_entry(struct emc_entry *ce); @@ -690,7 +710,7 @@ pmd_info_show_rxq(struct ds *reply, struct dp_netdev_pmd_thread *pmd) ds_put_format(reply, "pmd thread numa_id %d core_id %u:\n", pmd->numa_id, pmd->core_id); - 
ovs_mutex_lock(&pmd->poll_mutex); + ovs_mutex_lock(&pmd->port_mutex); LIST_FOR_EACH (poll, node, &pmd->poll_list) { const char *name = netdev_get_name(poll->port->netdev); @@ -704,7 +724,7 @@ pmd_info_show_rxq(struct ds *reply, struct dp_netdev_pmd_thread *pmd) ds_put_format(reply, " %d", netdev_rxq_get_queue_id(poll->rx)); prev_name = name; } - ovs_mutex_unlock(&pmd->poll_mutex); + ovs_mutex_unlock(&pmd->port_mutex); ds_put_cstr(reply, "\n"); } } @@ -1077,6 +1097,11 @@ dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd) int old_seq; if (pmd->core_id == NON_PMD_CORE_ID) { + ovs_mutex_lock(&pmd->dp->non_pmd_mutex); + ovs_mutex_lock(&pmd->port_mutex); + pmd_load_cached_ports(pmd); + ovs_mutex_unlock(&pmd->port_mutex); + ovs_mutex_unlock(&pmd->dp->non_pmd_mutex); return; } @@ -1199,9 +1224,7 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type, cmap_insert(&dp->ports, &port->node, hash_port_no(port_no)); - if (netdev_is_pmd(port->netdev)) { - dp_netdev_add_port_to_pmds(dp, port); - } + dp_netdev_add_port_to_pmds(dp, port); seq_change(dp->port_seq); return 0; @@ -1370,6 +1393,9 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port) { cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no)); seq_change(dp->port_seq); + + dp_netdev_del_port_from_all_pmds(dp, port); + if (netdev_is_pmd(port->netdev)) { int numa_id = netdev_get_numa_id(port->netdev); @@ -1379,8 +1405,6 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port) * for that numa. Else, deletes the queues from polling lists. */ if (!has_pmd_port_for_numa(dp, numa_id)) { dp_netdev_del_pmds_on_numa(dp, numa_id); - } else { - dp_netdev_del_port_from_all_pmds(dp, port); } } @@ -2377,7 +2401,6 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute) * the 'non_pmd_mutex'. 
*/ if (pmd->core_id == NON_PMD_CORE_ID) { ovs_mutex_lock(&dp->non_pmd_mutex); - ovs_mutex_lock(&dp->port_mutex); } pp = execute->packet; @@ -2385,7 +2408,6 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute) execute->actions_len); if (pmd->core_id == NON_PMD_CORE_ID) { dp_netdev_pmd_unref(pmd); - ovs_mutex_unlock(&dp->port_mutex); ovs_mutex_unlock(&dp->non_pmd_mutex); } @@ -2649,21 +2671,53 @@ dpif_netdev_wait(struct dpif *dpif) seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq); } +static void +pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd) +{ + struct tx_port *tx_port_cached; + + HMAP_FOR_EACH_POP (tx_port_cached, node, &pmd->port_cache) { + free(tx_port_cached); + } +} + +/* Copies ports from 'pmd->tx_ports' (shared with the main thread) to + * 'pmd->port_cache' (thread local) */ +static void +pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd) + OVS_REQUIRES(pmd->port_mutex) +{ + struct tx_port *tx_port, *tx_port_cached; + + pmd_free_cached_ports(pmd); + hmap_shrink(&pmd->port_cache); + + HMAP_FOR_EACH (tx_port, node, &pmd->tx_ports) { + tx_port_cached = xmemdup(tx_port, sizeof *tx_port_cached); + hmap_insert(&pmd->port_cache, &tx_port_cached->node, + hash_port_no(tx_port_cached->port_no)); + } +} + static int -pmd_load_queues(struct dp_netdev_pmd_thread *pmd, struct rxq_poll **ppoll_list) +pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd, + struct rxq_poll **ppoll_list) { struct rxq_poll *poll_list = *ppoll_list; struct rxq_poll *poll; int i; - ovs_mutex_lock(&pmd->poll_mutex); + ovs_mutex_lock(&pmd->port_mutex); poll_list = xrealloc(poll_list, pmd->poll_cnt * sizeof *poll_list); i = 0; LIST_FOR_EACH (poll, node, &pmd->poll_list) { poll_list[i++] = *poll; } - ovs_mutex_unlock(&pmd->poll_mutex); + + pmd_load_cached_ports(pmd); + + ovs_mutex_unlock(&pmd->port_mutex); *ppoll_list = poll_list; return i; @@ -2686,7 +2740,7 @@ pmd_thread_main(void *f_) /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. 
*/ ovsthread_setspecific(pmd->dp->per_pmd_key, pmd); pmd_thread_setaffinity_cpu(pmd->core_id); - poll_cnt = pmd_load_queues(pmd, &poll_list); + poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list); reload: emc_cache_init(&pmd->flow_cache); @@ -2719,7 +2773,7 @@ reload: } } - poll_cnt = pmd_load_queues(pmd, &poll_list); + poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list); exiting = latch_is_set(&pmd->exit_latch); /* Signal here to make sure the pmd finishes * reloading the updated configuration. */ @@ -2732,6 +2786,7 @@ reload: } free(poll_list); + pmd_free_cached_ports(pmd); return NULL; } @@ -2858,10 +2913,12 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp, xpthread_cond_init(&pmd->cond, NULL); ovs_mutex_init(&pmd->cond_mutex); ovs_mutex_init(&pmd->flow_mutex); - ovs_mutex_init(&pmd->poll_mutex); + ovs_mutex_init(&pmd->port_mutex); dpcls_init(&pmd->cls); cmap_init(&pmd->flow_table); ovs_list_init(&pmd->poll_list); + hmap_init(&pmd->tx_ports); + hmap_init(&pmd->port_cache); /* init the 'flow_cache' since there is no * actual thread created for NON_PMD_CORE_ID. */ if (core_id == NON_PMD_CORE_ID) { @@ -2876,12 +2933,14 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd) { dp_netdev_pmd_flow_flush(pmd); dpcls_destroy(&pmd->cls); + hmap_destroy(&pmd->port_cache); + hmap_destroy(&pmd->tx_ports); cmap_destroy(&pmd->flow_table); ovs_mutex_destroy(&pmd->flow_mutex); latch_destroy(&pmd->exit_latch); xpthread_cond_destroy(&pmd->cond); ovs_mutex_destroy(&pmd->cond_mutex); - ovs_mutex_destroy(&pmd->poll_mutex); + ovs_mutex_destroy(&pmd->port_mutex); free(pmd); } @@ -2890,10 +2949,11 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd) static void dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd) { - /* Uninit the 'flow_cache' since there is - * no actual thread uninit it for NON_PMD_CORE_ID. 
*/ + /* NON_PMD_CORE_ID doesn't have a thread, so we don't have to synchronize, + * but extra cleanup is necessary */ if (pmd->core_id == NON_PMD_CORE_ID) { emc_cache_uninit(&pmd->flow_cache); + pmd_free_cached_ports(pmd); } else { latch_set(&pmd->exit_latch); dp_netdev_reload_pmd__(pmd); @@ -2901,8 +2961,7 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd) xpthread_join(pmd->thread, NULL); } - /* Unref all ports and free poll_list. */ - dp_netdev_pmd_clear_poll_list(pmd); + dp_netdev_pmd_clear_ports(pmd); /* Purges the 'pmd''s flows after stopping the thread, but before * destroying the flows, so that the flow stats can be collected. */ @@ -2985,30 +3044,51 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id) free(free_idx); } -/* Deletes all rx queues from pmd->poll_list. */ +/* Deletes all rx queues from pmd->poll_list and all the ports from + * pmd->tx_ports. */ static void -dp_netdev_pmd_clear_poll_list(struct dp_netdev_pmd_thread *pmd) +dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd) { struct rxq_poll *poll; + struct tx_port *port; - ovs_mutex_lock(&pmd->poll_mutex); + ovs_mutex_lock(&pmd->port_mutex); LIST_FOR_EACH_POP (poll, node, &pmd->poll_list) { free(poll); } pmd->poll_cnt = 0; - ovs_mutex_unlock(&pmd->poll_mutex); + HMAP_FOR_EACH_POP (port, node, &pmd->tx_ports) { + free(port); + } + ovs_mutex_unlock(&pmd->port_mutex); } -/* Deletes all rx queues of 'port' from poll_list of pmd thread. Returns true - * if 'port' was found in 'pmd' (therefore a restart is required). */ +static struct tx_port * +tx_port_lookup(const struct hmap *hmap, odp_port_t port_no) +{ + struct tx_port *tx; + + HMAP_FOR_EACH_IN_BUCKET (tx, node, hash_port_no(port_no), hmap) { + if (tx->port_no == port_no) { + return tx; + } + } + + return NULL; +} + +/* Deletes all rx queues of 'port' from 'poll_list', and the 'port' from + * 'tx_ports' of 'pmd' thread. Returns true if 'port' was found in 'pmd' + * (therefore a restart is required). 
*/ static bool dp_netdev_del_port_from_pmd__(struct dp_netdev_port *port, struct dp_netdev_pmd_thread *pmd) { struct rxq_poll *poll, *next; + struct tx_port *tx; bool found = false; - ovs_mutex_lock(&pmd->poll_mutex); + ovs_mutex_lock(&pmd->port_mutex); LIST_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) { if (poll->port == port) { found = true; @@ -3017,36 +3097,41 @@ dp_netdev_del_port_from_pmd__(struct dp_netdev_port *port, free(poll); } } - ovs_mutex_unlock(&pmd->poll_mutex); + + tx = tx_port_lookup(&pmd->tx_ports, port->port_no); + if (tx) { + hmap_remove(&pmd->tx_ports, &tx->node); + free(tx); + found = true; + } + ovs_mutex_unlock(&pmd->port_mutex); return found; } -/* Deletes all rx queues of 'port' from all pmd threads. The pmd threads that - * need to be restarted are inserted in 'to_reload'. */ +/* Deletes 'port' from the 'poll_list' and from the 'tx_ports' of all the pmd + * threads. The pmd threads that need to be restarted are inserted in + * 'to_reload'. */ static void dp_netdev_del_port_from_all_pmds__(struct dp_netdev *dp, struct dp_netdev_port *port, struct hmapx *to_reload) { - int numa_id = netdev_get_numa_id(port->netdev); struct dp_netdev_pmd_thread *pmd; CMAP_FOR_EACH (pmd, node, &dp->poll_threads) { - if (pmd->numa_id == numa_id) { - bool found; + bool found; - found = dp_netdev_del_port_from_pmd__(port, pmd); + found = dp_netdev_del_port_from_pmd__(port, pmd); - if (found) { - hmapx_add(to_reload, pmd); - } - } + if (found) { + hmapx_add(to_reload, pmd); + } } } -/* Deletes all rx queues of 'port' from all pmd threads of dp and - * reloads them if needed. */ +/* Deletes 'port' from the 'poll_list' and from the 'tx_ports' of all the pmd + * threads. Reloads the threads if needed. 
*/ static void dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp, struct dp_netdev_port *port) @@ -3090,7 +3175,7 @@ dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id) static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev_port *port, struct netdev_rxq *rx) - OVS_REQUIRES(pmd->poll_mutex) + OVS_REQUIRES(pmd->port_mutex) { struct rxq_poll *poll = xmalloc(sizeof *poll); @@ -3101,8 +3186,9 @@ dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd, pmd->poll_cnt++; } -/* Distributes all rx queues of 'port' between all PMD threads in 'dp'. The - * pmd threads that need to be restarted are inserted in 'to_reload'. */ +/* Distributes all rx queues of 'port' between all PMD threads in 'dp' and + * inserts 'port' in the PMD threads 'tx_ports'. The pmd threads that need to + * be restarted are inserted in 'to_reload'. */ static void dp_netdev_add_port_to_pmds__(struct dp_netdev *dp, struct dp_netdev_port *port, struct hmapx *to_reload) @@ -3111,27 +3197,41 @@ dp_netdev_add_port_to_pmds__(struct dp_netdev *dp, struct dp_netdev_port *port, struct dp_netdev_pmd_thread *pmd; int i; - /* Cannot create pmd threads for invalid numa node. */ - ovs_assert(ovs_numa_numa_id_is_valid(numa_id)); - dp_netdev_set_pmds_on_numa(dp, numa_id); + if (netdev_is_pmd(port->netdev)) { + /* Cannot create pmd threads for invalid numa node. 
*/ + ovs_assert(ovs_numa_numa_id_is_valid(numa_id)); + dp_netdev_set_pmds_on_numa(dp, numa_id); - for (i = 0; i < port->n_rxq; i++) { - pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id); - if (!pmd) { - VLOG_WARN("There's no pmd thread on numa node %d", numa_id); - break; + for (i = 0; i < port->n_rxq; i++) { + pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id); + if (!pmd) { + VLOG_WARN("There's no pmd thread on numa node %d", numa_id); + break; + } + + ovs_mutex_lock(&pmd->port_mutex); + dp_netdev_add_rxq_to_pmd(pmd, port, port->rxq[i]); + ovs_mutex_unlock(&pmd->port_mutex); + + hmapx_add(to_reload, pmd); } + } + + CMAP_FOR_EACH (pmd, node, &dp->poll_threads) { + struct tx_port *tx = xzalloc(sizeof *tx); - ovs_mutex_lock(&pmd->poll_mutex); - dp_netdev_add_rxq_to_pmd(pmd, port, port->rxq[i]); - ovs_mutex_unlock(&pmd->poll_mutex); + tx->netdev = port->netdev; + tx->port_no = port->port_no; + ovs_mutex_lock(&pmd->port_mutex); + hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port_no)); + ovs_mutex_unlock(&pmd->port_mutex); hmapx_add(to_reload, pmd); } } -/* Distributes all rx queues of 'port' between all PMD threads in 'dp' and - * reloads them, if needed. */ +/* Distributes all rx queues of 'port' between all PMD threads in 'dp', inserts + * 'port' in the PMD threads 'tx_ports' and reloads them, if needed. 
*/ static void dp_netdev_add_port_to_pmds(struct dp_netdev *dp, struct dp_netdev_port *port) { @@ -3705,6 +3805,13 @@ dpif_netdev_register_upcall_cb(struct dpif *dpif, upcall_callback *cb, dp->upcall_cb = cb; } +static struct tx_port * +pmd_tx_port_cache_lookup(const struct dp_netdev_pmd_thread *pmd, + odp_port_t port_no) +{ + return tx_port_lookup(&pmd->port_cache, port_no); +} + static void dp_netdev_drop_packets(struct dp_packet **packets, int cnt, bool may_steal) { @@ -3718,16 +3825,16 @@ dp_netdev_drop_packets(struct dp_packet **packets, int cnt, bool may_steal) } static int -push_tnl_action(const struct dp_netdev *dp, - const struct nlattr *attr, - struct dp_packet **packets, int cnt) +push_tnl_action(const struct dp_netdev_pmd_thread *pmd, + const struct nlattr *attr, + struct dp_packet **packets, int cnt) { - struct dp_netdev_port *tun_port; + struct tx_port *tun_port; const struct ovs_action_push_tnl *data; data = nl_attr_get(attr); - tun_port = dp_netdev_lookup_port(dp, u32_to_odp(data->tnl_port)); + tun_port = pmd_tx_port_cache_lookup(pmd, u32_to_odp(data->tnl_port)); if (!tun_port) { return -EINVAL; } @@ -3757,12 +3864,12 @@ dp_execute_cb(void *aux_, struct dp_packet **packets, int cnt, struct dp_netdev_pmd_thread *pmd = aux->pmd; struct dp_netdev *dp = pmd->dp; int type = nl_attr_type(a); - struct dp_netdev_port *p; + struct tx_port *p; int i; switch ((enum ovs_action_attr)type) { case OVS_ACTION_ATTR_OUTPUT: - p = dp_netdev_lookup_port(dp, u32_to_odp(nl_attr_get_u32(a))); + p = pmd_tx_port_cache_lookup(pmd, u32_to_odp(nl_attr_get_u32(a))); if (OVS_LIKELY(p)) { int tx_qid; @@ -3783,7 +3890,7 @@ dp_execute_cb(void *aux_, struct dp_packet **packets, int cnt, packets = tnl_pkt; } - err = push_tnl_action(dp, a, packets, cnt); + err = push_tnl_action(pmd, a, packets, cnt); if (!err) { (*depth)++; dp_netdev_recirculate(pmd, packets, cnt); @@ -3799,7 +3906,7 @@ dp_execute_cb(void *aux_, struct dp_packet **packets, int cnt, if (*depth < MAX_RECIRC_DEPTH) { 
odp_port_t portno = u32_to_odp(nl_attr_get_u32(a)); - p = dp_netdev_lookup_port(dp, portno); + p = pmd_tx_port_cache_lookup(pmd, portno); if (p) { struct dp_packet *tnl_pkt[NETDEV_MAX_BURST]; int err; @@ -4002,12 +4109,14 @@ dpif_dummy_change_port_number(struct unixctl_conn *conn, int argc OVS_UNUSED, /* Remove old port. */ cmap_remove(&dp->ports, &old_port->node, hash_port_no(old_port->port_no)); + dp_netdev_del_port_from_all_pmds(dp, old_port); ovsrcu_postpone(free, old_port); /* Insert new port (cmap semantics mean we cannot re-insert 'old_port'). */ new_port = xmemdup(old_port, sizeof *old_port); new_port->port_no = port_no; cmap_insert(&dp->ports, &new_port->node, hash_port_no(port_no)); + dp_netdev_add_port_to_pmds(dp, new_port); seq_change(dp->port_seq); unixctl_command_reply(conn, NULL); -- 2.1.4 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev