> Why does udpif_set_threads() destroy each revalidator thread's ukeys?
> Can't the thread itself do that before it exits?
I actually like that udpif_set_threads() handles the teardown because we
can be absolutely sure that there aren't any threads running when it
happens.  This means we don't have to worry about locking or anything.
At the moment I don't think it really matters for some of the data
structures, but I don't want to have to think about it on a case by case
basis, especially since this stuff is likely to change.  Note that I
noticed that the current code isn't totally consistent with this
philosophy so I've changed it.

> 'key' in udpif_key, and 'key' and 'mask' in udpif_flow_dump, seem to
> be unneeded: they always point to the beginning of the corresponding
> _buf.  Maybe they are convenient enough to keep, though.

The code was relatively ugly without them, and I'm planning to get rid
of udpif_flow_dump anyway, so I'd like to leave it for now.

Here's an incremental which consists of your patch with some of my
changes folded in.  There are some changes to the core revalidation code
which are probably worth reading carefully.

Ethan

---
 NEWS                          |   4 +
 ofproto/ofproto-dpif-upcall.c | 172 ++++++++++++++++++++++--------------------
 ofproto/ofproto-dpif-upcall.h |  25 ++----
 ofproto/ofproto-dpif.c        |  14 ++--
 vswitchd/vswitch.xml          |  26 ++++++-
 5 files changed, 129 insertions(+), 112 deletions(-)

diff --git a/NEWS b/NEWS
index 45b8239..b279572 100644
--- a/NEWS
+++ b/NEWS
@@ -54,6 +54,10 @@ Post-v2.0.0
    - ovsdb-client:
      * The "monitor" command can now monitor all tables in a database,
        instead of being limited to a single table.
+   - The flow-eviction-threshold has been replaced by the flow-limit which is a
+     hard limit on the number of flows in the datapath.  It defaults to 200,000
+     flows.  OVS automatically adjusts this number depending on network
+     conditions.
 
 
 v2.0.0 - 15 Oct 2013
diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
index fcfb75a..78424fd 100644
--- a/ofproto/ofproto-dpif-upcall.c
+++ b/ofproto/ofproto-dpif-upcall.c
@@ -31,6 +31,7 @@
 #include "ofpbuf.h"
 #include "ofproto-dpif-ipfix.h"
 #include "ofproto-dpif-sflow.h"
+#include "ofproto-dpif-xlate.h"
 #include "packets.h"
 #include "poll-loop.h"
 #include "seq.h"
@@ -38,6 +39,7 @@
 #include "vlog.h"
 
 #define MAX_QUEUE_LENGTH 512
+#define FLOW_MISS_MAX_BATCH 50
 #define REVALIDATE_MAX_BATCH 50
 
 VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
@@ -45,8 +47,8 @@ VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
 COVERAGE_DEFINE(upcall_queue_overflow);
 
 /* A thread that processes each upcall handed to it by the dispatcher thread,
- * forwards the upcall's packet, and then queues it to the main ofproto_dpif
- * to possibly set up a kernel flow as a cache. */
+ * forwards the upcall's packet, and possibly sets up a kernel flow as a
+ * cache. */
 struct handler {
     struct udpif *udpif;               /* Parent udpif. */
     pthread_t thread;                  /* Thread ID. */
@@ -64,6 +66,9 @@ struct handler {
                                           'mutex'. */
 };
 
+/* A thread that processes each kernel flow handed to it by the flow_dumper
+ * thread, updates OpenFlow statistics, and updates or removes the kernel flow
+ * as necessary. */
 struct revalidator {
     struct udpif *udpif;               /* Parent udpif. */
     char *name;                        /* Thread name. */
@@ -81,11 +86,14 @@ struct revalidator {
 
 /* An upcall handler for ofproto_dpif.
  *
- * udpif is implemented as a "dispatcher" thread that reads upcalls from the
- * kernel.  It processes each upcall just enough to figure out its next
- * destination.  For a "miss" upcall (MISS_UPCALL), this is one of several
- * "handler" threads (see struct handler).  Other upcalls are queued to the
- * main ofproto_dpif. */
+ * udpif has two logically separate pieces:
+ *
+ *    - A "dispatcher" thread that reads upcalls from the kernel and dispatches
+ *      them to one of several "handler" threads (see struct handler).
+ *
+ *    - A "flow_dumper" thread that reads the kernel flow table and dispatches
+ *      flows to one of several "revalidator" threads (see struct
+ *      revalidator). */
 struct udpif {
     struct list list_node;             /* In all_udpifs list. */
@@ -146,7 +154,6 @@ struct upcall {
 struct udpif_key {
     struct hmap_node hmap_node;        /* In parent revalidator 'ukeys' map. */
 
-    struct odputil_keybuf key_buf;     /* Memory for 'key'. */
     struct nlattr *key;                /* Datapath flow key. */
     size_t key_len;                    /* Length of 'key'. */
 
@@ -154,6 +161,8 @@ struct udpif_key {
     long long int created;             /* Estimation of creation time. */
 
     bool mark;                         /* Used by mark and sweep GC algorithm. */
+
+    struct odputil_keybuf key_buf;     /* Memory for 'key'. */
 };
 
 /* 'udpif_flow_dump's hold the state associated with one iteration in a flow
@@ -162,7 +171,6 @@ struct udpif_key {
 struct udpif_flow_dump {
     struct list list_node;
 
-    struct odputil_keybuf key_buf;
     struct nlattr *key;                /* Datapath flow key. */
     size_t key_len;                    /* Length of 'key'. */
     uint32_t key_hash;                 /* Hash of 'key'. */
@@ -175,7 +183,7 @@ struct udpif_flow_dump {
 
     bool need_revalidate;              /* Key needs revalidation? */
 
-    struct dpif_op op;                 /* Op used for deleting 'key'. */
+    struct odputil_keybuf key_buf;
 };
 
 /* Flow miss batching.
@@ -222,6 +230,7 @@ static void upcall_unixctl_disable_megaflows(struct unixctl_conn *, int argc,
                                              const char *argv[], void *aux);
 static void upcall_unixctl_enable_megaflows(struct unixctl_conn *, int argc,
                                             const char *argv[], void *aux);
+static void ukey_delete(struct revalidator *, struct udpif_key *);
 
 static atomic_bool enable_megaflows = ATOMIC_VAR_INIT(true);
 
@@ -283,13 +292,13 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
 
         latch_set(&udpif->exit_latch);
 
-        /* Wake the handlers so they can exit. */
         for (i = 0; i < udpif->n_handlers; i++) {
             struct handler *handler = &udpif->handlers[i];
 
             ovs_mutex_lock(&handler->mutex);
             xpthread_cond_signal(&handler->wake_cond);
             ovs_mutex_unlock(&handler->mutex);
+            xpthread_join(handler->thread, NULL);
         }
 
         for (i = 0; i < udpif->n_revalidators; i++) {
@@ -298,6 +307,7 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
             ovs_mutex_lock(&revalidator->mutex);
             xpthread_cond_signal(&revalidator->wake_cond);
             ovs_mutex_unlock(&revalidator->mutex);
+            xpthread_join(revalidator->thread, NULL);
         }
 
         xpthread_join(udpif->flow_dumper, NULL);
@@ -308,9 +318,6 @@
             struct udpif_flow_dump *udump, *next_udump;
             struct udpif_key *ukey, *next_ukey;
 
-            xpthread_cond_signal(&revalidator->wake_cond);
-            xpthread_join(revalidator->thread, NULL);
-
             LIST_FOR_EACH_SAFE (udump, next_udump, list_node,
                                 &revalidator->udumps) {
                 list_remove(&udump->list_node);
@@ -319,10 +326,10 @@
 
             HMAP_FOR_EACH_SAFE (ukey, next_ukey, hmap_node,
                                 &revalidator->ukeys) {
-                hmap_remove(&revalidator->ukeys, &ukey->hmap_node);
-                free(ukey);
+                ukey_delete(revalidator, ukey);
             }
             hmap_destroy(&revalidator->ukeys);
+            ovs_mutex_destroy(&revalidator->mutex);
 
             free(revalidator->name);
@@ -331,14 +338,10 @@
             struct handler *handler = &udpif->handlers[i];
             struct upcall *miss, *next;
 
-            xpthread_join(handler->thread, NULL);
-
-            ovs_mutex_lock(&handler->mutex);
             LIST_FOR_EACH_SAFE (miss, next, list_node, &handler->upcalls) {
                 list_remove(&miss->list_node);
                 upcall_destroy(miss);
             }
-            ovs_mutex_unlock(&handler->mutex);
 
             ovs_mutex_destroy(&handler->mutex);
             xpthread_cond_destroy(&handler->wake_cond);
@@ -432,7 +435,7 @@ udpif_get_memory_usage(struct udpif *udpif, struct simap *usage)
         ovs_mutex_lock(&revalidator->mutex);
         simap_increase(usage, "revalidator dumps", revalidator->n_udumps);
 
-        /* XXX: This isn't technically thread safe becuase the revalidator
+        /* XXX: This isn't technically thread safe because the revalidator
          * ukeys maps isn't protected by a mutex since it's per thread. */
         simap_increase(usage, "revalidator keys",
                        hmap_count(&revalidator->ukeys));
@@ -465,7 +468,7 @@ upcall_destroy(struct upcall *upcall)
 static uint64_t
 udpif_get_n_flows(const struct udpif *udpif)
 {
-    static struct dpif_dp_stats stats;
+    struct dpif_dp_stats stats;
 
     dpif_get_dp_stats(udpif->dpif, &stats);
     return stats.n_flows;
@@ -895,7 +898,7 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
     atomic_read(&udpif->flow_limit, &flow_limit);
     may_put = udpif_get_n_flows(udpif) < flow_limit;
 
-    /* Extract the flow from each upcall.  Construct in misses a hash table
+    /* Extract the flow from each upcall.  Construct in 'misses' a hash table
      * that maps each unique flow to a 'struct flow_miss'.
      *
     * Most commonly there is a single packet per flow_miss, but there are
@@ -1046,10 +1049,11 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
         xin.may_learn = true;
 
         if (miss->upcall_type == DPIF_UC_MISS) {
-            /* For non miss upcalls, there's a flow in the datapath which this
-             * packet was accounted to.  Presumably the revalidators will deal
-             * with pushing it's stats eventually. */
             xin.resubmit_stats = &miss->stats;
+        } else {
+            /* For non-miss upcalls, there's a flow in the datapath which this
+             * packet was accounted to.  Presumably the revalidators will deal
+             * with pushing its stats eventually. */
         }
 
         xlate_actions(&xin, &miss->xout);
@@ -1290,8 +1294,8 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
 
     /* Since the kernel is free to ignore wildcarded bits in the mask, we can't
      * directly check that the masks are the same.  Instead we check that the
-     * mask in the kernel is more specific i.e. less wildcarded, then what
-     * we've calculated here.  This guarnatees we don't catch any packets we
+     * mask in the kernel is more specific i.e. less wildcarded, than what
+     * we've calculated here.  This guarantees we don't catch any packets we
      * shouldn't with the megaflow. */
     udump32 = (uint32_t *) &udump_mask;
     xout32 = (uint32_t *) &xout.wc.masks;
@@ -1303,12 +1307,8 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
     ok = true;
 
 exit:
-    if (actions) {
-        ofpbuf_delete(actions);
-    }
-    if (xoutp) {
-        xlate_out_uninit(xoutp);
-    }
+    ofpbuf_delete(actions);
+    xlate_out_uninit(xoutp);
     return ok;
 }
@@ -1317,7 +1317,12 @@
 revalidate_udumps(struct revalidator *revalidator, struct list *udumps)
 {
     struct udpif *udpif = revalidator->udpif;
-    struct dpif_flow_stats stats[REVALIDATE_MAX_BATCH];
+    struct {
+        struct dpif_flow_stats ukey_stats;  /* Stats stored in the ukey. */
+        struct dpif_flow_stats stats;       /* Stats for 'op'. */
+        struct dpif_op op;                  /* Flow del operation. */
+    } ops[REVALIDATE_MAX_BATCH];
+    struct dpif_op *opsp[REVALIDATE_MAX_BATCH];
     struct udpif_flow_dump *udump, *next_udump;
     size_t n_ops, i, n_flows;
@@ -1338,48 +1343,49 @@ revalidate_udumps(struct revalidator *revalidator, struct list *udumps)
 
     n_ops = 0;
     LIST_FOR_EACH_SAFE (udump, next_udump, list_node, udumps) {
-        long long int cutoff, used, now;
+        long long int used, now;
         struct udpif_key *ukey;
 
-        list_remove(&udump->list_node);
-
-        ukey = NULL;
         now = time_msec();
-        cutoff = now - max_idle;
+        ukey = ukey_lookup(revalidator, udump);
 
+        used = udump->stats.used;
+        if (!used && ukey) {
+            used = ukey->created;
+        }
+
+        if (must_del || (used && used < now - max_idle)) {
+            struct dpif_flow_stats *ukey_stats = &ops[n_ops].ukey_stats;
+            struct dpif_op *op = &ops[n_ops].op;
+
+            op->type = DPIF_OP_FLOW_DEL;
+            op->u.flow_del.key = udump->key;
+            op->u.flow_del.key_len = udump->key_len;
+            op->u.flow_del.stats = &ops[n_ops].stats;
+            n_ops++;
 
-        /* If used is zero this ukey has never been used in the datapath.  It's
-         * possible that we've processed this udump before in which case there
-         * will be a ukey storing its creation time.  However, if we're going
-         * to delete it anyway, don't bother checking for a creation time and
-         * just skip to deletion. */
-        if (!must_del && !used) {
-            ukey = ukey_lookup(revalidator, udump);
             if (ukey) {
-                used = ukey->created;
+                *ukey_stats = ukey->stats;
+                ukey_delete(revalidator, ukey);
+            } else {
+                memset(ukey_stats, 0, sizeof *ukey_stats);
             }
-        }
 
-        if (must_del || (used && used < cutoff)) {
-            opsp[n_ops] = &udump->op;
-            udump->op.type = DPIF_OP_FLOW_DEL;
-            udump->op.u.flow_del.key = udump->key;
-            udump->op.u.flow_del.key_len = udump->key_len;
-            udump->op.u.flow_del.stats = &stats[n_ops];
-            n_ops++;
             continue;
         }
 
         if (!ukey) {
-            ukey = ukey_lookup(revalidator, udump);
-        }
+            ukey = xmalloc(sizeof *ukey);
 
-        if (!ukey) {
-            ukey = xzalloc(sizeof *ukey);
             ukey->key = (struct nlattr *) &ukey->key_buf;
             memcpy(ukey->key, udump->key, udump->key_len);
             ukey->key_len = udump->key_len;
+            ukey->created = used ? used : now;
+            memset(&ukey->stats, 0, sizeof ukey->stats);
+
+            ukey->mark = false;
+
             hmap_insert(&revalidator->ukeys, &ukey->hmap_node,
                         udump->key_hash);
         }
@@ -1390,43 +1396,39 @@ revalidate_udumps(struct revalidator *revalidator, struct list *udumps)
             ukey_delete(revalidator, ukey);
         }
 
+        list_remove(&udump->list_node);
         free(udump);
     }
 
+    for (i = 0; i < n_ops; i++) {
+        opsp[i] = &ops[i].op;
+    }
     dpif_operate(udpif->dpif, opsp, n_ops);
+
     for (i = 0; i < n_ops; i++) {
-        struct dpif_flow_stats push_buf, *push, *stats;
-        struct udpif_flow_dump *udump;
-        struct udpif_key *ukey;
+        struct dpif_flow_stats push, *stats, *ukey_stats;
 
-        udump = CONTAINER_OF(opsp[i], struct udpif_flow_dump, op);
-        stats = udump->op.u.flow_del.stats;
-        ukey = ukey_lookup(revalidator, udump);
-        if (ukey) {
-            push = &push_buf;
-            push->used = MAX(ukey->stats.used, stats->used);
-            push->tcp_flags = ukey->stats.tcp_flags | stats->tcp_flags;
-            push->n_packets = stats->n_packets - ukey->stats.n_packets;
-            push->n_bytes = stats->n_bytes - ukey->stats.n_bytes;
-            ukey_delete(revalidator, ukey);
-        } else {
-            push = stats;
-        }
+        ukey_stats = &ops[i].ukey_stats;
+        stats = ops[i].op.u.flow_del.stats;
+        push.used = MAX(stats->used, ukey_stats->used);
+        push.tcp_flags = stats->tcp_flags | ukey_stats->tcp_flags;
+        push.n_packets = stats->n_packets - ukey_stats->n_packets;
+        push.n_bytes = stats->n_bytes - ukey_stats->n_bytes;
 
-        if (push->n_packets || netflow_exists()) {
+        if (push.n_packets || netflow_exists()) {
             struct ofproto_dpif *ofproto;
             struct netflow *netflow;
             struct flow flow;
 
-            if (!xlate_receive(udpif->backer, NULL, udump->key, udump->key_len,
-                               &flow, NULL, &ofproto, NULL, NULL, &netflow,
-                               NULL)) {
+            if (!xlate_receive(udpif->backer, NULL, ops[i].op.u.flow_del.key,
+                               ops[i].op.u.flow_del.key_len, &flow, NULL,
+                               &ofproto, NULL, NULL, &netflow, NULL)) {
                 struct xlate_in xin;
 
-                xlate_in_init(&xin, ofproto, &flow, NULL, push->tcp_flags,
+                xlate_in_init(&xin, ofproto, &flow, NULL, push.tcp_flags,
                               NULL);
-                xin.resubmit_stats = push->n_packets ? push : NULL;
-                xin.may_learn = push->n_packets > 0;
+                xin.resubmit_stats = push.n_packets ? &push : NULL;
+                xin.may_learn = push.n_packets > 0;
                 xin.skip_wildcards = true;
                 xlate_actions_for_side_effects(&xin);
@@ -1437,6 +1439,10 @@
                 }
             }
         }
+    }
+
+    LIST_FOR_EACH_SAFE (udump, next_udump, list_node, udumps) {
+        list_remove(&udump->list_node);
         free(udump);
     }
 }
diff --git a/ofproto/ofproto-dpif-upcall.h b/ofproto/ofproto-dpif-upcall.h
index 9f445c2..d73ae4c 100644
--- a/ofproto/ofproto-dpif-upcall.h
+++ b/ofproto/ofproto-dpif-upcall.h
@@ -15,37 +15,24 @@
 #ifndef OFPROTO_DPIF_UPCALL_H
 #define OFPROTO_DPIF_UPCALL_H
 
-#define FLOW_MISS_MAX_BATCH 50
+#include <stddef.h>
 
-#include "dpif.h"
-#include "flow.h"
-#include "hmap.h"
-#include "list.h"
-#include "odp-util.h"
-#include "ofpbuf.h"
-#include "ofproto-dpif-xlate.h"
-
-struct seq;
 struct dpif;
 struct dpif_backer;
+struct seq;
+struct simap;
 
-/* udif is responsible for retrieving upcalls from the kernel, processing miss
- * upcalls, and handing more complex ones up to the main ofproto-dpif
- * module. */
+/* udpif is responsible for retrieving upcalls from the kernel and processing
+ * them.  Additionally, it's responsible for maintaining the datapath flow
+ * table. */
 struct udpif *udpif_create(struct dpif_backer *, struct dpif *);
 void udpif_set_threads(struct udpif *, size_t n_handlers,
                        size_t n_revalidators);
 void udpif_destroy(struct udpif *);
-
 void udpif_revalidate(struct udpif *);
-
 void udpif_get_memory_usage(struct udpif *, struct simap *usage);
-
 struct seq *udpif_dump_seq(struct udpif *);
-
-void udpif_get_memory_usage(struct udpif *, struct simap *usage);
-
 void udpif_flush(void);
 
 #endif /* ofproto-dpif-upcall.h */
diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
index 65e1b4b..befa9f7 100644
--- a/ofproto/ofproto-dpif.c
+++ b/ofproto/ofproto-dpif.c
@@ -84,8 +84,8 @@ struct rule_dpif {
 
     /* These statistics:
      *
-     *   - Do include packets and bytes from datapath flows which have not been
-     *     pushed by ofproto-dpif-upcall. */
+     *   - Do include packets and bytes from datapath flows which have not
+     *     recently been processed by a revalidator. */
     struct ovs_mutex stats_mutex;
     uint64_t packet_count OVS_GUARDED;  /* Number of packets received. */
     uint64_t byte_count OVS_GUARDED;    /* Number of bytes received. */
@@ -100,8 +100,8 @@ struct group_dpif {
 
     /* These statistics:
      *
-     *   - Do include packets and bytes from datapath flows which have not been
-     *     pushed by ofproto-dpif-upcall. */
+     *   - Do include packets and bytes from datapath flows which have not
+     *     recently been processed by a revalidator. */
     struct ovs_mutex stats_mutex;
     uint64_t packet_count OVS_GUARDED;  /* Number of packets received. */
     uint64_t byte_count OVS_GUARDED;    /* Number of bytes received. */
@@ -213,7 +213,7 @@ struct dpif_completion {
     struct ofoperation *op;
 };
 
-/* Reasons that we might need to revalidate every datapath flow , and
+/* Reasons that we might need to revalidate every datapath flow, and
  * corresponding coverage counters.
  *
 * A value of 0 means that there is no need to revalidate.
@@ -1147,8 +1147,8 @@ run(struct ofproto *ofproto_)
     if (ofproto->dump_seq != new_dump_seq) {
         struct rule *rule, *next_rule;
 
-        /* We know stats are relatively fresh since a dump just happened, so
-         * now is a good time to do some periodic work. */
+        /* We know stats are relatively fresh, so now is a good time to do some
+         * periodic work. */
         ofproto->dump_seq = new_dump_seq;
 
         /* Expire OpenFlow flows whose idle_timeout or hard_timeout
diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
index 28a6c2b..5fd82fc 100644
--- a/vswitchd/vswitch.xml
+++ b/vswitchd/vswitch.xml
@@ -126,7 +126,7 @@
       <column name="other_config" key="flow-limit"
              type='{"type": "integer", "minInteger": 0}'>
        <p>
-          A number of flows as a nonnegative integer.  This sets the maxmimum
+          The maximum number of flows allowed in the datapath flow table.
           Internally OVS will choose a flow limit which will likely be lower
           than this number, based on real time network conditions.
@@ -162,8 +162,28 @@
              type='{"type": "integer", "minInteger": 1}'>
        <p>
          Specifies the number of threads for software datapaths to use for
-          handling new flows.  The default is two less than the number of
-          online CPU cores (but at least 1).
+          handling new flows.  The default is the number of online CPU cores
+          minus the number of revalidators.
        </p>
        <p>
+          This configuration is per datapath.  If you have more than one
+          software datapath (e.g. some <code>system</code> bridges and some
+          <code>netdev</code> bridges), then the total number of threads is
+          <code>n-handler-threads</code> times the number of software
+          datapaths.
+ </p> + </column> + + <column name="other_config" key="n-revalidator-threads" + type='{"type": "integer", "minInteger": 1}'> + <p> + Specifies the number of threads for software datapaths to use for + revalidating flows in the datapath. Typically, there is a direct + correlation between the number of revalidator threads, and the number + of flows allowed in the datapath. The default is the number of cpu + cores divided by four plus one. If <code>n-handler-threads</code> is + set, the default changes to the number of cpu cores minus the number + of handler threads. </p> <p> This configuration is per datapath. If you have more than one -- 1.8.1.2 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev