On Wed, May 19, 2021 at 6:13 PM Bharath Rupireddy
<bharath.rupireddyforpostg...@gmail.com> wrote:
> Thanks. I will work on the new structure ParseSubOption only for
> subscription options.

Please find attached the v2 patch, which contains 1) the new ParseSubOptions
structure and 2) the refactoring of the error-reporting code.

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
From b72336b7dc803c4ebd49d25850c0b15d8fbdbbed Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Thu, 20 May 2021 09:03:37 +0530
Subject: [PATCH v2] Refactor parse_subscription_options

Currently, the parse_subscription_options function takes a large number (14)
of input parameters, which makes it hard to extend with new ones. So, wrap
all the input parameters in a ParseSubOptions structure, to which new
parameters can be added easily.

Also, remove the near-duplicate code (differing only in the option name) that
throws errors for mutually exclusive options. Instead, store the offending
option in a variable and use it in the error message. This saves some lines
of code and makes the code more readable, with fewer ereport(ERROR, ...)
statements.
---
 src/backend/commands/subscriptioncmds.c | 348 +++++++++++++-----------
 1 file changed, 187 insertions(+), 161 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 8aa6de1785..feb9074436 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -51,6 +51,26 @@ static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
 
+/*
+ * Input option list and output pointers passed to parse_subscription_options().
+ */
+typedef struct ParseSubOptions
+{
+	List 	*stmt_options;	/* in: list of DefElem options to parse */
+	bool 	*connect;	/* out, default true; NULL if option not accepted */
+	bool 	*enabled_given;	/* out: "enabled" was specified; must be non-NULL when enabled is */
+	bool 	*enabled;	/* out, default true; NULL if option not accepted */
+	bool 	*create_slot;	/* out, default true; NULL if option not accepted */
+	bool 	*slot_name_given;	/* out: "slot_name" was specified; must be non-NULL when slot_name is */
+	char 	**slot_name;	/* out: slot name, or NULL if unspecified or "none"; NULL pointer if option not accepted */
+	bool 	*copy_data;	/* out, default true; NULL if option not accepted */
+	char 	**synchronous_commit;	/* out: given string, or NULL if unspecified; NULL pointer if option not accepted */
+	bool 	*refresh;	/* out, default true; NULL if option not accepted */
+	bool 	*binary_given;	/* out: "binary" was specified; must be non-NULL when binary is */
+	bool 	*binary;	/* out, default false; NULL if option not accepted */
+	bool 	*streaming_given;	/* out: "streaming" was specified; must be non-NULL when streaming is */
+	bool 	*streaming;	/* out, default false; NULL if option not accepted */
+} ParseSubOptions;
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -60,16 +80,7 @@ static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname,
  * accommodate that.
  */
 static void
-parse_subscription_options(List *options,
-						   bool *connect,
-						   bool *enabled_given, bool *enabled,
-						   bool *create_slot,
-						   bool *slot_name_given, char **slot_name,
-						   bool *copy_data,
-						   char **synchronous_commit,
-						   bool *refresh,
-						   bool *binary_given, bool *binary,
-						   bool *streaming_given, bool *streaming)
+parse_subscription_options(ParseSubOptions *opts)
 {
 	ListCell   *lc;
 	bool		connect_given = false;
@@ -78,45 +89,46 @@ parse_subscription_options(List *options,
 	bool		refresh_given = false;
 
 	/* If connect is specified, the others also need to be. */
-	Assert(!connect || (enabled && create_slot && copy_data));
+	Assert(!opts->connect || (opts->enabled && opts->create_slot &&
+							  opts->copy_data));
 
-	if (connect)
-		*connect = true;
-	if (enabled)
+	if (opts->connect)
+		*opts->connect = true;
+	if (opts->enabled)
 	{
-		*enabled_given = false;
-		*enabled = true;
+		*opts->enabled_given = false;
+		*opts->enabled = true;
 	}
-	if (create_slot)
-		*create_slot = true;
-	if (slot_name)
+	if (opts->create_slot)
+		*opts->create_slot = true;
+	if (opts->slot_name)
 	{
-		*slot_name_given = false;
-		*slot_name = NULL;
+		*opts->slot_name_given = false;
+		*opts->slot_name = NULL;
 	}
-	if (copy_data)
-		*copy_data = true;
-	if (synchronous_commit)
-		*synchronous_commit = NULL;
-	if (refresh)
-		*refresh = true;
-	if (binary)
+	if (opts->copy_data)
+		*opts->copy_data = true;
+	if (opts->synchronous_commit)
+		*opts->synchronous_commit = NULL;
+	if (opts->refresh)
+		*opts->refresh = true;
+	if (opts->binary)
 	{
-		*binary_given = false;
-		*binary = false;
+		*opts->binary_given = false;
+		*opts->binary = false;
 	}
-	if (streaming)
+	if (opts->streaming)
 	{
-		*streaming_given = false;
-		*streaming = false;
+		*opts->streaming_given = false;
+		*opts->streaming = false;
 	}
 
 	/* Parse options */
-	foreach(lc, options)
+	foreach(lc, opts->stmt_options)
 	{
 		DefElem    *defel = (DefElem *) lfirst(lc);
 
-		if (strcmp(defel->defname, "connect") == 0 && connect)
+		if (strcmp(defel->defname, "connect") == 0 && opts->connect)
 		{
 			if (connect_given)
 				ereport(ERROR,
@@ -124,19 +136,19 @@ parse_subscription_options(List *options,
 						 errmsg("conflicting or redundant options")));
 
 			connect_given = true;
-			*connect = defGetBoolean(defel);
+			*opts->connect = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "enabled") == 0 && enabled)
+		else if (strcmp(defel->defname, "enabled") == 0 && opts->enabled)
 		{
-			if (*enabled_given)
+			if (*opts->enabled_given)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			*enabled_given = true;
-			*enabled = defGetBoolean(defel);
+			*opts->enabled_given = true;
+			*opts->enabled = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
+		else if (strcmp(defel->defname, "create_slot") == 0 && opts->create_slot)
 		{
 			if (create_slot_given)
 				ereport(ERROR,
@@ -144,23 +156,23 @@ parse_subscription_options(List *options,
 						 errmsg("conflicting or redundant options")));
 
 			create_slot_given = true;
-			*create_slot = defGetBoolean(defel);
+			*opts->create_slot = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
+		else if (strcmp(defel->defname, "slot_name") == 0 && opts->slot_name)
 		{
-			if (*slot_name_given)
+			if (*opts->slot_name_given)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			*slot_name_given = true;
-			*slot_name = defGetString(defel);
+			*opts->slot_name_given = true;
+			*opts->slot_name = defGetString(defel);
 
 			/* Setting slot_name = NONE is treated as no slot name. */
-			if (strcmp(*slot_name, "none") == 0)
-				*slot_name = NULL;
+			if (strcmp(*opts->slot_name, "none") == 0)
+				*opts->slot_name = NULL;
 		}
-		else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
+		else if (strcmp(defel->defname, "copy_data") == 0 && opts->copy_data)
 		{
 			if (copy_data_given)
 				ereport(ERROR,
@@ -168,24 +180,25 @@ parse_subscription_options(List *options,
 						 errmsg("conflicting or redundant options")));
 
 			copy_data_given = true;
-			*copy_data = defGetBoolean(defel);
+			*opts->copy_data = defGetBoolean(defel);
 		}
 		else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
-				 synchronous_commit)
+				 opts->synchronous_commit)
 		{
-			if (*synchronous_commit)
+			if (*opts->synchronous_commit)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			*synchronous_commit = defGetString(defel);
+			*opts->synchronous_commit = defGetString(defel);
 
 			/* Test if the given value is valid for synchronous_commit GUC. */
-			(void) set_config_option("synchronous_commit", *synchronous_commit,
+			(void) set_config_option("synchronous_commit",
+									 *opts->synchronous_commit,
 									 PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
 									 false, 0, false);
 		}
-		else if (strcmp(defel->defname, "refresh") == 0 && refresh)
+		else if (strcmp(defel->defname, "refresh") == 0 && opts->refresh)
 		{
 			if (refresh_given)
 				ereport(ERROR,
@@ -193,27 +206,27 @@ parse_subscription_options(List *options,
 						 errmsg("conflicting or redundant options")));
 
 			refresh_given = true;
-			*refresh = defGetBoolean(defel);
+			*opts->refresh = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "binary") == 0 && binary)
+		else if (strcmp(defel->defname, "binary") == 0 && opts->binary)
 		{
-			if (*binary_given)
+			if (*opts->binary_given)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			*binary_given = true;
-			*binary = defGetBoolean(defel);
+			*opts->binary_given = true;
+			*opts->binary = defGetBoolean(defel);
 		}
-		else if (strcmp(defel->defname, "streaming") == 0 && streaming)
+		else if (strcmp(defel->defname, "streaming") == 0 && opts->streaming)
 		{
-			if (*streaming_given)
+			if (*opts->streaming_given)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
 
-			*streaming_given = true;
-			*streaming = defGetBoolean(defel);
+			*opts->streaming_given = true;
+			*opts->streaming = defGetBoolean(defel);
 		}
 		else
 			ereport(ERROR,
@@ -225,65 +238,63 @@ parse_subscription_options(List *options,
 	 * We've been explicitly asked to not connect, that requires some
 	 * additional processing.
 	 */
-	if (connect && !*connect)
+	if (opts->connect && !*opts->connect)
 	{
+		char *option = NULL;
+
 		/* Check for incompatible options from the user. */
-		if (enabled && *enabled_given && *enabled)
+		if (opts->enabled && *opts->enabled_given && *opts->enabled)
+			option = "enabled = true";
+		else if (opts->create_slot && create_slot_given && *opts->create_slot)
+			option = "create_slot = true";
+		else if (opts->copy_data && copy_data_given && *opts->copy_data)
+			option = "copy_data = true";
+
+		if (option)
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 			/*- translator: both %s are strings of the form "option = value" */
 					 errmsg("%s and %s are mutually exclusive options",
-							"connect = false", "enabled = true")));
-
-		if (create_slot && create_slot_given && *create_slot)
-			ereport(ERROR,
-					(errcode(ERRCODE_SYNTAX_ERROR),
-					 errmsg("%s and %s are mutually exclusive options",
-							"connect = false", "create_slot = true")));
-
-		if (copy_data && copy_data_given && *copy_data)
-			ereport(ERROR,
-					(errcode(ERRCODE_SYNTAX_ERROR),
-					 errmsg("%s and %s are mutually exclusive options",
-							"connect = false", "copy_data = true")));
+							"connect = false", option)));
 
 		/* Change the defaults of other options. */
-		*enabled = false;
-		*create_slot = false;
-		*copy_data = false;
+		*opts->enabled = false;
+		*opts->create_slot = false;
+		*opts->copy_data = false;
 	}
 
 	/*
 	 * Do additional checking for disallowed combination when slot_name = NONE
 	 * was used.
 	 */
-	if (slot_name && *slot_name_given && !*slot_name)
+	if (opts->slot_name && *opts->slot_name_given && !*opts->slot_name)
 	{
-		if (enabled && *enabled_given && *enabled)
+		char *option = NULL;
+
+		if (opts->enabled && *opts->enabled_given && *opts->enabled)
+			option = "enabled = true";
+		else if (opts->create_slot && create_slot_given && *opts->create_slot)
+			option = "create_slot = true";
+
+		if (option)
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 			/*- translator: both %s are strings of the form "option = value" */
 					 errmsg("%s and %s are mutually exclusive options",
-							"slot_name = NONE", "enabled = true")));
+							"slot_name = NONE", option)));
 
-		if (create_slot && create_slot_given && *create_slot)
-			ereport(ERROR,
-					(errcode(ERRCODE_SYNTAX_ERROR),
-					 errmsg("%s and %s are mutually exclusive options",
-							"slot_name = NONE", "create_slot = true")));
+		option = NULL;
 
-		if (enabled && !*enabled_given && *enabled)
-			ereport(ERROR,
-					(errcode(ERRCODE_SYNTAX_ERROR),
-			/*- translator: both %s are strings of the form "option = value" */
-					 errmsg("subscription with %s must also set %s",
-							"slot_name = NONE", "enabled = false")));
+		if (opts->enabled && !*opts->enabled_given && *opts->enabled)
+			option = "enabled = false";
+		else if (opts->create_slot && !create_slot_given && *opts->create_slot)
+			option = "create_slot = false";
 
-		if (create_slot && !create_slot_given && *create_slot)
+		if (option)
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 					 errmsg("subscription with %s must also set %s",
-							"slot_name = NONE", "create_slot = false")));
+							"slot_name = NONE", option)));
 	}
 }
 
@@ -346,22 +357,32 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	char		originname[NAMEDATALEN];
 	bool		create_slot;
 	List	   *publications;
+	ParseSubOptions *opts;
+
+	opts = (ParseSubOptions *) palloc0(sizeof(ParseSubOptions));
+
+	/* Fill only the options that are of interest here. */
+	opts->stmt_options = stmt->options;
+	opts->connect = &connect;
+	opts->enabled_given = &enabled_given;
+	opts->enabled = &enabled;
+	opts->create_slot = &create_slot;
+	opts->slot_name_given = &slotname_given;
+	opts->slot_name = &slotname;
+	opts->copy_data = &copy_data;
+	opts->synchronous_commit = &synchronous_commit;
+	opts->binary_given = &binary_given;
+	opts->binary = &binary;
+	opts->streaming_given = &streaming_given;
+	opts->streaming = &streaming;
 
 	/*
 	 * Parse and check options.
 	 *
 	 * Connection and publication should not be specified here.
 	 */
-	parse_subscription_options(stmt->options,
-							   &connect,
-							   &enabled_given, &enabled,
-							   &create_slot,
-							   &slotname_given, &slotname,
-							   &copy_data,
-							   &synchronous_commit,
-							   NULL,	/* no "refresh" */
-							   &binary_given, &binary,
-							   &streaming_given, &streaming);
+	parse_subscription_options(opts);
+	pfree(opts);
 
 	/*
 	 * Since creating a replication slot is not transactional, rolling back
@@ -804,17 +825,22 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 				bool		binary;
 				bool		streaming_given;
 				bool		streaming;
+				ParseSubOptions *opts;
+
+				opts = (ParseSubOptions *) palloc0(sizeof(ParseSubOptions));
 
-				parse_subscription_options(stmt->options,
-										   NULL,	/* no "connect" */
-										   NULL, NULL,	/* no "enabled" */
-										   NULL,	/* no "create_slot" */
-										   &slotname_given, &slotname,
-										   NULL,	/* no "copy_data" */
-										   &synchronous_commit,
-										   NULL,	/* no "refresh" */
-										   &binary_given, &binary,
-										   &streaming_given, &streaming);
+				/* Fill only the options that are of interest here. */
+				opts->stmt_options = stmt->options;
+				opts->slot_name_given = &slotname_given;
+				opts->slot_name = &slotname;
+				opts->synchronous_commit = &synchronous_commit;
+				opts->binary_given = &binary_given;
+				opts->binary = &binary;
+				opts->streaming_given = &streaming_given;
+				opts->streaming = &streaming;
+
+				parse_subscription_options(opts);
+				pfree(opts);
 
 				if (slotname_given)
 				{
@@ -859,19 +885,19 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 
 		case ALTER_SUBSCRIPTION_ENABLED:
 			{
-				bool		enabled,
-							enabled_given;
-
-				parse_subscription_options(stmt->options,
-										   NULL,	/* no "connect" */
-										   &enabled_given, &enabled,
-										   NULL,	/* no "create_slot" */
-										   NULL, NULL,	/* no "slot_name" */
-										   NULL,	/* no "copy_data" */
-										   NULL,	/* no "synchronous_commit" */
-										   NULL,	/* no "refresh" */
-										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no streaming */
+				bool		enabled;
+				bool		enabled_given;
+				ParseSubOptions *opts;
+
+				opts = (ParseSubOptions *) palloc0(sizeof(ParseSubOptions));
+
+				/* Fill only the options that are of interest here. */
+				opts->stmt_options = stmt->options;
+				opts->enabled_given = &enabled_given;
+				opts->enabled = &enabled;
+
+				parse_subscription_options(opts);
+				pfree(opts);
 				Assert(enabled_given);
 
 				if (!sub->slotname && enabled)
@@ -906,17 +932,18 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 			{
 				bool		copy_data;
 				bool		refresh;
+				ParseSubOptions *opts;
+
+				opts = (ParseSubOptions *) palloc0(sizeof(ParseSubOptions));
+
+				/* Fill only the options that are of interest here. */
+				opts->stmt_options = stmt->options;
+				opts->copy_data = &copy_data;
+				opts->refresh = &refresh;
+
+				parse_subscription_options(opts);
+				pfree(opts);
 
-				parse_subscription_options(stmt->options,
-										   NULL,	/* no "connect" */
-										   NULL, NULL,	/* no "enabled" */
-										   NULL,	/* no "create_slot" */
-										   NULL, NULL,	/* no "slot_name" */
-										   &copy_data,
-										   NULL,	/* no "synchronous_commit" */
-										   &refresh,
-										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no "streaming" */
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(stmt->publication);
 				replaces[Anum_pg_subscription_subpublications - 1] = true;
@@ -950,20 +977,20 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 				bool		copy_data;
 				bool		refresh;
 				List	   *publist;
+				ParseSubOptions *opts;
 
 				publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
 
-				parse_subscription_options(stmt->options,
-										   NULL,	/* no "connect" */
-										   NULL, NULL,	/* no "enabled" */
-										   NULL,	/* no "create_slot" */
-										   NULL, NULL,	/* no "slot_name" */
-										   isadd ? &copy_data : NULL,	/* for drop, no
-																		 * "copy_data" */
-										   NULL,	/* no "synchronous_commit" */
-										   &refresh,
-										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no "streaming" */
+				opts = (ParseSubOptions *) palloc0(sizeof(ParseSubOptions));
+
+				/* Fill only the options that are of interest here. */
+				opts->stmt_options = stmt->options;
+				/* For DROP PUBLICATION, copy_data option is not supported. */
+				opts->copy_data = isadd ? &copy_data : NULL;
+				opts->refresh = &refresh;
+
+				parse_subscription_options(opts);
+				pfree(opts);
 
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(publist);
@@ -994,22 +1021,21 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 		case ALTER_SUBSCRIPTION_REFRESH:
 			{
 				bool		copy_data;
+				ParseSubOptions *opts;
 
 				if (!sub->enabled)
 					ereport(ERROR,
 							(errcode(ERRCODE_SYNTAX_ERROR),
 							 errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
 
-				parse_subscription_options(stmt->options,
-										   NULL,	/* no "connect" */
-										   NULL, NULL,	/* no "enabled" */
-										   NULL,	/* no "create_slot" */
-										   NULL, NULL,	/* no "slot_name" */
-										   &copy_data,
-										   NULL,	/* no "synchronous_commit" */
-										   NULL,	/* no "refresh" */
-										   NULL, NULL,	/* no "binary" */
-										   NULL, NULL); /* no "streaming" */
+				opts = (ParseSubOptions *) palloc0(sizeof(ParseSubOptions));
+
+				/* Fill only the options that are of interest here. */
+				opts->stmt_options = stmt->options;
+				opts->copy_data = &copy_data;
+
+				parse_subscription_options(opts);
+				pfree(opts);
 
 				PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
 
-- 
2.25.1

Reply via email to