On Wed, May 15, 2013 at 11:48 PM, Paolo Bonzini <pbonz...@redhat.com> wrote: > This includes a (mangled) copy of the urcu-qsbr code from liburcu. > The main changes are: 1) removing dependencies on many other header files > in liburcu; 2) removing for simplicity the tentative busy waiting in > synchronize_rcu, which has limited performance effects; 3) replacing > futexes in synchronize_rcu with QemuEvents for Win32 portability. > The API is the same as liburcu, so it should be possible in the future > to require liburcu on POSIX systems for example and use our copy only > on Windows. > > Among the various versions available I chose urcu-qsbr, which has the > fastest rcu_read_{lock,unlock} but requires the program to manually > annotate quiescent points or intervals. QEMU threads usually have easily > identified quiescent periods, so this should not be a problem. > > Signed-off-by: Paolo Bonzini <pbonz...@redhat.com> > --- > docs/rcu.txt | 301 > +++++++++++++++++++++++++++++++++++++++++++++ > hw/9pfs/virtio-9p-synth.c | 1 + > include/qemu/queue.h | 13 ++ > include/qemu/rcu-pointer.h | 110 +++++++++++++++++ > include/qemu/rcu.h | 168 +++++++++++++++++++++++++ > include/qemu/thread.h | 3 - > libcacard/Makefile | 3 +- > util/Makefile.objs | 1 + > util/rcu.c | 203 ++++++++++++++++++++++++++++++ > 9 files changed, 799 insertions(+), 4 deletions(-) > create mode 100644 docs/rcu.txt > create mode 100644 include/qemu/rcu-pointer.h > create mode 100644 include/qemu/rcu.h > create mode 100644 util/rcu.c > > diff --git a/docs/rcu.txt b/docs/rcu.txt > new file mode 100644 > index 0000000..19e4840 > --- /dev/null > +++ b/docs/rcu.txt > @@ -0,0 +1,301 @@ > +Using RCU (Read-Copy-Update) for synchronization > +================================================ > + > +Read-copy update (RCU) is a synchronization mechanism that is used to > +protect read-mostly data structures. 
RCU is very efficient and scalable
> +on the read side (it is wait-free), and thus can make the read paths
> +extremely fast.
> +
> +RCU supports concurrency between a single writer and multiple readers;
> +thus it is not used alone.  Typically, the write side will use a lock to
> +serialize multiple updates, but other approaches are possible (e.g.,
> +restricting updates to a single task).  In QEMU, when a lock is used,
> +this will often be the "iothread mutex", also known as the "big QEMU
> +lock" (BQL).  Also, restricting updates to a single task is done in
> +QEMU using the "bottom half" API.
> +
> +RCU is fundamentally a "wait-to-finish" mechanism.  The read side marks
> +sections of code with "critical sections", and the update side will wait
> +for the execution of all *currently running* critical sections before
> +proceeding, or before asynchronously executing a callback.
> +
> +The key point here is that only the currently running critical sections
> +are waited for; critical sections that are started _after_ the beginning
> +of the wait do not extend the wait, despite running concurrently with
> +the updater.  This is the reason why RCU is more scalable than,
> +for example, reader-writer locks.  It is so much more scalable that
> +the system will have a single instance of the RCU mechanism; a single
> +mechanism can be used for an arbitrary number of "things", without
> +having to worry about things such as contention or deadlocks.
> +
> +How is this possible?  The basic idea is to split updates into two phases,
> +"removal" and "reclamation".  During removal, we ensure that subsequent
> +readers will not be able to get a reference to the old data.  After
> +removal has completed, a critical section will not be able to access
> +the old data. 
Therefore, critical sections that begin after removal
> +do not matter; as soon as all previous critical sections have finished,
> +there cannot be any readers who hold references to the data structure,
> +and it can then be safely reclaimed (e.g., freed or unref'ed).
> +
> +Here is a picture:
> +
> +        thread 1                  thread 2                  thread 3
> +    -------------------    ------------------------    -------------------
> +    enter RCU crit.sec.
> +           |                finish removal phase
> +           |                begin wait
> +           |                      |                    enter RCU crit.sec.
> +    exit RCU crit.sec.            |                           |
> +                            complete wait                     |
> +                            begin reclamation phase           |
> +                                                       exit RCU crit.sec.
> +
> +
> +Note how thread 3 is still executing its critical section when thread 2
> +starts reclaiming data.  This is possible, because the old version of the
> +data structure was not accessible at the time thread 3 began executing
> +that critical section.
> +
> +
> +RCU API
> +=======
> +
> +The core RCU API is small:
> +
> +     void rcu_read_lock(void);
> +
> +        Used by a reader to inform the reclaimer that the reader is
> +        entering an RCU read-side critical section.
> +
> +     void rcu_read_unlock(void);
> +
> +        Used by a reader to inform the reclaimer that the reader is
> +        exiting an RCU read-side critical section.  Note that RCU
> +        read-side critical sections may be nested and/or overlapping.
> +
> +     void synchronize_rcu(void);
> +
> +        Blocks until all pre-existing RCU read-side critical sections
> +        on all threads have completed.  This marks the end of the removal
> +        phase and the beginning of the reclamation phase.
> +
> +        Note that it would be valid for another update to come while
> +        synchronize_rcu is running.  Because of this, it is better that
> +        the updater releases any locks it may hold before calling
> +        synchronize_rcu.
> +
> +     typeof(*p) rcu_dereference(p);
> +     typeof(p) rcu_assign_pointer(p, typeof(p) v);
> +
> +        These macros are similar to atomic_mb_read() and atomic_mb_set()
> +        respectively. 
However, they make some assumptions on the code
> +        that calls them, which allow a more optimized implementation.
> +
> +        rcu_assign_pointer assumes that the update side is not going
> +        to read from the data structure after "publishing" the new
> +        values; that is, it assumes that all assignments happen at
> +        the very end of the removal phase.
> +
> +        rcu_dereference assumes that whenever a single RCU critical
> +        section reads multiple shared data, these reads are either
> +        data-dependent or need no ordering.  This is almost always the
> +        case when using RCU.  If this is not the case, you can use
> +        atomic_mb_read() or smp_rmb().
> +
> +        If you are going to be fetching multiple fields from the
> +        RCU-protected structure, repeated rcu_dereference() calls
> +        would look ugly and incur unnecessary overhead on Alpha CPUs.
> +        You can then do this:
> +
> +            p = rcu_dereference(&head);
> +            foo = p->foo;
> +            bar = p->bar;
> +
> +
> +RCU QUIESCENT STATES
> +====================
> +
> +An efficient implementation of rcu_read_lock() and rcu_read_unlock()
> +relies on the availability of fast thread-local storage.  Unfortunately,
> +this is not possible on all the systems supported by QEMU (in particular
> +on many POSIX systems other than Linux and Solaris).
> +
> +For this reason, QEMU's RCU implementation resorts to manual annotation
> +of "quiescent states", i.e. points where no RCU read-side critical
> +section can be active.  All threads that participate in the RCU mechanism
> +need to annotate such points.
> +
> +Marking quiescent states is done with the following three APIs:
> +
> +     void rcu_quiescent_state(void);
> +
> +        Marks a point in the execution of the current thread where no
> +        RCU read-side critical section can be active.
> +
> +     void rcu_thread_offline(void);
> +
> +        Marks the beginning of an "extended quiescent state" for the
> +        current thread, i.e. an interval of time during which no
> +        RCU read-side critical section can be active. 
> +
> +     void rcu_thread_online(void);
> +
> +        Marks the end of an extended quiescent state for the current
> +        thread.
> +
> +
> +Furthermore, threads that participate in the RCU mechanism must communicate
> +this fact using the following APIs:
> +
> +     void rcu_register_thread(void);
> +
> +        Marks a thread as taking part in the RCU mechanism.  Such a thread
> +        will have to report quiescent points regularly, either manually
> +        or through the QemuCond/QemuSemaphore/QemuEvent APIs.
> +
> +     void rcu_unregister_thread(void);
> +
> +        Marks a thread as not taking part anymore in the RCU mechanism.
> +        It is not a problem if such a thread reports quiescent points,
> +        either manually or by using the QemuCond/QemuSemaphore/QemuEvent
> +        APIs.
> +
> +Note that these APIs are relatively heavyweight, and should _not_ be
> +nested.
> +
> +
> +DIFFERENCES WITH LINUX
> +======================
> +
> +- Sleeping is possible, though discouraged, within an RCU critical section.
> +
> +- rcu_dereference takes a _pointer_ to the variable being accessed.
> +  Wrong usage will be detected by the compiler.
> +
> +- Quiescent points must be marked explicitly unless the thread uses
> +  condvars/semaphores/events for synchronization.
> +
> +
> +RCU PATTERNS
> +============
> +
> +Many patterns using reader-writer locks translate directly to RCU, with
> +the advantages of higher scalability and deadlock immunity.
> +
> +In general, RCU can be used whenever it is possible to create a new
> +"version" of a data structure every time the updater runs.  This may
> +sound like a very strict restriction, however:
> +
> +- the updater does not mean "everything that writes to a data structure",
> +  but rather "everything that involves a reclamation step".  See the
> +  array example below.
> +
> +- in some cases, creating a new version of a data structure may actually
> +  be very cheap.  For example, modifying the "next" pointer of a singly
> +  linked list is effectively creating a new version of the list. 
> +
> +
> +A few common patterns, however, are worth noting.
> +
> +RCU list processing
> +-------------------
> +
> +TBD (not yet used in QEMU)
> +
> +
> +RCU reference counting
> +----------------------
> +
> +Because grace periods are not allowed to complete while there is an RCU
> +read-side critical section in progress, the RCU read-side primitives
> +may be used as a restricted reference-counting mechanism.  For example,
> +consider the following code fragment:
> +
> +    rcu_read_lock();
> +    p = rcu_dereference(&foo);
> +    /* do something with p. */
> +    rcu_read_unlock();
> +
> +The RCU read-side critical section ensures that the value of "p" remains
> +valid until after the rcu_read_unlock().  In some sense, it is acquiring
> +a reference to p that is later released when the critical section ends.
> +The write side looks simply like this (with appropriate locking):
> +
> +    qemu_mutex_lock(&foo_mutex);
> +    old = foo;
> +    rcu_assign_pointer(foo, new);
> +    qemu_mutex_unlock(&foo_mutex);
> +    synchronize_rcu();
> +    free(old);
> +
> +Note that the same idiom would be possible with reader/writer
> +locks:
> +
> +    read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
> +    p = foo;                        p = foo;
> +    /* do something with p. */      foo = new;
> +    read_unlock(&foo_rwlock);       free(p);
> +                                    write_mutex_unlock(&foo_rwlock);
> +
> +
> +RCU resizable arrays
> +--------------------
> +
> +Resizable arrays can be used with RCU.  The expensive RCU synchronization
> +only needs to take place when the array is resized.  The two items to
> +take care of are:
> +
> +- ensuring that the old version of the array is available between removal
> +  and reclamation;
> +
> +- avoiding mismatches in the read side between the array data and the
> +  array size.
> +
> +The first problem is avoided simply by not using realloc.  Instead,
> +each resize will allocate a new array and copy the old data into it. 
The second problem would arise if the size and the data pointers were
> +two members of a larger struct:
> +
> +    struct mystuff {
> +        ...
> +        int data_size;
> +        int data_alloc;
> +        T   *data;
> +        ...
> +    };
> +
> +Instead, we store the size of the array with the array itself:
> +
> +    struct arr {
> +        int size;
> +        int alloc;
> +        T   data[];
> +    };
> +    struct arr *global_array;
> +
> +    read side:
> +        rcu_read_lock();
> +        struct arr *array = rcu_dereference(&global_array);
> +        x = i < array->size ? array->data[i] : -1;
> +        rcu_read_unlock();
> +        return x;
> +
> +    write side (running under a lock):
> +        if (global_array->size == global_array->alloc) {
> +            /* Creating a new version.  */
> +            new_array = g_malloc(sizeof(struct arr) +
> +                                 global_array->alloc * 2 * sizeof(T));
> +            new_array->size = global_array->size;
> +            new_array->alloc = global_array->alloc * 2;
> +            memcpy(new_array->data, global_array->data,
> +                   global_array->alloc * sizeof(T));
> +
> +            /* Removal phase.  */
> +            old_array = global_array;
> +            rcu_assign_pointer(global_array, new_array);
> +            synchronize_rcu();
> +
> +            /* Reclamation phase. 
*/
> +            free(old_array);
> +        }
> diff --git a/hw/9pfs/virtio-9p-synth.c b/hw/9pfs/virtio-9p-synth.c
> index 840e4eb..d5f5842 100644
> --- a/hw/9pfs/virtio-9p-synth.c
> +++ b/hw/9pfs/virtio-9p-synth.c
> @@ -17,6 +17,7 @@
>  #include "virtio-9p-xattr.h"
>  #include "fsdev/qemu-fsdev.h"
>  #include "virtio-9p-synth.h"
> +#include "qemu/rcu.h"
>
>  #include <sys/stat.h>
>
> diff --git a/include/qemu/queue.h b/include/qemu/queue.h
> index d433b90..847ddd1 100644
> --- a/include/qemu/queue.h
> +++ b/include/qemu/queue.h
> @@ -104,6 +104,19 @@ struct {                                                \
>      (head)->lh_first = NULL;                                            \
>  } while (/*CONSTCOND*/0)
>
> +#define QLIST_SWAP(dstlist, srclist, field) do {                        \
> +    void *tmplist;                                                      \
> +    tmplist = (srclist)->lh_first;                                      \
> +    (srclist)->lh_first = (dstlist)->lh_first;                          \
> +    if ((srclist)->lh_first != NULL) {                                  \
> +        (srclist)->lh_first->field.le_prev = &(srclist)->lh_first;      \
> +    }                                                                   \
> +    (dstlist)->lh_first = tmplist;                                      \
> +    if ((dstlist)->lh_first != NULL) {                                  \
> +        (dstlist)->lh_first->field.le_prev = &(dstlist)->lh_first;      \
> +    }                                                                   \
> +} while (/*CONSTCOND*/0)
> +
>  #define QLIST_INSERT_AFTER(listelm, elm, field) do {                    \
>      if (((elm)->field.le_next = (listelm)->field.le_next) != NULL)      \
>          (listelm)->field.le_next->field.le_prev =                       \
> diff --git a/include/qemu/rcu-pointer.h b/include/qemu/rcu-pointer.h
> new file mode 100644
> index 0000000..0e6417c
> --- /dev/null
> +++ b/include/qemu/rcu-pointer.h
> @@ -0,0 +1,110 @@
> +#ifndef _URCU_POINTER_STATIC_H
> +#define _URCU_POINTER_STATIC_H
> +
> +/*
> + * urcu-pointer-static.h
> + *
> + * Userspace RCU header.  Operations on pointers.
> + *
> + * TO BE INCLUDED ONLY IN LGPL-COMPATIBLE CODE.  See urcu-pointer.h for
> + * linking dynamically with the userspace rcu library.
> + *
> + * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoy...@efficios.com>
> + * Copyright (c) 2009 Paul E. McKenney, IBM Corporation. 
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + *
> + * IBM's contributions to this file may be relicensed under LGPLv2 or later.
> + */
> +
> +#include "compiler.h"
> +#include "qemu/atomic.h"
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +/**
> + * rcu_dereference - reads (copy) an RCU-protected pointer to a local variable
> + * inside an RCU read-side critical section.  The pointer can later be safely
> + * dereferenced within the critical section.
> + *
> + * This ensures that the pointer copy is invariant throughout the whole
> + * critical section.
> + *
> + * Inserts memory barriers on architectures that require them (currently only
> + * Alpha) and documents which pointers are protected by RCU.
> + *
> + * The compiler memory barrier in atomic_read() ensures that value-speculative
> + * optimizations (e.g. VSS: Value Speculation Scheduling) do not perform the
> + * data read before the pointer read by speculating the value of the pointer.
> + * Correct ordering is ensured because the pointer is read as a volatile access.
> + * This acts as a global side-effect operation, which forbids reordering of
> + * dependent memory operations. 
Note that such concern about dependency-breaking
> + * optimizations will eventually be taken care of by the "memory_order_consume"
> + * addition to the forthcoming C++ standard.
> + *
> + * Should match rcu_assign_pointer() or rcu_xchg_pointer().
> + */
> +
> +#define rcu_dereference(p)                      \
> +    ({                                          \
> +        typeof(p) _p1 = (p);                    \
> +        smp_read_barrier_depends();             \
> +        *(_p1);                                 \
> +    })
> +
> +/**
> + * rcu_cmpxchg_pointer - same as rcu_set_pointer, but tests if the pointer
> + * matches "old".  If it succeeds, it returns the previous pointer to the
> + * data structure, which can be safely freed after waiting for a quiescent
> + * state using synchronize_rcu().  If it fails (unexpected value), it
> + * returns old (which should not be freed!).
> + */
> +
> +#define rcu_cmpxchg_pointer(p, old, _new)       \
> +    ({                                          \
> +        typeof(*p) _pold = (old);               \
> +        typeof(*p) _pnew = (_new);              \
> +        atomic_cmpxchg(p, _pold, _pnew);        \
> +    })
> +
> +#define rcu_set_pointer(p, v)                   \
> +    ({                                          \
> +        typeof(*p) _pv = (v);                   \
> +        smp_wmb();                              \
> +        atomic_set(p, _pv);                     \
> +    })
> +
> +/**
> + * rcu_assign_pointer - assign (publicize) a pointer to a new data structure
> + * meant to be read by RCU read-side critical sections.  Returns the assigned
> + * value.
> + *
> + * Documents which pointers will be dereferenced by RCU read-side critical
> + * sections and adds the required memory barriers on architectures requiring
> + * them.  It also makes sure the compiler does not reorder code initializing
> + * the data structure before its publication.
> + *
> + * Should match rcu_dereference(). 
> + */ > + > +#define rcu_assign_pointer(p, v) rcu_set_pointer(&(p), v) > + > +#ifdef __cplusplus > +} > +#endif > + > +#endif /* _URCU_POINTER_STATIC_H */ > diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h > new file mode 100644 > index 0000000..b875593 > --- /dev/null > +++ b/include/qemu/rcu.h > @@ -0,0 +1,168 @@ > +#ifndef _URCU_QSBR_H > +#define _URCU_QSBR_H > + > +/* > + * urcu-qsbr.h > + * > + * Userspace RCU QSBR header. > + * > + * LGPL-compatible code should include this header with : > + * > + * #define _LGPL_SOURCE > + * #include <urcu.h> > + * > + * This library is free software; you can redistribute it and/or > + * modify it under the terms of the GNU Lesser General Public > + * License as published by the Free Software Foundation; either > + * version 2.1 of the License, or (at your option) any later version. > + * > + * This library is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU > + * Lesser General Public License for more details. > + * > + * You should have received a copy of the GNU Lesser General Public > + * License along with this library; if not, write to the Free Software > + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 > USA > + * > + * IBM's contributions to this file may be relicensed under LGPLv2 or later. > + */ > + > +#include <stdlib.h> > +#include <assert.h> > +#include <limits.h> > +#include <unistd.h> > +#include <stdint.h> > +#include <stdbool.h> > +#include <glib.h> > + > +#include "qemu/compiler.h" > +#include "qemu/rcu-pointer.h" > +#include "qemu/thread.h" > +#include "qemu/queue.h" > +#include "qemu/atomic.h" > + > +#ifdef __cplusplus > +extern "C" { > +#endif > + > +/* > + * Important ! > + * > + * Each thread containing read-side critical sections must be registered > + * with rcu_register_thread() before calling rcu_read_lock(). 
> + * rcu_unregister_thread() should be called before the thread exits. > + */ > + > +#ifdef DEBUG_RCU > +#define rcu_assert(args...) assert(args) > +#else > +#define rcu_assert(args...) > +#endif > + > +#define RCU_GP_ONLINE (1UL << 0) > +#define RCU_GP_CTR (1UL << 1) > + > +/* > + * Global quiescent period counter with low-order bits unused. > + * Using a int rather than a char to eliminate false register dependencies > + * causing stalls on some architectures. > + */ > +extern unsigned long rcu_gp_ctr; > + > +extern QemuEvent rcu_gp_event; > + > +struct rcu_reader_data { > + /* Data used by both reader and synchronize_rcu() */ > + unsigned long ctr; > + bool waiting; > + > + /* Data used for registry, protected by rcu_gp_lock */ > + QLIST_ENTRY(rcu_reader_data) node; > +}; > + > +#ifdef __linux__ > +extern __thread struct rcu_reader_data rcu_reader; > +#define DEFINE_RCU_READER() \ > + __thread struct rcu_reader_data rcu_reader > + > +static inline struct rcu_reader_data *get_rcu_reader(void) > +{ > + return &rcu_reader; > +} > + > +static inline void alloc_rcu_reader(void) > +{ > +} > +#else > +extern GPrivate rcu_reader_key; > +#define DEFINE_RCU_READER() \ > + GPrivate rcu_reader_key = G_PRIVATE_INIT(g_free) > + > +static inline struct rcu_reader_data *get_rcu_reader(void) > +{ > + return g_private_get(&rcu_reader_key); > +} > + > +static inline void alloc_rcu_reader(void) > +{ > + g_private_replace(&rcu_reader_key, > + g_malloc0(sizeof(struct rcu_reader_data))); > +} > +#endif > + > +static inline void rcu_read_lock(void) > +{ > + rcu_assert(get_rcu_reader()->ctr); > +} > + > +static inline void rcu_read_unlock(void) > +{ > + /* Ensure that the previous reads complete before starting those > + * in another critical section. > + */ > + smp_rmb(); > +} > + > +static inline void rcu_quiescent_state(void) > +{ > + struct rcu_reader_data *p_rcu_reader = get_rcu_reader(); > + > + /* Reuses smp_rmb() in the last rcu_read_unlock(). 
*/ > + unsigned ctr = atomic_read(&rcu_gp_ctr); > + atomic_xchg(&p_rcu_reader->ctr, ctr); > + if (atomic_read(&p_rcu_reader->waiting)) { > + atomic_set(&p_rcu_reader->waiting, false); > + qemu_event_set(&rcu_gp_event); > + } > +} > + > +static inline void rcu_thread_offline(void) > +{ > + struct rcu_reader_data *p_rcu_reader = get_rcu_reader(); > + > + atomic_xchg(&p_rcu_reader->ctr, 0); > + if (atomic_read(&p_rcu_reader->waiting)) { > + atomic_set(&p_rcu_reader->waiting, false); > + qemu_event_set(&rcu_gp_event); > + } > +} > + > +static inline void rcu_thread_online(void) > +{ > + rcu_quiescent_state(); > +} > + > +extern void synchronize_rcu(void); > + > +/* > + * Reader thread registration. > + */ > +extern void rcu_register_thread(void); > +extern void rcu_unregister_thread(void); > + > +#ifdef __cplusplus > +} > +#endif > + > +#endif /* _URCU_QSBR_H */ > diff --git a/include/qemu/thread.h b/include/qemu/thread.h > index 3e32c65..5d64a20 100644 > --- a/include/qemu/thread.h > +++ b/include/qemu/thread.h > @@ -25,9 +25,6 @@ void qemu_mutex_lock(QemuMutex *mutex); > int qemu_mutex_trylock(QemuMutex *mutex); > void qemu_mutex_unlock(QemuMutex *mutex); > > -#define rcu_read_lock() do { } while (0) > -#define rcu_read_unlock() do { } while (0) > - > void qemu_cond_init(QemuCond *cond); > void qemu_cond_destroy(QemuCond *cond); > > diff --git a/libcacard/Makefile b/libcacard/Makefile > index 47827a0..f7a3b07 100644 > --- a/libcacard/Makefile > +++ b/libcacard/Makefile > @@ -4,7 +4,8 @@ TOOLS += vscclient$(EXESUF) > > # objects linked into a shared library, built with libtool with -fPIC if > required > libcacard-obj-y = $(stub-obj-y) $(libcacard-y) > -libcacard-obj-y += util/osdep.o util/cutils.o util/qemu-timer-common.o > util/error.o > +libcacard-obj-y += util/osdep.o util/cutils.o util/qemu-timer-common.o > +libcacard-obj-y += util/rcu.o util/error.o > libcacard-obj-$(CONFIG_WIN32) += util/oslib-win32.o util/qemu-thread-win32.o > libcacard-obj-$(CONFIG_POSIX) += 
util/oslib-posix.o util/qemu-thread-posix.o > libcacard-obj-y += $(filter trace/%, $(util-obj-y)) > diff --git a/util/Makefile.objs b/util/Makefile.objs > index 4a1bd4e..f05eba1 100644 > --- a/util/Makefile.objs > +++ b/util/Makefile.objs > @@ -11,3 +11,4 @@ util-obj-y += iov.o aes.o qemu-config.o qemu-sockets.o > uri.o notify.o > util-obj-y += qemu-option.o qemu-progress.o > util-obj-y += hexdump.o > util-obj-y += crc32c.o > +util-obj-y += rcu.o > diff --git a/util/rcu.c b/util/rcu.c > new file mode 100644 > index 0000000..48686a3 > --- /dev/null > +++ b/util/rcu.c > @@ -0,0 +1,203 @@ > +/* > + * urcu-qsbr.c > + * > + * Userspace RCU QSBR library > + * > + * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoy...@efficios.com> > + * Copyright (c) 2009 Paul E. McKenney, IBM Corporation. > + * Copyright 2013 Red Hat, Inc. > + * > + * Ported to QEMU by Paolo Bonzini <pbonz...@redhat.com> > + * > + * This library is free software; you can redistribute it and/or > + * modify it under the terms of the GNU Lesser General Public > + * License as published by the Free Software Foundation; either > + * version 2.1 of the License, or (at your option) any later version. > + * > + * This library is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU > + * Lesser General Public License for more details. > + * > + * You should have received a copy of the GNU Lesser General Public > + * License along with this library; if not, write to the Free Software > + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 > USA > + * > + * IBM's contributions to this file may be relicensed under LGPLv2 or later. > + */ > + > +#include <stdio.h> > +#include <assert.h> > +#include <stdlib.h> > +#include <stdint.h> > +#include <errno.h> > +#include "qemu/rcu.h" > +#include "qemu/atomic.h" > + > +/* > + * Global grace period counter. 
Bit 0 is one if the thread is online.
> + * Bits 1 and above are defined in synchronize_rcu/wait_for_readers.
> + */
> +#define RCU_GP_ONLINE           (1UL << 0)
> +#define RCU_GP_CTR              (1UL << 1)
> +
> +unsigned long rcu_gp_ctr = RCU_GP_ONLINE;
> +
> +QemuEvent rcu_gp_event;
> +static QemuMutex rcu_gp_lock;
> +
> +/*
> + * Check whether a quiescent state was crossed between the beginning of
> + * wait_for_readers and now.
> + */
> +static inline int rcu_gp_ongoing(unsigned long *ctr)
> +{
> +    unsigned long v;
> +
> +    /* See wait_for_readers for the discussion of memory barriers.  */
> +    v = atomic_read(ctr);
> +    return v && (v != rcu_gp_ctr);
> +}
> +
> +/* Written to only by each individual reader.  Read by both the reader and the
> + * writers.
> + */
> +DEFINE_RCU_READER();
> +
> +/* Protected by rcu_gp_lock.  */
> +typedef QLIST_HEAD(, rcu_reader_data) ThreadList;
> +static ThreadList registry = QLIST_HEAD_INITIALIZER(registry);
> +
> +/* Wait for previous parity/grace period to be empty of readers.  */
> +static void wait_for_readers(void)
> +{
> +    ThreadList qsreaders = QLIST_HEAD_INITIALIZER(qsreaders);
> +    struct rcu_reader_data *index, *tmp;
> +
> +    for (;;) {
> +        /* We want to be notified of changes made to rcu_gp_ongoing
> +         * while we walk the list.
> +         */
> +        qemu_event_reset(&rcu_gp_event);
> +
> +        /* Instead of using atomic_mb_set for index->waiting, and
> +         * atomic_mb_read for index->ctr, memory barriers are placed
> +         * manually since writes to different threads are independent.
> +         * atomic_mb_set has a smp_wmb before...
> +         */
> +        smp_wmb();
> +        QLIST_FOREACH(index, &registry, node) {
> +            atomic_set(&index->waiting, true);
> +        }
> +
> +        /* ... and a smp_mb after.
> +         *
> +         * This barrier also blocks stores that free old RCU-protected
> +         * pointers. 
> +         */
> +        smp_mb();
> +
> +        QLIST_FOREACH_SAFE(index, &registry, node, tmp) {
> +            if (!rcu_gp_ongoing(&index->ctr)) {
> +                QLIST_REMOVE(index, node);
> +                QLIST_INSERT_HEAD(&qsreaders, index, node);
> +
> +                /* No need for mb_set here; at worst we
> +                 * get some extra event wakeups.
> +                 */
> +                atomic_set(&index->waiting, false);
> +            }
> +        }
> +
> +        /* atomic_mb_read has smp_rmb after.  */
> +        smp_rmb();
> +
> +        if (QLIST_EMPTY(&registry)) {
> +            break;
> +        }
> +
> +        /* Wait for one thread to report a quiescent state and
> +         * try again.
> +         */
> +        qemu_event_wait(&rcu_gp_event);
> +    }
> +
> +    /* Put back the reader list in the registry.  */
> +    QLIST_SWAP(&registry, &qsreaders, node);
> +}
> +
> +void synchronize_rcu(void)
> +{
> +    unsigned long was_online;
> +
> +    was_online = get_rcu_reader()->ctr;
> +
> +    /* Mark the writer thread offline to make sure we don't wait for
> +     * our own quiescent state.  This allows using synchronize_rcu()
> +     * in threads registered as readers.
> +     *
> +     * rcu_thread_offline() and rcu_thread_online() include a
> +     * memory barrier.
> +     */
> +    if (was_online) {
> +        rcu_thread_offline();
Encourage the user to call synchronize_rcu() in a reader thread?  I
think the caller should ensure it is outside any read-side critical
section.  Also, online/offline can be nested, which makes the
situation even worse.

Regards,
Pingfan

> +    } else {
> +        smp_mb();
> +    }
> +
> +    qemu_mutex_lock(&rcu_gp_lock);
> +
> +    if (!QLIST_EMPTY(&registry)) {
> +        if (sizeof(rcu_gp_ctr) < 8) {
> +            /* For architectures with 32-bit longs, a two-subphases algorithm
> +             * ensures we do not encounter overflow bugs.
> +             *
> +             * Switch parity: 0 -> 1, 1 -> 0.
> +             */
> +            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
> +            wait_for_readers();
> +            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
> +        } else {
> +            /* Increment current grace period.  */
> +            atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
> +        }
> +
> +        wait_for_readers();
> +    }
> +
> +    qemu_mutex_unlock(&rcu_gp_lock);
> +
> +    if (was_online) {
> +        rcu_thread_online();
> +    } else {
> +        smp_mb();
> +    }
> +}
> +
> +void rcu_register_thread(void)
> +{
> +    if (!get_rcu_reader()) {
> +        alloc_rcu_reader();
> +    }
> +
> +    assert(get_rcu_reader()->ctr == 0);
> +    qemu_mutex_lock(&rcu_gp_lock);
> +    QLIST_INSERT_HEAD(&registry, get_rcu_reader(), node);
> +    qemu_mutex_unlock(&rcu_gp_lock);
> +    rcu_quiescent_state();
> +}
> +
> +void rcu_unregister_thread(void)
> +{
> +    rcu_thread_offline();
> +    qemu_mutex_lock(&rcu_gp_lock);
> +    QLIST_REMOVE(get_rcu_reader(), node);
> +    qemu_mutex_unlock(&rcu_gp_lock);
> +}
> +
> +static void __attribute__((__constructor__)) rcu_init(void)
> +{
> +    qemu_mutex_init(&rcu_gp_lock);
> +    qemu_event_init(&rcu_gp_event, true);
> +    rcu_register_thread();
> +}
> --
> 1.8.1.4
>
>