From 05459fd3b78032e08241bb0f30df500e07b8b9be Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Tue, 19 Aug 2025 15:12:59 +1000
Subject: [PATCH v7] Improve initial slot synchronization in
 pg_sync_replication_slots()

During initial slot synchronization on a standby, the operation may fail if
required catalog rows or WAL files have been removed or are at risk of
removal. The slotsync worker handles this by creating a temporary slot for
the initial sync and retaining it even in case of failure. It keeps retrying
until the slot on the primary has advanced to a position where all the
required data are also available on the standby. However,
pg_sync_replication_slots() had no such protection mechanism.

The SQL API would fail immediately if synchronization requirements weren't
met. This could lead to permanent failure as the standby might continue
removing the still-required data.

To address this, pg_sync_replication_slots() now waits for the primary slot
to advance to a suitable position, retaining the temporary slot and retrying
synchronization until it succeeds. If a promotion occurs on the standby
during this wait, the function stops with an error and the temporary slot is
removed.
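
For example, calling:

    SELECT pg_sync_replication_slots();

on the standby now retries in 2 second cycles until every failover slot
fetched from the primary is at least sync-ready, instead of failing
immediately when the primary slot has not yet advanced far enough.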
---
 doc/src/sgml/func/func-admin.sgml             |   4 +-
 doc/src/sgml/logicaldecoding.sgml             |  40 +--
 src/backend/replication/logical/slotsync.c    | 314 +++++++++++++-----
 .../utils/activity/wait_event_names.txt       |   2 +-
 4 files changed, 252 insertions(+), 108 deletions(-)

diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 446fdfe56f4..360861004b2 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1478,9 +1478,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         standby server. Temporary synced slots, if any, cannot be used for
         logical decoding and must be dropped after promotion. See
         <xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
-        Note that this function is primarily intended for testing and
-        debugging purposes and should be used with caution. Additionally,
-        this function cannot be executed if
+        Note that this function cannot be executed if
         <link linkend="guc-sync-replication-slots"><varname>
         sync_replication_slots</varname></link> is enabled and the slotsync
         worker is already running to perform the synchronization of slots.
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 77c720c422c..6e4251a810d 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -364,18 +364,23 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
    <sect2 id="logicaldecoding-replication-slots-synchronization">
     <title>Replication Slot Synchronization</title>
     <para>
-     The logical replication slots on the primary can be synchronized to
-     the hot standby by using the <literal>failover</literal> parameter of
+     The logical replication slots on the primary can be enabled for
+     synchronization to the hot standby by using the
+     <literal>failover</literal> parameter of
      <link linkend="pg-create-logical-replication-slot">
      <function>pg_create_logical_replication_slot</function></link>, or by
      using the <link linkend="sql-createsubscription-params-with-failover">
      <literal>failover</literal></link> option of
-     <command>CREATE SUBSCRIPTION</command> during slot creation.
-     Additionally, enabling <link linkend="guc-sync-replication-slots">
-     <varname>sync_replication_slots</varname></link> on the standby
-     is required. By enabling <link linkend="guc-sync-replication-slots">
-     <varname>sync_replication_slots</varname></link>
-     on the standby, the failover slots can be synchronized periodically in
+     <command>CREATE SUBSCRIPTION</command> during slot creation. After that,
+     synchronization can be performed either manually by calling
+     <link linkend="pg-sync-replication-slots">
+     <function>pg_sync_replication_slots</function></link>
+     on the standby, or automatically by enabling
+     <link linkend="guc-sync-replication-slots">
+     <varname>sync_replication_slots</varname></link> on the standby.
+     When <link linkend="guc-sync-replication-slots">
+     <varname>sync_replication_slots</varname></link> is enabled
+     on the standby, the failover slots are periodically synchronized by
      the slotsync worker. For the synchronization to work, it is mandatory to
      have a physical replication slot between the primary and the standby (i.e.,
      <link linkend="guc-primary-slot-name"><varname>primary_slot_name</varname></link>
@@ -398,25 +403,6 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
      receiving the WAL up to the latest flushed position on the primary server.
     </para>
 
-    <note>
-     <para>
-      While enabling <link linkend="guc-sync-replication-slots">
-      <varname>sync_replication_slots</varname></link> allows for automatic
-      periodic synchronization of failover slots, they can also be manually
-      synchronized using the <link linkend="pg-sync-replication-slots">
-      <function>pg_sync_replication_slots</function></link> function on the standby.
-      However, this function is primarily intended for testing and debugging and
-      should be used with caution. Unlike automatic synchronization, it does not
-      include cyclic retries, making it more prone to synchronization failures,
-      particularly during initial sync scenarios where the required WAL files
-      or catalog rows for the slot might have already been removed or are at risk
-      of being removed on the standby. In contrast, automatic synchronization
-      via <varname>sync_replication_slots</varname> provides continuous slot
-      updates, enabling seamless failover and supporting high availability.
-      Therefore, it is the recommended method for synchronizing slots.
-     </para>
-    </note>
-
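+    <para>
+     For example, the failover slots can be synchronized manually by
+     executing the following on the standby:
+<programlisting>
+postgres=# SELECT pg_sync_replication_slots();
+</programlisting>
+    </para>
+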
     <para>
      When slot synchronization is configured as recommended,
      and the initial synchronization is performed either automatically or
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 37738440113..60d4776f760 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -91,6 +91,9 @@
  * is expected (e.g., slot sync GUCs change), slot sync worker will reset
  * last_start_time before exiting, so that postmaster can start the worker
  * without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
+ *
+ * The 'slot_persistence_pending' flag is set during a sync cycle if some
+ * slot could not be persisted. pg_sync_replication_slots() resets it before
+ * each cycle and checks it afterwards to decide whether to retry.
  */
 typedef struct SlotSyncCtxStruct
 {
@@ -99,6 +102,7 @@ typedef struct SlotSyncCtxStruct
 	bool		syncing;
 	time_t		last_start_time;
 	slock_t		mutex;
+	bool		slot_persistence_pending;
 } SlotSyncCtxStruct;
 
 static SlotSyncCtxStruct *SlotSyncCtx = NULL;
@@ -113,6 +117,7 @@ bool		sync_replication_slots = false;
  */
 #define MIN_SLOTSYNC_WORKER_NAPTIME_MS  200
 #define MAX_SLOTSYNC_WORKER_NAPTIME_MS  30000	/* 30s */
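+/* Retry interval for pg_sync_replication_slots() */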
+#define SLOTSYNC_API_NAPTIME_MS         2000	/* 2s */
 
 static long sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
 
@@ -146,6 +151,7 @@ typedef struct RemoteSlot
 	ReplicationSlotInvalidationCause invalidated;
 } RemoteSlot;
 
+static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn);
 static void slotsync_failure_callback(int code, Datum arg);
 static void update_synced_slots_inactive_since(void);
 
@@ -575,13 +581,18 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 	if (remote_slot_precedes)
 	{
 		/*
 		 * The remote slot didn't catch up to locally reserved position.
 		 *
 		 * We do not drop the slot because the restart_lsn can be ahead of the
 		 * current location when recreating the slot in the next cycle. It may
 		 * take more time to create such a slot. Therefore, we keep this slot
-		 * and attempt the synchronization in the next cycle.
+		 * and attempt the synchronization in the next cycle. Set
+		 * slot_persistence_pending so that pg_sync_replication_slots()
+		 * can retry.
 		 */
+		SlotSyncCtx->slot_persistence_pending = true;
+
 		return false;
 	}
 
@@ -596,11 +607,17 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 				errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.",
 						  LSN_FORMAT_ARGS(slot->data.restart_lsn)));
 
+		/* Set the flag so that pg_sync_replication_slots() retries */
+		SlotSyncCtx->slot_persistence_pending = true;
+
 		return false;
 	}
 
 	ReplicationSlotPersist();
 
 	ereport(LOG,
 			errmsg("newly created replication slot \"%s\" is sync-ready now",
 				   remote_slot->name));
@@ -622,7 +639,8 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
  * Returns TRUE if the local slot is updated.
  */
 static bool
-synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot,
+					 Oid remote_dbid)
 {
 	ReplicationSlot *slot;
 	XLogRecPtr	latestFlushPtr;
@@ -796,30 +813,66 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 }
 
 /*
- * Synchronize slots.
+ * Fetch or refresh remote slots.
  *
- * Gets the failover logical slots info from the primary server and updates
- * the slots locally. Creates the slots if not present on the standby.
+ * If target_slot_list is NIL, fetches all failover logical slots from the
+ * primary server. Otherwise, refreshes only those specific slots with
+ * current values from the primary server.
 *
- * Returns TRUE if any of the slots gets updated in this sync-cycle.
+ * NOTE: Caller must ensure a transaction is active before calling this
+ * function.
+ *
+ * Returns a list of RemoteSlot structures. Slots that no longer exist on
+ * the primary are omitted from the returned list.
  */
-static bool
-synchronize_slots(WalReceiverConn *wrconn)
+static List *
+fetch_remote_slots(WalReceiverConn *wrconn, List *target_slot_list)
 {
 #define SLOTSYNC_COLUMN_COUNT 10
 	Oid			slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
 	LSNOID, XIDOID, BOOLOID, LSNOID, BOOLOID, TEXTOID, TEXTOID};
-
 	WalRcvExecResult *res;
 	TupleTableSlot *tupslot;
 	List	   *remote_slot_list = NIL;
-	bool		some_slot_updated = false;
+	StringInfoData query;
+	ListCell   *lc;
+	bool		is_refresh = (target_slot_list != NIL);
+	bool		first_slot = true;
 	bool		started_tx = false;
-	const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
-		" restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,"
-		" database, invalidation_reason"
-		" FROM pg_catalog.pg_replication_slots"
-		" WHERE failover and NOT temporary";
+
+	/*
+	 * Build the query, either fetching all failover slots or refreshing
+	 * only the slots in target_slot_list.
+	 */
+	initStringInfo(&query);
+	appendStringInfoString(&query,
+				"SELECT slot_name, plugin, confirmed_flush_lsn,"
+				" restart_lsn, catalog_xmin, two_phase,"
+				" two_phase_at, failover,"
+				" database, invalidation_reason"
+				" FROM pg_catalog.pg_replication_slots"
+				" WHERE failover and NOT temporary");
+
+	if (is_refresh)
+	{
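+		/*
+		 * Slot names may contain only lower case letters, numbers, and
+		 * underscores (see ReplicationSlotValidateName()), so it is safe
+		 * to embed them with simple quoting.
+		 */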
+		/* Add IN clause for specific slot names */
+		appendStringInfoString(&query, " AND slot_name IN (");
+
+		foreach(lc, target_slot_list)
+		{
+			RemoteSlot *remote_slot = (RemoteSlot *) lfirst(lc);
+
+			if (!first_slot)
+				appendStringInfoString(&query, ", ");
+
+			appendStringInfo(&query, "'%s'", remote_slot->name);
+			first_slot = false;
+		}
+		appendStringInfoString(&query, ")");
+	}
 
 	/* The syscache access in walrcv_exec() needs a transaction env. */
 	if (!IsTransactionState())
@@ -829,13 +882,16 @@ synchronize_slots(WalReceiverConn *wrconn)
 	}
 
 	/* Execute the query */
-	res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow);
+	res = walrcv_exec(wrconn, query.data, SLOTSYNC_COLUMN_COUNT, slotRow);
 	if (res->status != WALRCV_OK_TUPLES)
 		ereport(ERROR,
 				errmsg("could not fetch failover logical slots info from the primary server: %s",
 					   res->err));
 
-	/* Construct the remote_slot tuple and synchronize each slot locally */
+	/* Construct the RemoteSlot list from the result set */
 	tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 	while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
 	{
@@ -890,15 +950,8 @@ synchronize_slots(WalReceiverConn *wrconn)
 		Assert(col == SLOTSYNC_COLUMN_COUNT);
 
 		/*
 		 * If restart_lsn, confirmed_lsn or catalog_xmin is invalid but the
 		 * slot is valid, that means we have fetched the remote_slot in its
 		 * RS_EPHEMERAL state. In such a case, don't sync it; we can always
 		 * sync it in the next sync cycle when the remote_slot is persisted
 		 * and has valid lsn(s) and xmin values.
 		 *
 		 * XXX: In future, if we plan to expose 'slot->data.persistency' in
 		 * pg_replication_slots view, then we can avoid fetching RS_EPHEMERAL
 		 * slots in the first place.
 		 */
 		if ((XLogRecPtrIsInvalid(remote_slot->restart_lsn) ||
 			 XLogRecPtrIsInvalid(remote_slot->confirmed_lsn) ||
@@ -906,12 +959,34 @@ synchronize_slots(WalReceiverConn *wrconn)
 			remote_slot->invalidated == RS_INVAL_NONE)
 			pfree(remote_slot);
 		else
 			/* Create list of remote slots */
 			remote_slot_list = lappend(remote_slot_list, remote_slot);
 
 		ExecClearTuple(tupslot);
 	}
 
+	walrcv_clear_result(res);
+	pfree(query.data);
+
+	if (started_tx)
+		CommitTransactionCommand();
+
+	return remote_slot_list;
+}
+
+/*
+ * Synchronize slots.
+ *
+ * Takes a list of remote slots and synchronizes them locally. Creates the
+ * slots if not present on the standby and updates existing ones.
+ *
+ * Returns TRUE if any of the slots gets updated in this sync-cycle.
+ */
+static bool
+synchronize_slots(WalReceiverConn *wrconn, List *remote_slot_list)
+{
+	bool		some_slot_updated = false;
+
 	/* Drop local slots that no longer need to be synced. */
 	drop_local_obsolete_slots(remote_slot_list);
 
@@ -927,19 +1002,12 @@ synchronize_slots(WalReceiverConn *wrconn)
 		 */
 		LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
 
-		some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
+		some_slot_updated |= synchronize_one_slot(wrconn, remote_slot,
+												  remote_dbid);
 
 		UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
 	}
 
-	/* We are done, free remote_slot_list elements */
-	list_free_deep(remote_slot_list);
-
-	walrcv_clear_result(res);
-
-	if (started_tx)
-		CommitTransactionCommand();
-
 	return some_slot_updated;
 }
 
@@ -1131,7 +1199,7 @@ slotsync_reread_config(void)
 	bool		conninfo_changed;
 	bool		primary_slotname_changed;
 
-	Assert(sync_replication_slots);
+	Assert(!AmLogicalSlotSyncWorkerProcess() || sync_replication_slots);
 
 	ConfigReloadPending = false;
 	ProcessConfigFile(PGC_SIGHUP);
@@ -1505,10 +1573,27 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
 	for (;;)
 	{
 		bool		some_slot_updated = false;
+		bool		started_tx = false;
+		List	   *remote_slots;
 
 		ProcessSlotSyncInterrupts(wrconn);
 
-		some_slot_updated = synchronize_slots(wrconn);
+		/*
+		 * The syscache access in fetch_remote_slots() needs a transaction
+		 * env, and the slot list must stay valid until this sync cycle is
+		 * done.
+		 */
+		if (!IsTransactionState())
+		{
+			StartTransactionCommand();
+			started_tx = true;
+		}
+
+		remote_slots = fetch_remote_slots(wrconn, NIL);
+		some_slot_updated = synchronize_slots(wrconn, remote_slots);
+		list_free_deep(remote_slots);
+
+		if (started_tx)
+			CommitTransactionCommand();
 
 		wait_for_slot_activity(some_slot_updated);
 	}
@@ -1736,19 +1821,94 @@ slotsync_failure_callback(int code, Datum arg)
 }
 
 /*
- * Synchronize the failover enabled replication slots using the specified
- * primary server connection.
+ * Synchronize failover enabled replication slots using the specified primary
+ * server connection.
+ *
+ * Repeatedly fetches and updates replication slot information from the
+ * primary until all slots are at least sync-ready. Retries happen after a
+ * 2 second wait and stop early if promotion is triggered on the standby.
  */
 void
 SyncReplicationSlots(WalReceiverConn *wrconn)
 {
 	PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
 	{
+		List	   *remote_slots;
+		List	   *prev_slot_list = NIL;
+		bool		started_tx = false;
+
 		check_and_set_sync_info(InvalidPid);
 
 		validate_remote_info(wrconn);
 
-		synchronize_slots(wrconn);
+		/*
+		 * The syscache access in fetch_remote_slots() needs a transaction
+		 * env, and the slot list must stay valid across the retry loop.
+		 */
+		if (!IsTransactionState())
+		{
+			StartTransactionCommand();
+			started_tx = true;
+		}
+
+		/* Retry until all slots are at least sync-ready */
+		for (;;)
+		{
+			int			rc;
+
+			/*
+			 * Refresh the remote slot data. We keep using the previous slot
+			 * list, even if some slots are already sync ready, so that all
+			 * slots are updated with the latest status from the primary.
+			 * Some of the slots in the previous list could have gone away,
+			 * which is why we create a new list here and free the old list
+			 * at the end of the loop.
+			 */
+			remote_slots = fetch_remote_slots(wrconn, prev_slot_list);
+
+			/*
+			 * Reset the flag before attempting synchronization; it is set
+			 * again if any slot could not be persisted in this cycle.
+			 */
+			SlotSyncCtx->slot_persistence_pending = false;
+
+			/* Attempt to synchronize slots */
+			synchronize_slots(wrconn, remote_slots);
+
+			/* Done once all slots are at least sync-ready */
+			if (!SlotSyncCtx->slot_persistence_pending)
+				break;
+
+			/* Wait before retrying */
+			rc = WaitLatch(MyLatch,
+						   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+						   SLOTSYNC_API_NAPTIME_MS,
+						   WAIT_EVENT_REPLICATION_SLOTSYNC_PRIMARY_CATCHUP);
+
+			if (rc & WL_LATCH_SET)
+				ResetLatch(MyLatch);
+
+			/*
+			 * If the standby has been promoted, there is no point in
+			 * continuing, so bail out. The temporary slot will be removed
+			 * by the error cleanup path.
+			 */
+			if (SlotSyncCtx->stopSignaled)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("exiting from slot synchronization as promotion is triggered")));
+
+			/* Handle config reload or termination requests, if any */
+			ProcessSlotSyncInterrupts(wrconn);
+
+			/* Free the previous slot list, if any */
+			if (prev_slot_list)
+				list_free_deep(prev_slot_list);
+
+			prev_slot_list = remote_slots;
+		}
+
+		/* Free the remaining slot lists */
+		if (prev_slot_list)
+			list_free_deep(prev_slot_list);
+		list_free_deep(remote_slots);
+
+		if (started_tx)
+			CommitTransactionCommand();
 
 		/* Cleanup the synced temporary slots */
 		ReplicationSlotCleanup(true);
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 0be307d2ca0..3497f0fa45e 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -62,8 +62,9 @@ LOGICAL_APPLY_MAIN	"Waiting in main loop of logical replication apply process."
 LOGICAL_LAUNCHER_MAIN	"Waiting in main loop of logical replication launcher process."
 LOGICAL_PARALLEL_APPLY_MAIN	"Waiting in main loop of logical replication parallel apply process."
 RECOVERY_WAL_STREAM	"Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
 REPLICATION_SLOTSYNC_MAIN	"Waiting in main loop of slot sync worker."
+REPLICATION_SLOTSYNC_PRIMARY_CATCHUP	"Waiting for the primary to catch up during slot synchronization."
 REPLICATION_SLOTSYNC_SHUTDOWN	"Waiting for slot sync worker to shut down."
 SYSLOGGER_MAIN	"Waiting in main loop of syslogger process."
 WAL_RECEIVER_MAIN	"Waiting in main loop of WAL receiver process."
 WAL_SENDER_MAIN	"Waiting in main loop of WAL sender process."
-- 
2.47.3

