Source-Changes-HG archive

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index][Old Index]

[src/trunk]: src/sys/net wg: Use a global pktqueue rather than a per-peer pcq.



details:   https://anonhg.NetBSD.org/src/rev/56a728d4f4c1
branches:  trunk
changeset: 954603:56a728d4f4c1
user:      riastradh <riastradh%NetBSD.org@localhost>
date:      Mon Sep 07 01:14:42 2020 +0000

description:
wg: Use a global pktqueue rather than a per-peer pcq.

- Improves scalability -- won't hit limit on softints no matter how
  many peers there are.
- Improves parallelism -- softint was kernel-locked to serialize
  access to the pcq.
- Requires a per-peer queue on handshake init to avoid dropping the
  first packet.
  . The per-peer queue currently holds a single packet -- should serve
    well enough for pings, DNS queries, TCP connections, &c.

diffstat:

 sys/net/if_wg.c |  248 ++++++++++++++++++++++++++++++++-----------------------
 1 files changed, 145 insertions(+), 103 deletions(-)

diffs (truncated from 472 to 300 lines):

diff -r 698cd610df6c -r 56a728d4f4c1 sys/net/if_wg.c
--- a/sys/net/if_wg.c   Mon Sep 07 01:08:27 2020 +0000
+++ b/sys/net/if_wg.c   Mon Sep 07 01:14:42 2020 +0000
@@ -1,4 +1,4 @@
-/*     $NetBSD: if_wg.c,v 1.53 2020/09/07 00:33:08 riastradh Exp $     */
+/*     $NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $     */
 
 /*
  * Copyright (C) Ryota Ozaki <ozaki.ryota%gmail.com@localhost>
@@ -41,7 +41,7 @@
  */
 
 #include <sys/cdefs.h>
-__KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.53 2020/09/07 00:33:08 riastradh Exp $");
+__KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $");
 
 #ifdef _KERNEL_OPT
 #include "opt_inet.h"
@@ -65,7 +65,6 @@
 #include <sys/mbuf.h>
 #include <sys/module.h>
 #include <sys/mutex.h>
-#include <sys/pcq.h>
 #include <sys/percpu.h>
 #include <sys/pserialize.h>
 #include <sys/psref.h>
@@ -85,6 +84,7 @@
 #include <net/if.h>
 #include <net/if_types.h>
 #include <net/if_wg.h>
+#include <net/pktqueue.h>
 #include <net/route.h>
 
 #include <netinet/in.h>
@@ -124,8 +124,6 @@
  *     runs event handlers
  *   - It has its own two routing tables: one is for IPv4 and the other IPv6
  * - struct wg_peer is a representative of a peer
- *   - It has a softint that is used to send packets over an wg interface
- *     to a peer
  *   - It has a pair of session instances (struct wg_session)
  *   - It has a pair of endpoint instances (struct wg_sockaddr)
  *     - Normally one endpoint is used and the second one is used only on
@@ -562,12 +560,12 @@
                        /* The preshared key (optional) */
        uint8_t         wgp_psk[WG_PRESHARED_KEY_LEN];
 
-       void            *wgp_si;
-       pcq_t           *wgp_q;
-
        struct wg_session       *wgp_session_stable;
        struct wg_session       *wgp_session_unstable;
 
+       /* first outgoing packet awaiting session initiation */
+       struct mbuf             *wgp_pending;
+
        /* timestamp in big-endian */
        wg_timestamp_t  wgp_timestamp_latest_init;
 
@@ -706,6 +704,7 @@
 static int     wg_init(struct ifnet *);
 static void    wg_stop(struct ifnet *, int);
 
+static void    wgintr(void *);
 static void    wg_purge_pending_packets(struct wg_peer *);
 
 static int     wg_clone_create(struct if_clone *, int);
@@ -788,6 +787,7 @@
 static struct if_clone wg_cloner =
     IF_CLONE_INITIALIZER("wg", wg_clone_create, wg_clone_destroy);
 
+static struct pktqueue *wg_pktq __read_mostly;
 
 void wgattach(int);
 /* ARGSUSED */
@@ -808,6 +808,10 @@
 
        mutex_init(&wg_softcs.lock, MUTEX_DEFAULT, IPL_NONE);
        LIST_INIT(&wg_softcs.list);
+
+       wg_pktq = pktq_create(IFQ_MAXLEN, wgintr, NULL);
+       KASSERT(wg_pktq != NULL);
+
        if_clone_attach(&wg_cloner);
 }
 
@@ -1650,6 +1654,9 @@
                    MIN(wg_rekey_timeout, INT_MAX/hz) * hz);
        } else {
                wg_put_session_index(wg, wgs);
+               /* Initiation failed; toss packet waiting for it if any.  */
+               if ((m = atomic_swap_ptr(&wgp->wgp_pending, NULL)) != NULL)
+                       m_freem(m);
        }
 
        return error;
@@ -1780,6 +1787,7 @@
        int error;
        uint8_t mac1[WG_MAC_LEN];
        struct wg_session *wgs_prev;
+       struct mbuf *m;
 
        wg_algo_mac_mac1(mac1, sizeof(mac1),
            wg->wg_pubkey, sizeof(wg->wg_pubkey),
@@ -1932,17 +1940,21 @@
        wg_update_endpoint_if_necessary(wgp, src);
 
        /*
-        * Send something immediately (same as the official implementation)
-        * XXX if there are pending data packets, we don't need to send
-        *     a keepalive message.
+        * If we had a data packet queued up, send it; otherwise send a
+        * keepalive message -- either way we have to send something
+        * immediately or else the responder will never answer.
         */
-       wg_send_keepalive_msg(wgp, wgs);
-
-       /* Anyway run a softint to flush pending packets */
-       kpreempt_disable();
-       softint_schedule(wgp->wgp_si);
-       kpreempt_enable();
-       WG_TRACE("softint scheduled");
+       if ((m = atomic_swap_ptr(&wgp->wgp_pending, NULL)) != NULL) {
+               const uint32_t h = curcpu()->ci_index; // pktq_rps_hash(m)
+
+               M_SETCTX(m, wgp);
+               if (__predict_false(!pktq_enqueue(wg_pktq, m, h))) {
+                       WGLOG(LOG_ERR, "pktq full, dropping\n");
+                       m_freem(m);
+               }
+       } else {
+               wg_send_keepalive_msg(wgp, wgs);
+       }
 
        if (wgs_prev->wgs_state == WGS_STATE_ESTABLISHED) {
                /* Wait for wg_get_stable_session to drain.  */
@@ -2874,6 +2886,7 @@
 wg_task_establish_session(struct wg_softc *wg, struct wg_peer *wgp)
 {
        struct wg_session *wgs, *wgs_prev;
+       struct mbuf *m;
 
        KASSERT(mutex_owned(wgp->wgp_lock));
 
@@ -2896,6 +2909,17 @@
        wgp->wgp_last_sent_mac1_valid = false;
        wgp->wgp_last_sent_cookie_valid = false;
 
+       /* If we had a data packet queued up, send it.  */
+       if ((m = atomic_swap_ptr(&wgp->wgp_pending, NULL)) != NULL) {
+               const uint32_t h = curcpu()->ci_index; // pktq_rps_hash(m)
+
+               M_SETCTX(m, wgp);
+               if (__predict_false(!pktq_enqueue(wg_pktq, m, h))) {
+                       WGLOG(LOG_ERR, "pktq full, dropping\n");
+                       m_freem(m);
+               }
+       }
+
        if (wgs_prev->wgs_state == WGS_STATE_ESTABLISHED) {
                /* Wait for wg_get_stable_session to drain.  */
                pserialize_perform(wgp->wgp_psz);
@@ -2911,11 +2935,6 @@
                wg_clear_states(wgs_prev);
                wgs_prev->wgs_state = WGS_STATE_UNKNOWN;
        }
-
-       /* Anyway run a softint to flush pending packets */
-       kpreempt_disable();
-       softint_schedule(wgp->wgp_si);
-       kpreempt_enable();
 }
 
 static void
@@ -3290,29 +3309,32 @@
 }
 
 static void
-wg_peer_softint(void *arg)
+wgintr(void *cookie)
 {
-       struct wg_peer *wgp = arg;
+       struct wg_peer *wgp;
        struct wg_session *wgs;
        struct mbuf *m;
        struct psref psref;
 
-       if ((wgs = wg_get_stable_session(wgp, &psref)) == NULL) {
-               /* XXX how to treat? */
-               WG_TRACE("skipped");
-               return;
+       while ((m = pktq_dequeue(wg_pktq)) != NULL) {
+               wgp = M_GETCTX(m, struct wg_peer *);
+               if ((wgs = wg_get_stable_session(wgp, &psref)) == NULL) {
+                       WG_TRACE("no stable session");
+                       wg_schedule_peer_task(wgp, WGP_TASK_SEND_INIT_MESSAGE);
+                       goto next0;
+               }
+               if (__predict_false(wg_session_hit_limits(wgs))) {
+                       WG_TRACE("stable session hit limits");
+                       wg_schedule_peer_task(wgp, WGP_TASK_SEND_INIT_MESSAGE);
+                       goto next1;
+               }
+               wg_send_data_msg(wgp, wgs, m);
+               m = NULL;       /* consumed */
+next1:         wg_put_session(wgs, &psref);
+next0:         if (m)
+                       m_freem(m);
+               /* XXX Yield to avoid userland starvation?  */
        }
-       if (wg_session_hit_limits(wgs)) {
-               wg_schedule_peer_task(wgp, WGP_TASK_SEND_INIT_MESSAGE);
-               goto out;
-       }
-       WG_TRACE("running");
-
-       while ((m = pcq_get(wgp->wgp_q)) != NULL) {
-               wg_send_data_msg(wgp, wgs, m);
-       }
-out:
-       wg_put_session(wgs, &psref);
 }
 
 static void
@@ -3328,9 +3350,9 @@
 {
        struct mbuf *m;
 
-       while ((m = pcq_get(wgp->wgp_q)) != NULL) {
+       if ((m = atomic_swap_ptr(&wgp->wgp_pending, NULL)) != NULL)
                m_freem(m);
-       }
+       pktq_barrier(wg_pktq);
 }
 
 static void
@@ -3351,8 +3373,6 @@
        wgp = kmem_zalloc(sizeof(*wgp), KM_SLEEP);
 
        wgp->wgp_sc = wg;
-       wgp->wgp_q = pcq_create(1024, KM_SLEEP);
-       wgp->wgp_si = softint_establish(SOFTINT_NET, wg_peer_softint, wgp);
        callout_init(&wgp->wgp_rekey_timer, CALLOUT_MPSAFE);
        callout_setfunc(&wgp->wgp_rekey_timer, wg_rekey_timer, wgp);
        callout_init(&wgp->wgp_handshake_timeout_timer, CALLOUT_MPSAFE);
@@ -3430,7 +3450,6 @@
        wg_purge_pending_packets(wgp);
 
        /* Halt all packet processing and timeouts.  */
-       softint_disestablish(wgp->wgp_si);
        callout_halt(&wgp->wgp_rekey_timer, NULL);
        callout_halt(&wgp->wgp_handshake_timeout_timer, NULL);
        callout_halt(&wgp->wgp_session_dtor_timer, NULL);
@@ -3467,7 +3486,6 @@
        kmem_free(wgp->wgp_endpoint0, sizeof(*wgp->wgp_endpoint0));
 
        pserialize_destroy(wgp->wgp_psz);
-       pcq_destroy(wgp->wgp_q);
        mutex_obj_free(wgp->wgp_lock);
 
        kmem_free(wgp, sizeof(*wgp));
@@ -3581,21 +3599,28 @@
        return 0;
 }
 
+static void
+wg_if_detach(struct wg_softc *wg)
+{
+       struct ifnet *ifp = &wg->wg_if;
+
+       bpf_detach(ifp);
+       if_detach(ifp);
+}
+
 static int
 wg_clone_create(struct if_clone *ifc, int unit)
 {
        struct wg_softc *wg;
        int error;
 
-       wg = kmem_zalloc(sizeof(struct wg_softc), KM_SLEEP);
+       wg = kmem_zalloc(sizeof(*wg), KM_SLEEP);
 
        if_initname(&wg->wg_if, ifc->ifc_name, unit);
 
        error = wg_worker_init(wg);
-       if (error != 0) {
-               kmem_free(wg, sizeof(struct wg_softc));
-               return error;
-       }
+       if (error)
+               goto fail0;
 
        rn_inithead((void **)&wg->wg_rtable_ipv4,
            offsetof(struct sockaddr_in, sin_addr) * NBBY);
@@ -3613,26 +3638,34 @@
        wg->wg_ops = &wg_ops_rumpkernel;
 
        error = wg_if_attach(wg);
-       if (error != 0) {



Home | Main Index | Thread Index | Old Index