On 2021/11/19 22:13, Bharath Rupireddy wrote:
> How about adding the warning message in pgfdw_abort_cleanup instead of
> pgfdw_get_cleanup_result?
>
> Just before this in pgfdw_abort_cleanup seems better to me.

I was thinking pgfdw_get_cleanup_result() is better because it can
easily report a different warning message for a timeout and for a
connection failure. Since pgfdw_get_cleanup_result() returns true in
both of those cases, ISTM that it's not easy to distinguish them in
pgfdw_abort_cleanup().

Anyway, attached is the patch (pgfdw_get_cleanup_result_v1.patch)
that makes pgfdw_get_cleanup_result() report a warning message.


> Yeah, this seems to be an opportunity. But the function should deal
> with the timeout separately; I'm concerned that the function will
> eventually end up with if (timeout_param_specified) { } else { } sort
> of code. We can see how much duplicate code we save here vs. the
> readability or complexity that comes with the single function.

Please see the attached patch (refactor_pgfdw_get_result_v1.patch).
This is still WIP, but you can check how much the refactoring can
simplify the code.

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 4aff315b7c..203d4c411e 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -104,7 +104,7 @@ static bool pgfdw_cancel_query(PGconn *conn);
 static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
                                                                         bool 
ignore_errors);
 static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
-                                                                        
PGresult **result);
+                                                                        
PGresult **result, const char *query);
 static void pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql,
                                                                bool toplevel);
 static bool UserMappingPasswordRequired(UserMapping *user);
@@ -1179,7 +1179,7 @@ pgfdw_cancel_query(PGconn *conn)
        }
 
        /* Get and discard the result of the query. */
-       if (pgfdw_get_cleanup_result(conn, endtime, &result))
+       if (pgfdw_get_cleanup_result(conn, endtime, &result, "(cancel 
request)"))
                return false;
        PQclear(result);
 
@@ -1223,7 +1223,7 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, 
bool ignore_errors)
        }
 
        /* Get the result of the query. */
-       if (pgfdw_get_cleanup_result(conn, endtime, &result))
+       if (pgfdw_get_cleanup_result(conn, endtime, &result, query))
                return false;
 
        /* Issue a warning if not successful. */
@@ -1248,7 +1248,8 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, 
bool ignore_errors)
  * Sets *result except in case of a timeout.
  */
 static bool
-pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
+pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result,
+                                                const char *query)
 {
        volatile bool timed_out = false;
        PGresult   *volatile last_res = NULL;
@@ -1270,6 +1271,10 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz 
endtime, PGresult **result)
                                cur_timeout = 
TimestampDifferenceMilliseconds(now, endtime);
                                if (cur_timeout <= 0)
                                {
+                                       ereport(WARNING,
+                                                       (errmsg("could not get 
query result due to timeout"),
+                                                        query ? 
errcontext("remote SQL command: %s", query) : 0));
+
                                        timed_out = true;
                                        goto exit;
                                }
@@ -1289,6 +1294,8 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz 
endtime, PGresult **result)
                                {
                                        if (!PQconsumeInput(conn))
                                        {
+                                               pgfdw_report_error(WARNING, 
NULL, conn, false, query);
+
                                                /* connection trouble; treat 
the same as a timeout */
                                                timed_out = true;
                                                goto exit;
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 4aff315b7c..3fd1abb8f5 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -103,8 +103,8 @@ static void 
pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
 static bool pgfdw_cancel_query(PGconn *conn);
 static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
                                                                         bool 
ignore_errors);
-static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
-                                                                        
PGresult **result);
+static bool pgfdw_get_result_with_timeout(PGconn *conn, TimestampTz endtime,
+                                                                               
  PGresult **result, const char *query);
 static void pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql,
                                                                bool toplevel);
 static bool UserMappingPasswordRequired(UserMapping *user);
@@ -729,53 +729,13 @@ pgfdw_exec_query(PGconn *conn, const char *query, 
PgFdwConnState *state)
 PGresult *
 pgfdw_get_result(PGconn *conn, const char *query)
 {
-       PGresult   *volatile last_res = NULL;
+       PGresult   *res;
+       bool            timed_out;
 
-       /* In what follows, do not leak any PGresults on an error. */
-       PG_TRY();
-       {
-               for (;;)
-               {
-                       PGresult   *res;
+       timed_out = pgfdw_get_result_with_timeout(conn, -1, &res, query);
+       Assert(!timed_out);
 
-                       while (PQisBusy(conn))
-                       {
-                               int                     wc;
-
-                               /* Sleep until there's something to do */
-                               wc = WaitLatchOrSocket(MyLatch,
-                                                                          
WL_LATCH_SET | WL_SOCKET_READABLE |
-                                                                          
WL_EXIT_ON_PM_DEATH,
-                                                                          
PQsocket(conn),
-                                                                          -1L, 
PG_WAIT_EXTENSION);
-                               ResetLatch(MyLatch);
-
-                               CHECK_FOR_INTERRUPTS();
-
-                               /* Data available in socket? */
-                               if (wc & WL_SOCKET_READABLE)
-                               {
-                                       if (!PQconsumeInput(conn))
-                                               pgfdw_report_error(ERROR, NULL, 
conn, false, query);
-                               }
-                       }
-
-                       res = PQgetResult(conn);
-                       if (res == NULL)
-                               break;                  /* query is complete */
-
-                       PQclear(last_res);
-                       last_res = res;
-               }
-       }
-       PG_CATCH();
-       {
-               PQclear(last_res);
-               PG_RE_THROW();
-       }
-       PG_END_TRY();
-
-       return last_res;
+       return res;
 }
 
 /*
@@ -1179,7 +1139,7 @@ pgfdw_cancel_query(PGconn *conn)
        }
 
        /* Get and discard the result of the query. */
-       if (pgfdw_get_cleanup_result(conn, endtime, &result))
+       if (pgfdw_get_result_with_timeout(conn, endtime, &result, NULL))
                return false;
        PQclear(result);
 
@@ -1223,7 +1183,7 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, 
bool ignore_errors)
        }
 
        /* Get the result of the query. */
-       if (pgfdw_get_cleanup_result(conn, endtime, &result))
+       if (pgfdw_get_result_with_timeout(conn, endtime, &result, query))
                return false;
 
        /* Issue a warning if not successful. */
@@ -1248,10 +1208,16 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char 
*query, bool ignore_errors)
  * Sets *result except in case of a timeout.
  */
 static bool
-pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
+pgfdw_get_result_with_timeout(PGconn *conn, TimestampTz endtime,
+                                                         PGresult **result, 
const char *query)
 {
        volatile bool timed_out = false;
        PGresult   *volatile last_res = NULL;
+       int                     wakeEvents;
+
+       wakeEvents = WL_LATCH_SET | WL_SOCKET_READABLE | WL_EXIT_ON_PM_DEATH;
+       if (endtime >= 0)
+               wakeEvents |= WL_TIMEOUT;
 
        /* In what follows, do not leak any PGresults on an error. */
        PG_TRY();
@@ -1263,37 +1229,44 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz 
endtime, PGresult **result)
                        while (PQisBusy(conn))
                        {
                                int                     wc;
-                               TimestampTz now = GetCurrentTimestamp();
-                               long            cur_timeout;
+                               long            cur_timeout = -1;
 
-                               /* If timeout has expired, give up, else get 
sleep time. */
-                               cur_timeout = 
TimestampDifferenceMilliseconds(now, endtime);
-                               if (cur_timeout <= 0)
+                               if (endtime >= 0)
                                {
-                                       timed_out = true;
-                                       goto exit;
-                               }
+                                       TimestampTz now = GetCurrentTimestamp();
 
-                               /* Sleep until there's something to do */
-                               wc = WaitLatchOrSocket(MyLatch,
-                                                                          
WL_LATCH_SET | WL_SOCKET_READABLE |
-                                                                          
WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
-                                                                          
PQsocket(conn),
-                                                                          
cur_timeout, PG_WAIT_EXTENSION);
-                               ResetLatch(MyLatch);
-
-                               CHECK_FOR_INTERRUPTS();
-
-                               /* Data available in socket? */
-                               if (wc & WL_SOCKET_READABLE)
-                               {
-                                       if (!PQconsumeInput(conn))
+                                       /* If timeout has expired, give up, 
else get sleep time. */
+                                       cur_timeout = 
TimestampDifferenceMilliseconds(now, endtime);
+                                       if (cur_timeout <= 0)
                                        {
-                                               /* connection trouble; treat 
the same as a timeout */
                                                timed_out = true;
                                                goto exit;
                                        }
                                }
+
+                               /* Sleep until there's something to do */
+                               wc = WaitLatchOrSocket(MyLatch, wakeEvents,
+                                                                          
PQsocket(conn),
+                                                                          
cur_timeout, PG_WAIT_EXTENSION);
+                               ResetLatch(MyLatch);
+
+                               CHECK_FOR_INTERRUPTS();
+
+                               /* Data available in socket? */
+                               if (wc & WL_SOCKET_READABLE)
+                               {
+                                       if (!PQconsumeInput(conn))
+                                       {
+                                               if (endtime >= 0)
+                                               {
+                                                       /* connection trouble; 
treat the same as a timeout */
+                                                       timed_out = true;
+                                                       goto exit;
+                                               }
+                                               else
+                                                       
pgfdw_report_error(ERROR, NULL, conn, false, query);
+                                       }
+                               }
                        }
 
                        res = PQgetResult(conn);

Reply via email to