On Tue, Jan 5, 2021 at 6:10 PM Thomas Munro <thomas.mu...@gmail.com> wrote:
> For point 2, the question I am raising is: why should users get a
> special FATAL message in some places and not others, for PM death?
> However, if people are attached to that behaviour, we could still
> achieve goal 1 without goal 2 by handling PM death explicitly in
> walsender.c and I'd be happy to post an alternative patch set like
> that.

Here's the alternative patch set, with no change to existing error
message behaviour.  I'm going to commit this version and close this CF
item soon if there are no objections.
From 8f563edefdc2f276b3d8abeb019af1ff172bf7d9 Mon Sep 17 00:00:00 2001
From: Thomas Munro <tmu...@postgresql.org>
Date: Sat, 27 Feb 2021 13:34:37 +1300
Subject: [PATCH v8 1/2] Introduce symbolic names for FeBeWaitSet positions.

Previously we used 0 and 1 to refer to the socket and latch in far-flung
parts of the tree, without any explanation.  Also use PGINVALID_SOCKET
rather than -1 in the places where we weren't already.

Reviewed-by: Kyotaro Horiguchi <horikyota....@gmail.com>
Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com
---
 src/backend/libpq/be-secure.c     |  4 ++--
 src/backend/libpq/pqcomm.c        | 18 +++++++++++++++---
 src/backend/utils/init/miscinit.c |  6 ++++--
 src/include/libpq/libpq.h         |  3 +++
 4 files changed, 24 insertions(+), 7 deletions(-)

diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c
index d1545a2ad6..bb603ad209 100644
--- a/src/backend/libpq/be-secure.c
+++ b/src/backend/libpq/be-secure.c
@@ -180,7 +180,7 @@ retry:
 
 		Assert(waitfor);
 
-		ModifyWaitEvent(FeBeWaitSet, 0, waitfor, NULL);
+		ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL);
 
 		WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1,
 						 WAIT_EVENT_CLIENT_READ);
@@ -292,7 +292,7 @@ retry:
 
 		Assert(waitfor);
 
-		ModifyWaitEvent(FeBeWaitSet, 0, waitfor, NULL);
+		ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL);
 
 		WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1,
 						 WAIT_EVENT_CLIENT_WRITE);
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 1e6b6db540..27a298f110 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -191,6 +191,9 @@ WaitEventSet *FeBeWaitSet;
 void
 pq_init(void)
 {
+	int		socket_pos PG_USED_FOR_ASSERTS_ONLY;
+	int		latch_pos PG_USED_FOR_ASSERTS_ONLY;
+
 	/* initialize state variables */
 	PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
 	PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
@@ -219,10 +222,19 @@ pq_init(void)
 #endif
 
 	FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
-	AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
+	socket_pos = AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE,
+								   MyProcPort->sock, NULL, NULL);
+	latch_pos = AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
+								  MyLatch, NULL);
+	AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
 					  NULL, NULL);
-	AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
-	AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
+
+	/*
+	 * The event positions match the order we added them, but let's sanity
+	 * check them to be sure.
+	 */
+	Assert(socket_pos == FeBeWaitSetSocketPos);
+	Assert(latch_pos == FeBeWaitSetLatchPos);
 }
 
 /* --------------------------------
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index 734c66d4e8..8de70eb46d 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -202,7 +202,8 @@ SwitchToSharedLatch(void)
 	MyLatch = &MyProc->procLatch;
 
 	if (FeBeWaitSet)
-		ModifyWaitEvent(FeBeWaitSet, 1, WL_LATCH_SET, MyLatch);
+		ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET,
+						MyLatch);
 
 	/*
 	 * Set the shared latch as the local one might have been set. This
@@ -221,7 +222,8 @@ SwitchBackToLocalLatch(void)
 	MyLatch = &LocalLatchData;
 
 	if (FeBeWaitSet)
-		ModifyWaitEvent(FeBeWaitSet, 1, WL_LATCH_SET, MyLatch);
+		ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET,
+						MyLatch);
 
 	SetLatch(MyLatch);
 }
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index b41b10620a..e4e5c21565 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -55,6 +55,9 @@ extern const PGDLLIMPORT PQcommMethods *PqCommMethods;
  */
 extern WaitEventSet *FeBeWaitSet;
 
+#define FeBeWaitSetSocketPos 0
+#define FeBeWaitSetLatchPos 1
+
 extern int	StreamServerPort(int family, const char *hostName,
 							 unsigned short portNumber, const char *unixSocketDir,
 							 pgsocket ListenSocket[], int MaxListen);
-- 
2.30.0

From 22b2c474b476ca91b579408f73ebdc014bb76083 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Mon, 24 Feb 2020 23:48:29 +1300
Subject: [PATCH v8 2/2] Use FeBeWaitSet for walsender.c.

This avoids the need to set up and tear down a new WaitEventSet every
time we wait.

Reviewed-by: Kyotaro Horiguchi <horikyota....@gmail.com>
Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com
---
 src/backend/replication/walsender.c | 43 +++++++++++++++++------------
 1 file changed, 25 insertions(+), 18 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 81244541e2..eb3f18ed48 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -244,6 +244,7 @@ static void WalSndKeepalive(bool requestReply);
 static void WalSndKeepaliveIfNecessary(void);
 static void WalSndCheckTimeOut(void);
 static long WalSndComputeSleeptime(TimestampTz now);
+static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
@@ -1287,7 +1288,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	/* If we have pending write here, go to slow path */
 	for (;;)
 	{
-		int			wakeEvents;
 		long		sleeptime;
 
 		/* Check for input from the client */
@@ -1304,13 +1304,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 
 		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
 
-		wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
-			WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
-
 		/* Sleep until something happens or we time out */
-		(void) WaitLatchOrSocket(MyLatch, wakeEvents,
-								 MyProcPort->sock, sleeptime,
-								 WAIT_EVENT_WAL_SENDER_WRITE_DATA);
+		WalSndWait(WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE, sleeptime,
+				   WAIT_EVENT_WAL_SENDER_WRITE_DATA);
 
 		/* Clear any already-pending wakeups */
 		ResetLatch(MyLatch);
@@ -1480,15 +1476,12 @@ WalSndWaitForWal(XLogRecPtr loc)
 		 */
 		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
 
-		wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
-			WL_SOCKET_READABLE | WL_TIMEOUT;
+		wakeEvents = WL_SOCKET_READABLE;
 
 		if (pq_is_send_pending())
 			wakeEvents |= WL_SOCKET_WRITEABLE;
 
-		(void) WaitLatchOrSocket(MyLatch, wakeEvents,
-								 MyProcPort->sock, sleeptime,
-								 WAIT_EVENT_WAL_SENDER_WAIT_WAL);
+		WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
 	}
 
 	/* reactivate latch so WalSndLoop knows to continue */
@@ -2348,10 +2341,10 @@ WalSndLoop(WalSndSendDataCallback send_data)
 			long		sleeptime;
 			int			wakeEvents;
 
-			wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT;
-
 			if (!streamingDoneReceiving)
-				wakeEvents |= WL_SOCKET_READABLE;
+				wakeEvents = WL_SOCKET_READABLE;
+			else
+				wakeEvents = 0;
 
 			/*
 			 * Use fresh timestamp, not last_processing, to reduce the chance
@@ -2363,9 +2356,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
 				wakeEvents |= WL_SOCKET_WRITEABLE;
 
 			/* Sleep until something happens or we time out */
-			(void) WaitLatchOrSocket(MyLatch, wakeEvents,
-									 MyProcPort->sock, sleeptime,
-									 WAIT_EVENT_WAL_SENDER_MAIN);
+			WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
 		}
 	}
 }
@@ -3121,6 +3112,22 @@ WalSndWakeup(void)
 	}
 }
 
+/*
+ * Wait for readiness on the FeBe socket, or a timeout.  The mask should be
+ * composed of optional WL_SOCKET_WRITEABLE and WL_SOCKET_READABLE flags.  Exit
+ * on postmaster death.
+ */
+static void
+WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
+{
+	WaitEvent event;
+
+	ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
+	if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
+		(event.events & WL_POSTMASTER_DEATH))
+		proc_exit(1);
+}
+
 /*
  * Signal all walsenders to move to stopping state.
  *
-- 
2.30.0

Reply via email to