On Thu, May 6, 2021 at 9:08 AM Japin Li <japi...@hotmail.com> wrote:
>
>
> On Tue, 04 May 2021 at 21:20, vignesh C <vignes...@gmail.com> wrote:
> > On Tue, May 4, 2021 at 2:37 PM Bharath Rupireddy
> > <bharath.rupireddyforpostg...@gmail.com> wrote:
> >>
> >> On Mon, May 3, 2021 at 7:59 PM vignesh C <vignes...@gmail.com> wrote:
> >> > Thanks for the comments; they are handled in the v7 patch
> >> > posted in my earlier mail.
> >>
> >> Thanks. Some comments on v7 patch:
> >>
> >> 1) How about "Add publication names from the list to a string."
> >>  instead of
> >>  * Append the list of publication to dest string.
> >>
> >
> > Modified.
> >
> >> 2) How about "Connect to the publisher and see if the given
> >> publication(s) is(are) present."
> >> instead of
> >>  * Connect to the publisher and check if the publication(s) exist.
> >>
> >
> > Modified.
> >
> >> 3) Below comments are unnecessary as the functions/code following them
> >> will tell what the code does.
> >>     /* Verify specified publication(s) exist in the publisher. */
> >>     /* We are done with the remote side, close connection. */
> >>
> >>     /* Verify specified publication(s) exist in the publisher. */
> >>     PG_TRY();
> >>     {
> >>         check_publications(wrconn, publications, true);
> >>     }
> >>     PG_FINALLY();
> >>     {
> >>         /* We are done with the remote side, close connection. */
> >>         walrcv_disconnect(wrconn);
> >>     }
> >>
> >
> > Modified.
> >
> >> 4) And also the comment below that's there before check_publications
> >> is unnecessary, as the function name and description would say it all.
> >> /* Verify specified publication(s) exist in the publisher. */
> >>
> >
> > Modified.
> >
> >> 5) A typo -  it is "do not exist"
> >> # Multiple publications does not exist.
> >>
> >
> > Modified.
> >
> >> 6) Should we use "m" specified in all the test cases something like we
> >> do for $stderr =~ m/threads are not supported on this platform/ or
> >> m/replication slot "test_slot" was not created in this database/?
> >> $stderr =~
> >>          /ERROR:  publication "non_existent_pub" does not exist in the
> >> publisher/,
> >
> > Modified.
> >
> > Thanks for the comments. The attached patch has the fixes for the same.
>
> Thanks for updating the patch.  Some comments on v8 patch.
>
> 1) How about using appendStringInfoChar() for the first and last calls,
> since it is faster?
> +                       appendStringInfoString(dest, "\"");
> +                       appendStringInfoString(dest, pubname);
> +                       appendStringInfoString(dest, "\"");

Modified.
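
To illustrate the point in standalone C (a sketch only, not the patch code):
appending a single character avoids the strlen() that a string append has to
do. DemoBuf and the demo_* functions below are hypothetical stand-ins for
StringInfo, appendStringInfoChar() and appendStringInfoString(), and the fixed
256-byte buffer is an assumption made to keep the example short.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical fixed-size buffer standing in for PostgreSQL's StringInfo. */
typedef struct
{
	char		data[256];
	size_t		len;
} DemoBuf;

static void
demo_init(DemoBuf *buf)
{
	buf->len = 0;
	buf->data[0] = '\0';
}

/* Append one character: no strlen() call is needed. */
static void
demo_append_char(DemoBuf *buf, char c)
{
	assert(buf->len + 1 < sizeof(buf->data));
	buf->data[buf->len++] = c;
	buf->data[buf->len] = '\0';
}

/* Append a NUL-terminated string: it has to be measured first. */
static void
demo_append_string(DemoBuf *buf, const char *s)
{
	size_t		n = strlen(s);

	assert(buf->len + n < sizeof(buf->data));
	memcpy(buf->data + buf->len, s, n + 1);	/* n + 1 copies the NUL too */
	buf->len += n;
}

/* Build a comma-separated list of double-quoted names. */
static void
demo_quote_names(DemoBuf *buf, const char *const *names, size_t count)
{
	assert(count > 0);

	for (size_t i = 0; i < count; i++)
	{
		if (i > 0)
			demo_append_string(buf, ", ");

		demo_append_char(buf, '"');
		demo_append_string(buf, names[i]);
		demo_append_char(buf, '"');
	}
}
```

Called with the two names non_existent_pub and non_existent_pub1, this builds
the same quoted list that the new test expects in the error message.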

> 2) How about using if (!validate_publication) to keep the code style consistent?
> +       if (validate_publication == false)
> +               return;

Modified.

> 3) Should we free the memory when check_publications() finishes?
> +       publicationsCopy = list_copy(publications);

I think the list entries get deleted in the success case anyway, and in
the error case there is no need to free the list because we will be
exiting through ereport(ERROR). Thoughts?
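
For reference, the check boils down to a set difference: whatever is left of
the requested names after removing the ones the publisher returned is reported
as missing. A minimal standalone sketch of that idea, assuming plain string
arrays instead of a List (demo_missing_names is a hypothetical name, not
something in the patch):

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Store in "missing" every requested name that is absent from "found".
 * Returns the number of names stored; "missing" must have room for
 * "nrequested" entries.  Only pointers are copied, not the strings.
 */
static size_t
demo_missing_names(const char *const *requested, size_t nrequested,
				   const char *const *found, size_t nfound,
				   const char **missing)
{
	size_t		nmissing = 0;

	assert(missing != NULL);

	for (size_t i = 0; i < nrequested; i++)
	{
		bool		present = false;

		for (size_t j = 0; j < nfound; j++)
		{
			if (strcmp(requested[i], found[j]) == 0)
			{
				present = true;
				break;
			}
		}

		if (!present)
			missing[nmissing++] = requested[i];
	}

	return nmissing;
}
```

When every requested name is found the result is empty, which corresponds to
the success case above where nothing is left over to free.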

> 4) It is better to wrap the word "streaming" in quotes.  Also, should we add
> a 'no "validate_publication"' comment for the validate_publication parameters?
>                            NULL, NULL,  /* no "binary" */
> -                          NULL, NULL); /* no streaming */
> +                          NULL, NULL,  /* no streaming */
> +                          NULL, NULL);

Modified.

Thanks for the comments. The attached v9 patch has the fixes for the same.

Regards,
Vignesh
From e34353210e22f1c61c13f43c24b263cd88d9fc3e Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Wed, 7 Apr 2021 22:05:53 +0530
Subject: [PATCH v9] Identify missing publications from publisher while
 create/alter subscription.

Creating or altering a subscription succeeds even when we specify a publication
that does not exist in the publisher. This patch adds a validate_publication
option that checks whether the specified publications are present in the
publisher and throws an error if any of them is missing.
---
 doc/src/sgml/ref/alter_subscription.sgml  |  13 ++
 doc/src/sgml/ref/create_subscription.sgml |  18 +-
 src/backend/commands/subscriptioncmds.c   | 230 +++++++++++++++++++---
 src/bin/psql/tab-complete.c               |   7 +-
 src/test/subscription/t/007_ddl.pl        |  68 ++++++-
 5 files changed, 301 insertions(+), 35 deletions(-)

diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 367ac814f4..81e156437b 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -160,6 +160,19 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
          </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry>
+        <term><literal>validate_publication</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          When true, the command verifies if all the specified publications
+          that are being subscribed to are present in the publisher and throws
+          an error if any of the publications doesn't exist. The default is
+          <literal>false</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
       </variablelist></para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index e812beee37..cad9285c16 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -207,8 +207,9 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           Specifies whether the <command>CREATE SUBSCRIPTION</command>
           should connect to the publisher at all.  Setting this to
           <literal>false</literal> will change default values of
-          <literal>enabled</literal>, <literal>create_slot</literal> and
-          <literal>copy_data</literal> to <literal>false</literal>.
+          <literal>enabled</literal>, <literal>create_slot</literal>,
+          <literal>copy_data</literal> and
+          <literal>validate_publication</literal> to <literal>false</literal>.
          </para>
 
          <para>
@@ -239,6 +240,19 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry>
+        <term><literal>validate_publication</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          When true, the command verifies if all the specified publications
+          that are being subscribed to are present in the publisher and throws
+          an error if any of the publications doesn't exist. The default is
+          <literal>false</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
       </variablelist></para>
     </listitem>
    </varlistentry>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 517c8edd3b..fd46f43798 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -69,7 +69,9 @@ parse_subscription_options(List *options,
 						   char **synchronous_commit,
 						   bool *refresh,
 						   bool *binary_given, bool *binary,
-						   bool *streaming_given, bool *streaming)
+						   bool *streaming_given, bool *streaming,
+						   bool *validate_publication_given,
+						   bool *validate_publication)
 {
 	ListCell   *lc;
 	bool		connect_given = false;
@@ -111,6 +113,12 @@ parse_subscription_options(List *options,
 		*streaming = false;
 	}
 
+	if (validate_publication)
+	{
+		*validate_publication_given = false;
+		*validate_publication = false;
+	}
+
 	/* Parse options */
 	foreach(lc, options)
 	{
@@ -215,6 +223,17 @@ parse_subscription_options(List *options,
 			*streaming_given = true;
 			*streaming = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "validate_publication") == 0 &&
+				 validate_publication)
+		{
+			if (*validate_publication_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+
+			*validate_publication_given = true;
+			*validate_publication = defGetBoolean(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -247,10 +266,18 @@ parse_subscription_options(List *options,
 					 errmsg("%s and %s are mutually exclusive options",
 							"connect = false", "copy_data = true")));
 
+		if (validate_publication && *validate_publication_given &&
+			*validate_publication)
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("%s and %s are mutually exclusive options",
+							"connect = false", "validate_publication = true")));
+
 		/* Change the defaults of other options. */
 		*enabled = false;
 		*create_slot = false;
 		*copy_data = false;
+		*validate_publication = false;
 	}
 
 	/*
@@ -287,6 +314,139 @@ parse_subscription_options(List *options,
 	}
 }
 
+/*
+ * Add publication names from the list to a string.
+ */
+static void
+get_publications_str(List *publications, StringInfo dest, bool quote_literal)
+{
+	ListCell   *lc;
+	bool		first = true;
+
+	Assert(list_length(publications) > 0);
+
+	foreach(lc, publications)
+	{
+		char	   *pubname = strVal(lfirst(lc));
+
+		if (first)
+			first = false;
+		else
+			appendStringInfoString(dest, ", ");
+
+		if (quote_literal)
+			appendStringInfoString(dest, quote_literal_cstr(pubname));
+		else
+		{
+			appendStringInfoChar(dest, '"');
+			appendStringInfoString(dest, pubname);
+			appendStringInfoChar(dest, '"');
+		}
+	}
+}
+
+/*
+ * Check that the specified publication(s) is(are) present in the publisher.
+ */
+static void
+check_publications(WalReceiverConn *wrconn, List *publications,
+				   bool validate_publication)
+{
+	WalRcvExecResult *res;
+	StringInfo 		cmd;
+	TupleTableSlot *slot;
+	List	   *publicationsCopy = NIL;
+	Oid			tableRow[1] = {TEXTOID};
+
+	if (!validate_publication)
+		return;
+
+	cmd = makeStringInfo();
+	appendStringInfoString(cmd, "SELECT t.pubname FROM\n"
+								" pg_catalog.pg_publication t WHERE\n"
+								" t.pubname IN (");
+	get_publications_str(publications, cmd, true);
+	appendStringInfoChar(cmd, ')');
+
+	res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
+	pfree(cmd->data);
+	pfree(cmd);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errmsg_plural("could not receive publication from the publisher: %s",
+							   "could not receive list of publications from the publisher: %s",
+							   list_length(publications),
+							   res->err)));
+
+	publicationsCopy = list_copy(publications);
+
+	/* Process publication(s). */
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		char	   *pubname;
+		bool		isnull;
+
+		pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+
+		/* Delete the publication present in publisher from the list. */
+		publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
+		ExecClearTuple(slot);
+	}
+
+	ExecDropSingleTupleTableSlot(slot);
+
+	walrcv_clear_result(res);
+
+	if (list_length(publicationsCopy))
+	{
+		/* Prepare the list of non-existent publication(s) for error message. */
+		StringInfo	pubnames = makeStringInfo();
+
+		get_publications_str(publicationsCopy, pubnames, false);
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg_plural("publication %s does not exist in the publisher",
+							   "publications %s do not exist in the publisher",
+							   list_length(publicationsCopy),
+							   pubnames->data)));
+	}
+}
+
+/*
+ * Connect to the publisher and see if the given publication(s) is(are) present.
+ */
+static void
+connect_and_check_pubs(Subscription *sub, List *publications,
+					   bool validate_publication)
+{
+	char	   *err;
+
+	if (!validate_publication)
+		return;
+
+	/* Load the library providing us libpq calls. */
+	load_file("libpqwalreceiver", false);
+
+	/* Try to connect to the publisher. */
+	wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+	if (!wrconn)
+		ereport(ERROR,
+				(errmsg("could not connect to the publisher: %s", err)));
+
+	PG_TRY();
+	{
+		check_publications(wrconn, publications, true);
+	}
+	PG_FINALLY();
+	{
+		walrcv_disconnect(wrconn);
+	}
+	PG_END_TRY();
+}
+
 /*
  * Auxiliary function to build a text array out of a list of String nodes.
  */
@@ -343,6 +503,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	bool		slotname_given;
 	bool		binary;
 	bool		binary_given;
+	bool		validate_publication;
+	bool		validate_publication_given;
 	char		originname[NAMEDATALEN];
 	bool		create_slot;
 	List	   *publications;
@@ -361,7 +523,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 							   &synchronous_commit,
 							   NULL,	/* no "refresh" */
 							   &binary_given, &binary,
-							   &streaming_given, &streaming);
+							   &streaming_given, &streaming,
+							   &validate_publication_given,
+							   &validate_publication);
 
 	/*
 	 * Since creating a replication slot is not transactional, rolling back
@@ -472,6 +636,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 
 		PG_TRY();
 		{
+			check_publications(wrconn, publications, validate_publication);
+
 			/*
 			 * Set sync state based on if we were asked to do data copy or
 			 * not.
@@ -539,7 +705,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 }
 
 static void
-AlterSubscription_refresh(Subscription *sub, bool copy_data)
+AlterSubscription_refresh(Subscription *sub, bool copy_data,
+						  bool validate_publication)
 {
 	char	   *err;
 	List	   *pubrel_names;
@@ -568,6 +735,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 			ereport(ERROR,
 					(errmsg("could not connect to the publisher: %s", err)));
 
+		check_publications(wrconn, sub->publications, validate_publication);
+
 		/* Get the table list from publisher. */
 		pubrel_names = fetch_table_list(wrconn, sub->publications);
 
@@ -814,7 +983,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 										   &synchronous_commit,
 										   NULL,	/* no "refresh" */
 										   &binary_given, &binary,
-										   &streaming_given, &streaming);
+										   &streaming_given, &streaming,
+										   NULL, NULL); /* no "validate_publication" */
 
 				if (slotname_given)
 				{
@@ -871,7 +1041,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 										   NULL,	/* no "synchronous_commit" */
 										   NULL,	/* no "refresh" */
 										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no streaming */
+										   NULL, NULL,	/* no "streaming" */
+										   NULL, NULL); /* no "validate_publication" */
 				Assert(enabled_given);
 
 				if (!sub->slotname && enabled)
@@ -906,6 +1077,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 			{
 				bool		copy_data;
 				bool		refresh;
+				bool		validate_publication;
+				bool		validate_publication_given;
 
 				parse_subscription_options(stmt->options,
 										   NULL,	/* no "connect" */
@@ -916,12 +1089,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 										   NULL,	/* no "synchronous_commit" */
 										   &refresh,
 										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no "streaming" */
+										   NULL, NULL,	/* no "streaming" */
+										   &validate_publication_given,
+										   &validate_publication);
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(stmt->publication);
 				replaces[Anum_pg_subscription_subpublications - 1] = true;
 
 				update_tuple = true;
+				connect_and_check_pubs(sub, stmt->publication, validate_publication);
 
 				/* Refresh if user asked us to. */
 				if (refresh)
@@ -937,7 +1113,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 					/* Make sure refresh sees the new list of publications. */
 					sub->publications = stmt->publication;
 
-					AlterSubscription_refresh(sub, copy_data);
+					AlterSubscription_refresh(sub, copy_data, false);
 				}
 
 				break;
@@ -950,6 +1126,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 				bool		copy_data;
 				bool		refresh;
 				List	   *publist;
+				bool		validate_publication;
+				bool		validate_publication_given;
 
 				publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
 
@@ -963,13 +1141,18 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 										   NULL,	/* no "synchronous_commit" */
 										   &refresh,
 										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no "streaming" */
+										   NULL, NULL,	/* no "streaming" */
+										   /* for drop, no "validate_publication" */
+										   isadd ? &validate_publication_given : NULL,
+										   isadd ? &validate_publication : NULL);
 
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(publist);
 				replaces[Anum_pg_subscription_subpublications - 1] = true;
 
 				update_tuple = true;
+				if (isadd)
+					connect_and_check_pubs(sub, stmt->publication, validate_publication);
 
 				/* Refresh if user asked us to. */
 				if (refresh)
@@ -985,7 +1168,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 					/* Only refresh the added/dropped list of publications. */
 					sub->publications = stmt->publication;
 
-					AlterSubscription_refresh(sub, copy_data);
+					AlterSubscription_refresh(sub, copy_data, false);
 				}
 
 				break;
@@ -994,6 +1177,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 		case ALTER_SUBSCRIPTION_REFRESH:
 			{
 				bool		copy_data;
+				bool		validate_publication;
+				bool		validate_publication_given;
 
 				if (!sub->enabled)
 					ereport(ERROR,
@@ -1009,11 +1194,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 										   NULL,	/* no "synchronous_commit" */
 										   NULL,	/* no "refresh" */
 										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no "streaming" */
+										   NULL, NULL,	/* no "streaming" */
+										   &validate_publication_given,
+										   &validate_publication);
 
 				PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
 
-				AlterSubscription_refresh(sub, copy_data);
+				AlterSubscription_refresh(sub, copy_data, validate_publication);
 
 				break;
 			}
@@ -1476,28 +1663,13 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 	StringInfoData cmd;
 	TupleTableSlot *slot;
 	Oid			tableRow[2] = {TEXTOID, TEXTOID};
-	ListCell   *lc;
-	bool		first;
 	List	   *tablelist = NIL;
 
-	Assert(list_length(publications) > 0);
-
 	initStringInfo(&cmd);
 	appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
-						   "  FROM pg_catalog.pg_publication_tables t\n"
-						   " WHERE t.pubname IN (");
-	first = true;
-	foreach(lc, publications)
-	{
-		char	   *pubname = strVal(lfirst(lc));
-
-		if (first)
-			first = false;
-		else
-			appendStringInfoString(&cmd, ", ");
-
-		appendStringInfoString(&cmd, quote_literal_cstr(pubname));
-	}
+								"  FROM pg_catalog.pg_publication_tables t\n"
+								" WHERE t.pubname IN (");
+	get_publications_str(publications, &cmd, true);
 	appendStringInfoChar(&cmd, ')');
 
 	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 7c4933333b..5a0d0f3c8d 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1674,7 +1674,7 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER SUBSCRIPTION <name> REFRESH PUBLICATION WITH ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
 			 TailMatches("REFRESH", "PUBLICATION", "WITH", "("))
-		COMPLETE_WITH("copy_data");
+		COMPLETE_WITH("copy_data", "validate_publication");
 	/* ALTER SUBSCRIPTION <name> SET */
 	else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, "SET"))
 		COMPLETE_WITH("(", "PUBLICATION");
@@ -1693,7 +1693,7 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER SUBSCRIPTION <name> ADD|DROP|SET PUBLICATION <name> WITH ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
 			 TailMatches("ADD|DROP|SET", "PUBLICATION", MatchAny, "WITH", "("))
-		COMPLETE_WITH("copy_data", "refresh");
+		COMPLETE_WITH("copy_data", "refresh", "validate_publication");
 
 	/* ALTER SCHEMA <name> */
 	else if (Matches("ALTER", "SCHEMA", MatchAny))
@@ -2780,7 +2780,8 @@ psql_completion(const char *text, int start, int end)
 	/* Complete "CREATE SUBSCRIPTION <name> ...  WITH ( <opt>" */
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("copy_data", "connect", "create_slot", "enabled",
-					  "slot_name", "synchronous_commit");
+					  "slot_name", "synchronous_commit",
+					  "validate_publication");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
diff --git a/src/test/subscription/t/007_ddl.pl b/src/test/subscription/t/007_ddl.pl
index 7fe6cc6d63..3292e38744 100644
--- a/src/test/subscription/t/007_ddl.pl
+++ b/src/test/subscription/t/007_ddl.pl
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 1;
+use Test::More tests => 9;
 
 my $node_publisher = get_new_node('publisher');
 $node_publisher->init(allows_streaming => 'logical');
@@ -38,5 +38,71 @@ COMMIT;
 
 pass "subscription disable and drop in same transaction did not hang";
 
+# Specified publication does not exist.
+my ($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+       "CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' PUBLICATION non_existent_pub WITH (VALIDATE_PUBLICATION = TRUE)"
+);
+ok( $stderr =~
+         m/ERROR:  publication "non_existent_pub" does not exist in the publisher/,
+       "Create subscription for non existent publication fails");
+
+# Only one of the specified publications exists.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+       "CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' PUBLICATION mypub, non_existent_pub WITH (VALIDATE_PUBLICATION = TRUE)"
+);
+ok( $stderr =~
+         m/ERROR:  publication "non_existent_pub" does not exist in the publisher/,
+       "Create subscription for non existent publication fails");
+# Multiple publications do not exist.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+       "CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' PUBLICATION non_existent_pub, non_existent_pub1 WITH (VALIDATE_PUBLICATION = TRUE)"
+);
+ok( $stderr =~
+         m/ERROR:  publications "non_existent_pub", "non_existent_pub1" do not exist in the publisher/,
+       "Create subscription for non existent publication fails");
+
+# Create subscription with mutually exclusive options connect as false and validate_publication as true.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+       "CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' PUBLICATION non_existent_pub, non_existent_pub1 WITH (CONNECT = FALSE, VALIDATE_PUBLICATION = TRUE)"
+);
+ok( $stderr =~
+         m/ERROR:  connect = false and validate_publication = true are mutually exclusive options/,
+       "Create subscription with connect=false and validate_publication=true should fail");
+
+# Add non existent publication.
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' PUBLICATION mypub;"
+);
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+       "ALTER SUBSCRIPTION mysub1 ADD PUBLICATION non_existent_pub WITH (REFRESH = FALSE, VALIDATE_PUBLICATION = TRUE)"
+);
+ok( $stderr =~
+         m/ERROR:  publication "non_existent_pub" does not exist in the publisher/,
+       "Alter subscription add non existent publication fails");
+
+# Specified publication does not exist.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+       "ALTER SUBSCRIPTION mysub1 SET PUBLICATION non_existent_pub WITH (VALIDATE_PUBLICATION = TRUE)");
+ok( $stderr =~
+         m/ERROR:  publication "non_existent_pub" does not exist in the publisher/,
+       "Alter subscription for non existent publication fails");
+
+# Specified publication does not exist with refresh = false.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+       "ALTER SUBSCRIPTION mysub1 SET PUBLICATION non_existent_pub WITH (REFRESH = FALSE, VALIDATE_PUBLICATION = TRUE)"
+);
+ok( $stderr =~
+         m/ERROR:  publication "non_existent_pub" does not exist in the publisher/,
+       "Alter subscription for non existent publication fails");
+
+# Set publication on non existent database.
+$node_subscriber->safe_psql('postgres',
+       "ALTER SUBSCRIPTION mysub1 CONNECTION 'dbname=regress_doesnotexist2'");
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+       "ALTER SUBSCRIPTION mysub1 SET PUBLICATION non_existent_pub WITH (REFRESH = FALSE, VALIDATE_PUBLICATION = TRUE)"
+);
+ok( $stderr =~ m/ERROR:  could not connect to the publisher/,
+       "Alter subscription fails when the publisher cannot be reached");
+
 $node_subscriber->stop;
 $node_publisher->stop;
-- 
2.25.1
