From 2434111eadb7fa2bd45209d3f61ad505959da02c Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Sat, 12 Mar 2022 09:23:11 +0530
Subject: [PATCH v1 1/2] fixup: publish_as_relid

Make sure to determine the top-most ancestor listed in any publication.
Otherwise we might end up with different values depending on the order
of publications (as listed in subscription).
---
 src/backend/catalog/pg_publication.c        | 18 ++++++++--
 src/backend/commands/publicationcmds.c      |  2 +-
 src/backend/replication/pgoutput/pgoutput.c | 38 +++++++++++++++++++--
 src/include/catalog/pg_publication.h        |  3 +-
 4 files changed, 54 insertions(+), 7 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 25998fbb39..a9bbfce6b8 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -277,16 +277,18 @@ GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
 
 /*
  * Returns the relid of the topmost ancestor that is published via this
- * publication if any, otherwise returns InvalidOid.
+ * publication if any and set its ancestor level to ancestor_level,
+ * otherwise returns InvalidOid.
  *
  * Note that the list of ancestors should be ordered such that the topmost
  * ancestor is at the end of the list.
  */
 Oid
-GetTopMostAncestorInPublication(Oid puboid, List *ancestors)
+GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
 {
 	ListCell   *lc;
 	Oid			topmost_relid = InvalidOid;
+	int			level = 0;
 
 	/*
 	 * Find the "topmost" ancestor that is in this publication.
@@ -297,13 +299,25 @@ GetTopMostAncestorInPublication(Oid puboid, List *ancestors)
 		List	   *apubids = GetRelationPublications(ancestor);
 		List	   *aschemaPubids = NIL;
 
+		level++;
+
 		if (list_member_oid(apubids, puboid))
+		{
 			topmost_relid = ancestor;
+
+			if (ancestor_level)
+				*ancestor_level = level;
+		}
 		else
 		{
 			aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
 			if (list_member_oid(aschemaPubids, puboid))
+			{
 				topmost_relid = ancestor;
+
+				if (ancestor_level)
+					*ancestor_level = level;
+			}
 		}
 
 		list_free(apubids);
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 16b8661a1b..a7b74dc60a 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -323,7 +323,7 @@ contain_invalid_rfcolumn(Oid pubid, Relation relation, List *ancestors,
 	 */
 	if (pubviaroot && relation->rd_rel->relispartition)
 	{
-		publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors);
+		publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
 
 		if (!OidIsValid(publish_as_relid))
 			publish_as_relid = relid;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index ea57a0477f..28b13adcd8 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1748,6 +1748,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		List	   *schemaPubids = GetSchemaPublications(schemaId);
 		ListCell   *lc;
 		Oid			publish_as_relid = relid;
+		int			publish_ancestor_level = 0;
 		bool		am_partition = get_rel_relispartition(relid);
 		char		relkind = get_rel_relkind(relid);
 		List	   *rel_publications = NIL;
@@ -1815,11 +1816,23 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			Publication *pub = lfirst(lc);
 			bool		publish = false;
 
+			/*
+			 * Under what relid should we publish changes in this publication?
+			 * We'll use the top-most relid across all publications.
+			 */
+			Oid			pub_relid = relid;
+			int			ancestor_level = 0;
+
 			if (pub->alltables)
 			{
 				publish = true;
 				if (pub->pubviaroot && am_partition)
-					publish_as_relid = llast_oid(get_partition_ancestors(relid));
+				{
+					List	   *ancestors = get_partition_ancestors(relid);
+
+					pub_relid = llast_oid(ancestors);
+					ancestor_level = list_length(ancestors);
+				}
 			}
 
 			if (!publish)
@@ -1835,16 +1848,21 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 				if (am_partition)
 				{
 					Oid			ancestor;
+					int			level = 0;
 					List	   *ancestors = get_partition_ancestors(relid);
 
 					ancestor = GetTopMostAncestorInPublication(pub->oid,
-															   ancestors);
+															   ancestors,
+															   &level);
 
 					if (ancestor != InvalidOid)
 					{
 						ancestor_published = true;
 						if (pub->pubviaroot)
-							publish_as_relid = ancestor;
+						{
+							pub_relid = ancestor;
+							ancestor_level = level;
+						}
 					}
 				}
 
@@ -1868,6 +1886,20 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
 
 				rel_publications = lappend(rel_publications, pub);
+
+				/*
+				 * We want to publish the changes as the top-most ancestor
+				 * across all publications. So we need to check if the
+				 * already calculated level is higher than the new one. If
+				 * yes, we can ignore the new value (as it's a child).
+				 * Otherwise the new value is an ancestor, so we keep it.
+				 */
+				if (publish_ancestor_level > ancestor_level)
+					continue;
+
+				/* The new value is an ancestor, so let's keep it. */
+				publish_as_relid = pub_relid;
+				publish_ancestor_level = ancestor_level;
 			}
 		}
 
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index ba72e62e61..fe773cf9b7 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -134,7 +134,8 @@ extern List *GetAllSchemaPublicationRelations(Oid puboid,
 extern List *GetPubPartitionOptionRelations(List *result,
 											PublicationPartOpt pub_partopt,
 											Oid relid);
-extern Oid	GetTopMostAncestorInPublication(Oid puboid, List *ancestors);
+extern Oid	GetTopMostAncestorInPublication(Oid puboid, List *ancestors,
+											int *ancestor_level);
 
 extern bool is_publishable_relation(Relation rel);
 extern bool is_schema_publication(Oid pubid);
-- 
2.28.0.windows.1

