From 676ffaba23a6c35e4ec63db1dda54373687c5d50 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Wed, 16 Jul 2025 06:11:51 -0400
Subject: [PATCH v2] Improve initial slot synchronization in
 pg_sync_replication_slots()

During initial slot synchronization on a standby, the operation may fail if
required catalog rows or WALs have been removed or are at risk of removal. The
slotsync worker handles this by creating a temporary slot for initial sync and
retain it even in case of failure. It will keep retrying until the slot on the
primary has been advanced to a position where all the required data are also
available on the standby. However, pg_sync_replication_slots() had
no such protection mechanism.

The SQL API would fail immediately if synchronization requirements weren't
met. This could lead to permanent failure as the standby might continue
removing the still-required data.

To address this, we now make pg_sync_replication_slots() wait for the primary
slot to advance to a suitable position before completing synchronization and
before removing the temporary slot. Once the slot advances to a suitable
position, we retry synchronization. Additionally, if a promotion occurs on
the standby during this wait, the process exits gracefully and the
temporary slot is removed.
---
 doc/src/sgml/func.sgml                          |   4 +-
 doc/src/sgml/logicaldecoding.sgml               |  24 +--
 src/backend/replication/logical/slotsync.c      | 196 ++++++++++++++++++++++--
 src/backend/utils/activity/wait_event_names.txt |   1 +
 4 files changed, 192 insertions(+), 33 deletions(-)

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index f5a0e09..53f97a0 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -30023,9 +30023,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         standby server. Temporary synced slots, if any, cannot be used for
         logical decoding and must be dropped after promotion. See
         <xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
-        Note that this function is primarily intended for testing and
-        debugging purposes and should be used with caution. Additionally,
-        this function cannot be executed if
+        Note that this function cannot be executed if
         <link linkend="guc-sync-replication-slots"><varname>
         sync_replication_slots</varname></link> is enabled and the slotsync
         worker is already running to perform the synchronization of slots.
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 593f784..d299ca3 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -370,7 +370,10 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
      <function>pg_create_logical_replication_slot</function></link>, or by
      using the <link linkend="sql-createsubscription-params-with-failover">
      <literal>failover</literal></link> option of
-     <command>CREATE SUBSCRIPTION</command> during slot creation.
+     <command>CREATE SUBSCRIPTION</command> during slot creation, and then
+     calling <link linkend="pg-sync-replication-slots">
+     <function>pg_sync_replication_slots</function></link>
+     on the standby.
      Additionally, enabling <link linkend="guc-sync-replication-slots">
      <varname>sync_replication_slots</varname></link> on the standby
      is required. By enabling <link linkend="guc-sync-replication-slots">
@@ -398,25 +401,6 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
      receiving the WAL up to the latest flushed position on the primary server.
     </para>
 
-    <note>
-     <para>
-      While enabling <link linkend="guc-sync-replication-slots">
-      <varname>sync_replication_slots</varname></link> allows for automatic
-      periodic synchronization of failover slots, they can also be manually
-      synchronized using the <link linkend="pg-sync-replication-slots">
-      <function>pg_sync_replication_slots</function></link> function on the standby.
-      However, this function is primarily intended for testing and debugging and
-      should be used with caution. Unlike automatic synchronization, it does not
-      include cyclic retries, making it more prone to synchronization failures,
-      particularly during initial sync scenarios where the required WAL files
-      or catalog rows for the slot may have already been removed or are at risk
-      of being removed on the standby. In contrast, automatic synchronization
-      via <varname>sync_replication_slots</varname> provides continuous slot
-      updates, enabling seamless failover and supporting high availability.
-      Therefore, it is the recommended method for synchronizing slots.
-     </para>
-    </note>
-
     <para>
      When slot synchronization is configured as recommended,
      and the initial synchronization is performed either automatically or
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 2f0c08b..4604fda 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -146,6 +146,7 @@ typedef struct RemoteSlot
 	ReplicationSlotInvalidationCause invalidated;
 } RemoteSlot;
 
+static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn);
 static void slotsync_failure_callback(int code, Datum arg);
 static void update_synced_slots_inactive_since(void);
 
@@ -550,6 +551,161 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
 }
 
 /*
+ * Wait for remote slot to pass locally reserved position.
+ *
+ * Return true if remote_slot could catch up with the locally reserved
+ * position. Return false in all other cases.
+ */
+static bool
+wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot)
+{
+#define SLOT_QUERY_COLUMN_COUNT 3
+
+	StringInfoData cmd;
+	int			   wait_iterations = 0;
+
+	Assert(!AmLogicalSlotSyncWorkerProcess());
+
+	ereport(LOG,
+			errmsg("waiting for remote slot \"%s\" LSN (%X/%X) and catalog xmin"
+				   " (%u) to pass local slot LSN (%X/%X) and catalog xmin (%u)",
+				   remote_slot->name,
+				   LSN_FORMAT_ARGS(remote_slot->restart_lsn),
+				   remote_slot->catalog_xmin,
+				   LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+				   MyReplicationSlot->data.catalog_xmin));
+
+	initStringInfo(&cmd);
+	appendStringInfo(&cmd,
+					 "SELECT restart_lsn,"
+					 " confirmed_flush_lsn, catalog_xmin"
+					 " FROM pg_catalog.pg_replication_slots"
+					 " WHERE slot_name = %s",
+					 quote_literal_cstr(remote_slot->name));
+
+	for (;;)
+	{
+		XLogRecPtr	new_restart_lsn;
+		XLogRecPtr	new_confirmed_lsn;
+		TransactionId new_catalog_xmin;
+		WalRcvExecResult *res;
+		TupleTableSlot *tupslot;
+		Datum		d;
+		int			rc;
+		int			col = 0;
+		bool		isnull;
+		Oid			slotRow[SLOT_QUERY_COLUMN_COUNT] = {LSNOID, LSNOID, XIDOID};
+
+		/* Handle any termination request if any */
+		ProcessSlotSyncInterrupts(wrconn);
+
+		res = walrcv_exec(wrconn, cmd.data, SLOT_QUERY_COLUMN_COUNT, slotRow);
+
+		if (res->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					errmsg("could not fetch slot \"%s\" info from the"
+						   " primary server: %s",
+						   remote_slot->name, res->err));
+
+		tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+		if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
+		{
+			ereport(WARNING,
+					errmsg("aborting initial sync for slot \"%s\"",
+						   remote_slot->name),
+					errdetail("This slot was not found on the primary server."));
+
+			pfree(cmd.data);
+			walrcv_clear_result(res);
+
+			return false;
+		}
+
+		/* Any slot with NULL in these fields should not have made it this far */
+		d = slot_getattr(tupslot, ++col, &isnull);
+		Assert(!isnull);
+		new_restart_lsn = DatumGetLSN(d);
+
+		d = slot_getattr(tupslot, ++col, &isnull);
+		Assert(!isnull);
+		new_confirmed_lsn = DatumGetLSN(d);
+
+		d = slot_getattr(tupslot, ++col, &isnull);
+		Assert(!isnull);
+		new_catalog_xmin = DatumGetTransactionId(d);
+
+		ExecClearTuple(tupslot);
+		walrcv_clear_result(res);
+
+		if (new_restart_lsn >= MyReplicationSlot->data.restart_lsn &&
+			TransactionIdFollowsOrEquals(new_catalog_xmin,
+										 MyReplicationSlot->data.catalog_xmin))
+		{
+			/* Update new values in remote_slot */
+			remote_slot->restart_lsn = new_restart_lsn;
+			remote_slot->confirmed_lsn = new_confirmed_lsn;
+			remote_slot->catalog_xmin = new_catalog_xmin;
+
+			ereport(LOG,
+					errmsg("wait over for remote slot \"%s\" as its LSN (%X/%X)"
+						   " and catalog xmin (%u) has now passed local slot LSN"
+						   " (%X/%X) and catalog xmin (%u)",
+						   remote_slot->name,
+						   LSN_FORMAT_ARGS(new_restart_lsn),
+						   new_catalog_xmin,
+						   LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+						   MyReplicationSlot->data.catalog_xmin));
+
+			pfree(cmd.data);
+
+			return true;
+		}
+
+		/*
+		 * If in SQL API synchronization, and we've been  promoted, then no point
+		 * continuing.
+		 */
+		if (!AmLogicalSlotSyncWorkerProcess() && PromoteIsTriggered())
+		{
+			ereport(WARNING,
+					errmsg("aborting sync for slot \"%s\"",
+							remote_slot->name),
+					errdetail("Promotion occurred before this slot was fully"
+							  " synchronized."));
+			pfree(cmd.data);
+
+			return false;
+		}
+
+		/*
+		 * XXX: Is waiting for 2 seconds before retrying enough or more or
+		 * less?
+		 */
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					   2000L,
+					   WAIT_EVENT_REPLICATION_SLOTSYNC_PRIMARY_CATCHUP);
+
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+
+		/* log a message every ten seconds */
+		wait_iterations++;
+		if (wait_iterations % 5 == 0)
+		{
+			ereport(LOG,
+					errmsg("continuing to wait for remote slot \"%s\" LSN (%X/%X) and catalog xmin"
+						   " (%u) to pass local slot LSN (%X/%X) and catalog xmin (%u)",
+						   remote_slot->name,
+						   LSN_FORMAT_ARGS(remote_slot->restart_lsn),
+						   remote_slot->catalog_xmin,
+						   LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+						   MyReplicationSlot->data.catalog_xmin));
+		}
+	}
+}
+
+/*
  * If the remote restart_lsn and catalog_xmin have caught up with the
  * local ones, then update the LSNs and persist the local synced slot for
  * future synchronization; otherwise, do nothing.
@@ -558,7 +714,8 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
  * false.
  */
 static bool
-update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+update_and_persist_local_synced_slot(WalReceiverConn *wrconn,
+									 RemoteSlot *remote_slot, Oid remote_dbid)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
 	bool		found_consistent_snapshot = false;
@@ -577,12 +734,28 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		/*
 		 * The remote slot didn't catch up to locally reserved position.
 		 *
-		 * We do not drop the slot because the restart_lsn can be ahead of the
-		 * current location when recreating the slot in the next cycle. It may
-		 * take more time to create such a slot. Therefore, we keep this slot
-		 * and attempt the synchronization in the next cycle.
+		 * If we're in the slotsync worker, we retain the slot and retry in the
+		 * next cycle. The restart_lsn might advance by then, allowing the slot
+		 * to be created successfully later.
 		 */
-		return false;
+		if (AmLogicalSlotSyncWorkerProcess())
+			return false;
+
+		/*
+		 * For SQL API synchronization, we wait for the remote slot to catch up
+		 * here, since we can't assume the SQL API will be called again soon.
+		 * We will retry the sync once the slot catches up.
+		 *
+		 * Note: This will return false if a promotion is triggered on the
+		 * standby while waiting, in which case we stop syncing and drop the
+		 * temporary slot.
+		 */
+		if (!wait_for_primary_slot_catchup(wrconn, remote_slot))
+			return false;
+		else
+			update_local_synced_slot(remote_slot, remote_dbid,
+									 &found_consistent_snapshot,
+									 &remote_slot_precedes);
 	}
 
 	/*
@@ -622,7 +795,8 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
  * Returns TRUE if the local slot is updated.
  */
 static bool
-synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot,
+					 Oid remote_dbid)
 {
 	ReplicationSlot *slot;
 	XLogRecPtr	latestFlushPtr;
@@ -715,7 +889,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		/* Slot not ready yet, let's attempt to make it sync-ready now. */
 		if (slot->data.persistency == RS_TEMPORARY)
 		{
-			slot_updated = update_and_persist_local_synced_slot(remote_slot,
+			slot_updated = update_and_persist_local_synced_slot(wrconn,
+																remote_slot,
 																remote_dbid);
 		}
 
@@ -785,7 +960,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		ReplicationSlotsComputeRequiredXmin(true);
 		LWLockRelease(ProcArrayLock);
 
-		update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+		update_and_persist_local_synced_slot(wrconn, remote_slot, remote_dbid);
 
 		slot_updated = true;
 	}
@@ -927,7 +1102,8 @@ synchronize_slots(WalReceiverConn *wrconn)
 		 */
 		LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
 
-		some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
+		some_slot_updated |= synchronize_one_slot(wrconn, remote_slot,
+												  remote_dbid);
 
 		UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
 	}
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 4da6831..ba82cc1 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -64,6 +64,7 @@ LOGICAL_PARALLEL_APPLY_MAIN	"Waiting in main loop of logical replication paralle
 RECOVERY_WAL_STREAM	"Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
 REPLICATION_SLOTSYNC_MAIN	"Waiting in main loop of slot sync worker."
 REPLICATION_SLOTSYNC_SHUTDOWN	"Waiting for slot sync worker to shut down."
+REPLICATION_SLOTSYNC_PRIMARY_CATCHUP	"Waiting for the primary to catch-up."
 SYSLOGGER_MAIN	"Waiting in main loop of syslogger process."
 WAL_RECEIVER_MAIN	"Waiting in main loop of WAL receiver process."
 WAL_SENDER_MAIN	"Waiting in main loop of WAL sender process."
-- 
1.8.3.1

