Source-Changes-HG archive

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index][Old Index]

[src/trunk]: src Replace pcq(9) with the implementation from ad@ and minor ch...



details:   https://anonhg.NetBSD.org/src/rev/a9ef439997bf
branches:  trunk
changeset: 772937:a9ef439997bf
user:      rmind <rmind%NetBSD.org@localhost>
date:      Sun Jan 22 02:55:47 2012 +0000

description:
Replace pcq(9) with the implementation from ad@ and minor changes by me.

PR/40516, PR/45631.

diffstat:

 share/man/man9/pcq.9 |   10 +-
 sys/kern/subr_pcq.c  |  259 ++++++++++++++++++++++++++------------------------
 2 files changed, 140 insertions(+), 129 deletions(-)

diffs (truncated from 374 to 300 lines):

diff -r cda4d103bcac -r a9ef439997bf share/man/man9/pcq.9
--- a/share/man/man9/pcq.9      Sat Jan 21 23:54:57 2012 +0000
+++ b/share/man/man9/pcq.9      Sun Jan 22 02:55:47 2012 +0000
@@ -1,4 +1,4 @@
-.\"     $NetBSD: pcq.9,v 1.5 2010/12/02 12:54:13 wiz Exp $
+.\"     $NetBSD: pcq.9,v 1.6 2012/01/22 02:55:47 rmind Exp $
 .\"
 .\" Copyright (c) 2010 The NetBSD Foundation, Inc.
 .\" All rights reserved.
@@ -27,7 +27,7 @@
 .\" ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 .\" POSSIBILITY OF SUCH DAMAGE.
 .\"
-.Dd January 8, 2010
+.Dd January 22, 2012
 .Dt PCQ 9
 .Os
 .Sh NAME
@@ -95,11 +95,11 @@
 Free the resources held by
 .Fa pcq .
 .It Fn pcq_get "pcq"
-Remove the next item to be consumed from the queue and return
-it.
+Remove the next item to be consumed from the queue and return it.
 If the queue is empty,
 return
 .Dv NULL .
+The caller must prevent concurrent gets from occuring.
 .It Fn pcq_maxitems "pcq"
 Return the maximum number of items that the queue can store at
 any one time.
@@ -132,8 +132,6 @@
 .Nm
 interface first appeared in
 .Nx 6.0 .
-.Sh AUTHORS
-.An Matt Thomas Aq matt%NetBSD.org@localhost
 .\" .Sh CAVEATS
 .\" .Sh BUGS
 .\" .Sh SECURITY CONSIDERATIONS
diff -r cda4d103bcac -r a9ef439997bf sys/kern/subr_pcq.c
--- a/sys/kern/subr_pcq.c       Sat Jan 21 23:54:57 2012 +0000
+++ b/sys/kern/subr_pcq.c       Sun Jan 22 02:55:47 2012 +0000
@@ -1,11 +1,11 @@
-/*     $NetBSD: subr_pcq.c,v 1.3 2008/11/11 21:45:33 rmind Exp $       */
+/*     $NetBSD: subr_pcq.c,v 1.4 2012/01/22 02:55:47 rmind Exp $       */
 
 /*-
- * Copyright (c) 2008 The NetBSD Foundation, Inc.
+ * Copyright (c) 2009 The NetBSD Foundation, Inc.
  * All rights reserved.
  *
  * This code is derived from software contributed to The NetBSD Foundation
- * by Matt Thomas <matt%3am-software.com@localhost>
+ * by Andrew Doran.
  *
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions
@@ -29,173 +29,181 @@
  * POSSIBILITY OF SUCH DAMAGE.
  */
 
+/*
+ * Lockless producer/consumer queue.
+ */
+
 #include <sys/cdefs.h>
-__KERNEL_RCSID(0, "$NetBSD: subr_pcq.c,v 1.3 2008/11/11 21:45:33 rmind Exp $");
+__KERNEL_RCSID(0, "$NetBSD: subr_pcq.c,v 1.4 2012/01/22 02:55:47 rmind Exp $");
 
 #include <sys/param.h>
 #include <sys/types.h>
 #include <sys/atomic.h>
-#include <sys/errno.h>
 #include <sys/kmem.h>
 
 #include <sys/pcq.h>
 
-typedef void * volatile pcq_entry_t;
-
+/*
+ * Internal producer-consumer queue structure.  Note: providing a separate
+ * cache-line both for pcq_t::pcq_pc and pcq_t::pcq_items.
+ */
 struct pcq {
-       pcq_entry_t *pcq_consumer;
-       pcq_entry_t *pcq_producer;
-       pcq_entry_t *pcq_limit;
-       pcq_entry_t pcq_base[];
+       u_int                   pcq_nitems;
+       uint8_t                 pcq_pad1[COHERENCY_UNIT - sizeof(u_int)];
+       volatile uint32_t       pcq_pc;
+       uint8_t                 pcq_pad2[COHERENCY_UNIT - sizeof(uint32_t)];
+       void * volatile         pcq_items[];
 };
 
-static inline pcq_entry_t *
-pcq_advance(pcq_t *pcq, pcq_entry_t *ptr)
+/*
+ * Producer (p) - stored in the lower 16 bits of pcq_t::pcq_pc.
+ * Consumer (c) - in the higher 16 bits.
+ *
+ * We have a limitation of 16 bits i.e. 0xffff items in the queue.
+ */
+
+static inline void
+pcq_split(uint32_t v, u_int *p, u_int *c)
 {
 
-       if (__predict_false(++ptr == pcq->pcq_limit))
-               return pcq->pcq_base;
+       *p = v & 0xffff;
+       *c = v >> 16;
+}
 
-       return ptr;
+static inline uint32_t
+pcq_combine(u_int p, u_int c)
+{
+
+       return p | (c << 16);
 }
 
+static inline u_int
+pcq_advance(pcq_t *pcq, u_int pc)
+{
+
+       if (__predict_false(++pc == pcq->pcq_nitems)) {
+               return 0;
+       }
+       return pc;
+}
+
+/*
+ * pcq_put: place an item at the end of the queue.
+ */
 bool
 pcq_put(pcq_t *pcq, void *item)
 {
-       pcq_entry_t *producer;
+       uint32_t v, nv;
+       u_int op, p, c;
 
        KASSERT(item != NULL);
 
-       /*
-        * Get our starting point,  While we are doing this, it is
-        * imperative that pcq->pcq_base/pcq->pcq_limit not change
-        * in value.  If you need to resize a pcq, init a new pcq
-        * with the right size and swap pointers to it.
-        */
-       membar_consumer();      /* see updates to pcq_producer */
-       producer = pcq->pcq_producer;
-       for (;;) {
-               /*
-                * Preadvance so we reduce the window on updates.
-                */
-               pcq_entry_t * const new_producer = pcq_advance(pcq, producer);
+       do {
+               v = pcq->pcq_pc;
+               pcq_split(v, &op, &c);
+               p = pcq_advance(pcq, op);
+               if (p == c) {
+                       /* Queue is full. */
+                       return false;
+               }
+               nv = pcq_combine(p, c);
+       } while (atomic_cas_32(&pcq->pcq_pc, v, nv) != v);
 
-               /*
-                * Try to fill an empty slot
-                */
-               if (NULL == atomic_cas_ptr(producer, NULL, item)) {
-                       /*
-                        * We need to use atomic_cas_ptr since another thread
-                        * might have inserted between these two cas operations
-                        * and we don't want to overwrite a producer that's
-                        * more up-to-date.
-                        */
-                       atomic_cas_ptr(&pcq->pcq_producer,
-                           __UNVOLATILE(producer), 
-                           __UNVOLATILE(new_producer));
-                       /*
-                        * Tell them we were able to enqueue it.
-                        */
-#ifndef __HAVE_ATOMIC_AS_MEMBAR
-                       membar_producer();
+       /*
+        * Ensure that the update to pcq_pc is globally visible before the
+        * data item.  See pcq_get().  This also ensures that any changes
+        * that the caller made to the data item are globally visible
+        * before we put it onto the list.
+        */
+#ifndef _HAVE_ATOMIC_AS_MEMBAR
+       membar_producer();
 #endif
-                       return true;
-               }
+       pcq->pcq_items[op] = item;
 
-               /*
-                * If we've reached the consumer, we've filled all the
-                * slots and there's no more room so return false.
-                */
-#ifndef __HAVE_ATOMIC_AS_MEMBAR
-               membar_consumer();      /* see updates to pcq_consumer */
-#endif
-               if (producer == pcq->pcq_consumer)
-                       return false;
-
-               /*
-                * Let's see if the next slot is free...
-                */
-               producer = new_producer;
-       }
+       /*
+        * Synchronization activity to wake up the consumer will ensure
+        * that the update to pcq_items[] is visible before the wakeup
+        * arrives.  So, we do not need an additonal memory barrier here.
+        */
+       return true;
 }
 
 /*
- * It's assumed that the enclosing structure that contains the pcq will
- * provide appropriate locking to prevent concurrent gets from occurring.
+ * pcq_peek: return the next item from the queue without removal.
+ */
+void *
+pcq_peek(pcq_t *pcq)
+{
+       const uint32_t v = pcq->pcq_pc;
+       u_int p, c;
+
+       pcq_split(v, &p, &c);
+
+       /* See comment on race below in pcq_get(). */
+       return (p == c) ? NULL : pcq->pcq_items[c];
+}
+
+/*
+ * pcq_get: remove and return the next item for consumption or NULL if empty.
+ *
+ * => The caller must prevent concurrent gets from occuring.
  */
 void *
 pcq_get(pcq_t *pcq)
 {
-       pcq_entry_t * const consumer = pcq->pcq_consumer;
+       uint32_t v, nv;
+       u_int p, c;
        void *item;
 
-       /*
-        * Updates to pcq_consumer doesn't matter since we control it but we
-        * want to make sure that any stores to what it references have
-        * completed.
-        */
-       membar_consumer();
-
-       /*
-        * If there's nothing to return, just return.
-        */
-       if ((item = *consumer) == NULL)
+       v = pcq->pcq_pc;
+       pcq_split(v, &p, &c);
+       if (p == c) {
+               /* Queue is empty: nothing to return. */
                return NULL;
+       }
+       item = pcq->pcq_items[c];
+       if (item == NULL) {
+               /*
+                * Raced with sender: we rely on a notification (e.g. softint
+                * or wakeup) being generated after the producer's pcq_put(),
+                * causing us to retry pcq_get() later.
+                */
+               return NULL;
+       }
+       pcq->pcq_items[c] = NULL;
+       c = pcq_advance(pcq, c);
+       nv = pcq_combine(p, c);
 
        /*
-        * Update the consumer and free the slot.
-        * Update the consumer pointer first so when producer == consumer
-        * the right thing happens.
-        *
-        * 1) until the slot set to NULL, pcq_put will fail since
-        *    the slot != NULL && producer == consumer.
-        * 2) consumer is advanced but the slot is still not NULL,
-        *    pcq_put will advance by one, see that producer == consumer,
-        *    and fail.
-        * 4) Once the slot is set to NULL, the producer can fill the slot
-        *    and advance the producer.
-        *    
-        * and then we are back to 1.
+        * Ensure that update to pcq_items[] becomes globally visible
+        * before the update to pcq_pc.  If it were reodered to occur



Home | Main Index | Thread Index | Old Index