On Fri, Dec 3, 2021 at 1:13 PM Michael Paquier <mich...@paquier.xyz> wrote: > > On Thu, Aug 26, 2021 at 09:00:39PM +0530, vignesh C wrote: > > The previous patch was failing because of the recent test changes made > > by commit 201a76183e2 which unified new and get_new_node, attached > > patch has the changes to handle the changes accordingly. > > Please note that the CF app is complaining about this patch, so a > rebase is required. I have moved it to next CF, waiting on author, > for now.
Thanks for letting me know. I have rebased the patch on top of HEAD; the attached v2 version contains the rebased changes. Regards, Vignesh
From e91a4ff5d3f3d615de3e959c7dae691bda798195 Mon Sep 17 00:00:00 2001 From: Vigneshwaran C <vignes...@gmail.com> Date: Fri, 3 Dec 2021 15:18:53 +0530 Subject: [PATCH v2] Fix for invalidating logical replication relations when there is a change in schema. When the schema gets changed, the rel sync cache invalidation was not happening, fixed it by adding a callback for schema change. --- src/backend/replication/pgoutput/pgoutput.c | 51 ++++++++++++++ src/test/subscription/t/001_rep_changes.pl | 76 ++++++++++++++++++++- 2 files changed, 126 insertions(+), 1 deletion(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 6f6a203dea..03a105dc86 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -141,6 +141,8 @@ static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid); static void rel_sync_cache_relation_cb(Datum arg, Oid relid); static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue); +static void rel_sync_cache_namespace_cb(Datum arg, int cacheid, + uint32 hashvalue); static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid); static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, @@ -1071,6 +1073,9 @@ init_rel_sync_cache(MemoryContext cachectx) CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP, rel_sync_cache_publication_cb, (Datum) 0); + CacheRegisterSyscacheCallback(NAMESPACEOID, + rel_sync_cache_namespace_cb, + (Datum) 0); } /* @@ -1357,6 +1362,52 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) } } +/* + * Namespace syscache invalidation callback + */ +static void +rel_sync_cache_namespace_cb(Datum arg, int cacheid, uint32 hashvalue) +{ + HASH_SEQ_STATUS status; + RelationSyncEntry *entry; + + /* + * We can get here if the plugin was used in SQL interface as the + * RelSchemaSyncCache is destroyed when the decoding finishes, but there + * is no way to 
unregister the relcache invalidation callback. + */ + if (RelationSyncCache == NULL) + return; + + hash_seq_init(&status, RelationSyncCache); + while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL) + { + /* + * Reset schema sent status as the relation definition may have changed. + * Also free any objects that depended on the earlier definition. + */ + entry->schema_sent = false; + list_free(entry->streamed_txns); + entry->streamed_txns = NIL; + if (entry->map) + { + /* + * Must free the TupleDescs contained in the map explicitly, + * because free_conversion_map() doesn't. + */ + FreeTupleDesc(entry->map->indesc); + FreeTupleDesc(entry->map->outdesc); + free_conversion_map(entry->map); + } + entry->map = NULL; + + if (hash_search(RelationSyncCache, + (void *) &entry->relid, + HASH_REMOVE, NULL) == NULL) + elog(ERROR, "hash table corrupted"); + } +} + /* * Publication relation/schema map syscache invalidation callback */ diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 9531d81f19..622c508b3b 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -6,7 +6,7 @@ use strict; use warnings; use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; -use Test::More tests => 32; +use Test::More tests => 33; # Initialize publisher node my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); @@ -520,6 +520,80 @@ is($result, qq(0), 'check replication origin was dropped on subscriber'); $node_subscriber->stop('fast'); $node_publisher->stop('fast'); +# Test schema invalidation by renaming the schema +# Create tables on publisher +# Initialize publisher node +my $node_publisher1 = PostgreSQL::Test::Cluster->new('publisher1'); +$node_publisher1->init(allows_streaming => 'logical'); +$node_publisher1->start; + +# Create subscriber node +my $node_subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1'); 
+$node_subscriber1->init(allows_streaming => 'logical'); +$node_subscriber1->start; + +my $publisher1_connstr = $node_publisher1->connstr . ' dbname=postgres'; + +$node_publisher1->safe_psql('postgres', "CREATE SCHEMA sch1"); +$node_publisher1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)"); + +# Create tables on subscriber +$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch1"); +$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)"); +$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch2"); +$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch2.t1 (c1 int)"); + +# Setup logical replication that will only be used for this test +$node_publisher1->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_sch FOR ALL TABLES" +); +$node_subscriber1->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_sch CONNECTION '$publisher1_connstr' PUBLICATION tap_pub_sch" +); + +$node_publisher1->wait_for_catchup('tap_sub_sch'); + +# Also wait for initial table sync to finish +$synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber1->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +$node_publisher1->safe_psql('postgres', + "begin; +insert into sch1.t1 values(1); +alter schema sch1 rename to sch2; +create schema sch1; +create table sch1.t1(c1 int); +insert into sch1.t1 values(2); +insert into sch2.t1 values(3); +commit;"); + +$node_publisher1->wait_for_catchup('tap_sub_sch'); +$node_subscriber1->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Subscriber should not receive the inserted row for renamed schema +$result = + $node_subscriber1->safe_psql('postgres', "SELECT * FROM sch1.t1"); +is($result, qq(1 +2), 'check rows on subscriber after schema invalidation'); + +# Drop subscription as we don't need it anymore 
+$node_subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_sch"); + +# Drop publications as we don't need them anymore +$node_publisher1->safe_psql('postgres', "DROP PUBLICATION tap_pub_sch"); + +# Clean up the tables on both publisher and subscriber as we don't need them +$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch1 cascade"); +$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch2 cascade"); +$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch1 cascade"); +$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch2 cascade"); + +$node_subscriber1->stop('fast'); +$node_publisher1->stop('fast'); + # CREATE PUBLICATION while wal_level=minimal should succeed, with a WARNING $node_publisher->append_conf( 'postgresql.conf', qq( -- 2.32.0