On Fri, Apr 8, 2016 at 6:28 PM, Robert Haas <robertmh...@gmail.com> wrote:
> On Fri, Apr 8, 2016 at 3:05 AM, Etsuro Fujita > <fujita.ets...@lab.ntt.co.jp> wrote: > >> What do you think? This open item's seven-day deadline has passed. It > >> would > >> help keep things moving to know whether you consider your latest patch > >> optimal > >> or whether you wish to change it the way Michael described. > > > > I wish to change it that way because it not only avoids the duplicate but > > fixes a bug in the previous patch that I overlooked that there is a race > > condition if a signal arrives just before entering the CheckSocket. > > > > Attached is an updated version of the patch. > > The comment just before the second hunk in the patch says: > > * We don't use a PG_TRY block here, so be careful not to throw error > * without releasing the PGresult. > > But the patch adds a whole bunch of new things there that seem like > they can error out, like CHECK_FOR_INTERRUPTS(), for example. Isn't > that a problem? > Basically we fetching the PGresult, after the newly added hunk, so there should not be any problem. But yes comment is definitely at wrong place. PFA patch with correction. > > -- > Robert Haas > EnterpriseDB: http://www.enterprisedb.com > The Enterprise PostgreSQL Company > -- Rushabh Lathia
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 189f290..8820597 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -598,6 +598,26 @@ pgfdw_xact_callback(XactEvent event, void *arg) case XACT_EVENT_ABORT: /* Assume we might have lost track of prepared statements */ entry->have_error = true; + /* + * If we had submitted a command to the remote server using + * an asynchronous execution function, the command might + * have not yet completed. Check to see if a command is + * still being processed by the remote server, and if so, + * request cancellation of the command; if not, abort + * gracefully. + */ + if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE) + { + PGcancel *cancel; + char errbuf[256]; + + if ((cancel = PQgetCancel(entry->conn))) + { + PQcancel(cancel, errbuf, sizeof(errbuf)); + PQfreeCancel(cancel); + } + break; + } /* If we're aborting, abort all remote transactions too */ res = PQexec(entry->conn, "ABORT TRANSACTION"); /* Note: can't throw ERROR, it would be infinite loop */ diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index ee0220a..2b6a61b 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -32,6 +32,7 @@ #include "optimizer/var.h" #include "optimizer/tlist.h" #include "parser/parsetree.h" +#include "storage/latch.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/lsyscache.h" @@ -3131,6 +3132,7 @@ execute_dml_stmt(ForeignScanState *node) ExprContext *econtext = node->ss.ps.ps_ExprContext; int numParams = dmstate->numParams; const char **values = dmstate->param_values; + int wc; /* * Construct array of query parameter values in text format. @@ -3147,12 +3149,42 @@ execute_dml_stmt(ForeignScanState *node) * parameter (see deparse.c), the "inference" is trivial and will produce * the desired result. This allows us to avoid assuming that the remote * server has the same OIDs we do for the parameters' types. + */ + if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams, + NULL, values, NULL, NULL, 0)) + pgfdw_report_error(ERROR, NULL, dmstate->conn, true, dmstate->query); + + /* + * Receive data until PQgetResult is ready to get the result without + * blocking. + */ + while (PQisBusy(dmstate->conn)) + { + /* Sleep until there's something to do */ + wc = WaitLatchOrSocket(MyLatch, + WL_LATCH_SET | WL_SOCKET_READABLE, + PQsocket(dmstate->conn), + -1L); + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + /* Data available in socket */ + if (wc & WL_SOCKET_READABLE) + { + if (!PQconsumeInput(dmstate->conn)) + pgfdw_report_error(ERROR, NULL, dmstate->conn, true, + dmstate->query); + } + } + + /* + * Get the result * * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - dmstate->result = PQexecParams(dmstate->conn, dmstate->query, - numParams, NULL, values, NULL, NULL, 0); + dmstate->result = PQgetResult(dmstate->conn); if (PQresultStatus(dmstate->result) != (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers