On Mon, 15 Jul 2024 at 15:31, Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Thu, Jul 11, 2024 at 6:19 PM Nitin Motiani <nitinmoti...@google.com> wrote:
> >
> > On Wed, Jul 10, 2024 at 11:22 PM Nitin Motiani <nitinmoti...@google.com> 
> > wrote:
> > >
> > > On Wed, Jul 10, 2024 at 10:39 PM vignesh C <vignes...@gmail.com> wrote:
> > > >
> > > > On Wed, 10 Jul 2024 at 12:28, Amit Kapila <amit.kapil...@gmail.com> 
> > > > wrote:
> > > > > The patch missed to use the ShareRowExclusiveLock for partitions, see
> > > > > attached. I haven't tested it but they should also face the same
> > > > > problem. Apart from that, I have changed the comments in a few places
> > > > > in the patch.
> > > >
> > > > I could not hit the updated ShareRowExclusiveLock changes through the
> > > > partition table, instead I could verify it using the inheritance
> > > > table. Added a test for the same and also attaching the backbranch
> > > > patch.
> > > >
> > >
> > > Hi,
> > >
> > > I tested alternative-experimental-fix-lock.patch provided by Tomas
> > > (replaces SUE with SRE in OpenTableList). I believe there are a couple
> > > of scenarios the patch does not cover.
> > >
> > > 1. It doesn't handle the case of "ALTER PUBLICATION <pub> ADD TABLES
> > > IN SCHEMA  <schema>".
> > >
> > > I took crash-test.sh provided by Tomas and modified it to add all
> > > tables in the schema to publication using the following command :
> > >
> > >            ALTER PUBLICATION p ADD TABLES IN SCHEMA  public
> > >
> > > The modified script is attached (crash-test-with-schema.sh). With this
> > > script, I can reproduce the issue even with the patch applied. This is
> > > because the code path to add a schema to the publication doesn't go
> > > through OpenTableList.
> > >
> > > I have also attached a script run-test-with-schema.sh to run
> > > crash-test-with-schema.sh in a loop with randomly generated parameters
> > > (modified from run.sh provided by Tomas).
> > >
> > > 2.  The second issue is a deadlock which happens when the alter
> > > publication command is run for a comma separated list of tables.
> > >
> > > I created another script create-test-tables-order-reverse.sh. This
> > > script runs a command like the following :
> > >
> > >             ALTER PUBLICATION p ADD TABLE test_2,test_1
> > >
> > > Running the above script, I was able to get a deadlock error (the
> > > output is attached in deadlock.txt). In the alter publication command,
> > > I added the tables in the reverse order to increase the probability of
> > > the deadlock. But it should happen with any order of tables.
> > >
> > > I am not sure if the deadlock is a major issue because detecting the
> > > deadlock is better than data loss.
> > >
>
> The deadlock reported in this case is an expected behavior. This is no
> different than locking tables or rows in reverse order.
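To illustrate why this is an ordinary lock-ordering deadlock, here is a toy C model (hypothetical, not PostgreSQL code; all names are made up). It assumes the DML transaction in the script writes test_1 and then test_2, while "ALTER PUBLICATION p ADD TABLE test_2,test_1" locks the tables in list order in ShareRowExclusiveLock mode. Each session then waits on a table the other one already holds.

```c
#include <stdbool.h>

/*
 * Toy model (hypothetical, not PostgreSQL code).  Only the two table-level
 * lock modes involved in the reported deadlock are modelled.
 */
typedef enum
{
	ROW_EXCLUSIVE,				/* taken by INSERT/UPDATE/DELETE */
	SHARE_ROW_EXCLUSIVE			/* taken by ALTER PUBLICATION with the patch */
} Mode;

/*
 * ShareRowExclusiveLock conflicts with RowExclusiveLock and with itself;
 * RowExclusiveLock does not conflict with RowExclusiveLock.
 */
static bool
conflicts(Mode a, Mode b)
{
	return a == SHARE_ROW_EXCLUSIVE || b == SHARE_ROW_EXCLUSIVE;
}

/* One session holding a lock on one table and requesting another. */
typedef struct
{
	int			held_table;
	Mode		held_mode;
	int			wanted_table;
	Mode		wanted_mode;
} Session;

/* Must session s wait for session o to release its lock? */
static bool
waits_for(const Session *s, const Session *o)
{
	return s->wanted_table == o->held_table &&
		conflicts(s->wanted_mode, o->held_mode);
}

/* With only two sessions, a deadlock means each one waits for the other. */
static bool
is_deadlocked(const Session *a, const Session *b)
{
	return waits_for(a, b) && waits_for(b, a);
}
```

In this model, the DML session {holds test_1, wants test_2} and the ALTER PUBLICATION session {holds test_2, wants test_1} form a cycle. Two plain DML sessions in the same opposite orders do not, at the table level, because RowExclusiveLock does not conflict with itself.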
>
> >
> > I looked further into the scenario of adding the tables in schema to
> > the publication. Since in that case, the entry is added to
> > pg_publication_namespace instead of pg_publication_rel, the codepaths
> > for 'add table' and 'add tables in schema' are different. And in the
> > 'add tables in schema' scenario, the OpenTableList function is not
> > called to get the relation ids. Therefore even with the proposed
> > patch, the data loss issue still persists in that case.
> >
> > To validate this idea, I tried locking all the affected tables in the
> > schema just before the invalidation for those relations (in
> > ShareRowExclusiveLock mode).
> >
>
> This sounds like a reasonable approach to fix the issue. However, we
> should check SET publication_object as well, especially the drop part
> in it. It should not happen that we miss sending the data for ADD but
> for DROP, we send data when we shouldn't have sent it.

There were a few other scenarios, similar to the one you mentioned,
where the issue occurred. For example: a) When specifying a subset of
existing tables in the ALTER PUBLICATION ... SET TABLE command, the
tables that were supposed to be removed from the publication were not
locked in ShareRowExclusiveLock mode. b) The ALTER PUBLICATION ...
DROP TABLES IN SCHEMA command did not lock the relations that will be
removed from the publication in ShareRowExclusiveLock mode. Both of
these scenarios resulted in data inconsistency due to inadequate
locking. The attached patch addresses these issues.
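To make the locking argument concrete: an in-progress INSERT/UPDATE/DELETE holds RowExclusiveLock on the table, which does not conflict with ShareUpdateExclusiveLock but does conflict with ShareRowExclusiveLock. Below is a small, hypothetical C sketch (not PostgreSQL code; the enum, macro and function names are made up) that encodes the table-level lock conflict table from the "Explicit Locking" section of the PostgreSQL documentation, so these cases can be checked directly.

```c
#include <stdbool.h>

/* Table-level lock modes, weakest to strongest, as listed in the docs. */
typedef enum
{
	ACCESS_SHARE,
	ROW_SHARE,
	ROW_EXCLUSIVE,
	SHARE_UPDATE_EXCLUSIVE,
	SHARE,
	SHARE_ROW_EXCLUSIVE,
	EXCLUSIVE,
	ACCESS_EXCLUSIVE,
	NUM_LOCK_MODES
} TableLockMode;

#define LOCKBIT(m) (1u << (m))

/* Bit i of conflict_mask[m] is set when mode m conflicts with mode i. */
static const unsigned conflict_mask[NUM_LOCK_MODES] = {
	[ACCESS_SHARE] = LOCKBIT(ACCESS_EXCLUSIVE),
	[ROW_SHARE] = LOCKBIT(EXCLUSIVE) | LOCKBIT(ACCESS_EXCLUSIVE),
	[ROW_EXCLUSIVE] = LOCKBIT(SHARE) | LOCKBIT(SHARE_ROW_EXCLUSIVE) |
		LOCKBIT(EXCLUSIVE) | LOCKBIT(ACCESS_EXCLUSIVE),
	[SHARE_UPDATE_EXCLUSIVE] = LOCKBIT(SHARE_UPDATE_EXCLUSIVE) |
		LOCKBIT(SHARE) | LOCKBIT(SHARE_ROW_EXCLUSIVE) |
		LOCKBIT(EXCLUSIVE) | LOCKBIT(ACCESS_EXCLUSIVE),
	[SHARE] = LOCKBIT(ROW_EXCLUSIVE) | LOCKBIT(SHARE_UPDATE_EXCLUSIVE) |
		LOCKBIT(SHARE_ROW_EXCLUSIVE) | LOCKBIT(EXCLUSIVE) |
		LOCKBIT(ACCESS_EXCLUSIVE),
	[SHARE_ROW_EXCLUSIVE] = LOCKBIT(ROW_EXCLUSIVE) |
		LOCKBIT(SHARE_UPDATE_EXCLUSIVE) | LOCKBIT(SHARE) |
		LOCKBIT(SHARE_ROW_EXCLUSIVE) | LOCKBIT(EXCLUSIVE) |
		LOCKBIT(ACCESS_EXCLUSIVE),
	[EXCLUSIVE] = LOCKBIT(ROW_SHARE) | LOCKBIT(ROW_EXCLUSIVE) |
		LOCKBIT(SHARE_UPDATE_EXCLUSIVE) | LOCKBIT(SHARE) |
		LOCKBIT(SHARE_ROW_EXCLUSIVE) | LOCKBIT(EXCLUSIVE) |
		LOCKBIT(ACCESS_EXCLUSIVE),
	[ACCESS_EXCLUSIVE] = LOCKBIT(ACCESS_SHARE) | LOCKBIT(ROW_SHARE) |
		LOCKBIT(ROW_EXCLUSIVE) | LOCKBIT(SHARE_UPDATE_EXCLUSIVE) |
		LOCKBIT(SHARE) | LOCKBIT(SHARE_ROW_EXCLUSIVE) |
		LOCKBIT(EXCLUSIVE) | LOCKBIT(ACCESS_EXCLUSIVE),
};

/* Must a request for 'requested' wait while another backend holds 'held'? */
static bool
lock_modes_conflict(TableLockMode requested, TableLockMode held)
{
	return (conflict_mask[requested] & LOCKBIT(held)) != 0;
}
```

With this table, lock_modes_conflict(SHARE_UPDATE_EXCLUSIVE, ROW_EXCLUSIVE) is false, which is why ALTER PUBLICATION could complete while an earlier INSERT was still open, whereas lock_modes_conflict(SHARE_ROW_EXCLUSIVE, ROW_EXCLUSIVE) is true, so it now waits. ShareRowExclusiveLock still does not conflict with AccessShareLock, so plain SELECTs are not blocked.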

Regards,
Vignesh
From e1e79bcf24cacf4f8291692f7815dd323e7b4ab5 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Tue, 9 Jul 2024 19:23:10 +0530
Subject: [PATCH v5] Fix data loss during initial sync in logical replication.

Previously, when adding tables to a publication in PostgreSQL, they were
locked using ShareUpdateExclusiveLock mode. This mode allowed the lock to
succeed even if there were ongoing DML transactions on that table. As a
consequence, the ALTER PUBLICATION command could complete before these
transactions, and the changes made by transactions that started before the
alteration were decoded using a catalog snapshot predating the publication
change, so they were not replicated.

To fix this issue, tables are now locked using ShareRowExclusiveLock mode
during the addition to a publication. This change ensures that the
ALTER PUBLICATION command waits for any ongoing transactions on the tables
(to be added to the publication) to be completed before proceeding. As a
result, transactions initiated before the publication alteration are
correctly included in the replication process.

A similar problem occurred when adding all tables in a schema to a
publication (ALTER PUBLICATION ... ADD TABLES IN SCHEMA) while a DML
transaction on one of those tables was in progress, because the tables
were not locked during the ALTER PUBLICATION.

This is now resolved by locking all the tables in the schema in
ShareRowExclusiveLock mode while adding them to the publication, so that
adding tables in a schema waits in the same way as adding tables. The
tables removed from a publication by ALTER PUBLICATION ... SET TABLE and
ALTER PUBLICATION ... DROP TABLES IN SCHEMA are locked in the same mode.

Reported-by: Tomas Vondra
Diagnosed-by: Andres Freund
Author: Vignesh C, Tomas Vondra
Reviewed-by: Amit Kapila
Backpatch-through: 12
Discussion: https://postgr.es/m/de52b282-1166-1180-45a2-8d8917ca7...@enterprisedb.com
---
 src/backend/catalog/pg_publication.c   |   9 +
 src/backend/commands/publicationcmds.c |  34 +-
 src/test/subscription/t/100_bugs.pl    | 481 +++++++++++++++++++++++++
 3 files changed, 518 insertions(+), 6 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 0602398a54..f078b705d6 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -32,6 +32,7 @@
 #include "catalog/pg_type.h"
 #include "commands/publicationcmds.h"
 #include "funcapi.h"
+#include "storage/lmgr.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/catcache.h"
@@ -677,6 +678,14 @@ publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
 	 */
 	schemaRels = GetSchemaPublicationRelations(schemaid,
 											   PUBLICATION_PART_ALL);
+
+	/*
+	 * Data loss due to concurrency issues is avoided by locking the
+	 * relations in ShareRowExclusiveLock, as described atop OpenTableList.
+	 */
+	foreach_oid(schrelid, schemaRels)
+		LockRelationOid(schrelid, ShareRowExclusiveLock);
+
 	InvalidatePublicationRels(schemaRels);
 
 	return myself;
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 6ea709988e..d5cd9e3820 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -1219,8 +1219,14 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
 				oldrel = palloc(sizeof(PublicationRelInfo));
 				oldrel->whereClause = NULL;
 				oldrel->columns = NIL;
+
+				/*
+				 * Data loss due to concurrency issues is avoided by locking
+				 * the relation in ShareRowExclusiveLock, as described atop
+				 * OpenTableList.
+				 */
 				oldrel->relation = table_open(oldrelid,
-											  ShareUpdateExclusiveLock);
+											  ShareRowExclusiveLock);
 				delrels = lappend(delrels, oldrel);
 			}
 		}
@@ -1542,8 +1548,14 @@ RemovePublicationSchemaById(Oid psoid)
 
 /*
  * Open relations specified by a PublicationTable list.
- * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
- * add them to a publication.
+ *
+ * The returned tables are locked in ShareRowExclusiveLock mode to add them
+ * to a publication. The table needs to be locked in ShareRowExclusiveLock
+ * mode to ensure that any ongoing transactions involving that table are
+ * completed before adding it to the publication. Otherwise, a transaction
+ * that started before the publication was altered would continue to use a
+ * catalog snapshot predating the publication change, so its changes would
+ * not be replicated.
  */
 static List *
 OpenTableList(List *tables)
@@ -1568,7 +1580,7 @@ OpenTableList(List *tables)
 		/* Allow query cancel in case this takes a long time */
 		CHECK_FOR_INTERRUPTS();
 
-		rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
+		rel = table_openrv(t->relation, ShareRowExclusiveLock);
 		myrelid = RelationGetRelid(rel);
 
 		/*
@@ -1594,7 +1606,7 @@ OpenTableList(List *tables)
 						 errmsg("conflicting or redundant column lists for table \"%s\"",
 								RelationGetRelationName(rel))));
 
-			table_close(rel, ShareUpdateExclusiveLock);
+			table_close(rel, ShareRowExclusiveLock);
 			continue;
 		}
 
@@ -1622,7 +1634,7 @@ OpenTableList(List *tables)
 			List	   *children;
 			ListCell   *child;
 
-			children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
+			children = find_all_inheritors(myrelid, ShareRowExclusiveLock,
 										   NULL);
 
 			foreach(child, children)
@@ -1860,6 +1872,16 @@ PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
 	foreach(lc, schemas)
 	{
 		Oid			schemaid = lfirst_oid(lc);
+		List	   *schemaRels;
+
+		schemaRels = GetSchemaPublicationRelations(schemaid, PUBLICATION_PART_ALL);
+
+		/*
+		 * Data loss due to concurrency issues is avoided by locking the
+		 * relations in ShareRowExclusiveLock, as described atop OpenTableList.
+		 */
+		foreach_oid(schrelid, schemaRels)
+			LockRelationOid(schrelid, ShareRowExclusiveLock);
 
 		psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
 							   Anum_pg_publication_namespace_oid,
diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl
index cb36ca7b16..a087bc2d08 100644
--- a/src/test/subscription/t/100_bugs.pl
+++ b/src/test/subscription/t/100_bugs.pl
@@ -487,6 +487,487 @@ $result =
 is( $result, qq(2|f
 3|t), 'check replicated update on subscriber');
 
+# cleanup
+$node_publisher->safe_psql('postgres', qq(DROP PUBLICATION pub1;));
+$node_subscriber->safe_psql('postgres', qq(DROP SUBSCRIPTION sub1;));
+
+# =============================================================================
+# The bug was that incremental data synchronization was skipped when a new
+# table was added to the publication in the presence of a concurrent active
+# transaction performing DML on the same table.
+# =============================================================================
+
+# Initial setup.
+$node_publisher->safe_psql(
+	'postgres', qq(
+	CREATE TABLE tab_conc(a int);
+	CREATE TABLE tab1_conc(a int);
+	CREATE TABLE tab1_conc_child() inherits (tab1_conc);
+	CREATE SCHEMA sch3;
+	CREATE TABLE sch3.tab_conc(a int);
+	CREATE SCHEMA sch4;
+	CREATE TABLE sch4.tab_conc(a int);
+	CREATE PUBLICATION regress_pub1;
+));
+
+$node_subscriber->safe_psql(
+	'postgres', qq(
+	CREATE TABLE tab_conc(a int);
+	CREATE TABLE tab1_conc(a int);
+	CREATE TABLE tab1_conc_child() inherits (tab1_conc);
+	CREATE SCHEMA sch3;
+	CREATE TABLE sch3.tab_conc(a int);
+	CREATE SCHEMA sch4;
+	CREATE TABLE sch4.tab_conc(a int);
+	CREATE SUBSCRIPTION regress_sub1 CONNECTION '$publisher_connstr' PUBLICATION regress_pub1;
+
+));
+
+# Bump the query timeout to avoid false negatives on slow test systems.
+my $psql_timeout_secs = 4 * $PostgreSQL::Test::Utils::timeout_default;
+
+# Initiate a background session that keeps a transaction active.
+my $background_psql1 = $node_publisher->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $psql_timeout_secs);
+
+# Maintain an active transaction with the table.
+$background_psql1->set_query_timer_restart();
+$background_psql1->query_safe(
+	qq[
+	BEGIN;
+	INSERT INTO tab_conc VALUES (1);
+]);
+
+# Add the table to the publication using background_psql, as the alter
+# publication operation will wait for the lock and can only be completed after
+# the previous open transaction is committed.
+my $background_psql2 = $node_publisher->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $psql_timeout_secs);
+
+$background_psql2->set_query_timer_restart();
+
+# This operation will wait because there is an open transaction holding a lock.
+$background_psql2->query_until(qr//,
+	"ALTER PUBLICATION regress_pub1 ADD TABLE tab_conc;\n");
+
+# Verify that the table addition is waiting to acquire a ShareRowExclusiveLock.
+$node_publisher->poll_query_until('postgres',
+	"SELECT COUNT(1) = 1 FROM pg_locks WHERE relation = 'tab_conc'::regclass AND mode = 'ShareRowExclusiveLock' AND waitstart IS NOT NULL;"
+  )
+  or die
+  "Timed out waiting for ALTER PUBLICATION to block on ShareRowExclusiveLock";
+
+# Complete the old transaction.
+$background_psql1->query_safe(qq[COMMIT]);
+
+$node_publisher->safe_psql(
+	'postgres', qq(
+	INSERT INTO tab_conc VALUES (2);
+));
+
+# Refresh the publication.
+$node_subscriber->safe_psql('postgres',
+	'ALTER SUBSCRIPTION regress_sub1 REFRESH PUBLICATION');
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'regress_sub1');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_conc");
+is( $result, qq(1
+2),
+	'Ensure that the data from the tab_conc table is synchronized to the subscriber after the subscription is refreshed'
+);
+
+# Perform an insert.
+$node_publisher->safe_psql(
+	'postgres', qq(
+	INSERT INTO tab_conc VALUES (3);
+));
+$node_publisher->wait_for_catchup('regress_sub1');
+
+# Verify that the insert is replicated to the subscriber.
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_conc");
+is( $result, qq(1
+2
+3),
+	'Verify that the incremental data for table tab_conc added after table synchronization is replicated to the subscriber'
+);
+
+# =============================================================================
+# This bug is also present with inheritance tables.
+# =============================================================================
+
+# Maintain an active transaction with inheritance table.
+$background_psql1->query_safe(
+	qq[
+	BEGIN;
+	INSERT INTO tab1_conc_child VALUES (1);
+]);
+
+# Add the inheritance parent table to the publication; this operation will
+# wait because there is an open transaction holding a lock on its child.
+$background_psql2->query_until(qr//,
+	"ALTER PUBLICATION regress_pub1 ADD TABLE tab1_conc;\n");
+
+# Verify that the child table addition is waiting to acquire a
+# ShareRowExclusiveLock.
+$node_publisher->poll_query_until('postgres',
+	"SELECT COUNT(1) = 1 FROM pg_locks WHERE relation = 'tab1_conc_child'::regclass AND mode = 'ShareRowExclusiveLock' AND waitstart IS NOT NULL;"
+  )
+  or die
+  "Timed out waiting for ALTER PUBLICATION to block on ShareRowExclusiveLock";
+
+# Complete the old transaction.
+$background_psql1->query_safe(qq[COMMIT]);
+# Keep this session open; it is reused by the following test cases.
+
+# Wait till the tables are added to the publication.
+$node_publisher->poll_query_until('postgres',
+	"SELECT COUNT(1) = 1 FROM pg_publication_rel WHERE prrelid IN ('tab1_conc'::regclass);"
+  )
+  or die
+  "Timed out while waiting for alter publication to add the table to the publication";
+
+$node_publisher->safe_psql(
+	'postgres', qq(
+	INSERT INTO tab1_conc_child VALUES (2);
+));
+
+# Refresh the publication.
+$node_subscriber->safe_psql('postgres',
+	'ALTER SUBSCRIPTION regress_sub1 REFRESH PUBLICATION');
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'regress_sub1');
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT * FROM tab1_conc_child");
+is( $result, qq(1
+2),
+	'Ensure that the data from the tab1_conc_child table is synchronized to the subscriber after the subscription is refreshed'
+);
+
+# Perform an insert.
+$node_publisher->safe_psql(
+	'postgres', qq(
+	INSERT INTO tab1_conc_child VALUES (3);
+));
+$node_publisher->wait_for_catchup('regress_sub1');
+
+# Verify that the insert is replicated to the subscriber.
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT * FROM tab1_conc_child");
+is( $result, qq(1
+2
+3),
+	'Verify that the incremental data for table tab1_conc_child added after table synchronization is replicated to the subscriber'
+);
+
+# =============================================================================
+# This bug is also present with ALTER PUBLICATION ... SET TABLE. When only a
+# subset of the tables in the publication is specified, ShareRowExclusiveLock
+# was not taken on the tables dropped by the SET TABLE operation.
+# =============================================================================
+$background_psql1->query_safe(
+	qq[
+	BEGIN;
+	INSERT INTO tab1_conc_child VALUES (4);
+]);
+
+# This operation will wait because an open transaction is holding a lock on the
+# publication's relation.
+$background_psql2->query_until(qr//,
+	"ALTER PUBLICATION regress_pub1 SET TABLE tab_conc;\n");
+
+# Check that the tab1_conc_child table, which is set to be removed from the
+# publication, is waiting to acquire a ShareRowExclusiveLock due to the open
+# transaction.
+$node_publisher->poll_query_until('postgres',
+	"SELECT COUNT(1) = 1 FROM pg_locks WHERE relation IN ('tab1_conc_child'::regclass) AND mode = 'ShareRowExclusiveLock' AND waitstart IS NOT NULL;"
+  )
+  or die
+  "Timed out waiting for ALTER PUBLICATION to block on ShareRowExclusiveLock";
+
+# Verify that ShareRowExclusiveLock is acquired on tab_conc.
+$node_publisher->poll_query_until('postgres',
+	"SELECT COUNT(1) = 1 FROM pg_locks WHERE relation IN ('tab_conc'::regclass) AND mode = 'ShareRowExclusiveLock' AND waitstart IS NULL;"
+  )
+  or die
+  "Timed out waiting for ALTER PUBLICATION to acquire ShareRowExclusiveLock on tab_conc";
+
+# Complete the old transaction.
+$background_psql1->query_safe(qq[COMMIT]);
+# Keep this session open; it is reused by the following test cases.
+
+# Wait till the table is removed from the publication.
+$node_publisher->poll_query_until('postgres',
+	"SELECT COUNT(1) = 0 FROM pg_publication_rel WHERE prrelid IN ('tab1_conc'::regclass);"
+  )
+  or die
+  "Timed out waiting for ALTER PUBLICATION to remove the table from the publication";
+
+$node_publisher->wait_for_catchup('regress_sub1');
+
+# Verify that the insert before SET TABLE is replicated to the subscriber.
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT * FROM tab1_conc_child");
+is( $result, qq(1
+2
+3
+4),
+	'Verify that the incremental data for table tab1_conc_child before removing table from publication is replicated to the subscriber'
+);
+
+$node_publisher->safe_psql(
+	'postgres', qq(
+	INSERT INTO tab1_conc_child VALUES (5);
+));
+
+$node_publisher->wait_for_catchup('regress_sub1');
+
+# Confirm that an insert made after SET TABLE removed the relation from the
+# publication is not replicated to the subscriber.
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT * FROM tab1_conc_child");
+is( $result, qq(1
+2
+3
+4),
+	'Verify that the incremental data for table tab1_conc_child after removing table from publication is not replicated to the subscriber'
+);
+
+# =============================================================================
+# This bug is present with ALTER PUBLICATION ... DROP TABLE.
+# =============================================================================
+$background_psql1->query_safe(
+	qq[
+	BEGIN;
+	INSERT INTO tab_conc VALUES (4);
+]);
+
+# This operation will wait because there is an open transaction holding a lock.
+$background_psql2->query_until(qr//,
+	"ALTER PUBLICATION regress_pub1 DROP TABLE tab_conc;\n");
+
+# Verify that the table removal is waiting to acquire a
+# ShareRowExclusiveLock.
+$node_publisher->poll_query_until('postgres',
+	"SELECT COUNT(1) = 1 FROM pg_locks WHERE relation = 'tab_conc'::regclass AND mode = 'ShareRowExclusiveLock' AND waitstart IS NOT NULL;"
+  )
+  or die
+  "Timed out waiting for ALTER PUBLICATION to block on ShareRowExclusiveLock";
+
+# Complete the old transaction.
+$background_psql1->query_safe(qq[COMMIT]);
+# Keep this session open; it is reused by the following test cases.
+
+# Wait until the table is dropped from the publication.
+$node_publisher->poll_query_until('postgres',
+	"SELECT COUNT(1) = 0 FROM pg_publication_rel WHERE prrelid IN ('tab_conc'::regclass);"
+  )
+  or die
+  "Timed out waiting for ALTER PUBLICATION to remove the table from the publication";
+
+$node_publisher->wait_for_catchup('regress_sub1');
+
+# Verify that the insert before drop table is replicated to the subscriber.
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_conc");
+is( $result, qq(1
+2
+3
+4),
+	'Verify that the incremental data for table tab_conc before removing table from publication is replicated to the subscriber'
+);
+
+$node_publisher->safe_psql(
+	'postgres', qq(
+	INSERT INTO tab_conc VALUES (5);
+));
+
+$node_publisher->wait_for_catchup('regress_sub1');
+
+# Verify that the insert after DROP TABLE is not replicated to the subscriber.
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_conc");
+is( $result, qq(1
+2
+3
+4),
+	'Verify that the incremental data for table tab_conc after removing table from publication is not replicated to the subscriber'
+);
+
+# =============================================================================
+# This bug is present with ADD TABLES IN SCHEMA too.
+# =============================================================================
+
+# Maintain an active transaction with a schema table that will be added to the
+# publication.
+$background_psql1->query_safe(
+	qq[
+	BEGIN;
+	INSERT INTO sch3.tab_conc VALUES (1);
+]);
+
+# Add these schemas to the publication; this operation will wait because
+# there is an open transaction holding a lock.
+$background_psql2->query_until(qr//,
+	"ALTER PUBLICATION regress_pub1 ADD TABLES IN SCHEMA sch3, sch4;\n");
+
+# Verify that the schema addition is waiting to acquire a ShareRowExclusiveLock.
+$node_publisher->poll_query_until('postgres',
+	"SELECT COUNT(1) = 1 FROM pg_locks WHERE relation = (SELECT oid FROM pg_class WHERE relname = 'tab_conc' AND relnamespace = 'sch3'::regnamespace) AND mode = 'ShareRowExclusiveLock' AND waitstart IS NOT NULL;"
+  )
+  or die
+  "Timed out waiting for ALTER PUBLICATION to block on ShareRowExclusiveLock";
+
+$background_psql1->query_safe(qq[COMMIT]);
+# Keep this session open; it is reused by the following test cases.
+
+$node_publisher->safe_psql('postgres',
+	qq(INSERT INTO sch3.tab_conc VALUES (2);));
+
+# Refresh the publication.
+$node_subscriber->safe_psql('postgres',
+	'ALTER SUBSCRIPTION regress_sub1 REFRESH PUBLICATION');
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'regress_sub1');
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT * FROM sch3.tab_conc");
+is( $result, qq(1
+2),
+	'Ensure that the data from the sch3.tab_conc table is synchronized to the subscriber after the subscription is refreshed'
+);
+
+# Perform an insert.
+$node_publisher->safe_psql(
+	'postgres', qq(
+	INSERT INTO sch3.tab_conc VALUES (3);
+));
+$node_publisher->wait_for_catchup('regress_sub1');
+
+# Verify that the insert is replicated to the subscriber.
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT * FROM sch3.tab_conc");
+is( $result, qq(1
+2
+3),
+	'Verify that the incremental data for table sch3.tab_conc added after table synchronization is replicated to the subscriber'
+);
+
+# =============================================================================
+# This bug is present with SET TABLES IN SCHEMA too.
+# =============================================================================
+
+# Maintain an active transaction with a schema table that will be removed from
+# the publication.
+$background_psql1->query_safe(
+	qq[
+	BEGIN;
+	INSERT INTO sch4.tab_conc VALUES (1);
+]);
+
+# Set the publication to a subset of its schemas.
+$background_psql2->query_until(qr//,
+	"ALTER PUBLICATION regress_pub1 SET TABLES IN SCHEMA sch3;\n");
+
+# Verify that the sch4.tab_conc table which will be removed from the
+# publication is waiting to acquire a ShareRowExclusiveLock because of the open
+# transaction.
+$node_publisher->poll_query_until('postgres',
+	"SELECT COUNT(1) = 1 FROM pg_locks WHERE relation = (SELECT oid FROM pg_class WHERE relname = 'tab_conc' AND relnamespace = 'sch4'::regnamespace) AND mode = 'ShareRowExclusiveLock' AND waitstart IS NOT NULL;"
+  )
+  or die
+  "Timed out waiting for ALTER PUBLICATION to block on ShareRowExclusiveLock";
+
+# Complete the old transaction.
+$background_psql1->query_safe(qq[COMMIT]);
+# Keep this session open; it is reused by the following test cases.
+
+# Wait until the schema is removed from the publication.
+$node_publisher->poll_query_until('postgres',
+	"SELECT COUNT(1) = 0 FROM pg_publication_namespace WHERE pnnspid IN ('sch4'::regnamespace);"
+  )
+  or die
+  "Timed out waiting for ALTER PUBLICATION to remove the schema from the publication";
+
+$node_publisher->wait_for_catchup('regress_sub1');
+
+# Verify that the insert before SET TABLES IN SCHEMA is replicated to the subscriber.
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT * FROM sch4.tab_conc");
+is($result, qq(1),
+	'Verify that the incremental data for table sch4.tab_conc before removing table from publication is replicated to the subscriber'
+);
+
+$node_publisher->safe_psql(
+	'postgres', qq(
+	INSERT INTO sch4.tab_conc VALUES (2);
+));
+
+$node_publisher->wait_for_catchup('regress_sub1');
+
+# Verify that the insert after SET TABLES IN SCHEMA is not replicated to the subscriber.
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT * FROM sch4.tab_conc");
+is($result, qq(1),
+	'Verify that the incremental data for table sch4.tab_conc after removing table from publication is not replicated to the subscriber'
+);
+
+# =============================================================================
+# This bug is present with DROP TABLES IN SCHEMA too.
+# =============================================================================
+
+# Maintain an active transaction with a schema table that will be dropped from
+# the publication.
+$background_psql1->query_safe(
+	qq[
+	BEGIN;
+	INSERT INTO sch3.tab_conc VALUES (4);
+]);
+
+# Drop this schema from the publication; this operation will wait because
+# there is an open transaction holding a lock.
+$background_psql2->query_until(qr//,
+	"ALTER PUBLICATION regress_pub1 DROP TABLES IN SCHEMA sch3;\n");
+
+# Verify that the sch3.tab_conc table, which will be dropped from the
+# publication, is waiting to acquire a ShareRowExclusiveLock.
+$node_publisher->poll_query_until('postgres',
+	"SELECT COUNT(1) = 1 FROM pg_locks WHERE relation = (SELECT oid FROM pg_class WHERE relname = 'tab_conc' AND relnamespace = 'sch3'::regnamespace) AND mode = 'ShareRowExclusiveLock' AND waitstart IS NOT NULL;"
+  )
+  or die
+  "Timed out waiting for ALTER PUBLICATION to block on ShareRowExclusiveLock";
+
+$background_psql1->query_safe(qq[COMMIT]);
+$background_psql1->quit;
+
+$node_publisher->wait_for_catchup('regress_sub1');
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT * FROM sch3.tab_conc");
+is( $result, qq(1
+2
+3
+4),
+	'Ensure that the data from the sch3.tab_conc table is replicated to the subscriber before drop tables in schema from publication'
+);
+
+$node_publisher->safe_psql('postgres',
+	qq(INSERT INTO sch3.tab_conc VALUES (5);));
+
+$node_publisher->wait_for_catchup('regress_sub1');
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT * FROM sch3.tab_conc");
+is( $result, qq(1
+2
+3
+4),
+	'Ensure that the data from the sch3.tab_conc table is not replicated after drop tables in schema from the publication'
+);
+
 $node_publisher->stop('fast');
 $node_subscriber->stop('fast');
 
-- 
2.34.1
