[PATCH] Make workqueue(9) use threadpool(9)
The attached patch adapts workqueue(9) to use a threadpool job instead
of a worker kthread in each queue. There are a couple of advantages
to this change:
1. No kthreads for unused workqueues.
A lot of drivers and subsystems create workqueues -- often per-CPU
workqueues -- to handle tasks in thread context.  Even if these
subsystems aren't in use, they currently require kthreads (one per
CPU, in the per-CPU case) to be allocated up front, which can tie up
a substantial amount of kva, mostly in thread stacks.
With this change, kthreads are created only according to demand by
threadpool(9) -- but the workqueues are still guaranteed the same
concurrency as before, as long as kthreads can be created.
2. Workqueues can be created earlier (and are ready for CPU hotplug).
Right now, workqueue_create with WQ_PERCPU cannot be used until
after all CPUs have been detected (a typical caller is sketched
after this list).  I accidentally started creating such a workqueue
too early in wg(4), because I've been running with this patch (and
the issue doesn't affect the rumpy atf tests).
For now I'll apply a workaround to wg(4), but it would be nice if
modules could create workqueues before configure (and if, should
anyone make CPU hotplug happen, workqueues were not a barrier to
that).
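For concreteness, here's the sort of per-CPU workqueue consumer I have
in mind -- the example_* names are hypothetical, not taken from any
real driver:

#include <sys/param.h>
#include <sys/intr.h>
#include <sys/workqueue.h>

struct example_softc {
        struct workqueue        *sc_wq;
        struct work             sc_work;
        /* ... device state ... */
};

static void
example_work(struct work *wk, void *arg)
{
        struct example_softc *sc = arg;

        /* ... deferred processing in thread context, using sc ... */
}

static int
example_create_wq(struct example_softc *sc)
{

        /*
         * Today this creates ncpu kthreads up front, so it must run
         * after all CPUs have attached.  With the patch, no threads
         * exist until work is actually enqueued.
         */
        return workqueue_create(&sc->sc_wq, "examplewq", example_work, sc,
            PRI_NONE, IPL_NONE, WQ_PERCPU | WQ_MPSAFE);
}

static void
example_kick(struct example_softc *sc)
{

        /* Run sc_work in thread context on the current CPU's queue. */
        workqueue_enqueue(sc->sc_wq, &sc->sc_work, NULL);
}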
This change uses percpu_create and threadpool_job_init, rather than
explicit allocation of ncpu-sized arrays and kthread_create. Using
percpu_create means percpu(9) takes care of running initialization
when CPUs are attached, and using threadpool_job_init instead of
kthread_create means we don't have to worry about failure when
initializing each CPU's queue in the percpu_create constructor.
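To give an idea of the shape of the change, here's a minimal sketch of
that pattern, condensed from the patch below -- the demo_* names are
hypothetical, and error handling, teardown, the pending-work list, and
the enqueue path are all omitted:

#include <sys/param.h>
#include <sys/cpu.h>
#include <sys/intr.h>
#include <sys/kmem.h>
#include <sys/mutex.h>
#include <sys/percpu.h>
#include <sys/threadpool.h>

struct demo_queue {
        kmutex_t                dq_lock;
        struct threadpool_job   dq_job;
        /* ... list of pending work ... */
};

static void
demo_job(struct threadpool_job *job)
{
        struct demo_queue *dq = container_of(job, struct demo_queue, dq_job);

        mutex_enter(&dq->dq_lock);
        /*
         * ... move the pending work aside and run it (dropping the
         * lock around the callbacks), as workqueue_worker does ...
         */
        threadpool_job_done(job);       /* job is idle again */
        mutex_exit(&dq->dq_lock);
}

static void
demo_init_cpu(void *ptr, void *cookie, struct cpu_info *ci)
{
        struct demo_queue **dqp = ptr, *dq;

        /* Runs for each CPU as it attaches -- no way to fail here. */
        *dqp = dq = kmem_zalloc(sizeof(*dq), KM_SLEEP);
        mutex_init(&dq->dq_lock, MUTEX_DEFAULT, IPL_NONE);
        threadpool_job_init(&dq->dq_job, demo_job, &dq->dq_lock,
            "demo/%d", cpu_index(ci));
}

static struct percpu *
demo_create(void)
{

        /* No kthread_create: threads come from threadpool(9) on demand. */
        return percpu_create(sizeof(struct demo_queue *),
            demo_init_cpu, NULL /* dtor: teardown omitted */, NULL);
}

Enqueueing work then amounts to appending to the per-CPU pending list
under dq_lock and calling threadpool_schedule_job() on the matching
threadpool, which is what workqueue_enqueue() does in the patch.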
The downside, of course, is that workqueue_create no longer guarantees
preallocation of all the threads needed to run the workqueues --
instead, if no idle threadpool thread is available when work comes in
under load, the work is held up until threadpool(9) can create one.
Thoughts?
# HG changeset patch
# User Taylor R Campbell <riastradh%NetBSD.org@localhost>
# Date 1599431734 0
# Sun Sep 06 22:35:34 2020 +0000
# Branch trunk
# Node ID 83b001debedf1ab0bf9ef2ed2074117067f9bfba
# Parent 951ac41912f926701ba072f3886812c99aa795a1
# EXP-Topic riastradh-wg
workqueue: Switch from direct use of kthread(9) to threadpool(9).
Should save a lot of lwps for seldom-used workqueues in the system.
diff -r 951ac41912f9 -r 83b001debedf sys/kern/subr_workqueue.c
--- a/sys/kern/subr_workqueue.c Tue Sep 08 15:57:29 2020 +0000
+++ b/sys/kern/subr_workqueue.c Sun Sep 06 22:35:34 2020 +0000
@@ -30,15 +30,17 @@
__KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.39 2020/09/08 17:02:18 riastradh Exp $");
#include <sys/param.h>
+#include <sys/condvar.h>
#include <sys/cpu.h>
-#include <sys/systm.h>
+#include <sys/kmem.h>
#include <sys/kthread.h>
-#include <sys/kmem.h>
+#include <sys/mutex.h>
+#include <sys/percpu.h>
#include <sys/proc.h>
+#include <sys/queue.h>
+#include <sys/systm.h>
+#include <sys/threadpool.h>
#include <sys/workqueue.h>
-#include <sys/mutex.h>
-#include <sys/condvar.h>
-#include <sys/queue.h>
typedef struct work_impl {
SIMPLEQ_ENTRY(work_impl) wk_entry;
@@ -52,6 +54,8 @@ struct workqueue_queue {
struct workqhead q_queue_pending;
struct workqhead q_queue_running;
lwp_t *q_worker;
+ struct threadpool_job q_job;
+ struct workqueue *q_wq;
};
struct workqueue {
@@ -61,45 +65,23 @@ struct workqueue {
char wq_name[MAXCOMLEN];
pri_t wq_prio;
- void *wq_ptr;
+ int wq_ipl;
+ union {
+ struct threadpool_percpu *wq_tp_percpu;
+ struct threadpool *wq_tp_global;
+ };
+ union {
+ struct percpu *wq_percpu; /* struct workqueue_queue * */
+ struct workqueue_queue *wq_global;
+ };
};
-#define WQ_SIZE (roundup2(sizeof(struct workqueue), coherency_unit))
-#define WQ_QUEUE_SIZE (roundup2(sizeof(struct workqueue_queue), coherency_unit))
-
-#define POISON 0xaabbccdd
-
-static size_t
-workqueue_size(int flags)
-{
-
- return WQ_SIZE
- + ((flags & WQ_PERCPU) != 0 ? ncpu : 1) * WQ_QUEUE_SIZE
- + coherency_unit;
-}
-
-static struct workqueue_queue *
-workqueue_queue_lookup(struct workqueue *wq, struct cpu_info *ci)
-{
- u_int idx = 0;
-
- if (wq->wq_flags & WQ_PERCPU) {
- idx = ci ? cpu_index(ci) : cpu_index(curcpu());
- }
-
- return (void *)((uintptr_t)(wq) + WQ_SIZE + (idx * WQ_QUEUE_SIZE));
-}
-
static void
workqueue_runlist(struct workqueue *wq, struct workqhead *list)
{
work_impl_t *wk;
work_impl_t *next;
- /*
- * note that "list" is not a complete SIMPLEQ.
- */
-
for (wk = SIMPLEQ_FIRST(list); wk != NULL; wk = next) {
next = SIMPLEQ_NEXT(wk, wk_entry);
(*wq->wq_func)((void *)wk, wq->wq_arg);
@@ -107,41 +89,34 @@ workqueue_runlist(struct workqueue *wq,
}
static void
-workqueue_worker(void *cookie)
+workqueue_worker(struct threadpool_job *job)
{
- struct workqueue *wq = cookie;
- struct workqueue_queue *q;
- int s;
+ struct workqueue_queue *q = container_of(job, struct workqueue_queue,
+ q_job);
+ struct workqueue *wq = q->q_wq;
+ int s, fpu = wq->wq_flags & WQ_FPU;
- /* find the workqueue of this kthread */
- q = workqueue_queue_lookup(wq, curlwp->l_cpu);
-
- if (wq->wq_flags & WQ_FPU)
+ if (fpu)
s = kthread_fpu_enter();
- for (;;) {
- /*
- * we violate abstraction of SIMPLEQ.
- */
-
- mutex_enter(&q->q_mutex);
- while (SIMPLEQ_EMPTY(&q->q_queue_pending))
- cv_wait(&q->q_cv, &q->q_mutex);
- KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running));
- q->q_queue_running.sqh_first =
- q->q_queue_pending.sqh_first; /* XXX */
- SIMPLEQ_INIT(&q->q_queue_pending);
+ mutex_enter(&q->q_mutex);
+ KASSERT(q->q_worker == NULL);
+ q->q_worker = curlwp;
+ KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running));
+ while (!SIMPLEQ_EMPTY(&q->q_queue_pending)) {
+ SIMPLEQ_CONCAT(&q->q_queue_running, &q->q_queue_pending);
mutex_exit(&q->q_mutex);
-
workqueue_runlist(wq, &q->q_queue_running);
-
mutex_enter(&q->q_mutex);
KASSERT(!SIMPLEQ_EMPTY(&q->q_queue_running));
SIMPLEQ_INIT(&q->q_queue_running);
/* Wake up workqueue_wait */
cv_broadcast(&q->q_cv);
- mutex_exit(&q->q_mutex);
}
- if (wq->wq_flags & WQ_FPU)
+ KASSERT(q->q_worker == curlwp);
+ q->q_worker = NULL;
+ threadpool_job_done(job);
+ mutex_exit(&q->q_mutex);
+ if (fpu)
kthread_fpu_exit(s);
}
@@ -155,83 +130,77 @@ workqueue_init(struct workqueue *wq, con
strncpy(wq->wq_name, name, sizeof(wq->wq_name));
wq->wq_prio = prio;
+ wq->wq_ipl = ipl;
wq->wq_func = callback_func;
wq->wq_arg = callback_arg;
}
-static int
-workqueue_initqueue(struct workqueue *wq, struct workqueue_queue *q,
- int ipl, struct cpu_info *ci)
+static void
+workqueue_initqueue(struct workqueue *wq, struct workqueue_queue **qp,
+ struct cpu_info *ci)
{
- int error, ktf;
+ struct workqueue_queue *q;
- KASSERT(q->q_worker == NULL);
+ *qp = q = kmem_zalloc(sizeof(*q), KM_SLEEP);
- mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl);
+ mutex_init(&q->q_mutex, MUTEX_DEFAULT, wq->wq_ipl);
cv_init(&q->q_cv, wq->wq_name);
SIMPLEQ_INIT(&q->q_queue_pending);
SIMPLEQ_INIT(&q->q_queue_running);
- ktf = ((wq->wq_flags & WQ_MPSAFE) != 0 ? KTHREAD_MPSAFE : 0);
- if (wq->wq_prio < PRI_KERNEL)
- ktf |= KTHREAD_TS;
- if (ci) {
- error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
- wq, &q->q_worker, "%s/%u", wq->wq_name, ci->ci_index);
+ if (wq->wq_flags & WQ_PERCPU) {
+ threadpool_job_init(&q->q_job, workqueue_worker, &q->q_mutex,
+ "%s/%d", wq->wq_name, cpu_index(ci));
} else {
- error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
- wq, &q->q_worker, "%s", wq->wq_name);
- }
- if (error != 0) {
- mutex_destroy(&q->q_mutex);
- cv_destroy(&q->q_cv);
- KASSERT(q->q_worker == NULL);
+ threadpool_job_init(&q->q_job, workqueue_worker, &q->q_mutex,
+ "%s", wq->wq_name);
}
- return error;
-}
-
-struct workqueue_exitargs {
- work_impl_t wqe_wk;
- struct workqueue_queue *wqe_q;
-};
-
-static void
-workqueue_exit(struct work *wk, void *arg)
-{
- struct workqueue_exitargs *wqe = (void *)wk;
- struct workqueue_queue *q = wqe->wqe_q;
-
- /*
- * only competition at this point is workqueue_finiqueue.
- */
-
- KASSERT(q->q_worker == curlwp);
- KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
- mutex_enter(&q->q_mutex);
- q->q_worker = NULL;
- cv_broadcast(&q->q_cv);
- mutex_exit(&q->q_mutex);
- kthread_exit(0);
+ q->q_wq = wq;
}
static void
-workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue *q)
+workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue **qp,
+ struct cpu_info *ci)
{
- struct workqueue_exitargs wqe;
+ struct workqueue_queue *q = *qp;
+ struct threadpool *tp;
- KASSERT(wq->wq_func == workqueue_exit);
+ mutex_enter(&q->q_mutex);
+ if (wq->wq_flags & WQ_PERCPU) {
+ tp = threadpool_percpu_ref_remote(wq->wq_tp_percpu, ci);
+ } else {
+ KASSERT(ci == NULL);
+ tp = wq->wq_tp_global;
+ }
+ threadpool_cancel_job(tp, &q->q_job);
+ mutex_exit(&q->q_mutex);
- wqe.wqe_q = q;
+ KASSERT(q->q_wq == wq);
+ threadpool_job_destroy(&q->q_job);
+ KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running));
KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
- KASSERT(q->q_worker != NULL);
- mutex_enter(&q->q_mutex);
- SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, &wqe.wqe_wk, wk_entry);
- cv_broadcast(&q->q_cv);
- while (q->q_worker != NULL) {
- cv_wait(&q->q_cv, &q->q_mutex);
- }
- mutex_exit(&q->q_mutex);
+ cv_destroy(&q->q_cv);
mutex_destroy(&q->q_mutex);
- cv_destroy(&q->q_cv);
+
+ kmem_free(q, sizeof(*q));
+ *qp = NULL;
+}
+
+static void
+workqueue_init_cpu(void *vqp, void *vwq, struct cpu_info *ci)
+{
+ struct workqueue_queue **qp = vqp;
+ struct workqueue *wq = vwq;
+
+ workqueue_initqueue(wq, qp, ci);
+}
+
+static void
+workqueue_fini_cpu(void *vqp, void *vwq, struct cpu_info *ci)
+{
+ struct workqueue_queue **qp = vqp;
+ struct workqueue *wq = vwq;
+
+ workqueue_finiqueue(wq, qp, ci);
}
/* --- */
@@ -242,43 +211,32 @@ workqueue_create(struct workqueue **wqp,
pri_t prio, int ipl, int flags)
{
struct workqueue *wq;
- struct workqueue_queue *q;
- void *ptr;
- int error = 0;
+ int error;
CTASSERT(sizeof(work_impl_t) <= sizeof(struct work));
- ptr = kmem_zalloc(workqueue_size(flags), KM_SLEEP);
- wq = (void *)roundup2((uintptr_t)ptr, coherency_unit);
- wq->wq_ptr = ptr;
+ wq = kmem_zalloc(sizeof(*wq), KM_SLEEP);
wq->wq_flags = flags;
workqueue_init(wq, name, callback_func, callback_arg, prio, ipl);
if (flags & WQ_PERCPU) {
- struct cpu_info *ci;
- CPU_INFO_ITERATOR cii;
-
- /* create the work-queue for each CPU */
- for (CPU_INFO_FOREACH(cii, ci)) {
- q = workqueue_queue_lookup(wq, ci);
- error = workqueue_initqueue(wq, q, ipl, ci);
- if (error) {
- break;
- }
- }
+ error = threadpool_percpu_get(&wq->wq_tp_percpu, prio);
+ if (error)
+ goto fail;
+ wq->wq_percpu = percpu_create(sizeof(struct workqueue_queue *),
+ workqueue_init_cpu, workqueue_fini_cpu, wq);
} else {
- /* initialize a work-queue */
- q = workqueue_queue_lookup(wq, NULL);
- error = workqueue_initqueue(wq, q, ipl, NULL);
+ error = threadpool_get(&wq->wq_tp_global, prio);
+ if (error)
+ goto fail;
+ workqueue_initqueue(wq, &wq->wq_global, NULL);
}
- if (error != 0) {
- workqueue_destroy(wq);
- } else {
- *wqp = wq;
- }
+ *wqp = wq;
+ return 0;
+fail: kmem_free(wq, sizeof(*wq));
return error;
}
@@ -321,39 +279,45 @@ workqueue_q_wait(struct workqueue_queue
void
workqueue_wait(struct workqueue *wq, struct work *wk)
{
- struct workqueue_queue *q;
+ struct workqueue_queue *q, **qp;
bool found;
if (ISSET(wq->wq_flags, WQ_PERCPU)) {
struct cpu_info *ci;
CPU_INFO_ITERATOR cii;
+
for (CPU_INFO_FOREACH(cii, ci)) {
- q = workqueue_queue_lookup(wq, ci);
+ /*
+ * Bind to the CPU _and_ prevent percpu-swap
+ * xcalls from completing. This is safe as
+ * long as we don't sleep.
+ */
+ kpreempt_disable();
+ qp = percpu_getptr_remote(wq->wq_percpu, ci);
+ q = *qp;
+ kpreempt_enable();
found = workqueue_q_wait(q, (work_impl_t *)wk);
if (found)
break;
}
} else {
- q = workqueue_queue_lookup(wq, NULL);
- (void) workqueue_q_wait(q, (work_impl_t *)wk);
+ (void) workqueue_q_wait(wq->wq_global, (work_impl_t *)wk);
}
}
void
workqueue_destroy(struct workqueue *wq)
{
- struct workqueue_queue *q;
- struct cpu_info *ci;
- CPU_INFO_ITERATOR cii;
- wq->wq_func = workqueue_exit;
- for (CPU_INFO_FOREACH(cii, ci)) {
- q = workqueue_queue_lookup(wq, ci);
- if (q->q_worker != NULL) {
- workqueue_finiqueue(wq, q);
- }
+ if (wq->wq_flags & WQ_PERCPU) {
+ percpu_free(wq->wq_percpu, sizeof(struct workqueue_queue *));
+ threadpool_percpu_put(wq->wq_tp_percpu, wq->wq_prio);
+ } else {
+ workqueue_finiqueue(wq, &wq->wq_global, NULL);
+ threadpool_put(wq->wq_tp_global, wq->wq_prio);
}
- kmem_free(wq->wq_ptr, workqueue_size(wq->wq_flags));
+
+ kmem_free(wq, sizeof(*wq));
}
#ifdef DEBUG
@@ -372,17 +336,35 @@ workqueue_check_duplication(struct workq
void
workqueue_enqueue(struct workqueue *wq, struct work *wk0, struct cpu_info *ci)
{
- struct workqueue_queue *q;
+ struct workqueue_queue *q, **qp;
+ struct threadpool *tp;
work_impl_t *wk = (void *)wk0;
KASSERT(wq->wq_flags & WQ_PERCPU || ci == NULL);
- q = workqueue_queue_lookup(wq, ci);
+
+ if (wq->wq_flags & WQ_PERCPU) {
+ /*
+ * Bind to the CPU _and_ block percpu-swap xcalls from
+ * completing. This is safe as long as we don't sleep.
+ */
+ kpreempt_disable();
+ if (ci == NULL)
+ ci = curcpu();
+ qp = percpu_getptr_remote(wq->wq_percpu, ci);
+ q = *qp;
+ tp = threadpool_percpu_ref_remote(wq->wq_tp_percpu, ci);
+ kpreempt_enable();
+ } else {
+ KASSERT(ci == NULL);
+ q = wq->wq_global;
+ tp = wq->wq_tp_global;
+ }
mutex_enter(&q->q_mutex);
#ifdef DEBUG
workqueue_check_duplication(q, wk);
#endif
SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, wk, wk_entry);
- cv_broadcast(&q->q_cv);
+ threadpool_schedule_job(tp, &q->q_job);
mutex_exit(&q->q_mutex);
}