From c8fb374d71ef9e19edfb3515051428904ba8fd86 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Wed, 16 Jul 2025 05:19:14 -0400
Subject: [PATCH v2] Improve initial slot synchronization in
 pg_sync_replication_slots()

During initial slot synchronization on a standby, the operation may fail if
required catalog rows or WALs have been removed or are at risk of removal. The
slotsync worker handles this by creating a temporary slot for initial sync and
retain it even in case of failure. It will keep retrying until the slot on the
primary has been advanced to a position where all the required data are also
available on the standby. However, pg_sync_replication_slots() had
no such protection mechanism.

The SQL API would fail immediately if synchronization requirements weren't
met. This could lead to permanent failure as the standby might continue
removing the still-required data.

To address this, we now make pg_sync_replication_slots() wait for the primary
slot to advance to a suitable position before completing synchronization and
before removing the temporary slot. Once the slot advances to a suitable
position, we retry synchronization. Additionally, if a promotion occurs on
the standby during this wait, the process exits gracefully and the
temporary slot is removed.
---
 doc/src/sgml/func.sgml                     |   4 +-
 doc/src/sgml/logicaldecoding.sgml          |   5 +-
 src/backend/replication/logical/slotsync.c | 113 +++++++++++++++--------------
 3 files changed, 65 insertions(+), 57 deletions(-)

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index a6d7976..5e5e6a9 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -29980,9 +29980,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         standby server. Temporary synced slots, if any, cannot be used for
         logical decoding and must be dropped after promotion. See
         <xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
-        Note that this function is primarily intended for testing and
-        debugging purposes and should be used with caution. Additionaly,
-        this function cannot be executed if
+        Note that this function cannot be executed if
         <link linkend="guc-sync-replication-slots"><varname>
         sync_replication_slots</varname></link> is enabled and the slotsync
         worker is already running to perform the synchronization of slots.
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index a8c18f9..2e4d2fa 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -370,7 +370,10 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
      <function>pg_create_logical_replication_slot</function></link>, or by
      using the <link linkend="sql-createsubscription-params-with-failover">
      <literal>failover</literal></link> option of
-     <command>CREATE SUBSCRIPTION</command> during slot creation.
+     <command>CREATE SUBSCRIPTION</command> during slot creation, and then
+     calling <link linkend="pg-sync-replication-slots">
+     <function>pg_sync_replication_slots</function></link>
+     on the standby.
      Additionally, enabling <link linkend="guc-sync-replication-slots">
      <varname>sync_replication_slots</varname></link> on the standby
      is required. By enabling <link linkend="guc-sync-replication-slots">
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index c2a8e81..ca71727 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -559,9 +559,10 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
 static bool
 wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot)
 {
-#define SLOT_QUERY_COLUMN_COUNT 4
+#define SLOT_QUERY_COLUMN_COUNT 3
 
 	StringInfoData cmd;
+	int			   wait_iterations = 0;
 
 	Assert(!AmLogicalSlotSyncWorkerProcess());
 
@@ -576,7 +577,7 @@ wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot)
 
 	initStringInfo(&cmd);
 	appendStringInfo(&cmd,
-					 "SELECT invalidation_reason IS NOT NULL, restart_lsn,"
+					 "SELECT restart_lsn,"
 					 " confirmed_flush_lsn, catalog_xmin"
 					 " FROM pg_catalog.pg_replication_slots"
 					 " WHERE slot_name = %s",
@@ -584,7 +585,6 @@ wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot)
 
 	for (;;)
 	{
-		bool		new_invalidated;
 		XLogRecPtr	new_restart_lsn;
 		XLogRecPtr	new_confirmed_lsn;
 		TransactionId new_catalog_xmin;
@@ -594,7 +594,7 @@ wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot)
 		int			rc;
 		int			col = 0;
 		bool		isnull;
-		Oid			slotRow[SLOT_QUERY_COLUMN_COUNT] = {BOOLOID, LSNOID, LSNOID, XIDOID};
+		Oid			slotRow[SLOT_QUERY_COLUMN_COUNT] = {LSNOID, LSNOID, XIDOID};
 
 		/* Handle any termination request if any */
 		ProcessSlotSyncInterrupts(wrconn);
@@ -621,52 +621,23 @@ wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot)
 			return false;
 		}
 
-		/*
-		 * It is possible to get null value for restart_lsn if the slot is
-		 * invalidated on the primary server, so handle accordingly.
-		 */
-		new_invalidated = DatumGetBool(slot_getattr(tupslot, ++col, &isnull));
-		Assert(!isnull);
-
+		/* Any slot with NULL in these fields should not have made it this far */
 		d = slot_getattr(tupslot, ++col, &isnull);
-		new_restart_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
-
-		if (new_invalidated || XLogRecPtrIsInvalid(new_restart_lsn))
-		{
-			/*
-			 * The slot won't be persisted by the caller; it will be cleaned up
-			 * at the end of synchronization.
-			 */
-			ereport(WARNING,
-					errmsg("aborting initial sync for slot \"%s\"",
-						   remote_slot->name),
-					errdetail("This slot was invalidated on the primary server."));
-
-			pfree(cmd.data);
-			ExecClearTuple(tupslot);
-			walrcv_clear_result(res);
-
-			return false;
-		}
+		Assert(!isnull);
+		new_restart_lsn = DatumGetLSN(d);
 
-		/*
-		 * It is possible to get null values for confirmed_lsn and
-		 * catalog_xmin if on the primary server the slot is just created with
-		 * a valid restart_lsn and slot-sync worker has fetched the slot
-		 * before the primary server could set valid confirmed_lsn and
-		 * catalog_xmin.
-		 */
 		d = slot_getattr(tupslot, ++col, &isnull);
-		new_confirmed_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
+		Assert(!isnull);
+		new_confirmed_lsn = DatumGetLSN(d);
 
 		d = slot_getattr(tupslot, ++col, &isnull);
-		new_catalog_xmin = isnull ? InvalidTransactionId : DatumGetTransactionId(d);
+		Assert(!isnull);
+		new_catalog_xmin = DatumGetTransactionId(d);
 
 		ExecClearTuple(tupslot);
 		walrcv_clear_result(res);
 
 		if (new_restart_lsn >= MyReplicationSlot->data.restart_lsn &&
-			!XLogRecPtrIsInvalid(new_confirmed_lsn) &&
 			TransactionIdFollowsOrEquals(new_catalog_xmin,
 										 MyReplicationSlot->data.catalog_xmin))
 		{
@@ -691,6 +662,22 @@ wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot)
 		}
 
 		/*
+		 * If in SQL API synchronization, and we've been  promoted, then no point
+		 * continuing.
+		 */
+		if (!AmLogicalSlotSyncWorkerProcess() && PromoteIsTriggered())
+		{
+			ereport(WARNING,
+					errmsg("aborting sync for slot \"%s\"",
+							remote_slot->name),
+					errdetail("Promotion occurred before this slot was fully"
+							  " synchronized."));
+			pfree(cmd.data);
+
+			return false;
+		}
+
+		/*
 		 * XXX: Is waiting for 2 seconds before retrying enough or more or
 		 * less?
 		 */
@@ -701,6 +688,20 @@ wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot)
 
 		if (rc & WL_LATCH_SET)
 			ResetLatch(MyLatch);
+
+		/* log a message every ten seconds */
+		wait_iterations++;
+		if (wait_iterations % 5 == 0)
+		{
+			ereport(LOG,
+					errmsg("continuing to wait for remote slot \"%s\" LSN (%X/%X) and catalog xmin"
+						   " (%u) to pass local slot LSN (%X/%X) and catalog xmin (%u)",
+						   remote_slot->name,
+						   LSN_FORMAT_ARGS(remote_slot->restart_lsn),
+						   remote_slot->catalog_xmin,
+						   LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+						   MyReplicationSlot->data.catalog_xmin));
+		}
 	}
 }
 
@@ -733,22 +734,28 @@ update_and_persist_local_synced_slot(WalReceiverConn *wrconn,
 		/*
 		 * The remote slot didn't catch up to locally reserved position.
 		 *
-		 * For the slotsync worker, we do not drop the slot because the
-		 * restart_lsn can be ahead of the current location when recreating the
-		 * slot in the next cycle. It may take more time to create such a slot.
-		 * Therefore, we keep this slot and attempt the synchronization in the
-		 * next cycle.
-		 *
+		 * If we're in the slotsync worker, we retain the slot and retry in the
+		 * next cycle. The restart_lsn might advance by then, allowing the slot
+		 * to be created successfully later.
+		 */
+		if (AmLogicalSlotSyncWorkerProcess())
+			return false;
+
+		/*
 		 * For SQL API synchronization, we wait for the remote slot to catch up
-		 * rather than leaving temporary slots. This is because we could not
-		 * predict when (or if) the SQL function might be executed again, and
-		 * the creating session might persist after promotion. Without
-		 * automatic cleanup, this could lead to temporary slots being retained
-		 * for a longer time.
+		 * here, since we can't assume the SQL API will be called again soon.
+		 * We will retry the sync once the slot catches up.
+		 *
+		 * Note: This will return false if a promotion is triggered on the
+		 * standby while waiting, in which case we stop syncing and drop the
+		 * temporary slot.
 		 */
-		if (AmLogicalSlotSyncWorkerProcess() ||
-			!wait_for_primary_slot_catchup(wrconn, remote_slot))
+		if (!wait_for_primary_slot_catchup(wrconn, remote_slot))
 			return false;
+		else
+			update_local_synced_slot(remote_slot, remote_dbid,
+									 &found_consistent_snapshot,
+									 &remote_slot_precedes);
 	}
 
 	/*
-- 
1.8.3.1

