Source-Changes-HG archive

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index][Old Index]

[src/trunk]: src/lib/librumpuser Sneeze some locking into connect/disconnect.



details:   https://anonhg.NetBSD.org/src/rev/2f18a2846001
branches:  trunk
changeset: 759006:2f18a2846001
user:      pooka <pooka%NetBSD.org@localhost>
date:      Wed Nov 24 11:40:24 2010 +0000

description:
Sneeze some locking into connect/disconnect.

diffstat:

 lib/librumpuser/rumpuser_sp.c |  147 +++++++++++++++++++++++++++--------------
 1 files changed, 97 insertions(+), 50 deletions(-)

diffs (271 lines):

diff -r 95f8367ba11a -r 2f18a2846001 lib/librumpuser/rumpuser_sp.c
--- a/lib/librumpuser/rumpuser_sp.c     Tue Nov 23 22:14:27 2010 +0000
+++ b/lib/librumpuser/rumpuser_sp.c     Wed Nov 24 11:40:24 2010 +0000
@@ -1,4 +1,4 @@
-/*      $NetBSD: rumpuser_sp.c,v 1.10 2010/11/22 20:42:19 pooka Exp $  */
+/*      $NetBSD: rumpuser_sp.c,v 1.11 2010/11/24 11:40:24 pooka Exp $  */
 
 /*
  * Copyright (c) 2010 Antti Kantee.  All Rights Reserved.
@@ -38,9 +38,10 @@
  */
 
 #include <sys/cdefs.h>
-__RCSID("$NetBSD: rumpuser_sp.c,v 1.10 2010/11/22 20:42:19 pooka Exp $");
+__RCSID("$NetBSD: rumpuser_sp.c,v 1.11 2010/11/24 11:40:24 pooka Exp $");
 
 #include <sys/types.h>
+#include <sys/atomic.h>
 #include <sys/mman.h>
 #include <sys/socket.h>
 
@@ -67,7 +68,7 @@
 
 static struct pollfd pfdlist[MAXCLI];
 static struct spclient spclist[MAXCLI];
-static unsigned int nfds, maxidx;
+static unsigned int disco;
 
 static struct rumpuser_sp_ops spops;
 
@@ -287,66 +288,81 @@
 }
 
 static void
-serv_handledisco(unsigned int idx)
+spcref(struct spclient *spc)
+{
+
+       pthread_mutex_lock(&spc->spc_mtx);
+       spc->spc_refcnt++;
+       pthread_mutex_unlock(&spc->spc_mtx);
+}
+
+static void
+spcrelease(struct spclient *spc)
 {
-       struct spclient *spc = &spclist[idx];
-       int fd = spc->spc_fd;
+       int ref;
+
+       pthread_mutex_lock(&spc->spc_mtx);
+       ref = --spc->spc_refcnt;
+       pthread_mutex_unlock(&spc->spc_mtx);
 
-       DPRINTF(("rump_sp: disconnecting [%u]\n", idx));
+       if (ref > 0)
+               return;
+
+       _DIAGASSERT(TAILQ_EMPTY(&spclist[i].spc_respwait));
+       _DIAGASSERT(spc->spc_buf == NULL);
 
        lwproc_switch(spc->spc_mainlwp);
        lwproc_release();
+       spc->spc_mainlwp = NULL;
 
-       pthread_mutex_destroy(&spc->spc_mtx);
-       pthread_cond_destroy(&spc->spc_cv);
-       free(spc->spc_buf);
-       memset(spc, 0, sizeof(*spc));
-       close(fd);
-       pfdlist[idx].fd = -1;
-       nfds--;
+       close(spc->spc_fd);
+       spc->spc_fd = -1;
 
-       if (idx == maxidx) {
-               while (idx--) {
-                       if (pfdlist[idx].fd != -1) {
-                               maxidx = idx;
-                               break;
-                       }
-                       assert(idx != 0);
-               }
-               DPRINTF(("rump_sp: set maxidx to [%u]\n", maxidx));
-       }
+       spc->spc_pfd->fd = -1;
+       membar_producer();
+       atomic_inc_uint(&disco);
+
 }
 
-static int
-serv_handleconn(int fd, connecthook_fn connhook)
+static void
+serv_handledisco(unsigned int idx)
+{
+       struct spclient *spc = &spclist[idx];
+
+       DPRINTF(("rump_sp: disconnecting [%u]\n", idx));
+
+       spcrelease(spc);
+}
+
+static unsigned
+serv_handleconn(int fd, connecthook_fn connhook, int busy)
 {
        struct sockaddr_storage ss;
        socklen_t sl = sizeof(ss);
-       int newfd, flags, error;
+       int newfd, flags;
        unsigned i;
 
        /*LINTED: cast ok */
        newfd = accept(fd, (struct sockaddr *)&ss, &sl);
        if (newfd == -1)
-               return errno;
+               return 0;
+
+       if (busy) {
+               close(newfd); /* EBUSY */
+               return 0;
+       }
 
        /* XXX: should do some sort of handshake too */
 
-       if (nfds == MAXCLI) {
-               close(newfd); /* EBUSY */
-               return EBUSY;
-       }
-
        flags = fcntl(newfd, F_GETFL, 0);
        if (fcntl(newfd, F_SETFL, flags | O_NONBLOCK) == -1) {
                close(newfd);
-               return errno;
+               return 0;
        }
-       flags = 1;
 
-       if ((error = connhook(newfd)) != 0) {
+       if (connhook(newfd) != 0) {
                close(newfd);
-               return error;
+               return 0;
        }
 
        /* find empty slot the simple way */
@@ -355,33 +371,28 @@
                        break;
        }
 
-       if ((error = lwproc_newproc(&spclist[i])) != 0) {
+       if (lwproc_newproc(&spclist[i]) != 0) {
                close(newfd);
-               return error;
+               return 0;
        }
 
        assert(i < MAXCLI);
-       nfds++;
 
        pfdlist[i].fd = newfd;
        spclist[i].spc_fd = newfd;
        spclist[i].spc_mainlwp = lwproc_curlwp();
        spclist[i].spc_istatus = SPCSTATUS_BUSY; /* dedicated receiver */
        spclist[i].spc_pid = lwproc_getpid();
+       spclist[i].spc_refcnt = 1;
 
        TAILQ_INIT(&spclist[i].spc_respwait);
-       pthread_mutex_init(&spclist[i].spc_mtx, NULL);
-       pthread_cond_init(&spclist[i].spc_cv, NULL);
-
-       if (maxidx < i)
-               maxidx = i;
 
        DPRINTF(("rump_sp: added new connection at idx %u, pid %d\n",
            i, lwproc_getpid()));
 
        lwproc_switch(NULL);
 
-       return 0;
+       return i;
 }
 
 static void
@@ -503,6 +514,7 @@
        pthread_attr_init(&pattr);
        pthread_attr_setdetachstate(&pattr, 1);
 
+       spcref(spc);
        if ((rv = pthread_create(&pt, &pattr, serv_syscallbouncer, sba)) != 0) {
                /* panic */
                abort();
@@ -513,13 +525,21 @@
 spserver(void *arg)
 {
        struct spservarg *sarg = arg;
+       struct spclient *spc;
        unsigned idx;
        int seen;
        int rv;
+       unsigned int nfds, maxidx;
 
-       for (idx = 1; idx < MAXCLI; idx++) {
+       for (idx = 0; idx < MAXCLI; idx++) {
                pfdlist[idx].fd = -1;
                pfdlist[idx].events = POLLIN;
+
+               spc = &spclist[idx];
+
+               spc->spc_pfd = &pfdlist[idx];
+               pthread_mutex_init(&spc->spc_mtx, NULL);
+               pthread_cond_init(&spc->spc_cv, NULL);
        }
        pfdlist[0].fd = sarg->sps_sock;
        pfdlist[0].events = POLLIN;
@@ -529,6 +549,27 @@
        DPRINTF(("rump_sp: server mainloop\n"));
 
        for (;;) {
+               /* g/c hangarounds (eventually) */
+               if (disco) {
+                       int discoed;
+
+                       membar_consumer();
+                       discoed = atomic_swap_uint(&disco, 0);
+                       while (discoed--) {
+                               nfds--;
+                               idx = maxidx;
+                               while (idx--) {
+                                       if (pfdlist[idx].fd != -1) {
+                                               maxidx = idx;
+                                               break;
+                                       }
+                                       assert(idx != 0);
+                               }
+                               DPRINTF(("rump_sp: set maxidx to [%u]\n",
+                                   maxidx));
+                       }
+               }
+
                DPRINTF(("rump_sp: loop nfd %d\n", maxidx+1));
                seen = 0;
                rv = poll(pfdlist, maxidx+1, INFTIM);
@@ -552,7 +593,7 @@
                        DPRINTF(("rump_sp: activity at [%u] %d/%d\n",
                            idx, seen, rv));
                        if (idx > 0) {
-                               struct spclient *spc = &spclist[idx];
+                               spc = &spclist[idx];
 
                                DPRINTF(("rump_sp: mainloop read [%u]\n", idx));
                                switch (readframe(spc)) {
@@ -576,10 +617,16 @@
                                        }
                                        break;
                                }
+
                        } else {
                                DPRINTF(("rump_sp: mainloop new connection\n"));
-                               serv_handleconn(pfdlist[0].fd,
-                                   sarg->sps_connhook);
+
+                               idx = serv_handleconn(pfdlist[0].fd,
+                                   sarg->sps_connhook, nfds == MAXCLI);
+                               if (idx)
+                                       nfds++;
+                               if (idx > maxidx)
+                                       maxidx = idx;
                        }
                }
        }



Home | Main Index | Thread Index | Old Index