Thanks for the comments. Attaching the v2 patch.

>
> > One way, we could solve the above problem is that, upon firing the new
> > foreign query from local backend using the cached connection,
> > (assuming the remote backend that was cached in the local backed got
> > killed for some reasons), instead of failing the query in the local
> > backend, upon detecting error from the remote backend, we could just
> > delete the cached old entry and try getting another connection to
> > remote backend, cache it and proceed to submit the query. This has to
> > happen only at the beginning of remote xact.
> +1.
>
> I think this is a very useful feature.
> In an environment with connection pooling for local, if a remote
> server has a failover or switchover,
> this feature would prevent unexpected errors of local queries after
> recovery of the remote server.

Thanks for backing this feature.

>
> I haven't looked at the code in detail yet, some comments here.
>

Thanks for the comments. Please feel free to review more of the
attached v2 patch.

>
> 1. To keep the previous behavior (and performance), how about allowing
> the user to specify
>    whether or not to retry as a GUC parameter or in the FOREIGN SERVER
OPTION?
>

Do we actually need this? The connection retry costs us very little: it
happens only at the beginning of the remote xact, i.e. at most once per
remote session. If we can establish the connection, well and good;
otherwise the query is bound to fail anyway.

If we do need one (i.e. if there is a strong reason to have the option),
then the question is: GUC or SERVER OPTION?

There's a similar discussion going on in [1] about having a GUC at the
core level vs a SERVER OPTION for postgres_fdw.

>
> 2. How about logging a LOG message when retry was success to let us know
>    the retry feature worked or how often the retries worked ?
>

The v1 patch already had logging messages; in the v2 patch, a
"postgres_fdw connection retry is successful" message is added as well.
Please note that all the new logs are emitted at level DEBUG3, the same
level as the existing logs.

>
> > I couldn't think of adding a test case to the existing postgres_fdw
> > regression test suite with an automated scenario of the remote backend
> > getting killed.
>
> Couldn't you confirm this by adding a test case like the following?
> ===================================================
> BEGIN;
> -- Generate a connection to remote
> SELECT * FROM ft1 LIMIT 1;
>
> -- retrieve pid of postgres_fdw and kill it
> -- could use the other unique identifier (not postgres_fdw but
> fdw_retry_check, etc ) for application name
> SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE
> backend_type = 'client backend' AND application_name = 'postgres_fdw'
>
> -- COMMIT, so next query will should success if connection-retry works
> COMMIT;
> SELECT * FROM ft1 LIMIT 1;
> ===================================================
>

Yes, this way it works. Thanks for the suggestion. I added the test
case to the postgres_fdw regression test suite; the v2 patch includes
these changes as well.

[1] -
https://www.postgresql.org/message-id/CALj2ACVvrp5%3DAVp2PupEm%2BnAC8S4buqR3fJMmaCoc7ftT0aD2A%40mail.gmail.com
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
From b3725acc916e0daa2797b9d159fc6cc5cdcf43c6 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Sat, 11 Jul 2020 14:57:58 +0530
Subject: [PATCH v2] Retry Cached Remote Connections For postgres_fdw

Remote connections are cached for postgres_fdw in the local backend.
There is a high chance that the remote backend becomes unavailable,
i.e. it can get killed; subsequent foreign queries from the local
backend/session that use the cached connection then fail because the
remote backend is no longer available. This patch solves that problem:
1. the local backend/session uses the cached connection, but receives an error
2. upon receiving the first error, it deletes the cached entry
3. it tries to get a new connection at the start of begin remote xact
---
 contrib/postgres_fdw/connection.c             | 130 +++++++++++++++++-
 .../postgres_fdw/expected/postgres_fdw.out    |  30 ++++
 contrib/postgres_fdw/sql/postgres_fdw.sql     |  19 +++
 3 files changed, 175 insertions(+), 4 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 52d1fe3563..e3658dd704 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -45,6 +45,19 @@
  */
 typedef Oid ConnCacheKey;
 
+typedef enum ConnectionRetryType
+{
+	/* initial value for all cache entries */
+	CONN_RETRY_NONE,
+	/* indicates that the caller is ready to retry connection */
+	CONN_RETRY_READY,
+	/*
+	 * indicates to the caller that the connection may be
+	 * broken and it is okay to retry to get a new connection.
+	*/
+	CONN_RETRY_DO
+} ConnectionRetryType;
+
 typedef struct ConnCacheEntry
 {
 	ConnCacheKey key;			/* hash key (must be first) */
@@ -58,8 +71,15 @@ typedef struct ConnCacheEntry
 	bool		invalidated;	/* true if reconnect is pending */
 	uint32		server_hashvalue;	/* hash value of foreign server OID */
 	uint32		mapping_hashvalue;	/* hash value of user mapping OID */
+	/* conn retry status, default is CONN_RETRY_NONE for all entries */
+	uint8       conn_retry;
 } ConnCacheEntry;
 
+#define IsRemoteXactBegin(sql) \
+ ((strcmp(sql, "START TRANSACTION ISOLATION LEVEL SERIALIZABLE") == 0 || \
+  strcmp(sql, "START TRANSACTION ISOLATION LEVEL REPEATABLE READ") == 0) ? \
+  1 : 0)
+
 /*
  * Connection cache (initialized on first use)
  */
@@ -109,6 +129,8 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 	bool		found;
 	ConnCacheEntry *entry;
 	ConnCacheKey key;
+	bool	dobeginxact = true;
+	bool	doretry		= false;
 
 	/* First time through, initialize connection cache hashtable */
 	if (ConnectionHash == NULL)
@@ -170,10 +192,54 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 	}
 
 	/*
-	 * We don't check the health of cached connection here, because it would
-	 * require some overhead.  Broken connection will be detected when the
-	 * connection is actually used.
+	 * We check the health of the cached connection here, just before the
+	 * remote xact begins.  A broken connection will be detected; if so, clear
+	 * the existing conn, get a new connection and resubmit the remote xact
+	 * begin.  We retry a broken connection only once per begin remote xact
+	 * call; if the connection is still broken after that, the query fails.
 	 */
+	if (entry->conn != NULL &&
+		entry->xact_depth <= 0 &&
+		entry->conn_retry == CONN_RETRY_NONE)
+	{
+		/*
+		 * We are before the begin remote xact, so it is safe to retry;
+		 * indicate that in the cached entry.
+		 */
+		entry->conn_retry = CONN_RETRY_READY;
+		begin_remote_xact(entry);
+
+		/*
+		 * The previously set RETRY_READY status is changed to RETRY_DO if the
+		 * connection turns out to be broken; if so, clear the entry.
+		 */
+		if (entry->conn_retry == CONN_RETRY_DO)
+		{
+			elog(DEBUG3, "postgres_fdw cached connection %p is broken, hence closing it", entry->conn);
+			/*
+			 * It is okay to call disconnect here though the connection is broken.
+			 * No terminate message will actually be sent to the remote backend by
+			 * sendTerminateConn() as the connection status will be CONNECTION_BAD.
+			 * So disconnect will just do the clearing and resetting of parameters
+			 * in entry data structure.
+			 */
+			disconnect_pg_server(entry);
+			doretry = true;
+		}
+		else if (entry->conn_retry == CONN_RETRY_READY)
+		{
+			elog(DEBUG3, "postgres_fdw cached connection %p is healthy, hence using it", entry->conn);
+			/*
+			 * Connection is not broken, so the remote xact begin command executed
+			 * successfully.  Mark that here, to avoid a further begin remote xact
+			 * call in this function.
+			 */
+			dobeginxact = false;
+		}
+
+		/* Reset the conn_retry to default value. */
+		entry->conn_retry = CONN_RETRY_NONE;
+	}
 
 	/*
 	 * If cache entry doesn't have a connection, we have to establish a new
@@ -199,15 +265,20 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 
 		/* Now try to make the connection */
 		entry->conn = connect_pg_server(server, user);
+		entry->conn_retry = CONN_RETRY_NONE;
 
 		elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
 			 entry->conn, server->servername, user->umid, user->userid);
+
+		if (doretry)
+			elog(DEBUG3, "postgres_fdw connection retry is successful");
 	}
 
 	/*
 	 * Start a new transaction or subtransaction if needed.
 	 */
-	begin_remote_xact(entry);
+	if (dobeginxact)
+		begin_remote_xact(entry);
 
 	/* Remember if caller will prepare statements */
 	entry->have_prep_stmt |= will_prep_stmt;
@@ -457,6 +528,17 @@ do_sql_command(PGconn *conn, const char *sql)
 	if (!PQsendQuery(conn, sql))
 		pgfdw_report_error(ERROR, NULL, conn, false, sql);
 	res = pgfdw_get_result(conn, sql);
+
+	/*
+	 * If this function is called from begin_remote_xact and the res returned
+	 * by pgfdw_get_result is NULL, the remote connection is broken; just
+	 * return from here, the cached entry's retry status will already have
+	 * been set appropriately to indicate the broken connection.
+	 */
+	if (res == NULL &&
+		IsRemoteXactBegin(sql))
+		return;
+
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		pgfdw_report_error(ERROR, res, conn, true, sql);
 	PQclear(res);
@@ -591,6 +673,7 @@ PGresult *
 pgfdw_get_result(PGconn *conn, const char *query)
 {
 	PGresult   *volatile last_res = NULL;
+	bool 		retry  = false;
 
 	/* In what follows, do not leak any PGresults on an error. */
 	PG_TRY();
@@ -617,10 +700,49 @@ pgfdw_get_result(PGconn *conn, const char *query)
 				if (wc & WL_SOCKET_READABLE)
 				{
 					if (!PQconsumeInput(conn))
+					{
+						/*
+						 * Do this on error and for only begin remote xact
+						 * commands.
+						 */
+						if (IsRemoteXactBegin(query) &&
+							ConnectionHash != NULL)
+						{
+							HASH_SEQ_STATUS scan;
+							ConnCacheEntry *entry;
+
+							hash_seq_init(&scan, ConnectionHash);
+							while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+							{
+								if (entry->conn == conn &&
+									entry->conn_retry == CONN_RETRY_READY)
+								{
+									/* got the required connection from the cache. */
+									entry->conn_retry = CONN_RETRY_DO;
+									hash_seq_term(&scan);
+									retry = true;
+									break;
+								}
+							}
+						}
+
+						/* If retry is to be done, don't report the error. */
+						if (retry == true)
+							break;
+
 						pgfdw_report_error(ERROR, NULL, conn, false, query);
+					}
 				}
 			}
 
+			if (retry == true)
+			{
+				/* clear if there is any previous res. */
+				PQclear(last_res);
+				last_res = NULL;
+				break;
+			}
+
 			res = PQgetResult(conn);
 			if (res == NULL)
 				break;			/* query is complete */
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 82fc1290ef..a813c4918f 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -8974,3 +8974,33 @@ PREPARE TRANSACTION 'fdw_tpc';
 ERROR:  cannot PREPARE a transaction that has operated on postgres_fdw foreign tables
 ROLLBACK;
 WARNING:  there is no transaction in progress
+-- Retry cached connections at the beginning of the remote xact
+-- in case remote backend is killed.
+-- Let's use a different application name for remote connection,
+-- so that this test will not kill other backends wrongly.
+ALTER SERVER loopback OPTIONS (application_name 'fdw_retry_check');
+-- Generate a connection to remote. Local backend will cache it.
+SELECT * FROM ft1 LIMIT 1;
+  c1  | c2 | c3 | c4 | c5 | c6 |     c7     | c8 
+------+----+----+----+----+----+------------+----
+ 1111 |  2 |    |    |    |    | ft1        | 
+(1 row)
+
+-- Retrieve pid of remote backend with application name fdw_retry_check
+-- and kill it intentionally here. Note that the local backend still has
+-- the remote connection/backend info in its cache.
+SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE
+backend_type = 'client backend' AND application_name = 'fdw_retry_check';
+ pg_terminate_backend 
+----------------------
+ t
+(1 row)
+
+-- Next query using the same foreign server should succeed if connection
+-- retry works.
+SELECT * FROM ft1 LIMIT 1;
+  c1  | c2 | c3 | c4 | c5 | c6 |     c7     | c8 
+------+----+----+----+----+----+------------+----
+ 1111 |  2 |    |    |    |    | ft1        | 
+(1 row)
+
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 83971665e3..9552625d2e 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2634,3 +2634,22 @@ SELECT count(*) FROM ft1;
 -- error here
 PREPARE TRANSACTION 'fdw_tpc';
 ROLLBACK;
+
+-- Retry cached connections at the beginning of the remote xact
+-- in case remote backend is killed.
+-- Let's use a different application name for remote connection,
+-- so that this test will not kill other backends wrongly.
+ALTER SERVER loopback OPTIONS (application_name 'fdw_retry_check');
+
+-- Generate a connection to remote. Local backend will cache it.
+SELECT * FROM ft1 LIMIT 1;
+
+-- Retrieve pid of remote backend with application name fdw_retry_check
+-- and kill it intentionally here. Note that the local backend still has
+-- the remote connection/backend info in its cache.
+SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE
+backend_type = 'client backend' AND application_name = 'fdw_retry_check';
+
+-- Next query using the same foreign server should succeed if connection
+-- retry works.
+SELECT * FROM ft1 LIMIT 1;
-- 
2.25.1
