The pmd threads are pinned to available CPU cores on the corresponding CPU socket. Note that core 0 is not pinnable; it is reserved for all non-pmd threads.
Signed-off-by: Alex Wang <al...@nicira.com> --- lib/dpif-netdev.c | 254 +++++++++++++++++++++++++++++++++++++++++------------ lib/dpif-netdev.h | 2 +- lib/netdev-dpdk.c | 40 ++++++--- lib/netdev-dpdk.h | 15 ++++ 4 files changed, 244 insertions(+), 67 deletions(-) diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index c637d9f..14784bf 100644 --- a/lib/dpif-netdev.c +++ b/lib/dpif-netdev.c @@ -52,6 +52,7 @@ #include "odp-util.h" #include "ofp-print.h" #include "ofpbuf.h" +#include "ovs-numa.h" #include "ovs-rcu.h" #include "packet-dpif.h" #include "packets.h" @@ -71,6 +72,7 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev); #define NETDEV_RULE_PRIORITY 0x8000 #define FLOW_DUMP_MAX_BATCH 50 + /* Use per thread recirc_depth to prevent recirculation loop. */ #define MAX_RECIRC_DEPTH 5 DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0) @@ -142,11 +144,9 @@ struct dp_netdev { struct fat_rwlock upcall_rwlock; exec_upcall_cb *upcall_cb; /* Callback function for executing upcalls. */ - /* Forwarding threads. */ - struct latch exit_latch; - struct pmd_thread *pmd_threads; - size_t n_pmd_threads; - int pmd_count; + /* Per-cpu-socket struct for configuring pmd threads. */ + struct pmd_socket *pmd_sockets; + int n_pmd_sockets; }; static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp, @@ -281,6 +281,15 @@ struct dp_netdev_actions *dp_netdev_flow_get_actions( const struct dp_netdev_flow *); static void dp_netdev_actions_free(struct dp_netdev_actions *); +/* Represents the PMD configuration on a cpu socket. */ +struct pmd_socket { + struct dp_netdev *dp; + struct latch exit_latch; + struct pmd_thread *pmd_threads; + int socket_id; + int n_pmd_threads; +}; + /* PMD: Poll modes drivers. PMD accesses devices via polling to eliminate * the performance overhead of interrupt processing. Therefore netdev can * not implement rx-wait for these devices. 
dpif-netdev needs to poll @@ -293,9 +302,10 @@ static void dp_netdev_actions_free(struct dp_netdev_actions *); * table, and executes the actions it finds. **/ struct pmd_thread { - struct dp_netdev *dp; + struct pmd_socket *socket; pthread_t thread; - int id; + int index; + int core_id; atomic_uint change_seq; }; @@ -335,8 +345,10 @@ static void dp_netdev_port_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt, odp_port_t port_no); -static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n); static void dp_netdev_disable_upcall(struct dp_netdev *); +static void dp_netdev_destroy_all_pmd_sockets(struct dp_netdev *); +static void dp_netdev_unset_pmd_threads(struct dp_netdev *, int socket_id); +static void dp_netdev_set_pmd_threads(struct dp_netdev *, int socket_id); static struct dpif_netdev * dpif_netdev_cast(const struct dpif *dpif) @@ -450,7 +462,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class, OVS_REQUIRES(dp_netdev_mutex) { struct dp_netdev *dp; - int error; + int n_sockets, error; dp = xzalloc(sizeof *dp); shash_add(&dp_netdevs, name, dp); @@ -469,13 +481,32 @@ create_dp_netdev(const char *name, const struct dpif_class *class, ovs_mutex_init(&dp->port_mutex); cmap_init(&dp->ports); dp->port_seq = seq_create(); - latch_init(&dp->exit_latch); fat_rwlock_init(&dp->upcall_rwlock); /* Disable upcalls by default. */ dp_netdev_disable_upcall(dp); dp->upcall_cb = NULL; + /* Reserves the core 0 for main thread. */ + ovs_numa_try_pin_core_specific(NON_PMD_CORE_ID); + + /* Creates 'pmd_socket' struct for each cpu socket. 
*/ + n_sockets = ovs_numa_get_n_sockets(); + if (n_sockets != OVS_SOCKET_UNSPEC) { + int i; + + dp->n_pmd_sockets = n_sockets; + dp->pmd_sockets = xzalloc(dp->n_pmd_sockets * sizeof *dp->pmd_sockets); + for (i = 0; i < dp->n_pmd_sockets; i++) { + dp->pmd_sockets[i].dp = dp; + dp->pmd_sockets[i].socket_id = i; + latch_init(&dp->pmd_sockets[i].exit_latch); + } + } else { + dp->n_pmd_sockets = 0; + dp->pmd_sockets = NULL; + } + ovs_mutex_lock(&dp->port_mutex); error = do_add_port(dp, name, "internal", ODPP_LOCAL); ovs_mutex_unlock(&dp->port_mutex); @@ -525,8 +556,8 @@ dp_netdev_free(struct dp_netdev *dp) shash_find_and_delete(&dp_netdevs, dp->name); - dp_netdev_set_pmd_threads(dp, 0); - free(dp->pmd_threads); + dp_netdev_destroy_all_pmd_sockets(dp); + free(dp->pmd_sockets); dp_netdev_flow_flush(dp); ovs_mutex_lock(&dp->port_mutex); @@ -547,7 +578,6 @@ dp_netdev_free(struct dp_netdev *dp) seq_destroy(dp->port_seq); cmap_destroy(&dp->ports); fat_rwlock_destroy(&dp->upcall_rwlock); - latch_destroy(&dp->exit_latch); free(CONST_CAST(char *, dp->name)); free(dp); } @@ -618,12 +648,17 @@ dp_netdev_reload_pmd_threads(struct dp_netdev *dp) { int i; - for (i = 0; i < dp->n_pmd_threads; i++) { - struct pmd_thread *f = &dp->pmd_threads[i]; - int id; + for (i = 0; i < dp->n_pmd_sockets; i++) { + struct pmd_socket *s = &dp->pmd_sockets[i]; + int j; - atomic_add(&f->change_seq, 1, &id); - } + for (j = 0; j < s->n_pmd_threads; j++) { + struct pmd_thread *f = &s->pmd_threads[j]; + int id; + + atomic_add(&f->change_seq, 1, &id); + } + } } static uint32_t @@ -690,10 +725,8 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type, } port->sf = sf; - if (netdev_is_pmd(netdev)) { - dp->pmd_count++; - dp_netdev_set_pmd_threads(dp, NR_PMD_THREADS); - dp_netdev_reload_pmd_threads(dp); + if (netdev_is_pmd(netdev) && dp->pmd_sockets) { + dp_netdev_set_pmd_threads(dp, netdev_dpdk_get_socket_id(netdev)); } ovs_refcount_init(&port->ref_cnt); @@ -834,6 +867,23 @@ 
get_port_by_name(struct dp_netdev *dp, return ENOENT; } +/* Returns 'true' if there is a port with pmd netdev and the netdev + * is on cpu socket 'socket_id'. */ +static bool +has_pmd_port_for_socket(struct dp_netdev *dp, int socket_id) +{ + struct dp_netdev_port *port; + + CMAP_FOR_EACH (port, node, &dp->ports) { + if (netdev_is_pmd(port->netdev) + && netdev_dpdk_get_socket_id(port->netdev) == socket_id) { + return true; + } + } + + return false; +} + static void do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port) OVS_REQUIRES(dp->port_mutex) @@ -841,7 +891,15 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port) cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no)); seq_change(dp->port_seq); if (netdev_is_pmd(port->netdev)) { - dp_netdev_reload_pmd_threads(dp); + int socket_id = netdev_dpdk_get_socket_id(port->netdev); + + /* If there is no netdev on the cpu socket, unsets the pmd threads + * for that cpu socket. Else, just reloads the queues. */ + if (!has_pmd_port_for_socket(dp, socket_id)) { + dp_netdev_unset_pmd_threads(dp, socket_id); + } else { + dp_netdev_reload_pmd_threads(dp); + } } port_unref(port); @@ -1614,14 +1672,17 @@ struct rxq_poll { struct netdev_rxq *rx; }; +struct non_local_pmd_dev { + struct netdev *dev; +}; + static int pmd_load_queues(struct pmd_thread *f, struct rxq_poll **ppoll_list, int poll_cnt) { - struct dp_netdev *dp = f->dp; + struct pmd_socket *s = f->socket; struct rxq_poll *poll_list = *ppoll_list; struct dp_netdev_port *port; - int id = f->id; int index; int i; @@ -1633,12 +1694,13 @@ pmd_load_queues(struct pmd_thread *f, poll_cnt = 0; index = 0; - CMAP_FOR_EACH (port, node, &f->dp->ports) { - if (netdev_is_pmd(port->netdev)) { + CMAP_FOR_EACH (port, node, &s->dp->ports) { + if (netdev_is_pmd(port->netdev) + && netdev_dpdk_get_socket_id(port->netdev) == s->socket_id) { int i; for (i = 0; i < netdev_n_rxq(port->netdev); i++) { - if ((index % dp->n_pmd_threads) == id) { + if ((index % 
s->n_pmd_threads) == f->index) { poll_list = xrealloc(poll_list, sizeof *poll_list * (poll_cnt + 1)); port_ref(port); @@ -1655,23 +1717,56 @@ pmd_load_queues(struct pmd_thread *f, return poll_cnt; } +static int +pmd_get_non_local_pmd_dev(struct pmd_thread *f, + struct non_local_pmd_dev **pdev_list, int dev_cnt) +{ + struct pmd_socket *s = f->socket; + struct non_local_pmd_dev *dev_list = *pdev_list; + struct dp_netdev_port *port; + int i; + + for (i = 0; i < dev_cnt; i++) { + netdev_close(dev_list[i].dev); + } + + dev_cnt = 0; + + CMAP_FOR_EACH (port, node, &s->dp->ports) { + if (netdev_is_pmd(port->netdev) + && netdev_dpdk_get_socket_id(port->netdev) != s->socket_id) { + dev_list = xrealloc(dev_list, sizeof *dev_list * (dev_cnt + 1)); + netdev_ref(port->netdev); + dev_list[dev_cnt].dev = port->netdev; + dev_cnt++; + } + } + + *pdev_list = dev_list; + return dev_cnt; +} + static void * pmd_thread_main(void *f_) { struct pmd_thread *f = f_; - struct dp_netdev *dp = f->dp; + struct dp_netdev *dp = f->socket->dp; unsigned int lc = 0; struct rxq_poll *poll_list; + struct non_local_pmd_dev *dev_list; unsigned int port_seq; - int poll_cnt; + int poll_cnt, dev_cnt; int i; poll_cnt = 0; + dev_cnt = 0; poll_list = NULL; + dev_list = NULL; - pmd_thread_setaffinity_cpu(f->id); + pmd_thread_setaffinity_cpu(f->core_id); reload: poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt); + dev_cnt = pmd_get_non_local_pmd_dev(f, &dev_list, dev_cnt); atomic_read(&f->change_seq, &port_seq); for (;;) { @@ -1682,6 +1777,10 @@ reload: dp_netdev_process_rxq_port(dp, poll_list[i].port, poll_list[i].rx); } + for (i = 0; i < dev_cnt; i++) { + netdev_dpdk_flush_non_local(dev_list[i].dev, f->core_id); + } + if (lc++ > 1024) { ovsrcu_quiesce(); @@ -1696,7 +1795,7 @@ reload: } } - if (!latch_is_set(&f->dp->exit_latch)){ + if (!latch_is_set(&f->socket->exit_latch)){ goto reload; } @@ -1739,40 +1838,87 @@ dpif_netdev_enable_upcall(struct dpif *dpif) } static void -dp_netdev_set_pmd_threads(struct 
dp_netdev *dp, int n) +dp_netdev_destroy_all_pmd_sockets(struct dp_netdev *dp) { int i; - if (n == dp->n_pmd_threads) { - return; + /* Exits all existing pmd threads. */ + for (i = 0; i < dp->n_pmd_sockets; i++) { + struct pmd_socket *s = &dp->pmd_sockets[i]; + + dp_netdev_unset_pmd_threads(dp, i); + latch_destroy(&s->exit_latch); } +} - /* Stop existing threads. */ - latch_set(&dp->exit_latch); - dp_netdev_reload_pmd_threads(dp); - for (i = 0; i < dp->n_pmd_threads; i++) { - struct pmd_thread *f = &dp->pmd_threads[i]; +/* Deletes all pmd threads on cpu socket 'socket_id'. */ +static void +dp_netdev_unset_pmd_threads(struct dp_netdev *dp, int socket_id) +{ + struct pmd_socket *s = &dp->pmd_sockets[socket_id]; + int i; + + if (s->n_pmd_threads) { + latch_set(&s->exit_latch); + dp_netdev_reload_pmd_threads(dp); + for (i = 0; i < s->n_pmd_threads; i++) { + struct pmd_thread *f = &s->pmd_threads[i]; - xpthread_join(f->thread, NULL); + ovs_numa_unpin_core(f->core_id); + xpthread_join(f->thread, NULL); + } + latch_poll(&s->exit_latch); + free(s->pmd_threads); + s->pmd_threads = NULL; + s->n_pmd_threads = 0; } - latch_poll(&dp->exit_latch); - free(dp->pmd_threads); +} + +/* Checks the cpu socket id of 'netdev' and starts pmd threads for + * the cpu socket. */ +static void +dp_netdev_set_pmd_threads(struct dp_netdev *dp, int socket_id) +{ + struct pmd_socket *s = &dp->pmd_sockets[socket_id]; + + ovs_assert(ovs_numa_get_n_sockets() != OVS_SOCKET_UNSPEC + && ovs_numa_get_n_cores() != OVS_CORE_UNSPEC); + + /* If there are already pmd threads create for the cpu socket + * in which 'netdev' is on, do nothing. Else, creates the + * pmd threads for the socket. 
*/ + if (!s->n_pmd_threads) { + int n_devs = netdev_dpdk_get_n_devs_on_socket(socket_id); + int n_unpinned_cores = + ovs_numa_get_n_unpinned_cores_on_socket(socket_id); + int n_threads, i; + + if (!n_unpinned_cores) { + VLOG_ERR("Cannot create pmd threads due to out of unpinned " + "cores on socket"); + return; + } - /* Start new threads. */ - dp->pmd_threads = xmalloc(n * sizeof *dp->pmd_threads); - dp->n_pmd_threads = n; + /* Starts new pmd threads on the cpu socket. */ + n_threads = MIN(n_devs, n_unpinned_cores); + s->pmd_threads = xmalloc(n_threads * sizeof *s->pmd_threads); + s->n_pmd_threads = n_threads; - for (i = 0; i < n; i++) { - struct pmd_thread *f = &dp->pmd_threads[i]; + for (i = 0; i < s->n_pmd_threads; i++) { + struct pmd_thread *f = &s->pmd_threads[i]; - f->dp = dp; - f->id = i; - atomic_store(&f->change_seq, 1); + f->core_id = ovs_numa_get_unpinned_core_on_socket(socket_id); + f->index = i; + f->socket = s; + atomic_store(&f->change_seq, 1); - /* Each thread will distribute all devices rx-queues among - * themselves. */ - f->thread = ovs_thread_create("pmd", pmd_thread_main, f); + /* Each thread will distribute all devices rx-queues among + * themselves. 
*/ + f->thread = ovs_thread_create("pmd", pmd_thread_main, f); + } } + + dp_netdev_reload_pmd_threads(dp); } diff --git a/lib/dpif-netdev.h b/lib/dpif-netdev.h index 50c1198..f501f7c 100644 --- a/lib/dpif-netdev.h +++ b/lib/dpif-netdev.h @@ -40,7 +40,7 @@ static inline void dp_packet_pad(struct ofpbuf *b) } } -#define NR_PMD_THREADS 1 +#define NON_PMD_CORE_ID 0 #ifdef __cplusplus } diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c index 432524f..012ee68 100644 --- a/lib/netdev-dpdk.c +++ b/lib/netdev-dpdk.c @@ -401,7 +401,6 @@ dpdk_get_n_devs(int socket_id) count++; } } - ovs_assert(count); return count; } @@ -508,14 +507,12 @@ netdev_dpdk_init(struct netdev *netdev_, unsigned int port_no) OVS_REQUIRES(dpdk rte_spinlock_init(&netdev->tx_q[i].tx_lock); } - netdev->port_id = port_no; - netdev->flags = 0; netdev->mtu = ETHER_MTU; netdev->max_packet_len = MTU_TO_MAX_LEN(netdev->mtu); - /* XXX: need to discover device node at run time. */ - netdev->socket_id = SOCKET0; + netdev->socket_id = rte_eth_dev_socket_id(port_no); + netdev->port_id = port_no; netdev->dpdk_mp = dpdk_mp_get(netdev->socket_id, netdev->mtu); if (!netdev->dpdk_mp) { @@ -699,7 +696,11 @@ netdev_dpdk_rxq_recv(struct netdev_rxq *rxq_, struct dpif_packet **packets, struct netdev_dpdk *dev = netdev_dpdk_cast(netdev); int nb_rx; - dpdk_queue_flush(dev, rxq_->queue_id); + /* There is only one tx queue for this core. Do not flush other + * queueus. */ + if (rxq_->queue_id == rte_lcore_id()) { + dpdk_queue_flush(dev, rxq_->queue_id); + } nb_rx = rte_eth_rx_burst(rx->port_id, rxq_->queue_id, (struct rte_mbuf **) packets, @@ -1496,6 +1497,23 @@ netdev_dpdk_get_socket_id(const struct netdev *netdev_) } } +/* Returns the number of dpdk interfaces on cpu socket 'socket_id'. */ +int +netdev_dpdk_get_n_devs_on_socket(int socket_id) +{ + return dpdk_get_n_devs(socket_id); +} + +/* Flushes pkts sent from the calling thread to other dpdk ifaces that are + * not on the same cpu socket. 
*/ +void +netdev_dpdk_flush_non_local(struct netdev *netdev_, int qid) +{ + struct netdev_dpdk *dev = netdev_dpdk_cast(netdev_); + + dpdk_queue_flush(dev, qid); +} + int pmd_thread_setaffinity_cpu(int cpu) { @@ -1510,7 +1528,8 @@ pmd_thread_setaffinity_cpu(int cpu) return err; } /* lcore_id 0 is reseved for use by non pmd threads. */ - RTE_PER_LCORE(_lcore_id) = cpu + 1; + ovs_assert(cpu); + RTE_PER_LCORE(_lcore_id) = cpu; return 0; } @@ -1518,16 +1537,13 @@ pmd_thread_setaffinity_cpu(int cpu) void thread_set_nonpmd(void) { - /* We cannot have RTE_MAX_LCORE pmd threads, because lcore_id 0 is reserved - * for non pmd threads */ - BUILD_ASSERT(NR_PMD_THREADS < RTE_MAX_LCORE); /* We have to use 0 to allow non pmd threads to perform certain DPDK * operations, like rte_eth_dev_configure(). */ - RTE_PER_LCORE(_lcore_id) = 0; + RTE_PER_LCORE(_lcore_id) = NON_PMD_CORE_ID; } static bool thread_is_pmd(void) { - return rte_lcore_id() != 0; + return rte_lcore_id() != NON_PMD_CORE_ID; } diff --git a/lib/netdev-dpdk.h b/lib/netdev-dpdk.h index 75f6a0b..5e8f296 100644 --- a/lib/netdev-dpdk.h +++ b/lib/netdev-dpdk.h @@ -24,6 +24,8 @@ struct netdev; int dpdk_init(int argc, char **argv); void netdev_dpdk_register(void); int netdev_dpdk_get_socket_id(const struct netdev *); +int netdev_dpdk_get_n_devs_on_socket(int socket_id); +void netdev_dpdk_flush_non_local(struct netdev *, int qid); void free_dpdk_buf(struct dpif_packet *); int pmd_thread_setaffinity_cpu(int cpu); void thread_set_nonpmd(void); @@ -48,6 +50,19 @@ netdev_dpdk_get_socket_id(const struct netdev *netdev_ OVS_UNUSED) return -1; } +static inline int +netdev_dpdk_get_n_devs_on_socket(int socket_id OVS_UNUSED) +{ + return -1; +} + +static inline void +netdev_dpdk_flush_non_local(struct netdev *netdev OVS_UNUSED, + int qid OVS_UNUSED) +{ + /* Nothing */ +} + static inline void free_dpdk_buf(struct dpif_packet *buf OVS_UNUSED) { -- 1.7.9.5 _______________________________________________ dev mailing list dev@openvswitch.org 
http://openvswitch.org/mailman/listinfo/dev