Source-Changes-HG archive

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index][Old Index]

[src/trunk]: src/lib A bunch of improvements:



details:   https://anonhg.NetBSD.org/src/rev/470f77f25c32
branches:  trunk
changeset: 760674:470f77f25c32
user:      pooka <pooka%NetBSD.org@localhost>
date:      Mon Jan 10 19:49:43 2011 +0000

description:
A bunch of improvements:

* don't hold spc mutex while sending data
* use send() for the banner to avoid SIGPIPE in case a client
  connects and immediately goes away
* fix error path locking
* use kevent() instead of pollts() in the client.  Apparently that
  is the only sensible way for a library to support both multithreading
  and signal-reentrancy in a race-free manner.
  (open question: can all signals be caught with a single kevent
  instead of registering NSIG separate ones?)
* mark the client communication descriptor non-blocking so that clients
  have better signal interruptibility (we now sleep in a signal-accepting
  kevent() instead of a signal-masked recvfrom())

diffstat:

 lib/librumpclient/rumpclient.c |  120 ++++++++++++++++++++++++++--------------
 lib/librumpuser/rumpuser_sp.c  |   12 ++-
 lib/librumpuser/sp_common.c    |    7 +-
 3 files changed, 91 insertions(+), 48 deletions(-)

diffs (truncated from 315 to 300 lines):

diff -r 343087c002a2 -r 470f77f25c32 lib/librumpclient/rumpclient.c
--- a/lib/librumpclient/rumpclient.c    Mon Jan 10 19:30:21 2011 +0000
+++ b/lib/librumpclient/rumpclient.c    Mon Jan 10 19:49:43 2011 +0000
@@ -1,4 +1,4 @@
-/*      $NetBSD: rumpclient.c,v 1.14 2011/01/09 14:10:03 pooka Exp $   */
+/*      $NetBSD: rumpclient.c,v 1.15 2011/01/10 19:49:43 pooka Exp $   */
 
 /*
  * Copyright (c) 2010, 2011 Antti Kantee.  All Rights Reserved.
@@ -33,6 +33,7 @@
 __RCSID("$NetBSD");
 
 #include <sys/param.h>
+#include <sys/event.h>
 #include <sys/mman.h>
 #include <sys/socket.h>
 
@@ -60,6 +61,7 @@
 int    (*host_socket)(int, int, int);
 int    (*host_close)(int);
 int    (*host_connect)(int, const struct sockaddr *, socklen_t);
+int    (*host_fcntl)(int, int, ...);
 int    (*host_poll)(struct pollfd *, nfds_t, int);
 int    (*host_pollts)(struct pollfd *, nfds_t, const struct timespec *,
                      const sigset_t *);
@@ -74,28 +76,14 @@
        .spc_fd = -1,
 };
 
-/*
- * This version of waitresp is optimized for single-threaded clients
- * and is required by signal-safe clientside rump syscalls.
- */
+static int kq;
+static sigset_t fullset;
 
-static void
-releasercvlock(struct spclient *spc)
+static int
+waitresp(struct spclient *spc, struct respwait *rw, sigset_t *mask)
 {
 
        pthread_mutex_lock(&spc->spc_mtx);
-       if (spc->spc_istatus == SPCSTATUS_WANTED)
-               kickall(spc);
-       spc->spc_istatus = SPCSTATUS_FREE;
-}
-
-static sigset_t fullset;
-static int
-waitresp(struct spclient *spc, struct respwait *rw, sigset_t *mask)
-{
-       struct pollfd pfd;
-       int rv = 0;
-
        sendunlockl(spc);
 
        rw->rw_error = 0;
@@ -103,32 +91,41 @@
            && spc->spc_state != SPCSTATE_DYING){
                /* are we free to receive? */
                if (spc->spc_istatus == SPCSTATUS_FREE) {
+                       struct kevent kev[8];
+                       int gotresp, dosig, rv, i;
+
                        spc->spc_istatus = SPCSTATUS_BUSY;
                        pthread_mutex_unlock(&spc->spc_mtx);
 
-                       pfd.fd = spc->spc_fd;
-                       pfd.events = POLLIN;
+                       dosig = 0;
+                       for (gotresp = 0; !gotresp; ) {
+                               switch (readframe(spc)) {
+                               case 0:
+                                       rv = kevent(kq, NULL, 0,
+                                           kev, __arraycount(kev), NULL);
+                                       assert(rv > 0);
+                                       for (i = 0; i < rv; i++) {
+                                               if (kev[i].filter
+                                                   == EVFILT_SIGNAL)
+                                                       dosig++;
+                                       }
+                                       if (dosig)
+                                               goto cleanup;
 
-                       switch (readframe(spc)) {
-                       case 0:
-                               releasercvlock(spc);
-                               pthread_mutex_unlock(&spc->spc_mtx);
-                               host_pollts(&pfd, 1, NULL, mask);
-                               pthread_mutex_lock(&spc->spc_mtx);
-                               continue;
-                       case -1:
-                               releasercvlock(spc);
-                               rv = errno;
-                               spc->spc_state = SPCSTATE_DYING;
-                               continue;
-                       default:
-                               break;
-                       }
+                                       continue;
+                               case -1:
+                                       spc->spc_state = SPCSTATE_DYING;
+                                       goto cleanup;
+                               default:
+                                       break;
+                               }
 
-                       switch (spc->spc_hdr.rsp_class) {
+                               switch (spc->spc_hdr.rsp_class) {
                                case RUMPSP_RESP:
                                case RUMPSP_ERROR:
                                        kickwaiter(spc);
+                                       gotresp = spc->spc_hdr.rsp_reqno ==
+                                           rw->rw_reqno;
                                        break;
                                case RUMPSP_REQ:
                                        handlereq(spc);
@@ -136,9 +133,22 @@
                                default:
                                        /* panic */
                                        break;
+                               }
                        }
 
-                       releasercvlock(spc);
+ cleanup:
+                       pthread_mutex_lock(&spc->spc_mtx);
+                       if (spc->spc_istatus == SPCSTATUS_WANTED)
+                               kickall(spc);
+                       spc->spc_istatus = SPCSTATUS_FREE;
+
+                       /* take one for the team */
+                       if (dosig) {
+                               pthread_mutex_unlock(&spc->spc_mtx);
+                               pthread_sigmask(SIG_SETMASK, mask, NULL);
+                               pthread_sigmask(SIG_SETMASK, &fullset, NULL);
+                               pthread_mutex_lock(&spc->spc_mtx);
+                       }
                } else {
                        spc->spc_istatus = SPCSTATUS_WANTED;
                        pthread_cond_wait(&rw->rw_cv, &spc->spc_mtx);
@@ -148,8 +158,6 @@
        pthread_mutex_unlock(&spc->spc_mtx);
        pthread_cond_destroy(&rw->rw_cv);
 
-       if (rv)
-               return rv;
        if (spc->spc_state == SPCSTATE_DYING)
                return ENOTCONN;
        return rw->rw_error;
@@ -385,8 +393,9 @@
 static int
 doconnect(void)
 {
+       struct kevent kev[NSIG+1];
        char banner[MAXBANNER];
-       int s, error;
+       int s, error, flags, i;
        ssize_t n;
 
        s = host_socket(parsetab[ptab_idx].domain, SOCK_STREAM, 0);
@@ -421,8 +430,34 @@
        }
        banner[n] = '\0';
 
+       flags = host_fcntl(s, F_GETFL, 0);
+       if (host_fcntl(s, F_SETFL, flags | O_NONBLOCK) == -1) {
+               fprintf(stderr, "rump_sp: cannot set socket fd to nonblock\n");
+               errno = EINVAL;
+               return -1;
+       }
+
        /* parse the banner some day */
 
+       /* setup kqueue, we want all signals and the fd */
+       if ((kq = kqueue()) == -1) {
+               error = errno;
+               fprintf(stderr, "rump_sp: cannot setup kqueue");
+               errno = error;
+               return -1;
+       }
+
+       for (i = 0; i < NSIG; i++) {
+               EV_SET(&kev[i], i+1, EVFILT_SIGNAL, EV_ADD|EV_ENABLE, 0, 0, 0);
+       }
+       EV_SET(&kev[NSIG], s, EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, 0);
+       if (kevent(kq, kev, NSIG+1, NULL, 0, NULL) == -1) {
+               error = errno;
+               fprintf(stderr, "rump_sp: kevent() failed");
+               errno = error;
+               return -1;
+       }
+
        clispc.spc_fd = s;
        TAILQ_INIT(&clispc.spc_respwait);
        pthread_mutex_init(&clispc.spc_mtx, NULL);
@@ -455,6 +490,7 @@
        FINDSYM2(socket,__socket30);
        FINDSYM(close);
        FINDSYM(connect);
+       FINDSYM(fcntl);
        FINDSYM(poll);
        FINDSYM(pollts);
        FINDSYM(read);
@@ -522,6 +558,8 @@
        int error;
 
        host_close(clispc.spc_fd);
+       host_close(kq);
+       kq = -1;
        memset(&clispc, 0, sizeof(clispc));
        clispc.spc_fd = -1;
 
diff -r 343087c002a2 -r 470f77f25c32 lib/librumpuser/rumpuser_sp.c
--- a/lib/librumpuser/rumpuser_sp.c     Mon Jan 10 19:30:21 2011 +0000
+++ b/lib/librumpuser/rumpuser_sp.c     Mon Jan 10 19:49:43 2011 +0000
@@ -1,4 +1,4 @@
-/*      $NetBSD: rumpuser_sp.c,v 1.33 2011/01/10 11:57:53 pooka Exp $  */
+/*      $NetBSD: rumpuser_sp.c,v 1.34 2011/01/10 19:49:43 pooka Exp $  */
 
 /*
  * Copyright (c) 2010, 2011 Antti Kantee.  All Rights Reserved.
@@ -35,7 +35,7 @@
  */
 
 #include <sys/cdefs.h>
-__RCSID("$NetBSD: rumpuser_sp.c,v 1.33 2011/01/10 11:57:53 pooka Exp $");
+__RCSID("$NetBSD: rumpuser_sp.c,v 1.34 2011/01/10 19:49:43 pooka Exp $");
 
 #include <sys/types.h>
 #include <sys/atomic.h>
@@ -104,20 +104,23 @@
 static int
 waitresp(struct spclient *spc, struct respwait *rw)
 {
+       int spcstate;
        int rv = 0;
 
+       pthread_mutex_lock(&spc->spc_mtx);
        sendunlockl(spc);
        while (!rw->rw_done && spc->spc_state != SPCSTATE_DYING) {
                pthread_cond_wait(&rw->rw_cv, &spc->spc_mtx);
        }
        TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries);
+       spcstate = spc->spc_state;
        pthread_mutex_unlock(&spc->spc_mtx);
 
        pthread_cond_destroy(&rw->rw_cv);
 
        if (rv)
                return rv;
-       if (spc->spc_state == SPCSTATE_DYING)
+       if (spcstate == SPCSTATE_DYING)
                return ENOTCONN;
        return rw->rw_error;
 }
@@ -511,7 +514,8 @@
        }
 
        /* write out a banner for the client */
-       if (write(newfd, banner, strlen(banner)) != (ssize_t)strlen(banner)) {
+       if (send(newfd, banner, strlen(banner), MSG_NOSIGNAL)
+           != (ssize_t)strlen(banner)) {
                close(newfd);
                return 0;
        }
diff -r 343087c002a2 -r 470f77f25c32 lib/librumpuser/sp_common.c
--- a/lib/librumpuser/sp_common.c       Mon Jan 10 19:30:21 2011 +0000
+++ b/lib/librumpuser/sp_common.c       Mon Jan 10 19:49:43 2011 +0000
@@ -1,4 +1,4 @@
-/*      $NetBSD: sp_common.c,v 1.22 2011/01/10 11:57:53 pooka Exp $    */
+/*      $NetBSD: sp_common.c,v 1.23 2011/01/10 19:49:43 pooka Exp $    */
 
 /*
  * Copyright (c) 2010, 2011 Antti Kantee.  All Rights Reserved.
@@ -212,7 +212,6 @@
 sendlockl(struct spclient *spc)
 {
 
-       /* assert(pthread_mutex_owned) */
        while (spc->spc_ostatus != SPCSTATUS_FREE) {
                spc->spc_ostatus = SPCSTATUS_WANTED;
                pthread_cond_wait(&spc->spc_cv, &spc->spc_mtx);
@@ -233,7 +232,6 @@
 sendunlockl(struct spclient *spc)
 {
 
-       /* assert(pthread_mutex_owned) */
        if (spc->spc_ostatus == SPCSTATUS_WANTED)
                pthread_cond_broadcast(&spc->spc_cv);
        spc->spc_ostatus = SPCSTATUS_FREE;
@@ -298,12 +296,14 @@
        TAILQ_INSERT_TAIL(&spc->spc_respwait, rw, rw_entries);
 
        sendlockl(spc);
+       pthread_mutex_unlock(&spc->spc_mtx);
 }
 
 static void



Home | Main Index | Thread Index | Old Index