On Fri, Apr 15, 2016 at 9:46 PM, Michael Paquier <michael.paqu...@gmail.com> wrote:
> On Fri, Apr 15, 2016 at 8:25 PM, Etsuro Fujita
> <fujita.ets...@lab.ntt.co.jp> wrote:
>> How about doing something similar for PQprepare/PQexecPrepared in
>> postgresExecForeignInsert, postgresExecForeignUpdate, and
>> postgresExecForeignDelete?
>
> Yes, I hesitated to touch those, but they are good candidates for this
> new interface, and actually it has proved not to be complicated to
> plug in the new routines in those code paths.
>
>> Also, how about doing that for PQexec in connection.c?
>
> Here I disagree; this is not appropriate. All the PQexec calls are part of
> callbacks that are triggered on failures, and we rely on such a
> callback when issuing PQcancel. do_sql_command runs commands that take
> a short amount of time, so I also think it is fine to leave it as-is with
> PQexec.
Here is a new version. I just recalled that I forgot a PQclear() call to clean up results. -- Michael
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 189f290..33b7363 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -17,6 +17,7 @@ #include "access/xact.h" #include "mb/pg_wchar.h" #include "miscadmin.h" +#include "storage/latch.h" #include "utils/hsearch.h" #include "utils/memutils.h" @@ -448,6 +449,68 @@ GetPrepStmtNumber(PGconn *conn) } /* + * Execute query in non-blocking mode and fetch back result + */ +PGresult * +pgfdw_exec_query(PGconn *conn, const char *query) +{ + if (!PQsendQuery(conn, query)) + pgfdw_report_error(ERROR, NULL, conn, false, query); + + return pgfdw_get_result(conn, query); +} + +/* + * Scan for results of query run in non-waiting mode + * + * This should be run after executing a query on the remote server and + * offers quick responsiveness by checking for any interruptions. The + * last result is returned back to caller, and previous results are + * consumed. Caller is responsible for the error handling on the fetched + * result. + */ +PGresult * +pgfdw_get_result(PGconn *conn, const char *query) +{ + PGresult *last_res = NULL; + + for (;;) + { + PGresult *res; + + while (PQisBusy(conn)) + { + int wc; + + /* Sleep until there's something to do */ + wc = WaitLatchOrSocket(MyLatch, + WL_LATCH_SET | WL_SOCKET_READABLE, + PQsocket(conn), + -1L); + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + /* Data available in socket */ + if (wc & WL_SOCKET_READABLE) + { + if (!PQconsumeInput(conn)) + pgfdw_report_error(ERROR, NULL, conn, false, query); + } + } + + res = PQgetResult(conn); + if (res == NULL) + break; + + PQclear(last_res); + last_res = res; + } + + return last_res; +} + +/* * Report an error we got from the remote server. 
* * elevel: error level to use (typically ERROR, but might be less) @@ -598,6 +661,32 @@ pgfdw_xact_callback(XactEvent event, void *arg) case XACT_EVENT_ABORT: /* Assume we might have lost track of prepared statements */ entry->have_error = true; + + /* + * If a command has been submitted to the remote server + * using an asynchronous execution function, the command + * might not have yet completed. Check to see if a command + * is still being processed by the remote server, and if so, + * request cancellation of the command; if not, abort + * gracefully. + */ + if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE) + { + PGcancel *cancel; + char errbuf[256]; + + if ((cancel = PQgetCancel(entry->conn))) + { + if (!PQcancel(cancel, errbuf, sizeof(errbuf))) + ereport(WARNING, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not send cancel request: %s", + errbuf))); + PQfreeCancel(cancel); + } + break; + } + /* If we're aborting, abort all remote transactions too */ res = PQexec(entry->conn, "ABORT TRANSACTION"); /* Note: can't throw ERROR, it would be infinite loop */ diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index ee0220a..4566cf4 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -1421,7 +1421,7 @@ postgresReScanForeignScan(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexec(fsstate->conn, sql); + res = pgfdw_exec_query(fsstate->conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fsstate->conn, true, sql); PQclear(res); @@ -1754,13 +1754,16 @@ postgresExecForeignInsert(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. 
*/ - res = PQexecPrepared(fmstate->conn, - fmstate->p_name, - fmstate->p_nums, - p_values, - NULL, - NULL, - 0); + if (!PQsendQueryPrepared(fmstate->conn, + fmstate->p_name, + fmstate->p_nums, + p_values, + NULL, + NULL, + 0)) + pgfdw_report_error(ERROR, NULL, fmstate->conn, true, fmstate->query); + + res = pgfdw_get_result(fmstate->conn, fmstate->query); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); @@ -1824,13 +1827,16 @@ postgresExecForeignUpdate(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexecPrepared(fmstate->conn, - fmstate->p_name, - fmstate->p_nums, - p_values, - NULL, - NULL, - 0); + if (!PQsendQueryPrepared(fmstate->conn, + fmstate->p_name, + fmstate->p_nums, + p_values, + NULL, + NULL, + 0)) + pgfdw_report_error(ERROR, NULL, fmstate->conn, true, fmstate->query); + + res = pgfdw_get_result(fmstate->conn, fmstate->query);; if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); @@ -1894,13 +1900,16 @@ postgresExecForeignDelete(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexecPrepared(fmstate->conn, - fmstate->p_name, - fmstate->p_nums, - p_values, - NULL, - NULL, - 0); + if (!PQsendQueryPrepared(fmstate->conn, + fmstate->p_name, + fmstate->p_nums, + p_values, + NULL, + NULL, + 0)) + pgfdw_report_error(ERROR, NULL, fmstate->conn, true, fmstate->query); + + res = pgfdw_get_result(fmstate->conn, fmstate->query); if (PQresultStatus(res) != (fmstate->has_returning ? 
PGRES_TUPLES_OK : PGRES_COMMAND_OK)) pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); @@ -1950,7 +1959,7 @@ postgresEndForeignModify(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexec(fmstate->conn, sql); + res = pgfdw_exec_query(fmstate->conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); PQclear(res); @@ -2712,7 +2721,7 @@ get_remote_estimate(const char *sql, PGconn *conn, /* * Execute EXPLAIN remotely. */ - res = PQexec(conn, sql); + res = pgfdw_exec_query(conn, sql); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql); @@ -2821,8 +2830,11 @@ create_cursor(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexecParams(conn, buf.data, numParams, NULL, values, - NULL, NULL, 0); + if (!PQsendQueryParams(conn, buf.data, numParams, + NULL, values, NULL, NULL, 0)) + pgfdw_report_error(ERROR, NULL, conn, false, buf.data); + + res = pgfdw_get_result(conn, buf.data); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, fsstate->query); PQclear(res); @@ -2868,7 +2880,7 @@ fetch_more_data(ForeignScanState *node) snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", fsstate->fetch_size, fsstate->cursor_number); - res = PQexec(conn, sql); + res = pgfdw_exec_query(conn, sql); /* On error, report the original query, not the FETCH. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, fsstate->query); @@ -2978,7 +2990,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. 
*/ - res = PQexec(conn, sql); + res = pgfdw_exec_query(conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, sql); PQclear(res); @@ -3010,12 +3022,14 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQprepare(fmstate->conn, - p_name, - fmstate->query, - 0, - NULL); - + if (!PQsendPrepare(fmstate->conn, + p_name, + fmstate->query, + 0, + NULL)) + pgfdw_report_error(ERROR, NULL, fmstate->conn, true, fmstate->query); + + res = pgfdw_get_result(fmstate->conn, fmstate->query); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); PQclear(res); @@ -3151,8 +3165,12 @@ execute_dml_stmt(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - dmstate->result = PQexecParams(dmstate->conn, dmstate->query, - numParams, NULL, values, NULL, NULL, 0); + if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams, + NULL, values, NULL, NULL, 0)) + pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query); + + dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query); + if (PQresultStatus(dmstate->result) != (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true, @@ -3355,7 +3373,7 @@ postgresAnalyzeForeignTable(Relation relation, /* In what follows, do not risk leaking any PGresults. */ PG_TRY(); { - res = PQexec(conn, sql.data); + res = pgfdw_exec_query(conn, sql.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); @@ -3449,7 +3467,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, /* In what follows, do not risk leaking any PGresults. 
*/ PG_TRY(); { - res = PQexec(conn, sql.data); + res = pgfdw_exec_query(conn, sql.data); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); PQclear(res); @@ -3500,7 +3518,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u", fetch_size, cursor_number); - res = PQexec(conn, fetch_sql); + res = pgfdw_exec_query(conn, fetch_sql); /* On error, report the original query, not the FETCH. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); @@ -3675,7 +3693,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = "); deparseStringLiteral(&buf, stmt->remote_schema); - res = PQexec(conn, buf.data); + res = pgfdw_exec_query(conn, buf.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, buf.data); @@ -3774,7 +3792,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum"); /* Fetch the data */ - res = PQexec(conn, buf.data); + res = pgfdw_exec_query(conn, buf.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, buf.data); diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 3a11d99..574b07d 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -103,6 +103,8 @@ extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt); extern void ReleaseConnection(PGconn *conn); extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn); +extern PGresult *pgfdw_get_result(PGconn *conn, const char *query); +extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query); extern void pgfdw_report_error(int elevel, PGresult 
*res, PGconn *conn, bool clear, const char *sql);
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers