On Fri, Jul 16, 2021 at 10:51 PM vignesh C <vignes...@gmail.com> wrote:
>
> On Sat, Jul 3, 2021 at 11:23 AM Dilip Kumar <dilipbal...@gmail.com> wrote:
> >
> > On Fri, Jul 2, 2021 at 12:03 PM Dilip Kumar <dilipbal...@gmail.com> wrote:
> > >
> > > Yeah, this looks like a bug.  I will look at the patch.
> > >
> >
> > While looking into this, I think the main cause of the problem is that
> > schema rename does not invalidate the relation cache, right?  I also
> > tried other cases, e.g. if there is an open cursor and we rename the
> > schema:
> >
> > CREATE SCHEMA sch1;
> > CREATE TABLE sch1.t1(c1 int);
> > insert into sch1.t1 values(1);
> > insert into sch1.t1 values(2);
> > insert into sch1.t1 values(3);
> > BEGIN;
> > DECLARE mycur CURSOR FOR SELECT * FROM sch1.t1;
> > FETCH NEXT FROM mycur ;
> > ----------At this point rename sch1 to sch2 from another session------
> > FETCH NEXT FROM mycur ;
> > UPDATE sch2.t1 SET c1 = 20 WHERE CURRENT OF mycur;
> > select * from sch2.t1 ;
> >
> > So even after the schema rename the cursor is able to fetch and it's
> > also able to update on the same table in the new schema, ideally using
> > CURRENT OF CUR, you can update the same table for which you have
> > declared the cursor.  I am giving this example because this behavior
> > also looks somewhat similar.
>
> It works in this case because it uses the relation id for performing
> the next fetch and the relation id does not get changed after renaming
> the schema. Also since it holds a lock on the relation, alter/drop
> operations will not be allowed. I felt this behavior might be ok.  But
> the original scenario reported is an issue because it replicates the
> data of both the original table and the renamed schema's table.

The previous patch was failing because of the recent test changes made
by commit 201a76183e2, which unified new and get_new_node. The attached
patch has been updated to handle those changes.

Regards,
Vignesh
From e86247f9502727f2c2e5f41489f8bbd4f69b24e4 Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignesh21@gmail.com>
Date: Thu, 26 Aug 2021 19:55:35 +0530
Subject: [PATCH v1] Fix for invalidating logical replication relations when
 there is a change in schema.

When a schema was changed, the rel sync cache was not being invalidated.
Fix this by adding a syscache invalidation callback for schema changes.
---
 src/backend/replication/pgoutput/pgoutput.c | 51 ++++++++++++++
 src/test/subscription/t/001_rep_changes.pl  | 76 ++++++++++++++++++++-
 2 files changed, 126 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 14d737fd93..1eec9f603d 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -141,6 +141,8 @@ static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
 										  uint32 hashvalue);
+static void rel_sync_cache_namespace_cb(Datum arg, int cacheid,
+										uint32 hashvalue);
 static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
 											TransactionId xid);
 static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
@@ -1068,6 +1070,9 @@ init_rel_sync_cache(MemoryContext cachectx)
 	CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
 								  rel_sync_cache_publication_cb,
 								  (Datum) 0);
+	CacheRegisterSyscacheCallback(NAMESPACEOID,
+								  rel_sync_cache_namespace_cb,
+								  (Datum) 0);
 }
 
 /*
@@ -1342,6 +1347,52 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 	}
 }
 
+/*
+ * Namespace syscache invalidation callback; empties RelationSyncCache.
+ */
+static void
+rel_sync_cache_namespace_cb(Datum arg, int cacheid, uint32 hashvalue)
+{
+	HASH_SEQ_STATUS status;
+	RelationSyncEntry *entry;
+
+	/*
+	 * We can get here if the plugin was used in SQL interface as the
+	 * RelationSyncCache is destroyed when the decoding finishes, but there
+	 * is no way to unregister the syscache invalidation callback.
+	 */
+	if (RelationSyncCache == NULL)
+		return;
+
+	hash_seq_init(&status, RelationSyncCache);
+	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
+	{
+		/*
+		 * Reset schema sent status as the relation's namespace may have changed.
+		 * Also free any objects that depended on the earlier definition.
+		 */
+		entry->schema_sent = false;
+		list_free(entry->streamed_txns);
+		entry->streamed_txns = NIL;
+		if (entry->map)
+		{
+			/*
+			 * Must free the TupleDescs contained in the map explicitly,
+			 * because free_conversion_map() doesn't.
+			 */
+			FreeTupleDesc(entry->map->indesc);
+			FreeTupleDesc(entry->map->outdesc);
+			free_conversion_map(entry->map);
+		}
+		entry->map = NULL;
+		/* NOTE(review): entry is removed outright; confirm no caller still holds a pointer to it. */
+		if (hash_search(RelationSyncCache,
+				(void *) &entry->relid,
+				HASH_REMOVE, NULL) == NULL)
+			elog(ERROR, "hash table corrupted");
+	}
+}
+
 /*
  * Publication relation map syscache invalidation callback
  */
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 0c84d87873..88763771b2 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -6,7 +6,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 32;
+use Test::More tests => 33;
 
 # Initialize publisher node
 my $node_publisher = PostgresNode->new('publisher');
@@ -518,6 +518,80 @@ is($result, qq(0), 'check replication origin was dropped on subscriber');
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
 
+# Test schema invalidation by renaming the schema.  This test uses its own
+# publisher and subscriber nodes, created below.
+# Initialize publisher node
+my $node_publisher1 = PostgresNode->new('publisher1');
+$node_publisher1->init(allows_streaming => 'logical');
+$node_publisher1->start;
+
+# Create subscriber node
+my $node_subscriber1 = PostgresNode->new('subscriber1');
+$node_subscriber1->init(allows_streaming => 'logical');
+$node_subscriber1->start;
+
+my $publisher1_connstr = $node_publisher1->connstr . ' dbname=postgres';
+
+$node_publisher1->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_publisher1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+
+# Create tables on subscriber
+$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch2");
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch2.t1 (c1 int)");
+
+# Setup logical replication that will only be used for this test
+$node_publisher1->safe_psql('postgres',
+        "CREATE PUBLICATION tap_pub_sch FOR ALL TABLES"
+);
+$node_subscriber1->safe_psql('postgres',
+        "CREATE SUBSCRIPTION tap_sub_sch CONNECTION '$publisher1_connstr' PUBLICATION tap_pub_sch"
+);
+
+$node_publisher1->wait_for_catchup('tap_sub_sch');
+
+# Also wait for initial table sync to finish
+$synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$node_publisher1->safe_psql('postgres',
+        "begin;
+insert into sch1.t1 values(1);
+alter schema sch1 rename to sch2;
+create schema sch1;
+create table sch1.t1(c1 int);
+insert into sch1.t1 values(2);
+insert into sch2.t1 values(3);
+commit;");
+
+$node_publisher1->wait_for_catchup('tap_sub_sch');
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Row 3 went into the renamed schema's table, so sch1.t1 must hold only 1 and 2
+$result =
+  $node_subscriber1->safe_psql('postgres', "SELECT * FROM sch1.t1");
+is($result, qq(1
+2), 'check rows on subscriber after schema invalidation');
+
+# Drop subscription as we don't need it anymore
+$node_subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_sch");
+
+# Drop publication as we don't need it anymore
+$node_publisher1->safe_psql('postgres', "DROP PUBLICATION tap_pub_sch");
+
+# Clean up the tables on both publisher and subscriber as we don't need them
+$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+
+$node_subscriber1->stop('fast');
+$node_publisher1->stop('fast');
+
+
 # CREATE PUBLICATION while wal_level=minimal should succeed, with a WARNING
 $node_publisher->append_conf(
 	'postgresql.conf', qq(
-- 
2.30.2

Reply via email to