This is a first step in making thread safety more granular in dpif-netdev,
to allow for multithreaded forwarding.

Signed-off-by: Ben Pfaff <b...@nicira.com>
---
 lib/dpif-netdev.c |   54 ++++++++++++++++++++++++++++++++++++-----------------
 1 file changed, 37 insertions(+), 17 deletions(-)

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index d0f6dab..727483a 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -83,8 +83,9 @@ struct dp_netdev_upcall {
 };
 
 struct dp_netdev_queue {
-    struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN];
-    unsigned int head, tail;
+    struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN] OVS_GUARDED;
+    unsigned int head OVS_GUARDED;
+    unsigned int tail OVS_GUARDED;
 };
 
 /* Datapath based on the network device interface from netdev.h. */
@@ -94,9 +95,12 @@ struct dp_netdev {
     struct ovs_refcount ref_cnt;
     atomic_flag destroyed;
 
-    struct dp_netdev_queue queues[N_QUEUES];
     struct classifier cls;      /* Classifier. */
     struct hmap flow_table;     /* Flow table. */
+
+    /* Queues. */
+    struct ovs_mutex queue_mutex;
+    struct dp_netdev_queue queues[N_QUEUES];
     struct seq *queue_seq;      /* Incremented whenever a packet is queued. */
 
     /* Statistics. */
@@ -190,9 +194,10 @@ static int do_add_port(struct dp_netdev *, const char *devname,
 static int do_del_port(struct dp_netdev *, odp_port_t port_no);
 static int dpif_netdev_open(const struct dpif_class *, const char *name,
                             bool create, struct dpif **);
-static int dp_netdev_output_userspace(struct dp_netdev *, struct ofpbuf *,
+static int dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *,
                                     int queue_no, const struct flow *,
-                                    const struct nlattr *userdata);
+                                    const struct nlattr *userdata)
+    OVS_EXCLUDED(dp->queue_mutex);
 static void dp_netdev_execute_actions(struct dp_netdev *, const struct flow *,
                                       struct ofpbuf *,
                                       const struct nlattr *actions,
@@ -312,9 +317,12 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     dp->name = xstrdup(name);
     ovs_refcount_init(&dp->ref_cnt);
     atomic_flag_init(&dp->destroyed);
+    ovs_mutex_init(&dp->queue_mutex);
+    ovs_mutex_lock(&dp->queue_mutex);
     for (i = 0; i < N_QUEUES; i++) {
         dp->queues[i].head = dp->queues[i].tail = 0;
     }
+    ovs_mutex_unlock(&dp->queue_mutex);
     dp->queue_seq = seq_create();
     classifier_init(&dp->cls, NULL);
     hmap_init(&dp->flow_table);
@@ -367,6 +375,7 @@ dp_netdev_purge_queues(struct dp_netdev *dp)
 {
     int i;
 
+    ovs_mutex_lock(&dp->queue_mutex);
     for (i = 0; i < N_QUEUES; i++) {
         struct dp_netdev_queue *q = &dp->queues[i];
 
@@ -376,6 +385,7 @@ dp_netdev_purge_queues(struct dp_netdev *dp)
             ofpbuf_uninit(&u->buf);
         }
     }
+    ovs_mutex_unlock(&dp->queue_mutex);
 }
 
 static void
@@ -390,8 +400,11 @@ dp_netdev_free(struct dp_netdev *dp)
     ovsthread_counter_destroy(dp->n_hit);
     ovsthread_counter_destroy(dp->n_missed);
     ovsthread_counter_destroy(dp->n_lost);
+
     dp_netdev_purge_queues(dp);
     seq_destroy(dp->queue_seq);
+    ovs_mutex_destroy(&dp->queue_mutex);
+
     classifier_destroy(&dp->cls);
     hmap_destroy(&dp->flow_table);
     seq_destroy(dp->port_seq);
@@ -1195,9 +1208,9 @@ dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
 }
 
 static struct dp_netdev_queue *
-find_nonempty_queue(struct dpif *dpif)
+find_nonempty_queue(struct dp_netdev *dp)
+    OVS_REQUIRES(dp->queue_mutex)
 {
-    struct dp_netdev *dp = get_dp_netdev(dpif);
     int i;
 
     for (i = 0; i < N_QUEUES; i++) {
@@ -1213,11 +1226,12 @@ static int
 dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
                  struct ofpbuf *buf)
 {
+    struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_queue *q;
     int error;
 
-    ovs_mutex_lock(&dp_netdev_mutex);
-    q = find_nonempty_queue(dpif);
+    ovs_mutex_lock(&dp->queue_mutex);
+    q = find_nonempty_queue(dp);
     if (q) {
         struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
 
@@ -1230,7 +1244,7 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
     } else {
         error = EAGAIN;
     }
-    ovs_mutex_unlock(&dp_netdev_mutex);
+    ovs_mutex_unlock(&dp->queue_mutex);
 
     return error;
 }
@@ -1241,23 +1255,22 @@ dpif_netdev_recv_wait(struct dpif *dpif)
     struct dp_netdev *dp = get_dp_netdev(dpif);
     uint64_t seq;
 
-    ovs_mutex_lock(&dp_netdev_mutex);
+    ovs_mutex_lock(&dp->queue_mutex);
     seq = seq_read(dp->queue_seq);
-    if (find_nonempty_queue(dpif)) {
+    if (find_nonempty_queue(dp)) {
         poll_immediate_wake();
     } else {
         seq_wait(dp->queue_seq, seq);
     }
-    ovs_mutex_unlock(&dp_netdev_mutex);
+    ovs_mutex_unlock(&dp->queue_mutex);
 }
 
 static void
 dpif_netdev_recv_purge(struct dpif *dpif)
 {
     struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
-    ovs_mutex_lock(&dp_netdev_mutex);
+
     dp_netdev_purge_queues(dpif_netdev->dp);
-    ovs_mutex_unlock(&dp_netdev_mutex);
 }
 
 /* Creates and returns a new 'struct dp_netdev_actions', with a reference count
@@ -1410,8 +1423,12 @@ static int
 dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
                            int queue_no, const struct flow *flow,
                            const struct nlattr *userdata)
+    OVS_EXCLUDED(dp->queue_mutex)
 {
     struct dp_netdev_queue *q = &dp->queues[queue_no];
+    int error;
+
+    ovs_mutex_lock(&dp->queue_mutex);
     if (q->head - q->tail < MAX_QUEUE_LEN) {
         struct dp_netdev_upcall *u = &q->upcalls[q->head++ & QUEUE_MASK];
         struct dpif_upcall *upcall = &u->upcall;
@@ -1445,11 +1462,14 @@ dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
 
         seq_change(dp->queue_seq);
 
-        return 0;
+        error = 0;
     } else {
         ovsthread_counter_inc(dp->n_lost, 1);
-        return ENOBUFS;
+        error = ENOBUFS;
     }
+    ovs_mutex_unlock(&dp->queue_mutex);
+
+    return error;
 }
 
 struct dp_netdev_execute_aux {
-- 
1.7.10.4

_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to