On Thu, 9 Nov 2023 at 12:23, Michael Paquier <mich...@paquier.xyz> wrote:
>
> On Thu, Nov 09, 2023 at 01:14:05PM +1100, Peter Smith wrote:
> > Looks like v12 accidentally forgot to update this to the modified
> > function name 'binary_upgrade_add_sub_rel_state'
>
> This v12 is overall cleaner than its predecessors. Nice to see.
>
> +my $result = $publisher->safe_psql('postgres', "SELECT count(*) FROM t1");
> +is($result, qq(1), "check initial t1 table data on publisher");
> +$result = $publisher->safe_psql('postgres', "SELECT count(*) FROM t2");
> +is($result, qq(1), "check initial t1 table data on publisher");
> +$result = $old_sub->safe_psql('postgres', "SELECT count(*) FROM t1");
> +is($result, qq(1), "check initial t1 table data on the old subscriber");
> +$result = $old_sub->safe_psql('postgres', "SELECT count(*) FROM t2");
>
> I'd argue that t1 and t2 should have less generic names. t1 is used
> to check that the upgrade process works, while t2 is added to the
> publication after upgrading the subscriber. Say something like
> tab_upgraded or tab_not_upgraded?

Modified

> +my $synced_query =
> +  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');";
>
> Perhaps it would be safer to use a query that checks the number of
> relations in 'r' state? This query would return true if
> pg_subscription_rel has no tuples.

Modified

> +# Table will be in 'd' (data is being copied) state as table sync will fail
> +# because of primary key constraint error.
> +my $started_query =
> +  "SELECT count(1) = 1 FROM pg_subscription_rel WHERE srsubstate = 'd';";
>
> Relying on a pkey error to enforce an incorrect state is a good trick.
> Nice.

That was a better way to get the data sync state without manually changing
the pg_subscription_rel catalog.

> +command_fails(
> +	[
> +		'pg_upgrade', '--no-sync', '-d', $old_sub->data_dir,
> +		'-D', $new_sub->data_dir, '-b', $bindir,
> +		'-B', $bindir, '-s', $new_sub->host,
> +		'-p', $old_sub->port, '-P', $new_sub->port,
> +		$mode, '--check',
> +	],
> +	'run of pg_upgrade --check for old instance with relation in \'d\' datasync(invalid) state');
> +rmtree($new_sub->data_dir . "/pg_upgrade_output.d");
>
> Okay by me to not stop the cluster for the --check to shave a few
> cycles. It's a bit sad that we don't cross-check the contents of
> subscription_state.txt before removing pg_upgrade_output.d. Finding
> the file is easy even if the subdir where it is included is not a
> constant name. Then it is possible to apply a regexp with the
> contents consumed by a slurp_file().

Modified

> +my $remote_lsn = $old_sub->safe_psql('postgres',
> +	"SELECT remote_lsn FROM pg_replication_origin_status");
> Perhaps you've not noticed, but this would be 0/0 most of the time.
> However the intention is to check after a valid LSN to make sure that
> the origin is set, no?

I have added a few more inserts so that remote_lsn is not 0/0.

> I am wondering whether this should use a bit more data than just one
> tuple, say at least two transaction, one of them with a multi-value
> INSERT?

Added one more multi-insert.

> +# ------------------------------------------------------
> +# Check that pg_upgrade is successful when all tables are in ready state.
> +# ------------------------------------------------------
> This comment is a bit inconsistent with the state that are accepted,
> but why not, at least that's predictible.

The key test validation is mentioned in this style of comment.

> + * relation to pg_subscripion_rel table. This will be used only in
>
> Typo: s/pg_subscripion_rel/pg_subscription_rel/.

Modified

> This needs some word-smithing to explain the reasons why a state is
> not needed:
>
> + /*
> + * The subscription relation should be in either i (initialize),
> + * r (ready) or s (synchronized) state as either the replication slot
> + * is not created or the replication slot is already dropped and the
> + * required WAL files will be present in the publisher. The other
> + * states are not ok as the worker has dependency on the replication
> + * slot/origin in these case:
>
> A slot not created yet refers to the 'i' state, while 'r' and 's'
> refer to a slot created previously but already dropped, right?
> Shouldn't this comment tell that rather than mixing the assumptions?

Modified

> + * a) SUBREL_STATE_DATASYNC: In this case, the table sync worker will
> + * try to drop the replication slot but as the replication slots will
> + * be created with old subscription id in the publisher and the
> + * upgraded subscriber will not be able to clean the slots in this
> + * case.
>
> Proposal: A relation upgraded while in this state would retain a
> replication slot, which could not be dropped by the sync worker
> spawned after the upgrade because the subscription ID tracked by the
> publisher does not match anymore.

Modified

> Note: actually, this would be OK if we are able to keep the OIDs of
> the subscribers consistent across upgrades? I'm OK to not do nothing
> about that in this patch, to keep it simpler. Just asking in passing.

I will analyze this further and post the analysis in a subsequent mail.

> + * b) SUBREL_STATE_FINISHEDCOPY: In this case, the tablesync worker will
> + * expect the origin to be already existing as the origin is created
> + * with an old subscription id, tablesync worker will not be able to
> + * find the origin in this case.
>
> Proposal: A tablesync worker spawned to work on a relation upgraded
> while in this state would expect an origin ID with the OID of the
> subscription used before the upgrade, causing it to fail.

Modified

> + "A list of problem subscriptions is in the file:\n"
>
> Sounds a bit strange, perhaps use an extra "the", as of "the problem
> subscriptions"?

Modified

> Could it be worth mentioning in the docs that one could also DISABLE
> the subscriptions before running the upgrade?

Since the changes we are planning to make won't start the apply workers
during the upgrade, there will be no impact even if the subscriptions are
enabled. I felt there is no need to mention it unless we plan to allow
apply workers to start during the upgrade.

> + The replication origin entry corresponding to each of the subscriptions
> + should exist in the old cluster. This can be found by checking
> + <link linkend="catalog-pg-subscription">pg_subscription</link> and
> + <link linkend="catalog-pg-replication-origin">pg_replication_origin</link>
> + system tables.
>
> Hmm. No need to mention pg_replication_origin_status?

When we create an origin, the origin status is created implicitly, so I
felt we need not check the replication origin status and also need not
mention it here.

> If I may ask, how did you check that the given relation states were
> OK or not OK? Did you hardcode some wait points in tablesync.c up to
> where a state is updated in pg_subscription_rel, then shutdown the
> cluster before the upgrade to maintain the catalog in this state?
> Finally, after the upgrade, you've cross-checked the dependencies on
> the slots and origins to see that the spawned sync workers turned
> crazy because of the inconsistencies. Right?

I did testing along the same lines that you mentioned. Apart from that, I
also reviewed the design for places where the old subscription id is used;
for example, the tablesync worker will use the replication slot and
replication origin created with the old subscription id. I also checked
the impact of the remote_lsn values.

A few examples: in SUBREL_STATE_DATASYNC state we will try to drop the
replication slot once the worker is started, but since the slot was created
with the old subscription id, we will not be able to drop the replication
slot and it will leak. A similar problem exists with
SUBREL_STATE_FINISHEDCOPY, where we will not be able to drop the origin
created with the old subscription id.

Thanks for the comments, the attached v13 version patch has the changes
for the same.

Regards,
Vignesh
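
For readers following the state discussion above, here is a minimal standalone sketch of the classification the review settles on. It is an illustration only: the patch itself performs this check with a SQL query in check_for_subscription_state(), and the helper names subrel_state_upgrade_blocker() and subrel_state_is_upgradable() are hypothetical, not functions from the patch or from PostgreSQL. The single-character codes are the srsubstate values stored in pg_subscription_rel.

```c
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical helper (not part of the patch): map a
 * pg_subscription_rel.srsubstate code to the reason it blocks an upgrade.
 * Returns NULL when the state is one of the three accepted ones.
 */
static const char *
subrel_state_upgrade_blocker(char srsubstate)
{
	switch (srsubstate)
	{
		case 'i':				/* initialize: tablesync slot not created yet */
		case 's':				/* synchronized: slot created, already dropped */
		case 'r':				/* ready: slot created, already dropped */
			return NULL;
		case 'd':				/* SUBREL_STATE_DATASYNC */
			return "tablesync slot was created with the old subscription id and cannot be dropped";
		case 'f':				/* SUBREL_STATE_FINISHEDCOPY */
			return "tablesync origin was created with the old subscription id and will not be found";
		default:				/* e.g. syncwait/catchup, never stored in the catalog */
			return "state is not expected to be stored in the catalog";
	}
}

/* Hypothetical convenience wrapper: true when the relation can be upgraded. */
static bool
subrel_state_is_upgradable(char srsubstate)
{
	return subrel_state_upgrade_blocker(srsubstate) == NULL;
}
```

A caller would pass each srsubstate value read from the old cluster and report the returned reason for any relation that is not upgradable, which is roughly what the subscription_state.txt report does in the patch.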
From 81f8423df8672bd29ad214946fb29a282cd6c796 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Mon, 30 Oct 2023 12:31:59 +0530
Subject: [PATCH v13] Preserve the full subscription's state during pg_upgrade

Previously, only the subscription metadata information was preserved. Without
the list of relations and their state it's impossible to re-enable the
subscriptions without missing some records as the list of relations can only be
refreshed after enabling the subscription (and therefore starting the apply
worker). Even if we added a way to refresh the subscription while enabling a
publication, we still wouldn't know which relations are new on the publication
side, and therefore should be fully synced, and which shouldn't.

To fix this problem, this patch teaches pg_dump to restore the content of
pg_subscription_rel from the old cluster by using
binary_upgrade_add_sub_rel_state SQL function. This is supported only
in binary upgrade mode.

The new SQL binary_upgrade_add_sub_rel_state function has the following
syntax:
SELECT binary_upgrade_add_sub_rel_state(subname text, relid oid, state char [,sublsn pg_lsn])

In the above, subname is the subscription name, relid is the relation
identifier, the state is the state of the relation, sublsn is subscription lsn
which is optional, and defaults to NULL/InvalidXLogRecPtr if not provided.
pg_dump will retrieve these values(subname, relid, state and sublsn) from the
old cluster.

The subscription's replication origin is needed to ensure that we don't
replicate anything twice.

To fix this problem, this patch teaches pg_dump to update the replication
origin along with create subscription by using
binary_upgrade_replorigin_advance SQL function to restore the
underlying replication origin remote LSN. This is supported only in
binary upgrade mode.

The new SQL binary_upgrade_replorigin_advance function has the following
syntax:
SELECT binary_upgrade_replorigin_advance(subname text, sublsn pg_lsn)

In the above, subname is the subscription name and sublsn is subscription lsn.
pg_dump will retrieve these values(subname and sublsn) from the old cluster.

pg_upgrade will check that all the subscription relations are in 'i'
(initialize), 's' (synchronized) or 'r' (ready) state, and will error out if
that's not the case, logging the reason for the failure.

Author: Julien Rouhaud, Vignesh C
Reviewed-by: FIXME
Discussion: https://postgr.es/m/20230217075433.u5mjly4d5cr4hcfe@jrouhaud
---
 doc/src/sgml/ref/pgupgrade.sgml            |  64 +++++
 src/backend/utils/adt/pg_upgrade_support.c | 118 ++++++++++
 src/bin/pg_dump/common.c                   |  22 ++
 src/bin/pg_dump/pg_dump.c                  | 201 +++++++++++++++-
 src/bin/pg_dump/pg_dump.h                  |  16 ++
 src/bin/pg_dump/pg_dump_sort.c             |  11 +-
 src/bin/pg_upgrade/check.c                 | 123 ++++++++++
 src/bin/pg_upgrade/meson.build             |   1 +
 src/bin/pg_upgrade/t/004_subscription.pl   | 261 +++++++++++++++++++++
 src/include/catalog/pg_proc.dat            |  10 +
 src/tools/pgindent/typedefs.list           |   1 +
 11 files changed, 822 insertions(+), 6 deletions(-)
 create mode 100644 src/bin/pg_upgrade/t/004_subscription.pl

diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 46e8a0b746..1a6ad16060 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -456,6 +456,70 @@ make prefix=/usr/local/pgsql.new install
     </step>

+    <step>
+     <title>Prepare for subscriber upgrades</title>
+
+    <para>
+     Setup the <link linkend="logical-replication-config-subscriber">
+     subscriber configurations</link> in the new subscriber.
+     <application>pg_upgrade</application> attempts to migrate subscription
+     dependencies which includes the subscription table information present in
+     <link linkend="catalog-pg-subscription-rel">pg_subscription_rel</link>
+     system catalog and also the subscription replication origin.
This allows
+     logical replication on the new subscriber to continue from where the
+     old subscriber was up to. Migration of subscription dependencies is only
+     supported when the old cluster is version 17.0 or later. Subscription
+     dependencies on clusters before version 17.0 will silently be ignored.
+    </para>
+
+    <para>
+     There are some prerequisites for <application>pg_upgrade</application> to
+     be able to upgrade the subscriptions. If these are not met an error
+     will be reported.
+    </para>
+
+    <itemizedlist>
+     <listitem>
+      <para>
+       All the subscription tables in the old subscriber should be in state
+       <literal>i</literal> (initialize), <literal>r</literal> (ready) or
+       <literal>s</literal> (synchronized). This can be verified by checking
+       <link linkend="catalog-pg-subscription-rel">pg_subscription_rel</link>.<structfield>srsubstate</structfield>.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       The replication origin entry corresponding to each of the subscriptions
+       should exist in the old cluster. This can be found by checking
+       <link linkend="catalog-pg-subscription">pg_subscription</link> and
+       <link linkend="catalog-pg-replication-origin">pg_replication_origin</link>
+       system tables.
+      </para>
+     </listitem>
+    </itemizedlist>
+
+    <para>
+     The subscriptions will be migrated to the new cluster in a disabled state.
+     After migration, do this:
+    </para>
+
+    <itemizedlist>
+     <listitem>
+      <para>
+       Enable the subscriptions by executing
+       <link linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ... ENABLE</command></link>.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       Create all the new tables that were created in the publication during
+       upgrade and refresh the publication by executing
+       <link linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</command></link>.
+      </para>
+     </listitem>
+    </itemizedlist>
+    </step>
+
     <step>
      <title>Stop both servers</title>

diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 2f6fc86c3d..4a3da80e49 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -11,15 +11,21 @@

 #include "postgres.h"

+#include "access/table.h"
 #include "catalog/binary_upgrade.h"
 #include "catalog/heap.h"
 #include "catalog/namespace.h"
+#include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
 #include "commands/extension.h"
 #include "miscadmin.h"
 #include "replication/logical.h"
+#include "replication/origin.h"
+#include "replication/worker_internal.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/syscache.h"


 #define CHECK_IS_BINARY_UPGRADE									\
@@ -305,3 +311,115 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)

 	PG_RETURN_BOOL(!found_pending_wal);
 }
+
+/*
+ * binary_upgrade_add_sub_rel_state
+ *
+ * Add the relation with the specified relation state to pg_subscription_rel
+ * catalog.
+ */
+Datum
+binary_upgrade_add_sub_rel_state(PG_FUNCTION_ARGS)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	Oid			subid;
+	Form_pg_subscription form;
+	char	   *subname;
+	Oid			relid;
+	char		relstate;
+	XLogRecPtr	sublsn;
+
+	CHECK_IS_BINARY_UPGRADE;
+
+	/* We must check these things before dereferencing the arguments */
+	if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2))
+		elog(ERROR, "null argument to binary_upgrade_add_sub_rel_state is not allowed");
+
+	subname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+	relid = PG_GETARG_OID(1);
+	relstate = PG_GETARG_CHAR(2);
+	sublsn = PG_ARGISNULL(3) ?
InvalidXLogRecPtr : PG_GETARG_LSN(3);
+
+	if (!OidIsValid(relid))
+		ereport(ERROR,
+				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				errmsg("invalid relation identifier used: %u", relid));
+
+	tup = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				errmsg("relation %u does not exist", relid));
+	ReleaseSysCache(tup);
+
+	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+
+	/* Fetch the existing tuple. */
+	tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
+						  CStringGetDatum(subname));
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				errcode(ERRCODE_UNDEFINED_OBJECT),
+				errmsg("subscription \"%s\" does not exist", subname));
+
+	form = (Form_pg_subscription) GETSTRUCT(tup);
+	subid = form->oid;
+
+	AddSubscriptionRelState(subid, relid, relstate, sublsn);
+
+	ReleaseSysCache(tup);
+	table_close(rel, RowExclusiveLock);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * binary_upgrade_replorigin_advance
+ *
+ * Update the remote_lsn for the subscriber's replication origin.
+ */
+Datum
+binary_upgrade_replorigin_advance(PG_FUNCTION_ARGS)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	Oid			subid;
+	Form_pg_subscription form;
+	char	   *subname;
+	XLogRecPtr	sublsn;
+	char		originname[NAMEDATALEN];
+	RepOriginId originid;
+
+	CHECK_IS_BINARY_UPGRADE;
+
+	/* We must check these things before dereferencing the arguments */
+	if (PG_ARGISNULL(0))
+		elog(ERROR, "null argument to binary_upgrade_replorigin_advance is not allowed");
+
+	subname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+	sublsn = PG_ARGISNULL(1) ? InvalidXLogRecPtr : PG_GETARG_LSN(1);
+
+	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+
+	/* Fetch the existing tuple.
*/
+	tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
+							  CStringGetDatum(subname));
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				errcode(ERRCODE_UNDEFINED_OBJECT),
+				errmsg("subscription \"%s\" does not exist", subname));
+
+	form = (Form_pg_subscription) GETSTRUCT(tup);
+	subid = form->oid;
+
+	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
+	originid = replorigin_by_name(originname, false);
+	replorigin_advance(originid, sublsn, InvalidXLogRecPtr,
+					   false /* backward */ ,
+					   false /* WAL log */ );
+	heap_freetuple(tup);
+	table_close(rel, RowExclusiveLock);
+
+	PG_RETURN_VOID();
+}
diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c
index 8b0c1e7b53..764a39fcb9 100644
--- a/src/bin/pg_dump/common.c
+++ b/src/bin/pg_dump/common.c
@@ -24,6 +24,7 @@
 #include "catalog/pg_operator_d.h"
 #include "catalog/pg_proc_d.h"
 #include "catalog/pg_publication_d.h"
+#include "catalog/pg_subscription_d.h"
 #include "catalog/pg_type_d.h"
 #include "common/hashfn.h"
 #include "fe_utils/string_utils.h"
@@ -265,6 +266,9 @@ getSchemaData(Archive *fout, int *numTablesPtr)
 	pg_log_info("reading subscriptions");
 	getSubscriptions(fout);

+	pg_log_info("reading subscription membership of tables");
+	getSubscriptionTables(fout);
+
 	free(inhinfo);				/* not needed any longer */

 	*numTablesPtr = numTables;
@@ -978,6 +982,24 @@ findPublicationByOid(Oid oid)
 	return (PublicationInfo *) dobj;
 }

+/*
+ * findSubscriptionByOid
+ *	  finds the DumpableObject for the subscription with the given oid
+ *	  returns NULL if not found
+ */
+SubscriptionInfo *
+findSubscriptionByOid(Oid oid)
+{
+	CatalogId	catId;
+	DumpableObject *dobj;
+
+	catId.tableoid = SubscriptionRelationId;
+	catId.oid = oid;
+	dobj = findObjectByCatalogId(catId);
+	Assert(dobj == NULL || dobj->objType == DO_SUBSCRIPTION);
+	return (SubscriptionInfo *) dobj;
+}
+

 /*
  * recordExtensionMembership
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index
e863913849..c8a635330a 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -296,6 +296,7 @@ static void dumpPolicy(Archive *fout, const PolicyInfo *polinfo);
 static void dumpPublication(Archive *fout, const PublicationInfo *pubinfo);
 static void dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo);
 static void dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo);
+static void dumpSubscriptionTable(Archive *fout, const SubRelInfo *subrinfo);
 static void dumpDatabase(Archive *fout);
 static void dumpDatabaseConfig(Archive *AH, PQExpBuffer outbuf,
 							   const char *dbname, Oid dboid);
@@ -4581,6 +4582,99 @@ is_superuser(Archive *fout)
 	return false;
 }

+/*
+ * getSubscriptionTables
+ *	  Get information about subscription membership for dumpable tables. This
+ *	  will be used only in binary-upgrade mode.
+ */
+void
+getSubscriptionTables(Archive *fout)
+{
+	DumpOptions *dopt = fout->dopt;
+	SubscriptionInfo *subinfo = NULL;
+	SubRelInfo *subrinfo;
+	PQExpBuffer query;
+	PGresult   *res;
+	int			i_srsubid;
+	int			i_srrelid;
+	int			i_srsubstate;
+	int			i_srsublsn;
+	int			i;
+	int			cur_rel = 0;
+	int			ntups;
+	Oid			last_srsubid = InvalidOid;
+
+	if (dopt->no_subscriptions || !dopt->binary_upgrade ||
+		fout->remoteVersion < 170000)
+		return;
+
+	query = createPQExpBuffer();
+	appendPQExpBuffer(query, "SELECT srsubid, srrelid, srsubstate, srsublsn"
+					  " FROM pg_catalog.pg_subscription_rel"
+					  " ORDER BY srsubid");
+	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+
+	ntups = PQntuples(res);
+	if (ntups == 0)
+		goto cleanup;
+
+	/* Get pg_subscription_rel attributes */
+	i_srsubid = PQfnumber(res, "srsubid");
+	i_srrelid = PQfnumber(res, "srrelid");
+	i_srsubstate = PQfnumber(res, "srsubstate");
+	i_srsublsn = PQfnumber(res, "srsublsn");
+
+	subrinfo = pg_malloc(ntups * sizeof(SubRelInfo));
+	for (i = 0; i < ntups; i++)
+	{
+		Oid			cur_srsubid = atooid(PQgetvalue(res, i, i_srsubid));
+		Oid			relid = atooid(PQgetvalue(res, i, i_srrelid));
+
TableInfo  *tblinfo;
+
+		/*
+		 * If we switched to a new subscription, check if the subscription
+		 * exists.
+		 */
+		if (cur_srsubid != last_srsubid)
+		{
+			subinfo = findSubscriptionByOid(cur_srsubid);
+			if (subinfo == NULL)
+				pg_fatal("subscription with OID %u does not exist", cur_srsubid);
+
+			last_srsubid = cur_srsubid;
+		}
+
+		tblinfo = findTableByOid(relid);
+		if (tblinfo == NULL)
+			pg_fatal("failed sanity check, table with OID %u not found",
+					 relid);
+
+		/* OK, make a DumpableObject for this relationship */
+		subrinfo[cur_rel].dobj.objType = DO_SUBSCRIPTION_REL;
+		subrinfo[cur_rel].dobj.catId.tableoid = relid;
+		subrinfo[cur_rel].dobj.catId.oid = cur_srsubid;
+		AssignDumpId(&subrinfo[cur_rel].dobj);
+		subrinfo[cur_rel].dobj.name = pg_strdup(subinfo->dobj.name);
+		subrinfo[cur_rel].tblinfo = tblinfo;
+		subrinfo[cur_rel].srsubstate = PQgetvalue(res, i, i_srsubstate)[0];
+		if (PQgetisnull(res, i, i_srsublsn))
+			subrinfo[cur_rel].srsublsn = NULL;
+		else
+			subrinfo[cur_rel].srsublsn = pg_strdup(PQgetvalue(res, i, i_srsublsn));
+
+		subrinfo[cur_rel].subinfo = subinfo;
+
+		/* Decide whether we want to dump it */
+		selectDumpableObject(&(subrinfo[cur_rel].dobj), fout);
+
+		cur_rel++;
+	}
+
+cleanup:
+	PQclear(res);
+	destroyPQExpBuffer(query);
+}
+
 /*
  * getSubscriptions
  *	  get information about subscriptions
@@ -4607,6 +4701,7 @@ getSubscriptions(Archive *fout)
 	int			i_subsynccommit;
 	int			i_subpublications;
 	int			i_suborigin;
+	int			i_suboriginremotelsn;
 	int			i,
 				ntups;

@@ -4662,17 +4757,20 @@ getSubscriptions(Archive *fout)
 		appendPQExpBufferStr(query,
 							 " s.subpasswordrequired,\n"
 							 " s.subrunasowner,\n"
-							 " s.suborigin\n");
+							 " s.suborigin,\n");
 	else
 		appendPQExpBuffer(query,
 						  " 't' AS subpasswordrequired,\n"
 						  " 't' AS subrunasowner,\n"
-						  " '%s' AS suborigin\n",
+						  " '%s' AS suborigin,\n",
 						  LOGICALREP_ORIGIN_ANY);

+	appendPQExpBufferStr(query, "o.remote_lsn AS suboriginremotelsn\n");
 	appendPQExpBufferStr(query,
-						 "FROM pg_subscription s\n"
-						 "WHERE s.subdbid = (SELECT oid FROM
pg_database\n"
+						 "FROM pg_catalog.pg_subscription s\n"
+						 "LEFT JOIN pg_catalog.pg_replication_origin_status o \n"
+						 " ON o.external_id = 'pg_' || s.oid::text \n"
+						 "WHERE s.subdbid = (SELECT oid FROM pg_catalog.pg_database\n"
 						 " WHERE datname = current_database())");

 	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
@@ -4698,6 +4796,7 @@ getSubscriptions(Archive *fout)
 	i_subsynccommit = PQfnumber(res, "subsynccommit");
 	i_subpublications = PQfnumber(res, "subpublications");
 	i_suborigin = PQfnumber(res, "suborigin");
+	i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn");

 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));

@@ -4735,6 +4834,11 @@ getSubscriptions(Archive *fout)
 		subinfo[i].subpublications =
 			pg_strdup(PQgetvalue(res, i, i_subpublications));
 		subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
+		if (PQgetisnull(res, i, i_suboriginremotelsn))
+			subinfo[i].suboriginremotelsn = NULL;
+		else
+			subinfo[i].suboriginremotelsn =
+				pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn));

 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4744,6 +4848,80 @@ getSubscriptions(Archive *fout)
 	destroyPQExpBuffer(query);
 }

+/*
+ * dumpSubscriptionTable
+ *	  Dump the definition of the given subscription table mapping. This will be
+ *	  used only in binary-upgrade mode.
+ */
+static void
+dumpSubscriptionTable(Archive *fout, const SubRelInfo *subrinfo)
+{
+	DumpOptions *dopt = fout->dopt;
+	SubscriptionInfo *subinfo = subrinfo->subinfo;
+	PQExpBuffer query;
+	char	   *tag;
+
+	/* Do nothing in data-only dump */
+	if (dopt->dataOnly)
+		return;
+
+	Assert(fout->dopt->binary_upgrade);
+
+	tag = psprintf("%s %s", subinfo->dobj.name, subrinfo->dobj.name);
+
+	query = createPQExpBuffer();
+
+	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
+	{
+		/*
+		 * binary_upgrade_add_sub_rel_state will add the subscription relation
+		 * to pg_subscription_rel table. This will be used only in
+		 * binary-upgrade mode.
+		 */
+		if (fout->remoteVersion >= 170000)
+		{
+			appendPQExpBufferStr(query,
+								 "\n-- For binary upgrade, must preserve the subscriber table.\n");
+			appendPQExpBufferStr(query,
+								 "SELECT pg_catalog.binary_upgrade_add_sub_rel_state(");
+			appendStringLiteralAH(query, subrinfo->dobj.name, fout);
+			appendPQExpBuffer(query,
+							  ", %u, '%c'",
+							  subrinfo->tblinfo->dobj.catId.oid,
+							  subrinfo->srsubstate);
+
+			if (subrinfo->srsublsn && subrinfo->srsublsn[0] != '\0')
+				appendPQExpBuffer(query, ", '%s'",
+								  subrinfo->srsublsn);
+			else
+				appendPQExpBuffer(query, ", NULL");
+
+			appendPQExpBufferStr(query, ");\n");
+		}
+	}
+
+	/*
+	 * There is no point in creating a drop query as the drop is done by table
+	 * drop. (If you think to change this, see also _printTocEntry().)
+	 * Although this object doesn't really have ownership as such, set the
+	 * owner field anyway to ensure that the command is run by the correct
+	 * role at restore time.
+	 */
+	if (subrinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
+		ArchiveEntry(fout, subrinfo->dobj.catId, subrinfo->dobj.dumpId,
+					 ARCHIVE_OPTS(.tag = tag,
+								  .namespace = subrinfo->tblinfo->dobj.namespace->dobj.name,
+								  .owner = subinfo->rolname,
+								  .description = "SUBSCRIPTION TABLE",
+								  .section = SECTION_POST_DATA,
+								  .createStmt = query->data));
+
+	/* These objects can't currently have comments or seclabels */
+
+	free(tag);
+	destroyPQExpBuffer(query);
+}
+
 /*
  * dumpSubscription
  *	  dump the definition of the given subscription
@@ -4824,6 +5002,17 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)

 	appendPQExpBufferStr(query, ");\n");

+	if (dopt->binary_upgrade && fout->remoteVersion >= 170000 &&
+		subinfo->suboriginremotelsn)
+	{
+		appendPQExpBufferStr(query,
+							 "\n-- For binary upgrade, must preserve the remote_lsn for the subscriber's replication origin.\n");
+		appendPQExpBufferStr(query,
+							 "SELECT pg_catalog.binary_upgrade_replorigin_advance(");
+		appendStringLiteralAH(query, subinfo->dobj.name, fout);
+		appendPQExpBuffer(query, ",
'%s');\n", subinfo->suboriginremotelsn);
+	}
+
 	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
 		ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId,
 					 ARCHIVE_OPTS(.tag = subinfo->dobj.name,
@@ -10442,6 +10631,9 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
 		case DO_SUBSCRIPTION:
 			dumpSubscription(fout, (const SubscriptionInfo *) dobj);
 			break;
+		case DO_SUBSCRIPTION_REL:
+			dumpSubscriptionTable(fout, (const SubRelInfo *) dobj);
+			break;
 		case DO_PRE_DATA_BOUNDARY:
 		case DO_POST_DATA_BOUNDARY:
 			/* never dumped, nothing to do */
@@ -18508,6 +18700,7 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
 			case DO_PUBLICATION_REL:
 			case DO_PUBLICATION_TABLE_IN_SCHEMA:
 			case DO_SUBSCRIPTION:
+			case DO_SUBSCRIPTION_REL:
 				/* Post-data objects: must come after the post-data boundary */
 				addObjectDependency(dobj, postDataBound->dumpId);
 				break;
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 2fe3cbed9a..62b3d9249b 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -83,6 +83,7 @@ typedef enum
 	DO_PUBLICATION_REL,
 	DO_PUBLICATION_TABLE_IN_SCHEMA,
 	DO_SUBSCRIPTION,
+	DO_SUBSCRIPTION_REL,
 } DumpableObjectType;

 /*
@@ -671,8 +672,21 @@ typedef struct _SubscriptionInfo
 	char	   *subsynccommit;
 	char	   *subpublications;
 	char	   *suborigin;
+	char	   *suboriginremotelsn;
 } SubscriptionInfo;

+/*
+ * The SubRelInfo struct is used to represent a subscription relation.
+ */
+typedef struct _SubRelInfo
+{
+	DumpableObject dobj;
+	SubscriptionInfo *subinfo;
+	TableInfo  *tblinfo;
+	char		srsubstate;
+	char	   *srsublsn;
+} SubRelInfo;
+
 /*
  *	common utility functions
  */
@@ -697,6 +711,7 @@ extern CollInfo *findCollationByOid(Oid oid);
 extern NamespaceInfo *findNamespaceByOid(Oid oid);
 extern ExtensionInfo *findExtensionByOid(Oid oid);
 extern PublicationInfo *findPublicationByOid(Oid oid);
+extern SubscriptionInfo *findSubscriptionByOid(Oid oid);

 extern void recordExtensionMembership(CatalogId catId, ExtensionInfo *ext);
 extern ExtensionInfo *findOwningExtension(CatalogId catalogId);
@@ -756,5 +771,6 @@ extern void getPublicationNamespaces(Archive *fout);
 extern void getPublicationTables(Archive *fout, TableInfo tblinfo[],
 								 int numTables);
 extern void getSubscriptions(Archive *fout);
+extern void getSubscriptionTables(Archive *fout);

 #endif							/* PG_DUMP_H */
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index abfea15c09..e8d9c8ac86 100644
--- a/src/bin/pg_dump/pg_dump_sort.c
+++ b/src/bin/pg_dump/pg_dump_sort.c
@@ -94,6 +94,7 @@ enum dbObjectTypePriorities
 	PRIO_PUBLICATION_REL,
 	PRIO_PUBLICATION_TABLE_IN_SCHEMA,
 	PRIO_SUBSCRIPTION,
+	PRIO_SUBSCRIPTION_REL,
 	PRIO_DEFAULT_ACL,			/* done in ACL pass */
 	PRIO_EVENT_TRIGGER,			/* must be next to last! */
 	PRIO_REFRESH_MATVIEW		/* must be last!
*/
@@ -147,10 +148,11 @@ static const int dbObjectTypePriority[] =
 	PRIO_PUBLICATION,			/* DO_PUBLICATION */
 	PRIO_PUBLICATION_REL,		/* DO_PUBLICATION_REL */
 	PRIO_PUBLICATION_TABLE_IN_SCHEMA,	/* DO_PUBLICATION_TABLE_IN_SCHEMA */
-	PRIO_SUBSCRIPTION			/* DO_SUBSCRIPTION */
+	PRIO_SUBSCRIPTION,			/* DO_SUBSCRIPTION */
+	PRIO_SUBSCRIPTION_REL		/* DO_SUBSCRIPTION_REL */
 };

-StaticAssertDecl(lengthof(dbObjectTypePriority) == (DO_SUBSCRIPTION + 1),
+StaticAssertDecl(lengthof(dbObjectTypePriority) == (DO_SUBSCRIPTION_REL + 1),
 				 "array length mismatch");

 static DumpId preDataBoundId;
@@ -1472,6 +1474,11 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize)
 					 "SUBSCRIPTION (ID %d OID %u)",
 					 obj->dumpId, obj->catId.oid);
 			return;
+		case DO_SUBSCRIPTION_REL:
+			snprintf(buf, bufsize,
+					 "SUBSCRIPTION TABLE (ID %d OID %u)",
+					 obj->dumpId, obj->catId.oid);
+			return;
 		case DO_PRE_DATA_BOUNDARY:
 			snprintf(buf, bufsize,
 					 "PRE-DATA BOUNDARY (ID %d)",
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index fa52aa2c22..96e35602ac 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -20,6 +20,7 @@ static void check_is_install_user(ClusterInfo *cluster);
 static void check_proper_datallowconn(ClusterInfo *cluster);
 static void check_for_prepared_transactions(ClusterInfo *cluster);
 static void check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster);
+static void check_for_subscription_state(ClusterInfo *cluster);
 static void check_for_user_defined_postfix_ops(ClusterInfo *cluster);
 static void check_for_incompatible_polymorphics(ClusterInfo *cluster);
 static void check_for_tables_with_oids(ClusterInfo *cluster);
@@ -111,6 +112,7 @@ check_and_dump_old_cluster(bool live_check)
 	check_for_composite_data_type_usage(&old_cluster);
 	check_for_reg_data_type_usage(&old_cluster);
 	check_for_isn_and_int8_passing_mismatch(&old_cluster);
+	check_for_subscription_state(&old_cluster);

 	/*
 	 * Logical replication slots can be migrated since PG17.
See comments atop
@@ -812,6 +814,127 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
 		check_ok();
 }

+/*
+ * check_for_subscription_state()
+ *
+ * Verify that each of the subscriptions has all their corresponding tables in
+ * i (initialize), r (ready) or s (synchronized) state.
+ */
+static void
+check_for_subscription_state(ClusterInfo *cluster)
+{
+	int			dbnum;
+	FILE	   *script = NULL;
+	char		output_path[MAXPGPATH];
+	int			ntup;
+
+	/* Subscription relations state can be migrated since PG17. */
+	if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700)
+		return;
+
+	prep_status("Checking for subscription state");
+
+	snprintf(output_path, sizeof(output_path), "%s/%s",
+			 log_opts.basedir,
+			 "subscription_state.txt");
+	for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+	{
+		PGresult   *res;
+		DbInfo	   *active_db = &cluster->dbarr.dbs[dbnum];
+		PGconn	   *conn = connectToServer(cluster, active_db->db_name);
+
+		/* We need to check for pg_replication_origin only once. */
+		if (dbnum == 0)
+		{
+			/*
+			 * Check that all the subscriptions have their respective
+			 * replication origin.
+			 */
+			res = executeQueryOrDie(conn,
+									"SELECT d.datname, s.subname "
+									"FROM pg_catalog.pg_subscription s "
+									"LEFT OUTER JOIN pg_catalog.pg_replication_origin o "
+									" ON o.roname = 'pg_' || s.oid "
+									"INNER JOIN pg_catalog.pg_database d "
+									" ON d.oid = s.subdbid "
+									"WHERE o.roname IS NULL;");
+
+			ntup = PQntuples(res);
+			for (int i = 0; i < ntup; i++)
+			{
+				if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
+					pg_fatal("could not open file \"%s\": %s",
+							 output_path, strerror(errno));
+				fprintf(script, "replication origin is missing for database:%s subscription:%s\n",
+						PQgetvalue(res, i, 0),
+						PQgetvalue(res, i, 1));
+			}
+			PQclear(res);
+		}
+
+		/*
+		 * A slot not created yet refers to the 'i' (initialize) state, while
+		 * 'r' (ready) and 's' (synchronized) states refer to a slot created
+		 * previously but already dropped.
These states are supported states + * for upgrade. The other states listed below are not ok: + * + * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state + * would retain a replication slot, which could not be dropped by the + * sync worker spawned after the upgrade because the subscription ID + * tracked by the publisher does not match anymore. + * + * b) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on + * a relation upgraded while in this state would expect an origin ID + * with the OID of the subscription used before the upgrade, causing + * it to fail. + * + * c) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and + * SUBREL_STATE_UNKNOWN: These states are not stored in the catalog, + * so we need not allow these states. + */ + res = executeQueryOrDie(conn, + "SELECT s.subname, n.nspname, c.relname, r.srsubstate " + "FROM pg_catalog.pg_subscription_rel r " + "LEFT JOIN pg_catalog.pg_subscription s" + " ON r.srsubid = s.oid " + "LEFT JOIN pg_catalog.pg_class c" + " ON r.srrelid = c.oid " + "LEFT JOIN pg_catalog.pg_namespace n" + " ON c.relnamespace = n.oid " + "WHERE r.srsubstate NOT IN ('i', 'r', 's') " + "ORDER BY s.subname"); + + ntup = PQntuples(res); + for (int i = 0; i < ntup; i++) + { + if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %s", + output_path, strerror(errno)); + + fprintf(script, "database:%s subscription:%s schema:%s relation:%s state:%s not in required state\n", + active_db->db_name, + PQgetvalue(res, i, 0), + PQgetvalue(res, i, 1), + PQgetvalue(res, i, 2), + PQgetvalue(res, i, 3)); + } + + PQclear(res); + PQfinish(conn); + } + + if (script) + { + fclose(script); + pg_log(PG_REPORT, "fatal"); + pg_fatal("Your installation contains subscriptions without origin or having relations not in i (initialize), r (ready) or s (synchronized) state.\n" + "A list of the problem subscriptions is in the file:\n" + " %s", output_path); + } + else + check_ok(); +} + /* *
Verify that no user defined postfix operators exist. */ diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build index 3e8a08e062..32f12f9e27 100644 --- a/src/bin/pg_upgrade/meson.build +++ b/src/bin/pg_upgrade/meson.build @@ -43,6 +43,7 @@ tests += { 't/001_basic.pl', 't/002_pg_upgrade.pl', 't/003_logical_slots.pl', + 't/004_subscription.pl', ], 'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow }, diff --git a/src/bin/pg_upgrade/t/004_subscription.pl b/src/bin/pg_upgrade/t/004_subscription.pl new file mode 100644 index 0000000000..ebc6b75257 --- /dev/null +++ b/src/bin/pg_upgrade/t/004_subscription.pl @@ -0,0 +1,261 @@ +# Copyright (c) 2023, PostgreSQL Global Development Group + +# Test for pg_upgrade of logical subscription +use strict; +use warnings; + +use File::Find qw(find); +use File::Path qw(rmtree); + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Can be changed to test the other modes. +my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy'; + +# Initialize publisher node +my $publisher = PostgreSQL::Test::Cluster->new('publisher'); +$publisher->init(allows_streaming => 'logical'); +$publisher->start; + +# Initialize the old subscriber node +my $old_sub = PostgreSQL::Test::Cluster->new('old_sub'); +$old_sub->init; +$old_sub->start; + +# Initialize the new subscriber +my $new_sub = PostgreSQL::Test::Cluster->new('new_sub'); +$new_sub->init; +my $bindir = $new_sub->config_data('--bindir'); + +sub insert_line_at_pub +{ + my $payload = shift; + + foreach ("tab_upgraded", "tab_not_upgraded") + { + $publisher->safe_psql('postgres', + "INSERT INTO " . $_ . " (val) VALUES('$payload')"); + } +} + +# Initial setup +foreach ("tab_upgraded", "tab_not_upgraded") +{ + $publisher->safe_psql('postgres', + "CREATE TABLE " . $_ . " (id serial, val text)"); + $old_sub->safe_psql('postgres', + "CREATE TABLE " . $_ . 
" (id serial, val text)"); +} +insert_line_at_pub('before initial sync'); + +# Setup logical replication, replicating only 1 table +my $connstr = $publisher->connstr . ' dbname=postgres'; + +$publisher->safe_psql('postgres', + "CREATE PUBLICATION regress_pub FOR TABLE tab_upgraded"); + +$old_sub->safe_psql('postgres', + "CREATE SUBSCRIPTION regress_sub CONNECTION '$connstr' PUBLICATION regress_pub" +); + +# Wait for the catchup, as we need the subscription rel in ready state +$old_sub->wait_for_subscription_sync($publisher, 'regress_sub'); + +$publisher->safe_psql('postgres', + "INSERT INTO tab_upgraded VALUES (generate_series(2,50), 'after initial sync')" +); +$publisher->wait_for_catchup('regress_sub'); + +# ------------------------------------------------------ +# Check that pg_upgrade is successful when all tables are in ready state. +# ------------------------------------------------------ +my $synced_query = + "SELECT count(1) = 1 FROM pg_subscription_rel WHERE srsubstate = 'r'"; +$old_sub->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +command_ok( + [ + 'pg_upgrade', '--no-sync', '-d', $old_sub->data_dir, + '-D', $new_sub->data_dir, '-b', $bindir, + '-B', $bindir, '-s', $new_sub->host, + '-p', $old_sub->port, '-P', $new_sub->port, + $mode, '--check', + ], + 'run of pg_upgrade --check for old instance when the subscription tables are in ready state' +); +ok( !-d $new_sub->data_dir . 
"/pg_upgrade_output.d", + "pg_upgrade_output.d/ removed after successful pg_upgrade"); + +# Check the number of rows for each table on each server +my $result = + $publisher->safe_psql('postgres', "SELECT count(*) FROM tab_upgraded"); +is($result, qq(50), "check initial tab_upgraded table data on publisher"); +$result = + $publisher->safe_psql('postgres', "SELECT count(*) FROM tab_not_upgraded"); +is($result, qq(1), "check initial tab_not_upgraded table data on publisher"); +$result = + $old_sub->safe_psql('postgres', "SELECT count(*) FROM tab_upgraded"); +is($result, qq(50), + "check initial tab_upgraded table data on the old subscriber"); +$result = + $old_sub->safe_psql('postgres', "SELECT count(*) FROM tab_not_upgraded"); +is($result, qq(0), + "check initial tab_not_upgraded table data on the old subscriber"); + +# ------------------------------------------------------ +# Check that pg_upgrade refuses to run if there's a subscription with tables in +# a state different than 'r' (ready), 'i' (init) and 's' (synchronized). +# ------------------------------------------------------ +$publisher->safe_psql('postgres', + "CREATE TABLE tab_primary_key(id serial PRIMARY KEY, val text);"); +$old_sub->safe_psql('postgres', + "CREATE TABLE tab_primary_key(id serial PRIMARY KEY, val text);"); +$publisher->safe_psql('postgres', + "INSERT INTO tab_primary_key values(1, 'before initial sync')"); + +# Insert the same value that is already present in publisher to the primary key +# column of subscriber so that the table sync will fail. +$old_sub->safe_psql('postgres', + "INSERT INTO tab_primary_key values(1, 'before initial sync')"); + +$publisher->safe_psql('postgres', + "ALTER PUBLICATION regress_pub ADD TABLE tab_primary_key"); +$old_sub->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_sub REFRESH PUBLICATION"); + +# Table will be in 'd' (data is being copied) state as table sync will fail +# because of primary key constraint error.
+my $started_query = + "SELECT count(1) = 1 FROM pg_subscription_rel WHERE srsubstate = 'd';"; +$old_sub->poll_query_until('postgres', $started_query) + or die + "Timed out while waiting for the table state to become 'd' (datasync)"; + +command_fails( + [ + 'pg_upgrade', '--no-sync', '-d', $old_sub->data_dir, + '-D', $new_sub->data_dir, '-b', $bindir, + '-B', $bindir, '-s', $new_sub->host, + '-p', $old_sub->port, '-P', $new_sub->port, + $mode, '--check', + ], + 'run of pg_upgrade --check for old instance with relation in \'d\' datasync(invalid) state' +); + +# Verify the reason why the subscriber cannot be upgraded +my $sub_relstate_filename; + +# Find a txt file that contains a list of tables that cannot be upgraded. We +# cannot predict the file's path because the output directory contains a +# milliseconds timestamp. File::Find::find must be used. +find( + sub { + if ($File::Find::name =~ m/subscription_state\.txt/) + { + $sub_relstate_filename = $File::Find::name; + } + }, + $new_sub->data_dir . "/pg_upgrade_output.d"); + +# Check the file content which should have tab_primary_key table in invalid +# state. +like( + slurp_file($sub_relstate_filename), + qr/database:postgres subscription:regress_sub schema:public relation:tab_primary_key state:d not in required state/m, + 'the previous test failed due to subscription table in invalid state'); + +# Delete the table data so that the primary key violation error will not happen +# and tab_primary_key reaches ready state. +$old_sub->safe_psql('postgres', "DELETE FROM tab_primary_key"); + +$synced_query = + "SELECT count(1) = 2 FROM pg_subscription_rel WHERE srsubstate = 'r'"; +$old_sub->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# ------------------------------------------------------ +# The incremental changes added to the publisher are replicated after upgrade. 
+# ------------------------------------------------------ + +# Stop the old subscriber, insert a row in tab_upgraded and tab_not_upgraded +# publisher table while it's down and add tab_not_upgraded to the publication. +my $remote_lsn = $old_sub->safe_psql('postgres', + "SELECT remote_lsn FROM pg_replication_origin_status"); +$old_sub->stop; + +insert_line_at_pub('while old_sub is down'); + +# Run pg_upgrade +command_ok( + [ + 'pg_upgrade', '--no-sync', '-d', $old_sub->data_dir, + '-D', $new_sub->data_dir, '-b', $bindir, + '-B', $bindir, '-s', $new_sub->host, + '-p', $old_sub->port, '-P', $new_sub->port, + $mode, + ], + 'run of pg_upgrade for new sub'); + +$publisher->safe_psql('postgres', + "ALTER PUBLICATION regress_pub ADD TABLE tab_not_upgraded"); + +$new_sub->start; + +# Subscription relations and replication origin remote_lsn should be preserved +$result = + $new_sub->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(2), + "There should be 2 rows in pg_subscription_rel(representing tab_upgraded and tab_primary_key)" +); + +$result = $new_sub->safe_psql('postgres', + "SELECT remote_lsn FROM pg_replication_origin_status os, pg_subscription s where os.external_id = 'pg_' || s.oid" +); +is($result, qq($remote_lsn), "remote_lsn should have been preserved"); + +# There should be no new replicated rows before enabling the subscription +$result = + $new_sub->safe_psql('postgres', "SELECT count(*) FROM tab_upgraded"); +is($result, qq(50), + "tab_upgraded table has no new replicated rows before enabling the subscription" +); +$result = + $new_sub->safe_psql('postgres', "SELECT count(*) FROM tab_not_upgraded"); +is($result, qq(0), + "no change in tab_not_upgraded table which is not part of the publication" +); + +# Enable the subscription +$new_sub->safe_psql('postgres', "ALTER SUBSCRIPTION regress_sub ENABLE"); + +$publisher->wait_for_catchup('regress_sub'); + +# Rows on tab_upgraded should have been replicated, while nothing should 
happen +# for tab_not_upgraded. +$result = + $new_sub->safe_psql('postgres', "SELECT count(*) FROM tab_upgraded"); +is($result, qq(51), "check replicated inserts on new subscriber"); +$result = + $new_sub->safe_psql('postgres', "SELECT count(*) FROM tab_not_upgraded"); +is($result, qq(0), + "no change in table tab_not_upgraded after enable subscription which is not part of the publication" +); + +# Refresh the subscription, only the missing row on tab_not_upgraded should be +# replicated. +$new_sub->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_sub REFRESH PUBLICATION"); +$new_sub->wait_for_subscription_sync($publisher, 'regress_sub'); +$result = + $new_sub->safe_psql('postgres', "SELECT count(*) FROM tab_upgraded"); +is($result, qq(51), + "check there is no change when there were no changes replicated"); +$result = + $new_sub->safe_psql('postgres', "SELECT count(*) FROM tab_not_upgraded"); +is($result, qq(2), + "check replicated inserts on new subscriber after refreshing"); + +done_testing(); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index f14aed422a..c7bf3cbd55 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11382,6 +11382,16 @@ provolatile => 'v', proparallel => 'u', prorettype => 'bool', proargtypes => 'name', prosrc => 'binary_upgrade_logical_slot_has_caught_up' }, +{ oid => '8404', descr => 'for use by pg_upgrade (relation for pg_subscription_rel)', + proname => 'binary_upgrade_add_sub_rel_state', proisstrict => 'f', + provolatile => 'v', proparallel => 'u', prorettype => 'void', + proargtypes => 'text oid char pg_lsn', + prosrc => 'binary_upgrade_add_sub_rel_state' }, +{ oid => '8405', descr => 'for use by pg_upgrade (remote_lsn for origin)', + proname => 'binary_upgrade_replorigin_advance', proisstrict => 'f', + provolatile => 'v', proparallel => 'u', prorettype => 'void', + proargtypes => 'text pg_lsn', + prosrc => 'binary_upgrade_replorigin_advance' }, # conversion functions {
oid => '4302', diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index bf50a32119..a4946b40b1 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2660,6 +2660,7 @@ SubLinkType SubOpts SubPlan SubPlanState +SubRelInfo SubRemoveRels SubTransactionId SubXactCallback -- 2.34.1
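
For readers skimming the patch, the rule that check_for_subscription_state() enforces through its "srsubstate NOT IN ('i', 'r', 's')" filter can be restated as a small standalone predicate. This is only an illustrative sketch: substate_is_upgradable() and count_blocking_states() are hypothetical names that do not appear in the patch, and the real check runs as a catalog query in pg_upgrade, not on an in-memory array.

```c
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical helper (not in the patch): returns true when a
 * pg_subscription_rel.srsubstate value is one that the patch's check
 * accepts, i.e. 'i' (initialize), 'r' (ready) or 's' (synchronized).
 */
bool
substate_is_upgradable(char srsubstate)
{
	switch (srsubstate)
	{
		case 'i':				/* initialize */
		case 'r':				/* ready */
		case 's':				/* synchronized */
			return true;
		default:				/* e.g. 'd' (datasync) blocks the upgrade */
			return false;
	}
}

/*
 * Hypothetical helper (not in the patch): counts how many relation states
 * in the array would be listed as "not in required state".
 */
size_t
count_blocking_states(const char *states, size_t nstates)
{
	size_t		nblocking = 0;

	for (size_t i = 0; i < nstates; i++)
	{
		if (!substate_is_upgradable(states[i]))
			nblocking++;
	}
	return nblocking;
}
```

With the states from the TAP test, a subscription whose relations are in 'r' and 'd' would have one blocking entry, which corresponds to the single tab_primary_key line expected in subscription_state.txt.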