On Tue, 16 Jul 2024 at 11:59, Amit Kapila <amit.kapil...@gmail.com> wrote: > > On Tue, Jul 16, 2024 at 9:29 AM Amit Kapila <amit.kapil...@gmail.com> wrote: > > > > One related comment: > > @@ -1219,8 +1219,14 @@ AlterPublicationTables(AlterPublicationStmt > > *stmt, HeapTuple tup, > > oldrel = palloc(sizeof(PublicationRelInfo)); > > oldrel->whereClause = NULL; > > oldrel->columns = NIL; > > + > > + /* > > + * Data loss due to concurrency issues are avoided by locking > > + * the relation in ShareRowExclusiveLock as described atop > > + * OpenTableList. > > + */ > > oldrel->relation = table_open(oldrelid, > > - ShareUpdateExclusiveLock); > > + ShareRowExclusiveLock); > > > > Isn't it better to lock the required relations in > > RemovePublicationRelById()? > > > > On my CentOS VM, the test file '100_bugs.pl' takes ~11s without a > patch and ~13.3s with a patch. So, 2 to 2.3s additional time for newly > added tests. It isn't worth adding this much extra time for one bug > fix. Can we combine table and schema tests into one single test and > avoid inheritance table tests as the code for those will mostly follow > the same path as a regular table?
Yes, that is better. The attached v6 patch combines the table and schema tests into a single test and drops the inheritance-table tests. The patch also addresses the comments from [1]. [1] - https://www.postgresql.org/message-id/CAA4eK1LZDW2AVDYFZdZcvmsKVGajH2-gZmjXr9BsYiy8ct_fEw%40mail.gmail.com Regards, Vignesh
From f09ac0daf8914a264a710fb27983560086a97742 Mon Sep 17 00:00:00 2001 From: Vignesh C <vignes...@gmail.com> Date: Tue, 9 Jul 2024 19:23:10 +0530 Subject: [PATCH v6] Fix data loss during initial sync in logical replication. Previously, when adding tables to a publication in PostgreSQL, they were locked using ShareUpdateExclusiveLock mode. This mode allowed the lock to succeed even if there were ongoing DML transactions on that table. As a consequence, the ALTER PUBLICATION command could complete before these transactions did, leading to a scenario where the transactions initiated before the alteration kept using a catalog snapshot that predated the publication change, so their changes were not replicated. To fix this issue, tables are now locked using ShareRowExclusiveLock mode when they are added to a publication. This change ensures that the ALTER PUBLICATION command waits for any ongoing transactions on the tables (to be added to the publication) to complete before proceeding. As a result, transactions initiated before the publication alteration are correctly included in the replication process. The same issue arose with operations such as a) ALTER PUBLICATION ... DROP TABLE, b) ALTER PUBLICATION ... SET TABLE, c) ALTER PUBLICATION ... ADD TABLES IN SCHEMA, d) ALTER PUBLICATION ... SET TABLES IN SCHEMA and e) ALTER PUBLICATION ... DROP TABLES IN SCHEMA. This occurred because the affected tables were not locked at all during ALTER PUBLICATION. To address this, the tables of the publication are now locked using ShareRowExclusiveLock mode during the ALTER PUBLICATION command. This modification ensures that the ALTER PUBLICATION command waits until ongoing transactions are completed before proceeding. 
Reported-by: Tomas Vondra Diagnosed-by: Andres Freund Author: Vignesh C, Tomas Vondra Reviewed-by: Amit Kapila Backpatch-through: 12 Discussion: https://postgr.es/m/de52b282-1166-1180-45a2-8d8917ca7...@enterprisedb.com --- src/backend/catalog/pg_publication.c | 9 ++ src/backend/commands/publicationcmds.c | 31 ++++- src/test/subscription/t/100_bugs.pl | 156 +++++++++++++++++++++++++ 3 files changed, 191 insertions(+), 5 deletions(-) diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 0602398a54..a7c257a994 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -32,6 +32,7 @@ #include "catalog/pg_type.h" #include "commands/publicationcmds.h" #include "funcapi.h" +#include "storage/lmgr.h" #include "utils/array.h" #include "utils/builtins.h" #include "utils/catcache.h" @@ -677,6 +678,14 @@ publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists) */ schemaRels = GetSchemaPublicationRelations(schemaid, PUBLICATION_PART_ALL); + + /* + * Data loss due to concurrency issues is avoided by locking the relations + * in ShareRowExclusiveLock as described atop OpenTableList. + */ + foreach_oid(schrelid, schemaRels) + LockRelationOid(schrelid, ShareRowExclusiveLock); + InvalidatePublicationRels(schemaRels); return myself; diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 6ea709988e..9d9b5f6af9 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -1466,6 +1466,13 @@ RemovePublicationRelById(Oid proid) relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL, pubrel->prrelid); + /* + * Data loss due to concurrency issues is avoided by locking the relations + * in ShareRowExclusiveLock as described atop OpenTableList.
+ */ + foreach_oid(relid, relids) + LockRelationOid(relid, ShareRowExclusiveLock); + InvalidatePublicationRels(relids); CatalogTupleDelete(rel, &tup->t_self); @@ -1531,6 +1538,14 @@ RemovePublicationSchemaById(Oid psoid) */ schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid, PUBLICATION_PART_ALL); + + /* + * Data loss due to concurrency issues is avoided by locking the relations + * in ShareRowExclusiveLock as described atop OpenTableList. + */ + foreach_oid(schrelid, schemaRels) + LockRelationOid(schrelid, ShareRowExclusiveLock); + InvalidatePublicationRels(schemaRels); CatalogTupleDelete(rel, &tup->t_self); @@ -1542,8 +1557,14 @@ RemovePublicationSchemaById(Oid psoid) /* * Open relations specified by a PublicationTable list. - * The returned tables are locked in ShareUpdateExclusiveLock mode in order to - * add them to a publication. + * + * The returned tables are locked in ShareRowExclusiveLock mode to add them + * to a publication. The table needs to be locked in ShareRowExclusiveLock + * mode to ensure that any ongoing transactions involving that table are + * completed before adding it to the publication. Otherwise, a transaction + * started before the publication was altered would continue to use a + * catalog snapshot predating the publication change, and its changes + * would not be replicated.
*/ static List * OpenTableList(List *tables) @@ -1568,7 +1589,7 @@ OpenTableList(List *tables) /* Allow query cancel in case this takes a long time */ CHECK_FOR_INTERRUPTS(); - rel = table_openrv(t->relation, ShareUpdateExclusiveLock); + rel = table_openrv(t->relation, ShareRowExclusiveLock); myrelid = RelationGetRelid(rel); /* @@ -1594,7 +1615,7 @@ OpenTableList(List *tables) errmsg("conflicting or redundant column lists for table \"%s\"", RelationGetRelationName(rel)))); - table_close(rel, ShareUpdateExclusiveLock); + table_close(rel, ShareRowExclusiveLock); continue; } @@ -1622,7 +1643,7 @@ OpenTableList(List *tables) List *children; ListCell *child; - children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock, + children = find_all_inheritors(myrelid, ShareRowExclusiveLock, NULL); foreach(child, children) diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl index cb36ca7b16..04c75b7806 100644 --- a/src/test/subscription/t/100_bugs.pl +++ b/src/test/subscription/t/100_bugs.pl @@ -487,6 +487,162 @@ $result = is( $result, qq(2|f 3|t), 'check replicated update on subscriber'); +# Clean up +$node_publisher->safe_psql('postgres', qq(DROP PUBLICATION pub1;)); +$node_subscriber->safe_psql('postgres', qq(DROP SUBSCRIPTION sub1;)); + +# The bug was that incremental data synchronization was skipped when a new +# table was added to the publication in the presence of a concurrent active +# transaction performing DML on the same table. + +# Initial setup.
+$node_publisher->safe_psql( + 'postgres', qq( + CREATE TABLE tab_conc(a int); + CREATE SCHEMA sch3; + CREATE TABLE sch3.tab_conc(a int); + CREATE SCHEMA sch4; + CREATE TABLE sch4.tab_conc(a int); + CREATE PUBLICATION regress_pub1; +)); + +$node_subscriber->safe_psql( + 'postgres', qq( + CREATE TABLE tab_conc(a int); + CREATE SCHEMA sch3; + CREATE TABLE sch3.tab_conc(a int); + CREATE SCHEMA sch4; + CREATE TABLE sch4.tab_conc(a int); + CREATE SUBSCRIPTION regress_sub1 CONNECTION '$publisher_connstr' PUBLICATION regress_pub1; +)); + +# Bump the query timeout to avoid false negatives on slow test systems. +my $psql_timeout_secs = 4 * $PostgreSQL::Test::Utils::timeout_default; + +# Initiate 3 background sessions. +my $background_psql1 = $node_publisher->background_psql( + 'postgres', + on_error_stop => 0, + timeout => $psql_timeout_secs); +$background_psql1->set_query_timer_restart(); + +my $background_psql2 = $node_publisher->background_psql( + 'postgres', + on_error_stop => 0, + timeout => $psql_timeout_secs); + +$background_psql2->set_query_timer_restart(); + +my $background_psql3 = $node_publisher->background_psql( + 'postgres', + on_error_stop => 0, + timeout => $psql_timeout_secs); +$background_psql3->set_query_timer_restart(); + +# Keep a transaction open on the table that will be added to the +# publication. +$background_psql1->query_safe( + qq[ + BEGIN; + INSERT INTO tab_conc VALUES (1); +]); + +# Keep a transaction open on a schema table that will be added to the +# publication. +$background_psql2->query_safe( + qq[ + BEGIN; + INSERT INTO sch3.tab_conc VALUES (1); +]); + +# Add the table and schemas to the publication using background_psql, as +# ALTER PUBLICATION waits for the locks until the open transactions commit. +$background_psql3->query_until(qr//, + "ALTER PUBLICATION regress_pub1 ADD TABLE tab_conc, TABLES IN SCHEMA sch4, sch3;\n" +); + +# Verify that the table addition is waiting to acquire a ShareRowExclusiveLock.
+$node_publisher->poll_query_until('postgres', + "SELECT COUNT(1) = 1 FROM pg_locks WHERE relation = 'tab_conc'::regclass AND mode = 'ShareRowExclusiveLock' AND waitstart IS NOT NULL;" + ) + or die + "Timed out while waiting for alter publication tries to wait on ShareRowExclusiveLock"; + +# Commit the transaction on tab_conc, so that ALTER PUBLICATION can proceed +# further to acquire locks on the schema tables. +$background_psql1->query_safe(qq[COMMIT]); + +# Verify that ShareRowExclusiveLock is acquired on sch4.tab_conc, for which +# there is no ongoing transaction. +$node_publisher->poll_query_until('postgres', + "SELECT COUNT(1) = 1 FROM pg_locks WHERE relation = (SELECT oid FROM pg_class WHERE relname = 'tab_conc' AND relnamespace = 'sch4'::regnamespace) AND mode = 'ShareRowExclusiveLock' AND waitstart IS NULL;" + ) + or die + "Timed out while waiting for alter publication tries to wait on ShareRowExclusiveLock"; + +# Verify that the schema addition is waiting to acquire a ShareRowExclusiveLock +# on sch3.tab_conc, which still has an ongoing transaction. +$node_publisher->poll_query_until('postgres', + "SELECT COUNT(1) = 1 FROM pg_locks WHERE relation = (SELECT oid FROM pg_class WHERE relname = 'tab_conc' AND relnamespace = 'sch3'::regnamespace) AND mode = 'ShareRowExclusiveLock' AND waitstart IS NOT NULL;" + ) + or die + "Timed out while waiting for alter publication tries to wait on ShareRowExclusiveLock"; + +$background_psql2->query_safe(qq[COMMIT]); + +$node_publisher->safe_psql( + 'postgres', qq( + INSERT INTO tab_conc VALUES (2); + INSERT INTO sch3.tab_conc VALUES (2); +)); + +# Refresh the subscription to pick up the newly published tables.
+$node_subscriber->safe_psql('postgres', + 'ALTER SUBSCRIPTION regress_sub1 REFRESH PUBLICATION'); + +$node_subscriber->wait_for_subscription_sync($node_publisher, 'regress_sub1'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_conc"); +is( $result, qq(1 +2), + 'Ensure that the data from the tab_conc table is synchronized to the subscriber after the subscription is refreshed' +); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT * FROM sch3.tab_conc"); +is( $result, qq(1 +2), + 'Ensure that the data from the sch3.tab_conc table is synchronized to the subscriber after the subscription is refreshed' +); + +# Perform an insert. +$node_publisher->safe_psql( + 'postgres', qq( + INSERT INTO tab_conc VALUES (3); + INSERT INTO sch3.tab_conc VALUES (3); +)); +$node_publisher->wait_for_catchup('regress_sub1'); + +# Verify that the insert is replicated to the subscriber. +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_conc"); +is( $result, qq(1 +2 +3), + 'Verify that the incremental data for table tab_conc added after table synchronization is replicated to the subscriber' +); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT * FROM sch3.tab_conc"); +is( $result, qq(1 +2 +3), + 'Verify that the incremental data for table sch3.tab_conc added after table synchronization is replicated to the subscriber' +); + +$background_psql1->quit; +$background_psql2->quit; +$background_psql3->quit; + $node_publisher->stop('fast'); $node_subscriber->stop('fast'); -- 2.34.1