tech-kern archive

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index][Old Index]

[PATCH] Make workqueue(9) use threadpool(9)



The attached patch adapts workqueue(9) to use a threadpool job instead
of a worker kthread in each queue.  There are a couple of advantages
to this change:

1. No kthreads for unused workqueues.

   A lot of drivers and subsystems create workqueues to handle tasks
   in thread context -- often per-CPU workqueues.  Even if these
   subsystems aren't in use, they currently require kthreads to be
   allocated, which may use up a substantial amount of kva.

   With this change, kthreads are created only according to demand by
   threadpool(9) -- but the workqueues are still guaranteed the same
   concurrency as before, as long as kthreads can be created.

2. Workqueues can be created earlier (and are ready for CPU hotplug).

   Right now, workqueue_create with WQ_PERCPU cannot be used until
   after all CPUs have been detected.  I accidentally started doing
   this in wg(4) because I've been running with this patch (and the
   issue doesn't affect the rumpy atf tests).

   For now I'll apply a workaround to wg(4), but it would be nice if
   modules could create workqueues before configure (and if, should
   anyone make CPU hotplug happen, workqueues were not a barrier to
   that).

   This change uses percpu_create and threadpool_job_init, rather than
   explicit allocation of ncpu-sized arrays and kthread_create.  Using
   percpu_create means percpu(9) takes care of running initialization
   when CPUs are attached, and using threadpool_job_init instead of
   kthread_create means we don't have to worry about failure when
   initializing each CPU's queue in the percpu_create constructor.

The downside, of course, is that workqueue_create no longer guarantees
preallocation of all the threads needed to run the workqueues --
instead, if there is a shortage of threads dynamically assigned to do
work under load, the threadpool(9) logic will block until they can be
created.

Thoughts?
# HG changeset patch
# User Taylor R Campbell <riastradh%NetBSD.org@localhost>
# Date 1599431734 0
#      Sun Sep 06 22:35:34 2020 +0000
# Branch trunk
# Node ID 83b001debedf1ab0bf9ef2ed2074117067f9bfba
# Parent  951ac41912f926701ba072f3886812c99aa795a1
# EXP-Topic riastradh-wg
workqueue: Switch from direct use of kthread(9) to threadpool(9).

Should save a lot of lwps for seldom-used workqueues in the system.

diff -r 951ac41912f9 -r 83b001debedf sys/kern/subr_workqueue.c
--- a/sys/kern/subr_workqueue.c	Tue Sep 08 15:57:29 2020 +0000
+++ b/sys/kern/subr_workqueue.c	Sun Sep 06 22:35:34 2020 +0000
@@ -30,15 +30,17 @@
 __KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.39 2020/09/08 17:02:18 riastradh Exp $");
 
 #include <sys/param.h>
+#include <sys/condvar.h>
 #include <sys/cpu.h>
-#include <sys/systm.h>
+#include <sys/kmem.h>
 #include <sys/kthread.h>
-#include <sys/kmem.h>
+#include <sys/mutex.h>
+#include <sys/percpu.h>
 #include <sys/proc.h>
+#include <sys/queue.h>
+#include <sys/systm.h>
+#include <sys/threadpool.h>
 #include <sys/workqueue.h>
-#include <sys/mutex.h>
-#include <sys/condvar.h>
-#include <sys/queue.h>
 
 typedef struct work_impl {
 	SIMPLEQ_ENTRY(work_impl) wk_entry;
@@ -52,6 +54,8 @@ struct workqueue_queue {
 	struct workqhead q_queue_pending;
 	struct workqhead q_queue_running;
 	lwp_t *q_worker;
+	struct threadpool_job q_job;
+	struct workqueue *q_wq;
 };
 
 struct workqueue {
@@ -61,45 +65,23 @@ struct workqueue {
 
 	char wq_name[MAXCOMLEN];
 	pri_t wq_prio;
-	void *wq_ptr;
+	int wq_ipl;
+	union {
+		struct threadpool_percpu *wq_tp_percpu;
+		struct threadpool *wq_tp_global;
+	};
+	union {
+		struct percpu *wq_percpu;	/* struct workqueue_queue * */
+		struct workqueue_queue *wq_global;
+	};
 };
 
-#define	WQ_SIZE		(roundup2(sizeof(struct workqueue), coherency_unit))
-#define	WQ_QUEUE_SIZE	(roundup2(sizeof(struct workqueue_queue), coherency_unit))
-
-#define	POISON	0xaabbccdd
-
-static size_t
-workqueue_size(int flags)
-{
-
-	return WQ_SIZE
-	    + ((flags & WQ_PERCPU) != 0 ? ncpu : 1) * WQ_QUEUE_SIZE
-	    + coherency_unit;
-}
-
-static struct workqueue_queue *
-workqueue_queue_lookup(struct workqueue *wq, struct cpu_info *ci)
-{
-	u_int idx = 0;
-
-	if (wq->wq_flags & WQ_PERCPU) {
-		idx = ci ? cpu_index(ci) : cpu_index(curcpu());
-	}
-
-	return (void *)((uintptr_t)(wq) + WQ_SIZE + (idx * WQ_QUEUE_SIZE));
-}
-
 static void
 workqueue_runlist(struct workqueue *wq, struct workqhead *list)
 {
 	work_impl_t *wk;
 	work_impl_t *next;
 
-	/*
-	 * note that "list" is not a complete SIMPLEQ.
-	 */
-
 	for (wk = SIMPLEQ_FIRST(list); wk != NULL; wk = next) {
 		next = SIMPLEQ_NEXT(wk, wk_entry);
 		(*wq->wq_func)((void *)wk, wq->wq_arg);
@@ -107,41 +89,34 @@ workqueue_runlist(struct workqueue *wq, 
 }
 
 static void
-workqueue_worker(void *cookie)
+workqueue_worker(struct threadpool_job *job)
 {
-	struct workqueue *wq = cookie;
-	struct workqueue_queue *q;
-	int s;
+	struct workqueue_queue *q = container_of(job, struct workqueue_queue,
+	    q_job);
+	struct workqueue *wq = q->q_wq;
+	int s, fpu = wq->wq_flags & WQ_FPU;
 
-	/* find the workqueue of this kthread */
-	q = workqueue_queue_lookup(wq, curlwp->l_cpu);
-
-	if (wq->wq_flags & WQ_FPU)
+	if (fpu)
 		s = kthread_fpu_enter();
-	for (;;) {
-		/*
-		 * we violate abstraction of SIMPLEQ.
-		 */
-
-		mutex_enter(&q->q_mutex);
-		while (SIMPLEQ_EMPTY(&q->q_queue_pending))
-			cv_wait(&q->q_cv, &q->q_mutex);
-		KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running));
-		q->q_queue_running.sqh_first =
-		    q->q_queue_pending.sqh_first; /* XXX */
-		SIMPLEQ_INIT(&q->q_queue_pending);
+	mutex_enter(&q->q_mutex);
+	KASSERT(q->q_worker == NULL);
+	q->q_worker = curlwp;
+	KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running));
+	while (!SIMPLEQ_EMPTY(&q->q_queue_pending)) {
+		SIMPLEQ_CONCAT(&q->q_queue_running, &q->q_queue_pending);
 		mutex_exit(&q->q_mutex);
-
 		workqueue_runlist(wq, &q->q_queue_running);
-
 		mutex_enter(&q->q_mutex);
 		KASSERT(!SIMPLEQ_EMPTY(&q->q_queue_running));
 		SIMPLEQ_INIT(&q->q_queue_running);
 		/* Wake up workqueue_wait */
 		cv_broadcast(&q->q_cv);
-		mutex_exit(&q->q_mutex);
 	}
-	if (wq->wq_flags & WQ_FPU)
+	KASSERT(q->q_worker == curlwp);
+	q->q_worker = NULL;
+	threadpool_job_done(job);
+	mutex_exit(&q->q_mutex);
+	if (fpu)
 		kthread_fpu_exit(s);
 }
 
@@ -155,83 +130,77 @@ workqueue_init(struct workqueue *wq, con
 	strncpy(wq->wq_name, name, sizeof(wq->wq_name));
 
 	wq->wq_prio = prio;
+	wq->wq_ipl = ipl;
 	wq->wq_func = callback_func;
 	wq->wq_arg = callback_arg;
 }
 
-static int
-workqueue_initqueue(struct workqueue *wq, struct workqueue_queue *q,
-    int ipl, struct cpu_info *ci)
+static void
+workqueue_initqueue(struct workqueue *wq, struct workqueue_queue **qp,
+    struct cpu_info *ci)
 {
-	int error, ktf;
+	struct workqueue_queue *q;
 
-	KASSERT(q->q_worker == NULL);
+	*qp = q = kmem_zalloc(sizeof(*q), KM_SLEEP);
 
-	mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl);
+	mutex_init(&q->q_mutex, MUTEX_DEFAULT, wq->wq_ipl);
 	cv_init(&q->q_cv, wq->wq_name);
 	SIMPLEQ_INIT(&q->q_queue_pending);
 	SIMPLEQ_INIT(&q->q_queue_running);
-	ktf = ((wq->wq_flags & WQ_MPSAFE) != 0 ? KTHREAD_MPSAFE : 0);
-	if (wq->wq_prio < PRI_KERNEL)
-		ktf |= KTHREAD_TS;
-	if (ci) {
-		error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
-		    wq, &q->q_worker, "%s/%u", wq->wq_name, ci->ci_index);
+	if (wq->wq_flags & WQ_PERCPU) {
+		threadpool_job_init(&q->q_job, workqueue_worker, &q->q_mutex,
+		    "%s/%d", wq->wq_name, cpu_index(ci));
 	} else {
-		error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
-		    wq, &q->q_worker, "%s", wq->wq_name);
-	}
-	if (error != 0) {
-		mutex_destroy(&q->q_mutex);
-		cv_destroy(&q->q_cv);
-		KASSERT(q->q_worker == NULL);
+		threadpool_job_init(&q->q_job, workqueue_worker, &q->q_mutex,
+		    "%s", wq->wq_name);
 	}
-	return error;
-}
-
-struct workqueue_exitargs {
-	work_impl_t wqe_wk;
-	struct workqueue_queue *wqe_q;
-};
-
-static void
-workqueue_exit(struct work *wk, void *arg)
-{
-	struct workqueue_exitargs *wqe = (void *)wk;
-	struct workqueue_queue *q = wqe->wqe_q;
-
-	/*
-	 * only competition at this point is workqueue_finiqueue.
-	 */
-
-	KASSERT(q->q_worker == curlwp);
-	KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
-	mutex_enter(&q->q_mutex);
-	q->q_worker = NULL;
-	cv_broadcast(&q->q_cv);
-	mutex_exit(&q->q_mutex);
-	kthread_exit(0);
+	q->q_wq = wq;
 }
 
 static void
-workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue *q)
+workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue **qp,
+    struct cpu_info *ci)
 {
-	struct workqueue_exitargs wqe;
+	struct workqueue_queue *q = *qp;
+	struct threadpool *tp;
 
-	KASSERT(wq->wq_func == workqueue_exit);
+	mutex_enter(&q->q_mutex);
+	if (wq->wq_flags & WQ_PERCPU) {
+		tp = threadpool_percpu_ref_remote(wq->wq_tp_percpu, ci);
+	} else {
+		KASSERT(ci == NULL);
+		tp = wq->wq_tp_global;
+	}
+	threadpool_cancel_job(tp, &q->q_job);
+	mutex_exit(&q->q_mutex);
 
-	wqe.wqe_q = q;
+	KASSERT(q->q_wq == wq);
+	threadpool_job_destroy(&q->q_job);
+	KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running));
 	KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
-	KASSERT(q->q_worker != NULL);
-	mutex_enter(&q->q_mutex);
-	SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, &wqe.wqe_wk, wk_entry);
-	cv_broadcast(&q->q_cv);
-	while (q->q_worker != NULL) {
-		cv_wait(&q->q_cv, &q->q_mutex);
-	}
-	mutex_exit(&q->q_mutex);
+	cv_destroy(&q->q_cv);
 	mutex_destroy(&q->q_mutex);
-	cv_destroy(&q->q_cv);
+
+	kmem_free(q, sizeof(*q));
+	*qp = NULL;
+}
+
+static void
+workqueue_init_cpu(void *vqp, void *vwq, struct cpu_info *ci)
+{
+	struct workqueue_queue **qp = vqp;
+	struct workqueue *wq = vwq;
+
+	workqueue_initqueue(wq, qp, ci);
+}
+
+static void
+workqueue_fini_cpu(void *vqp, void *vwq, struct cpu_info *ci)
+{
+	struct workqueue_queue **qp = vqp;
+	struct workqueue *wq = vwq;
+
+	workqueue_finiqueue(wq, qp, ci);
 }
 
 /* --- */
@@ -242,43 +211,32 @@ workqueue_create(struct workqueue **wqp,
     pri_t prio, int ipl, int flags)
 {
 	struct workqueue *wq;
-	struct workqueue_queue *q;
-	void *ptr;
-	int error = 0;
+	int error;
 
 	CTASSERT(sizeof(work_impl_t) <= sizeof(struct work));
 
-	ptr = kmem_zalloc(workqueue_size(flags), KM_SLEEP);
-	wq = (void *)roundup2((uintptr_t)ptr, coherency_unit);
-	wq->wq_ptr = ptr;
+	wq = kmem_zalloc(sizeof(*wq), KM_SLEEP);
 	wq->wq_flags = flags;
 
 	workqueue_init(wq, name, callback_func, callback_arg, prio, ipl);
 
 	if (flags & WQ_PERCPU) {
-		struct cpu_info *ci;
-		CPU_INFO_ITERATOR cii;
-
-		/* create the work-queue for each CPU */
-		for (CPU_INFO_FOREACH(cii, ci)) {
-			q = workqueue_queue_lookup(wq, ci);
-			error = workqueue_initqueue(wq, q, ipl, ci);
-			if (error) {
-				break;
-			}
-		}
+		error = threadpool_percpu_get(&wq->wq_tp_percpu, prio);
+		if (error)
+			goto fail;
+		wq->wq_percpu = percpu_create(sizeof(struct workqueue_queue *),
+		    workqueue_init_cpu, workqueue_fini_cpu, wq);
 	} else {
-		/* initialize a work-queue */
-		q = workqueue_queue_lookup(wq, NULL);
-		error = workqueue_initqueue(wq, q, ipl, NULL);
+		error = threadpool_get(&wq->wq_tp_global, prio);
+		if (error)
+			goto fail;
+		workqueue_initqueue(wq, &wq->wq_global, NULL);
 	}
 
-	if (error != 0) {
-		workqueue_destroy(wq);
-	} else {
-		*wqp = wq;
-	}
+	*wqp = wq;
+	return 0;
 
+fail:	kmem_free(wq, sizeof(*wq));
 	return error;
 }
 
@@ -321,39 +279,45 @@ workqueue_q_wait(struct workqueue_queue 
 void
 workqueue_wait(struct workqueue *wq, struct work *wk)
 {
-	struct workqueue_queue *q;
+	struct workqueue_queue *q, **qp;
 	bool found;
 
 	if (ISSET(wq->wq_flags, WQ_PERCPU)) {
 		struct cpu_info *ci;
 		CPU_INFO_ITERATOR cii;
+
 		for (CPU_INFO_FOREACH(cii, ci)) {
-			q = workqueue_queue_lookup(wq, ci);
+			/*
+			 * Bind to the CPU _and_ prevent percpu-swap
+			 * xcalls from completing.  This is safe as
+			 * long as we don't sleep.
+			 */
+			kpreempt_disable();
+			qp = percpu_getptr_remote(wq->wq_percpu, ci);
+			q = *qp;
+			kpreempt_enable();
 			found = workqueue_q_wait(q, (work_impl_t *)wk);
 			if (found)
 				break;
 		}
 	} else {
-		q = workqueue_queue_lookup(wq, NULL);
-		(void) workqueue_q_wait(q, (work_impl_t *)wk);
+		(void) workqueue_q_wait(wq->wq_global, (work_impl_t *)wk);
 	}
 }
 
 void
 workqueue_destroy(struct workqueue *wq)
 {
-	struct workqueue_queue *q;
-	struct cpu_info *ci;
-	CPU_INFO_ITERATOR cii;
 
-	wq->wq_func = workqueue_exit;
-	for (CPU_INFO_FOREACH(cii, ci)) {
-		q = workqueue_queue_lookup(wq, ci);
-		if (q->q_worker != NULL) {
-			workqueue_finiqueue(wq, q);
-		}
+	if (wq->wq_flags & WQ_PERCPU) {
+		percpu_free(wq->wq_percpu, sizeof(struct workqueue_queue *));
+		threadpool_percpu_put(wq->wq_tp_percpu, wq->wq_prio);
+	} else {
+		workqueue_finiqueue(wq, &wq->wq_global, NULL);
+		threadpool_put(wq->wq_tp_global, wq->wq_prio);
 	}
-	kmem_free(wq->wq_ptr, workqueue_size(wq->wq_flags));
+
+	kmem_free(wq, sizeof(*wq));
 }
 
 #ifdef DEBUG
@@ -372,17 +336,35 @@ workqueue_check_duplication(struct workq
 void
 workqueue_enqueue(struct workqueue *wq, struct work *wk0, struct cpu_info *ci)
 {
-	struct workqueue_queue *q;
+	struct workqueue_queue *q, **qp;
+	struct threadpool *tp;
 	work_impl_t *wk = (void *)wk0;
 
 	KASSERT(wq->wq_flags & WQ_PERCPU || ci == NULL);
-	q = workqueue_queue_lookup(wq, ci);
+
+	if (wq->wq_flags & WQ_PERCPU) {
+		/*
+		 * Bind to the CPU _and_ block percpu-swap xcalls from
+		 * completing.  This is safe as long as we don't sleep.
+		 */
+		kpreempt_disable();
+		if (ci == NULL)
+			ci = curcpu();
+		qp = percpu_getptr_remote(wq->wq_percpu, ci);
+		q = *qp;
+		tp = threadpool_percpu_ref_remote(wq->wq_tp_percpu, ci);
+		kpreempt_enable();
+	} else {
+		KASSERT(ci == NULL);
+		q = wq->wq_global;
+		tp = wq->wq_tp_global;
+	}
 
 	mutex_enter(&q->q_mutex);
 #ifdef DEBUG
 	workqueue_check_duplication(q, wk);
 #endif
 	SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, wk, wk_entry);
-	cv_broadcast(&q->q_cv);
+	threadpool_schedule_job(tp, &q->q_job);
 	mutex_exit(&q->q_mutex);
 }


Home | Main Index | Thread Index | Old Index