Hello!
I have started work on the patch (draft attached). The draft contains only the
changes related to `CREATE SUBSCRIPTION`.
I also introduce a new status (DEFERRED; `SUBREL_STATE_DEFFER` in the patch) for
tables that are listed in the `FOR TABLE` clause but are not in the publication.
The new column in pg_subscription (suballtables) will be used by the `REFRESH` clause.

09.11.2018, 15:24, "Evgeniy Efimkin" <efim...@yandex-team.ru>:
> Hi!
> In order to support creating subscriptions as a non-superuser, we need to make 
> it possible to choose tables on the subscriber side:
>     1. add `FOR TABLE` clause in `CREATE SUBSCRIPTION`:
>        ```
>         CREATE SUBSCRIPTION subscription_name
>             CONNECTION 'conninfo'
>             PUBLICATION publication_name [, ...]
>             [ FOR TABLE [ ONLY ] table_name [ * ] [, ...]| FOR ALL TABLES ]
>             [ WITH ( subscription_parameter [= value] [, ... ] ) ]
>        ```
>        ... where `FOR ALL TABLES` is only allowed for superusers,
>        and the table list from the `FOR TABLE` clause will be stored in the 
> pg_subscription_rel table (or maybe somewhere else?)
>
>     2. Each subscription should have an "all tables" attribute,
>        for example via a new column "suballtables" in pg_subscription.
>
>     3. Add `ALTER SUBSCRIPTION (ADD TABLE | DROP TABLE)`:
>        ```
>         ALTER SUBSCRIPTION subscription_name ADD TABLE [ ONLY ] table_name 
> [WITH copy_data];
>         ALTER SUBSCRIPTION subscription_name DROP TABLE [ ONLY ] table_name;
>        ```
>     4. `ALTER SUBSCRIPTION <name> REFRESH PUBLICATION` should check that the 
> table owner equals the subscription owner. The check is omitted if the 
> subscription owner is a superuser.
>     5. If a superuser calls `ALTER SUBSCRIPTION REFRESH PUBLICATION` on a 
> subscription with a table list and a non-superuser owner, we should filter out 
> tables whose owner is not the subscription's owner, or maybe we should raise an 
> error instead?
>
> What do you think about it? Any objections?
>
> 07.11.2018, 00:52, "Stephen Frost" <sfr...@snowman.net>:
>>  Greetings,
>>
>>  * Evgeniy Efimkin (efim...@yandex-team.ru) wrote:
>>>   As a first step I suggest we allow CREATE SUBSCRIPTION for table owner 
>>> only.
>>
>>  That's a nice idea but seems like we would want to have a way to filter
>>  what tables a subscription follows then..? Just failing if the
>>  publication publishes tables that we don't have access to or are not the
>>  owner of seems like a poor solution..
>>
>>  Thanks!
>>
>>  Stephen
>
> --------
> Ефимкин Евгений

-------- 
Ефимкин Евгений
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f138e61a8d..5452bd6a55 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -29,6 +29,7 @@
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 
+#include "commands/dbcommands.h"
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/subscriptioncmds.h"
@@ -321,6 +322,14 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	char		originname[NAMEDATALEN];
 	bool		create_slot;
 	List	   *publications;
+	AclResult	aclresult;
+	bool 		alltables;
+
+	/* must have CREATE privilege on database */
+	aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult, OBJECT_DATABASE,
+					   get_database_name(MyDatabaseId));
 
 	/*
 	 * Parse and check options.
@@ -340,11 +349,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	 */
 	if (create_slot)
 		PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
-
-	if (!superuser())
+	alltables = !stmt->tables || !stmt->for_all_tables;
+	/* FOR ALL TABLES requires superuser */
+	if (alltables && !superuser())
 		ereport(ERROR,
 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-				 (errmsg("must be superuser to create subscriptions"))));
+				 (errmsg("must be superuser to create FOR ALL TABLES publication"))));
+
 
 	rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
 
@@ -384,6 +395,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 		DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
 	values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
 	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
+	values[Anum_pg_subscription_suballtables - 1] = BoolGetDatum(alltables);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (slotname)
@@ -407,6 +419,27 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	snprintf(originname, sizeof(originname), "pg_%u", subid);
 	replorigin_create(originname);
 
+
+	if (stmt->tables&&!connect)
+	{
+		ListCell   *lc;
+		char		table_state;
+		foreach(lc, stmt->tables)
+		{
+			RangeVar   *rv = (RangeVar *) lfirst(lc);
+			Oid			relid;
+			relid = RangeVarGetRelid(rv, AccessShareLock, false);
+			/* must be owner */
+			if (!pg_class_ownercheck(relid, GetUserId()))
+					aclcheck_error(ACLCHECK_NOT_OWNER,
+						get_relkind_objtype(get_rel_relkind(relid)), rv->relname);
+			CheckSubscriptionRelkind(get_rel_relkind(relid),
+								rv->schemaname, rv->relname);
+			table_state = SUBREL_STATE_DEFFER;
+			AddSubscriptionRelState(subid, relid, table_state,
+											InvalidXLogRecPtr);
+		}
+	}
 	/*
 	 * Connect to remote side to execute requested commands and fetch table
 	 * info.
@@ -419,6 +452,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 		List	   *tables;
 		ListCell   *lc;
 		char		table_state;
+		List *tablesiods = NIL;
 
 		/* Try to connect to the publisher. */
 		wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
@@ -442,17 +476,46 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 			foreach(lc, tables)
 			{
 				RangeVar   *rv = (RangeVar *) lfirst(lc);
-				Oid			relid;
+				Oid         		relid;
 
-				relid = RangeVarGetRelid(rv, AccessShareLock, false);
-
-				/* Check for supported relkind. */
-				CheckSubscriptionRelkind(get_rel_relkind(relid),
-										 rv->schemaname, rv->relname);
-
-				AddSubscriptionRelState(subid, relid, table_state,
-										InvalidXLogRecPtr);
+				relid = RangeVarGetRelid(rv, NoLock, true);
+				tablesiods = lappend_oid(tablesiods, relid);
 			}
+			if (stmt->tables)
+				foreach(lc, stmt->tables)
+				{
+					RangeVar   *rv = (RangeVar *) lfirst(lc);
+					Oid                     relid;
+
+					relid = RangeVarGetRelid(rv, AccessShareLock, false);
+					if (!pg_class_ownercheck(relid, GetUserId()))
+					aclcheck_error(ACLCHECK_NOT_OWNER,
+						get_relkind_objtype(get_rel_relkind(relid)), rv->relname);
+					CheckSubscriptionRelkind(get_rel_relkind(relid),
+											rv->schemaname, rv->relname);
+					if (!list_member_oid(tablesiods, relid))
+						table_state = SUBREL_STATE_DEFFER;
+					else
+						table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+					AddSubscriptionRelState(subid, relid, table_state,
+											InvalidXLogRecPtr);
+				}
+			else
+				foreach(lc, tables)
+				{
+					RangeVar   *rv = (RangeVar *) lfirst(lc);
+					Oid                     relid;
+
+					relid = RangeVarGetRelid(rv, AccessShareLock, false);
+					if (!pg_class_ownercheck(relid, GetUserId()))
+						aclcheck_error(ACLCHECK_NOT_OWNER,
+						get_relkind_objtype(get_rel_relkind(relid)), rv->relname);
+					CheckSubscriptionRelkind(get_rel_relkind(relid),
+											rv->schemaname, rv->relname);
+					table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+					AddSubscriptionRelState(subid, relid, table_state,
+										InvalidXLogRecPtr);
+				}
 
 			/*
 			 * If requested, create permanent slot for the subscription. We
@@ -1103,7 +1166,6 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
 
 	heap_close(rel, RowExclusiveLock);
 }
-
 /*
  * Get the list of tables which belong to specified publications on the
  * publisher connection.
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index db49968409..2e3a9e156d 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4612,6 +4612,8 @@ _copyCreateSubscriptionStmt(const CreateSubscriptionStmt *from)
 	COPY_STRING_FIELD(conninfo);
 	COPY_NODE_FIELD(publication);
 	COPY_NODE_FIELD(options);
+	COPY_NODE_FIELD(tables);
+	COPY_SCALAR_FIELD(for_all_tables);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 3a084b4d1f..c047830f90 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2238,6 +2238,8 @@ _equalCreateSubscriptionStmt(const CreateSubscriptionStmt *a,
 	COMPARE_STRING_FIELD(conninfo);
 	COMPARE_NODE_FIELD(publication);
 	COMPARE_NODE_FIELD(options);
+	COMPARE_NODE_FIELD(tables);
+	COMPARE_SCALAR_FIELD(for_all_tables);
 
 	return true;
 }
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 2effd51135..16b79f8d8c 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -405,6 +405,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <node>	group_by_item empty_grouping_set rollup_clause cube_clause
 %type <node>	grouping_sets_clause
 %type <node>	opt_publication_for_tables publication_for_tables
+%type <node>	opt_subscription_for_tables subscription_for_tables
 %type <value>	publication_name_item
 
 %type <list>	opt_fdw_options fdw_options
@@ -9587,7 +9588,7 @@ AlterPublicationStmt:
  *****************************************************************************/
 
 CreateSubscriptionStmt:
-			CREATE SUBSCRIPTION name CONNECTION Sconst PUBLICATION publication_name_list opt_definition
+			CREATE SUBSCRIPTION name CONNECTION Sconst PUBLICATION publication_name_list opt_definition opt_subscription_for_tables
 				{
 					CreateSubscriptionStmt *n =
 						makeNode(CreateSubscriptionStmt);
@@ -9595,9 +9596,33 @@ CreateSubscriptionStmt:
 					n->conninfo = $5;
 					n->publication = $7;
 					n->options = $8;
+					if ($9 != NULL)
+					{
+						/* FOR TABLE */
+						if (IsA($9, List))
+							n->tables = (List *)$9;
+						/* FOR ALL TABLES */
+						else
+							n->for_all_tables = true;
+					}
 					$$ = (Node *)n;
 				}
 		;
+opt_subscription_for_tables:
+			subscription_for_tables					{ $$ = $1; }
+			| /* EMPTY */							{ $$ = NULL; }
+		;
+
+subscription_for_tables:
+			FOR TABLE relation_expr_list
+				{
+					$$ = (Node *) $3;
+				}
+			| FOR ALL TABLES
+				{
+					$$ = (Node *) makeInteger(true);
+				}
+		;
 
 publication_name_list:
 			publication_name_item
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6e420d893c..64017a9ba1 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -409,7 +409,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	foreach(lc, table_states)
 	{
 		SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
-
+		if (rstate->state == SUBREL_STATE_DEFFER)
+			continue;
 		if (rstate->state == SUBREL_STATE_SYNCDONE)
 		{
 			/*
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index e4dc771cf5..2c4ec9b506 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -45,6 +45,7 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	bool		subenabled;		/* True if the subscription is enabled (the
 								 * worker should be running) */
+	bool            suballtables;
 
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
@@ -75,6 +76,7 @@ typedef struct Subscription
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
 	List	   *publications;	/* List of publication names to subscribe to */
+	bool		alltables;
 } Subscription;
 
 extern Subscription *GetSubscription(Oid subid, bool missing_ok);
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 556cb94841..43bb863e9b 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -47,6 +47,7 @@ typedef FormData_pg_subscription_rel *Form_pg_subscription_rel;
  * ----------------
  */
 #define SUBREL_STATE_INIT		'i' /* initializing (sublsn NULL) */
+#define SUBREL_STATE_DEFFER		'f' /* deffered (sublsn NULL) */
 #define SUBREL_STATE_DATASYNC	'd' /* data is being synchronized (sublsn
 									 * NULL) */
 #define SUBREL_STATE_SYNCDONE	's' /* synchronization finished in front of
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 9da8bf2f88..cd41defeea 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3477,6 +3477,8 @@ typedef struct CreateSubscriptionStmt
 	char	   *conninfo;		/* Connection string to publisher */
 	List	   *publication;	/* One or more publication to subscribe to */
 	List	   *options;		/* List of DefElem nodes */
+	List	   *tables;			/* Optional list of tables to add */
+	bool	   for_all_tables; /* Special publication for all tables in db */
 } CreateSubscriptionStmt;
 
 typedef enum AlterSubscriptionType

Reply via email to