On Mon, 2024-12-16 at 20:05 -0800, Jeff Davis wrote: > On Wed, 2024-10-30 at 08:08 -0700, Jeff Davis wrote: >
Rebased v14. The approach has changed multiple times. It starte off with more in- core code, but in response to review feedback, has become more decoupled from core and more coupled to postgres_fdw. But the patch has been about the same (just rebases) since March of last year, and hasn't gotten feedback since. I still think it's a nice feature, but I'd like some feedback on the externals of the feature. As a note, this will require a version bump for postgres_fdw for the new connection method. Regards, Jeff Davis
From e63b42acfb4d4d8241b4453520a7fe52195c0f99 Mon Sep 17 00:00:00 2001 From: Jeff Davis <j...@j-davis.com> Date: Tue, 2 Jan 2024 13:42:48 -0800 Subject: [PATCH v14] CREATE SUSBCRIPTION ... SERVER. Allow specifying a foreign server for CREATE SUBSCRIPTION, rather than a raw connection string with CONNECTION. Using a foreign server as a layer of indirection improves management of multiple subscriptions to the same server. It also provides integration with user mappings in case different subscriptions have different owners or a subscription changes owners. Discussion: https://postgr.es/m/61831790a0a937038f78ce09f8dd4cef7de7456a.ca...@j-davis.com Reviewed-by: Ashutosh Bapat --- contrib/postgres_fdw/Makefile | 2 + contrib/postgres_fdw/connection.c | 73 ++++++++ .../postgres_fdw/expected/postgres_fdw.out | 8 + contrib/postgres_fdw/meson.build | 1 + .../postgres_fdw/postgres_fdw--1.1--1.2.sql | 8 + contrib/postgres_fdw/sql/postgres_fdw.sql | 7 + contrib/postgres_fdw/t/010_subscription.pl | 71 ++++++++ doc/src/sgml/ref/alter_subscription.sgml | 18 +- doc/src/sgml/ref/create_subscription.sgml | 11 +- src/backend/catalog/pg_subscription.c | 38 +++- src/backend/commands/foreigncmds.c | 58 +++++- src/backend/commands/subscriptioncmds.c | 168 ++++++++++++++++-- src/backend/foreign/foreign.c | 66 +++++++ src/backend/parser/gram.y | 22 +++ src/backend/replication/logical/worker.c | 16 +- src/bin/pg_dump/pg_dump.c | 35 +++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/tab-complete.in.c | 2 +- src/include/catalog/pg_foreign_data_wrapper.h | 3 + src/include/catalog/pg_subscription.h | 7 +- src/include/foreign/foreign.h | 3 + src/include/nodes/parsenodes.h | 3 + src/test/regress/expected/oidjoins.out | 1 + 23 files changed, 587 insertions(+), 35 deletions(-) create mode 100644 contrib/postgres_fdw/t/010_subscription.pl diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile index adfbd2ef758..59b805656c1 100644 --- a/contrib/postgres_fdw/Makefile +++ b/contrib/postgres_fdw/Makefile @@ -19,6 +19,8 @@ DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql postgres_fdw--1.1--1.2.s REGRESS = postgres_fdw query_cancel TAP_TESTS = 1 +TAP_TESTS = 1 + ifdef USE_PGXS PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 8a8d3b4481f..961368a919a 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -131,6 +131,7 @@ PG_FUNCTION_INFO_V1(postgres_fdw_get_connections); PG_FUNCTION_INFO_V1(postgres_fdw_get_connections_1_2); PG_FUNCTION_INFO_V1(postgres_fdw_disconnect); PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all); +PG_FUNCTION_INFO_V1(postgres_fdw_connection); /* prototypes of private functions */ static void make_new_connection(ConnCacheEntry *entry, UserMapping *user); @@ -2279,6 +2280,78 @@ postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo, } } +/* + * Values in connection strings must be enclosed in single quotes. Single + * quotes and backslashes must be escaped with backslash. NB: these rules are + * different from the rules for escaping a SQL literal. + */ +static void +appendEscapedValue(StringInfo str, const char *val) +{ + appendStringInfoChar(str, '\''); + for (int i = 0; val[i] != '\0'; i++) + { + if (val[i] == '\\' || val[i] == '\'') + appendStringInfoChar(str, '\\'); + appendStringInfoChar(str, val[i]); + } + appendStringInfoChar(str, '\''); +} + +Datum +postgres_fdw_connection(PG_FUNCTION_ARGS) +{ + Oid userid = PG_GETARG_OID(0); + Oid serverid = PG_GETARG_OID(1); + ForeignServer *server = GetForeignServer(serverid); + UserMapping *user = GetUserMapping(userid, serverid); + StringInfoData str; + const char **keywords; + const char **values; + int n; + + /* + * Construct connection params from generic options of ForeignServer and + * UserMapping. (Some of them might not be libpq options, in which case + * we'll just waste a few array slots.) Add 4 extra slots for + * application_name, fallback_application_name, client_encoding, end + * marker. + */ + n = list_length(server->options) + list_length(user->options) + 4; + keywords = (const char **) palloc(n * sizeof(char *)); + values = (const char **) palloc(n * sizeof(char *)); + + n = 0; + n += ExtractConnectionOptions(server->options, + keywords + n, values + n); + n += ExtractConnectionOptions(user->options, + keywords + n, values + n); + + /* Set client_encoding so that libpq can convert encoding properly. */ + keywords[n] = "client_encoding"; + values[n] = GetDatabaseEncodingName(); + n++; + + keywords[n] = values[n] = NULL; + + /* verify the set of connection parameters */ + check_conn_params(keywords, values, user); + + initStringInfo(&str); + for (int i = 0; i < n; i++) + { + char *sep = ""; + + appendStringInfo(&str, "%s%s = ", sep, keywords[i]); + appendEscapedValue(&str, values[i]); + sep = " "; + } + + pfree(keywords); + pfree(values); + PG_RETURN_TEXT_P(cstring_to_text(str.data)); +} + /* * List active foreign server connections. * diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 8447b289cb7..61a7e3455a8 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -256,6 +256,14 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again ANALYZE ft1; ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true'); -- =================================================================== +-- test subscription +-- =================================================================== +CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1 + PUBLICATION pub1 WITH (slot_name = NONE, connect = false); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. +DROP SUBSCRIPTION regress_pgfdw_subscription; +-- =================================================================== -- test error case for create publication on foreign table -- =================================================================== CREATE PUBLICATION testpub_ftbl FOR TABLE ft1; -- should fail diff --git a/contrib/postgres_fdw/meson.build b/contrib/postgres_fdw/meson.build index 8b29be24dee..33f98ab86f2 100644 --- a/contrib/postgres_fdw/meson.build +++ b/contrib/postgres_fdw/meson.build @@ -44,6 +44,7 @@ tests += { 'tap': { 'tests': [ 't/001_auth_scram.pl', + 't/010_subscription.pl', ], }, } diff --git a/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql b/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql index 81aad4fcdaa..8981787d165 100644 --- a/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql +++ b/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql @@ -16,3 +16,11 @@ CREATE FUNCTION postgres_fdw_get_connections ( RETURNS SETOF record AS 'MODULE_PATHNAME', 'postgres_fdw_get_connections_1_2' LANGUAGE C STRICT PARALLEL RESTRICTED; + +-- takes internal parameter to prevent calling from SQL +CREATE FUNCTION postgres_fdw_connection(oid, oid, internal) +RETURNS text +AS 'MODULE_PATHNAME' +LANGUAGE C STRICT; + +ALTER FOREIGN DATA WRAPPER postgres_fdw CONNECTION postgres_fdw_connection; diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 1598d9e0862..f733b580859 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -248,6 +248,13 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again ANALYZE ft1; ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true'); +-- =================================================================== +-- test subscription +-- =================================================================== +CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1 + PUBLICATION pub1 WITH (slot_name = NONE, connect = false); +DROP SUBSCRIPTION regress_pgfdw_subscription; + -- =================================================================== -- test error case for create publication on foreign table -- =================================================================== diff --git a/contrib/postgres_fdw/t/010_subscription.pl b/contrib/postgres_fdw/t/010_subscription.pl new file mode 100644 index 00000000000..a39e8fdbba4 --- /dev/null +++ b/contrib/postgres_fdw/t/010_subscription.pl @@ -0,0 +1,71 @@ + +# Copyright (c) 2021-2024, PostgreSQL Global Development Group + +# Basic logical replication test +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_ins AS SELECT a, a + 1 as b FROM generate_series(1,1002) AS a"); + +# Replicate the changes without columns +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_no_col()"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_no_col default VALUES"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE EXTENSION postgres_fdw"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int, b int)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_ins"); + +my $publisher_host = $node_publisher->host; +my $publisher_port = $node_publisher->port; +$node_subscriber->safe_psql('postgres', + "CREATE SERVER tap_server FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '$publisher_host', port '$publisher_port', dbname 'postgres')" +); + +$node_subscriber->safe_psql('postgres', + "CREATE USER MAPPING FOR PUBLIC SERVER tap_server" +); + +$node_subscriber->safe_psql('postgres', + "CREATE FOREIGN TABLE f_tab_ins (a int, b int) SERVER tap_server OPTIONS(table_name 'tab_ins')" +); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub SERVER tap_server PUBLICATION tap_pub WITH (password_required=false)" +); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM (SELECT f.b = l.b as match FROM tab_ins l, f_tab_ins f WHERE l.a = f.a) WHERE match"); +is($result, qq(1002), 'check initial data was copied to subscriber'); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_ins SELECT a, a + 1 FROM generate_series(1003,1050) a"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM (SELECT f.b = l.b as match FROM tab_ins l, f_tab_ins f WHERE l.a = f.a) WHERE match"); +is($result, qq(1050), 'check initial data was copied to subscriber'); + +done_testing(); diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index fdc648d007f..35a8101796b 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -21,6 +21,7 @@ PostgreSQL documentation <refsynopsisdiv> <synopsis> +ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SERVER <replaceable>servername</replaceable> ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> CONNECTION '<replaceable>conninfo</replaceable>' ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ] ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ] @@ -101,13 +102,24 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < </listitem> </varlistentry> + <varlistentry id="sql-altersubscription-params-server"> + <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term> + <listitem> + <para> + This clause replaces the foreign server or connection string originally + set by <xref linkend="sql-createsubscription"/> with the foreign server + <replaceable>servername</replaceable>. + </para> + </listitem> + </varlistentry> + <varlistentry id="sql-altersubscription-params-connection"> <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term> <listitem> <para> - This clause replaces the connection string originally set by - <xref linkend="sql-createsubscription"/>. See there for more - information. + This clause replaces the foreign server or connection string originally + set by <xref linkend="sql-createsubscription"/> with the connection + string <replaceable>conninfo</replaceable>. </para> </listitem> </varlistentry> diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 57dec28a5df..966efae16a3 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -22,7 +22,7 @@ PostgreSQL documentation <refsynopsisdiv> <synopsis> CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceable> - CONNECTION '<replaceable class="parameter">conninfo</replaceable>' + { SERVER <replaceable class="parameter">servername</replaceable> | CONNECTION '<replaceable class="parameter">conninfo</replaceable>' } PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ] </synopsis> @@ -77,6 +77,15 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl </listitem> </varlistentry> + <varlistentry id="sql-createsubscription-params-server"> + <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term> + <listitem> + <para> + A foreign server to use for the connection. + </para> + </listitem> + </varlistentry> + <varlistentry id="sql-createsubscription-params-connection"> <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term> <listitem> diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 1395032413e..490cb965965 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -19,11 +19,14 @@ #include "access/htup_details.h" #include "access/tableam.h" #include "catalog/indexing.h" +#include "catalog/pg_foreign_server.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" +#include "foreign/foreign.h" #include "miscadmin.h" #include "storage/lmgr.h" +#include "utils/acl.h" #include "utils/array.h" #include "utils/builtins.h" #include "utils/fmgroids.h" @@ -69,7 +72,7 @@ GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal) * Fetch the subscription from the syscache. */ Subscription * -GetSubscription(Oid subid, bool missing_ok) +GetSubscription(Oid subid, bool missing_ok, bool aclcheck) { HeapTuple tup; Subscription *sub; @@ -105,10 +108,35 @@ GetSubscription(Oid subid, bool missing_ok) sub->failover = subform->subfailover; /* Get conninfo */ - datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, - tup, - Anum_pg_subscription_subconninfo); - sub->conninfo = TextDatumGetCString(datum); + if (OidIsValid(subform->subserver)) + { + AclResult aclresult; + + /* recheck ACL if requested */ + if (aclcheck) + { + aclresult = object_aclcheck(ForeignServerRelationId, + subform->subserver, + subform->subowner, ACL_USAGE); + + if (aclresult != ACLCHECK_OK) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"", + GetUserNameFromId(subform->subowner, false), + ForeignServerName(subform->subserver)))); + } + + sub->conninfo = ForeignServerConnectionString(subform->subowner, + subform->subserver); + } + else + { + datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, + tup, + Anum_pg_subscription_subconninfo); + sub->conninfo = TextDatumGetCString(datum); + } /* Get slotname */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/commands/foreigncmds.c b/src/backend/commands/foreigncmds.c index c14e038d54f..5913678c5b1 100644 --- a/src/backend/commands/foreigncmds.c +++ b/src/backend/commands/foreigncmds.c @@ -511,21 +511,53 @@ lookup_fdw_validator_func(DefElem *validator) /* validator's return value is ignored, so we don't check the type */ } +/* + * Convert a connection string function name passed from the parser to an Oid. + */ +static Oid +lookup_fdw_connection_func(DefElem *connection) +{ + Oid connectionOid; + Oid funcargtypes[3]; + + if (connection == NULL || connection->arg == NULL) + return InvalidOid; + + /* connection string functions take user oid, server oid */ + funcargtypes[0] = OIDOID; + funcargtypes[1] = OIDOID; + funcargtypes[2] = INTERNALOID; + + connectionOid = LookupFuncName((List *) connection->arg, 3, funcargtypes, false); + + /* check that connection string function has correct return type */ + if (get_func_rettype(connectionOid) != TEXTOID) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("function %s must return type %s", + NameListToString((List *) connection->arg), "text"))); + + return connectionOid; +} + /* * Process function options of CREATE/ALTER FDW */ static void parse_func_options(ParseState *pstate, List *func_options, bool *handler_given, Oid *fdwhandler, - bool *validator_given, Oid *fdwvalidator) + bool *validator_given, Oid *fdwvalidator, + bool *connection_given, Oid *fdwconnection) { ListCell *cell; *handler_given = false; *validator_given = false; + *connection_given = false; /* return InvalidOid if not given */ *fdwhandler = InvalidOid; *fdwvalidator = InvalidOid; + *fdwconnection = InvalidOid; foreach(cell, func_options) { @@ -545,6 +577,13 @@ parse_func_options(ParseState *pstate, List *func_options, *validator_given = true; *fdwvalidator = lookup_fdw_validator_func(def); } + else if (strcmp(def->defname, "connection") == 0) + { + if (*connection_given) + errorConflictingDefElem(def, pstate); + *connection_given = true; + *fdwconnection = lookup_fdw_connection_func(def); + } else elog(ERROR, "option \"%s\" not recognized", def->defname); @@ -564,8 +603,10 @@ CreateForeignDataWrapper(ParseState *pstate, CreateFdwStmt *stmt) Oid fdwId; bool handler_given; bool validator_given; + bool connection_given; Oid fdwhandler; Oid fdwvalidator; + Oid fdwconnection; Datum fdwoptions; Oid ownerId; ObjectAddress myself; @@ -609,10 +650,12 @@ CreateForeignDataWrapper(ParseState *pstate, CreateFdwStmt *stmt) /* Lookup handler and validator functions, if given */ parse_func_options(pstate, stmt->func_options, &handler_given, &fdwhandler, - &validator_given, &fdwvalidator); + &validator_given, &fdwvalidator, + &connection_given, &fdwconnection); values[Anum_pg_foreign_data_wrapper_fdwhandler - 1] = ObjectIdGetDatum(fdwhandler); values[Anum_pg_foreign_data_wrapper_fdwvalidator - 1] = ObjectIdGetDatum(fdwvalidator); + values[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = ObjectIdGetDatum(fdwconnection); nulls[Anum_pg_foreign_data_wrapper_fdwacl - 1] = true; @@ -684,8 +727,10 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt) Datum datum; bool handler_given; bool validator_given; + bool connection_given; Oid fdwhandler; Oid fdwvalidator; + Oid fdwconnection; ObjectAddress myself; rel = table_open(ForeignDataWrapperRelationId, RowExclusiveLock); @@ -715,7 +760,8 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt) parse_func_options(pstate, stmt->func_options, &handler_given, &fdwhandler, - &validator_given, &fdwvalidator); + &validator_given, &fdwvalidator, + &connection_given, &fdwconnection); if (handler_given) { @@ -753,6 +799,12 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt) fdwvalidator = fdwForm->fdwvalidator; } + if (connection_given) + { + repl_val[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = ObjectIdGetDatum(fdwconnection); + repl_repl[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = true; + } + /* * If options specified, validate and update. */ diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 4aec73bcc6b..9b9ab6657aa 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -26,14 +26,17 @@ #include "catalog/objectaddress.h" #include "catalog/pg_authid_d.h" #include "catalog/pg_database_d.h" +#include "catalog/pg_foreign_server.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" +#include "catalog/pg_user_mapping.h" #include "commands/dbcommands.h" #include "commands/defrem.h" #include "commands/event_trigger.h" #include "commands/subscriptioncmds.h" #include "executor/executor.h" +#include "foreign/foreign.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "pgstat.h" @@ -546,6 +549,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, Datum values[Natts_pg_subscription]; Oid owner = GetUserId(); HeapTuple tup; + Oid serverid; char *conninfo; char originname[NAMEDATALEN]; List *publications; @@ -638,15 +642,40 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, if (opts.synchronous_commit == NULL) opts.synchronous_commit = "off"; - conninfo = stmt->conninfo; - publications = stmt->publication; - /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); + if (stmt->servername) + { + ForeignServer *server; + + Assert(!stmt->conninfo); + conninfo = NULL; + + server = GetForeignServerByName(stmt->servername, false); + aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, owner, ACL_USAGE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername); + + /* make sure a user mapping exists */ + GetUserMapping(owner, server->serverid); + + serverid = server->serverid; + conninfo = ForeignServerConnectionString(owner, serverid); + } + else + { + Assert(stmt->conninfo); + + serverid = InvalidOid; + conninfo = stmt->conninfo; + } + /* Check the connection info string. */ walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser()); + publications = stmt->publication; + /* Everything ok, form a new tuple. */ memset(values, 0, sizeof(values)); memset(nulls, false, sizeof(nulls)); @@ -670,8 +699,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired); values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner); values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover); - values[Anum_pg_subscription_subconninfo - 1] = - CStringGetTextDatum(conninfo); + values[Anum_pg_subscription_subserver - 1] = serverid; + if (!OidIsValid(serverid)) + values[Anum_pg_subscription_subconninfo - 1] = + CStringGetTextDatum(conninfo); + else + nulls[Anum_pg_subscription_subconninfo - 1] = true; if (opts.slot_name) values[Anum_pg_subscription_subslotname - 1] = DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name)); @@ -692,6 +725,18 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, recordDependencyOnOwner(SubscriptionRelationId, subid, owner); + ObjectAddressSet(myself, SubscriptionRelationId, subid); + + if (stmt->servername) + { + ObjectAddress referenced; + + Assert(OidIsValid(serverid)); + + ObjectAddressSet(referenced, ForeignServerRelationId, serverid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + } + ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); replorigin_create(originname); @@ -809,8 +854,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, if (opts.enabled) ApplyLauncherWakeupAtCommit(); - ObjectAddressSet(myself, SubscriptionRelationId, subid); - InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0); return myself; @@ -1135,7 +1178,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION, stmt->subname); - sub = GetSubscription(subid, false); + /* + * Skip ACL checks on the subscription's foreign server, if any. If + * changing the server (or replacing it with a raw connection), then the + * old one will be removed anyway. If changing something unrelated, + * there's no need to do an additional ACL check here; that will be done + * by the subscription worker anyway. + */ + sub = GetSubscription(subid, false, false); /* * Don't allow non-superuser modification of a subscription with @@ -1155,6 +1205,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, memset(nulls, false, sizeof(nulls)); memset(replaces, false, sizeof(replaces)); + ObjectAddressSet(myself, SubscriptionRelationId, subid); + switch (stmt->kind) { case ALTER_SUBSCRIPTION_OPTIONS: @@ -1358,7 +1410,79 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, break; } + case ALTER_SUBSCRIPTION_SERVER: + { + ForeignServer *new_server; + ObjectAddress referenced; + AclResult aclresult; + char *conninfo; + + /* + * Remove what was there before, either another foreign server + * or a connection string. + */ + if (form->subserver) + { + deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid, + DEPENDENCY_NORMAL, + ForeignServerRelationId, form->subserver); + } + else + { + nulls[Anum_pg_subscription_subconninfo - 1] = true; + replaces[Anum_pg_subscription_subconninfo - 1] = true; + } + + /* + * Find the new server and user mapping. Check ACL of server + * based on current user ID, but find the user mapping based + * on the subscription owner. + */ + new_server = GetForeignServerByName(stmt->servername, false); + aclresult = object_aclcheck(ForeignServerRelationId, + new_server->serverid, + form->subowner, ACL_USAGE); + if (aclresult != ACLCHECK_OK) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"", + GetUserNameFromId(form->subowner, false), + ForeignServerName(new_server->serverid)))); + + /* make sure a user mapping exists */ + GetUserMapping(form->subowner, new_server->serverid); + + conninfo = ForeignServerConnectionString(form->subowner, + new_server->serverid); + + /* Load the library providing us libpq calls. */ + load_file("libpqwalreceiver", false); + /* Check the connection info string. */ + walrcv_check_conninfo(conninfo, + sub->passwordrequired && !sub->ownersuperuser); + + values[Anum_pg_subscription_subserver - 1] = new_server->serverid; + replaces[Anum_pg_subscription_subserver - 1] = true; + + ObjectAddressSet(referenced, ForeignServerRelationId, new_server->serverid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + + update_tuple = true; + } + break; + case ALTER_SUBSCRIPTION_CONNECTION: + /* remove reference to foreign server and dependencies, if present */ + if (form->subserver) + { + deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid, + DEPENDENCY_NORMAL, + ForeignServerRelationId, form->subserver); + + values[Anum_pg_subscription_subserver - 1] = InvalidOid; + replaces[Anum_pg_subscription_subserver - 1] = true; + } + /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); /* Check the connection info string. */ @@ -1609,8 +1733,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, table_close(rel, RowExclusiveLock); - ObjectAddressSet(myself, SubscriptionRelationId, subid); - InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0); /* Wake up related replication workers to handle this change quickly. */ @@ -1695,9 +1817,28 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) subname = pstrdup(NameStr(*DatumGetName(datum))); /* Get conninfo */ - datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup, - Anum_pg_subscription_subconninfo); - conninfo = TextDatumGetCString(datum); + if (OidIsValid(form->subserver)) + { + AclResult aclresult; + + aclresult = object_aclcheck(ForeignServerRelationId, form->subserver, + form->subowner, ACL_USAGE); + if (aclresult != ACLCHECK_OK) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"", + GetUserNameFromId(form->subowner, false), + ForeignServerName(form->subserver)))); + + conninfo = ForeignServerConnectionString(form->subowner, + form->subserver); + } + else + { + datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup, + Anum_pg_subscription_subconninfo); + conninfo = TextDatumGetCString(datum); + } /* Get slotname */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, @@ -1796,6 +1937,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) } /* Clean up dependencies */ + deleteDependencyRecordsFor(SubscriptionRelationId, subid, false); deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); /* Remove any associated relation synchronization states. */ diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c index f0835fc3070..24ce097d683 100644 --- a/src/backend/foreign/foreign.c +++ b/src/backend/foreign/foreign.c @@ -71,6 +71,7 @@ GetForeignDataWrapperExtended(Oid fdwid, bits16 flags) fdw->fdwname = pstrdup(NameStr(fdwform->fdwname)); fdw->fdwhandler = fdwform->fdwhandler; fdw->fdwvalidator = fdwform->fdwvalidator; + fdw->fdwconnection = fdwform->fdwconnection; /* Extract the fdwoptions */ datum = SysCacheGetAttr(FOREIGNDATAWRAPPEROID, @@ -175,6 +176,31 @@ GetForeignServerExtended(Oid serverid, bits16 flags) } +/* + * ForeignServerName - get name of foreign server. + */ +char * +ForeignServerName(Oid serverid) +{ + Form_pg_foreign_server serverform; + char *servername; + HeapTuple tp; + + tp = SearchSysCache1(FOREIGNSERVEROID, ObjectIdGetDatum(serverid)); + + if (!HeapTupleIsValid(tp)) + elog(ERROR, "cache lookup failed for foreign server %u", serverid); + + serverform = (Form_pg_foreign_server) GETSTRUCT(tp); + + servername = pstrdup(NameStr(serverform->srvname)); + + ReleaseSysCache(tp); + + return servername; +} + + /* * GetForeignServerByName - look up the foreign server definition by name. */ @@ -190,6 +216,46 @@ GetForeignServerByName(const char *srvname, bool missing_ok) } +/* + * Retrieve connection string from server's FDW. + */ +char * +ForeignServerConnectionString(Oid userid, Oid serverid) +{ + static MemoryContext tempContext = NULL; + MemoryContext oldcxt; + ForeignServer *server; + ForeignDataWrapper *fdw; + Datum connection_datum; + text *connection_text; + char *result; + + if (tempContext == NULL) + { + tempContext = AllocSetContextCreate(CurrentMemoryContext, + "temp context", + ALLOCSET_DEFAULT_SIZES); + } + + oldcxt = MemoryContextSwitchTo(tempContext); + + server = GetForeignServer(serverid); + fdw = GetForeignDataWrapper(server->fdwid); + connection_datum = OidFunctionCall2(fdw->fdwconnection, + ObjectIdGetDatum(userid), + ObjectIdGetDatum(serverid)); + connection_text = DatumGetTextPP(connection_datum); + + MemoryContextSwitchTo(oldcxt); + + result = text_to_cstring(connection_text); + + MemoryContextReset(tempContext); + + return result; +} + + /* * GetUserMapping - look up the user mapping. * diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 7d99c9355c6..fafe351ca5c 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -5437,6 +5437,8 @@ fdw_option: | NO HANDLER { $$ = makeDefElem("handler", NULL, @1); } | VALIDATOR handler_name { $$ = makeDefElem("validator", (Node *) $2, @1); } | NO VALIDATOR { $$ = makeDefElem("validator", NULL, @1); } + | CONNECTION handler_name { $$ = makeDefElem("connection", (Node *) $2, @1); } + | NO CONNECTION { $$ = makeDefElem("connection", NULL, @1); } ; fdw_options: @@ -10804,6 +10806,16 @@ CreateSubscriptionStmt: n->options = $8; $$ = (Node *) n; } + | CREATE SUBSCRIPTION name SERVER name PUBLICATION name_list opt_definition + { + CreateSubscriptionStmt *n = + makeNode(CreateSubscriptionStmt); + n->subname = $3; + n->servername = $5; + n->publication = $7; + n->options = $8; + $$ = (Node *) n; + } ; /***************************************************************************** @@ -10833,6 +10845,16 @@ AlterSubscriptionStmt: n->conninfo = $5; $$ = (Node *) n; } + | ALTER SUBSCRIPTION name SERVER name + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + + n->kind = ALTER_SUBSCRIPTION_SERVER; + n->subname = $3; + n->servername = $5; + $$ = (Node *) n; + } | ALTER SUBSCRIPTION name REFRESH PUBLICATION opt_definition { AlterSubscriptionStmt *n = diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 31ab69ea13a..99eae2dde24 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3987,7 +3987,7 @@ maybe_reread_subscription(void) /* Ensure allocations in permanent context. */ oldctx = MemoryContextSwitchTo(ApplyContext); - newsub = GetSubscription(MyLogicalRepWorker->subid, true); + newsub = GetSubscription(MyLogicalRepWorker->subid, true, true); /* * Exit if the subscription was removed. This normally should not happen @@ -4093,7 +4093,9 @@ maybe_reread_subscription(void) } /* - * Callback from subscription syscache invalidation. + * Callback from subscription syscache invalidation. Also needed for server or + * user mapping invalidation, which can change the connection information for + * subscriptions that connect using a server object. */ static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue) @@ -4675,7 +4677,7 @@ InitializeLogRepWorker(void) StartTransactionCommand(); oldctx = MemoryContextSwitchTo(ApplyContext); - MySubscription = GetSubscription(MyLogicalRepWorker->subid, true); + MySubscription = GetSubscription(MyLogicalRepWorker->subid, true, true); if (!MySubscription) { ereport(LOG, @@ -4712,6 +4714,14 @@ InitializeLogRepWorker(void) CacheRegisterSyscacheCallback(SUBSCRIPTIONOID, subscription_change_cb, (Datum) 0); + /* Keep us informed about subscription changes. */ + CacheRegisterSyscacheCallback(FOREIGNSERVEROID, + subscription_change_cb, + (Datum) 0); + /* Keep us informed about subscription changes. */ + CacheRegisterSyscacheCallback(USERMAPPINGOID, + subscription_change_cb, + (Datum) 0); CacheRegisterSyscacheCallback(AUTHOID, subscription_change_cb, diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 4f4ad2ee150..a7a70535fa0 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4894,6 +4894,7 @@ getSubscriptions(Archive *fout) int i_subdisableonerr; int i_subpasswordrequired; int i_subrunasowner; + int i_subservername; int i_subconninfo; int i_subslotname; int i_subsynccommit; @@ -4974,16 +4975,29 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 170000) appendPQExpBufferStr(query, - " s.subfailover\n"); + " s.subfailover,\n"); else appendPQExpBuffer(query, - " false AS subfailover\n"); + " false AS subfailover,\n"); + + if (dopt->binary_upgrade && fout->remoteVersion >= 180000) + appendPQExpBufferStr(query, " fs.srvname AS subservername,\n" + " o.remote_lsn AS suboriginremotelsn,\n" + " s.subenabled,\n" + " s.subfailover\n"); + else + appendPQExpBufferStr(query, " NULL AS subservername,\n" + " NULL AS suboriginremotelsn,\n" + " false AS subenabled,\n" + " false AS subfailover\n"); appendPQExpBufferStr(query, "FROM pg_subscription s\n"); if (dopt->binary_upgrade && fout->remoteVersion >= 170000) appendPQExpBufferStr(query, + "LEFT JOIN pg_catalog.pg_foreign_server fs \n" + " ON fs.oid = s.subserver \n" "LEFT JOIN pg_catalog.pg_replication_origin_status o \n" " ON o.external_id = 'pg_' || s.oid::text \n"); @@ -5011,6 +5025,7 @@ getSubscriptions(Archive *fout) i_subpasswordrequired = PQfnumber(res, "subpasswordrequired"); i_subrunasowner = PQfnumber(res, "subrunasowner"); i_subfailover = PQfnumber(res, "subfailover"); + i_subservername = PQfnumber(res, "subservername"); i_subconninfo = PQfnumber(res, "subconninfo"); i_subslotname = PQfnumber(res, "subslotname"); i_subsynccommit = PQfnumber(res, "subsynccommit"); @@ -5032,6 +5047,10 @@ getSubscriptions(Archive *fout) subinfo[i].subenabled = (strcmp(PQgetvalue(res, i, i_subenabled), "t") == 0); + if (PQgetisnull(res, i, i_subservername)) + subinfo[i].subservername = NULL; + else + subinfo[i].subservername = pg_strdup(PQgetvalue(res, i, i_subservername)); subinfo[i].subbinary = (strcmp(PQgetvalue(res, i, i_subbinary), "t") == 0); subinfo[i].substream = *(PQgetvalue(res, i, i_substream)); @@ -5254,9 +5273,17 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n", qsubname); - appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ", + appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s ", qsubname); - appendStringLiteralAH(query, subinfo->subconninfo, fout); + if (subinfo->subservername) + { + appendPQExpBuffer(query, "SERVER %s", fmtId(subinfo->subservername)); + } + else + { + appendPQExpBuffer(query, "CONNECTION "); + appendStringLiteralAH(query, subinfo->subconninfo, fout); + } /* Build list of quoted publications and append them to query. */ if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames)) diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index ca32f167878..6c553765ea1 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -703,6 +703,7 @@ typedef struct _SubscriptionInfo bool subpasswordrequired; bool subrunasowner; bool subfailover; + char *subservername; char *subconninfo; char *subslotname; char *subsynccommit; diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index 8432be641ac..b78680994a0 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -3704,7 +3704,7 @@ match_previous_words(int pattern_id, /* CREATE SUBSCRIPTION */ else if (Matches("CREATE", "SUBSCRIPTION", MatchAny)) - COMPLETE_WITH("CONNECTION"); + COMPLETE_WITH("SERVER", "CONNECTION"); else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION", MatchAny)) COMPLETE_WITH("PUBLICATION"); else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION", diff --git a/src/include/catalog/pg_foreign_data_wrapper.h b/src/include/catalog/pg_foreign_data_wrapper.h index d03ab5a4f28..29eaba467b6 100644 --- a/src/include/catalog/pg_foreign_data_wrapper.h +++ b/src/include/catalog/pg_foreign_data_wrapper.h @@ -36,6 +36,9 @@ CATALOG(pg_foreign_data_wrapper,2328,ForeignDataWrapperRelationId) Oid fdwvalidator BKI_LOOKUP_OPT(pg_proc); /* option validation * function, or 0 if * none */ + Oid fdwconnection BKI_LOOKUP_OPT(pg_proc); /* connection string + * function, or 0 if + * none */ #ifdef CATALOG_VARLEN /* variable-length fields start here */ aclitem fdwacl[1]; /* access permissions */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 20fc329992d..93067ea9182 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -78,9 +78,11 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW * slots) in the upstream database are enabled * to be synchronized to the standbys. */ + Oid subserver; /* Set if connecting with server */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ - text subconninfo BKI_FORCE_NOT_NULL; + text subconninfo; /* Set if connecting with connection string */ /* Slot name on publisher */ NameData subslotname BKI_FORCE_NULL; @@ -178,7 +180,8 @@ typedef struct Subscription #endif /* EXPOSE_TO_CLIENT_CODE */ -extern Subscription *GetSubscription(Oid subid, bool missing_ok); +extern Subscription *GetSubscription(Oid subid, bool missing_ok, + bool aclcheck); extern void FreeSubscription(Subscription *sub); extern void DisableSubscription(Oid subid); diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h index 7e9decd2537..a7e6cf0226a 100644 --- a/src/include/foreign/foreign.h +++ b/src/include/foreign/foreign.h @@ -28,6 +28,7 @@ typedef struct ForeignDataWrapper char *fdwname; /* Name of the FDW */ Oid fdwhandler; /* Oid of handler function, or 0 */ Oid fdwvalidator; /* Oid of validator function, or 0 */ + Oid fdwconnection; /* Oid of connection string function, or 0 */ List *options; /* fdwoptions as DefElem list */ } ForeignDataWrapper; @@ -65,10 +66,12 @@ typedef struct ForeignTable extern ForeignServer *GetForeignServer(Oid serverid); +extern char *ForeignServerName(Oid serverid); extern ForeignServer *GetForeignServerExtended(Oid serverid, bits16 flags); extern ForeignServer *GetForeignServerByName(const char *srvname, bool missing_ok); +extern char *ForeignServerConnectionString(Oid userid, Oid serverid); extern UserMapping *GetUserMapping(Oid userid, Oid serverid); extern ForeignDataWrapper *GetForeignDataWrapper(Oid fdwid); extern ForeignDataWrapper *GetForeignDataWrapperExtended(Oid fdwid, diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 0b208f51bdd..8291d958da5 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -4282,6 +4282,7 @@ typedef struct CreateSubscriptionStmt { NodeTag type; char *subname; /* Name of the subscription */ + char *servername; /* Server name of publisher */ char *conninfo; /* Connection string to publisher */ List *publication; /* One or more publication to subscribe to */ List *options; /* List of DefElem nodes */ @@ -4290,6 +4291,7 @@ typedef struct CreateSubscriptionStmt typedef enum AlterSubscriptionType { ALTER_SUBSCRIPTION_OPTIONS, + ALTER_SUBSCRIPTION_SERVER, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_SET_PUBLICATION, ALTER_SUBSCRIPTION_ADD_PUBLICATION, @@ -4304,6 +4306,7 @@ typedef struct AlterSubscriptionStmt NodeTag type; AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_OPTIONS, etc */ char *subname; /* Name of the subscription */ + char *servername; /* Server name of publisher */ char *conninfo; /* Connection string to publisher */ List *publication; /* One or more publication to subscribe to */ List *options; /* List of DefElem nodes */ diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out index 215eb899be3..59c64126bdc 100644 --- a/src/test/regress/expected/oidjoins.out +++ b/src/test/regress/expected/oidjoins.out @@ -224,6 +224,7 @@ NOTICE: checking pg_extension {extconfig} => pg_class {oid} NOTICE: checking pg_foreign_data_wrapper {fdwowner} => pg_authid {oid} NOTICE: checking pg_foreign_data_wrapper {fdwhandler} => pg_proc {oid} NOTICE: checking pg_foreign_data_wrapper {fdwvalidator} => pg_proc {oid} +NOTICE: checking pg_foreign_data_wrapper {fdwconnection} => pg_proc {oid} NOTICE: checking pg_foreign_server {srvowner} => pg_authid {oid} NOTICE: checking pg_foreign_server {srvfdw} => pg_foreign_data_wrapper {oid} NOTICE: checking pg_user_mapping {umuser} => pg_authid {oid} -- 2.34.1