On Sat, 18 Jan 2025 at 14:29, Sergey Tatarintsev
<s.tatarint...@postgrespro.ru> wrote:
>
> I think we must take into account whole inheritance tree of partitioned table.
>
> For example:
>
> node_A:
> CREATE TABLE t(id int);
> CREATE PUBLICATION pub_b FOR TABLE t;
>
> node_A:
> CREATE TABLE t(id int) PARTITION BY RANGE(id);
> CREATE TABLE part PARTITION OF t FOR VALUES FROM (0) TO (10) PARTITION BY 
> RANGE(id);
> CREATE TABLE subpart PARTITION OF part FOR VALUES FROM (0) TO (5);
> CREATE SUBSCRIPTION sub_c CONNECTION '$node_B_connstr' PUBLICATION pub_b;
> CREATE PUBLICATION pub_t FOR TABLE t WITH (publish_via_partition_root);
> CREATE PUBLICATION pub_part FOR TABLE part WITH (publish_via_partition_root);
>
> node_C:
> -- this command will raise a warning CREATE SUBSCRIPTION sub_t CONNECTION 
> '$node_A_connstr' PUBLICATION pub_t WITH (origin = none, copy_data = on);
> DROP SUBSCRIPTION IF EXISTS sub_t;
> -- here we got silence, but "part" is in tree of upper level replicated table
> CREATE SUBSCRIPTION sub_part CONNECTION '$node_A_connstr' PUBLICATION 
> pub_part WITH (origin = none, copy_data = on);
> DROP SUBSCRIPTION IF EXISTS sub_part;
>
> I think that for each partition/partitioned table in the publication we can 
> use something like
>
> select relid from pg_partition_tree('part'::regclass)
> union
> select relid from pg_partition_ancestors('part'::regclass);
>
> In this case  we don't care about  publish_via_partition_root option, because 
> we already check all inheritance tree, and there is no need to change pg_class
>
> What are you thinking about it?

Yes, we should include the ancestors of the table to handle the
scenario you mentioned. The attached patch has the changes which
includes the ancestors also while getting the tables for the table
publication and schema publication. And in case of all tables
publication, get all the tables.
Thoughts?

Regards,
Vignesh
From f92d7daea28f46d687beb1ff83a2eb229837fe17 Mon Sep 17 00:00:00 2001
From: Vignesh <vignes...@gmail.com>
Date: Sat, 18 Jan 2025 10:19:12 +0530
Subject: [PATCH v2 1/2] Fix origin warning not thrown for publications on
 partition tables

When checking if a publisher had subscribed to the same table from a different
publisher, the check only considered tables directly specified for the
publication. It did not account for cases where the publication was present on
partition tables as well. This has been fixed by including all partition tables
associated with the publication in the check.
---
 src/backend/catalog/pg_publication.c    | 39 +++++++++++++++++++++----
 src/backend/commands/subscriptioncmds.c |  2 +-
 src/include/catalog/pg_proc.dat         |  9 ++++++
 3 files changed, 44 insertions(+), 6 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index b89098f5e9..bfdd0c7cc5 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1089,12 +1089,16 @@ GetPublicationByName(const char *pubname, bool missing_ok)
 }
 
 /*
- * Get information of the tables in the given publication array.
+ * Helper function for SQL callables: pg_get_publication_tables and
+ * pg_get_publication_tables_with_partitions.
  *
- * Returns pubid, relid, column list, row filter for each table.
+ * If allparttables is true, retrieves tables including all the partitions
+ * for the publication.
+ * If allparttables is false, retrieves tables based on the publication's
+ * pubviaroot option.
  */
-Datum
-pg_get_publication_tables(PG_FUNCTION_ARGS)
+static Datum
+pg_get_publication_tables_internal(FunctionCallInfo fcinfo, bool allparttables)
 {
 #define NUM_PUBLICATION_TABLES_ELEM	4
 	FuncCallContext *funcctx;
@@ -1147,10 +1151,12 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 						   *schemarelids;
 
 				relids = GetPublicationRelations(pub_elem->oid,
+												 allparttables ? PUBLICATION_PART_ALL :
 												 pub_elem->pubviaroot ?
 												 PUBLICATION_PART_ROOT :
 												 PUBLICATION_PART_LEAF);
 				schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
+																allparttables ? PUBLICATION_PART_ALL :
 																pub_elem->pubviaroot ?
 																PUBLICATION_PART_ROOT :
 																PUBLICATION_PART_LEAF);
@@ -1187,7 +1193,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 		 * data of the child table to be double-published on the subscriber
 		 * side.
 		 */
-		if (viaroot)
+		if (!allparttables && viaroot)
 			filter_partitions(table_infos);
 
 		/* Construct a tuple descriptor for the result rows. */
@@ -1298,3 +1304,26 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 
 	SRF_RETURN_DONE(funcctx);
 }
+
+/*
+ * Get information of the tables in the given publication array.
+ *
+ * Returns pubid, relid, column list, row filter for each table.
+ */
+Datum
+pg_get_publication_tables(PG_FUNCTION_ARGS)
+{
+	return pg_get_publication_tables_internal(fcinfo, false);
+}
+
+/*
+ * Get information of the tables (including all the all partitions) in the
+ * given publication array.
+ *
+ * Returns pubid, relid, column list, row filter for each table.
+ */
+Datum
+pg_get_publication_tables_with_partitions(PG_FUNCTION_ARGS)
+{
+	return pg_get_publication_tables_internal(fcinfo, true);
+}
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 2d8a71ca1e..403b4fc918 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -2116,7 +2116,7 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
 	appendStringInfoString(&cmd,
 						   "SELECT DISTINCT P.pubname AS pubname\n"
 						   "FROM pg_publication P,\n"
-						   "     LATERAL pg_get_publication_tables(P.pubname) GPT\n"
+						   "     LATERAL pg_get_publication_tables_with_partitions(P.pubname) GPT\n"
 						   "     JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
 						   "     pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
 						   "WHERE C.oid = GPT.relid AND P.pubname IN (");
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 18560755d2..349c6330b2 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12128,6 +12128,15 @@
   proargmodes => '{v,o,o,o,o}',
   proargnames => '{pubname,pubid,relid,attrs,qual}',
   prosrc => 'pg_get_publication_tables' },
+{ oid => '8051',
+  descr => 'get information of the tables(including all partitions) that are part of the specified publications',
+  proname => 'pg_get_publication_tables_with_partitions', prorows => '1000',
+  provariadic => 'text', proretset => 't', provolatile => 's',
+  prorettype => 'record', proargtypes => '_text',
+  proallargtypes => '{_text,oid,oid,int2vector,pg_node_tree}',
+  proargmodes => '{v,o,o,o,o}',
+  proargnames => '{pubname,pubid,relid,attrs,qual}',
+  prosrc => 'pg_get_publication_tables_with_partitions' },  
 { oid => '6121',
   descr => 'returns whether a relation can be part of a publication',
   proname => 'pg_relation_is_publishable', provolatile => 's',
-- 
2.43.0

From 2012cccb9e4638210def7f01ceb511a2f51a7f97 Mon Sep 17 00:00:00 2001
From: Vignesh <vignes...@gmail.com>
Date: Mon, 20 Jan 2025 17:42:37 +0530
Subject: [PATCH v2 2/2] Fix origin warning not thrown for publications on
 partition tables

When checking if a publisher had subscribed to the same table from a different
publisher, the check only considered tables directly specified for the
publication. It did not account for cases where the publication was present on
ancestor tables as well. This has been fixed by including all the
ancestor tables associated with the publication in the check.
---
 src/backend/catalog/pg_publication.c    | 69 ++++++++++++++++---------
 src/backend/commands/publicationcmds.c  | 15 +++---
 src/backend/commands/subscriptioncmds.c |  2 +-
 src/include/catalog/pg_proc.dat         |  4 +-
 src/include/catalog/pg_publication.h    | 11 ++--
 5 files changed, 65 insertions(+), 36 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index bfdd0c7cc5..a91400ff6c 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -720,7 +720,8 @@ publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
 	 * partitions.
 	 */
 	schemaRels = GetSchemaPublicationRelations(schemaid,
-											   PUBLICATION_PART_ALL);
+											   PUBLICATION_PART_ALL,
+											   false);
 	InvalidatePublicationRels(schemaRels);
 
 	return myself;
@@ -755,9 +756,12 @@ GetRelationPublications(Oid relid)
  *
  * This should only be used FOR TABLE publications, the FOR ALL TABLES
  * should use GetAllTablesPublicationRelations().
+ * If include_ancestors is true include all the ancestors for the partitioned
+ * table which could insert data to this partitioned table.
  */
 List *
-GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
+GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt,
+						bool include_ancestors)
 {
 	List	   *result;
 	Relation	pubrelsrel;
@@ -784,6 +788,11 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
 		pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
 		result = GetPubPartitionOptionRelations(result, pub_partopt,
 												pubrel->prrelid);
+
+		if (get_rel_relkind(pubrel->prrelid) == RELKIND_PARTITIONED_TABLE &&
+			include_ancestors)
+			result = list_concat_unique_oid(result,
+											get_partition_ancestors(pubrel->prrelid));
 	}
 
 	systable_endscan(scan);
@@ -838,10 +847,11 @@ GetAllTablesPublications(void)
  *
  * If the publication publishes partition changes via their respective root
  * partitioned tables, we must exclude partitions in favor of including the
- * root partitioned tables.
+ * root partitioned tables. If allrelatedrels is true get all the publishable
+ * tables i.e. include both partition and partitioned tables also.
  */
 List *
-GetAllTablesPublicationRelations(bool pubviaroot)
+GetAllTablesPublicationRelations(bool pubviaroot, bool allrelatedrels)
 {
 	Relation	classRel;
 	ScanKeyData key[1];
@@ -864,13 +874,13 @@ GetAllTablesPublicationRelations(bool pubviaroot)
 		Oid			relid = relForm->oid;
 
 		if (is_publishable_class(relid, relForm) &&
-			!(relForm->relispartition && pubviaroot))
+			(allrelatedrels || !(relForm->relispartition && pubviaroot)))
 			result = lappend_oid(result, relid);
 	}
 
 	table_endscan(scan);
 
-	if (pubviaroot)
+	if (allrelatedrels || pubviaroot)
 	{
 		ScanKeyInit(&key[0],
 					Anum_pg_class_relkind,
@@ -964,9 +974,12 @@ GetSchemaPublications(Oid schemaid)
 
 /*
  * Get the list of publishable relation oids for a specified schema.
+ * If include_ancestors is true include all the ancestors for the partitioned
+ * table which could insert data to this partitioned table.
  */
 List *
-GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
+GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt,
+							  bool include_ancestors)
 {
 	Relation	classRel;
 	ScanKeyData key[1];
@@ -1009,6 +1022,11 @@ GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
 			partitionrels = GetPubPartitionOptionRelations(partitionrels,
 														   pub_partopt,
 														   relForm->oid);
+
+			if (include_ancestors)
+				partitionrels = list_concat_unique_oid(partitionrels,
+													   get_partition_ancestors(relForm->oid));
+
 			result = list_concat_unique_oid(result, partitionrels);
 		}
 	}
@@ -1023,7 +1041,8 @@ GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
  * publication.
  */
 List *
-GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
+GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt,
+								 bool include_ancestors)
 {
 	List	   *result = NIL;
 	List	   *pubschemalist = GetPublicationSchemas(pubid);
@@ -1034,7 +1053,8 @@ GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
 		Oid			schemaid = lfirst_oid(cell);
 		List	   *schemaRels = NIL;
 
-		schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
+		schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt,
+												   include_ancestors);
 		result = list_concat(result, schemaRels);
 	}
 
@@ -1090,15 +1110,15 @@ GetPublicationByName(const char *pubname, bool missing_ok)
 
 /*
  * Helper function for SQL callables: pg_get_publication_tables and
- * pg_get_publication_tables_with_partitions.
+ * pg_get_publication_all_related_tables.
  *
- * If allparttables is true, retrieves tables including all the partitions
- * for the publication.
- * If allparttables is false, retrieves tables based on the publication's
+ * If allrelatedrels is true, retrieves tables including all the partitions and
+ * the ancestors for the publication.
+ * If allrelatedrels is false, retrieves tables based on the publication's
  * pubviaroot option.
  */
 static Datum
-pg_get_publication_tables_internal(FunctionCallInfo fcinfo, bool allparttables)
+pg_get_publication_tables_internal(FunctionCallInfo fcinfo, bool allrelatedrels)
 {
 #define NUM_PUBLICATION_TABLES_ELEM	4
 	FuncCallContext *funcctx;
@@ -1144,22 +1164,25 @@ pg_get_publication_tables_internal(FunctionCallInfo fcinfo, bool allparttables)
 			 * those. Otherwise, get the partitioned table itself.
 			 */
 			if (pub_elem->alltables)
-				pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot);
+				pub_elem_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot,
+																   allrelatedrels);
 			else
 			{
 				List	   *relids,
 						   *schemarelids;
 
 				relids = GetPublicationRelations(pub_elem->oid,
-												 allparttables ? PUBLICATION_PART_ALL :
+												 allrelatedrels ? PUBLICATION_PART_ALL :
 												 pub_elem->pubviaroot ?
 												 PUBLICATION_PART_ROOT :
-												 PUBLICATION_PART_LEAF);
+												 PUBLICATION_PART_LEAF,
+												 allrelatedrels);
 				schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
-																allparttables ? PUBLICATION_PART_ALL :
+																allrelatedrels ? PUBLICATION_PART_ALL :
 																pub_elem->pubviaroot ?
 																PUBLICATION_PART_ROOT :
-																PUBLICATION_PART_LEAF);
+																PUBLICATION_PART_LEAF,
+																allrelatedrels);
 				pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
 			}
 
@@ -1193,7 +1216,7 @@ pg_get_publication_tables_internal(FunctionCallInfo fcinfo, bool allparttables)
 		 * data of the child table to be double-published on the subscriber
 		 * side.
 		 */
-		if (!allparttables && viaroot)
+		if (!allrelatedrels && viaroot)
 			filter_partitions(table_infos);
 
 		/* Construct a tuple descriptor for the result rows. */
@@ -1317,13 +1340,13 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 }
 
 /*
- * Get information of the tables (including all the all partitions) in the
- * given publication array.
+ * Get information of the tables (including all the partitions and the
+ * ancestors) in the given publication array.
  *
  * Returns pubid, relid, column list, row filter for each table.
  */
 Datum
-pg_get_publication_tables_with_partitions(PG_FUNCTION_ARGS)
+pg_get_publication_all_related_tables(PG_FUNCTION_ARGS)
 {
 	return pg_get_publication_tables_internal(fcinfo, true);
 }
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 35747b3df5..a3bec7023c 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -957,7 +957,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
 						   AccessShareLock);
 
 		root_relids = GetPublicationRelations(pubform->oid,
-											  PUBLICATION_PART_ROOT);
+											  PUBLICATION_PART_ROOT, false);
 
 		foreach(lc, root_relids)
 		{
@@ -1077,7 +1077,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
 		 */
 		if (root_relids == NIL)
 			relids = GetPublicationRelations(pubform->oid,
-											 PUBLICATION_PART_ALL);
+											 PUBLICATION_PART_ALL, false);
 		else
 		{
 			/*
@@ -1091,7 +1091,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
 		}
 
 		schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
-														PUBLICATION_PART_ALL);
+														PUBLICATION_PART_ALL,
+														false);
 		relids = list_concat_unique_oid(relids, schemarelids);
 
 		InvalidatePublicationRels(relids);
@@ -1163,7 +1164,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
 	else						/* AP_SetObjects */
 	{
 		List	   *oldrelids = GetPublicationRelations(pubid,
-														PUBLICATION_PART_ROOT);
+														PUBLICATION_PART_ROOT,
+														false);
 		List	   *delrels = NIL;
 		ListCell   *oldlc;
 
@@ -1314,7 +1316,8 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt,
 		ListCell   *lc;
 		List	   *reloids;
 
-		reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
+		reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT,
+										  false);
 
 		foreach(lc, reloids)
 		{
@@ -1575,7 +1578,7 @@ RemovePublicationSchemaById(Oid psoid)
 	 * partitions.
 	 */
 	schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
-											   PUBLICATION_PART_ALL);
+											   PUBLICATION_PART_ALL, false);
 	InvalidatePublicationRels(schemaRels);
 
 	CatalogTupleDelete(rel, &tup->t_self);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 403b4fc918..843de0ccf6 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -2116,7 +2116,7 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
 	appendStringInfoString(&cmd,
 						   "SELECT DISTINCT P.pubname AS pubname\n"
 						   "FROM pg_publication P,\n"
-						   "     LATERAL pg_get_publication_tables_with_partitions(P.pubname) GPT\n"
+						   "     LATERAL pg_get_publication_all_related_tables(P.pubname) GPT\n"
 						   "     JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
 						   "     pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
 						   "WHERE C.oid = GPT.relid AND P.pubname IN (");
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 349c6330b2..083ceee4e5 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12130,13 +12130,13 @@
   prosrc => 'pg_get_publication_tables' },
 { oid => '8051',
   descr => 'get information of the tables(including all partitions) that are part of the specified publications',
-  proname => 'pg_get_publication_tables_with_partitions', prorows => '1000',
+  proname => 'pg_get_publication_all_related_tables', prorows => '1000',
   provariadic => 'text', proretset => 't', provolatile => 's',
   prorettype => 'record', proargtypes => '_text',
   proallargtypes => '{_text,oid,oid,int2vector,pg_node_tree}',
   proargmodes => '{v,o,o,o,o}',
   proargnames => '{pubname,pubid,relid,attrs,qual}',
-  prosrc => 'pg_get_publication_tables_with_partitions' },  
+  prosrc => 'pg_get_publication_all_related_tables' },
 { oid => '6121',
   descr => 'returns whether a relation can be part of a publication',
   proname => 'pg_relation_is_publishable', provolatile => 's',
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 3c2ae2a960..3ce7e6e816 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -144,15 +144,18 @@ typedef enum PublicationPartOpt
 	PUBLICATION_PART_ALL,
 } PublicationPartOpt;
 
-extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
+extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt,
+									 bool include_ancestors);
 extern List *GetAllTablesPublications(void);
-extern List *GetAllTablesPublicationRelations(bool pubviaroot);
+extern List *GetAllTablesPublicationRelations(bool pubviaroot, bool allrelatedrels);
 extern List *GetPublicationSchemas(Oid pubid);
 extern List *GetSchemaPublications(Oid schemaid);
 extern List *GetSchemaPublicationRelations(Oid schemaid,
-										   PublicationPartOpt pub_partopt);
+										   PublicationPartOpt pub_partopt,
+										   bool include_ancestors);
 extern List *GetAllSchemaPublicationRelations(Oid pubid,
-											  PublicationPartOpt pub_partopt);
+											  PublicationPartOpt pub_partopt,
+											  bool include_ancestors);
 extern List *GetPubPartitionOptionRelations(List *result,
 											PublicationPartOpt pub_partopt,
 											Oid relid);
-- 
2.43.0

Reply via email to