On Thu, Jan 21, 2021 at 10:21 PM Bharath Rupireddy
<bharath.rupireddyforpostg...@gmail.com> wrote:
>
> On Thu, Jan 21, 2021 at 6:56 PM vignesh C <vignes...@gmail.com> wrote:
> >
> > Hi,
> >
> > Creating/altering subscription is successful when we specify a
> > publication which does not exist in the publisher. I felt we should
> > throw an error in this case, that will help the user to check if there
> > is any typo in the create subscription command or to create the
> > publication before the subscription is created.
> > If the above analysis looks correct, then please find a patch that
> > checks if the specified publications are present in the publisher and
> > throws an error if any of the publications is missing in the
> > publisher.
> > Thoughts?
>
> I was having similar thoughts (while working on the logical
> replication bug on alter publication...drop table behaviour) on why
> create subscription succeeds without checking the publication
> existence. I checked in documentation, to find if there's a strong
> reason for that, but I couldn't. Maybe it's because of the principle
> "first let users create subscriptions, later the publications can be
> created on the publisher system", similar to this behaviour
> "publications can be created without any tables attached to it
> initially, later they can be added".
>
> Others may have better thoughts.
>
> If we do check publication existence for CREATE SUBSCRIPTION, I think
> we should also do it for ALTER SUBSCRIPTION ... SET PUBLICATION.
>
> I wonder, why is dropping a publication from the list of publications
> associated with a subscription not allowed?
>
> Some comments so far on the patch:
>
> 1) I see most of the code in the new function check_publications() and
> existing fetch_table_list() is the same. Can we have a generic
> function, with maybe a flag to separate out the logic specific for
> checking publication and fetching table list from the publisher.

I have factored the code shared by check_publications and
fetch_table_list into a common function,
get_appended_publications_query. I felt the rest of the code is better
left as it is.
The attached patch has these changes, and it also checks that the
publications exist during ALTER SUBSCRIPTION ... SET PUBLICATION.
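
To illustrate the idea outside the backend, here is a minimal standalone
sketch in plain C. It is not the patch code: it uses only the C standard
library, the names build_publications_query and find_missing_publications
are hypothetical, and the quoting only doubles single quotes, which is a
simplification of what quote_literal_cstr does in the server.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * Build "<prefix>'pub1', 'pub2', ...)" in a malloc'd buffer.
 * Hypothetical helper; embedded single quotes are doubled.
 * Returns NULL on allocation failure; the caller frees the result.
 */
char *
build_publications_query(const char *prefix,
                         const char *const *pubs, size_t npubs)
{
    size_t      plen = strlen(prefix);
    size_t      len = plen + 2;     /* closing ')' and terminating NUL */
    size_t      i;
    char       *query;
    char       *p;

    assert(npubs > 0);

    /* Worst case per name: every char doubled, two quotes, ", ". */
    for (i = 0; i < npubs; i++)
        len += 2 * strlen(pubs[i]) + 4;

    query = malloc(len);
    if (query == NULL)
        return NULL;

    memcpy(query, prefix, plen);
    p = query + plen;

    for (i = 0; i < npubs; i++)
    {
        const char *s;

        if (i > 0)
        {
            *p++ = ',';
            *p++ = ' ';
        }
        *p++ = '\'';
        for (s = pubs[i]; *s != '\0'; s++)
        {
            if (*s == '\'')
                *p++ = '\'';        /* double the embedded quote */
            *p++ = *s;
        }
        *p++ = '\'';
    }
    *p++ = ')';
    *p = '\0';
    return query;
}

/*
 * Store in missing[] every requested name that is absent from found[],
 * and return how many there are. missing[] needs room for nrequested
 * entries.
 */
size_t
find_missing_publications(const char *const *requested, size_t nrequested,
                          const char *const *found, size_t nfound,
                          const char **missing)
{
    size_t      nmissing = 0;
    size_t      i;
    size_t      j;

    for (i = 0; i < nrequested; i++)
    {
        int         present = 0;

        for (j = 0; j < nfound; j++)
        {
            if (strcmp(requested[i], found[j]) == 0)
            {
                present = 1;
                break;
            }
        }
        if (!present)
            missing[nmissing++] = requested[i];
    }
    return nmissing;
}
```

In this sketch the same builder serves both callers, which only differ in
the prefix they pass. The second function mirrors the "remove every name
the publisher returned, report what is left" step on plain string arrays.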
Thoughts?


Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com
From 6ef5a35d82065ec7e497ebe38e575afa45ad9469 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@enterprisedb.com>
Date: Thu, 21 Jan 2021 18:38:54 +0530
Subject: [PATCH v2] Identify missing publications from publisher while
 create/alter subscription.

Creating/altering a subscription succeeds even when we specify a publication
which does not exist in the publisher. This patch checks if the specified
publications are present in the publisher and throws an error if any of the
publications is missing in the publisher. One of the ALTER SUBSCRIPTION tests
from the regression suite was moved to a TAP test, as its error message contains
"/tmp/pg_regress-P6KQrj/.s.PGSQL.58080", which varies from execution to execution.
---
 src/backend/commands/subscriptioncmds.c    | 158 ++++++++++++++++++++++++-----
 src/test/regress/expected/subscription.out |  33 +++---
 src/test/regress/sql/subscription.sql      |   1 -
 src/test/subscription/t/100_bugs.pl        |  78 +++++++++++++-
 4 files changed, 224 insertions(+), 46 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 082f785..59076b5 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -46,6 +46,7 @@
 #include "utils/syscache.h"
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static void check_publications(WalReceiverConn *wrconn, List *publications);
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -490,6 +491,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 
 		PG_TRY();
 		{
+			/* Verify specified publications exist in the publisher. */
+			check_publications(wrconn, publications);
+
 			/*
 			 * Set sync state based on if we were asked to do data copy or
 			 * not.
@@ -557,7 +561,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 }
 
 static void
-AlterSubscription_refresh(Subscription *sub, bool copy_data)
+AlterSubscription_refresh(Subscription *sub, bool copy_data, bool check_pub)
 {
 	char	   *err;
 	List	   *pubrel_names;
@@ -576,6 +580,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		ereport(ERROR,
 				(errmsg("could not connect to the publisher: %s", err)));
 
+	/* Verify specified publications exist in the publisher. */
+	if (check_pub)
+		check_publications(wrconn, sub->publications);
+
 	/* Get the table list from publisher. */
 	pubrel_names = fetch_table_list(wrconn, sub->publications);
 
@@ -822,6 +830,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 			{
 				bool		copy_data;
 				bool		refresh;
+				char	   *err;
+				WalReceiverConn *wrconn;
 
 				parse_subscription_options(stmt->options,
 										   NULL,	/* no "connect" */
@@ -839,6 +849,21 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				update_tuple = true;
 
+				/* Load the library providing us libpq calls. */
+				load_file("libpqwalreceiver", false);
+
+				/* Try to connect to the publisher. */
+				wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+				if (!wrconn)
+					ereport(ERROR,
+							(errmsg("could not connect to the publisher: %s", err)));
+
+				/* Verify specified publications exist in the publisher. */
+				check_publications(wrconn, stmt->publication);
+
+				/* We are done with the remote side, close connection. */
+				walrcv_disconnect(wrconn);
+
 				/* Refresh if user asked us to. */
 				if (refresh)
 				{
@@ -851,7 +876,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 					/* Make sure refresh sees the new list of publications. */
 					sub->publications = stmt->publication;
 
-					AlterSubscription_refresh(sub, copy_data);
+					AlterSubscription_refresh(sub, copy_data, false);
 				}
 
 				break;
@@ -877,7 +902,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 										   NULL, NULL,	/* no "binary" */
 										   NULL, NULL); /* no "streaming" */
 
-				AlterSubscription_refresh(sub, copy_data);
+				AlterSubscription_refresh(sub, copy_data, true);
 
 				break;
 			}
@@ -1211,27 +1236,20 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
 }
 
 /*
- * Get the list of tables which belong to specified publications on the
- * publisher connection.
+ * Return a query by appending the publications to the input query.
  */
-static List *
-fetch_table_list(WalReceiverConn *wrconn, List *publications)
+static StringInfoData *
+get_appended_publications_query(List *publications, char *query)
 {
-	WalRcvExecResult *res;
-	StringInfoData cmd;
-	TupleTableSlot *slot;
-	Oid			tableRow[2] = {TEXTOID, TEXTOID};
+	bool		first = true;
+	StringInfoData *cmd = makeStringInfo();
 	ListCell   *lc;
-	bool		first;
-	List	   *tablelist = NIL;
 
 	Assert(list_length(publications) > 0);
 
-	initStringInfo(&cmd);
-	appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
-						   "  FROM pg_catalog.pg_publication_tables t\n"
-						   " WHERE t.pubname IN (");
-	first = true;
+	initStringInfo(cmd);
+	appendStringInfoString(cmd, query);
+
 	foreach(lc, publications)
 	{
 		char	   *pubname = strVal(lfirst(lc));
@@ -1239,14 +1257,108 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 		if (first)
 			first = false;
 		else
-			appendStringInfoString(&cmd, ", ");
+			appendStringInfoString(cmd, ", ");
 
-		appendStringInfoString(&cmd, quote_literal_cstr(pubname));
+		appendStringInfoString(cmd, quote_literal_cstr(pubname));
 	}
-	appendStringInfoChar(&cmd, ')');
 
-	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
-	pfree(cmd.data);
+	appendStringInfoChar(cmd, ')');
+	return cmd;
+}
+
+/*
+ * Verify that the specified publication(s) exist in the publisher.
+ */
+static void
+check_publications(WalReceiverConn *wrconn, List *publications)
+{
+	WalRcvExecResult *res;
+	StringInfoData *cmd;
+	TupleTableSlot *slot;
+	List	   *publicationsCopy = NIL;
+	Oid			tableRow[1] = {TEXTOID};
+
+	cmd = get_appended_publications_query(publications, "SELECT t.pubname\n"
+										  "  FROM pg_catalog.pg_publication t\n"
+										  " WHERE t.pubname IN (");
+
+	res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
+	pfree(cmd->data);
+	pfree(cmd);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errmsg("could not receive list of publications from the publisher: %s",
+						res->err)));
+
+	publicationsCopy = list_copy(publications);
+
+	/* Process publications. */
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		char	   *pubname;
+		bool		isnull;
+
+		pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+
+		/* Delete the publication present in publisher from the list. */
+		publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
+		ExecClearTuple(slot);
+	}
+
+	ExecDropSingleTupleTableSlot(slot);
+
+	walrcv_clear_result(res);
+
+	if (list_length(publicationsCopy))
+	{
+		StringInfoData nonExistentPublications;
+		bool		first = true;
+		ListCell   *lc;
+
+		/* Convert the publications which do not exist into a string. */
+		initStringInfo(&nonExistentPublications);
+		foreach(lc, publicationsCopy)
+		{
+			char	   *pubname = strVal(lfirst(lc));
+
+			if (first)
+				first = false;
+			else
+				appendStringInfoString(&nonExistentPublications, ", ");
+			appendStringInfoString(&nonExistentPublications, "\"");
+			appendStringInfoString(&nonExistentPublications, pubname);
+			appendStringInfoString(&nonExistentPublications, "\"");
+		}
+
+		ereport(ERROR,
+				(errmsg("publication(s) %s does not exist in the publisher",
+						nonExistentPublications.data)));
+	}
+}
+
+/*
+ * Get the list of tables which belong to specified publications on the
+ * publisher connection.
+ */
+static List *
+fetch_table_list(WalReceiverConn *wrconn, List *publications)
+{
+	WalRcvExecResult *res;
+	StringInfoData *cmd;
+	TupleTableSlot *slot;
+	Oid			tableRow[2] = {TEXTOID, TEXTOID};
+	List	   *tablelist = NIL;
+
+	cmd = get_appended_publications_query(publications,
+									"SELECT DISTINCT t.schemaname, t.tablename\n"
+									"  FROM pg_catalog.pg_publication_tables t\n"
+									" WHERE t.pubname IN (");
+	res = walrcv_exec(wrconn, cmd->data, 2, tableRow);
+	pfree(cmd->data);
+	pfree(cmd);
 
 	if (res->status != WALRCV_OK_TUPLES)
 		ereport(ERROR,
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 2fa9bce..17792e0 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -82,7 +82,6 @@ ERROR:  invalid connection string syntax: missing "=" after "foobar" in connecti
  regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | off                | dbname=regress_doesnotexist
 (1 row)
 
-ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist2';
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname');
 -- fail
@@ -91,27 +90,27 @@ ERROR:  subscription "regress_doesnotexist" does not exist
 ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
 ERROR:  unrecognized subscription parameter: "create_slot"
 \dRs+
-                                                                List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Synchronous commit |           Conninfo           
------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | off                | dbname=regress_doesnotexist2
+                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Synchronous commit |           Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+------------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | off                | dbname=regress_doesnotexist2
 (1 row)
 
 BEGIN;
 ALTER SUBSCRIPTION regress_testsub ENABLE;
 \dRs
-                            List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     
------------------+---------------------------+---------+---------------------
- regress_testsub | regress_subscription_user | t       | {testpub2,testpub3}
+                        List of subscriptions
+      Name       |           Owner           | Enabled | Publication 
+-----------------+---------------------------+---------+-------------
+ regress_testsub | regress_subscription_user | t       | {testpub}
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub DISABLE;
 \dRs
-                            List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     
------------------+---------------------------+---------+---------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3}
+                        List of subscriptions
+      Name       |           Owner           | Enabled | Publication 
+-----------------+---------------------------+---------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}
 (1 row)
 
 COMMIT;
@@ -126,10 +125,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                  List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Synchronous commit |           Conninfo           
----------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | local              | dbname=regress_doesnotexist2
+                                                              List of subscriptions
+        Name         |           Owner           | Enabled | Publication | Binary | Streaming | Synchronous commit |           Conninfo           
+---------------------+---------------------------+---------+-------------+--------+-----------+--------------------+------------------------------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub}   | f      | f         | local              | dbname=regress_doesnotexist2
 (1 row)
 
 -- rename back to keep the rest simple
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 14fa0b2..a8a7110 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -61,7 +61,6 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 
 \dRs+
 
-ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist2';
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname');
 
diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl
index d1e407a..06dbae2 100644
--- a/src/test/subscription/t/100_bugs.pl
+++ b/src/test/subscription/t/100_bugs.pl
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 5;
+use Test::More tests => 11;
 
 # Bug #15114
 
@@ -134,10 +134,9 @@ $node_twoways->safe_psql(
 	INSERT INTO t SELECT * FROM generate_series(1, $rows);
 	INSERT INTO t2 SELECT * FROM generate_series(1, $rows);
 	});
-$node_twoways->safe_psql(
-	'd1', 'ALTER PUBLICATION testpub ADD TABLE t2');
-$node_twoways->safe_psql(
-	'd2', 'ALTER SUBSCRIPTION testsub REFRESH PUBLICATION');
+$node_twoways->safe_psql('d1', 'ALTER PUBLICATION testpub ADD TABLE t2');
+$node_twoways->safe_psql('d2',
+	'ALTER SUBSCRIPTION testsub REFRESH PUBLICATION');
 
 # We cannot rely solely on wait_for_catchup() here; it isn't sufficient
 # when tablesync workers might still be running. So in addition to that,
@@ -153,3 +152,72 @@ is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"),
 	$rows * 2, "2x$rows rows in t");
 is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t2"),
 	$rows * 2, "2x$rows rows in t2");
+
+# Create subscription for a publication which does not exist.
+$node_publisher = get_new_node('testpublisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+$node_subscriber = get_new_node('testsubscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+$publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION testpub1 FOR ALL TABLES");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION testsub1 CONNECTION '$publisher_connstr' PUBLICATION testpub1"
+);
+
+# Specified publication does not exist.
+my ($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"CREATE SUBSCRIPTION testsub2 CONNECTION '$publisher_connstr' PUBLICATION pub_doesnt_exist"
+);
+ok( $stderr =~
+	  /ERROR:  publication\(s\) "pub_doesnt_exist" does not exist in the publisher/,
+	"Create subscription for non existent publication fails");
+
+# One of the specified publications does not exist.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"CREATE SUBSCRIPTION testsub2 CONNECTION '$publisher_connstr' PUBLICATION testpub1, pub_doesnt_exist"
+);
+ok( $stderr =~
+	  /ERROR:  publication\(s\) "pub_doesnt_exist" does not exist in the publisher/,
+	"Create subscription for non existent publication fails");
+
+# Multiple publications do not exist.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"CREATE SUBSCRIPTION testsub2 CONNECTION '$publisher_connstr' PUBLICATION pub_doesnt_exist, pub_doesnt_exist1"
+);
+ok( $stderr =~
+	  /ERROR:  publication\(s\) "pub_doesnt_exist", "pub_doesnt_exist1" does not exist in the publisher/,
+	"Create subscription for non existent publication fails");
+
+# Specified publication does not exist.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"ALTER SUBSCRIPTION testsub1 SET PUBLICATION pub_doesnt_exist");
+ok( $stderr =~
+	  /ERROR:  publication\(s\) "pub_doesnt_exist" does not exist in the publisher/,
+	"Alter subscription for non existent publication fails");
+
+# Specified publication does not exist with refresh = false.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"ALTER SUBSCRIPTION testsub1 SET PUBLICATION pub_doesnt_exist WITH (REFRESH = FALSE)"
+);
+ok( $stderr =~
+	  /ERROR:  publication\(s\) "pub_doesnt_exist" does not exist in the publisher/,
+	"Alter subscription for non existent publication fails");
+
+# Set publication on non existent database.
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION testsub1 CONNECTION 'dbname=regress_doesnotexist2'");
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+	"ALTER SUBSCRIPTION testsub1 SET PUBLICATION pub_doesnt_exist WITH (REFRESH = FALSE)"
+);
+ok( $stderr =~ /ERROR:  could not connect to the publisher/,
+	"Alter subscription set publication on non existent database fails");
+
+$node_publisher->stop('fast');
+$node_subscriber->stop('fast');
-- 
1.8.3.1
