Hi, Currently ALTER SUBSCRIPTION ... SET PUBLICATION will break the logical replication in certain cases. This can happen as the apply worker will get restarted after SET PUBLICATION, the apply worker will use the existing slot and replication origin corresponding to the subscription. Now, it is possible that before restart the origin has not been updated and the WAL start location points to a location prior to where PUBLICATION pub exists which can lead to such an error. Once this error occurs, apply worker will never be able to proceed and will always return the same error.
There was discussion on this and Amit had posted a patch to handle this at [2]. Amit's patch does continue using a historic snapshot but ignores publications that are not found for the purpose of computing RelSyncEntry attributes. We won't mark such an entry as valid till all the publications are loaded without anything missing. This means we won't publish operations on tables corresponding to that publication till we found such a publication and that seems okay. I have added an option skip_not_exist_publication to enable this operation only when skip_not_exist_publication is specified as true. There is no change in default behavior when skip_not_exist_publication is specified as false. But one thing to note with the patch (with skip_not_exist_publication option) is that replication of few WAL entries will be skipped till the publication is loaded like in the below example: -- Create table in publisher and subscriber create table t1(c1 int); create table t2(c1 int); -- Create publications create publication pub1 for table t1; create publication pub2 for table t2; -- Create subscription create subscription test1 connection 'dbname=postgres host=localhost port=5432' publication pub1, pub2; -- Drop one publication drop publication pub1; -- Insert in the publisher insert into t1 values(11); insert into t2 values(21); -- Select in subscriber postgres=# select * from t1; c1 ---- (0 rows) postgres=# select * from t2; c1 ---- 21 (1 row) -- Create the dropped publication in publisher create publication pub1 for table t1; -- Insert in the publisher insert into t1 values(12); postgres=# select * from t1; c1 ---- 11 12 (2 rows) -- Select data in subscriber postgres=# select * from t1; -- record with value 11 will be missing in subscriber c1 ---- 12 (1 row) Thoughts? [1] - https://www.postgresql.org/message-id/CAA4eK1%2BT-ETXeRM4DHWzGxBpKafLCp__5bPA_QZfFQp7-0wj4Q%40mail.gmail.com Regards, Vignesh
From 9b79dee26554ae4af9ff00948ab185482b071ba8 Mon Sep 17 00:00:00 2001 From: Vignesh C <vignes...@gmail.com> Date: Mon, 19 Feb 2024 10:20:02 +0530 Subject: [PATCH v1 1/2] Skip loading the publication if the publication does not exist. Skip loading the publication if the publication does not exist. --- src/backend/replication/pgoutput/pgoutput.c | 28 +++++++++++++++------ 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 998f92d671..f7b6d0384d 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -82,7 +82,7 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, static bool publications_valid; -static List *LoadPublications(List *pubnames); +static List *LoadPublications(List *pubnames, bool *skipped); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, @@ -1703,9 +1703,13 @@ pgoutput_shutdown(LogicalDecodingContext *ctx) /* * Load publications from the list of publication names. + * + * Here, we just skip the publications that don't exist yet. 'skipped' + * will be true if we find any publication from the given list that doesn't + * exist. */ static List * -LoadPublications(List *pubnames) +LoadPublications(List *pubnames, bool *skipped) { List *result = NIL; ListCell *lc; @@ -1713,9 +1717,12 @@ LoadPublications(List *pubnames) foreach(lc, pubnames) { char *pubname = (char *) lfirst(lc); - Publication *pub = GetPublicationByName(pubname, false); + Publication *pub = GetPublicationByName(pubname, true); - result = lappend(result, pub); + if (pub) + result = lappend(result, pub); + else + *skipped = true; } return result; @@ -1994,7 +2001,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) } /* Validate the entry */ - if (!entry->replicate_valid) + if (!entry->replicate_valid || !publications_valid) { Oid schemaId = get_rel_namespace(relid); List *pubids = GetRelationPublications(relid); @@ -2011,6 +2018,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) bool am_partition = get_rel_relispartition(relid); char relkind = get_rel_relkind(relid); List *rel_publications = NIL; + bool skipped_pub = false; /* Reload publications if needed before use. */ if (!publications_valid) @@ -2021,9 +2029,15 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) list_free_deep(data->publications); data->publications = NIL; } - data->publications = LoadPublications(data->publication_names); + data->publications = LoadPublications(data->publication_names, &skipped_pub); MemoryContextSwitchTo(oldctx); - publications_valid = true; + + /* + * We don't consider the publications to be valid till we have + * information of all the publications. + */ + if (!skipped_pub) + publications_valid = true; } /* -- 2.34.1
From 0030521b178b80b1de26afd8a71cb7b741511b01 Mon Sep 17 00:00:00 2001 From: Vignesh C <vignes...@gmail.com> Date: Mon, 19 Feb 2024 10:23:05 +0530 Subject: [PATCH v1 2/2] Added an option skip_not_exist_publication which will skip loading the publication, if the publication does not exist. Added an option skip_not_exist_publication which will skip loading the publication, if the publication does not exist. --- src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 3 +- src/backend/commands/subscriptioncmds.c | 33 ++++++++++++++++++- .../libpqwalreceiver/libpqwalreceiver.c | 4 +++ src/backend/replication/logical/worker.c | 4 +++ src/backend/replication/pgoutput/pgoutput.c | 23 ++++++++++--- src/bin/psql/tab-complete.c | 10 +++--- src/include/catalog/pg_subscription.h | 5 +++ src/include/replication/pgoutput.h | 1 + src/include/replication/walreceiver.h | 1 + src/test/subscription/t/031_column_list.pl | 14 ++++---- 11 files changed, 82 insertions(+), 17 deletions(-) diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 406a3c2dd1..404577725e 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -74,6 +74,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->passwordrequired = subform->subpasswordrequired; sub->runasowner = subform->subrunasowner; sub->failover = subform->subfailover; + sub->skipnotexistpub = subform->subskipnotexistpub; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 04227a72d1..323cd2485d 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1360,7 +1360,8 @@ REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, subpasswordrequired, subrunasowner, subfailover, - subslotname, subsynccommit, subpublications, suborigin) + subskipnotexistpub, subslotname, subsynccommit, subpublications, + suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index a05d69922d..bd59efc73a 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -72,6 +72,7 @@ #define SUBOPT_FAILOVER 0x00002000 #define SUBOPT_LSN 0x00004000 #define SUBOPT_ORIGIN 0x00008000 +#define SUBOPT_SKIP_NOT_EXISTS_PUB 0x00010000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -97,6 +98,7 @@ typedef struct SubOpts bool passwordrequired; bool runasowner; bool failover; + bool skipnotexistpub; char *origin; XLogRecPtr lsn; } SubOpts; @@ -159,6 +161,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->runasowner = false; if (IsSet(supported_opts, SUBOPT_FAILOVER)) opts->failover = false; + if (IsSet(supported_opts, SUBOPT_SKIP_NOT_EXISTS_PUB)) + opts->skipnotexistpub = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); @@ -316,6 +320,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_FAILOVER; opts->failover = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_SKIP_NOT_EXISTS_PUB) && + strcmp(defel->defname, "skip_not_exist_publication") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_SKIP_NOT_EXISTS_PUB)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_SKIP_NOT_EXISTS_PUB; + opts->skipnotexistpub = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_ORIGIN) && strcmp(defel->defname, "origin") == 0) { @@ -408,6 +421,13 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errmsg("%s and %s are mutually exclusive options", "connect = false", "failover = true"))); + if (opts->skipnotexistpub && + IsSet(opts->specified_opts, SUBOPT_SKIP_NOT_EXISTS_PUB)) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("%s and %s are mutually exclusive options", + "connect = false", "skip_not_exist_publication = true"))); + /* Change the defaults of other options. */ opts->enabled = false; opts->create_slot = false; @@ -611,7 +631,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | + SUBOPT_SKIP_NOT_EXISTS_PUB | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -718,6 +739,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired); values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner); values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover); + values[Anum_pg_subscription_subskipnotexistpub - 1] = + BoolGetDatum(opts.skipnotexistpub); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -1169,6 +1192,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | + SUBOPT_SKIP_NOT_EXISTS_PUB | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, @@ -1248,6 +1272,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subrunasowner - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_SKIP_NOT_EXISTS_PUB)) + { + values[Anum_pg_subscription_subskipnotexistpub - 1] = + BoolGetDatum(opts.runasowner); + replaces[Anum_pg_subscription_subskipnotexistpub - 1] = true; + } + if (IsSet(opts.specified_opts, SUBOPT_FAILOVER)) { if (!sub->slotname) diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 9270d7b855..a66108aee8 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -593,6 +593,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn, appendStringInfo(&cmd, ", origin '%s'", options->proto.logical.origin); + if (options->proto.logical.skipnotexistpub && + PQserverVersion(conn->streamConn) >= 170000) + appendStringInfo(&cmd, ", skip_not_exist_publication 'true'"); + pubnames = options->proto.logical.publication_names; pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); if (!pubnames_str) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 9dd2446fbf..ef362e3571 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3952,6 +3952,7 @@ maybe_reread_subscription(void) newsub->binary != MySubscription->binary || newsub->stream != MySubscription->stream || newsub->passwordrequired != MySubscription->passwordrequired || + newsub->skipnotexistpub != MySubscription->skipnotexistpub || strcmp(newsub->origin, MySubscription->origin) != 0 || newsub->owner != MySubscription->owner || !equal(newsub->publications, MySubscription->publications)) @@ -4380,6 +4381,9 @@ set_stream_options(WalRcvStreamOptions *options, options->proto.logical.publication_names = MySubscription->publications; options->proto.logical.binary = MySubscription->binary; + if (server_version >= 170000 && MySubscription->skipnotexistpub) + options->proto.logical.skipnotexistpub = true; + /* * Assign the appropriate option value for streaming option according to * the 'streaming' mode and the publisher's ability to support that mode. diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index f7b6d0384d..7c27d4a7d7 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -82,7 +82,8 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, static bool publications_valid; -static List *LoadPublications(List *pubnames, bool *skipped); +static List *LoadPublications(List *pubnames, bool skipnotexistpub, + bool *skipped); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, @@ -284,11 +285,13 @@ parse_output_parameters(List *options, PGOutputData *data) bool streaming_given = false; bool two_phase_option_given = false; bool origin_option_given = false; + bool skipnotexistpub_option_given = false; data->binary = false; data->streaming = LOGICALREP_STREAM_OFF; data->messages = false; data->two_phase = false; + data->skipnotexistpub = false; foreach(lc, options) { @@ -397,6 +400,16 @@ parse_output_parameters(List *options, PGOutputData *data) errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("unrecognized origin value: \"%s\"", origin)); } + else if (strcmp(defel->defname, "skip_not_exist_publication") == 0) + { + if (skipnotexistpub_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + skipnotexistpub_option_given = true; + + data->skipnotexistpub = true; + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -1709,7 +1722,7 @@ pgoutput_shutdown(LogicalDecodingContext *ctx) * exist. */ static List * -LoadPublications(List *pubnames, bool *skipped) +LoadPublications(List *pubnames, bool skipnotexistpub, bool *skipped) { List *result = NIL; ListCell *lc; @@ -1717,7 +1730,7 @@ LoadPublications(List *pubnames, bool *skipped) foreach(lc, pubnames) { char *pubname = (char *) lfirst(lc); - Publication *pub = GetPublicationByName(pubname, true); + Publication *pub = GetPublicationByName(pubname, skipnotexistpub); if (pub) result = lappend(result, pub); @@ -2029,7 +2042,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) list_free_deep(data->publications); data->publications = NIL; } - data->publications = LoadPublications(data->publication_names, &skipped_pub); + data->publications = LoadPublications(data->publication_names, + data->skipnotexistpub, + &skipped_pub); MemoryContextSwitchTo(oldctx); /* diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 151a5211ee..0642ff4581 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1944,8 +1944,9 @@ psql_completion(const char *text, int start, int end) /* ALTER SUBSCRIPTION <name> SET ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "(")) COMPLETE_WITH("binary", "disable_on_error", "failover", "origin", - "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit"); + "password_required", "run_as_owner", + "skip_not_exist_publication", "slot_name", "streaming", + "synchronous_commit"); /* ALTER SUBSCRIPTION <name> SKIP ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "(")) COMPLETE_WITH("lsn"); @@ -3341,8 +3342,9 @@ psql_completion(const char *text, int start, int end) else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", "disable_on_error", "enabled", "failover", "origin", - "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit", "two_phase"); + "password_required", "run_as_owner", + "skip_not_exist_publication", "slot_name", "streaming", + "synchronous_commit", "two_phase"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 0aa14ec4a2..704d1217d9 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -98,6 +98,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW * slots) in the upstream database are enabled * to be synchronized to the standbys. */ + bool subskipnotexistpub; /* True if the not-exist publications should + * be ignored */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -151,6 +154,8 @@ typedef struct Subscription * (i.e. the main slot and the table sync * slots) in the upstream database are enabled * to be synchronized to the standbys. */ + bool skipnotexistpub; /* True if the non-existent publications should + * be ignored. */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index 89f94e1147..38faa2ea04 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -33,6 +33,7 @@ typedef struct PGOutputData bool messages; bool two_phase; bool publish_no_origin; + bool skipnotexistpub; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index b906bb5ce8..e7a0d9e08a 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -186,6 +186,7 @@ typedef struct * prepare time */ char *origin; /* Only publish data originating from the * specified origin */ + bool skipnotexistpub; } logical; } proto; } WalRcvStreamOptions; diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl index 938582e31a..8626397c1c 100644 --- a/src/test/subscription/t/031_column_list.pl +++ b/src/test/subscription/t/031_column_list.pl @@ -145,7 +145,7 @@ $node_publisher->safe_psql( # then check the sync results $node_subscriber->safe_psql( 'postgres', qq( - CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 + CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 WITH (skip_not_exist_publication = true) )); $node_subscriber->wait_for_subscription_sync; @@ -741,7 +741,7 @@ $node_publisher->safe_psql( $node_subscriber->safe_psql( 'postgres', qq( DROP SUBSCRIPTION sub1; - CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8; + CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8 WITH (skip_not_exist_publication = true); )); $node_subscriber->wait_for_subscription_sync; @@ -921,7 +921,7 @@ $node_publisher->safe_psql( $node_subscriber->safe_psql( 'postgres', qq( - CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6; + CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6 WITH (skip_not_exist_publication = true); )); $node_subscriber->wait_for_subscription_sync; @@ -973,7 +973,7 @@ $node_publisher->safe_psql( # both table sync and data replication. $node_subscriber->safe_psql( 'postgres', qq( - CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_test_root, pub_test_root_1; + CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_test_root, pub_test_root_1 WITH (skip_not_exist_publication = true); )); $node_subscriber->wait_for_subscription_sync; @@ -1213,7 +1213,7 @@ $node_subscriber->safe_psql( $node_subscriber->safe_psql( 'postgres', qq( - CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_7, pub_mix_8; + CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_7, pub_mix_8 WITH (skip_not_exist_publication = true); )); $node_subscriber->wait_for_subscription_sync; @@ -1255,7 +1255,7 @@ $node_subscriber->safe_psql( my ($cmdret, $stdout, $stderr) = $node_subscriber->psql( 'postgres', qq( - CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2; + CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2 WITH (skip_not_exist_publication = true); )); ok( $stderr =~ @@ -1272,7 +1272,7 @@ $node_publisher->safe_psql( $node_subscriber->safe_psql( 'postgres', qq( - CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2; + CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2 WITH (skip_not_exist_publication = true); )); $node_publisher->wait_for_catchup('sub1'); -- 2.34.1