Hi,

Currently ALTER SUBSCRIPTION ... SET PUBLICATION will break the
logical replication in certain cases. This can happen as the apply
worker will get restarted after SET PUBLICATION, the apply worker will
use the existing slot and replication origin corresponding to the
subscription. Now, it is possible that before restart the origin has
not been updated and the WAL start location points to a location prior
to where PUBLICATION pub exists which can lead to such an error. Once
this error occurs, apply worker will never be able to proceed and will
always return the same error.

There was discussion on this and Amit had posted a patch to handle
this at [2]. Amit's patch does continue using a historic snapshot but
ignores publications that are not found for the purpose of computing
RelSyncEntry attributes. We won't mark such an entry as valid till all
the publications are loaded without anything missing. This means we
won't publish operations on tables corresponding to that publication
till we found such a publication and that seems okay.
I have added an option skip_not_exist_publication to enable this
operation only when skip_not_exist_publication is specified as true.
There is no change in default behavior when skip_not_exist_publication
is specified as false.

But one thing to note with the patch (with skip_not_exist_publication
option) is that replication of few WAL entries will be skipped till
the publication is loaded like in the below example:
-- Create table in publisher and subscriber
create table t1(c1 int);
create table t2(c1 int);

-- Create publications
create publication pub1 for table t1;
create publication pub2 for table t2;

-- Create subscription
create subscription test1 connection 'dbname=postgres host=localhost
port=5432' publication pub1, pub2;

-- Drop one publication
drop publication pub1;

-- Insert in the publisher
insert into t1 values(11);
insert into t2 values(21);

-- Select in subscriber
postgres=# select * from t1;
 c1
----
(0 rows)

postgres=# select * from t2;
 c1
----
 21
(1 row)

-- Create the dropped publication in publisher
create publication pub1 for table t1;

-- Insert in the publisher
insert into t1 values(12);
postgres=# select * from t1;
 c1
----
 11
 12
(2 rows)

-- Select data in subscriber
postgres=# select * from t1; -- record with value 11 will be missing
in subscriber
 c1
----
 12
(1 row)

Thoughts?

[1] - 
https://www.postgresql.org/message-id/CAA4eK1%2BT-ETXeRM4DHWzGxBpKafLCp__5bPA_QZfFQp7-0wj4Q%40mail.gmail.com

Regards,
Vignesh
From 9b79dee26554ae4af9ff00948ab185482b071ba8 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Mon, 19 Feb 2024 10:20:02 +0530
Subject: [PATCH v1 1/2] Skip loading the publication if the publication does
 not exist.

Skip loading the publication if the publication does not exist.
---
 src/backend/replication/pgoutput/pgoutput.c | 28 +++++++++++++++------
 1 file changed, 21 insertions(+), 7 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 998f92d671..f7b6d0384d 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -82,7 +82,7 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 
 static bool publications_valid;
 
-static List *LoadPublications(List *pubnames);
+static List *LoadPublications(List *pubnames, bool *skipped);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
@@ -1703,9 +1703,13 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
 
 /*
  * Load publications from the list of publication names.
+ *
+ * Here, we just skip the publications that don't exist yet. 'skipped'
+ * will be true if we find any publication from the given list that doesn't
+ * exist.
  */
 static List *
-LoadPublications(List *pubnames)
+LoadPublications(List *pubnames, bool *skipped)
 {
 	List	   *result = NIL;
 	ListCell   *lc;
@@ -1713,9 +1717,12 @@ LoadPublications(List *pubnames)
 	foreach(lc, pubnames)
 	{
 		char	   *pubname = (char *) lfirst(lc);
-		Publication *pub = GetPublicationByName(pubname, false);
+		Publication *pub = GetPublicationByName(pubname, true);
 
-		result = lappend(result, pub);
+		if (pub)
+			result = lappend(result, pub);
+		else
+			*skipped = true;
 	}
 
 	return result;
@@ -1994,7 +2001,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 	}
 
 	/* Validate the entry */
-	if (!entry->replicate_valid)
+	if (!entry->replicate_valid || !publications_valid)
 	{
 		Oid			schemaId = get_rel_namespace(relid);
 		List	   *pubids = GetRelationPublications(relid);
@@ -2011,6 +2018,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		bool		am_partition = get_rel_relispartition(relid);
 		char		relkind = get_rel_relkind(relid);
 		List	   *rel_publications = NIL;
+		bool		skipped_pub = false;
 
 		/* Reload publications if needed before use. */
 		if (!publications_valid)
@@ -2021,9 +2029,15 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 				list_free_deep(data->publications);
 				data->publications = NIL;
 			}
-			data->publications = LoadPublications(data->publication_names);
+			data->publications = LoadPublications(data->publication_names, &skipped_pub);
 			MemoryContextSwitchTo(oldctx);
-			publications_valid = true;
+
+			/*
+			 * We don't consider the publications to be valid till we have
+			 * information of all the publications.
+			 */
+			if (!skipped_pub)
+				publications_valid = true;
 		}
 
 		/*
-- 
2.34.1

From 0030521b178b80b1de26afd8a71cb7b741511b01 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Mon, 19 Feb 2024 10:23:05 +0530
Subject: [PATCH v1 2/2] Added an option skip_not_exist_publication which will
 skip loading the publication, if the publication does not exist.

Added an option skip_not_exist_publication which will skip loading the
publication, if the publication does not exist.
---
 src/backend/catalog/pg_subscription.c         |  1 +
 src/backend/catalog/system_views.sql          |  3 +-
 src/backend/commands/subscriptioncmds.c       | 33 ++++++++++++++++++-
 .../libpqwalreceiver/libpqwalreceiver.c       |  4 +++
 src/backend/replication/logical/worker.c      |  4 +++
 src/backend/replication/pgoutput/pgoutput.c   | 23 ++++++++++---
 src/bin/psql/tab-complete.c                   | 10 +++---
 src/include/catalog/pg_subscription.h         |  5 +++
 src/include/replication/pgoutput.h            |  1 +
 src/include/replication/walreceiver.h         |  1 +
 src/test/subscription/t/031_column_list.pl    | 14 ++++----
 11 files changed, 82 insertions(+), 17 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 406a3c2dd1..404577725e 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -74,6 +74,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->passwordrequired = subform->subpasswordrequired;
 	sub->runasowner = subform->subrunasowner;
 	sub->failover = subform->subfailover;
+	sub->skipnotexistpub = subform->subskipnotexistpub;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 04227a72d1..323cd2485d 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1360,7 +1360,8 @@ REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
               subbinary, substream, subtwophasestate, subdisableonerr,
 			  subpasswordrequired, subrunasowner, subfailover,
-              subslotname, subsynccommit, subpublications, suborigin)
+              subskipnotexistpub, subslotname, subsynccommit, subpublications,
+              suborigin)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index a05d69922d..bd59efc73a 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -72,6 +72,7 @@
 #define SUBOPT_FAILOVER				0x00002000
 #define SUBOPT_LSN					0x00004000
 #define SUBOPT_ORIGIN				0x00008000
+#define SUBOPT_SKIP_NOT_EXISTS_PUB  0x00010000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -97,6 +98,7 @@ typedef struct SubOpts
 	bool		passwordrequired;
 	bool		runasowner;
 	bool		failover;
+	bool		skipnotexistpub;
 	char	   *origin;
 	XLogRecPtr	lsn;
 } SubOpts;
@@ -159,6 +161,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->runasowner = false;
 	if (IsSet(supported_opts, SUBOPT_FAILOVER))
 		opts->failover = false;
+	if (IsSet(supported_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+		opts->skipnotexistpub = false;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
 
@@ -316,6 +320,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_FAILOVER;
 			opts->failover = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_SKIP_NOT_EXISTS_PUB) &&
+				 strcmp(defel->defname, "skip_not_exist_publication") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_SKIP_NOT_EXISTS_PUB;
+			opts->skipnotexistpub = defGetBoolean(defel);
+		}
 		else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
 				 strcmp(defel->defname, "origin") == 0)
 		{
@@ -408,6 +421,13 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 					 errmsg("%s and %s are mutually exclusive options",
 							"connect = false", "failover = true")));
 
+		if (opts->skipnotexistpub &&
+			IsSet(opts->specified_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("%s and %s are mutually exclusive options",
+							"connect = false", "skip_not_exist_publication = true")));
+
 		/* Change the defaults of other options. */
 		opts->enabled = false;
 		opts->create_slot = false;
@@ -611,7 +631,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
-					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN);
+					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
+					  SUBOPT_SKIP_NOT_EXISTS_PUB | SUBOPT_ORIGIN);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -718,6 +739,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
 	values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
 	values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
+	values[Anum_pg_subscription_subskipnotexistpub - 1] =
+		BoolGetDatum(opts.skipnotexistpub);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (opts.slot_name)
@@ -1169,6 +1192,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
 								  SUBOPT_PASSWORD_REQUIRED |
 								  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
+								  SUBOPT_SKIP_NOT_EXISTS_PUB |
 								  SUBOPT_ORIGIN);
 
 				parse_subscription_options(pstate, stmt->options,
@@ -1248,6 +1272,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_subrunasowner - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+				{
+					values[Anum_pg_subscription_subskipnotexistpub - 1] =
+						BoolGetDatum(opts.runasowner);
+					replaces[Anum_pg_subscription_subskipnotexistpub - 1] = true;
+				}
+
 				if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
 				{
 					if (!sub->slotname)
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 9270d7b855..a66108aee8 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -593,6 +593,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			appendStringInfo(&cmd, ", origin '%s'",
 							 options->proto.logical.origin);
 
+		if (options->proto.logical.skipnotexistpub &&
+			PQserverVersion(conn->streamConn) >= 170000)
+			appendStringInfo(&cmd, ", skip_not_exist_publication 'true'");
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
 		if (!pubnames_str)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 9dd2446fbf..ef362e3571 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3952,6 +3952,7 @@ maybe_reread_subscription(void)
 		newsub->binary != MySubscription->binary ||
 		newsub->stream != MySubscription->stream ||
 		newsub->passwordrequired != MySubscription->passwordrequired ||
+		newsub->skipnotexistpub != MySubscription->skipnotexistpub ||
 		strcmp(newsub->origin, MySubscription->origin) != 0 ||
 		newsub->owner != MySubscription->owner ||
 		!equal(newsub->publications, MySubscription->publications))
@@ -4380,6 +4381,9 @@ set_stream_options(WalRcvStreamOptions *options,
 	options->proto.logical.publication_names = MySubscription->publications;
 	options->proto.logical.binary = MySubscription->binary;
 
+	if (server_version >= 170000 && MySubscription->skipnotexistpub)
+		options->proto.logical.skipnotexistpub = true;
+
 	/*
 	 * Assign the appropriate option value for streaming option according to
 	 * the 'streaming' mode and the publisher's ability to support that mode.
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index f7b6d0384d..7c27d4a7d7 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -82,7 +82,8 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 
 static bool publications_valid;
 
-static List *LoadPublications(List *pubnames, bool *skipped);
+static List *LoadPublications(List *pubnames, bool skipnotexistpub,
+							  bool *skipped);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
@@ -284,11 +285,13 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
 	bool		origin_option_given = false;
+	bool		skipnotexistpub_option_given = false;
 
 	data->binary = false;
 	data->streaming = LOGICALREP_STREAM_OFF;
 	data->messages = false;
 	data->two_phase = false;
+	data->skipnotexistpub = false;
 
 	foreach(lc, options)
 	{
@@ -397,6 +400,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						errmsg("unrecognized origin value: \"%s\"", origin));
 		}
+		else if (strcmp(defel->defname, "skip_not_exist_publication") == 0)
+		{
+			if (skipnotexistpub_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			skipnotexistpub_option_given = true;
+
+			data->skipnotexistpub = true;
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -1709,7 +1722,7 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
  * exist.
  */
 static List *
-LoadPublications(List *pubnames, bool *skipped)
+LoadPublications(List *pubnames, bool skipnotexistpub, bool *skipped)
 {
 	List	   *result = NIL;
 	ListCell   *lc;
@@ -1717,7 +1730,7 @@ LoadPublications(List *pubnames, bool *skipped)
 	foreach(lc, pubnames)
 	{
 		char	   *pubname = (char *) lfirst(lc);
-		Publication *pub = GetPublicationByName(pubname, true);
+		Publication *pub = GetPublicationByName(pubname, skipnotexistpub);
 
 		if (pub)
 			result = lappend(result, pub);
@@ -2029,7 +2042,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 				list_free_deep(data->publications);
 				data->publications = NIL;
 			}
-			data->publications = LoadPublications(data->publication_names, &skipped_pub);
+			data->publications = LoadPublications(data->publication_names,
+												  data->skipnotexistpub,
+												  &skipped_pub);
 			MemoryContextSwitchTo(oldctx);
 
 			/*
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 151a5211ee..0642ff4581 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1944,8 +1944,9 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
 		COMPLETE_WITH("binary", "disable_on_error", "failover", "origin",
-					  "password_required", "run_as_owner", "slot_name",
-					  "streaming", "synchronous_commit");
+					  "password_required", "run_as_owner",
+					  "skip_not_exist_publication", "slot_name", "streaming",
+					  "synchronous_commit");
 	/* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
 		COMPLETE_WITH("lsn");
@@ -3341,8 +3342,9 @@ psql_completion(const char *text, int start, int end)
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
 					  "disable_on_error", "enabled", "failover", "origin",
-					  "password_required", "run_as_owner", "slot_name",
-					  "streaming", "synchronous_commit", "two_phase");
+					  "password_required", "run_as_owner",
+					  "skip_not_exist_publication", "slot_name", "streaming",
+					  "synchronous_commit", "two_phase");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 0aa14ec4a2..704d1217d9 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -98,6 +98,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 								 * slots) in the upstream database are enabled
 								 * to be synchronized to the standbys. */
 
+	bool		subskipnotexistpub;	/* True if the not-exist publications should
+									 * be ignored */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -151,6 +154,8 @@ typedef struct Subscription
 								 * (i.e. the main slot and the table sync
 								 * slots) in the upstream database are enabled
 								 * to be synchronized to the standbys. */
+	bool		skipnotexistpub; /* True if the non-existent publications should
+								  * be ignored. */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 89f94e1147..38faa2ea04 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -33,6 +33,7 @@ typedef struct PGOutputData
 	bool		messages;
 	bool		two_phase;
 	bool		publish_no_origin;
+	bool		skipnotexistpub;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index b906bb5ce8..e7a0d9e08a 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -186,6 +186,7 @@ typedef struct
 									 * prepare time */
 			char	   *origin; /* Only publish data originating from the
 								 * specified origin */
+			bool		skipnotexistpub;
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl
index 938582e31a..8626397c1c 100644
--- a/src/test/subscription/t/031_column_list.pl
+++ b/src/test/subscription/t/031_column_list.pl
@@ -145,7 +145,7 @@ $node_publisher->safe_psql(
 # then check the sync results
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 WITH (skip_not_exist_publication = true)
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -741,7 +741,7 @@ $node_publisher->safe_psql(
 $node_subscriber->safe_psql(
 	'postgres', qq(
 	DROP SUBSCRIPTION sub1;
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -921,7 +921,7 @@ $node_publisher->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -973,7 +973,7 @@ $node_publisher->safe_psql(
 # both table sync and data replication.
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_test_root, pub_test_root_1;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_test_root, pub_test_root_1 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -1213,7 +1213,7 @@ $node_subscriber->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_7, pub_mix_8;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_7, pub_mix_8 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -1255,7 +1255,7 @@ $node_subscriber->safe_psql(
 
 my ($cmdret, $stdout, $stderr) = $node_subscriber->psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2 WITH (skip_not_exist_publication = true);
 ));
 
 ok( $stderr =~
@@ -1272,7 +1272,7 @@ $node_publisher->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2 WITH (skip_not_exist_publication = true);
 ));
 
 $node_publisher->wait_for_catchup('sub1');
-- 
2.34.1

Reply via email to