Hi,

On Thu, Mar 09, 2023 at 12:05:36PM +0530, Amit Kapila wrote:
> On Wed, Mar 8, 2023 at 12:26 PM Julien Rouhaud <rjuju...@gmail.com> wrote:
> >
> > Is there something that can be done for pg16? I was thinking that having a
> > fix for the normal and easy case could be acceptable: only allowing
> > pg_upgrade to optionally, and not by default, preserve the subscription
> > relations IFF all subscriptions only have tables in ready state. Different
> > states should be transient, and it's easy to check as a user beforehand and
> > also easy to check during pg_upgrade, so it seems like an acceptable
> > limitations (which I personally see as a good sanity check, but YMMV). It
> > could be lifted in later releases if wanted anyway.
> >
> > It's unclear to me whether this limited scope would also require to
> > preserve the replication origins, but having looked at the code I don't
> > think it would be much of a problem as the local LSN doesn't have to be
> > preserved.
> >
>
> I think we need to preserve replication origins as they help us to
> determine the WAL location from where to start the streaming after the
> upgrade. If we don't preserve those then from which location will the
> subscriber start streaming?

It would start from the slot's information on the publisher side, but I guess
there's no guarantee that this will be accurate in all cases.

> We don't want to replicate the WAL which
> has already been sent.

Yeah I agree.  I added support to also preserve the subscription's replication
origin information, a new --preserve-subscription-state (better naming welcome)
documented option for pg_upgrade to optionally ask for this new mode, and a
similar (but undocumented) option for pg_dump that only works with
--binary-upgrade and added a check in pg_upgrade that all relations are in 'r'
(ready) mode.  Patch v2 attached.
>From 0a77ac305243e0f58dbfce6bb7c8cf062b45d4f4 Mon Sep 17 00:00:00 2001
From: Julien Rouhaud <julien.rouh...@free.fr>
Date: Wed, 22 Feb 2023 09:19:32 +0800
Subject: [PATCH v2] Optionally preserve the full subscription's state during
 pg_upgrade

Previously, only the subscription metadata information was preserved.  Without
the list of relations and their state it's impossible to re-enable the
subscriptions without missing some records as the list of relations can only be
refreshed after enabling the subscription (and therefore starting the apply
worker).  Even if we added a way to refresh the subscription while enabling a
publication, we still wouldn't know which relations are new on the publication
side, and therefore should be fully synced, and which shouldn't.

Similarly, the subscription's replication origin are needed to ensure
that we don't replicate anything twice.

To fix this problem, this patch teaches pg_dump in binary upgrade mode to emit
additional commands to be able to restore the content of pg_subscription_rel,
and addition LSN parameter in the subscription creation to restore the
underlying replication origin remote LSN.  The LSN parameter is only accepted
in CREATE SUBSCRIPTION in binary upgrade mode.

The new ALTER SUBSCRIPTION subcommand, usable only during binary upgrade, has
the following syntax:

ALTER SUBSCRIPTION name ADD TABLE (relid = XYZ, state = 'x' [, lsn = 'X/Y'])

The relation is identified by its oid, as it's preserved during pg_upgrade.
The lsn is optional, and defaults to NULL / InvalidXLogRecPtr.

This mode is optional and not enabled by default.  A new
--preserve-subscription-state option is added to pg_upgrade to use it.  For
now, pg_upgrade will check that all the subscription relations are in 'r'
(ready) state, and will error out if any subscription relation in any database
has a different state, logging the list of problematic databases with the
number of problematic relation in each.

Author: Julien Rouhaud
Reviewed-by: FIXME
Discussion: https://postgr.es/m/20230217075433.u5mjly4d5cr4hcfe@jrouhaud
---
 doc/src/sgml/ref/pgupgrade.sgml         |  13 +++
 src/backend/commands/subscriptioncmds.c |  67 +++++++++++++-
 src/backend/parser/gram.y               |  11 +++
 src/bin/pg_dump/pg_backup.h             |   2 +
 src/bin/pg_dump/pg_dump.c               | 114 +++++++++++++++++++++++-
 src/bin/pg_dump/pg_dump.h               |  13 +++
 src/bin/pg_upgrade/check.c              |  54 +++++++++++
 src/bin/pg_upgrade/dump.c               |   3 +-
 src/bin/pg_upgrade/option.c             |   7 ++
 src/bin/pg_upgrade/pg_upgrade.h         |   1 +
 src/include/nodes/parsenodes.h          |   3 +-
 11 files changed, 283 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 7816b4c685..aef3b8a8b8 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -240,6 +240,19 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--preserve-subscription-state</option></term>
+      <listitem>
+       <para>
+        Fully preserve the logical subscription state if any.  That include the
+        underlying replication origin with their remote LSN and the list of
+        relations in each subscription.  If any of the subscription on the old
+        cluster has any relation in a state different from <literal>r</literal>
+        (ready), the <application>pg_upgrade</application> run will error.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-?</option></term>
       <term><option>--help</option></term>
diff --git a/src/backend/commands/subscriptioncmds.c 
b/src/backend/commands/subscriptioncmds.c
index 464db6d247..75278991e4 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -66,6 +66,8 @@
 #define SUBOPT_DISABLE_ON_ERR          0x00000400
 #define SUBOPT_LSN                                     0x00000800
 #define SUBOPT_ORIGIN                          0x00001000
+#define SUBOPT_RELID                           0x00002000
+#define SUBOPT_STATE                           0x00004000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -90,6 +92,8 @@ typedef struct SubOpts
        bool            disableonerr;
        char       *origin;
        XLogRecPtr      lsn;
+       Oid                     relid;
+       char            state;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -324,6 +328,38 @@ parse_subscription_options(ParseState *pstate, List 
*stmt_options,
                        opts->specified_opts |= SUBOPT_LSN;
                        opts->lsn = lsn;
                }
+               else if (IsSet(supported_opts, SUBOPT_RELID) &&
+                                strcmp(defel->defname, "relid") == 0)
+               {
+                       Oid                     relid = defGetObjectId(defel);
+
+                       if (IsSet(opts->specified_opts, SUBOPT_RELID))
+                               errorConflictingDefElem(defel, pstate);
+
+                       if (!OidIsValid(relid))
+                               ereport(ERROR,
+                                               
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                                errmsg("invalid relation 
identifier used")));
+
+                       opts->specified_opts |= SUBOPT_RELID;
+                       opts->relid = relid;
+               }
+               else if (IsSet(supported_opts, SUBOPT_STATE) &&
+                                strcmp(defel->defname, "state") == 0)
+               {
+                       char       *state_str = defGetString(defel);
+
+                       if (IsSet(opts->specified_opts, SUBOPT_STATE))
+                               errorConflictingDefElem(defel, pstate);
+
+                       if (strlen(state_str) != 1)
+                               ereport(ERROR,
+                                               
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                                errmsg("invalid relation state 
used")));
+
+                       opts->specified_opts |= SUBOPT_STATE;
+                       opts->state = defGetString(defel)[0];
+               }
                else
                        ereport(ERROR,
                                        (errcode(ERRCODE_SYNTAX_ERROR),
@@ -550,6 +586,7 @@ CreateSubscription(ParseState *pstate, 
CreateSubscriptionStmt *stmt,
        List       *publications;
        bits32          supported_opts;
        SubOpts         opts = {0};
+       RepOriginId     originid;
 
        /*
         * Parse and check options.
@@ -561,6 +598,8 @@ CreateSubscription(ParseState *pstate, 
CreateSubscriptionStmt *stmt,
                                          SUBOPT_SYNCHRONOUS_COMMIT | 
SUBOPT_BINARY |
                                          SUBOPT_STREAMING | 
SUBOPT_TWOPHASE_COMMIT |
                                          SUBOPT_DISABLE_ON_ERR | 
SUBOPT_ORIGIN);
+       if(IsBinaryUpgrade)
+               supported_opts |= SUBOPT_LSN;
        parse_subscription_options(pstate, stmt->options, supported_opts, 
&opts);
 
        /*
@@ -659,7 +698,12 @@ CreateSubscription(ParseState *pstate, 
CreateSubscriptionStmt *stmt,
        recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
 
        ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, 
sizeof(originname));
-       replorigin_create(originname);
+       originid = replorigin_create(originname);
+
+       if (IsBinaryUpgrade && IsSet(opts.lsn, SUBOPT_LSN))
+               replorigin_advance(originid, opts.lsn, InvalidXLogRecPtr,
+                                                       false /* backward */ ,
+                                                       false /* WAL log */ );
 
        /*
         * Connect to remote side to execute requested commands and fetch table
@@ -1341,6 +1385,27 @@ AlterSubscription(ParseState *pstate, 
AlterSubscriptionStmt *stmt,
                                break;
                        }
 
+               case ALTER_SUBSCRIPTION_ADD_TABLE:
+                       {
+                               if (!IsBinaryUpgrade)
+                                       ereport(ERROR,
+                                                       
(errcode(ERRCODE_SYNTAX_ERROR)),
+                                                       errmsg("ALTER 
SUBSCRIPTION ... ADD TABLE is not supported"));
+
+                               supported_opts = SUBOPT_RELID | SUBOPT_STATE | 
SUBOPT_LSN;
+                               parse_subscription_options(pstate, 
stmt->options,
+                                                                               
   supported_opts, &opts);
+
+                               /* relid and state should always be provided. */
+                               Assert(IsSet(opts.specified_opts, 
SUBOPT_RELID));
+                               Assert(IsSet(opts.specified_opts, 
SUBOPT_STATE));
+
+                               AddSubscriptionRelState(subid, opts.relid, 
opts.state,
+                                                                               
opts.lsn);
+
+                               break;
+                       }
+
                default:
                        elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
                                 stmt->kind);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index a0138382a1..0a3448c487 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -10670,6 +10670,17 @@ AlterSubscriptionStmt:
                                        n->options = $5;
                                        $$ = (Node *) n;
                                }
+                       /* for binary upgrade only */
+                       | ALTER SUBSCRIPTION name ADD_P TABLE definition
+                               {
+                                       AlterSubscriptionStmt *n =
+                                               makeNode(AlterSubscriptionStmt);
+
+                                       n->kind = ALTER_SUBSCRIPTION_ADD_TABLE;
+                                       n->subname = $3;
+                                       n->options = $6;
+                                       $$ = (Node *) n;
+                               }
                ;
 
 /*****************************************************************************
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index aba780ef4b..8a72a39d60 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -200,6 +200,8 @@ typedef struct _dumpOptions
 
        int                     sequence_data;  /* dump sequence data even in 
schema-only mode */
        int                     do_nothing;
+
+       bool            preserve_subscriptions;
 } DumpOptions;
 
 /*
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 4217908f84..c6499a3d24 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -421,6 +421,7 @@ main(int argc, char **argv)
                {"on-conflict-do-nothing", no_argument, &dopt.do_nothing, 1},
                {"rows-per-insert", required_argument, NULL, 10},
                {"include-foreign-data", required_argument, NULL, 11},
+               {"preserve-subscription-state", no_argument, NULL, 12},
 
                {NULL, 0, NULL, 0}
        };
@@ -631,6 +632,10 @@ main(int argc, char **argv)
                                                                                
  optarg);
                                break;
 
+                       case 12:                        /* include full 
subscription state */
+                               dopt.preserve_subscriptions = true;
+                               break;
+
                        default:
                                /* getopt_long already emitted a complaint */
                                pg_log_error_hint("Try \"%s --help\" for more 
information.", progname);
@@ -688,6 +693,10 @@ main(int argc, char **argv)
        if (dopt.do_nothing && dopt.dump_inserts == 0)
                pg_fatal("option --on-conflict-do-nothing requires option 
--inserts, --rows-per-insert, or --column-inserts");
 
+       /* --preserve-subscription-state requires --binary-upgrade */
+       if (dopt.preserve_subscriptions && !dopt.binary_upgrade)
+               pg_fatal("option --preserve-subscription-state requires option 
--binary-upgrade");
+
        /* Identify archive format to emit */
        archiveFormat = parseArchiveFormat(format, &archiveMode);
 
@@ -4485,6 +4494,69 @@ is_superuser(Archive *fout)
        return false;
 }
 
+/*
+ * getSubscriptionRels
+ *       get information about the given subscription's relations
+ */
+static SubRelInfo *
+getSubscriptionRels(Archive *fout, Oid subid, int *nrels)
+{
+       SubRelInfo *rels;
+       PQExpBuffer query;
+       PGresult   *res;
+       int                     i_srrelid;
+       int                     i_srsubstate;
+       int                     i_srsublsn;
+       int                     i,
+                               ntups;
+
+       if (!fout->dopt->binary_upgrade || !fout->dopt->preserve_subscriptions)
+       {
+               *nrels = 0;
+
+               return NULL;
+       }
+
+       query = createPQExpBuffer();
+
+       appendPQExpBuffer(query, "SELECT srrelid, srsubstate, srsublsn "
+                                                               " FROM 
pg_subscription_rel"
+                                                               " WHERE srsubid 
= %u", subid);
+
+       res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+
+       ntups = PQntuples(res);
+       *nrels = ntups;
+
+       if (ntups == 0)
+       {
+               rels = NULL;
+               goto cleanup;
+       }
+
+       /*
+        * Get subscription relation fields.
+        */
+       i_srrelid = PQfnumber(res, "srrelid");
+       i_srsubstate = PQfnumber(res, "srsubstate");
+       i_srsublsn = PQfnumber(res, "srsublsn");
+
+       rels = pg_malloc(ntups * sizeof(SubRelInfo));
+
+       for (i = 0; i < ntups; i++)
+       {
+               rels[i].srrelid = atooid(PQgetvalue(res, i, i_srrelid));
+               rels[i].srsubstate = PQgetvalue(res, i, i_srsubstate)[0];
+               rels[i].srsublsn = pg_strdup(PQgetvalue(res, i, i_srsublsn));
+       }
+
+cleanup:
+       PQclear(res);
+       destroyPQExpBuffer(query);
+
+       return rels;
+}
+
 /*
  * getSubscriptions
  *       get information about subscriptions
@@ -4509,6 +4581,7 @@ getSubscriptions(Archive *fout)
        int                     i_subsynccommit;
        int                     i_subpublications;
        int                     i_subbinary;
+       int                     i_suboriginremotelsn;
        int                     i,
                                ntups;
 
@@ -4561,12 +4634,16 @@ getSubscriptions(Archive *fout)
                                                  
LOGICALREP_TWOPHASE_STATE_DISABLED);
 
        if (fout->remoteVersion >= 160000)
-               appendPQExpBufferStr(query, " s.suborigin\n");
+               appendPQExpBufferStr(query, " s.suborigin,\n");
        else
-               appendPQExpBuffer(query, " '%s' AS suborigin\n", 
LOGICALREP_ORIGIN_ANY);
+               appendPQExpBuffer(query, " '%s' AS suborigin,\n", 
LOGICALREP_ORIGIN_ANY);
+
+       appendPQExpBufferStr(query, "o.remote_lsn\n");
 
        appendPQExpBufferStr(query,
                                                 "FROM pg_subscription s\n"
+                                                "LEFT JOIN 
pg_replication_origin_status o \n"
+                                                "    ON o.external_id = 'pg_' 
|| s.oid::text \n"
                                                 "WHERE s.subdbid = (SELECT oid 
FROM pg_database\n"
                                                 "                   WHERE 
datname = current_database())");
 
@@ -4591,6 +4668,7 @@ getSubscriptions(Archive *fout)
        i_subtwophasestate = PQfnumber(res, "subtwophasestate");
        i_subdisableonerr = PQfnumber(res, "subdisableonerr");
        i_suborigin = PQfnumber(res, "suborigin");
+       i_suboriginremotelsn = PQfnumber(res, "remote_lsn");
 
        subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4621,6 +4699,15 @@ getSubscriptions(Archive *fout)
                subinfo[i].subdisableonerr =
                        pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
                subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, 
i_suborigin));
+               if (PQgetisnull(res, i, i_suboriginremotelsn))
+                       subinfo[i].suboriginremotelsn = NULL;
+               else
+                       subinfo[i].suboriginremotelsn =
+                               pg_strdup(PQgetvalue(res, i, 
i_suboriginremotelsn));
+
+               subinfo[i].subrels = getSubscriptionRels(fout,
+                                                                               
                 subinfo[i].dobj.catId.oid,
+                                                                               
                 &subinfo[i].nrels);
 
                /* Decide whether we want to dump it */
                selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4702,9 +4789,31 @@ dumpSubscription(Archive *fout, const SubscriptionInfo 
*subinfo)
        if (strcmp(subinfo->subsynccommit, "off") != 0)
                appendPQExpBuffer(query, ", synchronous_commit = %s", 
fmtId(subinfo->subsynccommit));
 
+       if (dopt->binary_upgrade && dopt->preserve_subscriptions &&
+               subinfo->suboriginremotelsn)
+       {
+               appendPQExpBuffer(query, ", lsn = '%s'", 
subinfo->suboriginremotelsn);
+       }
+
        appendPQExpBufferStr(query, ");\n");
 
        if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
+       {
+               for (i = 0; i < subinfo->nrels; i++)
+               {
+                       appendPQExpBuffer(query, "\nALTER SUBSCRIPTION %s ADD 
TABLE "
+                                                                        
"(relid = %u, state = '%c'",
+                                                                        
qsubname,
+                                                                        
subinfo->subrels[i].srrelid,
+                                                                        
subinfo->subrels[i].srsubstate);
+
+                       if (subinfo->subrels[i].srsublsn[0] != '\0')
+                               appendPQExpBuffer(query, ", LSN = '%s'",
+                                                                 
subinfo->subrels[i].srsublsn);
+
+                       appendPQExpBufferStr(query, ");");
+               }
+
                ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId,
                                         ARCHIVE_OPTS(.tag = subinfo->dobj.name,
                                                                  .owner = 
subinfo->rolname,
@@ -4712,6 +4821,7 @@ dumpSubscription(Archive *fout, const SubscriptionInfo 
*subinfo)
                                                                  .section = 
SECTION_POST_DATA,
                                                                  .createStmt = 
query->data,
                                                                  .dropStmt = 
delq->data));
+       }
 
        if (subinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
                dumpComment(fout, "SUBSCRIPTION", qsubname,
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index cdca0b993d..43ab4acf35 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -645,6 +645,16 @@ typedef struct _PublicationSchemaInfo
        NamespaceInfo *pubschema;
 } PublicationSchemaInfo;
 
+/*
+ * The SubRelInfo struct is used to represent subscription relation.
+ */
+typedef struct _SubRelInfo
+{
+       Oid             srrelid;
+       char    srsubstate;
+       char   *srsublsn;
+} SubRelInfo;
+
 /*
  * The SubscriptionInfo struct is used to represent subscription.
  */
@@ -661,6 +671,9 @@ typedef struct _SubscriptionInfo
        char       *suborigin;
        char       *subsynccommit;
        char       *subpublications;
+       char       *suboriginremotelsn;
+       int                     nrels;
+       SubRelInfo *subrels;
 } SubscriptionInfo;
 
 /*
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 7cf68dc9af..74806bf2cc 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -23,6 +23,7 @@ static void check_is_install_user(ClusterInfo *cluster);
 static void check_proper_datallowconn(ClusterInfo *cluster);
 static void check_for_prepared_transactions(ClusterInfo *cluster);
 static void check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster);
+static void check_for_subscription_rels(ClusterInfo *cluster);
 static void check_for_user_defined_postfix_ops(ClusterInfo *cluster);
 static void check_for_incompatible_polymorphics(ClusterInfo *cluster);
 static void check_for_tables_with_oids(ClusterInfo *cluster);
@@ -107,6 +108,8 @@ check_and_dump_old_cluster(bool live_check)
        check_for_composite_data_type_usage(&old_cluster);
        check_for_reg_data_type_usage(&old_cluster);
        check_for_isn_and_int8_passing_mismatch(&old_cluster);
+       if (user_opts.preserve_subscriptions)
+               check_for_subscription_rels(&old_cluster);
 
        /*
         * PG 16 increased the size of the 'aclitem' type, which breaks the 
on-disk
@@ -907,6 +910,57 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo 
*cluster)
                check_ok();
 }
 
+/*
+ * check_for_subscription_rels()
+ *
+ * Verify that no table in a subscription is in a state different than ready.
+ */
+static void
+check_for_subscription_rels(ClusterInfo *cluster)
+{
+       int                     dbnum;
+       bool            is_error = false;
+
+       Assert(user_opts.preserve_subscriptions);
+
+       /* No subscription before pg10. */
+       if (GET_MAJOR_VERSION(cluster->major_version < 1000))
+               return;
+
+       prep_status("Checking for non-ready subscription relations");
+
+       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+       {
+               PGresult   *res;
+               int                     nb;
+               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
+               PGconn     *conn = connectToServer(cluster, active_db->db_name);
+
+               res = executeQueryOrDie(conn,
+                                                               "SELECT 
count(0) "
+                                                               "FROM 
pg_catalog.pg_subscription_rel "
+                                                               "WHERE 
srsubstate != 'r'");
+
+               if (PQntuples(res) != 1)
+                       pg_fatal("could not determine the number of non-ready 
subscription relations");
+
+               nb = atooid(PQgetvalue(res, 0, 0));
+               if (nb != 0)
+               {
+                       is_error = true;
+                       pg_log(PG_WARNING,
+                                  "\nWARNING: database \"%s\" has %d 
subscription "
+                                  "relations(s) in non-ready state", 
active_db->db_name, nb);
+               }
+       }
+
+       if (is_error)
+               pg_fatal("--preserve-subscription-state is incompatible with "
+                               "subscription relations in non-ready state");
+
+       check_ok();
+}
+
 /*
  * Verify that no user defined postfix operators exist.
  */
diff --git a/src/bin/pg_upgrade/dump.c b/src/bin/pg_upgrade/dump.c
index 6c8c82dca8..9284576af7 100644
--- a/src/bin/pg_upgrade/dump.c
+++ b/src/bin/pg_upgrade/dump.c
@@ -53,9 +53,10 @@ generate_old_dump(void)
 
                parallel_exec_prog(log_file_name, NULL,
                                                   "\"%s/pg_dump\" %s 
--schema-only --quote-all-identifiers "
-                                                  "--binary-upgrade 
--format=custom %s --file=\"%s/%s\" %s",
+                                                  "--binary-upgrade 
--format=custom %s %s --file=\"%s/%s\" %s",
                                                   new_cluster.bindir, 
cluster_conn_opts(&old_cluster),
                                                   log_opts.verbose ? 
"--verbose" : "",
+                                                  
user_opts.preserve_subscriptions ? "--preserve-subscription-state" : "",
                                                   log_opts.dumpdir,
                                                   sql_file_name, 
escaped_connstr.data);
 
diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c
index 8869b6b60d..b033aa26ba 100644
--- a/src/bin/pg_upgrade/option.c
+++ b/src/bin/pg_upgrade/option.c
@@ -57,6 +57,7 @@ parseCommandLine(int argc, char *argv[])
                {"verbose", no_argument, NULL, 'v'},
                {"clone", no_argument, NULL, 1},
                {"copy", no_argument, NULL, 2},
+               {"preserve-subscription-state", no_argument, NULL, 3},
 
                {NULL, 0, NULL, 0}
        };
@@ -66,6 +67,7 @@ parseCommandLine(int argc, char *argv[])
 
        user_opts.do_sync = true;
        user_opts.transfer_mode = TRANSFER_MODE_COPY;
+       user_opts.preserve_subscriptions = false;
 
        os_info.progname = get_progname(argv[0]);
 
@@ -199,6 +201,10 @@ parseCommandLine(int argc, char *argv[])
                                user_opts.transfer_mode = TRANSFER_MODE_COPY;
                                break;
 
+                       case 3:
+                               user_opts.preserve_subscriptions = true;
+                               break;
+
                        default:
                                fprintf(stderr, _("Try \"%s --help\" for more 
information.\n"),
                                                os_info.progname);
@@ -289,6 +295,7 @@ usage(void)
        printf(_("  -V, --version                 display version information, 
then exit\n"));
        printf(_("  --clone                       clone instead of copying 
files to new cluster\n"));
        printf(_("  --copy                        copy files to new cluster 
(default)\n"));
+       printf(_("  --preserve-subscription-state preserve the subscription 
state fully\n"));
        printf(_("  -?, --help                    show this help, then 
exit\n"));
        printf(_("\n"
                         "Before running pg_upgrade you must:\n"
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 5f2a116f23..e0d44e41e3 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -296,6 +296,7 @@ typedef struct
        transferMode transfer_mode; /* copy files or link them? */
        int                     jobs;                   /* number of 
processes/threads to use */
        char       *socketdir;          /* directory to use for Unix sockets */
+       bool            preserve_subscriptions; /* fully transfer subscription 
state */
 } UserOpts;
 
 typedef struct
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 371aa0ffc5..d441fccd5e 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3920,7 +3920,8 @@ typedef enum AlterSubscriptionType
        ALTER_SUBSCRIPTION_DROP_PUBLICATION,
        ALTER_SUBSCRIPTION_REFRESH,
        ALTER_SUBSCRIPTION_ENABLED,
-       ALTER_SUBSCRIPTION_SKIP
+       ALTER_SUBSCRIPTION_SKIP,
+       ALTER_SUBSCRIPTION_ADD_TABLE
 } AlterSubscriptionType;
 
 typedef struct AlterSubscriptionStmt
-- 
2.37.0

Reply via email to