Hello! I wrote some tests(it's just 01_rep_changes.pl but for non superuser) and fix `DROP TABLE` from subscription. Now old and new tests pass.
22.11.2018, 16:23, "Evgeniy Efimkin" <efim...@yandex-team.ru>: > Hello! > New draft attached with filtering table in subscription (ADD/DROP) and allow > non-superusers use` CREATE SUBSCRIPTION` for own tables. > > 14.11.2018, 18:10, "Evgeniy Efimkin" <efim...@yandex-team.ru>: >> Hello! >> I started work on patch (draft attached). Draft has changes related only to >> `CREATE SUBSCRIPTION`. >> I also introduce a new status (DEFFERED) for tables in `FOR TABLE` clause >> (but not in publication). >> New column in pg_subscription (suballtables) will be used in `REFRESH` >> clause >> >> 09.11.2018, 15:24, "Evgeniy Efimkin" <efim...@yandex-team.ru>: >>> Hi! >>> In order to support create subscription from non-superuser, we need to >>> make it possible to choose tables on the subscriber side: >>> 1. add `FOR TABLE` clause in `CREATE SUBSCRIPTION`: >>> ``` >>> CREATE SUBSCRIPTION subscription_name >>> CONNECTION 'conninfo' >>> PUBLICATION publication_name [, ...] >>> [ FOR TABLE [ ONLY ] table_name [ * ] [, ...]| FOR ALL TABLES >>> ] >>> [ WITH ( subscription_parameter [= value] [, ... ] ) ] >>> ``` >>> ... where `FOR ALL TABLES` is only allowed for superuser. >>> and table list in `FOR TABLES` clause will be stored in >>> pg_subscription_rel table (maybe another place?) >>> >>> 2. Each subscription should have "all tables" attribute. >>> For example via a new column in pg_subscription "suballtables". >>> >>> 3. Add `ALTER SUBSCRIPTION (ADD TABLE | DROP TABLE)`: >>> ``` >>> ALTER SUBSCRIPTION subscription_name ADD TABLE [ ONLY ] >>> table_name [WITH copy_data]; >>> ALTER SUBSCRIPTION subscription_name DROP TABLE [ ONLY ] >>> table_name; >>> ``` >>> 4. On `ALTER SUBSCRIPTION <name> REFRESH PUBLICATION` should check if >>> table owner equals subscription owner. The check is ommited if subscription >>> owner is superuser. >>> 5. If superuser calls `ALTER SUBSCRIPTION REFRESH PUBLICATION` on >>> subscription with table list and non-superuser owner, we should filter >>> tables which owner is not subscription's owner or maybe we need to raise >>> error? >>> >>> What do you think about it? Any objections? >>> >>> 07.11.2018, 00:52, "Stephen Frost" <sfr...@snowman.net>: >>>> Greetings, >>>> >>>> * Evgeniy Efimkin (efim...@yandex-team.ru) wrote: >>>>> As a first step I suggest we allow CREATE SUBSCRIPTION for table >>>>> owner only. >>>> >>>> That's a nice idea but seems like we would want to have a way to filter >>>> what tables a subscription follows then..? Just failing if the >>>> publication publishes tables that we don't have access to or are not the >>>> owner of seems like a poor solution.. >>>> >>>> Thanks! >>>> >>>> Stephen >>> >>> -------- >>> Ефимкин Евгений >> >> -------- >> Ефимкин Евгений > > -------- > Ефимкин Евгений -------- Ефимкин Евгений
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index e136aa6a0b..5d7841f296 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -70,6 +70,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->name = pstrdup(NameStr(subform->subname)); sub->owner = subform->subowner; sub->enabled = subform->subenabled; + sub->alltables = subform->suballtables; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 9021463a4c..e7024d0804 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -30,6 +30,7 @@ #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" +#include "commands/dbcommands.h" #include "commands/defrem.h" #include "commands/event_trigger.h" #include "commands/subscriptioncmds.h" @@ -322,6 +323,21 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) char originname[NAMEDATALEN]; bool create_slot; List *publications; + AclResult aclresult; + bool alltables; + + alltables = !stmt->tables; + /* FOR ALL TABLES requires superuser */ + if (alltables && !superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + (errmsg("must be superuser to create FOR ALL TABLES subscriptions")))); + + /* must have CREATE privilege on database */ + aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_DATABASE, + get_database_name(MyDatabaseId)); /* * Parse and check options. @@ -342,11 +358,6 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) if (create_slot) PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)"); - if (!superuser()) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - (errmsg("must be superuser to create subscriptions")))); - rel = heap_open(SubscriptionRelationId, RowExclusiveLock); /* Check if name is used */ @@ -375,6 +386,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) /* Check the connection info string. */ walrcv_check_conninfo(conninfo); + walrcv_connstr_check(conninfo); /* Everything ok, form a new tuple. */ memset(values, 0, sizeof(values)); @@ -388,6 +400,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) DirectFunctionCall1(namein, CStringGetDatum(stmt->subname)); values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); + values[Anum_pg_subscription_suballtables - 1] = BoolGetDatum(alltables); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (slotname) @@ -411,6 +424,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) snprintf(originname, sizeof(originname), "pg_%u", subid); replorigin_create(originname); + + if (stmt->tables&&!connect) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot create subscription with connect = false and FOR TABLE"))); + } /* * Connect to remote side to execute requested commands and fetch table * info. @@ -423,6 +443,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) List *tables; ListCell *lc; char table_state; + List *tablesiods = NIL; /* Try to connect to the publisher. */ wrconn = walrcv_connect(conninfo, true, stmt->subname, &err); @@ -438,6 +459,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) */ table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + walrcv_security_check(wrconn); /* * Get the table list from publisher and build local table status * info. @@ -446,17 +468,48 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) foreach(lc, tables) { RangeVar *rv = (RangeVar *) lfirst(lc); - Oid relid; + Oid relid; - relid = RangeVarGetRelid(rv, AccessShareLock, false); - - /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), - rv->schemaname, rv->relname); - - AddSubscriptionRelState(subid, relid, table_state, - InvalidXLogRecPtr); + relid = RangeVarGetRelid(rv, NoLock, true); + tablesiods = lappend_oid(tablesiods, relid); } + if (stmt->tables) + foreach(lc, stmt->tables) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + + relid = RangeVarGetRelid(rv, AccessShareLock, false); + if (!pg_class_ownercheck(relid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, + get_relkind_objtype(get_rel_relkind(relid)), rv->relname); + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + if (!list_member_oid(tablesiods, relid)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("table \"%s.%s\" not preset in publication", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid)))); + AddSubscriptionRelState(subid, relid, table_state, + InvalidXLogRecPtr); + } + else + foreach(lc, tables) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + + relid = RangeVarGetRelid(rv, AccessShareLock, false); + if (!pg_class_ownercheck(relid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, + get_relkind_objtype(get_rel_relkind(relid)), rv->relname); + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + AddSubscriptionRelState(subid, relid, table_state, + InvalidXLogRecPtr); + } /* * If requested, create permanent slot for the subscription. We @@ -503,6 +556,87 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) return myself; } +static void +AlterSubscription_drop_table(Subscription *sub, List *tables) +{ + ListCell *lc; + + + Assert(list_length(tables) > 0); + + foreach(lc, tables) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + + relid = RangeVarGetRelid(rv, NoLock, false); + RemoveSubscriptionRel(sub->oid, relid); + + logicalrep_worker_stop_at_commit(sub->oid, relid); + } +} + +static void +AlterSubscription_add_table(Subscription *sub, List *tables, bool copy_data) +{ + char *err; + List *pubrel_names; + ListCell *lc; + List *pubrels = NIL; + + + Assert(list_length(tables) > 0); + + /* Load the library providing us libpq calls. */ + load_file("libpqwalreceiver", false); + + /* Try to connect to the publisher. */ + wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err); + if (!wrconn) + ereport(ERROR, + (errmsg("could not connect to the publisher: %s", err))); + + /* Get the table list from publisher. */ + pubrel_names = fetch_table_list(wrconn, sub->publications); + /* Get oids of rels in command */ + foreach(lc, pubrel_names) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + + relid = RangeVarGetRelid(rv, NoLock, true); + pubrels = lappend_oid(pubrels, relid); + } + + /* We are done with the remote side, close connection. */ + walrcv_disconnect(wrconn); + + foreach(lc, tables) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + char table_state; + + + relid = RangeVarGetRelid(rv, AccessShareLock, false); + if (!pg_class_ownercheck(relid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, + get_relkind_objtype(get_rel_relkind(relid)), rv->relname); + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + if (!list_member_oid(pubrels, relid)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("table \"%s.%s\" not preset in publication", + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid)))); + table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + AddSubscriptionRelState(sub->oid, relid, + table_state, + InvalidXLogRecPtr); + } +} + static void AlterSubscription_refresh(Subscription *sub, bool copy_data) { @@ -724,6 +858,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); /* Check the connection info string. */ + walrcv_check_conninfo(stmt->conninfo); values[Anum_pg_subscription_subconninfo - 1] = @@ -773,6 +908,12 @@ AlterSubscription(AlterSubscriptionStmt *stmt) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); + if (!sub->alltables) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for FOR TABLE subscriptions"), + errhint("Use ALTER SUBSCRIPTION ADD/DROP TABLE ..."))); + parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, ©_data, @@ -782,7 +923,51 @@ AlterSubscription(AlterSubscriptionStmt *stmt) break; } + case ALTER_SUBSCRIPTION_ADD_TABLE: + { + bool copy_data; + if (!sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION ... ADD TABLE is not allowed for disabled subscriptions"))); + + if (sub->alltables) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION ... ADD TABLE is not allowed for FOR ALL TABLES subscriptions"), + errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION"))); + + parse_subscription_options(stmt->options, NULL, NULL, NULL, + NULL, NULL, NULL, ©_data, + NULL, NULL); + + AlterSubscription_add_table(sub, stmt->tables, copy_data); + + break; + } + case ALTER_SUBSCRIPTION_DROP_TABLE: + { + + if (!sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION ... DROP TABLE is not allowed for disabled subscriptions"))); + + if (sub->alltables) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION ... DROP TABLE is not allowed for FOR ALL TABLES subscriptions"), + errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION"))); + + parse_subscription_options(stmt->options, NULL, NULL, NULL, + NULL, NULL, NULL, NULL, + NULL, NULL); + + AlterSubscription_drop_table(sub, stmt->tables); + + break; + } default: elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d", stmt->kind); diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index db49968409..b929c26adc 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4612,6 +4612,8 @@ _copyCreateSubscriptionStmt(const CreateSubscriptionStmt *from) COPY_STRING_FIELD(conninfo); COPY_NODE_FIELD(publication); COPY_NODE_FIELD(options); + COPY_NODE_FIELD(tables); + COPY_SCALAR_FIELD(for_all_tables); return newnode; } @@ -4625,6 +4627,7 @@ _copyAlterSubscriptionStmt(const AlterSubscriptionStmt *from) COPY_STRING_FIELD(subname); COPY_STRING_FIELD(conninfo); COPY_NODE_FIELD(publication); + COPY_NODE_FIELD(tables); COPY_NODE_FIELD(options); return newnode; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 3a084b4d1f..1082918ff1 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2238,6 +2238,8 @@ _equalCreateSubscriptionStmt(const CreateSubscriptionStmt *a, COMPARE_STRING_FIELD(conninfo); COMPARE_NODE_FIELD(publication); COMPARE_NODE_FIELD(options); + COMPARE_NODE_FIELD(tables); + COMPARE_SCALAR_FIELD(for_all_tables); return true; } @@ -2250,6 +2252,7 @@ _equalAlterSubscriptionStmt(const AlterSubscriptionStmt *a, COMPARE_STRING_FIELD(subname); COMPARE_STRING_FIELD(conninfo); COMPARE_NODE_FIELD(publication); + COMPARE_NODE_FIELD(tables); COMPARE_NODE_FIELD(options); return true; diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 2c2208ffb7..54351a85f0 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -405,6 +405,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <node> group_by_item empty_grouping_set rollup_clause cube_clause %type <node> grouping_sets_clause %type <node> opt_publication_for_tables publication_for_tables +%type <node> opt_subscription_for_tables subscription_for_tables %type <value> publication_name_item %type <list> opt_fdw_options fdw_options @@ -9565,7 +9566,7 @@ AlterPublicationStmt: *****************************************************************************/ CreateSubscriptionStmt: - CREATE SUBSCRIPTION name CONNECTION Sconst PUBLICATION publication_name_list opt_definition + CREATE SUBSCRIPTION name CONNECTION Sconst PUBLICATION publication_name_list opt_definition opt_subscription_for_tables { CreateSubscriptionStmt *n = makeNode(CreateSubscriptionStmt); @@ -9573,9 +9574,33 @@ CreateSubscriptionStmt: n->conninfo = $5; n->publication = $7; n->options = $8; + if ($9 != NULL) + { + /* FOR TABLE */ + if (IsA($9, List)) + n->tables = (List *)$9; + /* FOR ALL TABLES */ + else + n->for_all_tables = true; + } $$ = (Node *)n; } ; +opt_subscription_for_tables: + subscription_for_tables { $$ = $1; } + | /* EMPTY */ { $$ = NULL; } + ; + +subscription_for_tables: + FOR TABLE relation_expr_list + { + $$ = (Node *) $3; + } + | FOR ALL TABLES + { + $$ = (Node *) makeInteger(true); + } + ; publication_name_list: publication_name_item @@ -9655,6 +9680,27 @@ AlterSubscriptionStmt: (Node *)makeInteger(false), @1)); $$ = (Node *)n; } + | ALTER SUBSCRIPTION name ADD_P TABLE relation_expr_list opt_definition + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_ADD_TABLE; + n->subname = $3; + n->tables = $6; + n->options = $7; + n->tableAction = DEFELEM_ADD; + $$ = (Node *)n; + } + | ALTER SUBSCRIPTION name DROP TABLE relation_expr_list + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_DROP_TABLE; + n->subname = $3; + n->tables = $6; + n->tableAction = DEFELEM_DROP; + $$ = (Node *)n; + } ; /***************************************************************************** diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 9b75711ebd..49c5b68858 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -52,6 +52,9 @@ static WalReceiverConn *libpqrcv_connect(const char *conninfo, bool logical, const char *appname, char **err); static void libpqrcv_check_conninfo(const char *conninfo); +static void libpqrcv_connstr_check(const char *connstr); +static void libpqrcv_security_check(WalReceiverConn *conn); + static char *libpqrcv_get_conninfo(WalReceiverConn *conn); static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port); @@ -83,6 +86,8 @@ static void libpqrcv_disconnect(WalReceiverConn *conn); static WalReceiverFunctionsType PQWalReceiverFunctions = { libpqrcv_connect, libpqrcv_check_conninfo, + libpqrcv_connstr_check, + libpqrcv_security_check, libpqrcv_get_conninfo, libpqrcv_get_senderinfo, libpqrcv_identify_system, @@ -232,6 +237,54 @@ libpqrcv_check_conninfo(const char *conninfo) PQconninfoFree(opts); } +static void +libpqrcv_security_check(WalReceiverConn *conn) +{ + if (!superuser()) + { + if (!PQconnectionUsedPassword(conn->streamConn)) + ereport(ERROR, + (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), + errmsg("password is required"), + errdetail("Non-superuser cannot connect if the server does not request a password."), + errhint("Target server's authentication method must be changed."))); + } +} + +static void +libpqrcv_connstr_check(const char *connstr) +{ + if (!superuser()) + { + PQconninfoOption *options; + PQconninfoOption *option; + bool connstr_gives_password = false; + + options = PQconninfoParse(connstr, NULL); + if (options) + { + for (option = options; option->keyword != NULL; option++) + { + if (strcmp(option->keyword, "password") == 0) + { + if (option->val != NULL && option->val[0] != '\0') + { + connstr_gives_password = true; + break; + } + } + } + PQconninfoFree(options); + } + + if (!connstr_gives_password) + ereport(ERROR, + (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), + errmsg("password is required"), + errdetail("Non-superusers must provide a password in the connection string."))); + } +} + /* * Return a user-displayable conninfo string. Any security-sensitive fields * are obfuscated. diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 1f20df5680..d4c14e3e17 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -77,6 +77,28 @@ logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid) } } +/* + * Relcache invalidation callback for all relation map cache. + */ +void +logicalrep_relmap_invalidate_cb2(Datum arg, int cacheid, uint32 hashvalue) +{ + LogicalRepRelMapEntry *entry; + /* invalidate all cache entries */ + if (LogicalRepRelMap == NULL) + return; + HASH_SEQ_STATUS status; + hash_seq_init(&status, LogicalRepRelMap); + + while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) + { + entry->localreloid = InvalidOid; + entry->state = SUBREL_STATE_UNKNOWN; + } +} + + + /* * Initialize the relation map cache. */ diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 8d5e0946c4..465c36632a 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1741,6 +1741,9 @@ ApplyWorkerMain(Datum main_arg) CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, invalidate_syncing_table_states, (Datum) 0); + CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, + logicalrep_relmap_invalidate_cb2, + (Datum) 0); /* Build logical replication streaming options. */ options.logical = true; diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 4298c3cbf2..3534459bd6 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -47,6 +47,7 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subenabled; /* True if the subscription is enabled (the * worker should be running) */ + bool suballtables; #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ @@ -77,6 +78,7 @@ typedef struct Subscription char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ List *publications; /* List of publication names to subscribe to */ + bool alltables; } Subscription; extern Subscription *GetSubscription(Oid subid, bool missing_ok); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index e5bdc1cec5..982d51f48e 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3475,6 +3475,8 @@ typedef struct CreateSubscriptionStmt char *conninfo; /* Connection string to publisher */ List *publication; /* One or more publication to subscribe to */ List *options; /* List of DefElem nodes */ + List *tables; /* Optional list of tables to add */ + bool for_all_tables; /* Special subscription for all tables in publication */ } CreateSubscriptionStmt; typedef enum AlterSubscriptionType @@ -3483,7 +3485,9 @@ typedef enum AlterSubscriptionType ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH, - ALTER_SUBSCRIPTION_ENABLED + ALTER_SUBSCRIPTION_ENABLED, + ALTER_SUBSCRIPTION_DROP_TABLE, + ALTER_SUBSCRIPTION_ADD_TABLE } AlterSubscriptionType; typedef struct AlterSubscriptionStmt @@ -3494,6 +3498,10 @@ typedef struct AlterSubscriptionStmt char *conninfo; /* Connection string to publisher */ List *publication; /* One or more publication to subscribe to */ List *options; /* List of DefElem nodes */ + /* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE */ + List *tables; /* List of tables to add/drop */ + bool for_all_tables; /* Special publication for all tables in db */ + DefElemAction tableAction; /* What action to perform with the tables */ } AlterSubscriptionStmt; typedef struct DropSubscriptionStmt diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index 73e4805827..4fb95c1d03 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -38,5 +38,7 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel, extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp); extern char *logicalrep_typmap_gettypname(Oid remoteid); +void logicalrep_relmap_invalidate_cb2(Datum arg, int cacheid, + uint32 hashvalue); #endif /* LOGICALRELATION_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 5913b580c2..fd7c710547 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -204,6 +204,8 @@ typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logica const char *appname, char **err); typedef void (*walrcv_check_conninfo_fn) (const char *conninfo); +typedef void (*walrcv_connstr_check_fn) (const char *connstr); +typedef void (*walrcv_security_check_fn) (WalReceiverConn *conn); typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn); typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn, char **sender_host, @@ -237,6 +239,8 @@ typedef struct WalReceiverFunctionsType { walrcv_connect_fn walrcv_connect; walrcv_check_conninfo_fn walrcv_check_conninfo; + walrcv_connstr_check_fn walrcv_connstr_check; + walrcv_security_check_fn walrcv_security_check; walrcv_get_conninfo_fn walrcv_get_conninfo; walrcv_get_senderinfo_fn walrcv_get_senderinfo; walrcv_identify_system_fn walrcv_identify_system; @@ -256,6 +260,10 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_connect(conninfo, logical, appname, err) #define walrcv_check_conninfo(conninfo) \ WalReceiverFunctions->walrcv_check_conninfo(conninfo) +#define walrcv_connstr_check(connstr) \ + WalReceiverFunctions->walrcv_connstr_check(connstr) +#define walrcv_security_check(conn) \ + WalReceiverFunctions->walrcv_security_check(conn) #define walrcv_get_conninfo(conn) \ WalReceiverFunctions->walrcv_get_conninfo(conn) #define walrcv_get_senderinfo(conn, sender_host, sender_port) \ diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 4fcbf7efe9..70e36b4fd7 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -43,7 +43,7 @@ ERROR: subscription "testsub" already exists -- fail - must be superuser SET SESSION AUTHORIZATION 'regress_subscription_user2'; CREATE SUBSCRIPTION testsub2 CONNECTION 'dbname=doesnotexist' PUBLICATION foo WITH (connect = false); -ERROR: must be superuser to create subscriptions +ERROR: must be superuser to create FOR ALL TABLES subscriptions SET SESSION AUTHORIZATION 'regress_subscription_user'; -- fail - invalid option combinations CREATE SUBSCRIPTION testsub2 CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = true); diff --git a/src/test/subscription/t/011_rep_changes_nonsuperuser.pl b/src/test/subscription/t/011_rep_changes_nonsuperuser.pl new file mode 100644 index 0000000000..3acbb5663c --- /dev/null +++ b/src/test/subscription/t/011_rep_changes_nonsuperuser.pl @@ -0,0 +1,316 @@ +# Basic logical replication test +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More; + +if ($windows_os) +{ + plan skip_all => "authentication tests cannot run on Windows"; +} +else +{ + plan tests => 18; +} + +sub reset_pg_hba +{ + my $node = shift; + my $hba_method = shift; + + unlink($node->data_dir . '/pg_hba.conf'); + $node->append_conf('pg_hba.conf', "local all normal $hba_method"); + $node->append_conf('pg_hba.conf', "local all all trust"); + $node->reload; + return; +} + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +$node_subscriber->safe_psql('postgres', + "SET password_encryption='md5'; CREATE ROLE normal LOGIN PASSWORD 'pass';"); +$node_subscriber->safe_psql('postgres', + "GRANT CREATE ON DATABASE postgres TO normal;"); +$node_subscriber->safe_psql('postgres', + "ALTER ROLE normal WITH LOGIN;"); +reset_pg_hba($node_subscriber, 'trust'); + + +$node_publisher->safe_psql('postgres', + "SET password_encryption='md5'; CREATE ROLE normal LOGIN PASSWORD 'pass';"); +$node_publisher->safe_psql('postgres', + "ALTER ROLE normal WITH LOGIN; ALTER ROLE normal WITH SUPERUSER"); +reset_pg_hba($node_publisher, 'md5'); + + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_notrep AS SELECT generate_series(1,10) AS a"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_ins AS SELECT generate_series(1,1002) AS a"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_full AS SELECT generate_series(1,10) AS a"); +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_full2 VALUES ('a'), ('b'), ('b')"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rep (a int primary key)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_mixed (a int primary key, b text)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_mixed (a, b) VALUES (1, 'foo')"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_include (a int, b text, CONSTRAINT covering PRIMARY KEY(a) INCLUDE(b))" +); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_notrep (a int)", extra_params => [ '-U', 'normal' ]); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int)", extra_params => [ '-U', 'normal' ]); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full (a int)", extra_params => [ '-U', 'normal' ]); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)", extra_params => [ '-U', 'normal' ]); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rep (a int primary key)", extra_params => [ '-U', 'normal' ]); + +# different column count and order than on publisher +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_mixed (c text, b text, a int primary key)", extra_params => [ '-U', 'normal' ]); + +# replication of the table with included index +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_include (a int, b text, CONSTRAINT covering PRIMARY KEY(a) INCLUDE(b))" +, extra_params => [ '-U', 'normal' ]); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_ins_only WITH (publish = insert)"); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub ADD TABLE tab_rep, tab_full, tab_full2, tab_mixed, tab_include, tab_notrep" +); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr password=pass user=normal application_name=$appname' + PUBLICATION tap_pub, tap_pub_ins_only + FOR TABLE tab_rep, tab_full, tab_full2, tab_mixed, tab_include, tab_ins", + extra_params => [ '-U', 'normal' ]); + +$node_publisher->wait_for_catchup($appname); + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep"); +is($result, qq(0), 'check non-replicated table is empty on subscriber'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins"); +is($result, qq(1002), 'check initial data was copied to subscriber'); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_ins SELECT generate_series(1,50)"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 20"); +$node_publisher->safe_psql('postgres', "UPDATE tab_ins SET a = -a"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rep SELECT generate_series(1,50)"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep WHERE a > 20"); +$node_publisher->safe_psql('postgres', "UPDATE tab_rep SET a = -a"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_mixed VALUES (2, 'bar')"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_include SELECT generate_series(1,50)"); +$node_publisher->safe_psql('postgres', + "DELETE FROM tab_include WHERE a > 20"); +$node_publisher->safe_psql('postgres', "UPDATE tab_include SET a = -a"); + +$node_publisher->wait_for_catchup($appname); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_ins"); +is($result, qq(1052|1|1002), 'check replicated inserts on subscriber'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_rep"); +is($result, qq(20|-20|-1), 'check replicated changes on subscriber'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT c, b, a FROM tab_mixed"); +is( $result, qq(|foo|1 +|bar|2), 'check replicated changes with different column order'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_include"); +is($result, qq(20|-20|-1), + 'check replicated changes with primary key index with included columns'); + +# insert some duplicate rows +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_full SELECT generate_series(1,10)"); + +# add REPLICA IDENTITY FULL so we can update +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab_full REPLICA IDENTITY FULL"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab_full REPLICA IDENTITY FULL"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab_full2 REPLICA IDENTITY FULL"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab_full2 REPLICA IDENTITY FULL"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab_ins REPLICA IDENTITY FULL"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab_ins REPLICA IDENTITY FULL"); + +# and do the updates +$node_publisher->safe_psql('postgres', "UPDATE tab_full SET a = a * a"); +$node_publisher->safe_psql('postgres', + "UPDATE tab_full2 SET x = 'bb' WHERE x = 'b'"); + +$node_publisher->wait_for_catchup($appname); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_full"); +is($result, qq(20|1|100), + 'update works with REPLICA IDENTITY FULL and duplicate tuples'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT x FROM tab_full2 ORDER BY 1"); +is( $result, qq(a +bb +bb), + 'update works with REPLICA IDENTITY FULL and text datums'); + +# check that change of connection string and/or publication list causes +# restart of subscription workers. Not all of these are registered as tests +# as we need to poll for a change but the test suite will fail none the less +# when something goes wrong. +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname';" +); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub CONNECTION 'application_name=$appname $publisher_connstr'" +); +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname';" +) or die "Timed out while waiting for apply to restart"; + +$oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname';" +); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_ins_only WITH (copy_data = false)" +); +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname';" +) or die "Timed out while waiting for apply to restart"; + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_ins SELECT generate_series(1001,1100)"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep"); + +# Restart the publisher and check the state of the subscriber which +# should be in a streaming state after catching up. +$node_publisher->stop('fast'); +$node_publisher->start; + +$node_publisher->wait_for_catchup($appname); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_ins"); +is($result, qq(1152|1|1100), + 'check replicated inserts after subscription publication change'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_rep"); +is($result, qq(20|-20|-1), + 'check changes skipped after subscription publication change'); + +# check alter publication (relcache invalidation etc) +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_ins_only SET (publish = 'insert, delete')"); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_full"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 0"); + +$result = $node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub ADD TABLE tab_full WITH (copy_data = false)"); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_full VALUES(0)"); + +$node_publisher->wait_for_catchup($appname); + +# note that data are different on provider and subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_ins"); +is($result, qq(1052|1|1002), + 'check replicated deletes after alter publication'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_full"); +is($result, qq(21|0|100), 'check replicated insert after alter publication'); + +# check drop table from subscription +$result = $node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub DROP TABLE tab_full"); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_full VALUES(-1)"); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_full"); +is($result, qq(21|0|100), 'check replicated insert after alter publication'); + +# check restart on rename +$oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname';" +); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub RENAME TO tap_sub_renamed"); +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname';" +) or die "Timed out while waiting for apply to restart"; + +# check all the cleanup +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed"); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'check subscription was dropped on subscriber'); + +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(0), + 'check subscription relation status was dropped on subscriber'); + +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_origin"); +is($result, qq(0), 'check replication origin was dropped on subscriber'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast');