From b11c33159de217d21c188cfa18af0399e1277e0d Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Mon, 11 Aug 2025 03:44:55 -0400
Subject: [PATCH v5] Improve initial slot synchronization in
 pg_sync_replication_slots()

During initial slot synchronization on a standby, the operation may fail if
required catalog rows or WAL have been removed or are at risk of removal. The
slotsync worker handles this by creating a temporary slot for the initial sync
and retaining it even in case of failure. It keeps retrying until the slot on
the primary has advanced to a position where all the required data is also
available on the standby. However, pg_sync_replication_slots() had no such
protection mechanism.

The SQL API would fail immediately if synchronization requirements weren't
met. This could lead to permanent failure as the standby might continue
removing the still-required data.

To address this, we now make pg_sync_replication_slots() wait for the primary
slot to advance to a suitable position before completing synchronization and
before removing the temporary slot. Once the slot advances to a suitable
position, we retry synchronization. Additionally, if a promotion occurs on
the standby during this wait, the function stops waiting and raises an error,
and the temporary slot is removed.
---
 doc/src/sgml/func/func-admin.sgml               |   4 +-
 doc/src/sgml/logicaldecoding.sgml               |  40 +--
 src/backend/replication/logical/slotsync.c      | 437 ++++++++++++++++++++----
 src/backend/utils/activity/wait_event_names.txt |   2 +-
 4 files changed, 383 insertions(+), 100 deletions(-)

diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 446fdfe..3608610 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1478,9 +1478,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         standby server. Temporary synced slots, if any, cannot be used for
         logical decoding and must be dropped after promotion. See
         <xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
-        Note that this function is primarily intended for testing and
-        debugging purposes and should be used with caution. Additionally,
-        this function cannot be executed if
+        Note that this function cannot be executed if
         <link linkend="guc-sync-replication-slots"><varname>
         sync_replication_slots</varname></link> is enabled and the slotsync
         worker is already running to perform the synchronization of slots.
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 77c720c..6e4251a 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -364,18 +364,23 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
    <sect2 id="logicaldecoding-replication-slots-synchronization">
     <title>Replication Slot Synchronization</title>
     <para>
-     The logical replication slots on the primary can be synchronized to
-     the hot standby by using the <literal>failover</literal> parameter of
+     The logical replication slots on the primary can be enabled for
+     synchronization to the hot standby by using the
+     <literal>failover</literal> parameter of
      <link linkend="pg-create-logical-replication-slot">
      <function>pg_create_logical_replication_slot</function></link>, or by
      using the <link linkend="sql-createsubscription-params-with-failover">
      <literal>failover</literal></link> option of
-     <command>CREATE SUBSCRIPTION</command> during slot creation.
-     Additionally, enabling <link linkend="guc-sync-replication-slots">
-     <varname>sync_replication_slots</varname></link> on the standby
-     is required. By enabling <link linkend="guc-sync-replication-slots">
-     <varname>sync_replication_slots</varname></link>
-     on the standby, the failover slots can be synchronized periodically in
+     <command>CREATE SUBSCRIPTION</command> during slot creation. After that,
+     synchronization can be performed either manually by calling
+     <link linkend="pg-sync-replication-slots">
+     <function>pg_sync_replication_slots</function></link>
+     on the standby, or automatically by enabling
+     <link linkend="guc-sync-replication-slots">
+     <varname>sync_replication_slots</varname></link> on the standby.
+     When <link linkend="guc-sync-replication-slots">
+     <varname>sync_replication_slots</varname></link> is enabled
+     on the standby, the failover slots are periodically synchronized by
      the slotsync worker. For the synchronization to work, it is mandatory to
      have a physical replication slot between the primary and the standby (i.e.,
      <link linkend="guc-primary-slot-name"><varname>primary_slot_name</varname></link>
@@ -398,25 +403,6 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
      receiving the WAL up to the latest flushed position on the primary server.
     </para>
 
-    <note>
-     <para>
-      While enabling <link linkend="guc-sync-replication-slots">
-      <varname>sync_replication_slots</varname></link> allows for automatic
-      periodic synchronization of failover slots, they can also be manually
-      synchronized using the <link linkend="pg-sync-replication-slots">
-      <function>pg_sync_replication_slots</function></link> function on the standby.
-      However, this function is primarily intended for testing and debugging and
-      should be used with caution. Unlike automatic synchronization, it does not
-      include cyclic retries, making it more prone to synchronization failures,
-      particularly during initial sync scenarios where the required WAL files
-      or catalog rows for the slot might have already been removed or are at risk
-      of being removed on the standby. In contrast, automatic synchronization
-      via <varname>sync_replication_slots</varname> provides continuous slot
-      updates, enabling seamless failover and supporting high availability.
-      Therefore, it is the recommended method for synchronizing slots.
-     </para>
-    </note>
-
     <para>
      When slot synchronization is configured as recommended,
      and the initial synchronization is performed either automatically or
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 3773844..f9eec0b 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -113,6 +113,7 @@ bool		sync_replication_slots = false;
  */
 #define MIN_SLOTSYNC_WORKER_NAPTIME_MS  200
 #define MAX_SLOTSYNC_WORKER_NAPTIME_MS  30000	/* 30s */
+#define SLOTSYNC_API_NAPTIME_MS         2000	/* 2s */
 
 static long sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
 
@@ -146,6 +147,7 @@ typedef struct RemoteSlot
 	ReplicationSlotInvalidationCause invalidated;
 } RemoteSlot;
 
+static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn);
 static void slotsync_failure_callback(int code, Datum arg);
 static void update_synced_slots_inactive_since(void);
 
@@ -166,7 +168,8 @@ static void update_synced_slots_inactive_since(void);
 static bool
 update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
 						 bool *found_consistent_snapshot,
-						 bool *remote_slot_precedes)
+						 bool *remote_slot_precedes,
+						 int   sync_iterations)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
 	bool		updated_xmin_or_lsn = false;
@@ -209,15 +212,21 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
 		 * to understand why the slot is not sync-ready. In the case of a
 		 * persistent slot, it would be a more common case and won't directly
 		 * impact the users, so we used DEBUG1 level to log the message.
+		 *
+		 * If called from pg_sync_replication_slots(), log message only for
+		 * the first iteration.
 		 */
-		ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1,
-				errmsg("could not synchronize replication slot \"%s\"",
+		if (AmLogicalSlotSyncWorkerProcess() || sync_iterations == 1)
+		{
+			ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1,
+				errmsg("Replication slot \"%s\" is not sync ready; will keep retrying",
 					   remote_slot->name),
-				errdetail("Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.",
-						  LSN_FORMAT_ARGS(remote_slot->restart_lsn),
-						  remote_slot->catalog_xmin,
-						  LSN_FORMAT_ARGS(slot->data.restart_lsn),
-						  slot->data.catalog_xmin));
+				errdetail("Attempting Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.",
+				LSN_FORMAT_ARGS(remote_slot->restart_lsn),
+				remote_slot->catalog_xmin,
+				LSN_FORMAT_ARGS(slot->data.restart_lsn),
+				slot->data.catalog_xmin));
+		}
 
 		if (remote_slot_precedes)
 			*remote_slot_precedes = true;
@@ -558,7 +567,9 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
  * false.
  */
 static bool
-update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+update_and_persist_local_synced_slot(WalReceiverConn * wrconn,
+	RemoteSlot * remote_slot, Oid remote_dbid, bool *sync_start_pending,
+	int sync_iterations)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
 	bool		found_consistent_snapshot = false;
@@ -566,7 +577,8 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 
 	(void) update_local_synced_slot(remote_slot, remote_dbid,
 									&found_consistent_snapshot,
-									&remote_slot_precedes);
+									&remote_slot_precedes,
+									sync_iterations);
 
 	/*
 	 * Check if the primary server has caught up. Refer to the comment atop
@@ -575,13 +587,40 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 	if (remote_slot_precedes)
 	{
 		/*
-		 * The remote slot didn't catch up to locally reserved position.
+		 * The remote slot didn't catch up to locally reserved
+		 * position.
 		 *
-		 * We do not drop the slot because the restart_lsn can be ahead of the
-		 * current location when recreating the slot in the next cycle. It may
-		 * take more time to create such a slot. Therefore, we keep this slot
-		 * and attempt the synchronization in the next cycle.
+		 * We do not drop the slot because the restart_lsn can be
+		 * ahead of the current location when recreating the slot in
+		 * the next cycle. It may take more time to create such a
+		 * slot. Therefore, we keep this slot and attempt the
+		 * synchronization in the next cycle.
+		 *
+		 * If called from pg_sync_replication_slots(), set flag
+		 * indicating that the slot is not yet sync ready, so that it
+		 * can be retried. Log a message once every 5 iterations,
+		 * which should be around 10 seconds.
 		 */
+		if (!AmLogicalSlotSyncWorkerProcess())
+		{
+			if (sync_start_pending)
+				*sync_start_pending = true;
+
+			if (sync_iterations % 5 == 0)
+			{
+				/* Log a message every ten seconds */
+				ereport(LOG,
+						errmsg("waiting for remote slot \"%s\" LSN (%X/%X)"
+						" and catalog xmin %u) to pass local slot LSN"
+						" (%X/%X) and catalog xmin (%u)",
+						remote_slot->name,
+						LSN_FORMAT_ARGS(remote_slot->restart_lsn),
+						remote_slot->catalog_xmin,
+						LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+						MyReplicationSlot->data.catalog_xmin));
+			}
+		}
+
 		return false;
 	}
 
@@ -622,7 +661,8 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
  * Returns TRUE if the local slot is updated.
  */
 static bool
-synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+synchronize_one_slot(WalReceiverConn * wrconn, RemoteSlot * remote_slot,
+			Oid remote_dbid, bool *sync_start_pending, int sync_iterations)
 {
 	ReplicationSlot *slot;
 	XLogRecPtr	latestFlushPtr;
@@ -715,8 +755,11 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		/* Slot not ready yet, let's attempt to make it sync-ready now. */
 		if (slot->data.persistency == RS_TEMPORARY)
 		{
-			slot_updated = update_and_persist_local_synced_slot(remote_slot,
-																remote_dbid);
+			slot_updated = update_and_persist_local_synced_slot(wrconn,
+																remote_slot,
+																remote_dbid,
+																sync_start_pending,
+																sync_iterations);
 		}
 
 		/* Slot ready for sync, so sync it. */
@@ -738,7 +781,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 										   LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
 
 			slot_updated = update_local_synced_slot(remote_slot, remote_dbid,
-													NULL, NULL);
+													NULL, NULL, sync_iterations);
 		}
 	}
 	/* Otherwise create the slot first. */
@@ -785,7 +828,9 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		ReplicationSlotsComputeRequiredXmin(true);
 		LWLockRelease(ProcArrayLock);
 
-		update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+		update_and_persist_local_synced_slot(wrconn, remote_slot, remote_dbid,
+											 sync_start_pending,
+											 sync_iterations);
 
 		slot_updated = true;
 	}
@@ -796,15 +841,17 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 }
 
 /*
- * Synchronize slots.
+ * Fetch remote slots.
  *
- * Gets the failover logical slots info from the primary server and updates
- * the slots locally. Creates the slots if not present on the standby.
+ * Gets the failover logical slots info from the primary server and creates
+ * a list of remote slots that need to be synchronized locally.
  *
- * Returns TRUE if any of the slots gets updated in this sync-cycle.
+ * NOTE: Caller must ensure a transaction is active before calling this function.
+ *
+ * Returns a list of RemoteSlot structures, or NIL if no slots need syncing.
  */
-static bool
-synchronize_slots(WalReceiverConn *wrconn)
+static List *
+fetch_remote_slots(WalReceiverConn *wrconn)
 {
 #define SLOTSYNC_COLUMN_COUNT 10
 	Oid			slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
@@ -813,21 +860,12 @@ synchronize_slots(WalReceiverConn *wrconn)
 	WalRcvExecResult *res;
 	TupleTableSlot *tupslot;
 	List	   *remote_slot_list = NIL;
-	bool		some_slot_updated = false;
-	bool		started_tx = false;
 	const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
 		" restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,"
 		" database, invalidation_reason"
 		" FROM pg_catalog.pg_replication_slots"
 		" WHERE failover and NOT temporary";
 
-	/* The syscache access in walrcv_exec() needs a transaction env. */
-	if (!IsTransactionState())
-	{
-		StartTransactionCommand();
-		started_tx = true;
-	}
-
 	/* Execute the query */
 	res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow);
 	if (res->status != WALRCV_OK_TUPLES)
@@ -835,7 +873,7 @@ synchronize_slots(WalReceiverConn *wrconn)
 				errmsg("could not fetch failover logical slots info from the primary server: %s",
 					   res->err));
 
-	/* Construct the remote_slot tuple and synchronize each slot locally */
+	/* Construct the remote_slot tuple and build list of slots to sync */
 	tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 	while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
 	{
@@ -912,6 +950,180 @@ synchronize_slots(WalReceiverConn *wrconn)
 		ExecClearTuple(tupslot);
 	}
 
+	walrcv_clear_result(res);
+
+	return remote_slot_list;
+}
+
+/*
+ * Refresh a list of remote slots with current values from the primary.
+ *
+ * Queries the primary server for only the slots named in remote_slot_list
+ * and builds a new list from the rows returned, so the caller does not have
+ * to fetch all failover slots again.
+ *
+ * NOTE: Caller must ensure a transaction is active before calling this
+ * function.
+ *
+ * Parameters:
+ *	wrconn - connection to the primary server
+ *	remote_slot_list - list of RemoteSlot structures to refresh
+ *
+ * Returns the newly built list after freeing the input list, or the input
+ * list unchanged (after a WARNING) if the query fails. Slots that the query
+ * does not return, or that look ephemeral, are absent from the new list.
+ */
+static List *
+refresh_remote_slots(WalReceiverConn *wrconn, List *remote_slot_list)
+{
+#define UPDATE_SLOTSYNC_COLUMN_COUNT 10
+	Oid			slotRow[UPDATE_SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
+	LSNOID, XIDOID, BOOLOID, LSNOID, BOOLOID, TEXTOID, TEXTOID};
+	WalRcvExecResult *res;
+	TupleTableSlot *tupslot;
+	List	   *updated_slot_list = NIL;
+	StringInfoData query;
+	ListCell   *lc;
+	bool		first_slot = true;
+
+	/* If the input list is empty, return it as-is */
+	if (remote_slot_list == NIL)
+		return remote_slot_list;
+
+	/* Build query with slot names from the input list */
+	initStringInfo(&query);
+	appendStringInfoString(&query,
+						   "SELECT slot_name, plugin, confirmed_flush_lsn,"
+						   " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,"
+						   " database, invalidation_reason"
+						   " FROM pg_catalog.pg_replication_slots"
+						   " WHERE failover and NOT temporary AND slot_name IN (");
+
+	/* Add slot names to the IN clause */
+	foreach(lc, remote_slot_list)
+	{
+		RemoteSlot *remote_slot = (RemoteSlot *) lfirst(lc);
+
+		if (!first_slot)
+			appendStringInfoString(&query, ", ");
+
+		appendStringInfo(&query, "'%s'", remote_slot->name);
+		first_slot = false;
+	}
+	appendStringInfoString(&query, ")");
+
+	/* Execute the query */
+	res = walrcv_exec(wrconn, query.data, UPDATE_SLOTSYNC_COLUMN_COUNT, slotRow);
+	if (res->status != WALRCV_OK_TUPLES)
+	{
+		ereport(WARNING,
+				errmsg("could not fetch updated failover logical slots info"
+					   " from the primary server: %s",
+					   res->err));
+		/* Caller carries on with the old list; don't leak the failed result. */
+		walrcv_clear_result(res);
+		pfree(query.data);
+		return remote_slot_list;
+	}
+
+	/* Process the updated slot information */
+	tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
+	{
+		bool		isnull;
+		RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot));
+		Datum		d;
+		int			col = 0;
+
+		remote_slot->name = TextDatumGetCString(slot_getattr(tupslot, ++col,
+															 &isnull));
+		Assert(!isnull);
+
+		remote_slot->plugin = TextDatumGetCString(slot_getattr(tupslot, ++col,
+															   &isnull));
+		Assert(!isnull);
+
+		/*
+		 * Handle possible null values for LSN and Xmin if slot is
+		 * invalidated on the primary server.
+		 */
+		d = slot_getattr(tupslot, ++col, &isnull);
+		remote_slot->confirmed_lsn = isnull ? InvalidXLogRecPtr :
+			DatumGetLSN(d);
+
+		d = slot_getattr(tupslot, ++col, &isnull);
+		remote_slot->restart_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
+
+		d = slot_getattr(tupslot, ++col, &isnull);
+		remote_slot->catalog_xmin = isnull ? InvalidTransactionId :
+			DatumGetTransactionId(d);
+
+		remote_slot->two_phase = DatumGetBool(slot_getattr(tupslot, ++col,
+														   &isnull));
+		Assert(!isnull);
+
+		d = slot_getattr(tupslot, ++col, &isnull);
+		remote_slot->two_phase_at = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
+
+		remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col,
+														  &isnull));
+		Assert(!isnull);
+
+		remote_slot->database = TextDatumGetCString(slot_getattr(tupslot,
+																 ++col, &isnull));
+		Assert(!isnull);
+
+		d = slot_getattr(tupslot, ++col, &isnull);
+		remote_slot->invalidated = isnull ? RS_INVAL_NONE :
+			GetSlotInvalidationCause(TextDatumGetCString(d));
+
+		/* Sanity check */
+		Assert(col == UPDATE_SLOTSYNC_COLUMN_COUNT);
+
+		/*
+		 * Skip a slot whose LSNs or xmin are invalid although it is not
+		 * marked invalidated (presumably still RS_EPHEMERAL on the primary);
+		 * this is meant to mirror the filtering in fetch_remote_slots.
+		 */
+		if ((XLogRecPtrIsInvalid(remote_slot->restart_lsn) ||
+			 XLogRecPtrIsInvalid(remote_slot->confirmed_lsn) ||
+			 !TransactionIdIsValid(remote_slot->catalog_xmin)) &&
+			remote_slot->invalidated == RS_INVAL_NONE)
+			pfree(remote_slot);
+		else
+			/* Add to updated list */
+			updated_slot_list = lappend(updated_slot_list, remote_slot);
+
+		ExecClearTuple(tupslot);
+	}
+
+	walrcv_clear_result(res);
+	pfree(query.data);
+
+	/*
+	 * Free the superseded list and its RemoteSlot structs. The name, plugin
+	 * and database strings they point to are not freed here; the refreshed
+	 * entries carry their own copies.
+	 */
+	list_free_deep(remote_slot_list);
+
+	return updated_slot_list;
+}
+
+/*
+ * Synchronize slots.
+ *
+ * Takes a list of remote slots and synchronizes them locally. Creates the
+ * slots if not present on the standby and updates existing ones.
+ *
+ * Returns TRUE if any of the slots gets updated in this sync-cycle.
+ */
+static bool
+synchronize_slots(WalReceiverConn *wrconn, List *remote_slot_list,
+				  List **pending_sync_start_slots, int sync_iterations)
+{
+	bool		some_slot_updated = false;
+
 	/* Drop local slots that no longer need to be synced. */
 	drop_local_obsolete_slots(remote_slot_list);
 
@@ -919,6 +1131,7 @@ synchronize_slots(WalReceiverConn *wrconn)
 	foreach_ptr(RemoteSlot, remote_slot, remote_slot_list)
 	{
 		Oid			remote_dbid = get_database_oid(remote_slot->database, false);
+		bool		sync_start_pending = false;
 
 		/*
 		 * Use shared lock to prevent a conflict with
@@ -927,19 +1140,16 @@ synchronize_slots(WalReceiverConn *wrconn)
 		 */
 		LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
 
-		some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
+		some_slot_updated |= synchronize_one_slot(wrconn, remote_slot,
+					  remote_dbid, &sync_start_pending, sync_iterations);
+
+		/* Only append to list if caller wants it and sync is pending */
+		if (pending_sync_start_slots != NULL && sync_start_pending)
+			*pending_sync_start_slots = lappend(*pending_sync_start_slots, remote_slot);
 
 		UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
 	}
 
-	/* We are done, free remote_slot_list elements */
-	list_free_deep(remote_slot_list);
-
-	walrcv_clear_result(res);
-
-	if (started_tx)
-		CommitTransactionCommand();
-
 	return some_slot_updated;
 }
 
@@ -1131,7 +1341,7 @@ slotsync_reread_config(void)
 	bool		conninfo_changed;
 	bool		primary_slotname_changed;
 
-	Assert(sync_replication_slots);
+	Assert(!AmLogicalSlotSyncWorkerProcess() || sync_replication_slots);
 
 	ConfigReloadPending = false;
 	ProcessConfigFile(PGC_SIGHUP);
@@ -1252,31 +1462,38 @@ slotsync_worker_onexit(int code, Datum arg)
  * sync-cycles is reset to the minimum (200ms).
  */
 static void
-wait_for_slot_activity(bool some_slot_updated)
+wait_for_slot_activity(bool some_slot_updated, bool called_from_api)
 {
-	int			rc;
+	int		rc;
+	int		wait_time;
 
-	if (!some_slot_updated)
-	{
-		/*
-		 * No slots were updated, so double the sleep time, but not beyond the
-		 * maximum allowable value.
-		 */
-		sleep_ms = Min(sleep_ms * 2, MAX_SLOTSYNC_WORKER_NAPTIME_MS);
-	}
-	else
-	{
+	if (called_from_api) {
 		/*
-		 * Some slots were updated since the last sleep, so reset the sleep
-		 * time.
+		 * When called from pg_sync_replication_slots, use a fixed 2
+		 * second wait time.
 		 */
-		sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
+		wait_time = SLOTSYNC_API_NAPTIME_MS;
+	} else {
+		if (!some_slot_updated) {
+			/*
+			 * No slots were updated, so double the sleep time,
+			 * but not beyond the maximum allowable value.
+			 */
+			sleep_ms = Min(sleep_ms * 2, MAX_SLOTSYNC_WORKER_NAPTIME_MS);
+		} else {
+			/*
+			 * Some slots were updated since the last sleep, so
+			 * reset the sleep time.
+			 */
+			sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
+		}
+		wait_time = sleep_ms;
 	}
 
 	rc = WaitLatch(MyLatch,
 				   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
-				   sleep_ms,
-				   WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
+				   wait_time,
+				   WAIT_EVENT_REPLICATION_SLOTSYNC_PRIMARY_CATCHUP);
 
 	if (rc & WL_LATCH_SET)
 		ResetLatch(MyLatch);
@@ -1505,12 +1722,28 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
 	for (;;)
 	{
 		bool		some_slot_updated = false;
+		List	       *remote_slots;
+		bool		started_tx = false;
 
 		ProcessSlotSyncInterrupts(wrconn);
 
-		some_slot_updated = synchronize_slots(wrconn);
+		/*
+		 * The syscache access in fetch_remote_slots() needs a
+		 * transaction env.
+		 */
+		if (!IsTransactionState()) {
+			StartTransactionCommand();
+			started_tx = true;
+		}
+
+		remote_slots = fetch_remote_slots(wrconn);
+		some_slot_updated = synchronize_slots(wrconn, remote_slots, NULL, 0);
+		list_free_deep(remote_slots);
+
+		if (started_tx)
+			CommitTransactionCommand();
 
-		wait_for_slot_activity(some_slot_updated);
+		wait_for_slot_activity(some_slot_updated, false);
 	}
 
 	/*
@@ -1736,19 +1969,85 @@ slotsync_failure_callback(int code, Datum arg)
 }
 
 /*
- * Synchronize the failover enabled replication slots using the specified
- * primary server connection.
+ * Synchronize failover enabled replication slots using the specified primary
+ * server connection.
+ *
+ * Repeatedly fetches and updates replication slot information from the
+ * primary until all slots are at least "sync ready", waiting 2 seconds
+ * between retries. Raises an error if promotion is triggered while waiting.
  */
 void
-SyncReplicationSlots(WalReceiverConn *wrconn)
+SyncReplicationSlots(WalReceiverConn * wrconn)
 {
 	PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
 	{
+		List	       *remote_slots;
+		bool		started_tx = false;
+		int			sync_iterations = 0;
+
 		check_and_set_sync_info(InvalidPid);
 
 		validate_remote_info(wrconn);
 
-		synchronize_slots(wrconn);
+		/*
+		 * The syscache access in fetch_remote_slots() needs a
+		 * transaction env.
+		 */
+		if (!IsTransactionState()) {
+			StartTransactionCommand();
+			started_tx = true;
+		}
+
+		remote_slots = fetch_remote_slots(wrconn);
+
+		/* Retry until all slots are at least sync ready */
+		for (;;)
+		{
+			bool		some_slot_updated = false;
+			List	       *pending_sync_start_slots = NIL;
+
+			sync_iterations++;
+
+			/* Refresh remote slot data */
+			remote_slots = refresh_remote_slots(wrconn, remote_slots);
+
+			/* Attempt to synchronize slots */
+			some_slot_updated = synchronize_slots(wrconn, remote_slots,
+						 &pending_sync_start_slots, sync_iterations);
+
+			/* Done if all slots are at least sync ready */
+			if (pending_sync_start_slots == NIL)
+				break;
+			else
+			{
+				list_free(pending_sync_start_slots);
+				pending_sync_start_slots = NIL;
+
+				/* wait for 2 seconds before retrying */
+				wait_for_slot_activity(some_slot_updated, true);
+
+				/*
+				 * If we've been promoted, then no point
+				 * continuing.
+				 */
+				if (SlotSyncCtx->stopSignaled)
+				{
+					ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("exiting from slot synchronization as"
+								" promotion is triggered")));
+					break;
+				}
+
+				/* Handle a termination request, if any */
+				ProcessSlotSyncInterrupts(wrconn);
+			}
+		}
+
+		list_free_deep(remote_slots);
+
+		if (started_tx)
+			CommitTransactionCommand();
 
 		/* Cleanup the synced temporary slots */
 		ReplicationSlotCleanup(true);
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 0be307d..3497f0f 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -62,8 +62,8 @@ LOGICAL_APPLY_MAIN	"Waiting in main loop of logical replication apply process."
 LOGICAL_LAUNCHER_MAIN	"Waiting in main loop of logical replication launcher process."
 LOGICAL_PARALLEL_APPLY_MAIN	"Waiting in main loop of logical replication parallel apply process."
 RECOVERY_WAL_STREAM	"Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
-REPLICATION_SLOTSYNC_MAIN	"Waiting in main loop of slot sync worker."
 REPLICATION_SLOTSYNC_SHUTDOWN	"Waiting for slot sync worker to shut down."
+REPLICATION_SLOTSYNC_PRIMARY_CATCHUP	"Waiting for the primary to catch up."
 SYSLOGGER_MAIN	"Waiting in main loop of syslogger process."
 WAL_RECEIVER_MAIN	"Waiting in main loop of WAL receiver process."
 WAL_SENDER_MAIN	"Waiting in main loop of WAL sender process."
-- 
1.8.3.1

