PMD threads use pvectors but do not need the overhead of the
concurrent version.  Expose the non-concurrent version for
that use.

Note that struct pvector is renamed as struct cpvector (for concurrent
priority vector), and the former struct pvector_impl is now struct
pvector.

Signed-off-by: Jarno Rajahalme <ja...@ovn.org>
---
 lib/classifier.c        |  30 ++++----
 lib/classifier.h        |   6 +-
 lib/dpif-netdev.c       |  14 ++--
 lib/pvector.c           | 190 +++++++++++++++++++++++++-----------------------
 lib/pvector.h           | 187 ++++++++++++++++++++++++++++++++---------------
 tests/test-classifier.c |  12 +--
 6 files changed, 257 insertions(+), 182 deletions(-)

diff --git a/lib/classifier.c b/lib/classifier.c
index 54941ed..8af1cb6 100644
--- a/lib/classifier.c
+++ b/lib/classifier.c
@@ -326,7 +326,7 @@ classifier_init(struct classifier *cls, const uint8_t *flow_segments)
 {
     cls->n_rules = 0;
     cmap_init(&cls->subtables_map);
-    pvector_init(&cls->subtables);
+    cpvector_init(&cls->subtables);
     cls->n_flow_segments = 0;
     if (flow_segments) {
         while (cls->n_flow_segments < CLS_MAX_INDICES
@@ -360,7 +360,7 @@ classifier_destroy(struct classifier *cls)
         }
         cmap_destroy(&cls->subtables_map);
 
-        pvector_destroy(&cls->subtables);
+        cpvector_destroy(&cls->subtables);
     }
 }
 
@@ -659,20 +659,20 @@ classifier_replace(struct classifier *cls, const struct cls_rule *rule,
     if (n_rules == 1) {
         subtable->max_priority = rule->priority;
         subtable->max_count = 1;
-        pvector_insert(&cls->subtables, subtable, rule->priority);
+        cpvector_insert(&cls->subtables, subtable, rule->priority);
     } else if (rule->priority == subtable->max_priority) {
         ++subtable->max_count;
     } else if (rule->priority > subtable->max_priority) {
         subtable->max_priority = rule->priority;
         subtable->max_count = 1;
-        pvector_change_priority(&cls->subtables, subtable, rule->priority);
+        cpvector_change_priority(&cls->subtables, subtable, rule->priority);
     }
 
     /* Nothing was replaced. */
     cls->n_rules++;
 
     if (cls->publish) {
-        pvector_publish(&cls->subtables);
+        cpvector_publish(&cls->subtables);
     }
 
     return NULL;
@@ -804,12 +804,12 @@ check_priority:
                 }
             }
             subtable->max_priority = max_priority;
-            pvector_change_priority(&cls->subtables, subtable, max_priority);
+            cpvector_change_priority(&cls->subtables, subtable, max_priority);
         }
     }
 
     if (cls->publish) {
-        pvector_publish(&cls->subtables);
+        cpvector_publish(&cls->subtables);
     }
 
     /* free the rule. */
@@ -960,8 +960,8 @@ classifier_lookup__(const struct classifier *cls, cls_version_t version,
 
     /* Main loop. */
     struct cls_subtable *subtable;
-    PVECTOR_FOR_EACH_PRIORITY (subtable, hard_pri + 1, 2, sizeof *subtable,
-                               &cls->subtables) {
+    CPVECTOR_FOR_EACH_PRIORITY (subtable, hard_pri + 1, 2, sizeof *subtable,
+                                &cls->subtables) {
         struct cls_conjunction_set *conj_set;
 
         /* Skip subtables with no match, or where the match is lower-priority
@@ -1232,8 +1232,8 @@ classifier_rule_overlaps(const struct classifier *cls,
     struct cls_subtable *subtable;
 
     /* Iterate subtables in the descending max priority order. */
-    PVECTOR_FOR_EACH_PRIORITY (subtable, target->priority, 2,
-                               sizeof(struct cls_subtable), &cls->subtables) {
+    CPVECTOR_FOR_EACH_PRIORITY (subtable, target->priority, 2,
+                                sizeof(struct cls_subtable), &cls->subtables) {
         struct {
             struct minimask mask;
             uint64_t storage[FLOW_U64S];
@@ -1351,8 +1351,8 @@ cls_cursor_start(const struct classifier *cls, const struct cls_rule *target,
     cursor.rule = NULL;
 
     /* Find first rule. */
-    PVECTOR_CURSOR_FOR_EACH (subtable, &cursor.subtables,
-                             &cursor.cls->subtables) {
+    CPVECTOR_CURSOR_FOR_EACH (subtable, &cursor.subtables,
+                              &cursor.cls->subtables) {
         const struct cls_rule *rule = search_subtable(subtable, &cursor);
 
         if (rule) {
@@ -1379,7 +1379,7 @@ cls_cursor_next(struct cls_cursor *cursor)
         }
     }
 
-    PVECTOR_CURSOR_FOR_EACH_CONTINUE (subtable, &cursor->subtables) {
+    CPVECTOR_CURSOR_FOR_EACH_CONTINUE (subtable, &cursor->subtables) {
         rule = search_subtable(subtable, cursor);
         if (rule) {
             cursor->subtable = subtable;
@@ -1511,7 +1511,7 @@ destroy_subtable(struct classifier *cls, struct cls_subtable *subtable)
 {
     int i;
 
-    pvector_remove(&cls->subtables, subtable);
+    cpvector_remove(&cls->subtables, subtable);
     cmap_remove(&cls->subtables_map, &subtable->cmap_node,
                 minimask_hash(&subtable->mask, 0));
 
diff --git a/lib/classifier.h b/lib/classifier.h
index 6247350..f058071 100644
--- a/lib/classifier.h
+++ b/lib/classifier.h
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
+ * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -340,7 +340,7 @@ struct classifier {
     uint8_t flow_segments[CLS_MAX_INDICES]; /* Flow segment boundaries to use
                                              * for staged lookup. */
     struct cmap subtables_map;      /* Contains "struct cls_subtable"s.  */
-    struct pvector subtables;
+    struct cpvector subtables;
     struct cmap partitions;         /* Contains "struct cls_partition"s. */
     struct cls_trie tries[CLS_MAX_TRIES]; /* Prefix tries. */
     unsigned int n_tries;
@@ -471,7 +471,7 @@ static inline void
 classifier_publish(struct classifier *cls)
 {
     cls->publish = true;
-    pvector_publish(&cls->subtables);
+    cpvector_publish(&cls->subtables);
 }
 
 #ifdef __cplusplus
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 37c2631..5abc363 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -151,7 +151,7 @@ struct emc_cache {
 
 struct dpcls {
     struct cmap subtables_map;
-    struct pvector subtables;
+    struct pvector *subtables;
 };
 
 /* A rule to be inserted to the classifier. */
@@ -4381,13 +4381,13 @@ static void
 dpcls_init(struct dpcls *cls)
 {
     cmap_init(&cls->subtables_map);
-    pvector_init(&cls->subtables);
+    cls->subtables = pvector_alloc(4);
 }
 
 static void
 dpcls_destroy_subtable(struct dpcls *cls, struct dpcls_subtable *subtable)
 {
-    pvector_remove(&cls->subtables, subtable);
+    pvector_remove(cls->subtables, subtable);
     cmap_remove(&cls->subtables_map, &subtable->cmap_node,
                 subtable->mask.hash);
     cmap_destroy(&subtable->rules);
@@ -4408,7 +4408,7 @@ dpcls_destroy(struct dpcls *cls)
             dpcls_destroy_subtable(cls, subtable);
         }
         cmap_destroy(&cls->subtables_map);
-        pvector_destroy(&cls->subtables);
+        free(cls->subtables);
     }
 }
 
@@ -4423,8 +4423,7 @@ dpcls_create_subtable(struct dpcls *cls, const struct netdev_flow_key *mask)
     cmap_init(&subtable->rules);
     netdev_flow_key_clone(&subtable->mask, mask);
     cmap_insert(&cls->subtables_map, &subtable->cmap_node, mask->hash);
-    pvector_insert(&cls->subtables, subtable, 0);
-    pvector_publish(&cls->subtables);
+    pvector_push_back(&cls->subtables, subtable, 0);
 
     return subtable;
 }
@@ -4467,7 +4466,6 @@ dpcls_remove(struct dpcls *cls, struct dpcls_rule *rule)
     if (cmap_remove(&subtable->rules, &rule->cmap_node, rule->flow.hash)
         == 0) {
         dpcls_destroy_subtable(cls, subtable);
-        pvector_publish(&cls->subtables);
     }
 }
 
@@ -4521,7 +4519,7 @@ dpcls_lookup(const struct dpcls *cls, const struct netdev_flow_key keys[],
     }
     memset(rules, 0, cnt * sizeof *rules);
 
-    PVECTOR_FOR_EACH (subtable, &cls->subtables) {
+    PVECTOR_FOR_EACH (subtable, cls->subtables) {
         const struct netdev_flow_key *mkeys = keys;
         struct dpcls_rule **mrules = rules;
         map_type remains = 0;
diff --git a/lib/pvector.c b/lib/pvector.c
index aaeee92..5fb1899 100644
--- a/lib/pvector.c
+++ b/lib/pvector.c
@@ -21,60 +21,30 @@
  * reallocations. */
 enum { PVECTOR_EXTRA_ALLOC = 4 };
 
-static struct pvector_impl *
-pvector_impl_get(const struct pvector *pvec)
+struct pvector *
+pvector_alloc(size_t size)
 {
-    return ovsrcu_get(struct pvector_impl *, &pvec->impl);
-}
-
-static struct pvector_impl *
-pvector_impl_alloc(size_t size)
-{
-    struct pvector_impl *impl;
+    struct pvector *pvec;
 
-    impl = xmalloc(sizeof *impl + size * sizeof impl->vector[0]);
-    impl->size = 0;
-    impl->allocated = size;
+    pvec = xmalloc(sizeof *pvec + size * sizeof pvec->vector[0]);
+    pvec->size = 0;
+    pvec->allocated = size;
 
-    return impl;
-}
-
-static struct pvector_impl *
-pvector_impl_dup(struct pvector_impl *old)
-{
-    struct pvector_impl *impl;
-    size_t alloc = old->size + PVECTOR_EXTRA_ALLOC;
-
-    impl = xmalloc(sizeof *impl + alloc * sizeof impl->vector[0]);
-    impl->allocated = alloc;
-    impl->size = old->size;
-    memcpy(impl->vector, old->vector, old->size * sizeof old->vector[0]);
-    return impl;
+    return pvec;
 }
 
-/* Initializes 'pvec' as an empty concurrent priority vector. */
-void
-pvector_init(struct pvector *pvec)
+static struct pvector *
+pvector_dup(const struct pvector *old)
 {
-    ovsrcu_set(&pvec->impl, pvector_impl_alloc(PVECTOR_EXTRA_ALLOC));
-    pvec->temp = NULL;
-}
+    struct pvector *pvec = pvector_alloc(old->size + PVECTOR_EXTRA_ALLOC);
 
-/* Destroys 'pvec'.
- *
- * The client is responsible for destroying any data previously held in
- * 'pvec'. */
-void
-pvector_destroy(struct pvector *pvec)
-{
-    free(pvec->temp);
-    pvec->temp = NULL;
-    ovsrcu_postpone(free, pvector_impl_get(pvec));
-    ovsrcu_set(&pvec->impl, NULL); /* Poison. */
+    pvec->size = old->size;
+    memcpy(pvec->vector, old->vector, old->size * sizeof old->vector[0]);
+    return pvec;
 }
 
-/* Iterators for callers that need the 'index' afterward. */
-#define PVECTOR_IMPL_FOR_EACH(ENTRY, INDEX, IMPL)          \
+/* Iterator for callers that need the 'index' afterward. */
+#define PVECTOR_FOR_EACH_ENTRY(ENTRY, INDEX, IMPL)         \
     for ((INDEX) = 0;                                      \
          (INDEX) < (IMPL)->size                            \
              && ((ENTRY) = &(IMPL)->vector[INDEX], true);  \
@@ -91,20 +61,20 @@ pvector_entry_cmp(const void *a_, const void *b_)
     return a > b ? -1 : a < b;
 }
 
-static void
-pvector_impl_sort(struct pvector_impl *impl)
+void
+pvector_sort(struct pvector *pvec)
 {
-    qsort(impl->vector, impl->size, sizeof *impl->vector, pvector_entry_cmp);
+    qsort(pvec->vector, pvec->size, sizeof *pvec->vector, pvector_entry_cmp);
 }
 
 /* Returns the index of the 'ptr' in the vector, or -1 if none is found. */
 static int
-pvector_impl_find(struct pvector_impl *impl, void *target)
+pvector_find(const struct pvector *pvec, void *target)
 {
     const struct pvector_entry *entry;
     int index;
 
-    PVECTOR_IMPL_FOR_EACH (entry, index, impl) {
+    PVECTOR_FOR_EACH_ENTRY (entry, index, pvec) {
         if (entry->ptr == target) {
             return index;
         }
@@ -112,11 +82,67 @@ pvector_impl_find(struct pvector_impl *impl, void *target)
     return -1;
 }
 
+/* Appends 'ptr' with 'priority' to '*pvecp'; may re-allocate '*pvecp'. */
+void
+pvector_push_back(struct pvector **pvecp, void *ptr, int priority)
+{
+    struct pvector *pvec = *pvecp;
+
+    if (pvec->size == pvec->allocated) {
+        pvec = pvector_dup(pvec);
+        free(*pvecp);
+        *pvecp = pvec;
+    }
+    /* Insert at the end, will be sorted later. */
+    pvec->vector[pvec->size].ptr = ptr;
+    pvec->vector[pvec->size].priority = priority;
+    pvec->size++;
+}
+
 void
-pvector_insert(struct pvector *pvec, void *ptr, int priority)
+pvector_remove(struct pvector *pvec, void *ptr)
 {
-    struct pvector_impl *temp = pvec->temp;
-    struct pvector_impl *old = pvector_impl_get(pvec);
+    int index;
+
+    index = pvector_find(pvec, ptr);
+    ovs_assert(index >= 0);
+    /* Now at the index of the entry to be deleted.
+     * Swap another entry in if needed, can be sorted later. */
+    pvec->size--;
+    if (index != pvec->size) {
+        pvec->vector[index] = pvec->vector[pvec->size];
+    }
+}
+
+
+/* Concurrent version. */
+
+/* Initializes 'cpvec' as an empty concurrent priority vector. */
+void
+cpvector_init(struct cpvector *cpvec)
+{
+    ovsrcu_set(&cpvec->impl, pvector_alloc(PVECTOR_EXTRA_ALLOC));
+    cpvec->temp = NULL;
+}
+
+/* Destroys 'cpvec'.
+ *
+ * The client is responsible for destroying any data previously held in
+ * 'cpvec'. */
+void
+cpvector_destroy(struct cpvector *cpvec)
+{
+    free(cpvec->temp);
+    cpvec->temp = NULL;
+    ovsrcu_postpone(free, cpvector_get_pvector(cpvec));
+    ovsrcu_set(&cpvec->impl, NULL); /* Poison. */
+}
+
+void
+cpvector_insert(struct cpvector *cpvec, void *ptr, int priority)
+{
+    struct pvector *temp = cpvec->temp;
+    struct pvector *old = cpvector_get_pvector(cpvec);
 
     ovs_assert(ptr != NULL);
 
@@ -131,53 +157,37 @@ pvector_insert(struct pvector *pvec, void *ptr, int priority)
         ++old->size;
     } else {
         if (!temp) {
-            temp = pvector_impl_dup(old);
-            pvec->temp = temp;
-        } else if (temp->size == temp->allocated) {
-            temp = pvector_impl_dup(temp);
-            free(pvec->temp);
-            pvec->temp = temp;
+            cpvec->temp = pvector_dup(old);
         }
-        /* Insert at the end, publish will sort. */
-        temp->vector[temp->size].ptr = ptr;
-        temp->vector[temp->size].priority = priority;
-        temp->size += 1;
+        pvector_push_back(&cpvec->temp, ptr, priority);
     }
 }
 
 void
-pvector_remove(struct pvector *pvec, void *ptr)
+cpvector_remove(struct cpvector *cpvec, void *ptr)
 {
-    struct pvector_impl *temp = pvec->temp;
-    int index;
+    struct pvector *temp = cpvec->temp;
 
     if (!temp) {
-        temp = pvector_impl_dup(pvector_impl_get(pvec));
-        pvec->temp = temp;
+        temp = pvector_dup(cpvector_get_pvector(cpvec));
+        cpvec->temp = temp;
     }
     ovs_assert(temp->size > 0);
-    index = pvector_impl_find(temp, ptr);
-    ovs_assert(index >= 0);
-    /* Now at the index of the entry to be deleted.
-     * Swap another entry in if needed, publish will sort anyway. */
-    temp->size--;
-    if (index != temp->size) {
-        temp->vector[index] = temp->vector[temp->size];
-    }
+    pvector_remove(temp, ptr);   /* Publish will sort. */
 }
 
 /* Change entry's 'priority' and keep the vector ordered. */
 void
-pvector_change_priority(struct pvector *pvec, void *ptr, int priority)
+cpvector_change_priority(struct cpvector *cpvec, void *ptr, int priority)
 {
-    struct pvector_impl *old = pvec->temp;
+    struct pvector *old = cpvec->temp;
     int index;
 
     if (!old) {
-        old = pvector_impl_get(pvec);
+        old = cpvector_get_pvector(cpvec);
     }
 
-    index = pvector_impl_find(old, ptr);
+    index = pvector_find(old, ptr);
 
     ovs_assert(index >= 0);
     /* Now at the index of the entry to be updated. */
@@ -188,10 +198,10 @@ pvector_change_priority(struct pvector *pvec, void *ptr, int priority)
         || (priority < old->vector[index].priority && index < old->size - 1
             && priority < old->vector[index + 1].priority)) {
         /* Have to use a temp. */
-        if (!pvec->temp) {
+        if (!cpvec->temp) {
             /* Have to reallocate to reorder. */
-            pvec->temp = pvector_impl_dup(old);
-            old = pvec->temp;
+            cpvec->temp = pvector_dup(old);
+            old = cpvec->temp;
             /* Publish will sort. */
         }
     }
@@ -199,13 +209,13 @@ pvector_change_priority(struct pvector *pvec, void *ptr, int priority)
 }
 
 /* Make the modified pvector available for iteration. */
-void pvector_publish__(struct pvector *pvec)
+void cpvector_publish__(struct cpvector *cpvec)
 {
-    struct pvector_impl *temp = pvec->temp;
+    struct pvector *temp = cpvec->temp;
 
-    pvec->temp = NULL;
-    pvector_impl_sort(temp); /* Also removes gaps. */
-    ovsrcu_postpone(free, ovsrcu_get_protected(struct pvector_impl *,
-                                               &pvec->impl));
-    ovsrcu_set(&pvec->impl, temp);
+    cpvec->temp = NULL;
+    pvector_sort(temp); /* Also removes gaps. */
+    ovsrcu_postpone(free, ovsrcu_get_protected(struct pvector *,
+                                               &cpvec->impl));
+    ovsrcu_set(&cpvec->impl, temp);
 }
diff --git a/lib/pvector.h b/lib/pvector.h
index b175b21..00cb757 100644
--- a/lib/pvector.h
+++ b/lib/pvector.h
@@ -49,7 +49,7 @@
  * values, or decrement the 'size' on a copy that readers have access to.
  *
  * Most modifications are internally staged at the 'temp' vector, from which
- * they can be published at 'impl' by calling pvector_publish().  This saves
+ * they can be published at 'impl' by calling cpvector_publish().  This saves
  * unnecessary memory allocations when many changes are done back-to-back.
  * 'temp' may contain NULL pointers and it may be in unsorted order.  It is
  * sorted before it is published at 'impl', which also removes the NULLs from
@@ -61,33 +61,17 @@ struct pvector_entry {
     void *ptr;
 };
 
-struct pvector_impl {
+/* Non-concurrent priority vector. */
+struct pvector {
     size_t size;       /* Number of entries in the vector. */
     size_t allocated;  /* Number of allocated entries. */
     struct pvector_entry vector[];
 };
 
-/* Concurrent priority vector. */
-struct pvector {
-    OVSRCU_TYPE(struct pvector_impl *) impl;
-    struct pvector_impl *temp;
-};
-
-/* Initialization. */
-void pvector_init(struct pvector *);
-void pvector_destroy(struct pvector *);
-
-/* Insertion and deletion.  These work on 'temp'.  */
-void pvector_insert(struct pvector *, void *, int priority);
-void pvector_change_priority(struct pvector *, void *, int priority);
-void pvector_remove(struct pvector *, void *);
-
-/* Make the modified pvector available for iteration. */
-static inline void pvector_publish(struct pvector *);
-
-/* Count.  These operate on the published pvector. */
-static inline size_t pvector_count(const struct pvector *);
-static inline bool pvector_is_empty(const struct pvector *);
+struct pvector *pvector_alloc(size_t);
+void pvector_push_back(struct pvector **, void *ptr, int priority);
+void pvector_remove(struct pvector *, void *ptr);
+void pvector_sort(struct pvector *);
 
 /* Iteration.
  *
@@ -95,8 +79,9 @@ static inline bool pvector_is_empty(const struct pvector *);
  * Thread-safety
  * =============
  *
- * Iteration is safe even in a pvector that is changing concurrently.
- * Multiple writers must exclude each other via e.g., a mutex.
+ * These iterators operate on the non-concurrent pvector, and are not thread
+ * safe.  Any entry may be skipped if entries are removed (with
+ * pvector_remove()) during iteration.
  *
  * Example
  * =======
@@ -106,32 +91,28 @@ static inline bool pvector_is_empty(const struct pvector *);
  *     };
  *
  *     struct my_node elem1, elem2, *iter;
- *     struct pvector my_pvector;
+ *     struct pvector *my_pvector;
  *
- *     pvector_init(&my_pvector);
+ *     my_pvector = pvector_alloc(0);
  *     ...add data...
- *     pvector_insert(&my_pvector, &elem1, 1);
- *     pvector_insert(&my_pvector, &elem2, 2);
+ *     pvector_push_back(&my_pvector, &elem1, 1);
+ *     pvector_push_back(&my_pvector, &elem2, 2);
+ *     ...sort...
+ *     pvector_sort(my_pvector);
  *     ...
- *     PVECTOR_FOR_EACH (iter, &my_pvector) {
+ *     PVECTOR_FOR_EACH (iter, my_pvector) {
  *         ...operate on '*iter'...
  *         ...elem2 to be seen before elem1...
  *     }
+ *     ...remove entries...
+ *     pvector_remove(my_pvector, &elem1);
  *     ...
- *     pvector_destroy(&my_pvector);
+ *     free(my_pvector);
  *
- * There is no PVECTOR_FOR_EACH_SAFE variant as iteration is performed on RCU
- * protected instance of the priority vector.  Any concurrent modifications
- * that would be disruptive for readers (such as deletions), will be performed
- * on a new instance.  To see any of the modifications, a new iteration loop
- * has to be started.
+ * Currently there is no PVECTOR_FOR_EACH_SAFE variant.
  *
  * The PVECTOR_FOR_EACH_PRIORITY limits the iteration to entries with higher
  * than or equal to the given priority and allows for object lookahead.
- *
- * The iteration loop must be completed without entering the OVS RCU quiescent
- * period.  That is, an old iteration loop must not be continued after any
- * blocking IO (VLOG is non-blocking, so that is OK).
  */
 struct pvector_cursor {
     size_t size;        /* Number of entries in the vector. */
@@ -143,7 +124,7 @@ static inline struct pvector_cursor pvector_cursor_init(const struct pvector *,
                                                         size_t n_ahead,
                                                         size_t obj_size);
 static inline void *pvector_cursor_next(struct pvector_cursor *,
-                                        int lowest_priority,
+                                        int stop_at_priority,
                                         size_t n_ahead, size_t obj_size);
 static inline void pvector_cursor_lookahead(const struct pvector_cursor *,
                                             int n, size_t size);
@@ -158,29 +139,109 @@ static inline void pvector_cursor_lookahead(const struct pvector_cursor *,
     for (struct pvector_cursor cursor__ = pvector_cursor_init(PVECTOR, N, SZ); \
          ((PTR) = pvector_cursor_next(&cursor__, PRIORITY, N, SZ)) != NULL; )
 
-#define PVECTOR_CURSOR_FOR_EACH(PTR, CURSOR, PVECTOR)                \
-    for (*(CURSOR) = pvector_cursor_init(PVECTOR, 0, 0);             \
+#define PVECTOR_CURSOR_FOR_EACH(PTR, CURSOR, PVECTOR)                   \
+    for (*(CURSOR) = pvector_cursor_init(PVECTOR, 0, 0);                \
          ((PTR) = pvector_cursor_next(CURSOR, INT_MIN, 0, 0)) != NULL; )
 
 #define PVECTOR_CURSOR_FOR_EACH_CONTINUE(PTR, CURSOR)                   \
     for (; ((PTR) = pvector_cursor_next(CURSOR, INT_MIN, 0, 0)) != NULL; )
 
 
+/* Concurrent priority vector. */
+struct cpvector {
+    OVSRCU_TYPE(struct pvector *) impl;
+    struct pvector *temp;
+};
+
+/* Initialization. */
+void cpvector_init(struct cpvector *);
+void cpvector_destroy(struct cpvector *);
+
+/* Insertion and deletion.  These work on 'temp'.  */
+void cpvector_insert(struct cpvector *, void *, int priority);
+void cpvector_change_priority(struct cpvector *, void *, int priority);
+void cpvector_remove(struct cpvector *, void *);
+
+/* Make the modified cpvector available for iteration. */
+static inline void cpvector_publish(struct cpvector *);
+
+/* Count.  These operate on the published cpvector. */
+static inline size_t cpvector_count(const struct cpvector *);
+static inline bool cpvector_is_empty(const struct cpvector *);
+
+static inline struct pvector *cpvector_get_pvector(const struct cpvector *);
+
+/* Iteration.
+ *
+ *
+ * Thread-safety
+ * =============
+ *
+ * Iteration is safe even in a cpvector that is changing concurrently.
+ * Multiple writers must exclude each other via e.g., a mutex.
+ *
+ * Example
+ * =======
+ *
+ *     struct my_node {
+ *         int data;
+ *     };
+ *
+ *     struct my_node elem1, elem2, *iter;
+ *     struct cpvector my_cpvector;
+ *
+ *     cpvector_init(&my_cpvector);
+ *     ...add data...
+ *     cpvector_insert(&my_cpvector, &elem1, 1);
+ *     cpvector_insert(&my_cpvector, &elem2, 2);
+ *     ...
+ *     CPVECTOR_FOR_EACH (iter, &my_cpvector) {
+ *         ...operate on '*iter'...
+ *         ...elem2 to be seen before elem1...
+ *     }
+ *     ...
+ *     cpvector_destroy(&my_cpvector);
+ *
+ * There is no CPVECTOR_FOR_EACH_SAFE variant as iteration is performed on RCU
+ * protected instance of the priority vector.  Any concurrent modifications
+ * that would be disruptive for readers (such as deletions), will be performed
+ * on a new instance.  To see any of the modifications, a new iteration loop
+ * has to be started.
+ *
+ * The CPVECTOR_FOR_EACH_PRIORITY limits the iteration to entries with higher
+ * than or equal to the given priority and allows for object lookahead.
+ *
+ * The iteration loop must be completed without entering the OVS RCU quiescent
+ * period.  That is, an old iteration loop must not be continued after any
+ * blocking IO (VLOG is non-blocking, so that is OK).
+ */
+
+#define CPVECTOR_FOR_EACH(PTR, CPVECTOR)                \
+    PVECTOR_FOR_EACH(PTR, cpvector_get_pvector(CPVECTOR))
+
+#define CPVECTOR_FOR_EACH_PRIORITY(PTR, PRIORITY, N, SZ, CPVECTOR)      \
+    PVECTOR_FOR_EACH_PRIORITY(PTR, PRIORITY, N, SZ,                     \
+                              cpvector_get_pvector(CPVECTOR))
+
+#define CPVECTOR_CURSOR_FOR_EACH(PTR, CURSOR, CPVECTOR)                 \
+    PVECTOR_CURSOR_FOR_EACH(PTR, CURSOR, cpvector_get_pvector(CPVECTOR))
+
+#define CPVECTOR_CURSOR_FOR_EACH_CONTINUE(PTR, CURSOR)  \
+    PVECTOR_CURSOR_FOR_EACH_CONTINUE(PTR, CURSOR)
+
+
 /* Inline implementations. */
 
 static inline struct pvector_cursor
-pvector_cursor_init(const struct pvector *pvec,
-                    size_t n_ahead, size_t obj_size)
+pvector_cursor_init(const struct pvector *pvec, size_t n_ahead,
+                    size_t obj_size)
 {
-    const struct pvector_impl *impl;
     struct pvector_cursor cursor;
 
-    impl = ovsrcu_get(struct pvector_impl *, &pvec->impl);
+    ovs_prefetch_range(pvec->vector, pvec->size * sizeof pvec->vector[0]);
 
-    ovs_prefetch_range(impl->vector, impl->size * sizeof impl->vector[0]);
-
-    cursor.size = impl->size;
-    cursor.vector = impl->vector;
+    cursor.size = pvec->size;
+    cursor.vector = pvec->vector;
     cursor.entry_idx = -1;
 
     for (size_t i = 0; i < n_ahead; i++) {
@@ -212,23 +273,29 @@ static inline void pvector_cursor_lookahead(const struct pvector_cursor *cursor,
     }
 }
 
-static inline size_t pvector_count(const struct pvector *pvec)
+static inline struct pvector *
+cpvector_get_pvector(const struct cpvector *cpvec)
+{
+    return ovsrcu_get(struct pvector *, &cpvec->impl);
+}
+
+static inline size_t cpvector_count(const struct cpvector *cpvec)
 {
-    return ovsrcu_get(struct pvector_impl *, &pvec->impl)->size;
+    return cpvector_get_pvector(cpvec)->size;
 }
 
-static inline bool pvector_is_empty(const struct pvector *pvec)
+static inline bool cpvector_is_empty(const struct cpvector *cpvec)
 {
-    return pvector_count(pvec) == 0;
+    return cpvector_count(cpvec) == 0;
 }
 
-void pvector_publish__(struct pvector *);
+void cpvector_publish__(struct cpvector *);
 
-/* Make the modified pvector available for iteration. */
-static inline void pvector_publish(struct pvector *pvec)
+/* Make the modified cpvector available for iteration. */
+static inline void cpvector_publish(struct cpvector *cpvec)
 {
-    if (pvec->temp) {
-        pvector_publish__(pvec);
+    if (cpvec->temp) {
+        cpvector_publish__(cpvec);
     }
 }
 
diff --git a/tests/test-classifier.c b/tests/test-classifier.c
index c74c440..3ec2269 100644
--- a/tests/test-classifier.c
+++ b/tests/test-classifier.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
+ * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -472,12 +472,12 @@ destroy_classifier(struct classifier *cls)
 }
 
 static void
-pvector_verify(const struct pvector *pvec)
+cpvector_verify(const struct cpvector *cpvec)
 {
     void *ptr OVS_UNUSED;
     int prev_priority = INT_MAX;
 
-    PVECTOR_FOR_EACH (ptr, pvec) {
+    CPVECTOR_FOR_EACH (ptr, cpvec) {
         int priority = cursor__.vector[cursor__.entry_idx].priority;
         if (priority > prev_priority) {
             ovs_abort(0, "Priority vector is out of order (%u > %u)",
@@ -533,7 +533,7 @@ check_tables(const struct classifier *cls, int n_tables, int n_rules,
     int found_visible_but_removable = 0;
     int found_rules2 = 0;
 
-    pvector_verify(&cls->subtables);
+    cpvector_verify(&cls->subtables);
     CMAP_FOR_EACH (table, cmap_node, &cls->subtables_map) {
         const struct cls_match *head;
         int max_priority = INT_MIN;
@@ -543,7 +543,7 @@ check_tables(const struct classifier *cls, int n_tables, int n_rules,
         const struct cls_subtable *iter;
 
         /* Locate the subtable from 'subtables'. */
-        PVECTOR_FOR_EACH (iter, &cls->subtables) {
+        CPVECTOR_FOR_EACH (iter, &cls->subtables) {
             if (iter == table) {
                 if (found) {
                     ovs_abort(0, "Subtable %p duplicated in 'subtables'.",
@@ -647,7 +647,7 @@ check_tables(const struct classifier *cls, int n_tables, int n_rules,
     }
 
     assert(found_tables == cmap_count(&cls->subtables_map));
-    assert(found_tables == pvector_count(&cls->subtables));
+    assert(found_tables == cpvector_count(&cls->subtables));
     assert(n_tables == -1 || n_tables == found_tables_with_visible_rules);
     assert(n_rules == -1 || found_rules == n_rules + found_invisible);
     assert(n_dups == -1 || found_dups == n_dups);
-- 
2.1.4

_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to