On Tue, Feb 18, 2014 at 10:52 AM, Alex Wang <al...@nicira.com> wrote:
> This commit removes the 'dispatcher' thread by allowing 'handler'
> threads to read upcalls directly from dpif.  vport in dpif will
> open netlink sockets for each handler and will use the 5-tuple
> hash from the missed packet to choose which socket (handler) to
> send the upcall.
>
> This patch also significantly simplifies the flow miss handling
> code and brings slight improvement to flow setup rate.
>
> Signed-off-by: Alex Wang <al...@nicira.com>
>
> ---
> v2->v3:
> - removes the rcu read-side critical section in ovs_vport_set_upcall_pids().
> - fix unused variables, acquire rdlock instead of rwlock for read-only
>   critical section.
> - issues to be discussed:
>   1. Should we pass in the "struct flow *" and allow dpif to decide
>      what to hash ?
>   2. Should we add a new API for setting the n_handlers ? currently,
>      recv_set() do both upcall enable/disable and set n_handlers.
>
> PATCH->v2:
> - change attribute name back to OVS_VPORT_ATTR_UPCALL_PID for consistency.
>
> RFC->PATCH
> - use XOR to calculate the 5-tuple hash.  this fixes the performance
>   variation issue.
> - replace the malloc of 'struct upcall *'  in udpif_upcall_handler()
>   by local 'struct upcall' array.
> ---
>  datapath/datapath.c           |   22 +-
>  datapath/vport.c              |  127 +++++++++++-
>  datapath/vport.h              |   25 ++-
>  include/linux/openvswitch.h   |    9 +-
>  lib/dpif-linux.c              |  454 
> +++++++++++++++++++++++------------------
>  lib/dpif-linux.h              |    3 +-
>  lib/dpif-netdev.c             |    9 +-
>  lib/dpif-provider.h           |   26 ++-
>  lib/dpif.c                    |   38 ++--
>  lib/dpif.h                    |   10 +-
>  lib/flow.c                    |   18 ++
>  lib/flow.h                    |    3 +-
>  ofproto/ofproto-dpif-upcall.c |  265 ++++++------------------
>  ofproto/ofproto-dpif-xlate.c  |    5 +-
>  ofproto/ofproto-dpif.c        |    8 +-
>  15 files changed, 556 insertions(+), 466 deletions(-)
>
> diff --git a/datapath/datapath.c b/datapath/datapath.c
> index f7c3391..e972c5e 100644
> --- a/datapath/datapath.c
> +++ b/datapath/datapath.c
> @@ -242,7 +242,7 @@ void ovs_dp_process_received_packet(struct vport *p, 
> struct sk_buff *skb)
>                 upcall.cmd = OVS_PACKET_CMD_MISS;
>                 upcall.key = &key;
>                 upcall.userdata = NULL;
> -               upcall.portid = p->upcall_portid;
> +               upcall.portid = ovs_vport_find_pid(p, &key);
>                 ovs_dp_upcall(dp, skb, &upcall);

Can you use a different name than pid? pid is generally used for process ID.

>                 consume_skb(skb);
>                 stats_counter = &stats->n_missed;
> @@ -1241,7 +1241,7 @@ static int ovs_dp_cmd_new(struct sk_buff *skb, struct 
> genl_info *info)
>         parms.options = NULL;
>         parms.dp = dp;
>         parms.port_no = OVSP_LOCAL;
> -       parms.upcall_portid = nla_get_u32(a[OVS_DP_ATTR_UPCALL_PID]);
> +       parms.upcall_pids = a[OVS_DP_ATTR_UPCALL_PID];
>
>         ovs_dp_change(dp, a);
>
> @@ -1459,7 +1459,7 @@ static const struct nla_policy 
> vport_policy[OVS_VPORT_ATTR_MAX + 1] = {
>         [OVS_VPORT_ATTR_STATS] = { .len = sizeof(struct ovs_vport_stats) },
>         [OVS_VPORT_ATTR_PORT_NO] = { .type = NLA_U32 },
>         [OVS_VPORT_ATTR_TYPE] = { .type = NLA_U32 },
> -       [OVS_VPORT_ATTR_UPCALL_PID] = { .type = NLA_U32 },
> +       [OVS_VPORT_ATTR_UPCALL_PID] = { .type = NLA_UNSPEC },
>         [OVS_VPORT_ATTR_OPTIONS] = { .type = NLA_NESTED },
>  };
>

Have you tried this patch with old userspace to check compatibility?

> @@ -1494,8 +1494,7 @@ static int ovs_vport_cmd_fill_info(struct vport *vport, 
> struct sk_buff *skb,
>
>         if (nla_put_u32(skb, OVS_VPORT_ATTR_PORT_NO, vport->port_no) ||
>             nla_put_u32(skb, OVS_VPORT_ATTR_TYPE, vport->ops->type) ||
> -           nla_put_string(skb, OVS_VPORT_ATTR_NAME, 
> vport->ops->get_name(vport)) ||
> -           nla_put_u32(skb, OVS_VPORT_ATTR_UPCALL_PID, vport->upcall_portid))
> +           nla_put_string(skb, OVS_VPORT_ATTR_NAME, 
> vport->ops->get_name(vport)))
>                 goto nla_put_failure;
>
>         ovs_vport_get_stats(vport, &vport_stats);
> @@ -1503,6 +1502,9 @@ static int ovs_vport_cmd_fill_info(struct vport *vport, 
> struct sk_buff *skb,
>                     &vport_stats))
>                 goto nla_put_failure;
>
> +       if (ovs_vport_get_upcall_pids(vport, skb))
> +               goto nla_put_failure;
> +
>         err = ovs_vport_get_options(vport, skb);
>         if (err == -EMSGSIZE)
>                 goto error;
> @@ -1579,8 +1581,7 @@ static int ovs_vport_cmd_new(struct sk_buff *skb, 
> struct genl_info *info)
>         int err;
>
>         err = -EINVAL;
> -       if (!a[OVS_VPORT_ATTR_NAME] || !a[OVS_VPORT_ATTR_TYPE] ||
> -           !a[OVS_VPORT_ATTR_UPCALL_PID])
> +       if (!a[OVS_VPORT_ATTR_NAME] || !a[OVS_VPORT_ATTR_TYPE])
>                 goto exit;
>

The port IDs array should be mandatory, as it is today.

>         ovs_lock();
> @@ -1617,7 +1618,7 @@ static int ovs_vport_cmd_new(struct sk_buff *skb, 
> struct genl_info *info)
>         parms.options = a[OVS_VPORT_ATTR_OPTIONS];
>         parms.dp = dp;
>         parms.port_no = port_no;
> -       parms.upcall_portid = nla_get_u32(a[OVS_VPORT_ATTR_UPCALL_PID]);
> +       parms.upcall_pids = a[OVS_VPORT_ATTR_UPCALL_PID];
>
>         vport = new_vport(&parms);
>         err = PTR_ERR(vport);
> @@ -1678,8 +1679,9 @@ static int ovs_vport_cmd_set(struct sk_buff *skb, 
> struct genl_info *info)
>         if (a[OVS_VPORT_ATTR_STATS])
>                 ovs_vport_set_stats(vport, nla_data(a[OVS_VPORT_ATTR_STATS]));
>
> -       if (a[OVS_VPORT_ATTR_UPCALL_PID])
> -               vport->upcall_portid = 
> nla_get_u32(a[OVS_VPORT_ATTR_UPCALL_PID]);
> +       err = ovs_vport_set_upcall_pids(vport, a[OVS_VPORT_ATTR_UPCALL_PID]);
> +       if (err)
> +               goto exit_free;
>
>         err = ovs_vport_cmd_fill_info(vport, reply, info->snd_portid,
>                                       info->snd_seq, 0, OVS_VPORT_CMD_NEW);
> diff --git a/datapath/vport.c b/datapath/vport.c
> index 7f12acc..86e04b5 100644
> --- a/datapath/vport.c
> +++ b/datapath/vport.c
> @@ -135,10 +135,12 @@ struct vport *ovs_vport_alloc(int priv_size, const 
> struct vport_ops *ops,
>
>         vport->dp = parms->dp;
>         vport->port_no = parms->port_no;
> -       vport->upcall_portid = parms->upcall_portid;
>         vport->ops = ops;
>         INIT_HLIST_NODE(&vport->dp_hash_node);
>
> +       if (ovs_vport_set_upcall_pids(vport, parms->upcall_pids))
> +               return ERR_PTR(-EINVAL);
> +
>         vport->percpu_stats = alloc_percpu(struct pcpu_tstats);
>         if (!vport->percpu_stats) {
>                 kfree(vport);
> @@ -162,6 +164,7 @@ struct vport *ovs_vport_alloc(int priv_size, const struct 
> vport_ops *ops,
>   */
>  void ovs_vport_free(struct vport *vport)
>  {
> +       ovs_vport_set_upcall_pids(vport, NULL);
>         free_percpu(vport->percpu_stats);
>         kfree(vport);
>  }
> @@ -348,6 +351,128 @@ int ovs_vport_get_options(const struct vport *vport, 
> struct sk_buff *skb)
>         return 0;
>  }
>
> +static void __vport_pids_destroy(struct vport_pids *pids)
> +{
> +       if (pids->pids)
> +               kfree(pids->pids);
> +
> +       kfree(pids);
> +}
> +

This is not called from anywhere else. Can you use the RCU callback
function to free?

> +static void vport_pids_destroy_rcu_cb(struct rcu_head *rcu)
> +{
> +       struct vport_pids *pids = container_of(rcu, struct vport_pids, rcu);
> +
> +       __vport_pids_destroy(pids);
> +}
> +
> +/**
> + *     ovs_vport_set_upcall_pids - set upcall pids for sending upcall.
> + *
> + * @vport: vport to modify.
> + * @pids: new configuration.
> + *
> + * If the pids is non-null, sets the vport's upcall_pids pointer.  If the
> + * pids is null, frees the vport's upcall_pids.
> + *
> + * Returns 0 if successful, -EINVAL if @pids cannot be parsed as an array
> + * of U32.
> + *
> + * Must be called with ovs_mutex.
> + */
> +int ovs_vport_set_upcall_pids(struct vport *vport,  struct nlattr *pids)
> +{
> +       struct vport_pids *old;
> +
> +       if (pids && nla_len(pids) % sizeof(u32))
> +               return -EINVAL;
> +
> +       old = ovsl_dereference(vport->upcall_pids);
> +

rcu_dereference_protected() is better here, as we do expect it to be called under the lock.


> +       if (pids) {
> +               struct vport_pids *vport_pids;
> +
> +               vport_pids = kmalloc(sizeof *vport_pids, GFP_KERNEL);
> +               vport_pids->pids = kmalloc(nla_len(pids), GFP_KERNEL);
> +               vport_pids->n_pids = nla_len(pids)
> +                       / (sizeof *vport_pids->pids);
> +               memcpy(vport_pids->pids, nla_data(pids), nla_len(pids));
> +

We can just use a single allocation for both struct vport_pids
and the array. The same is done for flow-stats.

> +               rcu_assign_pointer(vport->upcall_pids, vport_pids);
> +       } else if (old) {
> +               rcu_assign_pointer(vport->upcall_pids, NULL);
> +       }
> +
> +       if (old)
> +               call_rcu(&old->rcu, vport_pids_destroy_rcu_cb);
> +

This is a bit confusing. We can free the pids array from the vport destroy
callback, so there is no need to handle the case where pids is NULL.

> +       return 0;
> +}
> +
> +/**
> + *     ovs_vport_get_options - get the upcall_pids value.
> + *
> + * @vport: vport from which to retrieve the pids.
> + * @skb: sk_buff where pids should be appended.
> + *
> + * Retrieves the configuration of the given vport, appending the
> + * %OVS_VPORT_ATTR_UPCALL_PID attribute which is the array of upcall
> + * pids to @skb.
> + *
> + * Returns 0 if successful, -EMSGSIZE if @skb has insufficient room.
> + * If an error occurs, @skb is left unmodified.
> + */
> +int ovs_vport_get_upcall_pids(const struct vport *vport, struct sk_buff *skb)
> +{
> +       struct vport_pids *pids;
> +       int err = 0;
> +
> +       rcu_read_lock();
> +       pids = ovsl_dereference(vport->upcall_pids);
> +
This function is always called with RCU or the ovs mutex held, so there is
no need to take an extra lock.

> +       if (!pids)
> +               goto exit;
> +

pids should not be NULL in this case.

> +       if (nla_put(skb, OVS_VPORT_ATTR_UPCALL_PID,
> +                   pids->n_pids * sizeof *pids->pids,
> +                   (void *) pids->pids)) {
> +               err = -EMSGSIZE;
> +               goto exit;
> +       }
> +
> +exit:
> +       rcu_read_unlock();
> +       return err;
> +}
> +
> +/**
> + *     ovs_vport_find_pid - find the upcall pid to send upcall.
> + *
> + * @vport: vport from which the missed packet is received.
> + * @key: flow keys.
> + *
> + * Calculates the 5-tuple hash from the flow key and finds the upcall pid to
> + * send the upcall to.
> + *
> + * Returns the pid of the target socket.  Must be called with rcu_read_lock.
> + */
> +u32 ovs_vport_find_pid(const struct vport *p, const struct sw_flow_key *key)
> +{
> +       struct vport_pids *pids;
> +       u32 hash;
> +
> +       pids = ovsl_dereference(p->upcall_pids);
> +
This function is only called in an RCU read context; therefore
rcu_dereference() is sufficient.

> +       if (!pids)
> +               return 0;
> +
pids should not be NULL in this case.

> +       hash = key->ipv4.addr.src ^ key->ipv4.addr.dst
> +               ^ key->ip.proto ^ key->ipv4.tp.src
> +               ^ key->ipv4.tp.dst;
> +

skb_get_rxhash() should be cheaper in general than calculating the hash.

> +       return pids->pids[jhash((void *) &hash, 4, 0) % pids->n_pids];
> +}
> +

We should use a hash seed here, since we are using values from packets
coming from the network.

>  /**
>   *     ovs_vport_receive - pass up received packet to the datapath for 
> processing
>   *
> diff --git a/datapath/vport.h b/datapath/vport.h
> index 18b723e..f11faa9 100644
> --- a/datapath/vport.h
> +++ b/datapath/vport.h
> @@ -50,6 +50,11 @@ void ovs_vport_get_stats(struct vport *, struct 
> ovs_vport_stats *);
>  int ovs_vport_set_options(struct vport *, struct nlattr *options);
>  int ovs_vport_get_options(const struct vport *, struct sk_buff *);
>
> +int ovs_vport_set_upcall_pids(struct vport *, struct nlattr *pids);
> +int ovs_vport_get_upcall_pids(const struct vport *, struct sk_buff *);
> +
> +u32 ovs_vport_find_pid(const struct vport *, const struct sw_flow_key *);
> +
>  int ovs_vport_send(struct vport *, struct sk_buff *);
>
>  /* The following definitions are for implementers of vport devices: */
> @@ -60,13 +65,25 @@ struct vport_err_stats {
>         u64 tx_dropped;
>         u64 tx_errors;
>  };
> +/**
> + * struct vport_pids - array of netlink pids for a vport.
> + *                     must be protected by rcu.
> + * @rcu: RCU callback head for deferred destruction.
> + * @n_pids: Size of @upcall_pids array.
> + * @pids: Array storing the Netlink socket pids to use for packets received
> + * on this port that miss the flow table.
> + */
> +struct vport_pids {
> +       struct rcu_head rcu;
> +       u32 n_pids;
> +       u32 *pids;
> +};
>
>  /**
>   * struct vport - one port within a datapath
>   * @rcu: RCU callback head for deferred destruction.
>   * @dp: Datapath to which this port belongs.
> - * @upcall_portid: The Netlink port to use for packets received on this port 
> that
> - * miss the flow table.
> + * @upcall_pids: RCU protected vport_pids array.
>   * @port_no: Index into @dp's @ports array.
>   * @hash_node: Element in @dev_table hash table in vport.c.
>   * @dp_hash_node: Element in @datapath->ports hash table in datapath.c.
> @@ -80,7 +97,7 @@ struct vport_err_stats {
>  struct vport {
>         struct rcu_head rcu;
>         struct datapath *dp;
> -       u32 upcall_portid;
> +       struct vport_pids __rcu *upcall_pids;
>         u16 port_no;
>
>         struct hlist_node hash_node;
> @@ -112,7 +129,7 @@ struct vport_parms {
>         /* For ovs_vport_alloc(). */
>         struct datapath *dp;
>         u16 port_no;
> -       u32 upcall_portid;
> +       struct nlattr *upcall_pids;
>  };
>
>  /**
> diff --git a/include/linux/openvswitch.h b/include/linux/openvswitch.h
> index d1ff5ec..9d21326 100644
> --- a/include/linux/openvswitch.h
> +++ b/include/linux/openvswitch.h
> @@ -225,9 +225,9 @@ enum ovs_vport_type {
>   * this is the name of the network device.  Maximum length %IFNAMSIZ-1 bytes
>   * plus a null terminator.
>   * @OVS_VPORT_ATTR_OPTIONS: Vport-specific configuration information.
> - * @OVS_VPORT_ATTR_UPCALL_PID: The Netlink socket in userspace that
> - * OVS_PACKET_CMD_MISS upcalls will be directed to for packets received on
> - * this port.  A value of zero indicates that upcalls should not be sent.
> + * @OVS_VPORT_ATTR_UPCALL_PID: The array of Netlink socket pids in userspace
> + * that OVS_PACKET_CMD_MISS upcalls will be directed to for packets received 
> on
> + * this port.  If this is not specified, upcalls should not be sent.
>   * @OVS_VPORT_ATTR_STATS: A &struct ovs_vport_stats giving statistics for
>   * packets sent or received through the vport.
>   *
> @@ -251,7 +251,8 @@ enum ovs_vport_attr {
>         OVS_VPORT_ATTR_TYPE,    /* u32 OVS_VPORT_TYPE_* constant. */
>         OVS_VPORT_ATTR_NAME,    /* string name, up to IFNAMSIZ bytes long */
>         OVS_VPORT_ATTR_OPTIONS, /* nested attributes, varies by vport type */
> -       OVS_VPORT_ATTR_UPCALL_PID, /* u32 Netlink PID to receive upcalls */
> +       OVS_VPORT_ATTR_UPCALL_PID, /* array of u32 Netlink socket PIDs for */
> +                               /* receiving upcalls */
>         OVS_VPORT_ATTR_STATS,   /* struct ovs_vport_stats */
>         __OVS_VPORT_ATTR_MAX
>  };
> diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c
> index f7f5292..4d35e15 100644
> --- a/lib/dpif-linux.c
> +++ b/lib/dpif-linux.c
> @@ -35,6 +35,7 @@
>  #include "bitmap.h"
>  #include "dpif-provider.h"
>  #include "dynamic-string.h"
> +#include "fat-rwlock.h"
>  #include "flow.h"
>  #include "netdev.h"
>  #include "netdev-linux.h"
> @@ -132,7 +133,15 @@ struct dpif_channel {
>      long long int last_poll;    /* Last time this channel was polled. */
>  };
>
> -static void report_loss(struct dpif *, struct dpif_channel *);
> +static void report_loss(struct dpif *, struct dpif_channel *, uint32_t 
> ch_idx,
> +                        uint32_t handler_id);
> +
> +struct dpif_epoll {
> +    struct epoll_event *epoll_events;
> +    int epoll_fd;               /* epoll fd that includes channel socks. */
> +    int n_events;               /* Num events returned by epoll_wait(). */
> +    int event_offset;           /* Offset into 'epoll_events'. */
> +};

......
> -        int idx = dpif->epoll_events[dpif->event_offset].data.u32;
> -        struct dpif_channel *ch = &dpif->channels[idx];
> +    while (epoll->event_offset < epoll->n_events) {
> +        int idx = epoll->epoll_events[epoll->event_offset].data.u32;
> +        struct dpif_channel *ch = &dpif->channels[idx][handler_id];
>

I have not looked at the whole userspace part, but I just noticed that this
channel access pattern is not good for CPU cache-line locality.
Is there a reason for doing it this way rather than channel[handler_id][idx]?
_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to