> Module Name:	src
> Committed By:	thorpej
> Date:		Mon Dec 24 16:58:54 UTC 2018
>
> Add threadpool(9), an abstraction that provides shared pools of kernel
> threads running at specific priorities, with support for unbound pools
> and per-cpu pools.
Cool, thanks for working on this.  Some comments.

> diff --git a/sys/kern/kern_threadpool.c b/sys/kern/kern_threadpool.c
> new file mode 100644
> index 000000000000..6e5e218bde11
> --- /dev/null
> +++ b/sys/kern/kern_threadpool.c
> @@ -0,0 +1,1085 @@
> [...]
> +static ONCE_DECL(threadpool_init_once)
>
> +#define THREADPOOL_INIT() \
> +do { \
> +	int threadpool_init_error = \
> +	    RUN_ONCE(&threadpool_init_once, threadpools_init); \
> +	KASSERT(threadpool_init_error == 0); \
> +} while (/*CONSTCOND*/0)

Can you use a module initialization routine for this, or call it in
main if you don't want to deal with modules, instead of sprinkling
RUN_ONCE in various places?

> +/* Data structures */
> +
> +TAILQ_HEAD(job_head, threadpool_job_impl);
> +TAILQ_HEAD(thread_head, threadpool_thread);
> +
> +typedef struct threadpool_job_impl {
> +	kmutex_t		*job_lock;		/* 1 */
> +	struct threadpool_thread *job_thread;		/* 1 */
> +	TAILQ_ENTRY(threadpool_job_impl) job_entry;	/* 2 */
> +	volatile unsigned int	job_refcnt;		/* 1 */
> +		/* implicit pad on _LP64 */
> +	kcondvar_t		job_cv;			/* 3 */
> +	threadpool_job_fn_t	job_fn;			/* 1 */
> +						/* ILP32 / LP64 */
> +	char			job_name[MAXCOMLEN];	/* 4 / 2 */
> +} threadpool_job_impl_t;
> +
> +CTASSERT(sizeof(threadpool_job_impl_t) <= sizeof(threadpool_job_t));
> +#define THREADPOOL_JOB_TO_IMPL(j) ((threadpool_job_impl_t *)(j))
> +#define THREADPOOL_IMPL_TO_JOB(j) ((threadpool_job_t *)(j))

Can we just have struct threadpool_job in the header file like I did
originally, without the strict aliasing violations, ABI conditionals,
&c.?  What does this buy us?  We don't have a culture of abusing
abstractions simply because they happen to be written in header
files, so we don't need to put technical measures in the way of such
abuse.

> +	/* Idle out threads after 30 seconds */
> +#define THREADPOOL_IDLE_TICKS mstohz(30 * 1000)

Maybe this should be a sysctl knob?

> +struct threadpool_unbound {
> +	/* must be first; see threadpool_create() */
> +	struct threadpool	tpu_pool;

Why must this be first?  Unless this is really crucial for some
performance-critical reason, let's try to avoid putting constraints
like this into new code.

> +	/* protected by threadpools_lock */
> +	LIST_ENTRY(threadpool_unbound) tpu_link;
> +	unsigned int		tpu_refcnt;

We should just make this uint64_t and delete all the code worrying
about reference count failure.  This is always managed under a lock
(except some bugs I left in, below), so no need for 64-bit atomics to
make this work.  Likewise in struct threadpool_percpu.

> +#ifdef THREADPOOL_VERBOSE
> +#define TP_LOG(x) printf x
> +#else
> +#define TP_LOG(x) /* nothing */
> +#endif /* THREADPOOL_VERBOSE */

Maybe make these dtrace probes?

> +static int
> +threadpool_hold(threadpool_t *pool)
> +{
> +	unsigned int refcnt;
> +
> +	do {
> +		refcnt = pool->tp_refcnt;
> +		if (refcnt == UINT_MAX)
> +			return EBUSY;
> +	} while (atomic_cas_uint(&pool->tp_refcnt, refcnt, (refcnt + 1))
> +	    != refcnt);
> +
> +	return 0;
> +}

Atomics here don't hurt but are not necessary because this is always
done when the caller has exclusive access to the pool.  (This was a
mistake in the original, sorry!)  This should just do

	KASSERT(mutex_owned(&pool->tp_lock));
	pool->tp_refcnt++;

which will always succeed with a 64-bit reference count.
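That is, with tp_refcnt widened to 64 bits as above, the whole
function shrinks to something like this -- a sketch only; note the
return type becomes void, since with a 64-bit count the hold can no
longer fail, so the callers lose their error branches too:

	static void
	threadpool_hold(struct threadpool *pool)
	{

		/* Caller has exclusive access; no atomics needed. */
		KASSERT(mutex_owned(&pool->tp_lock));
		pool->tp_refcnt++;
	}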
> +static void
> +threadpool_rele(threadpool_t *pool)
> +{
> +	unsigned int refcnt;
> +
> +	do {
> +		refcnt = pool->tp_refcnt;
> +		KASSERT(0 < refcnt);
> +		if (refcnt == 1) {
> +			mutex_spin_enter(&pool->tp_lock);
> +			refcnt = atomic_dec_uint_nv(&pool->tp_refcnt);
> +			KASSERT(refcnt != UINT_MAX);
> +			if (refcnt == 0)
> +				cv_broadcast(&pool->tp_overseer.tpt_cv);
> +			mutex_spin_exit(&pool->tp_lock);
> +			return;
> +		}
> +	} while (atomic_cas_uint(&pool->tp_refcnt, refcnt, (refcnt - 1))
> +	    != refcnt);
> +}

The atomics here don't hurt, but the mutex operations do, because
this is called with the threadpool's lock held.  (Again, mistake in
the original, sorry!)  This should just do

	KASSERT(mutex_owned(&pool->tp_lock));
	KASSERT(0 < pool->tp_refcnt);
	if (--pool->tp_refcnt == 0)
		cv_broadcast(&pool->tp_overseer.tpt_cv);

For this to work, threadpool_overseer_thread and threadpool_thread
must call threadpool_rele while holding the lock.  There's no issue
with doing that: they don't use the pool afterward, and destruction
-- including mutex_destroy(&pool->tp_lock) -- can't proceed until the
thread that called threadpool_rele releases the lock too.

Alternative, requiring more rototilling: The overseer could just join
the worker threads when dying, and threadpool_destroy could just join
the overseer -- then we could skip the reference counting altogether.
Not sure whether this is a net win in bookkeeping, though.

> +int
> +threadpool_get(threadpool_t **poolp, pri_t pri)
> +{
> [...]
> +	if (tmp != NULL)
> +		threadpool_destroy((threadpool_t *)tmp, sizeof(*tpu));

Avoid this cast.  It looks like it would be simpler to just have the
caller allocate the object, and to have threadpool_init/fini routines
that operate on a preallocated structure.

> +void
> +threadpool_percpu_put(threadpool_percpu_t *pool_percpu, pri_t pri)
> +{
> [...]
> +	if (--pool_percpu->tpp_refcnt == 0) {
> +		TP_LOG(("%s: Last reference for pri=%d, destroying pool.\n",
> +		    __func__, (int)pri));
> +		threadpool_remove_percpu(pool_percpu);
> +	} else
> +		pool_percpu = NULL;

Not a KNF rule, but I'd prefer it if either every branch is braced or
no branch is braced, rather than a mixture of braced and unbraced
branches in a single `if'.

> +static int
> +threadpool_job_hold(threadpool_job_impl_t *job)
> +{
> +	unsigned int refcnt;
> +	do {

KNF, blank line between declarations and body.

> +		refcnt = job->job_refcnt;
> +		if (refcnt == UINT_MAX)
> +			return EBUSY;
> +	} while (atomic_cas_uint(&job->job_refcnt, refcnt, (refcnt + 1))
> +	    != refcnt);
> +

Trailing whitespace.  (Consider (setq show-trailing-whitespace t) if
you use Emacs!)

> +bool
> +threadpool_cancel_job_async(threadpool_t *pool, threadpool_job_t *ext_job)
> +{
> +	threadpool_job_impl_t *job = THREADPOOL_JOB_TO_IMPL(ext_job);
> +
> +	KASSERT(mutex_owned(job->job_lock));
> +
> +	/*
> +	 * XXXJRT This fails (albeit safely) when all of the following
> +	 * are true:
> +	 *
> +	 *	=> "pool" is something other than what the job was
> +	 *	   scheduled on.  This can legitimately occur if,
> +	 *	   for example, a job is percpu-scheduled on CPU0
> +	 *	   and then CPU1 attempts to cancel it without taking
> +	 *	   a remote pool reference.  (this might happen by
> +	 *	   "luck of the draw").

Under what circumstances can this happen?  This sounds like a bug
that we should KASSERT into oblivion if there's any danger of it.
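If there is, I'd rather we assert our way to an answer than fail
`albeit safely'.  A sketch of the kind of thing I mean -- this
assumes the worker thread records its pool in a tpt_pool back-pointer,
which I haven't verified against the committed structures:

	KASSERT(mutex_owned(job->job_lock));
	/* A running job must have been taken by a thread of this pool. */
	KASSERT(job->job_thread == NULL ||
	    job->job_thread->tpt_pool == pool);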
> +static void __dead
> +threadpool_thread(void *arg)
> +{
> [...]
> +		if (cv_timedwait(&thread->tpt_cv, &pool->tp_lock,
> +				 THREADPOOL_IDLE_TICKS))

KNF: 4-space continuation lines (8-space here, since two nesting
levels, as in the original), not alignment.

> diff --git a/sys/sys/threadpool.h b/sys/sys/threadpool.h
> new file mode 100644
> index 000000000000..fbfe82f86434
> --- /dev/null
> +++ b/sys/sys/threadpool.h
> @@ -0,0 +1,77 @@
> [...]
> +typedef struct threadpool threadpool_t;
> +typedef struct threadpool_percpu threadpool_percpu_t;

Please, no more _t aliases for struct or struct pointer types!  I
left them out in the original on purpose.  The abstraction isn't
necessary here, no one can remember whether _t means the struct or
the pointer, and the _t aliases don't work with forward declarations
of structs and so add unnecessary dependencies on header files.
(Large cosmetic changes like this also make the diff from the
original substantially noisier and harder to review thoroughly.)

> +typedef void (*threadpool_job_fn_t)(threadpool_job_t *);

Please restore this to the non-pointer function type -- the point of
the way I had it originally is that you can do a forward declaration
of a function as threadpool_job_fn_t, and then define the function,
and if your definition doesn't match the prototype then you get a
clear compiler error identifying the disagreeing lines:

	threadpool_job_fn_t my_job_fn;
	...
	void
	my_job_fn(struct threadpool_job *job)
	{
	...
	}
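(To illustrate the forward-declaration point above: with a plain
struct tag, a consumer's header needs only something like this sketch
-- frobnitz is a made-up example, of course --

	struct threadpool;	/* no <sys/threadpool.h> needed here */

	int	frobnitz_attach(struct frobnitz *, struct threadpool *);

to declare its interface, whereas spelling the parameter type
threadpool_t forces every such header to pull in sys/threadpool.h
first.)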
> diff --git a/tests/kernel/threadpool_tester/threadpool_tester.c b/tests/kernel/threadpool_tester/threadpool_tester.c
> new file mode 100644
> index 000000000000..d6b79eb8f266
> --- /dev/null
> +++ b/tests/kernel/threadpool_tester/threadpool_tester.c
> @@ -0,0 +1,502 @@
> [...]
> +static int
> +threadpool_tester_get_unbound(SYSCTLFN_ARGS)
> +{
> +	struct tester_context *ctx;
> +	threadpool_t *pool, *opool = NULL;
> +	struct sysctlnode node;
> +	int error, val;
> +
> +	node = *rnode;
> +	ctx = node.sysctl_data;
> +
> +	val = -1;
> +	node.sysctl_data = &val;
> +	error = sysctl_lookup(SYSCTLFN_CALL(&node));
> +	if (error || newp == NULL)
> +		return error;
> +

Trailing whitespace.

> +	if (! pri_is_valid(val))
> +		return EINVAL;
> +

Trailing whitespace.

> +	error = threadpool_get(&pool, val);
> +	if (error) {
> +		TP_LOG(("%s: threadpool_get(..., %d) failed -> %d\n",
> +		    __func__, val, error));

For TP_LOG((...)), I don't really care whether this takes 4-space or
8-space continuation lines (sorta two nesting levels), but pick one
and be consistent?

> +		return error;
> +	}
> +

Trailing whitespace.

> +static int
> +threadpool_tester_put_unbound(SYSCTLFN_ARGS)
> +{
> +	struct tester_context *ctx;
> +	threadpool_t *pool;
> +	struct sysctlnode node;
> +	int error, val;
> +
> +	node = *rnode;
> +	ctx = node.sysctl_data;
> +
> +	val = -1;
> +	node.sysctl_data = &val;
> +	error = sysctl_lookup(SYSCTLFN_CALL(&node));
> +	if (error || newp == NULL)
> +		return error;
> +

Trailing whitespace.

> +	if (! pri_is_valid(val))
> +		return EINVAL;
> +

Trailing whitespace.

> +static int
> +threadpool_tester_run_unbound(SYSCTLFN_ARGS)
> +{
> +	struct tester_context *ctx;
> +	threadpool_t *pool;
> +	struct sysctlnode node;
> +	int error, val;
> +
> +	node = *rnode;
> +	ctx = node.sysctl_data;
> +
> +	val = -1;
> +	node.sysctl_data = &val;
> +	error = sysctl_lookup(SYSCTLFN_CALL(&node));
> +	if (error || newp == NULL)
> +		return error;
> +

Trailing whitespace.

> +static int
> +threadpool_tester_get_percpu(SYSCTLFN_ARGS)
> +{
> +	struct tester_context *ctx;
> +	threadpool_percpu_t *pcpu, *opcpu = NULL;
> +	struct sysctlnode node;
> +	int error, val;
> +
> +	node = *rnode;
> +	ctx = node.sysctl_data;
> +
> +	val = -1;
> +	node.sysctl_data = &val;
> +	error = sysctl_lookup(SYSCTLFN_CALL(&node));
> +	if (error || newp == NULL)
> +		return error;
> +

Trailing whitespace.

> +	if (! pri_is_valid(val))
> +		return EINVAL;
> +

Trailing whitespace.

> +	error = threadpool_percpu_get(&pcpu, val);
> +	if (error) {
> +		TP_LOG(("%s: threadpool_percpu_get(..., %d) failed -> %d\n",
> +		    __func__, val, error));
> +		return error;
> +	}
> +

Trailing whitespace.

> +static int
> +threadpool_tester_put_percpu(SYSCTLFN_ARGS)
> +{
> +	struct tester_context *ctx;
> +	threadpool_percpu_t *pcpu;
> +	struct sysctlnode node;
> +	int error, val;
> +
> +	node = *rnode;
> +	ctx = node.sysctl_data;
> +
> +	val = -1;
> +	node.sysctl_data = &val;
> +	error = sysctl_lookup(SYSCTLFN_CALL(&node));
> +	if (error || newp == NULL)
> +		return error;
> +

Trailing whitespace.

> +	if (! pri_is_valid(val))
> +		return EINVAL;
> +

Trailing whitespace.

> +static int
> +threadpool_tester_run_percpu(SYSCTLFN_ARGS)
> +{
> +	struct tester_context *ctx;
> +	threadpool_percpu_t *pcpu;
> +	threadpool_t *pool;
> +	struct sysctlnode node;
> +	int error, val;
> +
> +	node = *rnode;
> +	ctx = node.sysctl_data;
> +
> +	val = -1;
> +	node.sysctl_data = &val;
> +	error = sysctl_lookup(SYSCTLFN_CALL(&node));
> +	if (error || newp == NULL)
> +		return error;
> +

Trailing whitespace.

> +static int
> +threadpool_tester_fini(void)
> +{
> +	pri_t pri;
> +
> +	mutex_enter(&tester_ctx.ctx_mutex);
> +	for (pri = PRI_NONE/*-1*/; pri < PRI_COUNT; pri++) {

This comment can be verified by the compiler!

	CTASSERT(PRI_NONE == -1);

> [...]
> +
> +	sysctl_teardown(&tester_ctx.ctx_sysctllog);
> +

Trailing whitespace.

> +static int
> +threadpool_tester_modcmd(modcmd_t cmd, void *arg __unused)
> +{
> +	int error;
> +
> +	switch (cmd) {
> +	case MODULE_CMD_INIT:
> +		error = threadpool_tester_init();
> +		break;
> +

Trailing whitespace.

> +	case MODULE_CMD_FINI:
> +		error = threadpool_tester_fini();
> +		break;
> +

Trailing whitespace.
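Incidentally, this modcmd routine is roughly the shape I had in mind
for the RUN_ONCE comment at the top of this mail: kern_threadpool.c
could register its own module and call the existing threadpools_init
from it.  A sketch only, untested -- it assumes a builtin misc module
is acceptable and that unload is simply refused:

	MODULE(MODULE_CLASS_MISC, threadpool, NULL);

	static int
	threadpool_modcmd(modcmd_t cmd, void *arg __unused)
	{

		switch (cmd) {
		case MODULE_CMD_INIT:
			/* One-time setup, instead of RUN_ONCE callers. */
			return threadpools_init();
		case MODULE_CMD_FINI:
			/* Pools may still be referenced; refuse unload. */
			return EBUSY;
		default:
			return ENOTTY;
		}
	}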