Non-leader revalidator threads use pthread_barrier_* functions in their main loop to synchronize with the leader thread. However, since those threads only call poll_block() intermittently, the poll interval check in poll_block() can wrongly take the time since the last call as the poll interval and issue the following warning:
"Unreasonably long XXXXms poll interval". To prevent this, this commit implements a barrier struct and associated operations for OVS that allow threads to wait on a barrier via poll_block(). Signed-off-by: Alex Wang <al...@nicira.com> --- lib/ovs-thread.c | 41 ++++++++++++++++++++++++++++------------- lib/ovs-thread.h | 19 +++++++++++++------ ofproto/ofproto-dpif-upcall.c | 13 ++++++------- 3 files changed, 47 insertions(+), 26 deletions(-) diff --git a/lib/ovs-thread.c b/lib/ovs-thread.c index 49b3474..cce0113 100644 --- a/lib/ovs-thread.c +++ b/lib/ovs-thread.c @@ -24,6 +24,7 @@ #include "hash.h" #include "ovs-rcu.h" #include "poll-loop.h" +#include "seq.h" #include "socket-util.h" #include "util.h" @@ -166,10 +167,6 @@ XPTHREAD_FUNC1(pthread_cond_destroy, pthread_cond_t *); XPTHREAD_FUNC1(pthread_cond_signal, pthread_cond_t *); XPTHREAD_FUNC1(pthread_cond_broadcast, pthread_cond_t *); -XPTHREAD_FUNC3(pthread_barrier_init, pthread_barrier_t *, - pthread_barrierattr_t *, unsigned int); -XPTHREAD_FUNC1(pthread_barrier_destroy, pthread_barrier_t *); - XPTHREAD_FUNC2(pthread_join, pthread_t, void **); typedef void destructor_func(void *); @@ -255,20 +252,38 @@ ovs_mutex_cond_wait(pthread_cond_t *cond, const struct ovs_mutex *mutex_) } } -int -xpthread_barrier_wait(pthread_barrier_t *barrier) +void +ovs_barrier_init(struct ovs_barrier *barrier, uint32_t size) { - int error; + barrier->size = size; + atomic_init(&barrier->count, 0); + barrier->seq = seq_create(); +} - ovsrcu_quiesce_start(); - error = pthread_barrier_wait(barrier); - ovsrcu_quiesce_end(); +void +ovs_barrier_destroy(struct ovs_barrier *barrier) +{ + seq_destroy(barrier->seq); +} - if (error && OVS_UNLIKELY(error != PTHREAD_BARRIER_SERIAL_THREAD)) { - ovs_abort(error, "pthread_barrier_wait failed"); +void +ovs_barrier_wait(struct ovs_barrier *barrier) +{ + uint64_t seq = seq_read(barrier->seq); + uint32_t orig; + + atomic_add(&barrier->count, 1, &orig); + if (orig + 1 == barrier->size) { + 
atomic_store(&barrier->count, 0); + seq_change(barrier->seq); } - return error; + /* To prevent thread from waking up by other event, + * keeps waiting for the seq_change(). */ + while (seq == seq_read(barrier->seq)) { + seq_wait(barrier->seq, seq); + poll_block(); + } } DEFINE_EXTERN_PER_THREAD_DATA(ovsthread_id, 0); diff --git a/lib/ovs-thread.h b/lib/ovs-thread.h index 68db71f..c04d1df 100644 --- a/lib/ovs-thread.h +++ b/lib/ovs-thread.h @@ -23,6 +23,7 @@ #include "ovs-atomic.h" #include "util.h" +struct seq; /* Mutex. */ struct OVS_LOCKABLE ovs_mutex { @@ -30,6 +31,13 @@ struct OVS_LOCKABLE ovs_mutex { const char *where; /* NULL if and only if uninitialized. */ }; +/* Poll-block()-able barrier similar to pthread_barrier_t. */ +struct ovs_barrier { + uint32_t size; /* Number of threads to wait. */ + atomic_uint32_t count; /* Number of threads hitting the barrier. */ + struct seq *seq; +}; + /* "struct ovs_mutex" initializer. */ #ifdef PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP #define OVS_MUTEX_INITIALIZER { PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP, \ @@ -139,6 +147,11 @@ int ovs_rwlock_tryrdlock_at(const struct ovs_rwlock *rwlock, const char *where) #define ovs_rwlock_tryrdlock(rwlock) \ ovs_rwlock_tryrdlock_at(rwlock, SOURCE_LOCATOR) +/* ovs_barrier functions analogous to pthread_barrier_*() functions. */ +void ovs_barrier_init(struct ovs_barrier *, uint32_t count); +void ovs_barrier_destroy(struct ovs_barrier *); +void ovs_barrier_wait(struct ovs_barrier *); + /* Wrappers for xpthread_cond_*() that abort the process on any error. * * Use ovs_mutex_cond_wait() to wait for a condition. */ @@ -147,12 +160,6 @@ void xpthread_cond_destroy(pthread_cond_t *); void xpthread_cond_signal(pthread_cond_t *); void xpthread_cond_broadcast(pthread_cond_t *); -/* Wrappers for pthread_barrier_*() that abort the process on any error. 
*/ -void xpthread_barrier_init(pthread_barrier_t *, pthread_barrierattr_t *, - unsigned int count); -int xpthread_barrier_wait(pthread_barrier_t *); -void xpthread_barrier_destroy(pthread_barrier_t *); - void xpthread_key_create(pthread_key_t *, void (*destructor)(void *)); void xpthread_key_delete(pthread_key_t); void xpthread_setspecific(pthread_key_t, const void *); diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c index db0f17e..b89855a 100644 --- a/ofproto/ofproto-dpif-upcall.c +++ b/ofproto/ofproto-dpif-upcall.c @@ -101,7 +101,7 @@ struct udpif { struct seq *reval_seq; /* Incremented to force revalidation. */ bool need_revalidate; /* As indicated by 'reval_seq'. */ bool reval_exit; /* Set by leader on 'exit_latch. */ - pthread_barrier_t reval_barrier; /* Barrier used by revalidators. */ + struct ovs_barrier reval_barrier; /* Barrier used by revalidators. */ struct dpif_flow_dump *dump; /* DPIF flow dump state. */ long long int dump_duration; /* Duration of the last flow dump. */ struct seq *dump_seq; /* Increments each dump iteration. */ @@ -299,7 +299,7 @@ udpif_stop_threads(struct udpif *udpif) latch_poll(&udpif->exit_latch); - xpthread_barrier_destroy(&udpif->reval_barrier); + ovs_barrier_destroy(&udpif->reval_barrier); free(udpif->revalidators); udpif->revalidators = NULL; @@ -336,8 +336,7 @@ udpif_start_threads(struct udpif *udpif, size_t n_handlers, "handler", udpif_upcall_handler, handler); } - xpthread_barrier_init(&udpif->reval_barrier, NULL, - udpif->n_revalidators); + ovs_barrier_init(&udpif->reval_barrier, udpif->n_revalidators); udpif->reval_exit = false; udpif->revalidators = xzalloc(udpif->n_revalidators * sizeof *udpif->revalidators); @@ -562,18 +561,18 @@ udpif_revalidator(void *arg) } /* Wait for the leader to start the flow dump. 
*/ - xpthread_barrier_wait(&udpif->reval_barrier); + ovs_barrier_wait(&udpif->reval_barrier); if (udpif->reval_exit) { break; } revalidate(revalidator); /* Wait for all flows to have been dumped before we garbage collect. */ - xpthread_barrier_wait(&udpif->reval_barrier); + ovs_barrier_wait(&udpif->reval_barrier); revalidator_sweep(revalidator); /* Wait for all revalidators to finish garbage collection. */ - xpthread_barrier_wait(&udpif->reval_barrier); + ovs_barrier_wait(&udpif->reval_barrier); if (leader) { long long int duration; -- 1.7.9.5 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev