On 2024-Mar-22, Jelte Fennema-Nio wrote:

> On Thu, 21 Mar 2024 at 03:54, Noah Misch <n...@leadboat.com> wrote:
> >
> > On Mon, Mar 18, 2024 at 07:40:10PM +0100, Alvaro Herrera wrote:
> > > I enabled the test again and also pushed the changes to dblink,
> > > isolationtester and fe_utils (which AFAICS is used by pg_dump,
> >
> > I recommend adding a libpqsrv_cancel() function to libpq-be-fe-helpers.h, to
> > use from dblink and postgres_fdw.  pgxn modules calling PQcancel() from the
> > backend (citus pg_bulkload plproxy pmpp) then have a better chance to adopt
> > the new way.
> 
> Done

Nice, thanks.  I played with it a bit, mostly trying to figure out if
the chosen API is usable.  I toyed with making it return boolean success
and the error message as an output argument, because I was nervous about
what'd happen in OOM.  But since this is a backend environment, what
actually happens is that we elog(ERROR) anyway, so we never return a
NULL error message.  So after the detour I think Jelte's API is okay.

I changed it so that the error messages are returned as translated
phrases, and was bothered by the fact that if errors happen repeatedly,
the memory for them might be leaked.  Maybe this is fine depending on
the caller's memory context, but since it's only at most one string each
time, it's quite easy to just keep track of it so that we can release it
on the next call.

I ended up reducing the two PG_TRY blocks to a single one.  I see no
reason to split them up, and this way it looks more legible.

What do you think?

-- 
Álvaro Herrera        Breisgau, Deutschland  —  https://www.EnterpriseDB.com/
"Tiene valor aquel que admite que es un cobarde" (Fernandel)
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index edbc9ab02a..de858e165a 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -1347,25 +1347,16 @@ Datum
 dblink_cancel_query(PG_FUNCTION_ARGS)
 {
 	PGconn	   *conn;
-	PGcancelConn *cancelConn;
 	char	   *msg;
+	TimestampTz endtime;
 
 	dblink_init();
 	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
-	cancelConn = PQcancelCreate(conn);
-
-	PG_TRY();
-	{
-		if (!PQcancelBlocking(cancelConn))
-			msg = pchomp(PQcancelErrorMessage(cancelConn));
-		else
-			msg = "OK";
-	}
-	PG_FINALLY();
-	{
-		PQcancelFinish(cancelConn);
-	}
-	PG_END_TRY();
+	endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+										  30000);
+	msg = libpqsrv_cancel(conn, endtime);
+	if (msg == NULL)
+		msg = "OK";
 
 	PG_RETURN_TEXT_P(cstring_to_text(msg));
 }
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 4931ebf591..2532e453c4 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -133,7 +133,7 @@ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
 static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
 static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
 static bool pgfdw_cancel_query(PGconn *conn);
-static bool pgfdw_cancel_query_begin(PGconn *conn);
+static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
 static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
 								   bool consume_input);
 static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
@@ -1315,36 +1315,31 @@ pgfdw_cancel_query(PGconn *conn)
 	endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
 										  CONNECTION_CLEANUP_TIMEOUT);
 
-	if (!pgfdw_cancel_query_begin(conn))
+	if (!pgfdw_cancel_query_begin(conn, endtime))
 		return false;
 	return pgfdw_cancel_query_end(conn, endtime, false);
 }
 
+/*
+ * Submit a cancel request to the given connection, waiting only until
+ * the given time.
+ *
+ * We sleep interruptibly until we receive confirmation that the cancel
+ * request has been accepted, and if it is, return true; if the timeout
+ * lapses without that, or the request fails for whatever reason, return
+ * false.
+ */
 static bool
-pgfdw_cancel_query_begin(PGconn *conn)
+pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
 {
-	PGcancel   *cancel;
-	char		errbuf[256];
+	char	   *errormsg = libpqsrv_cancel(conn, endtime);
 
-	/*
-	 * Issue cancel request.  Unfortunately, there's no good way to limit the
-	 * amount of time that we might block inside PQgetCancel().
-	 */
-	if ((cancel = PQgetCancel(conn)))
-	{
-		if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
-		{
-			ereport(WARNING,
-					(errcode(ERRCODE_CONNECTION_FAILURE),
-					 errmsg("could not send cancel request: %s",
-							errbuf)));
-			PQfreeCancel(cancel);
-			return false;
-		}
-		PQfreeCancel(cancel);
-	}
+	if (errormsg != NULL)
+		ereport(WARNING,
+				errcode(ERRCODE_CONNECTION_FAILURE),
+				errmsg("could not send cancel request: %s", errormsg));
 
-	return true;
+	return errormsg == NULL;
 }
 
 static bool
@@ -1685,7 +1680,11 @@ pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
 	 */
 	if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
 	{
-		if (!pgfdw_cancel_query_begin(entry->conn))
+		TimestampTz endtime;
+
+		endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+											  CONNECTION_CLEANUP_TIMEOUT);
+		if (!pgfdw_cancel_query_begin(entry->conn, endtime))
 			return false;		/* Unable to cancel running query */
 		*cancel_requested = lappend(*cancel_requested, entry);
 	}
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 3f0110c52b..b7af86d351 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -2739,6 +2739,21 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
 (10 rows)
 
 ALTER VIEW v4 OWNER TO regress_view_owner;
+-- Make sure this big CROSS JOIN query is pushed down
+EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
+                                                                             QUERY PLAN                                                                              
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Foreign Scan
+   Output: (count(*))
+   Relations: Aggregate on ((((public.ft1) INNER JOIN (public.ft2)) INNER JOIN (public.ft4)) INNER JOIN (public.ft5))
+   Remote SQL: SELECT count(*) FROM ((("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) INNER JOIN "S 1"."T 3" r4 ON (TRUE)) INNER JOIN "S 1"."T 4" r6 ON (TRUE))
+(4 rows)
+
+-- Make sure query cancellation works
+SET statement_timeout = '10ms';
+select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
+ERROR:  canceling statement due to statement timeout
+RESET statement_timeout;
 -- ====================================================================
 -- Check that userid to use when querying the remote table is correctly
 -- propagated into foreign rels present in subqueries under an UNION ALL
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 5fffc4c53b..6e1c819159 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -737,6 +737,13 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
 SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10;
 ALTER VIEW v4 OWNER TO regress_view_owner;
 
+-- Make sure this big CROSS JOIN query is pushed down
+EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
+-- Make sure query cancellation works
+SET statement_timeout = '10ms';
+select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
+RESET statement_timeout;
+
 -- ====================================================================
 -- Check that userid to use when querying the remote table is correctly
 -- propagated into foreign rels present in subqueries under an UNION ALL
diff --git a/src/include/libpq/libpq-be-fe-helpers.h b/src/include/libpq/libpq-be-fe-helpers.h
index 5d33bcf32f..123ffb96af 100644
--- a/src/include/libpq/libpq-be-fe-helpers.h
+++ b/src/include/libpq/libpq-be-fe-helpers.h
@@ -44,6 +44,8 @@
 #include "miscadmin.h"
 #include "storage/fd.h"
 #include "storage/latch.h"
+#include "utils/timestamp.h"
+#include "utils/wait_event.h"
 
 
 static inline void libpqsrv_connect_prepare(void);
@@ -365,4 +367,95 @@ libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
 	return PQgetResult(conn);
 }

+/*
+ * Submit a cancel request to the given connection, waiting only until
+ * the given time.
+ *
+ * We sleep interruptibly until we receive confirmation that the cancel
+ * request has been accepted, and if it is, return NULL; if the cancel
+ * request fails, return an error message string.
+ *
+ * The returned string is either statically allocated or a copy of libpq's
+ * error message palloc'd in the caller's current memory context.  Callers
+ * need not free it; it goes away when that context is reset or deleted.
+ * We deliberately do not stash it in a static pointer to pfree on the next
+ * call: the context holding it may have been reset in the meantime, and
+ * the pfree would then be applied to a dangling pointer.
+ *
+ * For other problems (to wit: OOM when strdup'ing an error message from
+ * libpq), this function can ereport(ERROR).
+ */
+static inline char *
+libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
+{
+	PGcancelConn *cancel_conn;
+	char	   *error = NULL;
+
+	cancel_conn = PQcancelCreate(conn);
+	if (cancel_conn == NULL)
+		return _("out of memory");
+
+	/* In what follows, do not leak any PGcancelConn on any errors. */
+
+	PG_TRY();
+	{
+		if (!PQcancelStart(cancel_conn))
+		{
+			error = pchomp(PQcancelErrorMessage(cancel_conn));
+			goto exit;
+		}
+
+		for (;;)
+		{
+			PostgresPollingStatusType pollres;
+			TimestampTz now;
+			long		cur_timeout;
+			int			waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+			pollres = PQcancelPoll(cancel_conn);
+			if (pollres == PGRES_POLLING_OK)
+				break;			/* success! */
+
+			/* If timeout has expired, give up, else get sleep time. */
+			now = GetCurrentTimestamp();
+			cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
+			if (cur_timeout <= 0)
+			{
+				error = _("cancel request timed out");
+				break;
+			}
+
+			switch (pollres)
+			{
+				case PGRES_POLLING_READING:
+					waitEvents |= WL_SOCKET_READABLE;
+					break;
+				case PGRES_POLLING_WRITING:
+					waitEvents |= WL_SOCKET_WRITEABLE;
+					break;
+				default:
+					/* any other status is a failure; report libpq's message */
+					error = pchomp(PQcancelErrorMessage(cancel_conn));
+					goto exit;
+			}
+
+			/* Sleep until there's something to do */
+			WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
+							  cur_timeout, PG_WAIT_CLIENT);
+
+			ResetLatch(MyLatch);
+
+			CHECK_FOR_INTERRUPTS();
+		}
+exit:	;
+	}
+	PG_FINALLY();
+	{
+		PQcancelFinish(cancel_conn);
+	}
+	PG_END_TRY();
+
+	return error;
+}
+
 #endif							/* LIBPQ_BE_FE_HELPERS_H */

Reply via email to