Hi,
When reviewing the postgres_fdw parallel-abort patch [1], I found that
there are several pieces of duplicated code in postgres_fdw/connection.c,
which seems to make it harder to review patches that change connection.c.
So I'd like to remove that duplicated code and refactor the functions
in connection.c. I have attached the following three patches.
There are two functions, pgfdw_get_result() and pgfdw_get_cleanup_result(),
to get a query result. They have almost the same code: both call PQisBusy(),
WaitLatchOrSocket(), PQconsumeInput() and PQgetResult() in a loop,
but only pgfdw_get_cleanup_result() allows its callers to specify a timeout.
The 0001 patch turns pgfdw_get_cleanup_result() into a common function
for getting a query result and makes pgfdw_get_result() use it instead of
its own (duplicate) code. The patch also renames pgfdw_get_cleanup_result()
to pgfdw_get_result_timed().
pgfdw_xact_callback() and pgfdw_subxact_callback() have similar code to
issue COMMIT or RELEASE SAVEPOINT commands. The 0002 patch adds a common
function, pgfdw_exec_pre_commit(), for that purpose, and changes those
functions so that they use the common one.
pgfdw_finish_pre_commit_cleanup() and pgfdw_finish_pre_subcommit_cleanup()
have similar code to wait for the results of COMMIT or RELEASE SAVEPOINT
commands. The 0003 patch adds a common function, pgfdw_finish_pre_commit(),
for that purpose, and replaces those functions with the common one.
That is, pgfdw_finish_pre_commit_cleanup() and
pgfdw_finish_pre_subcommit_cleanup() are no longer necessary, and the
0003 patch removes them.
[1] https://commitfest.postgresql.org/38/3392/
Regards,
--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
From aa115d03880968c2e5bab68415e06e17a337a45b Mon Sep 17 00:00:00 2001
From: Fujii Masao <fu...@postgresql.org>
Date: Mon, 25 Jul 2022 17:25:24 +0900
Subject: [PATCH 1/3] Refactor pgfdw_get_result() and
pgfdw_get_cleanup_result().
---
contrib/postgres_fdw/connection.c | 125 +++++++++++-------------------
1 file changed, 47 insertions(+), 78 deletions(-)
diff --git a/contrib/postgres_fdw/connection.c
b/contrib/postgres_fdw/connection.c
index 939d114f02..cbee285480 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -108,8 +108,8 @@ static void pgfdw_reset_xact_state(ConnCacheEntry *entry,
bool toplevel);
static bool pgfdw_cancel_query(PGconn *conn);
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
bool
ignore_errors);
-static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
-
PGresult **result, bool *timed_out);
+static bool pgfdw_get_result_timed(PGconn *conn, TimestampTz endtime,
+ PGresult
**result, bool *timed_out);
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
@@ -799,53 +799,12 @@ pgfdw_exec_query(PGconn *conn, const char *query,
PgFdwConnState *state)
PGresult *
pgfdw_get_result(PGconn *conn, const char *query)
{
- PGresult *volatile last_res = NULL;
-
- /* In what follows, do not leak any PGresults on an error. */
- PG_TRY();
- {
- for (;;)
- {
- PGresult *res;
-
- while (PQisBusy(conn))
- {
- int wc;
-
- /* Sleep until there's something to do */
- wc = WaitLatchOrSocket(MyLatch,
-
WL_LATCH_SET | WL_SOCKET_READABLE |
-
WL_EXIT_ON_PM_DEATH,
-
PQsocket(conn),
- -1L,
PG_WAIT_EXTENSION);
- ResetLatch(MyLatch);
-
- CHECK_FOR_INTERRUPTS();
-
- /* Data available in socket? */
- if (wc & WL_SOCKET_READABLE)
- {
- if (!PQconsumeInput(conn))
- pgfdw_report_error(ERROR, NULL,
conn, false, query);
- }
- }
-
- res = PQgetResult(conn);
- if (res == NULL)
- break; /* query is complete */
+ PGresult *result = NULL;
- PQclear(last_res);
- last_res = res;
- }
- }
- PG_CATCH();
- {
- PQclear(last_res);
- PG_RE_THROW();
- }
- PG_END_TRY();
+ if (pgfdw_get_result_timed(conn, 0, &result, NULL))
+ pgfdw_report_error(ERROR, NULL, conn, false, query);
- return last_res;
+ return result;
}
/*
@@ -1295,7 +1254,7 @@ pgfdw_cancel_query(PGconn *conn)
}
/* Get and discard the result of the query. */
- if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
+ if (pgfdw_get_result_timed(conn, endtime, &result, &timed_out))
{
if (timed_out)
ereport(WARNING,
@@ -1351,7 +1310,7 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
bool ignore_errors)
}
/* Get the result of the query. */
- if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
+ if (pgfdw_get_result_timed(conn, endtime, &result, &timed_out))
{
if (timed_out)
ereport(WARNING,
@@ -1375,24 +1334,33 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char
*query, bool ignore_errors)
}
/*
- * Get, during abort cleanup, the result of a query that is in progress. This
- * might be a query that is being interrupted by transaction abort, or it might
- * be a query that was initiated as part of transaction abort to get the remote
- * side back to the appropriate state.
+ * Get the result of a query.
+ *
+ * This function offers quick responsiveness by checking for any interruptions.
+ *
+ * If timed_out is NULL, the timeout does not occur. Otherwise, the timeout is
+ * enabled and endtime is used as the time at which this function should
+ * give up and assume the remote side is dead.
+ *
+ * Return true if the timeout expired or connection trouble occurred. Otherwise
+ * return false and set *result to the last result of a query. Set timed_out to
+ * true only when the timeout expired.
+ *
+ * This function emulates PQexec()'s behavior of returning the last result
+ * when there are many.
+ *
+ * Caller is responsible for the error handling on the result.
*
- * endtime is the time at which we should give up and assume the remote
- * side is dead. Returns true if the timeout expired or connection trouble
- * occurred, false otherwise. Sets *result except in case of a timeout.
- * Sets timed_out to true only when the timeout expired.
*/
static bool
-pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result,
- bool *timed_out)
+pgfdw_get_result_timed(PGconn *conn, TimestampTz endtime, PGresult **result,
+ bool *timed_out)
{
volatile bool failed = false;
PGresult *volatile last_res = NULL;
- *timed_out = false;
+ if (timed_out != NULL)
+ *timed_out = false;
/* In what follows, do not leak any PGresults on an error. */
PG_TRY();
@@ -1404,23 +1372,27 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz
endtime, PGresult **result,
while (PQisBusy(conn))
{
int wc;
- TimestampTz now = GetCurrentTimestamp();
- long cur_timeout;
+ long cur_timeout = -1;
+ int wakeEvents =
WL_LATCH_SET | WL_SOCKET_READABLE |
+ WL_EXIT_ON_PM_DEATH;
/* If timeout has expired, give up, else get
sleep time. */
- cur_timeout =
TimestampDifferenceMilliseconds(now, endtime);
- if (cur_timeout <= 0)
+ if (timed_out != NULL)
{
- *timed_out = true;
- failed = true;
- goto exit;
+ TimestampTz now = GetCurrentTimestamp();
+
+ cur_timeout =
TimestampDifferenceMilliseconds(now, endtime);
+ if (cur_timeout <= 0)
+ {
+ *timed_out = true;
+ failed = true;
+ goto exit;
+ }
+ wakeEvents |= WL_TIMEOUT;
}
/* Sleep until there's something to do */
- wc = WaitLatchOrSocket(MyLatch,
-
WL_LATCH_SET | WL_SOCKET_READABLE |
-
WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
-
PQsocket(conn),
+ wc = WaitLatchOrSocket(MyLatch, wakeEvents,
PQsocket(conn),
cur_timeout, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
@@ -1458,6 +1430,7 @@ exit: ;
PQclear(last_res);
else
*result = last_res;
+
return failed;
}
@@ -1599,13 +1572,9 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries)
entry = (ConnCacheEntry *) lfirst(lc);
/* Ignore errors (see notes in pgfdw_xact_callback) */
- while ((res = PQgetResult(entry->conn)) != NULL)
- {
- PQclear(res);
- /* Stop if the connection is lost (else we'll loop
infinitely) */
- if (PQstatus(entry->conn) == CONNECTION_BAD)
- break;
- }
+ pgfdw_get_result_timed(entry->conn, 0, &res, NULL);
+ PQclear(res);
+
entry->have_prep_stmt = false;
entry->have_error = false;
--
2.37.1
From 4205943a12d50255e7150f3b70aa75b061070e5e Mon Sep 17 00:00:00 2001
From: Fujii Masao <fu...@postgresql.org>
Date: Mon, 25 Jul 2022 22:45:06 +0900
Subject: [PATCH 2/3] Add common function to commit xact or subxact during
pre-commit.
---
contrib/postgres_fdw/connection.c | 122 ++++++++++++++++--------------
1 file changed, 66 insertions(+), 56 deletions(-)
diff --git a/contrib/postgres_fdw/connection.c
b/contrib/postgres_fdw/connection.c
index cbee285480..ec290459be 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -111,6 +111,8 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const
char *query,
static bool pgfdw_get_result_timed(PGconn *conn, TimestampTz endtime,
PGresult
**result, bool *timed_out);
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
+static bool pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql,
+ List
**pending_entries, bool toplevel);
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
int curlevel);
@@ -894,8 +896,6 @@ pgfdw_xact_callback(XactEvent event, void *arg)
hash_seq_init(&scan, ConnectionHash);
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
{
- PGresult *res;
-
/* Ignore cache entry if no open connection right now */
if (entry->conn == NULL)
continue;
@@ -911,45 +911,10 @@ pgfdw_xact_callback(XactEvent event, void *arg)
case XACT_EVENT_PARALLEL_PRE_COMMIT:
case XACT_EVENT_PRE_COMMIT:
- /*
- * If abort cleanup previously failed
for this connection,
- * we can't issue any more commands
against it.
- */
-
pgfdw_reject_incomplete_xact_state_change(entry);
-
/* Commit all remote transactions
during pre-commit */
- entry->changing_xact_state = true;
- if (entry->parallel_commit)
- {
-
do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
- pending_entries =
lappend(pending_entries, entry);
+ if (pgfdw_exec_pre_commit(entry,
"COMMIT TRANSACTION",
+
&pending_entries, true))
continue;
- }
- do_sql_command(entry->conn, "COMMIT
TRANSACTION");
- entry->changing_xact_state = false;
-
- /*
- * If there were any errors in
subtransactions, and we
- * made prepared statements, do a
DEALLOCATE ALL to make
- * sure we get rid of all prepared
statements. This is
- * annoying and not terribly
bulletproof, but it's
- * probably not worth trying harder.
- *
- * DEALLOCATE ALL only exists in 8.3
and later, so this
- * constrains how old a server
postgres_fdw can
- * communicate with. We intentionally
ignore errors in
- * the DEALLOCATE, so that we can
hobble along to some
- * extent with older servers (leaking
prepared statements
- * as we go; but we don't really
support update operations
- * pre-8.3 anyway).
- */
- if (entry->have_prep_stmt &&
entry->have_error)
- {
- res = PQexec(entry->conn,
"DEALLOCATE ALL");
- PQclear(res);
- }
- entry->have_prep_stmt = false;
- entry->have_error = false;
break;
case XACT_EVENT_PRE_PREPARE:
@@ -1014,6 +979,7 @@ pgfdw_subxact_callback(SubXactEvent event,
SubTransactionId mySubid,
ConnCacheEntry *entry;
int curlevel;
List *pending_entries = NIL;
+ char sql[100];
/* Nothing to do at subxact start, nor after commit. */
if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
@@ -1029,11 +995,11 @@ pgfdw_subxact_callback(SubXactEvent event,
SubTransactionId mySubid,
* of the current level, and close them.
*/
curlevel = GetCurrentTransactionNestLevel();
+ snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
+
hash_seq_init(&scan, ConnectionHash);
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
{
- char sql[100];
-
/*
* We only care about connections with open remote
subtransactions of
* the current level.
@@ -1047,23 +1013,9 @@ pgfdw_subxact_callback(SubXactEvent event,
SubTransactionId mySubid,
if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
{
- /*
- * If abort cleanup previously failed for this
connection, we
- * can't issue any more commands against it.
- */
- pgfdw_reject_incomplete_xact_state_change(entry);
-
/* Commit all remote subtransactions during pre-commit
*/
- snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d",
curlevel);
- entry->changing_xact_state = true;
- if (entry->parallel_commit)
- {
- do_sql_command_begin(entry->conn, sql);
- pending_entries = lappend(pending_entries,
entry);
+ if (pgfdw_exec_pre_commit(entry, sql, &pending_entries,
false))
continue;
- }
- do_sql_command(entry->conn, sql);
- entry->changing_xact_state = false;
}
else
{
@@ -1512,6 +1464,64 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
entry->changing_xact_state = false;
}
+/*
+ * Commit all remote transactions or subtransactions during pre-commit.
+ *
+ * If parallel_commit is enabled at this connection cache entry and
+ * the result of "sql" needs to be gotten later, return true and append
+ * this entry to "pending_entries".
+ *
+ * "toplevel" should be set to true if toplevel (main) transaction is
+ * committed, false otherwise.
+ */
+static bool
+pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql,
+ List **pending_entries, bool toplevel)
+{
+ PGresult *res;
+
+ /*
+ * If abort cleanup previously failed for this connection, we can't
issue
+ * any more commands against it.
+ */
+ pgfdw_reject_incomplete_xact_state_change(entry);
+
+ entry->changing_xact_state = true;
+ if (entry->parallel_commit)
+ {
+ do_sql_command_begin(entry->conn, sql);
+ *pending_entries = lappend(*pending_entries, entry);
+ return true;
+ }
+ do_sql_command(entry->conn, sql);
+ entry->changing_xact_state = false;
+
+ if (!toplevel)
+ return false;
+
+ /*
+ * If there were any errors in subtransactions, and we made prepared
+ * statements, do a DEALLOCATE ALL to make sure we get rid of all
prepared
+ * statements. This is annoying and not terribly bulletproof, but it's
+ * probably not worth trying harder.
+ *
+ * DEALLOCATE ALL only exists in 8.3 and later, so this constrains how
old
+ * a server postgres_fdw can communicate with. We intentionally ignore
+ * errors in the DEALLOCATE, so that we can hobble along to some extent
+ * with older servers (leaking prepared statements as we go; but we
don't
+ * really support update operations pre-8.3 anyway).
+ */
+ if (entry->have_prep_stmt && entry->have_error)
+ {
+ res = PQexec(entry->conn, "DEALLOCATE ALL");
+ PQclear(res);
+ }
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
+
+ return false;
+}
+
/*
* Finish pre-commit cleanup of connections on each of which we've sent a
* COMMIT command to the remote server.
--
2.37.1
From dbf44195ca0c51a172634a5d0fc16c34beb0381f Mon Sep 17 00:00:00 2001
From: Fujii Masao <fu...@postgresql.org>
Date: Mon, 25 Jul 2022 23:27:14 +0900
Subject: [PATCH 3/3] Merge pgfdw_finish_pre_commit_cleanup and
pgfdw_finish_pre_subcommit_cleanup into one.
---
contrib/postgres_fdw/connection.c | 78 ++++++++++---------------------
1 file changed, 25 insertions(+), 53 deletions(-)
diff --git a/contrib/postgres_fdw/connection.c
b/contrib/postgres_fdw/connection.c
index ec290459be..6e23046ad6 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -113,9 +113,8 @@ static bool pgfdw_get_result_timed(PGconn *conn,
TimestampTz endtime,
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
static bool pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql,
List
**pending_entries, bool toplevel);
-static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
-static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
-
int curlevel);
+static void pgfdw_finish_pre_commit(List *pending_entries, const char *sql,
+ bool
toplevel);
static bool UserMappingPasswordRequired(UserMapping *user);
static bool disconnect_cached_connections(Oid serverid);
@@ -954,7 +953,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
{
Assert(event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
event == XACT_EVENT_PRE_COMMIT);
- pgfdw_finish_pre_commit_cleanup(pending_entries);
+ pgfdw_finish_pre_commit(pending_entries, "COMMIT TRANSACTION",
true);
}
/*
@@ -1031,7 +1030,7 @@ pgfdw_subxact_callback(SubXactEvent event,
SubTransactionId mySubid,
if (pending_entries)
{
Assert(event == SUBXACT_EVENT_PRE_COMMIT_SUB);
- pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
+ pgfdw_finish_pre_commit(pending_entries, sql, false);
}
}
@@ -1523,11 +1522,14 @@ pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char
*sql,
}
/*
- * Finish pre-commit cleanup of connections on each of which we've sent a
- * COMMIT command to the remote server.
+ * Wait for all remote transactions or subtransactions to be committed
+ * and finish pre-commit.
+ *
+ * "toplevel" should be set to true if toplevel (main) transaction is
+ * committed, false otherwise.
*/
static void
-pgfdw_finish_pre_commit_cleanup(List *pending_entries)
+pgfdw_finish_pre_commit(List *pending_entries, const char *sql, bool toplevel)
{
ConnCacheEntry *entry;
List *pending_deallocs = NIL;
@@ -1536,7 +1538,8 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries)
Assert(pending_entries);
/*
- * Get the result of the COMMIT command for each of the pending entries
+ * Get the result of COMMIT or RELEASE command for each of the pending
+ * entries.
*/
foreach(lc, pending_entries)
{
@@ -1548,23 +1551,26 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries)
* We might already have received the result on the socket, so
pass
* consume_input=true to try to consume it first
*/
- do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
+ do_sql_command_end(entry->conn, sql, true);
entry->changing_xact_state = false;
/* Do a DEALLOCATE ALL in parallel if needed */
- if (entry->have_prep_stmt && entry->have_error)
+ if (toplevel)
{
- /* Ignore errors (see notes in pgfdw_xact_callback) */
- if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
+ if (entry->have_prep_stmt && entry->have_error)
{
- pending_deallocs = lappend(pending_deallocs,
entry);
- continue;
+ /* Ignore errors (see notes in
pgfdw_xact_callback) */
+ if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
+ {
+ pending_deallocs =
lappend(pending_deallocs, entry);
+ continue;
+ }
}
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
}
- entry->have_prep_stmt = false;
- entry->have_error = false;
- pgfdw_reset_xact_state(entry, true);
+ pgfdw_reset_xact_state(entry, toplevel);
}
/* No further work if no pending entries */
@@ -1588,41 +1594,7 @@ pgfdw_finish_pre_commit_cleanup(List *pending_entries)
entry->have_prep_stmt = false;
entry->have_error = false;
- pgfdw_reset_xact_state(entry, true);
- }
-}
-
-/*
- * Finish pre-subcommit cleanup of connections on each of which we've sent a
- * RELEASE command to the remote server.
- */
-static void
-pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
-{
- ConnCacheEntry *entry;
- char sql[100];
- ListCell *lc;
-
- Assert(pending_entries);
-
- /*
- * Get the result of the RELEASE command for each of the pending entries
- */
- snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
- foreach(lc, pending_entries)
- {
- entry = (ConnCacheEntry *) lfirst(lc);
-
- Assert(entry->changing_xact_state);
-
- /*
- * We might already have received the result on the socket, so
pass
- * consume_input=true to try to consume it first
- */
- do_sql_command_end(entry->conn, sql, true);
- entry->changing_xact_state = false;
-
- pgfdw_reset_xact_state(entry, false);
+ pgfdw_reset_xact_state(entry, toplevel);
}
}
--
2.37.1