On Mon, 2024-12-16 at 20:05 -0800, Jeff Davis wrote:
> On Wed, 2024-10-30 at 08:08 -0700, Jeff Davis wrote:
> 

Rebased v14.

The approach has changed multiple times. It started off with more in-
core code, but in response to review feedback, has become more
decoupled from core and more coupled to postgres_fdw.

But the patch has been about the same (just rebases) since March of
last year, and hasn't gotten feedback since. I still think it's a nice
feature, but I'd like some feedback on the externals of the feature.

As a note, this will require a version bump for postgres_fdw for the
new connection method.

Regards,
        Jeff Davis

From e63b42acfb4d4d8241b4453520a7fe52195c0f99 Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Tue, 2 Jan 2024 13:42:48 -0800
Subject: [PATCH v14] CREATE SUBSCRIPTION ... SERVER.

Allow specifying a foreign server for CREATE SUBSCRIPTION, rather than
a raw connection string with CONNECTION.

Using a foreign server as a layer of indirection improves management
of multiple subscriptions to the same server. It also provides
integration with user mappings in case different subscriptions have
different owners or a subscription changes owners.

Discussion: https://postgr.es/m/61831790a0a937038f78ce09f8dd4cef7de7456a.ca...@j-davis.com
Reviewed-by: Ashutosh Bapat
---
 contrib/postgres_fdw/Makefile                 |   2 +
 contrib/postgres_fdw/connection.c             |  73 ++++++++
 .../postgres_fdw/expected/postgres_fdw.out    |   8 +
 contrib/postgres_fdw/meson.build              |   1 +
 .../postgres_fdw/postgres_fdw--1.1--1.2.sql   |   8 +
 contrib/postgres_fdw/sql/postgres_fdw.sql     |   7 +
 contrib/postgres_fdw/t/010_subscription.pl    |  71 ++++++++
 doc/src/sgml/ref/alter_subscription.sgml      |  18 +-
 doc/src/sgml/ref/create_subscription.sgml     |  11 +-
 src/backend/catalog/pg_subscription.c         |  38 +++-
 src/backend/commands/foreigncmds.c            |  58 +++++-
 src/backend/commands/subscriptioncmds.c       | 168 ++++++++++++++++--
 src/backend/foreign/foreign.c                 |  66 +++++++
 src/backend/parser/gram.y                     |  22 +++
 src/backend/replication/logical/worker.c      |  16 +-
 src/bin/pg_dump/pg_dump.c                     |  35 +++-
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/psql/tab-complete.in.c                |   2 +-
 src/include/catalog/pg_foreign_data_wrapper.h |   3 +
 src/include/catalog/pg_subscription.h         |   7 +-
 src/include/foreign/foreign.h                 |   3 +
 src/include/nodes/parsenodes.h                |   3 +
 src/test/regress/expected/oidjoins.out        |   1 +
 23 files changed, 587 insertions(+), 35 deletions(-)
 create mode 100644 contrib/postgres_fdw/t/010_subscription.pl

diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index adfbd2ef758..59b805656c1 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -19,6 +19,8 @@ DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql postgres_fdw--1.1--1.2.s
 REGRESS = postgres_fdw query_cancel
 TAP_TESTS = 1
 
+TAP_TESTS = 1
+
 ifdef USE_PGXS
 PG_CONFIG = pg_config
 PGXS := $(shell $(PG_CONFIG) --pgxs)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 8a8d3b4481f..961368a919a 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -131,6 +131,7 @@ PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
 PG_FUNCTION_INFO_V1(postgres_fdw_get_connections_1_2);
 PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
 PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
+PG_FUNCTION_INFO_V1(postgres_fdw_connection);
 
 /* prototypes of private functions */
 static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
@@ -2279,6 +2280,78 @@ postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
 	}
 }
 
+/*
+ * Append val to str as a single-quoted connection-string value, prefixing
+ * every embedded single quote or backslash with a backslash. NB: this is
+ * not the same as the escaping rules for a SQL literal.
+ */
+static void
+appendEscapedValue(StringInfo str, const char *val)
+{
+	appendStringInfoChar(str, '\'');
+	for (const char *p = val; *p != '\0'; p++)
+	{
+		if (*p == '\'' || *p == '\\')
+			appendStringInfoChar(str, '\\');
+		appendStringInfoChar(str, *p);
+	}
+	appendStringInfoChar(str, '\'');
+}
+
+/*
+ * Build a libpq connection string from a foreign server's and user's options.
+ */
+Datum
+postgres_fdw_connection(PG_FUNCTION_ARGS)
+{
+	Oid			userid = PG_GETARG_OID(0);
+	Oid			serverid = PG_GETARG_OID(1);
+	ForeignServer *server = GetForeignServer(serverid);
+	UserMapping *user = GetUserMapping(userid, serverid);
+	StringInfoData str;
+	const char **keywords;
+	const char **values;
+	const char *sep = "";		/* empty before the first pair, " " after */
+	int			n;
+
+	/*
+	 * Construct connection params from generic options of ForeignServer and
+	 * UserMapping.  (Some might not be libpq options, which just wastes a few
+	 * slots.)  The extra slots cover client_encoding and the end marker.
+	 */
+	n = list_length(server->options) + list_length(user->options) + 4;
+	keywords = (const char **) palloc(n * sizeof(char *));
+	values = (const char **) palloc(n * sizeof(char *));
+
+	n = 0;
+	n += ExtractConnectionOptions(server->options,
+								  keywords + n, values + n);
+	n += ExtractConnectionOptions(user->options,
+								  keywords + n, values + n);
+
+	/* Set client_encoding so that libpq can convert encoding properly. */
+	keywords[n] = "client_encoding";
+	values[n] = GetDatabaseEncodingName();
+	n++;
+
+	keywords[n] = values[n] = NULL;
+
+	/* verify the set of connection parameters */
+	check_conn_params(keywords, values, user);
+
+	initStringInfo(&str);
+	for (int i = 0; i < n; i++)
+	{
+		appendStringInfo(&str, "%s%s = ", sep, keywords[i]);
+		appendEscapedValue(&str, values[i]);
+		sep = " ";
+	}
+
+	pfree(keywords);
+	pfree(values);
+	PG_RETURN_TEXT_P(cstring_to_text(str.data));
+}
+
 /*
  * List active foreign server connections.
  *
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 8447b289cb7..61a7e3455a8 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -256,6 +256,14 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
 ANALYZE ft1;
 ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true');
 -- ===================================================================
+-- test subscription
+-- ===================================================================
+CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1
+  PUBLICATION pub1 WITH (slot_name = NONE, connect = false);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+DROP SUBSCRIPTION regress_pgfdw_subscription;
+-- ===================================================================
 -- test error case for create publication on foreign table
 -- ===================================================================
 CREATE PUBLICATION testpub_ftbl FOR TABLE ft1;  -- should fail
diff --git a/contrib/postgres_fdw/meson.build b/contrib/postgres_fdw/meson.build
index 8b29be24dee..33f98ab86f2 100644
--- a/contrib/postgres_fdw/meson.build
+++ b/contrib/postgres_fdw/meson.build
@@ -44,6 +44,7 @@ tests += {
   'tap': {
     'tests': [
       't/001_auth_scram.pl',
+      't/010_subscription.pl',
     ],
   },
 }
diff --git a/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql b/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql
index 81aad4fcdaa..8981787d165 100644
--- a/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql
+++ b/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql
@@ -16,3 +16,11 @@ CREATE FUNCTION postgres_fdw_get_connections (
 RETURNS SETOF record
 AS 'MODULE_PATHNAME', 'postgres_fdw_get_connections_1_2'
 LANGUAGE C STRICT PARALLEL RESTRICTED;
+
+-- takes internal parameter to prevent calling from SQL
+CREATE FUNCTION postgres_fdw_connection(oid, oid, internal)
+RETURNS text
+AS 'MODULE_PATHNAME'
+LANGUAGE C STRICT;
+
+ALTER FOREIGN DATA WRAPPER postgres_fdw CONNECTION postgres_fdw_connection;
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 1598d9e0862..f733b580859 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -248,6 +248,13 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
 ANALYZE ft1;
 ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true');
 
+-- ===================================================================
+-- test subscription
+-- ===================================================================
+CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1
+  PUBLICATION pub1 WITH (slot_name = NONE, connect = false);
+DROP SUBSCRIPTION regress_pgfdw_subscription;
+
 -- ===================================================================
 -- test error case for create publication on foreign table
 -- ===================================================================
diff --git a/contrib/postgres_fdw/t/010_subscription.pl b/contrib/postgres_fdw/t/010_subscription.pl
new file mode 100644
index 00000000000..a39e8fdbba4
--- /dev/null
+++ b/contrib/postgres_fdw/t/010_subscription.pl
@@ -0,0 +1,71 @@
+
+# Copyright (c) 2021-2024, PostgreSQL Global Development Group
+
+# Basic logical replication test using CREATE SUBSCRIPTION ... SERVER
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_ins AS SELECT a, a + 1 as b FROM generate_series(1,1002) AS a");
+
+# Create a table without columns (not part of the publication below)
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab_no_col()");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_no_col default VALUES");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE EXTENSION postgres_fdw");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int, b int)");
+
+# Setup logical replication; the subscriber reaches the publisher through a
+# foreign server instead of a raw connection string
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_ins");
+
+my $publisher_host = $node_publisher->host;
+my $publisher_port = $node_publisher->port;
+$node_subscriber->safe_psql('postgres',
+	"CREATE SERVER tap_server FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '$publisher_host', port '$publisher_port', dbname 'postgres')"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE USER MAPPING FOR PUBLIC SERVER tap_server"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE FOREIGN TABLE f_tab_ins (a int, b int) SERVER tap_server OPTIONS(table_name 'tab_ins')"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub SERVER tap_server PUBLICATION tap_pub WITH (password_required=false)"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM (SELECT f.b = l.b as match FROM tab_ins l, f_tab_ins f WHERE l.a = f.a) WHERE match");
+is($result, qq(1002), 'check initial data was copied to subscriber');
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_ins SELECT a, a + 1 FROM generate_series(1003,1050) a");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM (SELECT f.b = l.b as match FROM tab_ins l, f_tab_ins f WHERE l.a = f.a) WHERE match");
+is($result, qq(1050), 'check replicated changes were applied on subscriber');
+
+done_testing();
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index fdc648d007f..35a8101796b 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -21,6 +21,7 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
+ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SERVER <replaceable>servername</replaceable>
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> CONNECTION '<replaceable>conninfo</replaceable>'
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
@@ -101,13 +102,24 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
     </listitem>
    </varlistentry>
 
+   <varlistentry id="sql-altersubscription-params-server">
+    <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
+    <listitem>
+     <para>
+      This clause replaces the foreign server or connection string originally
+      set by <xref linkend="sql-createsubscription"/> with the foreign server
+      <replaceable>servername</replaceable>.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry id="sql-altersubscription-params-connection">
     <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
     <listitem>
      <para>
-      This clause replaces the connection string originally set by
-      <xref linkend="sql-createsubscription"/>.  See there for more
-      information.
+      This clause replaces the foreign server or connection string originally
+      set by <xref linkend="sql-createsubscription"/> with the connection
+      string <replaceable>conninfo</replaceable>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 57dec28a5df..966efae16a3 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -22,7 +22,7 @@ PostgreSQL documentation
  <refsynopsisdiv>
 <synopsis>
 CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceable>
-    CONNECTION '<replaceable class="parameter">conninfo</replaceable>'
+    { SERVER <replaceable class="parameter">servername</replaceable> | CONNECTION '<replaceable class="parameter">conninfo</replaceable>' }
     PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...]
     [ WITH ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 </synopsis>
@@ -77,6 +77,15 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
     </listitem>
    </varlistentry>
 
+   <varlistentry id="sql-createsubscription-params-server">
+    <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
+    <listitem>
+     <para>
+      A foreign server to use for the connection.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry id="sql-createsubscription-params-connection">
     <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
     <listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 1395032413e..490cb965965 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -19,11 +19,14 @@
 #include "access/htup_details.h"
 #include "access/tableam.h"
 #include "catalog/indexing.h"
+#include "catalog/pg_foreign_server.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
+#include "foreign/foreign.h"
 #include "miscadmin.h"
 #include "storage/lmgr.h"
+#include "utils/acl.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
@@ -69,7 +72,7 @@ GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
  * Fetch the subscription from the syscache.
  */
 Subscription *
-GetSubscription(Oid subid, bool missing_ok)
+GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
 {
 	HeapTuple	tup;
 	Subscription *sub;
@@ -105,10 +108,35 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->failover = subform->subfailover;
 
 	/* Get conninfo */
-	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
-								   tup,
-								   Anum_pg_subscription_subconninfo);
-	sub->conninfo = TextDatumGetCString(datum);
+	if (OidIsValid(subform->subserver))
+	{
+		AclResult	aclresult;
+
+		/* recheck ACL if requested */
+		if (aclcheck)
+		{
+			aclresult = object_aclcheck(ForeignServerRelationId,
+										subform->subserver,
+										subform->subowner, ACL_USAGE);
+
+			if (aclresult != ACLCHECK_OK)
+				ereport(ERROR,
+						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+						 errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
+								GetUserNameFromId(subform->subowner, false),
+								ForeignServerName(subform->subserver))));
+		}
+
+		sub->conninfo = ForeignServerConnectionString(subform->subowner,
+													  subform->subserver);
+	}
+	else
+	{
+		datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
+									   tup,
+									   Anum_pg_subscription_subconninfo);
+		sub->conninfo = TextDatumGetCString(datum);
+	}
 
 	/* Get slotname */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/commands/foreigncmds.c b/src/backend/commands/foreigncmds.c
index c14e038d54f..5913678c5b1 100644
--- a/src/backend/commands/foreigncmds.c
+++ b/src/backend/commands/foreigncmds.c
@@ -511,21 +511,53 @@ lookup_fdw_validator_func(DefElem *validator)
 	/* validator's return value is ignored, so we don't check the type */
 }
 
+/*
+ * Resolve the CONNECTION function named in a CREATE/ALTER FDW option to its
+ * Oid, or return InvalidOid if none was given.  The function is looked up
+ * with the argument types (oid, oid, internal) and must return text.
+ */
+static Oid
+lookup_fdw_connection_func(DefElem *connection)
+{
+	/* argument types: user oid, server oid, internal */
+	Oid			argtypes[3] = {OIDOID, OIDOID, INTERNALOID};
+	List	   *funcname;
+	Oid			funcoid;
+
+	if (connection == NULL || connection->arg == NULL)
+		return InvalidOid;
+
+	funcname = (List *) connection->arg;
+	funcoid = LookupFuncName(funcname, 3, argtypes, false);
+
+	/* the function must produce the connection string as text */
+	if (get_func_rettype(funcoid) != TEXTOID)
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("function %s must return type %s",
+						NameListToString(funcname), "text")));
+
+	return funcoid;
+}
+
 /*
  * Process function options of CREATE/ALTER FDW
  */
 static void
 parse_func_options(ParseState *pstate, List *func_options,
 				   bool *handler_given, Oid *fdwhandler,
-				   bool *validator_given, Oid *fdwvalidator)
+				   bool *validator_given, Oid *fdwvalidator,
+				   bool *connection_given, Oid *fdwconnection)
 {
 	ListCell   *cell;
 
 	*handler_given = false;
 	*validator_given = false;
+	*connection_given = false;
 	/* return InvalidOid if not given */
 	*fdwhandler = InvalidOid;
 	*fdwvalidator = InvalidOid;
+	*fdwconnection = InvalidOid;
 
 	foreach(cell, func_options)
 	{
@@ -545,6 +577,13 @@ parse_func_options(ParseState *pstate, List *func_options,
 			*validator_given = true;
 			*fdwvalidator = lookup_fdw_validator_func(def);
 		}
+		else if (strcmp(def->defname, "connection") == 0)
+		{
+			if (*connection_given)
+				errorConflictingDefElem(def, pstate);
+			*connection_given = true;
+			*fdwconnection = lookup_fdw_connection_func(def);
+		}
 		else
 			elog(ERROR, "option \"%s\" not recognized",
 				 def->defname);
@@ -564,8 +603,10 @@ CreateForeignDataWrapper(ParseState *pstate, CreateFdwStmt *stmt)
 	Oid			fdwId;
 	bool		handler_given;
 	bool		validator_given;
+	bool		connection_given;
 	Oid			fdwhandler;
 	Oid			fdwvalidator;
+	Oid			fdwconnection;
 	Datum		fdwoptions;
 	Oid			ownerId;
 	ObjectAddress myself;
@@ -609,10 +650,12 @@ CreateForeignDataWrapper(ParseState *pstate, CreateFdwStmt *stmt)
 	/* Lookup handler and validator functions, if given */
 	parse_func_options(pstate, stmt->func_options,
 					   &handler_given, &fdwhandler,
-					   &validator_given, &fdwvalidator);
+					   &validator_given, &fdwvalidator,
+					   &connection_given, &fdwconnection);
 
 	values[Anum_pg_foreign_data_wrapper_fdwhandler - 1] = ObjectIdGetDatum(fdwhandler);
 	values[Anum_pg_foreign_data_wrapper_fdwvalidator - 1] = ObjectIdGetDatum(fdwvalidator);
+	values[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = ObjectIdGetDatum(fdwconnection);
 
 	nulls[Anum_pg_foreign_data_wrapper_fdwacl - 1] = true;
 
@@ -684,8 +727,10 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt)
 	Datum		datum;
 	bool		handler_given;
 	bool		validator_given;
+	bool		connection_given;
 	Oid			fdwhandler;
 	Oid			fdwvalidator;
+	Oid			fdwconnection;
 	ObjectAddress myself;
 
 	rel = table_open(ForeignDataWrapperRelationId, RowExclusiveLock);
@@ -715,7 +760,8 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt)
 
 	parse_func_options(pstate, stmt->func_options,
 					   &handler_given, &fdwhandler,
-					   &validator_given, &fdwvalidator);
+					   &validator_given, &fdwvalidator,
+					   &connection_given, &fdwconnection);
 
 	if (handler_given)
 	{
@@ -753,6 +799,12 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt)
 		fdwvalidator = fdwForm->fdwvalidator;
 	}
 
+	if (connection_given)
+	{
+		repl_val[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = ObjectIdGetDatum(fdwconnection);
+		repl_repl[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = true;
+	}
+
 	/*
 	 * If options specified, validate and update.
 	 */
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 4aec73bcc6b..9b9ab6657aa 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -26,14 +26,17 @@
 #include "catalog/objectaddress.h"
 #include "catalog/pg_authid_d.h"
 #include "catalog/pg_database_d.h"
+#include "catalog/pg_foreign_server.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
+#include "catalog/pg_user_mapping.h"
 #include "commands/dbcommands.h"
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/subscriptioncmds.h"
 #include "executor/executor.h"
+#include "foreign/foreign.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
@@ -546,6 +549,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	Datum		values[Natts_pg_subscription];
 	Oid			owner = GetUserId();
 	HeapTuple	tup;
+	Oid			serverid;
 	char	   *conninfo;
 	char		originname[NAMEDATALEN];
 	List	   *publications;
@@ -638,15 +642,40 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	if (opts.synchronous_commit == NULL)
 		opts.synchronous_commit = "off";
 
-	conninfo = stmt->conninfo;
-	publications = stmt->publication;
-
 	/* Load the library providing us libpq calls. */
 	load_file("libpqwalreceiver", false);
 
+	if (stmt->servername)
+	{
+		ForeignServer *server;
+
+		Assert(!stmt->conninfo);
+		conninfo = NULL;
+
+		server = GetForeignServerByName(stmt->servername, false);
+		aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, owner, ACL_USAGE);
+		if (aclresult != ACLCHECK_OK)
+			aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
+
+		/* make sure a user mapping exists */
+		GetUserMapping(owner, server->serverid);
+
+		serverid = server->serverid;
+		conninfo = ForeignServerConnectionString(owner, serverid);
+	}
+	else
+	{
+		Assert(stmt->conninfo);
+
+		serverid = InvalidOid;
+		conninfo = stmt->conninfo;
+	}
+
 	/* Check the connection info string. */
 	walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
 
+	publications = stmt->publication;
+
 	/* Everything ok, form a new tuple. */
 	memset(values, 0, sizeof(values));
 	memset(nulls, false, sizeof(nulls));
@@ -670,8 +699,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
 	values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
 	values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
-	values[Anum_pg_subscription_subconninfo - 1] =
-		CStringGetTextDatum(conninfo);
+	values[Anum_pg_subscription_subserver - 1] = serverid;
+	if (!OidIsValid(serverid))
+		values[Anum_pg_subscription_subconninfo - 1] =
+			CStringGetTextDatum(conninfo);
+	else
+		nulls[Anum_pg_subscription_subconninfo - 1] = true;
 	if (opts.slot_name)
 		values[Anum_pg_subscription_subslotname - 1] =
 			DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
@@ -692,6 +725,18 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 
 	recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
 
+	ObjectAddressSet(myself, SubscriptionRelationId, subid);
+
+	if (stmt->servername)
+	{
+		ObjectAddress referenced;
+
+		Assert(OidIsValid(serverid));
+
+		ObjectAddressSet(referenced, ForeignServerRelationId, serverid);
+		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+	}
+
 	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
 	replorigin_create(originname);
 
@@ -809,8 +854,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	if (opts.enabled)
 		ApplyLauncherWakeupAtCommit();
 
-	ObjectAddressSet(myself, SubscriptionRelationId, subid);
-
 	InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
 
 	return myself;
@@ -1135,7 +1178,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
 					   stmt->subname);
 
-	sub = GetSubscription(subid, false);
+	/*
+	 * Skip ACL checks on the subscription's foreign server, if any. If
+	 * changing the server (or replacing it with a raw connection), then the
+	 * old one will be removed anyway. If changing something unrelated,
+	 * there's no need to do an additional ACL check here; that will be done
+	 * by the subscription worker anyway.
+	 */
+	sub = GetSubscription(subid, false, false);
 
 	/*
 	 * Don't allow non-superuser modification of a subscription with
@@ -1155,6 +1205,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 	memset(nulls, false, sizeof(nulls));
 	memset(replaces, false, sizeof(replaces));
 
+	ObjectAddressSet(myself, SubscriptionRelationId, subid);
+
 	switch (stmt->kind)
 	{
 		case ALTER_SUBSCRIPTION_OPTIONS:
@@ -1358,7 +1410,79 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				break;
 			}
 
+		case ALTER_SUBSCRIPTION_SERVER:
+			{
+				ForeignServer *new_server;
+				ObjectAddress referenced;
+				AclResult	aclresult;
+				char	   *conninfo;
+
+				/*
+				 * Remove what was there before, either another foreign server
+				 * or a connection string.
+				 */
+				if (form->subserver)
+				{
+					deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+													   DEPENDENCY_NORMAL,
+													   ForeignServerRelationId, form->subserver);
+				}
+				else
+				{
+					nulls[Anum_pg_subscription_subconninfo - 1] = true;
+					replaces[Anum_pg_subscription_subconninfo - 1] = true;
+				}
+
+				/*
+				 * Find the new server and user mapping. Both the ACL check
+				 * on the server and the user mapping lookup are based on the
+				 * subscription owner.
+				 */
+				new_server = GetForeignServerByName(stmt->servername, false);
+				aclresult = object_aclcheck(ForeignServerRelationId,
+											new_server->serverid,
+											form->subowner, ACL_USAGE);
+				if (aclresult != ACLCHECK_OK)
+					ereport(ERROR,
+							(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+							 errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
+									GetUserNameFromId(form->subowner, false),
+									ForeignServerName(new_server->serverid))));
+
+				/* make sure a user mapping exists */
+				GetUserMapping(form->subowner, new_server->serverid);
+
+				conninfo = ForeignServerConnectionString(form->subowner,
+														 new_server->serverid);
+
+				/* Load the library providing us libpq calls. */
+				load_file("libpqwalreceiver", false);
+				/* Check the connection info string. */
+				walrcv_check_conninfo(conninfo,
+									  sub->passwordrequired && !sub->ownersuperuser);
+
+				values[Anum_pg_subscription_subserver - 1] = new_server->serverid;
+				replaces[Anum_pg_subscription_subserver - 1] = true;
+
+				ObjectAddressSet(referenced, ForeignServerRelationId, new_server->serverid);
+				recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+
+				update_tuple = true;
+			}
+			break;
+
 		case ALTER_SUBSCRIPTION_CONNECTION:
+			/* remove reference to foreign server and dependencies, if present */
+			if (form->subserver)
+			{
+				deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+												   DEPENDENCY_NORMAL,
+												   ForeignServerRelationId, form->subserver);
+
+				values[Anum_pg_subscription_subserver - 1] = InvalidOid;
+				replaces[Anum_pg_subscription_subserver - 1] = true;
+			}
+
 			/* Load the library providing us libpq calls. */
 			load_file("libpqwalreceiver", false);
 			/* Check the connection info string. */
@@ -1609,8 +1733,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	table_close(rel, RowExclusiveLock);
 
-	ObjectAddressSet(myself, SubscriptionRelationId, subid);
-
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
 
 	/* Wake up related replication workers to handle this change quickly. */
@@ -1695,9 +1817,28 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	subname = pstrdup(NameStr(*DatumGetName(datum)));
 
 	/* Get conninfo */
-	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
-								   Anum_pg_subscription_subconninfo);
-	conninfo = TextDatumGetCString(datum);
+	if (OidIsValid(form->subserver))
+	{
+		AclResult	aclresult;
+
+		aclresult = object_aclcheck(ForeignServerRelationId, form->subserver,
+									form->subowner, ACL_USAGE);
+		if (aclresult != ACLCHECK_OK)
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
+							GetUserNameFromId(form->subowner, false),
+							ForeignServerName(form->subserver))));
+
+		conninfo = ForeignServerConnectionString(form->subowner,
+												 form->subserver);
+	}
+	else
+	{
+		datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
+									   Anum_pg_subscription_subconninfo);
+		conninfo = TextDatumGetCString(datum);
+	}
 
 	/* Get slotname */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
@@ -1796,6 +1937,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	}
 
 	/* Clean up dependencies */
+	deleteDependencyRecordsFor(SubscriptionRelationId, subid, false);
 	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
 	/* Remove any associated relation synchronization states. */
diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c
index f0835fc3070..24ce097d683 100644
--- a/src/backend/foreign/foreign.c
+++ b/src/backend/foreign/foreign.c
@@ -71,6 +71,7 @@ GetForeignDataWrapperExtended(Oid fdwid, bits16 flags)
 	fdw->fdwname = pstrdup(NameStr(fdwform->fdwname));
 	fdw->fdwhandler = fdwform->fdwhandler;
 	fdw->fdwvalidator = fdwform->fdwvalidator;
+	fdw->fdwconnection = fdwform->fdwconnection;
 
 	/* Extract the fdwoptions */
 	datum = SysCacheGetAttr(FOREIGNDATAWRAPPEROID,
@@ -175,6 +176,31 @@ GetForeignServerExtended(Oid serverid, bits16 flags)
 }
 
 
+/*
+ * ForeignServerName - return a pstrdup'd copy of a foreign server's name.
+ *
+ * Raises an error if the syscache has no entry for serverid.
+ */
+char *
+ForeignServerName(Oid serverid)
+{
+	Form_pg_foreign_server form;
+	HeapTuple	tuple;
+	char	   *name;
+
+	tuple = SearchSysCache1(FOREIGNSERVEROID, ObjectIdGetDatum(serverid));
+	if (!HeapTupleIsValid(tuple))
+		elog(ERROR, "cache lookup failed for foreign server %u", serverid);
+
+	/* Copy the name out before the cache entry is released. */
+	form = (Form_pg_foreign_server) GETSTRUCT(tuple);
+	name = pstrdup(NameStr(form->srvname));
+	ReleaseSysCache(tuple);
+
+	return name;
+}
+
+
 /*
  * GetForeignServerByName - look up the foreign server definition by name.
  */
@@ -190,6 +216,59 @@ GetForeignServerByName(const char *srvname, bool missing_ok)
 }
 
 
+/*
+ * ForeignServerConnectionString - get the connection string for a server.
+ *
+ * Calls the connection function of the server's foreign data wrapper with
+ * (userid, serverid) and returns its text result as a C string built in the
+ * memory context that was current on entry.  Raises an error if the foreign
+ * data wrapper has no connection function.
+ */
+char *
+ForeignServerConnectionString(Oid userid, Oid serverid)
+{
+	MemoryContext tempContext;
+	MemoryContext oldcxt;
+	ForeignServer *server;
+	ForeignDataWrapper *fdw;
+	Datum		connection_datum;
+	text	   *connection_text;
+	char	   *result;
+
+	/*
+	 * Do the lookups and the function call in a scratch context.  It is
+	 * created on every call: a static pointer to a child of
+	 * CurrentMemoryContext would dangle once that parent is reset or deleted.
+	 */
+	tempContext = AllocSetContextCreate(CurrentMemoryContext,
+										"temp context",
+										ALLOCSET_DEFAULT_SIZES);
+	oldcxt = MemoryContextSwitchTo(tempContext);
+
+	server = GetForeignServer(serverid);
+	fdw = GetForeignDataWrapper(server->fdwid);
+
+	if (!OidIsValid(fdw->fdwconnection))
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("foreign data wrapper \"%s\" does not support subscription connections",
+						fdw->fdwname)));
+
+	connection_datum = OidFunctionCall2(fdw->fdwconnection,
+										ObjectIdGetDatum(userid),
+										ObjectIdGetDatum(serverid));
+	connection_text = DatumGetTextPP(connection_datum);
+
+	/* Build the result outside the scratch context so it survives cleanup. */
+	MemoryContextSwitchTo(oldcxt);
+	result = text_to_cstring(connection_text);
+
+	MemoryContextDelete(tempContext);
+
+	return result;
+}
+
+
 /*
  * GetUserMapping - look up the user mapping.
  *
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 7d99c9355c6..fafe351ca5c 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -5437,6 +5437,8 @@ fdw_option:
 			| NO HANDLER						{ $$ = makeDefElem("handler", NULL, @1); }
 			| VALIDATOR handler_name			{ $$ = makeDefElem("validator", (Node *) $2, @1); }
 			| NO VALIDATOR						{ $$ = makeDefElem("validator", NULL, @1); }
+			| CONNECTION handler_name			{ $$ = makeDefElem("connection", (Node *) $2, @1); }
+			| NO CONNECTION						{ $$ = makeDefElem("connection", NULL, @1); }
 		;
 
 fdw_options:
@@ -10804,6 +10806,16 @@ CreateSubscriptionStmt:
 					n->options = $8;
 					$$ = (Node *) n;
 				}
+			| CREATE SUBSCRIPTION name SERVER name PUBLICATION name_list opt_definition
+				{
+					CreateSubscriptionStmt *n =
+						makeNode(CreateSubscriptionStmt);
+					n->subname = $3;
+					n->servername = $5;
+					n->publication = $7;
+					n->options = $8;
+					$$ = (Node *) n;
+				}
 		;
 
 /*****************************************************************************
@@ -10833,6 +10845,16 @@ AlterSubscriptionStmt:
 					n->conninfo = $5;
 					$$ = (Node *) n;
 				}
+			| ALTER SUBSCRIPTION name SERVER name
+				{
+					AlterSubscriptionStmt *n =
+						makeNode(AlterSubscriptionStmt);
+
+					n->kind = ALTER_SUBSCRIPTION_SERVER;
+					n->subname = $3;
+					n->servername = $5;
+					$$ = (Node *) n;
+				}
 			| ALTER SUBSCRIPTION name REFRESH PUBLICATION opt_definition
 				{
 					AlterSubscriptionStmt *n =
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 31ab69ea13a..99eae2dde24 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3987,7 +3987,7 @@ maybe_reread_subscription(void)
 	/* Ensure allocations in permanent context. */
 	oldctx = MemoryContextSwitchTo(ApplyContext);
 
-	newsub = GetSubscription(MyLogicalRepWorker->subid, true);
+	newsub = GetSubscription(MyLogicalRepWorker->subid, true, true);
 
 	/*
 	 * Exit if the subscription was removed. This normally should not happen
@@ -4093,7 +4093,9 @@ maybe_reread_subscription(void)
 }
 
 /*
- * Callback from subscription syscache invalidation.
+ * Callback from subscription syscache invalidation. Also needed for server or
+ * user mapping invalidation, which can change the connection information for
+ * subscriptions that connect using a server object.
  */
 static void
 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -4675,7 +4677,7 @@ InitializeLogRepWorker(void)
 	StartTransactionCommand();
 	oldctx = MemoryContextSwitchTo(ApplyContext);
 
-	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
+	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true, true);
 	if (!MySubscription)
 	{
 		ereport(LOG,
@@ -4712,6 +4714,14 @@ InitializeLogRepWorker(void)
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
 								  subscription_change_cb,
 								  (Datum) 0);
+	/* Foreign server changes can alter a subscription's connection info. */
+	CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+								  subscription_change_cb,
+								  (Datum) 0);
+	/* User mapping changes can alter a subscription's connection info. */
+	CacheRegisterSyscacheCallback(USERMAPPINGOID,
+								  subscription_change_cb,
+								  (Datum) 0);
 
 	CacheRegisterSyscacheCallback(AUTHOID,
 								  subscription_change_cb,
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 4f4ad2ee150..a7a70535fa0 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4894,6 +4894,7 @@ getSubscriptions(Archive *fout)
 	int			i_subdisableonerr;
 	int			i_subpasswordrequired;
 	int			i_subrunasowner;
+	int			i_subservername;
 	int			i_subconninfo;
 	int			i_subslotname;
 	int			i_subsynccommit;
@@ -4974,16 +4975,29 @@ getSubscriptions(Archive *fout)
 
 	if (fout->remoteVersion >= 170000)
 		appendPQExpBufferStr(query,
-							 " s.subfailover\n");
+							 " s.subfailover,\n");
 	else
 		appendPQExpBuffer(query,
-						  " false AS subfailover\n");
+						  " false AS subfailover,\n");
+
+	/*
+	 * The server name is needed by every dump, not just binary upgrade.
+	 */
+	if (fout->remoteVersion >= 180000)
+		appendPQExpBufferStr(query, " fs.srvname AS subservername\n");
+	else
+		appendPQExpBufferStr(query, " NULL AS subservername\n");
 
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n");
 
+	if (fout->remoteVersion >= 180000)
+		appendPQExpBufferStr(query,
+							 "LEFT JOIN pg_catalog.pg_foreign_server fs \n"
+							 "    ON fs.oid = s.subserver \n");
+
 	if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
 		appendPQExpBufferStr(query,
 							 "LEFT JOIN pg_catalog.pg_replication_origin_status o \n"
 							 "    ON o.external_id = 'pg_' || s.oid::text \n");
 
@@ -5011,6 +5025,7 @@ getSubscriptions(Archive *fout)
 	i_subpasswordrequired = PQfnumber(res, "subpasswordrequired");
 	i_subrunasowner = PQfnumber(res, "subrunasowner");
 	i_subfailover = PQfnumber(res, "subfailover");
+	i_subservername = PQfnumber(res, "subservername");
 	i_subconninfo = PQfnumber(res, "subconninfo");
 	i_subslotname = PQfnumber(res, "subslotname");
 	i_subsynccommit = PQfnumber(res, "subsynccommit");
@@ -5032,6 +5047,10 @@ getSubscriptions(Archive *fout)
 
 		subinfo[i].subenabled =
 			(strcmp(PQgetvalue(res, i, i_subenabled), "t") == 0);
+		if (PQgetisnull(res, i, i_subservername))
+			subinfo[i].subservername = NULL;
+		else
+			subinfo[i].subservername = pg_strdup(PQgetvalue(res, i, i_subservername));
 		subinfo[i].subbinary =
 			(strcmp(PQgetvalue(res, i, i_subbinary), "t") == 0);
 		subinfo[i].substream = *(PQgetvalue(res, i, i_substream));
@@ -5254,9 +5273,17 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n",
 					  qsubname);
 
-	appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ",
+	appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s ",
 					  qsubname);
-	appendStringLiteralAH(query, subinfo->subconninfo, fout);
+	if (subinfo->subservername)
+	{
+		appendPQExpBuffer(query, "SERVER %s", fmtId(subinfo->subservername));
+	}
+	else
+	{
+		appendPQExpBuffer(query, "CONNECTION ");
+		appendStringLiteralAH(query, subinfo->subconninfo, fout);
+	}
 
 	/* Build list of quoted publications and append them to query. */
 	if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames))
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index ca32f167878..6c553765ea1 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -703,6 +703,7 @@ typedef struct _SubscriptionInfo
 	bool		subpasswordrequired;
 	bool		subrunasowner;
 	bool		subfailover;
+	char	   *subservername;
 	char	   *subconninfo;
 	char	   *subslotname;
 	char	   *subsynccommit;
diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c
index 8432be641ac..b78680994a0 100644
--- a/src/bin/psql/tab-complete.in.c
+++ b/src/bin/psql/tab-complete.in.c
@@ -3704,7 +3704,7 @@ match_previous_words(int pattern_id,
 
 /* CREATE SUBSCRIPTION */
 	else if (Matches("CREATE", "SUBSCRIPTION", MatchAny))
-		COMPLETE_WITH("CONNECTION");
+		COMPLETE_WITH("SERVER", "CONNECTION");
 	else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION", MatchAny))
 		COMPLETE_WITH("PUBLICATION");
 	else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION",
diff --git a/src/include/catalog/pg_foreign_data_wrapper.h b/src/include/catalog/pg_foreign_data_wrapper.h
index d03ab5a4f28..29eaba467b6 100644
--- a/src/include/catalog/pg_foreign_data_wrapper.h
+++ b/src/include/catalog/pg_foreign_data_wrapper.h
@@ -36,6 +36,9 @@ CATALOG(pg_foreign_data_wrapper,2328,ForeignDataWrapperRelationId)
 	Oid			fdwvalidator BKI_LOOKUP_OPT(pg_proc);	/* option validation
 														 * function, or 0 if
 														 * none */
+	Oid			fdwconnection BKI_LOOKUP_OPT(pg_proc);	/* connection string
+														 * function, or 0 if
+														 * none */
 
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	aclitem		fdwacl[1];		/* access permissions */
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 20fc329992d..93067ea9182 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -78,9 +78,11 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 								 * slots) in the upstream database are enabled
 								 * to be synchronized to the standbys. */
 
+	Oid			subserver;		/* Set if connecting with server */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
-	text		subconninfo BKI_FORCE_NOT_NULL;
+	text		subconninfo;	/* Set if connecting with connection string */
 
 	/* Slot name on publisher */
 	NameData	subslotname BKI_FORCE_NULL;
@@ -178,7 +180,8 @@ typedef struct Subscription
 
 #endif							/* EXPOSE_TO_CLIENT_CODE */
 
-extern Subscription *GetSubscription(Oid subid, bool missing_ok);
+extern Subscription *GetSubscription(Oid subid, bool missing_ok,
+									 bool aclcheck);
 extern void FreeSubscription(Subscription *sub);
 extern void DisableSubscription(Oid subid);
 
diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h
index 7e9decd2537..a7e6cf0226a 100644
--- a/src/include/foreign/foreign.h
+++ b/src/include/foreign/foreign.h
@@ -28,6 +28,7 @@ typedef struct ForeignDataWrapper
 	char	   *fdwname;		/* Name of the FDW */
 	Oid			fdwhandler;		/* Oid of handler function, or 0 */
 	Oid			fdwvalidator;	/* Oid of validator function, or 0 */
+	Oid			fdwconnection;	/* Oid of connection string function, or 0 */
 	List	   *options;		/* fdwoptions as DefElem list */
 } ForeignDataWrapper;
 
@@ -65,10 +66,12 @@ typedef struct ForeignTable
 
 
 extern ForeignServer *GetForeignServer(Oid serverid);
+extern char *ForeignServerName(Oid serverid);
 extern ForeignServer *GetForeignServerExtended(Oid serverid,
 											   bits16 flags);
 extern ForeignServer *GetForeignServerByName(const char *srvname,
 											 bool missing_ok);
+extern char *ForeignServerConnectionString(Oid userid, Oid serverid);
 extern UserMapping *GetUserMapping(Oid userid, Oid serverid);
 extern ForeignDataWrapper *GetForeignDataWrapper(Oid fdwid);
 extern ForeignDataWrapper *GetForeignDataWrapperExtended(Oid fdwid,
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 0b208f51bdd..8291d958da5 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -4282,6 +4282,7 @@ typedef struct CreateSubscriptionStmt
 {
 	NodeTag		type;
 	char	   *subname;		/* Name of the subscription */
+	char	   *servername;		/* Server name of publisher */
 	char	   *conninfo;		/* Connection string to publisher */
 	List	   *publication;	/* One or more publication to subscribe to */
 	List	   *options;		/* List of DefElem nodes */
@@ -4290,6 +4291,7 @@ typedef struct CreateSubscriptionStmt
 typedef enum AlterSubscriptionType
 {
 	ALTER_SUBSCRIPTION_OPTIONS,
+	ALTER_SUBSCRIPTION_SERVER,
 	ALTER_SUBSCRIPTION_CONNECTION,
 	ALTER_SUBSCRIPTION_SET_PUBLICATION,
 	ALTER_SUBSCRIPTION_ADD_PUBLICATION,
@@ -4304,6 +4306,7 @@ typedef struct AlterSubscriptionStmt
 	NodeTag		type;
 	AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_OPTIONS, etc */
 	char	   *subname;		/* Name of the subscription */
+	char	   *servername;		/* Server name of publisher */
 	char	   *conninfo;		/* Connection string to publisher */
 	List	   *publication;	/* One or more publication to subscribe to */
 	List	   *options;		/* List of DefElem nodes */
diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out
index 215eb899be3..59c64126bdc 100644
--- a/src/test/regress/expected/oidjoins.out
+++ b/src/test/regress/expected/oidjoins.out
@@ -224,6 +224,7 @@ NOTICE:  checking pg_extension {extconfig} => pg_class {oid}
 NOTICE:  checking pg_foreign_data_wrapper {fdwowner} => pg_authid {oid}
 NOTICE:  checking pg_foreign_data_wrapper {fdwhandler} => pg_proc {oid}
 NOTICE:  checking pg_foreign_data_wrapper {fdwvalidator} => pg_proc {oid}
+NOTICE:  checking pg_foreign_data_wrapper {fdwconnection} => pg_proc {oid}
 NOTICE:  checking pg_foreign_server {srvowner} => pg_authid {oid}
 NOTICE:  checking pg_foreign_server {srvfdw} => pg_foreign_data_wrapper {oid}
 NOTICE:  checking pg_user_mapping {umuser} => pg_authid {oid}
-- 
2.34.1

Reply via email to