Acked-by: Andy Zhou <az...@nicira.com> On Tue, Mar 11, 2014 at 1:56 PM, Ben Pfaff <b...@nicira.com> wrote:
> RCU allows multiple threads to read objects in parallel without any > performance penalty. The following commit will introduce the first use. > > Signed-off-by: Ben Pfaff <b...@nicira.com> > --- > lib/automake.mk | 2 + > lib/ovs-rcu.c | 293 > +++++++++++++++++++++++++++++++++++++++++ > lib/ovs-rcu.h | 170 ++++++++++++++++++++++++ > lib/ovs-thread.c | 16 ++- > lib/ovs-thread.h | 2 + > lib/timeval.c | 11 ++ > ofproto/ofproto-dpif-upcall.c | 5 + > 7 files changed, 498 insertions(+), 1 deletion(-) > create mode 100644 lib/ovs-rcu.c > create mode 100644 lib/ovs-rcu.h > > diff --git a/lib/automake.mk b/lib/automake.mk > index cc04633..a6c614d 100644 > --- a/lib/automake.mk > +++ b/lib/automake.mk > @@ -140,6 +140,8 @@ lib_libopenvswitch_la_SOURCES = \ > lib/ovs-atomic-locked.h \ > lib/ovs-atomic-pthreads.h \ > lib/ovs-atomic.h \ > + lib/ovs-rcu.c \ > + lib/ovs-rcu.h \ > lib/ovs-thread.c \ > lib/ovs-thread.h \ > lib/ovsdb-data.c \ > diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c > new file mode 100644 > index 0000000..ac4513b > --- /dev/null > +++ b/lib/ovs-rcu.c > @@ -0,0 +1,293 @@ > +/* > + * Copyright (c) 2014 Nicira, Inc. > + * > + * Licensed under the Apache License, Version 2.0 (the "License"); > + * you may not use this file except in compliance with the License. > + * You may obtain a copy of the License at: > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. 
> + */ > + > +#include <config.h> > +#include "ovs-rcu.h" > +#include "guarded-list.h" > +#include "list.h" > +#include "ovs-thread.h" > +#include "poll-loop.h" > +#include "seq.h" > + > +struct ovsrcu_cb { > + void (*function)(void *aux); > + void *aux; > +}; > + > +struct ovsrcu_cbset { > + struct list list_node; > + struct ovsrcu_cb cbs[16]; > + int n_cbs; > +}; > + > +struct ovsrcu_perthread { > + struct list list_node; /* In global list. */ > + > + struct ovs_mutex mutex; > + uint64_t seqno; > + struct ovsrcu_cbset *cbset; > +}; > + > +static struct seq *global_seqno; > + > +static pthread_key_t perthread_key; > +static struct list ovsrcu_threads; > +static struct ovs_mutex ovsrcu_threads_mutex; > + > +static struct guarded_list flushed_cbsets; > +static struct seq *flushed_cbsets_seq; > + > +static void ovsrcu_init(void); > +static void ovsrcu_flush_cbset(struct ovsrcu_perthread *); > +static void ovsrcu_unregister__(struct ovsrcu_perthread *); > +static bool ovsrcu_call_postponed(void); > +static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED); > +static void ovsrcu_synchronize(void); > + > +static struct ovsrcu_perthread * > +ovsrcu_perthread_get(void) > +{ > + struct ovsrcu_perthread *perthread; > + > + ovsrcu_init(); > + > + perthread = pthread_getspecific(perthread_key); > + if (!perthread) { > + perthread = xmalloc(sizeof *perthread); > + ovs_mutex_init(&perthread->mutex); > + perthread->seqno = seq_read(global_seqno); > + perthread->cbset = NULL; > + > + ovs_mutex_lock(&ovsrcu_threads_mutex); > + list_push_back(&ovsrcu_threads, &perthread->list_node); > + ovs_mutex_unlock(&ovsrcu_threads_mutex); > + > + pthread_setspecific(perthread_key, perthread); > + } > + return perthread; > +} > + > +/* Indicates the end of a quiescent state. See "Details" near the top of > + * ovs-rcu.h. > + * > + * Quiescent states don't stack or nest, so this always ends a quiescent > state > + * even if ovsrcu_quiesce_start() was called multiple times in a row. 
*/ > +void > +ovsrcu_quiesce_end(void) > +{ > + ovsrcu_perthread_get(); > +} > + > +static void > +ovsrcu_quiesced(void) > +{ > + if (single_threaded()) { > + ovsrcu_call_postponed(); > + } else { > + static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER; > + if (ovsthread_once_start(&once)) { > + xpthread_create(NULL, NULL, ovsrcu_postpone_thread, NULL); > + ovsthread_once_done(&once); > + } > + } > +} > + > +/* Indicates the beginning of a quiescent state. See "Details" near the > top of > + * ovs-rcu.h. */ > +void > +ovsrcu_quiesce_start(void) > +{ > + struct ovsrcu_perthread *perthread; > + > + ovsrcu_init(); > + perthread = pthread_getspecific(perthread_key); > + if (perthread) { > + pthread_setspecific(perthread_key, NULL); > + ovsrcu_unregister__(perthread); > + } > + > + ovsrcu_quiesced(); > +} > + > +/* Indicates a momentary quiescent state. See "Details" near the top of > + * ovs-rcu.h. */ > +void > +ovsrcu_quiesce(void) > +{ > + ovsrcu_perthread_get()->seqno = seq_read(global_seqno); > + seq_change(global_seqno); > + > + ovsrcu_quiesced(); > +} > + > +static void > +ovsrcu_synchronize(void) > +{ > + uint64_t target_seqno; > + > + if (single_threaded()) { > + return; > + } > + > + target_seqno = seq_read(global_seqno); > + ovsrcu_quiesce_start(); > + > + for (;;) { > + uint64_t cur_seqno = seq_read(global_seqno); > + struct ovsrcu_perthread *perthread; > + bool done = true; > + > + ovs_mutex_lock(&ovsrcu_threads_mutex); > + LIST_FOR_EACH (perthread, list_node, &ovsrcu_threads) { > + if (perthread->seqno <= target_seqno) { > + done = false; > + break; > + } > + } > + ovs_mutex_unlock(&ovsrcu_threads_mutex); > + > + if (done) { > + break; > + } > + > + seq_wait(global_seqno, cur_seqno); > + poll_block(); > + } > + ovsrcu_quiesce_end(); > +} > + > +/* Registers 'function' to be called, passing 'aux' as argument, after the > + * next grace period. 
> + * > + * This function is more conveniently called through the ovsrcu_postpone() > + * macro, which provides a type-safe way to allow 'function''s parameter > to be > + * any pointer type. */ > +void > +ovsrcu_postpone__(void (*function)(void *aux), void *aux) > +{ > + struct ovsrcu_perthread *perthread = ovsrcu_perthread_get(); > + struct ovsrcu_cbset *cbset; > + struct ovsrcu_cb *cb; > + > + cbset = perthread->cbset; > + if (!cbset) { > + cbset = perthread->cbset = xmalloc(sizeof *perthread->cbset); > + cbset->n_cbs = 0; > + } > + > + cb = &cbset->cbs[cbset->n_cbs++]; > + cb->function = function; > + cb->aux = aux; > + > + if (cbset->n_cbs >= ARRAY_SIZE(cbset->cbs)) { > + ovsrcu_flush_cbset(perthread); > + } > +} > + > +static bool > +ovsrcu_call_postponed(void) > +{ > + struct ovsrcu_cbset *cbset, *next_cbset; > + struct list cbsets; > + > + guarded_list_pop_all(&flushed_cbsets, &cbsets); > + if (list_is_empty(&cbsets)) { > + return false; > + } > + > + ovsrcu_synchronize(); > + > + LIST_FOR_EACH_SAFE (cbset, next_cbset, list_node, &cbsets) { > + struct ovsrcu_cb *cb; > + > + for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) { > + cb->function(cb->aux); > + } > + list_remove(&cbset->list_node); > + free(cbset); > + } > + > + return true; > +} > + > +static void * > +ovsrcu_postpone_thread(void *arg OVS_UNUSED) > +{ > + pthread_detach(pthread_self()); > + > + for (;;) { > + uint64_t seqno = seq_read(flushed_cbsets_seq); > + if (!ovsrcu_call_postponed()) { > + seq_wait(flushed_cbsets_seq, seqno); > + poll_block(); > + } > + } > + > + OVS_NOT_REACHED(); > +} > + > +static void > +ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread) > +{ > + struct ovsrcu_cbset *cbset = perthread->cbset; > + > + if (cbset) { > + guarded_list_push_back(&flushed_cbsets, &cbset->list_node, > SIZE_MAX); > + perthread->cbset = NULL; > + > + seq_change(flushed_cbsets_seq); > + } > +} > + > +static void > +ovsrcu_unregister__(struct ovsrcu_perthread *perthread) > +{ > + if 
(perthread->cbset) { > + ovsrcu_flush_cbset(perthread); > + } > + > + ovs_mutex_lock(&ovsrcu_threads_mutex); > + list_remove(&perthread->list_node); > + ovs_mutex_unlock(&ovsrcu_threads_mutex); > + > + ovs_mutex_destroy(&perthread->mutex); > + free(perthread); > + > + seq_change(global_seqno); > +} > + > +static void > +ovsrcu_thread_exit_cb(void *perthread) > +{ > + ovsrcu_unregister__(perthread); > +} > + > +static void > +ovsrcu_init(void) > +{ > + static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER; > + if (ovsthread_once_start(&once)) { > + global_seqno = seq_create(); > + xpthread_key_create(&perthread_key, ovsrcu_thread_exit_cb); > + list_init(&ovsrcu_threads); > + ovs_mutex_init(&ovsrcu_threads_mutex); > + > + guarded_list_init(&flushed_cbsets); > + flushed_cbsets_seq = seq_create(); > + > + ovsthread_once_done(&once); > + } > +} > diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h > new file mode 100644 > index 0000000..6721273 > --- /dev/null > +++ b/lib/ovs-rcu.h > @@ -0,0 +1,170 @@ > +/* > + * Copyright (c) 2014 Nicira, Inc. > + * > + * Licensed under the Apache License, Version 2.0 (the "License"); > + * you may not use this file except in compliance with the License. > + * You may obtain a copy of the License at: > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +#ifndef OVS_RCU_H > +#define OVS_RCU_H 1 > + > +/* Read-Copy-Update (RCU) > + * ====================== > + * > + * Introduction > + * ------------ > + * > + * Atomic pointer access makes it pretty easy to implement lock-free > + * algorithms. 
There is one big problem, though: when a writer updates a > + * pointer to point to a new data structure, some thread might be reading > the > + * old version, and there's no convenient way to free the old version > when all > + * threads are done with the old version. > + * > + * The function ovsrcu_postpone() solves that problem. The function > pointer > + * passed in as its argument is called only after all threads are done > with old > + * versions of data structures. The function callback frees an old > version of > + * data no longer in use. This technique is called "read-copy-update", > or RCU > + * for short. > + * > + * > + * Details > + * ------- > + * > + * A "quiescent state" is a time at which a thread holds no pointers to > memory > + * that is managed by RCU; that is, when the thread is known not to > reference > + * memory that might be an old version of some object freed via RCU. For > + * example, poll_block() includes a quiescent state, as does > + * ovs_mutex_cond_wait(). > + * > + * The following functions manage the recognition of quiescent states: > + * > + * void ovsrcu_quiesce(void) > + * > + * Recognizes a momentary quiescent state in the current thread. > + * > + * void ovsrcu_quiesce_start(void) > + * void ovsrcu_quiesce_end(void) > + * > + * Brackets a time period during which the current thread is > quiescent. > + * > + * A newly created thread is initially active, not quiescent. > + * > + * When a quiescient state has occurred in every thread, we say that a > "grace > + * period" has occurred. Following a grace period, all of the callbacks > + * postponed before the start of the grace period may be invoked. 
OVS > takes > + * care of this automatically through the RCU mechanism: while a process > still > + * has only a single thread, it invokes the postponed callbacks directly > from > + * ovsrcu_quiesce() and ovsrcu_quiesce_start(); after additional threads > have > + * been created, it creates an extra helper thread to invoke callbacks. > + * > + * > + * Use > + * --- > + * > + * Use OVSRCU_TYPE(TYPE) to declare a pointer to RCU-protected data, e.g. > the > + * following declares an RCU-protected "struct flow *" named flowp: > + * > + * OVSRCU_TYPE(struct flow *) flowp; > + * > + * Use ovsrcu_get(TYPE, VAR) to read an RCU-protected pointer, e.g. to > read the > + * pointer variable declared above: > + * > + * struct flow *flow = ovsrcu_get(struct flow *, flowp); > + * > + * Use ovsrcu_set() to write an RCU-protected pointer and > ovsrcu_postpone() to > + * free the previous data. If more than one thread can write the > pointer, then > + * some form of external synchronization, e.g. a mutex, is needed to > prevent > + * writers from interfering with one another. For example, to write the > + * pointer variable declared above while safely freeing the old value: > + * > + * static struct ovs_mutex mutex = OVS_MUTEX_INITIALIZER; > + * > + * static void > + * free_flow(struct flow *flow) > + * { > + * free(flow); > + * } > + * > + * void > + * change_flow(struct flow *new_flow) > + * { > + * ovs_mutex_lock(&mutex); > + * ovsrcu_postpone(free_flow, ovsrcu_get(struct flow *, &flowp)); > + * ovsrcu_set(&flowp, new_flow); > + * ovs_mutex_unlock(&mutex); > I assume the mutex lock and unlock should have embedded memory barrier operations to make sure all previous writes are visible. If true, then ovsrcu_get() and ovsrcu_set() do not seem necessary in this example. They are correct though. > + * } > + * > + */ > + > +#include "compiler.h" > +#include "ovs-atomic.h" > + > +/* Use OVSRCU_TYPE(TYPE) to declare a pointer to RCU-protected data, e.g.
> the > + * following declares an RCU-protected "struct flow *" named flowp: > + * > + * OVSRCU_TYPE(struct flow *) flowp; > + * > + * Use ovsrcu_get(TYPE, VAR) to read an RCU-protected pointer, e.g. to > read the > + * pointer variable declared above: > + * > + * struct flow *flow = ovsrcu_get(struct flow *, flowp); > + * > + * (With GNU C or Clang, you get a compiler error if TYPE is wrong; other > + * compilers will merrily carry along accepting the wrong type.) > + */ > +#if __GNUC__ > +#define OVSRCU_TYPE(TYPE) struct { ATOMIC(TYPE) p; } > +#define ovsrcu_get__(TYPE, VAR) \ > + ({ \ > + TYPE value__; \ > + \ > + atomic_read_explicit(CONST_CAST(ATOMIC(TYPE) *, &(VAR)->p), \ > + &value__, memory_order_consume); \ > + \ > + value__; \ > + }) > +#define ovsrcu_get(TYPE, VAR) CONST_CAST(TYPE, ovsrcu_get__(TYPE, VAR)) > +#else /* not GNU C */ > +typedef struct ovsrcu_pointer { ATOMIC(void *) p; }; > +#define OVSRCU_TYPE(TYPE) struct ovsrcu_pointer > +static inline void * > +ovsrcu_get__(const struct ovsrcu_pointer *pointer) > +{ > + void *value; > + atomic_read_explicit(&CONST_CAST(struct ovsrcu_pointer *, pointer)->p, > + &value, memory_order_consume); > + return value; > +} > +#define ovsrcu_get(TYPE, VAR) CONST_CAST(TYPE, ovsrcu_get__(VAR)) > +#endif > + > +/* Writes VALUE to the RCU-protected pointer whose address is VAR. > + * > + * Users require external synchronization (e.g. a mutex). See "Usage" > above > + * for an example. */ > +#define ovsrcu_set(VAR, VALUE) \ > + atomic_store_explicit(&(VAR)->p, VALUE, memory_order_release) > + > +/* Calls FUNCTION passing ARG as its pointer-type argument following the > next > + * grace period. See "Usage" above for example. */ > +void ovsrcu_postpone__(void (*function)(void *aux), void *aux); > +#define ovsrcu_postpone(FUNCTION, ARG) \ > + ((void) sizeof((FUNCTION)(ARG), 1), I suppose the idea of using sizeof is for type checking. It is not obvious to me why "1" is required.
> \ > + (void) sizeof(*(ARG)), \ > + ovsrcu_postpone__((void (*)(void *))(FUNCTION), ARG)) > + > +/* Quiescent states. */ > +void ovsrcu_quiesce_start(void); > +void ovsrcu_quiesce_end(void); > +void ovsrcu_quiesce(void); > + > +#endif /* ovs-rcu.h */ > diff --git a/lib/ovs-thread.c b/lib/ovs-thread.c > index b6b51c7..33b9e08 100644 > --- a/lib/ovs-thread.c > +++ b/lib/ovs-thread.c > @@ -22,6 +22,7 @@ > #include <unistd.h> > #include "compiler.h" > #include "hash.h" > +#include "ovs-rcu.h" > #include "poll-loop.h" > #include "socket-util.h" > #include "util.h" > @@ -223,7 +224,12 @@ void > ovs_mutex_cond_wait(pthread_cond_t *cond, const struct ovs_mutex *mutex_) > { > struct ovs_mutex *mutex = CONST_CAST(struct ovs_mutex *, mutex_); > - int error = pthread_cond_wait(cond, &mutex->lock); > + int error; > + > + ovsrcu_quiesce_start(); > + error = pthread_cond_wait(cond, &mutex->lock); > + ovsrcu_quiesce_end(); > + > if (OVS_UNLIKELY(error)) { > ovs_abort(error, "pthread_cond_wait failed"); > } > @@ -264,6 +270,7 @@ ovsthread_wrapper(void *aux_) > aux = *auxp; > free(auxp); > > + ovsrcu_quiesce_end(); > return aux.start(aux.arg); > } > > @@ -277,6 +284,7 @@ xpthread_create(pthread_t *threadp, pthread_attr_t > *attr, > > forbid_forking("multiple threads exist"); > multithreaded = true; > + ovsrcu_quiesce_end(); > > aux = xmalloc(sizeof *aux); > aux->start = start; > @@ -307,6 +315,12 @@ ovsthread_once_done(struct ovsthread_once *once) > ovs_mutex_unlock(&once->mutex); > } > > +bool > +single_threaded(void) > +{ > + return !multithreaded; > +} > + > /* Asserts that the process has not yet created any threads (beyond the > initial > * thread). 
> * > diff --git a/lib/ovs-thread.h b/lib/ovs-thread.h > index 8868c51..f489308 100644 > --- a/lib/ovs-thread.h > +++ b/lib/ovs-thread.h > @@ -598,6 +598,8 @@ void ovsthread_counter_inc(struct ovsthread_counter *, > unsigned long long int); > unsigned long long int ovsthread_counter_read( > const struct ovsthread_counter *); > > +bool single_threaded(void); > + > void assert_single_threaded_at(const char *where); > #define assert_single_threaded() assert_single_threaded_at(SOURCE_LOCATOR) > > diff --git a/lib/timeval.c b/lib/timeval.c > index 74efa59..e794cfd 100644 > --- a/lib/timeval.c > +++ b/lib/timeval.c > @@ -31,6 +31,7 @@ > #include "fatal-signal.h" > #include "hash.h" > #include "hmap.h" > +#include "ovs-rcu.h" > #include "ovs-thread.h" > #include "signals.h" > #include "seq.h" > @@ -286,6 +287,12 @@ time_poll(struct pollfd *pollfds, int n_pollfds, > HANDLE *handles OVS_UNUSED, > time_left = timeout_when - now; > } > > + if (!time_left) { > + ovsrcu_quiesce(); > + } else { > + ovsrcu_quiesce_start(); > + } > + > #ifndef _WIN32 > retval = poll(pollfds, n_pollfds, time_left); > if (retval < 0) { > @@ -306,6 +313,10 @@ time_poll(struct pollfd *pollfds, int n_pollfds, > HANDLE *handles OVS_UNUSED, > } > #endif > > + if (time_left) { > + ovsrcu_quiesce_end(); > + } > + > if (deadline <= time_msec()) { > #ifndef _WIN32 > fatal_signal_handler(SIGALRM); > diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c > index 39f47c7..0cc9148 100644 > --- a/ofproto/ofproto-dpif-upcall.c > +++ b/ofproto/ofproto-dpif-upcall.c > @@ -32,6 +32,7 @@ > #include "ofproto-dpif-ipfix.h" > #include "ofproto-dpif-sflow.h" > #include "ofproto-dpif-xlate.h" > +#include "ovs-rcu.h" > #include "packets.h" > #include "poll-loop.h" > #include "seq.h" > @@ -298,6 +299,8 @@ void > udpif_set_threads(struct udpif *udpif, size_t n_handlers, > size_t n_revalidators) > { > + ovsrcu_quiesce_start(); > + > /* Stop the old threads (if any). 
*/ > if (udpif->handlers && > (udpif->n_handlers != n_handlers > @@ -406,6 +409,8 @@ udpif_set_threads(struct udpif *udpif, size_t > n_handlers, > xpthread_create(&udpif->dispatcher, NULL, udpif_dispatcher, > udpif); > xpthread_create(&udpif->flow_dumper, NULL, udpif_flow_dumper, > udpif); > } > + > + ovsrcu_quiesce_end(); > } > > /* Waits for all ongoing upcall translations to complete. This ensures > that > -- > 1.7.10.4 > > _______________________________________________ > dev mailing list > dev@openvswitch.org > http://openvswitch.org/mailman/listinfo/dev >
_______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev