On Thu, 01/22 15:47, Paolo Bonzini wrote: > Asynchronous callbacks provided by call_rcu are particularly important > for QEMU, because the BQL makes it hard to use synchronize_rcu. > > In addition, the current RCU implementation is not particularly friendly > to multiple concurrent synchronize_rcu callers, making call_rcu even > more important. > > Signed-off-by: Paolo Bonzini <pbonz...@redhat.com> > --- > docs/rcu.txt | 110 +++++++++++++++++++++++++++++++++++++++++++++++-- > include/qemu/rcu.h | 22 ++++++++++ > util/rcu.c | 119 > +++++++++++++++++++++++++++++++++++++++++++++++++++++ > 3 files changed, 247 insertions(+), 4 deletions(-) > > diff --git a/docs/rcu.txt b/docs/rcu.txt > index 9938ad3..61752b9 100644 > --- a/docs/rcu.txt > +++ b/docs/rcu.txt > @@ -82,7 +82,50 @@ The core RCU API is small: > Note that it would be valid for another update to come while > synchronize_rcu is running. Because of this, it is better that > the updater releases any locks it may hold before calling > - synchronize_rcu. > + synchronize_rcu. If this is not possible (for example, because > + the updater is protected by the BQL), you can use call_rcu. > + > + void call_rcu1(struct rcu_head * head, > + void (*func)(struct rcu_head *head)); > + > + This function invokes func(head) after all pre-existing RCU > + read-side critical sections on all threads have completed. This > + marks the end of the removal phase, with func taking care > + asynchronously of the reclamation phase. > + > + The foo struct needs to have an rcu_head structure added, > + perhaps as follows: > + > + struct foo { > + struct rcu_head rcu; > + int a; > + char b; > + long c; > + }; > + > + so that the reclaimer function can fetch the struct foo address > + and free it: > + > + call_rcu1(&foo.rcu, foo_reclaim); > + > + void foo_reclaim(struct rcu_head *rp) > + { > + struct foo *fp = container_of(rp, struct foo, rcu); > + g_free(fp); > + } > + > + For the common case where the rcu_head member is the first of the > + struct, you can use the following macro. > + > + void call_rcu(T *p, > + void (*func)(T *p), > + field-name); > + > + call_rcu1 is typically used through this macro, in the common case > + where the "struct rcu_head" is the first field in the struct. In > + the above case, one could have written simply: > + > + call_rcu(foo_reclaim, g_free, rcu); > > typeof(*p) atomic_rcu_read(p); > > @@ -153,6 +196,11 @@ DIFFERENCES WITH LINUX > - atomic_rcu_read and atomic_rcu_set replace rcu_dereference and > rcu_assign_pointer. They take a _pointer_ to the variable being accessed. > > +- call_rcu is a macro that has an extra argument (the name of the first > + field in the struct, which must be a struct rcu_head), and expects the > + type of the callback's argument to be the type of the first argument. > + call_rcu1 is the same as Linux's call_rcu. > + > > RCU PATTERNS > ============ > @@ -206,7 +254,47 @@ The write side looks simply like this (with appropriate > locking): > synchronize_rcu(); > free(old); > > -Note that the same idiom would be possible with reader/writer > +If the processing cannot be done purely within the critical section, it > +is possible to combine this idiom with a "real" reference count: > + > + rcu_read_lock(); > + p = atomic_rcu_read(&foo); > + foo_ref(p); > + rcu_read_unlock(); > + /* do something with p. */ > + foo_unref(p); > + > +The write side can be like this: > + > + qemu_mutex_lock(&foo_mutex); > + old = foo; > + atomic_rcu_set(&foo, new); > + qemu_mutex_unlock(&foo_mutex); > + synchronize_rcu(); > + foo_unref(old); > + > +or with call_rcu: > + > + qemu_mutex_lock(&foo_mutex); > + old = foo; > + atomic_rcu_set(&foo, new); > + qemu_mutex_unlock(&foo_mutex); > + call_rcu(foo_unref, old, rcu); > + > +In both cases, the write side only performs removal. Reclamation > +happens when the last reference to a "foo" object is dropped. > +Using synchronize_rcu() is undesirably expensive, because the > +last reference may be dropped on the read side. Hence you can > +use call_rcu() instead: > + > + foo_unref(struct foo *p) { > + if (atomic_fetch_dec(&p->refcount) == 1) { > + call_rcu(foo_destroy, p, rcu); > + } > + } > + > + > +Note that the same idioms would be possible with reader/writer > locks: > > read_lock(&foo_rwlock); write_mutex_lock(&foo_rwlock); > @@ -216,13 +304,27 @@ locks: > write_mutex_unlock(&foo_rwlock); > free(p); > > + ------------------------------------------------------------------ > + > + read_lock(&foo_rwlock); write_mutex_lock(&foo_rwlock); > + p = foo; old = foo; > + foo_ref(p); foo = new; > + read_unlock(&foo_rwlock); foo_unref(old); > + /* do something with p. */ write_mutex_unlock(&foo_rwlock); > + read_lock(&foo_rwlock); > + foo_unref(p); > + read_unlock(&foo_rwlock); > + > +foo_unref could use a mechanism such as bottom halves to move deallocation > +out of the write-side critical section. > + > > RCU resizable arrays > -------------------- > > Resizable arrays can be used with RCU. The expensive RCU synchronization > -only needs to take place when the array is resized. The two items to > -take care of are: > +(or call_rcu) only needs to take place when the array is resized. > +The two items to take care of are: > > - ensuring that the old version of the array is available between removal > and reclamation; > diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h > index da043f2..068a279 100644 > --- a/include/qemu/rcu.h > +++ b/include/qemu/rcu.h > @@ -118,6 +118,28 @@ extern void synchronize_rcu(void); > extern void rcu_register_thread(void); > extern void rcu_unregister_thread(void); > > +struct rcu_head; > +typedef void RCUCBFunc(struct rcu_head *head); > + > +struct rcu_head { > + struct rcu_head *next; > + RCUCBFunc *func; > +}; > + > +extern void call_rcu1(struct rcu_head *head, RCUCBFunc *func); > + > +/* The operands of the minus operator must have the same type, > + * which must be the one that we specify in the cast. > + */ > +#define call_rcu(head, func, field) \ > + call_rcu1(({ \ > + char __attribute__((unused)) \ > + offset_must_be_zero[-offsetof(typeof(*(head)), field)], \ > + func_type_invalid = (func) - (void (*)(typeof(head)))(func); \ > + &(head)->field; \ > + }), \ > + (RCUCBFunc *)(func)) > + > #ifdef __cplusplus > } > #endif > diff --git a/util/rcu.c b/util/rcu.c > index 1f737d5..c9c3e6e 100644 > --- a/util/rcu.c > +++ b/util/rcu.c > @@ -26,6 +26,7 @@ > * IBM's contributions to this file may be relicensed under LGPLv2 or later. > */ > > +#include "qemu-common.h" > #include <stdio.h> > #include <assert.h> > #include <stdlib.h> > @@ -33,6 +34,7 @@ > #include <errno.h> > #include "qemu/rcu.h" > #include "qemu/atomic.h" > +#include "qemu/thread.h" > > /* > * Global grace period counter. Bit 0 is always one in rcu_gp_ctr. > @@ -149,6 +151,116 @@ void synchronize_rcu(void) > qemu_mutex_unlock(&rcu_gp_lock); > } > > + > +#define RCU_CALL_MIN_SIZE 30 > + > +/* Multi-producer, single-consumer queue based on urcu/static/wfqueue.h > + * from liburcu. Note that head is only used by the consumer. > + */ > +static struct rcu_head dummy; > +static struct rcu_head *head = &dummy, **tail = &dummy.next; > +static int rcu_call_count; > +static QemuEvent rcu_call_ready_event; > + > +static void enqueue(struct rcu_head *node) > +{ > + struct rcu_head **old_tail; > + > + node->next = NULL; > + old_tail = atomic_xchg(&tail, &node->next); > + atomic_mb_set(old_tail, node); > +} > + > +static struct rcu_head *try_dequeue(void) > +{ > + struct rcu_head *node, *next; > + > +retry: > + /* Test for an empty list, which we do not expect. Note that for > + * the consumer head and tail are always consistent. The head > + * is consistent because only the consumer reads/writes it. > + * The tail, because it is the first step in the enqueuing. > + * It is only the next pointers that might be inconsistent. > + */ > + if (head == &dummy && atomic_mb_read(&tail) == &dummy.next) { > + abort(); > + } > + > + /* If the head node has NULL in its next pointer, the value is > + * wrong and we need to wait until its enqueuer finishes the update. > + */ > + node = head; > + next = atomic_mb_read(&head->next); > + if (!next) { > + return NULL; > + } > + > + /* Since we are the sole consumer, and we excluded the empty case > + * above, the queue will always have at least two nodes: the > + * dummy node, and the one being removed. So we do not need to update > + * the tail pointer. > + */ > + head = next; > + > + /* If we dequeued the dummy node, add it back at the end and retry. */ > + if (node == &dummy) { > + enqueue(node); > + goto retry; > + } > + > + return node; > +} > + > +static void *call_rcu_thread(void *opaque) > +{ > + struct rcu_head *node; > + > + for (;;) { > + int tries = 0; > + int n = atomic_read(&rcu_call_count); > + > + /* Heuristically wait for a decent number of callbacks to pile up. > + * Fetch rcu_call_count now, we only must process elements that were > + * added before synchronize_rcu() starts. > + */ > + while (n < RCU_CALL_MIN_SIZE && ++tries <= 5) { > + g_usleep(100000); > + qemu_event_reset(&rcu_call_ready_event); > + n = atomic_read(&rcu_call_count); > + if (n < RCU_CALL_MIN_SIZE) { > + qemu_event_wait(&rcu_call_ready_event); > + n = atomic_read(&rcu_call_count); > + } > + } > + > + atomic_sub(&rcu_call_count, n); > + synchronize_rcu(); > + while (n > 0) { > + node = try_dequeue(); > + while (!node) { > + qemu_event_reset(&rcu_call_ready_event); > + node = try_dequeue(); > + if (!node) { > + qemu_event_wait(&rcu_call_ready_event); > + node = try_dequeue(); > + } > + } > + > + n--; > + node->func(node); > + } > + } > + abort(); > +} > + > +void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node)) > +{ > + node->func = func; > + enqueue(node); > + atomic_inc(&rcu_call_count); > + qemu_event_set(&rcu_call_ready_event); > +} > + > void rcu_register_thread(void) > { > assert(rcu_reader.ctr == 0); > @@ -166,7 +278,14 @@ void rcu_unregister_thread(void) > > static void __attribute__((__constructor__)) rcu_init(void) > { > + QemuThread thread; > + > qemu_mutex_init(&rcu_gp_lock); > qemu_event_init(&rcu_gp_event, true); > + > + qemu_event_init(&rcu_call_ready_event, false); > + qemu_thread_create(&thread, "call_rcu", call_rcu_thread, > + NULL, QEMU_THREAD_DETACHED); > + > rcu_register_thread(); > } > -- > 1.8.3.1 > > >
Reviewed-by: Fam Zheng <f...@redhat.com>