On Fri, Dec 27, 2013 at 8:03 PM, Ben Pfaff <b...@nicira.com> wrote:
> This is a first step in making thread safety more granular in dpif-netdev,
> to allow for multithreaded forwarding.
>
> Signed-off-by: Ben Pfaff <b...@nicira.com>
> ---
>  lib/dpif-netdev.c |   54 ++++++++++++++++++++++++++++++++++++-----------------
>  1 file changed, 37 insertions(+), 17 deletions(-)
>
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index d0f6dab..727483a 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -83,8 +83,9 @@ struct dp_netdev_upcall {
>  };
>
>  struct dp_netdev_queue {
> -    struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN];
> -    unsigned int head, tail;
> +    struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN] OVS_GUARDED;
> +    unsigned int head OVS_GUARDED;
> +    unsigned int tail OVS_GUARDED;
>  };
>
>  /* Datapath based on the network device interface from netdev.h. */
> @@ -94,9 +95,12 @@ struct dp_netdev {
>      struct ovs_refcount ref_cnt;
>      atomic_flag destroyed;
>
> -    struct dp_netdev_queue queues[N_QUEUES];
>      struct classifier cls;      /* Classifier. */
>      struct hmap flow_table;     /* Flow table. */
> +
> +    /* Queues. */
> +    struct ovs_mutex queue_mutex;
> +    struct dp_netdev_queue queues[N_QUEUES];
>      struct seq *queue_seq;      /* Incremented whenever a packet is queued. */
>

It would be clearer to move the lock inside struct dp_netdev_queue, since
it is protecting that queue's 'head' and 'tail'. Is there any reason not
to do that? But I am fine either way.
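
For concreteness, something like this is what I had in mind (just a
sketch; I am assuming OVS_GUARDED_BY() can name a member of the same
struct, as with clang's thread safety annotations):

    struct dp_netdev_queue {
        struct ovs_mutex mutex;     /* Protects the members below. */
        struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN] OVS_GUARDED_BY(mutex);
        unsigned int head OVS_GUARDED_BY(mutex);
        unsigned int tail OVS_GUARDED_BY(mutex);
    };

create_dp_netdev() would then ovs_mutex_init() each queue's mutex, and
dp_netdev_output_userspace() would take only that one queue's mutex, so
upcalls queued to different queues would not serialize on a single lock.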

>      /* Statistics. */
> @@ -190,9 +194,10 @@ static int do_add_port(struct dp_netdev *, const char *devname,
>  static int do_del_port(struct dp_netdev *, odp_port_t port_no);
>  static int dpif_netdev_open(const struct dpif_class *, const char *name,
>                              bool create, struct dpif **);
> -static int dp_netdev_output_userspace(struct dp_netdev *, struct ofpbuf *,
> +static int dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *,
>                                      int queue_no, const struct flow *,
> -                                    const struct nlattr *userdata);
> +                                    const struct nlattr *userdata)
> +    OVS_EXCLUDED(dp->queue_mutex);
>  static void dp_netdev_execute_actions(struct dp_netdev *, const struct flow *,
>                                        struct ofpbuf *,
>                                        const struct nlattr *actions,
> @@ -312,9 +317,12 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
>      dp->name = xstrdup(name);
>      ovs_refcount_init(&dp->ref_cnt);
>      atomic_flag_init(&dp->destroyed);
> +    ovs_mutex_init(&dp->queue_mutex);
> +    ovs_mutex_lock(&dp->queue_mutex);
>      for (i = 0; i < N_QUEUES; i++) {
>          dp->queues[i].head = dp->queues[i].tail = 0;
>      }
> +    ovs_mutex_unlock(&dp->queue_mutex);
>      dp->queue_seq = seq_create();
>      classifier_init(&dp->cls, NULL);
>      hmap_init(&dp->flow_table);
> @@ -367,6 +375,7 @@ dp_netdev_purge_queues(struct dp_netdev *dp)
>  {
>      int i;
>
> +    ovs_mutex_lock(&dp->queue_mutex);
>      for (i = 0; i < N_QUEUES; i++) {
>          struct dp_netdev_queue *q = &dp->queues[i];
>
> @@ -376,6 +385,7 @@ dp_netdev_purge_queues(struct dp_netdev *dp)
>              ofpbuf_uninit(&u->buf);
>          }
>      }
> +    ovs_mutex_unlock(&dp->queue_mutex);
>  }
>
>  static void
> @@ -390,8 +400,11 @@ dp_netdev_free(struct dp_netdev *dp)
>      ovsthread_counter_destroy(dp->n_hit);
>      ovsthread_counter_destroy(dp->n_missed);
>      ovsthread_counter_destroy(dp->n_lost);
> +
>      dp_netdev_purge_queues(dp);
>      seq_destroy(dp->queue_seq);
> +    ovs_mutex_destroy(&dp->queue_mutex);
> +
>      classifier_destroy(&dp->cls);
>      hmap_destroy(&dp->flow_table);
>      seq_destroy(dp->port_seq);
> @@ -1195,9 +1208,9 @@ dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
>  }
>
>  static struct dp_netdev_queue *
> -find_nonempty_queue(struct dpif *dpif)
> +find_nonempty_queue(struct dp_netdev *dp)
> +    OVS_REQUIRES(dp->queue_mutex)
>  {
> -    struct dp_netdev *dp = get_dp_netdev(dpif);
>      int i;
>
>      for (i = 0; i < N_QUEUES; i++) {
> @@ -1213,11 +1226,12 @@ static int
>  dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
>                   struct ofpbuf *buf)
>  {
> +    struct dp_netdev *dp = get_dp_netdev(dpif);
>      struct dp_netdev_queue *q;
>      int error;
>
> -    ovs_mutex_lock(&dp_netdev_mutex);
> -    q = find_nonempty_queue(dpif);
> +    ovs_mutex_lock(&dp->queue_mutex);
> +    q = find_nonempty_queue(dp);
>      if (q) {
>          struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
>
> @@ -1230,7 +1244,7 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
>      } else {
>          error = EAGAIN;
>      }
> -    ovs_mutex_unlock(&dp_netdev_mutex);
> +    ovs_mutex_unlock(&dp->queue_mutex);
>
>      return error;
>  }
> @@ -1241,23 +1255,22 @@ dpif_netdev_recv_wait(struct dpif *dpif)
>      struct dp_netdev *dp = get_dp_netdev(dpif);
>      uint64_t seq;
>
> -    ovs_mutex_lock(&dp_netdev_mutex);
> +    ovs_mutex_lock(&dp->queue_mutex);
>      seq = seq_read(dp->queue_seq);
> -    if (find_nonempty_queue(dpif)) {
> +    if (find_nonempty_queue(dp)) {
>          poll_immediate_wake();
>      } else {
>          seq_wait(dp->queue_seq, seq);
>      }
> -    ovs_mutex_unlock(&dp_netdev_mutex);
> +    ovs_mutex_unlock(&dp->queue_mutex);
>  }
>
>  static void
>  dpif_netdev_recv_purge(struct dpif *dpif)
>  {
>      struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
> -    ovs_mutex_lock(&dp_netdev_mutex);
> +
>      dp_netdev_purge_queues(dpif_netdev->dp);
> -    ovs_mutex_unlock(&dp_netdev_mutex);
>  }
>
>  /* Creates and returns a new 'struct dp_netdev_actions', with a reference count
> @@ -1410,8 +1423,12 @@ static int
>  dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
>                             int queue_no, const struct flow *flow,
>                             const struct nlattr *userdata)
> +    OVS_EXCLUDED(dp->queue_mutex)
>  {
>      struct dp_netdev_queue *q = &dp->queues[queue_no];
> +    int error;
> +
> +    ovs_mutex_lock(&dp->queue_mutex);
>      if (q->head - q->tail < MAX_QUEUE_LEN) {
>          struct dp_netdev_upcall *u = &q->upcalls[q->head++ & QUEUE_MASK];
>          struct dpif_upcall *upcall = &u->upcall;
> @@ -1445,11 +1462,14 @@ dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
>
>          seq_change(dp->queue_seq);
>
> -        return 0;
> +        error = 0;
>      } else {
>          ovsthread_counter_inc(dp->n_lost, 1);
> -        return ENOBUFS;
> +        error = ENOBUFS;
>      }
> +    ovs_mutex_unlock(&dp->queue_mutex);
> +
> +    return error;
>  }
>
>  struct dp_netdev_execute_aux {
> --
> 1.7.10.4
>