Hi,

On 2020-07-27 09:44, Andrey V. Lepikhov wrote:
On 7/27/20 11:22 AM, tsunakawa.ta...@fujitsu.com wrote:

US8356007B2 - Distributed transaction management for database systems with multiversioning - Google Patents
https://patents.google.com/patent/US8356007


If it is, can we circumvent this patent?


Thank you for the research (and previous links too).
I haven't seen this patent before. This should be carefully studied.

I had a look at the patch set, although it is quite outdated, especially 0003.

Two thoughts about 0003:

First, IIUC, atomicity of distributed transactions in postgres_fdw is achieved by using 2PC. I think that this postgres_fdw 2PC support should be separated from global snapshots. It could be useful to have such atomic distributed transactions even without the proper visibility guaranteed by a global snapshot, especially taking into account the doubts about Clock-SI and the general questions about algorithm selection criteria raised earlier in the thread.

Thus, I propose to split 0003 into two parts and add a separate GUC 'postgres_fdw.use_twophase', which could be turned on independently of 'postgres_fdw.use_global_snapshots'. Of course, if the latter is enabled, then 2PC should be forcibly turned on as well.

Second, there are some problems with error handling in 0003 (thanks to Arseny Sher for the review).

+error:
+                       if (!res)
+                       {
+                               sql = psprintf("ABORT PREPARED '%s'", 
fdwTransState->gid);
+                               BroadcastCmd(sql);
+                               elog(ERROR, "Failed to PREPARE transaction on remote 
node");
+                       }

It seems that we should never reach this point, because BroadcastStmt will throw an ERROR if it fails to prepare the transaction on a foreign server:

+                       if (PQresultStatus(result) != expectedStatus ||
+                               (handler && !handler(result, arg)))
+                       {
+ elog(WARNING, "Failed command %s: status=%d, expected status=%d", sql, PQresultStatus(result), expectedStatus);
+                               pgfdw_report_error(ERROR, result, entry->conn, 
true, sql);
+                               allOk = false;
+                       }

Moreover, it doesn't make much sense to try to abort prepared xacts: if we failed to prepare one somewhere, then some foreign servers may already have become unavailable, so this doesn't give us a 100% guarantee of cleanup.

+       /* COMMIT open transaction of we were doing 2PC */
+       if (fdwTransState->two_phase_commit &&
+               (event == XACT_EVENT_PARALLEL_COMMIT || event == 
XACT_EVENT_COMMIT))
+       {
+               BroadcastCmd(psprintf("COMMIT PREPARED '%s'", 
fdwTransState->gid));
+       }

At this point, the host (local) transaction is already committed and there is no way to abort it gracefully. However, BroadcastCmd may raise an ERROR that will cause a PANIC, since this is a non-recoverable state:

PANIC:  cannot abort transaction 487, it was already committed

Attached is a patch which implements plain 2PC in postgres_fdw and adds a GUC 'postgres_fdw.use_twophase'. It also solves the error handling issues above and tries to add proper comments everywhere. I think that 0003 should be rebased on top of it, or it could be the first patch in the set, since it may be used independently. What do you think?


Regards
--
Alexey Kondratov

Postgres Professional https://www.postgrespro.com
Russian Postgres Company
From debdffade7abcdbf29031bda6c8359a89776ad36 Mon Sep 17 00:00:00 2001
From: Alexey Kondratov <kondratov.alek...@gmail.com>
Date: Fri, 7 Aug 2020 16:50:57 +0300
Subject: [PATCH] Add postgres_fdw.use_twophase GUC to use 2PC for transactions
 involving several servers.

---
 contrib/postgres_fdw/connection.c   | 234 +++++++++++++++++++++++++---
 contrib/postgres_fdw/postgres_fdw.c |  17 ++
 contrib/postgres_fdw/postgres_fdw.h |   2 +
 3 files changed, 228 insertions(+), 25 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 08daf26fdf0..d18fdd1f94e 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -66,6 +66,20 @@ typedef struct ConnCacheEntry
  */
 static HTAB *ConnectionHash = NULL;
 
+/*
+ * FdwTransactionState
+ *
+ * Per-backend state of the current distributed transaction, shared by all
+ * connection entries; reset by pgfdw_xact_callback() after its cleanup.
+ */
+typedef struct FdwTransactionState
+{
+	char	   *gid;			/* GID for PREPARE / COMMIT PREPARED, if 2PC */
+	int			nparticipants;	/* remote xacts begun (+1 if local xid) */
+	bool		two_phase_commit;	/* use 2PC for this transaction? */
+} FdwTransactionState;
+static FdwTransactionState *fdwTransState;	/* allocated in CacheMemoryContext */
+
 /* for assigning cursor numbers and prepared statement numbers */
 static unsigned int cursor_number = 0;
 static unsigned int prep_stmt_number = 0;
@@ -73,6 +87,9 @@ static unsigned int prep_stmt_number = 0;
 /* tracks whether any work is needed in callback functions */
 static bool xact_got_connection = false;
 
+/* Number of 2PC transactions started by this backend; part of each GID */
+static int two_phase_xact_count = 0;
+
 /* prototypes of private functions */
 static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
 static void disconnect_pg_server(ConnCacheEntry *entry);
@@ -81,6 +98,7 @@ static void configure_remote_session(PGconn *conn);
 static void do_sql_command(PGconn *conn, const char *sql);
 static void begin_remote_xact(ConnCacheEntry *entry);
 static void pgfdw_xact_callback(XactEvent event, void *arg);
+static void deallocate_prepared_stmts(ConnCacheEntry *entry);
 static void pgfdw_subxact_callback(SubXactEvent event,
 								   SubTransactionId mySubid,
 								   SubTransactionId parentSubid,
@@ -137,6 +155,16 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 									  pgfdw_inval_callback, (Datum) 0);
 	}
 
+	/* Allocate FdwTransactionState */
+	if (fdwTransState == NULL)
+	{
+		MemoryContext oldcxt;
+		oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+		fdwTransState = palloc0(sizeof(FdwTransactionState));
+		fdwTransState->nparticipants = 0;
+		MemoryContextSwitchTo(oldcxt);
+	}
+
 	/* Set flag that we did GetConnection during the current transaction */
 	xact_got_connection = true;
 
@@ -448,7 +476,8 @@ configure_remote_session(PGconn *conn)
 }
 
 /*
- * Convenience subroutine to issue a non-data-returning SQL command to remote
+ * Convenience subroutine to issue a non-data-returning SQL command or
+ * statement to remote node.
  */
 static void
 do_sql_command(PGconn *conn, const char *sql)
@@ -494,6 +523,8 @@ begin_remote_xact(ConnCacheEntry *entry)
 		do_sql_command(entry->conn, sql);
 		entry->xact_depth = 1;
 		entry->changing_xact_state = false;
+
+		fdwTransState->nparticipants += 1;
 	}
 
 	/*
@@ -701,6 +732,76 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 	PG_END_TRY();
 }
 
+/* Optional per-result check for BroadcastStmt; return false to flag failure */
+typedef bool (*BroadcastCmdResHandler) (PGresult *result, void *arg);
+
+/*
+ * Send sql in parallel to every ConnectionHash entry with an open remote xact.
+ *
+ * If elevel < ERROR and a command fails, only an elevel message is raised and
+ * false is returned, so it is up to the caller to handle this gracefully.
+ * NOTE(review): a failed send is probably reported again when collecting.
+ */
+static bool
+BroadcastStmt(char const * sql, unsigned expectedStatus,
+			  int elevel, BroadcastCmdResHandler handler,
+			  void *arg)
+{
+	HASH_SEQ_STATUS scan;
+	ConnCacheEntry *entry;
+	bool		allOk = true;
+
+	/* Send sql to all participants without waiting for the results */
+	hash_seq_init(&scan, ConnectionHash);
+	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	{
+		pgfdw_reject_incomplete_xact_state_change(entry);
+
+		if (entry->xact_depth > 0 && entry->conn != NULL)
+		{
+			if (!PQsendQuery(entry->conn, sql))
+			{
+				PGresult   *res = PQgetResult(entry->conn);
+
+				elog(elevel < ERROR ? elevel : WARNING, "failed to send command %s", sql);
+				pgfdw_report_error(elevel, res, entry->conn, true, sql);
+				PQclear(res);
+			}
+		}
+	}
+
+	/* Collect one result per participant and check its status */
+	hash_seq_init(&scan, ConnectionHash);
+	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	{
+		if (entry->xact_depth > 0 && entry->conn != NULL)
+		{
+			PGresult   *result = PQgetResult(entry->conn);
+
+			if (PQresultStatus(result) != expectedStatus ||
+				(handler && !handler(result, arg)))
+			{
+				elog(elevel < ERROR ? elevel : WARNING,
+					 "failed command %s: status=%d, expected status=%d",
+					 sql, PQresultStatus(result), expectedStatus);
+				pgfdw_report_error(elevel, result, entry->conn, true, sql);
+				allOk = false;
+			}
+			PQclear(result);
+			PQgetResult(entry->conn);	/* consume NULL result */
+		}
+	}
+
+	return allOk;
+}
+
+/* Broadcast a command returning no rows; no per-result handler is used */
+static bool
+BroadcastCmd(char const *sql, int elevel)
+{
+	return BroadcastStmt(sql, PGRES_COMMAND_OK, elevel, NULL, NULL);
+}
+
 /*
  * pgfdw_xact_callback --- cleanup at main-transaction end.
  */
@@ -714,6 +815,74 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 	if (!xact_got_connection)
 		return;
 
+	/*
+	 * On PRE_COMMIT event we should figure out whether to use 2PC or not.
+	 * This decision is based on two factors:
+	 *    # postgres_fdw.use_twophase is turned on;
+	 *    # more than one server have participated in this transaction.
+	 *
+	 * If we decide to use 2PC for this xact, then we should broadcast
+	 * PREPARE to all participated foreign servers.
+	 */
+	if (event == XACT_EVENT_PARALLEL_PRE_COMMIT || event == XACT_EVENT_PRE_COMMIT)
+	{
+		/* Should we take into account this node? */
+		if (TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
+			fdwTransState->nparticipants += 1;
+
+		/* Switch to 2PC mode if there were more than one participant */
+		if (Use2PC && fdwTransState->nparticipants > 1)
+			fdwTransState->two_phase_commit = true;
+
+		if (fdwTransState->two_phase_commit)
+		{
+			char   *sql;
+
+			fdwTransState->gid = psprintf("pgfdw:%lld:%llu:%d:%u:%d:%d",
+										  (long long) GetCurrentTimestamp(),
+										  (long long) GetSystemIdentifier(),
+										  MyProcPid,
+										  GetCurrentTransactionIdIfAny(),
+										  ++two_phase_xact_count,
+										  fdwTransState->nparticipants);
+
+			/* Broadcast PREPARE */
+			sql = psprintf("PREPARE TRANSACTION '%s'", fdwTransState->gid);
+
+			/*
+			 * If we got any problem, then it does not make much sence to
+			 * broadcast ABORT PREPARED in order to clean up prepared xacts
+			 * everywhere, since this method does not guarantee a 100%
+			 * success.  This is a work for external tools.  Rise an ERROR
+			 * immediately in the case of failure during broadcast.
+			 */
+			BroadcastCmd(sql, ERROR);
+
+			/*
+			 * Do not fall down. Consequent COMMIT event will clean things up.
+			 */
+			return;
+		}
+	}
+
+	/*
+	 * COMMIT event occurs when the local transaction is fully committed.
+	 * That way, we have to broadcast COMMIT PREPARED to all participated
+	 * foreign servers in order to finalize this 'distributed' transaction.
+	 * Actually, it is too late to abort the host transaction if any error
+	 * occurs during COMMIT PREPARED broadcast stage.
+	 */
+	if (fdwTransState->two_phase_commit &&
+		(event == XACT_EVENT_PARALLEL_COMMIT || event == XACT_EVENT_COMMIT))
+	{
+		if (!BroadcastCmd(psprintf("COMMIT PREPARED '%s'", fdwTransState->gid), WARNING))
+			ereport(WARNING,
+					(errcode(ERRCODE_CONNECTION_FAILURE),
+					 errmsg("canceling the wait for foreign servers to commit prepared transaction due to the error occured on one of them"),
+					 errdetail("The transaction has already committed locally, but might not have been committed on all participated foreign servers."),
+					 errhint("Consider committing it everywhere manually with COMMIT PREPARED '%s'", fdwTransState->gid)));
+	}
+
 	/*
 	 * Scan all connection cache entries to find open remote transactions, and
 	 * close them.
@@ -721,8 +890,6 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 	hash_seq_init(&scan, ConnectionHash);
 	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
 	{
-		PGresult   *res;
-
 		/* Ignore cache entry if no open connection right now */
 		if (entry->conn == NULL)
 			continue;
@@ -739,6 +906,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 			{
 				case XACT_EVENT_PARALLEL_PRE_COMMIT:
 				case XACT_EVENT_PRE_COMMIT:
+					Assert(!fdwTransState->two_phase_commit);
 
 					/*
 					 * If abort cleanup previously failed for this connection,
@@ -751,28 +919,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					do_sql_command(entry->conn, "COMMIT TRANSACTION");
 					entry->changing_xact_state = false;
 
-					/*
-					 * If there were any errors in subtransactions, and we
-					 * made prepared statements, do a DEALLOCATE ALL to make
-					 * sure we get rid of all prepared statements. This is
-					 * annoying and not terribly bulletproof, but it's
-					 * probably not worth trying harder.
-					 *
-					 * DEALLOCATE ALL only exists in 8.3 and later, so this
-					 * constrains how old a server postgres_fdw can
-					 * communicate with.  We intentionally ignore errors in
-					 * the DEALLOCATE, so that we can hobble along to some
-					 * extent with older servers (leaking prepared statements
-					 * as we go; but we don't really support update operations
-					 * pre-8.3 anyway).
-					 */
-					if (entry->have_prep_stmt && entry->have_error)
-					{
-						res = PQexec(entry->conn, "DEALLOCATE ALL");
-						PQclear(res);
-					}
-					entry->have_prep_stmt = false;
-					entry->have_error = false;
+					deallocate_prepared_stmts(entry);
 					break;
 				case XACT_EVENT_PRE_PREPARE:
 
@@ -791,6 +938,11 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					break;
 				case XACT_EVENT_PARALLEL_COMMIT:
 				case XACT_EVENT_COMMIT:
+					if (fdwTransState->two_phase_commit)
+						deallocate_prepared_stmts(entry);
+					else /* Pre-commit should have closed the open transaction */
+						elog(ERROR, "missed cleaning up connection during pre-commit");
+					break;
 				case XACT_EVENT_PREPARE:
 					/* Pre-commit should have closed the open transaction */
 					elog(ERROR, "missed cleaning up connection during pre-commit");
@@ -886,6 +1038,38 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 
 	/* Also reset cursor numbering for next transaction */
 	cursor_number = 0;
+
+	/* Reset fdwTransState */
+	memset(fdwTransState, '\0', sizeof(FdwTransactionState));
+}
+
+/*
+ * deallocate_prepared_stmts
+ *
+ * Used once the remote transaction of "entry" has been finished.  If there
+ * were errors in subtransactions while this connection had prepared
+ * statements, issue DEALLOCATE ALL to be sure all of them are gone.  This is
+ * annoying and not bulletproof, but trying harder is probably not worth it.
+ *
+ * DEALLOCATE ALL only exists on 8.3 and later remote servers.  Its result is
+ * deliberately ignored, so older servers keep working to some extent (they
+ * leak prepared statements, but update operations are not really supported
+ * against pre-8.3 servers anyway).
+ *
+ * Both have_prep_stmt and have_error are always cleared.
+ */
+static void
+deallocate_prepared_stmts(ConnCacheEntry *entry)
+{
+	bool		need_dealloc = entry->have_prep_stmt && entry->have_error;
+
+	/* The flags are not consulted by the command below, so reset them now */
+	entry->have_prep_stmt = false;
+	entry->have_error = false;
+
+	/* Fire and forget: any error from DEALLOCATE ALL is ignored on purpose */
+	if (need_dealloc)
+		PQclear(PQexec(entry->conn, "DEALLOCATE ALL"));
 }
 
 /*
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index a31abce7c99..7a7772f5dd3 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -301,6 +301,12 @@ typedef struct
 	List	   *already_used;	/* expressions already dealt with */
 } ec_member_foreign_arg;
 
+bool		Use2PC = false;	/* postgres_fdw.use_twophase, set up in _PG_init() */
+
+#ifndef PG_FDW_BUILTIN
+void		_PG_init(void);
+#endif
+
 /*
  * SQL functions
  */
@@ -6583,3 +6589,14 @@ find_em_expr_for_input_target(PlannerInfo *root,
 	elog(ERROR, "could not find pathkey item to sort");
 	return NULL;				/* keep compiler quiet */
 }
+
+#ifndef PG_FDW_BUILTIN
+void
+_PG_init(void)
+{
+	DefineCustomBoolVariable("postgres_fdw.use_twophase",
+							 "Use two phase commit for distributed transactions", NULL,
+							 &Use2PC, false, PGC_USERSET, 0, NULL, NULL, NULL);
+	EmitWarningsOnPlaceholders("postgres_fdw");	/* warn on misspelled settings */
+}
+#endif
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index eef410db392..3c8cadc508a 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -208,4 +208,6 @@ extern const char *get_jointype_name(JoinType jointype);
 extern bool is_builtin(Oid objectId);
 extern bool is_shippable(Oid objectId, Oid classId, PgFdwRelationInfo *fpinfo);
 
+extern bool Use2PC;
+
 #endif							/* POSTGRES_FDW_H */

base-commit: 49d7165117893405ae9b5b8d8e7877acff33c0e7
-- 
2.19.1

Reply via email to