The pmd threads are pinned to available cpu cores on the corresponding cpu socket.
Signed-off-by: Alex Wang <al...@nicira.com> --- PATCH -> V2: - Add latch_destroy(). - Use 'int' for cpu socket/core id. --- lib/dpif-netdev.c | 180 ++++++++++++++++++++++++++++++++++++++--------------- lib/netdev-dpdk.c | 27 +++++++- lib/netdev-dpdk.h | 14 +++++ 3 files changed, 168 insertions(+), 53 deletions(-) diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index 4dcc268..f36ff2b 100644 --- a/lib/dpif-netdev.c +++ b/lib/dpif-netdev.c @@ -51,6 +51,7 @@ #include "odp-util.h" #include "ofp-print.h" #include "ofpbuf.h" +#include "ovs-numa.h" #include "ovs-rcu.h" #include "packet-dpif.h" #include "packets.h" @@ -69,7 +70,6 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev); /* By default, choose a priority in the middle. */ #define NETDEV_RULE_PRIORITY 0x8000 -#define NR_THREADS 1 /* Use per thread recirc_depth to prevent recirculation loop. */ #define MAX_RECIRC_DEPTH 5 DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0) @@ -166,11 +166,9 @@ struct dp_netdev { struct cmap ports; struct seq *port_seq; /* Incremented whenever a port changes. */ - /* Forwarding threads. */ - struct latch exit_latch; - struct pmd_thread *pmd_threads; - size_t n_pmd_threads; - int pmd_count; + /* Per-cpu-socket struct for configuring pmd threads. */ + struct pmd_socket *pmd_sockets; + int n_pmd_sockets; }; static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp, @@ -305,6 +303,15 @@ struct dp_netdev_actions *dp_netdev_flow_get_actions( const struct dp_netdev_flow *); static void dp_netdev_actions_free(struct dp_netdev_actions *); +/* Represents the PMD configuration on a cpu socket. */ +struct pmd_socket { + struct dp_netdev *dp; + struct latch exit_latch; + struct pmd_thread *pmd_threads; + int socket_id; + int n_pmd_threads; +}; + /* PMD: Poll modes drivers. PMD accesses devices via polling to eliminate * the performance overhead of interrupt processing. Therefore netdev can * not implement rx-wait for these devices. dpif-netdev needs to poll
@@ -317,9 +324,10 @@ static void dp_netdev_actions_free(struct dp_netdev_actions *); * table, and executes the actions it finds. **/ struct pmd_thread { - struct dp_netdev *dp; + struct pmd_socket *socket; pthread_t thread; int id; + int core_id; atomic_uint change_seq; }; @@ -359,7 +367,9 @@ static void dp_netdev_port_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt, odp_port_t port_no); -static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n); +static void dp_netdev_destroy_all_pmd_sockets(struct dp_netdev *); +static void dp_netdev_unset_pmd_threads(struct dp_netdev *, int socket_id); +static void dp_netdev_set_pmd_threads(struct dp_netdev *, int socket_id); static struct dpif_netdev * dpif_netdev_cast(const struct dpif *dpif) @@ -473,7 +483,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class, OVS_REQUIRES(dp_netdev_mutex) { struct dp_netdev *dp; - int error; + int n_sockets, error, i; dp = xzalloc(sizeof *dp); shash_add(&dp_netdevs, name, dp); @@ -494,7 +504,15 @@ create_dp_netdev(const char *name, const struct dpif_class *class, ovs_mutex_init(&dp->port_mutex); cmap_init(&dp->ports); dp->port_seq = seq_create(); - latch_init(&dp->exit_latch); + + n_sockets = ovs_numa_get_n_sockets(); + dp->n_pmd_sockets = n_sockets != OVS_SOCKET_UNSPEC ?
n_sockets : 0; + dp->pmd_sockets = xzalloc(dp->n_pmd_sockets * sizeof *dp->pmd_sockets); + for (i = 0; i < dp->n_pmd_sockets; i++) { + dp->pmd_sockets[i].dp = dp; + dp->pmd_sockets[i].socket_id = i; + latch_init(&dp->pmd_sockets[i].exit_latch); + } ovs_mutex_lock(&dp->port_mutex); error = do_add_port(dp, name, "internal", ODPP_LOCAL); @@ -563,8 +581,8 @@ dp_netdev_free(struct dp_netdev *dp) shash_find_and_delete(&dp_netdevs, dp->name); - dp_netdev_set_pmd_threads(dp, 0); - free(dp->pmd_threads); + dp_netdev_destroy_all_pmd_sockets(dp); + free(dp->pmd_sockets); dp_netdev_flow_flush(dp); ovs_mutex_lock(&dp->port_mutex); @@ -590,7 +608,6 @@ dp_netdev_free(struct dp_netdev *dp) ovs_mutex_destroy(&dp->flow_mutex); seq_destroy(dp->port_seq); cmap_destroy(&dp->ports); - latch_destroy(&dp->exit_latch); free(CONST_CAST(char *, dp->name)); free(dp); } @@ -659,12 +676,12 @@ dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats) } static void -dp_netdev_reload_pmd_threads(struct dp_netdev *dp) +dp_netdev_reload_pmd_threads(struct pmd_socket *s) { int i; - for (i = 0; i < dp->n_pmd_threads; i++) { - struct pmd_thread *f = &dp->pmd_threads[i]; + for (i = 0; i < s->n_pmd_threads; i++) { + struct pmd_thread *f = &s->pmd_threads[i]; int id; atomic_add(&f->change_seq, 1, &id); @@ -735,10 +752,8 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type, } port->sf = sf; - if (netdev_is_pmd(netdev)) { - dp->pmd_count++; - dp_netdev_set_pmd_threads(dp, NR_THREADS); - dp_netdev_reload_pmd_threads(dp); + if (netdev_is_pmd(netdev) && dp->n_pmd_sockets) { + dp_netdev_set_pmd_threads(dp, netdev_dpdk_get_socket_id(netdev)); } ovs_refcount_init(&port->ref_cnt); @@ -879,6 +894,23 @@ get_port_by_name(struct dp_netdev *dp, return ENOENT; } +/* Returns 'true' if there is a port with pmd netdev and the netdev + * is on cpu socket 'socket_id'.
*/ +static bool +has_pmd_port_for_socket(struct dp_netdev *dp, int socket_id) +{ + struct dp_netdev_port *port; + + CMAP_FOR_EACH (port, node, &dp->ports) { + if (netdev_is_pmd(port->netdev) + && netdev_dpdk_get_socket_id(port->netdev) == socket_id) { + return true; + } + } + + return false; +} + static void do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port) OVS_REQUIRES(dp->port_mutex) @@ -886,7 +918,15 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port) cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no)); seq_change(dp->port_seq); - if (netdev_is_pmd(port->netdev)) { - dp_netdev_reload_pmd_threads(dp); + if (netdev_is_pmd(port->netdev) && dp->n_pmd_sockets) { + int socket_id = netdev_dpdk_get_socket_id(port->netdev); + + /* If no pmd netdev remains on the cpu socket, unsets the pmd + * threads for that cpu socket. Else, just reloads the queues. */ + if (!has_pmd_port_for_socket(dp, socket_id)) { + dp_netdev_unset_pmd_threads(dp, socket_id); + } else { + dp_netdev_reload_pmd_threads(&dp->pmd_sockets[socket_id]); + } } port_unref(port); @@ -1828,7 +1868,7 @@ static int pmd_load_queues(struct pmd_thread *f, struct rxq_poll **ppoll_list, int poll_cnt) { - struct dp_netdev *dp = f->dp; + struct pmd_socket *s = f->socket; struct rxq_poll *poll_list = *ppoll_list; struct dp_netdev_port *port; int id = f->id; @@ -1843,12 +1883,13 @@ pmd_load_queues(struct pmd_thread *f, poll_cnt = 0; index = 0; - CMAP_FOR_EACH (port, node, &f->dp->ports) { - if (netdev_is_pmd(port->netdev)) { + CMAP_FOR_EACH (port, node, &s->dp->ports) { + if (netdev_is_pmd(port->netdev) + && netdev_dpdk_get_socket_id(port->netdev) == s->socket_id) { int i; for (i = 0; i < netdev_n_rxq(port->netdev); i++) { - if ((index % dp->n_pmd_threads) == id) { + if ((index % s->n_pmd_threads) == id) { poll_list = xrealloc(poll_list, sizeof *poll_list * (poll_cnt + 1)); port_ref(port); @@ -1869,7 +1910,7 @@ pmd_thread_main(void *f_) { struct pmd_thread *f = f_; - struct dp_netdev *dp = f->dp; + struct dp_netdev *dp =
f->socket->dp; unsigned int lc = 0; struct rxq_poll *poll_list; unsigned int port_seq; @@ -1879,7 +1920,7 @@ pmd_thread_main(void *f_) poll_cnt = 0; poll_list = NULL; - pmd_thread_setaffinity_cpu(f->id); + pmd_thread_setaffinity_cpu(f->core_id); reload: poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt); atomic_read(&f->change_seq, &port_seq); @@ -1892,6 +1933,8 @@ reload: dp_netdev_process_rxq_port(dp, poll_list[i].port, poll_list[i].rx); } + netdev_dpdk_flush_non_local(f->socket->socket_id, f->core_id); + if (lc++ > 1024) { ovsrcu_quiesce(); @@ -1906,7 +1949,7 @@ reload: } } - if (!latch_is_set(&f->dp->exit_latch)){ + if (!latch_is_set(&f->socket->exit_latch)) { goto reload; } @@ -1919,40 +1962,75 @@ reload: } static void -dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n) +dp_netdev_destroy_all_pmd_sockets(struct dp_netdev *dp) { int i; - if (n == dp->n_pmd_threads) { - return; + /* Stops the pmd threads of every socket and destroys its exit latch. */ + for (i = 0; i < dp->n_pmd_sockets; i++) { + struct pmd_socket *s = &dp->pmd_sockets[i]; + + dp_netdev_unset_pmd_threads(dp, i); + latch_destroy(&s->exit_latch); } +} + +/* Stops and joins all pmd threads on cpu socket 'socket_id', if any. */ +static void +dp_netdev_unset_pmd_threads(struct dp_netdev *dp, int socket_id) +{ + struct pmd_socket *s = &dp->pmd_sockets[socket_id]; + int i; - /* Stop existing threads.
*/ - latch_set(&dp->exit_latch); - dp_netdev_reload_pmd_threads(dp); - for (i = 0; i < dp->n_pmd_threads; i++) { - struct pmd_thread *f = &dp->pmd_threads[i]; + if (s->n_pmd_threads) { + latch_set(&s->exit_latch); + dp_netdev_reload_pmd_threads(s); + for (i = 0; i < s->n_pmd_threads; i++) { + struct pmd_thread *f = &s->pmd_threads[i]; - xpthread_join(f->thread, NULL); + xpthread_join(f->thread, NULL); + ovs_numa_unpin_core(f->core_id); + } + latch_poll(&s->exit_latch); + free(s->pmd_threads); + s->pmd_threads = NULL; + s->n_pmd_threads = 0; } - latch_poll(&dp->exit_latch); - free(dp->pmd_threads); +} + +/* Starts pmd threads for cpu socket 'socket_id' if it has none yet, + * then tells all pmd threads on that socket to reload their queues. */ +static void +dp_netdev_set_pmd_threads(struct dp_netdev *dp, int socket_id) +{ + struct pmd_socket *s = &dp->pmd_sockets[socket_id]; + + /* If pmd threads were already created for this cpu socket, do + * not create more. Else, creates one pmd thread per dpdk + * interface on the socket. */ + if (!s->n_pmd_threads) { + int n_threads = netdev_dpdk_get_n_devs_on_socket(socket_id); + int i; - /* Start new threads. */ - dp->pmd_threads = xmalloc(n * sizeof *dp->pmd_threads); - dp->n_pmd_threads = n; + /* Starts the new threads for the cpu socket. */ + s->pmd_threads = xmalloc(n_threads * sizeof *s->pmd_threads); + s->n_pmd_threads = n_threads; - for (i = 0; i < n; i++) { - struct pmd_thread *f = &dp->pmd_threads[i]; + for (i = 0; i < s->n_pmd_threads; i++) { + struct pmd_thread *f = &s->pmd_threads[i]; - f->dp = dp; - f->id = i; - atomic_store(&f->change_seq, 1); + f->core_id = ovs_numa_get_unpinned_core_on_socket(socket_id); + f->id = i; + f->socket = s; + atomic_store(&f->change_seq, 1); - /* Each thread will distribute all devices rx-queues among - * themselves. */ - f->thread = ovs_thread_create("pmd", pmd_thread_main, f); + /* The threads of this socket distribute the rx queues of the + * socket's pmd devices among themselves.
*/ + f->thread = ovs_thread_create("pmd", pmd_thread_main, f); + } } + + dp_netdev_reload_pmd_threads(s); } diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c index b0254e7..5a0ca76 100644 --- a/lib/netdev-dpdk.c +++ b/lib/netdev-dpdk.c @@ -491,8 +491,8 @@ netdev_dpdk_construct(struct netdev *netdev_) netdev->mtu = ETHER_MTU; netdev->max_packet_len = MTU_TO_MAX_LEN(netdev->mtu); - /* TODO: need to discover device node at run time. */ - netdev->socket_id = SOCKET0; + netdev->socket_id = rte_eth_dev_socket_id(port_no); + CPU_SOCKET_ID_ASSERT(netdev->socket_id); netdev->port_id = port_no; netdev->dpdk_mp = dpdk_mp_get(netdev->socket_id, netdev->mtu); @@ -1308,6 +1308,29 @@ netdev_dpdk_get_socket_id(const struct netdev *netdev_) } } +/* Returns the number of dpdk interfaces on cpu socket 'socket_id'. */ +int +netdev_dpdk_get_n_devs_on_socket(int socket_id) +{ + CPU_SOCKET_ID_ASSERT(socket_id); + + return dpdk_get_n_devs(socket_id); +} + +/* For all dpdk ifaces not on the cpu socket 'socket_id', flushes tx + * queue 'qid'.
*/ +void +netdev_dpdk_flush_non_local(int socket_id, int qid) +{ + struct netdev_dpdk *dev; + + LIST_FOR_EACH (dev, list_node, &dpdk_list) { + if (dev->socket_id != socket_id) { + dpdk_queue_flush(dev, qid); + } + } +} + int pmd_thread_setaffinity_cpu(int cpu) { diff --git a/lib/netdev-dpdk.h b/lib/netdev-dpdk.h index da507ce..fc6c217 100644 --- a/lib/netdev-dpdk.h +++ b/lib/netdev-dpdk.h @@ -25,6 +25,8 @@ struct netdev; int dpdk_init(int argc, char **argv); void netdev_dpdk_register(void); int netdev_dpdk_get_socket_id(const struct netdev *); +int netdev_dpdk_get_n_devs_on_socket(int socket_id); +void netdev_dpdk_flush_non_local(int socket_id, int qid); void free_dpdk_buf(struct dpif_packet *); int pmd_thread_setaffinity_cpu(int cpu); @@ -50,6 +52,18 @@ netdev_dpdk_get_socket_id(const struct netdev *netdev_ OVS_UNUSED) return -1; } +static inline int +netdev_dpdk_get_n_devs_on_socket(int socket_id OVS_UNUSED) +{ + return 0; +} + +static inline void +netdev_dpdk_flush_non_local(int socket_id OVS_UNUSED, int qid OVS_UNUSED) +{ + /* Nothing */ +} + static inline void free_dpdk_buf(struct dpif_packet *buf OVS_UNUSED) { -- 1.7.9.5 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev