On Fri, Apr 15, 2016 at 9:46 PM, Michael Paquier
<michael.paqu...@gmail.com> wrote:
> On Fri, Apr 15, 2016 at 8:25 PM, Etsuro Fujita
> <fujita.ets...@lab.ntt.co.jp> wrote:
>> How about doing something similar for PQprepare/PQexecPrepared in
>> postgresExecForeignInsert, postgresExecForeignUpdate, and
>> postgresExecForeignDelete?
>
> Yes, I hesitated to touch those, but they are good candidates for this
> new interface, and actually it has proved not to be complicated to
> plug in the new routines in those code paths.
>
>> Also, how about doing that for PQexec in connection.c?
>
> Here I disagree; that would not be appropriate. All the PQexec calls there
> are part of callbacks that are triggered on failures, and we rely on such a
> callback when issuing PQcancel. do_sql_command runs commands that take
> only a short amount of time, so I also think that it is fine as-is with
> PQexec.

Here is a new version. I just recalled that I forgot a PQclear() call
to clean up results.
-- 
Michael
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 189f290..33b7363 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -17,6 +17,7 @@
 #include "access/xact.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "storage/latch.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
 
@@ -448,6 +449,93 @@ GetPrepStmtNumber(PGconn *conn)
 }
 
 /*
+ * Send a query to the remote server and wait for its result.
+ *
+ * The query is submitted with PQsendQuery() and its result is collected by
+ * pgfdw_get_result().  A failure to send the query is reported through
+ * pgfdw_report_error() at ERROR level.
+ *
+ * Returns whatever pgfdw_get_result() returns.  Callers check the status
+ * of that result themselves and release it with PQclear().
+ */
+PGresult *
+pgfdw_exec_query(PGconn *conn, const char *query)
+{
+	if (!PQsendQuery(conn, query))
+		pgfdw_report_error(ERROR, NULL, conn, false, query);
+
+	return pgfdw_get_result(conn, query);
+}
+
+/*
+ * Wait for and collect the results of a query that has already been sent
+ * to the remote server with one of libpq's asynchronous PQsend* functions.
+ *
+ * While the connection is busy, sleep on the process latch and on the
+ * connection's socket, and run CHECK_FOR_INTERRUPTS() after each wakeup so
+ * that the wait stays responsive to interrupts.  All results of the query
+ * are consumed: every result but the last is freed, and the last one is
+ * returned (NULL if there was none).  Caller is responsible for the error
+ * handling on the returned result.
+ *
+ * "query" is only used for error reporting.
+ */
+PGresult *
+pgfdw_get_result(PGconn *conn, const char *query)
+{
+	PGresult   *volatile last_res = NULL;
+
+	/*
+	 * An ERROR may be thrown below, by CHECK_FOR_INTERRUPTS() or by
+	 * pgfdw_report_error(), while we hold a result fetched on an earlier
+	 * iteration.  Catch it so that this result is freed before re-throwing.
+	 */
+	PG_TRY();
+	{
+		for (;;)
+		{
+			PGresult *res;
+
+			while (PQisBusy(conn))
+			{
+				int		wc;
+
+				/* Sleep until there's something to do */
+				wc = WaitLatchOrSocket(MyLatch,
+									   WL_LATCH_SET | WL_SOCKET_READABLE,
+									   PQsocket(conn),
+									   -1L);
+				ResetLatch(MyLatch);
+
+				CHECK_FOR_INTERRUPTS();
+
+				/* Data available in socket */
+				if (wc & WL_SOCKET_READABLE)
+				{
+					if (!PQconsumeInput(conn))
+						pgfdw_report_error(ERROR, NULL, conn, false, query);
+				}
+			}
+
+			res = PQgetResult(conn);
+			if (res == NULL)
+				break;			/* no more results for this query */
+
+			PQclear(last_res);
+			last_res = res;
+		}
+	}
+	PG_CATCH();
+	{
+		PQclear(last_res);
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	return last_res;
+}
+
+/*
  * Report an error we got from the remote server.
  *
  * elevel: error level to use (typically ERROR, but might be less)
@@ -598,6 +661,32 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 				case XACT_EVENT_ABORT:
 					/* Assume we might have lost track of prepared statements */
 					entry->have_error = true;
+
+					/*
+					 * If a command has been submitted to the remote server
+					 * using an asynchronous execution function, the command
+					 * might not have yet completed.  Check to see if a command
+					 * is still being processed by the remote server, and if so,
+					 * request cancellation of the command; if not, abort
+					 * gracefully.
+					 */
+					if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+					{
+						PGcancel   *cancel;
+						char		errbuf[256];
+
+						if ((cancel = PQgetCancel(entry->conn)))
+						{
+							if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
+								ereport(WARNING,
+										(errcode(ERRCODE_CONNECTION_FAILURE),
+										 errmsg("could not send cancel request: %s",
+												errbuf)));
+							PQfreeCancel(cancel);
+						}
+						break;
+					}
+
 					/* If we're aborting, abort all remote transactions too */
 					res = PQexec(entry->conn, "ABORT TRANSACTION");
 					/* Note: can't throw ERROR, it would be infinite loop */
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index ee0220a..4566cf4 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -1421,7 +1421,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(fsstate->conn, sql);
+	res = pgfdw_exec_query(fsstate->conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
 	PQclear(res);
@@ -1754,13 +1754,16 @@ postgresExecForeignInsert(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	if (!PQsendQueryPrepared(fmstate->conn,
+							 fmstate->p_name,
+							 fmstate->p_nums,
+							 p_values,
+							 NULL,
+							 NULL,
+							 0))
+		pgfdw_report_error(ERROR, NULL, fmstate->conn, true, fmstate->query);
+
+	res = pgfdw_get_result(fmstate->conn, fmstate->query);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1824,13 +1827,16 @@ postgresExecForeignUpdate(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	if (!PQsendQueryPrepared(fmstate->conn,
+							 fmstate->p_name,
+							 fmstate->p_nums,
+							 p_values,
+							 NULL,
+							 NULL,
+							 0))
+		pgfdw_report_error(ERROR, NULL, fmstate->conn, true, fmstate->query);
+
+	res = pgfdw_get_result(fmstate->conn, fmstate->query);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1894,13 +1900,16 @@ postgresExecForeignDelete(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecPrepared(fmstate->conn,
-						 fmstate->p_name,
-						 fmstate->p_nums,
-						 p_values,
-						 NULL,
-						 NULL,
-						 0);
+	if (!PQsendQueryPrepared(fmstate->conn,
+							 fmstate->p_name,
+							 fmstate->p_nums,
+							 p_values,
+							 NULL,
+							 NULL,
+							 0))
+		pgfdw_report_error(ERROR, NULL, fmstate->conn, true, fmstate->query);
+
+	res = pgfdw_get_result(fmstate->conn, fmstate->query);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1950,7 +1959,7 @@ postgresEndForeignModify(EState *estate,
 		 * We don't use a PG_TRY block here, so be careful not to throw error
 		 * without releasing the PGresult.
 		 */
-		res = PQexec(fmstate->conn, sql);
+		res = pgfdw_exec_query(fmstate->conn, sql);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
 		PQclear(res);
@@ -2712,7 +2721,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
 		/*
 		 * Execute EXPLAIN remotely.
 		 */
-		res = PQexec(conn, sql);
+		res = pgfdw_exec_query(conn, sql);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql);
 
@@ -2821,8 +2830,11 @@ create_cursor(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexecParams(conn, buf.data, numParams, NULL, values,
-					   NULL, NULL, 0);
+	if (!PQsendQueryParams(conn, buf.data, numParams,
+						   NULL, values, NULL, NULL, 0))
+		pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+
+	res = pgfdw_get_result(conn, buf.data);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
 	PQclear(res);
@@ -2868,7 +2880,7 @@ fetch_more_data(ForeignScanState *node)
 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
 				 fsstate->fetch_size, fsstate->cursor_number);
 
-		res = PQexec(conn, sql);
+		res = pgfdw_exec_query(conn, sql);
 		/* On error, report the original query, not the FETCH. */
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
@@ -2978,7 +2990,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQexec(conn, sql);
+	res = pgfdw_exec_query(conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -3010,12 +3022,14 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = PQprepare(fmstate->conn,
-					p_name,
-					fmstate->query,
-					0,
-					NULL);
-
+	if (!PQsendPrepare(fmstate->conn,
+					   p_name,
+					   fmstate->query,
+					   0,
+					   NULL))
+		pgfdw_report_error(ERROR, NULL, fmstate->conn, true, fmstate->query);
+
+	res = pgfdw_get_result(fmstate->conn, fmstate->query);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
 	PQclear(res);
@@ -3151,8 +3165,12 @@ execute_dml_stmt(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	dmstate->result = PQexecParams(dmstate->conn, dmstate->query,
-								   numParams, NULL, values, NULL, NULL, 0);
+	if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
+						   NULL, values, NULL, NULL, 0))
+		pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
+
+	dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
+
 	if (PQresultStatus(dmstate->result) !=
 		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
 		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
@@ -3355,7 +3373,7 @@ postgresAnalyzeForeignTable(Relation relation,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = PQexec(conn, sql.data);
+		res = pgfdw_exec_query(conn, sql.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 
@@ -3449,7 +3467,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	/* In what follows, do not risk leaking any PGresults. */
 	PG_TRY();
 	{
-		res = PQexec(conn, sql.data);
+		res = pgfdw_exec_query(conn, sql.data);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			pgfdw_report_error(ERROR, res, conn, false, sql.data);
 		PQclear(res);
@@ -3500,7 +3518,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 			snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
 					 fetch_size, cursor_number);
 
-			res = PQexec(conn, fetch_sql);
+			res = pgfdw_exec_query(conn, fetch_sql);
 			/* On error, report the original query, not the FETCH. */
 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
 				pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -3675,7 +3693,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
 		deparseStringLiteral(&buf, stmt->remote_schema);
 
-		res = PQexec(conn, buf.data);
+		res = pgfdw_exec_query(conn, buf.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
@@ -3774,7 +3792,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
 		appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
 
 		/* Fetch the data */
-		res = PQexec(conn, buf.data);
+		res = pgfdw_exec_query(conn, buf.data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 3a11d99..574b07d 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -103,6 +103,8 @@ extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
 extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
+extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
+extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query);
 extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 				   bool clear, const char *sql);
 
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to