From efd0ca3ad80243a0d704d9eb079483ecf9f707ea Mon Sep 17 00:00:00 2001
From: Shlok Kyal <shlok.kyal.oss@gmail.com>
Date: Fri, 23 Aug 2024 14:02:20 +0530
Subject: [PATCH v19_REL_15] Distribute invalidatons if change in catalog
 tables

Distribute invalidations to inprogress transactions if the current
committed transaction change any catalog table.
---
 contrib/test_decoding/Makefile                |  2 +-
 .../expected/invalidation_distrubution.out    | 20 ++++++
 .../specs/invalidation_distrubution.spec      | 32 +++++++++
 .../replication/logical/reorderbuffer.c       | 23 +++++++
 src/backend/replication/logical/snapbuild.c   | 67 +++++++++++++++----
 src/include/replication/reorderbuffer.h       |  4 ++
 6 files changed, 133 insertions(+), 15 deletions(-)
 create mode 100644 contrib/test_decoding/expected/invalidation_distrubution.out
 create mode 100644 contrib/test_decoding/specs/invalidation_distrubution.spec

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index a4ba1a509ae..eef70770674 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
 	twophase_snapshot slot_creation_error catalog_change_snapshot \
-	skip_snapshot_restore
+	skip_snapshot_restore invalidation_distrubution
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/invalidation_distrubution.out b/contrib/test_decoding/expected/invalidation_distrubution.out
new file mode 100644
index 00000000000..24190ebe570
--- /dev/null
+++ b/contrib/test_decoding/expected/invalidation_distrubution.out
@@ -0,0 +1,20 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
+step s1_commit: COMMIT;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '3', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/specs/invalidation_distrubution.spec b/contrib/test_decoding/specs/invalidation_distrubution.spec
new file mode 100644
index 00000000000..f63aba3ce96
--- /dev/null
+++ b/contrib/test_decoding/specs/invalidation_distrubution.spec
@@ -0,0 +1,32 @@
+# Test that catalog cache invalidation messages are distributed to ongoing
+# transactions, ensuring they can access the updated catalog content after
+# processing these messages.
+setup
+{
+    SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
+    CREATE TABLE tbl1(val1 integer, val2 integer);
+    CREATE PUBLICATION pub;
+}
+
+teardown
+{
+    DROP TABLE tbl1;
+    DROP PUBLICATION pub;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s1"
+setup { SET synchronous_commit=on; }
+
+step "s1_begin" { BEGIN; }
+step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); }
+step "s1_commit" { COMMIT; }
+
+session "s2"
+setup { SET synchronous_commit=on; }
+
+step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
+step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '3', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; }
+
+# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
+permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes"
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index d5cde20a1c9..835180e12f6 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -5200,3 +5200,26 @@ restart:
 		*cmax = ent->cmax;
 	return true;
 }
+
+/*
+ * Count invalidation messages of specified transaction.
+ *
+ * Returns number of messages, and msgs is set to the pointer of the linked
+ * list for the messages.
+ */
+uint32
+ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid,
+							  SharedInvalidationMessage **msgs)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+								false);
+
+	if (txn == NULL)
+		return 0;
+
+	*msgs = txn->invalidations;
+
+	return txn->ninvalidations;
+}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index cc1f2a9f154..0b303f9a235 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -290,7 +290,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap);
 
 static void SnapBuildSnapIncRefcount(Snapshot snap);
 
-static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
+static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
 
 /* xlog reading helper functions for SnapBuildProcessRunningXacts */
 static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
@@ -852,15 +852,15 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
 }
 
 /*
- * Add a new Snapshot to all transactions we're decoding that currently are
- * in-progress so they can see new catalog contents made by the transaction
- * that just committed. This is necessary because those in-progress
- * transactions will use the new catalog's contents from here on (at the very
- * least everything they do needs to be compatible with newer catalog
- * contents).
+ * Add a new Snapshot and invalidation messages to all transactions we're
+ * decoding that currently are in-progress so they can see new catalog contents
+ * made by the transaction that just committed. This is necessary because those
+ * in-progress transactions will use the new catalog's contents from here on
+ * (at the very least everything they do needs to be compatible with newer
+ * catalog contents).
  */
 static void
-SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
+SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
 {
 	dlist_iter	txn_i;
 	ReorderBufferTXN *txn;
@@ -868,7 +868,8 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
 	/*
 	 * Iterate through all toplevel transactions. This can include
 	 * subtransactions which we just don't yet know to be that, but that's
-	 * fine, they will just get an unnecessary snapshot queued.
+	 * fine, they will just get an unnecessary snapshot and invalidations
+	 * queued.
 	 */
 	dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
 	{
@@ -881,6 +882,14 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
 		 * transaction which in turn implies we don't yet need a snapshot at
 		 * all. We'll add a snapshot when the first change gets queued.
 		 *
+		 * Similarly, we don't need to add invalidations to a transaction whose
+		 * base snapshot is not yet set. Once a base snapshot is built, it will
+		 * include the xids of committed transactions that have modified the
+		 * catalog, thus reflecting the new catalog contents. The existing
+		 * catalog cache will have already been invalidated after processing
+		 * the invalidations in the transaction that modified catalogs,
+		 * ensuring that a fresh cache is constructed during decoding.
+		 *
 		 * NB: This works correctly even for subtransactions because
 		 * ReorderBufferAssignChild() takes care to transfer the base snapshot
 		 * to the top-level transaction, and while iterating the changequeue
@@ -890,13 +899,13 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
 			continue;
 
 		/*
-		 * We don't need to add snapshot to prepared transactions as they
-		 * should not see the new catalog contents.
+		 * We don't need to add snapshot or invalidations to prepared
+		 * transactions as they should not see the new catalog contents.
 		 */
 		if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn))
 			continue;
 
-		elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
+		elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X",
 			 txn->xid, LSN_FORMAT_ARGS(lsn));
 
 		/*
@@ -906,6 +915,33 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
 		SnapBuildSnapIncRefcount(builder->snapshot);
 		ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
 								 builder->snapshot);
+
+		/*
+		 * Add invalidation messages to the reorder buffer of in-progress
+		 * transactions except the current committed transaction, for which we
+		 * will execute invalidations at the end.
+		 *
+		 * It is required, otherwise, we will end up using the stale catcache
+		 * contents built by the current transaction even after its decoding,
+		 * which should have been invalidated due to concurrent catalog
+		 * changing transaction.
+		 */
+		if (txn->xid != xid)
+		{
+			uint32 ninvalidations;
+			SharedInvalidationMessage *msgs = NULL;
+
+			ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
+														   xid, &msgs);
+
+			if (ninvalidations > 0)
+			{
+				Assert(msgs != NULL);
+
+				ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
+											  ninvalidations, msgs);
+			}
+		}
 	}
 }
 
@@ -1184,8 +1220,11 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 		/* refcount of the snapshot builder for the new snapshot */
 		SnapBuildSnapIncRefcount(builder->snapshot);
 
-		/* add a new catalog snapshot to all currently running transactions */
-		SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
+		/*
+		 * Add a new catalog snapshot and invalidations messages to all
+		 * currently running transactions.
+		 */
+		SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
 	}
 }
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 4a01f877e5d..402bb7a2728 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -680,6 +680,10 @@ extern TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
 
 extern void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 
+extern uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb,
+											TransactionId xid,
+											SharedInvalidationMessage **msgs);
+
 extern void StartupReorderBuffer(void);
 
 #endif
-- 
2.43.5

