From d76753013c61dcc35609c3919ec27cc039e0bfd5 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sat, 22 Jul 2023 10:17:48 +0000
Subject: [PATCH v12 1/2] Allow logical walsenders to wait for physical
 standbys

---
 doc/src/sgml/config.sgml                      |  42 ++++
 .../replication/logical/reorderbuffer.c       |   9 +
 src/backend/replication/slot.c                | 216 +++++++++++++++++-
 src/backend/utils/misc/guc_tables.c           |  30 +++
 src/backend/utils/misc/postgresql.conf.sample |   4 +
 src/include/replication/slot.h                |   4 +
 src/include/utils/guc_hooks.h                 |   4 +
 src/test/recovery/meson.build                 |   1 +
 src/test/recovery/t/050_verify_slot_order.pl  | 146 ++++++++++++
 9 files changed, 455 insertions(+), 1 deletion(-)
 create mode 100644 src/test/recovery/t/050_verify_slot_order.pl

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 11251fa05e..83a7d2e87e 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4397,6 +4397,24 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-standby-slot-names" xreflabel="standby_slot_names">
+      <term><varname>standby_slot_names</varname> (<type>string</type>)
+      <indexterm>
+       <primary><varname>standby_slot_names</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        List of physical replication slots that logical replication waits for.
+        Specify <literal>*</literal> to wait for all physical replication
+        slots. If a logical replication connection is meant to switch to a
+        physical standby after the standby is promoted, the physical
+        replication slot for the standby should be listed here. This ensures
+        that logical replication is not ahead of the physical standby.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
     </sect2>
 
@@ -4545,6 +4563,30 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-synchronize_slot_names" xreflabel="synchronize_slot_names">
+      <term><varname>synchronize_slot_names</varname> (<type>string</type>)
+      <indexterm>
+       <primary><varname>synchronize_slot_names</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies a list of logical replication slots that a streaming
+        replication standby should synchronize from the primary server. This is
+        necessary to be able to retarget those logical replication connections
+        to this standby if it gets promoted.  Specify <literal>*</literal> to
+        synchronize all logical replication slots. The default is empty. On
+        the primary, the logical walsenders associated with logical replication
+        slots specified in this parameter will wait for the standby servers
+        specified in the <xref linkend="guc-standby-slot-names"/> parameter.
+        In other words, the primary ensures those logical replication slots
+        will never get ahead of the standby servers. On the standby server,
+        the logical replication slots specified are synchronized from the
+        primary. Set this parameter to the same value on both primary and standby.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
     </sect2>
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 87a4d2a24b..dc12d825e2 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -100,6 +100,7 @@
 #include "replication/snapbuild.h"	/* just for SnapBuildSnapDecRefcount */
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
+#include "storage/ipc.h"
 #include "storage/sinval.h"
 #include "utils/builtins.h"
 #include "utils/combocid.h"
@@ -107,6 +108,7 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/relfilenumbermap.h"
+#include "utils/varlena.h"
 
 
 /* entry for a hash table we use to map from xid to our transaction state */
@@ -2498,6 +2500,13 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		}
 		else
 		{
+			/*
+			 * Before we send out the last set of changes to logical decoding
+			 * output plugin, wait for specified streaming replication standby
+			 * servers (if any) to confirm receipt of WAL up to commit_lsn.
+			 */
+			WaitForStandbyLSN(commit_lsn);
+
 			/*
 			 * Call either PREPARE (for two-phase transactions) or COMMIT (for
 			 * regular ones).
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 1dc27264f6..dc1d11a564 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -52,6 +52,8 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
+#include "utils/guc_hooks.h"
+#include "utils/varlena.h"
 
 /*
  * Replication slot on-disk data structure.
@@ -98,9 +100,11 @@ ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
 /* My backend's replication slot in the shared memory array */
 ReplicationSlot *MyReplicationSlot = NULL;
 
-/* GUC variable */
+/* GUC variables */
 int			max_replication_slots = 10; /* the maximum number of replication
 										 * slots */
+char	*synchronize_slot_names;
+char	*standby_slot_names;
 
 static void ReplicationSlotShmemExit(int code, Datum arg);
 static void ReplicationSlotDropAcquired(void);
@@ -111,6 +115,8 @@ static void RestoreSlotFromDisk(const char *name);
 static void CreateSlotOnDisk(ReplicationSlot *slot);
 static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
 
+static bool validate_slot_names(char **newval);
+
 /*
  * Report shared-memory space needed by ReplicationSlotsShmemInit.
  */
@@ -2085,3 +2091,211 @@ RestoreSlotFromDisk(const char *name)
 				(errmsg("too many replication slots active before shutdown"),
 				 errhint("Increase max_replication_slots and try again.")));
 }
+
+/*
+ * A helper function to simplify check_hook implementation for
+ * synchronize_slot_names and standby_slot_names GUCs.
+ */
+static bool
+validate_slot_names(char **newval)
+{
+	char	   *rawname;
+	List	   *elemlist;
+
+	/* Need a modifiable copy of string */
+	rawname = pstrdup(*newval);
+
+	/* Parse string into list of identifiers */
+	if (!SplitIdentifierString(rawname, ',', &elemlist))
+	{
+		/* syntax error in name list */
+		GUC_check_errdetail("List syntax is invalid.");
+		pfree(rawname);
+		list_free(elemlist);
+		return false;
+	}
+
+	pfree(rawname);
+	list_free(elemlist);
+	return true;
+}
+
+/*
+ * GUC check_hook for synchronize_slot_names
+ */
+bool
+check_synchronize_slot_names(char **newval, void **extra, GucSource source)
+{
+	/* Special handling for "*" which means all. */
+	if (strcmp(*newval, "*") == 0)
+		return true;
+
+	if (strcmp(*newval, "") == 0)
+		return true;
+
+	return validate_slot_names(newval);
+}
+
+/*
+ * GUC check_hook for standby_slot_names
+ */
+bool
+check_standby_slot_names(char **newval, void **extra, GucSource source)
+{
+	/* Special handling for "*" which means all. */
+	if (strcmp(*newval, "*") == 0)
+		return true;
+
+	if (strcmp(*newval, "") == 0)
+		return true;
+
+	return validate_slot_names(newval);
+}
+
+/*
+ * Wait until the physical standbys using the physical slots listed in the
+ * standby_slot_names GUC have confirmed receiving WAL up to wait_for_lsn.
+ * The wait is performed only when the caller's (a logical walsender) slot
+ * is covered by the synchronize_slot_names GUC; otherwise return at once.
+ */
+void
+WaitForStandbyLSN(XLogRecPtr wait_for_lsn)
+{
+	char	*rawname;
+	List	*elemlist;
+	ListCell	*l;
+	ReplicationSlot *slot;
+
+	Assert(MyReplicationSlot != NULL);
+	Assert(SlotIsLogical(MyReplicationSlot));
+
+	if (strcmp(standby_slot_names, "") == 0)
+		return;
+
+	/*
+	 * Check if the slot associated with this logical walsender is asked to
+	 * wait for physical standbys.
+	 */
+	if (strcmp(synchronize_slot_names, "") == 0)
+		return;
+
+	/* "*" means all logical walsenders should wait for physical standbys. */
+	if (strcmp(synchronize_slot_names, "*") != 0)
+	{
+		bool	shouldwait = false;
+
+		rawname = pstrdup(synchronize_slot_names);
+		SplitIdentifierString(rawname, ',', &elemlist);
+
+		foreach (l, elemlist)
+		{
+			char *name = lfirst(l);
+			if (strcmp(name, NameStr(MyReplicationSlot->data.name)) == 0)
+			{
+				shouldwait = true;
+				break;
+			}
+		}
+
+		pfree(rawname);
+		rawname = NULL;
+		list_free(elemlist);
+		elemlist = NIL;
+
+		if (!shouldwait)
+			return;
+	}
+
+	rawname = pstrdup(standby_slot_names);
+	SplitIdentifierString(rawname, ',', &elemlist);
+
+retry:
+
+	foreach (l, elemlist)
+	{
+		char *name = lfirst(l);
+		XLogRecPtr	restart_lsn;
+		bool	invalidated;
+
+		slot = SearchNamedReplicationSlot(name, true);
+
+		/*
+		 * It may happen that the slot specified in standby_slot_names GUC
+		 * value is dropped, so let's skip over it.
+		 */
+		if (!slot)
+		{
+			ereport(WARNING,
+					errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist, ignoring",
+							name, "standby_slot_names"));
+			elemlist = foreach_delete_current(elemlist, l);
+			continue;
+		}
+
+		/*
+		 * It may happen that the physical slot specified in standby_slot_names
+		 * is dropped without removing it from the GUC value, and a logical
+		 * slot has been created with the same name meanwhile. Let's skip over
+		 * it.
+		 *
+		 * NB: One might think of automatically removing the slot from the
+		 * GUC value when a physical replication slot is dropped, but that
+		 * is not a good idea given that the slot can sometimes be dropped
+		 * in process exit paths (check ReplicationSlotCleanup call sites),
+		 * and modifying the GUC value there is not safe.
+		 */
+		if (SlotIsLogical(slot))
+		{
+			ereport(WARNING,
+					errmsg("cannot have logical replication slot \"%s\" in parameter \"%s\", ignoring",
+							name, "standby_slot_names"));
+			elemlist = foreach_delete_current(elemlist, l);
+			continue;
+		}
+
+		/* physical slots advance restart_lsn on remote flush */
+		SpinLockAcquire(&slot->mutex);
+		restart_lsn = slot->data.restart_lsn;
+		invalidated = slot->data.invalidated != RS_INVAL_NONE;
+		SpinLockRelease(&slot->mutex);
+
+		/*
+		 * Specified physical slot may have been invalidated, so no point in
+		 * waiting for it.
+		 */
+		if (restart_lsn == InvalidXLogRecPtr || invalidated)
+		{
+			ereport(WARNING,
+					errmsg("physical slot \"%s\" specified in parameter \"%s\" has been invalidated, ignoring",
+							name, "standby_slot_names"));
+			elemlist = foreach_delete_current(elemlist, l);
+			continue;
+		}
+
+		/* If the slot is past the wait_for_lsn, no need to wait anymore */
+		if (restart_lsn >= wait_for_lsn)
+		{
+			elemlist = foreach_delete_current(elemlist, l);
+			continue;
+		}
+	}
+
+	if (list_length(elemlist) == 0)
+	{
+		pfree(rawname);
+		return; 	/* Exit if done waiting for everyone */
+	}
+
+	/* XXX: Is waiting for 1 second before retrying enough or more or less? */
+
+	/* XXX: Need to have a new wait event type. */
+	(void) WaitLatch(MyLatch,
+					 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					 1000L,
+					 WAIT_EVENT_WAL_SENDER_WAIT_WAL);
+	ResetLatch(MyLatch);
+
+	CHECK_FOR_INTERRUPTS();
+
+	goto retry;
+}
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index f9dba43b8c..d72b6b95b6 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -4551,6 +4551,36 @@ struct config_string ConfigureNamesString[] =
 		check_io_direct, assign_io_direct, NULL
 	},
 
+	/*
+	 * XXX: synchronize_slot_names needs to be specified on both primary and
+	 * standby, therefore, we might need a new group REPLICATION.
+	 */
+	{
+		{"synchronize_slot_names", PGC_SIGHUP, REPLICATION_STANDBY,
+			gettext_noop("List of replication slot names to synchronize from "
+						 "primary to streaming replication standby server."),
+			gettext_noop("Value of \"*\" means all."),
+			GUC_LIST_INPUT | GUC_LIST_QUOTE
+		},
+		&synchronize_slot_names,
+		"",
+		check_synchronize_slot_names, NULL, NULL
+	},
+
+	{
+		{"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY,
+			gettext_noop("List of streaming replication standby server slot "
+						 "names that logical walsenders waits for."),
+			gettext_noop("Decoded changes are sent out to plugins by logical "
+						 "walsenders only after specified replication slots "
+						 "confirm receiving WAL."),
+			GUC_LIST_INPUT | GUC_LIST_QUOTE
+		},
+		&standby_slot_names,
+		"",
+		check_standby_slot_names, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index c768af9a73..63daf586f3 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -328,6 +328,8 @@
 				# method to choose sync standbys, number of sync standbys,
 				# and comma-separated list of application_name
 				# from standby(s); '*' = all
+#standby_slot_names = '' # streaming replication standby server slot names that
+				# logical walsenders wait for
 
 # - Standby Servers -
 
@@ -355,6 +357,8 @@
 #wal_retrieve_retry_interval = 5s	# time to wait before retrying to
 					# retrieve WAL after a failed attempt
 #recovery_min_apply_delay = 0		# minimum delay for applying changes during recovery
+#synchronize_slot_names = ''	# replication slot names to synchronize from
+					# primary to streaming replication standby server
 
 # - Subscribers -
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a8a89dc784..2765f99ccf 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -203,6 +203,8 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 
 /* GUCs */
 extern PGDLLIMPORT int max_replication_slots;
+extern PGDLLIMPORT char *synchronize_slot_names;
+extern PGDLLIMPORT char *standby_slot_names;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
@@ -246,4 +248,6 @@ extern void CheckPointReplicationSlots(void);
 extern void CheckSlotRequirements(void);
 extern void CheckSlotPermissions(void);
 
+extern void WaitForStandbyLSN(XLogRecPtr wait_for_lsn);
+
 #endif							/* SLOT_H */
diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h
index 2ecb9fc086..259aefb9d7 100644
--- a/src/include/utils/guc_hooks.h
+++ b/src/include/utils/guc_hooks.h
@@ -159,5 +159,9 @@ extern void assign_wal_consistency_checking(const char *newval, void *extra);
 extern void assign_xlog_sync_method(int new_sync_method, void *extra);
 extern bool check_io_direct(char **newval, void **extra, GucSource source);
 extern void assign_io_direct(const char *newval, void *extra);
+extern bool check_synchronize_slot_names(char **newval, void **extra,
+										 GucSource source);
+extern bool check_standby_slot_names(char **newval, void **extra,
+									 GucSource source);
 
 #endif							/* GUC_HOOKS_H */
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index e7328e4894..ee590eeac7 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -43,6 +43,7 @@ tests += {
       't/035_standby_logical_decoding.pl',
       't/036_truncated_dropped.pl',
       't/037_invalid_database.pl',
+      't/050_verify_slot_order.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/050_verify_slot_order.pl b/src/test/recovery/t/050_verify_slot_order.pl
new file mode 100644
index 0000000000..402b704e3f
--- /dev/null
+++ b/src/test/recovery/t/050_verify_slot_order.pl
@@ -0,0 +1,146 @@
+
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Test that the primary disallows specified logical replication slots from
+# getting ahead of specified physical replication slots. It uses this setup:
+#
+#           	| ----> standby1 (connected via streaming replication)
+#				| ----> standby2 (connected via streaming replication)
+# primary -----	|
+#		    	| ----> subscriber1 (connected via logical replication)
+#		    	| ----> subscriber2 (connected via logical replication)
+#
+# The setup is configured in such a way that the primary never lets
+# subscriber1 get ahead of standby1.
+
+# Create primary
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 'logical');
+
+# Configure primary to disallow specified logical replication slot (lsub1_slot)
+# getting ahead of specified physical replication slot (sb1_slot).
+$primary->append_conf(
+	'postgresql.conf', qq(
+standby_slot_names = 'sb1_slot'
+synchronize_slot_names = 'lsub1_slot'
+));
+$primary->start;
+
+$primary->psql('postgres',
+	q{SELECT pg_create_physical_replication_slot('sb1_slot');});
+$primary->psql('postgres',
+	q{SELECT pg_create_physical_replication_slot('sb2_slot');});
+
+$primary->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);");
+
+my $backup_name = 'backup';
+$primary->backup($backup_name);
+
+# Create a standby
+my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
+$standby1->init_from_backup(
+	$primary, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$standby1->append_conf(
+	'postgresql.conf', qq(
+primary_slot_name = 'sb1_slot'
+));
+$standby1->start;
+$primary->wait_for_replay_catchup($standby1);
+
+# Create another standby
+my $standby2 = PostgreSQL::Test::Cluster->new('standby2');
+$standby2->init_from_backup(
+	$primary, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$standby2->append_conf(
+	'postgresql.conf', qq(
+primary_slot_name = 'sb2_slot'
+));
+$standby2->start;
+$primary->wait_for_replay_catchup($standby2);
+
+# Create publication on primary
+my $publisher = $primary;
+$publisher->safe_psql('postgres', "CREATE PUBLICATION mypub FOR TABLE tab_int;");
+my $publisher_connstr = $publisher->connstr . ' dbname=postgres';
+
+# Create a subscriber node, wait for sync to complete
+my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1');
+$subscriber1->init(allows_streaming => 'logical');
+$subscriber1->start;
+$subscriber1->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);");
+$subscriber1->safe_psql('postgres',
+		"CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' "
+	  . "PUBLICATION mypub WITH (slot_name = lsub1_slot);");
+$subscriber1->wait_for_subscription_sync;
+
+# Create another subscriber node, wait for sync to complete
+my $subscriber2 = PostgreSQL::Test::Cluster->new('subscriber2');
+$subscriber2->init(allows_streaming => 'logical');
+$subscriber2->start;
+$subscriber2->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);");
+$subscriber2->safe_psql('postgres',
+		"CREATE SUBSCRIPTION mysub2 CONNECTION '$publisher_connstr' "
+	  . "PUBLICATION mypub WITH (slot_name = lsub2_slot);");
+$subscriber2->wait_for_subscription_sync;
+
+# Stop the standby associated with specified physical replication slot so that
+# the logical replication slot won't receive changes until the standby comes
+# up.
+$standby1->stop;
+
+# Create some data on primary
+my $primary_row_count = 10;
+my $primary_insert_time = time();
+$primary->safe_psql('postgres',
+	"INSERT INTO tab_int SELECT generate_series(1, $primary_row_count);");
+
+# Wait until the standby that's up and running gets the data from the primary
+$primary->wait_for_replay_catchup($standby2);
+my $result = $standby2->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "standby2 gets data from primary");
+
+# Wait until the subscriber that's up and running and not specified in the
+# synchronize_slot_names GUC on the primary gets the data from the primary
+# without waiting for any standbys.
+$publisher->wait_for_catchup('mysub2');
+$result = $subscriber2->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "subscriber2 gets data from primary");
+
+# The subscriber that's up and running and specified in synchronize_slot_names
+# GUC on primary doesn't get the data from primary and keeps waiting for the
+# standby specified in standby_slot_names.
+$result = $subscriber1->safe_psql('postgres',
+	"SELECT count(*) = 0 FROM tab_int;");
+is($result, 't', "subscriber1 doesn't get data from primary until standby1 acknowledges changes");
+
+# Start the standby specified in standby_slot_names and wait for it to catch
+# up with the primary.
+$standby1->start;
+$primary->wait_for_replay_catchup($standby1);
+$result = $standby1->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "standby1 gets data from primary");
+
+# Now that the standby specified in standby_slot_names is up and running,
+# primary must send the decoded changes to subscriber specified in
+# synchronize_slot_names. While the standby was down, this subscriber didn't
+# receive any data from primary i.e. the primary didn't allow it to go ahead
+# of standby.
+$publisher->wait_for_catchup('mysub1');
+$result = $subscriber1->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "subscriber1 gets data from primary after standby1 acknowledges changes");
+
+done_testing();
-- 
2.34.1

