Mathieu Desnoyers <mathieu.desnoy...@efficios.com> wrote: > On 2022-09-22 05:15, Eric Wong via lttng-dev wrote: > > Hello, I'm using urcu-bp + rculfhash + call_rcu to implement > > malloc instrumentation (via LD_PRELOAD) on an existing > > single-threaded Perl codebase which uses Linux signalfd. > > > > signalfd depends on signals being blocked in all threads > > of the process, otherwise threads with unblocked signals > > can receive them and starve the signalfd. > > > > While some threads in URCU do block signals (e.g. workqueue > > worker for rculfhash), the call_rcu thread and rculfhash > > partition_resize_helper threads do not... > > > > Should all threads URCU creates block signals (aside from SIGRCU)? > > Yes, I think you are right. The SIGRCU signal is only needed for the > urcu-signal flavor though. > > Would you like to submit a patch ?
Sure. Is there a way to detect at runtime when urcu-signal is in use so SIGRCU (SIGUSR1) doesn't get unblocked when using other flavors? I actually use SIGUSR1 in my signalfd-using codebase. I also want to remove cds_lfht_worker_init entirely since it's racy. Signal blocking needs to be done in the parent before pthread_create to avoid a window where the child has unblocked signals. Thanks. Anyways, this is my work-in-progress: diff --git a/src/rculfhash.c b/src/rculfhash.c index 7c0b9fb..5f455af 100644 --- a/src/rculfhash.c +++ b/src/rculfhash.c @@ -1251,6 +1251,7 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i, struct partition_resize_work *work; int ret; unsigned long thread, nr_threads; + sigset_t newmask, oldmask; urcu_posix_assert(nr_cpus_mask != -1); if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD) @@ -1273,6 +1274,12 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i, dbg_printf("error allocating for resize, single-threading\n"); goto fallback; } + + ret = sigfillset(&newmask); + urcu_posix_assert(!ret); + ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask); + urcu_posix_assert(!ret); + for (thread = 0; thread < nr_threads; thread++) { work[thread].ht = ht; work[thread].i = i; @@ -1294,6 +1301,10 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i, } urcu_posix_assert(!ret); } + + ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL); + urcu_posix_assert(!ret); + for (thread = 0; thread < nr_threads; thread++) { ret = pthread_join(work[thread].thread_id, NULL); urcu_posix_assert(!ret); diff --git a/src/urcu-call-rcu-impl.h b/src/urcu-call-rcu-impl.h index e9366b4..9f85d55 100644 --- a/src/urcu-call-rcu-impl.h +++ b/src/urcu-call-rcu-impl.h @@ -434,6 +434,7 @@ static void call_rcu_data_init(struct call_rcu_data **crdpp, { struct call_rcu_data *crdp; int ret; + sigset_t newmask, oldmask; crdp = malloc(sizeof(*crdp)); if (crdp == NULL) @@ -448,9 +449,18 @@ static void call_rcu_data_init(struct call_rcu_data **crdpp, crdp->gp_count = 0; cmm_smp_mb(); /* Structure initialized before pointer is planted. */ *crdpp = crdp; + + ret = sigfillset(&newmask); + urcu_posix_assert(!ret); + ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask); + urcu_posix_assert(!ret); + ret = pthread_create(&crdp->tid, NULL, call_rcu_thread, crdp); if (ret) urcu_die(ret); + + ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL); + urcu_posix_assert(!ret); } /* diff --git a/src/urcu-defer-impl.h b/src/urcu-defer-impl.h index b5d7926..1c96287 100644 --- a/src/urcu-defer-impl.h +++ b/src/urcu-defer-impl.h @@ -409,9 +409,18 @@ void defer_rcu(void (*fct)(void *p), void *p) static void start_defer_thread(void) { int ret; + sigset_t newmask, oldmask; + + ret = sigfillset(&newmask); + urcu_posix_assert(!ret); + ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask); + urcu_posix_assert(!ret); ret = pthread_create(&tid_defer, NULL, thr_defer, NULL); urcu_posix_assert(!ret); + + ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL); + urcu_posix_assert(!ret); } static void stop_defer_thread(void) diff --git a/src/workqueue.c b/src/workqueue.c index b6361ad..1039d72 100644 --- a/src/workqueue.c +++ b/src/workqueue.c @@ -284,6 +284,7 @@ struct urcu_workqueue *urcu_workqueue_create(unsigned long flags, { struct urcu_workqueue *workqueue; int ret; + sigset_t newmask, oldmask; workqueue = malloc(sizeof(*workqueue)); if (workqueue == NULL) @@ -304,10 +305,20 @@ struct urcu_workqueue *urcu_workqueue_create(unsigned long flags, workqueue->cpu_affinity = cpu_affinity; workqueue->loop_count = 0; cmm_smp_mb(); /* Structure initialized before pointer is planted. */ + + ret = sigfillset(&newmask); + urcu_posix_assert(!ret); + ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask); + urcu_posix_assert(!ret); + ret = pthread_create(&workqueue->tid, NULL, workqueue_thread, workqueue); if (ret) { urcu_die(ret); } + + ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL); + urcu_posix_assert(!ret); + return workqueue; } @@ -464,13 +475,23 @@ void urcu_workqueue_resume_worker(struct urcu_workqueue *workqueue) void urcu_workqueue_create_worker(struct urcu_workqueue *workqueue) { int ret; + sigset_t newmask, oldmask; /* Clear workqueue state from parent. */ workqueue->flags &= ~URCU_WORKQUEUE_PAUSED; workqueue->flags &= ~URCU_WORKQUEUE_PAUSE; workqueue->tid = 0; + + ret = sigfillset(&newmask); + urcu_posix_assert(!ret); + ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask); + urcu_posix_assert(!ret); + ret = pthread_create(&workqueue->tid, NULL, workqueue_thread, workqueue); if (ret) { urcu_die(ret); } + + ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL); + urcu_posix_assert(!ret); } _______________________________________________ lttng-dev mailing list lttng-dev@lists.lttng.org https://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev