On Fri, Dec 3, 2021 at 1:13 PM Michael Paquier <mich...@paquier.xyz> wrote:
>
> On Thu, Aug 26, 2021 at 09:00:39PM +0530, vignesh C wrote:
> > The previous patch was failing because of the recent test changes made
> > by commit 201a76183e2 which unified new and get_new_node, attached
> > patch has the changes to handle the changes accordingly.
>
> Please note that the CF app is complaining about this patch, so a
> rebase is required.  I have moved it to next CF, waiting on author,
> for now.

Thanks for letting me know, I have rebased it on top of HEAD, the
attached v2 version has the rebased changes.

Regards,
Vignesh
From e91a4ff5d3f3d615de3e959c7dae691bda798195 Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignes...@gmail.com>
Date: Fri, 3 Dec 2021 15:18:53 +0530
Subject: [PATCH v2] Fix for invalidating logical replication relations when
 there is a change in schema.

When the schema gets changed, the rel sync cache invalidation was not
happening, fixed it by adding a callback for schema change.
---
 src/backend/replication/pgoutput/pgoutput.c | 51 ++++++++++++++
 src/test/subscription/t/001_rep_changes.pl  | 76 ++++++++++++++++++++-
 2 files changed, 126 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 6f6a203dea..03a105dc86 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -141,6 +141,8 @@ static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
 										  uint32 hashvalue);
+static void rel_sync_cache_namespace_cb(Datum arg, int cacheid,
+										uint32 hashvalue);
 static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
 											TransactionId xid);
 static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
@@ -1071,6 +1073,9 @@ init_rel_sync_cache(MemoryContext cachectx)
 	CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
 								  rel_sync_cache_publication_cb,
 								  (Datum) 0);
+	CacheRegisterSyscacheCallback(NAMESPACEOID,
+								  rel_sync_cache_namespace_cb,
+								  (Datum) 0);
 }
 
 /*
@@ -1357,6 +1362,52 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 	}
 }
 
+/*
+ * Namespace syscache invalidation callback
+ */
+static void
+rel_sync_cache_namespace_cb(Datum arg, int cacheid, uint32 hashvalue)
+{
+	HASH_SEQ_STATUS status;
+	RelationSyncEntry *entry;
+
+	/*
+	 * We can get here if the plugin was used in SQL interface as the
+	 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
+	 * is no way to unregister the relcache invalidation callback.
+	 */
+	if (RelationSyncCache == NULL)
+		return;
+
+	hash_seq_init(&status, RelationSyncCache);
+	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
+	{
+		/*
+		 * Reset schema sent status as the relation definition may have changed.
+		 * Also free any objects that depended on the earlier definition.
+		 */
+		entry->schema_sent = false;
+		list_free(entry->streamed_txns);
+		entry->streamed_txns = NIL;
+		if (entry->map)
+		{
+			/*
+			 * Must free the TupleDescs contained in the map explicitly,
+			 * because free_conversion_map() doesn't.
+			 */
+			FreeTupleDesc(entry->map->indesc);
+			FreeTupleDesc(entry->map->outdesc);
+			free_conversion_map(entry->map);
+		}
+		entry->map = NULL;
+
+		if (hash_search(RelationSyncCache,
+				(void *) &entry->relid,
+				HASH_REMOVE, NULL) == NULL)
+			elog(ERROR, "hash table corrupted");
+	}
+}
+
 /*
  * Publication relation/schema map syscache invalidation callback
  */
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 9531d81f19..622c508b3b 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -6,7 +6,7 @@ use strict;
 use warnings;
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
-use Test::More tests => 32;
+use Test::More tests => 33;
 
 # Initialize publisher node
 my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
@@ -520,6 +520,80 @@ is($result, qq(0), 'check replication origin was dropped on subscriber');
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
 
+# Test schema invalidation by renaming the schema
+# Create tables on publisher
+# Initialize publisher node
+my $node_publisher1 = PostgreSQL::Test::Cluster->new('publisher1');
+$node_publisher1->init(allows_streaming => 'logical');
+$node_publisher1->start;
+
+# Create subscriber node
+my $node_subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1');
+$node_subscriber1->init(allows_streaming => 'logical');
+$node_subscriber1->start;
+
+my $publisher1_connstr = $node_publisher1->connstr . ' dbname=postgres';
+
+$node_publisher1->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_publisher1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+
+# Create tables on subscriber
+$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch2");
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch2.t1 (c1 int)");
+
+# Setup logical replication that will only be used for this test
+$node_publisher1->safe_psql('postgres',
+        "CREATE PUBLICATION tap_pub_sch FOR ALL TABLES"
+);
+$node_subscriber1->safe_psql('postgres',
+        "CREATE SUBSCRIPTION tap_sub_sch CONNECTION '$publisher1_connstr' PUBLICATION tap_pub_sch"
+);
+
+$node_publisher1->wait_for_catchup('tap_sub_sch');
+
+# Also wait for initial table sync to finish
+$synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$node_publisher1->safe_psql('postgres',
+        "begin;
+insert into sch1.t1 values(1);
+alter schema sch1 rename to sch2;
+create schema sch1;
+create table sch1.t1(c1 int);
+insert into sch1.t1 values(2);
+insert into sch2.t1 values(3);
+commit;");
+
+$node_publisher1->wait_for_catchup('tap_sub_sch');
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Subscriber should not receive the inserted row for renamed schema
+$result =
+  $node_subscriber1->safe_psql('postgres', "SELECT * FROM sch1.t1");
+is($result, qq(1
+2), 'check rows on subscriber after schema invalidation');
+
+# Drop subscription as we don't need it anymore
+$node_subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_sch");
+
+# Drop publications as we don't need them anymore
+$node_publisher1->safe_psql('postgres', "DROP PUBLICATION tap_pub_sch");
+
+# Clean up the tables on both publisher and subscriber as we don't need them
+$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+
+$node_subscriber1->stop('fast');
+$node_publisher1->stop('fast');
+
 # CREATE PUBLICATION while wal_level=minimal should succeed, with a WARNING
 $node_publisher->append_conf(
 	'postgresql.conf', qq(
-- 
2.32.0

Reply via email to