Source-Changes-D archive

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index][Old Index]

Re: CVS commit: src



> Module Name:    src
> Committed By:   thorpej
> Date:           Mon Dec 24 16:58:54 UTC 2018
> 
> Add threadpool(9), an abstraction that provides shared pools of kernel
> threads running at specific priorities, with support for unbound pools
> and per-cpu pools.

Cool, thanks for working on this.  Some comments.

> diff --git a/sys/kern/kern_threadpool.c b/sys/kern/kern_threadpool.c
> new file mode 100644
> index 000000000000..6e5e218bde11
> --- /dev/null
> +++ b/sys/kern/kern_threadpool.c
> @@ -0,0 +1,1085 @@
> [...]
> +static ONCE_DECL(threadpool_init_once)
> +
> +#define	THREADPOOL_INIT()					\
> +do {								\
> +	int threadpool_init_error =				\
> +	    RUN_ONCE(&threadpool_init_once, threadpools_init);	\
> +	KASSERT(threadpool_init_error == 0);			\
> +} while (/*CONSTCOND*/0)

Can you use a module initialization routine for this, or call it in
main if you don't want to deal with modules, instead of sprinkling
RUN_ONCE in various places?

> +/* Data structures */
> +
> +TAILQ_HEAD(job_head, threadpool_job_impl);
> +TAILQ_HEAD(thread_head, threadpool_thread);
> +
> +typedef struct threadpool_job_impl {
> +	kmutex_t			*job_lock;		/* 1 */
> +	struct threadpool_thread	*job_thread;		/* 1 */
> +	TAILQ_ENTRY(threadpool_job_impl) job_entry;		/* 2 */
> +	volatile unsigned int		job_refcnt;		/* 1 */
> +				/* implicit pad on _LP64 */
> +	kcondvar_t			job_cv;			/* 3 */
> +	threadpool_job_fn_t		job_fn;			/* 1 */
> +							    /* ILP32 / LP64 */
> +	char				job_name[MAXCOMLEN];	/* 4 / 2 */
> +} threadpool_job_impl_t;
> +
> +CTASSERT(sizeof(threadpool_job_impl_t) <= sizeof(threadpool_job_t));
> +#define	THREADPOOL_JOB_TO_IMPL(j)	((threadpool_job_impl_t *)(j))
> +#define	THREADPOOL_IMPL_TO_JOB(j)	((threadpool_job_t *)(j))

Can we just have struct threadpool_job in the header file like I did
originally, without the strict aliasing violations, ABI conditionals,
&c.?  What does this buy us?  We don't have a culture of abusing
abstractions simply because they happen to be written in header files,
so we don't need to put technical measures in the way of such abuse.

> +	/* Idle out threads after 30 seconds */
> +#define	THREADPOOL_IDLE_TICKS	mstohz(30 * 1000)

Maybe this should be a sysctl knob?

> +struct threadpool_unbound {
> +	/* must be first; see threadpool_create() */
> +	struct threadpool		tpu_pool;

Why must this be first?  Unless this is really crucial for some
performance-critical reason, let's try to avoid putting constraints
like this into new code.

> +	/* protected by threadpools_lock */
> +	LIST_ENTRY(threadpool_unbound)	tpu_link;
> +	unsigned int			tpu_refcnt;

We should just make this uint64_t and delete all the code worrying
about reference count failure.  This is always managed under a lock
(except some bugs I left in, below), so no need for 64-bit atomics to
make this work.

Likewise in struct threadpool_percpu.

> +#ifdef THREADPOOL_VERBOSE
> +#define	TP_LOG(x)		printf x
> +#else
> +#define	TP_LOG(x)		/* nothing */
> +#endif /* THREADPOOL_VERBOSE */

Maybe make these dtrace probes?

> +static int
> +threadpool_hold(threadpool_t *pool)
> +{
> +	unsigned int refcnt;
> +
> +	do {
> +		refcnt = pool->tp_refcnt;
> +		if (refcnt == UINT_MAX)
> +			return EBUSY;
> +	} while (atomic_cas_uint(&pool->tp_refcnt, refcnt, (refcnt + 1))
> +	    != refcnt);
> +
> +	return 0;
> +}

Atomics here don't hurt but are not necessary because this is always
done when the caller has exclusive access to the pool.  (This was a
mistake in the original, sorry!)  This should just do

	KASSERT(mutex_owned(&pool->tp_lock));
	pool->tp_refcnt++;

which will always succeed with a 64-bit reference count.

+static void
+threadpool_rele(threadpool_t *pool)
+{
+       unsigned int refcnt;
+
+       do {
+               refcnt = pool->tp_refcnt;
+               KASSERT(0 < refcnt);
+               if (refcnt == 1) {
+                       mutex_spin_enter(&pool->tp_lock);
+                       refcnt = atomic_dec_uint_nv(&pool->tp_refcnt);
+                       KASSERT(refcnt != UINT_MAX);
+                       if (refcnt == 0)
+                               cv_broadcast(&pool->tp_overseer.tpt_cv);
+                       mutex_spin_exit(&pool->tp_lock);
+                       return;
+               }
+       } while (atomic_cas_uint(&pool->tp_refcnt, refcnt, (refcnt - 1))
+           != refcnt);
+}

The atomics here don't hurt, but the mutex operations do, because this
is called with the threadpool's lock held.  (Again, mistake in the
original, sorry!)  This should just do

	KASSERT(mutex_owned(&pool->tp_lock));
	KASSERT(0 < pool->tp_refcnt);
	if (--pool->tp_refcnt == 0)
		cv_broadcast(&pool->tp_overseer.tpt_cv);

For this to work, threadpool_overseer_thread and threadpool_thread must
call threadpool_rele while holding the lock.

There's no issue with doing that: they don't use the pool afterward,
and destruction -- including mutex_destroy(&pool->tp_lock) -- can't
proceed until the thread that called threadpool_rele releases the lock
too.

Alternative, requiring more rototilling: The overseer could just join
the worker threads when dying, and threadpool_destroy could just join
the overseer -- then we could skip the reference counting altogether.
Not sure whether this is a net win in bookkeeping, though.

> +int
> +threadpool_get(threadpool_t **poolp, pri_t pri)
> +{
> [...]
> +	if (tmp != NULL)
> +		threadpool_destroy((threadpool_t *)tmp, sizeof(*tpu));

Avoid this cast.  It looks like it would be simpler to just have the
caller allocate the object, and to have threadpool_init/fini routines
that operate on a preallocated structure.

> +void
> +threadpool_percpu_put(threadpool_percpu_t *pool_percpu, pri_t pri)
> +{
> [...]
> +	if (--pool_percpu->tpp_refcnt == 0) {
> +		TP_LOG(("%s: Last reference for pri=%d, destroying pool.\n",
> +			__func__, (int)pri));
> +		threadpool_remove_percpu(pool_percpu);
> +	} else
> +		pool_percpu = NULL;

Not a KNF rule, but I'd prefer it if either every branch is braced or
no branch is braced, rather than a mixture of braced and unbraced
branches in a single `if'.

> +static int
> +threadpool_job_hold(threadpool_job_impl_t *job)
> +{
> +	unsigned int refcnt;
> +	do {

KNF, blank line between declarations and body.

> +		refcnt = job->job_refcnt;
> +		if (refcnt == UINT_MAX)
> +			return EBUSY;
> +	} while (atomic_cas_uint(&job->job_refcnt, refcnt, (refcnt + 1))
> +	    != refcnt);
> +	

Trailing whitespace.

(Consider (setq show-trailing-whitespace t) if you use Emacs!)

> +bool
> +threadpool_cancel_job_async(threadpool_t *pool, threadpool_job_t *ext_job)
> +{
> +	threadpool_job_impl_t *job = THREADPOOL_JOB_TO_IMPL(ext_job);
> +
> +	KASSERT(mutex_owned(job->job_lock));
> +
> +	/*
> +	 * XXXJRT This fails (albeit safely) when all of the following
> +	 * are true:
> +	 *
> +	 *	=> "pool" is something other than what the job was
> +	 *	   scheduled on.  This can legitimately occur if,
> +	 *	   for example, a job is percpu-scheduled on CPU0
> +	 *	   and then CPU1 attempts to cancel it without taking
> +	 *	   a remote pool reference.  (this might happen by
> +	 *	   "luck of the draw").

Under what circumstances can this happen?  This sounds like a bug that
we should KASSERT into oblivion if there's any danger of it.

> +static void __dead
> +threadpool_thread(void *arg)
> +{
> [...]
> +			if (cv_timedwait(&thread->tpt_cv, &pool->tp_lock,
> +					 THREADPOOL_IDLE_TICKS))

KNF: 4-space continuation lines (8-space here, since two nesting
levels, as in the original), not alignment.

> diff --git a/sys/sys/threadpool.h b/sys/sys/threadpool.h
> new file mode 100644
> index 000000000000..fbfe82f86434
> --- /dev/null
> +++ b/sys/sys/threadpool.h
> @@ -0,0 +1,77 @@
> [...]
> +typedef struct threadpool threadpool_t;
> +typedef struct threadpool_percpu threadpool_percpu_t;

Please, no more _t aliases for struct or struct pointer types!  I left
them out in the original on purpose.  The abstraction isn't necessary
here, no one can remember whether _t means the struct or the pointer,
and the _t aliases don't work with forward declarations of structs and
so add unnecessary depenencies on header files.

(Large cosmetic changes like this also make the diff from the original
substantially noisier and harder to review thoroughly.)

> +typedef void (*threadpool_job_fn_t)(threadpool_job_t *);

Please restore this to the non-pointer function type -- the point of
the way I had it originally is that you can do a forward declaration of
a function as threadpool_job_fn_t, and then define the function, and if
your definition doesn't match the prototype then you get a clear
compiler error identifying the disagreeing lines:

threadpool_job_fn_t my_job_fn;
...
void
my_job_fn(struct threadpool_job *job)
{
	...
}

> diff --git a/tests/kernel/threadpool_tester/threadpool_tester.c b/tests/kernel/threadpool_tester/threadpool_tester.c
> new file mode 100644
> index 000000000000..d6b79eb8f266
> --- /dev/null
> +++ b/tests/kernel/threadpool_tester/threadpool_tester.c
> @@ -0,0 +1,502 @@
> [...]
> +static int
> +threadpool_tester_get_unbound(SYSCTLFN_ARGS)
> +{
> +	struct tester_context *ctx;
> +	threadpool_t *pool, *opool = NULL;
> +	struct sysctlnode node;
> +	int error, val;
> +
> +	node = *rnode;
> +	ctx = node.sysctl_data;
> +
> +	val = -1;
> +	node.sysctl_data = &val;
> +	error = sysctl_lookup(SYSCTLFN_CALL(&node));
> +	if (error || newp == NULL)
> +		return error;
> +	

Trailing whitespace.

> +	if (! pri_is_valid(val))
> +		return EINVAL;
> +	

Trailing whitespace.

> +	error = threadpool_get(&pool, val);
> +	if (error) {
> +		TP_LOG(("%s: threadpool_get(..., %d) failed -> %d\n",
> +		    __func__, val, error));

For TP_LOG((...)), I don't really care whether this takes 4-space or
8-space continuation lines (sorta two nesting levels), but pick one and
be consistent?

> +		return error;
> +	}
> +	

Trailing whitespace.

> +static int
> +threadpool_tester_put_unbound(SYSCTLFN_ARGS)
> +{
> +	struct tester_context *ctx;
> +	threadpool_t *pool;
> +	struct sysctlnode node;
> +	int error, val;
> +
> +	node = *rnode;
> +	ctx = node.sysctl_data;
> +
> +	val = -1;
> +	node.sysctl_data = &val;
> +	error = sysctl_lookup(SYSCTLFN_CALL(&node));
> +	if (error || newp == NULL)
> +		return error;
> +	

Trailing whitespace.

> +	if (! pri_is_valid(val))
> +		return EINVAL;
> +	

Trailing whitespace.

> +static int
> +threadpool_tester_run_unbound(SYSCTLFN_ARGS)
> +{
> +	struct tester_context *ctx;
> +	threadpool_t *pool;
> +	struct sysctlnode node;
> +	int error, val;
> +
> +	node = *rnode;
> +	ctx = node.sysctl_data;
> +
> +	val = -1;
> +	node.sysctl_data = &val;
> +	error = sysctl_lookup(SYSCTLFN_CALL(&node));
> +	if (error || newp == NULL)
> +		return error;
> +	

Trailing whitespace.

> +static int
> +threadpool_tester_get_percpu(SYSCTLFN_ARGS)
> +{
> +	struct tester_context *ctx;
> +	threadpool_percpu_t *pcpu, *opcpu = NULL;
> +	struct sysctlnode node;
> +	int error, val;
> +
> +	node = *rnode;
> +	ctx = node.sysctl_data;
> +
> +	val = -1;
> +	node.sysctl_data = &val;
> +	error = sysctl_lookup(SYSCTLFN_CALL(&node));
> +	if (error || newp == NULL)
> +		return error;
> +	

Trailing whitespace.

> +	if (! pri_is_valid(val))
> +		return EINVAL;
> +	

Trailing whitespace.

> +	error = threadpool_percpu_get(&pcpu, val);
> +	if (error) {
> +		TP_LOG(("%s: threadpool_percpu_get(..., %d) failed -> %d\n",
> +		    __func__, val, error));
> +		return error;
> +	}
> +	

Trailing whitespace.

> +static int
> +threadpool_tester_put_percpu(SYSCTLFN_ARGS)
> +{
> +	struct tester_context *ctx;
> +	threadpool_percpu_t *pcpu;
> +	struct sysctlnode node;
> +	int error, val;
> +
> +	node = *rnode;
> +	ctx = node.sysctl_data;
> +
> +	val = -1;
> +	node.sysctl_data = &val;
> +	error = sysctl_lookup(SYSCTLFN_CALL(&node));
> +	if (error || newp == NULL)
> +		return error;
> +	

Trailing whitespace.

> +	if (! pri_is_valid(val))
> +		return EINVAL;
> +	

Trailing whitespace.

> +static int
> +threadpool_tester_run_percpu(SYSCTLFN_ARGS)
> +{
> +	struct tester_context *ctx;
> +	threadpool_percpu_t *pcpu;
> +	threadpool_t *pool;
> +	struct sysctlnode node;
> +	int error, val;
> +
> +	node = *rnode;
> +	ctx = node.sysctl_data;
> +
> +	val = -1;
> +	node.sysctl_data = &val;
> +	error = sysctl_lookup(SYSCTLFN_CALL(&node));
> +	if (error || newp == NULL)
> +		return error;
> +	

Trailing whitespace.

> +static int
> +threadpool_tester_fini(void)
> +{
> +	pri_t pri;
> +
> +	mutex_enter(&tester_ctx.ctx_mutex);
> +	for (pri = PRI_NONE/*-1*/; pri < PRI_COUNT; pri++) {

This comment can be verified by the compiler!

	CTASSERT(PRI_NONE == -1);

> [...]
> +
> +	sysctl_teardown(&tester_ctx.ctx_sysctllog);
> +	

Trailing whitespace.

> +static int
> +threadpool_tester_modcmd(modcmd_t cmd, void *arg __unused)
> +{
> +	int error;
> +
> +	switch (cmd) {
> +	case MODULE_CMD_INIT:
> +		error = threadpool_tester_init();
> +		break;
> +	

Trailing whitespace.

> +	case MODULE_CMD_FINI:
> +		error = threadpool_tester_fini();
> +		break;
> +	

Trailing whitespace.


Home | Main Index | Thread Index | Old Index