Hello! I have started work on a patch (draft attached). The draft only contains changes related to `CREATE SUBSCRIPTION`. I also introduce a new status (DEFERRED; spelled `SUBREL_STATE_DEFFER` in the draft) for tables that are listed in the `FOR TABLE` clause but are not in the publication. The new column in pg_subscription (suballtables) will be used by the `REFRESH` clause.
09.11.2018, 15:24, "Evgeniy Efimkin" <efim...@yandex-team.ru>: > Hi! > In order to support `CREATE SUBSCRIPTION` by a non-superuser, we need to make > it possible to choose tables on the subscriber side: > 1. Add a `FOR TABLE` clause to `CREATE SUBSCRIPTION`: > ``` > CREATE SUBSCRIPTION subscription_name > CONNECTION 'conninfo' > PUBLICATION publication_name [, ...] > [ FOR TABLE [ ONLY ] table_name [ * ] [, ...]| FOR ALL TABLES ] > [ WITH ( subscription_parameter [= value] [, ... ] ) ] > ``` > ... where `FOR ALL TABLES` is only allowed for a superuser, > and the table list in the `FOR TABLE` clause will be stored in the > pg_subscription_rel table (maybe another place?) > > 2. Each subscription should have an "all tables" attribute, > for example via a new column in pg_subscription, "suballtables". > > 3. Add `ALTER SUBSCRIPTION (ADD TABLE | DROP TABLE)`: > ``` > ALTER SUBSCRIPTION subscription_name ADD TABLE [ ONLY ] table_name > [WITH copy_data]; > ALTER SUBSCRIPTION subscription_name DROP TABLE [ ONLY ] table_name; > ``` > 4. On `ALTER SUBSCRIPTION <name> REFRESH PUBLICATION` we should check that the > table owner equals the subscription owner. The check is omitted if the subscription > owner is a superuser. > 5. If a superuser calls `ALTER SUBSCRIPTION REFRESH PUBLICATION` on a > subscription with a table list and a non-superuser owner, we should filter out tables > whose owner is not the subscription's owner, or maybe we need to raise an error? > > What do you think about it? Any objections? > > 07.11.2018, 00:52, "Stephen Frost" <sfr...@snowman.net>: >> Greetings, >> >> * Evgeniy Efimkin (efim...@yandex-team.ru) wrote: >>> As a first step I suggest we allow CREATE SUBSCRIPTION for table owner >>> only. >> >> That's a nice idea but seems like we would want to have a way to filter >> what tables a subscription follows then..? Just failing if the >> publication publishes tables that we don't have access to or are not the >> owner of seems like a poor solution.. >> >> Thanks! 
>> >> Stephen > > -------- > Ефимкин Евгений -------- Ефимкин Евгений
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index f138e61a8d..5452bd6a55 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -29,6 +29,7 @@ #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" +#include "commands/dbcommands.h" #include "commands/defrem.h" #include "commands/event_trigger.h" #include "commands/subscriptioncmds.h" @@ -321,6 +322,14 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) char originname[NAMEDATALEN]; bool create_slot; List *publications; + AclResult aclresult; + bool alltables; + + /* must have CREATE privilege on database */ + aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_DATABASE, + get_database_name(MyDatabaseId)); /* * Parse and check options. @@ -340,11 +349,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) */ if (create_slot) PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... 
WITH (create_slot = true)"); - - if (!superuser()) + alltables = !stmt->tables || !stmt->for_all_tables; + /* FOR ALL TABLES requires superuser */ + if (alltables && !superuser()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - (errmsg("must be superuser to create subscriptions")))); + (errmsg("must be superuser to create FOR ALL TABLES publication")))); + rel = heap_open(SubscriptionRelationId, RowExclusiveLock); @@ -384,6 +395,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) DirectFunctionCall1(namein, CStringGetDatum(stmt->subname)); values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); + values[Anum_pg_subscription_suballtables - 1] = BoolGetDatum(alltables); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (slotname) @@ -407,6 +419,27 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) snprintf(originname, sizeof(originname), "pg_%u", subid); replorigin_create(originname); + + if (stmt->tables&&!connect) + { + ListCell *lc; + char table_state; + foreach(lc, stmt->tables) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + relid = RangeVarGetRelid(rv, AccessShareLock, false); + /* must be owner */ + if (!pg_class_ownercheck(relid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, + get_relkind_objtype(get_rel_relkind(relid)), rv->relname); + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + table_state = SUBREL_STATE_DEFFER; + AddSubscriptionRelState(subid, relid, table_state, + InvalidXLogRecPtr); + } + } /* * Connect to remote side to execute requested commands and fetch table * info. @@ -419,6 +452,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) List *tables; ListCell *lc; char table_state; + List *tablesiods = NIL; /* Try to connect to the publisher. 
*/ wrconn = walrcv_connect(conninfo, true, stmt->subname, &err); @@ -442,17 +476,46 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) foreach(lc, tables) { RangeVar *rv = (RangeVar *) lfirst(lc); - Oid relid; + Oid relid; - relid = RangeVarGetRelid(rv, AccessShareLock, false); - - /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), - rv->schemaname, rv->relname); - - AddSubscriptionRelState(subid, relid, table_state, - InvalidXLogRecPtr); + relid = RangeVarGetRelid(rv, NoLock, true); + tablesiods = lappend_oid(tablesiods, relid); } + if (stmt->tables) + foreach(lc, stmt->tables) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + + relid = RangeVarGetRelid(rv, AccessShareLock, false); + if (!pg_class_ownercheck(relid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, + get_relkind_objtype(get_rel_relkind(relid)), rv->relname); + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + if (!list_member_oid(tablesiods, relid)) + table_state = SUBREL_STATE_DEFFER; + else + table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + AddSubscriptionRelState(subid, relid, table_state, + InvalidXLogRecPtr); + } + else + foreach(lc, tables) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + + relid = RangeVarGetRelid(rv, AccessShareLock, false); + if (!pg_class_ownercheck(relid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, + get_relkind_objtype(get_rel_relkind(relid)), rv->relname); + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + AddSubscriptionRelState(subid, relid, table_state, + InvalidXLogRecPtr); + } /* * If requested, create permanent slot for the subscription. 
We @@ -1103,7 +1166,6 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) heap_close(rel, RowExclusiveLock); } - /* * Get the list of tables which belong to specified publications on the * publisher connection. diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index db49968409..2e3a9e156d 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4612,6 +4612,8 @@ _copyCreateSubscriptionStmt(const CreateSubscriptionStmt *from) COPY_STRING_FIELD(conninfo); COPY_NODE_FIELD(publication); COPY_NODE_FIELD(options); + COPY_NODE_FIELD(tables); + COPY_SCALAR_FIELD(for_all_tables); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 3a084b4d1f..c047830f90 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2238,6 +2238,8 @@ _equalCreateSubscriptionStmt(const CreateSubscriptionStmt *a, COMPARE_STRING_FIELD(conninfo); COMPARE_NODE_FIELD(publication); COMPARE_NODE_FIELD(options); + COMPARE_NODE_FIELD(tables); + COMPARE_SCALAR_FIELD(for_all_tables); return true; } diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 2effd51135..16b79f8d8c 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -405,6 +405,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <node> group_by_item empty_grouping_set rollup_clause cube_clause %type <node> grouping_sets_clause %type <node> opt_publication_for_tables publication_for_tables +%type <node> opt_subscription_for_tables subscription_for_tables %type <value> publication_name_item %type <list> opt_fdw_options fdw_options @@ -9587,7 +9588,7 @@ AlterPublicationStmt: *****************************************************************************/ CreateSubscriptionStmt: - CREATE SUBSCRIPTION name CONNECTION Sconst PUBLICATION publication_name_list opt_definition + CREATE SUBSCRIPTION name CONNECTION Sconst PUBLICATION 
publication_name_list opt_definition opt_subscription_for_tables { CreateSubscriptionStmt *n = makeNode(CreateSubscriptionStmt); @@ -9595,9 +9596,33 @@ CreateSubscriptionStmt: n->conninfo = $5; n->publication = $7; n->options = $8; + if ($9 != NULL) + { + /* FOR TABLE */ + if (IsA($9, List)) + n->tables = (List *)$9; + /* FOR ALL TABLES */ + else + n->for_all_tables = true; + } $$ = (Node *)n; } ; +opt_subscription_for_tables: + subscription_for_tables { $$ = $1; } + | /* EMPTY */ { $$ = NULL; } + ; + +subscription_for_tables: + FOR TABLE relation_expr_list + { + $$ = (Node *) $3; + } + | FOR ALL TABLES + { + $$ = (Node *) makeInteger(true); + } + ; publication_name_list: publication_name_item diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 6e420d893c..64017a9ba1 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -409,7 +409,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) foreach(lc, table_states) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); - + if (rstate->state == SUBREL_STATE_DEFFER) + continue; if (rstate->state == SUBREL_STATE_SYNCDONE) { /* diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index e4dc771cf5..2c4ec9b506 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -45,6 +45,7 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subenabled; /* True if the subscription is enabled (the * worker should be running) */ + bool suballtables; #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ @@ -75,6 +76,7 @@ typedef struct Subscription char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ List *publications; /* List of publication names to subscribe to */ + bool alltables; } 
Subscription; extern Subscription *GetSubscription(Oid subid, bool missing_ok); diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 556cb94841..43bb863e9b 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -47,6 +47,7 @@ typedef FormData_pg_subscription_rel *Form_pg_subscription_rel; * ---------------- */ #define SUBREL_STATE_INIT 'i' /* initializing (sublsn NULL) */ +#define SUBREL_STATE_DEFFER 'f' /* deffered (sublsn NULL) */ #define SUBREL_STATE_DATASYNC 'd' /* data is being synchronized (sublsn * NULL) */ #define SUBREL_STATE_SYNCDONE 's' /* synchronization finished in front of diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 9da8bf2f88..cd41defeea 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3477,6 +3477,8 @@ typedef struct CreateSubscriptionStmt char *conninfo; /* Connection string to publisher */ List *publication; /* One or more publication to subscribe to */ List *options; /* List of DefElem nodes */ + List *tables; /* Optional list of tables to add */ + bool for_all_tables; /* Special publication for all tables in db */ } CreateSubscriptionStmt; typedef enum AlterSubscriptionType