Since lookups in the classifier can be pretty expensive, this commit introduces a (thread-local) exact match cache, which simply compares the miniflows of the packets against those of recently matched flows.
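For readers who have not seen this kind of cache before, here is a minimal
standalone sketch of the lookup scheme (not part of the patch): a packet hash
selects up to EM_FLOW_HASH_SEGS candidate slots, and a full key comparison
confirms the exact match.  The example_* names are made up for illustration,
the EM_FLOW_HASH_* values mirror the ones defined below, and a plain memcmp()
stands in for the miniflow comparison.

    #include <stdint.h>
    #include <string.h>

    #define EM_FLOW_HASH_SHIFT 10
    #define EM_FLOW_HASH_ENTRIES (1u << EM_FLOW_HASH_SHIFT)
    #define EM_FLOW_HASH_MASK (EM_FLOW_HASH_ENTRIES - 1)
    #define EM_FLOW_HASH_SEGS 2

    struct example_key {            /* Stand-in for struct netdev_flow_key. */
        uint8_t bytes[64];
    };

    struct example_entry {
        void *flow;                 /* NULL means the slot is unused. */
        struct example_key key;
        uint32_t hash;
    };

    struct example_cache {
        struct example_entry entries[EM_FLOW_HASH_ENTRIES];
    };

    /* Returns the cached flow whose key matches 'key' exactly, or NULL. */
    static void *
    example_lookup(struct example_cache *cache, const struct example_key *key,
                   uint32_t hash)
    {
        uint32_t h = hash;
        int seg;

        for (seg = 0; seg < EM_FLOW_HASH_SEGS; seg++) {
            struct example_entry *e = &cache->entries[h & EM_FLOW_HASH_MASK];

            /* The hash check is a cheap filter; the memcmp() is the exact
             * match (the patch compares miniflows instead of raw bytes). */
            if (e->flow && e->hash == hash
                && !memcmp(&e->key, key, sizeof *key)) {
                return e->flow;
            }
            h >>= EM_FLOW_HASH_SHIFT;   /* Move on to the next candidate. */
        }
        return NULL;
    }

The point of probing more than one slot is to reduce the chance that two hot
flows which happen to collide on the low hash bits keep evicting each other.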
Signed-off-by: Daniele Di Proietto <ddiproie...@vmware.com> --- lib/dpif-netdev.c | 422 +++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 357 insertions(+), 65 deletions(-) diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index c77fbce..9c33122 100644 --- a/lib/dpif-netdev.c +++ b/lib/dpif-netdev.c @@ -87,6 +87,59 @@ static struct shash dp_netdevs OVS_GUARDED_BY(dp_netdev_mutex) static struct vlog_rate_limit upcall_rl = VLOG_RATE_LIMIT_INIT(600, 600); +/* Stores a miniflow */ + +/* There are fields in the flow structure that we never use. Therefore we can + * save a few words of memory */ +#define NETDEV_KEY_BUF_SIZE_U32 (FLOW_U32S - MINI_N_INLINE \ + - FLOW_U32_SIZE(regs) \ + - FLOW_U32_SIZE(metadata) \ + ) +struct netdev_flow_key { + struct miniflow flow; + uint32_t buf[NETDEV_KEY_BUF_SIZE_U32]; +}; + +/* Exact match cache for frequently used flows + * + * The cache uses a 32-bit hash of the packet (which can be the RSS hash) to + * search its entries for a miniflow that matches exactly the miniflow of the + * packet. It stores the 'cls_rule'(rule) that matches the miniflow. + * + * A cache entry holds a reference to its 'dp_netdev_flow'. + * + * A miniflow with a given hash can be in one of EM_FLOW_HASH_SEGS different + * entries. Given its hash (h), the miniflow can be in the entries whose index + * is: + * + * h & EM_FLOW_HASH_MASK + * h >> EM_FLOW_HASH_SHIFT & EM_FLOW_HASH_MASK + * h >> 2 * EM_FLOW_HASH_SHIFT & EM_FLOW_HASH_MASK + * h >> 3 * EM_FLOW_HASH_SHIFT & EM_FLOW_HASH_MASK + * ... + * + * Thread-safety + * ============= + * + * Each pmd_thread has its own private exact match cache. + * If dp_netdev_input is not called from a pmd thread, a mutex is used. + */ + +#define EM_FLOW_HASH_SHIFT 10 +#define EM_FLOW_HASH_ENTRIES (1u << EM_FLOW_HASH_SHIFT) +#define EM_FLOW_HASH_MASK (EM_FLOW_HASH_ENTRIES - 1) +#define EM_FLOW_HASH_SEGS 2 + +struct emc_entry { + struct dp_netdev_flow * flow; + struct netdev_flow_key mf; + uint32_t hash; +}; + +struct emc_cache { + struct emc_entry entries[EM_FLOW_HASH_ENTRIES]; +}; + /* Datapath based on the network device interface from netdev.h. * * @@ -101,6 +154,7 @@ static struct vlog_rate_limit upcall_rl = VLOG_RATE_LIMIT_INIT(600, 600); * dp_netdev_mutex (global) * port_mutex * flow_mutex + * emc_mutex */ struct dp_netdev { const struct dpif_class *const class; @@ -141,6 +195,12 @@ struct dp_netdev { struct pmd_thread *pmd_threads; size_t n_pmd_threads; int pmd_count; + + /* Exact match cache for non-pmd devices. + * Pmd devices use instead each thread's flow_cache for this purpose. + * Protected by emc_mutex */ + struct emc_cache flow_cache OVS_GUARDED; + struct ovs_mutex emc_mutex; }; static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp, @@ -173,20 +233,6 @@ struct dp_netdev_port { char *type; /* Port type as requested by user. */ }; - -/* Stores a miniflow */ - -/* There are fields in the flow structure that we never use. Therefore we can - * save a few words of memory */ -#define NETDEV_KEY_BUF_SIZE_U32 (FLOW_U32S - MINI_N_INLINE \ - - FLOW_U32_SIZE(regs) \ - - FLOW_U32_SIZE(metadata) \ - ) -struct netdev_flow_key { - struct miniflow flow; - uint32_t buf[NETDEV_KEY_BUF_SIZE_U32]; -}; - /* A flow in dp_netdev's 'flow_table'. * * @@ -225,6 +271,7 @@ struct netdev_flow_key { * requires synchronization, as noted in more detail below. */ struct dp_netdev_flow { + bool dead; /* Packet classification. */ const struct cls_rule cr; /* In owning dp_netdev's 'cls'. 
*/ @@ -248,6 +295,7 @@ struct dp_netdev_flow { }; static void dp_netdev_flow_unref(struct dp_netdev_flow *); +static bool dp_netdev_flow_ref(struct dp_netdev_flow *); /* Contained by struct dp_netdev_flow's 'stats' member. */ struct dp_netdev_flow_stats { @@ -292,6 +340,7 @@ static void dp_netdev_actions_free(struct dp_netdev_actions *); **/ struct pmd_thread { struct dp_netdev *dp; + struct emc_cache flow_cache; pthread_t thread; int id; atomic_uint change_seq; @@ -321,15 +370,41 @@ static int dpif_netdev_open(const struct dpif_class *, const char *name, static void dp_netdev_execute_actions(struct dp_netdev *dp, struct dpif_packet **, int c, bool may_steal, struct pkt_metadata *, + struct emc_cache * flow_cache, const struct nlattr *actions, size_t actions_len); static void dp_netdev_port_input(struct dp_netdev *dp, + struct emc_cache *flow_cache, struct dpif_packet **packets, int cnt, odp_port_t port_no); static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n); static void dp_netdev_disable_upcall(struct dp_netdev *); +static void emc_clear_entry(struct emc_entry *ce); + +static void +emc_cache_init(struct emc_cache * flow_cache) +{ + int i; + + for (i = 0; i < ARRAY_SIZE(flow_cache->entries); i++) { + flow_cache->entries[i].flow = NULL; + miniflow_initialize(&flow_cache->entries[i].mf.flow, + flow_cache->entries[i].mf.buf); + } +} + +static void +emc_cache_uninit(struct emc_cache * flow_cache) +{ + int i; + + for (i = 0; i < ARRAY_SIZE(flow_cache->entries); i++) { + emc_clear_entry(&flow_cache->entries[i]); + } +} + static struct dpif_netdev * dpif_netdev_cast(const struct dpif *dpif) { @@ -477,6 +552,11 @@ create_dp_netdev(const char *name, const struct dpif_class *class, return error; } + ovs_mutex_init(&dp->emc_mutex); + ovs_mutex_lock(&dp->emc_mutex); + emc_cache_init(&dp->flow_cache); + ovs_mutex_unlock(&dp->emc_mutex); + *dpp = dp; return 0; } @@ -541,6 +621,12 @@ dp_netdev_free(struct dp_netdev *dp) cmap_destroy(&dp->ports); fat_rwlock_destroy(&dp->upcall_rwlock); latch_destroy(&dp->exit_latch); + + ovs_mutex_lock(&dp->emc_mutex); + emc_cache_uninit(&dp->flow_cache); + ovs_mutex_unlock(&dp->emc_mutex); + ovs_mutex_destroy(&dp->emc_mutex); + free(CONST_CAST(char *, dp->name)); free(dp); } @@ -916,6 +1002,7 @@ dp_netdev_remove_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow) classifier_remove(&dp->cls, cr); cmap_remove(&dp->flow_table, node, flow_hash(&flow->flow, 0)); + flow->dead = true; dp_netdev_flow_unref(flow); } @@ -1023,6 +1110,118 @@ dp_netdev_flow_cast(const struct cls_rule *cr) return cr ? 
CONTAINER_OF(cr, struct dp_netdev_flow, cr) : NULL; } +static bool dp_netdev_flow_ref(struct dp_netdev_flow *flow) +{ + return ovs_refcount_try_ref_rcu(&flow->ref_cnt); +} + +static inline bool +emc_entry_alive(struct emc_entry *ce) +{ + return ce->flow && !ce->flow->dead; +} + +static void +emc_clear_entry(struct emc_entry *ce) +{ + if (ce->flow) { + dp_netdev_flow_unref(ce->flow); + ce->flow = NULL; + } +} + +static inline void +emc_change_entry(struct emc_entry *ce, struct dp_netdev_flow *flow, + const struct miniflow *mf, uint32_t hash) +{ + if (ce->flow != flow) { + if (ce->flow) { + dp_netdev_flow_unref(ce->flow); + } + + if(dp_netdev_flow_ref(flow)) { + ce->flow = flow; + } else { + ce->flow = NULL; + } + } + if (mf) { + miniflow_clone_inline(&ce->mf.flow, mf, count_1bits(mf->map)); + ce->hash = hash; + } +} + +static inline void +emc_insert(struct emc_cache *cache, const struct miniflow * mf, uint32_t hash, + struct dp_netdev_flow *flow) +{ + struct emc_entry *to_be_replaced = NULL; + uint32_t search_hash = hash; + int i; + + /* Each flow can be in one of EM_FLOW_HASH_SEGS entries */ + for (i = 0; i < EM_FLOW_HASH_SEGS; i++) { + struct emc_entry *current_entry; + int current_idx; + + current_idx = search_hash & EM_FLOW_HASH_MASK; + current_entry = &cache->entries[current_idx]; + + if (emc_entry_alive(current_entry)) { + if (current_entry->hash == hash + && miniflow_equal(¤t_entry->mf.flow, mf)) { + /* We found the entry with the 'mf' miniflow */ + emc_change_entry(current_entry, flow, NULL, 0); + + return; + } + } + + /* Replacement policy: put the flow in an empty (not alive) entry, or + * in the first entry where it can be */ + if (!to_be_replaced + || (emc_entry_alive(to_be_replaced) + && !emc_entry_alive(current_entry))) { + to_be_replaced = current_entry; + } + + search_hash >>= EM_FLOW_HASH_SHIFT; + } + /* We didn't find the miniflow in the cache. + * The 'to_be_replaced' entry is where the new flow will be stored */ + + emc_change_entry(to_be_replaced, flow, mf, hash); +} + +static inline struct dp_netdev_flow * +emc_lookup(struct emc_cache *cache, const struct miniflow * mf, + uint32_t hash) +{ + uint32_t search_hash = hash; + int i; + + /* Each flow can be in one of EM_FLOW_HASH_SEGS entries */ + for (i = 0; i < EM_FLOW_HASH_SEGS; i++) { + struct emc_entry *current_entry; + int current_idx; + + current_idx = search_hash & EM_FLOW_HASH_MASK; + current_entry = &cache->entries[current_idx]; + + if (emc_entry_alive(current_entry)) { + if (current_entry->hash == hash + && miniflow_equal(¤t_entry->mf.flow, mf)) { + /* We found the entry with the 'mf' miniflow */ + return current_entry->flow; + } + } + + search_hash >>= EM_FLOW_HASH_SHIFT; + } + + return NULL; +} + static struct dp_netdev_flow * dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct miniflow *key) { @@ -1514,8 +1713,11 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute) packet.ofpbuf = *execute->packet; pp = &packet; + ovs_mutex_lock(&dp->emc_mutex); dp_netdev_execute_actions(dp, &pp, 1, false, md, - execute->actions, execute->actions_len); + &dp->flow_cache, execute->actions, + execute->actions_len); + ovs_mutex_unlock(&dp->emc_mutex); /* Even though may_steal is set to false, some actions could modify or * reallocate the ofpbuf memory. 
We need to pass those changes to the @@ -1593,15 +1795,16 @@ dp_netdev_actions_free(struct dp_netdev_actions *actions) static void dp_netdev_process_rxq_port(struct dp_netdev *dp, - struct dp_netdev_port *port, - struct netdev_rxq *rxq) + struct emc_cache *flow_cache, + struct dp_netdev_port *port, + struct netdev_rxq *rxq) { struct dpif_packet *packets[NETDEV_MAX_RX_BATCH]; int error, cnt; error = netdev_rxq_recv(rxq, packets, &cnt); if (!error) { - dp_netdev_port_input(dp, packets, cnt, port->port_no); + dp_netdev_port_input(dp, flow_cache, packets, cnt, port->port_no); } else if (error != EAGAIN && error != EOPNOTSUPP) { static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); @@ -1618,15 +1821,18 @@ dpif_netdev_run(struct dpif *dpif) struct dp_netdev_port *port; struct dp_netdev *dp = get_dp_netdev(dpif); + ovs_mutex_lock(&dp->emc_mutex); CMAP_FOR_EACH (port, node, &dp->ports) { if (!netdev_is_pmd(port->netdev)) { int i; for (i = 0; i < netdev_n_rxq(port->netdev); i++) { - dp_netdev_process_rxq_port(dp, port, port->rxq[i]); + dp_netdev_process_rxq_port(dp, &dp->flow_cache, port, + port->rxq[i]); } } } + ovs_mutex_unlock(&dp->emc_mutex); } static void @@ -1708,6 +1914,7 @@ pmd_thread_main(void *f_) poll_cnt = 0; poll_list = NULL; + emc_cache_init(&f->flow_cache); pmd_thread_setaffinity_cpu(f->id); reload: poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt); @@ -1718,7 +1925,8 @@ reload: int i; for (i = 0; i < poll_cnt; i++) { - dp_netdev_process_rxq_port(dp, poll_list[i].port, poll_list[i].rx); + dp_netdev_process_rxq_port(dp, &f->flow_cache, poll_list[i].port, + poll_list[i].rx); } if (lc++ > 1024) { @@ -1743,6 +1951,8 @@ reload: port_unref(poll_list[i].port); } + emc_cache_uninit(&f->flow_cache); + free(poll_list); return NULL; } @@ -1935,8 +2145,8 @@ struct packet_batch { }; static inline void -packet_batch_update(struct packet_batch *batch, - struct dpif_packet *packet, const struct miniflow *mf) +packet_batch_update(struct packet_batch *batch, struct dpif_packet *packet, + const struct miniflow *mf) { batch->tcp_flags |= miniflow_get_tcp_flags(mf); batch->packets[batch->packet_count++] = packet; @@ -1956,7 +2166,8 @@ packet_batch_init(struct packet_batch *batch, struct dp_netdev_flow *flow, } static inline void -packet_batch_execute(struct packet_batch *batch, struct dp_netdev *dp) +packet_batch_execute(struct packet_batch *batch, struct dp_netdev *dp, + struct emc_cache *flow_cache) { struct dp_netdev_actions *actions; struct dp_netdev_flow *flow = batch->flow; @@ -1966,36 +2177,114 @@ packet_batch_execute(struct packet_batch *batch, struct dp_netdev *dp) actions = dp_netdev_flow_get_actions(flow); - dp_netdev_execute_actions(dp, batch->packets, - batch->packet_count, true, &batch->md, + dp_netdev_execute_actions(dp, batch->packets, batch->packet_count, true, + &batch->md, flow_cache, actions->actions, actions->size); dp_netdev_count_packet(dp, DP_STAT_HIT, batch->packet_count); } -static void -dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt, - struct pkt_metadata *md) +static inline bool +dp_netdev_queue_batches(struct dpif_packet *pkt, struct pkt_metadata *md, + struct dp_netdev_flow *flow, const struct miniflow *mf, + struct packet_batch *batches, size_t *n_batches, + size_t max_batches) +{ + struct packet_batch *batch = NULL; + int j; + + /* XXX: This O(n^2) algortihm makes sense if we're operating under the + * assumption that the number of distinct flows (and therefore the + * number of distinct batches) is quite small. 
If this turns out not + * to be the case, it may make sense to pre sort based on the + * netdev_flow pointer. That done we can get the appropriate batching + * in O(n * log(n)) instead. */ + for (j = *n_batches - 1; j >= 0; j--) { + if (batches[j].flow == flow) { + batch = &batches[j]; + } + } + if (!batch && *n_batches >= max_batches) { + return false; + } + + if (!batch) { + batch = &batches[(*n_batches)++]; + packet_batch_init(batch, flow, md); + } + packet_batch_update(batch, pkt, mf); + return true; +} + +/* Try to process all ('cnt') the 'packets' using only the exact match cache + * 'flow_cache'. If a flow is not found for a packet 'packets[i]', or if there + * is no matching batch for a packet's flow, the miniflow is copied into 'keys' + * and the packet pointer is moved at the beginning of the 'packets' array. + * + * The function returns the number of packets that needs to be processed in the + * 'packets' array (they have been moved to the beginning of the vector). + */ +static inline size_t +emc_processing(struct dp_netdev *dp, struct emc_cache *flow_cache, + struct dpif_packet **packets, size_t cnt, + struct pkt_metadata *md, struct netdev_flow_key *keys) { - struct packet_batch batches[NETDEV_MAX_RX_BATCH]; - struct netdev_flow_key keys[NETDEV_MAX_RX_BATCH]; - const struct miniflow *mfs[NETDEV_MAX_RX_BATCH]; /* NULL at bad packets. */ - struct cls_rule *rules[NETDEV_MAX_RX_BATCH]; + struct packet_batch batches[4]; size_t n_batches, i; - bool any_miss; + size_t notfound_cnt = 0; + n_batches = 0; for (i = 0; i < cnt; i++) { + struct netdev_flow_key key; + struct dp_netdev_flow *flow; + uint32_t hash; + if (OVS_UNLIKELY(ofpbuf_size(&packets[i]->ofpbuf) < ETH_HEADER_LEN)) { dpif_packet_delete(packets[i]); - mfs[i] = NULL; continue; } - miniflow_initialize(&keys[i].flow, keys[i].buf); - miniflow_extract(&packets[i]->ofpbuf, md, &keys[i].flow); - mfs[i] = &keys[i].flow; + miniflow_initialize(&key.flow, key.buf); + miniflow_extract(&packets[i]->ofpbuf, md, &key.flow); + + hash = dp_netdev_packet_rss(packets[i], &key.flow); + + flow = emc_lookup(flow_cache, &key.flow, hash); + if (!flow || !dp_netdev_queue_batches(packets[i], md, flow, &key.flow, + batches, &n_batches, ARRAY_SIZE(batches))) { + struct dpif_packet *swap; + + keys[notfound_cnt] = key; + + swap = packets[notfound_cnt]; + packets[notfound_cnt] = packets[i]; + packets[i] = swap; + + notfound_cnt++; + } + } + + for (i = 0; i < n_batches; i++) { + packet_batch_execute(&batches[i], dp, flow_cache); } + return notfound_cnt; +} + +static inline void +fast_path_processing(struct dp_netdev *dp, struct emc_cache *flow_cache, + struct dpif_packet **packets, size_t cnt, + struct pkt_metadata *md, struct netdev_flow_key * keys) +{ + struct packet_batch batches[cnt]; + const struct miniflow *mfs[cnt]; /* NULL at bad packets. */ + struct cls_rule *rules[cnt]; + size_t n_batches, i; + bool any_miss; + + for (i = 0; i < cnt; i++) { + mfs[i] = &keys[i].flow; + } any_miss = !classifier_lookup_miniflow_batch(&dp->cls, mfs, rules, cnt); if (OVS_UNLIKELY(any_miss) && !fat_rwlock_tryrdlock(&dp->upcall_rwlock)) { uint64_t actions_stub[512 / 8], slow_stub[512 / 8]; @@ -2039,7 +2328,7 @@ dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt, * the actions. Otherwise, if there are any slow path actions, * we'll send the packet up twice. 
*/ dp_netdev_execute_actions(dp, &packets[i], 1, false, md, - ofpbuf_data(&actions), + flow_cache, ofpbuf_data(&actions), ofpbuf_size(&actions)); add_actions = ofpbuf_size(&put_actions) @@ -2069,53 +2358,53 @@ dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt, n_batches = 0; for (i = 0; i < cnt; i++) { struct dp_netdev_flow *flow; - struct packet_batch *batch; - size_t j; + uint32_t hash; if (OVS_UNLIKELY(!rules[i] || !mfs[i])) { continue; } - /* XXX: This O(n^2) algortihm makes sense if we're operating under the - * assumption that the number of distinct flows (and therefore the - * number of distinct batches) is quite small. If this turns out not - * to be the case, it may make sense to pre sort based on the - * netdev_flow pointer. That done we can get the appropriate batching - * in O(n * log(n)) instead. */ - batch = NULL; - flow = dp_netdev_flow_cast(rules[i]); - for (j = 0; j < n_batches; j++) { - if (batches[j].flow == flow) { - batch = &batches[j]; - break; - } - } + hash = dp_netdev_packet_rss(packets[i], mfs[i]); - if (!batch) { - batch = &batches[n_batches++]; - packet_batch_init(batch, flow, md); - } - packet_batch_update(batch, packets[i], mfs[i]); + flow = dp_netdev_flow_cast(rules[i]); + emc_insert(flow_cache, mfs[i], hash, flow); + dp_netdev_queue_batches(packets[i], md, flow, mfs[i], batches, + &n_batches, ARRAY_SIZE(batches)); } for (i = 0; i < n_batches; i++) { - packet_batch_execute(&batches[i], dp); + packet_batch_execute(&batches[i], dp, flow_cache); + } +} + +static void +dp_netdev_input(struct dp_netdev *dp, struct emc_cache *flow_cache, + struct dpif_packet **packets, int cnt, struct pkt_metadata *md) +{ + struct netdev_flow_key keys[cnt]; + size_t newcnt; + + newcnt = emc_processing(dp, flow_cache, packets, cnt, md, keys); + if (OVS_UNLIKELY(newcnt)) { + fast_path_processing(dp, flow_cache, packets, newcnt, md, keys); } } + static void -dp_netdev_port_input(struct dp_netdev *dp, struct dpif_packet **packets, - int cnt, odp_port_t port_no) +dp_netdev_port_input(struct dp_netdev *dp, struct emc_cache *flow_cache, + struct dpif_packet **packets, int cnt, odp_port_t port_no) { uint32_t *recirc_depth = recirc_depth_get(); struct pkt_metadata md = PKT_METADATA_INITIALIZER(port_no); *recirc_depth = 0; - dp_netdev_input(dp, packets, cnt, &md); + dp_netdev_input(dp, flow_cache, packets, cnt, &md); } struct dp_netdev_execute_aux { struct dp_netdev *dp; + struct emc_cache *flow_cache; }; static void @@ -2172,6 +2461,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt, NULL); if (!error || error == ENOSPC) { dp_netdev_execute_actions(dp, &packets[i], 1, false, md, + aux->flow_cache, ofpbuf_data(&actions), ofpbuf_size(&actions)); } @@ -2233,7 +2523,8 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt, /* Hash is private to each packet */ recirc_md.dp_hash = packets[i]->dp_hash; - dp_netdev_input(dp, &recirc_pkt, 1, &recirc_md); + dp_netdev_input(dp, aux->flow_cache, &recirc_pkt, 1, + &recirc_md); } (*depth)--; @@ -2264,9 +2555,10 @@ static void dp_netdev_execute_actions(struct dp_netdev *dp, struct dpif_packet **packets, int cnt, bool may_steal, struct pkt_metadata *md, + struct emc_cache *flow_cache, const struct nlattr *actions, size_t actions_len) { - struct dp_netdev_execute_aux aux = {dp}; + struct dp_netdev_execute_aux aux = {dp, flow_cache}; odp_execute_actions(&aux, packets, cnt, may_steal, md, actions, actions_len, dp_execute_cb); -- 2.0.1 _______________________________________________ dev mailing list 
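A note on emc_insert()'s replacement policy, again as a simplified standalone
sketch rather than the patch's code (it reuses the example_* types and
constants from the sketch near the top of this mail): an entry that already
holds the key is refreshed in place, an empty candidate slot is preferred as
the victim, and a live entry is evicted only when every candidate slot is
occupied.

    static void
    example_insert(struct example_cache *cache, const struct example_key *key,
                   uint32_t hash, void *flow)
    {
        struct example_entry *victim = NULL;
        uint32_t h = hash;
        int seg;

        for (seg = 0; seg < EM_FLOW_HASH_SEGS; seg++) {
            struct example_entry *e = &cache->entries[h & EM_FLOW_HASH_MASK];

            if (e->flow && e->hash == hash
                && !memcmp(&e->key, key, sizeof *key)) {
                e->flow = flow;     /* Key already cached: just update it. */
                return;
            }

            /* Remember the first candidate, but switch to an empty slot if
             * we find one: evicting a live entry is the last resort. */
            if (!victim || (victim->flow && !e->flow)) {
                victim = e;
            }
            h >>= EM_FLOW_HASH_SHIFT;
        }

        victim->flow = flow;
        victim->key = *key;
        victim->hash = hash;
    }

In the patch itself the entry also takes a reference on the dp_netdev_flow and
a flow marked dead counts as an empty slot, but the shape of the decision is
the same.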
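Finally, a standalone sketch of the per-flow batching that both
emc_processing() and fast_path_processing() rely on.  The sketch_* names and
the fixed sizes are illustrative only, and it assumes the caller never passes
more than SKETCH_MAX_PACKETS packets per call.  Packets that map to the same
flow are queued together so that the flow's actions are fetched and executed
once per group instead of once per packet; the linear scan over the existing
batches is the O(n^2) part the XXX comment in the patch refers to, which is
fine as long as an rx batch spans only a handful of distinct flows.

    #include <stdbool.h>
    #include <stddef.h>

    #define SKETCH_MAX_PACKETS 32

    struct sketch_batch {
        const void *flow;                  /* Flow shared by all packets. */
        void *packets[SKETCH_MAX_PACKETS];
        size_t n;
    };

    /* Appends 'packet' to the batch for 'flow', starting a new batch if
     * needed.  Returns false if all 'max_batches' batches are taken by
     * other flows, in which case the caller must handle 'packet' itself. */
    static bool
    sketch_queue(void *packet, const void *flow, struct sketch_batch *batches,
                 size_t *n_batches, size_t max_batches)
    {
        size_t i;

        for (i = 0; i < *n_batches; i++) {
            if (batches[i].flow == flow) {
                batches[i].packets[batches[i].n++] = packet;
                return true;
            }
        }
        if (*n_batches >= max_batches) {
            return false;
        }
        batches[*n_batches].flow = flow;
        batches[*n_batches].packets[0] = packet;
        batches[*n_batches].n = 1;
        (*n_batches)++;
        return true;
    }

In emc_processing() a 'false' return simply pushes the packet onto the slow
path together with the cache misses, so the cap on the number of batches never
drops traffic.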