On Mon, 4 Aug 2025 at 16:08, vignesh C <vignes...@gmail.com> wrote: > > On Mon, 4 Aug 2025 at 09:47, Ashutosh Bapat > <ashutosh.bapat....@gmail.com> wrote: > > > > Hi Vignesh, > > Thanks for the patches. > > > > On Sat, Aug 2, 2025 at 7:10 PM vignesh C <vignes...@gmail.com> wrote: > > > > > > > > > > The backport seems to be straight forward. Please let me know if you > > > > need my help in doing so, if we decide to backport the fix. > > > > > > Now that this has been reported on the back branches, we should > > > consider whether it's appropriate to backport the fix. Here are the > > > patches prepared for the back branches. > > > > PG14 and + patches do not test that DROP PUBLICATION does not disrupt > > the publication. I think we need to test that as well. > > Currently, the test across all branches except PG13 is the same test > used in the master branch. For PG13, since there was no existing > subscription, I modified the test slightly to accommodate that. If I > handle the comment you suggest, the test in master and the backbranch > will be different. Should we keep the test similar to the master or is > it ok to address your above comment and keep it different? > > > PG13 tests DROP PUBLICATION OTOH. That's good. I think it has a race > > condition because +my $offset = -s $node_publisher->logfile; is > > executed after dropping the publication. If some background change > > triggers publication validation before capturing the file offset, we > > might miss the WARNING and the test will fail. Instead capturing > > offset before dropping publication may be safer - the publication > > exists till it dropped, so the log file cannot have WARNING in there > > when offset is captured. > > I will handle this in the next version.
This is addressed in the attached patch. Only the PG13 branch patch is updated, there is no change in other branch patches. Regards, Vignesh
From 3d874dc02ccadb04723d4451da6da43bd1ed4d14 Mon Sep 17 00:00:00 2001 From: Vignesh C <vignes...@gmail.com> Date: Fri, 1 Aug 2025 14:51:26 +0530 Subject: [PATCH v1_PG17] Fix ALTER SUBSCRIPTION ... SET PUBLICATION ... command. The problem is that ALTER SUBSCRIPTION ... SET PUBLICATION ... will lead to restarting of apply worker and after the restart, the apply worker will use the existing slot and replication origin corresponding to the subscription. Now, it is possible that before the restart, the origin has not been updated, and the WAL start location points to a location before where PUBLICATION pointed to by SET PUBLICATION doesn't exist, and that can lead to an error like: "ERROR: publication "pub1" does not exist". Once this error occurs, apply worker will never be able to proceed and will always return the same error. We decided to skip loading the publication if the publication does not exist. The publication is loaded later and updates the relation entry when the publication gets created. --- src/backend/replication/pgoutput/pgoutput.c | 16 +++++- src/test/subscription/t/024_add_drop_pub.pl | 55 ++++++++++++++++++++- 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 99518c6b6dd..fa0fb915f15 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1741,6 +1741,11 @@ pgoutput_shutdown(LogicalDecodingContext *ctx) /* * Load publications from the list of publication names. + * + * Here, we skip the publications that don't exist yet. This will allow us + * to silently continue the replication in the absence of a missing publication. + * This is required because we allow the users to create publications after they + * have specified the required publications at the time of replication start. */ static List * LoadPublications(List *pubnames) @@ -1751,9 +1756,16 @@ LoadPublications(List *pubnames) foreach(lc, pubnames) { char *pubname = (char *) lfirst(lc); - Publication *pub = GetPublicationByName(pubname, false); + Publication *pub = GetPublicationByName(pubname, true); - result = lappend(result, pub); + if (pub) + result = lappend(result, pub); + else + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("skipped loading publication: %s", pubname), + errdetail("The publication does not exist at this point in the WAL."), + errhint("Create the publication if it does not exist.")); } return result; diff --git a/src/test/subscription/t/024_add_drop_pub.pl b/src/test/subscription/t/024_add_drop_pub.pl index c0d7ffcb6b8..1ea7d4611da 100644 --- a/src/test/subscription/t/024_add_drop_pub.pl +++ b/src/test/subscription/t/024_add_drop_pub.pl @@ -1,7 +1,9 @@ # Copyright (c) 2021-2024, PostgreSQL Global Development Group -# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION +# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION and +# ensures that creating a publication associated with a subscription at a later +# point of time does not break logical replication. use strict; use warnings FATAL => 'all'; use PostgreSQL::Test::Cluster; @@ -80,6 +82,57 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_1"); is($result, qq(20|1|10), 'check initial data is copied to subscriber'); +# Ensure that setting a missing publication to the subscription does not +# disrupt existing logical replication. Instead, it should log a warning +# while allowing replication to continue. Additionally, verify that replication +# resumes after the missing publication is created for the publication table. + +# Create table on publisher and subscriber +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); + +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';" +); + +# Set the subscription with a missing publication +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_3"); + +# Wait for the walsender to restart after altering the subscription +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply worker to restart after altering the subscription"; + +my $offset = -s $node_publisher->logfile; + +$node_publisher->safe_psql('postgres',"INSERT INTO tab_3 values(1)"); + +# Verify that a warning is logged. +$node_publisher->wait_for_log( + qr/WARNING: ( [A-Z0-9]+:)? skipped loading publication: tap_pub_3/, $offset); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_3 FOR TABLE tab_3"); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"); + +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_3 values(2)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Verify that the insert operation gets replicated to subscriber after +# publication is created. +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab_3"); +is($result, qq(1 +2), 'check that the incremental data is replicated after the publication is created'); + # shutdown $node_subscriber->stop('fast'); $node_publisher->stop('fast'); -- 2.43.0
From f0e05a438b89e86e6424c461aff6d2de618bd917 Mon Sep 17 00:00:00 2001 From: Vignesh C <vignes...@gmail.com> Date: Fri, 1 Aug 2025 15:28:46 +0530 Subject: [PATCH v1_PG15] Fix ALTER SUBSCRIPTION ... SET PUBLICATION ... command. The problem is that ALTER SUBSCRIPTION ... SET PUBLICATION ... will lead to restarting of apply worker and after the restart, the apply worker will use the existing slot and replication origin corresponding to the subscription. Now, it is possible that before the restart, the origin has not been updated, and the WAL start location points to a location before where PUBLICATION pointed to by SET PUBLICATION doesn't exist, and that can lead to an error like: "ERROR: publication "pub1" does not exist". Once this error occurs, apply worker will never be able to proceed and will always return the same error. We decided to skip loading the publication if the publication does not exist. The publication is loaded later and updates the relation entry when the publication gets created. --- src/backend/replication/pgoutput/pgoutput.c | 16 +++++- src/test/subscription/t/024_add_drop_pub.pl | 55 ++++++++++++++++++++- 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 64f62de635f..9e427beed4f 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1773,6 +1773,11 @@ pgoutput_shutdown(LogicalDecodingContext *ctx) /* * Load publications from the list of publication names. + * + * Here, we skip the publications that don't exist yet. This will allow us + * to silently continue the replication in the absence of a missing publication. + * This is required because we allow the users to create publications after they + * have specified the required publications at the time of replication start. */ static List * LoadPublications(List *pubnames) @@ -1783,9 +1788,16 @@ LoadPublications(List *pubnames) foreach(lc, pubnames) { char *pubname = (char *) lfirst(lc); - Publication *pub = GetPublicationByName(pubname, false); + Publication *pub = GetPublicationByName(pubname, true); - result = lappend(result, pub); + if (pub) + result = lappend(result, pub); + else + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("skipped loading publication: %s", pubname), + errdetail("The publication does not exist at this point in the WAL."), + errhint("Create the publication if it does not exist.")); } return result; diff --git a/src/test/subscription/t/024_add_drop_pub.pl b/src/test/subscription/t/024_add_drop_pub.pl index eaf47e66f1a..b39feaf3119 100644 --- a/src/test/subscription/t/024_add_drop_pub.pl +++ b/src/test/subscription/t/024_add_drop_pub.pl @@ -1,7 +1,9 @@ # Copyright (c) 2021-2022, PostgreSQL Global Development Group -# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION +# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION and +# ensures that creating a publication associated with a subscription at a later +# point of time does not break logical replication. use strict; use warnings; use PostgreSQL::Test::Cluster; @@ -80,6 +82,57 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_1"); is($result, qq(20|1|10), 'check initial data is copied to subscriber'); +# Ensure that setting a missing publication to the subscription does not +# disrupt existing logical replication. Instead, it should log a warning +# while allowing replication to continue. Additionally, verify that replication +# resumes after the missing publication is created for the publication table. + +# Create table on publisher and subscriber +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); + +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';" +); + +# Set the subscription with a missing publication +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_3"); + +# Wait for the walsender to restart after altering the subscription +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply worker to restart after altering the subscription"; + +my $offset = -s $node_publisher->logfile; + +$node_publisher->safe_psql('postgres',"INSERT INTO tab_3 values(1)"); + +# Verify that a warning is logged. +$node_publisher->wait_for_log( + qr/WARNING: ( [A-Z0-9]+:)? skipped loading publication: tap_pub_3/, $offset); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_3 FOR TABLE tab_3"); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"); + +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_3 values(2)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Verify that the insert operation gets replicated to subscriber after +# publication is created. +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab_3"); +is($result, qq(1 +2), 'check that the incremental data is replicated after the publication is created'); + # shutdown $node_subscriber->stop('fast'); $node_publisher->stop('fast'); -- 2.43.0
From 5d5e45a48ecbaf9dba05dd757a82ac98e35f0057 Mon Sep 17 00:00:00 2001 From: Vignesh C <vignes...@gmail.com> Date: Fri, 1 Aug 2025 22:26:06 +0530 Subject: [PATCH v1_PG14] Fix ALTER SUBSCRIPTION ... SET PUBLICATION ... command. The problem is that ALTER SUBSCRIPTION ... SET PUBLICATION ... will lead to restarting of apply worker and after the restart, the apply worker will use the existing slot and replication origin corresponding to the subscription. Now, it is possible that before the restart, the origin has not been updated, and the WAL start location points to a location before where PUBLICATION pointed to by SET PUBLICATION doesn't exist, and that can lead to an error like: "ERROR: publication "pub1" does not exist". Once this error occurs, apply worker will never be able to proceed and will always return the same error. We decided to skip loading the publication if the publication does not exist. The publication is loaded later and updates the relation entry when the publication gets created. --- src/backend/replication/pgoutput/pgoutput.c | 16 +++++- src/test/subscription/t/021_alter_sub_pub.pl | 57 +++++++++++++++++++- 2 files changed, 69 insertions(+), 4 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index a81215cff86..f463e4d6d60 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -841,6 +841,11 @@ pgoutput_shutdown(LogicalDecodingContext *ctx) /* * Load publications from the list of publication names. + * + * Here, we skip the publications that don't exist yet. This will allow us + * to silently continue the replication in the absence of a missing publication. + * This is required because we allow the users to create publications after they + * have specified the required publications at the time of replication start. */ static List * LoadPublications(List *pubnames) @@ -851,9 +856,16 @@ LoadPublications(List *pubnames) foreach(lc, pubnames) { char *pubname = (char *) lfirst(lc); - Publication *pub = GetPublicationByName(pubname, false); + Publication *pub = GetPublicationByName(pubname, true); - result = lappend(result, pub); + if (pub) + result = lappend(result, pub); + else + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("skipped loading publication: %s", pubname), + errdetail("The publication does not exist at this point in the WAL."), + errhint("Create the publication if it does not exist.")); } return result; diff --git a/src/test/subscription/t/021_alter_sub_pub.pl b/src/test/subscription/t/021_alter_sub_pub.pl index 4c59d44e33f..9cc93caf7e9 100644 --- a/src/test/subscription/t/021_alter_sub_pub.pl +++ b/src/test/subscription/t/021_alter_sub_pub.pl @@ -1,12 +1,14 @@ # Copyright (c) 2021, PostgreSQL Global Development Group -# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION +# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION and +# ensures that creating a publication associated with a subscription at a later +# point of time does not break logical replication. use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 3; +use Test::More tests => 4; # Initialize publisher node my $node_publisher = get_new_node('publisher'); @@ -81,6 +83,57 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_1"); is($result, qq(20|1|10), 'check initial data is copied to subscriber'); +# Ensure that setting a missing publication to the subscription does not +# disrupt existing logical replication. Instead, it should log a warning +# while allowing replication to continue. Additionally, verify that replication +# resumes after the missing publication is created for the publication table. + +# Create table on publisher and subscriber +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); + +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';" +); + +# Set the subscription with a missing publication +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub ADD PUBLICATION tap_pub_3"); + +# Wait for the walsender to restart after altering the subscription +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply worker to restart after altering the subscription"; + +my $offset = -s $node_publisher->logfile; + +$node_publisher->safe_psql('postgres',"INSERT INTO tab_3 values(1)"); + +# Verify that a warning is logged. +$node_publisher->wait_for_log( + qr/WARNING: ( [A-Z0-9]+:)? skipped loading publication: tap_pub_3/, $offset); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_3 FOR TABLE tab_3"); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"); + +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_3 values(2)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Verify that the insert operation gets replicated to subscriber after +# publication is created. +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab_3"); +is($result, qq(1 +2), 'check that the incremental data is replicated after the publication is created'); + # shutdown $node_subscriber->stop('fast'); $node_publisher->stop('fast'); -- 2.43.0
From 910b8d66e50050574cfb98a53d6a1ed8f00ac6ec Mon Sep 17 00:00:00 2001 From: Vignesh C <vignes...@gmail.com> Date: Fri, 1 Aug 2025 15:25:53 +0530 Subject: [PATCH v1_PG16] Fix ALTER SUBSCRIPTION ... SET PUBLICATION ... command. The problem is that ALTER SUBSCRIPTION ... SET PUBLICATION ... will lead to restarting of apply worker and after the restart, the apply worker will use the existing slot and replication origin corresponding to the subscription. Now, it is possible that before the restart, the origin has not been updated, and the WAL start location points to a location before where PUBLICATION pointed to by SET PUBLICATION doesn't exist, and that can lead to an error like: "ERROR: publication "pub1" does not exist". Once this error occurs, apply worker will never be able to proceed and will always return the same error. We decided to skip loading the publication if the publication does not exist. The publication is loaded later and updates the relation entry when the publication gets created. --- src/backend/replication/pgoutput/pgoutput.c | 16 +++++- src/test/subscription/t/024_add_drop_pub.pl | 55 ++++++++++++++++++++- 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 32b74bb4752..d83da138081 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1737,6 +1737,11 @@ pgoutput_shutdown(LogicalDecodingContext *ctx) /* * Load publications from the list of publication names. + * + * Here, we skip the publications that don't exist yet. This will allow us + * to silently continue the replication in the absence of a missing publication. + * This is required because we allow the users to create publications after they + * have specified the required publications at the time of replication start. */ static List * LoadPublications(List *pubnames) @@ -1747,9 +1752,16 @@ LoadPublications(List *pubnames) foreach(lc, pubnames) { char *pubname = (char *) lfirst(lc); - Publication *pub = GetPublicationByName(pubname, false); + Publication *pub = GetPublicationByName(pubname, true); - result = lappend(result, pub); + if (pub) + result = lappend(result, pub); + else + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("skipped loading publication: %s", pubname), + errdetail("The publication does not exist at this point in the WAL."), + errhint("Create the publication if it does not exist.")); } return result; diff --git a/src/test/subscription/t/024_add_drop_pub.pl b/src/test/subscription/t/024_add_drop_pub.pl index 8614b1b5b34..33ecfe7a415 100644 --- a/src/test/subscription/t/024_add_drop_pub.pl +++ b/src/test/subscription/t/024_add_drop_pub.pl @@ -1,7 +1,9 @@ # Copyright (c) 2021-2023, PostgreSQL Global Development Group -# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION +# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION and +# ensures that creating a publication associated with a subscription at a later +# point of time does not break logical replication. use strict; use warnings; use PostgreSQL::Test::Cluster; @@ -80,6 +82,57 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_1"); is($result, qq(20|1|10), 'check initial data is copied to subscriber'); +# Ensure that setting a missing publication to the subscription does not +# disrupt existing logical replication. Instead, it should log a warning +# while allowing replication to continue. Additionally, verify that replication +# resumes after the missing publication is created for the publication table. + +# Create table on publisher and subscriber +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); + +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';" +); + +# Set the subscription with a missing publication +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_3"); + +# Wait for the walsender to restart after altering the subscription +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply worker to restart after altering the subscription"; + +my $offset = -s $node_publisher->logfile; + +$node_publisher->safe_psql('postgres',"INSERT INTO tab_3 values(1)"); + +# Verify that a warning is logged. +$node_publisher->wait_for_log( + qr/WARNING: ( [A-Z0-9]+:)? skipped loading publication: tap_pub_3/, $offset); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_3 FOR TABLE tab_3"); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"); + +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_3 values(2)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Verify that the insert operation gets replicated to subscriber after +# publication is created. +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab_3"); +is($result, qq(1 +2), 'check that the incremental data is replicated after the publication is created'); + # shutdown $node_subscriber->stop('fast'); $node_publisher->stop('fast'); -- 2.43.0
From 29ff09285362319a6938d33d14d0e84a5a778608 Mon Sep 17 00:00:00 2001 From: Vignesh C <vignes...@gmail.com> Date: Sat, 2 Aug 2025 18:46:49 +0530 Subject: [PATCH v2_PG13] Fix ALTER SUBSCRIPTION ... SET PUBLICATION ... command. The problem is that ALTER SUBSCRIPTION ... SET PUBLICATION ... will lead to restarting of apply worker and after the restart, the apply worker will use the existing slot and replication origin corresponding to the subscription. Now, it is possible that before the restart, the origin has not been updated, and the WAL start location points to a location before where PUBLICATION pointed to by SET PUBLICATION doesn't exist, and that can lead to an error like: "ERROR: publication "pub1" does not exist". Once this error occurs, apply worker will never be able to proceed and will always return the same error. We decided to skip loading the publication if the publication does not exist. The publication is loaded later and updates the relation entry when the publication gets created. --- src/backend/replication/pgoutput/pgoutput.c | 16 ++++++- src/test/subscription/t/001_rep_changes.pl | 47 ++++++++++++++++++++- 2 files changed, 60 insertions(+), 3 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 3d98d60d6c4..b3bd2b92f58 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -642,6 +642,11 @@ pgoutput_shutdown(LogicalDecodingContext *ctx) /* * Load publications from the list of publication names. + * + * Here, we skip the publications that don't exist yet. This will allow us + * to silently continue the replication in the absence of a missing publication. + * This is required because we allow the users to create publications after they + * have specified the required publications at the time of replication start. */ static List * LoadPublications(List *pubnames) @@ -652,9 +657,16 @@ LoadPublications(List *pubnames) foreach(lc, pubnames) { char *pubname = (char *) lfirst(lc); - Publication *pub = GetPublicationByName(pubname, false); + Publication *pub = GetPublicationByName(pubname, true); - result = lappend(result, pub); + if (pub) + result = lappend(result, pub); + else + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("skipped loading publication: %s", pubname), + errdetail("The publication does not exist at this point in the WAL."), + errhint("Create the publication if it does not exist.")); } return result; diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index c60ef1c4f52..311c3e0cb45 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 27; +use Test::More tests => 28; # Initialize publisher node my $node_publisher = get_new_node('publisher'); @@ -390,6 +390,51 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin"); is($result, qq(0), 'check replication origin was dropped on subscriber'); +# Ensure that setting a missing publication to the subscription does not +# disrupt existing logical replication. Instead, it should log a warning +# while allowing replication to continue. Additionally, verify that replication +# resumes after the missing publication is created for the publication table. + +# Create table on publisher and subscriber +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub1 FOR TABLE tab_3"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub1 CONNECTION '$publisher_connstr' PUBLICATION tap_pub1" +); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub1'); + +my $offset = -s $node_publisher->logfile; + +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub1"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab_3 values(1)"); + +# Verify that a warning is logged. +$node_publisher->wait_for_log( + qr/WARNING: ( [A-Z0-9]+:)? skipped loading publication: tap_pub1/, + $offset); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub1 FOR TABLE tab_3"); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_3 values(1)"); + +$node_publisher->wait_for_catchup('tap_sub1'); + +# Verify that the insert operation gets replicated to subscriber after +# publication is created. +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_3"); +is($result, qq(1), + 'check that the incremental data is replicated after the publication is created' +); + +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub1"); + $node_subscriber->stop('fast'); $node_publisher->stop('fast'); -- 2.43.0