On Thu, Jul 15, 2021 at 5:57 PM vignesh C <vignes...@gmail.com> wrote: > > On Tue, Jul 6, 2021 at 8:09 PM vignesh C <vignes...@gmail.com> wrote: > > > > On Wed, Jun 30, 2021 at 8:23 PM vignesh C <vignes...@gmail.com> wrote: > > > > > > On Sun, Jun 6, 2021 at 11:55 AM vignesh C <vignes...@gmail.com> wrote: > > > > > > > > On Fri, May 7, 2021 at 6:44 PM vignesh C <vignes...@gmail.com> wrote: > > > > > > > > > > Thanks for the comments, the attached patch has the fix for the same. > > > >
The previous patch no longer applied on HEAD; attached is a version of the patch rebased on the current HEAD. Regards, Vignesh
From 36e522ba31829c76e6c4c2069f2806c653aecf62 Mon Sep 17 00:00:00 2001 From: Vigneshwaran C <vignes...@gmail.com> Date: Thu, 26 Aug 2021 16:32:56 +0530 Subject: [PATCH v11] Identify missing publications from publisher while create/alter subscription. Creating/altering subscription is successful when we specify a publication which does not exist in the publisher. This patch checks if the specified publications are present in the publisher and throws an error if any of the publication is missing in the publisher. --- doc/src/sgml/ref/alter_subscription.sgml | 13 ++ doc/src/sgml/ref/create_subscription.sgml | 18 +- src/backend/commands/subscriptioncmds.c | 207 +++++++++++++++++++--- src/bin/psql/tab-complete.c | 14 +- src/test/subscription/t/007_ddl.pl | 68 ++++++- 5 files changed, 287 insertions(+), 33 deletions(-) diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 835be0d2a4..d5b28e9afa 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -163,6 +163,19 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < </para> </listitem> </varlistentry> + + <varlistentry> + <term><literal>validate_publication</literal> (<type>boolean</type>)</term> + <listitem> + <para> + When true, the command verifies if all the specified publications + that are being subscribed to are present in the publisher and throws + an error if any of the publication doesn't exist. The default is + <literal>false</literal>. 
+ </para> + </listitem> + </varlistentry> + </variablelist></para> </listitem> </varlistentry> diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 702934eba1..a39c85b2bb 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -207,8 +207,9 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl Specifies whether the <command>CREATE SUBSCRIPTION</command> should connect to the publisher at all. Setting this to <literal>false</literal> will change default values of - <literal>enabled</literal>, <literal>create_slot</literal> and - <literal>copy_data</literal> to <literal>false</literal>. + <literal>enabled</literal>, <literal>create_slot</literal>, + <literal>copy_data</literal> and + <literal>validate_publication</literal> to <literal>false</literal>. </para> <para> @@ -266,6 +267,19 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl </listitem> </varlistentry> + + <varlistentry> + <term><literal>validate_publication</literal> (<type>boolean</type>)</term> + <listitem> + <para> + When true, the command verifies if all the specified publications + that are being subscribed to are present in the publisher and throws + an error if any of the publication doesn't exist. The default is + <literal>false</literal>. 
+ </para> + </listitem> + </varlistentry> + </variablelist></para> </listitem> </varlistentry> diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index c47ba26369..8d2aaf72c6 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -60,6 +60,7 @@ #define SUBOPT_BINARY 0x00000080 #define SUBOPT_STREAMING 0x00000100 #define SUBOPT_TWOPHASE_COMMIT 0x00000200 +#define SUBOPT_VALIDATE_PUB 0x00000400 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -81,6 +82,7 @@ typedef struct SubOpts bool binary; bool streaming; bool twophase; + bool validate_publication; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -128,6 +130,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->streaming = false; if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT)) opts->twophase = false; + if (IsSet(supported_opts, SUBOPT_VALIDATE_PUB)) + opts->validate_publication = false; /* Parse options */ foreach(lc, stmt_options) @@ -247,6 +251,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT; opts->twophase = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_VALIDATE_PUB) && + strcmp(defel->defname, "validate_publication") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_VALIDATE_PUB)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_VALIDATE_PUB; + opts->validate_publication = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -285,10 +298,19 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errmsg("%s and %s are mutually exclusive options", "connect = false", "copy_data = true"))); + if (opts->validate_publication && + IsSet(supported_opts, SUBOPT_VALIDATE_PUB) && + IsSet(opts->specified_opts, SUBOPT_VALIDATE_PUB)) + ereport(ERROR, + 
(errcode(ERRCODE_SYNTAX_ERROR), + errmsg("%s and %s are mutually exclusive options", + "connect = false", "validate_publication = true"))); + /* Change the defaults of other options. */ opts->enabled = false; opts->create_slot = false; opts->copy_data = false; + opts->validate_publication = false; } /* @@ -337,6 +359,140 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, } } +/* + * Add publication names from the list to a string. + */ +static void +get_publications_str(List *publications, StringInfo dest, bool quote_literal) +{ + ListCell *lc; + bool first = true; + + Assert(list_length(publications) > 0); + + foreach(lc, publications) + { + char *pubname = strVal(lfirst(lc)); + + if (first) + first = false; + else + appendStringInfoString(dest, ", "); + + if (quote_literal) + appendStringInfoString(dest, quote_literal_cstr(pubname)); + else + { + appendStringInfoChar(dest, '"'); + appendStringInfoString(dest, pubname); + appendStringInfoChar(dest, '"'); + } + } +} + +/* + * Check the specified publication(s) is(are) present in the publisher. 
+ */ +static void +check_publications(WalReceiverConn *wrconn, List *publications, + bool validate_publication) +{ + WalRcvExecResult *res; + StringInfo cmd; + TupleTableSlot *slot; + List *publicationsCopy = NIL; + Oid tableRow[1] = {TEXTOID}; + + if (!validate_publication) + return; + + cmd = makeStringInfo(); + appendStringInfoString(cmd, "SELECT t.pubname FROM\n" + " pg_catalog.pg_publication t WHERE\n" + " t.pubname IN ("); + get_publications_str(publications, cmd, true); + appendStringInfoChar(cmd, ')'); + + res = walrcv_exec(wrconn, cmd->data, 1, tableRow); + pfree(cmd->data); + pfree(cmd); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg_plural("could not receive publication from the publisher: %s", + "could not receive list of publications from the publisher: %s", + list_length(publications), + res->err))); + + publicationsCopy = list_copy(publications); + + /* Process publication(s). */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *pubname; + bool isnull; + + pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + /* Delete the publication present in publisher from the list. */ + publicationsCopy = list_delete(publicationsCopy, makeString(pubname)); + ExecClearTuple(slot); + } + + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); + + if (list_length(publicationsCopy)) + { + /* Prepare the list of non-existent publication(s) for error message. */ + StringInfo pubnames = makeStringInfo(); + + get_publications_str(publicationsCopy, pubnames, false); + ereport(ERROR, + (errcode(ERRCODE_TOO_MANY_ARGUMENTS), + errmsg_plural("publication %s does not exist in the publisher", + "publications %s do not exist in the publisher", + list_length(publicationsCopy), + pubnames->data))); + } +} + +/* + * Connect to the publisher and see if the given publication(s) is(are) present. 
+ */ +static void +connect_and_check_pubs(Subscription *sub, List *publications, + bool validate_publication) +{ + char *err; + WalReceiverConn *wrconn; + + if (!validate_publication) + return; + + /* Load the library providing us libpq calls. */ + load_file("libpqwalreceiver", false); + + /* Try to connect to the publisher. */ + wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err); + if (!wrconn) + ereport(ERROR, + (errmsg("could not connect to the publisher: %s", err))); + + PG_TRY(); + { + check_publications(wrconn, publications, true); + } + PG_FINALLY(); + { + walrcv_disconnect(wrconn); + } + PG_END_TRY(); +} + /* * Auxiliary function to build a text array out of a list of String nodes. */ @@ -396,7 +552,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT | SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT); + SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | + SUBOPT_VALIDATE_PUB); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -514,6 +671,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, PG_TRY(); { + check_publications(wrconn, publications, opts.validate_publication); + /* * Set sync state based on if we were asked to do data copy or * not. @@ -606,7 +765,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, } static void -AlterSubscription_refresh(Subscription *sub, bool copy_data) +AlterSubscription_refresh(Subscription *sub, bool copy_data, + bool validate_publication) { char *err; List *pubrel_names; @@ -637,6 +797,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) PG_TRY(); { + check_publications(wrconn, sub->publications, validate_publication); + /* Get the table list from publisher. 
*/ pubrel_names = fetch_table_list(wrconn, sub->publications); @@ -959,7 +1121,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, case ALTER_SUBSCRIPTION_SET_PUBLICATION: { - supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH; + supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH | SUBOPT_VALIDATE_PUB; parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -968,6 +1130,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subpublications - 1] = true; update_tuple = true; + connect_and_check_pubs(sub, stmt->publication, + opts.validate_publication); /* Refresh if user asked us to. */ if (opts.refresh) @@ -994,7 +1158,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Make sure refresh sees the new list of publications. */ sub->publications = stmt->publication; - AlterSubscription_refresh(sub, opts.copy_data); + AlterSubscription_refresh(sub, opts.copy_data, false); } break; @@ -1007,6 +1171,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION; supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA; + if (isadd) + supported_opts |= SUBOPT_VALIDATE_PUB; + parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1016,6 +1183,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subpublications - 1] = true; update_tuple = true; + if (isadd) + connect_and_check_pubs(sub, stmt->publication, + opts.validate_publication); /* Refresh if user asked us to. */ if (opts.refresh) @@ -1042,7 +1212,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Refresh the new list of publications. 
*/ sub->publications = publist; - AlterSubscription_refresh(sub, opts.copy_data); + AlterSubscription_refresh(sub, opts.copy_data, false); } break; @@ -1056,7 +1226,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); parse_subscription_options(pstate, stmt->options, - SUBOPT_COPY_DATA, &opts); + SUBOPT_COPY_DATA | SUBOPT_VALIDATE_PUB, + &opts); /* * The subscription option "two_phase" requires that @@ -1084,7 +1255,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); - AlterSubscription_refresh(sub, opts.copy_data); + AlterSubscription_refresh(sub, opts.copy_data, + opts.validate_publication); break; } @@ -1548,28 +1720,13 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) StringInfoData cmd; TupleTableSlot *slot; Oid tableRow[2] = {TEXTOID, TEXTOID}; - ListCell *lc; - bool first; List *tablelist = NIL; - Assert(list_length(publications) > 0); - initStringInfo(&cmd); appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n" - " FROM pg_catalog.pg_publication_tables t\n" - " WHERE t.pubname IN ("); - first = true; - foreach(lc, publications) - { - char *pubname = strVal(lfirst(lc)); - - if (first) - first = false; - else - appendStringInfoString(&cmd, ", "); - - appendStringInfoString(&cmd, quote_literal_cstr(pubname)); - } + " FROM pg_catalog.pg_publication_tables t\n" + " WHERE t.pubname IN ("); + get_publications_str(publications, &cmd, true); appendStringInfoChar(&cmd, ')'); res = walrcv_exec(wrconn, cmd.data, 2, tableRow); diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 7cdfc7c637..16e409be72 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1659,7 +1659,7 @@ psql_completion(const char *text, int start, int end) /* ALTER SUBSCRIPTION <name> REFRESH PUBLICATION WITH ( */ else if 
(HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("REFRESH", "PUBLICATION", "WITH", "(")) - COMPLETE_WITH("copy_data"); + COMPLETE_WITH("copy_data", "validate_publication"); /* ALTER SUBSCRIPTION <name> SET */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, "SET")) COMPLETE_WITH("(", "PUBLICATION"); @@ -1675,11 +1675,14 @@ psql_completion(const char *text, int start, int end) else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("ADD|DROP|SET", "PUBLICATION", MatchAny)) COMPLETE_WITH("WITH ("); - /* ALTER SUBSCRIPTION <name> ADD|DROP|SET PUBLICATION <name> WITH ( */ + /* ALTER SUBSCRIPTION <name> ADD|SET PUBLICATION <name> WITH ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && - TailMatches("ADD|DROP|SET", "PUBLICATION", MatchAny, "WITH", "(")) + TailMatches("ADD|SET", "PUBLICATION", MatchAny, "WITH", "(")) + COMPLETE_WITH("copy_data", "refresh", "validate_publication"); + /* ALTER SUBSCRIPTION <name> DROP PUBLICATION <name> WITH ( */ + else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && + TailMatches("DROP", "PUBLICATION", MatchAny, "WITH", "(")) COMPLETE_WITH("copy_data", "refresh"); - /* ALTER SCHEMA <name> */ else if (Matches("ALTER", "SCHEMA", MatchAny)) COMPLETE_WITH("OWNER TO", "RENAME TO"); @@ -2767,7 +2770,8 @@ psql_completion(const char *text, int start, int end) else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", "enabled", "slot_name", "streaming", - "synchronous_commit", "two_phase"); + "synchronous_commit", "two_phase", + "validate_publication"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/test/subscription/t/007_ddl.pl b/src/test/subscription/t/007_ddl.pl index 1a3a1dcf14..05503adbc2 100644 --- a/src/test/subscription/t/007_ddl.pl +++ b/src/test/subscription/t/007_ddl.pl @@ -6,7 +6,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More 
tests => 1; +use Test::More tests => 9; my $node_publisher = PostgresNode->new('publisher'); $node_publisher->init(allows_streaming => 'logical'); @@ -41,5 +41,71 @@ COMMIT; pass "subscription disable and drop in same transaction did not hang"; +# Specified publication does not exist. +my ($ret, $stdout, $stderr) = $node_subscriber->psql('postgres', + "CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' PUBLICATION non_existent_pub WITH (VALIDATE_PUBLICATION = TRUE)" +); +ok( $stderr =~ + m/ERROR: publication "non_existent_pub" does not exist in the publisher/, + "Create subscription for non existent publication fails"); + +# One of the specified publication exist. +($ret, $stdout, $stderr) = $node_subscriber->psql('postgres', + "CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' PUBLICATION mypub, non_existent_pub WITH (VALIDATE_PUBLICATION = TRUE)" +); +ok( $stderr =~ + m/ERROR: publication "non_existent_pub" does not exist in the publisher/, + "Create subscription for non existent publication fails"); +# Multiple publications do not exist. +($ret, $stdout, $stderr) = $node_subscriber->psql('postgres', + "CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' PUBLICATION non_existent_pub, non_existent_pub1 WITH (VALIDATE_PUBLICATION = TRUE)" +); +ok( $stderr =~ + m/ERROR: publications "non_existent_pub", "non_existent_pub1" do not exist in the publisher/, + "Create subscription for non existent publication fails"); + +# Create subscription with mutually exclusive options connect as false and validate_publication as true. 
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres', + "CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' PUBLICATION non_existent_pub, non_existent_pub1 WITH (CONNECT = FALSE, VALIDATE_PUBLICATION = TRUE)" +); +ok( $stderr =~ + m/ERROR: connect = false and validate_publication = true are mutually exclusive options/, + "Create subscription with connect=false and validate_publication=true should fail"); + +# Add non existent publication. +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' PUBLICATION mypub;" +); +($ret, $stdout, $stderr) = ($ret, $stdout, $stderr) = $node_subscriber->psql('postgres', + "ALTER SUBSCRIPTION mysub1 ADD PUBLICATION non_existent_pub WITH (REFRESH = FALSE, VALIDATE_PUBLICATION = TRUE)" +); +ok( $stderr =~ + m/ERROR: publication "non_existent_pub" does not exist in the publisher/, + "Alter subscription add non existent publication fails"); + +# Specified publication does not exist. +($ret, $stdout, $stderr) = $node_subscriber->psql('postgres', + "ALTER SUBSCRIPTION mysub1 SET PUBLICATION non_existent_pub WITH (VALIDATE_PUBLICATION = TRUE)"); +ok( $stderr =~ + m/ERROR: publication "non_existent_pub" does not exist in the publisher/, + "Alter subscription for non existent publication fails"); + +# Specified publication does not exist with refresh = false. +($ret, $stdout, $stderr) = $node_subscriber->psql('postgres', + "ALTER SUBSCRIPTION mysub1 SET PUBLICATION non_existent_pub WITH (REFRESH = FALSE, VALIDATE_PUBLICATION = TRUE)" +); +ok( $stderr =~ + m/ERROR: publication "non_existent_pub" does not exist in the publisher/, + "Alter subscription for non existent publication fails"); + +# Set publication on non existent database. 
+$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION mysub1 CONNECTION 'dbname=regress_doesnotexist2'"); +($ret, $stdout, $stderr) = $node_subscriber->psql('postgres', + "ALTER SUBSCRIPTION mysub1 SET PUBLICATION non_existent_pub WITH (REFRESH = FALSE, VALIDATE_PUBLICATION = TRUE)" +); +ok( $stderr =~ m/ERROR: could not connect to the publisher/, + "Alter subscription for non existent publication fails"); + $node_subscriber->stop; $node_publisher->stop; -- 2.30.2