Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
---
 include/urcu/rculfhash.h |  15 ++++--
 src/rculfhash-internal.h |   2 +-
 src/rculfhash.c          | 124 +++++++++++++++++++++++------------------------
 3 files changed, 74 insertions(+), 67 deletions(-)

diff --git a/include/urcu/rculfhash.h b/include/urcu/rculfhash.h
index 9934422..0789aa5 100644
--- a/include/urcu/rculfhash.h
+++ b/include/urcu/rculfhash.h
@@ -176,10 +176,17 @@ struct cds_lfht *cds_lfht_new(unsigned long init_size,
  *        need to be informed of the value passed to cds_lfht_new().
  *
  * Return 0 on success, negative error value on error.
- * Threads calling this API need to be registered RCU read-side threads.
- * cds_lfht_destroy should *not* be called from a RCU read-side critical
- * section. It should *not* be called from a call_rcu thread context
- * neither.
+ *
+ * Prior to liburcu 0.10:
+ * - Threads calling this API need to be registered RCU read-side
+ *   threads.
+ * - cds_lfht_destroy should *not* be called from a RCU read-side
+ *   critical section. It should *not* be called from a call_rcu thread
+ *   context neither.
+ *
+ * Starting from liburcu 0.10, rculfhash implements its own worker
+ * thread to handle resize operations, which removes RCU requirements on
+ * cds_lfht_destroy.
  */
 extern
 int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr);
diff --git a/src/rculfhash-internal.h b/src/rculfhash-internal.h
index d7cec95..0f8df97 100644
--- a/src/rculfhash-internal.h
+++ b/src/rculfhash-internal.h
@@ -82,7 +82,7 @@ struct cds_lfht {
         */
        pthread_mutex_t resize_mutex;   /* resize mutex: add/del mutex */
        pthread_attr_t *resize_attr;    /* Resize threads attributes */
-       unsigned int in_progress_resize, in_progress_destroy;
+       unsigned int in_progress_destroy;
        unsigned long resize_target;
        int resize_initiated;
 
diff --git a/src/rculfhash.c b/src/rculfhash.c
index d7a1f23..b7b8f95 100644
--- a/src/rculfhash.c
+++ b/src/rculfhash.c
@@ -64,7 +64,7 @@
  * - Split-counters are used to keep track of the number of
  *   nodes within the hash table for automatic resize triggering.
  * - Resize operation initiated by long chain detection is executed by a
- *   call_rcu thread, which keeps lock-freedom of add and remove.
+ *   worker thread, which keeps lock-freedom of add and remove.
  * - Resize operations are protected by a mutex.
  * - The removal operation is split in two parts: first, a "removed"
  *   flag is set in the next pointer within the node to remove. Then,
@@ -276,6 +276,8 @@
 #include <rculfhash-internal.h>
 #include <stdio.h>
 #include <pthread.h>
+#include "workqueue.h"
+#include "urcu-die.h"
 
 /*
  * Split-counters lazily update the global counter each 1024
@@ -335,11 +337,11 @@ struct ht_items_count {
 } __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
 
 /*
- * rcu_resize_work: Contains arguments passed to RCU worker thread
+ * resize_work: Contains arguments passed to worker thread
  * responsible for performing lazy resize.
  */
-struct rcu_resize_work {
-       struct rcu_head head;
+struct resize_work {
+       struct urcu_work work;
        struct cds_lfht *ht;
 };
 
@@ -356,6 +358,8 @@ struct partition_resize_work {
                    unsigned long start, unsigned long len);
 };
 
+static struct urcu_workqueue *cds_lfht_workqueue;
+
 /*
  * Algorithm to reverse bits in a word by lookup table, extended to
  * 64-bit words.
@@ -1224,14 +1228,12 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
        if (start == 0 && nr_threads > 0)
                return;
 fallback:
-       ht->flavor->thread_online();
        fct(ht, i, start, len);
-       ht->flavor->thread_offline();
 }
 
 /*
  * Holding RCU read lock to protect _cds_lfht_add against memory
- * reclaim that could be performed by other call_rcu worker threads (ABA
+ * reclaim that could be performed by other worker threads (ABA
  * problem).
  *
  * When we reach a certain length, we can split this population phase over
@@ -1308,7 +1310,7 @@ void init_table(struct cds_lfht *ht,
 
 /*
  * Holding RCU read lock to protect _cds_lfht_remove against memory
- * reclaim that could be performed by other call_rcu worker threads (ABA
+ * reclaim that could be performed by other worker threads (ABA
  * problem).
  * For a single level, we logically remove and garbage collect each node.
  *
@@ -1320,8 +1322,9 @@ void init_table(struct cds_lfht *ht,
  *
  * Concurrent removal and add operations are helping us perform garbage
  * collection of logically removed nodes. We guarantee that all logically
- * removed nodes have been garbage-collected (unlinked) before call_rcu is
- * invoked to free a hole level of bucket nodes (after a grace period).
+ * removed nodes have been garbage-collected (unlinked) before work
+ * enqueue is invoked to free a hole level of bucket nodes (after a
+ * grace period).
  *
  * Logical removal and garbage collection can therefore be done in batch
  * or on a node-per-node basis, as long as the guarantee above holds.
@@ -1772,25 +1775,12 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht)
  */
 int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr)
 {
-       int ret, was_online;
+       int ret;
 
-       /* Wait for in-flight resize operations to complete */
+       /* Cancel ongoing resize operations. */
        _CMM_STORE_SHARED(ht->in_progress_destroy, 1);
-       cmm_smp_mb();   /* Store destroy before load resize */
-       was_online = ht->flavor->read_ongoing();
-       if (was_online)
-               ht->flavor->thread_offline();
-       /* Calling with RCU read-side held is an error. */
-       if (ht->flavor->read_ongoing()) {
-               ret = -EINVAL;
-               if (was_online)
-                       ht->flavor->thread_online();
-               goto end;
-       }
-       while (uatomic_read(&ht->in_progress_resize))
-               poll(NULL, 0, 100);     /* wait for 100ms */
-       if (was_online)
-               ht->flavor->thread_online();
+       /* Wait for in-flight resize operations to complete */
+       urcu_workqueue_flush_queued_work(cds_lfht_workqueue);
        ret = cds_lfht_delete_bucket(ht);
        if (ret)
                return ret;
@@ -1801,7 +1791,6 @@ int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr)
        if (ret)
                ret = -EBUSY;
        poison_free(ht);
-end:
        return ret;
 }
 
@@ -1897,7 +1886,6 @@ void _do_cds_lfht_resize(struct cds_lfht *ht)
         * Resize table, re-do if the target size has changed under us.
         */
        do {
-               assert(uatomic_read(&ht->in_progress_resize));
                if (CMM_LOAD_SHARED(ht->in_progress_destroy))
                        break;
                ht->resize_initiated = 1;
@@ -1930,71 +1918,47 @@ void resize_target_update_count(struct cds_lfht *ht,
 
 void cds_lfht_resize(struct cds_lfht *ht, unsigned long new_size)
 {
-       int was_online;
-
-       was_online = ht->flavor->read_ongoing();
-       if (was_online)
-               ht->flavor->thread_offline();
-       /* Calling with RCU read-side held is an error. */
-       if (ht->flavor->read_ongoing()) {
-               static int print_once;
-
-               if (!CMM_LOAD_SHARED(print_once))
-                       fprintf(stderr, "[error] rculfhash: cds_lfht_resize "
-                               "called with RCU read-side lock held.\n");
-               CMM_STORE_SHARED(print_once, 1);
-               assert(0);
-               goto end;
-       }
        resize_target_update_count(ht, new_size);
        CMM_STORE_SHARED(ht->resize_initiated, 1);
        pthread_mutex_lock(&ht->resize_mutex);
        _do_cds_lfht_resize(ht);
        pthread_mutex_unlock(&ht->resize_mutex);
-end:
-       if (was_online)
-               ht->flavor->thread_online();
 }
 
 static
-void do_resize_cb(struct rcu_head *head)
+void do_resize_cb(struct urcu_work *work)
 {
-       struct rcu_resize_work *work =
-               caa_container_of(head, struct rcu_resize_work, head);
-       struct cds_lfht *ht = work->ht;
+       struct resize_work *resize_work =
+               caa_container_of(work, struct resize_work, work);
+       struct cds_lfht *ht = resize_work->ht;
 
-       ht->flavor->thread_offline();
+       ht->flavor->register_thread();
        pthread_mutex_lock(&ht->resize_mutex);
        _do_cds_lfht_resize(ht);
        pthread_mutex_unlock(&ht->resize_mutex);
-       ht->flavor->thread_online();
+       ht->flavor->unregister_thread();
        poison_free(work);
-       cmm_smp_mb();   /* finish resize before decrement */
-       uatomic_dec(&ht->in_progress_resize);
 }
 
 static
 void __cds_lfht_resize_lazy_launch(struct cds_lfht *ht)
 {
-       struct rcu_resize_work *work;
+       struct resize_work *work;
 
        /* Store resize_target before read resize_initiated */
        cmm_smp_mb();
        if (!CMM_LOAD_SHARED(ht->resize_initiated)) {
-               uatomic_inc(&ht->in_progress_resize);
-               cmm_smp_mb();   /* increment resize count before load destroy */
                if (CMM_LOAD_SHARED(ht->in_progress_destroy)) {
-                       uatomic_dec(&ht->in_progress_resize);
                        return;
                }
                work = malloc(sizeof(*work));
                if (work == NULL) {
                        dbg_printf("error allocating resize work, bailing out\n");
-                       uatomic_dec(&ht->in_progress_resize);
                        return;
                }
                work->ht = ht;
-               ht->flavor->update_call_rcu(&work->head, do_resize_cb);
+               urcu_workqueue_queue_work(cds_lfht_workqueue,
+                       &work->work, do_resize_cb);
                CMM_STORE_SHARED(ht->resize_initiated, 1);
        }
 }
@@ -2045,3 +2009,39 @@ void cds_lfht_resize_lazy_count(struct cds_lfht *ht, unsigned long size,
        }
        __cds_lfht_resize_lazy_launch(ht);
 }
+
+static void cds_lfht_fork_prepare(void)
+{
+       urcu_workqueue_pause_worker(cds_lfht_workqueue);
+}
+
+static void cds_lfht_fork_parent(void)
+{
+       urcu_workqueue_resume_worker(cds_lfht_workqueue);
+}
+
+static void cds_lfht_fork_child(void)
+{
+       urcu_workqueue_create_worker(cds_lfht_workqueue);
+}
+
+static void __attribute__((constructor)) cds_lfht_init_worker(void)
+{
+       int ret;
+
+       if (cds_lfht_workqueue)
+               return;
+       cds_lfht_workqueue = urcu_workqueue_create(0, -1, NULL,
+               NULL, NULL, NULL, NULL, NULL, NULL, NULL);
+       ret = pthread_atfork(cds_lfht_fork_prepare,
+               cds_lfht_fork_parent, cds_lfht_fork_child);
+       if (ret) {
+               urcu_die(ret);
+       }
+}
+
+static void __attribute__((destructor)) cds_lfht_fini_worker(void)
+{
+       urcu_workqueue_destroy(cds_lfht_workqueue);
+       cds_lfht_workqueue = NULL;
+}
-- 
2.1.4

_______________________________________________
lttng-dev mailing list
lttng-dev@lists.lttng.org
https://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev

Reply via email to