On Mon, Jan 10, 2022 at 10:20 PM Thomas Munro <thomas.mu...@gmail.com> wrote:
> On Mon, Jan 10, 2022 at 8:00 PM Alexander Lakhin <exclus...@gmail.com> wrote:
> > The libpqrcv_PQgetResult function, in turn, invokes WaitLatchOrSocket(),
> > where WaitEvents are defined locally, and the closed flag is set on the
> > first invocation but expected to be checked on the second.
>
> D'oh, right.  There's also a WaitLatchOrSocket call in walreceiver.c.
> We'd need a long-lived WaitEventSet common across all of these sites,
> which is hard here (because the socket might change under you, as
> discussed in other threads that introduced long-lived WaitEventSets to
> other places but not here).

This is super quick-and-dirty code (and doesn't handle some errors or
socket changes correctly), but does it detect the closed socket?
From 3590a8c9b3e8992a65dea7a9d6aecdca2f25582d Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Mon, 10 Jan 2022 14:45:05 +1300
Subject: [PATCH v2 1/2] Make Windows FD_CLOSE reporting sticky.

XXX Just testing an idea...
---
 src/backend/storage/ipc/latch.c | 16 ++++++++++++++++
 src/include/storage/latch.h     |  1 +
 2 files changed, 17 insertions(+)

diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 1d893cf863..8f3176a00f 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -899,6 +899,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 	event->user_data = user_data;
 #ifdef WIN32
 	event->reset = false;
+	event->closed = false;
 #endif
 
 	if (events == WL_LATCH_SET)
@@ -1882,6 +1883,20 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				return 1;
 			}
 		}
+
+		/*
+		 * XXX: HYPOTHESIS TO TEST
+		 * Windows only reports FD_CLOSE once.  If we've seen that already,
+		 * continue to report it.
+		 */
+		if ((cur_event->events & WL_SOCKET_MASK) && cur_event->closed)
+		{
+			occurred_events->pos = cur_event->pos;
+			occurred_events->user_data = cur_event->user_data;
+			occurred_events->events = (cur_event->events & WL_SOCKET_MASK);
+			occurred_events->fd = cur_event->fd;
+			return 1;
+		}
 	}
 
 	/*
@@ -2002,6 +2017,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 		{
 			/* EOF/error, so signal all caller-requested socket flags */
 			occurred_events->events |= (cur_event->events & WL_SOCKET_MASK);
+			cur_event->closed = true;
 		}
 
 		if (occurred_events->events != 0)
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 44f9368c64..c24f46dc37 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -147,6 +147,7 @@ typedef struct WaitEvent
 	void	   *user_data;		/* pointer provided in AddWaitEventToSet */
 #ifdef WIN32
 	bool		reset;			/* Is reset of the event required? */
+	bool		closed;			/* Has FD_CLOSE event been reported? */
 #endif
 } WaitEvent;
 
-- 
2.33.1

From 755ee3da7934d33a706d59eb0abc0e4f1829acb0 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Mon, 10 Jan 2022 22:02:21 +1300
Subject: [PATCH v2 2/2] XXX Quick hack to use persistent WaitEventSet in
 walreceiver

XXX There are several things wrong with this; it's just experimental code.
---
 .../libpqwalreceiver/libpqwalreceiver.c       | 97 ++++++++++++-------
 src/backend/replication/walreceiver.c         | 12 +--
 src/include/replication/walreceiver.h         |  8 ++
 3 files changed, 75 insertions(+), 42 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index c08e599eef..f8dd76e2d0 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -40,6 +40,7 @@ void		_PG_init(void);
 
 struct WalReceiverConn
 {
+	WaitEventSet *wes;
 	/* Current connection to the primary, if any */
 	PGconn	   *streamConn;
 	/* Used to remember if the connection is logical or physical */
@@ -82,6 +83,7 @@ static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
 									   const int nRetTypes,
 									   const Oid *retTypes);
 static void libpqrcv_disconnect(WalReceiverConn *conn);
+static int libpqrcv_wait(WalReceiverConn *conn, int timeout, int wait_event);
 
 static WalReceiverFunctionsType PQWalReceiverFunctions = {
 	libpqrcv_connect,
@@ -98,12 +100,13 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
 	libpqrcv_create_slot,
 	libpqrcv_get_backend_pid,
 	libpqrcv_exec,
-	libpqrcv_disconnect
+	libpqrcv_disconnect,
+	libpqrcv_wait
 };
 
 /* Prototypes for private functions */
-static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
-static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
+static PGresult *libpqrcv_PQexec(WalReceiverConn *conn, const char *query);
+static PGresult *libpqrcv_PQgetResult(WalReceiverConn *conn);
 static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
 
 /*
@@ -182,6 +185,15 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 		return NULL;
 	}
 
+	conn->wes = CreateWaitEventSet(TopMemoryContext, 3);
+	AddWaitEventToSet(conn->wes, WL_SOCKET_WRITEABLE,
+					  PQsocket(conn->streamConn), NULL, NULL);
+	AddWaitEventToSet(conn->wes, WL_LATCH_SET, PGINVALID_SOCKET,
+					  MyLatch, NULL);
+	AddWaitEventToSet(conn->wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+					  NULL, NULL);
+
+
 	/*
 	 * Poll connection until we have OK or FAILED status.
 	 *
@@ -191,7 +203,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 	do
 	{
 		int			io_flag;
-		int			rc;
+		WaitEvent	event;
 
 		if (status == PGRES_POLLING_READING)
 			io_flag = WL_SOCKET_READABLE;
@@ -203,21 +215,20 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 		else
 			io_flag = WL_SOCKET_WRITEABLE;
 
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
-							   PQsocket(conn->streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
+		ModifyWaitEvent(conn->wes, 0, io_flag, NULL);
+		if (WaitEventSetWait(conn->wes, -1, &event, 1,
+							 WAIT_EVENT_LIBPQWALRECEIVER_CONNECT) < 1)
+			continue;
 
 		/* Interrupted? */
-		if (rc & WL_LATCH_SET)
+		if (event.events & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
 			ProcessWalRcvInterrupts();
 		}
 
 		/* If socket is ready, advance the libpq state machine */
-		if (rc & io_flag)
+		if (event.events & io_flag)
 			status = PQconnectPoll(conn->streamConn);
 	} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
 
@@ -231,7 +242,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 	{
 		PGresult   *res;
 
-		res = libpqrcv_PQexec(conn->streamConn,
+		res = libpqrcv_PQexec(conn,
 							  ALWAYS_SECURE_SEARCH_PATH_SQL);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 		{
@@ -359,7 +370,7 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
 	 * Get the system identifier and timeline ID as a DataRow message from the
 	 * primary server.
 	 */
-	res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
+	res = libpqrcv_PQexec(conn, "IDENTIFY_SYSTEM");
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -482,7 +493,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 						 options->proto.physical.startpointTLI);
 
 	/* Start streaming. */
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqrcv_PQexec(conn, cmd.data);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -532,7 +543,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	 * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
 	 * also possible in case we aborted the copy in mid-stream.
 	 */
-	res = libpqrcv_PQgetResult(conn->streamConn);
+	res = libpqrcv_PQgetResult(conn);
 	if (PQresultStatus(res) == PGRES_TUPLES_OK)
 	{
 		/*
@@ -547,7 +558,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 		PQclear(res);
 
 		/* the result set should be followed by CommandComplete */
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqrcv_PQgetResult(conn);
 	}
 	else if (PQresultStatus(res) == PGRES_COPY_OUT)
 	{
@@ -561,7 +572,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 							pchomp(PQerrorMessage(conn->streamConn)))));
 
 		/* CommandComplete should follow */
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqrcv_PQgetResult(conn);
 	}
 
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -572,7 +583,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	PQclear(res);
 
 	/* Verify that there are no more results */
-	res = libpqrcv_PQgetResult(conn->streamConn);
+	res = libpqrcv_PQgetResult(conn);
 	if (res != NULL)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -597,7 +608,7 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 	 * Request the primary to send over the history file for given timeline.
 	 */
 	snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
-	res = libpqrcv_PQexec(conn->streamConn, cmd);
+	res = libpqrcv_PQexec(conn, cmd);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -641,8 +652,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
  * May return NULL, rather than an error result, on failure.
  */
 static PGresult *
-libpqrcv_PQexec(PGconn *streamConn, const char *query)
+libpqrcv_PQexec(WalReceiverConn *conn, const char *query)
 {
+	PGconn *streamConn = conn->streamConn;
 	PGresult   *lastResult = NULL;
 
 	/*
@@ -665,7 +677,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
 		/* Wait for, and collect, the next PGresult. */
 		PGresult   *result;
 
-		result = libpqrcv_PQgetResult(streamConn);
+		result = libpqrcv_PQgetResult(conn);
 		if (result == NULL)
 			break;				/* query is complete, or failure */
 
@@ -690,30 +702,31 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
  * Perform the equivalent of PQgetResult(), but watch for interrupts.
  */
 static PGresult *
-libpqrcv_PQgetResult(PGconn *streamConn)
+libpqrcv_PQgetResult(WalReceiverConn *conn)
 {
+	PGconn *streamConn = conn->streamConn;
+
 	/*
 	 * Collect data until PQgetResult is ready to get the result without
 	 * blocking.
 	 */
 	while (PQisBusy(streamConn))
 	{
-		int			rc;
+		WaitEvent	event;
 
 		/*
 		 * We don't need to break down the sleep into smaller increments,
 		 * since we'll get interrupted by signals and can handle any
 		 * interrupts here.
 		 */
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
-							   WL_LATCH_SET,
-							   PQsocket(streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
+		ModifyWaitEvent(conn->wes, 0, WL_SOCKET_READABLE, NULL);
+
+		if (WaitEventSetWait(conn->wes, -1, &event, 1,
+							 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE) < 1)
+			continue;
 
 		/* Interrupted? */
-		if (rc & WL_LATCH_SET)
+		if (event.events & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
 			ProcessWalRcvInterrupts();
@@ -737,12 +750,28 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 static void
 libpqrcv_disconnect(WalReceiverConn *conn)
 {
+	FreeWaitEventSet(conn->wes);
 	PQfinish(conn->streamConn);
 	if (conn->recvBuf != NULL)
 		PQfreemem(conn->recvBuf);
 	pfree(conn);
 }
 
+/*
+ * Wait for data, latch, or timeout on conn->wes; returns event flags, else 0.
+ */
+static int
+libpqrcv_wait(WalReceiverConn *conn, int timeout, int wait_event)
+{
+	WaitEvent event;
+
+	if (WaitEventSetWait(conn->wes, timeout, &event, 1, wait_event) == 1)
+		return event.events;	/* flags of the single event we asked for */
+
+	return 0;	/* no event reported; the caller treats 0 as a timeout */
+}
+
+
 /*
  * Receive a message available from XLOG stream.
  *
@@ -793,13 +822,13 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
 	{
 		PGresult   *res;
 
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqrcv_PQgetResult(conn);
 		if (PQresultStatus(res) == PGRES_COMMAND_OK)
 		{
 			PQclear(res);
 
 			/* Verify that there are no more results. */
-			res = libpqrcv_PQgetResult(conn->streamConn);
+			res = libpqrcv_PQgetResult(conn);
 			if (res != NULL)
 			{
 				PQclear(res);
@@ -941,7 +970,7 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 			appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
 	}
 
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqrcv_PQexec(conn, cmd.data);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -1068,7 +1097,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("the query interface requires a database connection")));
 
-	pgres = libpqrcv_PQexec(conn->streamConn, query);
+	pgres = libpqrcv_PQexec(conn, query);
 
 	switch (PQresultStatus(pgres))
 	{
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 7a7eb3784e..f308ea9c24 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -507,13 +507,9 @@ WalReceiverMain(void)
 				 * could add and remove just the socket each time, potentially
 				 * avoiding some system calls.
 				 */
-				Assert(wait_fd != PGINVALID_SOCKET);
-				rc = WaitLatchOrSocket(MyLatch,
-									   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
-									   WL_TIMEOUT | WL_LATCH_SET,
-									   wait_fd,
-									   NAPTIME_PER_CYCLE,
-									   WAIT_EVENT_WAL_RECEIVER_MAIN);
+				//Assert(wait_fd != PGINVALID_SOCKET);
+				rc = walrcv_wait(wrconn, NAPTIME_PER_CYCLE,
+								 WAIT_EVENT_WAL_RECEIVER_MAIN);
 				if (rc & WL_LATCH_SET)
 				{
 					ResetLatch(MyLatch);
@@ -532,7 +528,7 @@ WalReceiverMain(void)
 						XLogWalRcvSendReply(true, false);
 					}
 				}
-				if (rc & WL_TIMEOUT)
+				if (rc == 0)
 				{
 					/*
 					 * We didn't receive anything new. If we haven't heard
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 0b607ed777..3e80ff63ce 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -380,6 +380,11 @@ typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
  */
 typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
 
+/*
+ * XXX Wait for readable data, timeout, or latch; returns event flags, 0 if none.
+ */
+typedef int (*walrcv_wait_fn) (WalReceiverConn *conn, int timeout, int wait_event);
+
 typedef struct WalReceiverFunctionsType
 {
 	walrcv_connect_fn walrcv_connect;
@@ -397,6 +402,7 @@ typedef struct WalReceiverFunctionsType
 	walrcv_get_backend_pid_fn walrcv_get_backend_pid;
 	walrcv_exec_fn walrcv_exec;
 	walrcv_disconnect_fn walrcv_disconnect;
+	walrcv_wait_fn walrcv_wait;
 } WalReceiverFunctionsType;
 
 extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
@@ -431,6 +437,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes)
 #define walrcv_disconnect(conn) \
 	WalReceiverFunctions->walrcv_disconnect(conn)
+#define walrcv_wait(conn, timeout, wait_event) \
+	WalReceiverFunctions->walrcv_wait(conn, timeout, wait_event)
 
 static inline void
 walrcv_clear_result(WalRcvExecResult *walres)
-- 
2.33.1

Reply via email to