From 3498e773879b3b87cd7ad46a1430f32b9c628ac7 Mon Sep 17 00:00:00 2001
From: Mikhail Kharitonov <mikhail.kharitonov.dev@gmail.com>
Date: Tue, 12 Aug 2025 14:15:54 +0300
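Subject: [PATCH v3 1/2] logical replication: add *_extended API; pgoutput uses
 leaf-based O/K; doc note

Add logicalrep_write_update_extended() and
logicalrep_write_delete_extended(), which take the leaf relation and the
published relation as separate arguments.  The relation OID and the tuple
layout are taken from the published relation, while the old-tuple marker
('O' for a full old tuple, 'K' for an old key) is chosen from the leaf
relation's replica identity.

logicalrep_write_update() and logicalrep_write_delete() are kept as
wrappers that pass the same relation for both arguments.  pgoutput now
calls the extended functions, and the documentation describes the
resulting behavior when publish_via_partition_root is true.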

---
 doc/src/sgml/logical-replication.sgml       |  8 +++
 src/backend/replication/logical/proto.c     | 79 +++++++++++++--------
 src/backend/replication/pgoutput/pgoutput.c |  4 +-
 src/include/replication/logicalproto.h      | 23 ++++--
 4 files changed, 76 insertions(+), 38 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index a0761cfee3f..9c34423204f 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -946,6 +946,14 @@ HINT:  To initiate replication, you must manually create the replication slot, e
     row filter is used.
    </para>
 
+   <para>
+    When <literal>publish_via_partition_root</literal> is <literal>true</literal>, the relation OID and
+    the tuple layout in logical replication messages are those of the <emphasis>root</emphasis>
+    partitioned table. However, for <literal>UPDATE</literal> and <literal>DELETE</literal>, whether an
+    old row is marked as a full old tuple (<literal>O</literal>) or as an old key (<literal>K</literal>) is
+    determined by the replica identity of the <emphasis>leaf</emphasis> partition that stored the old row.
+   </para>
+
   </sect2>
 
   <sect2 id="logical-replication-row-filter-initial-data-sync">
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 1b3d9eb49dd..bec198dc162 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -447,39 +447,51 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
  * Write UPDATE to the output stream.
  */
 void
-logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
-						TupleTableSlot *oldslot, TupleTableSlot *newslot,
-						bool binary, Bitmapset *columns,
-						PublishGencolsType include_gencols_type)
+logicalrep_write_update_extended(StringInfo out, TransactionId xid,
+								 Relation leafrel, Relation pubrel,
+								 TupleTableSlot *oldslot, TupleTableSlot *newslot,
+								 bool binary, Bitmapset *columns,
+								 PublishGencolsType include_gencols_type)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
 
-	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
-		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
-		   rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
-
 	/* transaction ID (if not valid, we're not streaming) */
 	if (TransactionIdIsValid(xid))
 		pq_sendint32(out, xid);
 
 	/* use Oid as relation identifier */
-	pq_sendint32(out, RelationGetRelid(rel));
+	pq_sendint32(out, RelationGetRelid(pubrel));
 
 	if (oldslot != NULL)
 	{
-		if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+		Assert(leafrel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
+			   leafrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+			   leafrel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+
+		if (leafrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
 			pq_sendbyte(out, 'O');	/* old tuple follows */
 		else
 			pq_sendbyte(out, 'K');	/* old key follows */
-		logicalrep_write_tuple(out, rel, oldslot, binary, columns,
+		logicalrep_write_tuple(out, pubrel, oldslot, binary, columns,
 							   include_gencols_type);
 	}
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newslot, binary, columns,
+	logicalrep_write_tuple(out, pubrel, newslot, binary, columns,
 						   include_gencols_type);
 }
 
+/* Backward-compatible wrapper: rel is both leaf and published relation. */
+void
+logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
+						TupleTableSlot *oldslot, TupleTableSlot *newslot,
+						bool binary, Bitmapset *columns,
+						PublishGencolsType include_gencols_type)
+{
+	logicalrep_write_update_extended(out, xid, rel, rel, oldslot, newslot,
+									 binary, columns, include_gencols_type);
+}
+
 /*
  * Read UPDATE from stream.
  */
@@ -521,19 +533,13 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
 	return relid;
 }
 
-/*
- * Write DELETE to the output stream.
- */
 void
-logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
-						TupleTableSlot *oldslot, bool binary,
-						Bitmapset *columns,
-						PublishGencolsType include_gencols_type)
+logicalrep_write_delete_extended(StringInfo out, TransactionId xid,
+								 Relation leafrel, Relation pubrel,
+								 TupleTableSlot *oldslot, bool binary,
+								 Bitmapset *columns,
+								 PublishGencolsType include_gencols_type)
 {
-	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
-		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
-		   rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
-
 	pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
 
 	/* transaction ID (if not valid, we're not streaming) */
@@ -541,15 +547,26 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 		pq_sendint32(out, xid);
 
 	/* use Oid as relation identifier */
-	pq_sendint32(out, RelationGetRelid(rel));
-
-	if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
-		pq_sendbyte(out, 'O');	/* old tuple follows */
-	else
-		pq_sendbyte(out, 'K');	/* old key follows */
+	pq_sendint32(out, RelationGetRelid(pubrel));
+	Assert(oldslot != NULL);
+	Assert(leafrel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
+		   leafrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+		   leafrel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+	pq_sendbyte(out, (leafrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) ? 'O' : 'K');
+	logicalrep_write_tuple(out, pubrel, oldslot, binary, columns, include_gencols_type);
+}
 
-	logicalrep_write_tuple(out, rel, oldslot, binary, columns,
-						   include_gencols_type);
+/*
+ * Write DELETE to the output stream; rel is both leaf and published relation.
+ */
+void
+logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
+						TupleTableSlot *oldslot, bool binary,
+						Bitmapset *columns,
+						PublishGencolsType include_gencols_type)
+{
+	logicalrep_write_delete_extended(out, xid, rel, rel, oldslot, binary,
+									 columns, include_gencols_type);
 }
 
 /*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 80540c017bd..81056768587 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1589,12 +1589,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 									relentry->include_gencols_type);
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
-			logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
+			logicalrep_write_update_extended(ctx->out, xid, relation, targetrel, old_slot,
 									new_slot, data->binary, relentry->columns,
 									relentry->include_gencols_type);
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
-			logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
+			logicalrep_write_delete_extended(ctx->out, xid, relation, targetrel, old_slot,
 									data->binary, relentry->columns,
 									relentry->include_gencols_type);
 			break;
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index b261c60d3fa..81932c065c5 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -228,17 +228,30 @@ extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									PublishGencolsType include_gencols_type);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
-									Relation rel, TupleTableSlot *oldslot,
-									TupleTableSlot *newslot, bool binary,
-									Bitmapset *columns,
+									Relation rel,
+									TupleTableSlot *oldslot, TupleTableSlot *newslot,
+									bool binary, Bitmapset *columns,
 									PublishGencolsType include_gencols_type);
+
+extern void logicalrep_write_update_extended(StringInfo out, TransactionId xid,
+											 Relation leafrel, Relation pubrel,
+											 TupleTableSlot *oldslot, TupleTableSlot *newslot,
+											 bool binary, Bitmapset *columns,
+											 PublishGencolsType include_gencols_type);
 extern LogicalRepRelId logicalrep_read_update(StringInfo in,
 											  bool *has_oldtuple, LogicalRepTupleData *oldtup,
 											  LogicalRepTupleData *newtup);
 extern void logicalrep_write_delete(StringInfo out, TransactionId xid,
-									Relation rel, TupleTableSlot *oldslot,
-									bool binary, Bitmapset *columns,
+									Relation rel,
+									TupleTableSlot *oldslot, bool binary,
+									Bitmapset *columns,
 									PublishGencolsType include_gencols_type);
+
+extern void logicalrep_write_delete_extended(StringInfo out, TransactionId xid,
+											 Relation leafrel, Relation pubrel,
+											 TupleTableSlot *oldslot, bool binary,
+											 Bitmapset *columns,
+											 PublishGencolsType include_gencols_type);
 extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
 											  LogicalRepTupleData *oldtup);
 extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,

base-commit: b227b0bb4e032e19b3679bedac820eba3ac0d1cf
-- 
2.34.1

