On Fri, 1 Aug 2025 at 10:54, Ashutosh Bapat <ashutosh.bapat....@gmail.com> wrote: > > Hi Vignesh, Amit, > We encountered a situation where a customer dropped a publication > accidentally and that broke logical replication in an irrecoverable > manner. This is PG 15.3 but the team confirmed that the behaviour is > reproducible with PG 17 as well. > > When a WAL sender processes a WAL record recording a change in > publication, it ends up calling LoadPublication() which throws an > error if a publication mentioned in START_REPLICATION command is not > found. The downstream tries to reconnect but the WAL sender again > repeats the same process going in an error loop. Creating the > publication does not help since WAL sender will always encounter the > WAL record dropping the publication first. > > There are ways to come out of this situation, but not very clean always > 1. Remove publication from subscription, run logical replication till > it passes the point where publication was added, add the publication > back and continue. It's not always possible to know when the > publication was added back and thus it becomes tedious or next to > impossible to apply these steps. > 2. Reseeding the replication slot which involves copying all the data > again and not feasible in case of large databases. > 3. Skipping the transaction which dropped the publication. This will > work if drop publication was the only thing in that transaction but > not otherwise. Confirming that is tricky and requires some expert > help. > > In PG 18 onwards, this behaviour is fixed by throwing a WARNING > instead of an error. In the relevant thread [1] where the fix to PG 18 > was discussed, backpatching was also discussed. Back then it was > deferred because of lack of field reports. But we are seeing this > situation now. So maybe it's time to backpatch the fix. Further PG 15 > documentation mentions that > https://www.postgresql.org/docs/15/sql-createsubscription.html. So the > users will expect that their logical replication will not be affected > (except for the data published by the publication) if a publication is > dropped or does not exist. So, backpatching the change would make the > behaviour compatible with the documentation. > > The backport seems to be straight forward. Please let me know if you > need my help in doing so, if we decide to backport the fix.
Now that this has been reported on the back branches, we should consider whether it's appropriate to backport the fix. Here are the patches prepared for the back branches. Regards, Vignesh
From fa8d5b256e8d7ca1428695115aad115fbec6f11c Mon Sep 17 00:00:00 2001 From: Vignesh C <vignes...@gmail.com> Date: Sat, 2 Aug 2025 18:46:49 +0530 Subject: [PATCH v1_PG13] Fix ALTER SUBSCRIPTION ... SET PUBLICATION ... command. The problem is that ALTER SUBSCRIPTION ... SET PUBLICATION ... will lead to restarting of apply worker and after the restart, the apply worker will use the existing slot and replication origin corresponding to the subscription. Now, it is possible that before the restart, the origin has not been updated, and the WAL start location points to a location before where PUBLICATION pointed to by SET PUBLICATION doesn't exist, and that can lead to an error like: "ERROR: publication "pub1" does not exist". Once this error occurs, apply worker will never be able to proceed and will always return the same error. We decided to skip loading the publication if the publication does not exist. The publication is loaded later and updates the relation entry when the publication gets created. --- src/backend/replication/pgoutput/pgoutput.c | 16 ++++++- src/test/subscription/t/001_rep_changes.pl | 48 ++++++++++++++++++++- 2 files changed, 61 insertions(+), 3 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 3d98d60d6c4..b3bd2b92f58 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -642,6 +642,11 @@ pgoutput_shutdown(LogicalDecodingContext *ctx) /* * Load publications from the list of publication names. + * + * Here, we skip the publications that don't exist yet. This will allow us + * to silently continue the replication in the absence of a missing publication. + * This is required because we allow the users to create publications after they + * have specified the required publications at the time of replication start. */ static List * LoadPublications(List *pubnames) @@ -652,9 +657,16 @@ LoadPublications(List *pubnames) foreach(lc, pubnames) { char *pubname = (char *) lfirst(lc); - Publication *pub = GetPublicationByName(pubname, false); + Publication *pub = GetPublicationByName(pubname, true); - result = lappend(result, pub); + if (pub) + result = lappend(result, pub); + else + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("skipped loading publication: %s", pubname), + errdetail("The publication does not exist at this point in the WAL."), + errhint("Create the publication if it does not exist.")); } return result; diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index c60ef1c4f52..8c33ae51f4b 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 27; +use Test::More tests => 28; # Initialize publisher node my $node_publisher = get_new_node('publisher'); @@ -390,6 +390,52 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin"); is($result, qq(0), 'check replication origin was dropped on subscriber'); +# Ensure that setting a missing publication to the subscription does not +# disrupt existing logical replication. Instead, it should log a warning +# while allowing replication to continue. Additionally, verify that replication +# resumes after the missing publication is created for the publication table. + +# Create table on publisher and subscriber +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub1 FOR TABLE tab_3"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub1 CONNECTION '$publisher_connstr' PUBLICATION tap_pub1" +); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub1'); + +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub1"); + +my $offset = -s $node_publisher->logfile; + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_3 values(1)"); + +# Verify that a warning is logged. +$node_publisher->wait_for_log( + qr/WARNING: ( [A-Z0-9]+:)? skipped loading publication: tap_pub1/, + $offset); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub1 FOR TABLE tab_3"); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_3 values(1)"); + +$node_publisher->wait_for_catchup('tap_sub1'); + +# Verify that the insert operation gets replicated to subscriber after +# publication is created. +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_3"); +is($result, qq(1), + 'check that the incremental data is replicated after the publication is created' +); + +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub1"); + $node_subscriber->stop('fast'); $node_publisher->stop('fast'); -- 2.43.0
From 5d5e45a48ecbaf9dba05dd757a82ac98e35f0057 Mon Sep 17 00:00:00 2001 From: Vignesh C <vignes...@gmail.com> Date: Fri, 1 Aug 2025 22:26:06 +0530 Subject: [PATCH v1_PG14] Fix ALTER SUBSCRIPTION ... SET PUBLICATION ... command. The problem is that ALTER SUBSCRIPTION ... SET PUBLICATION ... will lead to restarting of apply worker and after the restart, the apply worker will use the existing slot and replication origin corresponding to the subscription. Now, it is possible that before the restart, the origin has not been updated, and the WAL start location points to a location before where PUBLICATION pointed to by SET PUBLICATION doesn't exist, and that can lead to an error like: "ERROR: publication "pub1" does not exist". Once this error occurs, apply worker will never be able to proceed and will always return the same error. We decided to skip loading the publication if the publication does not exist. The publication is loaded later and updates the relation entry when the publication gets created. --- src/backend/replication/pgoutput/pgoutput.c | 16 +++++- src/test/subscription/t/021_alter_sub_pub.pl | 57 +++++++++++++++++++- 2 files changed, 69 insertions(+), 4 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index a81215cff86..f463e4d6d60 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -841,6 +841,11 @@ pgoutput_shutdown(LogicalDecodingContext *ctx) /* * Load publications from the list of publication names. + * + * Here, we skip the publications that don't exist yet. This will allow us + * to silently continue the replication in the absence of a missing publication. + * This is required because we allow the users to create publications after they + * have specified the required publications at the time of replication start. */ static List * LoadPublications(List *pubnames) @@ -851,9 +856,16 @@ LoadPublications(List *pubnames) foreach(lc, pubnames) { char *pubname = (char *) lfirst(lc); - Publication *pub = GetPublicationByName(pubname, false); + Publication *pub = GetPublicationByName(pubname, true); - result = lappend(result, pub); + if (pub) + result = lappend(result, pub); + else + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("skipped loading publication: %s", pubname), + errdetail("The publication does not exist at this point in the WAL."), + errhint("Create the publication if it does not exist.")); } return result; diff --git a/src/test/subscription/t/021_alter_sub_pub.pl b/src/test/subscription/t/021_alter_sub_pub.pl index 4c59d44e33f..9cc93caf7e9 100644 --- a/src/test/subscription/t/021_alter_sub_pub.pl +++ b/src/test/subscription/t/021_alter_sub_pub.pl @@ -1,12 +1,14 @@ # Copyright (c) 2021, PostgreSQL Global Development Group -# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION +# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION and +# ensures that creating a publication associated with a subscription at a later +# point of time does not break logical replication. use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 3; +use Test::More tests => 4; # Initialize publisher node my $node_publisher = get_new_node('publisher'); @@ -81,6 +83,57 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_1"); is($result, qq(20|1|10), 'check initial data is copied to subscriber'); +# Ensure that setting a missing publication to the subscription does not +# disrupt existing logical replication. Instead, it should log a warning +# while allowing replication to continue. Additionally, verify that replication +# resumes after the missing publication is created for the publication table. + +# Create table on publisher and subscriber +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); + +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';" +); + +# Set the subscription with a missing publication +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub ADD PUBLICATION tap_pub_3"); + +# Wait for the walsender to restart after altering the subscription +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply worker to restart after altering the subscription"; + +my $offset = -s $node_publisher->logfile; + +$node_publisher->safe_psql('postgres',"INSERT INTO tab_3 values(1)"); + +# Verify that a warning is logged. +$node_publisher->wait_for_log( + qr/WARNING: ( [A-Z0-9]+:)? skipped loading publication: tap_pub_3/, $offset); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_3 FOR TABLE tab_3"); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"); + +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_3 values(2)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Verify that the insert operation gets replicated to subscriber after +# publication is created. +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab_3"); +is($result, qq(1 +2), 'check that the incremental data is replicated after the publication is created'); + # shutdown $node_subscriber->stop('fast'); $node_publisher->stop('fast'); -- 2.43.0
From f0e05a438b89e86e6424c461aff6d2de618bd917 Mon Sep 17 00:00:00 2001 From: Vignesh C <vignes...@gmail.com> Date: Fri, 1 Aug 2025 15:28:46 +0530 Subject: [PATCH v1_PG15] Fix ALTER SUBSCRIPTION ... SET PUBLICATION ... command. The problem is that ALTER SUBSCRIPTION ... SET PUBLICATION ... will lead to restarting of apply worker and after the restart, the apply worker will use the existing slot and replication origin corresponding to the subscription. Now, it is possible that before the restart, the origin has not been updated, and the WAL start location points to a location before where PUBLICATION pointed to by SET PUBLICATION doesn't exist, and that can lead to an error like: "ERROR: publication "pub1" does not exist". Once this error occurs, apply worker will never be able to proceed and will always return the same error. We decided to skip loading the publication if the publication does not exist. The publication is loaded later and updates the relation entry when the publication gets created. --- src/backend/replication/pgoutput/pgoutput.c | 16 +++++- src/test/subscription/t/024_add_drop_pub.pl | 55 ++++++++++++++++++++- 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 64f62de635f..9e427beed4f 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1773,6 +1773,11 @@ pgoutput_shutdown(LogicalDecodingContext *ctx) /* * Load publications from the list of publication names. + * + * Here, we skip the publications that don't exist yet. This will allow us + * to silently continue the replication in the absence of a missing publication. + * This is required because we allow the users to create publications after they + * have specified the required publications at the time of replication start. */ static List * LoadPublications(List *pubnames) @@ -1783,9 +1788,16 @@ LoadPublications(List *pubnames) foreach(lc, pubnames) { char *pubname = (char *) lfirst(lc); - Publication *pub = GetPublicationByName(pubname, false); + Publication *pub = GetPublicationByName(pubname, true); - result = lappend(result, pub); + if (pub) + result = lappend(result, pub); + else + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("skipped loading publication: %s", pubname), + errdetail("The publication does not exist at this point in the WAL."), + errhint("Create the publication if it does not exist.")); } return result; diff --git a/src/test/subscription/t/024_add_drop_pub.pl b/src/test/subscription/t/024_add_drop_pub.pl index eaf47e66f1a..b39feaf3119 100644 --- a/src/test/subscription/t/024_add_drop_pub.pl +++ b/src/test/subscription/t/024_add_drop_pub.pl @@ -1,7 +1,9 @@ # Copyright (c) 2021-2022, PostgreSQL Global Development Group -# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION +# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION and +# ensures that creating a publication associated with a subscription at a later +# point of time does not break logical replication. use strict; use warnings; use PostgreSQL::Test::Cluster; @@ -80,6 +82,57 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_1"); is($result, qq(20|1|10), 'check initial data is copied to subscriber'); +# Ensure that setting a missing publication to the subscription does not +# disrupt existing logical replication. Instead, it should log a warning +# while allowing replication to continue. Additionally, verify that replication +# resumes after the missing publication is created for the publication table. + +# Create table on publisher and subscriber +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); + +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';" +); + +# Set the subscription with a missing publication +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_3"); + +# Wait for the walsender to restart after altering the subscription +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply worker to restart after altering the subscription"; + +my $offset = -s $node_publisher->logfile; + +$node_publisher->safe_psql('postgres',"INSERT INTO tab_3 values(1)"); + +# Verify that a warning is logged. +$node_publisher->wait_for_log( + qr/WARNING: ( [A-Z0-9]+:)? skipped loading publication: tap_pub_3/, $offset); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_3 FOR TABLE tab_3"); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"); + +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_3 values(2)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Verify that the insert operation gets replicated to subscriber after +# publication is created. +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab_3"); +is($result, qq(1 +2), 'check that the incremental data is replicated after the publication is created'); + # shutdown $node_subscriber->stop('fast'); $node_publisher->stop('fast'); -- 2.43.0
From 3d874dc02ccadb04723d4451da6da43bd1ed4d14 Mon Sep 17 00:00:00 2001 From: Vignesh C <vignes...@gmail.com> Date: Fri, 1 Aug 2025 14:51:26 +0530 Subject: [PATCH v1_PG17] Fix ALTER SUBSCRIPTION ... SET PUBLICATION ... command. The problem is that ALTER SUBSCRIPTION ... SET PUBLICATION ... will lead to restarting of apply worker and after the restart, the apply worker will use the existing slot and replication origin corresponding to the subscription. Now, it is possible that before the restart, the origin has not been updated, and the WAL start location points to a location before where PUBLICATION pointed to by SET PUBLICATION doesn't exist, and that can lead to an error like: "ERROR: publication "pub1" does not exist". Once this error occurs, apply worker will never be able to proceed and will always return the same error. We decided to skip loading the publication if the publication does not exist. The publication is loaded later and updates the relation entry when the publication gets created. --- src/backend/replication/pgoutput/pgoutput.c | 16 +++++- src/test/subscription/t/024_add_drop_pub.pl | 55 ++++++++++++++++++++- 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 99518c6b6dd..fa0fb915f15 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1741,6 +1741,11 @@ pgoutput_shutdown(LogicalDecodingContext *ctx) /* * Load publications from the list of publication names. + * + * Here, we skip the publications that don't exist yet. This will allow us + * to silently continue the replication in the absence of a missing publication. + * This is required because we allow the users to create publications after they + * have specified the required publications at the time of replication start. */ static List * LoadPublications(List *pubnames) @@ -1751,9 +1756,16 @@ LoadPublications(List *pubnames) foreach(lc, pubnames) { char *pubname = (char *) lfirst(lc); - Publication *pub = GetPublicationByName(pubname, false); + Publication *pub = GetPublicationByName(pubname, true); - result = lappend(result, pub); + if (pub) + result = lappend(result, pub); + else + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("skipped loading publication: %s", pubname), + errdetail("The publication does not exist at this point in the WAL."), + errhint("Create the publication if it does not exist.")); } return result; diff --git a/src/test/subscription/t/024_add_drop_pub.pl b/src/test/subscription/t/024_add_drop_pub.pl index c0d7ffcb6b8..1ea7d4611da 100644 --- a/src/test/subscription/t/024_add_drop_pub.pl +++ b/src/test/subscription/t/024_add_drop_pub.pl @@ -1,7 +1,9 @@ # Copyright (c) 2021-2024, PostgreSQL Global Development Group -# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION +# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION and +# ensures that creating a publication associated with a subscription at a later +# point of time does not break logical replication. use strict; use warnings FATAL => 'all'; use PostgreSQL::Test::Cluster; @@ -80,6 +82,57 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_1"); is($result, qq(20|1|10), 'check initial data is copied to subscriber'); +# Ensure that setting a missing publication to the subscription does not +# disrupt existing logical replication. Instead, it should log a warning +# while allowing replication to continue. Additionally, verify that replication +# resumes after the missing publication is created for the publication table. + +# Create table on publisher and subscriber +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); + +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';" +); + +# Set the subscription with a missing publication +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_3"); + +# Wait for the walsender to restart after altering the subscription +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply worker to restart after altering the subscription"; + +my $offset = -s $node_publisher->logfile; + +$node_publisher->safe_psql('postgres',"INSERT INTO tab_3 values(1)"); + +# Verify that a warning is logged. +$node_publisher->wait_for_log( + qr/WARNING: ( [A-Z0-9]+:)? skipped loading publication: tap_pub_3/, $offset); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_3 FOR TABLE tab_3"); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"); + +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_3 values(2)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Verify that the insert operation gets replicated to subscriber after +# publication is created. +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab_3"); +is($result, qq(1 +2), 'check that the incremental data is replicated after the publication is created'); + # shutdown $node_subscriber->stop('fast'); $node_publisher->stop('fast'); -- 2.43.0
From 910b8d66e50050574cfb98a53d6a1ed8f00ac6ec Mon Sep 17 00:00:00 2001 From: Vignesh C <vignes...@gmail.com> Date: Fri, 1 Aug 2025 15:25:53 +0530 Subject: [PATCH v1_PG16] Fix ALTER SUBSCRIPTION ... SET PUBLICATION ... command. The problem is that ALTER SUBSCRIPTION ... SET PUBLICATION ... will lead to restarting of apply worker and after the restart, the apply worker will use the existing slot and replication origin corresponding to the subscription. Now, it is possible that before the restart, the origin has not been updated, and the WAL start location points to a location before where PUBLICATION pointed to by SET PUBLICATION doesn't exist, and that can lead to an error like: "ERROR: publication "pub1" does not exist". Once this error occurs, apply worker will never be able to proceed and will always return the same error. We decided to skip loading the publication if the publication does not exist. The publication is loaded later and updates the relation entry when the publication gets created. --- src/backend/replication/pgoutput/pgoutput.c | 16 +++++- src/test/subscription/t/024_add_drop_pub.pl | 55 ++++++++++++++++++++- 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 32b74bb4752..d83da138081 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1737,6 +1737,11 @@ pgoutput_shutdown(LogicalDecodingContext *ctx) /* * Load publications from the list of publication names. + * + * Here, we skip the publications that don't exist yet. This will allow us + * to silently continue the replication in the absence of a missing publication. + * This is required because we allow the users to create publications after they + * have specified the required publications at the time of replication start. */ static List * LoadPublications(List *pubnames) @@ -1747,9 +1752,16 @@ LoadPublications(List *pubnames) foreach(lc, pubnames) { char *pubname = (char *) lfirst(lc); - Publication *pub = GetPublicationByName(pubname, false); + Publication *pub = GetPublicationByName(pubname, true); - result = lappend(result, pub); + if (pub) + result = lappend(result, pub); + else + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("skipped loading publication: %s", pubname), + errdetail("The publication does not exist at this point in the WAL."), + errhint("Create the publication if it does not exist.")); } return result; diff --git a/src/test/subscription/t/024_add_drop_pub.pl b/src/test/subscription/t/024_add_drop_pub.pl index 8614b1b5b34..33ecfe7a415 100644 --- a/src/test/subscription/t/024_add_drop_pub.pl +++ b/src/test/subscription/t/024_add_drop_pub.pl @@ -1,7 +1,9 @@ # Copyright (c) 2021-2023, PostgreSQL Global Development Group -# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION +# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION and +# ensures that creating a publication associated with a subscription at a later +# point of time does not break logical replication. use strict; use warnings; use PostgreSQL::Test::Cluster; @@ -80,6 +82,57 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_1"); is($result, qq(20|1|10), 'check initial data is copied to subscriber'); +# Ensure that setting a missing publication to the subscription does not +# disrupt existing logical replication. Instead, it should log a warning +# while allowing replication to continue. Additionally, verify that replication +# resumes after the missing publication is created for the publication table. + +# Create table on publisher and subscriber +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); + +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';" +); + +# Set the subscription with a missing publication +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_3"); + +# Wait for the walsender to restart after altering the subscription +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply worker to restart after altering the subscription"; + +my $offset = -s $node_publisher->logfile; + +$node_publisher->safe_psql('postgres',"INSERT INTO tab_3 values(1)"); + +# Verify that a warning is logged. +$node_publisher->wait_for_log( + qr/WARNING: ( [A-Z0-9]+:)? skipped loading publication: tap_pub_3/, $offset); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_3 FOR TABLE tab_3"); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"); + +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_3 values(2)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Verify that the insert operation gets replicated to subscriber after +# publication is created. +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab_3"); +is($result, qq(1 +2), 'check that the incremental data is replicated after the publication is created'); + # shutdown $node_subscriber->stop('fast'); $node_publisher->stop('fast'); -- 2.43.0