Non-leader revalidator threads use pthread_barrier_*() functions in their
main loop to synchronize with the leader thread.  However, since those
threads only call poll_block() intermittently, the poll interval check in
poll_block() can wrongly take the time since the last call as the poll
interval and issue the following warning:

"Unreasonably long XXXXms poll interval".

To prevent this, this commit implements a barrier struct and operations
for OVS that allow threads to wait on a barrier via poll_block().

Signed-off-by: Alex Wang <al...@nicira.com>
---
 lib/ovs-thread.c              |   41 ++++++++++++++++++++++++++++-------------
 lib/ovs-thread.h              |   19 +++++++++++++------
 ofproto/ofproto-dpif-upcall.c |   13 ++++++-------
 3 files changed, 47 insertions(+), 26 deletions(-)

diff --git a/lib/ovs-thread.c b/lib/ovs-thread.c
index 49b3474..cce0113 100644
--- a/lib/ovs-thread.c
+++ b/lib/ovs-thread.c
@@ -24,6 +24,7 @@
 #include "hash.h"
 #include "ovs-rcu.h"
 #include "poll-loop.h"
+#include "seq.h"
 #include "socket-util.h"
 #include "util.h"
 
@@ -166,10 +167,6 @@ XPTHREAD_FUNC1(pthread_cond_destroy, pthread_cond_t *);
 XPTHREAD_FUNC1(pthread_cond_signal, pthread_cond_t *);
 XPTHREAD_FUNC1(pthread_cond_broadcast, pthread_cond_t *);
 
-XPTHREAD_FUNC3(pthread_barrier_init, pthread_barrier_t *,
-               pthread_barrierattr_t *, unsigned int);
-XPTHREAD_FUNC1(pthread_barrier_destroy, pthread_barrier_t *);
-
 XPTHREAD_FUNC2(pthread_join, pthread_t, void **);
 
 typedef void destructor_func(void *);
@@ -255,20 +252,38 @@ ovs_mutex_cond_wait(pthread_cond_t *cond, const struct ovs_mutex *mutex_)
     }
 }
 
-int
-xpthread_barrier_wait(pthread_barrier_t *barrier)
+void
+ovs_barrier_init(struct ovs_barrier *barrier, uint32_t size)
 {
-    int error;
+    barrier->size = size;
+    atomic_init(&barrier->count, 0);
+    barrier->seq = seq_create();
+}
 
-    ovsrcu_quiesce_start();
-    error = pthread_barrier_wait(barrier);
-    ovsrcu_quiesce_end();
+void
+ovs_barrier_destroy(struct ovs_barrier *barrier)
+{
+    seq_destroy(barrier->seq);
+}
 
-    if (error && OVS_UNLIKELY(error != PTHREAD_BARRIER_SERIAL_THREAD)) {
-        ovs_abort(error, "pthread_barrier_wait failed");
+void
+ovs_barrier_wait(struct ovs_barrier *barrier)
+{
+    uint64_t seq = seq_read(barrier->seq);
+    uint32_t orig;
+
+    atomic_add(&barrier->count, 1, &orig);
+    if (orig + 1 == barrier->size) {
+        atomic_store(&barrier->count, 0);
+        seq_change(barrier->seq);
     }
 
-    return error;
+    /* poll_block() may be woken up by other events, so keep
+     * waiting until 'barrier->seq' actually changes. */
+    while (seq == seq_read(barrier->seq)) {
+        seq_wait(barrier->seq, seq);
+        poll_block();
+    }
 }
 
 DEFINE_EXTERN_PER_THREAD_DATA(ovsthread_id, 0);
diff --git a/lib/ovs-thread.h b/lib/ovs-thread.h
index 68db71f..c04d1df 100644
--- a/lib/ovs-thread.h
+++ b/lib/ovs-thread.h
@@ -23,6 +23,7 @@
 #include "ovs-atomic.h"
 #include "util.h"
 
+struct seq;
 
 /* Mutex. */
 struct OVS_LOCKABLE ovs_mutex {
@@ -30,6 +31,13 @@ struct OVS_LOCKABLE ovs_mutex {
     const char *where;          /* NULL if and only if uninitialized. */
 };
 
+/* Poll-block()-able barrier similar to pthread_barrier_t. */
+struct ovs_barrier {
+    uint32_t size;            /* Number of threads to wait. */
+    atomic_uint32_t count;    /* Number of threads hitting the barrier. */
+    struct seq *seq;
+};
+
 /* "struct ovs_mutex" initializer. */
 #ifdef PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP
 #define OVS_MUTEX_INITIALIZER { PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP, \
@@ -139,6 +147,11 @@ int ovs_rwlock_tryrdlock_at(const struct ovs_rwlock *rwlock, const char *where)
 #define ovs_rwlock_tryrdlock(rwlock) \
         ovs_rwlock_tryrdlock_at(rwlock, SOURCE_LOCATOR)
 
+/* ovs_barrier functions analogous to pthread_barrier_*() functions. */
+void ovs_barrier_init(struct ovs_barrier *, uint32_t count);
+void ovs_barrier_destroy(struct ovs_barrier *);
+void ovs_barrier_wait(struct ovs_barrier *);
+
 /* Wrappers for xpthread_cond_*() that abort the process on any error.
  *
  * Use ovs_mutex_cond_wait() to wait for a condition. */
@@ -147,12 +160,6 @@ void xpthread_cond_destroy(pthread_cond_t *);
 void xpthread_cond_signal(pthread_cond_t *);
 void xpthread_cond_broadcast(pthread_cond_t *);
 
-/* Wrappers for pthread_barrier_*() that abort the process on any error. */
-void xpthread_barrier_init(pthread_barrier_t *, pthread_barrierattr_t *,
-                           unsigned int count);
-int xpthread_barrier_wait(pthread_barrier_t *);
-void xpthread_barrier_destroy(pthread_barrier_t *);
-
 void xpthread_key_create(pthread_key_t *, void (*destructor)(void *));
 void xpthread_key_delete(pthread_key_t);
 void xpthread_setspecific(pthread_key_t, const void *);
diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
index db0f17e..b89855a 100644
--- a/ofproto/ofproto-dpif-upcall.c
+++ b/ofproto/ofproto-dpif-upcall.c
@@ -101,7 +101,7 @@ struct udpif {
     struct seq *reval_seq;             /* Incremented to force revalidation. */
     bool need_revalidate;              /* As indicated by 'reval_seq'. */
     bool reval_exit;                   /* Set by leader on 'exit_latch. */
-    pthread_barrier_t reval_barrier;   /* Barrier used by revalidators. */
+    struct ovs_barrier reval_barrier;  /* Barrier used by revalidators. */
     struct dpif_flow_dump *dump;       /* DPIF flow dump state. */
     long long int dump_duration;       /* Duration of the last flow dump. */
     struct seq *dump_seq;              /* Increments each dump iteration. */
@@ -299,7 +299,7 @@ udpif_stop_threads(struct udpif *udpif)
 
         latch_poll(&udpif->exit_latch);
 
-        xpthread_barrier_destroy(&udpif->reval_barrier);
+        ovs_barrier_destroy(&udpif->reval_barrier);
 
         free(udpif->revalidators);
         udpif->revalidators = NULL;
@@ -336,8 +336,7 @@ udpif_start_threads(struct udpif *udpif, size_t n_handlers,
                 "handler", udpif_upcall_handler, handler);
         }
 
-        xpthread_barrier_init(&udpif->reval_barrier, NULL,
-                              udpif->n_revalidators);
+        ovs_barrier_init(&udpif->reval_barrier, udpif->n_revalidators);
         udpif->reval_exit = false;
         udpif->revalidators = xzalloc(udpif->n_revalidators
                                       * sizeof *udpif->revalidators);
@@ -562,18 +561,18 @@ udpif_revalidator(void *arg)
         }
 
         /* Wait for the leader to start the flow dump. */
-        xpthread_barrier_wait(&udpif->reval_barrier);
+        ovs_barrier_wait(&udpif->reval_barrier);
         if (udpif->reval_exit) {
             break;
         }
         revalidate(revalidator);
 
         /* Wait for all flows to have been dumped before we garbage collect. */
-        xpthread_barrier_wait(&udpif->reval_barrier);
+        ovs_barrier_wait(&udpif->reval_barrier);
         revalidator_sweep(revalidator);
 
         /* Wait for all revalidators to finish garbage collection. */
-        xpthread_barrier_wait(&udpif->reval_barrier);
+        ovs_barrier_wait(&udpif->reval_barrier);
 
         if (leader) {
             long long int duration;
-- 
1.7.9.5

_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to