From 24ff5a032267eeb918582ce84bf9e511636c676f Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Thu, 2 Oct 2025 21:08:17 +1000
Subject: [PATCH v15] Improve initial slot synchronization in
 pg_sync_replication_slots()

During initial slot synchronization on a standby, the operation may fail if
required catalog rows or WALs have been removed or are at risk of removal. The
slotsync worker handles this by creating a temporary slot for initial sync and
retain it even in case of failure. It will keep retrying until the slot on the
primary has been advanced to a position where all the required data are also
available on the standby. However, pg_sync_replication_slots() had
no such protection mechanism.

The SQL API would fail immediately if synchronization requirements weren't
met. This could lead to permanent failure as the standby might continue
removing the still-required data.

To address this, we now make pg_sync_replication_slots() wait for the primary
slot to advance to a suitable position before completing synchronization and
before removing the temporary slot. Once the slot advances to a suitable
position, we retry synchronization. Additionally, if a promotion occurs on
the standby during this wait, the process exits gracefully and the
temporary slot is removed.
---
 doc/src/sgml/func/func-admin.sgml             |   4 +-
 doc/src/sgml/logicaldecoding.sgml             |  35 +-
 src/backend/replication/logical/slotsync.c    | 367 +++++++++++++++---
 .../utils/activity/wait_event_names.txt       |   2 +-
 4 files changed, 333 insertions(+), 75 deletions(-)

diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 1b465bc8ba7..2896cd9e429 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1497,9 +1497,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         standby server. Temporary synced slots, if any, cannot be used for
         logical decoding and must be dropped after promotion. See
         <xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
-        Note that this function is primarily intended for testing and
-        debugging purposes and should be used with caution. Additionally,
-        this function cannot be executed if
+        Note that this function cannot be executed if
         <link linkend="guc-sync-replication-slots"><varname>
         sync_replication_slots</varname></link> is enabled and the slotsync
         worker is already running to perform the synchronization of slots.
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index b803a819cf1..504c79f2fd2 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -370,12 +370,16 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
      <function>pg_create_logical_replication_slot</function></link>, or by
      using the <link linkend="sql-createsubscription-params-with-failover">
      <literal>failover</literal></link> option of
-     <command>CREATE SUBSCRIPTION</command> during slot creation.
-     Additionally, enabling <link linkend="guc-sync-replication-slots">
-     <varname>sync_replication_slots</varname></link> on the standby
-     is required. By enabling <link linkend="guc-sync-replication-slots">
-     <varname>sync_replication_slots</varname></link>
-     on the standby, the failover slots can be synchronized periodically in
+     <command>CREATE SUBSCRIPTION</command> during slot creation. After that,
+     synchronization can be performed either manually by calling
+     <link linkend="pg-sync-replication-slots">
+     <function>pg_sync_replication_slots</function></link>
+     on the standby, or automatically by enabling
+     <link linkend="guc-sync-replication-slots">
+     <varname>sync_replication_slots</varname></link> on the standby.
+     When <link linkend="guc-sync-replication-slots">
+     <varname>sync_replication_slots</varname></link> is enabled
+     on the standby, the failover slots are periodically synchronized by
      the slotsync worker. For the synchronization to work, it is mandatory to
      have a physical replication slot between the primary and the standby (i.e.,
      <link linkend="guc-primary-slot-name"><varname>primary_slot_name</varname></link>
@@ -398,25 +402,6 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
      receiving the WAL up to the latest flushed position on the primary server.
     </para>
 
-    <note>
-     <para>
-      While enabling <link linkend="guc-sync-replication-slots">
-      <varname>sync_replication_slots</varname></link> allows for automatic
-      periodic synchronization of failover slots, they can also be manually
-      synchronized using the <link linkend="pg-sync-replication-slots">
-      <function>pg_sync_replication_slots</function></link> function on the standby.
-      However, this function is primarily intended for testing and debugging and
-      should be used with caution. Unlike automatic synchronization, it does not
-      include cyclic retries, making it more prone to synchronization failures,
-      particularly during initial sync scenarios where the required WAL files
-      or catalog rows for the slot might have already been removed or are at risk
-      of being removed on the standby. In contrast, automatic synchronization
-      via <varname>sync_replication_slots</varname> provides continuous slot
-      updates, enabling seamless failover and supporting high availability.
-      Therefore, it is the recommended method for synchronizing slots.
-     </para>
-    </note>
-
     <para>
      When slot synchronization is configured as recommended,
      and the initial synchronization is performed either automatically or
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 8c061d55bdb..3ba2e500c92 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -39,6 +39,12 @@
  * the last cycle. Refer to the comments above wait_for_slot_activity() for
  * more details.
  *
+ * If the pg_sync_replication API is used to sync the slots, and if the slots
+ * are not ready to be synced and are marked as RS_TEMPORARY because of any of
+ * the reasons mentioned above, then the API also waits and retries until the
+ * slots are marked as RS_PERSISTENT (which means sync-ready). Refer to the
+ * comments in SyncReplicationSlots() for more details.
+ *
  * Any standby synchronized slots will be dropped if they no longer need
  * to be synchronized. See comment atop drop_local_obsolete_slots() for more
  * details.
@@ -64,6 +70,7 @@
 #include "storage/procarray.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
+#include "utils/memutils.h"
 #include "utils/pg_lsn.h"
 #include "utils/ps_status.h"
 #include "utils/timeout.h"
@@ -100,6 +107,16 @@ typedef struct SlotSyncCtxStruct
 	slock_t		mutex;
 } SlotSyncCtxStruct;
 
+/*
+ * Structure holding parameters that need to be freed on error in
+ * pg_sync_replication_slots()
+ */
+typedef struct SlotSyncApiFailureParams
+{
+	WalReceiverConn *wrconn;
+	List			*slot_names;
+} SlotSyncApiFailureParams;
+
 static SlotSyncCtxStruct *SlotSyncCtx = NULL;
 
 /* GUC variable */
@@ -112,6 +129,7 @@ bool		sync_replication_slots = false;
  */
 #define MIN_SLOTSYNC_WORKER_NAPTIME_MS  200
 #define MAX_SLOTSYNC_WORKER_NAPTIME_MS  30000	/* 30s */
+#define SLOTSYNC_API_NAPTIME_MS         2000	/* 2s */
 
 static long sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
 
@@ -147,6 +165,7 @@ typedef struct RemoteSlot
 
 static void slotsync_failure_callback(int code, Datum arg);
 static void update_synced_slots_inactive_since(void);
+static void slotsync_api_reread_config(void);
 
 /*
  * If necessary, update the local synced slot's metadata based on the data
@@ -553,11 +572,15 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
  * local ones, then update the LSNs and persist the local synced slot for
  * future synchronization; otherwise, do nothing.
  *
+ * *slot_persistence_pending is set to true if any of the slots fail to
+ * persist. It is utilized by the pg_sync_replication_slots() API.
+ *
  * Return true if the slot is marked as RS_PERSISTENT (sync-ready), otherwise
  * false.
  */
 static bool
-update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
+									 bool *slot_persistence_pending)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
 	bool		found_consistent_snapshot = false;
@@ -576,11 +599,18 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		/*
 		 * The remote slot didn't catch up to locally reserved position.
 		 *
-		 * We do not drop the slot because the restart_lsn can be ahead of the
-		 * current location when recreating the slot in the next cycle. It may
-		 * take more time to create such a slot. Therefore, we keep this slot
-		 * and attempt the synchronization in the next cycle.
+		 * We do not drop the slot because the restart_lsn can be
+		 * ahead of the current location when recreating the slot in
+		 * the next cycle. It may take more time to create such a
+		 * slot. Therefore, we keep this slot and attempt the
+		 * synchronization in the next cycle.
+		 *
+		 * We also update the slot_persistence_pending parameter, so
+		 * the API can retry.
 		 */
+		if (slot_persistence_pending)
+			*slot_persistence_pending = true;
+
 		return false;
 	}
 
@@ -595,6 +625,10 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 				errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.",
 						  LSN_FORMAT_ARGS(slot->data.restart_lsn)));
 
+		/* Set this, so that API can retry */
+		if (slot_persistence_pending)
+			*slot_persistence_pending = true;
+
 		return false;
 	}
 
@@ -618,10 +652,14 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
  * updated. The slot is then persisted and is considered as sync-ready for
  * periodic syncs.
  *
+ * *slot_persistence_pending is set to true if any of the slots fail to
+ * persist. It is utilized by the pg_sync_replication_slots() API.
+ *
  * Returns TRUE if the local slot is updated.
  */
 static bool
-synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid,
+					 bool *slot_persistence_pending)
 {
 	ReplicationSlot *slot;
 	XLogRecPtr	latestFlushPtr;
@@ -715,7 +753,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		if (slot->data.persistency == RS_TEMPORARY)
 		{
 			slot_updated = update_and_persist_local_synced_slot(remote_slot,
-																remote_dbid);
+																remote_dbid,
+																slot_persistence_pending);
 		}
 
 		/* Slot ready for sync, so sync it. */
@@ -784,7 +823,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		ReplicationSlotsComputeRequiredXmin(true);
 		LWLockRelease(ProcArrayLock);
 
-		update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+		update_and_persist_local_synced_slot(remote_slot, remote_dbid,
+											 slot_persistence_pending);
 
 		slot_updated = true;
 	}
@@ -795,15 +835,23 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 }
 
 /*
- * Synchronize slots.
+ * Fetch remote slots.
  *
- * Gets the failover logical slots info from the primary server and updates
- * the slots locally. Creates the slots if not present on the standby.
+ * If slot_names is NIL, fetches all failover logical slots from the
+ * primary server, otherwise fetches only the ones with names in slot_names.
+ *
+ * Parameters:
+ *	wrconn - Connection to the primary server
+ *	slot_names - List of slot names (char *) to fetch from primary,
+ *				or NIL to fetch all failover logical slots.
+ *
+ * Returns:
+ *	List of remote slot information structures. Returns NIL if no slot
+ *	is found.
  *
- * Returns TRUE if any of the slots gets updated in this sync-cycle.
  */
-static bool
-synchronize_slots(WalReceiverConn *wrconn)
+static List *
+fetch_remote_slots(WalReceiverConn *wrconn, List *slot_names)
 {
 #define SLOTSYNC_COLUMN_COUNT 10
 	Oid			slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
@@ -812,29 +860,47 @@ synchronize_slots(WalReceiverConn *wrconn)
 	WalRcvExecResult *res;
 	TupleTableSlot *tupslot;
 	List	   *remote_slot_list = NIL;
-	bool		some_slot_updated = false;
-	bool		started_tx = false;
-	const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
-		" restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,"
-		" database, invalidation_reason"
-		" FROM pg_catalog.pg_replication_slots"
-		" WHERE failover and NOT temporary";
-
-	/* The syscache access in walrcv_exec() needs a transaction env. */
-	if (!IsTransactionState())
+	StringInfoData query;
+
+	initStringInfo(&query);
+	appendStringInfoString(&query,
+				"SELECT slot_name, plugin, confirmed_flush_lsn,"
+				" restart_lsn, catalog_xmin, two_phase,"
+				" two_phase_at, failover,"
+				" database, invalidation_reason"
+				" FROM pg_catalog.pg_replication_slots"
+				" WHERE failover and NOT temporary");
+
+	if (slot_names != NIL)
 	{
-		StartTransactionCommand();
-		started_tx = true;
+		ListCell   *lc;
+		bool		first_slot = true;
+
+		/*
+		 * Construct the query to fetch only the specified slots
+		 */
+		appendStringInfoString(&query, " AND slot_name IN (");
+
+		foreach(lc, slot_names)
+		{
+			char *slot_name = (char *) lfirst(lc);
+
+			if (!first_slot)
+				appendStringInfoString(&query, ", ");
+
+			appendStringInfo(&query, "'%s'", slot_name);
+			first_slot = false;
+		}
+		appendStringInfoString(&query, ")");
 	}
 
 	/* Execute the query */
-	res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow);
+	res = walrcv_exec(wrconn, query.data, SLOTSYNC_COLUMN_COUNT, slotRow);
 	if (res->status != WALRCV_OK_TUPLES)
 		ereport(ERROR,
 				errmsg("could not fetch failover logical slots info from the primary server: %s",
 					   res->err));
 
-	/* Construct the remote_slot tuple and synchronize each slot locally */
 	tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 	while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
 	{
@@ -885,7 +951,6 @@ synchronize_slots(WalReceiverConn *wrconn)
 		remote_slot->invalidated = isnull ? RS_INVAL_NONE :
 			GetSlotInvalidationCause(TextDatumGetCString(d));
 
-		/* Sanity check */
 		Assert(col == SLOTSYNC_COLUMN_COUNT);
 
 		/*
@@ -905,12 +970,38 @@ synchronize_slots(WalReceiverConn *wrconn)
 			remote_slot->invalidated == RS_INVAL_NONE)
 			pfree(remote_slot);
 		else
-			/* Create list of remote slots */
 			remote_slot_list = lappend(remote_slot_list, remote_slot);
 
 		ExecClearTuple(tupslot);
 	}
 
+	walrcv_clear_result(res);
+	pfree(query.data);
+
+	return remote_slot_list;
+}
+
+/*
+ * Synchronize slots.
+ *
+ * Takes a list of remote slots and synchronizes them locally. Creates the
+ * slots if not present on the standby and updates existing ones.
+ *
+ * Parameters:
+ * wrconn - Connection to the primary server
+ * remote_slot_list - List of RemoteSlot structures to synchronize.
+ * slot_persistence_pending - boolean used by pg_sync_replication_slots
+ * 							  API to track if any slots could not be
+ * 							  persisted and need to be retried.
+ *
+ * Returns TRUE if any of the slots gets updated in this sync-cycle.
+ */
+static bool
+synchronize_slots(WalReceiverConn *wrconn, List *remote_slot_list,
+				  bool *slot_persistence_pending)
+{
+	bool		some_slot_updated = false;
+
 	/* Drop local slots that no longer need to be synced. */
 	drop_local_obsolete_slots(remote_slot_list);
 
@@ -926,19 +1017,12 @@ synchronize_slots(WalReceiverConn *wrconn)
 		 */
 		LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
 
-		some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
+		some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid,
+												  slot_persistence_pending);
 
 		UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
 	}
 
-	/* We are done, free remote_slot_list elements */
-	list_free_deep(remote_slot_list);
-
-	walrcv_clear_result(res);
-
-	if (started_tx)
-		CommitTransactionCommand();
-
 	return some_slot_updated;
 }
 
@@ -1186,6 +1270,26 @@ ProcessSlotSyncInterrupts(void)
 		slotsync_reread_config();
 }
 
+/*
+ * Interrupt handler for pg_sync_replication_slots() API.
+ */
+static void
+ProcessSlotSyncAPIInterrupts()
+{
+	CHECK_FOR_INTERRUPTS();
+
+	/* If we've been promoted, then no point continuing. */
+	if (SlotSyncCtx->stopSignaled)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot continue replication slots synchronization"
+						" as standby promotion is triggered")));
+
+	/* error out if configuration parameters changed */
+	if (ConfigReloadPending)
+		slotsync_api_reread_config();
+}
+
 /*
  * Connection cleanup function for slotsync worker.
  *
@@ -1275,7 +1379,7 @@ wait_for_slot_activity(bool some_slot_updated)
 	rc = WaitLatch(MyLatch,
 				   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
 				   sleep_ms,
-				   WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
+				   WAIT_EVENT_REPLICATION_SLOTSYNC_PRIMARY_CATCHUP);
 
 	if (rc & WL_LATCH_SET)
 		ResetLatch(MyLatch);
@@ -1505,10 +1609,27 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
 	for (;;)
 	{
 		bool		some_slot_updated = false;
+		bool		started_tx = false;
+		List	   *remote_slots;
 
 		ProcessSlotSyncInterrupts();
 
-		some_slot_updated = synchronize_slots(wrconn);
+		/*
+		 * The syscache access in fetch_or_refresh_remote_slots() needs a
+		 * transaction env.
+		 */
+		if (!IsTransactionState())
+		{
+			StartTransactionCommand();
+			started_tx = true;
+		}
+
+		remote_slots = fetch_remote_slots(wrconn, NIL);
+		some_slot_updated = synchronize_slots(wrconn, remote_slots, NULL);
+		list_free_deep(remote_slots);
+
+		if (started_tx)
+			CommitTransactionCommand();
 
 		wait_for_slot_activity(some_slot_updated);
 	}
@@ -1705,7 +1826,8 @@ SlotSyncShmemInit(void)
 static void
 slotsync_failure_callback(int code, Datum arg)
 {
-	WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
+	SlotSyncApiFailureParams *fparams =
+		(SlotSyncApiFailureParams *) DatumGetPointer(arg);
 
 	/*
 	 * We need to do slots cleanup here just like WalSndErrorCleanup() does.
@@ -1732,23 +1854,176 @@ slotsync_failure_callback(int code, Datum arg)
 	if (syncing_slots)
 		reset_syncing_flag();
 
-	walrcv_disconnect(wrconn);
+	if (fparams->slot_names)
+		list_free_deep(fparams->slot_names);
+
+	walrcv_disconnect(fparams->wrconn);
+}
+
+/*
+ * Helper function to extract slot names from a list of remote slots
+ */
+static List *
+extract_slot_names(List *remote_slots)
+{
+	List		*slot_names = NIL;
+	ListCell	*lc;
+	MemoryContext oldcontext;
+
+	/* Switch to long-lived TopMemoryContext to store slot names */
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+	foreach(lc, remote_slots)
+	{
+		RemoteSlot *remote_slot = (RemoteSlot *) lfirst(lc);
+		char       *slot_name;
+
+		slot_name = pstrdup(remote_slot->name);
+		slot_names = lappend(slot_names, slot_name);
+	}
+
+	MemoryContextSwitchTo(oldcontext);
+
+	return slot_names;
+}
+
+/*
+ * Re-read the config file and check for critical parameter changes.
+ *
+ */
+static void
+slotsync_api_reread_config(void)
+{
+	char       *old_primary_conninfo = pstrdup(PrimaryConnInfo);
+	char       *old_primary_slotname = pstrdup(PrimarySlotName);
+	bool        old_hot_standby_feedback = hot_standby_feedback;
+	bool        conninfo_changed;
+	bool        primary_slotname_changed;
+
+	ConfigReloadPending = false;
+	ProcessConfigFile(PGC_SIGHUP);
+
+	conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0;
+	primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0;
+
+	pfree(old_primary_conninfo);
+	pfree(old_primary_slotname);
+
+	/* throw error for certain parameter changes */
+	if (conninfo_changed ||
+		primary_slotname_changed ||
+		(old_hot_standby_feedback != hot_standby_feedback))
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_CONFIG_FILE_ERROR),
+				 errmsg("cannot continue slot synchronization due"
+						" to parameter changes"),
+				 errdetail("One or more of primary_conninfo,"
+						   " primary_slot_name or hot_standby_feedback"
+						   " were modified"
+				 errhint("Retry pg_sync_replication_slots() to use the"
+						 " updated configuration.")));
+	}
 }
 
 /*
  * Synchronize the failover enabled replication slots using the specified
  * primary server connection.
+ *
+ * Repeatedly fetches and updates replication slot information from the
+ * primary until all slots are at least "sync ready". Retry is done after
+ * SLOTSYNC_API_NAPTIME_MS wait. Exits early if promotion is triggered or
+ * certain critical configuration parameters have changed.
  */
 void
 SyncReplicationSlots(WalReceiverConn *wrconn)
 {
-	PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
+	SlotSyncApiFailureParams fparams;
+
+	fparams.wrconn = wrconn;
+	fparams.slot_names = NULL;
+
+	PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(&fparams));
 	{
+		List		*remote_slots = NIL;
+		List		*slot_names = NIL;  /* List of slot names to track */
+
 		check_and_set_sync_info(InvalidPid);
 
 		validate_remote_info(wrconn);
 
-		synchronize_slots(wrconn);
+		/* Retry until all the slots are sync-ready */
+		for (;;)
+		{
+			int		rc;
+			bool	started_tx = false;
+			bool	slot_persistence_pending = false;
+
+			/* Reset flag before every iteration */
+			slot_persistence_pending = false;
+
+			/* Check for interrupts and config changes */
+			ProcessSlotSyncAPIInterrupts();
+
+			/*
+			 * The syscache access in fetch_remote_slots() needs a
+			 * transaction env.
+			 */
+			if (!IsTransactionState()) {
+				StartTransactionCommand();
+				started_tx = true;
+			}
+
+			/*
+			 * Fetch remote slot info for the given slot_names. If slot_names is NIL,
+			 * fetch all failover-enabled slots. Note that we reuse slot_names from
+			 * the first iteration; re-fetching all failover slots each time could
+			 * cause an endless loop. Instead of reprocessing only the pending slots
+			 * in each iteration, it's better to process all the slots received in
+			 * the first iteration. This ensures that by the time we're done, all
+			 * slots reflect the latest values.
+			 */
+			remote_slots = fetch_remote_slots(wrconn, slot_names);
+
+			/* Attempt to synchronize slots */
+			synchronize_slots(wrconn, remote_slots, &slot_persistence_pending);
+
+			/*
+			 * If slot_persistence_pending is true, extract slot names
+			 * for future iterations (only needed if we haven't done it yet)
+			 */
+			if (slot_names == NIL && slot_persistence_pending)
+			{
+				slot_names = extract_slot_names(remote_slots);
+
+				/* Update the failure structure so that it can be freed on error */
+				fparams.slot_names = slot_names;
+			}
+
+			/* Free the current remote_slots list */
+			list_free_deep(remote_slots);
+
+			/* Commit transaction if we started it */
+			if (started_tx)
+				CommitTransactionCommand();
+
+			/* Done if all slots are persisted i.e are sync-ready */
+			if (!slot_persistence_pending)
+				break;
+
+			/* Wait before retrying */
+			rc = WaitLatch(MyLatch,
+					WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					SLOTSYNC_API_NAPTIME_MS,
+					WAIT_EVENT_REPLICATION_SLOTSYNC_PRIMARY_CATCHUP);
+
+			if (rc & WL_LATCH_SET)
+				ResetLatch(MyLatch);
+
+		}
+
+		if (slot_names)
+			list_free_deep(slot_names);
 
 		/* Cleanup the synced temporary slots */
 		ReplicationSlotCleanup(true);
@@ -1756,5 +2031,5 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
 		/* We are done with sync, so reset sync flag */
 		reset_syncing_flag();
 	}
-	PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
+	PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(&fparams));
 }
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 7553f6eacef..16b3b04d3c4 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -62,8 +62,8 @@ LOGICAL_APPLY_MAIN	"Waiting in main loop of logical replication apply process."
 LOGICAL_LAUNCHER_MAIN	"Waiting in main loop of logical replication launcher process."
 LOGICAL_PARALLEL_APPLY_MAIN	"Waiting in main loop of logical replication parallel apply process."
 RECOVERY_WAL_STREAM	"Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
-REPLICATION_SLOTSYNC_MAIN	"Waiting in main loop of slot sync worker."
 REPLICATION_SLOTSYNC_SHUTDOWN	"Waiting for slot sync worker to shut down."
+REPLICATION_SLOTSYNC_PRIMARY_CATCHUP	"Waiting for the primary to catch-up."
 SYSLOGGER_MAIN	"Waiting in main loop of syslogger process."
 WAL_RECEIVER_MAIN	"Waiting in main loop of WAL receiver process."
 WAL_SENDER_MAIN	"Waiting in main loop of WAL sender process."
-- 
2.47.3

