On 23/01/2024 10:24, Kyotaro Horiguchi wrote:
Thank you for looking at this!

At Tue, 23 Jan 2024 15:07:10 +0900, Fujii Masao <masao.fu...@gmail.com> wrote in
Regarding the patch, here are the review comments.

+/*
+ * Is current process a wal receiver?
+ */
+bool
+IsWalReceiver(void)
+{
+ return WalRcv != NULL;
+}

This looks wrong because WalRcv can be non-NULL in processes other
than walreceiver.

Mmm. Sorry for the silly mistake. We can use B_WAL_RECEIVER
instead. I'm not sure if the new function IsWalReceiver() is
required. The expression "MyBackendType == B_WAL_RECEIVER" is quite
descriptive. However, the function does make ProcessInterrupts() more
aligned regarding process types.

There's an existing AmWalReceiverProcess() macro too. Let's use that.

(See also https://www.postgresql.org/message-id/f3ecd4cb-85ee-4e54-8278-5fabfb3a4ed0%40iki.fi for refactoring in this area)

Here's a patch set summarizing the changes so far. They should be squashed, but I kept them separate for now to help with review:

1. revert the revert of 728f86fec6.
2. your walrcv_shutdown_deblocking_v2-2.patch
3. Also replace libpqrcv_PQexec() and libpqrcv_PQgetResult() with the wrappers from libpq-be-fe-helpers.h
4. Replace IsWalReceiver() with AmWalReceiverProcess()

- pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
+ pqsignal(SIGTERM, WalRcvShutdownSignalHandler); /* request shutdown */

Can't we just use die(), instead?

There was a comment explaining the problems associated with exiting
within a signal handler;

- * Currently, only SIGTERM is of interest.  We can't just exit(1) within the
- * SIGTERM signal handler, because the signal might arrive in the middle of
- * some critical operation, like while we're holding a spinlock.  Instead, the

And I think we should keep the considerations it suggests. The patch
removes the comment itself, but it does so because it implements our
standard process exit procedure, which incorporates points suggested
by the now-removed comment.

die() doesn't call exit(1) unless DoingCommandRead is set, and that is never the case in the walreceiver. It looks just like the new WalRcvShutdownSignalHandler() function. Am I missing something?

Hmm, but doesn't bgworker_die() have that problem with exit(1)ing in the signal handler?

I also wonder if we should replace SignalHandlerForShutdownRequest() completely with die(), in all processes? The difference is that SignalHandlerForShutdownRequest() uses ShutdownRequestPending, while die() uses ProcDiePending && InterruptPending to indicate that the signal was received. Or do some of the processes want to check for ShutdownRequestPending only at specific places, and don't want to get terminated at any random CHECK_FOR_INTERRUPTS()?

--
Heikki Linnakangas
Neon (https://neon.tech)
From 27b9f8283b2caa7a4243fe57a8d14a127396e80f Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Tue, 23 Jan 2024 11:01:03 +0200
Subject: [PATCH v3 1/4] Revert "Revert "libpqwalreceiver: Convert to
 libpq-be-fe-helpers.h""

This reverts commit 21ef4d4d897563adb2f7920ad53b734950f1e0a4.
---
 .../libpqwalreceiver/libpqwalreceiver.c       | 55 +++----------------
 1 file changed, 8 insertions(+), 47 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 77669074e82..201c36cb220 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -24,6 +24,7 @@
 #include "common/connect.h"
 #include "funcapi.h"
 #include "libpq-fe.h"
+#include "libpq/libpq-be-fe-helpers.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -132,7 +133,6 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
 				 const char *appname, char **err)
 {
 	WalReceiverConn *conn;
-	PostgresPollingStatusType status;
 	const char *keys[6];
 	const char *vals[6];
 	int			i = 0;
@@ -188,56 +188,17 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
 	Assert(i < sizeof(keys));
 
 	conn = palloc0(sizeof(WalReceiverConn));
-	conn->streamConn = PQconnectStartParams(keys, vals,
-											 /* expand_dbname = */ true);
-	if (PQstatus(conn->streamConn) == CONNECTION_BAD)
-		goto bad_connection_errmsg;
-
-	/*
-	 * Poll connection until we have OK or FAILED status.
-	 *
-	 * Per spec for PQconnectPoll, first wait till socket is write-ready.
-	 */
-	status = PGRES_POLLING_WRITING;
-	do
-	{
-		int			io_flag;
-		int			rc;
-
-		if (status == PGRES_POLLING_READING)
-			io_flag = WL_SOCKET_READABLE;
-#ifdef WIN32
-		/* Windows needs a different test while waiting for connection-made */
-		else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
-			io_flag = WL_SOCKET_CONNECTED;
-#endif
-		else
-			io_flag = WL_SOCKET_WRITEABLE;
-
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
-							   PQsocket(conn->streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
-
-		/* Interrupted? */
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			ProcessWalRcvInterrupts();
-		}
-
-		/* If socket is ready, advance the libpq state machine */
-		if (rc & io_flag)
-			status = PQconnectPoll(conn->streamConn);
-	} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
+	conn->streamConn =
+		libpqsrv_connect_params(keys, vals,
+								 /* expand_dbname = */ true,
+								WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
 
 	if (PQstatus(conn->streamConn) != CONNECTION_OK)
 		goto bad_connection_errmsg;
 
 	if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
 	{
-		PQfinish(conn->streamConn);
+		libpqsrv_disconnect(conn->streamConn);
 		pfree(conn);
 
 		ereport(ERROR,
@@ -273,7 +234,7 @@ bad_connection_errmsg:
 
 	/* error path, error already set */
 bad_connection:
-	PQfinish(conn->streamConn);
+	libpqsrv_disconnect(conn->streamConn);
 	pfree(conn);
 	return NULL;
 }
@@ -809,7 +770,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 static void
 libpqrcv_disconnect(WalReceiverConn *conn)
 {
-	PQfinish(conn->streamConn);
+	libpqsrv_disconnect(conn->streamConn);
 	PQfreemem(conn->recvBuf);
 	pfree(conn);
 }
-- 
2.39.2

From 9295f868fa207d71f7eef620e64f3047e8f45e13 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Tue, 23 Jan 2024 11:01:25 +0200
Subject: [PATCH v3 2/4] Apply walrcv_shutdown_deblocking_v2-2.patch

From https://www.postgresql.org/message-id/20240123.172410.1596193222420636986.horikyota.ntt%40gmail.com
---
 .../libpqwalreceiver/libpqwalreceiver.c       |  4 +-
 src/backend/replication/walreceiver.c         | 53 +++++++++----------
 src/backend/tcop/postgres.c                   |  5 ++
 src/include/replication/walreceiver.h         |  2 +-
 4 files changed, 32 insertions(+), 32 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 201c36cb220..db779dc6ca6 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -749,7 +749,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 		if (rc & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
-			ProcessWalRcvInterrupts();
+			CHECK_FOR_INTERRUPTS();
 		}
 
 		/* Consume whatever data is available from the socket */
@@ -1053,7 +1053,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
 	{
 		char	   *cstrs[MaxTupleAttributeNumber];
 
-		ProcessWalRcvInterrupts();
+		CHECK_FOR_INTERRUPTS();
 
 		/* Do the allocations in temporary context. */
 		oldcontext = MemoryContextSwitchTo(rowcontext);
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 728059518e1..e491f7d4c5e 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -147,39 +147,34 @@ static void XLogWalRcvSendReply(bool force, bool requestReply);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
 static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
+static void WalRcvShutdownSignalHandler(SIGNAL_ARGS);
 
-/*
- * Process any interrupts the walreceiver process may have received.
- * This should be called any time the process's latch has become set.
- *
- * Currently, only SIGTERM is of interest.  We can't just exit(1) within the
- * SIGTERM signal handler, because the signal might arrive in the middle of
- * some critical operation, like while we're holding a spinlock.  Instead, the
- * signal handler sets a flag variable as well as setting the process's latch.
- * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
- * latch has become set.  Operations that could block for a long time, such as
- * reading from a remote server, must pay attention to the latch too; see
- * libpqrcv_PQgetResult for example.
- */
 void
-ProcessWalRcvInterrupts(void)
+WalRcvShutdownSignalHandler(SIGNAL_ARGS)
 {
-	/*
-	 * Although walreceiver interrupt handling doesn't use the same scheme as
-	 * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
-	 * any incoming signals on Win32, and also to make sure we process any
-	 * barrier events.
-	 */
-	CHECK_FOR_INTERRUPTS();
+	int			save_errno = errno;
 
-	if (ShutdownRequestPending)
+	/* Don't joggle the elbow of proc_exit */
+	if (!proc_exit_inprogress)
 	{
-		ereport(FATAL,
-				(errcode(ERRCODE_ADMIN_SHUTDOWN),
-				 errmsg("terminating walreceiver process due to administrator command")));
+		InterruptPending = true;
+		ProcDiePending = true;
 	}
+
+	SetLatch(MyLatch);
+
+	errno = save_errno;
+	
 }
 
+/*
+ * Is current process a wal receiver?
+ */
+bool
+IsWalReceiver(void)
+{
+	return MyBackendType == B_WAL_RECEIVER;
+}
 
 /* Main entry point for walreceiver process */
 void
@@ -277,7 +272,7 @@ WalReceiverMain(void)
 	pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
 													 * file */
 	pqsignal(SIGINT, SIG_IGN);
-	pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
+	pqsignal(SIGTERM, WalRcvShutdownSignalHandler); /* request shutdown */
 	/* SIGQUIT handler was already set up by InitPostmasterChild */
 	pqsignal(SIGALRM, SIG_IGN);
 	pqsignal(SIGPIPE, SIG_IGN);
@@ -456,7 +451,7 @@ WalReceiverMain(void)
 							 errmsg("cannot continue WAL streaming, recovery has already ended")));
 
 				/* Process any requests or signals received recently */
-				ProcessWalRcvInterrupts();
+				CHECK_FOR_INTERRUPTS();
 
 				if (ConfigReloadPending)
 				{
@@ -552,7 +547,7 @@ WalReceiverMain(void)
 				if (rc & WL_LATCH_SET)
 				{
 					ResetLatch(MyLatch);
-					ProcessWalRcvInterrupts();
+					CHECK_FOR_INTERRUPTS();
 
 					if (walrcv->force_reply)
 					{
@@ -691,7 +686,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 	{
 		ResetLatch(MyLatch);
 
-		ProcessWalRcvInterrupts();
+		CHECK_FOR_INTERRUPTS();
 
 		SpinLockAcquire(&walrcv->mutex);
 		Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 1a34bd3715f..2ce24d8a9a1 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -59,6 +59,7 @@
 #include "replication/logicallauncher.h"
 #include "replication/logicalworker.h"
 #include "replication/slot.h"
+#include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/bufmgr.h"
@@ -3286,6 +3287,10 @@ ProcessInterrupts(void)
 			 */
 			proc_exit(1);
 		}
+		else if (IsWalReceiver())
+			ereport(FATAL,
+					(errcode(ERRCODE_ADMIN_SHUTDOWN),
+					 errmsg("terminating walreceiver process due to administrator command")));
 		else if (IsBackgroundWorker)
 			ereport(FATAL,
 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 0899891cdb8..a7684b7bdc8 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -456,8 +456,8 @@ walrcv_clear_result(WalRcvExecResult *walres)
 }
 
 /* prototypes for functions in walreceiver.c */
+extern bool IsWalReceiver(void);
 extern void WalReceiverMain(void) pg_attribute_noreturn();
-extern void ProcessWalRcvInterrupts(void);
 extern void WalRcvForceReply(void);
 
 /* prototypes for functions in walreceiverfuncs.c */
-- 
2.39.2

From 4a8fa778fe5ee1807b91d2587775b7a7ad250829 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Tue, 23 Jan 2024 11:16:23 +0200
Subject: [PATCH v3 3/4] Use libpq-be-fe-helpers.h wrappers more

---
 .../libpqwalreceiver/libpqwalreceiver.c       | 148 ++++--------------
 1 file changed, 31 insertions(+), 117 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index db779dc6ca6..c60a121093c 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -102,8 +102,6 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
 };
 
 /* Prototypes for private functions */
-static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
-static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
 static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
 
 /*
@@ -212,8 +210,9 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
 	{
 		PGresult   *res;
 
-		res = libpqrcv_PQexec(conn->streamConn,
-							  ALWAYS_SECURE_SEARCH_PATH_SQL);
+		res = libpqsrv_exec(conn->streamConn,
+							ALWAYS_SECURE_SEARCH_PATH_SQL,
+							WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 		{
 			PQclear(res);
@@ -385,7 +384,9 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
 	 * Get the system identifier and timeline ID as a DataRow message from the
 	 * primary server.
 	 */
-	res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
+	res = libpqsrv_exec(conn->streamConn,
+						"IDENTIFY_SYSTEM",
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -518,7 +519,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 						 options->proto.physical.startpointTLI);
 
 	/* Start streaming. */
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqsrv_exec(conn->streamConn,
+						cmd.data,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -548,7 +551,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	PGresult   *res;
 
 	/*
-	 * Send copy-end message.  As in libpqrcv_PQexec, this could theoretically
+	 * Send copy-end message.  As in libpqsrv_exec, this could theoretically
 	 * block, but the risk seems small.
 	 */
 	if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
@@ -568,7 +571,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	 * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
 	 * also possible in case we aborted the copy in mid-stream.
 	 */
-	res = libpqrcv_PQgetResult(conn->streamConn);
+	res = libpqsrv_get_result(conn->streamConn,
+							  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (PQresultStatus(res) == PGRES_TUPLES_OK)
 	{
 		/*
@@ -583,7 +587,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 		PQclear(res);
 
 		/* the result set should be followed by CommandComplete */
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqsrv_get_result(conn->streamConn,
+								  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	}
 	else if (PQresultStatus(res) == PGRES_COPY_OUT)
 	{
@@ -597,7 +602,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 							pchomp(PQerrorMessage(conn->streamConn)))));
 
 		/* CommandComplete should follow */
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqsrv_get_result(conn->streamConn,
+								  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	}
 
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -608,7 +614,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	PQclear(res);
 
 	/* Verify that there are no more results */
-	res = libpqrcv_PQgetResult(conn->streamConn);
+	res = libpqsrv_get_result(conn->streamConn,
+							  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (res != NULL)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -633,7 +640,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 	 * Request the primary to send over the history file for given timeline.
 	 */
 	snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
-	res = libpqrcv_PQexec(conn->streamConn, cmd);
+	res = libpqsrv_exec(conn->streamConn,
+						cmd,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -663,107 +672,6 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 	PQclear(res);
 }
 
-/*
- * Send a query and wait for the results by using the asynchronous libpq
- * functions and socket readiness events.
- *
- * The function is modeled on libpqsrv_exec(), with the behavior difference
- * being that it calls ProcessWalRcvInterrupts().  As an optimization, it
- * skips try/catch, since all errors terminate the process.
- *
- * May return NULL, rather than an error result, on failure.
- */
-static PGresult *
-libpqrcv_PQexec(PGconn *streamConn, const char *query)
-{
-	PGresult   *lastResult = NULL;
-
-	/*
-	 * PQexec() silently discards any prior query results on the connection.
-	 * This is not required for this function as it's expected that the caller
-	 * (which is this library in all cases) will behave correctly and we don't
-	 * have to be backwards compatible with old libpq.
-	 */
-
-	/*
-	 * Submit the query.  Since we don't use non-blocking mode, this could
-	 * theoretically block.  In practice, since we don't send very long query
-	 * strings, the risk seems negligible.
-	 */
-	if (!PQsendQuery(streamConn, query))
-		return NULL;
-
-	for (;;)
-	{
-		/* Wait for, and collect, the next PGresult. */
-		PGresult   *result;
-
-		result = libpqrcv_PQgetResult(streamConn);
-		if (result == NULL)
-			break;				/* query is complete, or failure */
-
-		/*
-		 * Emulate PQexec()'s behavior of returning the last result when there
-		 * are many.  We are fine with returning just last error message.
-		 */
-		PQclear(lastResult);
-		lastResult = result;
-
-		if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
-			PQresultStatus(lastResult) == PGRES_COPY_OUT ||
-			PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
-			PQstatus(streamConn) == CONNECTION_BAD)
-			break;
-	}
-
-	return lastResult;
-}
-
-/*
- * Perform the equivalent of PQgetResult(), but watch for interrupts.
- */
-static PGresult *
-libpqrcv_PQgetResult(PGconn *streamConn)
-{
-	/*
-	 * Collect data until PQgetResult is ready to get the result without
-	 * blocking.
-	 */
-	while (PQisBusy(streamConn))
-	{
-		int			rc;
-
-		/*
-		 * We don't need to break down the sleep into smaller increments,
-		 * since we'll get interrupted by signals and can handle any
-		 * interrupts here.
-		 */
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
-							   WL_LATCH_SET,
-							   PQsocket(streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
-
-		/* Interrupted? */
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			CHECK_FOR_INTERRUPTS();
-		}
-
-		/* Consume whatever data is available from the socket */
-		if (PQconsumeInput(streamConn) == 0)
-		{
-			/* trouble; return NULL */
-			return NULL;
-		}
-	}
-
-	/* Now we can collect and return the next PGresult */
-	return PQgetResult(streamConn);
-}
-
 /*
  * Disconnect connection to primary, if any.
  */
@@ -824,13 +732,15 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
 	{
 		PGresult   *res;
 
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqsrv_get_result(conn->streamConn,
+								  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 		if (PQresultStatus(res) == PGRES_COMMAND_OK)
 		{
 			PQclear(res);
 
 			/* Verify that there are no more results. */
-			res = libpqrcv_PQgetResult(conn->streamConn);
+			res = libpqsrv_get_result(conn->streamConn,
+									  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 			if (res != NULL)
 			{
 				PQclear(res);
@@ -972,7 +882,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 			appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
 	}
 
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqsrv_exec(conn->streamConn,
+						cmd.data,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -1099,7 +1011,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("the query interface requires a database connection")));
 
-	pgres = libpqrcv_PQexec(conn->streamConn, query);
+	pgres = libpqsrv_exec(conn->streamConn,
+						  query,
+						  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 
 	switch (PQresultStatus(pgres))
 	{
-- 
2.39.2

From c0797200f5b84505bfab618166b8f9d576685678 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Tue, 23 Jan 2024 11:19:07 +0200
Subject: [PATCH v3 4/4] Use existing AmWalReceiverProcess() function

---
 src/backend/replication/walreceiver.c | 9 ---------
 src/backend/tcop/postgres.c           | 2 +-
 2 files changed, 1 insertion(+), 10 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index e491f7d4c5e..3bd633e75cb 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -167,15 +167,6 @@ WalRcvShutdownSignalHandler(SIGNAL_ARGS)
 	
 }
 
-/*
- * Is current process a wal receiver?
- */
-bool
-IsWalReceiver(void)
-{
-	return MyBackendType == B_WAL_RECEIVER;
-}
-
 /* Main entry point for walreceiver process */
 void
 WalReceiverMain(void)
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 2ce24d8a9a1..5a4dc1977d3 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -3287,7 +3287,7 @@ ProcessInterrupts(void)
 			 */
 			proc_exit(1);
 		}
-		else if (IsWalReceiver())
+		else if (AmWalReceiverProcess())
 			ereport(FATAL,
 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
 					 errmsg("terminating walreceiver process due to administrator command")));
-- 
2.39.2

Reply via email to