Module Name:	src
Committed By:	martin
Date:		Thu Apr 18 15:51:36 UTC 2024
Modified Files:
	src/share/man/man9 [netbsd-9]: workqueue.9
	src/sys/kern [netbsd-9]: subr_workqueue.c
	src/tests/rump/kernspace [netbsd-9]: kernspace.h workqueue.c
	src/tests/rump/rumpkern [netbsd-9]: Makefile t_workqueue.c

Log Message:
Pull up following revision(s) (requested by riastradh in ticket #1830):

	sys/kern/subr_workqueue.c: revision 1.40
	sys/kern/subr_workqueue.c: revision 1.41
	sys/kern/subr_workqueue.c: revision 1.42
	sys/kern/subr_workqueue.c: revision 1.43
	sys/kern/subr_workqueue.c: revision 1.44
	sys/kern/subr_workqueue.c: revision 1.45
	sys/kern/subr_workqueue.c: revision 1.46
	tests/rump/kernspace/workqueue.c: revision 1.7
	sys/kern/subr_workqueue.c: revision 1.47
	tests/rump/kernspace/workqueue.c: revision 1.8
	tests/rump/kernspace/workqueue.c: revision 1.9
	tests/rump/rumpkern/t_workqueue.c: revision 1.3
	tests/rump/rumpkern/t_workqueue.c: revision 1.4
	tests/rump/kernspace/kernspace.h: revision 1.9
	tests/rump/rumpkern/Makefile: revision 1.20
	sys/kern/subr_workqueue.c: revision 1.39
	share/man/man9/workqueue.9: revision 1.15
	(all via patch)

workqueue: Lift unnecessary restriction on workqueue_wait.

Allow multiple concurrent waits at a time, and allow enqueueing work at
the same time (as long as it's not the work we're waiting for).

This way multiple users can use a shared global workqueue and safely
wait for individual work items concurrently, while the workqueue is
still in use for other items (e.g., wg(4) peers).

This has the side effect of taking away a diagnostic measure, but I
think allowing the diagnostic's false positives instead of rejecting
them is worth it.  We could cheaply add it back with some false
negatives if it's important.

workqueue(9): workqueue_wait and workqueue_destroy may sleep.

But might not, so assert sleepable up front.

workqueue(9): Sprinkle dtrace probes.

tests/rump/rumpkern: Use PROGDPLIBS, not explicit -L/-l.

This way we relink the t_* test programs whenever changes under
tests/rump/kernspace change libkernspace.a.

workqueue(9) tests: Nix trailing whitespace.

workqueue(9) tests: Destroy struct work immediately on entry.

workqueue(9) tests: Add test for PR kern/57574.

workqueue(9): Avoid touching running work items in workqueue_wait.

As soon as the workqueue function has been called, it is forbidden to
touch the struct work passed to it -- the function might free or reuse
the data structure it is embedded in.  So workqueue_wait is forbidden
to search the queue for the batch of running work items.

Instead, use a generation number which is odd while the thread is
processing a batch of work and even when not.

There's still a small optimization available with the struct work
pointer to wait for: if we find the work item in one of the per-CPU
_pending_ queues, then after we wait for a batch of work to complete on
that CPU, we don't need to wait for work on any other CPUs.

PR kern/57574

workqueue(9): Sprinkle dtrace probes for workqueue_wait edge cases.

Let's make it easy to find out whether these are hit.

workqueue(9): Stop violating queue(3) internals.

workqueue(9): Avoid unnecessary mutex_exit/enter cycle each loop.

workqueue(9): Sort includes.  No functional change intended.

workqueue(9): Factor out wq->wq_flags & WQ_FPU in workqueue_worker.

No functional change intended.  Makes it clearer that s is initialized
when used.

To generate a diff of this commit:
cvs rdiff -u -r1.12 -r1.12.6.1 src/share/man/man9/workqueue.9
cvs rdiff -u -r1.37 -r1.37.6.1 src/sys/kern/subr_workqueue.c
cvs rdiff -u -r1.8 -r1.8.2.1 src/tests/rump/kernspace/kernspace.h
cvs rdiff -u -r1.6 -r1.6.8.1 src/tests/rump/kernspace/workqueue.c
cvs rdiff -u -r1.18 -r1.18.2.1 src/tests/rump/rumpkern/Makefile
cvs rdiff -u -r1.2 -r1.2.8.1 src/tests/rump/rumpkern/t_workqueue.c

Please note that diffs are not public domain; they are subject to the
copyright notices on the relevant files.
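The odd/even generation scheme described above can be sketched in isolation as a simplified userland model (using pthreads rather than the kernel's mutex(9)/condvar(9); the `struct queue`, `run_batch`, and `wait_for_batch` names are illustrative, not the kernel API). The worker marks a batch in flight by making the generation odd, runs the batch unlocked, then bumps the generation back to even and broadcasts; a waiter that sees an odd generation sleeps until the generation changes, never touching the running work items:

```c
#include <pthread.h>
#include <stdint.h>

/* Simplified model of one per-CPU workqueue_queue. */
struct queue {
	pthread_mutex_t	mtx;
	pthread_cond_t	cv;
	uint64_t	gen;	/* odd: batch running; even: idle */
};

/*
 * Worker side: mark a batch in progress by setting the generation
 * odd, run the batch with the lock dropped, then increment the
 * generation (odd -> even) and wake all waiters.
 */
void
run_batch(struct queue *q, void (*batch)(void))
{

	pthread_mutex_lock(&q->mtx);
	q->gen |= 1;			/* generation becomes odd */
	pthread_mutex_unlock(&q->mtx);

	if (batch != NULL)
		batch();		/* work items run unlocked */

	pthread_mutex_lock(&q->mtx);
	q->gen++;			/* odd -> even: batch complete */
	pthread_cond_broadcast(&q->cv);
	pthread_mutex_unlock(&q->mtx);
}

/*
 * Waiter side: if a batch is in flight (generation odd), sleep until
 * the generation changes -- without ever dereferencing the running
 * work items, which may already have been freed or reused.
 */
void
wait_for_batch(struct queue *q)
{
	uint64_t gen;

	pthread_mutex_lock(&q->mtx);
	gen = q->gen;
	if (gen & 1) {
		do
			pthread_cond_wait(&q->cv, &q->mtx);
		while (gen == q->gen);
	}
	pthread_mutex_unlock(&q->mtx);
}
```

Snapshotting `gen` before sleeping is what makes multiple concurrent waiters safe: each waiter only needs the generation to move past the value it observed, so a broadcast wakes them all without any per-waiter state like the old `q_waiter` pointer.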
Modified files: Index: src/share/man/man9/workqueue.9 diff -u src/share/man/man9/workqueue.9:1.12 src/share/man/man9/workqueue.9:1.12.6.1 --- src/share/man/man9/workqueue.9:1.12 Thu Dec 28 07:00:52 2017 +++ src/share/man/man9/workqueue.9 Thu Apr 18 15:51:36 2024 @@ -1,4 +1,4 @@ -.\" $NetBSD: workqueue.9,v 1.12 2017/12/28 07:00:52 ozaki-r Exp $ +.\" $NetBSD: workqueue.9,v 1.12.6.1 2024/04/18 15:51:36 martin Exp $ .\" .\" Copyright (c)2005 YAMAMOTO Takashi, .\" All rights reserved. @@ -128,11 +128,11 @@ waits for a specified work on the workqueue .Fa wq to finish. -The caller must ensure that no new work will be enqueued to the workqueue -beforehand. -Note that if the workqueue is -.Dv WQ_PERCPU , -the caller can enqueue a new work to another queue other than the waiting queue. +The caller must ensure that +.Fa wk +will not be enqueued to the workqueue again until after +.Fn workqueue_wait +returns. .Pp .\" - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - .Fn workqueue_destroy Index: src/sys/kern/subr_workqueue.c diff -u src/sys/kern/subr_workqueue.c:1.37 src/sys/kern/subr_workqueue.c:1.37.6.1 --- src/sys/kern/subr_workqueue.c:1.37 Wed Jun 13 05:26:12 2018 +++ src/sys/kern/subr_workqueue.c Thu Apr 18 15:51:35 2024 @@ -1,4 +1,4 @@ -/* $NetBSD: subr_workqueue.c,v 1.37 2018/06/13 05:26:12 ozaki-r Exp $ */ +/* $NetBSD: subr_workqueue.c,v 1.37.6.1 2024/04/18 15:51:35 martin Exp $ */ /*- * Copyright (c)2002, 2005, 2006, 2007 YAMAMOTO Takashi, @@ -27,18 +27,20 @@ */ #include <sys/cdefs.h> -__KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.37 2018/06/13 05:26:12 ozaki-r Exp $"); +__KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.37.6.1 2024/04/18 15:51:35 martin Exp $"); #include <sys/param.h> + +#include <sys/condvar.h> #include <sys/cpu.h> -#include <sys/systm.h> -#include <sys/kthread.h> #include <sys/kmem.h> -#include <sys/proc.h> -#include <sys/workqueue.h> +#include <sys/kthread.h> #include <sys/mutex.h> -#include <sys/condvar.h> +#include <sys/proc.h> 
#include <sys/queue.h> +#include <sys/sdt.h> +#include <sys/systm.h> +#include <sys/workqueue.h> typedef struct work_impl { SIMPLEQ_ENTRY(work_impl) wk_entry; @@ -50,9 +52,8 @@ struct workqueue_queue { kmutex_t q_mutex; kcondvar_t q_cv; struct workqhead q_queue_pending; - struct workqhead q_queue_running; + uint64_t q_gen; lwp_t *q_worker; - work_impl_t *q_waiter; }; struct workqueue { @@ -70,6 +71,49 @@ struct workqueue { #define POISON 0xaabbccdd +SDT_PROBE_DEFINE7(sdt, kernel, workqueue, create, + "struct workqueue *"/*wq*/, + "const char *"/*name*/, + "void (*)(struct work *, void *)"/*func*/, + "void *"/*arg*/, + "pri_t"/*prio*/, + "int"/*ipl*/, + "int"/*flags*/); +SDT_PROBE_DEFINE1(sdt, kernel, workqueue, destroy, + "struct workqueue *"/*wq*/); + +SDT_PROBE_DEFINE3(sdt, kernel, workqueue, enqueue, + "struct workqueue *"/*wq*/, + "struct work *"/*wk*/, + "struct cpu_info *"/*ci*/); +SDT_PROBE_DEFINE4(sdt, kernel, workqueue, entry, + "struct workqueue *"/*wq*/, + "struct work *"/*wk*/, + "void (*)(struct work *, void *)"/*func*/, + "void *"/*arg*/); +SDT_PROBE_DEFINE4(sdt, kernel, workqueue, return, + "struct workqueue *"/*wq*/, + "struct work *"/*wk*/, + "void (*)(struct work *, void *)"/*func*/, + "void *"/*arg*/); +SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__start, + "struct workqueue *"/*wq*/, + "struct work *"/*wk*/); +SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__self, + "struct workqueue *"/*wq*/, + "struct work *"/*wk*/); +SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__hit, + "struct workqueue *"/*wq*/, + "struct work *"/*wk*/); +SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__done, + "struct workqueue *"/*wq*/, + "struct work *"/*wk*/); + +SDT_PROBE_DEFINE1(sdt, kernel, workqueue, exit__start, + "struct workqueue *"/*wq*/); +SDT_PROBE_DEFINE1(sdt, kernel, workqueue, exit__done, + "struct workqueue *"/*wq*/); + static size_t workqueue_size(int flags) { @@ -97,13 +141,13 @@ workqueue_runlist(struct workqueue *wq, work_impl_t *wk; work_impl_t 
*next; - /* - * note that "list" is not a complete SIMPLEQ. - */ - for (wk = SIMPLEQ_FIRST(list); wk != NULL; wk = next) { next = SIMPLEQ_NEXT(wk, wk_entry); + SDT_PROBE4(sdt, kernel, workqueue, entry, + wq, wk, wq->wq_func, wq->wq_arg); (*wq->wq_func)((void *)wk, wq->wq_arg); + SDT_PROBE4(sdt, kernel, workqueue, return, + wq, wk, wq->wq_func, wq->wq_arg); } } @@ -116,31 +160,36 @@ workqueue_worker(void *cookie) /* find the workqueue of this kthread */ q = workqueue_queue_lookup(wq, curlwp->l_cpu); + mutex_enter(&q->q_mutex); for (;;) { - /* - * we violate abstraction of SIMPLEQ. - */ + struct workqhead tmp; + + SIMPLEQ_INIT(&tmp); - mutex_enter(&q->q_mutex); while (SIMPLEQ_EMPTY(&q->q_queue_pending)) cv_wait(&q->q_cv, &q->q_mutex); - KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running)); - q->q_queue_running.sqh_first = - q->q_queue_pending.sqh_first; /* XXX */ + SIMPLEQ_CONCAT(&tmp, &q->q_queue_pending); SIMPLEQ_INIT(&q->q_queue_pending); + + /* + * Mark the queue as actively running a batch of work + * by setting the generation number odd. + */ + q->q_gen |= 1; mutex_exit(&q->q_mutex); - workqueue_runlist(wq, &q->q_queue_running); + workqueue_runlist(wq, &tmp); + /* + * Notify workqueue_wait that we have completed a batch + * of work by incrementing the generation number. + */ mutex_enter(&q->q_mutex); - KASSERT(!SIMPLEQ_EMPTY(&q->q_queue_running)); - SIMPLEQ_INIT(&q->q_queue_running); - if (__predict_false(q->q_waiter != NULL)) { - /* Wake up workqueue_wait */ - cv_signal(&q->q_cv); - } - mutex_exit(&q->q_mutex); + KASSERTMSG(q->q_gen & 1, "q=%p gen=%"PRIu64, q, q->q_gen); + q->q_gen++; + cv_broadcast(&q->q_cv); } + mutex_exit(&q->q_mutex); } static void @@ -168,7 +217,7 @@ workqueue_initqueue(struct workqueue *wq mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl); cv_init(&q->q_cv, wq->wq_name); SIMPLEQ_INIT(&q->q_queue_pending); - SIMPLEQ_INIT(&q->q_queue_running); + q->q_gen = 0; ktf = ((wq->wq_flags & WQ_MPSAFE) != 0 ? 
KTHREAD_MPSAFE : 0); if (wq->wq_prio < PRI_KERNEL) ktf |= KTHREAD_TS; @@ -206,7 +255,7 @@ workqueue_exit(struct work *wk, void *ar KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending)); mutex_enter(&q->q_mutex); q->q_worker = NULL; - cv_signal(&q->q_cv); + cv_broadcast(&q->q_cv); mutex_exit(&q->q_mutex); kthread_exit(0); } @@ -223,7 +272,7 @@ workqueue_finiqueue(struct workqueue *wq KASSERT(q->q_worker != NULL); mutex_enter(&q->q_mutex); SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, &wqe.wqe_wk, wk_entry); - cv_signal(&q->q_cv); + cv_broadcast(&q->q_cv); while (q->q_worker != NULL) { cv_wait(&q->q_cv, &q->q_mutex); } @@ -281,33 +330,56 @@ workqueue_create(struct workqueue **wqp, } static bool -workqueue_q_wait(struct workqueue_queue *q, work_impl_t *wk_target) +workqueue_q_wait(struct workqueue *wq, struct workqueue_queue *q, + work_impl_t *wk_target) { work_impl_t *wk; bool found = false; + uint64_t gen; mutex_enter(&q->q_mutex); - if (q->q_worker == curlwp) + + /* + * Avoid a deadlock scenario. We can't guarantee that + * wk_target has completed at this point, but we can't wait for + * it either, so do nothing. + * + * XXX Are there use-cases that require this semantics? + */ + if (q->q_worker == curlwp) { + SDT_PROBE2(sdt, kernel, workqueue, wait__self, wq, wk_target); goto out; + } + + /* + * Wait until the target is no longer pending. If we find it + * on this queue, the caller can stop looking in other queues. + * If we don't find it in this queue, however, we can't skip + * waiting -- it may be hidden in the running queue which we + * have no access to. 
+ */ again: SIMPLEQ_FOREACH(wk, &q->q_queue_pending, wk_entry) { - if (wk == wk_target) - goto found; + if (wk == wk_target) { + SDT_PROBE2(sdt, kernel, workqueue, wait__hit, wq, wk); + found = true; + cv_wait(&q->q_cv, &q->q_mutex); + goto again; + } } - SIMPLEQ_FOREACH(wk, &q->q_queue_running, wk_entry) { - if (wk == wk_target) - goto found; - } - found: - if (wk != NULL) { - found = true; - KASSERT(q->q_waiter == NULL); - q->q_waiter = wk; - cv_wait(&q->q_cv, &q->q_mutex); - goto again; + + /* + * The target may be in the batch of work currently running, + * but we can't touch that queue. So if there's anything + * running, wait until the generation changes. + */ + gen = q->q_gen; + if (gen & 1) { + do + cv_wait(&q->q_cv, &q->q_mutex); + while (gen == q->q_gen); } - if (q->q_waiter != NULL) - q->q_waiter = NULL; + out: mutex_exit(&q->q_mutex); @@ -326,19 +398,23 @@ workqueue_wait(struct workqueue *wq, str struct workqueue_queue *q; bool found; + ASSERT_SLEEPABLE(); + + SDT_PROBE2(sdt, kernel, workqueue, wait__start, wq, wk); if (ISSET(wq->wq_flags, WQ_PERCPU)) { struct cpu_info *ci; CPU_INFO_ITERATOR cii; for (CPU_INFO_FOREACH(cii, ci)) { q = workqueue_queue_lookup(wq, ci); - found = workqueue_q_wait(q, (work_impl_t *)wk); + found = workqueue_q_wait(wq, q, (work_impl_t *)wk); if (found) break; } } else { q = workqueue_queue_lookup(wq, NULL); - (void) workqueue_q_wait(q, (work_impl_t *)wk); + (void)workqueue_q_wait(wq, q, (work_impl_t *)wk); } + SDT_PROBE2(sdt, kernel, workqueue, wait__done, wq, wk); } void @@ -348,6 +424,9 @@ workqueue_destroy(struct workqueue *wq) struct cpu_info *ci; CPU_INFO_ITERATOR cii; + ASSERT_SLEEPABLE(); + + SDT_PROBE1(sdt, kernel, workqueue, exit__start, wq); wq->wq_func = workqueue_exit; for (CPU_INFO_FOREACH(cii, ci)) { q = workqueue_queue_lookup(wq, ci); @@ -355,6 +434,7 @@ workqueue_destroy(struct workqueue *wq) workqueue_finiqueue(wq, q); } } + SDT_PROBE1(sdt, kernel, workqueue, exit__done, wq); kmem_free(wq->wq_ptr, 
workqueue_size(wq->wq_flags)); } @@ -377,15 +457,16 @@ workqueue_enqueue(struct workqueue *wq, struct workqueue_queue *q; work_impl_t *wk = (void *)wk0; + SDT_PROBE3(sdt, kernel, workqueue, enqueue, wq, wk0, ci); + KASSERT(wq->wq_flags & WQ_PERCPU || ci == NULL); q = workqueue_queue_lookup(wq, ci); mutex_enter(&q->q_mutex); - KASSERT(q->q_waiter == NULL); #ifdef DEBUG workqueue_check_duplication(q, wk); #endif SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, wk, wk_entry); - cv_signal(&q->q_cv); + cv_broadcast(&q->q_cv); mutex_exit(&q->q_mutex); } Index: src/tests/rump/kernspace/kernspace.h diff -u src/tests/rump/kernspace/kernspace.h:1.8 src/tests/rump/kernspace/kernspace.h:1.8.2.1 --- src/tests/rump/kernspace/kernspace.h:1.8 Fri Dec 28 19:54:36 2018 +++ src/tests/rump/kernspace/kernspace.h Thu Apr 18 15:51:35 2024 @@ -1,4 +1,4 @@ -/* $NetBSD: kernspace.h,v 1.8 2018/12/28 19:54:36 thorpej Exp $ */ +/* $NetBSD: kernspace.h,v 1.8.2.1 2024/04/18 15:51:35 martin Exp $ */ /*- * Copyright (c) 2010, 2018 The NetBSD Foundation, Inc. @@ -42,6 +42,7 @@ void rumptest_alloc(size_t); void rumptest_lockme(enum locktest); void rumptest_workqueue1(void); void rumptest_workqueue_wait(void); +void rumptest_workqueue_wait_pause(void); void rumptest_sendsig(char *); void rumptest_localsig(int); Index: src/tests/rump/kernspace/workqueue.c diff -u src/tests/rump/kernspace/workqueue.c:1.6 src/tests/rump/kernspace/workqueue.c:1.6.8.1 --- src/tests/rump/kernspace/workqueue.c:1.6 Thu Dec 28 07:46:34 2017 +++ src/tests/rump/kernspace/workqueue.c Thu Apr 18 15:51:35 2024 @@ -1,4 +1,4 @@ -/* $NetBSD: workqueue.c,v 1.6 2017/12/28 07:46:34 ozaki-r Exp $ */ +/* $NetBSD: workqueue.c,v 1.6.8.1 2024/04/18 15:51:35 martin Exp $ */ /*- * Copyright (c) 2017 The NetBSD Foundation, Inc. 
@@ -29,7 +29,7 @@ #include <sys/cdefs.h> #if !defined(lint) -__RCSID("$NetBSD: workqueue.c,v 1.6 2017/12/28 07:46:34 ozaki-r Exp $"); +__RCSID("$NetBSD: workqueue.c,v 1.6.8.1 2024/04/18 15:51:35 martin Exp $"); #endif /* !lint */ #include <sys/param.h> @@ -48,13 +48,19 @@ struct test_softc { struct workqueue *wq; struct work wk; int counter; -}; - + bool pause; +}; + static void rump_work1(struct work *wk, void *arg) { struct test_softc *sc = arg; + memset(wk, 0x5a, sizeof(*wk)); + + if (sc->pause) + kpause("tstwk1", /*intr*/false, /*timo*/2, /*lock*/NULL); + mutex_enter(&sc->mtx); ++sc->counter; cv_broadcast(&sc->cv); @@ -137,3 +143,34 @@ rumptest_workqueue_wait(void) destroy_sc(sc); #undef ITERATIONS } + +void +rumptest_workqueue_wait_pause(void) +{ + struct test_softc *sc; + struct work dummy; + + sc = create_sc(); + sc->pause = true; + +#define ITERATIONS 1 + for (size_t i = 0; i < ITERATIONS; ++i) { + struct work wk; + + KASSERT(sc->counter == i); + workqueue_enqueue(sc->wq, &wk, NULL); + workqueue_enqueue(sc->wq, &sc->wk, NULL); + kpause("tstwk2", /*intr*/false, /*timo*/1, /*lock*/NULL); + workqueue_wait(sc->wq, &sc->wk); + workqueue_wait(sc->wq, &wk); + KASSERT(sc->counter == (i + 2)); + } + + KASSERT(sc->counter == 2*ITERATIONS); + + /* Wait for a work that is not enqueued. Just return immediately. 
*/ + workqueue_wait(sc->wq, &dummy); + + destroy_sc(sc); +#undef ITERATIONS +} Index: src/tests/rump/rumpkern/Makefile diff -u src/tests/rump/rumpkern/Makefile:1.18 src/tests/rump/rumpkern/Makefile:1.18.2.1 --- src/tests/rump/rumpkern/Makefile:1.18 Wed Dec 26 14:27:23 2018 +++ src/tests/rump/rumpkern/Makefile Thu Apr 18 15:51:35 2024 @@ -1,4 +1,4 @@ -# $NetBSD: Makefile,v 1.18 2018/12/26 14:27:23 thorpej Exp $ +# $NetBSD: Makefile,v 1.18.2.1 2024/04/18 15:51:35 martin Exp $ .include <bsd.own.mk> @@ -25,8 +25,8 @@ LDADD.t_modlinkset+= -lukfs -lrumpdev_di LDADD.t_modlinkset+= -lrumpfs_cd9660 ${ADD_TO_LD} LDADD+= ${ADD_TO_LD} -KERNSPACE != cd ${.CURDIR}/../kernspace && ${PRINTOBJDIR} -LDADD+= -L${KERNSPACE} -lkernspace -lrump +PROGDPLIBS+= kernspace ${.CURDIR}/../kernspace +LDADD+= -lrump WARNS= 4 Index: src/tests/rump/rumpkern/t_workqueue.c diff -u src/tests/rump/rumpkern/t_workqueue.c:1.2 src/tests/rump/rumpkern/t_workqueue.c:1.2.8.1 --- src/tests/rump/rumpkern/t_workqueue.c:1.2 Thu Dec 28 07:10:26 2017 +++ src/tests/rump/rumpkern/t_workqueue.c Thu Apr 18 15:51:35 2024 @@ -1,4 +1,4 @@ -/* $NetBSD: t_workqueue.c,v 1.2 2017/12/28 07:10:26 ozaki-r Exp $ */ +/* $NetBSD: t_workqueue.c,v 1.2.8.1 2024/04/18 15:51:35 martin Exp $ */ /*- * Copyright (c) 2017 The NetBSD Foundation, Inc. @@ -72,10 +72,36 @@ ATF_TC_BODY(workqueue_wait, tc) rump_unschedule(); } +static void +sigsegv(int signo) +{ + atf_tc_fail("SIGSEGV"); +} + +ATF_TC(workqueue_wait_pause); +ATF_TC_HEAD(workqueue_wait_pause, tc) +{ + + atf_tc_set_md_var(tc, "descr", "Checks workqueue_wait with pause"); +} + +ATF_TC_BODY(workqueue_wait_pause, tc) +{ + + REQUIRE_LIBC(signal(SIGSEGV, &sigsegv), SIG_ERR); + + rump_init(); + + rump_schedule(); + rumptest_workqueue_wait_pause(); /* panics or SIGSEGVs if fails */ + rump_unschedule(); +} + ATF_TP_ADD_TCS(tp) { ATF_TP_ADD_TC(tp, workqueue1); ATF_TP_ADD_TC(tp, workqueue_wait); + ATF_TP_ADD_TC(tp, workqueue_wait_pause); return atf_no_error(); }