On Wed, May 19, 2021 at 6:13 PM Bharath Rupireddy <bharath.rupireddyforpostg...@gmail.com> wrote: > Thanks. I will work on the new structure ParseSubOption only for > subscription options.
Please see the attached v2 patch, which contains changes for 1) the new ParseSubOptions structure and 2) the error-reporting code refactoring. With Regards, Bharath Rupireddy. EnterpriseDB: http://www.enterprisedb.com
From b72336b7dc803c4ebd49d25850c0b15d8fbdbbed Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com> Date: Thu, 20 May 2021 09:03:37 +0530 Subject: [PATCH v2] Refactor parse_subscription_options Currently, the parse_subscription_options function receives a large number (14) of input parameters, which makes it hard to extend with new parameters. So, wrap all the input parameters in a ParseSubOptions structure to which new parameters can be added easily. Also, remove the near-duplicate code (differing only in the option name) used when throwing errors for mutually exclusive options: keep the option in a variable and use it in the error message. This saves some lines of code and makes the code cleaner, with fewer ereport(ERROR, ...) statements. --- src/backend/commands/subscriptioncmds.c | 348 +++++++++++++----------- 1 file changed, 187 insertions(+), 161 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 8aa6de1785..feb9074436 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -51,6 +51,26 @@ static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); +/* + * Structure to hold subscription options for parsing + */ +typedef struct ParseSubOptions +{ + List *stmt_options; + bool *connect; + bool *enabled_given; + bool *enabled; + bool *create_slot; + bool *slot_name_given; + char **slot_name; + bool *copy_data; + char **synchronous_commit; + bool *refresh; + bool *binary_given; + bool *binary; + bool *streaming_given; + bool *streaming; +} ParseSubOptions; /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. 
@@ -60,16 +80,7 @@ static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, * accommodate that. */ static void -parse_subscription_options(List *options, - bool *connect, - bool *enabled_given, bool *enabled, - bool *create_slot, - bool *slot_name_given, char **slot_name, - bool *copy_data, - char **synchronous_commit, - bool *refresh, - bool *binary_given, bool *binary, - bool *streaming_given, bool *streaming) +parse_subscription_options(ParseSubOptions *opts) { ListCell *lc; bool connect_given = false; @@ -78,45 +89,46 @@ parse_subscription_options(List *options, bool refresh_given = false; /* If connect is specified, the others also need to be. */ - Assert(!connect || (enabled && create_slot && copy_data)); + Assert(!opts->connect || (opts->enabled && opts->create_slot && + opts->copy_data)); - if (connect) - *connect = true; - if (enabled) + if (opts->connect) + *opts->connect = true; + if (opts->enabled) { - *enabled_given = false; - *enabled = true; + *opts->enabled_given = false; + *opts->enabled = true; } - if (create_slot) - *create_slot = true; - if (slot_name) + if (opts->create_slot) + *opts->create_slot = true; + if (opts->slot_name) { - *slot_name_given = false; - *slot_name = NULL; + *opts->slot_name_given = false; + *opts->slot_name = NULL; } - if (copy_data) - *copy_data = true; - if (synchronous_commit) - *synchronous_commit = NULL; - if (refresh) - *refresh = true; - if (binary) + if (opts->copy_data) + *opts->copy_data = true; + if (opts->synchronous_commit) + *opts->synchronous_commit = NULL; + if (opts->refresh) + *opts->refresh = true; + if (opts->binary) { - *binary_given = false; - *binary = false; + *opts->binary_given = false; + *opts->binary = false; } - if (streaming) + if (opts->streaming) { - *streaming_given = false; - *streaming = false; + *opts->streaming_given = false; + *opts->streaming = false; } /* Parse options */ - foreach(lc, options) + foreach(lc, opts->stmt_options) { DefElem *defel = (DefElem *) 
lfirst(lc); - if (strcmp(defel->defname, "connect") == 0 && connect) + if (strcmp(defel->defname, "connect") == 0 && opts->connect) { if (connect_given) ereport(ERROR, @@ -124,19 +136,19 @@ parse_subscription_options(List *options, errmsg("conflicting or redundant options"))); connect_given = true; - *connect = defGetBoolean(defel); + *opts->connect = defGetBoolean(defel); } - else if (strcmp(defel->defname, "enabled") == 0 && enabled) + else if (strcmp(defel->defname, "enabled") == 0 && opts->enabled) { - if (*enabled_given) + if (*opts->enabled_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *enabled_given = true; - *enabled = defGetBoolean(defel); + *opts->enabled_given = true; + *opts->enabled = defGetBoolean(defel); } - else if (strcmp(defel->defname, "create_slot") == 0 && create_slot) + else if (strcmp(defel->defname, "create_slot") == 0 && opts->create_slot) { if (create_slot_given) ereport(ERROR, @@ -144,23 +156,23 @@ parse_subscription_options(List *options, errmsg("conflicting or redundant options"))); create_slot_given = true; - *create_slot = defGetBoolean(defel); + *opts->create_slot = defGetBoolean(defel); } - else if (strcmp(defel->defname, "slot_name") == 0 && slot_name) + else if (strcmp(defel->defname, "slot_name") == 0 && opts->slot_name) { - if (*slot_name_given) + if (*opts->slot_name_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *slot_name_given = true; - *slot_name = defGetString(defel); + *opts->slot_name_given = true; + *opts->slot_name = defGetString(defel); /* Setting slot_name = NONE is treated as no slot name. 
*/ - if (strcmp(*slot_name, "none") == 0) - *slot_name = NULL; + if (strcmp(*opts->slot_name, "none") == 0) + *opts->slot_name = NULL; } - else if (strcmp(defel->defname, "copy_data") == 0 && copy_data) + else if (strcmp(defel->defname, "copy_data") == 0 && opts->copy_data) { if (copy_data_given) ereport(ERROR, @@ -168,24 +180,25 @@ parse_subscription_options(List *options, errmsg("conflicting or redundant options"))); copy_data_given = true; - *copy_data = defGetBoolean(defel); + *opts->copy_data = defGetBoolean(defel); } else if (strcmp(defel->defname, "synchronous_commit") == 0 && synchronous_commit) { - if (*synchronous_commit) + if (*opts->synchronous_commit) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *synchronous_commit = defGetString(defel); + *opts->synchronous_commit = defGetString(defel); /* Test if the given value is valid for synchronous_commit GUC. */ - (void) set_config_option("synchronous_commit", *synchronous_commit, + (void) set_config_option("synchronous_commit", + *opts->synchronous_commit, PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET, false, 0, false); } - else if (strcmp(defel->defname, "refresh") == 0 && refresh) + else if (strcmp(defel->defname, "refresh") == 0 && opts->refresh) { if (refresh_given) ereport(ERROR, @@ -193,27 +206,27 @@ parse_subscription_options(List *options, errmsg("conflicting or redundant options"))); refresh_given = true; - *refresh = defGetBoolean(defel); + *opts->refresh = defGetBoolean(defel); } - else if (strcmp(defel->defname, "binary") == 0 && binary) + else if (strcmp(defel->defname, "binary") == 0 && opts->binary) { - if (*binary_given) + if (*opts->binary_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *binary_given = true; - *binary = defGetBoolean(defel); + *opts->binary_given = true; + *opts->binary = defGetBoolean(defel); } - else if (strcmp(defel->defname, "streaming") == 0 && streaming) + else if 
(strcmp(defel->defname, "streaming") == 0 && opts->streaming) { - if (*streaming_given) + if (*opts->streaming_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *streaming_given = true; - *streaming = defGetBoolean(defel); + *opts->streaming_given = true; + *opts->streaming = defGetBoolean(defel); } else ereport(ERROR, @@ -225,65 +238,63 @@ parse_subscription_options(List *options, * We've been explicitly asked to not connect, that requires some * additional processing. */ - if (connect && !*connect) + if (opts->connect && !*opts->connect) { + char *option = NULL; + /* Check for incompatible options from the user. */ - if (enabled && *enabled_given && *enabled) + if (opts->enabled && *opts->enabled_given && *opts->enabled) + option = "enabled = true"; + else if (opts->create_slot && create_slot_given && *opts->create_slot) + option = "create_slot = true"; + else if (opts->copy_data && copy_data_given && *opts->copy_data) + option = "copy_data = true"; + + if (option) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), /*- translator: both %s are strings of the form "option = value" */ errmsg("%s and %s are mutually exclusive options", - "connect = false", "enabled = true"))); - - if (create_slot && create_slot_given && *create_slot) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("%s and %s are mutually exclusive options", - "connect = false", "create_slot = true"))); - - if (copy_data && copy_data_given && *copy_data) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("%s and %s are mutually exclusive options", - "connect = false", "copy_data = true"))); + "connect = false", option))); /* Change the defaults of other options. */ - *enabled = false; - *create_slot = false; - *copy_data = false; + *opts->enabled = false; + *opts->create_slot = false; + *opts->copy_data = false; } /* * Do additional checking for disallowed combination when slot_name = NONE * was used. 
*/ - if (slot_name && *slot_name_given && !*slot_name) + if (opts->slot_name && *opts->slot_name_given && !*opts->slot_name) { - if (enabled && *enabled_given && *enabled) + char *option = NULL; + + if (opts->enabled && *opts->enabled_given && *opts->enabled) + option = "enabled = true"; + else if (opts->create_slot && create_slot_given && *opts->create_slot) + option = "create_slot = true"; + + if (option) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), /*- translator: both %s are strings of the form "option = value" */ errmsg("%s and %s are mutually exclusive options", - "slot_name = NONE", "enabled = true"))); + "slot_name = NONE", option))); - if (create_slot && create_slot_given && *create_slot) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("%s and %s are mutually exclusive options", - "slot_name = NONE", "create_slot = true"))); + option = NULL; - if (enabled && !*enabled_given && *enabled) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - /*- translator: both %s are strings of the form "option = value" */ - errmsg("subscription with %s must also set %s", - "slot_name = NONE", "enabled = false"))); + if (opts->enabled && !*opts->enabled_given && *opts->enabled) + option = "enabled = false"; + else if (opts->create_slot && !create_slot_given && *opts->create_slot) + option = "create_slot = false"; - if (create_slot && !create_slot_given && *create_slot) + if (option) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("subscription with %s must also set %s", - "slot_name = NONE", "create_slot = false"))); + "slot_name = NONE", option))); } } @@ -346,22 +357,32 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) char originname[NAMEDATALEN]; bool create_slot; List *publications; + ParseSubOptions *opts; + + opts = (ParseSubOptions *) palloc0(sizeof(ParseSubOptions)); + + /* Fill only the options that are of interest here. 
*/ + opts->stmt_options = stmt->options; + opts->connect = &connect; + opts->enabled_given = &enabled_given; + opts->enabled = &enabled; + opts->create_slot = &create_slot; + opts->slot_name_given = &slotname_given; + opts->slot_name = &slotname; + opts->copy_data = &copy_data; + opts->synchronous_commit = &synchronous_commit; + opts->binary_given = &binary_given; + opts->binary = &binary; + opts->streaming_given = &streaming_given; + opts->streaming = &streaming; /* * Parse and check options. * * Connection and publication should not be specified here. */ - parse_subscription_options(stmt->options, - &connect, - &enabled_given, &enabled, - &create_slot, - &slotname_given, &slotname, - &copy_data, - &synchronous_commit, - NULL, /* no "refresh" */ - &binary_given, &binary, - &streaming_given, &streaming); + parse_subscription_options(opts); + pfree(opts); /* * Since creating a replication slot is not transactional, rolling back @@ -804,17 +825,22 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) bool binary; bool streaming_given; bool streaming; + ParseSubOptions *opts; + + opts = (ParseSubOptions *) palloc0(sizeof(ParseSubOptions)); - parse_subscription_options(stmt->options, - NULL, /* no "connect" */ - NULL, NULL, /* no "enabled" */ - NULL, /* no "create_slot" */ - &slotname_given, &slotname, - NULL, /* no "copy_data" */ - &synchronous_commit, - NULL, /* no "refresh" */ - &binary_given, &binary, - &streaming_given, &streaming); + /* Fill only the options that are of interest here. 
*/ + opts->stmt_options = stmt->options; + opts->slot_name_given = &slotname_given; + opts->slot_name = &slotname; + opts->synchronous_commit = &synchronous_commit; + opts->binary_given = &binary_given; + opts->binary = &binary; + opts->streaming_given = &streaming_given; + opts->streaming = &streaming; + + parse_subscription_options(opts); + pfree(opts); if (slotname_given) { @@ -859,19 +885,19 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) case ALTER_SUBSCRIPTION_ENABLED: { - bool enabled, - enabled_given; - - parse_subscription_options(stmt->options, - NULL, /* no "connect" */ - &enabled_given, &enabled, - NULL, /* no "create_slot" */ - NULL, NULL, /* no "slot_name" */ - NULL, /* no "copy_data" */ - NULL, /* no "synchronous_commit" */ - NULL, /* no "refresh" */ - NULL, NULL, /* no "binary" */ - NULL, NULL); /* no streaming */ + bool enabled; + bool enabled_given; + ParseSubOptions *opts; + + opts = (ParseSubOptions *) palloc0(sizeof(ParseSubOptions)); + + /* Fill only the options that are of interest here. */ + opts->stmt_options = stmt->options; + opts->enabled_given = &enabled_given; + opts->enabled = &enabled; + + parse_subscription_options(opts); + pfree(opts); Assert(enabled_given); if (!sub->slotname && enabled) @@ -906,17 +932,18 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) { bool copy_data; bool refresh; + ParseSubOptions *opts; + + opts = (ParseSubOptions *) palloc0(sizeof(ParseSubOptions)); + + /* Fill only the options that are of interest here. 
*/ + opts->stmt_options = stmt->options; + opts->copy_data = &copy_data; + opts->refresh = &refresh; + + parse_subscription_options(opts); + pfree(opts); - parse_subscription_options(stmt->options, - NULL, /* no "connect" */ - NULL, NULL, /* no "enabled" */ - NULL, /* no "create_slot" */ - NULL, NULL, /* no "slot_name" */ - &copy_data, - NULL, /* no "synchronous_commit" */ - &refresh, - NULL, NULL, /* no "binary" */ - NULL, NULL); /* no "streaming" */ values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); replaces[Anum_pg_subscription_subpublications - 1] = true; @@ -950,20 +977,20 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) bool copy_data; bool refresh; List *publist; + ParseSubOptions *opts; publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname); - parse_subscription_options(stmt->options, - NULL, /* no "connect" */ - NULL, NULL, /* no "enabled" */ - NULL, /* no "create_slot" */ - NULL, NULL, /* no "slot_name" */ - isadd ? &copy_data : NULL, /* for drop, no - * "copy_data" */ - NULL, /* no "synchronous_commit" */ - &refresh, - NULL, NULL, /* no "binary" */ - NULL, NULL); /* no "streaming" */ + opts = (ParseSubOptions *) palloc0(sizeof(ParseSubOptions)); + + /* Fill only the options that are of interest here. */ + opts->stmt_options = stmt->options; + /* For DROP PUBLICATION, copy_data option is not supported. */ + opts->copy_data = isadd ? &copy_data : NULL; + opts->refresh = &refresh; + + parse_subscription_options(opts); + pfree(opts); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publist); @@ -994,22 +1021,21 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) case ALTER_SUBSCRIPTION_REFRESH: { bool copy_data; + ParseSubOptions *opts; if (!sub->enabled) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("ALTER SUBSCRIPTION ... 
REFRESH is not allowed for disabled subscriptions"))); - parse_subscription_options(stmt->options, - NULL, /* no "connect" */ - NULL, NULL, /* no "enabled" */ - NULL, /* no "create_slot" */ - NULL, NULL, /* no "slot_name" */ - &copy_data, - NULL, /* no "synchronous_commit" */ - NULL, /* no "refresh" */ - NULL, NULL, /* no "binary" */ - NULL, NULL); /* no "streaming" */ + opts = (ParseSubOptions *) palloc0(sizeof(ParseSubOptions)); + + /* Fill only the options that are of interest here. */ + opts->stmt_options = stmt->options; + opts->copy_data = &copy_data; + + parse_subscription_options(opts); + pfree(opts); PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); -- 2.25.1