On Wed, 7 Feb 2024 at 16:27, vignesh C <vignes...@gmail.com> wrote: > > On Wed, 7 Feb 2024 at 15:21, Amit Kapila <amit.kapil...@gmail.com> wrote: > > > > On Tue, Feb 6, 2024 at 8:21 PM Tom Lane <t...@sss.pgh.pa.us> wrote: > > > > > > Amit Kapila <amit.kapil...@gmail.com> writes: > > > > Yeah, I was worried about that. The other idea I have previously > > > > thought was to change Alter Subscription to Drop+Create Subscription. > > > > That should also help in bringing stability without losing any > > > > functionality. > > > > > > Hm, why would that fix it? > > > > > > > Because for new subscriptions, we will start reading WAL from the > > latest WAL insert pointer on the publisher which will be after the > > point where publication is created. > > I was able to reproduce the issue consistently with the changes shared > by Tom Lane at [1]. > I have made changes to change ALTER SUBSCRIPTION to DROP+CREATE > SUBSCRIPTION and verified that the test has passed consistently for > >50 runs that I ran. Also, the increase in test execution time for this > case is negligible: > Without patch: 7.991 seconds > With test change patch: 8.121 seconds > > The test changes for the same are attached.
Alternatively, this could also be fixed along the lines of the changes proposed by Amit at [1]. In that approach, we ignore publications that are not found when computing the RelSyncEntry attributes, and we don't mark such an entry as valid until all the publications have been loaded with none missing. This means we won't publish operations on tables belonging to a missing publication until that publication is found, which seems okay. At [2], Tomas raised a performance concern: while the publications are invalid, we would be forced to reload them for every replicated change/row. How about keeping the default behavior as it is and providing a new option, skip_not_exist_publication, for creating/altering a subscription? If skip_not_exist_publication is specified, we will ignore publications that are not present yet; the publications will be kept invalid and validated later. The attached patch has the changes for the same. Thoughts? [1] - https://www.postgresql.org/message-id/CAA4eK1%2BT-ETXeRM4DHWzGxBpKafLCp__5bPA_QZfFQp7-0wj4Q%40mail.gmail.com [2] - https://www.postgresql.org/message-id/dc08add3-10a8-738b-983a-191c7406707b%40enterprisedb.com Regards, Vignesh
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 406a3c2dd1..404577725e 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -74,6 +74,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->passwordrequired = subform->subpasswordrequired;
 	sub->runasowner = subform->subrunasowner;
 	sub->failover = subform->subfailover;
+	sub->skipnotexistpub = subform->subskipnotexistpub;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 6791bff9dd..bdd367d272 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1359,7 +1359,8 @@ REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
               subbinary, substream, subtwophasestate, subdisableonerr,
               subpasswordrequired, subrunasowner, subfailover,
-              subslotname, subsynccommit, subpublications, suborigin)
+              subskipnotexistpub, subslotname, subsynccommit, subpublications,
+              suborigin)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index a05d69922d..bd59efc73a 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -72,6 +72,7 @@
 #define SUBOPT_FAILOVER				0x00002000
 #define SUBOPT_LSN					0x00004000
 #define SUBOPT_ORIGIN				0x00008000
+#define SUBOPT_SKIP_NOT_EXISTS_PUB	0x00010000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -97,6 +98,7 @@ typedef struct SubOpts
 	bool		passwordrequired;
 	bool		runasowner;
 	bool		failover;
+	bool		skipnotexistpub;
 	char	   *origin;
 	XLogRecPtr	lsn;
 } SubOpts;
@@ -159,6 +161,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->runasowner = false;
 	if (IsSet(supported_opts, SUBOPT_FAILOVER))
 		opts->failover = false;
+	if (IsSet(supported_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+		opts->skipnotexistpub = false;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
 
@@ -316,6 +320,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_FAILOVER;
 			opts->failover = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_SKIP_NOT_EXISTS_PUB) &&
+				 strcmp(defel->defname, "skip_not_exist_publication") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_SKIP_NOT_EXISTS_PUB;
+			opts->skipnotexistpub = defGetBoolean(defel);
+		}
 		else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
 				 strcmp(defel->defname, "origin") == 0)
 		{
@@ -408,6 +421,13 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 					 errmsg("%s and %s are mutually exclusive options",
 							"connect = false", "failover = true")));
 
+		if (opts->skipnotexistpub &&
+			IsSet(opts->specified_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("%s and %s are mutually exclusive options",
+							"connect = false", "skip_not_exist_publication = true")));
+
 		/* Change the defaults of other options. */
 		opts->enabled = false;
 		opts->create_slot = false;
@@ -611,7 +631,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
-					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN);
+					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
+					  SUBOPT_SKIP_NOT_EXISTS_PUB | SUBOPT_ORIGIN);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -718,6 +739,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
 	values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
 	values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
+	values[Anum_pg_subscription_subskipnotexistpub - 1] =
+		BoolGetDatum(opts.skipnotexistpub);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (opts.slot_name)
@@ -1169,6 +1192,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
 								  SUBOPT_PASSWORD_REQUIRED |
 								  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
+								  SUBOPT_SKIP_NOT_EXISTS_PUB |
 								  SUBOPT_ORIGIN);
 
 				parse_subscription_options(pstate, stmt->options,
@@ -1248,6 +1272,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_subrunasowner - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_SKIP_NOT_EXISTS_PUB))
+				{
+					values[Anum_pg_subscription_subskipnotexistpub - 1] =
+						BoolGetDatum(opts.skipnotexistpub);
+					replaces[Anum_pg_subscription_subskipnotexistpub - 1] = true;
+				}
+
 				if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
 				{
 					if (!sub->slotname)
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 9270d7b855..a66108aee8 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -593,6 +593,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			appendStringInfo(&cmd, ", origin '%s'",
 							 options->proto.logical.origin);
 
+		if (options->proto.logical.skipnotexistpub &&
+			PQserverVersion(conn->streamConn) >= 170000)
+			appendStringInfoString(&cmd, ", skip_not_exist_publication 'true'");
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
 		if (!pubnames_str)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 9dd2446fbf..ef362e3571 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3952,6 +3952,7 @@ maybe_reread_subscription(void)
 		newsub->binary != MySubscription->binary ||
 		newsub->stream != MySubscription->stream ||
 		newsub->passwordrequired != MySubscription->passwordrequired ||
+		newsub->skipnotexistpub != MySubscription->skipnotexistpub ||
 		strcmp(newsub->origin, MySubscription->origin) != 0 ||
 		newsub->owner != MySubscription->owner ||
 		!equal(newsub->publications, MySubscription->publications))
@@ -4380,6 +4381,9 @@ set_stream_options(WalRcvStreamOptions *options,
 	options->proto.logical.publication_names = MySubscription->publications;
 	options->proto.logical.binary = MySubscription->binary;
 
+	options->proto.logical.skipnotexistpub =
+		(server_version >= 170000 && MySubscription->skipnotexistpub);
+
 	/*
 	 * Assign the appropriate option value for streaming option according to
 	 * the 'streaming' mode and the publisher's ability to support that mode.
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 998f92d671..7c27d4a7d7 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -82,7 +82,8 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 
 static bool publications_valid;
 
-static List *LoadPublications(List *pubnames);
+static List *LoadPublications(List *pubnames, bool skipnotexistpub,
+							  bool *skipped);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
@@ -284,11 +285,13 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
 	bool		origin_option_given = false;
+	bool		skipnotexistpub_option_given = false;
 
 	data->binary = false;
 	data->streaming = LOGICALREP_STREAM_OFF;
 	data->messages = false;
 	data->two_phase = false;
+	data->skipnotexistpub = false;
 
 	foreach(lc, options)
 	{
@@ -397,6 +400,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						errmsg("unrecognized origin value: \"%s\"", origin));
 		}
+		else if (strcmp(defel->defname, "skip_not_exist_publication") == 0)
+		{
+			if (skipnotexistpub_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			skipnotexistpub_option_given = true;
+
+			data->skipnotexistpub = defGetBoolean(defel);
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -1703,9 +1716,13 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
 
 /*
  * Load publications from the list of publication names.
+ *
+ * If skipnotexistpub is true, publications that don't exist yet are skipped
+ * and *skipped is set to true; the caller must initialize *skipped to false.
+ * Otherwise, a missing publication raises an error.
  */
 static List *
-LoadPublications(List *pubnames)
+LoadPublications(List *pubnames, bool skipnotexistpub, bool *skipped)
 {
 	List	   *result = NIL;
 	ListCell   *lc;
@@ -1713,9 +1730,12 @@ LoadPublications(List *pubnames)
 	foreach(lc, pubnames)
 	{
 		char	   *pubname = (char *) lfirst(lc);
-		Publication *pub = GetPublicationByName(pubname, false);
+		Publication *pub = GetPublicationByName(pubname, skipnotexistpub);
 
-		result = lappend(result, pub);
+		if (pub)
+			result = lappend(result, pub);
+		else
+			*skipped = true;
 	}
 
 	return result;
@@ -1994,7 +2014,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 	}
 
 	/* Validate the entry */
-	if (!entry->replicate_valid)
+	if (!entry->replicate_valid || !publications_valid)
 	{
 		Oid			schemaId = get_rel_namespace(relid);
 		List	   *pubids = GetRelationPublications(relid);
@@ -2011,6 +2031,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		bool		am_partition = get_rel_relispartition(relid);
 		char		relkind = get_rel_relkind(relid);
 		List	   *rel_publications = NIL;
+		bool		skipped_pub = false;
 
 		/* Reload publications if needed before use. */
 		if (!publications_valid)
@@ -2021,9 +2042,17 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 				list_free_deep(data->publications);
 				data->publications = NIL;
 			}
-			data->publications = LoadPublications(data->publication_names);
+			data->publications = LoadPublications(data->publication_names,
+												  data->skipnotexistpub,
+												  &skipped_pub);
 			MemoryContextSwitchTo(oldctx);
-			publications_valid = true;
+
+			/*
+			 * Don't mark the publications as valid until all of them have been
+			 * found, so that missing ones are looked up again on the next call.
+			 */
+			if (!skipped_pub)
+				publications_valid = true;
 		}
 
 		/*
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 151a5211ee..0642ff4581 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1944,8 +1944,9 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
 		COMPLETE_WITH("binary", "disable_on_error", "failover", "origin",
-					  "password_required", "run_as_owner", "slot_name",
-					  "streaming", "synchronous_commit");
+					  "password_required", "run_as_owner",
+					  "skip_not_exist_publication", "slot_name", "streaming",
+					  "synchronous_commit");
 	/* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
 		COMPLETE_WITH("lsn");
@@ -3341,8 +3342,9 @@ psql_completion(const char *text, int start, int end)
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
 					  "disable_on_error", "enabled", "failover", "origin",
-					  "password_required", "run_as_owner", "slot_name",
-					  "streaming", "synchronous_commit", "two_phase");
+					  "password_required", "run_as_owner",
+					  "skip_not_exist_publication", "slot_name", "streaming",
+					  "synchronous_commit", "two_phase");
 
 	/* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 0aa14ec4a2..704d1217d9 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -98,6 +98,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 								 * slots) in the upstream database are enabled
 								 * to be synchronized to the standbys. */
 
+	bool		subskipnotexistpub;	/* True if non-existent publications should
+									 * be ignored */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -151,6 +154,8 @@ typedef struct Subscription
 								 * (i.e. the main slot and the table sync
 								 * slots) in the upstream database are enabled
 								 * to be synchronized to the standbys. */
+	bool		skipnotexistpub;	/* True if non-existent publications should
+									 * be ignored. */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 89f94e1147..38faa2ea04 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -33,6 +33,7 @@ typedef struct PGOutputData
 	bool		messages;
 	bool		two_phase;
 	bool		publish_no_origin;
+	bool		skipnotexistpub;	/* skip publications that don't exist yet */
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index b906bb5ce8..e7a0d9e08a 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -186,6 +186,7 @@ typedef struct
 										 * prepare time */
 			char	   *origin;		/* Only publish data originating from the
 									 * specified origin */
+			bool		skipnotexistpub;	/* Skip non-existent publications */
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl
index 938582e31a..8626397c1c 100644
--- a/src/test/subscription/t/031_column_list.pl
+++ b/src/test/subscription/t/031_column_list.pl
@@ -145,7 +145,7 @@ $node_publisher->safe_psql(
 # then check the sync results
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 WITH (skip_not_exist_publication = true)
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -741,7 +741,7 @@ $node_publisher->safe_psql(
 $node_subscriber->safe_psql(
 	'postgres', qq(
 	DROP SUBSCRIPTION sub1;
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -921,7 +921,7 @@ $node_publisher->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -973,7 +973,7 @@ $node_publisher->safe_psql(
 # both table sync and data replication.
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_test_root, pub_test_root_1;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_test_root, pub_test_root_1 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -1213,7 +1213,7 @@ $node_subscriber->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_7, pub_mix_8;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_7, pub_mix_8 WITH (skip_not_exist_publication = true);
 ));
 
 $node_subscriber->wait_for_subscription_sync;
@@ -1255,7 +1255,7 @@ $node_subscriber->safe_psql(
 
 my ($cmdret, $stdout, $stderr) = $node_subscriber->psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2 WITH (skip_not_exist_publication = true);
 ));
 
 ok( $stderr =~
@@ -1272,7 +1272,7 @@ $node_publisher->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2 WITH (skip_not_exist_publication = true);
 ));
 
 $node_publisher->wait_for_catchup('sub1');