> In the v7 patch, I am looping through the reorder buffer of the
> current committed transaction and storing all invalidation messages in
> a list. Then I am distributing those invalidations.
> But I found that for a transaction we already store all the
> invalidation messages (see [1]). So we don't need to loop through the
> reorder buffer and store the invalidations.
>
> I have modified the patch accordingly and attached the same.
>
> [1]: 
> https://github.com/postgres/postgres/blob/7da1bdc2c2f17038f2ae1900be90a0d7b5e361e0/src/include/replication/reorderbuffer.h#L384

Hi,

I have made changes to invalidate the cache selectively, to reduce the
performance degradation caused by distributing invalidations.

Here is the analysis for selective invalidation.
Observation:
Currently, when there is a change in a publication, the cache for all
tables is invalidated, including tables that are not part of any
publication and tables of other publications. For example, suppose
pub1 includes tables t1 to t1000, while pub2 contains just table
t1001. If pub2 is altered, even though it only has t1001, the change
also invalidates all the tables t1 through t1000 of pub1.
Similarly for a namespace: whenever we alter a schema, or add/drop a
schema to/from a publication, the cache for all tables is invalidated,
including tables of other schemas. For example, suppose pub1 includes
tables t1 to t1000 in schema sc1, while pub2 contains just table t1001
in schema sc2. If schema ‘sc2’ is changed, or if it is dropped from
publication ‘pub2’, even though it only has t1001, the change
invalidates all the tables t1 through t1000 in schema sc1.
In both cases the invalidation is executed through the
‘rel_sync_cache_publication_cb’ function, which invalidates all the
tables in the cache.

Solution:
1. When we alter a publication using commands like ‘ALTER PUBLICATION
pub_name DROP TABLE table_name’, first all tables in the publication
are invalidated using the function ‘rel_sync_cache_relation_cb’. Then
the ‘rel_sync_cache_publication_cb’ function is called as well, which
invalidates all the tables in the cache. This happens because of the
following registered callback:
CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
rel_sync_cache_publication_cb, (Datum) 0);

So, I feel this second function call can be avoided, and I have
included changes for that in the patch. Now the behavior is as
follows: suppose pub1 includes tables t1 to t1000, while pub2 contains
just table t1001. If pub2 is altered, only t1001 is invalidated.

2. When we add/drop a schema to/from a publication using commands like
‘ALTER PUBLICATION pub_name ADD TABLES IN SCHEMA schema_name’, first
all tables in that schema are invalidated using
‘rel_sync_cache_relation_cb’, and then the
‘rel_sync_cache_publication_cb’ function is called as well, which
invalidates all the tables in the cache. This happens because of the
following registered callback:
CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
rel_sync_cache_publication_cb, (Datum) 0);

So, I feel this second function call can be avoided as well, and I
have included changes for that in the patch. Now the behavior is as
follows: suppose pub1 includes tables t1 to t1000 in schema sc1, while
pub2 contains just table t1001 in schema sc2. If schema ‘sc2’ is
dropped from publication ‘pub2’, only table t1001 is invalidated.

3. When we alter a namespace using commands like ‘ALTER SCHEMA
schema_name RENAME TO new_schema_name’, all the tables in the cache
are invalidated, as ‘rel_sync_cache_publication_cb’ is called due to
the following registered callback:
CacheRegisterSyscacheCallback(NAMESPACEOID,
rel_sync_cache_publication_cb, (Datum) 0);

So, we added a new callback function ‘rel_sync_cache_namespacerel_cb’,
which is called instead of ‘rel_sync_cache_publication_cb’ and
invalidates only the cache entries of the tables that belong to that
particular namespace. To support the new callback, the namespace id is
added to the invalidation message.
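Condensed from the attached v9-0002 patch, the new shared-invalidation
message type carries the namespace OID, and pgoutput registers the new
callback in place of the blanket syscache callbacks:

#define SHAREDINVALNSPCACHE_ID	(-6)
typedef struct
{
	int8		id;				/* type field --- must be first */
	Oid			dbId;			/* database ID */
	Oid			nspId;			/* namespace ID */
} SharedInvalNspcacheMsg;

/* in init_rel_sync_cache() */
CacheRegisterNspcacheCallback(rel_sync_cache_namespacerel_cb, (Datum) 0);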

For example, if namespace ‘sc1’ has tables t1 and t2, and namespace
‘sc2’ has table t3, then renaming ‘sc1’ to ‘sc_new’ invalidates only
the tables in sc1, i.e. t1 and t2.
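The callback invoked for such a message invalidates only the entries
whose cached schema OID matches (again condensed from the attached
patch):

static void
rel_sync_cache_namespacerel_cb(Datum arg, Oid nspid)
{
	HASH_SEQ_STATUS status;
	RelationSyncEntry *entry;

	if (RelationSyncCache == NULL)
		return;

	hash_seq_init(&status, RelationSyncCache);
	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
	{
		/* Skip entries that belong to other namespaces. */
		if (entry->replicate_valid && entry->schemaid == nspid)
			entry->replicate_valid = false;
	}
}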


Performance Comparison:
I ran the same tests as shared in [1] and observed a significant
decrease in the degradation with the new changes. With selective
invalidation the degradation is around ~5%. These results are an
average of 3 runs.

count   |  Head (sec)  |  Fix (sec)    | Degradation (%)
--------+--------------+---------------+----------------
 10000  |  0.38842567  |  0.405057     |  4.281727827
 50000  |  7.22018834  |  7.605011334  |  5.329819333
 75000  | 15.627181    | 16.38659034   |  4.859541462
100000  | 27.37910867  | 28.8636873    |  5.422304458

I have attached the patches:
v9-0001: Distribute invalidations to in-progress transactions
v9-0002: Selective invalidation

[1]:https://www.postgresql.org/message-id/CANhcyEW4pq6%2BPO_eFn2q%3D23sgV1budN3y4SxpYBaKMJNADSDuA%40mail.gmail.com


Thanks and Regards,
Shlok Kyal
From 4222dca86e4892fbae6698ed7a6135f61d499d8f Mon Sep 17 00:00:00 2001
From: Shlok Kyal <shlok.kyal.oss@gmail.com>
Date: Fri, 23 Aug 2024 14:02:20 +0530
Subject: [PATCH v9 1/2] Distribute invalidations if catalog tables change

Distribute invalidations to in-progress transactions if the current
committed transaction changed any catalog table.
---
 .../replication/logical/reorderbuffer.c       |   5 +-
 src/backend/replication/logical/snapbuild.c   |  34 +++--
 src/include/replication/reorderbuffer.h       |   4 +
 src/test/subscription/t/100_bugs.pl           | 128 ++++++++++++++++++
 4 files changed, 157 insertions(+), 14 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 22bcf171ff..c5dfc1ab06 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -221,9 +221,6 @@ int			debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED
  */
 static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
 static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
-static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
-											   TransactionId xid, bool create, bool *is_new,
-											   XLogRecPtr lsn, bool create_as_top);
 static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
 											  ReorderBufferTXN *subtxn);
 
@@ -622,7 +619,7 @@ ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
  * (with the given LSN, and as top transaction if that's specified);
  * when this happens, is_new is set to true.
  */
-static ReorderBufferTXN *
+ReorderBufferTXN *
 ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
 					  bool *is_new, XLogRecPtr lsn, bool create_as_top)
 {
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 0450f94ba8..42c947651b 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -300,7 +300,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap);
 
 static void SnapBuildSnapIncRefcount(Snapshot snap);
 
-static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
+static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
 
 static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
 												 uint32 xinfo);
@@ -859,18 +859,21 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
 }
 
 /*
- * Add a new Snapshot to all transactions we're decoding that currently are
- * in-progress so they can see new catalog contents made by the transaction
- * that just committed. This is necessary because those in-progress
- * transactions will use the new catalog's contents from here on (at the very
- * least everything they do needs to be compatible with newer catalog
- * contents).
+ * Add a new Snapshot and invalidation messages to all transactions we're
+ * decoding that currently are in-progress so they can see new catalog contents
+ * made by the transaction that just committed. This is necessary because those
+ * in-progress transactions will use the new catalog's contents from here on
+ * (at the very least everything they do needs to be compatible with newer
+ * catalog contents).
  */
 static void
-SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
+SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
 {
 	dlist_iter	txn_i;
 	ReorderBufferTXN *txn;
+	ReorderBufferTXN *curr_txn;
+
+	curr_txn = ReorderBufferTXNByXid(builder->reorder, xid, false, NULL, InvalidXLogRecPtr, false);
 
 	/*
 	 * Iterate through all toplevel transactions. This can include
@@ -913,6 +916,14 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
 		SnapBuildSnapIncRefcount(builder->snapshot);
 		ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
 								 builder->snapshot);
+
+		/*
+		 * Add invalidation messages to the reorder buffer of in-progress
+		 * transactions, except the current committed transaction.
+		 */
+		if (curr_txn != NULL && txn->xid != xid && curr_txn->ninvalidations > 0)
+			ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
+										  curr_txn->ninvalidations, curr_txn->invalidations);
 	}
 }
 
@@ -1184,8 +1195,11 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 		/* refcount of the snapshot builder for the new snapshot */
 		SnapBuildSnapIncRefcount(builder->snapshot);
 
-		/* add a new catalog snapshot to all currently running transactions */
-		SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
+		/*
+		 * add a new catalog snapshot and invalidation messages to all
+		 * currently running transactions
+		 */
+		SnapBuildDistributeNewCatalogSnapshot(builder, lsn, xid);
 	}
 }
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index e332635f70..093d21213a 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -743,6 +743,10 @@ extern TransactionId *ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb);
 
 extern void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr);
 
+extern ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
+											   TransactionId xid, bool create, bool *is_new,
+											   XLogRecPtr lsn, bool create_as_top);
+
 extern void StartupReorderBuffer(void);
 
 #endif
diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl
index cb36ca7b16..85d5c0d016 100644
--- a/src/test/subscription/t/100_bugs.pl
+++ b/src/test/subscription/t/100_bugs.pl
@@ -487,6 +487,134 @@ $result =
 is( $result, qq(2|f
 3|t), 'check replicated update on subscriber');
 
+# Clean up
+$node_publisher->safe_psql('postgres', qq(DROP PUBLICATION pub1;));
+$node_subscriber->safe_psql('postgres', qq(DROP SUBSCRIPTION sub1;));
+
+# The bug was that the incremental data synchronization was skipped when a
+# new table is added to the publication in the presence of a concurrent
+# active transaction performing DML on the same table.
+
+# Initial setup.
+$node_publisher->safe_psql(
+	'postgres', qq(
+	CREATE TABLE tab_conc(a int);
+	CREATE SCHEMA sch3;
+	CREATE TABLE sch3.tab_conc(a int);
+	CREATE PUBLICATION regress_pub1;
+));
+
+$node_subscriber->safe_psql(
+	'postgres', qq(
+	CREATE TABLE tab_conc(a int);
+	CREATE SCHEMA sch3;
+	CREATE TABLE sch3.tab_conc(a int);
+	CREATE SUBSCRIPTION regress_sub1 CONNECTION '$publisher_connstr' PUBLICATION regress_pub1;
+));
+
+# Bump the query timeout to avoid false negatives on slow test systems.
+my $psql_timeout_secs = 4 * $PostgreSQL::Test::Utils::timeout_default;
+
+# Initiate 3 background sessions.
+my $background_psql1 = $node_publisher->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $psql_timeout_secs);
+$background_psql1->set_query_timer_restart();
+
+my $background_psql2 = $node_publisher->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $psql_timeout_secs);
+
+$background_psql2->set_query_timer_restart();
+
+my $background_psql3 = $node_publisher->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $psql_timeout_secs);
+$background_psql3->set_query_timer_restart();
+
+# Maintain an active transaction with the table that will be added to the
+# publication.
+$background_psql1->query_safe(
+	qq[
+	BEGIN;
+	INSERT INTO tab_conc VALUES (1);
+]);
+
+# Maintain an active transaction with a schema table that will be added to the
+# publication.
+$background_psql2->query_safe(
+	qq[
+	BEGIN;
+	INSERT INTO sch3.tab_conc VALUES (1);
+]);
+
+# Run the ALTER PUBLICATION from a background session so that it executes
+# while the above transactions are still in progress.
+$background_psql3->query_safe(
+	qq[
+	ALTER PUBLICATION regress_pub1 ADD TABLE tab_conc, TABLES IN SCHEMA sch3;
+]);
+
+# Commit the open transactions on the tables.
+$background_psql1->query_safe(qq[COMMIT]);
+$background_psql2->query_safe(qq[COMMIT]);
+
+$node_publisher->safe_psql(
+	'postgres', qq(
+	INSERT INTO tab_conc VALUES (2);
+	INSERT INTO sch3.tab_conc VALUES (2);
+));
+
+# Refresh the publication.
+$node_subscriber->safe_psql('postgres',
+	'ALTER SUBSCRIPTION regress_sub1 REFRESH PUBLICATION');
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'regress_sub1');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_conc");
+is( $result, qq(1
+2),
+	'Ensure that the data from the tab_conc table is synchronized to the subscriber after the subscription is refreshed'
+);
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT * FROM sch3.tab_conc");
+is( $result, qq(1
+2),
+	'Ensure that the data from the sch3.tab_conc table is synchronized to the subscriber after the subscription is refreshed'
+);
+
+# Perform an insert.
+$node_publisher->safe_psql(
+	'postgres', qq(
+	INSERT INTO tab_conc VALUES (3);
+	INSERT INTO sch3.tab_conc VALUES (3);
+));
+$node_publisher->wait_for_catchup('regress_sub1');
+
+# Verify that the insert is replicated to the subscriber.
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_conc");
+is( $result, qq(1
+2
+3),
+	'Verify that the incremental data for table tab_conc added after table synchronization is replicated to the subscriber'
+);
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT * FROM sch3.tab_conc");
+is( $result, qq(1
+2
+3),
+	'Verify that the incremental data for table sch3.tab_conc added after table synchronization is replicated to the subscriber'
+);
+
+$background_psql1->quit;
+$background_psql2->quit;
+$background_psql3->quit;
+
 $node_publisher->stop('fast');
 $node_subscriber->stop('fast');
 
-- 
2.34.1

From 7ce19b7dafdf659a9689856211e81c501dfe498f Mon Sep 17 00:00:00 2001
From: Shlok Kyal <shlok.kyal.oss@gmail.com>
Date: Wed, 25 Sep 2024 11:41:42 +0530
Subject: [PATCH v9 2/2] Add Selective Invalidation of Cache

When we alter a publication, add/drop a namespace to/from a publication,
or alter a namespace, the cache for all the tables is invalidated. With
this patch, for the above operations we invalidate the cache of only the
desired tables.
---
 src/backend/replication/pgoutput/pgoutput.c |  52 ++++----
 src/backend/utils/cache/inval.c             | 127 +++++++++++++++++++-
 src/include/storage/sinval.h                |   9 ++
 src/include/utils/inval.h                   |   4 +
 src/test/subscription/t/100_bugs.pl         |  91 ++++++++++++++
 5 files changed, 253 insertions(+), 30 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 00e7024563..ba480e7e48 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -126,6 +126,8 @@ typedef struct RelationSyncEntry
 {
 	Oid			relid;			/* relation oid */
 
+	Oid			schemaid;		/* schema oid */
+
 	bool		replicate_valid;	/* overall validity flag for entry */
 
 	bool		schema_sent;
@@ -216,6 +218,7 @@ static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
 										  uint32 hashvalue);
+static void rel_sync_cache_namespacerel_cb(Datum arg, Oid nspid);
 static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
 											TransactionId xid);
 static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
@@ -1739,12 +1742,6 @@ static void
 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
 	publications_valid = false;
-
-	/*
-	 * Also invalidate per-relation cache so that next time the filtering info
-	 * is checked it will be updated with the new publication settings.
-	 */
-	rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
 }
 
 /*
@@ -1911,26 +1908,7 @@ init_rel_sync_cache(MemoryContext cachectx)
 
 	/* We must update the cache entry for a relation after a relcache flush */
 	CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
-
-	/*
-	 * Flush all cache entries after a pg_namespace change, in case it was a
-	 * schema rename affecting a relation being replicated.
-	 */
-	CacheRegisterSyscacheCallback(NAMESPACEOID,
-								  rel_sync_cache_publication_cb,
-								  (Datum) 0);
-
-	/*
-	 * Flush all cache entries after any publication changes.  (We need no
-	 * callback entry for pg_publication, because publication_invalidation_cb
-	 * will take care of it.)
-	 */
-	CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
-								  rel_sync_cache_publication_cb,
-								  (Datum) 0);
-	CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
-								  rel_sync_cache_publication_cb,
-								  (Datum) 0);
+	CacheRegisterNspcacheCallback(rel_sync_cache_namespacerel_cb, (Datum) 0);
 
 	relation_callbacks_registered = true;
 }
@@ -2076,6 +2054,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->estate = NULL;
 		memset(entry->exprstate, 0, sizeof(entry->exprstate));
 
+		entry->schemaid = schemaId;
+
 		/*
 		 * Build publication cache. We can't use one provided by relcache as
 		 * relcache considers all publications that the given relation is in,
@@ -2349,6 +2329,26 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	}
 }
 
+/*
+ * Namespace invalidation callback
+ */
+static void
+rel_sync_cache_namespacerel_cb(Datum arg, Oid nspid)
+{
+	HASH_SEQ_STATUS status;
+	RelationSyncEntry *entry;
+
+	if (RelationSyncCache == NULL)
+		return;
+
+	hash_seq_init(&status, RelationSyncCache);
+	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
+	{
+		if (entry->replicate_valid && entry->schemaid == nspid)
+			entry->replicate_valid = false;
+	}
+}
+
 /* Send Replication origin */
 static void
 send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 603aa4157b..fc0d91aec9 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -114,6 +114,7 @@
 #include "access/xact.h"
 #include "access/xloginsert.h"
 #include "catalog/catalog.h"
 #include "catalog/pg_constraint.h"
+#include "catalog/pg_namespace.h"
 #include "miscadmin.h"
 #include "storage/sinval.h"
@@ -160,6 +161,9 @@
  */
 #define CatCacheMsgs 0
 #define RelCacheMsgs 1
+#define NspCacheMsgs 2
+
+#define NumberofCache 3
 
 /* Pointers to main arrays in TopTransactionContext */
 typedef struct InvalMessageArray
@@ -168,13 +172,13 @@ typedef struct InvalMessageArray
 	int			maxmsgs;		/* current allocated size of array */
 } InvalMessageArray;
 
-static InvalMessageArray InvalMessageArrays[2];
+static InvalMessageArray InvalMessageArrays[NumberofCache];
 
 /* Control information for one logical group of messages */
 typedef struct InvalidationMsgsGroup
 {
-	int			firstmsg[2];	/* first index in relevant array */
-	int			nextmsg[2];		/* last+1 index */
+	int			firstmsg[NumberofCache];	/* first index in relevant array */
+	int			nextmsg[NumberofCache];		/* last+1 index */
 } InvalidationMsgsGroup;
 
 /* Macros to help preserve InvalidationMsgsGroup abstraction */
@@ -189,6 +193,7 @@ typedef struct InvalidationMsgsGroup
 	do { \
 		SetSubGroupToFollow(targetgroup, priorgroup, CatCacheMsgs); \
 		SetSubGroupToFollow(targetgroup, priorgroup, RelCacheMsgs); \
+		SetSubGroupToFollow(targetgroup, priorgroup, NspCacheMsgs);	\
 	} while (0)
 
 #define NumMessagesInSubGroup(group, subgroup) \
@@ -196,7 +201,8 @@ typedef struct InvalidationMsgsGroup
 
 #define NumMessagesInGroup(group) \
 	(NumMessagesInSubGroup(group, CatCacheMsgs) + \
-	 NumMessagesInSubGroup(group, RelCacheMsgs))
+	 NumMessagesInSubGroup(group, RelCacheMsgs) + \
+	 NumMessagesInSubGroup(group, NspCacheMsgs))
 
 
 /*----------------
@@ -251,6 +257,7 @@ int			debug_discard_caches = 0;
 
 #define MAX_SYSCACHE_CALLBACKS 64
 #define MAX_RELCACHE_CALLBACKS 10
+#define MAX_NSPCACHE_CALLBACKS 10
 
 static struct SYSCACHECALLBACK
 {
@@ -270,7 +277,14 @@ static struct RELCACHECALLBACK
 	Datum		arg;
 }			relcache_callback_list[MAX_RELCACHE_CALLBACKS];
 
+static struct NSPCACHECALLBACK
+{
+	NspcacheCallbackFunction function;
+	Datum		arg;
+}			nspcache_callback_list[MAX_NSPCACHE_CALLBACKS];
+
 static int	relcache_callback_count = 0;
+static int	nspcache_callback_count = 0;
 
 /* ----------------------------------------------------------------
  *				Invalidation subgroup support functions
@@ -464,6 +478,35 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
 	AddInvalidationMessage(group, RelCacheMsgs, &msg);
 }
 
+static void
+AddNspcacheInvalidationMessage(InvalidationMsgsGroup *group,
+							   Oid dbId, Oid nspId)
+{
+	SharedInvalidationMessage msg;
+
+	/*
+	 * Don't add a duplicate item. We assume dbId need not be checked because
+	 * it will never change. InvalidOid for nspId means all namespaces so we
+	 * don't need to add individual ones when it is present.
+	 */
+
+	ProcessMessageSubGroup(group, NspCacheMsgs,
+						   if (msg->nc.id == SHAREDINVALNSPCACHE_ID &&
+							   (msg->nc.nspId == nspId ||
+								msg->nc.nspId == InvalidOid))
+						   return);
+
+
+	/* OK, add the item */
+	msg.nc.id = SHAREDINVALNSPCACHE_ID;
+	msg.nc.dbId = dbId;
+	msg.nc.nspId = nspId;
+	/* check AddCatcacheInvalidationMessage() for an explanation */
+	VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+	AddInvalidationMessage(group, NspCacheMsgs, &msg);
+}
+
 /*
  * Add a snapshot inval entry
  *
@@ -502,6 +545,7 @@ AppendInvalidationMessages(InvalidationMsgsGroup *dest,
 {
 	AppendInvalidationMessageSubGroup(dest, src, CatCacheMsgs);
 	AppendInvalidationMessageSubGroup(dest, src, RelCacheMsgs);
+	AppendInvalidationMessageSubGroup(dest, src, NspCacheMsgs);
 }
 
 /*
@@ -516,6 +560,7 @@ ProcessInvalidationMessages(InvalidationMsgsGroup *group,
 {
 	ProcessMessageSubGroup(group, CatCacheMsgs, func(msg));
 	ProcessMessageSubGroup(group, RelCacheMsgs, func(msg));
+	ProcessMessageSubGroup(group, NspCacheMsgs, func(msg));
 }
 
 /*
@@ -528,6 +573,7 @@ ProcessInvalidationMessagesMulti(InvalidationMsgsGroup *group,
 {
 	ProcessMessageSubGroupMulti(group, CatCacheMsgs, func(msgs, n));
 	ProcessMessageSubGroupMulti(group, RelCacheMsgs, func(msgs, n));
+	ProcessMessageSubGroupMulti(group, NspCacheMsgs, func(msgs, n));
 }
 
 /* ----------------------------------------------------------------
@@ -590,6 +636,18 @@ RegisterRelcacheInvalidation(Oid dbId, Oid relId)
 		transInvalInfo->RelcacheInitFileInval = true;
 }
 
+/*
+ * RegisterNspcacheInvalidation
+ *
+ * As above, but register a namespace invalidation event.
+ */
+static void
+RegisterNspcacheInvalidation(Oid dbId, Oid nspId)
+{
+	AddNspcacheInvalidationMessage(&transInvalInfo->CurrentCmdInvalidMsgs,
+								   dbId, nspId);
+}
+
 /*
  * RegisterSnapshotInvalidation
  *
@@ -660,6 +718,8 @@ PrepareInvalidationState(void)
 		InvalMessageArrays[CatCacheMsgs].maxmsgs = 0;
 		InvalMessageArrays[RelCacheMsgs].msgs = NULL;
 		InvalMessageArrays[RelCacheMsgs].maxmsgs = 0;
+		InvalMessageArrays[NspCacheMsgs].msgs = NULL;
+		InvalMessageArrays[NspCacheMsgs].maxmsgs = 0;
 	}
 
 	transInvalInfo = myInfo;
@@ -773,6 +833,20 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
 		else if (msg->sn.dbId == MyDatabaseId)
 			InvalidateCatalogSnapshot();
 	}
+	else if (msg->id == SHAREDINVALNSPCACHE_ID)
+	{
+		if (msg->nc.dbId == MyDatabaseId || msg->nc.dbId == InvalidOid)
+		{
+			int			i;
+
+			for (i = 0; i < nspcache_callback_count; i++)
+			{
+				struct NSPCACHECALLBACK *ncitem = nspcache_callback_list + i;
+
+				ncitem->function(ncitem->arg, msg->nc.nspId);
+			}
+		}
+	}
 	else
 		elog(FATAL, "unrecognized SI message ID: %d", msg->id);
 }
@@ -944,6 +1018,18 @@ xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
 										msgs,
 										n * sizeof(SharedInvalidationMessage)),
 								 nmsgs += n));
+	ProcessMessageSubGroupMulti(&transInvalInfo->PriorCmdInvalidMsgs,
+								NspCacheMsgs,
+								(memcpy(msgarray + nmsgs,
+										msgs,
+										n * sizeof(SharedInvalidationMessage)),
+								 nmsgs += n));
+	ProcessMessageSubGroupMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
+								NspCacheMsgs,
+								(memcpy(msgarray + nmsgs,
+										msgs,
+										n * sizeof(SharedInvalidationMessage)),
+								 nmsgs += n));
 	Assert(nmsgs == nummsgs);
 
 	return nmsgs;
@@ -1312,6 +1398,17 @@ CacheInvalidateHeapTuple(Relation relation,
 		else
 			return;
 	}
+	else if (tupleRelId == NamespaceRelationId)
+	{
+		Form_pg_namespace nsptup = (Form_pg_namespace) GETSTRUCT(tuple);
+
+		/* get namespace id */
+		relationId = nsptup->oid;
+		databaseId = MyDatabaseId;
+
+		RegisterNspcacheInvalidation(databaseId, relationId);
+		return;
+	}
 	else
 		return;
 
@@ -1567,6 +1664,25 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 	++relcache_callback_count;
 }
 
+/*
+ * CacheRegisterNspcacheCallback
+ *		Register the specified function to be called for all future
+ *		namespace invalidation events.  The OID of the namespace being
+ *		invalidated will be passed to the function.
+ */
+void
+CacheRegisterNspcacheCallback(NspcacheCallbackFunction func,
+							  Datum arg)
+{
+	if (nspcache_callback_count >= MAX_NSPCACHE_CALLBACKS)
+		elog(FATAL, "out of nspcache_callback_list slots");
+
+	nspcache_callback_list[nspcache_callback_count].function = func;
+	nspcache_callback_list[nspcache_callback_count].arg = arg;
+
+	++nspcache_callback_count;
+}
+
 /*
  * CallSyscacheCallbacks
  *
@@ -1629,6 +1745,9 @@ LogLogicalInvalidations(void)
 		ProcessMessageSubGroupMulti(group, RelCacheMsgs,
 									XLogRegisterData((char *) msgs,
 													 n * sizeof(SharedInvalidationMessage)));
+		ProcessMessageSubGroupMulti(group, NspCacheMsgs,
+									XLogRegisterData((char *) msgs,
+													 n * sizeof(SharedInvalidationMessage)));
 		XLogInsert(RM_XACT_ID, XLOG_XACT_INVALIDATIONS);
 	}
 }
diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h
index 8f5744b21b..4c53012528 100644
--- a/src/include/storage/sinval.h
+++ b/src/include/storage/sinval.h
@@ -110,6 +110,14 @@ typedef struct
 	Oid			relId;			/* relation ID */
 } SharedInvalSnapshotMsg;
 
+#define SHAREDINVALNSPCACHE_ID	(-6)
+typedef struct
+{
+	int8		id;				/* type field --- must be first */
+	Oid			dbId;			/* database ID */
+	Oid			nspId;			/* namespace ID */
+} SharedInvalNspcacheMsg;
+
 typedef union
 {
 	int8		id;				/* type field --- must be first */
@@ -119,6 +127,7 @@ typedef union
 	SharedInvalSmgrMsg sm;
 	SharedInvalRelmapMsg rm;
 	SharedInvalSnapshotMsg sn;
+	SharedInvalNspcacheMsg nc;
 } SharedInvalidationMessage;
 
 
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 24695facf2..99a0c90b6d 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -22,6 +22,7 @@ extern PGDLLIMPORT int debug_discard_caches;
 
 typedef void (*SyscacheCallbackFunction) (Datum arg, int cacheid, uint32 hashvalue);
 typedef void (*RelcacheCallbackFunction) (Datum arg, Oid relid);
+typedef void (*NspcacheCallbackFunction) (Datum arg, Oid nspid);
 
 
 extern void AcceptInvalidationMessages(void);
@@ -59,6 +60,9 @@ extern void CacheRegisterSyscacheCallback(int cacheid,
 extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
 										  Datum arg);
 
+extern void CacheRegisterNspcacheCallback(NspcacheCallbackFunction func,
+										  Datum arg);
+
 extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue);
 
 extern void InvalidateSystemCaches(void);
diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl
index 85d5c0d016..e038dd8a87 100644
--- a/src/test/subscription/t/100_bugs.pl
+++ b/src/test/subscription/t/100_bugs.pl
@@ -611,6 +611,97 @@ is( $result, qq(1
 	'Verify that the incremental data for table sch3.tab_conc added after table synchronization is replicated to the subscriber'
 );
 
+$background_psql1->query_safe(
+	qq[
+	BEGIN;
+	INSERT INTO tab_conc VALUES (4);
+]);
+
+# Maintain an active transaction with a schema table that will be added to the
+# publication.
+$background_psql2->query_safe(
+	qq[
+	BEGIN;
+	INSERT INTO sch3.tab_conc VALUES (4);
+]);
+
+$background_psql3->query_safe(
+	qq[
+	ALTER PUBLICATION regress_pub1 DROP TABLE tab_conc, TABLES IN SCHEMA sch3;
+]);
+
+$background_psql1->query_safe(qq[COMMIT]);
+$background_psql2->query_safe(qq[COMMIT]);
+
+$node_publisher->safe_psql(
+	'postgres', qq(
+	INSERT INTO tab_conc VALUES (5);
+	INSERT INTO sch3.tab_conc VALUES (5);
+));
+
+$node_publisher->wait_for_catchup('regress_sub1');
+
+# Verify that the insert is replicated to the subscriber.
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_conc");
+is( $result, qq(1
+2
+3
+4),
+	'Verify that the incremental data for table tab_conc added after table synchronization is replicated to the subscriber'
+);
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT * FROM sch3.tab_conc");
+is( $result, qq(1
+2
+3
+4),
+	'Verify that the incremental data for table sch3.tab_conc added after table synchronization is replicated to the subscriber'
+);
+
+$background_psql3->query_safe(
+	qq[
+	ALTER PUBLICATION regress_pub1 ADD TABLE tab_conc, TABLES IN SCHEMA sch3;
+]);
+
+$background_psql1->query_safe(
+	qq[
+	BEGIN;
+	INSERT INTO tab_conc VALUES (6);
+]);
+
+$background_psql2->query_safe(
+	qq[
+	BEGIN;
+	INSERT INTO sch3.tab_conc VALUES (6);
+]);
+
+$background_psql3->query_safe(
+	qq[
+	DROP PUBLICATION regress_pub1;
+]);
+
+$background_psql1->query_safe(
+	qq[
+	INSERT INTO tab_conc VALUES (7);
+]);
+
+$background_psql2->query_safe(
+	qq[
+	INSERT INTO sch3.tab_conc VALUES (7);
+]);
+
+$background_psql1->query_safe(qq[COMMIT]);
+$background_psql2->query_safe(qq[COMMIT]);
+
+my $offset = -s $node_subscriber->logfile;
+$node_subscriber->wait_for_log(
+	qr/ERROR:  publication "regress_pub1" does not exist/,
+	$offset);
+
+$node_subscriber->safe_psql('postgres',
+	'DROP SUBSCRIPTION regress_sub1;');
+
 $background_psql1->quit;
 $background_psql2->quit;
 $background_psql3->quit;
-- 
2.34.1
