This commit improves the upcall dispatching fairness by introducing a 2-stage scheme. At the first stage, the dispatcher thread will try receiving upcalls from dpif and hashing the upcalls to the local queues based on the L2 header information. When either there is no upcall to receive or one of the dispatcher queues is full, the dispatcher will go to the second stage. At the second stage, the dispatcher will iterate over its queues in round-robin fashion. Each time, it will try inserting the upcall at the front of the queue into the corresponding upcall handler thread's queue.
Signed-off-by: Alex Wang <al...@nicira.com> --- ofproto/ofproto-dpif-upcall.c | 137 ++++++++++++++++++++++++++++++++++------- 1 file changed, 114 insertions(+), 23 deletions(-) diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c index 24a5729..874bfbe 100644 --- a/ofproto/ofproto-dpif-upcall.c +++ b/ofproto/ofproto-dpif-upcall.c @@ -37,6 +37,7 @@ #include "vlog.h" #define MAX_QUEUE_LENGTH 512 +#define UPCALL_QUEUE_LENGTH 256 VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall); @@ -79,6 +80,9 @@ struct udpif { uint32_t secret; /* Random seed for upcall hash. */ pthread_t dispatcher; /* Dispatcher thread ID. */ + /* Queues upcalls based on L2 header. */ + struct list upcall_queues[UPCALL_QUEUE_LENGTH]; + uint16_t upcall_queue_lens[UPCALL_QUEUE_LENGTH]; struct handler *handlers; /* Upcall handlers. */ size_t n_handlers; @@ -95,6 +99,22 @@ struct udpif { struct latch exit_latch; /* Tells child threads to exit. */ }; +/* An entry in one of the upcall_queues. */ +struct upcall_queue_entry { + struct list list_node; /* In one of upcall_queues. */ + + struct upcall *upcall; /* The received upcall. */ + uint32_t hash_1; /* Level one distribution hash. */ + uint32_t hash_2; /* Level two distribution hash. */ +}; + +/* Used to store the value, which indicates the index + * of a non-empty queue in "udpif"'s upcall_queues. */ +struct non_empty_entry { + struct list list_node; + int value; +}; + enum upcall_type { BAD_UPCALL, /* Some kind of bug somewhere. */ MISS_UPCALL, /* A flow miss. 
*/ @@ -127,6 +147,7 @@ struct udpif * udpif_create(struct dpif_backer *backer, struct dpif *dpif) { struct udpif *udpif = xzalloc(sizeof *udpif); + size_t i; udpif->dpif = dpif; udpif->backer = backer; @@ -136,6 +157,9 @@ udpif_create(struct dpif_backer *backer, struct dpif *dpif) guarded_list_init(&udpif->drop_keys); guarded_list_init(&udpif->fmbs); atomic_init(&udpif->reval_seq, 0); + for (i = 0; i < UPCALL_QUEUE_LENGTH; i++) { + list_init(udpif->upcall_queues + i); + } return udpif; } @@ -490,35 +514,55 @@ classify_upcall(const struct upcall *upcall) static void recv_upcalls(struct udpif *udpif) { + struct list non_empty_list; + /* Indicates if the upcall_queues index has already been recorded in + * the non_empty_list. */ + int qmap[UPCALL_QUEUE_LENGTH] = {0}; int n; + list_init(&non_empty_list); + + /* Stage-1: recv upcalls from dpif and put them into the upcall_queues. */ for (;;) { - uint32_t hash = udpif->secret; - struct handler *handler; - struct upcall *upcall; + struct upcall_queue_entry *q_entry; + struct list *queue; size_t n_bytes, left; struct nlattr *nla; - int error; - - upcall = xmalloc(sizeof *upcall); - ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub, - sizeof upcall->upcall_stub); - error = dpif_recv(udpif->dpif, &upcall->dpif_upcall, - &upcall->upcall_buf); + int idx, error; + + q_entry = xmalloc(sizeof *q_entry); + q_entry->upcall = xmalloc(sizeof *q_entry->upcall); + q_entry->hash_1 = udpif->secret; + q_entry->hash_2 = udpif->secret; + ofpbuf_use_stub(&q_entry->upcall->upcall_buf, + q_entry->upcall->upcall_stub, + sizeof q_entry->upcall->upcall_stub); + + error = dpif_recv(udpif->dpif, &q_entry->upcall->dpif_upcall, + &q_entry->upcall->upcall_buf); if (error) { - upcall_destroy(upcall); + upcall_destroy(q_entry->upcall); break; } n_bytes = 0; - NL_ATTR_FOR_EACH (nla, left, upcall->dpif_upcall.key, - upcall->dpif_upcall.key_len) { + NL_ATTR_FOR_EACH (nla, left, q_entry->upcall->dpif_upcall.key, + 
q_entry->upcall->dpif_upcall.key_len) { enum ovs_key_attr type = nl_attr_type(nla); - if (type == OVS_KEY_ATTR_IN_PORT - || type == OVS_KEY_ATTR_TCP - || type == OVS_KEY_ATTR_UDP) { + if (type == OVS_KEY_ATTR_ETHERNET) { + if (nl_attr_get_size(nla) == 12) { + q_entry->hash_1 = hash_bytes(nl_attr_get_unspec(nla, 12), + 2 * ETH_ADDR_LEN, 0); + } else { + VLOG_WARN_RL(&rl, + "Netlink attribute with incorrect size."); + } + } else if (type == OVS_KEY_ATTR_IN_PORT + || type == OVS_KEY_ATTR_TCP + || type == OVS_KEY_ATTR_UDP) { if (nl_attr_get_size(nla) == 4) { - hash = mhash_add(hash, nl_attr_get_u32(nla)); + q_entry->hash_2 = mhash_add(q_entry->hash_2, + nl_attr_get_u32(nla)); n_bytes += 4; } else { VLOG_WARN_RL(&rl, @@ -526,13 +570,48 @@ recv_upcalls(struct udpif *udpif) } } } - hash = mhash_finish(hash, n_bytes); + q_entry->hash_2 = mhash_finish(q_entry->hash_2, n_bytes); + + idx = q_entry->hash_1 % UPCALL_QUEUE_LENGTH; + if (!qmap[idx]) { + struct non_empty_entry *n_entry = xmalloc(sizeof *n_entry); + + n_entry->value = idx; + qmap[idx] = 1; + list_push_back(&non_empty_list, &n_entry->list_node); + } + + /* Pushes the entry to the corresponding queue in + * udpif->upcal_queues, based on hash_1. */ + queue = &udpif->upcall_queues[idx]; + list_push_back(queue, &q_entry->list_node); + + /* If any queue length exceeds UPCALL_QUEUE_LENGTH, + * break out the while loop. 
*/ + udpif->upcall_queue_lens[idx]++; + if (udpif->upcall_queue_lens[idx] == UPCALL_QUEUE_LENGTH) { + break; + } + } + + while (!list_is_empty(&non_empty_list)) { + struct non_empty_entry *front; + struct list *queue; + struct upcall_queue_entry *q_entry; + struct handler *handler; + - handler = &udpif->handlers[hash % udpif->n_handlers]; + front = CONTAINER_OF(list_pop_front(&non_empty_list), + struct non_empty_entry, list_node); + + queue = &udpif->upcall_queues[front->value]; + q_entry = CONTAINER_OF(list_pop_front(queue), + struct upcall_queue_entry, list_node); + handler = &udpif->handlers[q_entry->hash_2 % udpif->n_handlers]; ovs_mutex_lock(&handler->mutex); if (handler->n_upcalls < MAX_QUEUE_LENGTH) { - list_push_back(&handler->upcalls, &upcall->list_node); + list_push_back(&handler->upcalls, &q_entry->upcall->list_node); if (handler->n_upcalls == 0) { handler->need_signal = true; } @@ -546,8 +625,8 @@ recv_upcalls(struct udpif *udpif) if (!VLOG_DROP_DBG(&rl)) { struct ds ds = DS_EMPTY_INITIALIZER; - odp_flow_key_format(upcall->dpif_upcall.key, - upcall->dpif_upcall.key_len, + odp_flow_key_format(q_entry->upcall->dpif_upcall.key, + q_entry->upcall->dpif_upcall.key_len, &ds); VLOG_DBG("dispatcher: enqueue (%s)", ds_cstr(&ds)); ds_destroy(&ds); @@ -555,7 +634,19 @@ recv_upcalls(struct udpif *udpif) } else { ovs_mutex_unlock(&handler->mutex); COVERAGE_INC(upcall_queue_overflow); - upcall_destroy(upcall); + upcall_destroy(q_entry->upcall); + } + + /* Always frees the upcall_queue entry. */ + free(q_entry); + + /* Do not insert 'front' back to non_empty_list if the + * list becomes empty. */ + if (list_is_empty(queue)) { + udpif->upcall_queue_lens[front->value] = 0; + free(front); + } else { + list_push_back(&non_empty_list, &front->list_node); } } -- 1.7.9.5 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev