On Thu, Mar 4, 2021 at 1:04 PM Bharath Rupireddy
<bharath.rupireddyforpostg...@gmail.com> wrote:
>
> On Wed, Mar 3, 2021 at 8:59 AM Euler Taveira <eu...@eulerto.com> wrote:
> >
> > On Wed, Feb 3, 2021, at 2:13 AM, Bharath Rupireddy wrote:
> >
> > On Mon, Jan 25, 2021 at 10:32 PM vignesh C <vignes...@gmail.com> wrote:
> >
> > > If a publication which does not exist is specified during create 
> > > subscription, then we should throw an error similar to step 2 behavior. 
> > > Similarly if a publication which does not exist is specified during alter 
> > > subscription, then we should throw an error similar to step 4 behavior. 
> > > If publication is dropped after subscription is created, this should be 
> > > removed when an alter subscription subname refresh publication is 
> > > performed similar to step 4.
> > > Thoughts?
> >
> > Postgres implements a decoupled replication mechanism, which means that
> > both parties can perform "uncoordinated" actions. As you have shown, a
> > CREATE SUBSCRIPTION that names a non-existent publication is one of them.
> > I think that being permissive in some situations can prevent you from
> > painting yourself into a corner. Even if you try to be strict on the
> > subscriber side, the other side (publisher) can impose additional
> > complexity on you.
> >
> > You are arguing that in the initial phase, CREATE SUBSCRIPTION has a strict
> > mechanism. It is a fair point. However, imposing this strictness on the
> > other SUBSCRIPTION commands should be carefully thought through. If we go
> > that route, I suggest keeping the current behavior as an option. The
> > reasons are: (i) it is backward-compatible, (ii) it allows us some known
> > flexibility (non-existent publication), and (iii) it would probably allow
> > us to fix a scenario created by the strict mode. This strict mode can be
> > implemented via a new parameter validate_publication (ENOCAFFEINE to
> > propose a better name) that checks whether the publication is available
> > when you execute CREATE SUBSCRIPTION. A similar parameter can be used in
> > ALTER SUBSCRIPTION ... SET PUBLICATION and ALTER SUBSCRIPTION ... REFRESH
> > PUBLICATION.
>
> IIUC validate_publication is a kind of feature enable/disable flag. Its
> default value is false; when set to true, CREATE/ALTER SUBSCRIPTION checks
> whether the publications exist and throws an error if they do not. If I'm
> right, then the validate_publication option looks good to me. So, whoever
> wants to impose strict restrictions on CREATE/ALTER SUBSCRIPTION can set it
> to true. All others will not see any new behaviour.
>
> > To not mandate any new behaviour, I would suggest to have a new option
> > for ALTER SUBSCRIPTION ... REFRESH PUBLICATION WITH
> > (remove_dropped_publications = false). The new option
> > remove_dropped_publications will have a default value false, when set
> > to true it will check if the publications that are present in the
> > subscription list are actually existing on the publisher or not, if
> > not remove them from the list. And also in the documentation we need
> > to clearly mention the consequence of this new option setting to true,
> > that is, the dropped publications if created again will need to be
> > added to the subscription list again.
> >
> > REFRESH PUBLICATION is not the right command to remove publications. There
> > is a command for it: ALTER SUBSCRIPTION ... SET PUBLICATION.
> >
> > The other alternative is to document that non-existent publication names
> > can be in the subscription catalog and are ignored while executing
> > SUBSCRIPTION commands. You could possibly propose a NOTICE/WARNING that
> > informs the user that the SUBSCRIPTION command contains a non-existent
> > publication.
>
> I think, we can also have validate_publication option allowed for
> ALTER SUBSCRIPTION SET PUBLICATION and REFRESH PUBLICATION commands
> with the same behaviour i.e. error out when specified publications
> don't exist in the publisher. Thoughts?

Sorry for the delayed reply; I was working on a few other projects and
could not respond sooner.
Since the opinion so far is that checking publications by default might
affect existing users, I'm fine with having an option,
validate_publication, to check whether the publication exists on the
publisher; that way nothing changes for existing users. I have made a
patch along those lines; it is attached.
Thoughts?

Regards,
Vignesh
From 2bae27e37c6c1ace313f544709c0582e2b94b937 Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Wed, 7 Apr 2021 22:05:53 +0530
Subject: [PATCH v3] Identify missing publications from publisher while
 create/alter subscription.

Creating or altering a subscription currently succeeds even when we specify a
publication that does not exist on the publisher. This patch adds a
validate_publication option that, when enabled, checks whether the specified
publications are present on the publisher and throws an error if any of them
is missing.
---
 doc/src/sgml/ref/alter_subscription.sgml  |  12 ++
 doc/src/sgml/ref/create_subscription.sgml |  12 ++
 src/backend/commands/subscriptioncmds.c   | 242 +++++++++++++++++++---
 src/test/subscription/t/100_bugs.pl       |  71 ++++++-
 4 files changed, 306 insertions(+), 31 deletions(-)

diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 367ac814f4..85d3b8f17b 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -160,6 +160,18 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
          </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry>
+        <term><literal>validate_publication</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether the subscriber must verify that the specified
+          publications are present on the publisher. The default is
+          <literal>false</literal>: the subscriber does not perform this check.
+         </para>
+        </listitem>
+       </varlistentry>
+
       </variablelist></para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index e812beee37..ee9e656c60 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -239,6 +239,18 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry>
+        <term><literal>validate_publication</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether the subscriber must verify that the specified
+          publications are present on the publisher. The default is
+          <literal>false</literal>: the subscriber does not perform this check.
+         </para>
+        </listitem>
+       </varlistentry>
+
       </variablelist></para>
     </listitem>
    </varlistentry>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 517c8edd3b..e595388472 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -50,6 +50,7 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
+static void check_publications(WalReceiverConn *wrconn, List *publications);
 
 
 /*
@@ -69,7 +70,9 @@ parse_subscription_options(List *options,
 						   char **synchronous_commit,
 						   bool *refresh,
 						   bool *binary_given, bool *binary,
-						   bool *streaming_given, bool *streaming)
+						   bool *streaming_given, bool *streaming,
+						   bool *validate_publication_given,
+						   bool *validate_publication)
 {
 	ListCell   *lc;
 	bool		connect_given = false;
@@ -111,6 +114,12 @@ parse_subscription_options(List *options,
 		*streaming = false;
 	}
 
+	if (validate_publication)
+	{
+		*validate_publication_given = false;
+		*validate_publication = false;
+	}
+
 	/* Parse options */
 	foreach(lc, options)
 	{
@@ -215,6 +224,16 @@ parse_subscription_options(List *options,
 			*streaming_given = true;
 			*streaming = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "validate_publication") == 0 && validate_publication)
+		{
+			if (*validate_publication_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+
+			*validate_publication_given = true;
+			*validate_publication = defGetBoolean(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -247,10 +266,18 @@ parse_subscription_options(List *options,
 					 errmsg("%s and %s are mutually exclusive options",
 							"connect = false", "copy_data = true")));
 
+		if (validate_publication && *validate_publication_given &&
+			*validate_publication)
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("%s and %s are mutually exclusive options",
+							"connect = false", "validate_publication = true")));
+
 		/* Change the defaults of other options. */
 		*enabled = false;
 		*create_slot = false;
 		*copy_data = false;
+		*validate_publication = false;
 	}
 
 	/*
@@ -343,6 +370,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	bool		slotname_given;
 	bool		binary;
 	bool		binary_given;
+	bool		validate_publication;
+	bool		validate_publication_given;
 	char		originname[NAMEDATALEN];
 	bool		create_slot;
 	List	   *publications;
@@ -361,7 +390,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 							   &synchronous_commit,
 							   NULL,	/* no "refresh" */
 							   &binary_given, &binary,
-							   &streaming_given, &streaming);
+							   &streaming_given, &streaming,
+							   &validate_publication_given,
+							   &validate_publication);
 
 	/*
 	 * Since creating a replication slot is not transactional, rolling back
@@ -472,6 +503,12 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 
 		PG_TRY();
 		{
+			if (validate_publication)
+			{
+				/* Verify that the specified publications exist on the publisher. */
+				check_publications(wrconn, publications);
+			}
+
 			/*
 			 * Set sync state based on if we were asked to do data copy or
 			 * not.
@@ -539,7 +576,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 }
 
 static void
-AlterSubscription_refresh(Subscription *sub, bool copy_data)
+AlterSubscription_refresh(Subscription *sub, bool copy_data, bool check_pub)
 {
 	char	   *err;
 	List	   *pubrel_names;
@@ -568,6 +605,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 			ereport(ERROR,
 					(errmsg("could not connect to the publisher: %s", err)));
 
+		/* Verify that the specified publications exist on the publisher. */
+		if (check_pub)
+			check_publications(wrconn, sub->publications);
+
 		/* Get the table list from publisher. */
 		pubrel_names = fetch_table_list(wrconn, sub->publications);
 
@@ -814,7 +855,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 										   &synchronous_commit,
 										   NULL,	/* no "refresh" */
 										   &binary_given, &binary,
-										   &streaming_given, &streaming);
+										   &streaming_given, &streaming,
+										   NULL, NULL);
 
 				if (slotname_given)
 				{
@@ -871,6 +913,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 										   NULL,	/* no "synchronous_commit" */
 										   NULL,	/* no "refresh" */
 										   NULL, NULL,	/* no "binary" */
+										   NULL, NULL,
 										   NULL, NULL); /* no streaming */
 				Assert(enabled_given);
 
@@ -906,6 +949,10 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 			{
 				bool		copy_data;
 				bool		refresh;
+				bool		validate_publication;
+				bool		validate_publication_given;
+				char	   *err;
+				WalReceiverConn *wrconn;
 
 				parse_subscription_options(stmt->options,
 										   NULL,	/* no "connect" */
@@ -916,13 +963,33 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 										   NULL,	/* no "synchronous_commit" */
 										   &refresh,
 										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no "streaming" */
+										   NULL, NULL,	/* no "streaming" */
+										   &validate_publication_given,
+										   &validate_publication);
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(stmt->publication);
 				replaces[Anum_pg_subscription_subpublications - 1] = true;
 
 				update_tuple = true;
 
+				if (validate_publication)
+				{
+					/* Load the library providing us libpq calls. */
+					load_file("libpqwalreceiver", false);
+
+					/* Try to connect to the publisher. */
+					wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+					if (!wrconn)
+						ereport(ERROR,
+								(errmsg("could not connect to the publisher: %s", err)));
+
+					/* Verify that the specified publications exist on the publisher. */
+					check_publications(wrconn, stmt->publication);
+
+					/* We are done with the remote side, close connection. */
+					walrcv_disconnect(wrconn);
+				}
+
 				/* Refresh if user asked us to. */
 				if (refresh)
 				{
@@ -937,7 +1004,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 					/* Make sure refresh sees the new list of publications. */
 					sub->publications = stmt->publication;
 
-					AlterSubscription_refresh(sub, copy_data);
+					AlterSubscription_refresh(sub, copy_data, false);
 				}
 
 				break;
@@ -950,6 +1017,10 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 				bool		copy_data;
 				bool		refresh;
 				List	   *publist;
+				bool		validate_publication;
+				bool		validate_publication_given;
+				char	   *err;
+				WalReceiverConn *wrconn;
 
 				publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
 
@@ -963,7 +1034,9 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 										   NULL,	/* no "synchronous_commit" */
 										   &refresh,
 										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no "streaming" */
+										   NULL, NULL,	/* no "streaming" */
+										   &validate_publication_given,
+										   &validate_publication);
 
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(publist);
@@ -971,6 +1044,24 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 
 				update_tuple = true;
 
+				if (validate_publication)
+				{
+					/* Load the library providing us libpq calls. */
+					load_file("libpqwalreceiver", false);
+
+					/* Try to connect to the publisher. */
+					wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+					if (!wrconn)
+						ereport(ERROR,
+								(errmsg("could not connect to the publisher: %s", err)));
+
+					/* Verify that the specified publications exist on the publisher. */
+					check_publications(wrconn, stmt->publication);
+
+					/* We are done with the remote side, close connection. */
+					walrcv_disconnect(wrconn);
+				}
+
 				/* Refresh if user asked us to. */
 				if (refresh)
 				{
@@ -985,7 +1076,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 					/* Only refresh the added/dropped list of publications. */
 					sub->publications = stmt->publication;
 
-					AlterSubscription_refresh(sub, copy_data);
+					AlterSubscription_refresh(sub, copy_data, false);
 				}
 
 				break;
@@ -994,6 +1085,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 		case ALTER_SUBSCRIPTION_REFRESH:
 			{
 				bool		copy_data;
+				bool		validate_publication;
+				bool		validate_publication_given;
 
 				if (!sub->enabled)
 					ereport(ERROR,
@@ -1009,11 +1102,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 										   NULL,	/* no "synchronous_commit" */
 										   NULL,	/* no "refresh" */
 										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no "streaming" */
+										   NULL, NULL,	/* no "streaming" */
+										   &validate_publication_given,
+										   &validate_publication);
 
 				PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
 
-				AlterSubscription_refresh(sub, copy_data);
+				AlterSubscription_refresh(sub, copy_data, validate_publication);
 
 				break;
 			}
@@ -1466,27 +1561,20 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
 }
 
 /*
- * Get the list of tables which belong to specified publications on the
- * publisher connection.
+ * Return a query by appending the publications to the input query.
  */
-static List *
-fetch_table_list(WalReceiverConn *wrconn, List *publications)
+static StringInfoData *
+get_appended_publications_query(char *query, List *publications)
 {
-	WalRcvExecResult *res;
-	StringInfoData cmd;
-	TupleTableSlot *slot;
-	Oid			tableRow[2] = {TEXTOID, TEXTOID};
+	bool		first = true;
+	StringInfoData *cmd = (StringInfoData *) palloc(sizeof(StringInfoData));
 	ListCell   *lc;
-	bool		first;
-	List	   *tablelist = NIL;
 
 	Assert(list_length(publications) > 0);
 
-	initStringInfo(&cmd);
-	appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
-						   "  FROM pg_catalog.pg_publication_tables t\n"
-						   " WHERE t.pubname IN (");
-	first = true;
+	initStringInfo(cmd);
+	appendStringInfoString(cmd, query);
+
 	foreach(lc, publications)
 	{
 		char	   *pubname = strVal(lfirst(lc));
@@ -1494,14 +1582,108 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 		if (first)
 			first = false;
 		else
-			appendStringInfoString(&cmd, ", ");
+			appendStringInfoString(cmd, ", ");
+
+		appendStringInfoString(cmd, quote_literal_cstr(pubname));
+	}
+
+	appendStringInfoChar(cmd, ')');
+	return cmd;
+}
+
+/*
+ * Verify that the specified publication(s) exist on the publisher.
+ */
+static void
+check_publications(WalReceiverConn *wrconn, List *publications)
+{
+	WalRcvExecResult *res;
+	StringInfoData *cmd;
+	TupleTableSlot *slot;
+	List	   *publicationsCopy = NIL;
+	Oid			tableRow[1] = {TEXTOID};
+
+	cmd = get_appended_publications_query("SELECT t.pubname FROM\n"
+										  " pg_catalog.pg_publication t WHERE\n"
+										  " t.pubname IN (", publications);
+
+	res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
+	pfree(cmd->data);
+	pfree(cmd);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errmsg("could not receive list of publications from the publisher: %s",
+						res->err)));
 
-		appendStringInfoString(&cmd, quote_literal_cstr(pubname));
+	publicationsCopy = list_copy(publications);
+
+	/* Process publications. */
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		char	   *pubname;
+		bool		isnull;
+
+		pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+
+		/* Delete the publication present in publisher from the list. */
+		publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
+		ExecClearTuple(slot);
 	}
-	appendStringInfoChar(&cmd, ')');
 
-	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
-	pfree(cmd.data);
+	ExecDropSingleTupleTableSlot(slot);
+
+	walrcv_clear_result(res);
+
+	if (list_length(publicationsCopy))
+	{
+		StringInfoData nonExistentPublications;
+		bool		first = true;
+		ListCell   *lc;
+
+		/* Build a string listing the publications that do not exist. */
+		initStringInfo(&nonExistentPublications);
+		foreach(lc, publicationsCopy)
+		{
+			char	   *pubname = strVal(lfirst(lc));
+
+			if (first)
+				first = false;
+			else
+				appendStringInfoString(&nonExistentPublications, ", ");
+			appendStringInfoString(&nonExistentPublications, "\"");
+			appendStringInfoString(&nonExistentPublications, pubname);
+			appendStringInfoString(&nonExistentPublications, "\"");
+		}
+
+		ereport(ERROR,
+				(errmsg("publication(s) %s does not exist in the publisher",
+						nonExistentPublications.data)));
+	}
+}
+
+/*
+ * Get the list of tables which belong to specified publications on the
+ * publisher connection.
+ */
+static List *
+fetch_table_list(WalReceiverConn *wrconn, List *publications)
+{
+	WalRcvExecResult *res;
+	StringInfoData *cmd;
+	TupleTableSlot *slot;
+	Oid			tableRow[2] = {TEXTOID, TEXTOID};
+	List	   *tablelist = NIL;
+
+	cmd = get_appended_publications_query(
+										  "SELECT DISTINCT t.schemaname, t.tablename\n"
+										  "  FROM pg_catalog.pg_publication_tables t\n"
+										  " WHERE t.pubname IN (", publications);
+	res = walrcv_exec(wrconn, cmd->data, 2, tableRow);
+	pfree(cmd->data);
+	pfree(cmd);
 
 	if (res->status != WALRCV_OK_TUPLES)
 		ereport(ERROR,
diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl
index b8f46f08cc..6ad640600f 100644
--- a/src/test/subscription/t/100_bugs.pl
+++ b/src/test/subscription/t/100_bugs.pl
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 5;
+use Test::More tests => 11;
 
 # Bug #15114
 
@@ -154,6 +154,75 @@ is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"),
 is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t2"),
 	$rows * 2, "2x$rows rows in t2");
 
+# Create subscription for a publication which does not exist.
+$node_publisher = get_new_node('testpublisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+$node_subscriber = get_new_node('testsubscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+$publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+       "CREATE PUBLICATION testpub1 FOR ALL TABLES");
+
+$node_subscriber->safe_psql('postgres',
+       "CREATE SUBSCRIPTION testsub1 CONNECTION '$publisher_connstr' PUBLICATION testpub1"
+);
+
+# Specified publication does not exist.
+my ($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+       "CREATE SUBSCRIPTION testsub2 CONNECTION '$publisher_connstr' PUBLICATION pub_doesnt_exist WITH (VALIDATE_PUBLICATION = TRUE)"
+);
+ok( $stderr =~
+         /ERROR:  publication\(s\) "pub_doesnt_exist" does not exist in the publisher/,
+       "Create subscription for non existent publication fails");
+
+# Only one of the specified publications exists.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+       "CREATE SUBSCRIPTION testsub2 CONNECTION '$publisher_connstr' PUBLICATION testpub1, pub_doesnt_exist WITH (VALIDATE_PUBLICATION = TRUE)"
+);
+ok( $stderr =~
+         /ERROR:  publication\(s\) "pub_doesnt_exist" does not exist in the publisher/,
+       "Create subscription for non existent publication fails");
+
+# Multiple publications do not exist.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+       "CREATE SUBSCRIPTION testsub2 CONNECTION '$publisher_connstr' PUBLICATION pub_doesnt_exist, pub_doesnt_exist1 WITH (VALIDATE_PUBLICATION = TRUE)"
+);
+ok( $stderr =~
+         /ERROR:  publication\(s\) "pub_doesnt_exist", "pub_doesnt_exist1" does not exist in the publisher/,
+       "Create subscription for non existent publication fails");
+
+# Specified publication does not exist.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+       "ALTER SUBSCRIPTION testsub1 SET PUBLICATION pub_doesnt_exist WITH (VALIDATE_PUBLICATION = TRUE)");
+ok( $stderr =~
+         /ERROR:  publication\(s\) "pub_doesnt_exist" does not exist in the publisher/,
+       "Alter subscription for non existent publication fails");
+
+# Specified publication does not exist with refresh = false.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+       "ALTER SUBSCRIPTION testsub1 SET PUBLICATION pub_doesnt_exist WITH (REFRESH = FALSE, VALIDATE_PUBLICATION = TRUE)"
+);
+ok( $stderr =~
+         /ERROR:  publication\(s\) "pub_doesnt_exist" does not exist in the publisher/,
+       "Alter subscription for non existent publication fails");
+
+# Set publication when the connection string names a non-existent database.
+$node_subscriber->safe_psql('postgres',
+       "ALTER SUBSCRIPTION testsub1 CONNECTION 'dbname=regress_doesnotexist2'");
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+       "ALTER SUBSCRIPTION testsub1 SET PUBLICATION pub_doesnt_exist WITH (REFRESH = FALSE, VALIDATE_PUBLICATION = TRUE)"
+);
+ok( $stderr =~ /ERROR:  could not connect to the publisher/,
+       "Alter subscription fails when the publisher connection fails");
+
+$node_publisher->stop('fast');
+$node_subscriber->stop('fast');
+
 # Verify table data is synced with cascaded replication setup. This is mainly
 # to test whether the data written by tablesync worker gets replicated.
 my $node_pub = get_new_node('testpublisher1');
-- 
2.25.1
