=== Background

Something as simple as the following doesn't respond to cancellation.  In
v15+, any DROP DATABASE will hang as long as it's running:

  SELECT dblink_exec(
    $$dbname='$$||current_database()||$$' port=$$||current_setting('port'),
    'SELECT pg_sleep(15)');

https://postgr.es/m/4b584c99.8090...@enterprisedb.com proposed a fix back in
2010.  Latches and the libpqsrv facility have changed the server programming
environment since that patch.  The problem statement also came up here:

On Thu, Dec 08, 2022 at 06:08:15PM -0800, Andres Freund wrote:
> dblink.c uses a lot of other blocking libpq functions, which obviously also
> isn't ok.


=== Key decisions

This patch adds to libpqsrv facility.  It dutifully follows the existing
naming scheme.  For greppability, I am favoring renaming new and old functions
such that the libpq name is a substring of this facility's name.  That is,
rename libpqsrv_disconnect to srvPQfinish or maybe libpqsrv_PQfinish().  Now
is better than later, while pgxn contains no references to libpqsrv.  Does
anyone else have a preference between naming schemes?  If nobody does, I'll
keep today's libpqsrv_disconnect() style.

I was tempted to add a timeout argument to each libpqsrv function, which would
allow libpqsrv_get_result_last() to replace pgfdw_get_cleanup_result().  We
can always add a timeout-accepting function later and make this thread's
function name a thin wrapper around it.  Does anyone feel a mandatory timeout
argument, accepting -1 for no timeout, would be the right thing?


=== Minor topics

It would be nice to replace libpqrcv_PQgetResult() and friends with the new
functions.  I refrained since they use ProcessWalRcvInterrupts(), not
CHECK_FOR_INTERRUPTS().  Since walreceiver already reaches
CHECK_FOR_INTERRUPTS() via libpqsrv_connect_params(), things might just work.

This patch contains a PQexecParams() wrapper, called nowhere in
postgresql.git.  It's inessential, but twelve pgxn modules do mention
PQexecParams.  Just one mentions PQexecPrepared, and none mention PQdescribe*.

The patch makes postgres_fdw adopt its functions, as part of confirming the
functions are general enough.  postgres_fdw create_cursor() has been passing
the "DECLARE CURSOR FOR inner_query" string for some error messages and just
inner_query for others.  I almost standardized on the longer one, but the test
suite checks that.  Hence, I standardized on just inner_query.

I wrote this because pglogical needs these functions to cooperate with v15+
DROP DATABASE (https://github.com/2ndQuadrant/pglogical/issues/418).

Thanks,
nm
Author:     Noah Misch <n...@leadboat.com>
Commit:     Noah Misch <n...@leadboat.com>

    Make dblink interruptible, via new libpqsrv APIs.
    
    This replaces dblink's blocking libpq calls, allowing cancellation and
    allowing DROP DATABASE (of a database not involved in the query).  Apart
    from explicit dblink_cancel_query() calls, dblink still doesn't cancel
    the remote side.  The replacement for the blocking calls consists of
    new, general-purpose query execution wrappers in the libpqsrv facility.
    Out-of-tree extensions should adopt these.  Use them in postgres_fdw,
    replacing a local implementation from which the libpqsrv implementation
    derives.  This is a bug fix for dblink.  Code inspection identified the
    bug at least thirteen years ago, but user complaints have not appeared.
    Hence, no back-patch for now.
    
    Reviewed by FIXME.
    
    Discussion: https://postgr.es/m/FIXME

diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 195b278..4624e53 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -133,6 +133,7 @@ static HTAB *remoteConnHash = NULL;
 /* custom wait event values, retrieved from shared memory */
 static uint32 dblink_we_connect = 0;
 static uint32 dblink_we_get_conn = 0;
+static uint32 dblink_we_get_result = 0;
 
 /*
  *     Following is list that holds multiple remote connections.
@@ -252,6 +253,9 @@ dblink_init(void)
 {
        if (!pconn)
        {
+               if (dblink_we_get_result == 0)
+                       dblink_we_get_result = 
WaitEventExtensionNew("DblinkGetResult");
+
                pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, 
sizeof(remoteConn));
                pconn->conn = NULL;
                pconn->openCursorCount = 0;
@@ -442,7 +446,7 @@ dblink_open(PG_FUNCTION_ARGS)
        /* If we are not in a transaction, start one */
        if (PQtransactionStatus(conn) == PQTRANS_IDLE)
        {
-               res = PQexec(conn, "BEGIN");
+               res = libpqsrv_exec(conn, "BEGIN", dblink_we_get_result);
                if (PQresultStatus(res) != PGRES_COMMAND_OK)
                        dblink_res_internalerror(conn, res, "begin error");
                PQclear(res);
@@ -461,7 +465,7 @@ dblink_open(PG_FUNCTION_ARGS)
                (rconn->openCursorCount)++;
 
        appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
-       res = PQexec(conn, buf.data);
+       res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
        if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                dblink_res_error(conn, conname, res, fail,
@@ -530,7 +534,7 @@ dblink_close(PG_FUNCTION_ARGS)
        appendStringInfo(&buf, "CLOSE %s", curname);
 
        /* close the cursor */
-       res = PQexec(conn, buf.data);
+       res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
        if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                dblink_res_error(conn, conname, res, fail,
@@ -550,7 +554,7 @@ dblink_close(PG_FUNCTION_ARGS)
                {
                        rconn->newXactForCursor = false;
 
-                       res = PQexec(conn, "COMMIT");
+                       res = libpqsrv_exec(conn, "COMMIT", 
dblink_we_get_result);
                        if (PQresultStatus(res) != PGRES_COMMAND_OK)
                                dblink_res_internalerror(conn, res, "commit 
error");
                        PQclear(res);
@@ -632,7 +636,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
         * PGresult will be long-lived even though we are still in a short-lived
         * memory context.
         */
-       res = PQexec(conn, buf.data);
+       res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
        if (!res ||
                (PQresultStatus(res) != PGRES_COMMAND_OK &&
                 PQresultStatus(res) != PGRES_TUPLES_OK))
@@ -780,7 +784,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool 
is_async)
                else
                {
                        /* async result retrieval, do it the old way */
-                       PGresult   *res = PQgetResult(conn);
+                       PGresult   *res = libpqsrv_get_result(conn, 
dblink_we_get_result);
 
                        /* NULL means we're all done with the async results */
                        if (res)
@@ -1088,7 +1092,8 @@ materializeQueryResult(FunctionCallInfo fcinfo,
                PQclear(sinfo.last_res);
                PQclear(sinfo.cur_res);
                /* and clear out any pending data in libpq */
-               while ((res = PQgetResult(conn)) != NULL)
+               while ((res = libpqsrv_get_result(conn, dblink_we_get_result)) 
!=
+                          NULL)
                        PQclear(res);
                PG_RE_THROW();
        }
@@ -1115,7 +1120,7 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, 
const char *sql)
        {
                CHECK_FOR_INTERRUPTS();
 
-               sinfo->cur_res = PQgetResult(conn);
+               sinfo->cur_res = libpqsrv_get_result(conn, 
dblink_we_get_result);
                if (!sinfo->cur_res)
                        break;
 
@@ -1443,7 +1448,7 @@ dblink_exec(PG_FUNCTION_ARGS)
                if (!conn)
                        dblink_conn_not_avail(conname);
 
-               res = PQexec(conn, sql);
+               res = libpqsrv_exec(conn, sql, dblink_we_get_result);
                if (!res ||
                        (PQresultStatus(res) != PGRES_COMMAND_OK &&
                         PQresultStatus(res) != PGRES_TUPLES_OK))
@@ -2740,8 +2745,8 @@ dblink_res_error(PGconn *conn, const char *conname, 
PGresult *res,
 
        /*
         * If we don't get a message from the PGresult, try the PGconn.  This is
-        * needed because for connection-level failures, PQexec may just return
-        * NULL, not a PGresult at all.
+        * needed because for connection-level failures, PQgetResult may just
+        * return NULL, not a PGresult at all.
         */
        if (message_primary == NULL)
                message_primary = pchomp(PQerrorMessage(conn));
diff --git a/contrib/postgres_fdw/connection.c 
b/contrib/postgres_fdw/connection.c
index 5800c6a..8755244 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -187,6 +187,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt, 
PgFdwConnState **state)
        {
                HASHCTL         ctl;
 
+               if (pgfdw_we_get_result == 0)
+                       pgfdw_we_get_result =
+                               WaitEventExtensionNew("PostgresFdwGetResult");
+
                ctl.keysize = sizeof(ConnCacheKey);
                ctl.entrysize = sizeof(ConnCacheEntry);
                ConnectionHash = hash_create("postgres_fdw connections", 8,
@@ -716,7 +720,7 @@ do_sql_command_end(PGconn *conn, const char *sql, bool 
consume_input)
         */
        if (consume_input && !PQconsumeInput(conn))
                pgfdw_report_error(ERROR, NULL, conn, false, sql);
-       res = pgfdw_get_result(conn, sql);
+       res = pgfdw_get_result(conn);
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
                pgfdw_report_error(ERROR, res, conn, true, sql);
        PQclear(res);
@@ -819,7 +823,9 @@ GetPrepStmtNumber(PGconn *conn)
 /*
  * Submit a query and wait for the result.
  *
- * This function is interruptible by signals.
+ * Since we don't use non-blocking mode, this can't process interrupts while
+ * pushing the query text to the server.  That risk is relatively small, so we
+ * ignore that for now.
  *
  * Caller is responsible for the error handling on the result.
  */
@@ -830,81 +836,20 @@ pgfdw_exec_query(PGconn *conn, const char *query, 
PgFdwConnState *state)
        if (state && state->pendingAreq)
                process_pending_request(state->pendingAreq);
 
-       /*
-        * Submit a query.  Since we don't use non-blocking mode, this also can
-        * block.  But its risk is relatively small, so we ignore that for now.
-        */
        if (!PQsendQuery(conn, query))
-               pgfdw_report_error(ERROR, NULL, conn, false, query);
-
-       /* Wait for the result. */
-       return pgfdw_get_result(conn, query);
+               return NULL;
+       return pgfdw_get_result(conn);
 }
 
 /*
- * Wait for the result from a prior asynchronous execution function call.
- *
- * This function offers quick responsiveness by checking for any interruptions.
- *
- * This function emulates PQexec()'s behavior of returning the last result
- * when there are many.
+ * Wrap libpqsrv_get_result_last(), adding wait event.
  *
  * Caller is responsible for the error handling on the result.
  */
 PGresult *
-pgfdw_get_result(PGconn *conn, const char *query)
+pgfdw_get_result(PGconn *conn)
 {
-       PGresult   *volatile last_res = NULL;
-
-       /* In what follows, do not leak any PGresults on an error. */
-       PG_TRY();
-       {
-               for (;;)
-               {
-                       PGresult   *res;
-
-                       while (PQisBusy(conn))
-                       {
-                               int                     wc;
-
-                               /* first time, allocate or get the custom wait 
event */
-                               if (pgfdw_we_get_result == 0)
-                                       pgfdw_we_get_result = 
WaitEventExtensionNew("PostgresFdwGetResult");
-
-                               /* Sleep until there's something to do */
-                               wc = WaitLatchOrSocket(MyLatch,
-                                                                          
WL_LATCH_SET | WL_SOCKET_READABLE |
-                                                                          
WL_EXIT_ON_PM_DEATH,
-                                                                          
PQsocket(conn),
-                                                                          -1L, 
pgfdw_we_get_result);
-                               ResetLatch(MyLatch);
-
-                               CHECK_FOR_INTERRUPTS();
-
-                               /* Data available in socket? */
-                               if (wc & WL_SOCKET_READABLE)
-                               {
-                                       if (!PQconsumeInput(conn))
-                                               pgfdw_report_error(ERROR, NULL, 
conn, false, query);
-                               }
-                       }
-
-                       res = PQgetResult(conn);
-                       if (res == NULL)
-                               break;                  /* query is complete */
-
-                       PQclear(last_res);
-                       last_res = res;
-               }
-       }
-       PG_CATCH();
-       {
-               PQclear(last_res);
-               PG_RE_THROW();
-       }
-       PG_END_TRY();
-
-       return last_res;
+       return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
 }
 
 /*
@@ -945,8 +890,8 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 
                /*
                 * If we don't get a message from the PGresult, try the PGconn. 
 This
-                * is needed because for connection-level failures, PQexec may 
just
-                * return NULL, not a PGresult at all.
+                * is needed because for connection-level failures, PQgetResult 
may
+                * just return NULL, not a PGresult at all.
                 */
                if (message_primary == NULL)
                        message_primary = pchomp(PQerrorMessage(conn));
@@ -1046,7 +991,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
                                         */
                                        if (entry->have_prep_stmt && 
entry->have_error)
                                        {
-                                               res = PQexec(entry->conn, 
"DEALLOCATE ALL");
+                                               res = 
pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
+                                                                               
           NULL);
                                                PQclear(res);
                                        }
                                        entry->have_prep_stmt = false;
diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index 09fd489..5084aa6 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -3673,7 +3673,7 @@ appendOrderBySuffix(Oid sortop, Oid sortcoltype, bool 
nulls_first,
  * Print the representation of a parameter to be sent to the remote side.
  *
  * Note: we always label the Param's type explicitly rather than relying on
- * transmitting a numeric type OID in PQexecParams().  This allows us to
+ * transmitting a numeric type OID in PQsendQueryParams().  This allows us to
  * avoid assuming that types have the same OIDs on the remote side as they
  * do locally --- they need only have the same names.
  */
diff --git a/contrib/postgres_fdw/postgres_fdw.c 
b/contrib/postgres_fdw/postgres_fdw.c
index 6de2bec..7d509c6 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -3759,7 +3759,7 @@ create_cursor(ForeignScanState *node)
         * We don't use a PG_TRY block here, so be careful not to throw error
         * without releasing the PGresult.
         */
-       res = pgfdw_get_result(conn, buf.data);
+       res = pgfdw_get_result(conn);
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
                pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
        PQclear(res);
@@ -3809,7 +3809,7 @@ fetch_more_data(ForeignScanState *node)
                         * The query was already sent by an earlier call to
                         * fetch_more_data_begin.  So now we just fetch the 
result.
                         */
-                       res = pgfdw_get_result(conn, fsstate->query);
+                       res = pgfdw_get_result(conn);
                        /* On error, report the original query, not the FETCH. 
*/
                        if (PQresultStatus(res) != PGRES_TUPLES_OK)
                                pgfdw_report_error(ERROR, res, conn, false, 
fsstate->query);
@@ -4158,7 +4158,7 @@ execute_foreign_modify(EState *estate,
         * We don't use a PG_TRY block here, so be careful not to throw error
         * without releasing the PGresult.
         */
-       res = pgfdw_get_result(fmstate->conn, fmstate->query);
+       res = pgfdw_get_result(fmstate->conn);
        if (PQresultStatus(res) !=
                (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
                pgfdw_report_error(ERROR, res, fmstate->conn, true, 
fmstate->query);
@@ -4228,7 +4228,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
         * We don't use a PG_TRY block here, so be careful not to throw error
         * without releasing the PGresult.
         */
-       res = pgfdw_get_result(fmstate->conn, fmstate->query);
+       res = pgfdw_get_result(fmstate->conn);
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
                pgfdw_report_error(ERROR, res, fmstate->conn, true, 
fmstate->query);
        PQclear(res);
@@ -4570,7 +4570,7 @@ execute_dml_stmt(ForeignScanState *node)
         * We don't use a PG_TRY block here, so be careful not to throw error
         * without releasing the PGresult.
         */
-       dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
+       dmstate->result = pgfdw_get_result(dmstate->conn);
        if (PQresultStatus(dmstate->result) !=
                (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
                pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
diff --git a/contrib/postgres_fdw/postgres_fdw.h 
b/contrib/postgres_fdw/postgres_fdw.h
index 47157ac..3e94d51 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -158,7 +158,7 @@ extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
 extern void do_sql_command(PGconn *conn, const char *sql);
-extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
+extern PGresult *pgfdw_get_result(PGconn *conn);
 extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
                                                                  
PgFdwConnState *state);
 extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
diff --git a/doc/src/sgml/dblink.sgml b/doc/src/sgml/dblink.sgml
index e8de5a6..81f3598 100644
--- a/doc/src/sgml/dblink.sgml
+++ b/doc/src/sgml/dblink.sgml
@@ -37,6 +37,15 @@
     </para>
    </listitem>
   </varlistentry>
+
+  <varlistentry>
+   <term><literal>DblinkGetResult</literal></term>
+   <listitem>
+    <para>
+     Waiting to receive the results of a query from a remote server.
+    </para>
+   </listitem>
+  </varlistentry>
  </variablelist>
 
  <para>
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c 
b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 60d5c1f..2825001 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -648,12 +648,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
  * Send a query and wait for the results by using the asynchronous libpq
  * functions and socket readiness events.
  *
- * We must not use the regular blocking libpq functions like PQexec()
- * since they are uninterruptible by signals on some platforms, such as
- * Windows.
- *
- * The function is modeled on PQexec() in libpq, but only implements
- * those parts that are in use in the walreceiver api.
+ * The function is modeled on libpqsrv_exec(), with the behavior difference
+ * being that it calls ProcessWalRcvInterrupts().  As an optimization, it
+ * skips try/catch, since all errors terminate the process.
  *
  * May return NULL, rather than an error result, on failure.
  */
diff --git a/src/include/libpq/libpq-be-fe-helpers.h 
b/src/include/libpq/libpq-be-fe-helpers.h
index 41e3bb4..a4b3e80 100644
--- a/src/include/libpq/libpq-be-fe-helpers.h
+++ b/src/include/libpq/libpq-be-fe-helpers.h
@@ -49,6 +49,8 @@
 
 static inline void libpqsrv_connect_prepare(void);
 static inline void libpqsrv_connect_internal(PGconn *conn, uint32 
wait_event_info);
+static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 
wait_event_info);
+static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 
wait_event_info);
 
 
 /*
@@ -239,4 +241,129 @@ libpqsrv_connect_internal(PGconn *conn, uint32 
wait_event_info)
        PG_END_TRY();
 }
 
+/*
+ * PQexec() wrapper that processes interrupts.
+ *
+ * Unless PQsetnonblocking(conn, 1) is in effect, this can't process
+ * interrupts while pushing the query text to the server.  Consider that
+ * setting if query strings can be long relative to TCP buffer size.
+ *
+ * This has the preconditions of PQsendQuery(), not those of PQexec().  Most
+ * notably, PQexec() would silently discard any prior query results.
+ */
+static inline PGresult *
+libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
+{
+       if (!PQsendQuery(conn, query))
+               return NULL;
+       return libpqsrv_get_result_last(conn, wait_event_info);
+}
+
+/*
+ * PQexecParams() wrapper that processes interrupts.
+ *
+ * See notes at libpqsrv_exec().
+ */
+static inline PGresult *
+libpqsrv_exec_params(PGconn *conn,
+                                        const char *command,
+                                        int nParams,
+                                        const Oid *paramTypes,
+                                        const char *const *paramValues,
+                                        const int *paramLengths,
+                                        const int *paramFormats,
+                                        int resultFormat,
+                                        uint32 wait_event_info)
+{
+       if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues,
+                                                  paramLengths, paramFormats, 
resultFormat))
+               return NULL;
+       return libpqsrv_get_result_last(conn, wait_event_info);
+}
+
+/*
+ * Like PQexec(), loop over PQgetResult() until it returns NULL or another
+ * terminal state.  Return the last non-NULL result or the terminal state.
+ */
+static inline PGresult *
+libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
+{
+       PGresult   *volatile lastResult = NULL;
+
+       /* In what follows, do not leak any PGresults on an error. */
+       PG_TRY();
+       {
+               for (;;)
+               {
+                       /* Wait for, and collect, the next PGresult. */
+                       PGresult   *result;
+
+                       result = libpqsrv_get_result(conn, wait_event_info);
+                       if (result == NULL)
+                               break;                  /* query is complete, 
or failure */
+
+                       /*
+                        * Emulate PQexec()'s behavior of returning the last 
result when
+                        * there are many.
+                        */
+                       PQclear(lastResult);
+                       lastResult = result;
+
+                       if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
+                               PQresultStatus(lastResult) == PGRES_COPY_OUT ||
+                               PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
+                               PQstatus(conn) == CONNECTION_BAD)
+                               break;
+               }
+       }
+       PG_CATCH();
+       {
+               PQclear(lastResult);
+               PG_RE_THROW();
+       }
+       PG_END_TRY();
+
+       return lastResult;
+}
+
+/*
+ * Perform the equivalent of PQgetResult(), but watch for interrupts.
+ */
+static inline PGresult *
+libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
+{
+       /*
+        * Collect data until PQgetResult is ready to get the result without
+        * blocking.
+        */
+       while (PQisBusy(conn))
+       {
+               int                     rc;
+
+               rc = WaitLatchOrSocket(MyLatch,
+                                                          WL_EXIT_ON_PM_DEATH 
| WL_LATCH_SET |
+                                                          WL_SOCKET_READABLE,
+                                                          PQsocket(conn),
+                                                          0,
+                                                          wait_event_info);
+
+               /* Interrupted? */
+               if (rc & WL_LATCH_SET)
+               {
+                       ResetLatch(MyLatch);
+                       CHECK_FOR_INTERRUPTS();
+               }
+
+               /* Consume whatever data is available from the socket */
+               if (PQconsumeInput(conn) == 0)
+               {
+                       /* trouble; expect PQgetResult() to return NULL */
+                       break;
+               }
+       }
+
+       /* Now we can collect and return the next PGresult */
+       return PQgetResult(conn);
+}
+
 #endif                                                 /* 
LIBPQ_BE_FE_HELPERS_H */

Reply via email to