Source-Changes-HG archive
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index][Old Index]
[src/trunk]: src Replace pcq(9) with the implementation from ad@ and minor ch...
details: https://anonhg.NetBSD.org/src/rev/a9ef439997bf
branches: trunk
changeset: 772937:a9ef439997bf
user: rmind <rmind%NetBSD.org@localhost>
date: Sun Jan 22 02:55:47 2012 +0000
description:
Replace pcq(9) with the implementation from ad@ and minor changes by me.
PR/40516, PR/45631.
diffstat:
share/man/man9/pcq.9 | 10 +-
sys/kern/subr_pcq.c | 259 ++++++++++++++++++++++++++------------------------
2 files changed, 140 insertions(+), 129 deletions(-)
diffs (truncated from 374 to 300 lines):
diff -r cda4d103bcac -r a9ef439997bf share/man/man9/pcq.9
--- a/share/man/man9/pcq.9 Sat Jan 21 23:54:57 2012 +0000
+++ b/share/man/man9/pcq.9 Sun Jan 22 02:55:47 2012 +0000
@@ -1,4 +1,4 @@
-.\" $NetBSD: pcq.9,v 1.5 2010/12/02 12:54:13 wiz Exp $
+.\" $NetBSD: pcq.9,v 1.6 2012/01/22 02:55:47 rmind Exp $
.\"
.\" Copyright (c) 2010 The NetBSD Foundation, Inc.
.\" All rights reserved.
@@ -27,7 +27,7 @@
.\" ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
.\" POSSIBILITY OF SUCH DAMAGE.
.\"
-.Dd January 8, 2010
+.Dd January 22, 2012
.Dt PCQ 9
.Os
.Sh NAME
@@ -95,11 +95,11 @@
Free the resources held by
.Fa pcq .
.It Fn pcq_get "pcq"
-Remove the next item to be consumed from the queue and return
-it.
+Remove the next item to be consumed from the queue and return it.
If the queue is empty,
return
.Dv NULL .
+The caller must prevent concurrent gets from occurring.
.It Fn pcq_maxitems "pcq"
Return the maximum number of items that the queue can store at
any one time.
@@ -132,8 +132,6 @@
.Nm
interface first appeared in
.Nx 6.0 .
-.Sh AUTHORS
-.An Matt Thomas Aq matt%NetBSD.org@localhost
.\" .Sh CAVEATS
.\" .Sh BUGS
.\" .Sh SECURITY CONSIDERATIONS
diff -r cda4d103bcac -r a9ef439997bf sys/kern/subr_pcq.c
--- a/sys/kern/subr_pcq.c Sat Jan 21 23:54:57 2012 +0000
+++ b/sys/kern/subr_pcq.c Sun Jan 22 02:55:47 2012 +0000
@@ -1,11 +1,11 @@
-/* $NetBSD: subr_pcq.c,v 1.3 2008/11/11 21:45:33 rmind Exp $ */
+/* $NetBSD: subr_pcq.c,v 1.4 2012/01/22 02:55:47 rmind Exp $ */
/*-
- * Copyright (c) 2008 The NetBSD Foundation, Inc.
+ * Copyright (c) 2009 The NetBSD Foundation, Inc.
* All rights reserved.
*
* This code is derived from software contributed to The NetBSD Foundation
- * by Matt Thomas <matt%3am-software.com@localhost>
+ * by Andrew Doran.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@@ -29,173 +29,181 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
+/*
+ * Lockless producer/consumer queue.
+ */
+
#include <sys/cdefs.h>
-__KERNEL_RCSID(0, "$NetBSD: subr_pcq.c,v 1.3 2008/11/11 21:45:33 rmind Exp $");
+__KERNEL_RCSID(0, "$NetBSD: subr_pcq.c,v 1.4 2012/01/22 02:55:47 rmind Exp $");
#include <sys/param.h>
#include <sys/types.h>
#include <sys/atomic.h>
-#include <sys/errno.h>
#include <sys/kmem.h>
#include <sys/pcq.h>
-typedef void * volatile pcq_entry_t;
-
+/*
+ * Internal producer-consumer queue structure. Note: providing a separate
+ * cache-line both for pcq_t::pcq_pc and pcq_t::pcq_items.
+ */
struct pcq {
- pcq_entry_t *pcq_consumer;
- pcq_entry_t *pcq_producer;
- pcq_entry_t *pcq_limit;
- pcq_entry_t pcq_base[];
+ u_int pcq_nitems;
+ uint8_t pcq_pad1[COHERENCY_UNIT - sizeof(u_int)];
+ volatile uint32_t pcq_pc;
+ uint8_t pcq_pad2[COHERENCY_UNIT - sizeof(uint32_t)];
+ void * volatile pcq_items[];
};
-static inline pcq_entry_t *
-pcq_advance(pcq_t *pcq, pcq_entry_t *ptr)
+/*
+ * Producer (p) - stored in the lower 16 bits of pcq_t::pcq_pc.
+ * Consumer (c) - in the higher 16 bits.
+ *
+ * We have a limitation of 16 bits i.e. 0xffff items in the queue.
+ */
+
+static inline void
+pcq_split(uint32_t v, u_int *p, u_int *c)
{
- if (__predict_false(++ptr == pcq->pcq_limit))
- return pcq->pcq_base;
+ *p = v & 0xffff;
+ *c = v >> 16;
+}
- return ptr;
+static inline uint32_t
+pcq_combine(u_int p, u_int c)
+{
+
+ return p | (c << 16);
}
+static inline u_int
+pcq_advance(pcq_t *pcq, u_int pc)
+{
+
+ if (__predict_false(++pc == pcq->pcq_nitems)) {
+ return 0;
+ }
+ return pc;
+}
+
+/*
+ * pcq_put: place an item at the end of the queue.
+ */
bool
pcq_put(pcq_t *pcq, void *item)
{
- pcq_entry_t *producer;
+ uint32_t v, nv;
+ u_int op, p, c;
KASSERT(item != NULL);
- /*
- * Get our starting point, While we are doing this, it is
- * imperative that pcq->pcq_base/pcq->pcq_limit not change
- * in value. If you need to resize a pcq, init a new pcq
- * with the right size and swap pointers to it.
- */
- membar_consumer(); /* see updates to pcq_producer */
- producer = pcq->pcq_producer;
- for (;;) {
- /*
- * Preadvance so we reduce the window on updates.
- */
- pcq_entry_t * const new_producer = pcq_advance(pcq, producer);
+ do {
+ v = pcq->pcq_pc;
+ pcq_split(v, &op, &c);
+ p = pcq_advance(pcq, op);
+ if (p == c) {
+ /* Queue is full. */
+ return false;
+ }
+ nv = pcq_combine(p, c);
+ } while (atomic_cas_32(&pcq->pcq_pc, v, nv) != v);
- /*
- * Try to fill an empty slot
- */
- if (NULL == atomic_cas_ptr(producer, NULL, item)) {
- /*
- * We need to use atomic_cas_ptr since another thread
- * might have inserted between these two cas operations
- * and we don't want to overwrite a producer that's
- * more up-to-date.
- */
- atomic_cas_ptr(&pcq->pcq_producer,
- __UNVOLATILE(producer),
- __UNVOLATILE(new_producer));
- /*
- * Tell them we were able to enqueue it.
- */
-#ifndef __HAVE_ATOMIC_AS_MEMBAR
- membar_producer();
+ /*
+ * Ensure that the update to pcq_pc is globally visible before the
+ * data item. See pcq_get(). This also ensures that any changes
+ * that the caller made to the data item are globally visible
+ * before we put it onto the list.
+ */
+#ifndef _HAVE_ATOMIC_AS_MEMBAR
+ membar_producer();
#endif
- return true;
- }
+ pcq->pcq_items[op] = item;
- /*
- * If we've reached the consumer, we've filled all the
- * slots and there's no more room so return false.
- */
-#ifndef __HAVE_ATOMIC_AS_MEMBAR
- membar_consumer(); /* see updates to pcq_consumer */
-#endif
- if (producer == pcq->pcq_consumer)
- return false;
-
- /*
- * Let's see if the next slot is free...
- */
- producer = new_producer;
- }
+ /*
+ * Synchronization activity to wake up the consumer will ensure
+ * that the update to pcq_items[] is visible before the wakeup
+ * arrives. So, we do not need an additional memory barrier here.
+ */
+ return true;
}
/*
- * It's assumed that the enclosing structure that contains the pcq will
- * provide appropriate locking to prevent concurrent gets from occurring.
+ * pcq_peek: return the next item from the queue without removal.
+ */
+void *
+pcq_peek(pcq_t *pcq)
+{
+ const uint32_t v = pcq->pcq_pc;
+ u_int p, c;
+
+ pcq_split(v, &p, &c);
+
+ /* See comment on race below in pcq_get(). */
+ return (p == c) ? NULL : pcq->pcq_items[c];
+}
+
+/*
+ * pcq_get: remove and return the next item for consumption or NULL if empty.
+ *
+ * => The caller must prevent concurrent gets from occurring.
*/
void *
pcq_get(pcq_t *pcq)
{
- pcq_entry_t * const consumer = pcq->pcq_consumer;
+ uint32_t v, nv;
+ u_int p, c;
void *item;
- /*
- * Updates to pcq_consumer doesn't matter since we control it but we
- * want to make sure that any stores to what it references have
- * completed.
- */
- membar_consumer();
-
- /*
- * If there's nothing to return, just return.
- */
- if ((item = *consumer) == NULL)
+ v = pcq->pcq_pc;
+ pcq_split(v, &p, &c);
+ if (p == c) {
+ /* Queue is empty: nothing to return. */
return NULL;
+ }
+ item = pcq->pcq_items[c];
+ if (item == NULL) {
+ /*
+ * Raced with sender: we rely on a notification (e.g. softint
+ * or wakeup) being generated after the producer's pcq_put(),
+ * causing us to retry pcq_get() later.
+ */
+ return NULL;
+ }
+ pcq->pcq_items[c] = NULL;
+ c = pcq_advance(pcq, c);
+ nv = pcq_combine(p, c);
/*
- * Update the consumer and free the slot.
- * Update the consumer pointer first so when producer == consumer
- * the right thing happens.
- *
- * 1) until the slot set to NULL, pcq_put will fail since
- * the slot != NULL && producer == consumer.
- * 2) consumer is advanced but the slot is still not NULL,
- * pcq_put will advance by one, see that producer == consumer,
- * and fail.
- * 4) Once the slot is set to NULL, the producer can fill the slot
- * and advance the producer.
- *
- * and then we are back to 1.
+ * Ensure that update to pcq_items[] becomes globally visible
+ * before the update to pcq_pc. If it were reordered to occur
Home |
Main Index |
Thread Index |
Old Index