Source-Changes-HG archive
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index][Old Index]
[src/trunk]: src/sys/net wg: Use threadpool(9) and workqueue(9) for asynchron...
details: https://anonhg.NetBSD.org/src/rev/e85c2bbd47be
branches: trunk
changeset: 938366:e85c2bbd47be
user: riastradh <riastradh%NetBSD.org@localhost>
date: Mon Sep 07 01:15:25 2020 +0000
description:
wg: Use threadpool(9) and workqueue(9) for asynchronous tasks.
- Using threadpool(9) job per interface to receive incoming handshake
messages gives the same concurrency for active interfaces but
doesn't waste kthreads for inactive ones.
=> Can't really do this with a global workqueue(9) because there's
no bound on the amount of time wg_receive_packets() might run
for; we really need separate threads or threadpool jobs in order
to avoid having one interface starve all the others.
- Using a global workqueue(9) for asynchronous peer tasks avoids
creating unnecessary kthreads.
=> Each task does a more or less bounded amount of work, so it's OK
to share a global workqueue -- there's no advantage to adding
concurrency for what is almost certainly going to be CPU-bound
asymmetric crypto.
=> This way we don't need a thread per peer or iteration over a
list of all peers, so the task mechanism should no longer be a
bottleneck to scaling to thousands of peers.
XXX This doesn't distribute the load across CPUs -- it keeps it on
the same CPU where the packet came in. Should consider doing
something to balance the load -- maybe note if the current CPU is
loaded, and if so, sort CPUs by queue length or some other measure of
load and pick the least loaded one or something.
diffstat:
sys/net/if_wg.c | 369 +++++++++++++++++++++++++------------------------------
1 files changed, 168 insertions(+), 201 deletions(-)
diffs (truncated from 656 to 300 lines):
diff -r 3666fec983a6 -r e85c2bbd47be sys/net/if_wg.c
--- a/sys/net/if_wg.c Mon Sep 07 01:14:42 2020 +0000
+++ b/sys/net/if_wg.c Mon Sep 07 01:15:25 2020 +0000
@@ -1,4 +1,4 @@
-/* $NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $ */
+/* $NetBSD: if_wg.c,v 1.55 2020/09/07 01:15:25 riastradh Exp $ */
/*
* Copyright (C) Ryota Ozaki <ozaki.ryota%gmail.com@localhost>
@@ -41,7 +41,7 @@
*/
#include <sys/cdefs.h>
-__KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $");
+__KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.55 2020/09/07 01:15:25 riastradh Exp $");
#ifdef _KERNEL_OPT
#include "opt_inet.h"
@@ -61,7 +61,6 @@
#include <sys/ioctl.h>
#include <sys/kernel.h>
#include <sys/kmem.h>
-#include <sys/kthread.h>
#include <sys/mbuf.h>
#include <sys/module.h>
#include <sys/mutex.h>
@@ -77,8 +76,10 @@
#include <sys/syslog.h>
#include <sys/systm.h>
#include <sys/thmap.h>
+#include <sys/threadpool.h>
#include <sys/time.h>
#include <sys/timespec.h>
+#include <sys/workqueue.h>
#include <net/bpf.h>
#include <net/if.h>
@@ -120,10 +121,11 @@
* Data structures
* - struct wg_softc is an instance of wg interfaces
* - It has a list of peers (struct wg_peer)
- * - It has a kthread that sends/receives handshake messages and
+ * - It has a threadpool job that sends/receives handshake messages and
* runs event handlers
* - It has its own two routing tables: one is for IPv4 and the other IPv6
* - struct wg_peer is a representative of a peer
+ * - It has a struct work to handle handshakes and timer tasks
* - It has a pair of session instances (struct wg_session)
* - It has a pair of endpoint instances (struct wg_sockaddr)
* - Normally one endpoint is used and the second one is used only on
@@ -446,18 +448,6 @@
return 0;
}
-struct wg_worker {
- kmutex_t wgw_lock;
- kcondvar_t wgw_cv;
- bool wgw_todie;
- struct socket *wgw_so4;
- struct socket *wgw_so6;
- int wgw_wakeup_reasons;
-#define WG_WAKEUP_REASON_RECEIVE_PACKETS_IPV4 __BIT(0)
-#define WG_WAKEUP_REASON_RECEIVE_PACKETS_IPV6 __BIT(1)
-#define WG_WAKEUP_REASON_PEER __BIT(2)
-};
-
struct wg_session {
struct wg_peer *wgs_peer;
struct psref_target
@@ -550,6 +540,7 @@
pserialize_t wgp_psz;
struct psref_target wgp_psref;
kmutex_t *wgp_lock;
+ kmutex_t *wgp_intr_lock;
uint8_t wgp_pubkey[WG_STATIC_KEY_LEN];
struct wg_sockaddr *wgp_endpoint;
@@ -594,7 +585,8 @@
struct wg_ppsratecheck wgp_ppsratecheck;
- volatile unsigned int wgp_tasks;
+ struct work wgp_work;
+ unsigned int wgp_tasks;
#define WGP_TASK_SEND_INIT_MESSAGE __BIT(0)
#define WGP_TASK_RETRY_HANDSHAKE __BIT(1)
#define WGP_TASK_ESTABLISH_SESSION __BIT(2)
@@ -609,6 +601,7 @@
struct ifnet wg_if;
LIST_ENTRY(wg_softc) wg_list;
kmutex_t *wg_lock;
+ kmutex_t *wg_intr_lock;
krwlock_t *wg_rwlock;
uint8_t wg_privkey[WG_STATIC_KEY_LEN];
@@ -621,11 +614,21 @@
struct thmap *wg_sessions_byindex;
uint16_t wg_listen_port;
- struct wg_worker *wg_worker;
- lwp_t *wg_worker_lwp;
-
+ struct threadpool *wg_threadpool;
+
+ struct threadpool_job wg_job;
+ int wg_upcalls;
+#define WG_UPCALL_INET __BIT(0)
+#define WG_UPCALL_INET6 __BIT(1)
+
+#ifdef INET
+ struct socket *wg_so4;
struct radix_node_head *wg_rtable_ipv4;
+#endif
+#ifdef INET6
+ struct socket *wg_so6;
struct radix_node_head *wg_rtable_ipv6;
+#endif
struct wg_ppsratecheck wg_ppsratecheck;
@@ -659,8 +662,6 @@
static struct mbuf *
wg_get_mbuf(size_t, size_t);
-static void wg_wakeup_worker(struct wg_worker *, int);
-
static int wg_send_data_msg(struct wg_peer *, struct wg_session *,
struct mbuf *);
static int wg_send_cookie_msg(struct wg_softc *, struct wg_peer *,
@@ -704,6 +705,8 @@
static int wg_init(struct ifnet *);
static void wg_stop(struct ifnet *, int);
+static void wg_peer_work(struct work *, void *);
+static void wg_job(struct threadpool_job *);
static void wgintr(void *);
static void wg_purge_pending_packets(struct wg_peer *);
@@ -788,6 +791,7 @@
IF_CLONE_INITIALIZER("wg", wg_clone_create, wg_clone_destroy);
static struct pktqueue *wg_pktq __read_mostly;
+static struct workqueue *wg_wq __read_mostly;
void wgattach(int);
/* ARGSUSED */
@@ -803,6 +807,7 @@
static void
wginit(void)
{
+ int error __diagused;
wg_psref_class = psref_class_create("wg", IPL_SOFTNET);
@@ -812,6 +817,10 @@
wg_pktq = pktq_create(IFQ_MAXLEN, wgintr, NULL);
KASSERT(wg_pktq != NULL);
+ error = workqueue_create(&wg_wq, "wgpeer", wg_peer_work, NULL,
+ PRI_NONE, IPL_SOFTNET, WQ_MPSAFE|WQ_PERCPU);
+ KASSERT(error == 0);
+
if_clone_attach(&wg_cloner);
}
@@ -1555,17 +1564,17 @@
}
static struct socket *
-wg_get_so_by_af(struct wg_worker *wgw, const int af)
+wg_get_so_by_af(struct wg_softc *wg, const int af)
{
- return (af == AF_INET) ? wgw->wgw_so4 : wgw->wgw_so6;
+ return (af == AF_INET) ? wg->wg_so4 : wg->wg_so6;
}
static struct socket *
wg_get_so_by_peer(struct wg_peer *wgp, struct wg_sockaddr *wgsa)
{
- return wg_get_so_by_af(wgp->wgp_sc->wg_worker, wgsa_family(wgsa));
+ return wg_get_so_by_af(wgp->wgp_sc, wgsa_family(wgsa));
}
static struct wg_sockaddr *
@@ -2246,9 +2255,18 @@
wg_schedule_peer_task(struct wg_peer *wgp, int task)
{
- atomic_or_uint(&wgp->wgp_tasks, task);
+ mutex_enter(wgp->wgp_intr_lock);
WG_DLOG("tasks=%d, task=%d\n", wgp->wgp_tasks, task);
- wg_wakeup_worker(wgp->wgp_sc->wg_worker, WG_WAKEUP_REASON_PEER);
+ if (wgp->wgp_tasks == 0)
+ /*
+ * XXX If the current CPU is already loaded -- e.g., if
+ * there's already a bunch of handshakes queued up --
+ * consider tossing this over to another CPU to
+ * distribute the load.
+ */
+ workqueue_enqueue(wg_wq, &wgp->wgp_work, NULL);
+ wgp->wgp_tasks |= task;
+ mutex_exit(wgp->wgp_intr_lock);
}
static void
@@ -2783,7 +2801,7 @@
struct mbuf *paddr = NULL;
struct sockaddr *src;
- so = wg_get_so_by_af(wg->wg_worker, af);
+ so = wg_get_so_by_af(wg, af);
flags = MSG_DONTWAIT;
dummy_uio.uio_resid = 1000000000;
@@ -2987,28 +3005,16 @@
}
static void
-wg_process_peer_tasks(struct wg_softc *wg)
+wg_peer_work(struct work *wk, void *cookie)
{
- struct wg_peer *wgp;
- int s;
-
- /* XXX should avoid checking all peers */
- s = pserialize_read_enter();
- WG_PEER_READER_FOREACH(wgp, wg) {
- struct psref psref;
- unsigned int tasks;
-
- if (wgp->wgp_tasks == 0)
- continue;
-
- wg_get_peer(wgp, &psref);
- pserialize_read_exit(s);
-
- restart:
- tasks = atomic_swap_uint(&wgp->wgp_tasks, 0);
- KASSERT(tasks != 0);
-
- WG_DLOG("tasks=%x\n", tasks);
+ struct wg_peer *wgp = container_of(wk, struct wg_peer, wgp_work);
+ struct wg_softc *wg = wgp->wgp_sc;
+ int tasks;
+
+ mutex_enter(wgp->wgp_intr_lock);
+ while ((tasks = wgp->wgp_tasks) != 0) {
+ wgp->wgp_tasks = 0;
+ mutex_exit(wgp->wgp_intr_lock);
mutex_enter(wgp->wgp_lock);
if (ISSET(tasks, WGP_TASK_SEND_INIT_MESSAGE))
@@ -3025,66 +3031,37 @@
wg_task_destroy_prev_session(wg, wgp);
mutex_exit(wgp->wgp_lock);
- /* New tasks may be scheduled during processing tasks */
- WG_DLOG("wgp_tasks=%d\n", wgp->wgp_tasks);
- if (wgp->wgp_tasks != 0)
- goto restart;
-
- s = pserialize_read_enter();
- wg_put_peer(wgp, &psref);
+ mutex_enter(wgp->wgp_intr_lock);
}
- pserialize_read_exit(s);
+ mutex_exit(wgp->wgp_intr_lock);
}
static void
-wg_worker(void *arg)
+wg_job(struct threadpool_job *job)
{
- struct wg_softc *wg = arg;
- struct wg_worker *wgw = wg->wg_worker;
- bool todie = false;
-
- KASSERT(wg != NULL);
- KASSERT(wgw != NULL);
-
- while (!todie) {
- int reasons;
- int bound;
-
- mutex_enter(&wgw->wgw_lock);
- /* New tasks may come during task handling */
- while ((reasons = wgw->wgw_wakeup_reasons) == 0 &&
- !(todie = wgw->wgw_todie))
- cv_wait(&wgw->wgw_cv, &wgw->wgw_lock);
- wgw->wgw_wakeup_reasons = 0;
- mutex_exit(&wgw->wgw_lock);
-
+ struct wg_softc *wg = container_of(job, struct wg_softc, wg_job);
+ int bound, upcalls;
+
+ mutex_enter(wg->wg_intr_lock);
+ while ((upcalls = wg->wg_upcalls) != 0) {
+ wg->wg_upcalls = 0;
Home |
Main Index |
Thread Index |
Old Index