Signed-off-by: Alex Wang <al...@nicira.com>
---
 lib/dpif-netdev.c |  190 ++++++++++++++++++++++++++++++++++-------------------
 lib/flow.c        |   18 +++++
 lib/flow.h        |    1 +
 3 files changed, 143 insertions(+), 66 deletions(-)
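Note (this sits after the '---' cut line, so `git am` ignores it): the patch replaces the two fixed upcall queues with one queue per handler thread and steers each upcall to a queue by hashing the packet's 5-tuple, so every upcall for a given flow reaches the same handler. Below is a minimal standalone sketch of that dispatch idea; 'struct five_tuple' and 'toy_hash_5tuple()' are hypothetical stand-ins, the real code uses 'struct flow', flow_hash_5tuple(), and jhash_bytes().

    /* Illustrative only -- not part of the patch. */
    #include <stdint.h>
    #include <stdio.h>

    /* Hypothetical stand-in for the 'struct flow' fields that
     * flow_hash_5tuple() reads in this patch. */
    struct five_tuple {
        uint32_t nw_src, nw_dst;   /* IPv4 source/destination address. */
        uint16_t tp_src, tp_dst;   /* L4 source/destination port. */
        uint8_t  nw_proto;         /* IP protocol. */
    };

    /* Same shape as the patch: XOR the 5-tuple fields, then mix the result
     * (the patch feeds it through jhash_bytes(); this uses a toy finalizer). */
    static uint32_t
    toy_hash_5tuple(const struct five_tuple *t, uint32_t basis)
    {
        uint32_t h = t->nw_src ^ t->nw_dst ^ t->nw_proto ^ t->tp_src ^ t->tp_dst;

        h ^= basis;
        h *= 0x9e3779b1;           /* Toy mixing step, not jhash. */
        return h ^ (h >> 16);
    }

    int
    main(void)
    {
        struct five_tuple t = { 0x0a000001, 0x0a000002, 1234, 80, 6 };
        uint32_t n_handlers = 4;

        /* Queue selection as in dp_netdev_port_input() and dp_execute_cb():
         * hash modulo the number of handler queues. */
        printf("handler %u\n", (unsigned) (toy_hash_5tuple(&t, 0) % n_handlers));
        return 0;
    }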
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index ee8be43..62a775f 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -73,7 +73,6 @@ enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
 enum { DP_NETDEV_HEADROOM = 2 + VLAN_HEADER_LEN };
 
 /* Queues. */
-enum { N_QUEUES = 2 };          /* Number of queues for dpif_recv(). */
 enum { MAX_QUEUE_LEN = 128 };   /* Maximum number of packets per queue. */
 enum { QUEUE_MASK = MAX_QUEUE_LEN - 1 };
 BUILD_ASSERT_DECL(IS_POW2(MAX_QUEUE_LEN));
@@ -90,14 +89,17 @@ struct dp_netdev_upcall {
     struct ofpbuf buf;          /* ofpbuf instance for upcall.packet. */
 };
 
-/* A queue passing packets from a struct dp_netdev to its clients.
+/* A queue passing packets from a struct dp_netdev to its clients (handlers).
  *
  *
  * Thread-safety
  * =============
  *
- * Any access at all requires the owning 'dp_netdev''s queue_mutex. */
+ * Any access at all requires the owning 'dp_netdev''s queue_rwlock and
+ * its own mutex. */
 struct dp_netdev_queue {
+    struct ovs_mutex mutex;
+    struct seq *seq;      /* Incremented whenever a packet is queued. */
     struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN] OVS_GUARDED;
     unsigned int head OVS_GUARDED;
     unsigned int tail OVS_GUARDED;
@@ -118,7 +120,7 @@ struct dp_netdev_queue {
  *    port_rwlock
  *    flow_mutex
  *       cls.rwlock
- *          queue_mutex
+ *          queue_rwlock
  */
 struct dp_netdev {
     const struct dpif_class *const class;
@@ -140,10 +142,12 @@ struct dp_netdev {
 
     /* Queues.
      *
-     * Everything in 'queues' is protected by 'queue_mutex'. */
-    struct ovs_mutex queue_mutex;
-    struct dp_netdev_queue queues[N_QUEUES];
-    struct seq *queue_seq;      /* Incremented whenever a packet is queued. */
+     * 'queue_rwlock' protects the modification of 'handler_queues' and
+     * 'n_handlers'.  The queue elements are protected by its
+     * 'handler_queues''s mutex. */
+    struct fat_rwlock queue_rwlock;
+    struct dp_netdev_queue *handler_queues;
+    uint32_t n_handlers;
 
     /* Statistics.
      *
@@ -317,12 +321,15 @@ static int do_add_port(struct dp_netdev *dp, const char *devname,
     OVS_REQ_WRLOCK(dp->port_rwlock);
 static int do_del_port(struct dp_netdev *dp, odp_port_t port_no)
     OVS_REQ_WRLOCK(dp->port_rwlock);
+static void dp_netdev_destroy_all_queues(struct dp_netdev *dp)
+    OVS_EXCLUDED(dp->queue_rwlock);
 static int dpif_netdev_open(const struct dpif_class *, const char *name,
                             bool create, struct dpif **);
 static int dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *,
-                                      int queue_no, const struct flow *,
-                                      const struct nlattr *userdata)
-    OVS_EXCLUDED(dp->queue_mutex);
+                                      int queue_no, int type,
+                                      const struct flow *,
+                                      const struct nlattr *userdata)
+    OVS_EXCLUDED(dp->queue_rwlock);
 static void dp_netdev_execute_actions(struct dp_netdev *dp,
                                       const struct flow *, struct ofpbuf *,
                                       struct pkt_metadata *,
@@ -440,7 +447,6 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
 {
     struct dp_netdev *dp;
     int error;
-    int i;
 
     dp = xzalloc(sizeof *dp);
     shash_add(&dp_netdevs, name, dp);
@@ -454,13 +460,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     classifier_init(&dp->cls, NULL);
     hmap_init(&dp->flow_table);
 
-    ovs_mutex_init(&dp->queue_mutex);
-    ovs_mutex_lock(&dp->queue_mutex);
-    for (i = 0; i < N_QUEUES; i++) {
-        dp->queues[i].head = dp->queues[i].tail = 0;
-    }
-    ovs_mutex_unlock(&dp->queue_mutex);
-    dp->queue_seq = seq_create();
+    fat_rwlock_init(&dp->queue_rwlock);
 
     dp->n_hit = ovsthread_counter_create();
     dp->n_missed = ovsthread_counter_create();
@@ -510,20 +510,21 @@ dpif_netdev_open(const struct dpif_class *class, const char *name,
 
 static void
 dp_netdev_purge_queues(struct dp_netdev *dp)
+    OVS_REQ_WRLOCK(dp->queue_rwlock)
 {
     int i;
 
-    ovs_mutex_lock(&dp->queue_mutex);
-    for (i = 0; i < N_QUEUES; i++) {
-        struct dp_netdev_queue *q = &dp->queues[i];
+    for (i = 0; i < dp->n_handlers; i++) {
+        struct dp_netdev_queue *q = &dp->handler_queues[i];
 
+        ovs_mutex_lock(&q->mutex);
         while (q->tail != q->head) {
             struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
             ofpbuf_uninit(&u->upcall.packet);
             ofpbuf_uninit(&u->buf);
         }
+        ovs_mutex_unlock(&q->mutex);
     }
-    ovs_mutex_unlock(&dp->queue_mutex);
 }
 
 /* Requires dp_netdev_mutex so that we can't get a new reference to 'dp'
@@ -549,9 +550,8 @@ dp_netdev_free(struct dp_netdev *dp)
     ovsthread_counter_destroy(dp->n_missed);
     ovsthread_counter_destroy(dp->n_lost);
 
-    dp_netdev_purge_queues(dp);
-    seq_destroy(dp->queue_seq);
-    ovs_mutex_destroy(&dp->queue_mutex);
+    dp_netdev_destroy_all_queues(dp);
+    fat_rwlock_destroy(&dp->queue_rwlock);
 
     classifier_destroy(&dp->cls);
     hmap_destroy(&dp->flow_table);
@@ -1430,18 +1430,73 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
     return 0;
 }
 
+static void
+dp_netdev_init_queue(struct dp_netdev_queue *q)
+{
+    ovs_mutex_init(&q->mutex);
+    q->seq = seq_create();
+}
+
+static void
+dp_netdev_uninit_queue(struct dp_netdev_queue *q)
+{
+    ovs_mutex_destroy(&q->mutex);
+    seq_destroy(q->seq);
+}
+
+static void
+dp_netdev_destroy_all_queues(struct dp_netdev *dp)
+    OVS_EXCLUDED(dp->queue_rwlock)
+{
+    size_t i;
+
+    fat_rwlock_wrlock(&dp->queue_rwlock);
+    dp_netdev_purge_queues(dp);
+
+    for (i = 0; i < dp->n_handlers; i++) {
+        dp_netdev_uninit_queue(&dp->handler_queues[i]);
+    }
+    free(dp->handler_queues);
+    dp->handler_queues = NULL;
+    dp->n_handlers = 0;
+    fat_rwlock_unlock(&dp->queue_rwlock);
+}
+
+static void
+dp_netdev_refresh_queues(struct dp_netdev *dp, uint32_t n_handlers)
+    OVS_EXCLUDED(dp->queue_rwlock)
+{
+    if (dp->n_handlers != n_handlers) {
+        size_t i;
+
+        dp_netdev_destroy_all_queues(dp);
+
+        fat_rwlock_wrlock(&dp->queue_rwlock);
+        dp->n_handlers = n_handlers;
+        dp->handler_queues = xzalloc(n_handlers * sizeof *dp->handler_queues);
+
+        for (i = 0; i < n_handlers; i++) {
+            dp_netdev_init_queue(&dp->handler_queues[i]);
+        }
+        fat_rwlock_unlock(&dp->queue_rwlock);
+    }
+}
+
 static int
-dpif_netdev_recv_set(struct dpif *dpif OVS_UNUSED, bool enable OVS_UNUSED,
-                     uint32_t n_handlers OVS_UNUSED)
+dpif_netdev_recv_set(struct dpif *dpif, bool enable OVS_UNUSED,
+                     uint32_t n_handlers)
 {
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+
+    dp_netdev_refresh_queues(dp, n_handlers);
+
     return 0;
 }
 
 static int
-dpif_netdev_handlers_set(struct dpif *dpif OVS_UNUSED,
-                         uint32_t n_handlers OVS_UNUSED)
+dpif_netdev_handlers_set(struct dpif *dpif, uint32_t n_handlers)
 {
-    return 0;
+    return dpif_netdev_recv_set(dpif, true, n_handlers);
 }
 
 static int
@@ -1452,32 +1507,18 @@ dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
     return 0;
 }
 
-static struct dp_netdev_queue *
-find_nonempty_queue(struct dp_netdev *dp)
-    OVS_REQUIRES(dp->queue_mutex)
-{
-    int i;
-
-    for (i = 0; i < N_QUEUES; i++) {
-        struct dp_netdev_queue *q = &dp->queues[i];
-        if (q->head != q->tail) {
-            return q;
-        }
-    }
-    return NULL;
-}
-
 static int
-dpif_netdev_recv(struct dpif *dpif, uint32_t n_handlers OVS_UNUSED,
+dpif_netdev_recv(struct dpif *dpif, uint32_t handler_id,
                  struct dpif_upcall *upcall, struct ofpbuf *buf)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_queue *q;
     int error;
 
-    ovs_mutex_lock(&dp->queue_mutex);
-    q = find_nonempty_queue(dp);
-    if (q) {
+    fat_rwlock_rdlock(&dp->queue_rwlock);
+    q = &dp->handler_queues[handler_id];
+    ovs_mutex_lock(&q->mutex);
+    if (q->head != q->tail) {
         struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
 
         *upcall = u->upcall;
@@ -1489,25 +1530,32 @@ dpif_netdev_recv(struct dpif *dpif, uint32_t n_handlers OVS_UNUSED,
     } else {
         error = EAGAIN;
     }
-    ovs_mutex_unlock(&dp->queue_mutex);
+    ovs_mutex_unlock(&q->mutex);
+    fat_rwlock_unlock(&dp->queue_rwlock);
 
     return error;
 }
 
 static void
-dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id OVS_UNUSED)
+dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct dp_netdev_queue *q;
     uint64_t seq;
 
-    ovs_mutex_lock(&dp->queue_mutex);
-    seq = seq_read(dp->queue_seq);
-    if (find_nonempty_queue(dp)) {
+    fat_rwlock_rdlock(&dp->queue_rwlock);
+
+    q = &dp->handler_queues[handler_id];
+    ovs_mutex_lock(&q->mutex);
+    seq = seq_read(q->seq);
+    if (q->head != q->tail) {
         poll_immediate_wake();
     } else {
-        seq_wait(dp->queue_seq, seq);
+        seq_wait(q->seq, seq);
     }
-    ovs_mutex_unlock(&dp->queue_mutex);
+
+    ovs_mutex_unlock(&q->mutex);
+    fat_rwlock_unlock(&dp->queue_rwlock);
 }
 
 static void
@@ -1515,7 +1563,9 @@ dpif_netdev_recv_purge(struct dpif *dpif)
 {
     struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
 
+    fat_rwlock_wrlock(&dpif_netdev->dp->queue_rwlock);
     dp_netdev_purge_queues(dpif_netdev->dp);
+    fat_rwlock_unlock(&dpif_netdev->dp->queue_rwlock);
 }
 
 /* Creates and returns a new 'struct dp_netdev_actions', with a reference count
@@ -1719,27 +1769,31 @@ dp_netdev_port_input(struct dp_netdev *dp, struct ofpbuf *packet,
         ovsthread_counter_inc(dp->n_hit, 1);
     } else {
         ovsthread_counter_inc(dp->n_missed, 1);
-        dp_netdev_output_userspace(dp, packet, DPIF_UC_MISS, &key, NULL);
+        dp_netdev_output_userspace(dp, packet,
+                                   flow_hash_5tuple(&key, 0) % dp->n_handlers,
+                                   DPIF_UC_MISS, &key, NULL);
     }
 }
 
 static int
 dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
-                           int queue_no, const struct flow *flow,
+                           int queue_no, int type, const struct flow *flow,
                            const struct nlattr *userdata)
-    OVS_EXCLUDED(dp->queue_mutex)
+    OVS_EXCLUDED(dp->queue_rwlock)
 {
-    struct dp_netdev_queue *q = &dp->queues[queue_no];
+    struct dp_netdev_queue *q;
     int error;
 
-    ovs_mutex_lock(&dp->queue_mutex);
+    fat_rwlock_rdlock(&dp->queue_rwlock);
+    q = &dp->handler_queues[queue_no];
+    ovs_mutex_lock(&q->mutex);
     if (q->head - q->tail < MAX_QUEUE_LEN) {
         struct dp_netdev_upcall *u = &q->upcalls[q->head++ & QUEUE_MASK];
         struct dpif_upcall *upcall = &u->upcall;
         struct ofpbuf *buf = &u->buf;
         size_t buf_size;
 
-        upcall->type = queue_no;
+        upcall->type = type;
 
         /* Allocate buffer big enough for everything. */
         buf_size = ODPUTIL_FLOW_KEY_BYTES;
@@ -1764,14 +1818,15 @@ dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
         upcall->packet = *packet;
         ofpbuf_use(packet, NULL, 0);
 
-        seq_change(dp->queue_seq);
+        seq_change(q->seq);
 
         error = 0;
     } else {
         ovsthread_counter_inc(dp->n_lost, 1);
         error = ENOBUFS;
     }
-    ovs_mutex_unlock(&dp->queue_mutex);
+    ovs_mutex_unlock(&q->mutex);
+    fat_rwlock_unlock(&dp->queue_rwlock);
 
     return error;
 }
@@ -1808,7 +1863,10 @@ dp_execute_cb(void *aux_, struct ofpbuf *packet,
         if (!may_steal) {
             packet = ofpbuf_clone_with_headroom(packet, DP_NETDEV_HEADROOM);
         }
-        dp_netdev_output_userspace(aux->dp, packet, DPIF_UC_ACTION, aux->key,
+        dp_netdev_output_userspace(aux->dp, packet,
+                                   flow_hash_5tuple(aux->key, 0)
+                                       % aux->dp->n_handlers,
+                                   DPIF_UC_ACTION, aux->key,
                                    userdata);
         if (!may_steal) {
             ofpbuf_uninit(packet);
diff --git a/lib/flow.c b/lib/flow.c
index e7fe4d3..7a36940 100644
--- a/lib/flow.c
+++ b/lib/flow.c
@@ -822,6 +822,24 @@ flow_wildcards_set_reg_mask(struct flow_wildcards *wc, int idx, uint32_t mask)
     wc->masks.regs[idx] = mask;
 }
 
+/* Calculates the 5-tuple hash from the given flow. */
+uint32_t
+flow_hash_5tuple(const struct flow *flow, uint32_t basis)
+{
+    uint32_t hash;
+
+    if (!flow) {
+        return 0;
+    }
+
+    hash = (OVS_FORCE int) flow->nw_src
+           ^ (OVS_FORCE int) flow->nw_dst
+           ^ flow->nw_proto ^ (OVS_FORCE int) flow->tp_src
+           ^ (OVS_FORCE int) flow->tp_dst;
+
+    return jhash_bytes((void *) &hash, sizeof hash, basis);
+}
+
 /* Hashes 'flow' based on its L2 through L4 protocol information. */
 uint32_t
 flow_hash_symmetric_l4(const struct flow *flow, uint32_t basis)
diff --git a/lib/flow.h b/lib/flow.h
index 3109a84..46ab481 100644
--- a/lib/flow.h
+++ b/lib/flow.h
@@ -323,6 +323,7 @@ void flow_wildcards_fold_minimask_range(struct flow_wildcards *,
 uint32_t flow_wildcards_hash(const struct flow_wildcards *, uint32_t basis);
 bool flow_wildcards_equal(const struct flow_wildcards *,
                           const struct flow_wildcards *);
+uint32_t flow_hash_5tuple(const struct flow *flow, uint32_t basis);
 uint32_t flow_hash_symmetric_l4(const struct flow *flow, uint32_t basis);
 
 /* Initialize a flow with random fields that matter for nx_hash_fields. */
-- 
1.7.9.5

_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev