Hi! 1. Done. 2. Renamed to pg_user_subscriptions. 3. By pg_dump; I checked an upgrade from 10 to 12devel and it works fine. 4. Done. 5. Done. 6. I took it from AlterSubscription_refresh; there are no free() calls in that function. 7. Done.
-------- Ефимкин Евгений
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 3f2f674a1a..ff8a65a3e4 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -522,12 +522,8 @@ </para> <para> - To create a subscription, the user must be a superuser. - </para> - - <para> - The subscription apply process will run in the local database with the - privileges of a superuser. + To add tables to a subscription, the user must have ownership rights on the + table. </para> <para> diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 6dfb2e4d3e..f0a368f90c 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -24,6 +24,9 @@ PostgreSQL documentation ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> CONNECTION '<replaceable>conninfo</replaceable>' ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">set_publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ] ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> REFRESH PUBLICATION [ WITH ( <replaceable class="parameter">refresh_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ] +ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD TABLE <replaceable class="parameter">table_name</replaceable> [, ...] +ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET TABLE <replaceable class="parameter">table_name</replaceable> [, ...] +ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> DROP TABLE <replaceable class="parameter">table_name</replaceable> [, ...] 
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ENABLE ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> DISABLE ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) @@ -44,9 +47,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < <para> You must own the subscription to use <command>ALTER SUBSCRIPTION</command>. To alter the owner, you must also be a direct or indirect member of the - new owning role. The new owner has to be a superuser. - (Currently, all subscription owners must be superusers, so the owner checks - will be bypassed in practice. But this might change in the future.) + new owning role. </para> </refsect1> @@ -137,6 +138,35 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < </listitem> </varlistentry> + <varlistentry> + <term><literal>ADD TABLE <replaceable class="parameter">table_name</replaceable></literal></term> + <listitem> + <para> + The <literal>ADD TABLE</literal> clause adds the listed tables to the subscription; each table must be + present in one of the subscribed publications. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><literal>SET TABLE <replaceable class="parameter">table_name</replaceable></literal></term> + <listitem> + <para> + The <literal>SET TABLE</literal> clause will replace the list of tables in + the subscription with the specified one. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><literal>DROP TABLE <replaceable class="parameter">table_name</replaceable></literal></term> + <listitem> + <para> + The <literal>DROP TABLE</literal> clause removes the listed tables from the subscription.
+ </para> + </listitem> + </varlistentry> + <varlistentry> <term><literal>ENABLE</literal></term> <listitem> diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 1a90c244fb..04af4e27c7 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -24,6 +24,7 @@ PostgreSQL documentation CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceable> CONNECTION '<replaceable class="parameter">conninfo</replaceable>' PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] + [ FOR TABLE <replaceable class="parameter">table_name</replaceable> [, ...] ] [ WITH ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ] </synopsis> </refsynopsisdiv> @@ -88,6 +89,16 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl </listitem> </varlistentry> + <varlistentry> + <term><literal>FOR TABLE</literal></term> + <listitem> + <para> + Specifies a list of tables to add to the subscription. All tables listed in the clause + must be published by one of the specified publications. + </para> + </listitem> + </varlistentry> + <varlistentry> <term><literal>WITH ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ...
] )</literal></term> <listitem> diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index f4d9e9daf7..6ec6b24eb1 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -904,6 +904,27 @@ CREATE VIEW pg_stat_progress_vacuum AS FROM pg_stat_get_progress_info('VACUUM') AS S LEFT JOIN pg_database D ON S.datid = D.oid; +CREATE VIEW pg_user_subscriptions AS + SELECT + S.oid, + S.subdbid, + S.subname AS subname, + CASE WHEN S.subowner = 0 THEN + 'public' + ELSE + A.rolname + END AS usename, + S.subenabled, + CASE WHEN (S.subowner <> 0 AND A.rolname = current_user) + OR (SELECT rolsuper FROM pg_authid WHERE rolname = current_user) + THEN S.subconninfo + ELSE NULL END AS subconninfo, + S.subslotname, + S.subsynccommit, + S.subpublications + FROM pg_subscription S + LEFT JOIN pg_authid A ON (A.oid = S.subowner); + CREATE VIEW pg_user_mappings AS SELECT U.oid AS umid, @@ -936,7 +957,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are readable. 
REVOKE ALL ON pg_subscription FROM public; -GRANT SELECT (subdbid, subname, subowner, subenabled, subslotname, subpublications) +GRANT SELECT (tableoid, oid, subdbid, subname, + subowner, subenabled, subslotname, subpublications, subsynccommit) ON pg_subscription TO public; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index a60a15193a..79e967f037 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -30,6 +30,7 @@ #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" +#include "commands/dbcommands.h" #include "commands/defrem.h" #include "commands/event_trigger.h" #include "commands/subscriptioncmds.h" @@ -322,6 +323,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) char originname[NAMEDATALEN]; bool create_slot; List *publications; + AclResult aclresult; + + /* must have CREATE privilege on database */ + aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_DATABASE, + get_database_name(MyDatabaseId)); /* * Parse and check options. @@ -342,11 +350,6 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) if (create_slot) PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)"); - if (!superuser()) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - (errmsg("must be superuser to create subscriptions")))); - rel = table_open(SubscriptionRelationId, RowExclusiveLock); /* Check if name is used */ @@ -375,6 +378,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) /* Check the connection info string. */ walrcv_check_conninfo(conninfo); + walrcv_connstr_check(conninfo); /* Everything ok, form a new tuple. 
*/ memset(values, 0, sizeof(values)); @@ -411,6 +415,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) snprintf(originname, sizeof(originname), "pg_%u", subid); replorigin_create(originname); + + if (stmt->tables && !connect) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot create subscription with connect = false and FOR TABLE"))); + } /* * Connect to remote side to execute requested commands and fetch table * info. @@ -423,6 +434,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) List *tables; ListCell *lc; char table_state; + List *tablesiods = NIL; /* Try to connect to the publisher. */ wrconn = walrcv_connect(conninfo, true, stmt->subname, &err); @@ -438,25 +450,59 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) */ table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + walrcv_security_check(wrconn); /* * Get the table list from publisher and build local table status * info. */ tables = fetch_table_list(wrconn, publications); - foreach(lc, tables) - { - RangeVar *rv = (RangeVar *) lfirst(lc); - Oid relid; - - relid = RangeVarGetRelid(rv, AccessShareLock, false); - - /* Check for supported relkind. 
*/ - CheckSubscriptionRelkind(get_rel_relkind(relid), - rv->schemaname, rv->relname); - - AddSubscriptionRelState(subid, relid, table_state, + if (stmt->tables) + { + foreach(lc, tables) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + + relid = RangeVarGetRelid(rv, NoLock, true); + tablesiods = lappend_oid(tablesiods, relid); + } + foreach(lc, stmt->tables) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + + relid = RangeVarGetRelid(rv, AccessShareLock, false); + if (!pg_class_ownercheck(relid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, + get_relkind_objtype(get_rel_relkind(relid)), rv->relname); + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + if (!list_member_oid(tablesiods, relid)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("table \"%s.%s\" is not present in publication", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid)))); + AddSubscriptionRelState(subid, relid, table_state, + InvalidXLogRecPtr); + } + } + else + foreach(lc, tables) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + + relid = RangeVarGetRelid(rv, AccessShareLock, false); + if (!pg_class_ownercheck(relid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, + get_relkind_objtype(get_rel_relkind(relid)), rv->relname); + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + AddSubscriptionRelState(subid, relid, table_state, InvalidXLogRecPtr); - } + } /* * If requested, create permanent slot for the subscription.
We @@ -503,6 +549,242 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) return myself; } +static void +AlterSubscription_set_table(Subscription *sub, List *tables, bool copy_data) +{ + char *err; + List *pubrel_names; + List *subrel_states; + Oid *subrel_local_oids; + Oid *pubrel_local_oids; + Oid *stmt_local_oids; + ListCell *lc; + int off; + + /* Load the library providing us libpq calls. */ + load_file("libpqwalreceiver", false); + + /* Try to connect to the publisher. */ + wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err); + if (!wrconn) + ereport(ERROR, + (errmsg("could not connect to the publisher: %s", err))); + + /* Get the table list from publisher. */ + pubrel_names = fetch_table_list(wrconn, sub->publications); + + /* We are done with the remote side, close connection. */ + walrcv_disconnect(wrconn); + + /* Get local table list. */ + subrel_states = GetSubscriptionRelations(sub->oid); + + /* + * Build qsorted array of local table oids for faster lookup. This can + * potentially contain all tables in the database so speed of lookup is + * important. 
+ */ + subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid)); + off = 0; + foreach(lc, subrel_states) + { + SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc); + + subrel_local_oids[off++] = relstate->relid; + } + qsort(subrel_local_oids, list_length(subrel_states), + sizeof(Oid), oid_cmp); + + stmt_local_oids = palloc(list_length(tables) * sizeof(Oid)); + off = 0; + foreach(lc, tables) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + + relid = RangeVarGetRelid(rv, AccessShareLock, false); + + stmt_local_oids[off++] = relid; + } + qsort(stmt_local_oids, list_length(tables), + sizeof(Oid), oid_cmp); + + pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid)); + off = 0; + foreach(lc, pubrel_names) /* the published tables, not the command's list */ + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + + relid = RangeVarGetRelid(rv, AccessShareLock, false); + + pubrel_local_oids[off++] = relid; + } + qsort(pubrel_local_oids, list_length(pubrel_names), + sizeof(Oid), oid_cmp); + + /* + * Walk over the tables named in the command. A table that is published + * on the remote side but not yet part of the subscription gets a new state. + * + * stmt_local_oids, built above, drives the removal step that follows. + */ + + + foreach(lc, tables) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + + relid = RangeVarGetRelid(rv, AccessShareLock, false); + + /* Check for supported relkind. */ + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + + if (!bsearch(&relid, subrel_local_oids, + list_length(subrel_states), sizeof(Oid), oid_cmp) && + bsearch(&relid, pubrel_local_oids, + list_length(pubrel_names), sizeof(Oid), oid_cmp)) + { + AddSubscriptionRelState(sub->oid, relid, + copy_data ?
SUBREL_STATE_INIT : SUBREL_STATE_READY, + InvalidXLogRecPtr); + ereport(DEBUG1, + (errmsg("table \"%s.%s\" added to subscription \"%s\"", + get_namespace_name(get_rel_namespace(relid)), get_rel_name(relid), sub->name))); + } + } + + /* + * Next remove state for tables we should not care about anymore using the + * data we collected above + */ + + for (off = 0; off < list_length(subrel_states); off++) + { + Oid relid = subrel_local_oids[off]; + + if (!bsearch(&relid, stmt_local_oids, + list_length(tables), sizeof(Oid), oid_cmp)) + { + RemoveSubscriptionRel(sub->oid, relid); + + logicalrep_worker_stop_at_commit(sub->oid, relid); + + ereport(DEBUG1, + (errmsg("table \"%s.%s\" removed from subscription \"%s\"", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid), + sub->name))); + } + } +} + +static void +AlterSubscription_drop_table(Subscription *sub, List *tables) +{ + List *subrel_states; + Oid *subrel_local_oids; + ListCell *lc; + int off; + + Assert(list_length(tables) > 0); + subrel_states = GetSubscriptionRelations(sub->oid); + subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid)); + off = 0; + foreach(lc, subrel_states) + { + SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc); + subrel_local_oids[off++] = relstate->relid; + } + qsort(subrel_local_oids, list_length(subrel_states), + sizeof(Oid), oid_cmp); + + foreach(lc, tables) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + + relid = RangeVarGetRelid(rv, AccessShareLock, false); + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + if (!bsearch(&relid, subrel_local_oids, + list_length(subrel_states), sizeof(Oid), oid_cmp)) + { + ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("table \"%s.%s\" is not present in subscription", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid)))); + } + else + { + RemoveSubscriptionRel(sub->oid, relid); + logicalrep_worker_stop_at_commit(sub->oid, relid); + } + + } +} + +static void
+AlterSubscription_add_table(Subscription *sub, List *tables, bool copy_data) +{ + char *err; + List *pubrel_names; + ListCell *lc; + List *pubrels = NIL; + + Assert(list_length(tables) > 0); + + /* Load the library providing us libpq calls. */ + load_file("libpqwalreceiver", false); + + /* Try to connect to the publisher. */ + wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err); + if (!wrconn) + ereport(ERROR, + (errmsg("could not connect to the publisher: %s", err))); + + /* Get the table list from publisher. */ + pubrel_names = fetch_table_list(wrconn, sub->publications); + /* Resolve the published relation names to local oids */ + foreach(lc, pubrel_names) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + + relid = RangeVarGetRelid(rv, NoLock, true); + pubrels = lappend_oid(pubrels, relid); + } + + /* We are done with the remote side, close connection. */ + walrcv_disconnect(wrconn); + + foreach(lc, tables) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + char table_state; + + relid = RangeVarGetRelid(rv, AccessShareLock, false); + if (!pg_class_ownercheck(relid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, + get_relkind_objtype(get_rel_relkind(relid)), rv->relname); + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + if (!list_member_oid(pubrels, relid)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("table \"%s.%s\" is not present in publication", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid)))); + table_state = copy_data ?
SUBREL_STATE_INIT : SUBREL_STATE_READY; + AddSubscriptionRelState(sub->oid, relid, + table_state, + InvalidXLogRecPtr); + } +} + static void AlterSubscription_refresh(Subscription *sub, bool copy_data) { @@ -568,6 +850,12 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) CheckSubscriptionRelkind(get_rel_relkind(relid), rv->schemaname, rv->relname); + /* must be owner */ + if (!pg_class_ownercheck(relid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, + get_relkind_objtype(get_rel_relkind(relid)), rv->relname); + + pubrel_local_oids[off++] = relid; if (!bsearch(&relid, subrel_local_oids, @@ -625,6 +913,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) bool update_tuple = false; Subscription *sub; Form_pg_subscription form; + char *err = NULL; rel = table_open(SubscriptionRelationId, RowExclusiveLock); @@ -721,10 +1010,31 @@ AlterSubscription(AlterSubscriptionStmt *stmt) } case ALTER_SUBSCRIPTION_CONNECTION: - /* Load the library providing us libpq calls. */ - load_file("libpqwalreceiver", false); - /* Check the connection info string. */ - walrcv_check_conninfo(stmt->conninfo); + { + /* Load the library providing us libpq calls. */ + load_file("libpqwalreceiver", false); + /* Check the connection info string. */ + walrcv_check_conninfo(stmt->conninfo); + if (sub->enabled) + { + wrconn = walrcv_connect(stmt->conninfo, true, sub->name, &err); + if (!wrconn) + ereport(ERROR, + (errmsg("could not connect to the publisher: %s", err))); + PG_TRY(); + { + walrcv_security_check(wrconn); + } + PG_CATCH(); + { + /* Close the connection in case of failure. */ + walrcv_disconnect(wrconn); + PG_RE_THROW(); + } + PG_END_TRY(); + walrcv_disconnect(wrconn); /* only needed for the check */ + } + } values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(stmt->conninfo); @@ -774,6 +1084,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) (errcode(ERRCODE_SYNTAX_ERROR), errmsg("ALTER SUBSCRIPTION ...
REFRESH is not allowed for disabled subscriptions"))); + parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, &copy_data, NULL, NULL); @@ -782,7 +1093,56 @@ AlterSubscription(AlterSubscriptionStmt *stmt) break; } + case ALTER_SUBSCRIPTION_ADD_TABLE: + { + bool copy_data; + + if (!sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION ... ADD TABLE is not allowed for disabled subscriptions"))); + parse_subscription_options(stmt->options, NULL, NULL, NULL, + NULL, NULL, NULL, &copy_data, + NULL, NULL); + + AlterSubscription_add_table(sub, stmt->tables, copy_data); + + break; + } + case ALTER_SUBSCRIPTION_DROP_TABLE: + { + + if (!sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION ... DROP TABLE is not allowed for disabled subscriptions"))); + + + parse_subscription_options(stmt->options, NULL, NULL, NULL, + NULL, NULL, NULL, NULL, + NULL, NULL); + + AlterSubscription_drop_table(sub, stmt->tables); + + break; + } + case ALTER_SUBSCRIPTION_SET_TABLE: + { + bool copy_data; + if (!sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION ...
SET TABLE is not allowed for disabled subscriptions"))); + + parse_subscription_options(stmt->options, NULL, NULL, NULL, + NULL, NULL, NULL, &copy_data, + NULL, NULL); + + AlterSubscription_set_table(sub, stmt->tables, copy_data); + + break; + } default: elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d", stmt->kind); diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 3eb7e95d64..dad2528350 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4612,7 +4612,7 @@ _copyCreateSubscriptionStmt(const CreateSubscriptionStmt *from) COPY_STRING_FIELD(conninfo); COPY_NODE_FIELD(publication); COPY_NODE_FIELD(options); - + COPY_NODE_FIELD(tables); return newnode; } @@ -4625,6 +4625,7 @@ _copyAlterSubscriptionStmt(const AlterSubscriptionStmt *from) COPY_STRING_FIELD(subname); COPY_STRING_FIELD(conninfo); COPY_NODE_FIELD(publication); + COPY_NODE_FIELD(tables); COPY_NODE_FIELD(options); return newnode; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 5c4fa7d077..8724feaa67 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2239,6 +2239,7 @@ _equalCreateSubscriptionStmt(const CreateSubscriptionStmt *a, COMPARE_STRING_FIELD(conninfo); COMPARE_NODE_FIELD(publication); COMPARE_NODE_FIELD(options); + COMPARE_NODE_FIELD(tables); return true; } @@ -2251,6 +2252,7 @@ _equalAlterSubscriptionStmt(const AlterSubscriptionStmt *a, COMPARE_STRING_FIELD(subname); COMPARE_STRING_FIELD(conninfo); COMPARE_NODE_FIELD(publication); + COMPARE_NODE_FIELD(tables); COMPARE_NODE_FIELD(options); return true; diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index c1faf4152c..2e484bb44a 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -395,7 +395,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); execute_param_clause using_clause returning_clause opt_enum_val_list enum_val_list table_func_column_list
create_generic_options alter_generic_options - relation_expr_list dostmt_opt_list + relation_expr_list remote_relation_expr_list dostmt_opt_list transform_element_list transform_type_list TriggerTransitions TriggerReferencing publication_name_list @@ -405,6 +405,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <node> group_by_item empty_grouping_set rollup_clause cube_clause %type <node> grouping_sets_clause %type <node> opt_publication_for_tables publication_for_tables +%type <node> opt_subscription_for_tables subscription_for_tables %type <value> publication_name_item %type <list> opt_fdw_options fdw_options @@ -489,6 +490,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <node> table_ref %type <jexpr> joined_table %type <range> relation_expr +%type <range> remote_relation_expr %type <range> relation_expr_opt_alias %type <node> tablesample_clause opt_repeatable_clause %type <target> target_el set_target insert_column_item @@ -9517,17 +9519,33 @@ AlterPublicationStmt: *****************************************************************************/ CreateSubscriptionStmt: - CREATE SUBSCRIPTION name CONNECTION Sconst PUBLICATION publication_name_list opt_definition + CREATE SUBSCRIPTION name CONNECTION Sconst PUBLICATION publication_name_list opt_subscription_for_tables opt_definition { CreateSubscriptionStmt *n = makeNode(CreateSubscriptionStmt); n->subname = $3; n->conninfo = $5; n->publication = $7; - n->options = $8; + if ($8 != NULL) + { + /* FOR TABLE */ + n->tables = (List *)$8; + } + n->options = $9; $$ = (Node *)n; } ; +opt_subscription_for_tables: + subscription_for_tables { $$ = $1; } + | /* EMPTY */ { $$ = NULL; } + ; + +subscription_for_tables: + FOR TABLE remote_relation_expr_list + { + $$ = (Node *) $3; + } + ; publication_name_list: publication_name_item @@ -9607,6 +9625,37 @@ AlterSubscriptionStmt: (Node *)makeInteger(false), @1)); $$ = (Node *)n; } + | ALTER 
SUBSCRIPTION name ADD_P TABLE remote_relation_expr_list opt_definition + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_ADD_TABLE; + n->subname = $3; + n->tables = $6; + n->options = $7; + n->tableAction = DEFELEM_ADD; + $$ = (Node *)n; + } + | ALTER SUBSCRIPTION name DROP TABLE remote_relation_expr_list + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_DROP_TABLE; + n->subname = $3; + n->tables = $6; + n->tableAction = DEFELEM_DROP; + $$ = (Node *)n; + } + | ALTER SUBSCRIPTION name SET TABLE remote_relation_expr_list + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_SET_TABLE; + n->subname = $3; + n->tables = $6; + n->tableAction = DEFELEM_SET; + $$ = (Node *)n; + } ; /***************************************************************************** @@ -12046,6 +12095,23 @@ relation_expr_list: | relation_expr_list ',' relation_expr { $$ = lappend($1, $3); } ; +remote_relation_expr: + qualified_name + { + /* no inheritance */ + $$ = $1; + $$->inh = false; + $$->alias = NULL; + } + ; + + +remote_relation_expr_list: + remote_relation_expr { $$ = list_make1($1); } + | remote_relation_expr_list ',' remote_relation_expr { $$ = lappend($1, $3); } + ; + + /* * Given "UPDATE foo set set ...", we have to decide without looking any diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 7027737e67..71e1f0838e 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -52,6 +52,9 @@ static WalReceiverConn *libpqrcv_connect(const char *conninfo, bool logical, const char *appname, char **err); static void libpqrcv_check_conninfo(const char *conninfo); +static void libpqrcv_connstr_check(const char *connstr); +static void libpqrcv_security_check(WalReceiverConn *conn); + 
static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
 static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
 						char **sender_host, int *sender_port);
@@ -83,6 +86,8 @@ static void libpqrcv_disconnect(WalReceiverConn *conn);
 static WalReceiverFunctionsType PQWalReceiverFunctions = {
 	libpqrcv_connect,
 	libpqrcv_check_conninfo,
+	libpqrcv_connstr_check,
+	libpqrcv_security_check,
 	libpqrcv_get_conninfo,
 	libpqrcv_get_senderinfo,
 	libpqrcv_identify_system,
@@ -232,6 +237,69 @@ libpqrcv_check_conninfo(const char *conninfo)
 	PQconninfoFree(opts);
 }
 
+/*
+ * Verify that a connection opened for a non-superuser was authenticated
+ * with a password; raises an ERROR if it was not.
+ */
+static void
+libpqrcv_security_check(WalReceiverConn *conn)
+{
+	/* Superusers are exempt from this check. */
+	if (superuser())
+		return;
+
+	/* Fine as long as a password was actually used for the connection. */
+	if (PQconnectionUsedPassword(conn->streamConn))
+		return;
+
+	ereport(ERROR,
+			(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+			 errmsg("password is required"),
+			 errdetail("Non-superuser cannot connect if the server does not request a password."),
+			 errhint("Target server's authentication method must be changed.")));
+}
+
+/*
+ * Verify that a connection string supplied by a non-superuser contains a
+ * non-empty "password" option; raises an ERROR if it does not.
+ */
+static void
+libpqrcv_connstr_check(const char *connstr)
+{
+	PQconninfoOption *parsed;
+	PQconninfoOption *opt;
+	bool		has_password = false;
+
+	/* Superusers are exempt from this check. */
+	if (superuser())
+		return;
+
+	/* If PQconninfoParse() returns NULL, no password is considered given. */
+	parsed = PQconninfoParse(connstr, NULL);
+	if (parsed != NULL)
+	{
+		for (opt = parsed; opt->keyword != NULL; opt++)
+		{
+			if (strcmp(opt->keyword, "password") != 0)
+				continue;
+			if (opt->val == NULL || opt->val[0] == '\0')
+				continue;
+
+			has_password = true;
+			break;
+		}
+		PQconninfoFree(parsed);
+	}
+
+	if (has_password)
+		return;
+
+	ereport(ERROR,
+			(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+			 errmsg("password is required"),
+			 errdetail("Non-superusers must provide a password in the connection string.")));
+}
+
 /*
  * Return a user-displayable conninfo string. Any security-sensitive fields
  * are obfuscated. 
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 1d918d2c42..cabb4f4730 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -77,6 +77,31 @@ logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
 	}
 }
 
+/*
+ * Syscache invalidation callback that resets every relation map entry.
+ *
+ * The callback arguments are ignored; each entry has its local relation OID
+ * reset to InvalidOid and its state reset to SUBREL_STATE_UNKNOWN.
+ */
+void
+logicalrep_relmap_invalidate_cb2(Datum arg, int cacheid, uint32 hashvalue)
+{
+	HASH_SEQ_STATUS status;
+	LogicalRepRelMapEntry *entry;
+
+	/* Nothing to do if there is no relation map. */
+	if (LogicalRepRelMap == NULL)
+		return;
+
+	/* Invalidate all cache entries. */
+	hash_seq_init(&status, LogicalRepRelMap);
+	while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
+	{
+		entry->localreloid = InvalidOid;
+		entry->state = SUBREL_STATE_UNKNOWN;
+	}
+}
+
 /*
  * Initialize the relation map cache.
  */
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 2c49c711e3..71e2607030 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1741,6 +1741,9 @@ ApplyWorkerMain(Datum main_arg)
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
 								  invalidate_syncing_table_states,
 								  (Datum) 0);
+	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
+								  logicalrep_relmap_invalidate_cb2,
+								  (Datum) 0);
 
 	/* Build logical replication streaming options. 
*/ options.logical = true; diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 2b1a94733b..e9a3e43246 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4005,7 +4005,7 @@ getSubscriptions(Archive *fout) if (dopt->no_subscriptions || fout->remoteVersion < 100000) return; - if (!is_superuser(fout)) + if (!is_superuser(fout) && fout->remoteVersion < 120000) { int n; @@ -4024,17 +4024,32 @@ getSubscriptions(Archive *fout) query = createPQExpBuffer(); resetPQExpBuffer(query); - + if (is_superuser(fout) && fout->remoteVersion < 120000) + { + appendPQExpBuffer(query, + "SELECT s.tableoid, s.oid, s.subname," + "(%s s.subowner) AS rolname, " + " s.subconninfo, s.subslotname, s.subsynccommit, " + " s.subpublications " + "FROM pg_subscription s " + "WHERE s.subdbid = (SELECT oid FROM pg_database" + " WHERE datname = current_database())", + username_subquery); + } + else + { + appendPQExpBuffer(query, + "SELECT s.tableoid, s.oid, s.subname," + "(%s s.subowner) AS rolname, " + " us.subconninfo, s.subslotname, s.subsynccommit, " + " s.subpublications " + "FROM pg_subscription s join pg_user_subscriptions us ON (s.oid=us.oid) " + "WHERE s.subdbid = (SELECT oid FROM pg_database" + " WHERE datname = current_database())", + username_subquery); + } /* Get the subscriptions in current database. 
*/ - appendPQExpBuffer(query, - "SELECT s.tableoid, s.oid, s.subname," - "(%s s.subowner) AS rolname, " - " s.subconninfo, s.subslotname, s.subsynccommit, " - " s.subpublications " - "FROM pg_subscription s " - "WHERE s.subdbid = (SELECT oid FROM pg_database" - " WHERE datname = current_database())", - username_subquery); + res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); ntups = PQntuples(res); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 4ec8a83541..66f2401e85 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3478,6 +3478,7 @@ typedef struct CreateSubscriptionStmt char *conninfo; /* Connection string to publisher */ List *publication; /* One or more publication to subscribe to */ List *options; /* List of DefElem nodes */ + List *tables; /* Optional list of tables to add */ } CreateSubscriptionStmt; typedef enum AlterSubscriptionType @@ -3486,7 +3487,10 @@ typedef enum AlterSubscriptionType ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH, - ALTER_SUBSCRIPTION_ENABLED + ALTER_SUBSCRIPTION_ENABLED, + ALTER_SUBSCRIPTION_DROP_TABLE, + ALTER_SUBSCRIPTION_ADD_TABLE, + ALTER_SUBSCRIPTION_SET_TABLE } AlterSubscriptionType; typedef struct AlterSubscriptionStmt @@ -3497,6 +3501,9 @@ typedef struct AlterSubscriptionStmt char *conninfo; /* Connection string to publisher */ List *publication; /* One or more publication to subscribe to */ List *options; /* List of DefElem nodes */ + /* parameters used for ALTER PUBLICATION ... 
ADD/DROP TABLE */ + List *tables; /* List of tables to add/drop */ + DefElemAction tableAction; /* What action to perform with the tables */ } AlterSubscriptionStmt; typedef struct DropSubscriptionStmt diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index 85e0b6ea62..99bf9e8817 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -38,5 +38,7 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel, extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp); extern char *logicalrep_typmap_gettypname(Oid remoteid); +void logicalrep_relmap_invalidate_cb2(Datum arg, int cacheid, + uint32 hashvalue); #endif /* LOGICALRELATION_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index e04d725ff5..33658edecb 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -204,6 +204,8 @@ typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logica const char *appname, char **err); typedef void (*walrcv_check_conninfo_fn) (const char *conninfo); +typedef void (*walrcv_connstr_check_fn) (const char *connstr); +typedef void (*walrcv_security_check_fn) (WalReceiverConn *conn); typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn); typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn, char **sender_host, @@ -237,6 +239,8 @@ typedef struct WalReceiverFunctionsType { walrcv_connect_fn walrcv_connect; walrcv_check_conninfo_fn walrcv_check_conninfo; + walrcv_connstr_check_fn walrcv_connstr_check; + walrcv_security_check_fn walrcv_security_check; walrcv_get_conninfo_fn walrcv_get_conninfo; walrcv_get_senderinfo_fn walrcv_get_senderinfo; walrcv_identify_system_fn walrcv_identify_system; @@ -256,6 +260,10 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_connect(conninfo, logical, appname, err) #define 
walrcv_check_conninfo(conninfo) \ WalReceiverFunctions->walrcv_check_conninfo(conninfo) +#define walrcv_connstr_check(connstr) \ + WalReceiverFunctions->walrcv_connstr_check(connstr) +#define walrcv_security_check(conn) \ + WalReceiverFunctions->walrcv_security_check(conn) #define walrcv_get_conninfo(conn) \ WalReceiverFunctions->walrcv_get_conninfo(conn) #define walrcv_get_senderinfo(conn, sender_host, sender_port) \ diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index e384cd2279..5caa8f9cfa 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2241,6 +2241,25 @@ pg_user_mappings| SELECT u.oid AS umid, FROM ((pg_user_mapping u JOIN pg_foreign_server s ON ((u.umserver = s.oid))) LEFT JOIN pg_authid a ON ((a.oid = u.umuser))); +pg_user_subscriptions| SELECT s.oid, + s.subdbid, + s.subname, + CASE + WHEN (s.subowner = (0)::oid) THEN 'public'::name + ELSE a.rolname + END AS usename, + s.subenabled, + CASE + WHEN (((s.subowner <> (0)::oid) AND (a.rolname = CURRENT_USER)) OR ( SELECT pg_authid.rolsuper + FROM pg_authid + WHERE (pg_authid.rolname = CURRENT_USER))) THEN s.subconninfo + ELSE NULL::text + END AS subconninfo, + s.subslotname, + s.subsynccommit, + s.subpublications + FROM (pg_subscription s + LEFT JOIN pg_authid a ON ((a.oid = s.subowner))); pg_views| SELECT n.nspname AS schemaname, c.relname AS viewname, pg_get_userbyid(c.relowner) AS viewowner, diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 4fcbf7efe9..afc5177f10 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -40,11 +40,6 @@ SELECT obj_description(s.oid, 'pg_subscription') FROM pg_subscription s; -- fail - name already exists CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (connect = false); ERROR: subscription "testsub" already exists --- fail - must be superuser -SET 
SESSION AUTHORIZATION 'regress_subscription_user2'; -CREATE SUBSCRIPTION testsub2 CONNECTION 'dbname=doesnotexist' PUBLICATION foo WITH (connect = false); -ERROR: must be superuser to create subscriptions -SET SESSION AUTHORIZATION 'regress_subscription_user'; -- fail - invalid option combinations CREATE SUBSCRIPTION testsub2 CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = true); ERROR: connect = false and copy_data = true are mutually exclusive options diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 36fa1bbac8..63eef1381e 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -33,11 +33,6 @@ SELECT obj_description(s.oid, 'pg_subscription') FROM pg_subscription s; -- fail - name already exists CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (connect = false); --- fail - must be superuser -SET SESSION AUTHORIZATION 'regress_subscription_user2'; -CREATE SUBSCRIPTION testsub2 CONNECTION 'dbname=doesnotexist' PUBLICATION foo WITH (connect = false); -SET SESSION AUTHORIZATION 'regress_subscription_user'; - -- fail - invalid option combinations CREATE SUBSCRIPTION testsub2 CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = true); CREATE SUBSCRIPTION testsub2 CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (connect = false, enabled = true); diff --git a/src/test/subscription/t/011_rep_changes_nonsuperuser.pl b/src/test/subscription/t/011_rep_changes_nonsuperuser.pl new file mode 100644 index 0000000000..3acbb5663c --- /dev/null +++ b/src/test/subscription/t/011_rep_changes_nonsuperuser.pl @@ -0,0 +1,316 @@ +# Basic logical replication test +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More; + +if ($windows_os) +{ + plan skip_all => "authentication tests cannot run on Windows"; +} +else +{ + plan tests => 18; +} + +sub reset_pg_hba +{ 
+ my $node = shift; + my $hba_method = shift; + + unlink($node->data_dir . '/pg_hba.conf'); + $node->append_conf('pg_hba.conf', "local all normal $hba_method"); + $node->append_conf('pg_hba.conf', "local all all trust"); + $node->reload; + return; +} + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +$node_subscriber->safe_psql('postgres', + "SET password_encryption='md5'; CREATE ROLE normal LOGIN PASSWORD 'pass';"); +$node_subscriber->safe_psql('postgres', + "GRANT CREATE ON DATABASE postgres TO normal;"); +$node_subscriber->safe_psql('postgres', + "ALTER ROLE normal WITH LOGIN;"); +reset_pg_hba($node_subscriber, 'trust'); + + +$node_publisher->safe_psql('postgres', + "SET password_encryption='md5'; CREATE ROLE normal LOGIN PASSWORD 'pass';"); +$node_publisher->safe_psql('postgres', + "ALTER ROLE normal WITH LOGIN; ALTER ROLE normal WITH SUPERUSER"); +reset_pg_hba($node_publisher, 'md5'); + + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_notrep AS SELECT generate_series(1,10) AS a"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_ins AS SELECT generate_series(1,1002) AS a"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_full AS SELECT generate_series(1,10) AS a"); +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_full2 VALUES ('a'), ('b'), ('b')"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rep (a int primary key)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_mixed (a int primary key, b text)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_mixed (a, b) VALUES (1, 'foo')"); 
+$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_include (a int, b text, CONSTRAINT covering PRIMARY KEY(a) INCLUDE(b))" +); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_notrep (a int)", extra_params => [ '-U', 'normal' ]); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int)", extra_params => [ '-U', 'normal' ]); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full (a int)", extra_params => [ '-U', 'normal' ]); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)", extra_params => [ '-U', 'normal' ]); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rep (a int primary key)", extra_params => [ '-U', 'normal' ]); + +# different column count and order than on publisher +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_mixed (c text, b text, a int primary key)", extra_params => [ '-U', 'normal' ]); + +# replication of the table with included index +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_include (a int, b text, CONSTRAINT covering PRIMARY KEY(a) INCLUDE(b))" +, extra_params => [ '-U', 'normal' ]); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_ins_only WITH (publish = insert)"); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub ADD TABLE tab_rep, tab_full, tab_full2, tab_mixed, tab_include, tab_notrep" +); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr password=pass user=normal application_name=$appname' + PUBLICATION tap_pub, tap_pub_ins_only + FOR TABLE tab_rep, tab_full, tab_full2, tab_mixed, tab_include, tab_ins", + extra_params => [ '-U', 'normal' ]); + +$node_publisher->wait_for_catchup($appname); + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep"); +is($result, qq(0), 'check non-replicated table is empty on subscriber'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins"); +is($result, qq(1002), 'check initial data was copied to subscriber'); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_ins SELECT generate_series(1,50)"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 20"); +$node_publisher->safe_psql('postgres', "UPDATE tab_ins SET a = -a"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rep SELECT generate_series(1,50)"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep WHERE a > 20"); +$node_publisher->safe_psql('postgres', "UPDATE tab_rep SET a = -a"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO 
tab_mixed VALUES (2, 'bar')"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_include SELECT generate_series(1,50)"); +$node_publisher->safe_psql('postgres', + "DELETE FROM tab_include WHERE a > 20"); +$node_publisher->safe_psql('postgres', "UPDATE tab_include SET a = -a"); + +$node_publisher->wait_for_catchup($appname); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_ins"); +is($result, qq(1052|1|1002), 'check replicated inserts on subscriber'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_rep"); +is($result, qq(20|-20|-1), 'check replicated changes on subscriber'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT c, b, a FROM tab_mixed"); +is( $result, qq(|foo|1 +|bar|2), 'check replicated changes with different column order'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_include"); +is($result, qq(20|-20|-1), + 'check replicated changes with primary key index with included columns'); + +# insert some duplicate rows +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_full SELECT generate_series(1,10)"); + +# add REPLICA IDENTITY FULL so we can update +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab_full REPLICA IDENTITY FULL"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab_full REPLICA IDENTITY FULL"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab_full2 REPLICA IDENTITY FULL"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab_full2 REPLICA IDENTITY FULL"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab_ins REPLICA IDENTITY FULL"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab_ins REPLICA IDENTITY FULL"); + +# and do the updates +$node_publisher->safe_psql('postgres', "UPDATE tab_full SET a = a * a"); +$node_publisher->safe_psql('postgres', + "UPDATE tab_full2 SET x = 'bb' WHERE x = 'b'"); + 
+$node_publisher->wait_for_catchup($appname); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_full"); +is($result, qq(20|1|100), + 'update works with REPLICA IDENTITY FULL and duplicate tuples'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT x FROM tab_full2 ORDER BY 1"); +is( $result, qq(a +bb +bb), + 'update works with REPLICA IDENTITY FULL and text datums'); + +# check that change of connection string and/or publication list causes +# restart of subscription workers. Not all of these are registered as tests +# as we need to poll for a change but the test suite will fail none the less +# when something goes wrong. +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname';" +); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub CONNECTION 'application_name=$appname $publisher_connstr'" +); +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname';" +) or die "Timed out while waiting for apply to restart"; + +$oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname';" +); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_ins_only WITH (copy_data = false)" +); +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname';" +) or die "Timed out while waiting for apply to restart"; + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_ins SELECT generate_series(1001,1100)"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep"); + +# Restart the publisher and check the state of the subscriber which +# should be in a streaming state after catching up. 
+$node_publisher->stop('fast');
+$node_publisher->start;
+
+$node_publisher->wait_for_catchup($appname);
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_ins");
+is($result, qq(1152|1|1100),
+	'check replicated inserts after subscription publication change');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_rep");
+is($result, qq(20|-20|-1),
+	'check changes skipped after subscription publication change');
+
+# check alter publication (relcache invalidation etc)
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_ins_only SET (publish = 'insert, delete')");
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_full");
+$node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 0");
+
+$result = $node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub ADD TABLE tab_full WITH (copy_data = false)");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_full VALUES(0)");
+
+$node_publisher->wait_for_catchup($appname);
+
+# note that data are different on provider and subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_ins");
+is($result, qq(1052|1|1002),
+	'check replicated deletes after alter publication');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_full");
+is($result, qq(21|0|100), 'check replicated insert after alter publication');
+
+# check drop table from subscription
+$result = $node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub DROP TABLE tab_full");
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_full VALUES(-1)");
+# wait for replay so the check below cannot pass merely because of apply lag
+$node_publisher->wait_for_catchup($appname);
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_full");
+is($result, qq(21|0|100), 'check insert not replicated after drop table');
+
+# check restart on rename
+$oldpid = 
$node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname';" +); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub RENAME TO tap_sub_renamed"); +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname';" +) or die "Timed out while waiting for apply to restart"; + +# check all the cleanup +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed"); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'check subscription was dropped on subscriber'); + +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(0), + 'check subscription relation status was dropped on subscriber'); + +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_origin"); +is($result, qq(0), 'check replication origin was dropped on subscriber'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast');