Hi,

On 2023-01-14 14:45:06 +1300, Thomas Munro wrote:
> On Tue, Jan 10, 2023 at 3:05 PM Andres Freund <and...@anarazel.de> wrote:
> > On 2023-01-03 12:05:20 -0800, Andres Freund wrote:
> > > The attached patch adds libpq-be-fe-helpers.h and uses it in 
> > > libpqwalreceiver,
> > > dblink, postgres_fdw.
> >
> > > As I made libpq-be-fe-helpers.h handle reserving external fds,
> > > libpqwalreceiver now does so. I briefly looked through its users without
> > > seeing cases of leaking in case of errors - which would already have been 
> > > bad,
> > > since we'd already have leaked a libpq connection/socket.
> > >
> > >
> > > Given the lack of field complaints and the size of the required changes, I
> > > don't think we should backpatch this, even though it's pretty clearly 
> > > buggy
> > > as-is.
> >
> > Any comments on this? Otherwise I think I'll go with this approach.
> 
> +1.  Not totally convinced about the location but we are free to
> re-organise it any time, and the random CI failures are bad.

Cool.

Updated patch attached. I split it into multiple pieces.
1) A fix for [1], included here because I encountered it while testing
2) Introduction of libpq-be-fe-helpers.h
3) Convert dblink and postgres_fdw to the helper
4) Convert libpqwalreceiver.c to the helper

Even if we eventually decide to backpatch 3), we'd likely not backpatch 4), as
there's no bug (although perhaps the lack of FD handling could be called a
bug?).

There's also some light polishing, improving commit message, comments and
moving some internal helper functions to later in the file.


Greetings,

Andres Freund

[1] https://postgr.es/m/20230121011237.q52apbvlarfv6jm6%40awork3.anarazel.de
>From da9345151423180732d2928fe7fe6ff0b6a11d4d Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Fri, 20 Jan 2023 18:27:05 -0800
Subject: [PATCH v3 1/4] Fix error handling in libpqrcv_connect()

Author:
Reviewed-by:
Discussion: https://postgr.es/m/20230121011237.q52apbvlarfv6...@awork3.anarazel.de
Backpatch:
---
 .../libpqwalreceiver/libpqwalreceiver.c       | 26 +++++++++++--------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index c40c6220db8..fefc8660259 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -175,10 +175,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 	conn->streamConn = PQconnectStartParams(keys, vals,
 											 /* expand_dbname = */ true);
 	if (PQstatus(conn->streamConn) == CONNECTION_BAD)
-	{
-		*err = pchomp(PQerrorMessage(conn->streamConn));
-		return NULL;
-	}
+		goto bad_connection_errmsg;
 
 	/*
 	 * Poll connection until we have OK or FAILED status.
@@ -220,10 +217,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 	} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
 
 	if (PQstatus(conn->streamConn) != CONNECTION_OK)
-	{
-		*err = pchomp(PQerrorMessage(conn->streamConn));
-		return NULL;
-	}
+		goto bad_connection_errmsg;
 
 	if (logical)
 	{
@@ -234,9 +228,9 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 		{
 			PQclear(res);
-			ereport(ERROR,
-					(errmsg("could not clear search path: %s",
-							pchomp(PQerrorMessage(conn->streamConn)))));
+			*err = psprintf(_("could not clear search path: %s"),
+							pchomp(PQerrorMessage(conn->streamConn)));
+			goto bad_connection;
 		}
 		PQclear(res);
 	}
@@ -244,6 +238,16 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 	conn->logical = logical;
 
 	return conn;
+
+	/* error path, using libpq's error message */
+bad_connection_errmsg:
+	*err = pchomp(PQerrorMessage(conn->streamConn));
+
+	/* error path, error already set */
+bad_connection:
+	PQfinish(conn->streamConn);
+	pfree(conn);
+	return NULL;
 }
 
 /*
-- 
2.38.0

>From 61531d075fc385bd22f7ec4cdb38e87da8aaec6d Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Fri, 20 Jan 2023 17:15:49 -0800
Subject: [PATCH v3 2/4] Add helper library for use of libpq inside the server
 environment

Currently dblink and postgres_fdw don't process interrupts during connection
establishment. Besides preventing query cancellations etc, this can lead to
undetected deadlocks, as global barriers are not processed.

Libpqwalreceiver in contrast, processes interrupts during connection
establishment. The required code is not trivial, so duplicating it into
additional places does not seem like a good option.

These aforementioned undetected deadlocks are the reason for the spate of CI
test failures in the FreeBSD 'test_running' step.

For now the helper library is just a header, as it needs to be linked into
each extension using libpq, and it seems too small to be worth adding a
dedicated static library for.

The conversion to the helper are done in subsequent commits.

Reviewed-by: Thomas Munro <thomas.mu...@gmail.com>
Discussion: https://postgr.es/m/20220925232237.p6uskba2dw6fn...@awork3.anarazel.de
---
 src/include/libpq/libpq-be-fe-helpers.h | 242 ++++++++++++++++++++++++
 1 file changed, 242 insertions(+)
 create mode 100644 src/include/libpq/libpq-be-fe-helpers.h

diff --git a/src/include/libpq/libpq-be-fe-helpers.h b/src/include/libpq/libpq-be-fe-helpers.h
new file mode 100644
index 00000000000..41e3bb4376a
--- /dev/null
+++ b/src/include/libpq/libpq-be-fe-helpers.h
@@ -0,0 +1,242 @@
+/*-------------------------------------------------------------------------
+ *
+ * libpq-be-fe-helpers.h
+ *	  Helper functions for using libpq in extensions
+ *
+ * Code built directly into the backend is not allowed to link to libpq
+ * directly. Extension code is allowed to use libpq however. However, libpq
+ * used in extensions has to be careful to block inside libpq, otherwise
+ * interrupts will not be processed, leading to issues like unresolvable
+ * deadlocks. Backend code also needs to take care to acquire/release an
+ * external fd for the connection, otherwise fd.c's accounting of fd's is
+ * broken.
+ *
+ * This file provides helper functions to make it easier to comply with these
+ * rules. It is a header only library as it needs to be linked into each
+ * extension using libpq, and it seems too small to be worth adding a
+ * dedicated static library for.
+ *
+ * TODO: For historical reasons the connections established here are not put
+ * into non-blocking mode. That can lead to blocking even when only the async
+ * libpq functions are used. This should be fixed.
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/libpq/libpq-be-fe-helpers.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LIBPQ_BE_FE_HELPERS_H
+#define LIBPQ_BE_FE_HELPERS_H
+
+/*
+ * Despite the name, BUILDING_DLL is set only when building code directly part
+ * of the backend. Which also is where libpq isn't allowed to be
+ * used. Obviously this doesn't protect against libpq-fe.h getting included
+ * otherwise, but perhaps still protects against a few mistakes...
+ */
+#ifdef BUILDING_DLL
+#error "libpq may not be used code directly built into the backend"
+#endif
+
+#include "libpq-fe.h"
+#include "miscadmin.h"
+#include "storage/fd.h"
+#include "storage/latch.h"
+#include "utils/wait_event.h"
+
+
+static inline void libpqsrv_connect_prepare(void);
+static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
+
+
+/*
+ * PQconnectdb() wrapper that reserves a file descriptor and processes
+ * interrupts during connection establishment.
+ *
+ * Throws an error if AcquireExternalFD() fails, but does not throw if
+ * connection establishment itself fails. Callers need to use PQstatus() to
+ * check if connection establishment succeeded.
+ */
+static inline PGconn *
+libpqsrv_connect(const char *conninfo, uint32 wait_event_info)
+{
+	PGconn	   *conn = NULL;
+
+	libpqsrv_connect_prepare();
+
+	conn = PQconnectStart(conninfo);
+
+	libpqsrv_connect_internal(conn, wait_event_info);
+
+	return conn;
+}
+
+/*
+ * Like libpqsrv_connect(), except that this is a wrapper for
+ * PQconnectdbParams().
+  */
+static inline PGconn *
+libpqsrv_connect_params(const char *const *keywords,
+						const char *const *values,
+						int expand_dbname,
+						uint32 wait_event_info)
+{
+	PGconn	   *conn = NULL;
+
+	libpqsrv_connect_prepare();
+
+	conn = PQconnectStartParams(keywords, values, expand_dbname);
+
+	libpqsrv_connect_internal(conn, wait_event_info);
+
+	return conn;
+}
+
+/*
+ * PQfinish() wrapper that additionally releases the reserved file descriptor.
+ *
+ * It is allowed to call this with a NULL pgconn iff NULL was returned by
+ * libpqsrv_connect*.
+ */
+static inline void
+libpqsrv_disconnect(PGconn *conn)
+{
+	/*
+	 * If no connection was established, we haven't reserved an FD for it (or
+	 * already released it). This rule makes it easier to write PG_CATCH()
+	 * handlers for this facility's users.
+	 *
+	 * See also libpqsrv_connect_internal().
+	 */
+	if (conn == NULL)
+		return;
+
+	ReleaseExternalFD();
+	PQfinish(conn);
+}
+
+
+/* internal helper functions follow */
+
+
+/*
+ * Helper function for all connection establishment functions.
+ */
+static inline void
+libpqsrv_connect_prepare(void)
+{
+	/*
+	 * We must obey fd.c's limit on non-virtual file descriptors.  Assume that
+	 * a PGconn represents one long-lived FD.  (Doing this here also ensures
+	 * that VFDs are closed if needed to make room.)
+	 */
+	if (!AcquireExternalFD())
+	{
+#ifndef WIN32					/* can't write #if within ereport() macro */
+		ereport(ERROR,
+				(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+				 errmsg("could not establish connection"),
+				 errdetail("There are too many open files on the local server."),
+				 errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
+#else
+		ereport(ERROR,
+				(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+				 errmsg("could not establish connection"),
+				 errdetail("There are too many open files on the local server."),
+				 errhint("Raise the server's max_files_per_process setting.")));
+#endif
+	}
+}
+
+/*
+ * Helper function for all connection establishment functions.
+ */
+static inline void
+libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
+{
+	/*
+	 * With conn == NULL libpqsrv_disconnect() wouldn't release the FD. So do
+	 * that here.
+	 */
+	if (conn == NULL)
+	{
+		ReleaseExternalFD();
+		return;
+	}
+
+	/*
+	 * Can't wait without a socket. Note that we don't want to close the libpq
+	 * connection yet, so callers can emit a useful error.
+	 */
+	if (PQstatus(conn) == CONNECTION_BAD)
+		return;
+
+	/*
+	 * WaitLatchOrSocket() can conceivably fail, handle that case here instead
+	 * of requiring all callers to do so.
+	 */
+	PG_TRY();
+	{
+		PostgresPollingStatusType status;
+
+		/*
+		 * Poll connection until we have OK or FAILED status.
+		 *
+		 * Per spec for PQconnectPoll, first wait till socket is write-ready.
+		 */
+		status = PGRES_POLLING_WRITING;
+		while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED)
+		{
+			int			io_flag;
+			int			rc;
+
+			if (status == PGRES_POLLING_READING)
+				io_flag = WL_SOCKET_READABLE;
+#ifdef WIN32
+
+			/*
+			 * Windows needs a different test while waiting for
+			 * connection-made
+			 */
+			else if (PQstatus(conn) == CONNECTION_STARTED)
+				io_flag = WL_SOCKET_CONNECTED;
+#endif
+			else
+				io_flag = WL_SOCKET_WRITEABLE;
+
+			rc = WaitLatchOrSocket(MyLatch,
+								   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
+								   PQsocket(conn),
+								   0,
+								   wait_event_info);
+
+			/* Interrupted? */
+			if (rc & WL_LATCH_SET)
+			{
+				ResetLatch(MyLatch);
+				CHECK_FOR_INTERRUPTS();
+			}
+
+			/* If socket is ready, advance the libpq state machine */
+			if (rc & io_flag)
+				status = PQconnectPoll(conn);
+		}
+	}
+	PG_CATCH();
+	{
+		/*
+		 * If an error is thrown here, the callers won't call
+		 * libpqsrv_disconnect() with a conn, so release resources
+		 * immediately.
+		 */
+		ReleaseExternalFD();
+		PQfinish(conn);
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+}
+
+#endif							/* LIBPQ_BE_FE_HELPERS_H */
-- 
2.38.0

>From 149399a0de2f388451cfeea22d37842a1f7494ec Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Fri, 20 Jan 2023 17:33:21 -0800
Subject: [PATCH v3 3/4] dblink, postgres_fdw: Handle interrupts during
 connection establishment

Until now dblink and postgres_fdw did not process interrupts during connection
establishment. Besides preventing query cancellations etc, this can lead to
undetected deadlocks, as global barriers are not processed.

These aforementioned undetected deadlocks are the reason for the spate of CI
test failures in the FreeBSD 'test_running' step.

Fix the bug by using the helper from libpq-be-fe-helpers.h, introduced in a
prior commit. Besides fixing the bug, this also removes duplicated code
around reserving file descriptors.

As the change is relatively large and there are no field reports of the
problem, don't backpatch for now.

Reviewed-by: Thomas Munro <thomas.mu...@gmail.com>
Discussion: https://postgr.es/m/20220925232237.p6uskba2dw6fn...@awork3.anarazel.de
Backpatch:
---
 contrib/dblink/dblink.c           | 79 +++++--------------------------
 contrib/postgres_fdw/connection.c | 42 +++-------------
 2 files changed, 17 insertions(+), 104 deletions(-)

diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 8dd122042b4..8982d623d3b 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -48,6 +48,7 @@
 #include "funcapi.h"
 #include "lib/stringinfo.h"
 #include "libpq-fe.h"
+#include "libpq/libpq-be-fe-helpers.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "parser/scansup.h"
@@ -199,37 +200,14 @@ dblink_get_conn(char *conname_or_str,
 			connstr = conname_or_str;
 		dblink_connstr_check(connstr);
 
-		/*
-		 * We must obey fd.c's limit on non-virtual file descriptors.  Assume
-		 * that a PGconn represents one long-lived FD.  (Doing this here also
-		 * ensures that VFDs are closed if needed to make room.)
-		 */
-		if (!AcquireExternalFD())
-		{
-#ifndef WIN32					/* can't write #if within ereport() macro */
-			ereport(ERROR,
-					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-					 errmsg("could not establish connection"),
-					 errdetail("There are too many open files on the local server."),
-					 errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
-#else
-			ereport(ERROR,
-					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-					 errmsg("could not establish connection"),
-					 errdetail("There are too many open files on the local server."),
-					 errhint("Raise the server's max_files_per_process setting.")));
-#endif
-		}
-
 		/* OK to make connection */
-		conn = PQconnectdb(connstr);
+		conn = libpqsrv_connect(connstr, PG_WAIT_EXTENSION);
 
 		if (PQstatus(conn) == CONNECTION_BAD)
 		{
 			char	   *msg = pchomp(PQerrorMessage(conn));
 
-			PQfinish(conn);
-			ReleaseExternalFD();
+			libpqsrv_disconnect(conn);
 			ereport(ERROR,
 					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
 					 errmsg("could not establish connection"),
@@ -312,36 +290,13 @@ dblink_connect(PG_FUNCTION_ARGS)
 	/* check password in connection string if not superuser */
 	dblink_connstr_check(connstr);
 
-	/*
-	 * We must obey fd.c's limit on non-virtual file descriptors.  Assume that
-	 * a PGconn represents one long-lived FD.  (Doing this here also ensures
-	 * that VFDs are closed if needed to make room.)
-	 */
-	if (!AcquireExternalFD())
-	{
-#ifndef WIN32					/* can't write #if within ereport() macro */
-		ereport(ERROR,
-				(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-				 errmsg("could not establish connection"),
-				 errdetail("There are too many open files on the local server."),
-				 errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
-#else
-		ereport(ERROR,
-				(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-				 errmsg("could not establish connection"),
-				 errdetail("There are too many open files on the local server."),
-				 errhint("Raise the server's max_files_per_process setting.")));
-#endif
-	}
-
 	/* OK to make connection */
-	conn = PQconnectdb(connstr);
+	conn = libpqsrv_connect(connstr, PG_WAIT_EXTENSION);
 
 	if (PQstatus(conn) == CONNECTION_BAD)
 	{
 		msg = pchomp(PQerrorMessage(conn));
-		PQfinish(conn);
-		ReleaseExternalFD();
+		libpqsrv_disconnect(conn);
 		if (rconn)
 			pfree(rconn);
 
@@ -366,10 +321,7 @@ dblink_connect(PG_FUNCTION_ARGS)
 	else
 	{
 		if (pconn->conn)
-		{
-			PQfinish(pconn->conn);
-			ReleaseExternalFD();
-		}
+			libpqsrv_disconnect(conn);
 		pconn->conn = conn;
 	}
 
@@ -402,8 +354,7 @@ dblink_disconnect(PG_FUNCTION_ARGS)
 	if (!conn)
 		dblink_conn_not_avail(conname);
 
-	PQfinish(conn);
-	ReleaseExternalFD();
+	libpqsrv_disconnect(conn);
 	if (rconn)
 	{
 		deleteConnection(conname);
@@ -838,10 +789,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 	{
 		/* if needed, close the connection to the database */
 		if (freeconn)
-		{
-			PQfinish(conn);
-			ReleaseExternalFD();
-		}
+			libpqsrv_disconnect(conn);
 	}
 	PG_END_TRY();
 
@@ -1516,10 +1464,7 @@ dblink_exec(PG_FUNCTION_ARGS)
 	{
 		/* if needed, close the connection to the database */
 		if (freeconn)
-		{
-			PQfinish(conn);
-			ReleaseExternalFD();
-		}
+			libpqsrv_disconnect(conn);
 	}
 	PG_END_TRY();
 
@@ -2606,8 +2551,7 @@ createNewConnection(const char *name, remoteConn *rconn)
 
 	if (found)
 	{
-		PQfinish(rconn->conn);
-		ReleaseExternalFD();
+		libpqsrv_disconnect(rconn->conn);
 		pfree(rconn);
 
 		ereport(ERROR,
@@ -2647,8 +2591,7 @@ dblink_security_check(PGconn *conn, remoteConn *rconn)
 	{
 		if (!PQconnectionUsedPassword(conn))
 		{
-			PQfinish(conn);
-			ReleaseExternalFD();
+			libpqsrv_disconnect(conn);
 			if (rconn)
 				pfree(rconn);
 
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index ed75ce3f79c..7760380f00d 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -17,6 +17,7 @@
 #include "catalog/pg_user_mapping.h"
 #include "commands/defrem.h"
 #include "funcapi.h"
+#include "libpq/libpq-be-fe-helpers.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -446,35 +447,10 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 		/* verify the set of connection parameters */
 		check_conn_params(keywords, values, user);
 
-		/*
-		 * We must obey fd.c's limit on non-virtual file descriptors.  Assume
-		 * that a PGconn represents one long-lived FD.  (Doing this here also
-		 * ensures that VFDs are closed if needed to make room.)
-		 */
-		if (!AcquireExternalFD())
-		{
-#ifndef WIN32					/* can't write #if within ereport() macro */
-			ereport(ERROR,
-					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-					 errmsg("could not connect to server \"%s\"",
-							server->servername),
-					 errdetail("There are too many open files on the local server."),
-					 errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
-#else
-			ereport(ERROR,
-					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
-					 errmsg("could not connect to server \"%s\"",
-							server->servername),
-					 errdetail("There are too many open files on the local server."),
-					 errhint("Raise the server's max_files_per_process setting.")));
-#endif
-		}
-
 		/* OK to make connection */
-		conn = PQconnectdbParams(keywords, values, false);
-
-		if (!conn)
-			ReleaseExternalFD();	/* because the PG_CATCH block won't */
+		conn = libpqsrv_connect_params(keywords, values,
+									    /* expand_dbname = */ false,
+									   PG_WAIT_EXTENSION);
 
 		if (!conn || PQstatus(conn) != CONNECTION_OK)
 			ereport(ERROR,
@@ -507,12 +483,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 	}
 	PG_CATCH();
 	{
-		/* Release PGconn data structure if we managed to create one */
-		if (conn)
-		{
-			PQfinish(conn);
-			ReleaseExternalFD();
-		}
+		libpqsrv_disconnect(conn);
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
@@ -528,9 +499,8 @@ disconnect_pg_server(ConnCacheEntry *entry)
 {
 	if (entry->conn != NULL)
 	{
-		PQfinish(entry->conn);
+		libpqsrv_disconnect(entry->conn);
 		entry->conn = NULL;
-		ReleaseExternalFD();
 	}
 }
 
-- 
2.38.0

>From 70db02b9f7525f290f583351b240ef9c1bf6dd5d Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Fri, 20 Jan 2023 18:37:18 -0800
Subject: [PATCH v3 4/4] libpqwalreceiver: Convert to libpq-be-fe-helpers.h

In contrast to the changes to dblink and postgres_fdw, this does not fix a
bug, as libpqwalreceiver did already process interrupts.

Besides reducing code duplication, the conversion leads to libpqwalreceiver
now using reserving file descriptors for libpq connections. While not strictly
required for the use in walreceiver, we are also using libpqwalreceiver for
logical replication, where it does seem more important.

Even if we eventually decide to backpatch the prior commits, there'd be no
need to backpatch this commit, due to not fixing an active bug.

Reviewed-by: Thomas Munro <thomas.mu...@gmail.com>
Discussion: https://postgr.es/m/20220925232237.p6uskba2dw6fn...@awork3.anarazel.de
---
 .../libpqwalreceiver/libpqwalreceiver.c       | 53 +++----------------
 1 file changed, 7 insertions(+), 46 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index fefc8660259..560ec974fa7 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -24,6 +24,7 @@
 #include "common/connect.h"
 #include "funcapi.h"
 #include "libpq-fe.h"
+#include "libpq/libpq-be-fe-helpers.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -125,7 +126,6 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 				 char **err)
 {
 	WalReceiverConn *conn;
-	PostgresPollingStatusType status;
 	const char *keys[6];
 	const char *vals[6];
 	int			i = 0;
@@ -172,49 +172,10 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 	Assert(i < sizeof(keys));
 
 	conn = palloc0(sizeof(WalReceiverConn));
-	conn->streamConn = PQconnectStartParams(keys, vals,
-											 /* expand_dbname = */ true);
-	if (PQstatus(conn->streamConn) == CONNECTION_BAD)
-		goto bad_connection_errmsg;
-
-	/*
-	 * Poll connection until we have OK or FAILED status.
-	 *
-	 * Per spec for PQconnectPoll, first wait till socket is write-ready.
-	 */
-	status = PGRES_POLLING_WRITING;
-	do
-	{
-		int			io_flag;
-		int			rc;
-
-		if (status == PGRES_POLLING_READING)
-			io_flag = WL_SOCKET_READABLE;
-#ifdef WIN32
-		/* Windows needs a different test while waiting for connection-made */
-		else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
-			io_flag = WL_SOCKET_CONNECTED;
-#endif
-		else
-			io_flag = WL_SOCKET_WRITEABLE;
-
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
-							   PQsocket(conn->streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
-
-		/* Interrupted? */
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			ProcessWalRcvInterrupts();
-		}
-
-		/* If socket is ready, advance the libpq state machine */
-		if (rc & io_flag)
-			status = PQconnectPoll(conn->streamConn);
-	} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
+	conn->streamConn =
+		libpqsrv_connect_params(keys, vals,
+								 /* expand_dbname = */ true,
+								WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
 
 	if (PQstatus(conn->streamConn) != CONNECTION_OK)
 		goto bad_connection_errmsg;
@@ -245,7 +206,7 @@ bad_connection_errmsg:
 
 	/* error path, error already set */
 bad_connection:
-	PQfinish(conn->streamConn);
+	libpqsrv_disconnect(conn->streamConn);
 	pfree(conn);
 	return NULL;
 }
@@ -744,7 +705,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 static void
 libpqrcv_disconnect(WalReceiverConn *conn)
 {
-	PQfinish(conn->streamConn);
+	libpqsrv_disconnect(conn->streamConn);
 	PQfreemem(conn->recvBuf);
 	pfree(conn);
 }
-- 
2.38.0

Reply via email to