On Fri, Dec 27, 2013 at 8:03 PM, Ben Pfaff <b...@nicira.com> wrote: > For now, we use exactly two threads. Presumably at some point we will want > to make this configurable. > > Signed-off-by: Ben Pfaff <b...@nicira.com>
Looks good. Thanks. > --- > lib/dpif-netdev.c | 210 > +++++++++++++++++++++++++++++++++++------------------ > 1 file changed, 141 insertions(+), 69 deletions(-) > > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c > index 7070eac..d667afe 100644 > --- a/lib/dpif-netdev.c > +++ b/lib/dpif-netdev.c > @@ -39,6 +39,7 @@ > #include "dynamic-string.h" > #include "flow.h" > #include "hmap.h" > +#include "latch.h" > #include "list.h" > #include "meta-flow.h" > #include "netdev.h" > @@ -158,6 +159,11 @@ struct dp_netdev { > struct ovs_rwlock port_rwlock; > struct hmap ports OVS_GUARDED; > struct seq *port_seq; /* Incremented whenever a port changes. */ > + > + /* Forwarding threads. */ > + struct latch exit_latch; > + struct dp_forwarder *forwarders; > + size_t n_forwarders; > }; > > static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev > *dp, > @@ -281,6 +287,15 @@ struct dp_netdev_actions *dp_netdev_actions_ref( > const struct dp_netdev_actions *); > void dp_netdev_actions_unref(struct dp_netdev_actions *); > > +/* A thread that receives packets from some ports, looks them up in the flow > + * table, and executes the actions it finds. */ > +struct dp_forwarder { > + struct dp_netdev *dp; > + pthread_t thread; > + char *name; > + uint32_t min_hash, max_hash; > +}; > + > /* Interface to netdev-based datapath.
*/ > struct dpif_netdev { > struct dpif dpif; > @@ -317,6 +332,7 @@ static void dp_netdev_port_input(struct dp_netdev *dp, > struct dp_netdev_port *port, > struct ofpbuf *packet) > OVS_REQ_RDLOCK(dp->port_rwlock); > +static void dp_netdev_set_threads(struct dp_netdev *, int n); > > static struct dpif_netdev * > dpif_netdev_cast(const struct dpif *dpif) > @@ -453,6 +469,7 @@ create_dp_netdev(const char *name, const struct > dpif_class *class, > ovs_rwlock_init(&dp->port_rwlock); > hmap_init(&dp->ports); > dp->port_seq = seq_create(); > + latch_init(&dp->exit_latch); > > ovs_rwlock_wrlock(&dp->port_rwlock); > error = do_add_port(dp, name, "internal", ODPP_LOCAL); > @@ -461,6 +478,7 @@ create_dp_netdev(const char *name, const struct > dpif_class *class, > dp_netdev_free(dp); > return error; > } > + dp_netdev_set_threads(dp, 2); > > *dpp = dp; > return 0; > @@ -518,6 +536,9 @@ dp_netdev_free(struct dp_netdev *dp) > > shash_find_and_delete(&dp_netdevs, dp->name); > > + dp_netdev_set_threads(dp, 0); > + free(dp->forwarders); > + > dp_netdev_flow_flush(dp); > ovs_rwlock_wrlock(&dp->port_rwlock); > HMAP_FOR_EACH_SAFE (port, next, node, &dp->ports) { > @@ -539,6 +560,7 @@ dp_netdev_free(struct dp_netdev *dp) > hmap_destroy(&dp->ports); > atomic_flag_destroy(&dp->destroyed); > ovs_refcount_destroy(&dp->ref_cnt); > + latch_destroy(&dp->exit_latch); > free(CONST_CAST(char *, dp->name)); > free(dp); > } > @@ -1526,6 +1548,123 @@ dp_netdev_actions_unref(struct dp_netdev_actions > *actions) > } > } > > +static void * > +dp_forwarder_main(void *f_) > +{ > + struct dp_forwarder *f = f_; > + struct dp_netdev *dp = f->dp; > + struct ofpbuf packet; > + > + f->name = xasprintf("forwarder_%u", ovsthread_id_self()); > + set_subprogram_name("%s", f->name); > + > + ofpbuf_init(&packet, 0); > + while (!latch_is_set(&dp->exit_latch)) { > + bool received_anything; > + int i; > + > + ovs_rwlock_rdlock(&dp->port_rwlock); > + for (i = 0; i < 50; i++) { > + struct dp_netdev_port *port; > + > + 
received_anything = false; > + HMAP_FOR_EACH (port, node, &f->dp->ports) { > + if (port->rx > + && port->node.hash >= f->min_hash > + && port->node.hash <= f->max_hash) { > + int buf_size; > + int error; > + int mtu; > + > + if (netdev_get_mtu(port->netdev, &mtu)) { > + mtu = ETH_PAYLOAD_MAX; > + } > + buf_size = DP_NETDEV_HEADROOM + VLAN_ETH_HEADER_LEN + > mtu; > + > + ofpbuf_clear(&packet); > + ofpbuf_reserve_with_tailroom(&packet, DP_NETDEV_HEADROOM, > + buf_size); > + > + error = netdev_rx_recv(port->rx, &packet); > + if (!error) { > + received_anything = true; > + dp_netdev_port_input(dp, port, &packet); > + } else if (error != EAGAIN && error != EOPNOTSUPP) { > + static struct vlog_rate_limit rl > + = VLOG_RATE_LIMIT_INIT(1, 5); > + > + VLOG_ERR_RL(&rl, "error receiving data from %s: %s", > + netdev_get_name(port->netdev), > + ovs_strerror(error)); > + } > + } > + } > + > + if (!received_anything) { > + break; > + } > + } > + > + if (received_anything) { > + poll_immediate_wake(); > + } else { > + struct dp_netdev_port *port; > + > + HMAP_FOR_EACH (port, node, &f->dp->ports) > + if (port->rx > + && port->node.hash >= f->min_hash > + && port->node.hash <= f->max_hash) { > + netdev_rx_wait(port->rx); > + } > + seq_wait(dp->port_seq, seq_read(dp->port_seq)); > + latch_wait(&dp->exit_latch); > + } > + ovs_rwlock_unlock(&dp->port_rwlock); > + > + poll_block(); > + } > + ofpbuf_uninit(&packet); > + > + free(f->name); > + > + return NULL; > +} > + > +static void > +dp_netdev_set_threads(struct dp_netdev *dp, int n) > +{ > + int i; > + > + if (n == dp->n_forwarders) { > + return; > + } > + > + /* Stop existing threads. */ > + latch_set(&dp->exit_latch); > + for (i = 0; i < dp->n_forwarders; i++) { > + struct dp_forwarder *f = &dp->forwarders[i]; > + > + xpthread_join(f->thread, NULL); > + } > + latch_poll(&dp->exit_latch); > + free(dp->forwarders); > + > + /* Start new threads. 
*/ > + dp->forwarders = xmalloc(n * sizeof *dp->forwarders); > + dp->n_forwarders = n; > + for (i = 0; i < n; i++) { > + struct dp_forwarder *f = &dp->forwarders[i]; > + > + f->dp = dp; > + f->min_hash = UINT32_MAX / n * i; > + f->max_hash = UINT32_MAX / n * (i + 1) - 1; > + if (i == n - 1) { > + f->max_hash = UINT32_MAX; > + } > + xpthread_create(&f->thread, NULL, dp_forwarder_main, f); > + } > +} > + > static void > dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow, > const struct ofpbuf *packet) > @@ -1571,73 +1710,6 @@ dp_netdev_port_input(struct dp_netdev *dp, struct > dp_netdev_port *port, > } > } > > -static void > -dpif_netdev_run(struct dpif *dpif) > -{ > - struct dp_netdev_port *port; > - struct dp_netdev *dp; > - struct ofpbuf packet; > - > - dp = get_dp_netdev(dpif); > - ofpbuf_init(&packet, 0); > - > - ovs_rwlock_rdlock(&dp->port_rwlock); > - HMAP_FOR_EACH (port, node, &dp->ports) { > - int buf_size; > - int error; > - int mtu; > - > - error = netdev_get_mtu(port->netdev, &mtu); > - if (error) { > - mtu = ETH_PAYLOAD_MAX; > - } > - buf_size = DP_NETDEV_HEADROOM + VLAN_ETH_HEADER_LEN + mtu; > - > - ofpbuf_clear(&packet); > - ofpbuf_reserve_with_tailroom(&packet, DP_NETDEV_HEADROOM, buf_size); > - > - error = port->rx ? netdev_rx_recv(port->rx, &packet) : EOPNOTSUPP; > - if (!error) { > - dp_netdev_port_input(dp, port, &packet); > - } else if (error != EAGAIN && error != EOPNOTSUPP) { > - static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); > - > - VLOG_ERR_RL(&rl, "error receiving data from %s: %s", > - netdev_get_name(port->netdev), ovs_strerror(error)); > - } > - } > - ovs_rwlock_unlock(&dp->port_rwlock); > - > - ofpbuf_uninit(&packet); > -} > - > -static void > -dpif_netdev_wait(struct dpif *dpif) > -{ > - struct dp_netdev_port *port; > - > - /* There is a race here, if thread A calls dpif_netdev_wait(dpif) and > - * thread B calls dpif_port_add(dpif) or dpif_port_remove(dpif) before > - * A makes it to poll_block(). 
> - * > - * But I think it doesn't matter: > - * > - * - In the dpif_port_add() case, A will not wake up when a packet > - * arrives on the new port, but this would also happen if the > - * ordering were reversed. > - * > - * - In the dpif_port_remove() case, A might wake up spuriously, but > - * that is harmless. */ > - > - ovs_mutex_lock(&dp_netdev_mutex); > - HMAP_FOR_EACH (port, node, &get_dp_netdev(dpif)->ports) { > - if (port->rx) { > - netdev_rx_wait(port->rx); > - } > - } > - ovs_mutex_unlock(&dp_netdev_mutex); > -} > - > static int > dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet, > int queue_no, const struct flow *flow, > @@ -1750,8 +1822,8 @@ const struct dpif_class dpif_netdev_class = { > dpif_netdev_open, > dpif_netdev_close, > dpif_netdev_destroy, > - dpif_netdev_run, > - dpif_netdev_wait, > + NULL, /* run */ > + NULL, /* wait */ > dpif_netdev_get_stats, > dpif_netdev_port_add, > dpif_netdev_port_del, > -- > 1.7.10.4 > > _______________________________________________ > dev mailing list > dev@openvswitch.org > http://openvswitch.org/mailman/listinfo/dev _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev