On Wed, 7 Feb 2024 at 16:27, vignesh C <vignes...@gmail.com> wrote:
>
> On Wed, 7 Feb 2024 at 15:21, Amit Kapila <amit.kapil...@gmail.com> wrote:
> >
> > On Tue, Feb 6, 2024 at 8:21 PM Tom Lane <t...@sss.pgh.pa.us> wrote:
> > >
> > > Amit Kapila <amit.kapil...@gmail.com> writes:
> > > > Yeah, I was worried about that. The other idea I have previously
> > > > thought was to change Alter Subscription to Drop+Create Subscription.
> > > > That should also help in bringing stability without losing any
> > > > functionality.
> > >
> > > Hm, why would that fix it?
> > >
> >
> > Because for new subscriptions, we will start reading WAL from the
> > latest WAL insert pointer on the publisher which will be after the
> > point where publication is created.
>
> I was able to reproduce the issue consistently with the changes shared
> by Tom Lane at [1].
> I have made changes to replace ALTER SUBSCRIPTION with DROP+CREATE
> SUBSCRIPTION and verified that the test passed consistently for the
> more than 50 runs that I ran. The increase in test execution time for
> this case is negligible:
> Without patch:                 7.991 seconds
> With test change patch:   8.121 seconds
>
> The test changes for the same are attached.

Alternatively, this could be fixed along the lines of the changes proposed
by Amit at [1]. In that approach, publications that are not found are
ignored when computing the RelSyncEntry attributes, and the entry is not
marked valid until all the publications have been loaded with none
missing. This means operations on tables belonging to a missing
publication are not published until that publication is found, which
seems okay.
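
To make the idea above concrete, here is a minimal standalone sketch in C.
It is not PostgreSQL code: Catalog, PubCache, load_publications() and
refresh_cache() are hypothetical stand-ins that only model the roles of the
publication lookup and a "publications valid" flag. Missing publications
are skipped, and the cache stays invalid until every requested publication
has been found.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define MAX_PUBS 8

/* Hypothetical stand-in for the publication catalog (an assumption). */
typedef struct Catalog
{
	const char **names;		/* publications that currently exist */
	size_t		count;
} Catalog;

/* Hypothetical cache; 'valid' plays the role of a "publications valid" flag. */
typedef struct PubCache
{
	const char *pubs[MAX_PUBS];
	size_t		npubs;
	bool		valid;
} PubCache;

bool
catalog_has(const Catalog *cat, const char *name)
{
	for (size_t i = 0; i < cat->count; i++)
		if (strcmp(cat->names[i], name) == 0)
			return true;
	return false;
}

/*
 * Copy the wanted publications that exist into 'found' and return how many
 * were copied. '*skipped' is set to true if at least one was missing.
 */
size_t
load_publications(const Catalog *cat, const char **wanted, size_t nwanted,
				  const char **found, bool *skipped)
{
	size_t		nfound = 0;

	assert(skipped != NULL);
	*skipped = false;

	for (size_t i = 0; i < nwanted; i++)
	{
		if (catalog_has(cat, wanted[i]))
			found[nfound++] = wanted[i];
		else
			*skipped = true;
	}
	return nfound;
}

/* Reload the cache if needed; mark it valid only when nothing was skipped. */
void
refresh_cache(PubCache *cache, const Catalog *cat,
			  const char **wanted, size_t nwanted)
{
	bool		skipped;

	if (cache->valid)
		return;

	assert(nwanted <= MAX_PUBS);
	cache->npubs = load_publications(cat, wanted, nwanted,
									 cache->pubs, &skipped);
	cache->valid = !skipped;
}
```

In this model, calling refresh_cache() while one of the wanted publications
is absent leaves the cache invalid, so the next call tries again; once the
publication shows up, the cache becomes valid and later calls return
immediately.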

Tomas had raised a performance concern at [2]: while the publications are
invalid, this approach forces us to reload them for every replicated
change/row. How about keeping the default behavior as it is and providing
a new option, skip_not_exist_publication, that can be specified while
creating/altering a subscription? If skip_not_exist_publication is
specified, we ignore a publication that is not present; the publications
are kept invalid and get validated later.
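
As a rough illustration of the cost being traded off here, the following
standalone C sketch counts how often the publication list would be reloaded
while a publication is still missing. It is a simplified model under stated
assumptions, not the patch itself: DecoderState, process_change() and
reloads_for() are hypothetical names, and "the publication appears at
change number N" is an invented scenario.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical per-session state (an assumption, not a PostgreSQL struct). */
typedef struct DecoderState
{
	bool		skip_missing;	/* models skip_not_exist_publication */
	bool		valid;			/* models a "publications valid" flag */
	long		reloads;		/* number of publication reloads so far */
} DecoderState;

/*
 * Process one change. Returns false when the change fails because a
 * publication is missing and skipping is not enabled (this models the
 * error raised in the default mode).
 */
bool
process_change(DecoderState *st, bool all_pubs_exist)
{
	assert(st != NULL);

	if (!st->valid)
	{
		st->reloads++;
		if (!all_pubs_exist && !st->skip_missing)
			return false;
		/* Stay invalid, and therefore reload again, until nothing is missing. */
		st->valid = all_pubs_exist;
	}
	return true;
}

/*
 * Count reloads over 'nchanges' changes when the missing publication is
 * created just before change number 'pub_created_at' (0-based).
 */
long
reloads_for(bool skip_missing, long nchanges, long pub_created_at)
{
	DecoderState st = {skip_missing, false, 0};

	for (long i = 0; i < nchanges; i++)
	{
		if (!process_change(&st, i >= pub_created_at))
			break;
	}
	return st.reloads;
}
```

In this model, a session with the option enabled pays one reload per change
for as long as the publication is missing and a single reload once
everything exists, whereas a session with the option disabled stops at the
first change that hits a missing publication. That is why keeping the
default unchanged limits the extra cost to subscriptions that ask for it.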

The attached patch has the changes for the same. Thoughts?

[1] - https://www.postgresql.org/message-id/CAA4eK1%2BT-ETXeRM4DHWzGxBpKafLCp__5bPA_QZfFQp7-0wj4Q%40mail.gmail.com
[2] - https://www.postgresql.org/message-id/dc08add3-10a8-738b-983a-191c7406707b%40enterprisedb.com

Regards,
Vignesh
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 406a3c2dd1..404577725e 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -74,6 +74,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->passwordrequired = subform->subpasswordrequired;
 	sub->runasowner = subform->subrunasowner;
 	sub->failover = subform->subfailover;
+	sub->skipnotexistpub = subform->subskipnotexistpub;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 6791bff9dd..bdd367d272 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1359,7 +1359,8 @@ REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
               subbinary, substream, subtwophasestate, subdisableonerr,
 			  subpasswordrequired, subrunasowner, subfailover,
-              subslotname, subsynccommit, subpublications, suborigin)
+              subskipnotexistpub, subslotname, subsynccommit, subpublications,
+              suborigin)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index a05d69922d..bd59efc73a 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -72,6 +72,7 @@
 #define SUBOPT_FAILOVER				0x00002000
 #define SUBOPT_LSN					0x00004000
 #define SUBOPT_ORIGIN				0x00008000
+#define SUBOPT_SKIP_NOT_EXISTS_PUB  0x00010000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -97,6 +98,7 @@ typedef struct SubOpts
 	bool		passwordrequired;
 	bool		runasowner;
 	bool		failover;
+	bool		skipnotexistpub;
 	char	   *origin;
 	XLogRecPtr	lsn;
 } SubOpts;
@@ -159,6 +161,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->runasowner = false;
 	if (IsSet(supported_opts, SUBOPT_FAILOVER))
 		opts->failover = false;
+	if (IsSet(supported_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+		opts->skipnotexistpub = false;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
 
@@ -316,6 +320,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_FAILOVER;
 			opts->failover = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_SKIP_NOT_EXISTS_PUB) &&
+				 strcmp(defel->defname, "skip_not_exist_publication") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_SKIP_NOT_EXISTS_PUB;
+			opts->skipnotexistpub = defGetBoolean(defel);
+		}
 		else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
 				 strcmp(defel->defname, "origin") == 0)
 		{
@@ -408,6 +421,13 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 					 errmsg("%s and %s are mutually exclusive options",
 							"connect = false", "failover = true")));
 
+		if (opts->skipnotexistpub &&
+			IsSet(opts->specified_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("%s and %s are mutually exclusive options",
+							"connect = false", "skip_not_exist_publication = true")));
+
 		/* Change the defaults of other options. */
 		opts->enabled = false;
 		opts->create_slot = false;
@@ -611,7 +631,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
-					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN);
+					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
+					  SUBOPT_SKIP_NOT_EXISTS_PUB | SUBOPT_ORIGIN);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -718,6 +739,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
 	values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
 	values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
+	values[Anum_pg_subscription_subskipnotexistpub - 1] =
+		BoolGetDatum(opts.skipnotexistpub);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (opts.slot_name)
@@ -1169,6 +1192,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
 								  SUBOPT_PASSWORD_REQUIRED |
 								  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
+								  SUBOPT_SKIP_NOT_EXISTS_PUB |
 								  SUBOPT_ORIGIN);
 
 				parse_subscription_options(pstate, stmt->options,
@@ -1248,6 +1272,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_subrunasowner - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+				{
+					values[Anum_pg_subscription_subskipnotexistpub - 1] =
+						BoolGetDatum(opts.skipnotexistpub);
+					replaces[Anum_pg_subscription_subskipnotexistpub - 1] = true;
+				}
+
 				if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
 				{
 					if (!sub->slotname)
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 9270d7b855..a66108aee8 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -593,6 +593,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			appendStringInfo(&cmd, ", origin '%s'",
 							 options->proto.logical.origin);
 
+		if (options->proto.logical.skipnotexistpub &&
+			PQserverVersion(conn->streamConn) >= 170000)
+			appendStringInfo(&cmd, ", skip_not_exist_publication 'true'");
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
 		if (!pubnames_str)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 9dd2446fbf..ef362e3571 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3952,6 +3952,7 @@ maybe_reread_subscription(void)
 		newsub->binary != MySubscription->binary ||
 		newsub->stream != MySubscription->stream ||
 		newsub->passwordrequired != MySubscription->passwordrequired ||
+		newsub->skipnotexistpub != MySubscription->skipnotexistpub ||
 		strcmp(newsub->origin, MySubscription->origin) != 0 ||
 		newsub->owner != MySubscription->owner ||
 		!equal(newsub->publications, MySubscription->publications))
@@ -4380,6 +4381,9 @@ set_stream_options(WalRcvStreamOptions *options,
 	options->proto.logical.publication_names = MySubscription->publications;
 	options->proto.logical.binary = MySubscription->binary;
 
+	if (server_version >= 170000 && MySubscription->skipnotexistpub)
+		options->proto.logical.skipnotexistpub = true;
+
 	/*
 	 * Assign the appropriate option value for streaming option according to
 	 * the 'streaming' mode and the publisher's ability to support that mode.
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 998f92d671..7c27d4a7d7 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -82,7 +82,8 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 
 static bool publications_valid;
 
-static List *LoadPublications(List *pubnames);
+static List *LoadPublications(List *pubnames, bool skipnotexistpub,
+							  bool *skipped);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
@@ -284,11 +285,13 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
 	bool		origin_option_given = false;
+	bool		skipnotexistpub_option_given = false;
 
 	data->binary = false;
 	data->streaming = LOGICALREP_STREAM_OFF;
 	data->messages = false;
 	data->two_phase = false;
+	data->skipnotexistpub = false;
 
 	foreach(lc, options)
 	{
@@ -397,6 +400,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						errmsg("unrecognized origin value: \"%s\"", origin));
 		}
+		else if (strcmp(defel->defname, "skip_not_exist_publication") == 0)
+		{
+			if (skipnotexistpub_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			skipnotexistpub_option_given = true;
+
+			data->skipnotexistpub = defGetBoolean(defel);
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -1703,9 +1716,13 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
 
 /*
  * Load publications from the list of publication names.
+ *
+ * If 'skipnotexistpub' is true, publications that don't exist yet are
+ * skipped, and '*skipped' is set to true when at least one publication
+ * from the given list was not found. Otherwise a missing one is an error.
  */
 static List *
-LoadPublications(List *pubnames)
+LoadPublications(List *pubnames, bool skipnotexistpub, bool *skipped)
 {
 	List	   *result = NIL;
 	ListCell   *lc;
@@ -1713,9 +1730,12 @@ LoadPublications(List *pubnames)
 	foreach(lc, pubnames)
 	{
 		char	   *pubname = (char *) lfirst(lc);
-		Publication *pub = GetPublicationByName(pubname, false);
+		Publication *pub = GetPublicationByName(pubname, skipnotexistpub);
 
-		result = lappend(result, pub);
+		if (pub)
+			result = lappend(result, pub);
+		else
+			*skipped = true;
 	}
 
 	return result;
@@ -1994,7 +2014,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 	}
 
 	/* Validate the entry */
-	if (!entry->replicate_valid)
+	if (!entry->replicate_valid || !publications_valid)
 	{
 		Oid			schemaId = get_rel_namespace(relid);
 		List	   *pubids = GetRelationPublications(relid);
@@ -2011,6 +2031,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		bool		am_partition = get_rel_relispartition(relid);
 		char		relkind = get_rel_relkind(relid);
 		List	   *rel_publications = NIL;
+		bool		skipped_pub = false;
 
 		/* Reload publications if needed before use. */
 		if (!publications_valid)
@@ -2021,9 +2042,17 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 				list_free_deep(data->publications);
 				data->publications = NIL;
 			}
-			data->publications = LoadPublications(data->publication_names);
+			data->publications = LoadPublications(data->publication_names,
+												  data->skipnotexistpub,
+												  &skipped_pub);
 			MemoryContextSwitchTo(oldctx);
-			publications_valid = true;
+
+			/*
+			 * We don't consider the publications to be valid till we have
+			 * information of all the publications.
+			 */
+			if (!skipped_pub)
+				publications_valid = true;
 		}
 
 		/*
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 151a5211ee..0642ff4581 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1944,8 +1944,9 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
 		COMPLETE_WITH("binary", "disable_on_error", "failover", "origin",
-					  "password_required", "run_as_owner", "slot_name",
-					  "streaming", "synchronous_commit");
+					  "password_required", "run_as_owner",
+					  "skip_not_exist_publication", "slot_name", "streaming",
+					  "synchronous_commit");
 	/* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
 		COMPLETE_WITH("lsn");
@@ -3341,8 +3342,9 @@ psql_completion(const char *text, int start, int end)
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
 					  "disable_on_error", "enabled", "failover", "origin",
-					  "password_required", "run_as_owner", "slot_name",
-					  "streaming", "synchronous_commit", "two_phase");
+					  "password_required", "run_as_owner",
+					  "skip_not_exist_publication", "slot_name", "streaming",
+					  "synchronous_commit", "two_phase");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 0aa14ec4a2..704d1217d9 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -98,6 +98,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 								 * slots) in the upstream database are enabled
 								 * to be synchronized to the standbys. */
 
+	bool		subskipnotexistpub;	/* True if non-existent publications should
+									 * be ignored */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -151,6 +154,8 @@ typedef struct Subscription
 								 * (i.e. the main slot and the table sync
 								 * slots) in the upstream database are enabled
 								 * to be synchronized to the standbys. */
+	bool		skipnotexistpub; /* True if the non-existent publications should
+								  * be ignored. */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 89f94e1147..38faa2ea04 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -33,6 +33,7 @@ typedef struct PGOutputData
 	bool		messages;
 	bool		two_phase;
 	bool		publish_no_origin;
+	bool		skipnotexistpub;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index b906bb5ce8..e7a0d9e08a 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -186,6 +186,7 @@ typedef struct
 									 * prepare time */
 			char	   *origin; /* Only publish data originating from the
 								 * specified origin */
+			bool		skipnotexistpub;
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl
index 938582e31a..8626397c1c 100644
--- a/src/test/subscription/t/031_column_list.pl
+++ b/src/test/subscription/t/031_column_list.pl
@@ -145,7 +145,7 @@ $node_publisher->safe_psql(
 # then check the sync results
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 WITH (skip_not_exist_publication = true)
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -741,7 +741,7 @@ $node_publisher->safe_psql(
 $node_subscriber->safe_psql(
 	'postgres', qq(
 	DROP SUBSCRIPTION sub1;
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -921,7 +921,7 @@ $node_publisher->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -973,7 +973,7 @@ $node_publisher->safe_psql(
 # both table sync and data replication.
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_test_root, pub_test_root_1;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_test_root, pub_test_root_1 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -1213,7 +1213,7 @@ $node_subscriber->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_7, pub_mix_8;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_7, pub_mix_8 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -1255,7 +1255,7 @@ $node_subscriber->safe_psql(
 
 my ($cmdret, $stdout, $stderr) = $node_subscriber->psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2 WITH (skip_not_exist_publication = true);
 ));
 
 ok( $stderr =~
@@ -1272,7 +1272,7 @@ $node_publisher->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2 WITH (skip_not_exist_publication = true);
 ));
 
 $node_publisher->wait_for_catchup('sub1');
