On Wed, 2024-01-31 at 11:10 +0530, Ashutosh Bapat wrote:
> > I like the idea -- it further decouples the logic from the core
> > server.
> > I suspect it will make postgres_fdw the primary way (though not the
> > only possible way) to use this feature. There would be little need
> > to
> > create a new builtin FDW to make this work.
> 
> That's what I see as well. I am glad that we are on the same page.

Implemented in v11, attached.

Is this what you had in mind? It leaves a lot of the work to
postgres_fdw and it's almost unusable without postgres_fdw.

That's not a bad thing, but it makes the core functionality a bit
harder to test standalone. I can work on the core tests some more. The
postgres_fdw tests passed without modification, though, and offer a
simple example of how to use it.

Regards,
        Jeff Davis

From 88fa1333ace4d15d72534d20d2cccb37748277f2 Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Tue, 2 Jan 2024 13:42:48 -0800
Subject: [PATCH v11] CREATE SUSBCRIPTION ... SERVER.

Allow specifying a foreign server for CREATE SUBSCRIPTION, rather than
a raw connection string with CONNECTION.

Using a foreign server as a layer of indirection improves management
of multiple subscriptions to the same server. It also provides
integration with user mappings in case different subscriptions have
different owners or a subscription changes owners.

Discussion: https://postgr.es/m/61831790a0a937038f78ce09f8dd4cef7de7456a.ca...@j-davis.com
Reviewed-by: Ashutosh Bapat
---
 contrib/postgres_fdw/Makefile                 |   4 +-
 contrib/postgres_fdw/connection.c             |  74 ++++++++
 .../postgres_fdw/expected/postgres_fdw.out    |   8 +
 contrib/postgres_fdw/meson.build              |   6 +
 .../postgres_fdw/postgres_fdw--1.1--1.2.sql   |  11 ++
 contrib/postgres_fdw/postgres_fdw.control     |   2 +-
 contrib/postgres_fdw/sql/postgres_fdw.sql     |   7 +
 contrib/postgres_fdw/t/010_subscription.pl    |  71 ++++++++
 doc/src/sgml/ref/alter_subscription.sgml      |  18 +-
 doc/src/sgml/ref/create_subscription.sgml     |  11 +-
 src/backend/catalog/pg_subscription.c         |  38 +++-
 src/backend/commands/foreigncmds.c            |  57 +++++-
 src/backend/commands/subscriptioncmds.c       | 167 ++++++++++++++++--
 src/backend/foreign/foreign.c                 |  42 +++++
 src/backend/parser/gram.y                     |  22 +++
 src/backend/replication/logical/worker.c      |  16 +-
 src/bin/pg_dump/pg_dump.c                     |  27 ++-
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/psql/tab-complete.c                   |   2 +-
 src/include/catalog/pg_foreign_data_wrapper.h |   3 +
 src/include/catalog/pg_subscription.h         |   7 +-
 src/include/foreign/foreign.h                 |   3 +
 src/include/nodes/parsenodes.h                |   3 +
 src/test/regress/expected/oidjoins.out        |   1 +
 24 files changed, 563 insertions(+), 38 deletions(-)
 create mode 100644 contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql
 create mode 100644 contrib/postgres_fdw/t/010_subscription.pl

diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index c1b0cad453..995a30c297 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -14,10 +14,12 @@ PG_CPPFLAGS = -I$(libpq_srcdir)
 SHLIB_LINK_INTERNAL = $(libpq)
 
 EXTENSION = postgres_fdw
-DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql
+DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql postgres_fdw--1.1--1.2.sql
 
 REGRESS = postgres_fdw
 
+TAP_TESTS = 1
+
 ifdef USE_PGXS
 PG_CONFIG = pg_config
 PGXS := $(shell $(PG_CONFIG) --pgxs)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 4931ebf591..a011e6df5f 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -113,6 +113,7 @@ static uint32 pgfdw_we_get_result = 0;
 PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
 PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
 PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
+PG_FUNCTION_INFO_V1(postgres_fdw_connection);
 
 /* prototypes of private functions */
 static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
@@ -1972,6 +1973,79 @@ pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
 	}
 }
 
+/*
+ * Values in connection strings must be enclosed in single quotes. Single
+ * quotes and backslashes must be escaped with backslash. NB: these rules are
+ * different from the rules for escaping a SQL literal.
+ */
+static void
+appendEscapedValue(StringInfo str, const char *val)
+{
+	appendStringInfoChar(str, '\'');
+	for (int i = 0; val[i] != '\0'; i++)
+	{
+		if (val[i] == '\\' || val[i] == '\'')
+			appendStringInfoChar(str, '\\');
+		appendStringInfoChar(str, val[i]);
+	}
+	appendStringInfoChar(str, '\'');
+}
+
+Datum
+postgres_fdw_connection(PG_FUNCTION_ARGS)
+{
+	/* TODO: consider memory usage */
+	Oid userid = PG_GETARG_OID(0);
+	Oid serverid = PG_GETARG_OID(1);
+	ForeignServer *server = GetForeignServer(serverid);
+	UserMapping *user = GetUserMapping(userid, serverid);
+	StringInfoData str;
+	const char **keywords;
+	const char **values;
+	int			n;
+
+	/*
+	 * Construct connection params from generic options of ForeignServer
+	 * and UserMapping.  (Some of them might not be libpq options, in
+	 * which case we'll just waste a few array slots.)  Add 4 extra slots
+	 * for application_name, fallback_application_name, client_encoding,
+	 * end marker.
+	 */
+	n = list_length(server->options) + list_length(user->options) + 4;
+	keywords = (const char **) palloc(n * sizeof(char *));
+	values = (const char **) palloc(n * sizeof(char *));
+
+	n = 0;
+	n += ExtractConnectionOptions(server->options,
+								  keywords + n, values + n);
+	n += ExtractConnectionOptions(user->options,
+								  keywords + n, values + n);
+
+	/* Set client_encoding so that libpq can convert encoding properly. */
+	keywords[n] = "client_encoding";
+	values[n] = GetDatabaseEncodingName();
+	n++;
+
+	keywords[n] = values[n] = NULL;
+
+	/* verify the set of connection parameters */
+	check_conn_params(keywords, values, user);
+
+	initStringInfo(&str);
+	for (int i = 0; i < n; i++)
+	{
+		char *sep = "";
+
+		appendStringInfo(&str, "%s%s = ", sep, keywords[i]);
+		appendEscapedValue(&str, values[i]);
+		sep = " ";
+	}
+
+	pfree(keywords);
+	pfree(values);
+	PG_RETURN_TEXT_P(cstring_to_text(str.data));
+}
+
 /*
  * List active foreign server connections.
  *
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index c355e8f3f7..617e2cf5dc 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -256,6 +256,14 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
 ANALYZE ft1;
 ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true');
 -- ===================================================================
+-- test subscription
+-- ===================================================================
+CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1
+  PUBLICATION pub1 WITH (slot_name = NONE, connect = false);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+DROP SUBSCRIPTION regress_pgfdw_subscription;
+-- ===================================================================
 -- test error case for create publication on foreign table
 -- ===================================================================
 CREATE PUBLICATION testpub_ftbl FOR TABLE ft1;  -- should fail
diff --git a/contrib/postgres_fdw/meson.build b/contrib/postgres_fdw/meson.build
index 2b86d8a6ee..92cb3c5556 100644
--- a/contrib/postgres_fdw/meson.build
+++ b/contrib/postgres_fdw/meson.build
@@ -26,6 +26,7 @@ install_data(
   'postgres_fdw.control',
   'postgres_fdw--1.0.sql',
   'postgres_fdw--1.0--1.1.sql',
+  'postgres_fdw--1.1--1.2.sql',
   kwargs: contrib_data_args,
 )
 
@@ -39,4 +40,9 @@ tests += {
     ],
     'regress_args': ['--dlpath', meson.build_root() / 'src/test/regress'],
   },
+  'tap': {
+    'tests': [
+      't/010_subscription.pl',
+    ],
+  },
 }
diff --git a/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql b/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql
new file mode 100644
index 0000000000..468d8f0d7d
--- /dev/null
+++ b/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql
@@ -0,0 +1,11 @@
+/* contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql */
+
+-- complain if script is sourced in psql, rather than via ALTER EXTENSION
+\echo Use "ALTER EXTENSION postgres_fdw UPDATE TO '1.2'" to load this file. \quit
+
+CREATE FUNCTION postgres_fdw_connection(oid, oid)
+RETURNS text
+AS 'MODULE_PATHNAME'
+LANGUAGE C STRICT;
+
+ALTER FOREIGN DATA WRAPPER postgres_fdw CONNECTION postgres_fdw_connection;
diff --git a/contrib/postgres_fdw/postgres_fdw.control b/contrib/postgres_fdw/postgres_fdw.control
index d489382064..a4b800be4f 100644
--- a/contrib/postgres_fdw/postgres_fdw.control
+++ b/contrib/postgres_fdw/postgres_fdw.control
@@ -1,5 +1,5 @@
 # postgres_fdw extension
 comment = 'foreign-data wrapper for remote PostgreSQL servers'
-default_version = '1.1'
+default_version = '1.2'
 module_pathname = '$libdir/postgres_fdw'
 relocatable = true
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 812e7646e1..8520094fc9 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -248,6 +248,13 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
 ANALYZE ft1;
 ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true');
 
+-- ===================================================================
+-- test subscription
+-- ===================================================================
+CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1
+  PUBLICATION pub1 WITH (slot_name = NONE, connect = false);
+DROP SUBSCRIPTION regress_pgfdw_subscription;
+
 -- ===================================================================
 -- test error case for create publication on foreign table
 -- ===================================================================
diff --git a/contrib/postgres_fdw/t/010_subscription.pl b/contrib/postgres_fdw/t/010_subscription.pl
new file mode 100644
index 0000000000..a39e8fdbba
--- /dev/null
+++ b/contrib/postgres_fdw/t/010_subscription.pl
@@ -0,0 +1,71 @@
+
+# Copyright (c) 2021-2024, PostgreSQL Global Development Group
+
+# Basic logical replication test
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_ins AS SELECT a, a + 1 as b FROM generate_series(1,1002) AS a");
+
+# Replicate the changes without columns
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab_no_col()");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_no_col default VALUES");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE EXTENSION postgres_fdw");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int, b int)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_ins");
+
+my $publisher_host = $node_publisher->host;
+my $publisher_port = $node_publisher->port;
+$node_subscriber->safe_psql('postgres',
+	"CREATE SERVER tap_server FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '$publisher_host', port '$publisher_port', dbname 'postgres')"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE USER MAPPING FOR PUBLIC SERVER tap_server"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE FOREIGN TABLE f_tab_ins (a int, b int) SERVER tap_server OPTIONS(table_name 'tab_ins')"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub SERVER tap_server PUBLICATION tap_pub WITH (password_required=false)"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM (SELECT f.b = l.b as match FROM tab_ins l, f_tab_ins f WHERE l.a = f.a) WHERE match");
+is($result, qq(1002), 'check initial data was copied to subscriber');
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_ins SELECT a, a + 1 FROM generate_series(1003,1050) a");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM (SELECT f.b = l.b as match FROM tab_ins l, f_tab_ins f WHERE l.a = f.a) WHERE match");
+is($result, qq(1050), 'check initial data was copied to subscriber');
+
+done_testing();
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index e9e6d9d74a..12d8855aef 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -21,6 +21,7 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
+ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SERVER <replaceable>servername</replaceable>
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> CONNECTION '<replaceable>conninfo</replaceable>'
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
@@ -94,13 +95,24 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
     </listitem>
    </varlistentry>
 
+   <varlistentry id="sql-altersubscription-params-server">
+    <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
+    <listitem>
+     <para>
+      This clause replaces the foreign server or connection string originally
+      set by <xref linkend="sql-createsubscription"/> with the foreign server
+      <replaceable>servername</replaceable>.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry id="sql-altersubscription-params-connection">
     <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
     <listitem>
      <para>
-      This clause replaces the connection string originally set by
-      <xref linkend="sql-createsubscription"/>.  See there for more
-      information.
+      This clause replaces the foreign server or connection string originally
+      set by <xref linkend="sql-createsubscription"/> with the connection
+      string <replaceable>conninfo</replaceable>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 15794731bb..233e87d5ea 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -22,7 +22,7 @@ PostgreSQL documentation
  <refsynopsisdiv>
 <synopsis>
 CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceable>
-    CONNECTION '<replaceable class="parameter">conninfo</replaceable>'
+    { SERVER <replaceable class="parameter">servername</replaceable> | CONNECTION '<replaceable class="parameter">conninfo</replaceable>' }
     PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...]
     [ WITH ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 </synopsis>
@@ -77,6 +77,15 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
     </listitem>
    </varlistentry>
 
+   <varlistentry id="sql-createsubscription-params-server">
+    <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
+    <listitem>
+     <para>
+      A foreign server to use for the connection.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry id="sql-createsubscription-params-connection">
     <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
     <listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 9efc9159f2..b5ba0aa953 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -19,11 +19,14 @@
 #include "access/htup_details.h"
 #include "access/tableam.h"
 #include "catalog/indexing.h"
+#include "catalog/pg_foreign_server.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
+#include "foreign/foreign.h"
 #include "miscadmin.h"
 #include "storage/lmgr.h"
+#include "utils/acl.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
@@ -38,7 +41,7 @@ static List *textarray_to_stringlist(ArrayType *textarray);
  * Fetch the subscription from the syscache.
  */
 Subscription *
-GetSubscription(Oid subid, bool missing_ok)
+GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
 {
 	HeapTuple	tup;
 	Subscription *sub;
@@ -74,10 +77,35 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->failover = subform->subfailover;
 
 	/* Get conninfo */
-	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
-								   tup,
-								   Anum_pg_subscription_subconninfo);
-	sub->conninfo = TextDatumGetCString(datum);
+	if (OidIsValid(subform->subserver))
+	{
+		AclResult	aclresult;
+
+		/* recheck ACL if requested */
+		if (aclcheck)
+		{
+			aclresult = object_aclcheck(ForeignServerRelationId,
+										subform->subserver,
+										subform->subowner, ACL_USAGE);
+
+			if (aclresult != ACLCHECK_OK)
+				ereport(ERROR,
+						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+						 errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
+								GetUserNameFromId(subform->subowner, false),
+								ForeignServerName(subform->subserver))));
+		}
+
+		sub->conninfo = ForeignServerConnectionString(subform->subowner,
+													  subform->subserver);
+	}
+	else
+	{
+		datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
+									   tup,
+									   Anum_pg_subscription_subconninfo);
+		sub->conninfo = TextDatumGetCString(datum);
+	}
 
 	/* Get slotname */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/commands/foreigncmds.c b/src/backend/commands/foreigncmds.c
index cf61bbac1f..357918be11 100644
--- a/src/backend/commands/foreigncmds.c
+++ b/src/backend/commands/foreigncmds.c
@@ -511,21 +511,52 @@ lookup_fdw_validator_func(DefElem *validator)
 	/* validator's return value is ignored, so we don't check the type */
 }
 
+/*
+ * Convert a connection string function name passed from the parser to an Oid.
+ */
+static Oid
+lookup_fdw_connection_func(DefElem *connection)
+{
+	Oid			connectionOid;
+	Oid			funcargtypes[2];
+
+	if (connection == NULL || connection->arg == NULL)
+		return InvalidOid;
+
+	/* connection string functions take user oid, server oid */
+	funcargtypes[0] = OIDOID;
+	funcargtypes[1] = OIDOID;
+
+	connectionOid = LookupFuncName((List *) connection->arg, 2, funcargtypes, false);
+
+	/* check that connection string function has correct return type */
+	if (get_func_rettype(connectionOid) != TEXTOID)
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("function %s must return type %s",
+						NameListToString((List *) connection->arg), "text")));
+
+	return connectionOid;
+}
+
 /*
  * Process function options of CREATE/ALTER FDW
  */
 static void
 parse_func_options(ParseState *pstate, List *func_options,
 				   bool *handler_given, Oid *fdwhandler,
-				   bool *validator_given, Oid *fdwvalidator)
+				   bool *validator_given, Oid *fdwvalidator,
+				   bool *connection_given, Oid *fdwconnection)
 {
 	ListCell   *cell;
 
 	*handler_given = false;
 	*validator_given = false;
+	*connection_given = false;
 	/* return InvalidOid if not given */
 	*fdwhandler = InvalidOid;
 	*fdwvalidator = InvalidOid;
+	*fdwconnection = InvalidOid;
 
 	foreach(cell, func_options)
 	{
@@ -545,6 +576,13 @@ parse_func_options(ParseState *pstate, List *func_options,
 			*validator_given = true;
 			*fdwvalidator = lookup_fdw_validator_func(def);
 		}
+		else if (strcmp(def->defname, "connection") == 0)
+		{
+			if (*connection_given)
+				errorConflictingDefElem(def, pstate);
+			*connection_given = true;
+			*fdwconnection = lookup_fdw_connection_func(def);
+		}
 		else
 			elog(ERROR, "option \"%s\" not recognized",
 				 def->defname);
@@ -564,8 +602,10 @@ CreateForeignDataWrapper(ParseState *pstate, CreateFdwStmt *stmt)
 	Oid			fdwId;
 	bool		handler_given;
 	bool		validator_given;
+	bool		connection_given;
 	Oid			fdwhandler;
 	Oid			fdwvalidator;
+	Oid			fdwconnection;
 	Datum		fdwoptions;
 	Oid			ownerId;
 	ObjectAddress myself;
@@ -609,10 +649,12 @@ CreateForeignDataWrapper(ParseState *pstate, CreateFdwStmt *stmt)
 	/* Lookup handler and validator functions, if given */
 	parse_func_options(pstate, stmt->func_options,
 					   &handler_given, &fdwhandler,
-					   &validator_given, &fdwvalidator);
+					   &validator_given, &fdwvalidator,
+					   &connection_given, &fdwconnection);
 
 	values[Anum_pg_foreign_data_wrapper_fdwhandler - 1] = ObjectIdGetDatum(fdwhandler);
 	values[Anum_pg_foreign_data_wrapper_fdwvalidator - 1] = ObjectIdGetDatum(fdwvalidator);
+	values[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = ObjectIdGetDatum(fdwconnection);
 
 	nulls[Anum_pg_foreign_data_wrapper_fdwacl - 1] = true;
 
@@ -684,8 +726,10 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt)
 	Datum		datum;
 	bool		handler_given;
 	bool		validator_given;
+	bool		connection_given;
 	Oid			fdwhandler;
 	Oid			fdwvalidator;
+	Oid			fdwconnection;
 	ObjectAddress myself;
 
 	rel = table_open(ForeignDataWrapperRelationId, RowExclusiveLock);
@@ -715,7 +759,8 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt)
 
 	parse_func_options(pstate, stmt->func_options,
 					   &handler_given, &fdwhandler,
-					   &validator_given, &fdwvalidator);
+					   &validator_given, &fdwvalidator,
+					   &connection_given, &fdwconnection);
 
 	if (handler_given)
 	{
@@ -753,6 +798,12 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt)
 		fdwvalidator = fdwForm->fdwvalidator;
 	}
 
+	if (connection_given)
+	{
+		repl_val[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = ObjectIdGetDatum(fdwconnection);
+		repl_repl[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = true;
+	}
+
 	/*
 	 * If options specified, validate and update.
 	 */
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index a05d69922d..304a46efec 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -25,14 +25,17 @@
 #include "catalog/objectaddress.h"
 #include "catalog/pg_authid_d.h"
 #include "catalog/pg_database_d.h"
+#include "catalog/pg_foreign_server.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
+#include "catalog/pg_user_mapping.h"
 #include "commands/dbcommands.h"
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/subscriptioncmds.h"
 #include "executor/executor.h"
+#include "foreign/foreign.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
@@ -594,6 +597,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	Datum		values[Natts_pg_subscription];
 	Oid			owner = GetUserId();
 	HeapTuple	tup;
+	Oid			serverid;
 	char	   *conninfo;
 	char		originname[NAMEDATALEN];
 	List	   *publications;
@@ -686,15 +690,40 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	if (opts.synchronous_commit == NULL)
 		opts.synchronous_commit = "off";
 
-	conninfo = stmt->conninfo;
-	publications = stmt->publication;
-
 	/* Load the library providing us libpq calls. */
 	load_file("libpqwalreceiver", false);
 
+	if (stmt->servername)
+	{
+		ForeignServer	*server;
+
+		Assert(!stmt->conninfo);
+		conninfo = NULL;
+
+		server = GetForeignServerByName(stmt->servername, false);
+		aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, owner, ACL_USAGE);
+		if (aclresult != ACLCHECK_OK)
+			aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
+
+		/* make sure a user mapping exists */
+		GetUserMapping(owner, server->serverid);
+
+		serverid = server->serverid;
+		conninfo = ForeignServerConnectionString(owner, serverid);
+	}
+	else
+	{
+		Assert(stmt->conninfo);
+
+		serverid = InvalidOid;
+		conninfo = stmt->conninfo;
+	}
+
 	/* Check the connection info string. */
 	walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
 
+	publications = stmt->publication;
+
 	/* Everything ok, form a new tuple. */
 	memset(values, 0, sizeof(values));
 	memset(nulls, false, sizeof(nulls));
@@ -718,8 +747,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
 	values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
 	values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
-	values[Anum_pg_subscription_subconninfo - 1] =
-		CStringGetTextDatum(conninfo);
+	values[Anum_pg_subscription_subserver - 1] = serverid;
+	if (!OidIsValid(serverid))
+		values[Anum_pg_subscription_subconninfo - 1] =
+			CStringGetTextDatum(conninfo);
+	else
+		nulls[Anum_pg_subscription_subconninfo - 1] = true;
 	if (opts.slot_name)
 		values[Anum_pg_subscription_subslotname - 1] =
 			DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
@@ -740,6 +773,17 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 
 	recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
 
+	ObjectAddressSet(myself, SubscriptionRelationId, subid);
+
+	if (stmt->servername)
+	{
+		ObjectAddress referenced;
+		Assert(OidIsValid(serverid));
+
+		ObjectAddressSet(referenced, ForeignServerRelationId, serverid);
+		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+	}
+
 	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
 	replorigin_create(originname);
 
@@ -871,8 +915,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	if (opts.enabled)
 		ApplyLauncherWakeupAtCommit();
 
-	ObjectAddressSet(myself, SubscriptionRelationId, subid);
-
 	InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
 
 	return myself;
@@ -1140,7 +1182,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
 					   stmt->subname);
 
-	sub = GetSubscription(subid, false);
+	/*
+	 * Skip ACL checks on the subscription's foreign server, if any. If
+	 * changing the server (or replacing it with a raw connection), then the
+	 * old one will be removed anyway. If changing something unrelated,
+	 * there's no need to do an additional ACL check here; that will be done
+	 * by the subscription worker anyway.
+	 */
+	sub = GetSubscription(subid, false, false);
 
 	/*
 	 * Don't allow non-superuser modification of a subscription with
@@ -1160,6 +1209,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 	memset(nulls, false, sizeof(nulls));
 	memset(replaces, false, sizeof(replaces));
 
+	ObjectAddressSet(myself, SubscriptionRelationId, subid);
+
 	switch (stmt->kind)
 	{
 		case ALTER_SUBSCRIPTION_OPTIONS:
@@ -1306,7 +1357,79 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				break;
 			}
 
+		case ALTER_SUBSCRIPTION_SERVER:
+			{
+				ForeignServer	*new_server;
+				ObjectAddress	 referenced;
+				AclResult		 aclresult;
+				char			*conninfo;
+
+				/*
+				 * Remove what was there before, either another foreign server
+				 * or a connection string.
+				 */
+				if (form->subserver)
+				{
+					deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+													   DEPENDENCY_NORMAL,
+													   ForeignServerRelationId, form->subserver);
+				}
+				else
+				{
+					nulls[Anum_pg_subscription_subconninfo - 1] = true;
+					replaces[Anum_pg_subscription_subconninfo - 1] = true;
+				}
+
+				/*
+				 * Find the new server and user mapping. Check ACL of server
+				 * based on current user ID, but find the user mapping based
+				 * on the subscription owner.
+				 */
+				new_server = GetForeignServerByName(stmt->servername, false);
+				aclresult = object_aclcheck(ForeignServerRelationId,
+											new_server->serverid,
+											form->subowner, ACL_USAGE);
+				if (aclresult != ACLCHECK_OK)
+					ereport(ERROR,
+							(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+							 errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
+									GetUserNameFromId(form->subowner, false),
+									ForeignServerName(new_server->serverid))));
+
+				/* make sure a user mapping exists */
+				GetUserMapping(form->subowner, new_server->serverid);
+
+				conninfo = ForeignServerConnectionString(form->subowner,
+														 new_server->serverid);
+
+				/* Load the library providing us libpq calls. */
+				load_file("libpqwalreceiver", false);
+				/* Check the connection info string. */
+				walrcv_check_conninfo(conninfo,
+									  sub->passwordrequired && !sub->ownersuperuser);
+
+				values[Anum_pg_subscription_subserver - 1] = new_server->serverid;
+				replaces[Anum_pg_subscription_subserver - 1] = true;
+
+				ObjectAddressSet(referenced, ForeignServerRelationId, new_server->serverid);
+				recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+
+				update_tuple = true;
+			}
+			break;
+
 		case ALTER_SUBSCRIPTION_CONNECTION:
+			/* remove reference to foreign server and dependencies, if present */
+			if (form->subserver)
+			{
+				deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+												   DEPENDENCY_NORMAL,
+												   ForeignServerRelationId, form->subserver);
+
+				values[Anum_pg_subscription_subserver - 1] = InvalidOid;
+				replaces[Anum_pg_subscription_subserver - 1] = true;
+			}
+
 			/* Load the library providing us libpq calls. */
 			load_file("libpqwalreceiver", false);
 			/* Check the connection info string. */
@@ -1553,8 +1676,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	table_close(rel, RowExclusiveLock);
 
-	ObjectAddressSet(myself, SubscriptionRelationId, subid);
-
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
 
 	/* Wake up related replication workers to handle this change quickly. */
@@ -1639,9 +1760,28 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	subname = pstrdup(NameStr(*DatumGetName(datum)));
 
 	/* Get conninfo */
-	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
-								   Anum_pg_subscription_subconninfo);
-	conninfo = TextDatumGetCString(datum);
+	if (OidIsValid(form->subserver))
+	{
+		AclResult aclresult;
+
+		aclresult = object_aclcheck(ForeignServerRelationId, form->subserver,
+									form->subowner, ACL_USAGE);
+		if (aclresult != ACLCHECK_OK)
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
+							GetUserNameFromId(form->subowner, false),
+							ForeignServerName(form->subserver))));
+
+		conninfo = ForeignServerConnectionString(form->subowner,
+												 form->subserver);
+	}
+	else
+	{
+		datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
+									   Anum_pg_subscription_subconninfo);
+		conninfo = TextDatumGetCString(datum);
+	}
 
 	/* Get slotname */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
@@ -1742,6 +1882,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	}
 
 	/* Clean up dependencies */
+	deleteDependencyRecordsFor(SubscriptionRelationId, subid, false);
 	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
 	/* Remove any associated relation synchronization states. */
diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c
index f4f35728b4..fdd7ee3ad9 100644
--- a/src/backend/foreign/foreign.c
+++ b/src/backend/foreign/foreign.c
@@ -70,6 +70,7 @@ GetForeignDataWrapperExtended(Oid fdwid, bits16 flags)
 	fdw->fdwname = pstrdup(NameStr(fdwform->fdwname));
 	fdw->fdwhandler = fdwform->fdwhandler;
 	fdw->fdwvalidator = fdwform->fdwvalidator;
+	fdw->fdwconnection = fdwform->fdwconnection;
 
 	/* Extract the fdwoptions */
 	datum = SysCacheGetAttr(FOREIGNDATAWRAPPEROID,
@@ -174,6 +175,31 @@ GetForeignServerExtended(Oid serverid, bits16 flags)
 }
 
 
+/*
+ * ForeignServerName - get name of foreign server.
+ */
+char *
+ForeignServerName(Oid serverid)
+{
+	Form_pg_foreign_server serverform;
+	char *servername;
+	HeapTuple	tp;
+
+	tp = SearchSysCache1(FOREIGNSERVEROID, ObjectIdGetDatum(serverid));
+
+	if (!HeapTupleIsValid(tp))
+		elog(ERROR, "cache lookup failed for foreign server %u", serverid);
+
+	serverform = (Form_pg_foreign_server) GETSTRUCT(tp);
+
+	servername = pstrdup(NameStr(serverform->srvname));
+
+	ReleaseSysCache(tp);
+
+	return servername;
+}
+
+
 /*
  * GetForeignServerByName - look up the foreign server definition by name.
  */
@@ -189,6 +215,22 @@ GetForeignServerByName(const char *srvname, bool missing_ok)
 }
 
 
+/*
+ * Retrieve connection string from server's FDW.
+ */
+char *
+ForeignServerConnectionString(Oid userid, Oid serverid)
+{
+	/* TODO: clean up memory */
+	ForeignServer *server = GetForeignServer(serverid);
+	ForeignDataWrapper *fdw = GetForeignDataWrapper(server->fdwid);
+	Datum connection_text = OidFunctionCall2(fdw->fdwconnection,
+											 ObjectIdGetDatum(userid),
+											 ObjectIdGetDatum(serverid));
+	return text_to_cstring(DatumGetTextPP(connection_text));
+}
+
+
 /*
  * GetUserMapping - look up the user mapping.
  *
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index c6e2f679fd..04aabf08d1 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -5293,6 +5293,8 @@ fdw_option:
 			| NO HANDLER						{ $$ = makeDefElem("handler", NULL, @1); }
 			| VALIDATOR handler_name			{ $$ = makeDefElem("validator", (Node *) $2, @1); }
 			| NO VALIDATOR						{ $$ = makeDefElem("validator", NULL, @1); }
+			| CONNECTION handler_name			{ $$ = makeDefElem("connection", (Node *) $2, @1); }
+			| NO CONNECTION						{ $$ = makeDefElem("connection", NULL, @1); }
 		;
 
 fdw_options:
@@ -10667,6 +10669,16 @@ CreateSubscriptionStmt:
 					n->options = $8;
 					$$ = (Node *) n;
 				}
+			| CREATE SUBSCRIPTION name SERVER name PUBLICATION name_list opt_definition
+				{
+					CreateSubscriptionStmt *n =
+						makeNode(CreateSubscriptionStmt);
+					n->subname = $3;
+					n->servername = $5;
+					n->publication = $7;
+					n->options = $8;
+					$$ = (Node *) n;
+				}
 		;
 
 /*****************************************************************************
@@ -10696,6 +10708,16 @@ AlterSubscriptionStmt:
 					n->conninfo = $5;
 					$$ = (Node *) n;
 				}
+			| ALTER SUBSCRIPTION name SERVER name
+				{
+					AlterSubscriptionStmt *n =
+						makeNode(AlterSubscriptionStmt);
+
+					n->kind = ALTER_SUBSCRIPTION_SERVER;
+					n->subname = $3;
+					n->servername = $5;
+					$$ = (Node *) n;
+				}
 			| ALTER SUBSCRIPTION name REFRESH PUBLICATION opt_definition
 				{
 					AlterSubscriptionStmt *n =
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b5a80fe3e8..3725c53d8b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3879,7 +3879,7 @@ maybe_reread_subscription(void)
 	/* Ensure allocations in permanent context. */
 	oldctx = MemoryContextSwitchTo(ApplyContext);
 
-	newsub = GetSubscription(MyLogicalRepWorker->subid, true);
+	newsub = GetSubscription(MyLogicalRepWorker->subid, true, true);
 
 	/*
 	 * Exit if the subscription was removed. This normally should not happen
@@ -3985,7 +3985,9 @@ maybe_reread_subscription(void)
 }
 
 /*
- * Callback from subscription syscache invalidation.
+ * Callback from subscription syscache invalidation. Also needed for server or
+ * user mapping invalidation, which can change the connection information for
+ * subscriptions that connect using a server object.
  */
 static void
 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -4584,7 +4586,7 @@ InitializeLogRepWorker(void)
 	StartTransactionCommand();
 	oldctx = MemoryContextSwitchTo(ApplyContext);
 
-	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
+	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true, true);
 	if (!MySubscription)
 	{
 		ereport(LOG,
@@ -4621,6 +4623,14 @@ InitializeLogRepWorker(void)
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
 								  subscription_change_cb,
 								  (Datum) 0);
+	/* Keep us informed about subscription changes. */
+	CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+								  subscription_change_cb,
+								  (Datum) 0);
+	/* Keep us informed about subscription changes. */
+	CacheRegisterSyscacheCallback(USERMAPPINGOID,
+								  subscription_change_cb,
+								  (Datum) 0);
 
 	CacheRegisterSyscacheCallback(AUTHOID,
 								  subscription_change_cb,
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 10cbf02beb..ac0e9f98df 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4636,6 +4636,7 @@ getSubscriptions(Archive *fout)
 	int			i_subdisableonerr;
 	int			i_subpasswordrequired;
 	int			i_subrunasowner;
+	int			i_subservername;
 	int			i_subconninfo;
 	int			i_subslotname;
 	int			i_subsynccommit;
@@ -4708,11 +4709,13 @@ getSubscriptions(Archive *fout)
 						  LOGICALREP_ORIGIN_ANY);
 
 	if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
-		appendPQExpBufferStr(query, " o.remote_lsn AS suboriginremotelsn,\n"
+		appendPQExpBufferStr(query, " fs.srvname AS subservername,\n"
+							 " o.remote_lsn AS suboriginremotelsn,\n"
 							 " s.subenabled,\n"
 							 " s.subfailover\n");
 	else
-		appendPQExpBufferStr(query, " NULL AS suboriginremotelsn,\n"
+		appendPQExpBufferStr(query, " NULL AS subservername,\n"
+							 " NULL AS suboriginremotelsn,\n"
 							 " false AS subenabled,\n"
 							 " false AS subfailover\n");
 
@@ -4721,6 +4724,8 @@ getSubscriptions(Archive *fout)
 
 	if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
 		appendPQExpBufferStr(query,
+							 "LEFT JOIN pg_catalog.pg_foreign_server fs \n"
+							 "    ON fs.oid = s.subserver \n"
 							 "LEFT JOIN pg_catalog.pg_replication_origin_status o \n"
 							 "    ON o.external_id = 'pg_' || s.oid::text \n");
 
@@ -4746,6 +4751,7 @@ getSubscriptions(Archive *fout)
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
 	i_subpasswordrequired = PQfnumber(res, "subpasswordrequired");
 	i_subrunasowner = PQfnumber(res, "subrunasowner");
+	i_subservername = PQfnumber(res, "subservername");
 	i_subconninfo = PQfnumber(res, "subconninfo");
 	i_subslotname = PQfnumber(res, "subslotname");
 	i_subsynccommit = PQfnumber(res, "subsynccommit");
@@ -4766,7 +4772,10 @@ getSubscriptions(Archive *fout)
 		AssignDumpId(&subinfo[i].dobj);
 		subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname));
 		subinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_subowner));
-
+		if (PQgetisnull(res, i, i_subservername))
+			subinfo[i].subservername = NULL;
+		else
+			subinfo[i].subservername = pg_strdup(PQgetvalue(res, i, i_subservername));
 		subinfo[i].subbinary =
 			pg_strdup(PQgetvalue(res, i, i_subbinary));
 		subinfo[i].substream =
@@ -4994,9 +5003,17 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n",
 					  qsubname);
 
-	appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ",
+	appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s ",
 					  qsubname);
-	appendStringLiteralAH(query, subinfo->subconninfo, fout);
+	if (subinfo->subservername)
+	{
+		appendPQExpBuffer(query, "SERVER %s", fmtId(subinfo->subservername));
+	}
+	else
+	{
+		appendPQExpBuffer(query, "CONNECTION ");
+		appendStringLiteralAH(query, subinfo->subconninfo, fout);
+	}
 
 	/* Build list of quoted publications and append them to query. */
 	if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames))
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 9bc93520b4..6fdf63688c 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -660,6 +660,7 @@ typedef struct _SubscriptionInfo
 	char	   *subdisableonerr;
 	char	   *subpasswordrequired;
 	char	   *subrunasowner;
+	char	   *subservername;
 	char	   *subconninfo;
 	char	   *subslotname;
 	char	   *subsynccommit;
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 73133ce735..b3ac86890a 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -3329,7 +3329,7 @@ psql_completion(const char *text, int start, int end)
 
 /* CREATE SUBSCRIPTION */
 	else if (Matches("CREATE", "SUBSCRIPTION", MatchAny))
-		COMPLETE_WITH("CONNECTION");
+		COMPLETE_WITH("SERVER", "CONNECTION");
 	else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION", MatchAny))
 		COMPLETE_WITH("PUBLICATION");
 	else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION",
diff --git a/src/include/catalog/pg_foreign_data_wrapper.h b/src/include/catalog/pg_foreign_data_wrapper.h
index 0d8759d3fd..700d6eed65 100644
--- a/src/include/catalog/pg_foreign_data_wrapper.h
+++ b/src/include/catalog/pg_foreign_data_wrapper.h
@@ -36,6 +36,9 @@ CATALOG(pg_foreign_data_wrapper,2328,ForeignDataWrapperRelationId)
 	Oid			fdwvalidator BKI_LOOKUP_OPT(pg_proc);	/* option validation
 														 * function, or 0 if
 														 * none */
+	Oid			fdwconnection BKI_LOOKUP_OPT(pg_proc);	/* connection string
+														 * function, or 0 if
+														 * none */
 
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	aclitem		fdwacl[1];		/* access permissions */
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 0aa14ec4a2..b84c25d55e 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -98,9 +98,11 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 								 * slots) in the upstream database are enabled
 								 * to be synchronized to the standbys. */
 
+	Oid			subserver;		/* Set if connecting with server */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
-	text		subconninfo BKI_FORCE_NOT_NULL;
+	text		subconninfo;	/* Set if connecting with connection string */
 
 	/* Slot name on publisher */
 	NameData	subslotname BKI_FORCE_NULL;
@@ -174,7 +176,8 @@ typedef struct Subscription
  */
 #define LOGICALREP_STREAM_PARALLEL 'p'
 
-extern Subscription *GetSubscription(Oid subid, bool missing_ok);
+extern Subscription *GetSubscription(Oid subid, bool missing_ok,
+									 bool aclcheck);
 extern void FreeSubscription(Subscription *sub);
 extern void DisableSubscription(Oid subid);
 
diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h
index 82b8153100..b4025e7f1e 100644
--- a/src/include/foreign/foreign.h
+++ b/src/include/foreign/foreign.h
@@ -28,6 +28,7 @@ typedef struct ForeignDataWrapper
 	char	   *fdwname;		/* Name of the FDW */
 	Oid			fdwhandler;		/* Oid of handler function, or 0 */
 	Oid			fdwvalidator;	/* Oid of validator function, or 0 */
+	Oid			fdwconnection;	/* Oid of connection string function, or 0 */
 	List	   *options;		/* fdwoptions as DefElem list */
 } ForeignDataWrapper;
 
@@ -65,10 +66,12 @@ typedef struct ForeignTable
 
 
 extern ForeignServer *GetForeignServer(Oid serverid);
+extern char *ForeignServerName(Oid serverid);
 extern ForeignServer *GetForeignServerExtended(Oid serverid,
 											   bits16 flags);
 extern ForeignServer *GetForeignServerByName(const char *srvname,
 											 bool missing_ok);
+extern char *ForeignServerConnectionString(Oid userid, Oid serverid);
 extern UserMapping *GetUserMapping(Oid userid, Oid serverid);
 extern ForeignDataWrapper *GetForeignDataWrapper(Oid fdwid);
 extern ForeignDataWrapper *GetForeignDataWrapperExtended(Oid fdwid,
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 2380821600..90b203dc60 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -4042,6 +4042,7 @@ typedef struct CreateSubscriptionStmt
 {
 	NodeTag		type;
 	char	   *subname;		/* Name of the subscription */
+	char	   *servername;		/* Server name of publisher */
 	char	   *conninfo;		/* Connection string to publisher */
 	List	   *publication;	/* One or more publication to subscribe to */
 	List	   *options;		/* List of DefElem nodes */
@@ -4050,6 +4051,7 @@ typedef struct CreateSubscriptionStmt
 typedef enum AlterSubscriptionType
 {
 	ALTER_SUBSCRIPTION_OPTIONS,
+	ALTER_SUBSCRIPTION_SERVER,
 	ALTER_SUBSCRIPTION_CONNECTION,
 	ALTER_SUBSCRIPTION_SET_PUBLICATION,
 	ALTER_SUBSCRIPTION_ADD_PUBLICATION,
@@ -4064,6 +4066,7 @@ typedef struct AlterSubscriptionStmt
 	NodeTag		type;
 	AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_OPTIONS, etc */
 	char	   *subname;		/* Name of the subscription */
+	char	   *servername;		/* Server name of publisher */
 	char	   *conninfo;		/* Connection string to publisher */
 	List	   *publication;	/* One or more publication to subscribe to */
 	List	   *options;		/* List of DefElem nodes */
diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out
index 215eb899be..59c64126bd 100644
--- a/src/test/regress/expected/oidjoins.out
+++ b/src/test/regress/expected/oidjoins.out
@@ -224,6 +224,7 @@ NOTICE:  checking pg_extension {extconfig} => pg_class {oid}
 NOTICE:  checking pg_foreign_data_wrapper {fdwowner} => pg_authid {oid}
 NOTICE:  checking pg_foreign_data_wrapper {fdwhandler} => pg_proc {oid}
 NOTICE:  checking pg_foreign_data_wrapper {fdwvalidator} => pg_proc {oid}
+NOTICE:  checking pg_foreign_data_wrapper {fdwconnection} => pg_proc {oid}
 NOTICE:  checking pg_foreign_server {srvowner} => pg_authid {oid}
 NOTICE:  checking pg_foreign_server {srvfdw} => pg_foreign_data_wrapper {oid}
 NOTICE:  checking pg_user_mapping {umuser} => pg_authid {oid}
-- 
2.34.1

Reply via email to