From fd9660170073fd86da033608f52c604b9664ae91 Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Tue, 10 Jun 2025 16:38:59 -0700
Subject: [PATCH 1/4] oauth: Remove stale events from the kqueue multiplexer

If a socket is added to the kqueue, becomes readable/writable, and
subsequently becomes non-readable/writable again, the kqueue itself will
remain readable until either the socket registration is removed, or the
stale event is cleared via a call to kevent().

In many simple cases, Curl itself will remove the socket registration
quickly, but in real-world usage, this is not guaranteed to happen. The
kqueue can then remain stuck in a permanently readable state until the
request ends, which results in pointless wakeups for the client and
wasted CPU time.

Implement drain_socket_events() to call kevent() and unstick any stale
events. This is called right after drive_request(), before we return
control to the client to wait. To make sure we inspect the entire queue,
register_socket() now tracks the number of outstanding
registrations.

Suggested-by: Thomas Munro <thomas.munro@gmail.com>
---
 src/interfaces/libpq-oauth/oauth-curl.c | 218 ++++++++++++++++++------
 1 file changed, 166 insertions(+), 52 deletions(-)
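
Illustrative note (not part of the commit message): the drain described above
amounts to a single non-blocking kevent() call with an empty changelist. A
minimal sketch of that pattern, with hypothetical names (kq, scratch,
MAX_DRAIN) that are not taken from the patch:

    /* needs <sys/event.h> */
    enum { MAX_DRAIN = 8 };             /* arbitrary bound for the sketch */
    struct timespec timeout = {0};      /* zero timeout: poll, don't block */
    struct kevent   scratch[MAX_DRAIN]; /* scratch space for drained events */
    int             n;

    /* Read, and thereby discard, whatever the kqueue currently reports. */
    n = kevent(kq, NULL, 0, scratch, MAX_DRAIN, &timeout);
    if (n < 0)
        ;                               /* handle the error */

    /*
     * Level-triggered filters that are still ready will simply report again
     * on the next wait, so only stale events are swept away.
     */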

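Similarly, the deletion path in register_socket() leans on EV_RECEIPT to get
one receipt per change, so that ENOENT (no such filter) can be skipped while
real failures are still reported. A rough sketch of that pattern, again with
hypothetical names (kq, sock) standing in for the patch's own variables:

    struct kevent   changes[2];
    struct kevent   receipts[2];
    struct timespec timeout = {0};
    int             n;

    /* EV_RECEIPT asks for a per-change result instead of aggregate errors. */
    EV_SET(&changes[0], sock, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0);
    EV_SET(&changes[1], sock, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0);

    n = kevent(kq, changes, 2, receipts, 2, &timeout);
    if (n < 0)
        ;                               /* handle the error */

    for (int i = 0; i < n; i++)
    {
        /* Each receipt is flagged EV_ERROR; data holds 0 or an errno. */
        if (receipts[i].data != 0 && receipts[i].data != ENOENT)
            ;                           /* handle the error for this change */
    }
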
diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index dba9a684fa8..8430356cfb5 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -278,6 +278,10 @@ struct async_ctx
 	bool		user_prompted;	/* have we already sent the authz prompt? */
 	bool		used_basic_auth;	/* did we send a client secret? */
 	bool		debugging;		/* can we give unsafe developer assistance? */
+
+#if defined(HAVE_SYS_EVENT_H)
+	int			nevents;		/* how many events are we waiting on? */
+#endif
 };
 
 /*
@@ -1291,41 +1295,95 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
 
 	return 0;
 #elif defined(HAVE_SYS_EVENT_H)
-	struct kevent ev[2] = {0};
+	struct kevent ev[2];
 	struct kevent ev_out[2];
 	struct timespec timeout = {0};
-	int			nev = 0;
+	int			nev;
 	int			res;
 
+	/*
+	 * First, any existing registrations for this socket need to be removed,
+	 * both to track the outstanding number of events, and to ensure that
+	 * we're not woken up for things that Curl no longer cares about.
+	 *
+	 * ENOENT is okay, but we have to track how many we get, so use
+	 * EV_RECEIPT.
+	 */
+	nev = 0;
+	EV_SET(&ev[nev], socket, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+	nev++;
+	EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+	nev++;
+
+	Assert(nev <= lengthof(ev));
+	Assert(nev <= lengthof(ev_out));
+
+	res = kevent(actx->mux, ev, nev, ev_out, nev, &timeout);
+	if (res < 0)
+	{
+		actx_error(actx, "could not delete from kqueue: %m");
+		return -1;
+	}
+
+	/*
+	 * We can't use the simple errno version of kevent, because we need to
+	 * skip over ENOENT while still allowing a second change to be processed.
+	 * So we need a longer-form error checking loop.
+	 */
+	for (int i = 0; i < res; ++i)
+	{
+		/*
+		 * EV_RECEIPT should guarantee one EV_ERROR result for every change,
+		 * whether successful or not. Failed entries contain a non-zero errno
+		 * in the data field.
+		 */
+		Assert(ev_out[i].flags & EV_ERROR);
+
+		errno = ev_out[i].data;
+		if (!errno)
+		{
+			/* Successfully removed; update the event count. */
+			Assert(actx->nevents > 0);
+			actx->nevents--;
+		}
+		else if (errno != ENOENT)
+		{
+			actx_error(actx, "could not delete from kqueue: %m");
+			return -1;
+		}
+	}
+
+	/* If we're only removing registrations, we're done. */
+	if (what == CURL_POLL_REMOVE)
+		return 0;
+
+	/*
+	 * Now add the new filters. This is more straightforward than deletion.
+	 *
+	 * Combining this kevent() call with the one above seems like it should be
+	 * theoretically possible, but beware that not all BSDs keep the original
+	 * event flags when using EV_RECEIPT, so it's tricky to figure out which
+	 * operations succeeded. For now we keep the deletions and the additions
+	 * separate.
+	 */
+	nev = 0;
+
 	switch (what)
 	{
 		case CURL_POLL_IN:
-			EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, 0);
+			EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD, 0, 0, 0);
 			nev++;
 			break;
 
 		case CURL_POLL_OUT:
-			EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD | EV_RECEIPT, 0, 0, 0);
+			EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD, 0, 0, 0);
 			nev++;
 			break;
 
 		case CURL_POLL_INOUT:
-			EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, 0);
-			nev++;
-			EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD | EV_RECEIPT, 0, 0, 0);
-			nev++;
-			break;
-
-		case CURL_POLL_REMOVE:
-
-			/*
-			 * We don't know which of these is currently registered, perhaps
-			 * both, so we try to remove both.  This means we need to tolerate
-			 * ENOENT below.
-			 */
-			EV_SET(&ev[nev], socket, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+			EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD, 0, 0, 0);
 			nev++;
-			EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+			EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD, 0, 0, 0);
 			nev++;
 			break;
 
@@ -1334,45 +1392,91 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
 			return -1;
 	}
 
-	res = kevent(actx->mux, ev, nev, ev_out, lengthof(ev_out), &timeout);
+	Assert(nev <= lengthof(ev));
+
+	res = kevent(actx->mux, ev, nev, NULL, 0, NULL);
 	if (res < 0)
 	{
 		actx_error(actx, "could not modify kqueue: %m");
 		return -1;
 	}
 
+	/* Update the event count, and we're done. */
+	actx->nevents += nev;
+
+	return 0;
+#else
+#error register_socket is not implemented on this platform
+#endif
+}
+
+/*-------
+ * Drains any stale level-triggered events out of the multiplexer. This is
+ * necessary only if the mux implementation requires it.
+ *
+ * As an example, consider the following sequence of events:
+ * 1. libcurl tries to write data to the send buffer, but it fills up.
+ * 2. libcurl registers CURL_POLL_OUT on the socket and returns control to the
+ *    client to wait.
+ * 3. The kernel partially drains the send buffer. The socket becomes writable,
+ *    and the client wakes up and calls back into the flow.
+ * 4. libcurl continues writing data to the send buffer, but it fills up again.
+ *    The socket is no longer writable.
+ *
+ * At this point, an epoll-based mux no longer signals readiness, so nothing
+ * further needs to be done. But a kqueue-based mux will continue to signal
+ * "ready" until either the EVFILT_WRITE registration is dropped for the socket,
+ * or the old socket-writable event is read from the queue. Since Curl isn't
+ * guaranteed to do the former, we must do the latter here.
+ */
+static bool
+drain_socket_events(struct async_ctx *actx)
+{
+#if defined(HAVE_SYS_EPOLL_H)
+	/* The epoll implementation doesn't need to drain pending events. */
+	return true;
+#elif defined(HAVE_SYS_EVENT_H)
+	struct timespec timeout = {0};
+	struct kevent *drain;
+	int			drain_len;
+
 	/*
-	 * We can't use the simple errno version of kevent, because we need to
-	 * skip over ENOENT while still allowing a second change to be processed.
-	 * So we need a longer-form error checking loop.
+	 * Drain the events in one call, rather than looping. (We could maybe call
+	 * kevent() drain_len times, instead of allocating space for the maximum
+	 * number of events, but that relies on the events being in FIFO order to
+	 * avoid starvation. The kqueue man pages don't seem to make any
+	 * guarantees about that.)
+	 *
+	 * register_socket() keeps actx->nevents updated with the number of
+	 * outstanding event filters. We don't track the registration of the
+	 * timer; we just assume one could be registered here.
 	 */
-	for (int i = 0; i < res; ++i)
+	drain_len = actx->nevents + 1;
+
+	drain = malloc(sizeof(*drain) * drain_len);
+	if (!drain)
 	{
-		/*
-		 * EV_RECEIPT should guarantee one EV_ERROR result for every change,
-		 * whether successful or not. Failed entries contain a non-zero errno
-		 * in the data field.
-		 */
-		Assert(ev_out[i].flags & EV_ERROR);
+		actx_error(actx, "out of memory");
+		return false;
+	}
 
-		errno = ev_out[i].data;
-		if (errno && errno != ENOENT)
-		{
-			switch (what)
-			{
-				case CURL_POLL_REMOVE:
-					actx_error(actx, "could not delete from kqueue: %m");
-					break;
-				default:
-					actx_error(actx, "could not add to kqueue: %m");
-			}
-			return -1;
-		}
+	/*
+	 * Discard all pending events. Since our registrations are level-triggered
+	 * (even the timer, since we use a chained kqueue for that instead of an
+	 * EVFILT_TIMER on the top-level mux!), any events that we still need will
+	 * remain signalled, and the stale ones will be swept away.
+	 */
+	if (kevent(actx->mux, NULL, 0, drain, drain_len, &timeout) < 0)
+	{
+		actx_error(actx, "could not drain kqueue: %m");
+		free(drain);
+		return false;
 	}
 
-	return 0;
+	free(drain);
+	return true;
 #else
-#error register_socket is not implemented on this platform
+#error drain_socket_events is not implemented on this platform
 #endif
 }
 
@@ -1441,7 +1545,8 @@ set_timer(struct async_ctx *actx, long timeout)
 	 * macOS.)
 	 *
 	 * If there was no previous timer set, the kevent calls will result in
-	 * ENOENT, which is fine.
+	 * ENOENT, which is fine. (We don't track actx->nevents for this case;
+	 * instead, drain_socket_events() just assumes a timer could be set.)
 	 */
 	EV_SET(&ev, 1, EVFILT_TIMER, EV_DELETE, 0, 0, 0);
 	if (kevent(actx->timerfd, &ev, 1, NULL, 0, NULL) < 0 && errno != ENOENT)
@@ -2755,13 +2860,22 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
 
 					if (status == PGRES_POLLING_FAILED)
 						goto error_return;
-					else if (status != PGRES_POLLING_OK)
-					{
-						/* not done yet */
-						return status;
-					}
+					else if (status == PGRES_POLLING_OK)
+						break;	/* done! */
+
+					/*
+					 * This request is still running.
+					 *
+					 * Drain any stale socket events from the mux before we
+					 * ask the client to poll. (Currently, this can occur only
+					 * with kqueue.) If this is forgotten, the multiplexer can
+					 * get stuck in a signalled state and we'll burn CPU
+					 * cycles pointlessly.
+					 */
+					if (!drain_socket_events(actx))
+						goto error_return;
 
-					break;
+					return status;
 				}
 
 			case OAUTH_STEP_WAIT_INTERVAL:
-- 
2.34.1

