On Wed, 2024-01-31 at 11:10 +0530, Ashutosh Bapat wrote: > > I like the idea -- it further decouples the logic from the core > > server. > > I suspect it will make postgres_fdw the primary way (though not the > > only possible way) to use this feature. There would be little need > > to > > create a new builtin FDW to make this work. > > That's what I see as well. I am glad that we are on the same page.
Implemented in v11, attached. Is this what you had in mind? It leaves a lot of the work to postgres_fdw and it's almost unusable without postgres_fdw. That's not a bad thing, but it makes the core functionality a bit harder to test standalone. I can work on the core tests some more. The postgres_fdw tests passed without modification, though, and offer a simple example of how to use it. Regards, Jeff Davis
From 88fa1333ace4d15d72534d20d2cccb37748277f2 Mon Sep 17 00:00:00 2001 From: Jeff Davis <j...@j-davis.com> Date: Tue, 2 Jan 2024 13:42:48 -0800 Subject: [PATCH v11] CREATE SUBSCRIPTION ... SERVER. Allow specifying a foreign server for CREATE SUBSCRIPTION, rather than a raw connection string with CONNECTION. Using a foreign server as a layer of indirection improves management of multiple subscriptions to the same server. It also provides integration with user mappings in case different subscriptions have different owners or a subscription changes owners. Discussion: https://postgr.es/m/61831790a0a937038f78ce09f8dd4cef7de7456a.ca...@j-davis.com Reviewed-by: Ashutosh Bapat --- contrib/postgres_fdw/Makefile | 4 +- contrib/postgres_fdw/connection.c | 74 ++++++++ .../postgres_fdw/expected/postgres_fdw.out | 8 + contrib/postgres_fdw/meson.build | 6 + .../postgres_fdw/postgres_fdw--1.1--1.2.sql | 11 ++ contrib/postgres_fdw/postgres_fdw.control | 2 +- contrib/postgres_fdw/sql/postgres_fdw.sql | 7 + contrib/postgres_fdw/t/010_subscription.pl | 71 ++++++++ doc/src/sgml/ref/alter_subscription.sgml | 18 +- doc/src/sgml/ref/create_subscription.sgml | 11 +- src/backend/catalog/pg_subscription.c | 38 +++- src/backend/commands/foreigncmds.c | 57 +++++- src/backend/commands/subscriptioncmds.c | 167 ++++++++++++++++-- src/backend/foreign/foreign.c | 42 +++++ src/backend/parser/gram.y | 22 +++ src/backend/replication/logical/worker.c | 16 +- src/bin/pg_dump/pg_dump.c | 27 ++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/tab-complete.c | 2 +- src/include/catalog/pg_foreign_data_wrapper.h | 3 + src/include/catalog/pg_subscription.h | 7 +- src/include/foreign/foreign.h | 3 + src/include/nodes/parsenodes.h | 3 + src/test/regress/expected/oidjoins.out | 1 + 24 files changed, 563 insertions(+), 38 deletions(-) create mode 100644 contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql create mode 100644 contrib/postgres_fdw/t/010_subscription.pl diff --git a/contrib/postgres_fdw/Makefile 
b/contrib/postgres_fdw/Makefile index c1b0cad453..995a30c297 100644 --- a/contrib/postgres_fdw/Makefile +++ b/contrib/postgres_fdw/Makefile @@ -14,10 +14,12 @@ PG_CPPFLAGS = -I$(libpq_srcdir) SHLIB_LINK_INTERNAL = $(libpq) EXTENSION = postgres_fdw -DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql +DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql postgres_fdw--1.1--1.2.sql REGRESS = postgres_fdw +TAP_TESTS = 1 + ifdef USE_PGXS PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 4931ebf591..a011e6df5f 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -113,6 +113,7 @@ static uint32 pgfdw_we_get_result = 0; PG_FUNCTION_INFO_V1(postgres_fdw_get_connections); PG_FUNCTION_INFO_V1(postgres_fdw_disconnect); PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all); +PG_FUNCTION_INFO_V1(postgres_fdw_connection); /* prototypes of private functions */ static void make_new_connection(ConnCacheEntry *entry, UserMapping *user); @@ -1972,6 +1973,79 @@ pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested, } } +/* + * Values in connection strings must be enclosed in single quotes. Single + * quotes and backslashes must be escaped with backslash. NB: these rules are + * different from the rules for escaping a SQL literal. 
+ */ +static void +appendEscapedValue(StringInfo str, const char *val) +{ + appendStringInfoChar(str, '\''); + for (int i = 0; val[i] != '\0'; i++) + { + if (val[i] == '\\' || val[i] == '\'') + appendStringInfoChar(str, '\\'); + appendStringInfoChar(str, val[i]); + } + appendStringInfoChar(str, '\''); +} + +Datum +postgres_fdw_connection(PG_FUNCTION_ARGS) +{ + /* TODO: consider memory usage */ + Oid userid = PG_GETARG_OID(0); + Oid serverid = PG_GETARG_OID(1); + ForeignServer *server = GetForeignServer(serverid); + UserMapping *user = GetUserMapping(userid, serverid); + StringInfoData str; + const char **keywords; + const char **values; + int n; + const char *sep = ""; + + /* + * Construct connection params from generic options of ForeignServer + * and UserMapping. (Some of them might not be libpq options, in + * which case we'll just waste a few array slots.) Add 4 extra slots + * for application_name, fallback_application_name, client_encoding, + * end marker. + */ + n = list_length(server->options) + list_length(user->options) + 4; + keywords = (const char **) palloc(n * sizeof(char *)); + values = (const char **) palloc(n * sizeof(char *)); + + n = 0; + n += ExtractConnectionOptions(server->options, + keywords + n, values + n); + n += ExtractConnectionOptions(user->options, + keywords + n, values + n); + + /* Set client_encoding so that libpq can convert encoding properly. */ + keywords[n] = "client_encoding"; + values[n] = GetDatabaseEncodingName(); + n++; + + keywords[n] = values[n] = NULL; + + /* verify the set of connection parameters */ + check_conn_params(keywords, values, user); + + initStringInfo(&str); + for (int i = 0; i < n; i++) + { + /* sep starts empty and becomes a space after the first pair */ + appendStringInfo(&str, "%s%s = ", sep, keywords[i]); + appendEscapedValue(&str, values[i]); + sep = " "; + } + + pfree(keywords); + pfree(values); + PG_RETURN_TEXT_P(cstring_to_text(str.data)); +} + /* * List active foreign server connections. 
* diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index c355e8f3f7..617e2cf5dc 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -256,6 +256,14 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again ANALYZE ft1; ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true'); -- =================================================================== +-- test subscription +-- =================================================================== +CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1 + PUBLICATION pub1 WITH (slot_name = NONE, connect = false); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. +DROP SUBSCRIPTION regress_pgfdw_subscription; +-- =================================================================== -- test error case for create publication on foreign table -- =================================================================== CREATE PUBLICATION testpub_ftbl FOR TABLE ft1; -- should fail diff --git a/contrib/postgres_fdw/meson.build b/contrib/postgres_fdw/meson.build index 2b86d8a6ee..92cb3c5556 100644 --- a/contrib/postgres_fdw/meson.build +++ b/contrib/postgres_fdw/meson.build @@ -26,6 +26,7 @@ install_data( 'postgres_fdw.control', 'postgres_fdw--1.0.sql', 'postgres_fdw--1.0--1.1.sql', + 'postgres_fdw--1.1--1.2.sql', kwargs: contrib_data_args, ) @@ -39,4 +40,9 @@ tests += { ], 'regress_args': ['--dlpath', meson.build_root() / 'src/test/regress'], }, + 'tap': { + 'tests': [ + 't/010_subscription.pl', + ], + }, } diff --git a/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql b/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql new file mode 100644 index 0000000000..468d8f0d7d --- /dev/null +++ b/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql @@ -0,0 +1,11 @@ +/* 
contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql */ + +-- complain if script is sourced in psql, rather than via ALTER EXTENSION +\echo Use "ALTER EXTENSION postgres_fdw UPDATE TO '1.2'" to load this file. \quit + +CREATE FUNCTION postgres_fdw_connection(oid, oid) +RETURNS text +AS 'MODULE_PATHNAME' +LANGUAGE C STRICT; + +ALTER FOREIGN DATA WRAPPER postgres_fdw CONNECTION postgres_fdw_connection; diff --git a/contrib/postgres_fdw/postgres_fdw.control b/contrib/postgres_fdw/postgres_fdw.control index d489382064..a4b800be4f 100644 --- a/contrib/postgres_fdw/postgres_fdw.control +++ b/contrib/postgres_fdw/postgres_fdw.control @@ -1,5 +1,5 @@ # postgres_fdw extension comment = 'foreign-data wrapper for remote PostgreSQL servers' -default_version = '1.1' +default_version = '1.2' module_pathname = '$libdir/postgres_fdw' relocatable = true diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 812e7646e1..8520094fc9 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -248,6 +248,13 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again ANALYZE ft1; ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true'); +-- =================================================================== +-- test subscription +-- =================================================================== +CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1 + PUBLICATION pub1 WITH (slot_name = NONE, connect = false); +DROP SUBSCRIPTION regress_pgfdw_subscription; + -- =================================================================== -- test error case for create publication on foreign table -- =================================================================== diff --git a/contrib/postgres_fdw/t/010_subscription.pl b/contrib/postgres_fdw/t/010_subscription.pl new file mode 100644 index 0000000000..a39e8fdbba --- /dev/null +++ b/contrib/postgres_fdw/t/010_subscription.pl 
@@ -0,0 +1,71 @@ + +# Copyright (c) 2021-2024, PostgreSQL Global Development Group + +# Basic logical replication test +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_ins AS SELECT a, a + 1 as b FROM generate_series(1,1002) AS a"); + +# Replicate the changes without columns +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_no_col()"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_no_col default VALUES"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE EXTENSION postgres_fdw"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int, b int)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_ins"); + +my $publisher_host = $node_publisher->host; +my $publisher_port = $node_publisher->port; +$node_subscriber->safe_psql('postgres', + "CREATE SERVER tap_server FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '$publisher_host', port '$publisher_port', dbname 'postgres')" +); + +$node_subscriber->safe_psql('postgres', + "CREATE USER MAPPING FOR PUBLIC SERVER tap_server" +); + +$node_subscriber->safe_psql('postgres', + "CREATE FOREIGN TABLE f_tab_ins (a int, b int) SERVER tap_server OPTIONS(table_name 'tab_ins')" +); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub SERVER tap_server PUBLICATION tap_pub WITH (password_required=false)" +); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM (SELECT f.b = l.b as match FROM tab_ins l, f_tab_ins f WHERE l.a = f.a) WHERE match"); +is($result, qq(1002), 'check initial data was copied to subscriber'); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_ins SELECT a, a + 1 FROM generate_series(1003,1050) a"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM (SELECT f.b = l.b as match FROM tab_ins l, f_tab_ins f WHERE l.a = f.a) WHERE match"); +is($result, qq(1050), 'check initial data was copied to subscriber'); + +done_testing(); diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index e9e6d9d74a..12d8855aef 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -21,6 +21,7 @@ PostgreSQL documentation <refsynopsisdiv> <synopsis> +ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SERVER <replaceable>servername</replaceable> ALTER SUBSCRIPTION <replaceable 
class="parameter">name</replaceable> CONNECTION '<replaceable>conninfo</replaceable>' ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ] ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ] @@ -94,13 +95,24 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < </listitem> </varlistentry> + <varlistentry id="sql-altersubscription-params-server"> + <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term> + <listitem> + <para> + This clause replaces the foreign server or connection string originally + set by <xref linkend="sql-createsubscription"/> with the foreign server + <replaceable>servername</replaceable>. + </para> + </listitem> + </varlistentry> + <varlistentry id="sql-altersubscription-params-connection"> <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term> <listitem> <para> - This clause replaces the connection string originally set by - <xref linkend="sql-createsubscription"/>. See there for more - information. + This clause replaces the foreign server or connection string originally + set by <xref linkend="sql-createsubscription"/> with the connection + string <replaceable>conninfo</replaceable>. 
</para> </listitem> </varlistentry> diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 15794731bb..233e87d5ea 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -22,7 +22,7 @@ PostgreSQL documentation <refsynopsisdiv> <synopsis> CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceable> - CONNECTION '<replaceable class="parameter">conninfo</replaceable>' + { SERVER <replaceable class="parameter">servername</replaceable> | CONNECTION '<replaceable class="parameter">conninfo</replaceable>' } PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ] </synopsis> @@ -77,6 +77,15 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl </listitem> </varlistentry> + <varlistentry id="sql-createsubscription-params-server"> + <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term> + <listitem> + <para> + A foreign server to use for the connection. 
+ </para> + </listitem> + </varlistentry> + <varlistentry id="sql-createsubscription-params-connection"> <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term> <listitem> diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 9efc9159f2..b5ba0aa953 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -19,11 +19,14 @@ #include "access/htup_details.h" #include "access/tableam.h" #include "catalog/indexing.h" +#include "catalog/pg_foreign_server.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" +#include "foreign/foreign.h" #include "miscadmin.h" #include "storage/lmgr.h" +#include "utils/acl.h" #include "utils/array.h" #include "utils/builtins.h" #include "utils/fmgroids.h" @@ -38,7 +41,7 @@ static List *textarray_to_stringlist(ArrayType *textarray); * Fetch the subscription from the syscache. */ Subscription * -GetSubscription(Oid subid, bool missing_ok) +GetSubscription(Oid subid, bool missing_ok, bool aclcheck) { HeapTuple tup; Subscription *sub; @@ -74,10 +77,35 @@ GetSubscription(Oid subid, bool missing_ok) sub->failover = subform->subfailover; /* Get conninfo */ - datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, - tup, - Anum_pg_subscription_subconninfo); - sub->conninfo = TextDatumGetCString(datum); + if (OidIsValid(subform->subserver)) + { + AclResult aclresult; + + /* recheck ACL if requested */ + if (aclcheck) + { + aclresult = object_aclcheck(ForeignServerRelationId, + subform->subserver, + subform->subowner, ACL_USAGE); + + if (aclresult != ACLCHECK_OK) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"", + GetUserNameFromId(subform->subowner, false), + ForeignServerName(subform->subserver)))); + } + + sub->conninfo = ForeignServerConnectionString(subform->subowner, + 
subform->subserver); + } + else + { + datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, + tup, + Anum_pg_subscription_subconninfo); + sub->conninfo = TextDatumGetCString(datum); + } /* Get slotname */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/commands/foreigncmds.c b/src/backend/commands/foreigncmds.c index cf61bbac1f..357918be11 100644 --- a/src/backend/commands/foreigncmds.c +++ b/src/backend/commands/foreigncmds.c @@ -511,21 +511,52 @@ lookup_fdw_validator_func(DefElem *validator) /* validator's return value is ignored, so we don't check the type */ } +/* + * Convert a connection string function name passed from the parser to an Oid. + */ +static Oid +lookup_fdw_connection_func(DefElem *connection) +{ + Oid connectionOid; + Oid funcargtypes[2]; + + if (connection == NULL || connection->arg == NULL) + return InvalidOid; + + /* connection string functions take user oid, server oid */ + funcargtypes[0] = OIDOID; + funcargtypes[1] = OIDOID; + + connectionOid = LookupFuncName((List *) connection->arg, 2, funcargtypes, false); + + /* check that connection string function has correct return type */ + if (get_func_rettype(connectionOid) != TEXTOID) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("function %s must return type %s", + NameListToString((List *) connection->arg), "text"))); + + return connectionOid; +} + /* * Process function options of CREATE/ALTER FDW */ static void parse_func_options(ParseState *pstate, List *func_options, bool *handler_given, Oid *fdwhandler, - bool *validator_given, Oid *fdwvalidator) + bool *validator_given, Oid *fdwvalidator, + bool *connection_given, Oid *fdwconnection) { ListCell *cell; *handler_given = false; *validator_given = false; + *connection_given = false; /* return InvalidOid if not given */ *fdwhandler = InvalidOid; *fdwvalidator = InvalidOid; + *fdwconnection = InvalidOid; foreach(cell, func_options) { @@ -545,6 +576,13 @@ parse_func_options(ParseState *pstate, List *func_options, 
*validator_given = true; *fdwvalidator = lookup_fdw_validator_func(def); } + else if (strcmp(def->defname, "connection") == 0) + { + if (*connection_given) + errorConflictingDefElem(def, pstate); + *connection_given = true; + *fdwconnection = lookup_fdw_connection_func(def); + } else elog(ERROR, "option \"%s\" not recognized", def->defname); @@ -564,8 +602,10 @@ CreateForeignDataWrapper(ParseState *pstate, CreateFdwStmt *stmt) Oid fdwId; bool handler_given; bool validator_given; + bool connection_given; Oid fdwhandler; Oid fdwvalidator; + Oid fdwconnection; Datum fdwoptions; Oid ownerId; ObjectAddress myself; @@ -609,10 +649,12 @@ CreateForeignDataWrapper(ParseState *pstate, CreateFdwStmt *stmt) /* Lookup handler and validator functions, if given */ parse_func_options(pstate, stmt->func_options, &handler_given, &fdwhandler, - &validator_given, &fdwvalidator); + &validator_given, &fdwvalidator, + &connection_given, &fdwconnection); values[Anum_pg_foreign_data_wrapper_fdwhandler - 1] = ObjectIdGetDatum(fdwhandler); values[Anum_pg_foreign_data_wrapper_fdwvalidator - 1] = ObjectIdGetDatum(fdwvalidator); + values[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = ObjectIdGetDatum(fdwconnection); nulls[Anum_pg_foreign_data_wrapper_fdwacl - 1] = true; @@ -684,8 +726,10 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt) Datum datum; bool handler_given; bool validator_given; + bool connection_given; Oid fdwhandler; Oid fdwvalidator; + Oid fdwconnection; ObjectAddress myself; rel = table_open(ForeignDataWrapperRelationId, RowExclusiveLock); @@ -715,7 +759,8 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt) parse_func_options(pstate, stmt->func_options, &handler_given, &fdwhandler, - &validator_given, &fdwvalidator); + &validator_given, &fdwvalidator, + &connection_given, &fdwconnection); if (handler_given) { @@ -753,6 +798,12 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt) fdwvalidator = fdwForm->fdwvalidator; } + if 
(connection_given) + { + repl_val[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = ObjectIdGetDatum(fdwconnection); + repl_repl[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = true; + } + /* * If options specified, validate and update. */ diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index a05d69922d..304a46efec 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -25,14 +25,17 @@ #include "catalog/objectaddress.h" #include "catalog/pg_authid_d.h" #include "catalog/pg_database_d.h" +#include "catalog/pg_foreign_server.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" +#include "catalog/pg_user_mapping.h" #include "commands/dbcommands.h" #include "commands/defrem.h" #include "commands/event_trigger.h" #include "commands/subscriptioncmds.h" #include "executor/executor.h" +#include "foreign/foreign.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "pgstat.h" @@ -594,6 +597,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, Datum values[Natts_pg_subscription]; Oid owner = GetUserId(); HeapTuple tup; + Oid serverid; char *conninfo; char originname[NAMEDATALEN]; List *publications; @@ -686,15 +690,40 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, if (opts.synchronous_commit == NULL) opts.synchronous_commit = "off"; - conninfo = stmt->conninfo; - publications = stmt->publication; - /* Load the library providing us libpq calls. 
*/ load_file("libpqwalreceiver", false); + if (stmt->servername) + { + ForeignServer *server; + + Assert(!stmt->conninfo); + conninfo = NULL; + + server = GetForeignServerByName(stmt->servername, false); + aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, owner, ACL_USAGE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername); + + /* make sure a user mapping exists */ + GetUserMapping(owner, server->serverid); + + serverid = server->serverid; + conninfo = ForeignServerConnectionString(owner, serverid); + } + else + { + Assert(stmt->conninfo); + + serverid = InvalidOid; + conninfo = stmt->conninfo; + } + /* Check the connection info string. */ walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser()); + publications = stmt->publication; + /* Everything ok, form a new tuple. */ memset(values, 0, sizeof(values)); memset(nulls, false, sizeof(nulls)); @@ -718,8 +747,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired); values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner); values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover); - values[Anum_pg_subscription_subconninfo - 1] = - CStringGetTextDatum(conninfo); + values[Anum_pg_subscription_subserver - 1] = serverid; + if (!OidIsValid(serverid)) + values[Anum_pg_subscription_subconninfo - 1] = + CStringGetTextDatum(conninfo); + else + nulls[Anum_pg_subscription_subconninfo - 1] = true; if (opts.slot_name) values[Anum_pg_subscription_subslotname - 1] = DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name)); @@ -740,6 +773,17 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, recordDependencyOnOwner(SubscriptionRelationId, subid, owner); + ObjectAddressSet(myself, SubscriptionRelationId, subid); + + if (stmt->servername) + { + ObjectAddress referenced; + 
Assert(OidIsValid(serverid)); + + ObjectAddressSet(referenced, ForeignServerRelationId, serverid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + } + ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); replorigin_create(originname); @@ -871,8 +915,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, if (opts.enabled) ApplyLauncherWakeupAtCommit(); - ObjectAddressSet(myself, SubscriptionRelationId, subid); - InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0); return myself; @@ -1140,7 +1182,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION, stmt->subname); - sub = GetSubscription(subid, false); + /* + * Skip ACL checks on the subscription's foreign server, if any. If + * changing the server (or replacing it with a raw connection), then the + * old one will be removed anyway. If changing something unrelated, + * there's no need to do an additional ACL check here; that will be done + * by the subscription worker anyway. + */ + sub = GetSubscription(subid, false, false); /* * Don't allow non-superuser modification of a subscription with @@ -1160,6 +1209,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, memset(nulls, false, sizeof(nulls)); memset(replaces, false, sizeof(replaces)); + ObjectAddressSet(myself, SubscriptionRelationId, subid); + switch (stmt->kind) { case ALTER_SUBSCRIPTION_OPTIONS: @@ -1306,7 +1357,79 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, break; } + case ALTER_SUBSCRIPTION_SERVER: + { + ForeignServer *new_server; + ObjectAddress referenced; + AclResult aclresult; + char *conninfo; + + /* + * Remove what was there before, either another foreign server + * or a connection string. 
+ */ + if (form->subserver) + { + deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid, + DEPENDENCY_NORMAL, + ForeignServerRelationId, form->subserver); + } + else + { + nulls[Anum_pg_subscription_subconninfo - 1] = true; + replaces[Anum_pg_subscription_subconninfo - 1] = true; + } + + /* + * Find the new server and user mapping. Check ACL of server + * based on current user ID, but find the user mapping based + * on the subscription owner. + */ + new_server = GetForeignServerByName(stmt->servername, false); + aclresult = object_aclcheck(ForeignServerRelationId, + new_server->serverid, + form->subowner, ACL_USAGE); + if (aclresult != ACLCHECK_OK) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"", + GetUserNameFromId(form->subowner, false), + ForeignServerName(new_server->serverid)))); + + /* make sure a user mapping exists */ + GetUserMapping(form->subowner, new_server->serverid); + + conninfo = ForeignServerConnectionString(form->subowner, + new_server->serverid); + + /* Load the library providing us libpq calls. */ + load_file("libpqwalreceiver", false); + /* Check the connection info string. 
*/ + walrcv_check_conninfo(conninfo, + sub->passwordrequired && !sub->ownersuperuser); + + values[Anum_pg_subscription_subserver - 1] = new_server->serverid; + replaces[Anum_pg_subscription_subserver - 1] = true; + + ObjectAddressSet(referenced, ForeignServerRelationId, new_server->serverid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + + update_tuple = true; + } + break; + case ALTER_SUBSCRIPTION_CONNECTION: + /* remove reference to foreign server and dependencies, if present */ + if (form->subserver) + { + deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid, + DEPENDENCY_NORMAL, + ForeignServerRelationId, form->subserver); + + values[Anum_pg_subscription_subserver - 1] = InvalidOid; + replaces[Anum_pg_subscription_subserver - 1] = true; + } + /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); /* Check the connection info string. */ @@ -1553,8 +1676,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, table_close(rel, RowExclusiveLock); - ObjectAddressSet(myself, SubscriptionRelationId, subid); - InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0); /* Wake up related replication workers to handle this change quickly. 
*/ @@ -1639,9 +1760,28 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) subname = pstrdup(NameStr(*DatumGetName(datum))); /* Get conninfo */ - datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup, - Anum_pg_subscription_subconninfo); - conninfo = TextDatumGetCString(datum); + if (OidIsValid(form->subserver)) + { + AclResult aclresult; + + aclresult = object_aclcheck(ForeignServerRelationId, form->subserver, + form->subowner, ACL_USAGE); + if (aclresult != ACLCHECK_OK) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"", + GetUserNameFromId(form->subowner, false), + ForeignServerName(form->subserver)))); + + conninfo = ForeignServerConnectionString(form->subowner, + form->subserver); + } + else + { + datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup, + Anum_pg_subscription_subconninfo); + conninfo = TextDatumGetCString(datum); + } /* Get slotname */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, @@ -1742,6 +1882,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) } /* Clean up dependencies */ + deleteDependencyRecordsFor(SubscriptionRelationId, subid, false); deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); /* Remove any associated relation synchronization states. */ diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c index f4f35728b4..fdd7ee3ad9 100644 --- a/src/backend/foreign/foreign.c +++ b/src/backend/foreign/foreign.c @@ -70,6 +70,7 @@ GetForeignDataWrapperExtended(Oid fdwid, bits16 flags) fdw->fdwname = pstrdup(NameStr(fdwform->fdwname)); fdw->fdwhandler = fdwform->fdwhandler; fdw->fdwvalidator = fdwform->fdwvalidator; + fdw->fdwconnection = fdwform->fdwconnection; /* Extract the fdwoptions */ datum = SysCacheGetAttr(FOREIGNDATAWRAPPEROID, @@ -174,6 +175,31 @@ GetForeignServerExtended(Oid serverid, bits16 flags) } +/* + * ForeignServerName - get name of foreign server. 
+ */
+char *
+ForeignServerName(Oid serverid)
+{
+	Form_pg_foreign_server serverform;
+	char	   *servername;
+	HeapTuple	tp;
+
+	tp = SearchSysCache1(FOREIGNSERVEROID, ObjectIdGetDatum(serverid));
+
+	if (!HeapTupleIsValid(tp))
+		elog(ERROR, "cache lookup failed for foreign server %u", serverid);
+
+	serverform = (Form_pg_foreign_server) GETSTRUCT(tp);
+
+	servername = pstrdup(NameStr(serverform->srvname));
+
+	ReleaseSysCache(tp);
+
+	return servername;
+}
+
+
 /*
  * GetForeignServerByName - look up the foreign server definition by name.
  */
@@ -189,6 +215,39 @@ GetForeignServerByName(const char *srvname, bool missing_ok)
 }
 
 
+/*
+ * ForeignServerConnectionString - get connection string from server's FDW.
+ *
+ * Calls the connection function of the server's foreign data wrapper with
+ * (userid, serverid) and returns its text result converted to a C string.
+ * Raises an error if the foreign data wrapper has no connection function.
+ */
+char *
+ForeignServerConnectionString(Oid userid, Oid serverid)
+{
+	/* TODO: clean up memory */
+	ForeignServer *server = GetForeignServer(serverid);
+	ForeignDataWrapper *fdw = GetForeignDataWrapper(server->fdwid);
+	Datum		connection_text;
+
+	/*
+	 * fdwconnection is 0 when the wrapper defines no connection function;
+	 * report that clearly rather than handing an invalid function OID to
+	 * OidFunctionCall2.
+	 */
+	if (!OidIsValid(fdw->fdwconnection))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("foreign data wrapper \"%s\" does not support subscription connections",
+						fdw->fdwname)));
+
+	connection_text = OidFunctionCall2(fdw->fdwconnection,
+									   ObjectIdGetDatum(userid),
+									   ObjectIdGetDatum(serverid));
+
+	return text_to_cstring(DatumGetTextPP(connection_text));
+}
+
+
 /*
  * GetUserMapping - look up the user mapping.
* diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index c6e2f679fd..04aabf08d1 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -5293,6 +5293,8 @@ fdw_option: | NO HANDLER { $$ = makeDefElem("handler", NULL, @1); } | VALIDATOR handler_name { $$ = makeDefElem("validator", (Node *) $2, @1); } | NO VALIDATOR { $$ = makeDefElem("validator", NULL, @1); } + | CONNECTION handler_name { $$ = makeDefElem("connection", (Node *) $2, @1); } + | NO CONNECTION { $$ = makeDefElem("connection", NULL, @1); } ; fdw_options: @@ -10667,6 +10669,16 @@ CreateSubscriptionStmt: n->options = $8; $$ = (Node *) n; } + | CREATE SUBSCRIPTION name SERVER name PUBLICATION name_list opt_definition + { + CreateSubscriptionStmt *n = + makeNode(CreateSubscriptionStmt); + n->subname = $3; + n->servername = $5; + n->publication = $7; + n->options = $8; + $$ = (Node *) n; + } ; /***************************************************************************** @@ -10696,6 +10708,16 @@ AlterSubscriptionStmt: n->conninfo = $5; $$ = (Node *) n; } + | ALTER SUBSCRIPTION name SERVER name + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + + n->kind = ALTER_SUBSCRIPTION_SERVER; + n->subname = $3; + n->servername = $5; + $$ = (Node *) n; + } | ALTER SUBSCRIPTION name REFRESH PUBLICATION opt_definition { AlterSubscriptionStmt *n = diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b5a80fe3e8..3725c53d8b 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3879,7 +3879,7 @@ maybe_reread_subscription(void) /* Ensure allocations in permanent context. */ oldctx = MemoryContextSwitchTo(ApplyContext); - newsub = GetSubscription(MyLogicalRepWorker->subid, true); + newsub = GetSubscription(MyLogicalRepWorker->subid, true, true); /* * Exit if the subscription was removed. 
This normally should not happen @@ -3985,7 +3985,9 @@ maybe_reread_subscription(void) } /* - * Callback from subscription syscache invalidation. + * Callback from subscription syscache invalidation. Also needed for server or + * user mapping invalidation, which can change the connection information for + * subscriptions that connect using a server object. */ static void subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue) @@ -4584,7 +4586,7 @@ InitializeLogRepWorker(void) StartTransactionCommand(); oldctx = MemoryContextSwitchTo(ApplyContext); - MySubscription = GetSubscription(MyLogicalRepWorker->subid, true); + MySubscription = GetSubscription(MyLogicalRepWorker->subid, true, true); if (!MySubscription) { ereport(LOG, @@ -4621,6 +4623,14 @@ InitializeLogRepWorker(void) CacheRegisterSyscacheCallback(SUBSCRIPTIONOID, subscription_change_cb, (Datum) 0); + /* Keep us informed about subscription changes. */ + CacheRegisterSyscacheCallback(FOREIGNSERVEROID, + subscription_change_cb, + (Datum) 0); + /* Keep us informed about subscription changes. 
*/ + CacheRegisterSyscacheCallback(USERMAPPINGOID, + subscription_change_cb, + (Datum) 0); CacheRegisterSyscacheCallback(AUTHOID, subscription_change_cb, diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 10cbf02beb..ac0e9f98df 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4636,6 +4636,7 @@ getSubscriptions(Archive *fout) int i_subdisableonerr; int i_subpasswordrequired; int i_subrunasowner; + int i_subservername; int i_subconninfo; int i_subslotname; int i_subsynccommit; @@ -4708,11 +4709,13 @@ getSubscriptions(Archive *fout) LOGICALREP_ORIGIN_ANY); if (dopt->binary_upgrade && fout->remoteVersion >= 170000) - appendPQExpBufferStr(query, " o.remote_lsn AS suboriginremotelsn,\n" + appendPQExpBufferStr(query, " fs.srvname AS subservername,\n" + " o.remote_lsn AS suboriginremotelsn,\n" " s.subenabled,\n" " s.subfailover\n"); else - appendPQExpBufferStr(query, " NULL AS suboriginremotelsn,\n" + appendPQExpBufferStr(query, " NULL AS subservername,\n" + " NULL AS suboriginremotelsn,\n" " false AS subenabled,\n" " false AS subfailover\n"); @@ -4721,6 +4724,8 @@ getSubscriptions(Archive *fout) if (dopt->binary_upgrade && fout->remoteVersion >= 170000) appendPQExpBufferStr(query, + "LEFT JOIN pg_catalog.pg_foreign_server fs \n" + " ON fs.oid = s.subserver \n" "LEFT JOIN pg_catalog.pg_replication_origin_status o \n" " ON o.external_id = 'pg_' || s.oid::text \n"); @@ -4746,6 +4751,7 @@ getSubscriptions(Archive *fout) i_subdisableonerr = PQfnumber(res, "subdisableonerr"); i_subpasswordrequired = PQfnumber(res, "subpasswordrequired"); i_subrunasowner = PQfnumber(res, "subrunasowner"); + i_subservername = PQfnumber(res, "subservername"); i_subconninfo = PQfnumber(res, "subconninfo"); i_subslotname = PQfnumber(res, "subslotname"); i_subsynccommit = PQfnumber(res, "subsynccommit"); @@ -4766,7 +4772,10 @@ getSubscriptions(Archive *fout) AssignDumpId(&subinfo[i].dobj); subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname)); 
subinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_subowner)); - + if (PQgetisnull(res, i, i_subservername)) + subinfo[i].subservername = NULL; + else + subinfo[i].subservername = pg_strdup(PQgetvalue(res, i, i_subservername)); subinfo[i].subbinary = pg_strdup(PQgetvalue(res, i, i_subbinary)); subinfo[i].substream = @@ -4994,9 +5003,17 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n", qsubname); - appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ", + appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s ", qsubname); - appendStringLiteralAH(query, subinfo->subconninfo, fout); + if (subinfo->subservername) + { + appendPQExpBuffer(query, "SERVER %s", fmtId(subinfo->subservername)); + } + else + { + appendPQExpBuffer(query, "CONNECTION "); + appendStringLiteralAH(query, subinfo->subconninfo, fout); + } /* Build list of quoted publications and append them to query. */ if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames)) diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 9bc93520b4..6fdf63688c 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -660,6 +660,7 @@ typedef struct _SubscriptionInfo char *subdisableonerr; char *subpasswordrequired; char *subrunasowner; + char *subservername; char *subconninfo; char *subslotname; char *subsynccommit; diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 73133ce735..b3ac86890a 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -3329,7 +3329,7 @@ psql_completion(const char *text, int start, int end) /* CREATE SUBSCRIPTION */ else if (Matches("CREATE", "SUBSCRIPTION", MatchAny)) - COMPLETE_WITH("CONNECTION"); + COMPLETE_WITH("SERVER", "CONNECTION"); else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION", MatchAny)) COMPLETE_WITH("PUBLICATION"); else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION", diff --git 
a/src/include/catalog/pg_foreign_data_wrapper.h b/src/include/catalog/pg_foreign_data_wrapper.h index 0d8759d3fd..700d6eed65 100644 --- a/src/include/catalog/pg_foreign_data_wrapper.h +++ b/src/include/catalog/pg_foreign_data_wrapper.h @@ -36,6 +36,9 @@ CATALOG(pg_foreign_data_wrapper,2328,ForeignDataWrapperRelationId) Oid fdwvalidator BKI_LOOKUP_OPT(pg_proc); /* option validation * function, or 0 if * none */ + Oid fdwconnection BKI_LOOKUP_OPT(pg_proc); /* connection string + * function, or 0 if + * none */ #ifdef CATALOG_VARLEN /* variable-length fields start here */ aclitem fdwacl[1]; /* access permissions */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 0aa14ec4a2..b84c25d55e 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -98,9 +98,11 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW * slots) in the upstream database are enabled * to be synchronized to the standbys. 
*/ + Oid subserver; /* Set if connecting with server */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ - text subconninfo BKI_FORCE_NOT_NULL; + text subconninfo; /* Set if connecting with connection string */ /* Slot name on publisher */ NameData subslotname BKI_FORCE_NULL; @@ -174,7 +176,8 @@ typedef struct Subscription */ #define LOGICALREP_STREAM_PARALLEL 'p' -extern Subscription *GetSubscription(Oid subid, bool missing_ok); +extern Subscription *GetSubscription(Oid subid, bool missing_ok, + bool aclcheck); extern void FreeSubscription(Subscription *sub); extern void DisableSubscription(Oid subid); diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h index 82b8153100..b4025e7f1e 100644 --- a/src/include/foreign/foreign.h +++ b/src/include/foreign/foreign.h @@ -28,6 +28,7 @@ typedef struct ForeignDataWrapper char *fdwname; /* Name of the FDW */ Oid fdwhandler; /* Oid of handler function, or 0 */ Oid fdwvalidator; /* Oid of validator function, or 0 */ + Oid fdwconnection; /* Oid of connection string function, or 0 */ List *options; /* fdwoptions as DefElem list */ } ForeignDataWrapper; @@ -65,10 +66,12 @@ typedef struct ForeignTable extern ForeignServer *GetForeignServer(Oid serverid); +extern char *ForeignServerName(Oid serverid); extern ForeignServer *GetForeignServerExtended(Oid serverid, bits16 flags); extern ForeignServer *GetForeignServerByName(const char *srvname, bool missing_ok); +extern char *ForeignServerConnectionString(Oid userid, Oid serverid); extern UserMapping *GetUserMapping(Oid userid, Oid serverid); extern ForeignDataWrapper *GetForeignDataWrapper(Oid fdwid); extern ForeignDataWrapper *GetForeignDataWrapperExtended(Oid fdwid, diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 2380821600..90b203dc60 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -4042,6 +4042,7 @@ typedef struct 
CreateSubscriptionStmt { NodeTag type; char *subname; /* Name of the subscription */ + char *servername; /* Server name of publisher */ char *conninfo; /* Connection string to publisher */ List *publication; /* One or more publication to subscribe to */ List *options; /* List of DefElem nodes */ @@ -4050,6 +4051,7 @@ typedef struct CreateSubscriptionStmt typedef enum AlterSubscriptionType { ALTER_SUBSCRIPTION_OPTIONS, + ALTER_SUBSCRIPTION_SERVER, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_SET_PUBLICATION, ALTER_SUBSCRIPTION_ADD_PUBLICATION, @@ -4064,6 +4066,7 @@ typedef struct AlterSubscriptionStmt NodeTag type; AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_OPTIONS, etc */ char *subname; /* Name of the subscription */ + char *servername; /* Server name of publisher */ char *conninfo; /* Connection string to publisher */ List *publication; /* One or more publication to subscribe to */ List *options; /* List of DefElem nodes */ diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out index 215eb899be..59c64126bd 100644 --- a/src/test/regress/expected/oidjoins.out +++ b/src/test/regress/expected/oidjoins.out @@ -224,6 +224,7 @@ NOTICE: checking pg_extension {extconfig} => pg_class {oid} NOTICE: checking pg_foreign_data_wrapper {fdwowner} => pg_authid {oid} NOTICE: checking pg_foreign_data_wrapper {fdwhandler} => pg_proc {oid} NOTICE: checking pg_foreign_data_wrapper {fdwvalidator} => pg_proc {oid} +NOTICE: checking pg_foreign_data_wrapper {fdwconnection} => pg_proc {oid} NOTICE: checking pg_foreign_server {srvowner} => pg_authid {oid} NOTICE: checking pg_foreign_server {srvfdw} => pg_foreign_data_wrapper {oid} NOTICE: checking pg_user_mapping {umuser} => pg_authid {oid} -- 2.34.1