Source-Changes-HG archive

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index][Old Index]

[src/trunk]: src/sys/net wg: Use threadpool(9) and workqueue(9) for asynchron...



details:   https://anonhg.NetBSD.org/src/rev/93eae6364725
branches:  trunk
changeset: 975828:93eae6364725
user:      riastradh <riastradh%NetBSD.org@localhost>
date:      Mon Sep 07 01:15:25 2020 +0000

description:
wg: Use threadpool(9) and workqueue(9) for asynchronous tasks.

- Using a threadpool(9) job per interface to receive incoming handshake
  messages gives the same concurrency for active interfaces but
  doesn't waste kthreads for inactive ones.

  => Can't really do this with a global workqueue(9) because there's
     no bound on the amount of time wg_receive_packets() might run
     for; we really need separate threads or threadpool jobs in order
     to avoid having one interface starve all the others.

- Using a global workqueue(9) for asynchronous peer tasks avoids
  creating unnecessary kthreads.

  => Each task does a more or less bounded amount of work, so it's OK
     to share a global workqueue -- there's no advantage to adding
     concurrency for what is almost certainly going to be CPU-bound
     asymmetric crypto.

  => This way we don't need a thread per peer or iteration over a
     list of all peers, so the task mechanism should no longer be a
     bottleneck to scaling to thousands of peers.

XXX This doesn't distribute the load across CPUs -- it keeps it on
the same CPU where the packet came in.  We should consider balancing
the load -- e.g., note whether the current CPU is loaded and, if so,
sort CPUs by queue length or some other measure of load and pick the
least loaded one.

diffstat:

 sys/net/if_wg.c |  369 +++++++++++++++++++++++++------------------------------
 1 files changed, 168 insertions(+), 201 deletions(-)

diffs (truncated from 656 to 300 lines):

diff -r 55eb40c2bd05 -r 93eae6364725 sys/net/if_wg.c
--- a/sys/net/if_wg.c   Mon Sep 07 01:14:42 2020 +0000
+++ b/sys/net/if_wg.c   Mon Sep 07 01:15:25 2020 +0000
@@ -1,4 +1,4 @@
-/*     $NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $     */
+/*     $NetBSD: if_wg.c,v 1.55 2020/09/07 01:15:25 riastradh Exp $     */
 
 /*
  * Copyright (C) Ryota Ozaki <ozaki.ryota%gmail.com@localhost>
@@ -41,7 +41,7 @@
  */
 
 #include <sys/cdefs.h>
-__KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $");
+__KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.55 2020/09/07 01:15:25 riastradh Exp $");
 
 #ifdef _KERNEL_OPT
 #include "opt_inet.h"
@@ -61,7 +61,6 @@
 #include <sys/ioctl.h>
 #include <sys/kernel.h>
 #include <sys/kmem.h>
-#include <sys/kthread.h>
 #include <sys/mbuf.h>
 #include <sys/module.h>
 #include <sys/mutex.h>
@@ -77,8 +76,10 @@
 #include <sys/syslog.h>
 #include <sys/systm.h>
 #include <sys/thmap.h>
+#include <sys/threadpool.h>
 #include <sys/time.h>
 #include <sys/timespec.h>
+#include <sys/workqueue.h>
 
 #include <net/bpf.h>
 #include <net/if.h>
@@ -120,10 +121,11 @@
  * Data structures
  * - struct wg_softc is an instance of wg interfaces
  *   - It has a list of peers (struct wg_peer)
- *   - It has a kthread that sends/receives handshake messages and
+ *   - It has a threadpool job that sends/receives handshake messages and
  *     runs event handlers
  *   - It has its own two routing tables: one is for IPv4 and the other IPv6
  * - struct wg_peer is a representative of a peer
+ *   - It has a struct work to handle handshakes and timer tasks
  *   - It has a pair of session instances (struct wg_session)
  *   - It has a pair of endpoint instances (struct wg_sockaddr)
  *     - Normally one endpoint is used and the second one is used only on
@@ -446,18 +448,6 @@
        return 0;
 }
 
-struct wg_worker {
-       kmutex_t        wgw_lock;
-       kcondvar_t      wgw_cv;
-       bool            wgw_todie;
-       struct socket   *wgw_so4;
-       struct socket   *wgw_so6;
-       int             wgw_wakeup_reasons;
-#define WG_WAKEUP_REASON_RECEIVE_PACKETS_IPV4  __BIT(0)
-#define WG_WAKEUP_REASON_RECEIVE_PACKETS_IPV6  __BIT(1)
-#define WG_WAKEUP_REASON_PEER                  __BIT(2)
-};
-
 struct wg_session {
        struct wg_peer  *wgs_peer;
        struct psref_target
@@ -550,6 +540,7 @@
        pserialize_t            wgp_psz;
        struct psref_target     wgp_psref;
        kmutex_t                *wgp_lock;
+       kmutex_t                *wgp_intr_lock;
 
        uint8_t wgp_pubkey[WG_STATIC_KEY_LEN];
        struct wg_sockaddr      *wgp_endpoint;
@@ -594,7 +585,8 @@
 
        struct wg_ppsratecheck  wgp_ppsratecheck;
 
-       volatile unsigned int   wgp_tasks;
+       struct work             wgp_work;
+       unsigned int            wgp_tasks;
 #define WGP_TASK_SEND_INIT_MESSAGE             __BIT(0)
 #define WGP_TASK_RETRY_HANDSHAKE               __BIT(1)
 #define WGP_TASK_ESTABLISH_SESSION             __BIT(2)
@@ -609,6 +601,7 @@
        struct ifnet    wg_if;
        LIST_ENTRY(wg_softc) wg_list;
        kmutex_t        *wg_lock;
+       kmutex_t        *wg_intr_lock;
        krwlock_t       *wg_rwlock;
 
        uint8_t         wg_privkey[WG_STATIC_KEY_LEN];
@@ -621,11 +614,21 @@
        struct thmap    *wg_sessions_byindex;
        uint16_t        wg_listen_port;
 
-       struct wg_worker        *wg_worker;
-       lwp_t                   *wg_worker_lwp;
-
+       struct threadpool       *wg_threadpool;
+
+       struct threadpool_job   wg_job;
+       int                     wg_upcalls;
+#define        WG_UPCALL_INET  __BIT(0)
+#define        WG_UPCALL_INET6 __BIT(1)
+
+#ifdef INET
+       struct socket           *wg_so4;
        struct radix_node_head  *wg_rtable_ipv4;
+#endif
+#ifdef INET6
+       struct socket           *wg_so6;
        struct radix_node_head  *wg_rtable_ipv6;
+#endif
 
        struct wg_ppsratecheck  wg_ppsratecheck;
 
@@ -659,8 +662,6 @@
 static struct mbuf *
                wg_get_mbuf(size_t, size_t);
 
-static void    wg_wakeup_worker(struct wg_worker *, int);
-
 static int     wg_send_data_msg(struct wg_peer *, struct wg_session *,
                    struct mbuf *);
 static int     wg_send_cookie_msg(struct wg_softc *, struct wg_peer *,
@@ -704,6 +705,8 @@
 static int     wg_init(struct ifnet *);
 static void    wg_stop(struct ifnet *, int);
 
+static void    wg_peer_work(struct work *, void *);
+static void    wg_job(struct threadpool_job *);
 static void    wgintr(void *);
 static void    wg_purge_pending_packets(struct wg_peer *);
 
@@ -788,6 +791,7 @@
     IF_CLONE_INITIALIZER("wg", wg_clone_create, wg_clone_destroy);
 
 static struct pktqueue *wg_pktq __read_mostly;
+static struct workqueue *wg_wq __read_mostly;
 
 void wgattach(int);
 /* ARGSUSED */
@@ -803,6 +807,7 @@
 static void
 wginit(void)
 {
+       int error __diagused;
 
        wg_psref_class = psref_class_create("wg", IPL_SOFTNET);
 
@@ -812,6 +817,10 @@
        wg_pktq = pktq_create(IFQ_MAXLEN, wgintr, NULL);
        KASSERT(wg_pktq != NULL);
 
+       error = workqueue_create(&wg_wq, "wgpeer", wg_peer_work, NULL,
+           PRI_NONE, IPL_SOFTNET, WQ_MPSAFE|WQ_PERCPU);
+       KASSERT(error == 0);
+
        if_clone_attach(&wg_cloner);
 }
 
@@ -1555,17 +1564,17 @@
 }
 
 static struct socket *
-wg_get_so_by_af(struct wg_worker *wgw, const int af)
+wg_get_so_by_af(struct wg_softc *wg, const int af)
 {
 
-       return (af == AF_INET) ? wgw->wgw_so4 : wgw->wgw_so6;
+       return (af == AF_INET) ? wg->wg_so4 : wg->wg_so6;
 }
 
 static struct socket *
 wg_get_so_by_peer(struct wg_peer *wgp, struct wg_sockaddr *wgsa)
 {
 
-       return wg_get_so_by_af(wgp->wgp_sc->wg_worker, wgsa_family(wgsa));
+       return wg_get_so_by_af(wgp->wgp_sc, wgsa_family(wgsa));
 }
 
 static struct wg_sockaddr *
@@ -2246,9 +2255,18 @@
 wg_schedule_peer_task(struct wg_peer *wgp, int task)
 {
 
-       atomic_or_uint(&wgp->wgp_tasks, task);
+       mutex_enter(wgp->wgp_intr_lock);
        WG_DLOG("tasks=%d, task=%d\n", wgp->wgp_tasks, task);
-       wg_wakeup_worker(wgp->wgp_sc->wg_worker, WG_WAKEUP_REASON_PEER);
+       if (wgp->wgp_tasks == 0)
+               /*
+                * XXX If the current CPU is already loaded -- e.g., if
+                * there's already a bunch of handshakes queued up --
+                * consider tossing this over to another CPU to
+                * distribute the load.
+                */
+               workqueue_enqueue(wg_wq, &wgp->wgp_work, NULL);
+       wgp->wgp_tasks |= task;
+       mutex_exit(wgp->wgp_intr_lock);
 }
 
 static void
@@ -2783,7 +2801,7 @@
                struct mbuf *paddr = NULL;
                struct sockaddr *src;
 
-               so = wg_get_so_by_af(wg->wg_worker, af);
+               so = wg_get_so_by_af(wg, af);
                flags = MSG_DONTWAIT;
                dummy_uio.uio_resid = 1000000000;
 
@@ -2987,28 +3005,16 @@
 }
 
 static void
-wg_process_peer_tasks(struct wg_softc *wg)
+wg_peer_work(struct work *wk, void *cookie)
 {
-       struct wg_peer *wgp;
-       int s;
-
-       /* XXX should avoid checking all peers */
-       s = pserialize_read_enter();
-       WG_PEER_READER_FOREACH(wgp, wg) {
-               struct psref psref;
-               unsigned int tasks;
-
-               if (wgp->wgp_tasks == 0)
-                       continue;
-
-               wg_get_peer(wgp, &psref);
-               pserialize_read_exit(s);
-
-       restart:
-               tasks = atomic_swap_uint(&wgp->wgp_tasks, 0);
-               KASSERT(tasks != 0);
-
-               WG_DLOG("tasks=%x\n", tasks);
+       struct wg_peer *wgp = container_of(wk, struct wg_peer, wgp_work);
+       struct wg_softc *wg = wgp->wgp_sc;
+       int tasks;
+
+       mutex_enter(wgp->wgp_intr_lock);
+       while ((tasks = wgp->wgp_tasks) != 0) {
+               wgp->wgp_tasks = 0;
+               mutex_exit(wgp->wgp_intr_lock);
 
                mutex_enter(wgp->wgp_lock);
                if (ISSET(tasks, WGP_TASK_SEND_INIT_MESSAGE))
@@ -3025,66 +3031,37 @@
                        wg_task_destroy_prev_session(wg, wgp);
                mutex_exit(wgp->wgp_lock);
 
-               /* New tasks may be scheduled during processing tasks */
-               WG_DLOG("wgp_tasks=%d\n", wgp->wgp_tasks);
-               if (wgp->wgp_tasks != 0)
-                       goto restart;
-
-               s = pserialize_read_enter();
-               wg_put_peer(wgp, &psref);
+               mutex_enter(wgp->wgp_intr_lock);
        }
-       pserialize_read_exit(s);
+       mutex_exit(wgp->wgp_intr_lock);
 }
 
 static void
-wg_worker(void *arg)
+wg_job(struct threadpool_job *job)
 {
-       struct wg_softc *wg = arg;
-       struct wg_worker *wgw = wg->wg_worker;
-       bool todie = false;
-
-       KASSERT(wg != NULL);
-       KASSERT(wgw != NULL);
-
-       while (!todie) {
-               int reasons;
-               int bound;
-
-               mutex_enter(&wgw->wgw_lock);
-               /* New tasks may come during task handling */
-               while ((reasons = wgw->wgw_wakeup_reasons) == 0 &&
-                   !(todie = wgw->wgw_todie))
-                       cv_wait(&wgw->wgw_cv, &wgw->wgw_lock);
-               wgw->wgw_wakeup_reasons = 0;
-               mutex_exit(&wgw->wgw_lock);
-
+       struct wg_softc *wg = container_of(job, struct wg_softc, wg_job);
+       int bound, upcalls;
+
+       mutex_enter(wg->wg_intr_lock);
+       while ((upcalls = wg->wg_upcalls) != 0) {
+               wg->wg_upcalls = 0;



Home | Main Index | Thread Index | Old Index