On 2021/11/19 22:13, Bharath Rupireddy wrote:
How about adding the warning message in pgfdw_abort_cleanup instead of
pgfdw_get_cleanup_result?
Just before this in pgfdw_abort_cleanup seems better to me.
I was thinking pgfdw_get_cleanup_result() is better because it can
easily report different warning messages based on cases of a timeout
or connection failure, respectively. Since pgfdw_get_cleanup_result()
returns true in both those cases, ISTM that it's not easy to
distinguish them in pgfdw_abort_cleanup().
Anyway, attached is the patch (pgfdw_get_cleanup_result_v1.patch)
that makes pgfdw_get_cleanup_result() report a warning message.
Yeah, this seems to be an opportunity. But, the function should deal
with the timeout separately, I'm concerned that the function will
eventually be having if (timeout_param_specified) { } else { } sort
of code. We can see how much duplicate code we save here vs the
readability or complexity that comes with the single function.
Please see the attached patch (refactor_pgfdw_get_result_v1.patch).
This is still WIP, but you can check how much the refactoring can
simplify the code.
Regards,
--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
diff --git a/contrib/postgres_fdw/connection.c
b/contrib/postgres_fdw/connection.c
index 4aff315b7c..203d4c411e 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -104,7 +104,7 @@ static bool pgfdw_cancel_query(PGconn *conn);
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
bool
ignore_errors);
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
-
PGresult **result);
+
PGresult **result, const char *query);
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql,
bool toplevel);
static bool UserMappingPasswordRequired(UserMapping *user);
@@ -1179,7 +1179,7 @@ pgfdw_cancel_query(PGconn *conn)
}
/* Get and discard the result of the query. */
- if (pgfdw_get_cleanup_result(conn, endtime, &result))
+ if (pgfdw_get_cleanup_result(conn, endtime, &result, "(cancel
request)"))
return false;
PQclear(result);
@@ -1223,7 +1223,7 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
bool ignore_errors)
}
/* Get the result of the query. */
- if (pgfdw_get_cleanup_result(conn, endtime, &result))
+ if (pgfdw_get_cleanup_result(conn, endtime, &result, query))
return false;
/* Issue a warning if not successful. */
@@ -1248,7 +1248,8 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
bool ignore_errors)
* Sets *result except in case of a timeout.
*/
static bool
-pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
+pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result,
+ const char *query)
{
volatile bool timed_out = false;
PGresult *volatile last_res = NULL;
@@ -1270,6 +1271,10 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz
endtime, PGresult **result)
cur_timeout =
TimestampDifferenceMilliseconds(now, endtime);
if (cur_timeout <= 0)
{
+ ereport(WARNING,
+ (errmsg("could not get
query result due to timeout"),
+ query ?
errcontext("remote SQL command: %s", query) : 0));
+
timed_out = true;
goto exit;
}
@@ -1289,6 +1294,8 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz
endtime, PGresult **result)
{
if (!PQconsumeInput(conn))
{
+ pgfdw_report_error(WARNING,
NULL, conn, false, query);
+
/* connection trouble; treat
the same as a timeout */
timed_out = true;
goto exit;
diff --git a/contrib/postgres_fdw/connection.c
b/contrib/postgres_fdw/connection.c
index 4aff315b7c..3fd1abb8f5 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -103,8 +103,8 @@ static void
pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
static bool pgfdw_cancel_query(PGconn *conn);
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
bool
ignore_errors);
-static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
-
PGresult **result);
+static bool pgfdw_get_result_with_timeout(PGconn *conn, TimestampTz endtime,
+
PGresult **result, const char *query);
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql,
bool toplevel);
static bool UserMappingPasswordRequired(UserMapping *user);
@@ -729,53 +729,13 @@ pgfdw_exec_query(PGconn *conn, const char *query,
PgFdwConnState *state)
PGresult *
pgfdw_get_result(PGconn *conn, const char *query)
{
- PGresult *volatile last_res = NULL;
+ PGresult *res;
+ bool timed_out;
- /* In what follows, do not leak any PGresults on an error. */
- PG_TRY();
- {
- for (;;)
- {
- PGresult *res;
+ timed_out = pgfdw_get_result_with_timeout(conn, -1, &res, query);
+ Assert(!timed_out);
- while (PQisBusy(conn))
- {
- int wc;
-
- /* Sleep until there's something to do */
- wc = WaitLatchOrSocket(MyLatch,
-
WL_LATCH_SET | WL_SOCKET_READABLE |
-
WL_EXIT_ON_PM_DEATH,
-
PQsocket(conn),
- -1L,
PG_WAIT_EXTENSION);
- ResetLatch(MyLatch);
-
- CHECK_FOR_INTERRUPTS();
-
- /* Data available in socket? */
- if (wc & WL_SOCKET_READABLE)
- {
- if (!PQconsumeInput(conn))
- pgfdw_report_error(ERROR, NULL,
conn, false, query);
- }
- }
-
- res = PQgetResult(conn);
- if (res == NULL)
- break; /* query is complete */
-
- PQclear(last_res);
- last_res = res;
- }
- }
- PG_CATCH();
- {
- PQclear(last_res);
- PG_RE_THROW();
- }
- PG_END_TRY();
-
- return last_res;
+ return res;
}
/*
@@ -1179,7 +1139,7 @@ pgfdw_cancel_query(PGconn *conn)
}
/* Get and discard the result of the query. */
- if (pgfdw_get_cleanup_result(conn, endtime, &result))
+ if (pgfdw_get_result_with_timeout(conn, endtime, &result, NULL))
return false;
PQclear(result);
@@ -1223,7 +1183,7 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
bool ignore_errors)
}
/* Get the result of the query. */
- if (pgfdw_get_cleanup_result(conn, endtime, &result))
+ if (pgfdw_get_result_with_timeout(conn, endtime, &result, query))
return false;
/* Issue a warning if not successful. */
@@ -1248,10 +1208,16 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char
*query, bool ignore_errors)
* Sets *result except in case of a timeout.
*/
static bool
-pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
+pgfdw_get_result_with_timeout(PGconn *conn, TimestampTz endtime,
+ PGresult **result,
const char *query)
{
volatile bool timed_out = false;
PGresult *volatile last_res = NULL;
+ int wakeEvents;
+
+ wakeEvents = WL_LATCH_SET | WL_SOCKET_READABLE | WL_EXIT_ON_PM_DEATH;
+ if (endtime >= 0)
+ wakeEvents |= WL_TIMEOUT;
/* In what follows, do not leak any PGresults on an error. */
PG_TRY();
@@ -1263,37 +1229,44 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz
endtime, PGresult **result)
while (PQisBusy(conn))
{
int wc;
- TimestampTz now = GetCurrentTimestamp();
- long cur_timeout;
+ long cur_timeout = -1;
- /* If timeout has expired, give up, else get
sleep time. */
- cur_timeout =
TimestampDifferenceMilliseconds(now, endtime);
- if (cur_timeout <= 0)
+ if (endtime >= 0)
{
- timed_out = true;
- goto exit;
- }
+ TimestampTz now = GetCurrentTimestamp();
- /* Sleep until there's something to do */
- wc = WaitLatchOrSocket(MyLatch,
-
WL_LATCH_SET | WL_SOCKET_READABLE |
-
WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
-
PQsocket(conn),
-
cur_timeout, PG_WAIT_EXTENSION);
- ResetLatch(MyLatch);
-
- CHECK_FOR_INTERRUPTS();
-
- /* Data available in socket? */
- if (wc & WL_SOCKET_READABLE)
- {
- if (!PQconsumeInput(conn))
+ /* If timeout has expired, give up,
else get sleep time. */
+ cur_timeout =
TimestampDifferenceMilliseconds(now, endtime);
+ if (cur_timeout <= 0)
{
- /* connection trouble; treat
the same as a timeout */
timed_out = true;
goto exit;
}
}
+
+ /* Sleep until there's something to do */
+ wc = WaitLatchOrSocket(MyLatch, wakeEvents,
+
PQsocket(conn),
+
cur_timeout, PG_WAIT_EXTENSION);
+ ResetLatch(MyLatch);
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* Data available in socket? */
+ if (wc & WL_SOCKET_READABLE)
+ {
+ if (!PQconsumeInput(conn))
+ {
+ if (endtime >= 0)
+ {
+ /* connection trouble;
treat the same as a timeout */
+ timed_out = true;
+ goto exit;
+ }
+ else
+
pgfdw_report_error(ERROR, NULL, conn, false, query);
+ }
+ }
}
res = PQgetResult(conn);