This commit changes the per dpif-netdev datapath flow-table/classifier to per pmd-thread. As a direct benefit, datapath and flow statistics no longer need to be protected by a mutex or declared as per-thread variables, since they are only written by the owning pmd thread.
As side effects, the flow-dump output of userspace datapath can contain overlapping flows. To reduce confusion, the dump from different pmd thread will be separated by a title line. In addition, the flow operations via 'ovs-appctl dpctl/*' are modified so that if the given flow in_port corresponds to a dpdk interface, the operation will be conducted to all pmd threads recv from that interface. Signed-off-by: Alex Wang <al...@nicira.com> --- lib/dpctl.c | 134 ++++++++++- lib/dpif-netdev.c | 506 +++++++++++++++++++++++------------------ lib/dpif-netlink.c | 3 +- lib/dpif.c | 10 +- lib/dpif.h | 34 ++- ofproto/ofproto-dpif-upcall.c | 38 ++-- ofproto/ofproto-dpif.c | 8 +- tests/ofproto-dpif.at | 6 + 8 files changed, 481 insertions(+), 258 deletions(-) diff --git a/lib/dpctl.c b/lib/dpctl.c index 4e41fe4..d5cccdf 100644 --- a/lib/dpctl.c +++ b/lib/dpctl.c @@ -26,11 +26,13 @@ #include <string.h> #include <unistd.h> +#include "bitmap.h" #include "command-line.h" #include "compiler.h" #include "dirs.h" #include "dpctl.h" #include "dpif.h" +#include "dpif-netdev.h" #include "dynamic-string.h" #include "flow.h" #include "match.h" @@ -39,6 +41,7 @@ #include "odp-util.h" #include "ofp-parse.h" #include "ofpbuf.h" +#include "ovs-numa.h" #include "packets.h" #include "shash.h" #include "simap.h" @@ -708,7 +711,7 @@ dpctl_dump_flows(int argc, const char *argv[], struct dpctl_params *dpctl_p) struct dpif_flow_dump_thread *flow_dump_thread; struct dpif_flow_dump *flow_dump; struct dpif_flow f; - + int poller_id = POLLER_ID_NULL; int error; if (argc > 1 && !strncmp(argv[argc - 1], "filter=", 7)) { @@ -771,6 +774,19 @@ dpctl_dump_flows(int argc, const char *argv[], struct dpctl_params *dpctl_p) minimatch_destroy(&minimatch); } ds_clear(&ds); + /* If 'poller_id' is specified, overlapping flows could be dumped from + * different pmd threads. So, separates dumps from different pmds + * by printing a title line. 
*/ + if (poller_id != f.poller_id) { + if (f.poller_id == NON_PMD_CORE_ID) { + ds_put_format(&ds, "flow-dump from non-dpdk interfaces:\n"); + } else { + ds_put_format(&ds, "flow-dump from pmd on cpu core: %d\n", + f.poller_id); + } + poller_id = f.poller_id; + } + if (dpctl_p->verbosity) { if (f.ufid_present) { odp_format_ufid(&f.ufid, &ds); @@ -806,12 +822,43 @@ out_freefilter: return error; } +/* Extracts the in_port from the parsed keys, and returns the reference + * to the 'struct netdev *' of the dpif port. On error, returns NULL. + * Users must call 'netdev_close()' after finish using the returned + * reference. */ +static struct netdev * +get_in_port_netdev_from_key(struct dpif *dpif, const struct ofpbuf *key) +{ + const struct nlattr *in_port_nla; + struct netdev *dev = NULL; + + in_port_nla = nl_attr_find(key, 0, OVS_KEY_ATTR_IN_PORT); + if (in_port_nla) { + struct dpif_port dpif_port; + odp_port_t port_no; + int error; + + port_no = ODP_PORT_C(nl_attr_get_u32(in_port_nla)); + error = dpif_port_query_by_number(dpif, port_no, &dpif_port); + if (error) { + goto out; + } + + netdev_open(dpif_port.name, dpif_port.type, &dev); + dpif_port_destroy(&dpif_port); + } + +out: + return dev; +} + static int dpctl_put_flow(int argc, const char *argv[], enum dpif_flow_put_flags flags, struct dpctl_params *dpctl_p) { const char *key_s = argv[argc - 2]; const char *actions_s = argv[argc - 1]; + struct netdev *in_port_netdev = NULL; struct dpif_flow_stats stats; struct dpif_port dpif_port; struct dpif_port_dump port_dump; @@ -834,7 +881,6 @@ dpctl_put_flow(int argc, const char *argv[], enum dpif_flow_put_flags flags, return error; } - simap_init(&port_names); DPIF_PORT_FOR_EACH (&dpif_port, &port_dump, dpif) { simap_put(&port_names, dpif_port.name, odp_to_u32(dpif_port.port_no)); @@ -855,12 +901,49 @@ dpctl_put_flow(int argc, const char *argv[], enum dpif_flow_put_flags flags, dpctl_error(dpctl_p, error, "parsing actions"); goto out_freeactions; } - error = 
dpif_flow_put(dpif, flags, - ofpbuf_data(&key), ofpbuf_size(&key), - ofpbuf_size(&mask) == 0 ? NULL : ofpbuf_data(&mask), - ofpbuf_size(&mask), - ofpbuf_data(&actions), ofpbuf_size(&actions), - NULL, dpctl_p->print_statistics ? &stats : NULL); + + /* For DPDK interface, apply the operation to all pmd threads + * on the same numa node. */ + in_port_netdev = get_in_port_netdev_from_key(dpif, &key); + if (in_port_netdev && netdev_is_pmd(in_port_netdev)) { + int numa_id; + + numa_id = netdev_get_numa_id(in_port_netdev); + if (ovs_numa_numa_id_is_valid(numa_id)) { + unsigned long *bm; + int n_cores = ovs_numa_get_n_cores(); + int idx; + + bm = bitmap_allocate(n_cores); + ovs_numa_get_pinned_cores_on_numa(numa_id, bm); + + BITMAP_FOR_EACH_1 (idx, n_cores, bm) { + error = dpif_flow_put(dpif, flags, + ofpbuf_data(&key), ofpbuf_size(&key), + ofpbuf_size(&mask) == 0 + ? NULL : ofpbuf_data(&mask), + ofpbuf_size(&mask), + ofpbuf_data(&actions), + ofpbuf_size(&actions), + NULL, idx, + dpctl_p->print_statistics + ? &stats : NULL); + + } + bitmap_free(bm); + } + } else { + error = dpif_flow_put(dpif, flags, + ofpbuf_data(&key), ofpbuf_size(&key), + ofpbuf_size(&mask) == 0 + ? NULL : ofpbuf_data(&mask), + ofpbuf_size(&mask), + ofpbuf_data(&actions), + ofpbuf_size(&actions), + NULL, POLLER_ID_NULL, + dpctl_p->print_statistics + ? 
&stats : NULL); + } if (error) { dpctl_error(dpctl_p, error, "updating flow table"); goto out_freeactions; @@ -881,6 +964,7 @@ out_freekeymask: ofpbuf_uninit(&mask); ofpbuf_uninit(&key); dpif_close(dpif); + netdev_close(in_port_netdev); return error; } @@ -910,6 +994,7 @@ static int dpctl_del_flow(int argc, const char *argv[], struct dpctl_params *dpctl_p) { const char *key_s = argv[argc - 1]; + struct netdev *in_port_netdev = NULL; struct dpif_flow_stats stats; struct dpif_port dpif_port; struct dpif_port_dump port_dump; @@ -945,9 +1030,35 @@ dpctl_del_flow(int argc, const char *argv[], struct dpctl_params *dpctl_p) goto out; } - error = dpif_flow_del(dpif, - ofpbuf_data(&key), ofpbuf_size(&key), NULL, - dpctl_p->print_statistics ? &stats : NULL); + /* For DPDK interface, apply the operation to all pmd threads + * on the same numa node. */ + in_port_netdev = get_in_port_netdev_from_key(dpif, &key); + if (in_port_netdev && netdev_is_pmd(in_port_netdev)) { + int numa_id; + + numa_id = netdev_get_numa_id(in_port_netdev); + if (ovs_numa_numa_id_is_valid(numa_id)) { + unsigned long *bm; + int n_cores = ovs_numa_get_n_cores(); + int idx; + + bm = bitmap_allocate(n_cores); + ovs_numa_get_pinned_cores_on_numa(numa_id, bm); + + BITMAP_FOR_EACH_1(idx, n_cores, bm) { + error = dpif_flow_del(dpif, + ofpbuf_data(&key), ofpbuf_size(&key), + NULL, idx, dpctl_p->print_statistics + ? &stats : NULL); + } + bitmap_free(bm); + } + } else { + error = dpif_flow_del(dpif, + ofpbuf_data(&key), ofpbuf_size(&key), NULL, + POLLER_ID_NULL, + dpctl_p->print_statistics ? 
&stats : NULL); + } if (error) { dpctl_error(dpctl_p, error, "deleting flow"); goto out; @@ -967,6 +1078,7 @@ out: ofpbuf_uninit(&key); simap_destroy(&port_names); dpif_close(dpif); + netdev_close(in_port_netdev); return error; } diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index 9b047be..599ef6b 100644 --- a/lib/dpif-netdev.c +++ b/lib/dpif-netdev.c @@ -177,7 +177,6 @@ static bool dpcls_lookup(const struct dpcls *cls, * * dp_netdev_mutex (global) * port_mutex - * flow_mutex */ struct dp_netdev { const struct dpif_class *const class; @@ -186,20 +185,6 @@ struct dp_netdev { struct ovs_refcount ref_cnt; atomic_flag destroyed; - /* Flows. - * - * Writers of 'flow_table' must take the 'flow_mutex'. Corresponding - * changes to 'cls' must be made while still holding the 'flow_mutex'. - */ - struct ovs_mutex flow_mutex; - struct dpcls cls; - struct cmap flow_table OVS_GUARDED; /* Flow table. */ - - /* Statistics. - * - * ovsthread_stats is internally synchronized. */ - struct ovsthread_stats stats; /* Contains 'struct dp_netdev_stats *'. */ - /* Ports. * * Protected by RCU. Take the mutex to add or remove ports. */ @@ -241,15 +226,6 @@ enum dp_stat_type { DP_N_STATS }; -/* Contained by struct dp_netdev's 'stats' member. */ -struct dp_netdev_stats { - struct ovs_mutex mutex; /* Protects 'n'. */ - - /* Indexed by DP_STAT_*, protected by 'mutex'. */ - unsigned long long int n[DP_N_STATS] OVS_GUARDED; -}; - - /* A port in a netdev-based datapath. */ struct dp_netdev_port { struct cmap_node node; /* Node in dp_netdev's 'ports'. */ @@ -261,15 +237,22 @@ struct dp_netdev_port { char *type; /* Port type as requested by user. */ }; - -/* A flow in dp_netdev's 'flow_table'. +/* Contained by struct dp_netdev_flow's 'stats' member. */ +struct dp_netdev_flow_stats { + long long int used; /* Last used time, in monotonic msecs. */ + long long int packet_count; /* Number of packets matched. */ + long long int byte_count; /* Number of bytes matched. 
*/ + uint16_t tcp_flags; /* Bitwise-OR of seen tcp_flags values. */ +}; + +/* A flow in 'dp_netdev_pmd_thread's 'flow_table'. * * * Thread-safety * ============= * * Except near the beginning or ending of its lifespan, rule 'rule' belongs to - * its dp_netdev's classifier. The text below calls this classifier 'cls'. + * its pmd thread's classifier. The text below calls this classifier 'cls'. * * Motivation * ---------- @@ -303,9 +286,12 @@ struct dp_netdev_flow { bool dead; /* Hash table index by unmasked flow. */ - const struct cmap_node node; /* In owning dp_netdev's 'flow_table'. */ + const struct cmap_node node; /* In owning dp_netdev_pmd_thread's */ + /* 'flow_table'. */ const ovs_u128 ufid; /* Unique flow identifier. */ const struct flow flow; /* Unmasked flow that created this entry. */ + const int pmd_id; /* The 'core_id' of pmd thread owning this */ + /* flow. */ /* Number of references. * The classifier owns one reference. @@ -313,10 +299,8 @@ struct dp_netdev_flow { * reference. */ struct ovs_refcount ref_cnt; - /* Statistics. - * - * Reading or writing these members requires 'mutex'. */ - struct ovsthread_stats stats; /* Contains "struct dp_netdev_flow_stats". */ + /* Statistics. */ + struct dp_netdev_flow_stats stats; /* Actions. */ OVSRCU_TYPE(struct dp_netdev_actions *) actions; @@ -331,16 +315,6 @@ static bool dp_netdev_flow_ref(struct dp_netdev_flow *); static int dpif_netdev_flow_from_nlattrs(const struct nlattr *, uint32_t, struct flow *); -/* Contained by struct dp_netdev_flow's 'stats' member. */ -struct dp_netdev_flow_stats { - struct ovs_mutex mutex; /* Guards all the other members. */ - - long long int used OVS_GUARDED; /* Last used time, in monotonic msecs. */ - long long int packet_count OVS_GUARDED; /* Number of packets matched. */ - long long int byte_count OVS_GUARDED; /* Number of bytes matched. */ - uint16_t tcp_flags OVS_GUARDED; /* Bitwise-OR of seen tcp_flags values. 
*/ -}; - /* A set of datapath actions within a "struct dp_netdev_flow". * * @@ -361,20 +335,31 @@ struct dp_netdev_actions *dp_netdev_flow_get_actions( const struct dp_netdev_flow *); static void dp_netdev_actions_free(struct dp_netdev_actions *); +/* Contained by struct dp_netdev_pmd_thread's 'stats' member. */ +struct dp_netdev_pmd_stats { + /* Indexed by DP_STAT_*. */ + unsigned long long int n[DP_N_STATS]; +}; + /* PMD: Poll modes drivers. PMD accesses devices via polling to eliminate * the performance overhead of interrupt processing. Therefore netdev can * not implement rx-wait for these devices. dpif-netdev needs to poll * these device to check for recv buffer. pmd-thread does polling for - * devices assigned to itself thread. + * devices assigned to itself. * * DPDK used PMD for accessing NIC. * * Note, instance with cpu core id NON_PMD_CORE_ID will be reserved for * I/O of all non-pmd threads. There will be no actual thread created * for the instance. - **/ + * + * Each struct has its own flow table and classifier. Packets received + * from managed ports are looked up in the corresponding pmd thread's + * flow table, and are executed with the found actions. + * */ struct dp_netdev_pmd_thread { struct dp_netdev *dp; + struct ovs_refcount ref_cnt; /* Every reference must be refcount'ed. */ struct cmap_node node; /* In 'dp->poll_threads'. */ pthread_cond_t cond; /* For synchronizing pmd thread reload. */ @@ -385,6 +370,19 @@ struct dp_netdev_pmd_thread { * need to be protected (e.g. by 'dp_netdev_mutex'). All other * instances will only be accessed by its own pmd thread. */ struct emc_cache flow_cache; + + /* Classifier and Flow-Table. + * + * Writers of 'flow_table' must take the 'flow_mutex'. Corresponding + * changes to 'cls' must be made while still holding the 'flow_mutex'. + */ + struct ovs_mutex flow_mutex; + struct dpcls cls; + struct cmap flow_table OVS_GUARDED; /* Flow table. */ + + /* Statistics. 
*/ + struct dp_netdev_pmd_stats stats; + struct latch exit_latch; /* For terminating the pmd thread. */ atomic_uint change_seq; /* For reloading pmd ports. */ pthread_t thread; @@ -409,7 +407,6 @@ static int get_port_by_name(struct dp_netdev *dp, const char *devname, struct dp_netdev_port **portp); static void dp_netdev_free(struct dp_netdev *) OVS_REQUIRES(dp_netdev_mutex); -static void dp_netdev_flow_flush(struct dp_netdev *); static int do_add_port(struct dp_netdev *dp, const char *devname, const char *type, odp_port_t port_no) OVS_REQUIRES(dp->port_mutex); @@ -433,10 +430,15 @@ static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, static void dp_netdev_set_nonpmd(struct dp_netdev *dp); static struct dp_netdev_pmd_thread *dp_netdev_get_pmd(struct dp_netdev *dp, int core_id); +static struct dp_netdev_pmd_thread * +dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos); static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp); static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id); static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id); static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp); +static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd); +static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd); +static void dp_netdev_pmd_flow_flush(struct dp_netdev_pmd_thread *pmd); static inline bool emc_entry_alive(struct emc_entry *ce); static void emc_clear_entry(struct emc_entry *ce); @@ -604,12 +606,6 @@ create_dp_netdev(const char *name, const struct dpif_class *class, ovs_refcount_init(&dp->ref_cnt); atomic_flag_clear(&dp->destroyed); - ovs_mutex_init(&dp->flow_mutex); - dpcls_init(&dp->cls); - cmap_init(&dp->flow_table); - - ovsthread_stats_init(&dp->stats); - ovs_mutex_init(&dp->port_mutex); cmap_init(&dp->ports); dp->port_seq = seq_create(); @@ -686,8 +682,6 @@ dp_netdev_free(struct dp_netdev *dp) OVS_REQUIRES(dp_netdev_mutex) { struct dp_netdev_port *port; 
- struct dp_netdev_stats *bucket; - int i; shash_find_and_delete(&dp_netdevs, dp->name); @@ -696,22 +690,12 @@ dp_netdev_free(struct dp_netdev *dp) ovs_mutex_destroy(&dp->non_pmd_mutex); ovsthread_key_delete(dp->per_pmd_key); - dp_netdev_flow_flush(dp); ovs_mutex_lock(&dp->port_mutex); CMAP_FOR_EACH (port, node, &dp->ports) { do_del_port(dp, port); } ovs_mutex_unlock(&dp->port_mutex); - OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) { - ovs_mutex_destroy(&bucket->mutex); - free_cacheline(bucket); - } - ovsthread_stats_destroy(&dp->stats); - - dpcls_destroy(&dp->cls); - cmap_destroy(&dp->flow_table); - ovs_mutex_destroy(&dp->flow_mutex); seq_destroy(dp->port_seq); cmap_destroy(&dp->ports); @@ -765,18 +749,14 @@ static int dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats) { struct dp_netdev *dp = get_dp_netdev(dpif); - struct dp_netdev_stats *bucket; - size_t i; - - stats->n_flows = cmap_count(&dp->flow_table); + struct dp_netdev_pmd_thread *pmd; - stats->n_hit = stats->n_missed = stats->n_lost = 0; - OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) { - ovs_mutex_lock(&bucket->mutex); - stats->n_hit += bucket->n[DP_STAT_HIT]; - stats->n_missed += bucket->n[DP_STAT_MISS]; - stats->n_lost += bucket->n[DP_STAT_LOST]; - ovs_mutex_unlock(&bucket->mutex); + stats->n_flows = stats->n_hit = stats->n_missed = stats->n_lost = 0; + CMAP_FOR_EACH (pmd, node, &dp->poll_threads) { + stats->n_flows += cmap_count(&pmd->flow_table); + stats->n_hit += pmd->stats.n[DP_STAT_HIT]; + stats->n_missed += pmd->stats.n[DP_STAT_MISS]; + stats->n_lost += pmd->stats.n[DP_STAT_LOST]; } stats->n_masks = UINT32_MAX; stats->n_mask_hit = UINT64_MAX; @@ -1140,15 +1120,6 @@ dpif_netdev_port_query_by_name(const struct dpif *dpif, const char *devname, static void dp_netdev_flow_free(struct dp_netdev_flow *flow) { - struct dp_netdev_flow_stats *bucket; - size_t i; - - OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &flow->stats) { - ovs_mutex_destroy(&bucket->mutex); 
- free_cacheline(bucket); - } - ovsthread_stats_destroy(&flow->stats); - dp_netdev_actions_free(dp_netdev_flow_get_actions(flow)); free(flow); } @@ -1167,36 +1138,41 @@ dp_netdev_flow_hash(const ovs_u128 *ufid) } static void -dp_netdev_remove_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow) - OVS_REQUIRES(dp->flow_mutex) +dp_netdev_pmd_remove_flow(struct dp_netdev_pmd_thread *pmd, + struct dp_netdev_flow *flow) + OVS_REQUIRES(pmd->flow_mutex) { struct cmap_node *node = CONST_CAST(struct cmap_node *, &flow->node); - dpcls_remove(&dp->cls, &flow->cr); - cmap_remove(&dp->flow_table, node, dp_netdev_flow_hash(&flow->ufid)); + dpcls_remove(&pmd->cls, &flow->cr); + cmap_remove(&pmd->flow_table, node, dp_netdev_flow_hash(&flow->ufid)); flow->dead = true; dp_netdev_flow_unref(flow); } static void -dp_netdev_flow_flush(struct dp_netdev *dp) +dp_netdev_pmd_flow_flush(struct dp_netdev_pmd_thread *pmd) { struct dp_netdev_flow *netdev_flow; - ovs_mutex_lock(&dp->flow_mutex); - CMAP_FOR_EACH (netdev_flow, node, &dp->flow_table) { - dp_netdev_remove_flow(dp, netdev_flow); + ovs_mutex_lock(&pmd->flow_mutex); + CMAP_FOR_EACH (netdev_flow, node, &pmd->flow_table) { + dp_netdev_pmd_remove_flow(pmd, netdev_flow); } - ovs_mutex_unlock(&dp->flow_mutex); + ovs_mutex_unlock(&pmd->flow_mutex); } static int dpif_netdev_flow_flush(struct dpif *dpif) { struct dp_netdev *dp = get_dp_netdev(dpif); + struct dp_netdev_pmd_thread *pmd; + + CMAP_FOR_EACH (pmd, node, &dp->poll_threads) { + dp_netdev_pmd_flow_flush(pmd); + } - dp_netdev_flow_flush(dp); return 0; } @@ -1528,21 +1504,22 @@ emc_lookup(struct emc_cache *cache, const struct netdev_flow_key *key) } static struct dp_netdev_flow * -dp_netdev_lookup_flow(const struct dp_netdev *dp, - const struct netdev_flow_key *key) +dp_netdev_pmd_lookup_flow(const struct dp_netdev_pmd_thread *pmd, + const struct netdev_flow_key *key) { struct dp_netdev_flow *netdev_flow; struct dpcls_rule *rule; - dpcls_lookup(&dp->cls, key, &rule, 1); + 
dpcls_lookup(&pmd->cls, key, &rule, 1); netdev_flow = dp_netdev_flow_cast(rule); return netdev_flow; } static struct dp_netdev_flow * -dp_netdev_find_flow(const struct dp_netdev *dp, const ovs_u128 *ufidp, - const struct nlattr *key, size_t key_len) +dp_netdev_pmd_find_flow(const struct dp_netdev_pmd_thread *pmd, + const ovs_u128 *ufidp, const struct nlattr *key, + size_t key_len) { struct dp_netdev_flow *netdev_flow; struct flow flow; @@ -1551,13 +1528,13 @@ dp_netdev_find_flow(const struct dp_netdev *dp, const ovs_u128 *ufidp, /* If a UFID is not provided, determine one based on the key. */ if (!ufidp && key && key_len && !dpif_netdev_flow_from_nlattrs(key, key_len, &flow)) { - dpif_flow_hash(dp->dpif, &flow, sizeof flow, &ufid); + dpif_flow_hash(pmd->dp->dpif, &flow, sizeof flow, &ufid); ufidp = &ufid; } if (ufidp) { CMAP_FOR_EACH_WITH_HASH (netdev_flow, node, dp_netdev_flow_hash(ufidp), - &dp->flow_table) { + &pmd->flow_table) { if (ovs_u128_equal(&netdev_flow->ufid, ufidp)) { return netdev_flow; } @@ -1571,18 +1548,10 @@ static void get_dpif_flow_stats(const struct dp_netdev_flow *netdev_flow, struct dpif_flow_stats *stats) { - struct dp_netdev_flow_stats *bucket; - size_t i; - - memset(stats, 0, sizeof *stats); - OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &netdev_flow->stats) { - ovs_mutex_lock(&bucket->mutex); - stats->n_packets += bucket->packet_count; - stats->n_bytes += bucket->byte_count; - stats->used = MAX(stats->used, bucket->used); - stats->tcp_flags |= bucket->tcp_flags; - ovs_mutex_unlock(&bucket->mutex); - } + stats->n_packets = netdev_flow->stats.packet_count; + stats->n_bytes = netdev_flow->stats.byte_count; + stats->used = netdev_flow->stats.used; + stats->tcp_flags = netdev_flow->stats.tcp_flags; } static bool @@ -1632,6 +1601,7 @@ dp_netdev_flow_to_dpif_flow(const struct dp_netdev_flow *netdev_flow, flow->ufid = netdev_flow->ufid; flow->ufid_present = true; + flow->poller_id = netdev_flow->pmd_id; get_dpif_flow_stats(netdev_flow, 
&flow->stats); } @@ -1732,24 +1702,46 @@ dpif_netdev_flow_get(const struct dpif *dpif, const struct dpif_flow_get *get) { struct dp_netdev *dp = get_dp_netdev(dpif); struct dp_netdev_flow *netdev_flow; + struct dp_netdev_pmd_thread *pmd; + int poller_id = get->poller_id == POLLER_ID_NULL ? NON_PMD_CORE_ID + : get->poller_id; int error = 0; - netdev_flow = dp_netdev_find_flow(dp, get->ufid, get->key, get->key_len); + /* Each call must provide valid poller_id. */ + if (!ovs_numa_core_id_is_valid(poller_id)) { + return EINVAL; + } + + pmd = dp_netdev_get_pmd(dp, poller_id); + if (!pmd) { + return EINVAL; + } + + netdev_flow = dp_netdev_pmd_find_flow(pmd, get->ufid, get->key, + get->key_len); if (netdev_flow) { dp_netdev_flow_to_dpif_flow(netdev_flow, get->buffer, get->buffer, get->flow, false); } else { error = ENOENT; } + dp_netdev_pmd_unref(pmd); + return error; } +static void +clear_stats(struct dp_netdev_flow *netdev_flow) +{ + memset(&netdev_flow->stats, 0, sizeof netdev_flow->stats); +} + static struct dp_netdev_flow * -dp_netdev_flow_add(struct dp_netdev *dp, struct match *match, - const ovs_u128 *ufid, +dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd, + struct match *match, const ovs_u128 *ufid, const struct nlattr *actions, size_t actions_len) - OVS_REQUIRES(dp->flow_mutex) + OVS_REQUIRES(pmd->flow_mutex) { struct dp_netdev_flow *flow; struct netdev_flow_key mask; @@ -1760,18 +1752,19 @@ dp_netdev_flow_add(struct dp_netdev *dp, struct match *match, /* Do not allocate extra space. 
*/ flow = xmalloc(sizeof *flow - sizeof flow->cr.flow.mf + mask.len); + clear_stats(flow); flow->dead = false; + *CONST_CAST(int *, &flow->pmd_id) = pmd->core_id; *CONST_CAST(struct flow *, &flow->flow) = match->flow; *CONST_CAST(ovs_u128 *, &flow->ufid) = *ufid; ovs_refcount_init(&flow->ref_cnt); - ovsthread_stats_init(&flow->stats); ovsrcu_set(&flow->actions, dp_netdev_actions_create(actions, actions_len)); - cmap_insert(&dp->flow_table, + cmap_insert(&pmd->flow_table, CONST_CAST(struct cmap_node *, &flow->node), dp_netdev_flow_hash(&flow->ufid)); netdev_flow_key_init_masked(&flow->cr.flow, &match->flow, &mask); - dpcls_insert(&dp->cls, &flow->cr, &mask); + dpcls_insert(&pmd->cls, &flow->cr, &mask); if (OVS_UNLIKELY(VLOG_IS_DBG_ENABLED())) { struct match match; @@ -1795,30 +1788,17 @@ dp_netdev_flow_add(struct dp_netdev *dp, struct match *match, return flow; } -static void -clear_stats(struct dp_netdev_flow *netdev_flow) -{ - struct dp_netdev_flow_stats *bucket; - size_t i; - - OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &netdev_flow->stats) { - ovs_mutex_lock(&bucket->mutex); - bucket->used = 0; - bucket->packet_count = 0; - bucket->byte_count = 0; - bucket->tcp_flags = 0; - ovs_mutex_unlock(&bucket->mutex); - } -} - static int dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put) { struct dp_netdev *dp = get_dp_netdev(dpif); struct dp_netdev_flow *netdev_flow; struct netdev_flow_key key; + struct dp_netdev_pmd_thread *pmd; struct match match; ovs_u128 ufid; + int poller_id = put->poller_id == POLLER_ID_NULL ? NON_PMD_CORE_ID + : put->poller_id; int error; error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &match.flow); @@ -1832,6 +1812,16 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put) return error; } + /* Each call must provide valid poller_id. 
*/ + if (!ovs_numa_core_id_is_valid(poller_id)) { + return EINVAL; + } + + pmd = dp_netdev_get_pmd(dp, poller_id); + if (!pmd) { + return EINVAL; + } + /* Must produce a netdev_flow_key for lookup. * This interface is no longer performance critical, since it is not used * for upcall processing any more. */ @@ -1843,15 +1833,15 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put) dpif_flow_hash(dpif, &match.flow, sizeof match.flow, &ufid); } - ovs_mutex_lock(&dp->flow_mutex); - netdev_flow = dp_netdev_lookup_flow(dp, &key); + ovs_mutex_lock(&pmd->flow_mutex); + netdev_flow = dp_netdev_pmd_lookup_flow(pmd, &key); if (!netdev_flow) { if (put->flags & DPIF_FP_CREATE) { - if (cmap_count(&dp->flow_table) < MAX_FLOWS) { + if (cmap_count(&pmd->flow_table) < MAX_FLOWS) { if (put->stats) { memset(put->stats, 0, sizeof *put->stats); } - dp_netdev_flow_add(dp, &match, &ufid, put->actions, + dp_netdev_flow_add(pmd, &match, &ufid, put->actions, put->actions_len); error = 0; } else { @@ -1887,7 +1877,8 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put) error = EINVAL; } } - ovs_mutex_unlock(&dp->flow_mutex); + ovs_mutex_unlock(&pmd->flow_mutex); + dp_netdev_pmd_unref(pmd); return error; } @@ -1897,26 +1888,43 @@ dpif_netdev_flow_del(struct dpif *dpif, const struct dpif_flow_del *del) { struct dp_netdev *dp = get_dp_netdev(dpif); struct dp_netdev_flow *netdev_flow; + struct dp_netdev_pmd_thread *pmd; + int poller_id = del->poller_id == POLLER_ID_NULL ? NON_PMD_CORE_ID + : del->poller_id; int error = 0; - ovs_mutex_lock(&dp->flow_mutex); - netdev_flow = dp_netdev_find_flow(dp, del->ufid, del->key, del->key_len); + /* Each call must provide valid poller_id. 
*/ + if (!ovs_numa_core_id_is_valid(poller_id)) { + return EINVAL; + } + + pmd = dp_netdev_get_pmd(dp, poller_id); + if (!pmd) { + return EINVAL; + } + + ovs_mutex_lock(&pmd->flow_mutex); + netdev_flow = dp_netdev_pmd_find_flow(pmd, del->ufid, del->key, + del->key_len); if (netdev_flow) { if (del->stats) { get_dpif_flow_stats(netdev_flow, del->stats); } - dp_netdev_remove_flow(dp, netdev_flow); + dp_netdev_pmd_remove_flow(pmd, netdev_flow); } else { error = ENOENT; } - ovs_mutex_unlock(&dp->flow_mutex); + ovs_mutex_unlock(&pmd->flow_mutex); + dp_netdev_pmd_unref(pmd); return error; } struct dpif_netdev_flow_dump { struct dpif_flow_dump up; - struct cmap_position pos; + struct cmap_position poll_thread_pos; + struct cmap_position flow_pos; + struct dp_netdev_pmd_thread *cur_pmd; int status; struct ovs_mutex mutex; }; @@ -1932,10 +1940,8 @@ dpif_netdev_flow_dump_create(const struct dpif *dpif_, bool terse) { struct dpif_netdev_flow_dump *dump; - dump = xmalloc(sizeof *dump); + dump = xzalloc(sizeof *dump); dpif_flow_dump_init(&dump->up, dpif_); - memset(&dump->pos, 0, sizeof dump->pos); - dump->status = 0; dump->up.terse = terse; ovs_mutex_init(&dump->mutex); @@ -1993,26 +1999,58 @@ dpif_netdev_flow_dump_next(struct dpif_flow_dump_thread *thread_, struct dpif_netdev_flow_dump_thread *thread = dpif_netdev_flow_dump_thread_cast(thread_); struct dpif_netdev_flow_dump *dump = thread->dump; - struct dpif_netdev *dpif = dpif_netdev_cast(thread->up.dpif); struct dp_netdev_flow *netdev_flows[FLOW_DUMP_MAX_BATCH]; - struct dp_netdev *dp = get_dp_netdev(&dpif->dpif); int n_flows = 0; int i; ovs_mutex_lock(&dump->mutex); if (!dump->status) { - for (n_flows = 0; n_flows < MIN(max_flows, FLOW_DUMP_MAX_BATCH); - n_flows++) { - struct cmap_node *node; + struct dpif_netdev *dpif = dpif_netdev_cast(thread->up.dpif); + struct dp_netdev *dp = get_dp_netdev(&dpif->dpif); + struct dp_netdev_pmd_thread *pmd = dump->cur_pmd; + int flow_limit = MIN(max_flows, FLOW_DUMP_MAX_BATCH); + + /* 
First call to dump_next(), extracts the first pmd thread. + * If there is no pmd thread, returns immediately. */ + if (!pmd) { + pmd = dp_netdev_pmd_get_next(dp, &dump->poll_thread_pos); + if (!pmd) { + ovs_mutex_unlock(&dump->mutex); + return n_flows; - node = cmap_next_position(&dp->flow_table, &dump->pos); - if (!node) { - dump->status = EOF; - break; } - netdev_flows[n_flows] = CONTAINER_OF(node, struct dp_netdev_flow, - node); } + + do { + for (n_flows = 0; n_flows < flow_limit; n_flows++) { + struct cmap_node *node; + + node = cmap_next_position(&pmd->flow_table, &dump->flow_pos); + if (!node) { + break; + } + netdev_flows[n_flows] = CONTAINER_OF(node, + struct dp_netdev_flow, + node); + } + /* When finishing dumping the current pmd thread, moves to + * the next. */ + if (n_flows < flow_limit) { + memset(&dump->flow_pos, 0, sizeof dump->flow_pos); + dp_netdev_pmd_unref(pmd); + pmd = dp_netdev_pmd_get_next(dp, &dump->poll_thread_pos); + if (!pmd) { + dump->status = EOF; + break; + } + } + /* Keeps the reference to next caller. */ + dump->cur_pmd = pmd; + + /* If the current dump is empty, do not exit the loop, since the + * remaining pmds could have flows to be dumped. Just dumps again + * on the new 'pmd'. 
*/ + } while (!n_flows); } ovs_mutex_unlock(&dump->mutex); @@ -2063,9 +2101,11 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute) ovs_mutex_lock(&dp->non_pmd_mutex); ovs_mutex_lock(&dp->port_mutex); } + dp_netdev_execute_actions(pmd, &pp, 1, false, execute->actions, execute->actions_len); if (pmd->core_id == NON_PMD_CORE_ID) { + dp_netdev_pmd_unref(pmd); ovs_mutex_unlock(&dp->port_mutex); ovs_mutex_unlock(&dp->non_pmd_mutex); } @@ -2263,6 +2303,8 @@ dpif_netdev_run(struct dpif *dpif) } } ovs_mutex_unlock(&dp->non_pmd_mutex); + dp_netdev_pmd_unref(non_pmd); + tnl_arp_cache_run(); new_tnl_seq = seq_read(tnl_conf_seq); @@ -2446,7 +2488,10 @@ dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd) ovs_mutex_unlock(&pmd->cond_mutex); } -/* Returns the pointer to the dp_netdev_pmd_thread for non-pmd threads. */ +/* Finds and refs the dp_netdev_pmd_thread on core 'core_id'. Returns + * the pointer if succeeds, otherwise, NULL. + * + * Caller must unrefs the returned reference. */ static struct dp_netdev_pmd_thread * dp_netdev_get_pmd(struct dp_netdev *dp, int core_id) { @@ -2454,10 +2499,12 @@ dp_netdev_get_pmd(struct dp_netdev *dp, int core_id) const struct cmap_node *pnode; pnode = cmap_find(&dp->poll_threads, hash_int(core_id, 0)); - ovs_assert(pnode); + if (!pnode) { + return NULL; + } pmd = CONTAINER_OF(pnode, struct dp_netdev_pmd_thread, node); - return pmd; + return dp_netdev_pmd_try_ref(pmd) ? pmd : NULL; } /* Sets the 'struct dp_netdev_pmd_thread' for non-pmd threads. */ @@ -2471,6 +2518,53 @@ dp_netdev_set_nonpmd(struct dp_netdev *dp) OVS_NUMA_UNSPEC); } +static bool +dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd) +{ + if (pmd) { + return ovs_refcount_try_ref_rcu(&pmd->ref_cnt); + } + + return false; +} + +static void +dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd) +{ + if (pmd && ovs_refcount_unref(&pmd->ref_cnt) == 1) { + /* Since every reference is refcount'ed, reaching here + * means we are safe to destroy everything. 
*/ + dp_netdev_pmd_flow_flush(pmd); + dpcls_destroy(&pmd->cls); + cmap_destroy(&pmd->flow_table); + ovs_mutex_destroy(&pmd->flow_mutex); + latch_destroy(&pmd->exit_latch); + xpthread_cond_destroy(&pmd->cond); + ovs_mutex_destroy(&pmd->cond_mutex); + free(pmd); + } +} + +/* Given cmap position 'pos', tries to ref the next node. If try_ref() + * fails, keeps checking for next node until reaching the end of cmap. + * + * Caller must unrefs the returned reference. */ +static struct dp_netdev_pmd_thread * +dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos) +{ + struct dp_netdev_pmd_thread *next; + + do { + struct cmap_node *node; + + node = cmap_next_position(&dp->poll_threads, pos); + next = node ? CONTAINER_OF(node, struct dp_netdev_pmd_thread, node) + : NULL; + } while (next && !dp_netdev_pmd_try_ref(next)); + + return next; +} + /* Configures the 'pmd' based on the input argument. */ static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp, @@ -2480,10 +2574,15 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp, pmd->index = index; pmd->core_id = core_id; pmd->numa_id = numa_id; + + ovs_refcount_init(&pmd->ref_cnt); latch_init(&pmd->exit_latch); atomic_init(&pmd->change_seq, PMD_INITIAL_SEQ); xpthread_cond_init(&pmd->cond, NULL); ovs_mutex_init(&pmd->cond_mutex); + ovs_mutex_init(&pmd->flow_mutex); + dpcls_init(&pmd->cls); + cmap_init(&pmd->flow_table); /* init the 'flow_cache' since there is no * actual thread created for NON_PMD_CORE_ID. */ if (core_id == NON_PMD_CORE_ID) { @@ -2493,13 +2592,13 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp, hash_int(core_id, 0)); } -/* Stops the pmd thread, removes it from the 'dp->poll_threads' - * and destroys the struct. */ +/* Stops the pmd thread, removes it from the 'dp->poll_threads', + * and unrefs the struct. 
*/ static void dp_netdev_del_pmd(struct dp_netdev_pmd_thread *pmd) { /* Uninit the 'flow_cache' since there is - * no actual thread uninit it. */ + * no actual thread uninit it for NON_PMD_CORE_ID. */ if (pmd->core_id == NON_PMD_CORE_ID) { emc_cache_uninit(&pmd->flow_cache); } else { @@ -2509,10 +2608,7 @@ dp_netdev_del_pmd(struct dp_netdev_pmd_thread *pmd) xpthread_join(pmd->thread, NULL); } cmap_remove(&pmd->dp->poll_threads, &pmd->node, hash_int(pmd->core_id, 0)); - latch_destroy(&pmd->exit_latch); - xpthread_cond_destroy(&pmd->cond); - ovs_mutex_destroy(&pmd->cond_mutex); - free(pmd); + dp_netdev_pmd_unref(pmd); } /* Destroys all pmd threads. */ @@ -2584,14 +2680,6 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id) } -static void * -dp_netdev_flow_stats_new_cb(void) -{ - struct dp_netdev_flow_stats *bucket = xzalloc_cacheline(sizeof *bucket); - ovs_mutex_init(&bucket->mutex); - return bucket; -} - /* Called after pmd threads config change. Restarts pmd threads with * new configuration. 
*/ static void @@ -2615,53 +2703,35 @@ dpif_netdev_get_datapath_version(void) } static void -dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow, - int cnt, int size, +dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow, int cnt, int size, uint16_t tcp_flags) { long long int now = time_msec(); - struct dp_netdev_flow_stats *bucket; - - bucket = ovsthread_stats_bucket_get(&netdev_flow->stats, - dp_netdev_flow_stats_new_cb); - - ovs_mutex_lock(&bucket->mutex); - bucket->used = MAX(now, bucket->used); - bucket->packet_count += cnt; - bucket->byte_count += size; - bucket->tcp_flags |= tcp_flags; - ovs_mutex_unlock(&bucket->mutex); -} -static void * -dp_netdev_stats_new_cb(void) -{ - struct dp_netdev_stats *bucket = xzalloc_cacheline(sizeof *bucket); - ovs_mutex_init(&bucket->mutex); - return bucket; + netdev_flow->stats.used = MAX(now, netdev_flow->stats.used); + netdev_flow->stats.packet_count += cnt; + netdev_flow->stats.byte_count += size; + netdev_flow->stats.tcp_flags |= tcp_flags; } static void -dp_netdev_count_packet(struct dp_netdev *dp, enum dp_stat_type type, int cnt) +dp_netdev_count_packet(struct dp_netdev_pmd_thread *pmd, + enum dp_stat_type type, int cnt) { - struct dp_netdev_stats *bucket; - - bucket = ovsthread_stats_bucket_get(&dp->stats, dp_netdev_stats_new_cb); - ovs_mutex_lock(&bucket->mutex); - bucket->n[type] += cnt; - ovs_mutex_unlock(&bucket->mutex); + pmd->stats.n[type] += cnt; } static int -dp_netdev_upcall(struct dp_netdev *dp, struct dpif_packet *packet_, +dp_netdev_upcall(struct dp_netdev_pmd_thread *pmd, struct dpif_packet *packet_, struct flow *flow, struct flow_wildcards *wc, ovs_u128 *ufid, enum dpif_upcall_type type, const struct nlattr *userdata, struct ofpbuf *actions, struct ofpbuf *put_actions) { + struct dp_netdev *dp = pmd->dp; struct ofpbuf *packet = &packet_->ofpbuf; if (type == DPIF_UC_MISS) { - dp_netdev_count_packet(dp, DP_STAT_MISS, 1); + dp_netdev_count_packet(pmd, DP_STAT_MISS, 1); } if 
(OVS_UNLIKELY(!dp->upcall_cb)) { @@ -2690,8 +2760,8 @@ dp_netdev_upcall(struct dp_netdev *dp, struct dpif_packet *packet_, ds_destroy(&ds); } - return dp->upcall_cb(packet, flow, ufid, type, userdata, actions, wc, - put_actions, dp->upcall_aux); + return dp->upcall_cb(packet, flow, ufid, pmd->core_id, type, userdata, + actions, wc, put_actions, dp->upcall_aux); } static inline uint32_t @@ -2752,7 +2822,7 @@ packet_batch_execute(struct packet_batch *batch, dp_netdev_execute_actions(pmd, batch->packets, batch->packet_count, true, actions->actions, actions->size); - dp_netdev_count_packet(pmd->dp, DP_STAT_HIT, batch->packet_count); + dp_netdev_count_packet(pmd, DP_STAT_HIT, batch->packet_count); } static inline bool @@ -2871,7 +2941,7 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd, /* Key length is needed in all the cases, hash computed on demand. */ keys[i].len = netdev_flow_key_size(count_1bits(keys[i].mf.map)); } - any_miss = !dpcls_lookup(&dp->cls, keys, rules, cnt); + any_miss = !dpcls_lookup(&pmd->cls, keys, rules, cnt); if (OVS_UNLIKELY(any_miss) && !fat_rwlock_tryrdlock(&dp->upcall_rwlock)) { uint64_t actions_stub[512 / 8], slow_stub[512 / 8]; struct ofpbuf actions, put_actions; @@ -2893,7 +2963,7 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd, /* It's possible that an earlier slow path execution installed * a rule covering this flow. In this case, it's a lot cheaper * to catch it here than execute a miss. 
*/ - netdev_flow = dp_netdev_lookup_flow(dp, &keys[i]); + netdev_flow = dp_netdev_pmd_lookup_flow(pmd, &keys[i]); if (netdev_flow) { rules[i] = &netdev_flow->cr; continue; @@ -2905,7 +2975,7 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd, ofpbuf_clear(&put_actions); dpif_flow_hash(dp->dpif, &match.flow, sizeof match.flow, &ufid); - error = dp_netdev_upcall(dp, packets[i], &match.flow, &match.wc, + error = dp_netdev_upcall(pmd, packets[i], &match.flow, &match.wc, &ufid, DPIF_UC_MISS, NULL, &actions, &put_actions); if (OVS_UNLIKELY(error && error != ENOSPC)) { @@ -2930,14 +3000,14 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd, * mutex lock outside the loop, but that's an awful long time * to be locking everyone out of making flow installs. If we * move to a per-core classifier, it would be reasonable. */ - ovs_mutex_lock(&dp->flow_mutex); - netdev_flow = dp_netdev_lookup_flow(dp, &keys[i]); + ovs_mutex_lock(&pmd->flow_mutex); + netdev_flow = dp_netdev_pmd_lookup_flow(pmd, &keys[i]); if (OVS_LIKELY(!netdev_flow)) { - netdev_flow = dp_netdev_flow_add(dp, &match, &ufid, + netdev_flow = dp_netdev_flow_add(pmd, &match, &ufid, ofpbuf_data(add_actions), ofpbuf_size(add_actions)); } - ovs_mutex_unlock(&dp->flow_mutex); + ovs_mutex_unlock(&pmd->flow_mutex); emc_insert(flow_cache, &keys[i], netdev_flow); } @@ -2956,7 +3026,7 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd, } } - dp_netdev_count_packet(dp, DP_STAT_LOST, dropped_cnt); + dp_netdev_count_packet(pmd, DP_STAT_LOST, dropped_cnt); } n_batches = 0; @@ -3147,7 +3217,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt, flow_extract(&packets[i]->ofpbuf, &packets[i]->md, &flow); dpif_flow_hash(dp->dpif, &flow, sizeof flow, &ufid); - error = dp_netdev_upcall(dp, packets[i], &flow, NULL, &ufid, + error = dp_netdev_upcall(pmd, packets[i], &flow, NULL, &ufid, DPIF_UC_ACTION, userdata,&actions, NULL); if (!error || error == ENOSPC) { diff --git a/lib/dpif-netlink.c 
b/lib/dpif-netlink.c index 3545290..15df30d 100644 --- a/lib/dpif-netlink.c +++ b/lib/dpif-netlink.c @@ -1452,6 +1452,7 @@ dpif_netlink_flow_to_dpif_flow(struct dpif *dpif, struct dpif_flow *dpif_flow, dpif_flow->ufid_present = datapath_flow->ufid_present; if (datapath_flow->ufid_present) { dpif_flow->ufid = datapath_flow->ufid; + dpif_flow->poller_id = POLLER_ID_NULL; } else { ovs_assert(datapath_flow->key && datapath_flow->key_len); dpif_flow_hash(dpif, datapath_flow->key, datapath_flow->key_len, @@ -1763,7 +1764,7 @@ dpif_netlink_check_ufid__(struct dpif *dpif_) dpif_flow_hash(dpif_, ofpbuf_data(&key), ofpbuf_size(&key), &ufid); error = dpif_flow_put(dpif_, DPIF_FP_CREATE | DPIF_FP_PROBE, ofpbuf_data(&key), ofpbuf_size(&key), NULL, 0, NULL, - 0, &ufid, NULL); + 0, &ufid, POLLER_ID_NULL, NULL); if (error && error != EEXIST) { VLOG_WARN("%s: UFID feature probe failed (%s).", diff --git a/lib/dpif.c b/lib/dpif.c index 50a7cc1..b729573 100644 --- a/lib/dpif.c +++ b/lib/dpif.c @@ -878,7 +878,7 @@ dpif_get_enable_ufid(struct dpif *dpif) int dpif_flow_get(struct dpif *dpif, const struct nlattr *key, size_t key_len, const ovs_u128 *ufid, - struct ofpbuf *buf, struct dpif_flow *flow) + const int poller_id, struct ofpbuf *buf, struct dpif_flow *flow) { struct dpif_op *opp; struct dpif_op op; @@ -887,6 +887,7 @@ dpif_flow_get(struct dpif *dpif, op.u.flow_get.key = key; op.u.flow_get.key_len = key_len; op.u.flow_get.ufid = ufid; + op.u.flow_get.poller_id = poller_id; op.u.flow_get.buffer = buf; memset(flow, 0, sizeof *flow); @@ -907,7 +908,8 @@ dpif_flow_put(struct dpif *dpif, enum dpif_flow_put_flags flags, const struct nlattr *key, size_t key_len, const struct nlattr *mask, size_t mask_len, const struct nlattr *actions, size_t actions_len, - const ovs_u128 *ufid, struct dpif_flow_stats *stats) + const ovs_u128 *ufid, const int poller_id, + struct dpif_flow_stats *stats) { struct dpif_op *opp; struct dpif_op op; @@ -921,6 +923,7 @@ dpif_flow_put(struct dpif *dpif, enum 
dpif_flow_put_flags flags, op.u.flow_put.actions = actions; op.u.flow_put.actions_len = actions_len; op.u.flow_put.ufid = ufid; + op.u.flow_put.poller_id = poller_id; op.u.flow_put.stats = stats; opp = &op; @@ -933,7 +936,7 @@ dpif_flow_put(struct dpif *dpif, enum dpif_flow_put_flags flags, int dpif_flow_del(struct dpif *dpif, const struct nlattr *key, size_t key_len, const ovs_u128 *ufid, - struct dpif_flow_stats *stats) + const int poller_id, struct dpif_flow_stats *stats) { struct dpif_op *opp; struct dpif_op op; @@ -942,6 +945,7 @@ dpif_flow_del(struct dpif *dpif, op.u.flow_del.key = key; op.u.flow_del.key_len = key_len; op.u.flow_del.ufid = ufid; + op.u.flow_del.poller_id = poller_id; op.u.flow_del.stats = stats; opp = &op; diff --git a/lib/dpif.h b/lib/dpif.h index 527cedc..c069c1c 100644 --- a/lib/dpif.h +++ b/lib/dpif.h @@ -391,6 +391,7 @@ #include "netdev.h" #include "ofpbuf.h" #include "openflow/openflow.h" +#include "ovs-numa.h" #include "packets.h" #include "util.h" @@ -523,14 +524,15 @@ int dpif_flow_put(struct dpif *, enum dpif_flow_put_flags, const struct nlattr *key, size_t key_len, const struct nlattr *mask, size_t mask_len, const struct nlattr *actions, size_t actions_len, - const ovs_u128 *ufid, struct dpif_flow_stats *); - + const ovs_u128 *ufid, const int poller_id, + struct dpif_flow_stats *); int dpif_flow_del(struct dpif *, const struct nlattr *key, size_t key_len, - const ovs_u128 *ufid, struct dpif_flow_stats *); + const ovs_u128 *ufid, const int poller_id, + struct dpif_flow_stats *); int dpif_flow_get(struct dpif *, const struct nlattr *key, size_t key_len, - const ovs_u128 *ufid, + const ovs_u128 *ufid, const int poller_id, struct ofpbuf *, struct dpif_flow *); /* Flow dumping interface @@ -568,6 +570,8 @@ struct dpif_flow_dump_thread *dpif_flow_dump_thread_create( struct dpif_flow_dump *); void dpif_flow_dump_thread_destroy(struct dpif_flow_dump_thread *); +#define POLLER_ID_NULL OVS_CORE_UNSPEC + /* A datapath flow as dumped by 
dpif_flow_dump_next(). */ struct dpif_flow { const struct nlattr *key; /* Flow key, as OVS_KEY_ATTR_* attrs. */ @@ -578,6 +582,7 @@ struct dpif_flow { size_t actions_len; /* 'actions' length in bytes. */ ovs_u128 ufid; /* Unique flow identifier. */ bool ufid_present; /* True if 'ufid' was provided by datapath.*/ + int poller_id; /* Datapath polling thread id. */ struct dpif_flow_stats stats; /* Flow statistics. */ }; int dpif_flow_dump_next(struct dpif_flow_dump_thread *, @@ -619,6 +624,10 @@ enum dpif_op_type { * * If the operation succeeds, then 'stats', if nonnull, will be set to the * flow's statistics before the update. + * + * - If the datapath implements multiple polling thread with its own flow + * table, 'poller_id' should be used to specify the particular polling + * thread for the operation. */ struct dpif_flow_put { /* Input. */ @@ -630,6 +639,7 @@ struct dpif_flow_put { const struct nlattr *actions; /* Actions to perform on flow. */ size_t actions_len; /* Length of 'actions' in bytes. */ const ovs_u128 *ufid; /* Optional unique flow identifier. */ + int poller_id; /* Datapath polling thread id. */ /* Output. */ struct dpif_flow_stats *stats; /* Optional flow statistics. */ @@ -647,6 +657,10 @@ struct dpif_flow_put { * Callers should always provide the 'key' to improve dpif logging in the event * of errors or unexpected behaviour. * + * If the datapath implements multiple polling thread with its own flow table, + * 'poller_id' should be used to specify the particular polling thread for the + * operation. + * * If the operation succeeds, then 'stats', if nonnull, will be set to the * flow's statistics before its deletion. */ struct dpif_flow_del { @@ -654,6 +668,7 @@ struct dpif_flow_del { const struct nlattr *key; /* Flow to delete. */ size_t key_len; /* Length of 'key' in bytes. */ const ovs_u128 *ufid; /* Unique identifier of flow to delete. */ + int poller_id; /* Datapath polling thread id. */ /* Output. 
*/ struct dpif_flow_stats *stats; /* Optional flow statistics. */ @@ -703,6 +718,10 @@ struct dpif_execute { * Callers should always provide 'key' to improve dpif logging in the event of * errors or unexpected behaviour. * + * If the datapath implements multiple polling thread with its own flow table, + * 'poller_id' should be used to specify the particular polling thread for the + * operation. + * * Succeeds with status 0 if the flow is fetched, or fails with ENOENT if no * such flow exists. Other failures are indicated with a positive errno value. */ @@ -711,6 +730,7 @@ struct dpif_flow_get { const struct nlattr *key; /* Flow to get. */ size_t key_len; /* Length of 'key' in bytes. */ const ovs_u128 *ufid; /* Unique identifier of flow to get. */ + int poller_id; /* Datapath polling thread id. */ struct ofpbuf *buffer; /* Storage for output parameters. */ /* Output. */ @@ -767,8 +787,9 @@ struct dpif_upcall { /* A callback to process an upcall, currently implemented only by dpif-netdev. * * The caller provides the 'packet' and 'flow' to process, the corresponding - * 'ufid' as generated by dpif_flow_hash(), the 'type' of the upcall, and if - * 'type' is DPIF_UC_ACTION then the 'userdata' attached to the action. + * 'ufid' as generated by dpif_flow_hash(), the polling thread id 'poller_id', + * the 'type' of the upcall, and if 'type' is DPIF_UC_ACTION then the + * 'userdata' attached to the action. * * The callback must fill in 'actions' with the datapath actions to apply to * 'packet'. 'wc' and 'put_actions' will either be both null or both nonnull. 
@@ -784,6 +805,7 @@ struct dpif_upcall { typedef int upcall_callback(const struct ofpbuf *packet, const struct flow *flow, ovs_u128 *ufid, + int poller_id, enum dpif_upcall_type type, const struct nlattr *userdata, struct ofpbuf *actions, diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c index 38e1aff..deaf667 100644 --- a/ofproto/ofproto-dpif-upcall.c +++ b/ofproto/ofproto-dpif-upcall.c @@ -158,6 +158,7 @@ struct upcall { * may be used with other datapaths. */ const struct flow *flow; /* Parsed representation of the packet. */ const ovs_u128 *ufid; /* Unique identifier for 'flow'. */ + int poller_id; /* Datapath polling thread id. */ const struct ofpbuf *packet; /* Packet associated with this upcall. */ ofp_port_t in_port; /* OpenFlow in port, or OFPP_NONE. */ @@ -211,6 +212,7 @@ struct udpif_key { ovs_u128 ufid; /* Unique flow identifier. */ bool ufid_present; /* True if 'ufid' is in datapath. */ uint32_t hash; /* Pre-computed hash for 'key'. */ + int poller_id; /* Datapath polling thread id. */ struct ovs_mutex mutex; /* Guards the following. */ struct dpif_flow_stats stats OVS_GUARDED; /* Last known stats.*/ @@ -288,7 +290,7 @@ static enum upcall_type classify_upcall(enum dpif_upcall_type type, static int upcall_receive(struct upcall *, const struct dpif_backer *, const struct ofpbuf *packet, enum dpif_upcall_type, const struct nlattr *userdata, const struct flow *, - const ovs_u128 *ufid); + const ovs_u128 *ufid, const int poller_id); static void upcall_uninit(struct upcall *); static upcall_callback upcall_cb; @@ -650,7 +652,7 @@ recv_upcalls(struct handler *handler) error = upcall_receive(upcall, udpif->backer, &dupcall->packet, dupcall->type, dupcall->userdata, flow, - &dupcall->ufid); + &dupcall->ufid, POLLER_ID_NULL); if (error) { if (error == ENODEV) { /* Received packet on datapath port for which we couldn't @@ -659,7 +661,7 @@ recv_upcalls(struct handler *handler) * message in case it happens frequently. 
*/ dpif_flow_put(udpif->dpif, DPIF_FP_CREATE, dupcall->key, dupcall->key_len, NULL, 0, NULL, 0, - &dupcall->ufid, NULL); + &dupcall->ufid, POLLER_ID_NULL, NULL); VLOG_INFO_RL(&rl, "received packet on unassociated datapath " "port %"PRIu32, flow->in_port.odp_port); } @@ -880,7 +882,7 @@ static int upcall_receive(struct upcall *upcall, const struct dpif_backer *backer, const struct ofpbuf *packet, enum dpif_upcall_type type, const struct nlattr *userdata, const struct flow *flow, - const ovs_u128 *ufid) + const ovs_u128 *ufid, const int poller_id) { int error; @@ -893,6 +895,7 @@ upcall_receive(struct upcall *upcall, const struct dpif_backer *backer, upcall->flow = flow; upcall->packet = packet; upcall->ufid = ufid; + upcall->poller_id = poller_id; upcall->type = type; upcall->userdata = userdata; ofpbuf_init(&upcall->put_actions, 0); @@ -994,9 +997,9 @@ upcall_uninit(struct upcall *upcall) static int upcall_cb(const struct ofpbuf *packet, const struct flow *flow, ovs_u128 *ufid, - enum dpif_upcall_type type, const struct nlattr *userdata, - struct ofpbuf *actions, struct flow_wildcards *wc, - struct ofpbuf *put_actions, void *aux) + int poller_id, enum dpif_upcall_type type, + const struct nlattr *userdata, struct ofpbuf *actions, + struct flow_wildcards *wc, struct ofpbuf *put_actions, void *aux) { struct udpif *udpif = aux; unsigned int flow_limit; @@ -1008,7 +1011,7 @@ upcall_cb(const struct ofpbuf *packet, const struct flow *flow, ovs_u128 *ufid, atomic_read_relaxed(&udpif->flow_limit, &flow_limit); error = upcall_receive(&upcall, udpif->backer, packet, type, userdata, - flow, ufid); + flow, ufid, poller_id); if (error) { return error; } @@ -1257,7 +1260,7 @@ static struct udpif_key * ukey_create__(const struct nlattr *key, size_t key_len, const struct nlattr *mask, size_t mask_len, bool ufid_present, const ovs_u128 *ufid, - const struct ofpbuf *actions, + const int poller_id, const struct ofpbuf *actions, uint64_t dump_seq, uint64_t reval_seq, long long int 
used) OVS_NO_THREAD_SAFETY_ANALYSIS { @@ -1271,6 +1274,7 @@ ukey_create__(const struct nlattr *key, size_t key_len, ukey->mask_len = mask_len; ukey->ufid_present = ufid_present; ukey->ufid = *ufid; + ukey->poller_id = poller_id; ukey->hash = get_ufid_hash(&ukey->ufid); ukey->actions = ofpbuf_clone(actions); @@ -1316,8 +1320,9 @@ ukey_create_from_upcall(const struct upcall *upcall) return ukey_create__(ofpbuf_data(&keybuf), ofpbuf_size(&keybuf), ofpbuf_data(&maskbuf), ofpbuf_size(&maskbuf), - true, upcall->ufid, &upcall->put_actions, - upcall->dump_seq, upcall->reval_seq, 0); + true, upcall->ufid, upcall->poller_id, + &upcall->put_actions, upcall->dump_seq, + upcall->reval_seq, 0); } static int @@ -1336,8 +1341,8 @@ ukey_create_from_dpif_flow(const struct udpif *udpif, /* If the key was not provided by the datapath, fetch the full flow. */ ofpbuf_use_stack(&buf, &stub, sizeof stub); - err = dpif_flow_get(udpif->dpif, NULL, 0, &flow->ufid, &buf, - &full_flow); + err = dpif_flow_get(udpif->dpif, NULL, 0, &flow->ufid, + flow->poller_id, &buf, &full_flow); if (err) { return err; } @@ -1348,8 +1353,9 @@ ukey_create_from_dpif_flow(const struct udpif *udpif, ofpbuf_use_const(&actions, &flow->actions, flow->actions_len); *ukey = ukey_create__(flow->key, flow->key_len, flow->mask, flow->mask_len, flow->ufid_present, - &flow->ufid, &actions, dump_seq, reval_seq, - flow->stats.used); + &flow->ufid, flow->poller_id, &actions, dump_seq, + reval_seq, flow->stats.used); + return 0; } @@ -1679,6 +1685,7 @@ delete_op_init__(struct ukey_op *op, const struct dpif_flow *flow) op->dop.u.flow_del.key = flow->key; op->dop.u.flow_del.key_len = flow->key_len; op->dop.u.flow_del.ufid = flow->ufid_present ? 
&flow->ufid : NULL; + op->dop.u.flow_del.poller_id = flow->poller_id; op->dop.u.flow_del.stats = &op->stats; } @@ -1690,6 +1697,7 @@ delete_op_init(struct ukey_op *op, struct udpif_key *ukey) op->dop.u.flow_del.key = ukey->key; op->dop.u.flow_del.key_len = ukey->key_len; op->dop.u.flow_del.ufid = ukey->ufid_present ? &ukey->ufid : NULL; + op->dop.u.flow_del.poller_id = ukey->poller_id; op->dop.u.flow_del.stats = &op->stats; } diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c index 5b3e64c..760a157 100644 --- a/ofproto/ofproto-dpif.c +++ b/ofproto/ofproto-dpif.c @@ -1021,7 +1021,7 @@ check_recirc(struct dpif_backer *backer) error = dpif_flow_put(backer->dpif, DPIF_FP_CREATE | DPIF_FP_PROBE, ofpbuf_data(&key), ofpbuf_size(&key), NULL, 0, NULL, - 0, NULL, NULL); + 0, NULL, POLLER_ID_NULL, NULL); if (error && error != EEXIST) { if (error != EINVAL) { VLOG_WARN("%s: Reciculation flow probe failed (%s)", @@ -1031,7 +1031,7 @@ check_recirc(struct dpif_backer *backer) } error = dpif_flow_del(backer->dpif, ofpbuf_data(&key), ofpbuf_size(&key), - NULL, NULL); + NULL, POLLER_ID_NULL, NULL); if (error) { VLOG_WARN("%s: failed to delete recirculation feature probe flow", dpif_name(backer->dpif)); @@ -1150,7 +1150,7 @@ check_max_mpls_depth(struct dpif_backer *backer) error = dpif_flow_put(backer->dpif, DPIF_FP_CREATE | DPIF_FP_PROBE, ofpbuf_data(&key), ofpbuf_size(&key), NULL, 0, - NULL, 0, NULL, NULL); + NULL, 0, NULL, POLLER_ID_NULL, NULL); if (error && error != EEXIST) { if (error != EINVAL) { VLOG_WARN("%s: MPLS stack length feature probe failed (%s)", @@ -1160,7 +1160,7 @@ check_max_mpls_depth(struct dpif_backer *backer) } error = dpif_flow_del(backer->dpif, ofpbuf_data(&key), - ofpbuf_size(&key), NULL, NULL); + ofpbuf_size(&key), NULL, POLLER_ID_NULL, NULL); if (error) { VLOG_WARN("%s: failed to delete MPLS feature probe flow", dpif_name(backer->dpif)); diff --git a/tests/ofproto-dpif.at b/tests/ofproto-dpif.at index fbf35c6..4cbe9a8 100644 --- 
a/tests/ofproto-dpif.at +++ b/tests/ofproto-dpif.at @@ -3509,6 +3509,7 @@ for type in no first later; do done AT_CHECK([ovs-appctl dpctl/dump-flows], [0], [dnl +flow-dump from non-dpdk interfaces: recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=no),tcp(dst=80), packets:0, bytes:0, used:never, actions:set(tcp(dst=81)),1 recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=first),tcp(dst=80), packets:0, bytes:0, used:never, actions:set(tcp(dst=81)),5 recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=later), packets:0, bytes:0, used:never, actions:6 @@ -3527,6 +3528,7 @@ for type in no first later; do done AT_CHECK([ovs-appctl dpctl/dump-flows], [0], [dnl +flow-dump from non-dpdk interfaces: recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=no),tcp(dst=80), packets:0, bytes:0, used:never, actions:set(tcp(dst=81)),1 recirc_id(0),in_port(90),eth_type(0x0800),ipv4(frag=first), packets:0, bytes:0, used:never, actions:drop recirc_id(0),in_port(90),eth_type(0x0800),ipv4(frag=later), packets:0, bytes:0, used:never, actions:drop @@ -3545,6 +3547,7 @@ for type in no first later; do done AT_CHECK([ovs-appctl dpctl/dump-flows], [0], [dnl +flow-dump from non-dpdk interfaces: recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=no),tcp(dst=80), packets:0, bytes:0, used:never, actions:set(tcp(dst=81)),1 recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=first),tcp(dst=80), packets:0, bytes:0, used:never, actions:set(tcp(dst=81)),2 recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=later), packets:0, bytes:0, used:never, actions:6 @@ -3616,6 +3619,7 @@ for frag in 4000 6000 6008 4010; do done AT_CHECK([ovs-appctl dpctl/dump-flows | sed 's/used:[[0-9]].[[0-9]]*s/used:0.001s/'], [0], [dnl +flow-dump from non-dpdk interfaces: recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=no),tcp(src=33419), packets:0, bytes:0, used:never, actions:set(tcp(src=33322)),1 
recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=first),tcp(src=33419), packets:0, bytes:0, used:never, actions:set(tcp(src=33322)),1 recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=later), packets:1, bytes:74, used:0.001s, actions:1 @@ -3631,6 +3635,7 @@ for frag in 4000 6000 6008 4010; do done AT_CHECK([ovs-appctl dpctl/dump-flows | sed 's/used:[[0-9]].[[0-9]]*s/used:0.001s/'], [0], [dnl +flow-dump from non-dpdk interfaces: recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=no),tcp(src=0), packets:0, bytes:0, used:never, actions:set(tcp(src=42)),1 recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=first),tcp(src=0), packets:0, bytes:0, used:never, actions:set(tcp(src=42)),1 recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=later), packets:1, bytes:60, used:0.001s, actions:1 @@ -3646,6 +3651,7 @@ for frag in 4000 6000 6001 4002; do done AT_CHECK([ovs-appctl dpctl/dump-flows | sed 's/used:[[0-9]].[[0-9]]*s/used:0.001s/'], [0], [dnl +flow-dump from non-dpdk interfaces: recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=no),tcp(src=0), packets:0, bytes:0, used:never, actions:set(tcp(src=42)),1 recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=first),tcp(src=0), packets:0, bytes:0, used:never, actions:set(tcp(src=42)),1 recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=later), packets:1, bytes:60, used:0.001s, actions:1 -- 1.7.9.5 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev