On 11/22/23 2:29 AM, Noah Misch wrote:
> Something as simple as the following doesn't respond to cancellation.  In
> v15+, any DROP DATABASE will hang as long as it's running:

Hi,

One of our customers ran into this bug when upgrading from PostgreSQL 14 to PostgreSQL 16. Your commit[1] fixed this issue in PostgreSQL 17, but the bugfix was not backported; the commit message gave the following explanation:

> Code inspection identified the bug at least thirteen years ago, but user complaints have not appeared. Hence, no back-patch for now.

But as far as I can tell that is not the case, because at least for CREATE DATABASE the bug was introduced by a commit[2] in PostgreSQL 15. And now that we actually have a user complaint, what do you think about backporting the fix?

The patch seems small and relatively safe to backport, and the functions have had no bugfixes as far as I could see. I also did a quick git cherry-pick myself on top of PG 16 (see attached patch); the only conflict was related to the introduction of custom wait events, which was easy to fix.

Here is a small snippet which reproduces the bug. It hangs reliably on PostgreSQL 15 and 16, but not 14, 17 or HEAD.

CREATE EXTENSION dblink;

-- Reproduction helper.  Arguments are host ($1), port ($2) and user ($3);
-- they are substituted into a dblink connection string that targets the
-- "postgres" database.  The function connects, issues DROP DATABASE test
-- through dblink, and disconnects, raising a NOTICE after each step so the
-- output shows how far execution got.
CREATE FUNCTION f(text, int, text) RETURNS VOID LANGUAGE plpgsql AS $$
BEGIN
-- Open the dblink connection used by the calls below.
PERFORM dblink_connect(format('host=%s port=%s user=%s dbname=postgres', $1, $2, $3));
    RAISE NOTICE 'dblink connected!';
    -- Run the DROP DATABASE through dblink rather than in the local session.
    -- NOTE(review): presumably this is the call that hangs on 15/16 -- confirm.
    PERFORM dblink_exec('DROP DATABASE test');
    RAISE NOTICE 'Database dropped!';
    PERFORM dblink_disconnect();
    RAISE NOTICE 'dblink disconnected!';
END
$$;

CREATE DATABASE test;
SELECT f(:'HOST', :PORT, :'USER');


Andreas

1. https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=d3c5f37dd543498cc7c678815d3921823beec9e9
2. https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=e2f65f42555ff531c6d7c8f151526b4ef7c016f8
From 9fa3af45dcfc6a27d3d2aa5ae29812bbd4740751 Mon Sep 17 00:00:00 2001
From: Noah Misch <n...@leadboat.com>
Date: Mon, 8 Jan 2024 11:39:56 -0800
Subject: [PATCH] Make dblink interruptible, via new libpqsrv APIs.

This replaces dblink's blocking libpq calls, allowing cancellation and
allowing DROP DATABASE (of a database not involved in the query).  Apart
from explicit dblink_cancel_query() calls, dblink still doesn't cancel
the remote side.  The replacement for the blocking calls consists of
new, general-purpose query execution wrappers in the libpqsrv facility.
Out-of-tree extensions should adopt these.  Use them in postgres_fdw,
replacing a local implementation from which the libpqsrv implementation
derives.  This is a bug fix for dblink.  Code inspection identified the
bug at least thirteen years ago, but user complaints have not appeared.
Hence, no back-patch for now.

Discussion: https://postgr.es/m/20231122012945...@rfd.leadboat.com
---
 contrib/dblink/dblink.c                       |  24 ++--
 contrib/postgres_fdw/connection.c             |  80 ++---------
 contrib/postgres_fdw/deparse.c                |   2 +-
 contrib/postgres_fdw/postgres_fdw.c           |  10 +-
 contrib/postgres_fdw/postgres_fdw.h           |   2 +-
 .../libpqwalreceiver/libpqwalreceiver.c       |   9 +-
 src/include/libpq/libpq-be-fe-helpers.h       | 127 ++++++++++++++++++
 7 files changed, 163 insertions(+), 91 deletions(-)

diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 1ff65d1e521..bd409d7bf1b 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -61,6 +61,7 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/varlena.h"
+#include "utils/wait_event.h"
 
 PG_MODULE_MAGIC;
 
@@ -430,7 +431,7 @@ dblink_open(PG_FUNCTION_ARGS)
 	/* If we are not in a transaction, start one */
 	if (PQtransactionStatus(conn) == PQTRANS_IDLE)
 	{
-		res = PQexec(conn, "BEGIN");
+		res = libpqsrv_exec(conn, "BEGIN", PG_WAIT_EXTENSION);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			dblink_res_internalerror(conn, res, "begin error");
 		PQclear(res);
@@ -449,7 +450,7 @@ dblink_open(PG_FUNCTION_ARGS)
 		(rconn->openCursorCount)++;
 
 	appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
-	res = PQexec(conn, buf.data);
+	res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION);
 	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
 	{
 		dblink_res_error(conn, conname, res, fail,
@@ -518,7 +519,7 @@ dblink_close(PG_FUNCTION_ARGS)
 	appendStringInfo(&buf, "CLOSE %s", curname);
 
 	/* close the cursor */
-	res = PQexec(conn, buf.data);
+	res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION);
 	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
 	{
 		dblink_res_error(conn, conname, res, fail,
@@ -538,7 +539,7 @@ dblink_close(PG_FUNCTION_ARGS)
 		{
 			rconn->newXactForCursor = false;
 
-			res = PQexec(conn, "COMMIT");
+			res = libpqsrv_exec(conn, "COMMIT", PG_WAIT_EXTENSION);
 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
 				dblink_res_internalerror(conn, res, "commit error");
 			PQclear(res);
@@ -620,7 +621,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
 	 * PGresult will be long-lived even though we are still in a short-lived
 	 * memory context.
 	 */
-	res = PQexec(conn, buf.data);
+	res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION);
 	if (!res ||
 		(PQresultStatus(res) != PGRES_COMMAND_OK &&
 		 PQresultStatus(res) != PGRES_TUPLES_OK))
@@ -768,7 +769,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 		else
 		{
 			/* async result retrieval, do it the old way */
-			PGresult   *res = PQgetResult(conn);
+			PGresult   *res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION);
 
 			/* NULL means we're all done with the async results */
 			if (res)
@@ -1076,7 +1077,8 @@ materializeQueryResult(FunctionCallInfo fcinfo,
 		PQclear(sinfo.last_res);
 		PQclear(sinfo.cur_res);
 		/* and clear out any pending data in libpq */
-		while ((res = PQgetResult(conn)) != NULL)
+		while ((res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION)) !=
+			   NULL)
 			PQclear(res);
 		PG_RE_THROW();
 	}
@@ -1103,7 +1105,7 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
 	{
 		CHECK_FOR_INTERRUPTS();
 
-		sinfo->cur_res = PQgetResult(conn);
+		sinfo->cur_res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION);
 		if (!sinfo->cur_res)
 			break;
 
@@ -1431,7 +1433,7 @@ dblink_exec(PG_FUNCTION_ARGS)
 		if (!conn)
 			dblink_conn_not_avail(conname);
 
-		res = PQexec(conn, sql);
+		res = libpqsrv_exec(conn, sql, PG_WAIT_EXTENSION);
 		if (!res ||
 			(PQresultStatus(res) != PGRES_COMMAND_OK &&
 			 PQresultStatus(res) != PGRES_TUPLES_OK))
@@ -2728,8 +2730,8 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
 
 	/*
 	 * If we don't get a message from the PGresult, try the PGconn.  This is
-	 * needed because for connection-level failures, PQexec may just return
-	 * NULL, not a PGresult at all.
+	 * needed because for connection-level failures, PQgetResult may just
+	 * return NULL, not a PGresult at all.
 	 */
 	if (message_primary == NULL)
 		message_primary = pchomp(PQerrorMessage(conn));
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 3246e18ab93..3cf0f4b148e 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -709,7 +709,7 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
 	 */
 	if (consume_input && !PQconsumeInput(conn))
 		pgfdw_report_error(ERROR, NULL, conn, false, sql);
-	res = pgfdw_get_result(conn, sql);
+	res = pgfdw_get_result(conn);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -812,7 +812,9 @@ GetPrepStmtNumber(PGconn *conn)
 /*
  * Submit a query and wait for the result.
  *
- * This function is interruptible by signals.
+ * Since we don't use non-blocking mode, this can't process interrupts while
+ * pushing the query text to the server.  That risk is relatively small, so we
+ * ignore that for now.
  *
  * Caller is responsible for the error handling on the result.
  */
@@ -823,77 +825,20 @@ pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
 	if (state && state->pendingAreq)
 		process_pending_request(state->pendingAreq);
 
-	/*
-	 * Submit a query.  Since we don't use non-blocking mode, this also can
-	 * block.  But its risk is relatively small, so we ignore that for now.
-	 */
 	if (!PQsendQuery(conn, query))
-		pgfdw_report_error(ERROR, NULL, conn, false, query);
-
-	/* Wait for the result. */
-	return pgfdw_get_result(conn, query);
+		return NULL;
+	return pgfdw_get_result(conn);
 }
 
 /*
- * Wait for the result from a prior asynchronous execution function call.
- *
- * This function offers quick responsiveness by checking for any interruptions.
- *
- * This function emulates PQexec()'s behavior of returning the last result
- * when there are many.
+ * Wrap libpqsrv_get_result_last(), adding wait event.
  *
  * Caller is responsible for the error handling on the result.
  */
 PGresult *
-pgfdw_get_result(PGconn *conn, const char *query)
+pgfdw_get_result(PGconn *conn)
 {
-	PGresult   *volatile last_res = NULL;
-
-	/* In what follows, do not leak any PGresults on an error. */
-	PG_TRY();
-	{
-		for (;;)
-		{
-			PGresult   *res;
-
-			while (PQisBusy(conn))
-			{
-				int			wc;
-
-				/* Sleep until there's something to do */
-				wc = WaitLatchOrSocket(MyLatch,
-									   WL_LATCH_SET | WL_SOCKET_READABLE |
-									   WL_EXIT_ON_PM_DEATH,
-									   PQsocket(conn),
-									   -1L, PG_WAIT_EXTENSION);
-				ResetLatch(MyLatch);
-
-				CHECK_FOR_INTERRUPTS();
-
-				/* Data available in socket? */
-				if (wc & WL_SOCKET_READABLE)
-				{
-					if (!PQconsumeInput(conn))
-						pgfdw_report_error(ERROR, NULL, conn, false, query);
-				}
-			}
-
-			res = PQgetResult(conn);
-			if (res == NULL)
-				break;			/* query is complete */
-
-			PQclear(last_res);
-			last_res = res;
-		}
-	}
-	PG_CATCH();
-	{
-		PQclear(last_res);
-		PG_RE_THROW();
-	}
-	PG_END_TRY();
-
-	return last_res;
+	return libpqsrv_get_result_last(conn, PG_WAIT_EXTENSION);
 }
 
 /*
@@ -934,8 +879,8 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 
 		/*
 		 * If we don't get a message from the PGresult, try the PGconn.  This
-		 * is needed because for connection-level failures, PQexec may just
-		 * return NULL, not a PGresult at all.
+		 * is needed because for connection-level failures, PQgetResult may
+		 * just return NULL, not a PGresult at all.
 		 */
 		if (message_primary == NULL)
 			message_primary = pchomp(PQerrorMessage(conn));
@@ -1035,7 +980,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					 */
 					if (entry->have_prep_stmt && entry->have_error)
 					{
-						res = PQexec(entry->conn, "DEALLOCATE ALL");
+						res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
+											   NULL);
 						PQclear(res);
 					}
 					entry->have_prep_stmt = false;
diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index 6cba34350af..dba4dedd932 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -3673,7 +3673,7 @@ appendOrderBySuffix(Oid sortop, Oid sortcoltype, bool nulls_first,
  * Print the representation of a parameter to be sent to the remote side.
  *
  * Note: we always label the Param's type explicitly rather than relying on
- * transmitting a numeric type OID in PQexecParams().  This allows us to
+ * transmitting a numeric type OID in PQsendQueryParams().  This allows us to
  * avoid assuming that types have the same OIDs on the remote side as they
  * do locally --- they need only have the same names.
  */
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 8657a3b0e23..b18567cf8f4 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -3767,7 +3767,7 @@ create_cursor(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(conn, buf.data);
+	res = pgfdw_get_result(conn);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
 	PQclear(res);
@@ -3817,7 +3817,7 @@ fetch_more_data(ForeignScanState *node)
 			 * The query was already sent by an earlier call to
 			 * fetch_more_data_begin.  So now we just fetch the result.
 			 */
-			res = pgfdw_get_result(conn, fsstate->query);
+			res = pgfdw_get_result(conn);
 			/* On error, report the original query, not the FETCH. */
 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
 				pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
@@ -4166,7 +4166,7 @@ execute_foreign_modify(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+	res = pgfdw_get_result(fmstate->conn);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -4236,7 +4236,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+	res = pgfdw_get_result(fmstate->conn);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
 	PQclear(res);
@@ -4578,7 +4578,7 @@ execute_dml_stmt(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
+	dmstate->result = pgfdw_get_result(dmstate->conn);
 	if (PQresultStatus(dmstate->result) !=
 		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 02c11523199..8e8a36ad14c 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -158,7 +158,7 @@ extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
 extern void do_sql_command(PGconn *conn, const char *sql);
-extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
+extern PGresult *pgfdw_get_result(PGconn *conn);
 extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
 								  PgFdwConnState *state);
 extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index b4038e114d8..568024ec974 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -705,12 +705,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
  * Send a query and wait for the results by using the asynchronous libpq
  * functions and socket readiness events.
  *
- * We must not use the regular blocking libpq functions like PQexec()
- * since they are uninterruptible by signals on some platforms, such as
- * Windows.
- *
- * The function is modeled on PQexec() in libpq, but only implements
- * those parts that are in use in the walreceiver api.
+ * The function is modeled on libpqsrv_exec(), with the behavior difference
+ * being that it calls ProcessWalRcvInterrupts().  As an optimization, it
+ * skips try/catch, since all errors terminate the process.
  *
  * May return NULL, rather than an error result, on failure.
  */
diff --git a/src/include/libpq/libpq-be-fe-helpers.h b/src/include/libpq/libpq-be-fe-helpers.h
index 41e3bb4376a..a4b3e805b9d 100644
--- a/src/include/libpq/libpq-be-fe-helpers.h
+++ b/src/include/libpq/libpq-be-fe-helpers.h
@@ -49,6 +49,8 @@
 
 static inline void libpqsrv_connect_prepare(void);
 static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
+static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
+static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);
 
 
 /*
@@ -239,4 +241,129 @@ libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
 	PG_END_TRY();
 }
 
+/*
+ * PQexec() wrapper that processes interrupts.
+ *
+ * Unless PQsetnonblocking(conn, 1) is in effect, this can't process
+ * interrupts while pushing the query text to the server.  Consider that
+ * setting if query strings can be long relative to TCP buffer size.
+ *
+ * This has the preconditions of PQsendQuery(), not those of PQexec().  Most
+ * notably, PQexec() would silently discard any prior query results.
+ */
+static inline PGresult *
+libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
+{
+	if (!PQsendQuery(conn, query))
+		return NULL;
+	return libpqsrv_get_result_last(conn, wait_event_info);
+}
+
+/*
+ * PQexecParams() wrapper that processes interrupts.
+ *
+ * See notes at libpqsrv_exec().
+ */
+static inline PGresult *
+libpqsrv_exec_params(PGconn *conn,
+					 const char *command,
+					 int nParams,
+					 const Oid *paramTypes,
+					 const char *const *paramValues,
+					 const int *paramLengths,
+					 const int *paramFormats,
+					 int resultFormat,
+					 uint32 wait_event_info)
+{
+	if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues,
+						   paramLengths, paramFormats, resultFormat))
+		return NULL;
+	return libpqsrv_get_result_last(conn, wait_event_info);
+}
+
+/*
+ * Like PQexec(), loop over PQgetResult() until it returns NULL or another
+ * terminal state.  Return the last non-NULL result or the terminal state.
+ */
+static inline PGresult *
+libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
+{
+	PGresult   *volatile lastResult = NULL;
+
+	/* In what follows, do not leak any PGresults on an error. */
+	PG_TRY();
+	{
+		for (;;)
+		{
+			/* Wait for, and collect, the next PGresult. */
+			PGresult   *result;
+
+			result = libpqsrv_get_result(conn, wait_event_info);
+			if (result == NULL)
+				break;			/* query is complete, or failure */
+
+			/*
+			 * Emulate PQexec()'s behavior of returning the last result when
+			 * there are many.
+			 */
+			PQclear(lastResult);
+			lastResult = result;
+
+			if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
+				PQresultStatus(lastResult) == PGRES_COPY_OUT ||
+				PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
+				PQstatus(conn) == CONNECTION_BAD)
+				break;
+		}
+	}
+	PG_CATCH();
+	{
+		PQclear(lastResult);
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	return lastResult;
+}
+
+/*
+ * Perform the equivalent of PQgetResult(), but watch for interrupts.
+ */
+static inline PGresult *
+libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
+{
+	/*
+	 * Collect data until PQgetResult is ready to get the result without
+	 * blocking.
+	 */
+	while (PQisBusy(conn))
+	{
+		int			rc;
+
+		rc = WaitLatchOrSocket(MyLatch,
+							   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET |
+							   WL_SOCKET_READABLE,
+							   PQsocket(conn),
+							   0,
+							   wait_event_info);
+
+		/* Interrupted? */
+		if (rc & WL_LATCH_SET)
+		{
+			ResetLatch(MyLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
+
+		/* Consume whatever data is available from the socket */
+		if (PQconsumeInput(conn) == 0)
+		{
+			/* trouble; expect PQgetResult() to return NULL */
+			break;
+		}
+	}
+
+	/* Now we can collect and return the next PGresult */
+	return PQgetResult(conn);
+}
+
 #endif							/* LIBPQ_BE_FE_HELPERS_H */
-- 
2.47.2

Reply via email to