From 2c0054327c65d34bb65228ec89f3409b71935061 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Thu, 21 Jan 2021 07:13:23 +0530
Subject: [PATCH v15] postgres_fdw function to discard cached connections

This patch introduces two new functions postgres_fdw_disconnect()
and postgres_fdw_disconnect_all() to discard cached connection
for a given foreign server or discard all cached connections
respectively.
---
 contrib/postgres_fdw/connection.c             | 129 ++++++++++++++
 .../postgres_fdw/expected/postgres_fdw.out    | 162 +++++++++++++++++-
 .../postgres_fdw/postgres_fdw--1.0--1.1.sql   |  10 ++
 contrib/postgres_fdw/sql/postgres_fdw.sql     |  62 ++++++-
 doc/src/sgml/postgres-fdw.sgml                |  57 ++++++
 5 files changed, 417 insertions(+), 3 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index a1404cb6bb..be0ff43b4d 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -80,6 +80,8 @@ static bool xact_got_connection = false;
  * SQL functions
  */
 PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
+PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
+PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
 
 /* prototypes of private functions */
 static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
@@ -102,6 +104,7 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
 static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
 									 PGresult **result);
 static bool UserMappingPasswordRequired(UserMapping *user);
+static bool disconnect_cached_connections(uint32 hashvalue, bool all);
 
 /*
  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
@@ -1470,3 +1473,129 @@ postgres_fdw_get_connections(PG_FUNCTION_ARGS)
 
 	PG_RETURN_VOID();
 }
+
+/*
+ * Disconnect cached foreign server connection.
+ *
+ * This function throws an error when there is no foreign server with the given
+ * name.
+ *
+ * If cached connection for the given foreign server is found and has not been
+ * used within current transaction yet, close the connection and return true.
+ * Even when it's found, if it's already used, keep the connection, emit a
+ * warning and return false. If it's not found, return false.
+ *
+ * It returns false, if the cache doesn't exit.
+ */
+Datum
+postgres_fdw_disconnect(PG_FUNCTION_ARGS)
+{
+	ForeignServer	*server = NULL;
+	char	*servername = NULL;
+	uint32	hashvalue;
+
+	servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
+	server = GetForeignServerByName(servername, false);
+	hashvalue = GetSysCacheHashValue1(FOREIGNSERVEROID,
+									  ObjectIdGetDatum(server->serverid));
+
+	PG_RETURN_BOOL(disconnect_cached_connections(hashvalue, false));
+}
+
+/*
+ * Disconnect all cached connections.
+ *
+ * This function scans all the cache entries, closes connections that are not
+ * being used within current transaction. It emits warning for each connection
+ * that's in use.
+ *
+ * It returns true, if it closes at least one connection, otherwise false.
+ */
+Datum
+postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_BOOL(disconnect_cached_connections(0, true));
+}
+
+/*
+ * Workhorse to disconnect cached connections.
+ *
+ * This function disconnects either all unused connections when called from
+ * postgres_fdw_disconnect_all or a given foreign server unused connection when
+ * called from postgres_fdw_disconnect.
+ *
+ * This function returns true if at least one connection is disconnected,
+ * otherwise false.
+ */
+static bool
+disconnect_cached_connections(uint32 hashvalue, bool all)
+{
+	HASH_SEQ_STATUS	scan;
+	ConnCacheEntry	*entry;
+	bool	result = false;
+
+	if (!ConnectionHash)
+		return result;
+
+	hash_seq_init(&scan, ConnectionHash);
+	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	{
+		/*
+		 * Either disconnect given or all the active and not in use cached
+		 * connections.
+		 */
+		if ((all || entry->server_hashvalue == hashvalue) &&
+			 entry->conn)
+		{
+			/* We cannot close connection that's in use, so issue a warning. */
+			if (entry->xact_depth > 0)
+			{
+				ForeignServer *server;
+
+				server = GetForeignServerExtended(entry->serverid,
+												  FSV_MISSING_OK);
+
+				if (!server)
+				{
+					/*
+					 * If the server has been dropped in the current explicit
+					 * transaction, then this entry would have been invalidated
+					 * in pgfdw_inval_callback at the end of drop sever
+					 * command. Note that this connection would not have been
+					 * closed in pgfdw_inval_callback because it is still being
+					 * used in the current explicit transaction. So, assert
+					 * that here.
+					 */
+					Assert(entry->invalidated);
+
+					ereport(WARNING,
+							(errmsg("cannot close dropped server connection because it is still in use")));
+				}
+				else
+					ereport(WARNING,
+							(errmsg("cannot close connection for server \"%s\" because it is still in use",
+							 server->servername)));
+
+			}
+			else
+			{
+				elog(DEBUG3, "discarding connection %p", entry->conn);
+				disconnect_pg_server(entry);
+				result = true;
+			}
+
+			/*
+			 * For the given server, if we closed connection or it is still in
+			 * use, then no need of scanning the cache further.
+			 */
+			if (entry->server_hashvalue == hashvalue &&
+				(entry->xact_depth > 0 || result))
+			{
+				hash_seq_term(&scan);
+				break;
+			}
+		}
+	}
+
+	return result;
+}
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index b4a04d2c14..27e6a8f141 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -17,7 +17,10 @@ DO $d$
             OPTIONS (dbname '$$||current_database()||$$',
                      port '$$||current_setting('port')||$$'
             )$$;
-
+        EXECUTE $$CREATE SERVER loopback4 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$'
+            )$$;
     END;
 $d$;
 CREATE USER MAPPING FOR public SERVER testserver1
@@ -25,6 +28,7 @@ CREATE USER MAPPING FOR public SERVER testserver1
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback;
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2;
 CREATE USER MAPPING FOR public SERVER loopback3;
+CREATE USER MAPPING FOR public SERVER loopback4;
 -- ===================================================================
 -- create objects used through FDW loopback server
 -- ===================================================================
@@ -140,6 +144,11 @@ CREATE FOREIGN TABLE ft7 (
 	c2 int NOT NULL,
 	c3 text
 ) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 4');
+CREATE FOREIGN TABLE ft8 (
+	c1 int NOT NULL,
+	c2 int NOT NULL,
+	c3 text
+) SERVER loopback4 OPTIONS (schema_name 'S 1', table_name 'T 4');
 -- ===================================================================
 -- tests for validator
 -- ===================================================================
@@ -211,7 +220,8 @@ ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
  public | ft5   | loopback  | (schema_name 'S 1', table_name 'T 4') | 
  public | ft6   | loopback2 | (schema_name 'S 1', table_name 'T 4') | 
  public | ft7   | loopback3 | (schema_name 'S 1', table_name 'T 4') | 
-(6 rows)
+ public | ft8   | loopback4 | (schema_name 'S 1', table_name 'T 4') | 
+(7 rows)
 
 -- Test that alteration of server options causes reconnection
 -- Remote's errors might be non-English, so hide them to ensure stable results
@@ -9253,3 +9263,151 @@ SELECT COUNT(*) FROM batch_table;
 
 -- Clean up
 DROP TABLE batch_table CASCADE;
+-- ===================================================================
+-- test postgres_fdw_disconnect function
+-- ===================================================================
+-- Return true as all cached connections are closed.
+SELECT postgres_fdw_disconnect_all();
+ postgres_fdw_disconnect_all 
+-----------------------------
+ t
+(1 row)
+
+-- Ensure to cache loopback connection.
+SELECT 1 FROM ft1 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+BEGIN;
+-- Ensure to cache loopback2 connection.
+SELECT 1 FROM ft6 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+-- List all the existing cached connections. loopback and loopback2 should be
+-- output.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+ server_name | valid 
+-------------+-------
+ loopback    | t
+ loopback2   | t
+(2 rows)
+
+-- Issue a warning and return false as loopback2 connection is still in use and
+-- can not be closed.
+SELECT postgres_fdw_disconnect('loopback2');
+WARNING:  cannot close connection for server "loopback2" because it is still in use
+ postgres_fdw_disconnect 
+-------------------------
+ f
+(1 row)
+
+-- Close loopback connection, return true and issue a warning as loopback2
+-- connection is still in use and can not be closed.
+SELECT postgres_fdw_disconnect_all();
+WARNING:  cannot close connection for server "loopback2" because it is still in use
+ postgres_fdw_disconnect_all 
+-----------------------------
+ t
+(1 row)
+
+-- List all the existing cached connections. loopback2 should be output.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+ server_name | valid 
+-------------+-------
+ loopback2   | t
+(1 row)
+
+-- Ensure to cache loopback connection.
+SELECT 1 FROM ft1 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+-- List all the existing cached connections. loopback and loopback2 should be
+-- output.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+ server_name | valid 
+-------------+-------
+ loopback    | t
+ loopback2   | t
+(2 rows)
+
+-- Return false as connections are still in use, warnings are issued for both
+-- loopback and loopback2 connections.
+SELECT postgres_fdw_disconnect_all();
+WARNING:  cannot close connection for server "loopback" because it is still in use
+WARNING:  cannot close connection for server "loopback2" because it is still in use
+ postgres_fdw_disconnect_all 
+-----------------------------
+ f
+(1 row)
+
+-- Ensure to cache loopback4 connection.
+SELECT 1 FROM ft8 LIMIT 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+-- List all the existing cached connections. loopback, loopback2, loopback4
+-- should be output.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+ server_name | valid 
+-------------+-------
+ loopback    | t
+ loopback2   | t
+ loopback4   | t
+(3 rows)
+
+DROP SERVER loopback4 CASCADE;
+NOTICE:  drop cascades to 2 other objects
+DETAIL:  drop cascades to user mapping for public on server loopback4
+drop cascades to foreign table ft8
+-- Return false as connections are still in use, warnings are issued.
+SELECT postgres_fdw_disconnect_all();
+WARNING:  cannot close dropped server connection because it is still in use
+WARNING:  cannot close connection for server "loopback" because it is still in use
+WARNING:  cannot close connection for server "loopback2" because it is still in use
+ postgres_fdw_disconnect_all 
+-----------------------------
+ f
+(1 row)
+
+COMMIT;
+-- Close loopback2 connection and return true.
+SELECT postgres_fdw_disconnect('loopback2');
+ postgres_fdw_disconnect 
+-------------------------
+ t
+(1 row)
+
+-- List all the existing cached connections. loopback should be output.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+ server_name | valid 
+-------------+-------
+ loopback    | t
+(1 row)
+
+-- Return an error as there is no foreign server with given name.
+SELECT postgres_fdw_disconnect('unknownserver');
+ERROR:  server "unknownserver" does not exist
+-- Close loopback connection and return true.
+SELECT postgres_fdw_disconnect_all();
+ postgres_fdw_disconnect_all 
+-----------------------------
+ t
+(1 row)
+
+-- List all the existing cached connections. No connection exists, so NULL
+-- should be output.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+ server_name | valid 
+-------------+-------
+(0 rows)
+
diff --git a/contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql b/contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql
index 7f85784466..b8c0209036 100644
--- a/contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql
+++ b/contrib/postgres_fdw/postgres_fdw--1.0--1.1.sql
@@ -8,3 +8,13 @@ CREATE FUNCTION postgres_fdw_get_connections (OUT server_name text,
 RETURNS SETOF record
 AS 'MODULE_PATHNAME'
 LANGUAGE C STRICT PARALLEL RESTRICTED;
+
+CREATE FUNCTION postgres_fdw_disconnect (text)
+RETURNS bool
+AS 'MODULE_PATHNAME','postgres_fdw_disconnect'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
+
+CREATE FUNCTION postgres_fdw_disconnect_all ()
+RETURNS bool
+AS 'MODULE_PATHNAME','postgres_fdw_disconnect_all'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 28b82f5f9d..4924cab74f 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -19,7 +19,10 @@ DO $d$
             OPTIONS (dbname '$$||current_database()||$$',
                      port '$$||current_setting('port')||$$'
             )$$;
-
+        EXECUTE $$CREATE SERVER loopback4 FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$'
+            )$$;
     END;
 $d$;
 
@@ -28,6 +31,7 @@ CREATE USER MAPPING FOR public SERVER testserver1
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback;
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2;
 CREATE USER MAPPING FOR public SERVER loopback3;
+CREATE USER MAPPING FOR public SERVER loopback4;
 
 -- ===================================================================
 -- create objects used through FDW loopback server
@@ -154,6 +158,12 @@ CREATE FOREIGN TABLE ft7 (
 	c3 text
 ) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 4');
 
+CREATE FOREIGN TABLE ft8 (
+	c1 int NOT NULL,
+	c2 int NOT NULL,
+	c3 text
+) SERVER loopback4 OPTIONS (schema_name 'S 1', table_name 'T 4');
+
 -- ===================================================================
 -- tests for validator
 -- ===================================================================
@@ -2831,3 +2841,53 @@ SELECT COUNT(*) FROM batch_table;
 
 -- Clean up
 DROP TABLE batch_table CASCADE;
+
+-- ===================================================================
+-- test postgres_fdw_disconnect function
+-- ===================================================================
+-- Return true as all cached connections are closed.
+SELECT postgres_fdw_disconnect_all();
+-- Ensure to cache loopback connection.
+SELECT 1 FROM ft1 LIMIT 1;
+BEGIN;
+-- Ensure to cache loopback2 connection.
+SELECT 1 FROM ft6 LIMIT 1;
+-- List all the existing cached connections. loopback and loopback2 should be
+-- output.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+-- Issue a warning and return false as loopback2 connection is still in use and
+-- can not be closed.
+SELECT postgres_fdw_disconnect('loopback2');
+-- Close loopback connection, return true and issue a warning as loopback2
+-- connection is still in use and can not be closed.
+SELECT postgres_fdw_disconnect_all();
+-- List all the existing cached connections. loopback2 should be output.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+-- Ensure to cache loopback connection.
+SELECT 1 FROM ft1 LIMIT 1;
+-- List all the existing cached connections. loopback and loopback2 should be
+-- output.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+-- Return false as connections are still in use, warnings are issued for both
+-- loopback and loopback2 connections.
+SELECT postgres_fdw_disconnect_all();
+-- Ensure to cache loopback4 connection.
+SELECT 1 FROM ft8 LIMIT 1;
+-- List all the existing cached connections. loopback, loopback2, loopback4
+-- should be output.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+DROP SERVER loopback4 CASCADE;
+-- Return false as connections are still in use, warnings are issued.
+SELECT postgres_fdw_disconnect_all();
+COMMIT;
+-- Close loopback2 connection and return true.
+SELECT postgres_fdw_disconnect('loopback2');
+-- List all the existing cached connections. loopback should be output.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
+-- Return an error as there is no foreign server with given name.
+SELECT postgres_fdw_disconnect('unknownserver');
+-- Close loopback connection and return true.
+SELECT postgres_fdw_disconnect_all();
+-- List all the existing cached connections. No connection exists, so NULL
+-- should be output.
+SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml
index fb4c22ac69..738f0cbc85 100644
--- a/doc/src/sgml/postgres-fdw.sgml
+++ b/doc/src/sgml/postgres-fdw.sgml
@@ -522,6 +522,49 @@ postgres=# SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
      </para>
     </listitem>
    </varlistentry>
+
+   <varlistentry>
+    <term><function>postgres_fdw_disconnect(server_name text) returns boolean</function></term>
+    <listitem>
+     <para>
+      This function discards the open connection that <filename>postgres_fdw</filename>
+      established from the local session to the foreign server with the given
+      name if it's not used in the current local transaction yet, and then
+      returns <literal>true</literal>. If it's already used, the function
+      doesn't discard the connection, emits a warning and then returns <literal>false</literal>.
+      If there is no open connection to the given foreign server, <literal>false</literal>
+      is returned. If no foreign server with the given name is found, an error
+      is emitted. Example usage of the function:
+    <screen>
+postgres=# SELECT * FROM postgres_fdw_disconnect('loopback1');
+ postgres_fdw_disconnect 
+-------------------------
+ t
+</screen>
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><function>postgres_fdw_disconnect_all() returns boolean</function></term>
+    <listitem>
+     <para>
+      This function discards all the open connections that <filename>postgres_fdw</filename>
+      established from the local session to the foreign server if they are not
+      used in the current local transaction yet. It doesn't discard the
+      connections that's already used in the current local transaction and
+      emits a warning for each such connection. It returns <literal>true</literal>,
+      if it closes at least one connection, otherwise <literal>false</literal>.
+      Example usage of the function:
+    <screen>
+postgres=# SELECT * FROM postgres_fdw_disconnect_all();
+ postgres_fdw_disconnect_all 
+-----------------------------
+ t
+</screen>
+     </para>
+    </listitem>
+   </varlistentry>
    </variablelist>
 
 </sect2>
@@ -537,6 +580,20 @@ postgres=# SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
    multiple user identities (user mappings) are used to access the foreign
    server, a connection is established for each user mapping.
   </para>
+
+  <para>
+   Since the <filename>postgres_fdw</filename> keeps the connections to remote
+   servers in the local session, the corresponding sessions that are opened on
+   the remote servers are kept idle until they are re-used by the local session.
+   This may waste resources if those connections are not frequently used by the
+   local session. To address this, the <filename>postgres_fdw</filename>
+   provides following way to remove the connections to the remote servers and
+   so the remote sessions:
+    
+   <function>postgres_fdw_disconnect_all()</function> to discard all the
+   connections or <function>postgres_fdw_disconnect(server_name text)</function>
+   to discard the connection associated with the given foreign server.
+  </para>
  </sect2>
 
  <sect2>
-- 
2.25.1

