Hi,

Currently all backends have LatchWaitSet (latch.c), and most also have
FeBeWaitSet (pqcomm.c).  It's not the end of the world, but it's a
little bit wasteful in terms of kernel resources to have two
epoll/kqueue descriptors per backend.

I wonder if we should consider merging them into a single
BackendWaitSet.  The reason they are separate is that, with a shared
set, callers of WaitLatch() might be woken by the kernel just because
data appears on the FeBe socket.  One idea is that we could assume that socket
readiness events should be rare enough at WaitLatch() sites that it's
enough to disable them lazily if they are reported.  The FeBe code
already adjusts as required.  For example, if you're waiting for a
heavyweight lock or condition variable while executing a query, and
pipelined query or COPY data arrives, you'll spuriously wake up, but
only once and not again until you eventually reach FeBe read and all
queued socket data is drained and more data arrives.

Sketch patch attached.  Just an idea, not putting into commitfest yet.

(Besides the wasted kernel resources, I also speculate that things get
pretty confusing if you try to switch to completion based APIs for
more efficient socket IO on various OSes, depending on how you
implement latches.  I have some handwavy theories about various
schemes to achieve that on Linux, Windows and FreeBSD with various
different problems relating to the existence of two kernel objects.
Which is a bit more fuel for my early suspicion that postgres_fdw,
which currently creates and destroys WES, should eventually also use
BackendWaitSet, which should be dynamically resizing.  But that's for
another time.)
From e3c4bb6245d0329f53d5545f487c996da1948ade Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Thu, 27 Oct 2022 16:37:28 +1300
Subject: [PATCH] Merge FeBeWaitSet and LatchWaitSet.

Instead of using two epoll/kqueue kernel objects and descriptors in
every backend process, create a new common one called BackendWaitSet.
In backends that are connected to a FeBe socket, WaitLatch() might
occasionally report a socket readiness event if data arrives, but that's
not expected to happen much and can be lazily suppressed, and reenabled
at the next FeBe wait.
---
 src/backend/libpq/be-secure.c       | 10 ++--
 src/backend/libpq/pqcomm.c          | 21 +++-----
 src/backend/replication/walsender.c |  4 +-
 src/backend/storage/ipc/latch.c     | 74 +++++++++++++++++++----------
 src/backend/utils/init/miscinit.c   | 12 ++---
 src/include/libpq/libpq.h           |  6 ---
 src/include/storage/latch.h         |  9 +++-
 7 files changed, 77 insertions(+), 59 deletions(-)

diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c
index e3e54713e8..431abd58ce 100644
--- a/src/backend/libpq/be-secure.c
+++ b/src/backend/libpq/be-secure.c
@@ -179,9 +179,10 @@ retry:
 
 		Assert(waitfor);
 
-		ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL);
+		ModifyWaitEvent(BackendWaitSet, BackendWaitSetLatchPos, WL_LATCH_SET, MyLatch);
+		ModifyWaitEvent(BackendWaitSet, BackendWaitSetSocketPos, waitfor, NULL);
 
-		WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1,
+		WaitEventSetWait(BackendWaitSet, -1 /* no timeout */ , &event, 1,
 						 WAIT_EVENT_CLIENT_READ);
 
 		/*
@@ -291,9 +292,10 @@ retry:
 
 		Assert(waitfor);
 
-		ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL);
+		ModifyWaitEvent(BackendWaitSet, BackendWaitSetLatchPos, WL_LATCH_SET, MyLatch);
+		ModifyWaitEvent(BackendWaitSet, BackendWaitSetSocketPos, waitfor, NULL);
 
-		WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1,
+		WaitEventSetWait(BackendWaitSet, -1 /* no timeout */ , &event, 1,
 						 WAIT_EVENT_CLIENT_WRITE);
 
 		/* See comments in secure_read. */
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index ce56ab1d41..1762e3a6ba 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -161,8 +161,6 @@ static const PQcommMethods PqCommSocketMethods = {
 
 const PQcommMethods *PqCommMethods = &PqCommSocketMethods;
 
-WaitEventSet *FeBeWaitSet;
-
 
 /* --------------------------------
  *		pq_init - initialize libpq at backend startup
@@ -172,7 +170,6 @@ void
 pq_init(void)
 {
 	int			socket_pos PG_USED_FOR_ASSERTS_ONLY;
-	int			latch_pos PG_USED_FOR_ASSERTS_ONLY;
 
 	/* initialize state variables */
 	PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
@@ -200,20 +197,14 @@ pq_init(void)
 				(errmsg("could not set socket to nonblocking mode: %m")));
 #endif
 
-	FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, FeBeWaitSetNEvents);
-	socket_pos = AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE,
+	socket_pos = AddWaitEventToSet(BackendWaitSet, WL_SOCKET_WRITEABLE,
 								   MyProcPort->sock, NULL, NULL);
-	latch_pos = AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
-								  MyLatch, NULL);
-	AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
-					  NULL, NULL);
 
 	/*
 	 * The event positions match the order we added them, but let's sanity
 	 * check them to be sure.
 	 */
-	Assert(socket_pos == FeBeWaitSetSocketPos);
-	Assert(latch_pos == FeBeWaitSetLatchPos);
+	Assert(socket_pos == BackendWaitSetSocketPos);
 }
 
 /* --------------------------------
@@ -2022,17 +2013,17 @@ show_tcp_user_timeout(void)
 bool
 pq_check_connection(void)
 {
-	WaitEvent	events[FeBeWaitSetNEvents];
+	WaitEvent	events[BackendWaitSetSocketPos + 1];
 	int			rc;
 
 	/*
 	 * It's OK to modify the socket event filter without restoring, because
-	 * all FeBeWaitSet socket wait sites do the same.
+	 * all BackendWaitSet socket wait sites do the same.
 	 */
-	ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED, NULL);
+	ModifyWaitEvent(BackendWaitSet, BackendWaitSetSocketPos, WL_SOCKET_CLOSED, NULL);
 
 retry:
-	rc = WaitEventSetWait(FeBeWaitSet, 0, events, lengthof(events), 0);
+	rc = WaitEventSetWait(BackendWaitSet, 0, events, lengthof(events), 0);
 	for (int i = 0; i < rc; ++i)
 	{
 		if (events[i].events & WL_SOCKET_CLOSED)
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index a81ef6a201..d539dabb37 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -3320,8 +3320,8 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
 {
 	WaitEvent	event;
 
-	ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
-	if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
+	ModifyWaitEvent(BackendWaitSet, BackendWaitSetSocketPos, socket_events, NULL);
+	if (WaitEventSetWait(BackendWaitSet, timeout, &event, 1, wait_event) == 1 &&
 		(event.events & WL_POSTMASTER_DEATH))
 		proc_exit(1);
 }
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index eb3a569aae..89574b544f 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -151,11 +151,14 @@ struct WaitEventSet
 #endif
 };
 
-/* A common WaitEventSet used to implement WatchLatch() */
-static WaitEventSet *LatchWaitSet;
+/* A common WaitEventSet used to implement WaitLatch() and Fe/Be comms */
+WaitEventSet *BackendWaitSet;
 
-/* The position of the latch in LatchWaitSet. */
-#define LatchWaitSetLatchPos 0
+/* The position of the latch in BackendWaitSet. */
+#define BackendWaitSetLatchPos 0
+
+/* In backends that have a FeBe socket, it is here (see pqcomm.c). */
+#define BackendWaitSetSocketPos 2
 
 #ifndef WIN32
 /* Are we currently in WaitLatch? The signal handler would like to know. */
@@ -302,21 +305,23 @@ InitializeLatchSupport(void)
 }
 
 void
-InitializeLatchWaitSet(void)
+InitializeBackendWaitSet(void)
 {
 	int			latch_pos PG_USED_FOR_ASSERTS_ONLY;
 
-	Assert(LatchWaitSet == NULL);
+	Assert(BackendWaitSet == NULL);
 
-	/* Set up the WaitEventSet used by WaitLatch(). */
-	LatchWaitSet = CreateWaitEventSet(TopMemoryContext, 2);
-	latch_pos = AddWaitEventToSet(LatchWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
+	/* Set up the WaitEventSet used by WaitLatch() and Fe/Be comms. */
+	BackendWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
+	latch_pos = AddWaitEventToSet(BackendWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
 								  MyLatch, NULL);
+	Assert(latch_pos == BackendWaitSetLatchPos);
+
 	if (IsUnderPostmaster)
-		AddWaitEventToSet(LatchWaitSet, WL_EXIT_ON_PM_DEATH,
+		AddWaitEventToSet(BackendWaitSet, WL_EXIT_ON_PM_DEATH,
 						  PGINVALID_SOCKET, NULL, NULL);
 
-	Assert(latch_pos == LatchWaitSetLatchPos);
+	/* pqcomm.c may also add a socket if connected to the frontend */
 }
 
 void
@@ -326,10 +331,10 @@ ShutdownLatchSupport(void)
 	pqsignal(SIGURG, SIG_IGN);
 #endif
 
-	if (LatchWaitSet)
+	if (BackendWaitSet)
 	{
-		FreeWaitEventSet(LatchWaitSet);
-		LatchWaitSet = NULL;
+		FreeWaitEventSet(BackendWaitSet);
+		BackendWaitSet = NULL;
 	}
 
 #if defined(WAIT_USE_SELF_PIPE)
@@ -477,6 +482,7 @@ WaitLatch(Latch *latch, int wakeEvents, long timeout,
 		  uint32 wait_event_info)
 {
 	WaitEvent	event;
+	int			result = 0;
 
 	/* Postmaster-managed callers must handle postmaster death somehow. */
 	Assert(!IsUnderPostmaster ||
@@ -490,17 +496,32 @@ WaitLatch(Latch *latch, int wakeEvents, long timeout,
 	 */
 	if (!(wakeEvents & WL_LATCH_SET))
 		latch = NULL;
-	ModifyWaitEvent(LatchWaitSet, LatchWaitSetLatchPos, WL_LATCH_SET, latch);
-	LatchWaitSet->exit_on_postmaster_death =
-		((wakeEvents & WL_EXIT_ON_PM_DEATH) != 0);
-
-	if (WaitEventSetWait(LatchWaitSet,
-						 (wakeEvents & WL_TIMEOUT) ? timeout : -1,
-						 &event, 1,
-						 wait_event_info) == 0)
-		return WL_TIMEOUT;
-	else
-		return event.events;
+	ModifyWaitEvent(BackendWaitSet, BackendWaitSetLatchPos, WL_LATCH_SET, latch);
+
+	do
+	{
+		if (WaitEventSetWait(BackendWaitSet,
+							 (wakeEvents & WL_TIMEOUT) ? timeout : -1,
+							 &event, 1,
+							 wait_event_info) == 0)
+			result = WL_TIMEOUT;
+		else if (event.events & WL_SOCKET_MASK)
+		{
+			/*
+			 * Lazily shut off socket readiness events if they occur while
+			 * we're just waiting for a latch or timeout.
+			 */
+			ModifyWaitEvent(BackendWaitSet, event.pos, WL_SOCKET_IGNORE, NULL);
+		}
+		else
+			result = event.events;
+	} while (result == 0);
+
+	if (result == WL_POSTMASTER_DEATH &&
+		(wakeEvents & WL_EXIT_ON_PM_DEATH))
+		proc_exit(1);
+
+	return result;
 }
 
 /*
@@ -1069,6 +1090,7 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
 		Assert(event->fd != PGINVALID_SOCKET);
 		Assert(event->events & (WL_SOCKET_READABLE |
 								WL_SOCKET_WRITEABLE |
+								WL_SOCKET_IGNORE |
 								WL_SOCKET_CLOSED));
 
 		if (event->events & WL_SOCKET_READABLE)
@@ -1117,6 +1139,7 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
 	{
 		Assert(event->events & (WL_SOCKET_READABLE |
 								WL_SOCKET_WRITEABLE |
+								WL_SOCKET_IGNORE |
 								WL_SOCKET_CLOSED));
 		pollfd->events = 0;
 		if (event->events & WL_SOCKET_READABLE)
@@ -1201,6 +1224,7 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
 		   event->events == WL_POSTMASTER_DEATH ||
 		   (event->events & (WL_SOCKET_READABLE |
 							 WL_SOCKET_WRITEABLE |
+							 WL_SOCKET_IGNORE |
 							 WL_SOCKET_CLOSED)));
 
 	if (event->events == WL_POSTMASTER_DEATH)
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index dfdcd3d78e..76f99f1700 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -137,7 +137,7 @@ InitPostmasterChild(void)
 	InitializeLatchSupport();
 	MyLatch = &LocalLatchData;
 	InitLatch(MyLatch);
-	InitializeLatchWaitSet();
+	InitializeBackendWaitSet();
 
 	/*
 	 * If possible, make this process a group leader, so that the postmaster
@@ -191,7 +191,7 @@ InitStandaloneProcess(const char *argv0)
 	InitializeLatchSupport();
 	MyLatch = &LocalLatchData;
 	InitLatch(MyLatch);
-	InitializeLatchWaitSet();
+	InitializeBackendWaitSet();
 
 	/*
 	 * For consistency with InitPostmasterChild, initialize signal mask here.
@@ -220,8 +220,8 @@ SwitchToSharedLatch(void)
 
 	MyLatch = &MyProc->procLatch;
 
-	if (FeBeWaitSet)
-		ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET,
+	if (BackendWaitSet)
+		ModifyWaitEvent(BackendWaitSet, BackendWaitSetLatchPos, WL_LATCH_SET,
 						MyLatch);
 
 	/*
@@ -240,8 +240,8 @@ SwitchBackToLocalLatch(void)
 
 	MyLatch = &LocalLatchData;
 
-	if (FeBeWaitSet)
-		ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET,
+	if (BackendWaitSet)
+		ModifyWaitEvent(BackendWaitSet, BackendWaitSetLatchPos, WL_LATCH_SET,
 						MyLatch);
 
 	SetLatch(MyLatch);
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 2de7d9bad2..111f2e3297 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -58,12 +58,6 @@ extern const PGDLLIMPORT PQcommMethods *PqCommMethods;
 /*
  * prototypes for functions in pqcomm.c
  */
-extern PGDLLIMPORT WaitEventSet *FeBeWaitSet;
-
-#define FeBeWaitSetSocketPos 0
-#define FeBeWaitSetLatchPos 1
-#define FeBeWaitSetNEvents 3
-
 extern int	StreamServerPort(int family, const char *hostName,
 							 unsigned short portNumber, const char *unixSocketDir,
 							 pgsocket ListenSocket[], int MaxListen);
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 68ab740f16..b2e2d8c3f9 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -135,6 +135,7 @@ typedef struct Latch
 #define WL_SOCKET_CONNECTED  WL_SOCKET_WRITEABLE
 #endif
 #define WL_SOCKET_CLOSED 	 (1 << 7)
+#define WL_SOCKET_IGNORE 	 (1 << 8)	/* temporarily ignore a socket */
 #define WL_SOCKET_MASK		(WL_SOCKET_READABLE | \
 							 WL_SOCKET_WRITEABLE | \
 							 WL_SOCKET_CONNECTED | \
@@ -154,6 +155,12 @@ typedef struct WaitEvent
 /* forward declaration to avoid exposing latch.c implementation details */
 typedef struct WaitEventSet WaitEventSet;
 
+/* extern for pqcomm.c to access */
+extern PGDLLIMPORT WaitEventSet *BackendWaitSet;
+#define BackendWaitSetLatchPos 0
+#define BackendWaitSetSocketPos 2
+
+
 /*
  * prototypes for functions in latch.c
  */
@@ -179,7 +186,7 @@ extern int	WaitLatch(Latch *latch, int wakeEvents, long timeout,
 					  uint32 wait_event_info);
 extern int	WaitLatchOrSocket(Latch *latch, int wakeEvents,
 							  pgsocket sock, long timeout, uint32 wait_event_info);
-extern void InitializeLatchWaitSet(void);
+extern void InitializeBackendWaitSet(void);
 extern int	GetNumRegisteredWaitEvents(WaitEventSet *set);
 extern bool WaitEventSetCanReportClosed(void);
 
-- 
2.35.1

Reply via email to