tech-kern archive
[PATCH] workqueue(9): Allow requeueing
A common frustration with the workqueue(9) API is that you can't
requeue the same work until the work function has started to run, so
every driver using it for, e.g., a low-priority action triggered by an
interrupt has to do something like:
/* interrupt routine */
mutex_enter(&sc->sc_lock);
if (!sc->sc_pending) {
workqueue_enqueue(sc->sc_wq, &sc->sc_work, NULL);
sc->sc_pending = true;
}
mutex_exit(&sc->sc_lock);
/* work function */
mutex_enter(&sc->sc_lock);
sc->sc_pending = false;
mutex_exit(&sc->sc_lock);
... do stuff requiring thread context ...
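To make this boilerplate concrete outside the kernel, here is a minimal userland sketch of the same pattern, assuming POSIX threads. `struct softc`, `fake_enqueue()`, `intr()`, `work_fn()` and the `enqueued` counter are hypothetical stand-ins for illustration. `fake_enqueue()` only counts calls and does not schedule anything.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical userland stand-in for a driver's softc. */
struct softc {
	pthread_mutex_t sc_lock;
	bool sc_pending;	/* work queued but not yet started */
	int enqueued;		/* number of (fake) enqueue calls */
};

/* Stand-in for workqueue_enqueue(9): only counts calls. */
static void
fake_enqueue(struct softc *sc)
{
	sc->enqueued++;
}

/* "Interrupt routine": enqueue only if the work is not already pending. */
static void
intr(struct softc *sc)
{
	pthread_mutex_lock(&sc->sc_lock);
	if (!sc->sc_pending) {
		fake_enqueue(sc);
		sc->sc_pending = true;
	}
	pthread_mutex_unlock(&sc->sc_lock);
}

/* "Work function": acknowledge the request, then do the real work. */
static void
work_fn(struct softc *sc)
{
	pthread_mutex_lock(&sc->sc_lock);
	sc->sc_pending = false;
	pthread_mutex_unlock(&sc->sc_lock);
	/* ... do stuff requiring thread context ... */
}
```

Three back-to-back calls to `intr()` produce a single enqueue. The next interrupt enqueues again only after `work_fn()` has cleared `sc_pending`.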
The attached patch adds a new WQ_CONDQUEUE flag that makes this
unnecessary: with WQ_CONDQUEUE, the interrupt routine can call
workqueue_enqueue even if the work is already pending, and the work
function need not acknowledge it. The price is that struct work must be
zero-initialized, workqueue_enqueue becomes a little costlier, and
usage has some pitfalls, which is why it is opt-in via a new flag.
[BIKESHED ALERT: I put about 30sec of thought into the name
WQ_CONDQUEUE, for `conditional queueing supported'. Feel free to
suggest another paint!]
The tricky part is guaranteeing that _either_ the caller's preceding
memory operations happen-before a single subsequent call to an
already-scheduled work function, _or_, if it's too late to affect that
call, the work function will be scheduled to be called a second time.
What does this mean? Let's illustrate with an example:
/* initially */
foo->x = 0;
foo->y = 0;
/* thread A */
foo->x = 1;
workqueue_enqueue(wq, &foo->work, NULL);
/* thread B */
foo->y = 1;
workqueue_enqueue(wq, &foo->work, NULL);
/* work function */
... foo->x ... foo->y ...
The work function might be called once, or it might be called twice,
and each time it might observe various combinations of x and y. Some
of these might be confusing, but workqueue(9) guarantees that certain
bad outcomes are ruled out.
Here's how it might play out:
1. called once; observes x = 1, y = 1
2. called twice; observes x = 1, y = 1 both times
3. called twice; observes first x = 1, y = 0, then x = 1, y = 1
4. called twice; observes first x = 0, y = 1, then x = 1, y = 1
(Note that case (2) might be confusing for some drivers: if the work
function clears both x and y, then it might run a second time even
though both x and y have been cleared, leading it to think there's a
spurious workqueue call! This is unavoidable without an additional
lock around the assignments to foo->x/y and workqueue_enqueue like in
the original code.)
But here's how it is guaranteed NOT to play out:
5. called once; observes x = 1, y = 0
6. called once; observes x = 0, y = 1
In other words, the code quoted above guarantees that the work
function will promptly run at a point where it can observe x = 1, and
will promptly run at a point where it can observe y = 1 (possibly in
the same run). It might run _twice_ even if it has already observed
both x = 1 and y = 1, but it won't _fail_ to run promptly observing
x = 1 or y = 1.
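A driver that cannot rule out the extra run in case (2) can make it harmless instead. The following minimal C11 sketch assumes, hypothetically, that the requests `x` and `y` live in `atomic_int` fields and that the work function consumes each one with `atomic_exchange`. `struct foo` and `work_fn()` are illustrative names, not part of the patch.

```c
#include <assert.h>
#include <stdatomic.h>

/* Hypothetical object carrying two independent requests, x and y. */
struct foo {
	atomic_int x;
	atomic_int y;
};

/*
 * Work function body (illustrative): atomically take each request so
 * that it is handled at most once, and treat a pass that finds nothing
 * to do as a harmless spurious run rather than an error.
 * Returns the number of requests handled on this pass.
 */
static int
work_fn(struct foo *foo)
{
	int handled = 0;

	if (atomic_exchange(&foo->x, 0))
		handled++;	/* ... handle request x ... */
	if (atomic_exchange(&foo->y, 0))
		handled++;	/* ... handle request y ... */
	return handled;		/* 0 means a spurious but harmless run */
}
```

If `work_fn()` is called a second time after both requests have already been consumed, that models case (2). It finds nothing to do and returns 0 instead of treating the extra run as an error.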
How does the patch work?
1. The patch changes the internal queue data structure so it is
terminated by a nonnull sentinel. This way we can use NULL in the
link field to indicate that the work is _not on a queue at all_, at
the cost of requiring the caller to zero-initialize the object. This
does add a tiny cost to workqueues without WQ_CONDQUEUE, because
compare-to-null is usually a smidge cheaper than
compare-to-nonnull-constant (and the new WQ_CONDQUEUE conditionals
add a little more), but that's pretty minor.
(OK, this might not work on a DS9k where the integer value of a
null pointer is actually 0xdeadbeeffeedface. But the NetBSD/ds9k
port is somewhere between NetBSD/eniac and NetBSD/pdp10 in our
priorities, and this is far from the first case where we rely on
null pointers being all-bits-zero.)
2. A tricky atomic and membar protocol between workqueue_enqueue and
workqueue_runlist, involving the mind-boggling idiom
atomic_cas_ptr(p, v, v) to conditionally store a value only if it
is already there (!), ensures the necessary memory ordering.
The attached samecas.txt shows the protocol in condensed form, and
samecas.litmus is a formal model of the protocol in aarch64
assembly for verifying, with herd7, that the bad cases above can't
happen, and that if you remove the `CAS W3,W3,[X0]; CBZ W3,L08' part
then they can[*]. I haven't put herd7 into pkgsrc (yet), but you can
try it on the web at <https://diy.inria.fr/www/>.
(The cpu0 register X6 corresponds to `resched', cpu0 register X7
corresponds to `retry', and cpu1 register X6 corresponds to `y'.
So 0:X6=0 means it doesn't reschedule, 0:X7=0 means it doesn't
retry the CAS loop, and 1:X6=0 means the work function doesn't
observe the workqueue_enqueue caller's memory operations; herd7
tries to find a counterexample in the form of possible program
traces where these conditions all happen.)
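The queue change in item 1 can be sketched in single-threaded userland C as follows. The names `struct queue`, `q_init()`, `q_enqueue()`, `q_take_and_run()` and `count_run()` are hypothetical, and no locking or atomics are shown. The point is that with a sentinel terminator, even the last queued item has a nonnull `wk_next`, so `wk_next == NULL` can mean "not on any queue". An empty queue is still a null head, as with `q_queue_head` in the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

struct work {
	struct work *wk_next;	/* NULL: not on any queue */
};

/* Nonnull list terminator, distinct from every real work item. */
static struct work sentinel;

struct queue {
	struct work *head;	/* NULL when the queue is empty */
	struct work **tail;	/* where the next item gets linked */
};

static void
q_init(struct queue *q)
{
	q->head = NULL;
	q->tail = &q->head;
}

/* Conditional enqueue: a no-op if wk is already on a queue. */
static bool
q_enqueue(struct queue *q, struct work *wk)
{
	if (wk->wk_next != NULL)
		return false;		/* already queued: coalesce */
	wk->wk_next = &sentinel;	/* mark queued and terminate list */
	*q->tail = wk;
	q->tail = &wk->wk_next;
	return true;
}

/*
 * Detach the whole batch, then run it.  Read each next pointer and mark
 * the item "not queued" before calling fn, so fn may requeue the item.
 */
static void
q_take_and_run(struct queue *q, void (*fn)(struct work *))
{
	struct work *wk = q->head, *next;

	if (wk == NULL)
		return;
	q_init(q);
	for (; wk != &sentinel; wk = next) {
		next = wk->wk_next;
		wk->wk_next = NULL;
		fn(wk);
	}
}

/* Illustrative callback that just counts invocations. */
static int runs;
static void
count_run(struct work *wk)
{
	(void)wk;
	runs++;
}
```

Zero-initializing a work item, for example `struct work a = { NULL };`, is what lets its first `q_enqueue()` succeed. A garbage `wk_next` would make the item look already queued, which is why WQ_CONDQUEUE requires zero-initialized work.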
Thoughts?
P.S. I have only proven this patch correct; I have not tested it.
[*] Curiously, if you change CAS W3,W3,[X0] to LDR W3,[X0] rather
than deleting it altogether then herd7 still can't find
counterexamples. But the proof of correctness that I sketched in
the comments fundamentally relies on having a store operation to
advance from one version of an object to the next version (even if
it has the same value!), and makes no sense with a load operation.
It's possible I'm missing something here or holding it wrong!
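For comparison with C11, the enqueue side of the protocol in item 2 and the worker's claim can be transliterated roughly as below. This is a hedged userland sketch, not the patch. It assumes that membar_release()/membar_acquire() map to C11 release/acquire fences and that atomic_cas_ptr()/atomic_swap_ptr() map to relaxed read-modify-write operations. It tracks only the per-item `wk_next` state and leaves out the list linking that the patch does under `q_mutex`. `cond_enqueue()` and `claim()` are hypothetical names. In C11 terms, a successful same-value compare-and-swap is a genuine modification of the object, and that modification is what the worker's exchange reads from so the two fences can pair.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

struct work {
	_Atomic(struct work *) wk_next;	/* NULL: INACTIVE, else QUEUED */
};

static struct work sentinel;

/*
 * Returns true if this call moved wk from INACTIVE to QUEUED, in which
 * case the caller must link it onto the queue.  Returns false if wk was
 * already QUEUED and the same-value CAS succeeded, publishing the
 * caller's prior writes to the worker's claim().
 */
static bool
cond_enqueue(struct work *wk)
{
	struct work *next;

	for (;;) {
		next = NULL;
		if (atomic_compare_exchange_strong_explicit(&wk->wk_next,
		    &next, &sentinel,
		    memory_order_relaxed, memory_order_relaxed))
			return true;	/* was INACTIVE, now QUEUED */
		/* next now holds the observed (QUEUED) value. */
		atomic_thread_fence(memory_order_release);
		/* Replace next by itself: a new version, same value. */
		if (atomic_compare_exchange_strong_explicit(&wk->wk_next,
		    &next, next,
		    memory_order_relaxed, memory_order_relaxed))
			return false;
		/* State changed under us (e.g. worker claimed it): retry. */
	}
}

/*
 * Worker side: move wk from QUEUED to INACTIVE and return the old link.
 * The acquire fence pairs with the release fence in cond_enqueue().
 */
static struct work *
claim(struct work *wk)
{
	struct work *next = atomic_exchange_explicit(&wk->wk_next, NULL,
	    memory_order_relaxed);

	atomic_thread_fence(memory_order_acquire);
	return next;
}
```

Single-threaded calls only exercise the INACTIVE/QUEUED transitions. A second `cond_enqueue()` returns false without changing the value, and after `claim()` the next `cond_enqueue()` returns true again. The ordering guarantees between concurrent callers and the worker are what the herd7 model checks.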
diff -r 2489653fbd32 share/man/man9/workqueue.9
--- a/share/man/man9/workqueue.9 Fri May 22 06:15:01 2026 +0000
+++ b/share/man/man9/workqueue.9 Sat May 30 20:56:59 2026 +0000
@@ -94,6 +94,15 @@ otherwise the kernel lock will be held w
.It Dv WQ_PERCPU
Specifies that the workqueue should have a separate queue for each CPU,
thus the work could be enqueued on concrete CPUs.
+.It Dv WQ_CONDQUEUE
+If set, then a given work item may be safely passed repeatedly to
+.Fn workqueue_enqueue
+before it is processed.
+Work items must be zero-initialized, and
+.Fn workqueue_enqueue
+may be slightly costlier, if
+.Dv WQ_CONDQUEUE
+is set.
.El
.El
.Pp
@@ -124,7 +133,9 @@ The enqueued work will be processed in a
A work must not be enqueued again until the callback is called by
the
.Nm
-framework.
+framework, unless the
+.Dv WQ_CONDQUEUE
+flag is set.
.Pp
.\" - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
.Fn workqueue_wait
diff -r 2489653fbd32 sys/kern/subr_workqueue.c
--- a/sys/kern/subr_workqueue.c Fri May 22 06:15:01 2026 +0000
+++ b/sys/kern/subr_workqueue.c Sat May 30 20:56:59 2026 +0000
@@ -42,16 +42,13 @@
#include <sys/systm.h>
#include <sys/workqueue.h>
-typedef struct work_impl {
- SIMPLEQ_ENTRY(work_impl) wk_entry;
-} work_impl_t;
-
-SIMPLEQ_HEAD(workqhead, work_impl);
+static struct work workqueue_sentinel;
struct workqueue_queue {
kmutex_t q_mutex;
kcondvar_t q_cv;
- struct workqhead q_queue_pending;
+ struct work *q_queue_head;
+ struct work **q_queue_tail;
uint64_t q_gen;
lwp_t *q_worker;
};
@@ -136,20 +133,31 @@ workqueue_queue_lookup(struct workqueue
}
static void
-workqueue_runlist(struct workqueue *wq, struct workqhead *list)
+workqueue_runlist(struct workqueue *wq, struct work *head,
+ struct work *sentinel)
{
- work_impl_t *wk;
- work_impl_t *next;
+ struct work *wk;
+ struct work *next;
struct lwp *l = curlwp;
KASSERTMSG(l->l_nopreempt == 0, "lwp %p nopreempt %d",
l, l->l_nopreempt);
- for (wk = SIMPLEQ_FIRST(list); wk != NULL; wk = next) {
- next = SIMPLEQ_NEXT(wk, wk_entry);
+ for (wk = head; wk != sentinel; wk = next) {
+ if (wq->wq_flags & WQ_CONDQUEUE) {
+ /*
+ * Acquire operation here matches release
+ * operation in workqueue_enqueue. See
+ * comments there for details of the protocol.
+ */
+ next = atomic_swap_ptr(&wk->wk_next, NULL);
+ membar_acquire();
+ } else {
+ next = wk->wk_next;
+ }
SDT_PROBE4(sdt, kernel, workqueue, entry,
wq, wk, wq->wq_func, wq->wq_arg);
- (*wq->wq_func)((void *)wk, wq->wq_arg);
+ (*wq->wq_func)(wk, wq->wq_arg);
SDT_PROBE4(sdt, kernel, workqueue, return,
wq, wk, wq->wq_func, wq->wq_arg);
KASSERTMSG(l->l_nopreempt == 0,
@@ -172,14 +180,12 @@ workqueue_worker(void *cookie)
s = kthread_fpu_enter();
mutex_enter(&q->q_mutex);
for (;;) {
- struct workqhead tmp;
-
- SIMPLEQ_INIT(&tmp);
+ struct work *head;
- while (SIMPLEQ_EMPTY(&q->q_queue_pending))
+ while ((head = q->q_queue_head) == NULL)
cv_wait(&q->q_cv, &q->q_mutex);
- SIMPLEQ_CONCAT(&tmp, &q->q_queue_pending);
- SIMPLEQ_INIT(&q->q_queue_pending);
+ q->q_queue_head = NULL;
+ q->q_queue_tail = &q->q_queue_head;
/*
* Mark the queue as actively running a batch of work
@@ -188,7 +194,7 @@ workqueue_worker(void *cookie)
q->q_gen |= 1;
mutex_exit(&q->q_mutex);
- workqueue_runlist(wq, &tmp);
+ workqueue_runlist(wq, head, &workqueue_sentinel);
/*
* Notify workqueue_wait that we have completed a batch
@@ -228,7 +234,8 @@ workqueue_initqueue(struct workqueue *wq
mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl);
cv_init(&q->q_cv, wq->wq_name);
- SIMPLEQ_INIT(&q->q_queue_pending);
+ q->q_queue_head = NULL;
+ q->q_queue_tail = &q->q_queue_head;
q->q_gen = 0;
ktf = ((wq->wq_flags & WQ_MPSAFE) != 0 ? KTHREAD_MPSAFE : 0);
if (wq->wq_prio < PRI_KERNEL)
@@ -249,14 +256,15 @@ workqueue_initqueue(struct workqueue *wq
}
struct workqueue_exitargs {
- work_impl_t wqe_wk;
+ struct work wqe_wk;
struct workqueue_queue *wqe_q;
};
static void
workqueue_exit(struct work *wk, void *arg)
{
- struct workqueue_exitargs *wqe = (void *)wk;
+ struct workqueue_exitargs *wqe = container_of(wk,
+ struct workqueue_exitargs, wqe_wk);
struct workqueue_queue *q = wqe->wqe_q;
/*
@@ -264,7 +272,8 @@ workqueue_exit(struct work *wk, void *ar
*/
KASSERT(q->q_worker == curlwp);
- KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
+ KASSERT(q->q_queue_head == NULL);
+ KASSERT(q->q_queue_tail == &q->q_queue_head);
mutex_enter(&q->q_mutex);
q->q_worker = NULL;
cv_broadcast(&q->q_cv);
@@ -280,10 +289,13 @@ workqueue_finiqueue(struct workqueue *wq
KASSERT(wq->wq_func == workqueue_exit);
wqe.wqe_q = q;
- KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
+ KASSERT(q->q_queue_head == NULL);
+ KASSERT(q->q_queue_tail == &q->q_queue_head);
KASSERT(q->q_worker != NULL);
mutex_enter(&q->q_mutex);
- SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, &wqe.wqe_wk, wk_entry);
+ wqe.wqe_wk.wk_next = &workqueue_sentinel;
+ *q->q_queue_tail = &wqe.wqe_wk;
+ q->q_queue_tail = &wqe.wqe_wk.wk_next;
cv_broadcast(&q->q_cv);
while (q->q_worker != NULL) {
cv_wait(&q->q_cv, &q->q_mutex);
@@ -305,8 +317,6 @@ workqueue_create(struct workqueue **wqp,
void *ptr;
int error = 0;
- CTASSERT(sizeof(work_impl_t) <= sizeof(struct work));
-
ptr = kmem_zalloc(workqueue_size(flags), KM_SLEEP);
wq = (void *)roundup2((uintptr_t)ptr, coherency_unit);
wq->wq_ptr = ptr;
@@ -343,9 +353,9 @@ workqueue_create(struct workqueue **wqp,
static bool
workqueue_q_wait(struct workqueue *wq, struct workqueue_queue *q,
- work_impl_t *wk_target)
+ struct work *wk_target)
{
- work_impl_t *wk;
+ struct work *wk;
bool found = false;
uint64_t gen;
@@ -371,7 +381,9 @@ workqueue_q_wait(struct workqueue *wq, s
* have no access to.
*/
again:
- SIMPLEQ_FOREACH(wk, &q->q_queue_pending, wk_entry) {
+ for (wk = q->q_queue_head;
+ wk != NULL && wk != &workqueue_sentinel;
+ wk = wk->wk_next) {
if (wk == wk_target) {
SDT_PROBE2(sdt, kernel, workqueue, wait__hit, wq, wk);
found = true;
@@ -418,13 +430,13 @@ workqueue_wait(struct workqueue *wq, str
CPU_INFO_ITERATOR cii;
for (CPU_INFO_FOREACH(cii, ci)) {
q = workqueue_queue_lookup(wq, ci);
- found = workqueue_q_wait(wq, q, (work_impl_t *)wk);
+ found = workqueue_q_wait(wq, q, wk);
if (found)
break;
}
} else {
q = workqueue_queue_lookup(wq, NULL);
- (void)workqueue_q_wait(wq, q, (work_impl_t *)wk);
+ (void)workqueue_q_wait(wq, q, wk);
}
SDT_PROBE2(sdt, kernel, workqueue, wait__done, wq, wk);
}
@@ -452,11 +464,13 @@ workqueue_destroy(struct workqueue *wq)
#ifdef DEBUG
static void
-workqueue_check_duplication(struct workqueue_queue *q, work_impl_t *wk)
+workqueue_check_duplication(struct workqueue_queue *q, struct work *wk)
{
- work_impl_t *_wk;
+ struct work *_wk;
- SIMPLEQ_FOREACH(_wk, &q->q_queue_pending, wk_entry) {
+ for (_wk = q->q_queue_head;
+ _wk != NULL && _wk != &workqueue_sentinel;
+ _wk = _wk->wk_next) {
if (_wk == wk)
panic("%s: tried to enqueue a queued work", __func__);
}
@@ -464,21 +478,170 @@ workqueue_check_duplication(struct workq
#endif
void
-workqueue_enqueue(struct workqueue *wq, struct work *wk0, struct cpu_info *ci)
+workqueue_enqueue(struct workqueue *wq, struct work *wk, struct cpu_info *ci)
{
struct workqueue_queue *q;
- work_impl_t *wk = (void *)wk0;
- SDT_PROBE3(sdt, kernel, workqueue, enqueue, wq, wk0, ci);
+ SDT_PROBE3(sdt, kernel, workqueue, enqueue, wq, wk, ci);
KASSERT(wq->wq_flags & WQ_PERCPU || ci == NULL);
q = workqueue_queue_lookup(wq, ci);
mutex_enter(&q->q_mutex);
+ if (wq->wq_flags & WQ_CONDQUEUE) {
+ struct work *next;
+
+ do {
+ /*
+ * Try to claim the work by changing
+ * wk->wk_next from null to a nonnull sentinel
+ * value. If that succeeded, it wasn't
+ * previously on a queue, and we have now
+ * claimed it (a claim which concurrent
+ * workqueue_enqueue calls will notice), so we
+ * just have to adjust the workqueue's own data
+ * structures (which we have exclusive access
+ * to via q->q_mutex).
+ */
+ next = atomic_cas_ptr(&wk->wk_next, NULL,
+ &workqueue_sentinel);
+ if (next == NULL) {
+ *q->q_queue_tail = wk;
+ q->q_queue_tail = &wk->wk_next;
+ cv_broadcast(&q->q_cv);
+ break;
+ }
+
+ /*
+ * The work is already on a queue. We have to
+ * make sure that EITHER:
+ *
+ * (a) the caller's preceding memory operations
+ * to set up the work happen before the
+ * work function runs, OR
+ *
+ * (b) we start over because workqueue_runlist
+ * has already gotten to the work item and
+ * is about to run it, too late for us to
+ * synchronize further with it, so we
+ * schedule it to run a second time.
+ *
+ * For example, suppose two threads are both
+ * competing to schedule the same work with
+ * different data -- initially, foo->x and
+ * foo->y are both zero, and the threads do:
+ *
+ * // thread A
+ * foo->x = 1;
+ * workqueue_enqueue(wq, &foo->work, NULL);
+ *
+ * // thread B
+ * foo->y = 1;
+ * workqueue_enqueue(wq, &foo->work, NULL);
+ *
+ * // work function
+ * struct foo *foo = container_of(work, ...);
+ * ... foo->x ... foo->y ...
+ *
+ * There are three possible ways this might
+ * play out:
+ *
+ * 1. work function runs once and observes both
+ * foo->x = 1 and foo->y = 1
+ *
+ * 2. work function runs twice: first with
+ * foo->x = 1 (and foo->y either 0 or 1),
+ * then with foo->y = 1
+ *
+ * 3. work function runs twice: first with
+ * foo->y = 1 (and foo->x either 0 or 1),
+ * then with foo->x = 1
+ *
+ * The tricky case we have to avoid is running
+ * once with foo->x = 1 and foo->y = 0, or once
+ * with foo->x = 0 and foo->y = 1, and then
+ * _not running again_.
+ *
+ * The bizarre idiom below, a compare-and-swap
+ * that replaces a value with _itself_, is not
+ * a typo: it's actually critical for the
+ * memory ordering, so that the
+ * membar_release() here can match the
+ * membar_acquire() in workqueue_runlist, in
+ * order to prove the above guarantees.
+ *
+ * How it works: the struct work essentially
+ * alternates between two states, QUEUED
+ * (wk_next is nonnull) and INACTIVE (wk_next
+ * is null). At this point, we have observed
+ * the work to be QUEUED, and the worker thread
+ * may be running workqueue_runlist about to do
+ * atomic_swap_ptr(&work->wk_next, NULL) to
+ * transition it from QUEUED to INACTIVE and
+ * then call the work function.
+ *
+ * We need to prove that the caller's memory
+ * operations (foo->x/y = 1, in the example
+ * above) happen-before the work function, and
+ * the only way to do that is to issue a memory
+ * operation in this thread that synchronizes
+ * with the atomic_swap_ptr in the other
+ * thread.
+ *
+ * What memory operation can we issue to
+ * synchronize with workqueue_runlist? The
+ * work is QUEUED already and we want to keep
+ * it that way, not change it!
+ *
+ * The trick is that each atomic object has a
+ * total modification order observed equally by
+ * all threads (even if groups of objects may
+ * appear to be modified at different times by
+ * different threads), so we can think of the
+ * state of the work over time with a version
+ * number:
+ *
+ * v0:INACTIVE, v1:QUEUED, v2:INACTIVE, ...
+ *
+ * If the work is currently at v3:QUEUED, we
+ * can compare-and-swap from QUEUED to QUEUED
+ * again so that it either:
+ *
+ * (a) fails because the worker thread has
+ * already transitioned v3:QUEUED ->
+ * v4:INACTIVE (in which case we start over
+ * from the top), _or_
+ *
+ * (b) transitions to a _new version_ with the
+ * _same value_, v3:QUEUED -> v4:QUEUED, so
+ * that when the worker thread does
+ * atomic_swap_ptr, it _must_ observe
+ * v4:QUEUED and therefore synchronize with
+ * our atomic_cas_ptr!
+ *
+ * Hence either we start over from the top
+ * because the state has changed, or the
+ * caller's memory operations sequentially
+ * precede our atomic_cas_ptr which
+ * synchronizes with the worker thread's
+ * atomic_swap_ptr which sequentially precedes
+ * the work function -- and thus, with a
+ * membar_release here before atomic_cas_ptr,
+ * and a membar_acquire after atomic_swap_ptr
+ * in workqueue_runlist to match, we can prove
+ * the caller's memory operations happen-before
+ * the work function.
+ */
+ membar_release();
+ } while (atomic_cas_ptr(&wk->wk_next, next, next) != next);
+ } else {
#ifdef DEBUG
- workqueue_check_duplication(q, wk);
+ workqueue_check_duplication(q, wk);
#endif
- SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, wk, wk_entry);
- cv_broadcast(&q->q_cv);
+ wk->wk_next = &workqueue_sentinel;
+ *q->q_queue_tail = wk;
+ q->q_queue_tail = &wk->wk_next;
+ cv_broadcast(&q->q_cv);
+ }
mutex_exit(&q->q_mutex);
}
diff -r 2489653fbd32 sys/sys/workqueue.h
--- a/sys/sys/workqueue.h Fri May 22 06:15:01 2026 +0000
+++ b/sys/sys/workqueue.h Sat May 30 20:56:59 2026 +0000
@@ -42,7 +42,7 @@ struct cpu_info;
*/
struct work {
- void *wk_dummy;
+ struct work *wk_next;
};
struct workqueue;
@@ -50,6 +50,7 @@ struct workqueue;
#define WQ_MPSAFE 0x01
#define WQ_PERCPU 0x02
#define WQ_FPU 0x04
+#define WQ_CONDQUEUE 0x08
int workqueue_create(struct workqueue **, const char *,
void (*)(struct work *, void *), void *, pri_t, int, int);
Initial memory:
flag = 1;
x = 0;
resched = 0; CPU0 private
retry = 0; CPU0 private
y = 0; CPU1 private
CPU0 (queue work):
x = 1;
resched = 0;
retry = 0;
if (atomic_cas(&flag, 0, 1) == 0) {
resched = 1;
} else {
membar_release();
if (atomic_cas(&flag, 1, 1) != 1)
retry = 1;
}
CPU1 (process queued work):
y = 0;
if (atomic_swap(&flag, 0)) {
membar_acquire();
y = x;
}
Allowed final states:
resched = 0, retry = 0, y = 1 (queued work processed)
resched = 0, retry = 1, y = 0 (will retry CAS loop)
resched = 0, retry = 1, y = 1 (will retry CAS loop)
resched = 1, retry = 0, y = 0 (not processed, work rescheduled)
resched = 1, retry = 0, y = 1 (processed and work harmlessly
rescheduled)
Forbidden final states:
resched = 0, retry = 0, y = 0 (not processed, not rescheduled)
resched = 1, retry = 1, y = 0 (impossible)
resched = 1, retry = 1, y = 1 (impossible)
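The condensed protocol can also be transliterated into C11 atomics and POSIX threads and run as a stress test. This sketch assumes that membar_release()/membar_acquire() map to C11 release/acquire fences and that atomic_cas()/atomic_swap() map to relaxed read-modify-write operations. `cpu0()`, `cpu1()` and `trial()` are hypothetical names mirroring the two CPUs above. Repeated runs can only fail to find the forbidden state; they cannot prove it unreachable, and strongly ordered hardware such as x86 will rarely exercise the interesting reorderings. Proving it unreachable is the job of the herd7 model.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdlib.h>

/* Shared state, reset before every trial. */
static atomic_int flag, x;
/* CPU0-private (resched, retry) and CPU1-private (y) results. */
static int resched, retry, y;

static void *
cpu0(void *arg)		/* queue work */
{
	int expected;

	(void)arg;
	atomic_store_explicit(&x, 1, memory_order_relaxed);
	resched = 0;
	retry = 0;
	expected = 0;
	if (atomic_compare_exchange_strong_explicit(&flag, &expected, 1,
	    memory_order_relaxed, memory_order_relaxed)) {
		resched = 1;
	} else {
		atomic_thread_fence(memory_order_release); /* membar_release */
		expected = 1;
		if (!atomic_compare_exchange_strong_explicit(&flag, &expected,
		    1, memory_order_relaxed, memory_order_relaxed))
			retry = 1;
	}
	return NULL;
}

static void *
cpu1(void *arg)		/* process queued work */
{
	(void)arg;
	y = 0;
	if (atomic_exchange_explicit(&flag, 0, memory_order_relaxed)) {
		atomic_thread_fence(memory_order_acquire); /* membar_acquire */
		y = atomic_load_explicit(&x, memory_order_relaxed);
	}
	return NULL;
}

/*
 * Run one trial; return 1 if the forbidden final state
 * (resched = 0, retry = 0, y = 0) was observed, else 0.
 */
static int
trial(void)
{
	pthread_t t0, t1;

	atomic_store(&flag, 1);
	atomic_store(&x, 0);
	if (pthread_create(&t0, NULL, cpu0, NULL) != 0 ||
	    pthread_create(&t1, NULL, cpu1, NULL) != 0)
		abort();
	pthread_join(t0, NULL);
	pthread_join(t1, NULL);
	return resched == 0 && retry == 0 && y == 0;
}
```

A cheap sanity check of the transliteration is to call `trial()` a few thousand times in a loop and assert that it never returns 1.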
AArch64 MP
{
int flag=1;
int x=0;
0:X0=flag; 0:X1=x;
1:X0=flag; 1:X1=x;
}
P0 | P1 ;
MOV W6,#0 | MOV W6,#0 ;
MOV W7,#0 | SWP WZR,W3,[X0] ;
MOV W4,#1 | CBZ W3,L10 ;
STR W4,[X1] | DMB ISH ;
MOV W3,#0 | LDR W6,[X1] ;
CAS W3,W4,[X0] |L10: ;
CBNZ W3,L00 | ;
MOV W6,#1 | ;
B L09 | ;
L00: | ;
DMB ISH | ;
MOV W3,#1 | ;
CAS W3,W3,[X0] | ;
CBZ W3,L08 | ;
B L09 | ;
L08: | ;
MOV W7,#1 | ;
L09: | ;
exists
(0:X6=0 /\ 0:X7=0 /\ 1:X6=0)