On Wed, Jun 9, 2021 at 10:37 AM Peter Smith <smithpb2...@gmail.com> wrote: > > On Wed, Jun 2, 2021 at 10:41 PM Bharath Rupireddy > <bharath.rupireddyforpostg...@gmail.com> wrote: > > > > On Wed, Jun 2, 2021 at 11:43 AM Peter Smith <smithpb2...@gmail.com> wrote: > > > Yes, it looks better, but (since the masks are all 1 bit) I was only > > > asking why not do like: > > > > > > if (supported_opts & SUBOPT_CONNECT) > > > if (supported_opts & SUBOPT_ENABLED) > > > if (supported_opts & SUBOPT_SLOT_NAME) > > > if (supported_opts & SUBOPT_COPY_DATA) > > > > Please review the attached v3 patch further. > > OK. I have applied the v3 patch and reviewed it again: > > - It applies OK. > - The code builds OK. > - The make check and TAP subscription tests are OK
Thanks. > 1. > +/* > + * Structure to hold the bitmaps and values of all the options for > + * CREATE/ALTER SUBSCRIPTION commands. > + */ > > There seems to be an extra space before "commands." Removed. > 2. > + /* If connect option is supported, the others also need to be. */ > + Assert(!IsSet(supported_opts, SUBOPT_CONNECT) || > + (IsSet(supported_opts, SUBOPT_ENABLED) && > + IsSet(supported_opts, SUBOPT_CREATE_SLOT) && > + IsSet(supported_opts, SUBOPT_COPY_DATA))); > > This comment about "the others" doesn’t make sense to me. > > e.g. Why only these 3 options? What about all those other SUBOPT_* options? It is an existing Assert and comment for ensuring somebody doesn't call parse_subscription_options with SUBOPT_CONNECT, without SUBOPT_ENABLED, SUBOPT_CREATE_SLOT and SUBOPT_COPY_DATA. In other words, when SUBOPT_CONNECT is passed in, the other three options should also be passed. " the others" there in the comment makes sense just by looking at the Assert statement. > 3. > I feel that this patch should be split into 2 parts > a) the SubOpts changes, and > b) the mutually exclusive options change. Divided the patch into two. > I agree that the new SubOpts struct etc. is an improvement over existing code. > > But, for the mutually exclusive options part I don't see what is > gained by the new patch code. I preferred the old code with its > multiple ereports. Although it was a bit repetitive IMO it was easier > to read that way, and length-wise there is almost no difference. So if > it is less readable and not a lot shorter then what is the benefit of > the change? I personally don't like the repeated code when there's a chance of doing it better. It might not reduce the loc, but it removes the many similar ereport(ERROR calls. PSA v4-0002 patch. I think the committer can take a call on it. > 4. > - char *slotname; > - bool slotname_given; > - char *synchronous_commit; > - bool binary_given; > - bool binary; > - bool streaming_given; > - bool streaming; > - > - parse_subscription_options(stmt->options, > - NULL, /* no "connect" */ > - NULL, NULL, /* no "enabled" */ > - NULL, /* no "create_slot" */ > - &slotname_given, &slotname, > - NULL, /* no "copy_data" */ > - &synchronous_commit, > - NULL, /* no "refresh" */ > - &binary_given, &binary, > - &streaming_given, &streaming); > - > - if (slotname_given) > + SubOpts opts = {0}; > > I feel it would be simpler to declare/init this "opts" variable just 1 > time at top of the function AlterSubscription, instead of the 6 > separate declarations in this v3 patch. Doing that can allow other > code simplifications too. (see #5) Done. > 5. > case ALTER_SUBSCRIPTION_DROP_PUBLICATION: > { > bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION; > - bool copy_data; > - bool refresh; > List *publist; > + SubOpts opts = {0}; > + > + opts.supported_opts |= SUBOPT_REFRESH; > + > + if (isadd) > + opts.supported_opts |= SUBOPT_COPY_DATA; > > I think having a separate "isadd" variable is made moot now since > adding the SubOpts struct. > > Instead you can do this: > + if (stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION) > + opts.supported_opts |= SUBOPT_COPY_DATA; > > OR (after #4) you could do this: > > case ALTER_SUBSCRIPTION_ADD_PUBLICATION: > opts.supported_opts |= SUBOPT_COPY_DATA; > /* fall thru. */ > case ALTER_SUBSCRIPTION_DROP_PUBLICATION: Done. > 6. > + > +#define IsSet(val, option) ((val & option) != 0) > + > > Your IsSet macro might be better if changed to test *multiple* bits are all > set. > > Like this: > #define IsSet(val, bits) ((val & (bits)) == (bits)) > > ~ > > Most of the code remains the same, but some can be simplified. > e.g. > + /* If connect option is supported, the others also need to be. */ > + Assert(!IsSet(supported_opts, SUBOPT_CONNECT) || > + (IsSet(supported_opts, SUBOPT_ENABLED) && > + IsSet(supported_opts, SUBOPT_CREATE_SLOT) && > + IsSet(supported_opts, SUBOPT_COPY_DATA))); > > Becomes: > Assert(!IsSet(supported_opts, SUBOPT_CONNECT) || > IsSet(supported_opts, SUBOPT_ENABLED|SUBOPT_CREATE_SLOT|SUBOPT_COPY_DATA)); Changed. PSA v4 patch set. With Regards, Bharath Rupireddy.
From 57e8189a43bffa3b3464e8b878ed18b6c354a4a8 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Date: Wed, 9 Jun 2021 07:37:41 -0700 Subject: [PATCH v4] Refactor parse_subscription_options Currently parse_subscription_options function receives a lot of input parameters which makes it inextensible to add the new parameters. So, better wrap all the input parameters within a structure to which new parameters can be added easily. Also, use bitmaps to pass the supported and specified options much like the way it is done in the commit a3dc926. --- src/backend/commands/subscriptioncmds.c | 447 ++++++++++++------------ src/tools/pgindent/typedefs.list | 1 + 2 files changed, 219 insertions(+), 229 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 8aa6de1785..9ec344cb0f 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -46,6 +46,38 @@ #include "utils/memutils.h" #include "utils/syscache.h" +#define SUBOPT_NONE 0x00000000 +#define SUBOPT_CONNECT 0x00000001 +#define SUBOPT_ENABLED 0x00000002 +#define SUBOPT_CREATE_SLOT 0x00000004 +#define SUBOPT_SLOT_NAME 0x00000008 +#define SUBOPT_COPY_DATA 0x00000010 +#define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020 +#define SUBOPT_REFRESH 0x00000040 +#define SUBOPT_BINARY 0x00000080 +#define SUBOPT_STREAMING 0x00000100 + +#define IsSet(val, bits) ((val & (bits)) == (bits)) + +/* + * Structure to hold the bitmaps and values of all the options for + * CREATE/ALTER SUBSCRIPTION commands. + */ +typedef struct SubOpts +{ + bits32 supported_opts; /* bitmap of supported SUBOPT_* */ + bits32 specified_opts; /* bitmap of user specified SUBOPT_* */ + char *slot_name; + char *synchronous_commit; + bool connect; + bool enabled; + bool create_slot; + bool copy_data; + bool refresh; + bool binary; + bool streaming; +} SubOpts; + static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); @@ -60,160 +92,162 @@ static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, * accommodate that. */ static void -parse_subscription_options(List *options, - bool *connect, - bool *enabled_given, bool *enabled, - bool *create_slot, - bool *slot_name_given, char **slot_name, - bool *copy_data, - char **synchronous_commit, - bool *refresh, - bool *binary_given, bool *binary, - bool *streaming_given, bool *streaming) +parse_subscription_options(List *stmt_options, SubOpts *opts) { ListCell *lc; - bool connect_given = false; - bool create_slot_given = false; - bool copy_data_given = false; - bool refresh_given = false; + bits32 supported_opts; + bits32 specified_opts; - /* If connect is specified, the others also need to be. */ - Assert(!connect || (enabled && create_slot && copy_data)); + supported_opts = opts->supported_opts; + specified_opts = SUBOPT_NONE; - if (connect) - *connect = true; - if (enabled) - { - *enabled_given = false; - *enabled = true; - } - if (create_slot) - *create_slot = true; - if (slot_name) - { - *slot_name_given = false; - *slot_name = NULL; - } - if (copy_data) - *copy_data = true; - if (synchronous_commit) - *synchronous_commit = NULL; - if (refresh) - *refresh = true; - if (binary) - { - *binary_given = false; - *binary = false; - } - if (streaming) - { - *streaming_given = false; - *streaming = false; - } + Assert(supported_opts != SUBOPT_NONE); + + /* If connect option is supported, the others also need to be. */ + Assert(!IsSet(supported_opts, SUBOPT_CONNECT) || + IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT | + SUBOPT_COPY_DATA)); + + /* Set default values for the supported options. */ + if (IsSet(supported_opts, SUBOPT_CONNECT)) + opts->connect = true; + + if (IsSet(supported_opts, SUBOPT_ENABLED)) + opts->enabled = true; + + if (IsSet(supported_opts, SUBOPT_CREATE_SLOT)) + opts->create_slot = true; + + if (IsSet(supported_opts, SUBOPT_SLOT_NAME)) + opts->slot_name = NULL; + + if (IsSet(supported_opts, SUBOPT_COPY_DATA)) + opts->copy_data = true; + + if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT)) + opts->synchronous_commit = NULL; + + if (IsSet(supported_opts, SUBOPT_REFRESH)) + opts->refresh = true; + + if (IsSet(supported_opts, SUBOPT_BINARY)) + opts->binary = false; + + if (IsSet(supported_opts, SUBOPT_STREAMING)) + opts->streaming = false; /* Parse options */ - foreach(lc, options) + foreach(lc, stmt_options) { DefElem *defel = (DefElem *) lfirst(lc); - if (strcmp(defel->defname, "connect") == 0 && connect) + if (IsSet(supported_opts, SUBOPT_CONNECT) && + strcmp(defel->defname, "connect") == 0) { - if (connect_given) + if (IsSet(specified_opts, SUBOPT_CONNECT)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - connect_given = true; - *connect = defGetBoolean(defel); + specified_opts |= SUBOPT_CONNECT; + opts->connect = defGetBoolean(defel); } - else if (strcmp(defel->defname, "enabled") == 0 && enabled) + else if (IsSet(supported_opts, SUBOPT_ENABLED) && + strcmp(defel->defname, "enabled") == 0) { - if (*enabled_given) + if (IsSet(specified_opts, SUBOPT_ENABLED)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *enabled_given = true; - *enabled = defGetBoolean(defel); + specified_opts |= SUBOPT_ENABLED; + opts->enabled = defGetBoolean(defel); } - else if (strcmp(defel->defname, "create_slot") == 0 && create_slot) + else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) && + strcmp(defel->defname, "create_slot") == 0) { - if (create_slot_given) + if (IsSet(specified_opts, SUBOPT_CREATE_SLOT)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - create_slot_given = true; - *create_slot = defGetBoolean(defel); + specified_opts |= SUBOPT_CREATE_SLOT; + opts->create_slot = defGetBoolean(defel); } - else if (strcmp(defel->defname, "slot_name") == 0 && slot_name) + else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) && + strcmp(defel->defname, "slot_name") == 0) { - if (*slot_name_given) + if (IsSet(specified_opts, SUBOPT_SLOT_NAME)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *slot_name_given = true; - *slot_name = defGetString(defel); + specified_opts |= SUBOPT_SLOT_NAME; + opts->slot_name = defGetString(defel); /* Setting slot_name = NONE is treated as no slot name. */ - if (strcmp(*slot_name, "none") == 0) - *slot_name = NULL; + if (strcmp(opts->slot_name, "none") == 0) + opts->slot_name = NULL; } - else if (strcmp(defel->defname, "copy_data") == 0 && copy_data) + else if (IsSet(supported_opts, SUBOPT_COPY_DATA) && + strcmp(defel->defname, "copy_data") == 0) { - if (copy_data_given) + if (IsSet(specified_opts, SUBOPT_COPY_DATA)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - copy_data_given = true; - *copy_data = defGetBoolean(defel); + specified_opts |= SUBOPT_COPY_DATA; + opts->copy_data = defGetBoolean(defel); } - else if (strcmp(defel->defname, "synchronous_commit") == 0 && - synchronous_commit) + else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) && + strcmp(defel->defname, "synchronous_commit") == 0) { - if (*synchronous_commit) + if (IsSet(specified_opts, SUBOPT_SYNCHRONOUS_COMMIT)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *synchronous_commit = defGetString(defel); + specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT; + opts->synchronous_commit = defGetString(defel); /* Test if the given value is valid for synchronous_commit GUC. */ - (void) set_config_option("synchronous_commit", *synchronous_commit, + (void) set_config_option("synchronous_commit", opts->synchronous_commit, PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET, false, 0, false); } - else if (strcmp(defel->defname, "refresh") == 0 && refresh) + else if (IsSet(supported_opts, SUBOPT_REFRESH) && + strcmp(defel->defname, "refresh") == 0) { - if (refresh_given) + if (IsSet(specified_opts, SUBOPT_REFRESH)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - refresh_given = true; - *refresh = defGetBoolean(defel); + specified_opts |= SUBOPT_REFRESH; + opts->refresh = defGetBoolean(defel); } - else if (strcmp(defel->defname, "binary") == 0 && binary) + else if (IsSet(supported_opts, SUBOPT_BINARY) && + strcmp(defel->defname, "binary") == 0) { - if (*binary_given) + if (IsSet(specified_opts, SUBOPT_BINARY)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *binary_given = true; - *binary = defGetBoolean(defel); + specified_opts |= SUBOPT_BINARY; + opts->binary = defGetBoolean(defel); } - else if (strcmp(defel->defname, "streaming") == 0 && streaming) + else if (IsSet(supported_opts, SUBOPT_STREAMING) && + strcmp(defel->defname, "streaming") == 0) { - if (*streaming_given) + if (IsSet(specified_opts, SUBOPT_STREAMING)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *streaming_given = true; - *streaming = defGetBoolean(defel); + specified_opts |= SUBOPT_STREAMING; + opts->streaming = defGetBoolean(defel); } else ereport(ERROR, @@ -225,66 +259,76 @@ parse_subscription_options(List *options, * We've been explicitly asked to not connect, that requires some * additional processing. */ - if (connect && !*connect) + if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT)) { /* Check for incompatible options from the user. */ - if (enabled && *enabled_given && *enabled) + if (opts->enabled && IsSet(supported_opts, SUBOPT_ENABLED) && + IsSet(specified_opts, SUBOPT_ENABLED)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), /*- translator: both %s are strings of the form "option = value" */ errmsg("%s and %s are mutually exclusive options", "connect = false", "enabled = true"))); - if (create_slot && create_slot_given && *create_slot) + if (opts->create_slot && IsSet(supported_opts, SUBOPT_CREATE_SLOT) && + IsSet(specified_opts, SUBOPT_CREATE_SLOT)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("%s and %s are mutually exclusive options", "connect = false", "create_slot = true"))); - if (copy_data && copy_data_given && *copy_data) + if (opts->copy_data && IsSet(supported_opts, SUBOPT_COPY_DATA) && + IsSet(specified_opts, SUBOPT_COPY_DATA)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("%s and %s are mutually exclusive options", "connect = false", "copy_data = true"))); /* Change the defaults of other options. */ - *enabled = false; - *create_slot = false; - *copy_data = false; + opts->enabled = false; + opts->create_slot = false; + opts->copy_data = false; } /* * Do additional checking for disallowed combination when slot_name = NONE * was used. */ - if (slot_name && *slot_name_given && !*slot_name) + if (!opts->slot_name && IsSet(supported_opts, SUBOPT_SLOT_NAME) && + IsSet(specified_opts, SUBOPT_SLOT_NAME)) { - if (enabled && *enabled_given && *enabled) + if (opts->enabled && IsSet(supported_opts, SUBOPT_ENABLED) && + IsSet(specified_opts, SUBOPT_ENABLED)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), /*- translator: both %s are strings of the form "option = value" */ errmsg("%s and %s are mutually exclusive options", "slot_name = NONE", "enabled = true"))); - if (create_slot && create_slot_given && *create_slot) + if (opts->create_slot && IsSet(supported_opts, SUBOPT_CREATE_SLOT) && + IsSet(specified_opts, SUBOPT_CREATE_SLOT)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("%s and %s are mutually exclusive options", "slot_name = NONE", "create_slot = true"))); - if (enabled && !*enabled_given && *enabled) + if (opts->enabled && IsSet(supported_opts, SUBOPT_ENABLED) && + !IsSet(specified_opts, SUBOPT_ENABLED)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), /*- translator: both %s are strings of the form "option = value" */ errmsg("subscription with %s must also set %s", "slot_name = NONE", "enabled = false"))); - if (create_slot && !create_slot_given && *create_slot) + if (opts->create_slot && IsSet(supported_opts, SUBOPT_CREATE_SLOT) && + !IsSet(specified_opts, SUBOPT_CREATE_SLOT)) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("subscription with %s must also set %s", "slot_name = NONE", "create_slot = false"))); } + + opts->specified_opts = specified_opts; } /* @@ -331,37 +375,26 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) Datum values[Natts_pg_subscription]; Oid owner = GetUserId(); HeapTuple tup; - bool connect; - bool enabled_given; - bool enabled; - bool copy_data; - bool streaming; - bool streaming_given; - char *synchronous_commit; char *conninfo; - char *slotname; - bool slotname_given; - bool binary; - bool binary_given; char originname[NAMEDATALEN]; - bool create_slot; List *publications; + SubOpts opts = {0}; + + opts.supported_opts |= SUBOPT_CONNECT; + opts.supported_opts |= SUBOPT_ENABLED; + opts.supported_opts |= SUBOPT_CREATE_SLOT; + opts.supported_opts |= SUBOPT_SLOT_NAME; + opts.supported_opts |= SUBOPT_COPY_DATA; + opts.supported_opts |= SUBOPT_SYNCHRONOUS_COMMIT; + opts.supported_opts |= SUBOPT_BINARY; + opts.supported_opts |= SUBOPT_STREAMING; /* * Parse and check options. * * Connection and publication should not be specified here. */ - parse_subscription_options(stmt->options, - &connect, - &enabled_given, &enabled, - &create_slot, - &slotname_given, &slotname, - ©_data, - &synchronous_commit, - NULL, /* no "refresh" */ - &binary_given, &binary, - &streaming_given, &streaming); + parse_subscription_options(stmt->options, &opts); /* * Since creating a replication slot is not transactional, rolling back @@ -369,7 +402,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * CREATE SUBSCRIPTION inside a transaction block if creating a * replication slot. */ - if (create_slot) + if (opts.create_slot) PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)"); if (!superuser()) @@ -399,12 +432,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) stmt->subname))); } - if (!slotname_given && slotname == NULL) - slotname = stmt->subname; + if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) && + opts.slot_name == NULL) + opts.slot_name = stmt->subname; /* The default for synchronous_commit of subscriptions is off. */ - if (synchronous_commit == NULL) - synchronous_commit = "off"; + if (opts.synchronous_commit == NULL) + opts.synchronous_commit = "off"; conninfo = stmt->conninfo; publications = stmt->publication; @@ -426,18 +460,18 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) values[Anum_pg_subscription_subname - 1] = DirectFunctionCall1(namein, CStringGetDatum(stmt->subname)); values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); - values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); - values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary); - values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming); + values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled); + values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary); + values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); - if (slotname) + if (opts.slot_name) values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slotname)); + DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name)); else nulls[Anum_pg_subscription_subslotname - 1] = true; values[Anum_pg_subscription_subsynccommit - 1] = - CStringGetTextDatum(synchronous_commit); + CStringGetTextDatum(opts.synchronous_commit); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publications); @@ -456,7 +490,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * Connect to remote side to execute requested commands and fetch table * info. */ - if (connect) + if (opts.connect) { char *err; WalReceiverConn *wrconn; @@ -476,7 +510,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * Set sync state based on if we were asked to do data copy or * not. */ - table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; /* * Get the table list from publisher and build local table status @@ -503,15 +537,15 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * won't use the initial snapshot for anything, so no need to * export it. */ - if (create_slot) + if (opts.create_slot) { - Assert(slotname); + Assert(opts.slot_name); - walrcv_create_slot(wrconn, slotname, false, + walrcv_create_slot(wrconn, opts.slot_name, false, CRS_NOEXPORT_SNAPSHOT, NULL); ereport(NOTICE, (errmsg("created replication slot \"%s\" on publisher", - slotname))); + opts.slot_name))); } } PG_FINALLY(); @@ -528,7 +562,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) table_close(rel, RowExclusiveLock); - if (enabled) + if (opts.enabled) ApplyLauncherWakeupAtCommit(); ObjectAddressSet(myself, SubscriptionRelationId, subid); @@ -762,6 +796,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) bool update_tuple = false; Subscription *sub; Form_pg_subscription form; + SubOpts opts = {0}; rel = table_open(SubscriptionRelationId, RowExclusiveLock); @@ -797,59 +832,47 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) { case ALTER_SUBSCRIPTION_OPTIONS: { - char *slotname; - bool slotname_given; - char *synchronous_commit; - bool binary_given; - bool binary; - bool streaming_given; - bool streaming; - - parse_subscription_options(stmt->options, - NULL, /* no "connect" */ - NULL, NULL, /* no "enabled" */ - NULL, /* no "create_slot" */ - &slotname_given, &slotname, - NULL, /* no "copy_data" */ - &synchronous_commit, - NULL, /* no "refresh" */ - &binary_given, &binary, - &streaming_given, &streaming); - - if (slotname_given) + opts.supported_opts |= SUBOPT_SLOT_NAME; + opts.supported_opts |= SUBOPT_SYNCHRONOUS_COMMIT; + opts.supported_opts |= SUBOPT_BINARY; + opts.supported_opts |= SUBOPT_STREAMING; + + parse_subscription_options(stmt->options, &opts); + + if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME)) { - if (sub->enabled && !slotname) + if (sub->enabled && !opts.slot_name) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("cannot set %s for enabled subscription", "slot_name = NONE"))); - if (slotname) + if (opts.slot_name) values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slotname)); + DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name)); else nulls[Anum_pg_subscription_subslotname - 1] = true; replaces[Anum_pg_subscription_subslotname - 1] = true; } - if (synchronous_commit) + if (opts.synchronous_commit) { values[Anum_pg_subscription_subsynccommit - 1] = - CStringGetTextDatum(synchronous_commit); + CStringGetTextDatum(opts.synchronous_commit); replaces[Anum_pg_subscription_subsynccommit - 1] = true; } - if (binary_given) + if (IsSet(opts.specified_opts, SUBOPT_BINARY)) { values[Anum_pg_subscription_subbinary - 1] = - BoolGetDatum(binary); + BoolGetDatum(opts.binary); replaces[Anum_pg_subscription_subbinary - 1] = true; } - if (streaming_given) + if (IsSet(opts.specified_opts, SUBOPT_STREAMING)) { values[Anum_pg_subscription_substream - 1] = - BoolGetDatum(streaming); + BoolGetDatum(opts.streaming); replaces[Anum_pg_subscription_substream - 1] = true; } @@ -859,31 +882,21 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) case ALTER_SUBSCRIPTION_ENABLED: { - bool enabled, - enabled_given; - - parse_subscription_options(stmt->options, - NULL, /* no "connect" */ - &enabled_given, &enabled, - NULL, /* no "create_slot" */ - NULL, NULL, /* no "slot_name" */ - NULL, /* no "copy_data" */ - NULL, /* no "synchronous_commit" */ - NULL, /* no "refresh" */ - NULL, NULL, /* no "binary" */ - NULL, NULL); /* no streaming */ - Assert(enabled_given); - - if (!sub->slotname && enabled) + opts.supported_opts |= SUBOPT_ENABLED; + + parse_subscription_options(stmt->options, &opts); + Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED)); + + if (!sub->slotname && opts.enabled) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("cannot enable subscription that does not have a slot name"))); values[Anum_pg_subscription_subenabled - 1] = - BoolGetDatum(enabled); + BoolGetDatum(opts.enabled); replaces[Anum_pg_subscription_subenabled - 1] = true; - if (enabled) + if (opts.enabled) ApplyLauncherWakeupAtCommit(); update_tuple = true; @@ -904,19 +917,11 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) case ALTER_SUBSCRIPTION_SET_PUBLICATION: { - bool copy_data; - bool refresh; - - parse_subscription_options(stmt->options, - NULL, /* no "connect" */ - NULL, NULL, /* no "enabled" */ - NULL, /* no "create_slot" */ - NULL, NULL, /* no "slot_name" */ - ©_data, - NULL, /* no "synchronous_commit" */ - &refresh, - NULL, NULL, /* no "binary" */ - NULL, NULL); /* no "streaming" */ + opts.supported_opts |= SUBOPT_COPY_DATA; + opts.supported_opts |= SUBOPT_REFRESH; + + parse_subscription_options(stmt->options, &opts); + values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); replaces[Anum_pg_subscription_subpublications - 1] = true; @@ -924,7 +929,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) update_tuple = true; /* Refresh if user asked us to. */ - if (refresh) + if (opts.refresh) { if (!sub->enabled) ereport(ERROR, @@ -937,33 +942,26 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) /* Make sure refresh sees the new list of publications. */ sub->publications = stmt->publication; - AlterSubscription_refresh(sub, copy_data); + AlterSubscription_refresh(sub, opts.copy_data); } break; } case ALTER_SUBSCRIPTION_ADD_PUBLICATION: + opts.supported_opts |= SUBOPT_COPY_DATA; + /* FALL THRU */ case ALTER_SUBSCRIPTION_DROP_PUBLICATION: { - bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION; - bool copy_data; - bool refresh; List *publist; - publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname); + opts.supported_opts |= SUBOPT_REFRESH; + + publist = merge_publications(sub->publications, stmt->publication, + stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION ? true : false, + stmt->subname); - parse_subscription_options(stmt->options, - NULL, /* no "connect" */ - NULL, NULL, /* no "enabled" */ - NULL, /* no "create_slot" */ - NULL, NULL, /* no "slot_name" */ - isadd ? ©_data : NULL, /* for drop, no - * "copy_data" */ - NULL, /* no "synchronous_commit" */ - &refresh, - NULL, NULL, /* no "binary" */ - NULL, NULL); /* no "streaming" */ + parse_subscription_options(stmt->options, &opts); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(publist); @@ -972,7 +970,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) update_tuple = true; /* Refresh if user asked us to. */ - if (refresh) + if (opts.refresh) { if (!sub->enabled) ereport(ERROR, @@ -985,7 +983,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) /* Only refresh the added/dropped list of publications. */ sub->publications = stmt->publication; - AlterSubscription_refresh(sub, copy_data); + AlterSubscription_refresh(sub, opts.copy_data); } break; @@ -993,27 +991,18 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) case ALTER_SUBSCRIPTION_REFRESH: { - bool copy_data; + opts.supported_opts |= SUBOPT_COPY_DATA; if (!sub->enabled) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); - parse_subscription_options(stmt->options, - NULL, /* no "connect" */ - NULL, NULL, /* no "enabled" */ - NULL, /* no "create_slot" */ - NULL, NULL, /* no "slot_name" */ - ©_data, - NULL, /* no "synchronous_commit" */ - NULL, /* no "refresh" */ - NULL, NULL, /* no "binary" */ - NULL, NULL); /* no "streaming" */ + parse_subscription_options(stmt->options, &opts); PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); - AlterSubscription_refresh(sub, copy_data); + AlterSubscription_refresh(sub, opts.copy_data); break; } diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index abdb08319c..c7164532ec 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2507,6 +2507,7 @@ StringInfoData StripnullState SubLink SubLinkType +SubOpts SubPlan SubPlanState SubRemoveRels -- 2.25.1
From c4644584c13362eb67c9db8f6fbe8c109728fbd4 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Date: Wed, 9 Jun 2021 08:04:12 -0700 Subject: [PATCH v4] Remove similar ereport calls in parse_subscription_options Remove similar code (with the only difference in the option) when throwing errors for mutually exclusive and disallowed combination of options. Have variables for the options and use them in the error messages. It makes the code look better with lesser ereport(ERROR statements. --- src/backend/commands/subscriptioncmds.c | 63 +++++++++++++------------ 1 file changed, 32 insertions(+), 31 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 9ec344cb0f..dd33196e0f 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -261,28 +261,26 @@ parse_subscription_options(List *stmt_options, SubOpts *opts) */ if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT)) { + char *incompat_opt = NULL; + /* Check for incompatible options from the user. */ if (opts->enabled && IsSet(supported_opts, SUBOPT_ENABLED) && IsSet(specified_opts, SUBOPT_ENABLED)) + incompat_opt = "enabled = true"; + else if (opts->create_slot && + IsSet(supported_opts, SUBOPT_CREATE_SLOT) && + IsSet(specified_opts, SUBOPT_CREATE_SLOT)) + incompat_opt = "create_slot = true"; + else if (opts->copy_data && IsSet(supported_opts, SUBOPT_COPY_DATA) && + IsSet(specified_opts, SUBOPT_COPY_DATA)) + incompat_opt = "copy_data = true"; + + if (incompat_opt) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), /*- translator: both %s are strings of the form "option = value" */ errmsg("%s and %s are mutually exclusive options", - "connect = false", "enabled = true"))); - - if (opts->create_slot && IsSet(supported_opts, SUBOPT_CREATE_SLOT) && - IsSet(specified_opts, SUBOPT_CREATE_SLOT)) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("%s and %s are mutually exclusive options", - "connect = false", "create_slot = true"))); - - if (opts->copy_data && IsSet(supported_opts, SUBOPT_COPY_DATA) && - IsSet(specified_opts, SUBOPT_COPY_DATA)) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("%s and %s are mutually exclusive options", - "connect = false", "copy_data = true"))); + "connect = false", incompat_opt))); /* Change the defaults of other options. */ opts->enabled = false; @@ -297,35 +295,38 @@ parse_subscription_options(List *stmt_options, SubOpts *opts) if (!opts->slot_name && IsSet(supported_opts, SUBOPT_SLOT_NAME) && IsSet(specified_opts, SUBOPT_SLOT_NAME)) { + char *incompat_opt = NULL; + char *required_opt = NULL; + if (opts->enabled && IsSet(supported_opts, SUBOPT_ENABLED) && IsSet(specified_opts, SUBOPT_ENABLED)) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - /*- translator: both %s are strings of the form "option = value" */ - errmsg("%s and %s are mutually exclusive options", - "slot_name = NONE", "enabled = true"))); + incompat_opt = "enabled = true"; + else if (opts->create_slot && + IsSet(supported_opts, SUBOPT_CREATE_SLOT) && + IsSet(specified_opts, SUBOPT_CREATE_SLOT)) + incompat_opt = "create_slot = true"; - if (opts->create_slot && IsSet(supported_opts, SUBOPT_CREATE_SLOT) && - IsSet(specified_opts, SUBOPT_CREATE_SLOT)) + if (incompat_opt) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), + /*- translator: both %s are strings of the form "option = value" */ errmsg("%s and %s are mutually exclusive options", - "slot_name = NONE", "create_slot = true"))); + "slot_name = NONE", incompat_opt))); if (opts->enabled && IsSet(supported_opts, SUBOPT_ENABLED) && !IsSet(specified_opts, SUBOPT_ENABLED)) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - /*- translator: both %s are strings of the form "option = value" */ - errmsg("subscription with %s must also set %s", - "slot_name = NONE", "enabled = false"))); + required_opt = "enabled = false"; + else if (opts->create_slot && + IsSet(supported_opts, SUBOPT_CREATE_SLOT) && + !IsSet(specified_opts, SUBOPT_CREATE_SLOT)) + required_opt = "create_slot = false"; - if (opts->create_slot && IsSet(supported_opts, SUBOPT_CREATE_SLOT) && - !IsSet(specified_opts, SUBOPT_CREATE_SLOT)) + if (required_opt) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), + /*- translator: both %s are strings of the form "option = value" */ errmsg("subscription with %s must also set %s", - "slot_name = NONE", "create_slot = false"))); + "slot_name = NONE", required_opt))); } opts->specified_opts = specified_opts; -- 2.25.1