One of the limiting factors on the number of flows that can be supported
in the datapath is the overhead of assembling flow dump messages in the
datapath. This patch modifies the flow_dump interface to allow
revalidators to skip dumping the key, mask and actions from the
datapath, by making use of the unique identifiers introduced in earlier
patches.

For each flow dump, the dpif user specifies whether to skip these
attributes, allowing the common case to only dump a pair of 128-bit ID
and flow stats. This increases the number of flows that a revalidator
can handle per second by 50% or more.

Signed-off-by: Joe Stringer <joestrin...@nicira.com>
---
v7: Shift UID terseness flag below the dpif layer.
    Shift UID support detection to earlier patch.
    Handle ukey_creation when key is missing from flow dump.
    Use ovs_u128_equal().
v6: Split out from "dpif: Add Unique flow identifiers."
    Rebase.
v5: Always pass flow_key down to dpif, to improve error logging.
    Fix extraneous netflow_unref.
    Rebase.
v4: Fix bug where UID-based terse dump wasn't enabled by default.
    Fix race conditions with tests.
    Combine dpif,upcall,dpif-netdev,dpif-linux changes into one patch.
    Display whether terse dump is enabled in ovs-appctl upcall/show.
v3: Rebase.
---
 lib/dpctl.c                   |    4 +-
 lib/dpif-netdev.c             |   75 +++++++++------
 lib/dpif-netlink.c            |   31 ++++--
 lib/dpif-provider.h           |   12 ++-
 lib/dpif.c                    |   21 ++++-
 lib/dpif.h                    |    3 +-
 ofproto/ofproto-dpif-upcall.c |  210 ++++++++++++++++++++++++++++++++---------
 ofproto/ofproto-dpif.c        |    5 +-
 tests/dpif-netdev.at          |    4 +
 tests/ofproto-dpif.at         |    4 +
 10 files changed, 278 insertions(+), 91 deletions(-)

diff --git a/lib/dpctl.c b/lib/dpctl.c
index 5d15bc3..e3e41e9 100644
--- a/lib/dpctl.c
+++ b/lib/dpctl.c
@@ -736,7 +736,7 @@ dpctl_dump_flows(int argc, const char *argv[], struct 
dpctl_params *dpctl_p)
     }
 
     ds_init(&ds);
-    flow_dump = dpif_flow_dump_create(dpif);
+    flow_dump = dpif_flow_dump_create(dpif, false);
     flow_dump_thread = dpif_flow_dump_thread_create(flow_dump);
     while (dpif_flow_dump_next(flow_dump_thread, &f, 1)) {
         if (filter) {
@@ -760,6 +760,8 @@ dpctl_dump_flows(int argc, const char *argv[], struct 
dpctl_params *dpctl_p)
             minimatch_destroy(&minimatch);
         }
         ds_clear(&ds);
+        odp_format_uid(&f.uid, &ds);
+        ds_put_cstr(&ds, " ");
         odp_flow_format(f.key, f.key_len, f.mask, f.mask_len,
                         &portno_names, &ds, dpctl_p->verbosity);
         ds_put_cstr(&ds, ", ");
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 2fac065..b051602 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -1429,6 +1429,12 @@ get_dpif_flow_stats(const struct dp_netdev_flow 
*netdev_flow,
     }
 }
 
+static bool
+dpif_netdev_check_uid(struct dpif *dpif OVS_UNUSED)
+{
+    return true;
+}
+
 /* Converts to the dpif_flow format, using 'key_buf' and 'mask_buf' for
  * storing the netlink-formatted key/mask. 'key_buf' may be the same as
  * 'mask_buf'. Actions will be returned without copying, by relying on RCU to
@@ -1436,35 +1442,39 @@ get_dpif_flow_stats(const struct dp_netdev_flow 
*netdev_flow,
 static void
 dp_netdev_flow_to_dpif_flow(const struct dp_netdev_flow *netdev_flow,
                             struct ofpbuf *key_buf, struct ofpbuf *mask_buf,
-                            struct dpif_flow *flow)
+                            struct dpif_flow *flow, bool terse)
 {
-    struct flow_wildcards wc;
-    struct dp_netdev_actions *actions;
-    size_t offset;
-
-    minimask_expand(&netdev_flow->cr.match.mask, &wc);
-
-    /* Key */
-    offset = ofpbuf_size(key_buf);
-    flow->key = ofpbuf_tail(key_buf);
-    odp_flow_key_from_flow(key_buf, &netdev_flow->flow, &wc.masks,
-                           netdev_flow->flow.in_port.odp_port, true);
-    flow->key_len = ofpbuf_size(key_buf) - offset;
-
-    /* Mask */
-    offset = ofpbuf_size(mask_buf);
-    flow->mask = ofpbuf_tail(mask_buf);
-    odp_flow_key_from_mask(mask_buf, &wc.masks, &netdev_flow->flow,
-                           odp_to_u32(wc.masks.in_port.odp_port),
-                           SIZE_MAX, true);
-    flow->mask_len = ofpbuf_size(mask_buf) - offset;
-
-    /* Actions */
-    actions = dp_netdev_flow_get_actions(netdev_flow);
-    flow->actions = actions->actions;
-    flow->actions_len = actions->size;
-
-    memcpy(&flow->uid, &netdev_flow->uid, sizeof flow->uid);
+    if (terse) {
+        memset(flow, 0, sizeof *flow);
+    } else {
+        struct flow_wildcards wc;
+        struct dp_netdev_actions *actions;
+        size_t offset;
+
+        minimask_expand(&netdev_flow->cr.match.mask, &wc);
+
+        /* Key */
+        offset = ofpbuf_size(key_buf);
+        flow->key = ofpbuf_tail(key_buf);
+        odp_flow_key_from_flow(key_buf, &netdev_flow->flow, &wc.masks,
+                               netdev_flow->flow.in_port.odp_port, true);
+        flow->key_len = ofpbuf_size(key_buf) - offset;
+
+        /* Mask */
+        offset = ofpbuf_size(mask_buf);
+        flow->mask = ofpbuf_tail(mask_buf);
+        odp_flow_key_from_mask(mask_buf, &wc.masks, &netdev_flow->flow,
+                               odp_to_u32(wc.masks.in_port.odp_port),
+                               SIZE_MAX, true);
+        flow->mask_len = ofpbuf_size(mask_buf) - offset;
+
+        /* Actions */
+        actions = dp_netdev_flow_get_actions(netdev_flow);
+        flow->actions = actions->actions;
+        flow->actions_len = actions->size;
+    }
+
+    flow->uid = netdev_flow->uid;
     get_dpif_flow_stats(netdev_flow, &flow->stats);
 }
 
@@ -1574,7 +1584,7 @@ dpif_netdev_flow_get(const struct dpif *dpif, const 
struct dpif_flow_get *get)
     netdev_flow = dp_netdev_find_flow(dp, get->uid);
     if (netdev_flow) {
         dp_netdev_flow_to_dpif_flow(netdev_flow, get->buffer, get->buffer,
-                                    get->flow);
+                                    get->flow, false);
      } else {
         error = ENOENT;
     }
@@ -1756,7 +1766,7 @@ dpif_netdev_flow_dump_cast(struct dpif_flow_dump *dump)
 }
 
 static struct dpif_flow_dump *
-dpif_netdev_flow_dump_create(const struct dpif *dpif_)
+dpif_netdev_flow_dump_create(const struct dpif *dpif_, bool terse)
 {
     struct dpif_netdev_flow_dump *dump;
 
@@ -1764,6 +1774,7 @@ dpif_netdev_flow_dump_create(const struct dpif *dpif_)
     dpif_flow_dump_init(&dump->up, dpif_);
     memset(&dump->pos, 0, sizeof dump->pos);
     dump->status = 0;
+    dump->up.terse = terse;
     ovs_mutex_init(&dump->mutex);
 
     return &dump->up;
@@ -1852,7 +1863,8 @@ dpif_netdev_flow_dump_next(struct dpif_flow_dump_thread 
*thread_,
 
         ofpbuf_use_stack(&key, keybuf, sizeof *keybuf);
         ofpbuf_use_stack(&mask, maskbuf, sizeof *maskbuf);
-        dp_netdev_flow_to_dpif_flow(netdev_flow, &key, &mask, f);
+        dp_netdev_flow_to_dpif_flow(netdev_flow, &key, &mask, f,
+                                    dump->up.terse);
     }
 
     return n_flows;
@@ -2988,6 +3000,7 @@ const struct dpif_class dpif_netdev_class = {
     dpif_netdev_register_upcall_cb,
     dpif_netdev_enable_upcall,
     dpif_netdev_disable_upcall,
+    dpif_netdev_check_uid,
 };
 
 static void
diff --git a/lib/dpif-netlink.c b/lib/dpif-netlink.c
index dcd0520..3f098db 100644
--- a/lib/dpif-netlink.c
+++ b/lib/dpif-netlink.c
@@ -134,7 +134,7 @@ static void dpif_netlink_flow_get_stats(const struct 
dpif_netlink_flow *,
                                         struct dpif_flow_stats *);
 static void dpif_netlink_flow_to_dpif_flow(struct dpif *, struct dpif_flow *,
                                            const struct dpif_netlink_flow *);
-static bool dpif_netlink_check_uid(struct dpif *dpif);
+static bool dpif_netlink_check_uid__(struct dpif *dpif);
 
 /* One of the dpif channels between the kernel and userspace. */
 struct dpif_channel {
@@ -285,7 +285,7 @@ open_dpif(const struct dpif_netlink_dp *dp, struct dpif 
**dpifp)
               dp->dp_ifindex, dp->dp_ifindex);
 
     dpif->dp_ifindex = dp->dp_ifindex;
-    dpif->uid_supported = dpif_netlink_check_uid(&dpif->dpif);
+    dpif->uid_supported = dpif_netlink_check_uid__(&dpif->dpif);
     *dpifp = &dpif->dpif;
 
     return 0;
@@ -1251,12 +1251,14 @@ dpif_netlink_flow_dump_cast(struct dpif_flow_dump *dump)
 }
 
 static struct dpif_flow_dump *
-dpif_netlink_flow_dump_create(const struct dpif *dpif_)
+dpif_netlink_flow_dump_create(const struct dpif *dpif_, bool terse)
 {
+    static const uint32_t dump_flags = OVS_UID_F_SKIP_KEY | OVS_UID_F_SKIP_MASK
+                                       | OVS_UID_F_SKIP_ACTIONS;
     const struct dpif_netlink *dpif = dpif_netlink_cast(dpif_);
     struct dpif_netlink_flow_dump *dump;
     struct dpif_netlink_flow request;
-    struct ofpbuf *buf;
+    struct ofpbuf *buf, uid;
 
     dump = xmalloc(sizeof *dump);
     dpif_flow_dump_init(&dump->up, dpif_);
@@ -1265,11 +1267,19 @@ dpif_netlink_flow_dump_create(const struct dpif *dpif_)
     request.cmd = OVS_FLOW_CMD_GET;
     request.dp_ifindex = dpif->dp_ifindex;
 
+    ofpbuf_use_stack(&uid, &request.uid_buf, sizeof request.uid_buf);
+    if (terse) {
+        odp_uid_to_nlattrs(&uid, NULL, dump_flags);
+    }
+    request.uid = ofpbuf_data(&uid);
+    request.uid_len = ofpbuf_size(&uid);
+
     buf = ofpbuf_new(1024);
     dpif_netlink_flow_to_ofpbuf(&request, buf);
     nl_dump_start(&dump->nl_dump, NETLINK_GENERIC, buf);
     ofpbuf_delete(buf);
     atomic_init(&dump->status, 0);
+    dump->up.terse = terse;
 
     return &dump->up;
 }
@@ -1380,7 +1390,7 @@ dpif_netlink_flow_dump_next(struct dpif_flow_dump_thread 
*thread_,
             break;
         }
 
-        if (datapath_flow.actions) {
+        if (dump->up.terse || datapath_flow.actions) {
             /* Common case: the flow includes actions. */
             dpif_netlink_flow_to_dpif_flow(&dpif->dpif, &flows[n_flows++],
                                            &datapath_flow);
@@ -1603,7 +1613,7 @@ dpif_netlink_operate(struct dpif *dpif_, struct dpif_op 
**ops, size_t n_ops)
 
 /* Checks support for unique flow identifiers. */
 static bool
-dpif_netlink_check_uid(struct dpif *dpif_)
+dpif_netlink_check_uid__(struct dpif *dpif_)
 {
     struct dpif_netlink *dpif = dpif_netlink_cast(dpif_);
     struct flow flow;
@@ -1663,6 +1673,14 @@ done:
     return enable_uid;
 }
 
+static bool
+dpif_netlink_check_uid(struct dpif *dpif_)
+{
+    const struct dpif_netlink *dpif = dpif_netlink_cast(dpif_);
+
+    return dpif->uid_supported;
+}
+
 /* Synchronizes 'channels' in 'dpif->handlers'  with the set of vports
  * currently in 'dpif' in the kernel, by adding a new set of channels for
  * any kernel vport that lacks one and deleting any channels that have no
@@ -2104,6 +2122,7 @@ const struct dpif_class dpif_netlink_class = {
     NULL,                       /* register_upcall_cb */
     NULL,                       /* enable_upcall */
     NULL,                       /* disable_upcall */
+    dpif_netlink_check_uid,
 };
 
 static int
diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h
index 65cf505..9e9d9de 100644
--- a/lib/dpif-provider.h
+++ b/lib/dpif-provider.h
@@ -53,6 +53,7 @@ static inline void dpif_assert_class(const struct dpif *dpif,
 
 struct dpif_flow_dump {
     struct dpif *dpif;
+    bool terse;         /* If true, key/mask/actions may be omitted. */
 };
 
 static inline void
@@ -256,8 +257,12 @@ struct dpif_class {
      *
      * 'flow_dump_create' and 'flow_dump_thread_create' must initialize the
      * structures that they return with dpif_flow_dump_init() and
-     * dpif_flow_dump_thread_init(), respectively. */
-    struct dpif_flow_dump *(*flow_dump_create)(const struct dpif *dpif);
+     * dpif_flow_dump_thread_init(), respectively.
+     *
+     * If 'terse' is true, then only UID and statistics will
+     * be returned in the dump. Otherwise, all fields will be returned. */
+    struct dpif_flow_dump *(*flow_dump_create)(const struct dpif *dpif,
+                                               bool terse);
     int (*flow_dump_destroy)(struct dpif_flow_dump *dump);
 
     struct dpif_flow_dump_thread *(*flow_dump_thread_create)(
@@ -363,6 +368,9 @@ struct dpif_class {
 
     /* Disables upcalls if 'dpif' directly executes upcall functions. */
     void (*disable_upcall)(struct dpif *);
+
+    /* Returns whether 'dpif' supports UID for flow operations. */
+    bool (*get_uid_support)(struct dpif *);
 };
 
 extern const struct dpif_class dpif_netlink_class;
diff --git a/lib/dpif.c b/lib/dpif.c
index 2eed084..d37a57f 100644
--- a/lib/dpif.c
+++ b/lib/dpif.c
@@ -847,6 +847,20 @@ dpif_flow_flush(struct dpif *dpif)
     return error;
 }
 
+/* Tests whether 'dpif' supports userspace flow ids. We can skip serializing
+ * some flow attributes for datapaths that support this feature.
+ *
+ * Returns true if 'dpif' supports UID for flow operations.
+ * Returns false if  'dpif' does not support UID. */
+bool
+dpif_get_enable_uid(struct dpif *dpif)
+{
+    if (dpif->dpif_class->get_uid_support) {
+        return dpif->dpif_class->get_uid_support(dpif);
+    }
+    return false;
+}
+
 /* A dpif_operate() wrapper for performing a single DPIF_OP_FLOW_GET. */
 int
 dpif_flow_get(struct dpif *dpif,
@@ -924,14 +938,15 @@ dpif_flow_del(struct dpif *dpif,
 }
 
 /* Creates and returns a new 'struct dpif_flow_dump' for iterating through the
- * flows in 'dpif'.
+ * flows in 'dpif'. If 'terse' is true, then only UID and statistics will
+ * be returned in the dump. Otherwise, all fields will be returned.
  *
  * This function always successfully returns a dpif_flow_dump.  Error
  * reporting is deferred to dpif_flow_dump_destroy(). */
 struct dpif_flow_dump *
-dpif_flow_dump_create(const struct dpif *dpif)
+dpif_flow_dump_create(const struct dpif *dpif, bool terse)
 {
-    return dpif->dpif_class->flow_dump_create(dpif);
+    return dpif->dpif_class->flow_dump_create(dpif, terse);
 }
 
 /* Destroys 'dump', which must have been created with dpif_flow_dump_create().
diff --git a/lib/dpif.h b/lib/dpif.h
index 5130f9f..d232ff8 100644
--- a/lib/dpif.h
+++ b/lib/dpif.h
@@ -515,6 +515,7 @@ enum dpif_flow_put_flags {
     DPIF_FP_PROBE = 1 << 3      /* Suppress error messages, if any. */
 };
 
+bool dpif_get_enable_uid(struct dpif *);
 void dpif_flow_hash(const struct dpif *, const void *key, size_t key_len,
                     ovs_u128 *hash);
 int dpif_flow_flush(struct dpif *);
@@ -560,7 +561,7 @@ int dpif_flow_get(struct dpif *,
  *
  * All error reporting is deferred to the call to dpif_flow_dump_destroy().
  */
-struct dpif_flow_dump *dpif_flow_dump_create(const struct dpif *);
+struct dpif_flow_dump *dpif_flow_dump_create(const struct dpif *, bool terse);
 int dpif_flow_dump_destroy(struct dpif_flow_dump *);
 
 struct dpif_flow_dump_thread *dpif_flow_dump_thread_create(
diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
index b25398c..5c79158 100644
--- a/ofproto/ofproto-dpif-upcall.c
+++ b/ofproto/ofproto-dpif-upcall.c
@@ -114,6 +114,7 @@ struct udpif {
     struct dpif_flow_dump *dump;       /* DPIF flow dump state. */
     long long int dump_duration;       /* Duration of the last flow dump. */
     struct seq *dump_seq;              /* Increments each dump iteration. */
+    atomic_bool enable_uid;            /* If true, skip dumping flow attrs. */
 
     /* There are 'N_UMAPS' maps containing 'struct udpif_key' elements.
      *
@@ -254,21 +255,26 @@ static void upcall_unixctl_disable_megaflows(struct 
unixctl_conn *, int argc,
                                              const char *argv[], void *aux);
 static void upcall_unixctl_enable_megaflows(struct unixctl_conn *, int argc,
                                             const char *argv[], void *aux);
+static void upcall_unixctl_disable_uid(struct unixctl_conn *, int argc,
+                                              const char *argv[], void *aux);
+static void upcall_unixctl_enable_uid(struct unixctl_conn *, int argc,
+                                             const char *argv[], void *aux);
 static void upcall_unixctl_set_flow_limit(struct unixctl_conn *conn, int argc,
                                             const char *argv[], void *aux);
 static void upcall_unixctl_dump_wait(struct unixctl_conn *conn, int argc,
                                      const char *argv[], void *aux);
 
 static struct udpif_key *ukey_create_from_upcall(const struct upcall *);
-static struct udpif_key *ukey_create_from_dpif_flow(const struct udpif *,
-                                                    const struct dpif_flow *);
+static int ukey_create_from_dpif_flow(const struct udpif *,
+                                      const struct dpif_flow *,
+                                      struct udpif_key **);
 static bool ukey_install_start(struct udpif *, struct udpif_key *ukey);
 static bool ukey_install_finish(struct udpif_key *ukey, int error);
 static bool ukey_install(struct udpif *udpif, struct udpif_key *ukey);
 static struct udpif_key *ukey_lookup(struct udpif *udpif,
                                      const ovs_u128 *uid);
 static int ukey_acquire(struct udpif *, const struct dpif_flow *,
-                        struct udpif_key **result);
+                        struct udpif_key **result, int *error);
 static void ukey_delete__(struct udpif_key *);
 static void ukey_delete(struct umap *, struct udpif_key *);
 static enum upcall_type classify_upcall(enum dpif_upcall_type type,
@@ -297,6 +303,10 @@ udpif_create(struct dpif_backer *backer, struct dpif *dpif)
                                  upcall_unixctl_disable_megaflows, NULL);
         unixctl_command_register("upcall/enable-megaflows", "", 0, 0,
                                  upcall_unixctl_enable_megaflows, NULL);
+        unixctl_command_register("upcall/disable-uid", "", 0, 0,
+                                 upcall_unixctl_disable_uid, NULL);
+        unixctl_command_register("upcall/enable-uid", "", 0, 0,
+                                 upcall_unixctl_enable_uid, NULL);
         unixctl_command_register("upcall/set-flow-limit", "", 1, 1,
                                  upcall_unixctl_set_flow_limit, NULL);
         unixctl_command_register("revalidator/wait", "", 0, 0,
@@ -311,6 +321,7 @@ udpif_create(struct dpif_backer *backer, struct dpif *dpif)
     udpif->dump_seq = seq_create();
     latch_init(&udpif->exit_latch);
     list_push_back(&all_udpifs, &udpif->list_node);
+    atomic_init(&udpif->enable_uid, false);
     atomic_init(&udpif->n_flows, 0);
     atomic_init(&udpif->n_flows_timestamp, LLONG_MIN);
     ovs_mutex_init(&udpif->n_flows_mutex);
@@ -426,6 +437,7 @@ udpif_start_threads(struct udpif *udpif, size_t n_handlers,
                 "handler", udpif_upcall_handler, handler);
         }
 
+        atomic_init(&udpif->enable_uid, dpif_get_enable_uid(udpif->dpif));
         dpif_enable_upcall(udpif->dpif);
 
         ovs_barrier_init(&udpif->reval_barrier, udpif->n_revalidators);
@@ -718,7 +730,10 @@ udpif_revalidator(void *arg)
 
             start_time = time_msec();
             if (!udpif->reval_exit) {
-                udpif->dump = dpif_flow_dump_create(udpif->dpif);
+                bool terse_dump;
+
+                atomic_read_relaxed(&udpif->enable_uid, &terse_dump);
+                udpif->dump = dpif_flow_dump_create(udpif->dpif, terse_dump);
             }
         }
 
@@ -1293,19 +1308,36 @@ ukey_create_from_upcall(const struct upcall *upcall)
                          upcall->reval_seq, 0);
 }
 
-static struct udpif_key *
+static int
 ukey_create_from_dpif_flow(const struct udpif *udpif,
-                           const struct dpif_flow *flow)
+                           const struct dpif_flow *flow,
+                           struct udpif_key **ukey)
 {
+    struct dpif_flow full_flow;
     struct ofpbuf actions;
     uint64_t dump_seq, reval_seq;
-
+    uint64_t stub[DPIF_FLOW_BUFSIZE / 8];
+
+    if (!flow->key_len) {
+        struct ofpbuf buf;
+        int err;
+
+        /* If the key was not provided by the datapath, fetch the full flow. */
+        ofpbuf_use_stack(&buf, &stub, sizeof stub);
+        err = dpif_flow_get(udpif->dpif, NULL, 0, &flow->uid, &buf,
+                            &full_flow);
+        if (err) {
+            return err;
+        }
+        flow = &full_flow;
+    }
     dump_seq = seq_read(udpif->dump_seq);
     reval_seq = seq_read(udpif->reval_seq);
     ofpbuf_use_const(&actions, &flow->actions, flow->actions_len);
-    return ukey_create__(flow->key, flow->key_len,
-                         flow->mask, flow->mask_len, &flow->uid, &actions,
-                         dump_seq, reval_seq, flow->stats.used);
+    *ukey = ukey_create__(flow->key, flow->key_len,
+                          flow->mask, flow->mask_len, &flow->uid, &actions,
+                          dump_seq, reval_seq, flow->stats.used);
+    return 0;
 }
 
 /* Attempts to insert a ukey into the shared ukey maps.
@@ -1381,46 +1413,54 @@ ukey_install(struct udpif *udpif, struct udpif_key 
*ukey)
 /* Searches for a ukey in 'udpif->ukeys' that matches 'flow' and attempts to
  * lock the ukey. If the ukey does not exist, create it.
  *
- * Returns true on success, setting *result to the matching ukey and returning
- * it in a locked state. Otherwise, returns false and clears *result. */
+ * Returns 0 on success, setting *result to the matching ukey and returning it
+ * in a locked state. Otherwise, returns an errno and clears *result. EBUSY
+ * indicates that another thread is handling this flow. Other errors an
+ * unexpected condition creating a new ukey.
+ *
+ * *error is an output parameter provided to appease the threadsafety analyser,
+ * and its value matches the return value. */
 static int
 ukey_acquire(struct udpif *udpif, const struct dpif_flow *flow,
-             struct udpif_key **result)
-    OVS_TRY_LOCK(true, (*result)->mutex)
+             struct udpif_key **result, int *error)
+    OVS_TRY_LOCK(0, (*result)->mutex)
 {
     struct udpif_key *ukey;
-    bool locked = false;
+    int retval;
 
     ukey = ukey_lookup(udpif, &flow->uid);
     if (ukey) {
-        if (!ovs_mutex_trylock(&ukey->mutex)) {
-            locked = true;
-        }
+        retval = ovs_mutex_trylock(&ukey->mutex);
     } else {
-        bool installed;
-
         /* Usually we try to avoid installing flows from revalidator threads,
          * because locking on a umap may cause handler threads to block.
          * However there are certain cases, like when ovs-vswitchd is
          * restarted, where it is desirable to handle flows that exist in the
          * datapath gracefully (ie, don't just clear the datapath). */
-        ukey = ukey_create_from_dpif_flow(udpif, flow);
-        installed = ukey_install_start(udpif, ukey);
-        if (installed) {
+        bool install;
+
+        retval = ukey_create_from_dpif_flow(udpif, flow, &ukey);
+        if (retval) {
+            goto done;
+        }
+        install = ukey_install_start(udpif, ukey);
+        if (install) {
             ukey_install_finish__(ukey);
-            locked = true;
+            retval = 0;
         } else {
             ukey_delete__(ukey);
-            locked = false;
+            retval = EBUSY;
         }
     }
 
-    if (locked) {
-        *result = ukey;
-    } else {
+done:
+    *error = retval;
+    if (retval) {
         *result = NULL;
+    } else {
+        *result = ukey;
     }
-    return locked;
+    return retval;
 }
 
 static void
@@ -1610,6 +1650,16 @@ exit:
 }
 
 static void
+delete_op_init__(struct ukey_op *op, const struct dpif_flow *flow)
+{
+    op->dop.type = DPIF_OP_FLOW_DEL;
+    op->dop.u.flow_del.key = flow->key;
+    op->dop.u.flow_del.key_len = flow->key_len;
+    op->dop.u.flow_del.uid = &flow->uid;
+    op->dop.u.flow_del.stats = &op->stats;
+}
+
+static void
 delete_op_init(struct ukey_op *op, struct udpif_key *ukey)
 {
     op->ukey = ukey;
@@ -1640,30 +1690,39 @@ push_ukey_ops__(struct udpif *udpif, struct ukey_op 
*ops, size_t n_ops)
         stats = op->dop.u.flow_del.stats;
         push = &push_buf;
 
-        ovs_mutex_lock(&op->ukey->mutex);
-        push->used = MAX(stats->used, op->ukey->stats.used);
-        push->tcp_flags = stats->tcp_flags | op->ukey->stats.tcp_flags;
-        push->n_packets = stats->n_packets - op->ukey->stats.n_packets;
-        push->n_bytes = stats->n_bytes - op->ukey->stats.n_bytes;
-        ovs_mutex_unlock(&op->ukey->mutex);
+        if (op->ukey) {
+            ovs_mutex_lock(&op->ukey->mutex);
+            push->used = MAX(stats->used, op->ukey->stats.used);
+            push->tcp_flags = stats->tcp_flags | op->ukey->stats.tcp_flags;
+            push->n_packets = stats->n_packets - op->ukey->stats.n_packets;
+            push->n_bytes = stats->n_bytes - op->ukey->stats.n_bytes;
+            ovs_mutex_unlock(&op->ukey->mutex);
+        } else {
+            push = stats;
+        }
 
         if (push->n_packets || netflow_exists()) {
+            const struct nlattr *key = op->dop.u.flow_del.key;
+            size_t key_len = op->dop.u.flow_del.key_len;
             struct ofproto_dpif *ofproto;
             struct netflow *netflow;
             ofp_port_t ofp_in_port;
             struct flow flow;
             int error;
 
-            ovs_mutex_lock(&op->ukey->mutex);
-            if (op->ukey->xcache) {
-                xlate_push_stats(op->ukey->xcache, push);
+            if (op->ukey) {
+                ovs_mutex_lock(&op->ukey->mutex);
+                if (op->ukey->xcache) {
+                    xlate_push_stats(op->ukey->xcache, push);
+                    ovs_mutex_unlock(&op->ukey->mutex);
+                    continue;
+                }
                 ovs_mutex_unlock(&op->ukey->mutex);
-                continue;
+                key = op->ukey->key;
+                key_len = op->ukey->key_len;
             }
-            ovs_mutex_unlock(&op->ukey->mutex);
 
-            if (odp_flow_key_to_flow(op->dop.u.flow_del.key,
-                                     op->dop.u.flow_del.key_len, &flow)
+            if (odp_flow_key_to_flow(key, key_len, &flow)
                 == ODP_FIT_ERROR) {
                 continue;
             }
@@ -1703,6 +1762,19 @@ push_ukey_ops(struct udpif *udpif, struct umap *umap,
 }
 
 static void
+log_unexpected_flow(const struct dpif_flow *flow, int error)
+{
+    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 60);
+    struct ds ds = DS_EMPTY_INITIALIZER;
+
+    ds_put_format(&ds, "Failed to acquire udpif_key corresponding to "
+                  "unexpected flow (%s): ", ovs_strerror(error));
+    odp_format_uid(&flow->uid, &ds);
+    VLOG_WARN_RL(&rl, "%s", ds_cstr(&ds));
+
+}
+
+static void
 revalidate(struct revalidator *revalidator)
 {
     struct udpif *udpif = revalidator->udpif;
@@ -1754,11 +1826,17 @@ revalidate(struct revalidator *revalidator)
             long long int used = f->stats.used;
             struct udpif_key *ukey;
             bool already_dumped, keep;
+            int error;
 
-            if (!ukey_acquire(udpif, f, &ukey)) {
-                /* Another thread is processing this flow, so don't bother
-                 * processing it.*/
-                COVERAGE_INC(upcall_ukey_contention);
+            if (ukey_acquire(udpif, f, &ukey, &error)) {
+                if (error == EBUSY) {
+                    /* Another thread is processing this flow, so don't bother
+                     * processing it.*/
+                    COVERAGE_INC(upcall_ukey_contention);
+                } else {
+                    log_unexpected_flow(f, error);
+                    delete_op_init__(&ops[n_ops++], f);
+                }
                 continue;
             }
 
@@ -1896,15 +1974,23 @@ upcall_unixctl_show(struct unixctl_conn *conn, int argc 
OVS_UNUSED,
 
     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
         unsigned int flow_limit;
+        bool uid_enabled;
         size_t i;
 
         atomic_read_relaxed(&udpif->flow_limit, &flow_limit);
+        atomic_read_relaxed(&udpif->enable_uid, &uid_enabled);
 
         ds_put_format(&ds, "%s:\n", dpif_name(udpif->dpif));
         ds_put_format(&ds, "\tflows         : (current %lu)"
             " (avg %u) (max %u) (limit %u)\n", udpif_get_n_flows(udpif),
             udpif->avg_n_flows, udpif->max_n_flows, flow_limit);
         ds_put_format(&ds, "\tdump duration : %lldms\n", udpif->dump_duration);
+        ds_put_format(&ds, "\tuid enabled : ");
+        if (uid_enabled) {
+            ds_put_format(&ds, "true\n");
+        } else {
+            ds_put_format(&ds, "false\n");
+        }
         ds_put_char(&ds, '\n');
 
         for (i = 0; i < n_revalidators; i++) {
@@ -1952,6 +2038,38 @@ upcall_unixctl_enable_megaflows(struct unixctl_conn 
*conn,
     unixctl_command_reply(conn, "megaflows enabled");
 }
 
+/* Disable skipping flow attributes during flow dump.
+ *
+ * This command is only needed for advanced debugging, so it's not
+ * documented in the man page. */
+static void
+upcall_unixctl_disable_uid(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                           const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
+{
+    struct udpif *udpif;
+
+    LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
+        atomic_store(&udpif->enable_uid, false);
+    }
+    unixctl_command_reply(conn, "Datapath dumping tersely using UID disabled");
+}
+
+/* Re-enable skipping flow attributes during flow dump.
+ *
+ * This command is only needed for advanced debugging, so it's not documented
+ * in the man page. */
+static void
+upcall_unixctl_enable_uid(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                          const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
+{
+    struct udpif *udpif;
+
+    LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
+        atomic_store(&udpif->enable_uid, true);
+    }
+    unixctl_command_reply(conn, "Datapath dumping tersely using UID enabled");
+}
+
 /* Set the flow limit.
  *
  * This command is only needed for advanced debugging, so it's not
diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
index f85d2f4..1f290ea 100644
--- a/ofproto/ofproto-dpif.c
+++ b/ofproto/ofproto-dpif.c
@@ -280,6 +280,9 @@ struct dpif_backer {
     /* Maximum number of MPLS label stack entries that the datapath supports
      * in a match */
     size_t max_mpls_depth;
+
+    /* Support for userspace-generated unique flow ids. */
+    bool enable_uid;
 };
 
 /* All existing ofproto_backer instances, indexed by ofproto->up.type. */
@@ -4935,7 +4938,7 @@ ofproto_unixctl_dpif_dump_flows(struct unixctl_conn *conn,
     }
 
     ds_init(&ds);
-    flow_dump = dpif_flow_dump_create(ofproto->backer->dpif);
+    flow_dump = dpif_flow_dump_create(ofproto->backer->dpif, false);
     flow_dump_thread = dpif_flow_dump_thread_create(flow_dump);
     while (dpif_flow_dump_next(flow_dump_thread, &f, 1)) {
         struct flow flow;
diff --git a/tests/dpif-netdev.at b/tests/dpif-netdev.at
index b2c7908..18210b3 100644
--- a/tests/dpif-netdev.at
+++ b/tests/dpif-netdev.at
@@ -94,6 +94,8 @@ OVS_VSWITCHD_START(
   [add-port br0 p1 -- set interface p1 type=dummy 
options:pstream=punix:$OVS_RUNDIR/p0.sock
    set bridge br0 datapath-type=dummy other-config:datapath-id=1234 \
                   fail-mode=secure])
+AT_CHECK([ovs-appctl upcall/disable-uid], [0], [Datapath dumping tersely using 
UID disabled
+], [])
 AT_CHECK([ovs-appctl vlog/set dpif:dbg dpif_netdev:dbg])
 
 AT_CHECK([ovs-ofctl add-flow br0 action=normal])
@@ -110,6 +112,8 @@ 
skb_priority(0/0),skb_mark(0/0),recirc_id(0),dp_hash(0/0),in_port(1),eth(src=50:
 # Now, the same again without megaflows.
 AT_CHECK([ovs-appctl upcall/disable-megaflows], [0], [megaflows disabled
 ])
+AT_CHECK([ovs-appctl upcall/disable-uid], [0], [Datapath dumping tersely using 
UID disabled
+], [])
 AT_CHECK([ovs-appctl netdev-dummy/receive p1 
'in_port(1),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2,dst=10.0.0.1,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)'])
 sleep 1
 
diff --git a/tests/ofproto-dpif.at b/tests/ofproto-dpif.at
index 2f83d4b..124b289 100644
--- a/tests/ofproto-dpif.at
+++ b/tests/ofproto-dpif.at
@@ -4732,6 +4732,8 @@ OVS_VSWITCHD_START([add-br br1 \
 ADD_OF_PORTS([br0], [2])
 ADD_OF_PORTS([br1], [3])
 
+AT_CHECK([ovs-appctl upcall/disable-uid], [0], [Datapath dumping tersely using 
UID disabled
+], [])
 AT_CHECK([ovs-appctl time/stop])
 AT_CHECK([ovs-appctl vlog/set dpif:dbg dpif_netdev:dbg])
 
@@ -5307,6 +5309,8 @@ table=0 in_port=1,ip,nw_dst=10.0.0.3 actions=drop
 ])
 AT_CHECK([ovs-appctl upcall/disable-megaflows], [0], [megaflows disabled
 ], [])
+AT_CHECK([ovs-appctl upcall/disable-uid], [0], [Datapath dumping tersely using 
UID disabled
+], [])
 AT_CHECK([ovs-appctl vlog/set dpif_netdev:dbg], [0], [], [])
 AT_CHECK([ovs-ofctl add-flows br0 flows.txt])
 for i in 1 2 3 4; do
-- 
1.7.10.4

_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to