Someone other than me needs to review this, as I co-authored it. Jarno, do you have time to look at it?
Ethan On Wed, Apr 16, 2014 at 9:06 PM, Joe Stringer <j...@wand.net.nz> wrote: > From: Ethan Jackson <et...@nicira.com> > > Previously, we had a separate flow_dumper thread that fetched flows from > the datapath to distribute to revalidator threads. This patch takes the > logic for dumping and pushes it into the revalidator threads, resulting > in simpler code with similar performance to the current code. > > One thread, the "leader", is responsible for beginning and ending each > flow dump, maintaining the flow_limit, and checking whether the > revalidator threads need to exit. All revalidator threads dump, > revalidate, delete datapath flows and garbage collect ukeys. > > Co-authored-by: Joe Stringer <joestrin...@nicira.com> > Signed-off-by: Joe Stringer <joestrin...@nicira.com> > --- > v10: Minor whitespace and documentation fixups. > v9: Update testsuite for also printing actions on flow_dump. > v8: Rebase. > v7: Add back logic (present in master) that deletes all flows older than > 100ms if we are currently exceeding the flow limit. > Rebase. > v6: Shift ukeys hmaps from revalidators into udpif. > Documentation tidyups. > v5: Handle ukey creation race. > Style fixes. > v4: Rebase. > v3: First post. > --- > ofproto/ofproto-dpif-upcall.c | 633 > ++++++++++++++++++++--------------------- > tests/ofproto-dpif.at | 8 +- > 2 files changed, 305 insertions(+), 336 deletions(-) > > diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c > index 938cfde..b463f0e 100644 > --- a/ofproto/ofproto-dpif-upcall.c > +++ b/ofproto/ofproto-dpif-upcall.c > @@ -67,34 +67,27 @@ struct handler { > 'mutex'. */ > }; > > -/* A thread that processes each kernel flow handed to it by the flow_dumper > - * thread, updates OpenFlow statistics, and updates or removes the kernel > flow > - * as necessary. */ > +/* A thread that processes datapath flows, updates OpenFlow statistics, and > + * updates or removes them if necessary. 
*/ > struct revalidator { > struct udpif *udpif; /* Parent udpif. */ > char *name; /* Thread name. */ > > pthread_t thread; /* Thread ID. */ > - struct hmap ukeys; /* Datapath flow keys. */ > - > - uint64_t dump_seq; > - > - struct ovs_mutex mutex; /* Mutex guarding the following. */ > - pthread_cond_t wake_cond; > - struct list udumps OVS_GUARDED; /* Unprocessed udumps. */ > - size_t n_udumps OVS_GUARDED; /* Number of unprocessed udumps. */ > + struct hmap *ukeys; /* Points into udpif->ukeys for this > + revalidator. Used for GC phase. */ > }; > > /* An upcall handler for ofproto_dpif. > * > * udpif has two logically separate pieces: > * > - * - A "dispatcher" thread that reads upcalls from the kernel and > dispatches > - * them to one of several "handler" threads (see struct handler). > + * - Miss handling threads led by a "dispatcher" thread that reads upcalls > + * from the kernel and dispatches them to one of several "handler" > threads > + * (see struct handler). > * > - * - A "flow_dumper" thread that reads the kernel flow table and > dispatches > - * flows to one of several "revalidator" threads (see struct > - * revalidator). */ > + * - Revalidation threads that read the datapath flow table and maintain > + * the flows in it. */ > struct udpif { > struct list list_node; /* In all_udpifs list. */ > > @@ -104,7 +97,6 @@ struct udpif { > uint32_t secret; /* Random seed for upcall hash. */ > > pthread_t dispatcher; /* Dispatcher thread ID. */ > - pthread_t flow_dumper; /* Flow dumper thread ID. */ > > struct handler *handlers; /* Upcall handlers. */ > size_t n_handlers; > @@ -112,14 +104,24 @@ struct udpif { > struct revalidator *revalidators; /* Flow revalidators. */ > size_t n_revalidators; > > - uint64_t last_reval_seq; /* 'reval_seq' at last revalidation. > */ > - struct seq *reval_seq; /* Incremented to force revalidation. > */ > - > - struct seq *dump_seq; /* Increments each dump iteration. */ > - > struct latch exit_latch; /* Tells child threads to exit. 
*/ > > + /* Revalidation. */ > + struct seq *reval_seq; /* Incremented to force revalidation. > */ > + bool need_revalidate; /* As indicated by 'reval_seq'. */ > + bool reval_exit; /* Set by leader on 'exit_latch'. */ > + pthread_barrier_t reval_barrier; /* Barrier used by revalidators. */ > + struct dpif_flow_dump dump; /* DPIF flow dump state. */ > long long int dump_duration; /* Duration of the last flow dump. */ > + struct seq *dump_seq; /* Increments each dump iteration. */ > + > + /* During the flow dump phase, revalidators insert into these with a > random > + * distribution. During the garbage collection phase, each revalidator > + * takes care of garbage collecting one of these hmaps. */ > + struct { > + struct ovs_mutex mutex; /* Guards the following. */ > + struct hmap hmap OVS_GUARDED; /* Datapath flow keys. */ > + } *ukeys; > > /* Datapath flow statistics. */ > unsigned int max_n_flows; > @@ -154,41 +156,30 @@ struct upcall { > > /* 'udpif_key's are responsible for tracking the little bit of state udpif > * needs to do flow expiration which can't be pulled directly from the > - * datapath. They are owned, created by, maintained, and destroyed by a > single > - * revalidator making them easy to efficiently handle with multiple threads. > */ > + * datapath. They may be created or maintained by any revalidator during > + * the dump phase, but are owned by a single revalidator, and are destroyed > + * by that revalidator during the garbage-collection phase. > + * > + * While some elements of a udpif_key are protected by a mutex, the ukey > itself > + * is not. Therefore it is not safe to destroy a udpif_key except when all > + * revalidators are in the garbage-collection phase, or they aren't running. */ > struct udpif_key { > struct hmap_node hmap_node; /* In parent revalidator 'ukeys' map. */ > > - struct nlattr *key; /* Datapath flow key. */ > - size_t key_len; /* Length of 'key'. */ > - > - struct dpif_flow_stats stats; /* Stats at most recent flow dump. 
*/ > - long long int created; /* Estimation of creation time. */ > - > - bool mark; /* Used by mark and sweep GC algorithm. */ > - > - struct odputil_keybuf key_buf; /* Memory for 'key'. */ > -}; > - > -/* 'udpif_flow_dump's hold the state associated with one iteration in a flow > - * dump operation. This is created by the flow_dumper thread and handed to > the > - * appropriate revalidator thread to be processed. */ > -struct udpif_flow_dump { > - struct list list_node; > - > - struct nlattr *key; /* Datapath flow key. */ > + /* These elements are read only once created, and therefore aren't > + * protected by a mutex. */ > + const struct nlattr *key; /* Datapath flow key. */ > size_t key_len; /* Length of 'key'. */ > - uint32_t key_hash; /* Hash of 'key'. */ > > - struct odputil_keybuf mask_buf; > - struct nlattr *mask; /* Datapath mask for 'key'. */ > - size_t mask_len; /* Length of 'mask'. */ > - > - struct dpif_flow_stats stats; /* Stats pulled from the datapath. */ > - > - bool need_revalidate; /* Key needs revalidation? */ > + struct ovs_mutex mutex; /* Guards the following. */ > + struct dpif_flow_stats stats OVS_GUARDED; /* Last known stats.*/ > + long long int created OVS_GUARDED; /* Estimate of creation time. > */ > + bool mark OVS_GUARDED; /* For mark and sweep garbage > + collection. */ > + bool flow_exists OVS_GUARDED; /* Ensures flows are only > deleted > + once. */ > > - struct odputil_keybuf key_buf; > + struct odputil_keybuf key_buf; /* Memory for 'key'. */ > }; > > /* Flow miss batching. 
> @@ -223,12 +214,11 @@ static struct list all_udpifs = > LIST_INITIALIZER(&all_udpifs); > > static void recv_upcalls(struct udpif *); > static void handle_upcalls(struct handler *handler, struct list *upcalls); > -static void *udpif_flow_dumper(void *); > static void *udpif_dispatcher(void *); > static void *udpif_upcall_handler(void *); > static void *udpif_revalidator(void *); > static uint64_t udpif_get_n_flows(struct udpif *); > -static void revalidate_udumps(struct revalidator *, struct list *udumps); > +static void revalidate(struct revalidator *); > static void revalidator_sweep(struct revalidator *); > static void revalidator_purge(struct revalidator *); > static void upcall_unixctl_show(struct unixctl_conn *conn, int argc, > @@ -239,6 +229,9 @@ static void upcall_unixctl_enable_megaflows(struct > unixctl_conn *, int argc, > const char *argv[], void *aux); > static void upcall_unixctl_set_flow_limit(struct unixctl_conn *conn, int > argc, > const char *argv[], void *aux); > + > +static struct udpif_key *ukey_create(const struct nlattr *key, size_t > key_len, > + long long int used); > static void ukey_delete(struct revalidator *, struct udpif_key *); > > static atomic_bool enable_megaflows = ATOMIC_VAR_INIT(true); > @@ -319,34 +312,21 @@ udpif_set_threads(struct udpif *udpif, size_t > n_handlers, > } > > for (i = 0; i < udpif->n_revalidators; i++) { > - struct revalidator *revalidator = &udpif->revalidators[i]; > - > - ovs_mutex_lock(&revalidator->mutex); > - xpthread_cond_signal(&revalidator->wake_cond); > - ovs_mutex_unlock(&revalidator->mutex); > - xpthread_join(revalidator->thread, NULL); > + xpthread_join(udpif->revalidators[i].thread, NULL); > } > > - xpthread_join(udpif->flow_dumper, NULL); > xpthread_join(udpif->dispatcher, NULL); > > for (i = 0; i < udpif->n_revalidators; i++) { > struct revalidator *revalidator = &udpif->revalidators[i]; > - struct udpif_flow_dump *udump, *next_udump; > - > - LIST_FOR_EACH_SAFE (udump, next_udump, list_node, > - 
&revalidator->udumps) { > - list_remove(&udump->list_node); > - free(udump); > - } > > /* Delete ukeys, and delete all flows from the datapath to > prevent > * double-counting stats. */ > revalidator_purge(revalidator); > - hmap_destroy(&revalidator->ukeys); > - ovs_mutex_destroy(&revalidator->mutex); > - > free(revalidator->name); > + > + hmap_destroy(&udpif->ukeys[i].hmap); > + ovs_mutex_destroy(&udpif->ukeys[i].mutex); > } > > for (i = 0; i < udpif->n_handlers; i++) { > @@ -364,6 +344,8 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers, > } > latch_poll(&udpif->exit_latch); > > + xpthread_barrier_destroy(&udpif->reval_barrier); > + > free(udpif->revalidators); > udpif->revalidators = NULL; > udpif->n_revalidators = 0; > @@ -371,6 +353,9 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers, > free(udpif->handlers); > udpif->handlers = NULL; > udpif->n_handlers = 0; > + > + free(udpif->ukeys); > + udpif->ukeys = NULL; > } > > error = dpif_handlers_set(udpif->dpif, 1); > @@ -400,21 +385,23 @@ udpif_set_threads(struct udpif *udpif, size_t > n_handlers, > handler); > } > > + xpthread_barrier_init(&udpif->reval_barrier, NULL, > + udpif->n_revalidators); > + udpif->reval_exit = false; > udpif->revalidators = xzalloc(udpif->n_revalidators > * sizeof *udpif->revalidators); > + udpif->ukeys = xmalloc(sizeof *udpif->ukeys * n_revalidators); > for (i = 0; i < udpif->n_revalidators; i++) { > struct revalidator *revalidator = &udpif->revalidators[i]; > > revalidator->udpif = udpif; > - list_init(&revalidator->udumps); > - hmap_init(&revalidator->ukeys); > - ovs_mutex_init(&revalidator->mutex); > - xpthread_cond_init(&revalidator->wake_cond, NULL); > + hmap_init(&udpif->ukeys[i].hmap); > + ovs_mutex_init(&udpif->ukeys[i].mutex); > + revalidator->ukeys = &udpif->ukeys[i].hmap; > xpthread_create(&revalidator->thread, NULL, udpif_revalidator, > revalidator); > } > xpthread_create(&udpif->dispatcher, NULL, udpif_dispatcher, udpif); > - 
xpthread_create(&udpif->flow_dumper, NULL, udpif_flow_dumper, udpif); > } > > ovsrcu_quiesce_end(); > @@ -459,7 +446,6 @@ udpif_get_memory_usage(struct udpif *udpif, struct simap > *usage) > size_t i; > > simap_increase(usage, "dispatchers", 1); > - simap_increase(usage, "flow_dumpers", 1); > > simap_increase(usage, "handlers", udpif->n_handlers); > for (i = 0; i < udpif->n_handlers; i++) { > @@ -471,15 +457,9 @@ udpif_get_memory_usage(struct udpif *udpif, struct simap > *usage) > > simap_increase(usage, "revalidators", udpif->n_revalidators); > for (i = 0; i < udpif->n_revalidators; i++) { > - struct revalidator *revalidator = &udpif->revalidators[i]; > - ovs_mutex_lock(&revalidator->mutex); > - simap_increase(usage, "revalidator dumps", revalidator->n_udumps); > - > - /* XXX: This isn't technically thread safe because the revalidator > - * ukeys maps isn't protected by a mutex since it's per thread. */ > - simap_increase(usage, "revalidator keys", > - hmap_count(&revalidator->ukeys)); > - ovs_mutex_unlock(&revalidator->mutex); > + ovs_mutex_lock(&udpif->ukeys[i].mutex); > + simap_increase(usage, "udpif keys", > hmap_count(&udpif->ukeys[i].hmap)); > + ovs_mutex_unlock(&udpif->ukeys[i].mutex); > } > } > > @@ -560,125 +540,6 @@ udpif_dispatcher(void *arg) > return NULL; > } > > -static void * > -udpif_flow_dumper(void *arg) > -{ > - struct udpif *udpif = arg; > - > - set_subprogram_name("flow_dumper"); > - while (!latch_is_set(&udpif->exit_latch)) { > - const struct dpif_flow_stats *stats; > - long long int start_time, duration; > - const struct nlattr *key, *mask; > - struct dpif_flow_dump dump; > - size_t key_len, mask_len; > - unsigned int flow_limit; > - bool need_revalidate; > - uint64_t reval_seq; > - size_t n_flows, i; > - int error; > - void *state = NULL; > - > - reval_seq = seq_read(udpif->reval_seq); > - need_revalidate = udpif->last_reval_seq != reval_seq; > - udpif->last_reval_seq = reval_seq; > - > - n_flows = udpif_get_n_flows(udpif); > - 
udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows); > - udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2; > - > - start_time = time_msec(); > - error = dpif_flow_dump_start(&dump, udpif->dpif); > - if (error) { > - VLOG_INFO("Failed to start flow dump (%s)", ovs_strerror(error)); > - goto skip; > - } > - dpif_flow_dump_state_init(udpif->dpif, &state); > - while (dpif_flow_dump_next(&dump, state, &key, &key_len, > - &mask, &mask_len, NULL, NULL, &stats) > - && !latch_is_set(&udpif->exit_latch)) { > - struct udpif_flow_dump *udump = xmalloc(sizeof *udump); > - struct revalidator *revalidator; > - > - udump->key_hash = hash_bytes(key, key_len, udpif->secret); > - memcpy(&udump->key_buf, key, key_len); > - udump->key = (struct nlattr *) &udump->key_buf; > - udump->key_len = key_len; > - > - memcpy(&udump->mask_buf, mask, mask_len); > - udump->mask = (struct nlattr *) &udump->mask_buf; > - udump->mask_len = mask_len; > - > - udump->stats = *stats; > - udump->need_revalidate = need_revalidate; > - > - revalidator = &udpif->revalidators[udump->key_hash > - % udpif->n_revalidators]; > - > - ovs_mutex_lock(&revalidator->mutex); > - while (revalidator->n_udumps >= REVALIDATE_MAX_BATCH * 3 > - && !latch_is_set(&udpif->exit_latch)) { > - ovs_mutex_cond_wait(&revalidator->wake_cond, > - &revalidator->mutex); > - } > - list_push_back(&revalidator->udumps, &udump->list_node); > - revalidator->n_udumps++; > - xpthread_cond_signal(&revalidator->wake_cond); > - ovs_mutex_unlock(&revalidator->mutex); > - } > - dpif_flow_dump_state_uninit(udpif->dpif, state); > - dpif_flow_dump_done(&dump); > - > - /* Let all the revalidators finish and garbage collect. 
*/ > - seq_change(udpif->dump_seq); > - for (i = 0; i < udpif->n_revalidators; i++) { > - struct revalidator *revalidator = &udpif->revalidators[i]; > - ovs_mutex_lock(&revalidator->mutex); > - xpthread_cond_signal(&revalidator->wake_cond); > - ovs_mutex_unlock(&revalidator->mutex); > - } > - > - for (i = 0; i < udpif->n_revalidators; i++) { > - struct revalidator *revalidator = &udpif->revalidators[i]; > - > - ovs_mutex_lock(&revalidator->mutex); > - while (revalidator->dump_seq != seq_read(udpif->dump_seq) > - && !latch_is_set(&udpif->exit_latch)) { > - ovs_mutex_cond_wait(&revalidator->wake_cond, > - &revalidator->mutex); > - } > - ovs_mutex_unlock(&revalidator->mutex); > - } > - > - duration = MAX(time_msec() - start_time, 1); > - udpif->dump_duration = duration; > - atomic_read(&udpif->flow_limit, &flow_limit); > - if (duration > 2000) { > - flow_limit /= duration / 1000; > - } else if (duration > 1300) { > - flow_limit = flow_limit * 3 / 4; > - } else if (duration < 1000 && n_flows > 2000 > - && flow_limit < n_flows * 1000 / duration) { > - flow_limit += 1000; > - } > - flow_limit = MIN(ofproto_flow_limit, MAX(flow_limit, 1000)); > - atomic_store(&udpif->flow_limit, flow_limit); > - > - if (duration > 2000) { > - VLOG_INFO("Spent an unreasonably long %lldms dumping flows", > - duration); > - } > - > -skip: > - poll_timer_wait_until(start_time + MIN(ofproto_max_idle, 500)); > - seq_wait(udpif->reval_seq, udpif->last_reval_seq); > - latch_wait(&udpif->exit_latch); > - poll_block(); > - } > - > - return NULL; > -} > - > /* The miss handler thread is responsible for processing miss upcalls > retrieved > * by the dispatcher thread. Once finished it passes the processed miss > * upcalls to ofproto-dpif where they're installed in the datapath. */ > @@ -723,42 +584,85 @@ udpif_upcall_handler(void *arg) > static void * > udpif_revalidator(void *arg) > { > + /* Used by all revalidators. 
*/ > struct revalidator *revalidator = arg; > + struct udpif *udpif = revalidator->udpif; > + bool leader = revalidator == &udpif->revalidators[0]; > + > + /* Used only by the leader. */ > + long long int start_time = 0; > + uint64_t last_reval_seq = 0; > + unsigned int flow_limit = 0; > + size_t n_flows = 0; > > revalidator->name = xasprintf("revalidator_%u", ovsthread_id_self()); > set_subprogram_name("%s", revalidator->name); > for (;;) { > - struct list udumps = LIST_INITIALIZER(&udumps); > - struct udpif *udpif = revalidator->udpif; > - size_t i; > + if (leader) { > + uint64_t reval_seq; > > - ovs_mutex_lock(&revalidator->mutex); > - if (latch_is_set(&udpif->exit_latch)) { > - ovs_mutex_unlock(&revalidator->mutex); > - return NULL; > - } > + reval_seq = seq_read(udpif->reval_seq); > + udpif->need_revalidate = last_reval_seq != reval_seq; > + last_reval_seq = reval_seq; > > - if (!revalidator->n_udumps) { > - if (revalidator->dump_seq != seq_read(udpif->dump_seq)) { > - revalidator->dump_seq = seq_read(udpif->dump_seq); > - revalidator_sweep(revalidator); > - } else { > - ovs_mutex_cond_wait(&revalidator->wake_cond, > - &revalidator->mutex); > + n_flows = udpif_get_n_flows(udpif); > + udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows); > + udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2; > + > + /* Only the leader checks the exit latch to prevent a race where > + * some threads think it's true and exit and others think it's > + * false and block indefinitely on the reval_barrier */ > + udpif->reval_exit = latch_is_set(&udpif->exit_latch); > + > + start_time = time_msec(); > + if (!udpif->reval_exit) { > + dpif_flow_dump_start(&udpif->dump, udpif->dpif); > } > } > > - for (i = 0; i < REVALIDATE_MAX_BATCH && revalidator->n_udumps; i++) { > - list_push_back(&udumps, list_pop_front(&revalidator->udumps)); > - revalidator->n_udumps--; > + /* Wait for the leader to start the flow dump. 
*/ > + xpthread_barrier_wait(&udpif->reval_barrier); > + if (udpif->reval_exit) { > + break; > } > + revalidate(revalidator); > + > + /* Wait for all flows to have been dumped before we garbage collect. > */ > + xpthread_barrier_wait(&udpif->reval_barrier); > + revalidator_sweep(revalidator); > + > + /* Wait for all revalidators to finish garbage collection. */ > + xpthread_barrier_wait(&udpif->reval_barrier); > + > + if (leader) { > + long long int duration; > + > + dpif_flow_dump_done(&udpif->dump); > + seq_change(udpif->dump_seq); > + > + duration = MAX(time_msec() - start_time, 1); > + atomic_read(&udpif->flow_limit, &flow_limit); > + udpif->dump_duration = duration; > + if (duration > 2000) { > + flow_limit /= duration / 1000; > + } else if (duration > 1300) { > + flow_limit = flow_limit * 3 / 4; > + } else if (duration < 1000 && n_flows > 2000 > + && flow_limit < n_flows * 1000 / duration) { > + flow_limit += 1000; > + } > + flow_limit = MIN(ofproto_flow_limit, MAX(flow_limit, 1000)); > + atomic_store(&udpif->flow_limit, flow_limit); > > - /* Wake up the flow dumper. */ > - xpthread_cond_signal(&revalidator->wake_cond); > - ovs_mutex_unlock(&revalidator->mutex); > + if (duration > 2000) { > + VLOG_INFO("Spent an unreasonably long %lldms dumping flows", > + duration); > + } > > - if (!list_is_empty(&udumps)) { > - revalidate_udumps(revalidator, &udumps); > + poll_timer_wait_until(start_time + MIN(ofproto_max_idle, 500)); > + seq_wait(udpif->reval_seq, last_reval_seq); > + latch_wait(&udpif->exit_latch); > + poll_block(); > } > } > > @@ -1287,15 +1191,16 @@ handle_upcalls(struct handler *handler, struct list > *upcalls) > } > } > > +/* Must be called with udpif->ukeys[hash % udpif->n_revalidators].mutex. 
*/ > static struct udpif_key * > -ukey_lookup(struct revalidator *revalidator, struct udpif_flow_dump *udump) > +ukey_lookup__(struct udpif *udpif, const struct nlattr *key, size_t key_len, > + uint32_t hash) > { > struct udpif_key *ukey; > + struct hmap *hmap = &udpif->ukeys[hash % udpif->n_revalidators].hmap; > > - HMAP_FOR_EACH_WITH_HASH (ukey, hmap_node, udump->key_hash, > - &revalidator->ukeys) { > - if (ukey->key_len == udump->key_len > - && !memcmp(ukey->key, udump->key, udump->key_len)) { > + HMAP_FOR_EACH_WITH_HASH (ukey, hmap_node, hash, hmap) { > + if (ukey->key_len == key_len && !memcmp(ukey->key, key, key_len)) { > return ukey; > } > } > @@ -1303,40 +1208,88 @@ ukey_lookup(struct revalidator *revalidator, struct > udpif_flow_dump *udump) > } > > static struct udpif_key * > +ukey_lookup(struct udpif *udpif, const struct nlattr *key, size_t key_len, > + uint32_t hash) > +{ > + struct udpif_key *ukey; > + uint32_t idx = hash % udpif->n_revalidators; > + > + ovs_mutex_lock(&udpif->ukeys[idx].mutex); > + ukey = ukey_lookup__(udpif, key, key_len, hash); > + ovs_mutex_unlock(&udpif->ukeys[idx].mutex); > + > + return ukey; > +} > + > +static struct udpif_key * > ukey_create(const struct nlattr *key, size_t key_len, long long int used) > { > struct udpif_key *ukey = xmalloc(sizeof *ukey); > + ovs_mutex_init(&ukey->mutex); > > ukey->key = (struct nlattr *) &ukey->key_buf; > memcpy(&ukey->key_buf, key, key_len); > ukey->key_len = key_len; > > + ovs_mutex_lock(&ukey->mutex); > ukey->mark = false; > + ukey->flow_exists = true; > ukey->created = used ? used : time_msec(); > memset(&ukey->stats, 0, sizeof ukey->stats); > + ovs_mutex_unlock(&ukey->mutex); > > return ukey; > } > > +/* Checks for a ukey in 'udpif->ukeys' with the same 'ukey->key' and 'hash', > + * and inserts 'ukey' if it does not exist. > + * > + * Returns true if 'ukey' was inserted into 'udpif->ukeys', false otherwise. 
> */ > +static bool > +udpif_insert_ukey(struct udpif *udpif, struct udpif_key *ukey, uint32_t hash) > +{ > + struct udpif_key *duplicate; > + uint32_t idx = hash % udpif->n_revalidators; > + bool ok; > + > + ovs_mutex_lock(&udpif->ukeys[idx].mutex); > + duplicate = ukey_lookup__(udpif, ukey->key, ukey->key_len, hash); > + if (duplicate) { > + ok = false; > + } else { > + hmap_insert(&udpif->ukeys[idx].hmap, &ukey->hmap_node, hash); > + ok = true; > + } > + ovs_mutex_unlock(&udpif->ukeys[idx].mutex); > + > + return ok; > +} > + > static void > ukey_delete(struct revalidator *revalidator, struct udpif_key *ukey) > { > - hmap_remove(&revalidator->ukeys, &ukey->hmap_node); > + if (revalidator) { > + hmap_remove(revalidator->ukeys, &ukey->hmap_node); > + } > + ovs_mutex_destroy(&ukey->mutex); > free(ukey); > } > > static bool > -revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump, > - struct udpif_key *ukey) > +revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey, > + const struct nlattr *mask, size_t mask_len, > + const struct nlattr *actions, size_t actions_len, > + const struct dpif_flow_stats *stats) > + > { > - struct ofpbuf xout_actions, *actions; > uint64_t slow_path_buf[128 / 8]; > struct xlate_out xout, *xoutp; > struct netflow *netflow; > - struct flow flow, udump_mask; > struct ofproto_dpif *ofproto; > struct dpif_flow_stats push; > - uint32_t *udump32, *xout32; > + struct ofpbuf xout_actions; > + struct flow flow, dp_mask; > + uint32_t *dp32, *xout32; > odp_port_t odp_in_port; > struct xlate_in xin; > int error; > @@ -1345,30 +1298,21 @@ revalidate_ukey(struct udpif *udpif, struct > udpif_flow_dump *udump, > > ok = false; > xoutp = NULL; > - actions = NULL; > netflow = NULL; > > - /* If we don't need to revalidate, we can simply push the stats contained > - * in the udump, otherwise we'll have to get the actions so we can check > - * them. 
*/ > - if (udump->need_revalidate) { > - if (dpif_flow_get(udpif->dpif, ukey->key, ukey->key_len, &actions, > - &udump->stats)) { > - goto exit; > - } > - } > - > - push.used = udump->stats.used; > - push.tcp_flags = udump->stats.tcp_flags; > - push.n_packets = udump->stats.n_packets > ukey->stats.n_packets > - ? udump->stats.n_packets - ukey->stats.n_packets > + ovs_mutex_lock(&ukey->mutex); > + push.used = stats->used; > + push.tcp_flags = stats->tcp_flags; > + push.n_packets = stats->n_packets > ukey->stats.n_packets > + ? stats->n_packets - ukey->stats.n_packets > : 0; > - push.n_bytes = udump->stats.n_bytes > ukey->stats.n_bytes > - ? udump->stats.n_bytes - ukey->stats.n_bytes > + push.n_bytes = stats->n_bytes > ukey->stats.n_bytes > + ? stats->n_bytes - ukey->stats.n_bytes > : 0; > - ukey->stats = udump->stats; > + ukey->stats = *stats; > + ovs_mutex_unlock(&ukey->mutex); > > - if (!push.n_packets && !udump->need_revalidate) { > + if (!push.n_packets && !udpif->need_revalidate) { > ok = true; > goto exit; > } > @@ -1382,11 +1326,11 @@ revalidate_ukey(struct udpif *udpif, struct > udpif_flow_dump *udump, > xlate_in_init(&xin, ofproto, &flow, NULL, push.tcp_flags, NULL); > xin.resubmit_stats = push.n_packets ? 
&push : NULL; > xin.may_learn = push.n_packets > 0; > - xin.skip_wildcards = !udump->need_revalidate; > + xin.skip_wildcards = !udpif->need_revalidate; > xlate_actions(&xin, &xout); > xoutp = &xout; > > - if (!udump->need_revalidate) { > + if (!udpif->need_revalidate) { > ok = true; > goto exit; > } > @@ -1399,11 +1343,12 @@ revalidate_ukey(struct udpif *udpif, struct > udpif_flow_dump *udump, > compose_slow_path(udpif, &xout, odp_in_port, &xout_actions); > } > > - if (!ofpbuf_equal(&xout_actions, actions)) { > + if (actions_len != ofpbuf_size(&xout_actions) > + || memcmp(ofpbuf_data(&xout_actions), actions, actions_len)) { > goto exit; > } > > - if (odp_flow_key_to_mask(udump->mask, udump->mask_len, &udump_mask, > &flow) > + if (odp_flow_key_to_mask(mask, mask_len, &dp_mask, &flow) > == ODP_FIT_ERROR) { > goto exit; > } > @@ -1413,10 +1358,10 @@ revalidate_ukey(struct udpif *udpif, struct > udpif_flow_dump *udump, > * mask in the kernel is more specific i.e. less wildcarded, than what > * we've calculated here. This guarantees we don't catch any packets we > * shouldn't with the megaflow. */ > - udump32 = (uint32_t *) &udump_mask; > + dp32 = (uint32_t *) &dp_mask; > xout32 = (uint32_t *) &xout.wc.masks; > for (i = 0; i < FLOW_U32S; i++) { > - if ((udump32[i] | xout32[i]) != udump32[i]) { > + if ((dp32[i] | xout32[i]) != dp32[i]) { > goto exit; > } > } > @@ -1430,24 +1375,21 @@ exit: > } > netflow_unref(netflow); > } > - ofpbuf_delete(actions); > xlate_out_uninit(xoutp); > return ok; > } > > struct dump_op { > struct udpif_key *ukey; > - struct udpif_flow_dump *udump; > struct dpif_flow_stats stats; /* Stats for 'op'. */ > struct dpif_op op; /* Flow del operation. 
*/ > }; > > static void > dump_op_init(struct dump_op *op, const struct nlattr *key, size_t key_len, > - struct udpif_key *ukey, struct udpif_flow_dump *udump) > + struct udpif_key *ukey) > { > op->ukey = ukey; > - op->udump = udump; > op->op.type = DPIF_OP_FLOW_DEL; > op->op.u.flow_del.key = key; > op->op.u.flow_del.key_len = key_len; > @@ -1455,10 +1397,8 @@ dump_op_init(struct dump_op *op, const struct nlattr > *key, size_t key_len, > } > > static void > -push_dump_ops(struct revalidator *revalidator, > - struct dump_op *ops, size_t n_ops) > +push_dump_ops__(struct udpif *udpif, struct dump_op *ops, size_t n_ops) > { > - struct udpif *udpif = revalidator->udpif; > struct dpif_op *opsp[REVALIDATE_MAX_BATCH]; > size_t i; > > @@ -1475,10 +1415,12 @@ push_dump_ops(struct revalidator *revalidator, > stats = op->op.u.flow_del.stats; > if (op->ukey) { > push = &push_buf; > + ovs_mutex_lock(&op->ukey->mutex); > push->used = MAX(stats->used, op->ukey->stats.used); > push->tcp_flags = stats->tcp_flags | op->ukey->stats.tcp_flags; > push->n_packets = stats->n_packets - op->ukey->stats.n_packets; > push->n_bytes = stats->n_bytes - op->ukey->stats.n_bytes; > + ovs_mutex_unlock(&op->ukey->mutex); > } else { > push = stats; > } > @@ -1508,94 +1450,120 @@ push_dump_ops(struct revalidator *revalidator, > } > } > } > +} > > - for (i = 0; i < n_ops; i++) { > - struct udpif_key *ukey; > +static void > +push_dump_ops(struct revalidator *revalidator, > + struct dump_op *ops, size_t n_ops) > +{ > + int i; > > - /* If there's a udump, this ukey came directly from a datapath flow > - * dump. Sometimes a datapath can send duplicates in flow dumps, in > - * which case we wouldn't want to double-free a ukey, so avoid that > by > - * looking up the ukey again. > - * > - * If there's no udump then we know what we're doing. */ > - ukey = (ops[i].udump > - ? 
ukey_lookup(revalidator, ops[i].udump)
> -                                  : ops[i].ukey);
> -        if (ukey) {
> -            ukey_delete(revalidator, ukey);
> -        }
> +    push_dump_ops__(revalidator->udpif, ops, n_ops);
> +    for (i = 0; i < n_ops; i++) {
> +        ukey_delete(revalidator, ops[i].ukey);
>      }
>  }
>
>  static void
> -revalidate_udumps(struct revalidator *revalidator, struct list *udumps)
> +revalidate(struct revalidator *revalidator)
>  {
>      struct udpif *udpif = revalidator->udpif;
>
>      struct dump_op ops[REVALIDATE_MAX_BATCH];
> -    struct udpif_flow_dump *udump, *next_udump;
> -    size_t n_ops, n_flows;
> +    const struct nlattr *key, *mask, *actions;
> +    size_t key_len, mask_len, actions_len;
> +    const struct dpif_flow_stats *stats;
> +    long long int now;
>      unsigned int flow_limit;
> -    long long int max_idle;
> -    bool must_del;
> +    size_t n_ops;
> +    void *state;
>
> +    n_ops = 0;
> +    now = time_msec();
>      atomic_read(&udpif->flow_limit, &flow_limit);
>
> -    n_flows = udpif_get_n_flows(udpif);
> -
> -    must_del = false;
> -    max_idle = ofproto_max_idle;
> -    if (n_flows > flow_limit) {
> -        must_del = n_flows > 2 * flow_limit;
> -        max_idle = 100;
> -    }
> -
> -    n_ops = 0;
> -    LIST_FOR_EACH_SAFE (udump, next_udump, list_node, udumps) {
> -        long long int used, now;
> +    dpif_flow_dump_state_init(udpif->dpif, &state);
> +    while (dpif_flow_dump_next(&udpif->dump, state, &key, &key_len, &mask,
> +                               &mask_len, &actions, &actions_len, &stats)) {
>          struct udpif_key *ukey;
> +        bool mark, may_destroy;
> +        long long int used, max_idle;
> +        uint32_t hash;
> +        size_t n_flows;
>
> -        now = time_msec();
> -        ukey = ukey_lookup(revalidator, udump);
> +        hash = hash_bytes(key, key_len, udpif->secret);
> +        ukey = ukey_lookup(udpif, key, key_len, hash);
>
> -        used = udump->stats.used;
> +        used = stats->used;
>          if (!used && ukey) {
> +            ovs_mutex_lock(&ukey->mutex);
>              used = ukey->created;
> +            ovs_mutex_unlock(&ukey->mutex);
>          }
>
> -        if (must_del || (used && used < now - max_idle)) {
> -            struct dump_op *dop = &ops[n_ops++];
> +        n_flows = udpif_get_n_flows(udpif);
> +        max_idle = ofproto_max_idle;
> +        if (n_flows > flow_limit) {
> +            max_idle = 100;
> +        }
>
> -            dump_op_init(dop, udump->key, udump->key_len, ukey, udump);
> -            continue;
> +        if ((used && used < now - max_idle) || n_flows > flow_limit * 2) {
> +            mark = false;
> +        } else {
> +            if (!ukey) {
> +                ukey = ukey_create(key, key_len, used);
> +                if (!udpif_insert_ukey(udpif, ukey, hash)) {
> +                    /* The same ukey has already been created. This means that
> +                     * another revalidator is processing this flow
> +                     * concurrently, so don't bother processing it. */
> +                    ukey_delete(NULL, ukey);
> +                    continue;
> +                }
> +            }
> +
> +            mark = revalidate_ukey(udpif, ukey, mask, mask_len, actions,
> +                                   actions_len, stats);
> +        }
> +
> +        if (ukey) {
> +            ovs_mutex_lock(&ukey->mutex);
> +            ukey->mark = ukey->flow_exists = mark;
> +            ovs_mutex_unlock(&ukey->mutex);
>          }
>
> -        if (!ukey) {
> -            ukey = ukey_create(udump->key, udump->key_len, used);
> -            hmap_insert(&revalidator->ukeys, &ukey->hmap_node,
> -                        udump->key_hash);
> +        if (!mark) {
> +            dump_op_init(&ops[n_ops++], key, key_len, ukey);
>          }
> -        ukey->mark = true;
>
> -        if (!revalidate_ukey(udpif, udump, ukey)) {
> -            dpif_flow_del(udpif->dpif, udump->key, udump->key_len, NULL);
> -            ukey_delete(revalidator, ukey);
> +        may_destroy = dpif_flow_dump_next_may_destroy_keys(&udpif->dump,
> +                                                           state);
> +
> +        /* Only update 'now' immediately before 'buffer' will be updated.
> +         * This gives us the current time relative to the time the datapath
> +         * will write into 'stats'. */
> +        if (may_destroy) {
> +            now = time_msec();
>          }
>
> -        list_remove(&udump->list_node);
> -        free(udump);
> +        /* Only do a dpif_operate when we've hit our maximum batch, or when
> +         * our memory is about to be clobbered by the next call to
> +         * dpif_flow_dump_next(). */
> +        if (n_ops == REVALIDATE_MAX_BATCH || (n_ops && may_destroy)) {
> +            push_dump_ops__(udpif, ops, n_ops);
> +            n_ops = 0;
> +        }
>      }
>
> -    push_dump_ops(revalidator, ops, n_ops);
> -
> -    LIST_FOR_EACH_SAFE (udump, next_udump, list_node, udumps) {
> -        list_remove(&udump->list_node);
> -        free(udump);
> +    if (n_ops) {
> +        push_dump_ops__(udpif, ops, n_ops);
>      }
> +
> +    dpif_flow_dump_state_uninit(udpif->dpif, state);
>  }
>
>  static void
>  revalidator_sweep__(struct revalidator *revalidator, bool purge)
> +    OVS_NO_THREAD_SAFETY_ANALYSIS
>  {
>      struct dump_op ops[REVALIDATE_MAX_BATCH];
>      struct udpif_key *ukey, *next;
> @@ -1603,16 +1571,20 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge)
>
>      n_ops = 0;
>
> -    HMAP_FOR_EACH_SAFE (ukey, next, hmap_node, &revalidator->ukeys) {
> +    /* During garbage collection, this revalidator completely owns its ukeys
> +     * map, and therefore doesn't need to do any locking. */
> +    HMAP_FOR_EACH_SAFE (ukey, next, hmap_node, revalidator->ukeys) {
>          if (!purge && ukey->mark) {
>              ukey->mark = false;
> +        } else if (!ukey->flow_exists) {
> +            ukey_delete(revalidator, ukey);
>          } else {
>              struct dump_op *op = &ops[n_ops++];
>
>              /* If we have previously seen a flow in the datapath, but didn't
>               * see it during the most recent dump, delete it. This allows us
>               * to clean up the ukey and keep the statistics consistent. */
> -            dump_op_init(op, ukey->key, ukey->key_len, ukey, NULL);
> +            dump_op_init(op, ukey->key, ukey->key_len, ukey);
>              if (n_ops == REVALIDATE_MAX_BATCH) {
>                  push_dump_ops(revalidator, ops, n_ops);
>                  n_ops = 0;
> @@ -1670,13 +1642,10 @@ upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
>      for (i = 0; i < n_revalidators; i++) {
>          struct revalidator *revalidator = &udpif->revalidators[i];
>
> -        /* XXX: The result of hmap_count(&revalidator->ukeys) may not be
> -         * accurate because it's not protected by the revalidator mutex. */
> -        ovs_mutex_lock(&revalidator->mutex);
> -        ds_put_format(&ds, "\t%s: (dump queue %"PRIuSIZE") (keys %"PRIuSIZE
> -                      ")\n", revalidator->name, revalidator->n_udumps,
> -                      hmap_count(&revalidator->ukeys));
> -        ovs_mutex_unlock(&revalidator->mutex);
> +        ovs_mutex_lock(&udpif->ukeys[i].mutex);
> +        ds_put_format(&ds, "\t%s: (keys %"PRIuSIZE")\n", revalidator->name,
> +                      hmap_count(&udpif->ukeys[i].hmap));
> +        ovs_mutex_unlock(&udpif->ukeys[i].mutex);
>      }
>  }
>
> diff --git a/tests/ofproto-dpif.at b/tests/ofproto-dpif.at
> index b12b4fe..989a7c5 100644
> --- a/tests/ofproto-dpif.at
> +++ b/tests/ofproto-dpif.at
> @@ -3833,10 +3833,10 @@ skb_priority(0),skb_mark(0/0),in_port(101),eth(src=50:54:00:00:00:07/00:00:00:00
>  ])
>
>  AT_CHECK([cat ovs-vswitchd.log | grep -e 'in_port(100).*packets:9' | FILTER_FLOW_DUMP], [0], [dnl
> -skb_priority(0),skb_mark(0/0),in_port(100),eth(src=50:54:00:00:00:05/00:00:00:00:00:00,dst=50:54:00:00:00:07/00:00:00:00:00:00),eth_type(0x0800),ipv4(src=192.168.0.1/0.0.0.0,dst=192.168.0.2/0.0.0.0,proto=1/0,tos=0/0,ttl=64/0,frag=no/0xff),icmp(type=8/0,code=0/0), packets:9, bytes:540, used:0.0s
> +skb_priority(0),skb_mark(0/0),in_port(100),eth(src=50:54:00:00:00:05/00:00:00:00:00:00,dst=50:54:00:00:00:07/00:00:00:00:00:00),eth_type(0x0800),ipv4(src=192.168.0.1/0.0.0.0,dst=192.168.0.2/0.0.0.0,proto=1/0,tos=0/0,ttl=64/0,frag=no/0xff),icmp(type=8/0,code=0/0), packets:9, bytes:540, used:0.0s, actions:101,3,2
>  ])
>  AT_CHECK([cat ovs-vswitchd.log | grep -e 'in_port(101).*packets:4' | FILTER_FLOW_DUMP], [0], [dnl
> -skb_priority(0),skb_mark(0/0),in_port(101),eth(src=50:54:00:00:00:07/00:00:00:00:00:00,dst=50:54:00:00:00:05/00:00:00:00:00:00),eth_type(0x0800),ipv4(src=192.168.0.2/0.0.0.0,dst=192.168.0.1/0.0.0.0,proto=1/0,tos=0/0,ttl=64/0,frag=no/0xff),icmp(type=8/0,code=0/0), packets:4, bytes:240, used:0.0s
> +skb_priority(0),skb_mark(0/0),in_port(101),eth(src=50:54:00:00:00:07/00:00:00:00:00:00,dst=50:54:00:00:00:05/00:00:00:00:00:00),eth_type(0x0800),ipv4(src=192.168.0.2/0.0.0.0,dst=192.168.0.1/0.0.0.0,proto=1/0,tos=0/0,ttl=64/0,frag=no/0xff),icmp(type=8/0,code=0/0), packets:4, bytes:240, used:0.0s, actions:100,2,3
>  ])
>
>  AT_CHECK([ovs-ofctl dump-ports br0 pbr0], [0], [dnl
> @@ -4378,10 +4378,10 @@ skb_priority(0),skb_mark(0),in_port(1),eth(src=50:54:00:00:00:09,dst=50:54:00:00
>  skb_priority(0),skb_mark(0),in_port(1),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.4,dst=10.0.0.3,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0), actions:drop
>  ])
>  AT_CHECK([cat ovs-vswitchd.log | grep '00:09.*packets:3' | FILTER_FLOW_DUMP], [0], [dnl
> -skb_priority(0),skb_mark(0),in_port(1),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2,dst=10.0.0.1,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0), packets:3, bytes:180, used:0.0s
> +skb_priority(0),skb_mark(0),in_port(1),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2,dst=10.0.0.1,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0), packets:3, bytes:180, used:0.0s, actions:2
>  ])
>  AT_CHECK([cat ovs-vswitchd.log | grep '00:0b.*packets:3' | FILTER_FLOW_DUMP], [0], [dnl
> -skb_priority(0),skb_mark(0),in_port(1),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.4,dst=10.0.0.3,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0), packets:3, bytes:180, used:0.0s
> +skb_priority(0),skb_mark(0),in_port(1),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.4,dst=10.0.0.3,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0), packets:3, bytes:180, used:0.0s, actions:drop
>  ])
>  OVS_VSWITCHD_STOP
>  AT_CLEANUP
> --
> 1.7.10.4
>
_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev