Hey Ben and Ethan,

Could you review this RFC when you are available?
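Before you dive into the diff, here is the idea in miniature.  Stage one
keys on the Ethernet address pair so that one chatty host cannot crowd
out the others' upcalls; stage two keys on in_port/L4 fields so that
individual flows still spread across the handler threads.  The sketch
below is toy code, not from the patch: it uses a simple FNV-1a hash and
made-up variables in place of OVS's hash_bytes()/mhash_*() and the
netlink attribute walk.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define UPCALL_QUEUES 512     /* Number of fair queues (stage 1). */
#define N_HANDLERS 16         /* Number of handler threads (stage 2). */
#define ETH_ADDR_LEN 6

/* Toy FNV-1a hash, standing in for hash_bytes()/mhash_*(). */
static uint32_t
fnv1a(const void *data, size_t n, uint32_t basis)
{
    const uint8_t *p = data;
    uint32_t hash = basis ^ 2166136261u;
    size_t i;

    for (i = 0; i < n; i++) {
        hash = (hash ^ p[i]) * 16777619u;
    }
    return hash;
}

int
main(void)
{
    uint8_t eth_src[ETH_ADDR_LEN] = { 0x00, 0x16, 0x3e, 0, 0, 0x01 };
    uint8_t eth_dst[ETH_ADDR_LEN] = { 0x00, 0x16, 0x3e, 0, 0, 0x02 };
    uint32_t in_port = 3;                       /* Toy L3/L4 fields. */
    uint32_t tcp_ports = (80u << 16) | 34567;
    uint32_t secret = 0x12345678;               /* cf. udpif->secret. */
    uint8_t macs[2 * ETH_ADDR_LEN];
    uint32_t hash_1, hash_2;

    /* Stage 1: hash the 12-byte Ethernet address pair to pick one of
     * the UPCALL_QUEUES fair queues. */
    memcpy(macs, eth_src, ETH_ADDR_LEN);
    memcpy(macs + ETH_ADDR_LEN, eth_dst, ETH_ADDR_LEN);
    hash_1 = fnv1a(macs, sizeof macs, secret);

    /* Stage 2: fold in_port and the L4 ports into a second hash to
     * pick one of the handler threads. */
    hash_2 = fnv1a(&in_port, sizeof in_port, secret);
    hash_2 = fnv1a(&tcp_ports, sizeof tcp_ports, hash_2);

    printf("fair queue %u, handler %u\n",
           hash_1 % UPCALL_QUEUES, hash_2 % N_HANDLERS);
    return 0;
}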
This patch is pretty complete.  I'll still do some experiments to find the
best UPCALL_QUEUE_LENGTH and UPCALL_QUEUES values.

Thanks,
Alex Wang

On Tue, Nov 19, 2013 at 6:09 PM, Alex Wang <al...@nicira.com> wrote:
> This commit improves the fairness of upcall dispatching by introducing
> a two-stage scheme.  The two stages run in two threads, 'dispatcher'
> and 'distributor', respectively.
>
> In the first stage, the dispatcher thread reads upcalls from the
> kernel and puts them into fair queues based on the L2 header
> information.  In the second stage, the distributor thread iterates
> over the fair queues in a round-robin fashion.  Each time, it moves
> the upcall at the front of a fair queue into the corresponding upcall
> handler thread's queue for processing.
>
> Experiments show a big improvement in handling fairness and a slight
> improvement in flow setup rate.
>
> Signed-off-by: Alex Wang <al...@nicira.com>
> ---
>  lib/guarded-list.c            |   12 ++
>  lib/guarded-list.h            |    1 +
>  ofproto/ofproto-dpif-upcall.c |  267 +++++++++++++++++++++++++++++++++--------
>  3 files changed, 232 insertions(+), 48 deletions(-)
>
> diff --git a/lib/guarded-list.c b/lib/guarded-list.c
> index cbb2030..2dc0c48 100644
> --- a/lib/guarded-list.c
> +++ b/lib/guarded-list.c
> @@ -44,6 +44,18 @@ guarded_list_is_empty(const struct guarded_list *list)
>      return empty;
>  }
>
> +struct list *
> +guarded_list_front(struct guarded_list *list)
> +{
> +    struct list *ret;
> +
> +    ovs_mutex_lock(&list->mutex);
> +    ret = list_front(&list->list);
> +    ovs_mutex_unlock(&list->mutex);
> +
> +    return ret;
> +}
> +
>  /* If 'list' has fewer than 'max' elements, adds 'node' at the end of the list
>   * and returns the number of elements now on the list.
>   *
> diff --git a/lib/guarded-list.h b/lib/guarded-list.h
> index 625865d..1a26dc8 100644
> --- a/lib/guarded-list.h
> +++ b/lib/guarded-list.h
> @@ -33,6 +33,7 @@ void guarded_list_destroy(struct guarded_list *);
>
>  bool guarded_list_is_empty(const struct guarded_list *);
>
> +struct list *guarded_list_front(struct guarded_list *);
>  size_t guarded_list_push_back(struct guarded_list *, struct list *,
>                                size_t max);
>  struct list *guarded_list_pop_front(struct guarded_list *);
> diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
> index dba3d3b..97e9d22 100644
> --- a/ofproto/ofproto-dpif-upcall.c
> +++ b/ofproto/ofproto-dpif-upcall.c
> @@ -36,7 +36,9 @@
>  #include "poll-loop.h"
>  #include "vlog.h"
>
> -#define MAX_QUEUE_LENGTH 512
> +#define MAX_QUEUE_LENGTH 128
> +#define UPCALL_QUEUES 512
> +#define UPCALL_QUEUE_LENGTH 64
>
>  VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
>
> @@ -45,7 +47,7 @@ COVERAGE_DEFINE(upcall_queue_overflow);
>  COVERAGE_DEFINE(fmb_queue_overflow);
>  COVERAGE_DEFINE(fmb_queue_revalidated);
>
> -/* A thread that processes each upcall handed to it by the dispatcher thread,
> +/* A thread that processes each upcall handed to it by the distributor thread,
>   * forwards the upcall's packet, and then queues it to the main ofproto_dpif
>   * to possibly set up a kernel flow as a cache. */
>  struct handler {
> @@ -58,8 +60,8 @@ struct handler {
>      struct list upcalls OVS_GUARDED;
>      size_t n_upcalls OVS_GUARDED;
>
> -    size_t n_new_upcalls;              /* Only changed by the dispatcher. */
> -    bool need_signal;                  /* Only changed by the dispatcher. */
> +    size_t n_new_upcalls;              /* Only changed by the distributor. */
> +    bool need_signal;                  /* Only changed by the distributor. */
>
>      pthread_cond_t wake_cond;          /* Wakes 'thread' while holding
>                                          * 'mutex'. */
> @@ -67,11 +69,12 @@
>
>  /* An upcall handler for ofproto_dpif.
>   *
> - * udpif is implemented as a "dispatcher" thread that reads upcalls from the
> - * kernel.  It processes each upcall just enough to figure out its next
> - * destination.  For a "miss" upcall (MISS_UPCALL), this is one of several
> - * "handler" threads (see struct handler).  Other upcalls are queued to the
> - * main ofproto_dpif. */
> + * udpif is implemented as two threads, "dispatcher" and "distributor".
> + * The "dispatcher" thread reads upcalls from the kernel and puts them
> + * into the corresponding fair queues based on the L2+L3 headers.  The
> + * "distributor" thread reads upcalls from the fair queues in a
> + * round-robin fashion and puts them into the corresponding upcall
> + * handler's queue based on the L4 header. */
>  struct udpif {
>      struct dpif *dpif;                 /* Datapath handle. */
>      struct dpif_backer *backer;        /* Opaque dpif_backer pointer. */
> @@ -79,6 +82,7 @@ struct udpif {
>      uint32_t secret;                   /* Random seed for upcall hash. */
>
>      pthread_t dispatcher;              /* Dispatcher thread ID. */
> +    pthread_t distributor;             /* Distributor thread ID. */
>
>      struct handler *handlers;          /* Upcall handlers. */
>      size_t n_handlers;
> @@ -93,6 +97,33 @@ struct udpif {
>      struct seq *wait_seq;
>
>      struct latch exit_latch;           /* Tells child threads to exit. */
> +
> +    /* Fair queues for upcalls. */
> +    struct guarded_list upcall_queues[UPCALL_QUEUES];
> +
> +    /* For waking up the distributor thread when there are upcalls
> +     * to distribute. */
> +    struct seq *distributor_seq;
> +    /* For waking up the distributor thread when a handler's queue
> +     * has more room. */
> +    struct seq *hol_block_seq;
> +
> +    struct ovs_mutex mutex;            /* Mutex guarding the following. */
> +
> +    /* Contains the indexes of the non-empty "upcall_queues". */
> +    struct list non_empty_list;
> +    /* Indicates whether the upcall_queue at an index is already in
> +     * the non_empty_list. */
> +    int non_empty_list_map[UPCALL_QUEUES];
> +    /* Indicates whether the upcall_queue at an index is blocked by its
> +     * front upcall. */
> +    int hol_block_map[UPCALL_QUEUES];
> +};
> +
> +/* Node in udpif's "non_empty_list".  Stores the index of a non-empty
> + * upcall_queue. */
> +struct non_empty_list_entry {
> +    struct list list_node;
> +    int value;
> +};
>
>  enum upcall_type {
> @@ -107,6 +138,9 @@
>  struct upcall {
>      struct list list_node;           /* For queuing upcalls. */
>      struct flow_miss *flow_miss;     /* This upcall's flow_miss. */
>
> +    uint32_t hash_1;                 /* Used by the dispatcher thread. */
> +    uint32_t hash_2;                 /* Used by the distributor thread. */
> +
>      /* Raw upcall plus data for keeping track of the memory backing it. */
>      struct dpif_upcall dpif_upcall;  /* As returned by dpif_recv() */
>      struct ofpbuf upcall_buf;        /* Owns some data in 'dpif_upcall'. */
> @@ -118,24 +152,34 @@ static void upcall_destroy(struct upcall *);
>  static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
>
>  static void recv_upcalls(struct udpif *);
> +static void distribute_upcalls(struct udpif *);
>  static void handle_upcalls(struct udpif *, struct list *upcalls);
>  static void miss_destroy(struct flow_miss *);
>  static void *udpif_dispatcher(void *);
> +static void *udpif_distributor(void *);
>  static void *udpif_upcall_handler(void *);
>
>  struct udpif *
>  udpif_create(struct dpif_backer *backer, struct dpif *dpif)
>  {
>      struct udpif *udpif = xzalloc(sizeof *udpif);
> +    size_t i;
>
>      udpif->dpif = dpif;
>      udpif->backer = backer;
>      udpif->secret = random_uint32();
>      udpif->wait_seq = seq_create();
> +    udpif->distributor_seq = seq_create();
> +    udpif->hol_block_seq = seq_create();
>      latch_init(&udpif->exit_latch);
> +    list_init(&udpif->non_empty_list);
>      guarded_list_init(&udpif->drop_keys);
>      guarded_list_init(&udpif->fmbs);
> +    ovs_mutex_init(&udpif->mutex);
>      atomic_init(&udpif->reval_seq, 0);
> +    for (i = 0; i < UPCALL_QUEUES; i++) {
> +        guarded_list_init(&udpif->upcall_queues[i]);
> +    }
>
>      return udpif;
>  }
> @@ -145,6 +189,7 @@ udpif_destroy(struct udpif *udpif)
>  {
>      struct flow_miss_batch *fmb;
>      struct drop_key *drop_key;
> +    size_t i;
>
>      udpif_recv_set(udpif, 0, false);
>
> @@ -160,6 +205,12 @@ udpif_destroy(struct udpif *udpif)
>      guarded_list_destroy(&udpif->fmbs);
>      latch_destroy(&udpif->exit_latch);
>      seq_destroy(udpif->wait_seq);
> +    seq_destroy(udpif->distributor_seq);
> +    seq_destroy(udpif->hol_block_seq);
> +    ovs_mutex_destroy(&udpif->mutex);
> +    for (i = 0; i < UPCALL_QUEUES; i++) {
> +        guarded_list_destroy(&udpif->upcall_queues[i]);
> +    }
>      free(udpif);
>  }
>
> @@ -189,6 +240,7 @@ udpif_recv_set(struct udpif *udpif, size_t n_handlers, bool enable)
>      }
>
>      xpthread_join(udpif->dispatcher, NULL);
> +    xpthread_join(udpif->distributor, NULL);
>      for (i = 0; i < udpif->n_handlers; i++) {
>          struct handler *handler = &udpif->handlers[i];
>          struct upcall *miss, *next;
> @@ -230,6 +282,7 @@ udpif_recv_set(struct udpif *udpif, size_t n_handlers, bool enable)
>                          handler);
>          }
>          xpthread_create(&udpif->dispatcher, NULL, udpif_dispatcher, udpif);
> +        xpthread_create(&udpif->distributor, NULL, udpif_distributor, udpif);
>      }
>  }
>
> @@ -368,8 +421,8 @@ udpif_drop_key_clear(struct udpif *udpif)
>      }
>  }
>
> -/* The dispatcher thread is responsible for receiving upcalls from the kernel,
> - * assigning them to a upcall_handler thread. */
> +/* The dispatcher thread is responsible for reading upcalls from
> + * the kernel and assigning them to udpif's "upcall_queues". */
>  static void *
>  udpif_dispatcher(void *arg)
>  {
> @@ -386,9 +439,36 @@ udpif_dispatcher(void *arg)
>      return NULL;
>  }
>
> -/* The miss handler thread is responsible for processing miss upcalls retrieved
> - * by the dispatcher thread.  Once finished it passes the processed miss
> - * upcalls to ofproto-dpif where they're installed in the datapath. */
> +/* The distributor thread is responsible for reading upcalls from
> + * udpif's "upcall_queues" and distributing them to the corresponding
> + * upcall handler's queue.
> + *
> + * If all of the front upcalls in the "upcall_queues" are blocked
> + * (similar to head-of-line blocking), the distributor thread waits on
> + * hol_block_seq. */
> +static void *
> +udpif_distributor(void *arg)
> +{
> +    struct udpif *udpif = arg;
> +
> +    set_subprogram_name("distributor");
> +    while (!latch_is_set(&udpif->exit_latch)) {
> +        uint64_t distributor_seq = seq_read(udpif->distributor_seq);
> +        uint64_t hol_block_seq = seq_read(udpif->hol_block_seq);
> +
> +        distribute_upcalls(udpif);
> +        seq_wait(udpif->distributor_seq, distributor_seq);
> +        seq_wait(udpif->hol_block_seq, hol_block_seq);
> +        latch_wait(&udpif->exit_latch);
> +        poll_block();
> +    }
> +
> +    return NULL;
> +}
> +
> +/* The miss handler thread is responsible for processing miss upcalls handed
> + * to it by the distributor thread.  Once finished it passes the processed
> + * miss upcalls to ofproto-dpif where they're installed in the datapath. */
>  static void *
>  udpif_upcall_handler(void *arg)
>  {
> @@ -420,6 +500,9 @@ udpif_upcall_handler(void *arg)
>          }
>          ovs_mutex_unlock(&handler->mutex);
>
> +        /* Changes udpif->hol_block_seq every time a batch of upcalls
> +         * is read. */
> +        seq_change(handler->udpif->hol_block_seq);
>          handle_upcalls(handler->udpif, &misses);
>
>          coverage_clear();
> @@ -490,19 +573,21 @@ classify_upcall(const struct upcall *upcall)
>  static void
>  recv_upcalls(struct udpif *udpif)
>  {
> -    int n;
> -
> +    /* Receives upcalls from the dpif and puts them into the upcall_queues. */
>      for (;;) {
> -        uint32_t hash = udpif->secret;
> -        struct handler *handler;
>          struct upcall *upcall;
> +        struct guarded_list *queue;
>          size_t n_bytes, left;
>          struct nlattr *nla;
> -        int error;
> +        int idx, error;
>
>          upcall = xmalloc(sizeof *upcall);
> -        ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
> +        upcall->hash_1 = udpif->secret;
> +        upcall->hash_2 = udpif->secret;
> +        ofpbuf_use_stub(&upcall->upcall_buf,
> +                        upcall->upcall_stub,
>                          sizeof upcall->upcall_stub);
> +
>          error = dpif_recv(udpif->dpif, &upcall->dpif_upcall,
>                            &upcall->upcall_buf);
>          if (error) {
> @@ -514,11 +599,20 @@ recv_upcalls(struct udpif *udpif)
>          NL_ATTR_FOR_EACH (nla, left, upcall->dpif_upcall.key,
>                            upcall->dpif_upcall.key_len) {
>              enum ovs_key_attr type = nl_attr_type(nla);
> -            if (type == OVS_KEY_ATTR_IN_PORT
> -                || type == OVS_KEY_ATTR_TCP
> -                || type == OVS_KEY_ATTR_UDP) {
> +            if (type == OVS_KEY_ATTR_ETHERNET) {
> +                if (nl_attr_get_size(nla) == 12) {
> +                    upcall->hash_1 = hash_bytes(nl_attr_get_unspec(nla, 12),
> +                                                2 * ETH_ADDR_LEN, 0);
> +                } else {
> +                    VLOG_WARN_RL(&rl,
> +                                 "Netlink attribute with incorrect size.");
> +                }
> +            } else if (type == OVS_KEY_ATTR_IN_PORT
> +                       || type == OVS_KEY_ATTR_TCP
> +                       || type == OVS_KEY_ATTR_UDP) {
>                  if (nl_attr_get_size(nla) == 4) {
> -                    hash = mhash_add(hash, nl_attr_get_u32(nla));
> +                    upcall->hash_2 = mhash_add(upcall->hash_2,
> +                                               nl_attr_get_u32(nla));
>                      n_bytes += 4;
>                  } else {
>                      VLOG_WARN_RL(&rl,
> @@ -526,37 +620,112 @@ recv_upcalls(struct udpif *udpif)
>                  }
>              }
>          }
> -        hash = mhash_finish(hash, n_bytes);
> +        upcall->hash_2 = mhash_finish(upcall->hash_2, n_bytes);
> +
> +        /* Selects the upcall_queue to insert the upcall into. */
> +        idx = upcall->hash_1 % UPCALL_QUEUES;
> +        queue = &udpif->upcall_queues[idx];
> +        /* Drops the upcall if the queue already holds UPCALL_QUEUE_LENGTH
> +         * upcalls. */
> +        if (!guarded_list_push_back(queue, &upcall->list_node,
> +                                    UPCALL_QUEUE_LENGTH)) {
> +            upcall_destroy(upcall);
> +        } else {
> +            /* If successfully inserted, checks whether the queue should be
> +             * recorded in the non_empty_list. */
> +            ovs_mutex_lock(&udpif->mutex);
> +            if (!udpif->non_empty_list_map[idx]
> +                && !guarded_list_is_empty(queue)) {
> +                struct non_empty_list_entry *entry = xmalloc(sizeof *entry);
> +
> +                entry->value = idx;
> +                udpif->non_empty_list_map[idx] = 1;
> +                list_push_back(&udpif->non_empty_list, &entry->list_node);
> +                if (list_is_singleton(&udpif->non_empty_list)) {
> +                    seq_change(udpif->distributor_seq);
> +                }
> +            }
> +            ovs_mutex_unlock(&udpif->mutex);
> +        }
> +    }
> +}
> +
> +static void
> +distribute_upcalls(struct udpif *udpif)
> +{
> +    int n;
>
> -        handler = &udpif->handlers[hash % udpif->n_handlers];
> +    while (!list_is_empty(&udpif->non_empty_list)) {
> +        struct non_empty_list_entry *front;
> +        struct guarded_list *queue;
> +        struct upcall *upcall;
> +        struct handler *handler;
> +
> +        front = CONTAINER_OF(list_front(&udpif->non_empty_list),
> +                             struct non_empty_list_entry, list_node);
> +
> +        queue = &udpif->upcall_queues[front->value];
> +        upcall = CONTAINER_OF(guarded_list_front(queue),
> +                              struct upcall, list_node);
> +        handler = &udpif->handlers[upcall->hash_2 % udpif->n_handlers];
>
>          ovs_mutex_lock(&handler->mutex);
> -        if (handler->n_upcalls < MAX_QUEUE_LENGTH) {
> -            list_push_back(&handler->upcalls, &upcall->list_node);
> -            if (handler->n_upcalls == 0) {
> -                handler->need_signal = true;
> -            }
> -            handler->n_upcalls++;
> -            if (handler->need_signal &&
> -                handler->n_upcalls >= FLOW_MISS_MAX_BATCH) {
> -                handler->need_signal = false;
> -                xpthread_cond_signal(&handler->wake_cond);
> -            }
> +        if (handler->n_upcalls >= MAX_QUEUE_LENGTH) {
>              ovs_mutex_unlock(&handler->mutex);
> -            if (!VLOG_DROP_DBG(&rl)) {
> -                struct ds ds = DS_EMPTY_INITIALIZER;
> -
> -                odp_flow_key_format(upcall->dpif_upcall.key,
> -                                    upcall->dpif_upcall.key_len,
> -                                    &ds);
> -                VLOG_DBG("dispatcher: enqueue (%s)", ds_cstr(&ds));
> -                ds_destroy(&ds);
> +
> +            /* The handler queue is full, so moves the front node to the
> +             * back. */
> +            ovs_mutex_lock(&udpif->mutex);
> +            list_pop_front(&udpif->non_empty_list);
> +            list_push_back(&udpif->non_empty_list, &front->list_node);
> +
> +            /* Updates the hol_block_map. */
> +            if (!udpif->hol_block_map[front->value]) {
> +                udpif->hol_block_map[front->value] = 1;
>              }
> +
> +            /* If all non-empty queues are blocked by their head upcalls,
> +             * jumps out of the loop. */
> +            if (!memcmp(udpif->hol_block_map, udpif->non_empty_list_map,
> +                        sizeof udpif->hol_block_map)) {
> +                ovs_mutex_unlock(&udpif->mutex);
> +                break;
> +            }
> +            ovs_mutex_unlock(&udpif->mutex);
> +            continue;
> +        }
> +
> +        guarded_list_pop_front(queue);
> +        list_push_back(&handler->upcalls, &upcall->list_node);
> +        if (handler->n_upcalls == 0) {
> +            handler->need_signal = true;
> +        }
> +        handler->n_upcalls++;
> +        if (handler->need_signal &&
> +            handler->n_upcalls >= FLOW_MISS_MAX_BATCH) {
> +            handler->need_signal = false;
> +            xpthread_cond_signal(&handler->wake_cond);
> +        }
> +        ovs_mutex_unlock(&handler->mutex);
> +        if (!VLOG_DROP_DBG(&rl)) {
> +            struct ds ds = DS_EMPTY_INITIALIZER;
> +
> +            odp_flow_key_format(upcall->dpif_upcall.key,
> +                                upcall->dpif_upcall.key_len,
> +                                &ds);
> +            VLOG_DBG("distributor: enqueue (%s)", ds_cstr(&ds));
> +            ds_destroy(&ds);
> +        }
> +
> +        /* Does not insert 'front' back into the non_empty_list if the
> +         * queue has become empty. */
> +        ovs_mutex_lock(&udpif->mutex);
> +        list_pop_front(&udpif->non_empty_list);
> +        if (guarded_list_is_empty(queue)) {
> +            udpif->non_empty_list_map[front->value] = 0;
> +            free(front);
>          } else {
> -            ovs_mutex_unlock(&handler->mutex);
> -            COVERAGE_INC(upcall_queue_overflow);
> -            upcall_destroy(upcall);
> +            list_push_back(&udpif->non_empty_list, &front->list_node);
>          }
> +        ovs_mutex_unlock(&udpif->mutex);
>      }
>
>      for (n = 0; n < udpif->n_handlers; ++n) {
> @@ -569,6 +738,8 @@ recv_upcalls(struct udpif *udpif)
>              ovs_mutex_unlock(&handler->mutex);
>          }
>      }
> +
> +    memset(udpif->hol_block_map, '\0', sizeof udpif->hol_block_map);
>  }
>
>  static struct flow_miss *
> --
> 1.7.9.5
>
>
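One note for reviewers: the subtlest part is the head-of-line blocking
handling in distribute_upcalls().  The toy program below shows just that
round-robin loop under simplifying assumptions: plain counters instead
of guarded_list and struct handler; queue_len, queue_handler and
handler_len are made-up names; and each fair queue's upcalls all target
a single handler.

#include <stdbool.h>
#include <stdio.h>

#define UPCALL_QUEUES 4        /* Tiny sizes, just for demonstration. */
#define N_HANDLERS 2
#define MAX_QUEUE_LENGTH 2     /* Per-handler queue limit. */

static int queue_len[UPCALL_QUEUES];     /* Fair-queue depths. */
static int queue_handler[UPCALL_QUEUES]; /* Handler picked by hash_2. */
static int handler_len[N_HANDLERS];      /* Handler-queue depths. */

static void
distribute(void)
{
    for (;;) {
        bool progress = false, any_nonempty = false;
        int q;

        for (q = 0; q < UPCALL_QUEUES; q++) {   /* Round robin. */
            int h;

            if (!queue_len[q]) {
                continue;                /* Empty fair queue. */
            }
            any_nonempty = true;

            h = queue_handler[q];        /* hash_2 % n_handlers. */
            if (handler_len[h] >= MAX_QUEUE_LENGTH) {
                continue;                /* Front upcall is blocked,
                                          * cf. hol_block_map[q] = 1. */
            }
            queue_len[q]--;              /* guarded_list_pop_front(). */
            handler_len[h]++;            /* Push onto handler queue. */
            progress = true;
        }

        if (!any_nonempty || !progress) {
            /* Nothing left, or every non-empty fair queue is blocked
             * by its front upcall; the real distributor now waits on
             * hol_block_seq until a handler drains a batch. */
            break;
        }
    }
}

int
main(void)
{
    int q;

    /* Three busy queues target handler 0; one quiet queue targets
     * handler 1. */
    queue_len[0] = 3; queue_handler[0] = 0;
    queue_len[1] = 3; queue_handler[1] = 0;
    queue_len[2] = 3; queue_handler[2] = 0;
    queue_len[3] = 1; queue_handler[3] = 1;

    distribute();
    for (q = 0; q < UPCALL_QUEUES; q++) {
        printf("fair queue %d: %d left\n", q, queue_len[q]);
    }
    return 0;
}

The behavior to notice: when handler 0 fills up, only the fair queues
whose front upcalls hash to it stall, while the quiet queue still
drains.  That is the fairness improvement claimed in the commit message,
and it is why the distributor sleeps on hol_block_seq instead of
spinning.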
_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev