From 512b817c55d8d2dec7f70684250e5c67f1562711 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 4 Apr 2023 05:49:34 +0000
Subject: [PATCH v7 1/3] pg_upgrade: Add --include-logical-replication-slots
 option

This commit introduces a new pg_upgrade option called "--include-logical-replication-slots".
This allows nodes with logical replication slots to be upgraded. The commit can
be divided into two parts: one for pg_dump and another for pg_upgrade.

For pg_dump this commit includes a new option called "--logical-replication-slots-only".
This option can be used to dump logical replication slots. When this option is
specified, the slot_name, plugin, and two_phase parameters are extracted from
pg_replication_slots. An SQL file is then generated which executes
pg_create_logical_replication_slot() with the extracted parameters.

For pg_upgrade, when '--include-logical-replication-slots' is specified, it executes
pg_dump with the new "--logical-replication-slots-only" option and restores from the
dump. Apart from restoring schema, pg_resetwal must not be called after restoring
replication slots. This is because the command discards WAL files and starts from a
new segment, even if they are required by replication slots. This leads to an ERROR:
"requested WAL segment XXX has already been removed". To avoid this, replication slots
are restored at a different time than other objects, after running pg_resetwal.

The significant advantage of this commit is that it makes it easy to continue
logical replication even after upgrading the publisher node. Previously, pg_upgrade
allowed copying publications to a new node. With this new commit, adjusting the
connection string to the new publisher will cause the apply worker on the subscriber
to connect to the new publisher automatically. This enables seamless continuation
of logical replication, even after an upgrade.

Author: Hayato Kuroda
Reviewed-by: Peter Smith, Julien Rouhaud
---
 doc/src/sgml/ref/pg_dump.sgml                 |  10 ++
 doc/src/sgml/ref/pgupgrade.sgml               |  11 ++
 src/bin/pg_dump/pg_backup.h                   |   1 +
 src/bin/pg_dump/pg_dump.c                     | 147 ++++++++++++++++++
 src/bin/pg_dump/pg_dump.h                     |  17 +-
 src/bin/pg_dump/pg_dump_sort.c                |  11 +-
 src/bin/pg_upgrade/dump.c                     |  24 +++
 src/bin/pg_upgrade/meson.build                |   1 +
 src/bin/pg_upgrade/option.c                   |   7 +
 src/bin/pg_upgrade/pg_upgrade.c               |  61 ++++++++
 src/bin/pg_upgrade/pg_upgrade.h               |   3 +
 .../t/003_logical_replication_slots.pl        |  90 +++++++++++
 src/tools/pgindent/typedefs.list              |   1 +
 13 files changed, 381 insertions(+), 3 deletions(-)
 create mode 100644 src/bin/pg_upgrade/t/003_logical_replication_slots.pl

diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index e81e35c13b..6e07f85281 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -1206,6 +1206,16 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--logical-replication-slots-only</option></term>
+      <listitem>
+       <para>
+        Dump only logical replication slots; not the schema (data definitions),
+        nor data. This is mainly used when upgrading nodes.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
        <term><option>-?</option></term>
        <term><option>--help</option></term>
diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 7816b4c685..94e90ff506 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -240,6 +240,17 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--include-logical-replication-slots</option></term>
+      <listitem>
+       <para>
+        Upgrade logical replication slots. Only permanent replication slots
+        are included. Note that pg_upgrade does not check the installation of
+        plugins.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-?</option></term>
       <term><option>--help</option></term>
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index aba780ef4b..0a4e931f9b 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -187,6 +187,7 @@ typedef struct _dumpOptions
 	int			use_setsessauth;
 	int			enable_row_security;
 	int			load_via_partition_root;
+	int			logical_slots_only;
 
 	/* default, if no "inclusion" switches appear, is to dump everything */
 	bool		include_everything;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 73a6c964ba..6cff4009b3 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -328,6 +328,9 @@ static void setupDumpWorker(Archive *AH);
 static TableInfo *getRootTableInfo(const TableInfo *tbinfo);
 static bool forcePartitionRootLoad(const TableInfo *tbinfo);
 
+static void getLogicalReplicationSlots(Archive *fout);
+static void dumpLogicalReplicationSlot(Archive *fout,
+									   const LogicalReplicationSlotInfo *slotinfo);
 
 int
 main(int argc, char **argv)
@@ -431,6 +434,7 @@ main(int argc, char **argv)
 		{"table-and-children", required_argument, NULL, 12},
 		{"exclude-table-and-children", required_argument, NULL, 13},
 		{"exclude-table-data-and-children", required_argument, NULL, 14},
+		{"logical-replication-slots-only", no_argument, NULL, 15},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -657,6 +661,10 @@ main(int argc, char **argv)
 										  optarg);
 				break;
 
+			case 15:			/* dump only replication slot(s) */
+				dopt.logical_slots_only = true;
+				break;
+
 			default:
 				/* getopt_long already emitted a complaint */
 				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -714,6 +722,11 @@ main(int argc, char **argv)
 	if (dopt.do_nothing && dopt.dump_inserts == 0)
 		pg_fatal("option --on-conflict-do-nothing requires option --inserts, --rows-per-insert, or --column-inserts");
 
+	if (dopt.logical_slots_only && dopt.dataOnly)
+		pg_fatal("options --logical-replication-slots-only and -a/--data-only cannot be used together");
+	if (dopt.logical_slots_only && dopt.schemaOnly)
+		pg_fatal("options --logical-replication-slots-only and -s/--schema-only cannot be used together");
+
 	/* Identify archive format to emit */
 	archiveFormat = parseArchiveFormat(format, &archiveMode);
 
@@ -876,6 +889,16 @@ main(int argc, char **argv)
 			pg_fatal("no matching extensions were found");
 	}
 
+	/*
+	 * If dump logical-replication-slots-only was requested, dump only them
+	 * and skip everything else.
+	 */
+	if (dopt.logical_slots_only)
+	{
+		getLogicalReplicationSlots(fout);
+		goto dump;
+	}
+
 	/*
 	 * Dumping LOs is the default for dumps where an inclusion switch is not
 	 * used (an "include everything" dump).  -B can be used to exclude LOs
@@ -936,6 +959,8 @@ main(int argc, char **argv)
 	if (!dopt.no_security_labels)
 		collectSecLabels(fout);
 
+dump:
+
 	/* Lastly, create dummy objects to represent the section boundaries */
 	boundaryObjs = createBoundaryObjects();
 
@@ -1109,6 +1134,8 @@ help(const char *progname)
 			 "                               servers matching PATTERN\n"));
 	printf(_("  --inserts                    dump data as INSERT commands, rather than COPY\n"));
 	printf(_("  --load-via-partition-root    load partitions via the root table\n"));
+	printf(_("  --logical-replication-slots-only\n"
+			 "                               dump only logical replication slots, no schema or data\n"));
 	printf(_("  --no-comments                do not dump comments\n"));
 	printf(_("  --no-publications            do not dump publications\n"));
 	printf(_("  --no-security-labels         do not dump security label assignments\n"));
@@ -10252,6 +10279,10 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
 		case DO_SUBSCRIPTION:
 			dumpSubscription(fout, (const SubscriptionInfo *) dobj);
 			break;
+		case DO_LOGICAL_REPLICATION_SLOT:
+			dumpLogicalReplicationSlot(fout,
+									   (const LogicalReplicationSlotInfo *) dobj);
+			break;
 		case DO_PRE_DATA_BOUNDARY:
 		case DO_POST_DATA_BOUNDARY:
 			/* never dumped, nothing to do */
@@ -18227,6 +18258,7 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
 			case DO_PUBLICATION_REL:
 			case DO_PUBLICATION_TABLE_IN_SCHEMA:
 			case DO_SUBSCRIPTION:
+			case DO_LOGICAL_REPLICATION_SLOT:
 				/* Post-data objects: must come after the post-data boundary */
 				addObjectDependency(dobj, postDataBound->dumpId);
 				break;
@@ -18488,3 +18520,118 @@ appendReloptionsArrayAH(PQExpBuffer buffer, const char *reloptions,
 	if (!res)
 		pg_log_warning("could not parse %s array", "reloptions");
 }
+
+/*
+ * getLogicalReplicationSlots
+ *	  get information about replication slots
+ */
+static void
+getLogicalReplicationSlots(Archive *fout)
+{
+	PGresult   *res;
+	LogicalReplicationSlotInfo *slotinfo;
+	PQExpBuffer query;
+	DumpOptions *dopt = fout->dopt;
+
+	int			i_slotname;
+	int			i_plugin;
+	int			i_twophase;
+	int			i,
+				ntups;
+
+	/* Check whether we should dump or not */
+	if (fout->remoteVersion < 160000 || !dopt->logical_slots_only)
+		return;
+
+	query = createPQExpBuffer();
+
+	resetPQExpBuffer(query);
+
+	/*
+	 * Get replication slots.
+	 *
+	 * XXX: Which information must be extracted from old node? Currently three
+	 * attributes are extracted because they are used by
+	 * pg_create_logical_replication_slot().
+	 *
+	 * XXX: Do we have to support physical slots?
+	 */
+	appendPQExpBufferStr(query,
+						 "SELECT slot_name, plugin, two_phase "
+						 "FROM pg_catalog.pg_replication_slots "
+						 "WHERE database = current_database() AND temporary = false "
+						 "AND wal_status IN ('reserved', 'extended');");
+
+	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+
+	ntups = PQntuples(res);
+
+	i_slotname = PQfnumber(res, "slot_name");
+	i_plugin = PQfnumber(res, "plugin");
+	i_twophase = PQfnumber(res, "two_phase");
+
+	slotinfo = pg_malloc(ntups * sizeof(LogicalReplicationSlotInfo));
+
+	for (i = 0; i < ntups; i++)
+	{
+		slotinfo[i].dobj.objType = DO_LOGICAL_REPLICATION_SLOT;
+
+		slotinfo[i].dobj.catId.tableoid = InvalidOid;
+		slotinfo[i].dobj.catId.oid = InvalidOid;
+		AssignDumpId(&slotinfo[i].dobj);
+
+		slotinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_slotname));
+
+		slotinfo[i].plugin = pg_strdup(PQgetvalue(res, i, i_plugin));
+		slotinfo[i].twophase = pg_strdup(PQgetvalue(res, i, i_twophase));
+
+		/*
+		 * Note: Currently we do not have any options to include/exclude slots
+		 * in dumping, so all the slots must be selected.
+		 */
+		slotinfo[i].dobj.dump = DUMP_COMPONENT_ALL;
+	}
+	PQclear(res);
+
+	destroyPQExpBuffer(query);
+}
+
+/*
+ * dumpLogicalReplicationSlot
+ *	  dump creation functions for the given logical replication slots
+ */
+static void
+dumpLogicalReplicationSlot(Archive *fout,
+						   const LogicalReplicationSlotInfo *slotinfo)
+{
+	DumpOptions *dopt = fout->dopt;
+
+	if (!dopt->logical_slots_only)
+		return;
+
+	if (slotinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
+	{
+		PQExpBuffer query = createPQExpBuffer();
+		char	   *slotname = pg_strdup(slotinfo->dobj.name);
+
+		/*
+		 * XXX: For simplification, pg_create_logical_replication_slot() is
+		 * used. Is it sufficient?
+		 */
+		appendPQExpBuffer(query, "SELECT pg_catalog.pg_create_logical_replication_slot('%s', ",
+						  slotname);
+		appendStringLiteralAH(query, slotinfo->plugin, fout);
+		appendPQExpBuffer(query, ", ");
+		appendStringLiteralAH(query, slotinfo->twophase, fout);
+		appendPQExpBuffer(query, ");");
+
+		ArchiveEntry(fout, slotinfo->dobj.catId, slotinfo->dobj.dumpId,
+					 ARCHIVE_OPTS(.tag = slotname,
+								  .description = "REPLICATION SLOT",
+								  .section = SECTION_POST_DATA,
+								  .createStmt = query->data));
+
+		pfree(slotname);
+		destroyPQExpBuffer(query);
+	}
+}
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index ed6ce41ad7..8028ccf6ff 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -82,7 +82,8 @@ typedef enum
 	DO_PUBLICATION,
 	DO_PUBLICATION_REL,
 	DO_PUBLICATION_TABLE_IN_SCHEMA,
-	DO_SUBSCRIPTION
+	DO_SUBSCRIPTION,
+	DO_LOGICAL_REPLICATION_SLOT
 } DumpableObjectType;
 
 /*
@@ -666,6 +667,20 @@ typedef struct _SubscriptionInfo
 	char	   *subpasswordrequired;
 } SubscriptionInfo;
 
+/*
+ * The LogicalReplicationSlotInfo struct is used to represent replication
+ * slots.
+ *
+ * XXX: add more attributes if needed
+ */
+typedef struct _LogicalReplicationSlotInfo
+{
+	DumpableObject dobj;
+	char	   *plugin;
+	char	   *slottype;
+	char	   *twophase;
+} LogicalReplicationSlotInfo;
+
 /*
  *	common utility functions
  */
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index 8266c117a3..7c37873743 100644
--- a/src/bin/pg_dump/pg_dump_sort.c
+++ b/src/bin/pg_dump/pg_dump_sort.c
@@ -93,6 +93,7 @@ enum dbObjectTypePriorities
 	PRIO_PUBLICATION_REL,
 	PRIO_PUBLICATION_TABLE_IN_SCHEMA,
 	PRIO_SUBSCRIPTION,
+	PRIO_LOGICAL_REPLICATION_SLOT,
 	PRIO_DEFAULT_ACL,			/* done in ACL pass */
 	PRIO_EVENT_TRIGGER,			/* must be next to last! */
 	PRIO_REFRESH_MATVIEW		/* must be last! */
@@ -146,10 +147,11 @@ static const int dbObjectTypePriority[] =
 	PRIO_PUBLICATION,			/* DO_PUBLICATION */
 	PRIO_PUBLICATION_REL,		/* DO_PUBLICATION_REL */
 	PRIO_PUBLICATION_TABLE_IN_SCHEMA,	/* DO_PUBLICATION_TABLE_IN_SCHEMA */
-	PRIO_SUBSCRIPTION			/* DO_SUBSCRIPTION */
+	PRIO_SUBSCRIPTION,			/* DO_SUBSCRIPTION */
+	PRIO_LOGICAL_REPLICATION_SLOT	/* DO_LOGICAL_REPLICATION_SLOT */
 };
 
-StaticAssertDecl(lengthof(dbObjectTypePriority) == (DO_SUBSCRIPTION + 1),
+StaticAssertDecl(lengthof(dbObjectTypePriority) == (DO_LOGICAL_REPLICATION_SLOT + 1),
 				 "array length mismatch");
 
 static DumpId preDataBoundId;
@@ -1498,6 +1500,11 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize)
 					 "SUBSCRIPTION (ID %d OID %u)",
 					 obj->dumpId, obj->catId.oid);
 			return;
+		case DO_LOGICAL_REPLICATION_SLOT:
+			snprintf(buf, bufsize,
+					 "LOGICAL REPLICATION SLOT (ID %d NAME %s)",
+					 obj->dumpId, obj->name);
+			return;
 		case DO_PRE_DATA_BOUNDARY:
 			snprintf(buf, bufsize,
 					 "PRE-DATA BOUNDARY  (ID %d)",
diff --git a/src/bin/pg_upgrade/dump.c b/src/bin/pg_upgrade/dump.c
index 6c8c82dca8..e6b90864f5 100644
--- a/src/bin/pg_upgrade/dump.c
+++ b/src/bin/pg_upgrade/dump.c
@@ -59,6 +59,30 @@ generate_old_dump(void)
 						   log_opts.dumpdir,
 						   sql_file_name, escaped_connstr.data);
 
+		/*
+		 * Dump logical replication slots if needed.
+		 *
+		 * XXX We cannot dump replication slots at the same time as the schema
+		 * dump because we need to separate the timing of restoring
+		 * replication slots and other objects. Replication slots, in
+		 * particular, should not be restored before executing the pg_resetwal
+		 * command because it will remove WALs that are required by the slots.
+		 */
+		if (user_opts.include_logical_slots)
+		{
+			char		slots_file_name[MAXPGPATH];
+
+			snprintf(slots_file_name, sizeof(slots_file_name),
+					 DB_DUMP_LOGICAL_SLOTS_FILE_MASK, old_db->db_oid);
+			parallel_exec_prog(log_file_name, NULL,
+							   "\"%s/pg_dump\" %s --logical-replication-slots-only "
+							   "--quote-all-identifiers --binary-upgrade %s "
+							   "--file=\"%s/%s\" %s",
+							   new_cluster.bindir, cluster_conn_opts(&old_cluster),
+							   log_opts.verbose ? "--verbose" : "",
+							   log_opts.dumpdir,
+							   slots_file_name, escaped_connstr.data);
+		}
 		termPQExpBuffer(&escaped_connstr);
 	}
 
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 12a97f84e2..228f29b688 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -42,6 +42,7 @@ tests += {
     'tests': [
       't/001_basic.pl',
       't/002_pg_upgrade.pl',
+      't/003_logical_replication_slots.pl',
     ],
     'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow
   },
diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c
index 8869b6b60d..f7b5ec9879 100644
--- a/src/bin/pg_upgrade/option.c
+++ b/src/bin/pg_upgrade/option.c
@@ -57,6 +57,7 @@ parseCommandLine(int argc, char *argv[])
 		{"verbose", no_argument, NULL, 'v'},
 		{"clone", no_argument, NULL, 1},
 		{"copy", no_argument, NULL, 2},
+		{"include-logical-replication-slots", no_argument, NULL, 3},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -199,6 +200,10 @@ parseCommandLine(int argc, char *argv[])
 				user_opts.transfer_mode = TRANSFER_MODE_COPY;
 				break;
 
+			case 3:
+				user_opts.include_logical_slots = true;
+				break;
+
 			default:
 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
 						os_info.progname);
@@ -289,6 +294,8 @@ usage(void)
 	printf(_("  -V, --version                 display version information, then exit\n"));
 	printf(_("  --clone                       clone instead of copying files to new cluster\n"));
 	printf(_("  --copy                        copy files to new cluster (default)\n"));
+	printf(_("  --include-logical-replication-slots\n"
+			 "                                upgrade logical replication slots\n"));
 	printf(_("  -?, --help                    show this help, then exit\n"));
 	printf(_("\n"
 			 "Before running pg_upgrade you must:\n"
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 75bab0a04c..1241060f4e 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -59,6 +59,7 @@ static void copy_xact_xlog_xid(void);
 static void set_frozenxids(bool minmxid_only);
 static void make_outputdirs(char *pgdata);
 static void setup(char *argv0, bool *live_check);
+static void create_logical_replication_slots(void);
 
 ClusterInfo old_cluster,
 			new_cluster;
@@ -188,6 +189,19 @@ main(int argc, char **argv)
 			  new_cluster.pgdata);
 	check_ok();
 
+	/*
+	 * Create logical replication slots if requested.
+	 *
+	 * Note: This must be done after doing pg_resetwal command because the
+	 * command will remove required WALs.
+	 */
+	if (user_opts.include_logical_slots)
+	{
+		start_postmaster(&new_cluster, true);
+		create_logical_replication_slots();
+		stop_postmaster(false);
+	}
+
 	if (user_opts.do_sync)
 	{
 		prep_status("Sync data directory to disk");
@@ -860,3 +874,50 @@ set_frozenxids(bool minmxid_only)
 
 	check_ok();
 }
+
+/*
+ * create_logical_replication_slots()
+ *
+ * Similar to create_new_objects() but only restores logical replication slots.
+ */
+static void
+create_logical_replication_slots(void)
+{
+	int			dbnum;
+
+	prep_status_progress("Restoring logical replication slots in the new cluster");
+
+	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+	{
+		char		slots_file_name[MAXPGPATH],
+					log_file_name[MAXPGPATH];
+		DbInfo	   *old_db = &old_cluster.dbarr.dbs[dbnum];
+
+		pg_log(PG_STATUS, "%s", old_db->db_name);
+
+		snprintf(slots_file_name, sizeof(slots_file_name),
+				 DB_DUMP_LOGICAL_SLOTS_FILE_MASK, old_db->db_oid);
+		snprintf(log_file_name, sizeof(log_file_name),
+				 DB_DUMP_LOG_FILE_MASK, old_db->db_oid);
+
+		parallel_exec_prog(log_file_name,
+						   NULL,
+						   "\"%s/psql\" %s --echo-queries --set ON_ERROR_STOP=on "
+						   "--no-psqlrc --dbname %s -f \"%s/%s\"",
+						   new_cluster.bindir,
+						   cluster_conn_opts(&new_cluster),
+						   old_db->db_name,
+						   log_opts.dumpdir,
+						   slots_file_name);
+	}
+
+	/* reap all children */
+	while (reap_child(true) == true)
+		;
+
+	end_progress_output();
+	check_ok();
+
+	/* update new_cluster info now that we have objects in the databases */
+	get_db_and_rel_infos(&new_cluster);
+}
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 3eea0139c7..5f3d7a407e 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -29,6 +29,7 @@
 /* contains both global db information and CREATE DATABASE commands */
 #define GLOBALS_DUMP_FILE	"pg_upgrade_dump_globals.sql"
 #define DB_DUMP_FILE_MASK	"pg_upgrade_dump_%u.custom"
+#define DB_DUMP_LOGICAL_SLOTS_FILE_MASK	"pg_upgrade_dump_%u_logical_slots.sql"
 
 /*
  * Base directories that include all the files generated internally, from the
@@ -304,6 +305,8 @@ typedef struct
 	transferMode transfer_mode; /* copy files or link them? */
 	int			jobs;			/* number of processes/threads to use */
 	char	   *socketdir;		/* directory to use for Unix sockets */
+	bool		include_logical_slots;	/* true -> dump and restore logical
+										 * replication slots */
 } UserOpts;
 
 typedef struct
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
new file mode 100644
index 0000000000..81917f3b14
--- /dev/null
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -0,0 +1,90 @@
+# Copyright (c) 2021-2023, PostgreSQL Global Development Group
+
+# Tests for logical replication, especially for upgrading replication slots
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Can be changed to test the other modes.
+my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
+
+# Initialize publisher node
+my $old_publisher = PostgreSQL::Test::Cluster->new('old_publisher');
+$old_publisher->init(allows_streaming => 'logical');
+$old_publisher->start;
+
+# Initialize subscriber node
+my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$subscriber->init(allows_streaming => 'logical');
+$subscriber->start;
+
+$old_publisher->safe_psql('postgres',
+	"CREATE TABLE tbl AS SELECT generate_series(1,10) AS a");
+$subscriber->safe_psql('postgres', "CREATE TABLE tbl (a int)");
+
+# Setup logical replication
+my $old_connstr = $old_publisher->connstr . ' dbname=postgres';
+$old_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub FOR ALL TABLES");
+$subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$old_connstr' PUBLICATION pub");
+
+# Wait for initial table sync to finish
+$subscriber->wait_for_subscription_sync($old_publisher, 'sub');
+
+my $result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+is($result, qq(10), 'check initial rows on subscriber');
+
+# Preparations for upgrading publisher
+$old_publisher->stop;
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
+
+my $new_publisher = PostgreSQL::Test::Cluster->new('new_publisher');
+$new_publisher->init(allows_streaming => 'logical');
+
+my $bindir = $new_publisher->config_data('--bindir');
+
+# Run pg_upgrade. pg_upgrade_output.d is removed at the end
+command_ok(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,        '--include-logical-replication-slots'
+	],
+	'run of pg_upgrade of old publisher');
+ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ removed after pg_upgrade success");
+
+# Check whether the replication slot is copied
+$new_publisher->start;
+$result =
+  $new_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_slots");
+is($result, qq(1), 'check the replication slot is copied to new publisher');
+
+# Change connection string and enable logical replication
+my $new_connstr = $new_publisher->connstr . ' dbname=postgres';
+
+$subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub CONNECTION '$new_connstr'");
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+# Check whether changes on the new publisher get replicated to the subscriber
+$new_publisher->safe_psql('postgres',
+	"INSERT INTO tbl VALUES (generate_series(11, 20))");
+
+$new_publisher->wait_for_catchup('sub');
+
+$result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+is($result, qq(20), 'check changes are shipped to subscriber');
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index b4058b88c3..7e999726c2 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1479,6 +1479,7 @@ LogicalRepBeginData
 LogicalRepCommitData
 LogicalRepCommitPreparedTxnData
 LogicalRepCtxStruct
+LogicalReplicationSlotInfo
 LogicalRepMode
 LogicalRepMsgType
 LogicalRepPartMapEntry
-- 
2.27.0

