From 2764cc594b7b8dc947b6c66193691853de5e83f3 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Tue, 9 Dec 2025 21:11:31 +1100
Subject: [PATCH v31 1/2] Signal backends running pg_sync_replication_slots()
 during promotion.

Previously, during promotion, only the slot synchronization worker was
interrupted to shutdown for promotion. That meant backends
that perform slot synchronization via the pg_sync_replication_slots()
SQL function were not signalled at all because their PIDs were not
recorded in the slot-sync context.

This patch changes behaviour to:
1. Store the backend PID in SlotSyncCtxStruct so the backend performing
   slot synchronization can be signalled.
2. On promotion, send SIGUSR1 to the recorded PID - either
   the slot-sync worker or any backend currently syncing slots.
3. Backends invoking pg_sync_replication_slots() also calls
   ProcessSlotSyncInterrupts() to handle promotion signal as well any
   configuration changes that might result in stopping of
   synchronization.

This patch also acts as a base for a larger patch that improves
pg_sync_replication_slots() to wait for slots to be persisted before
exiting.

Author: Ajin Cherian <itsajin@gmail.com>
Discussion: https://www.postgresql.org/message-id/CAFPTHDZAA%2BgWDntpa5ucqKKba41%3DtXmoXqN3q4rpjO9cdxgQrw%40mail.gmail.com
---
 src/backend/replication/logical/slotsync.c | 145 +++++++++++++--------
 1 file changed, 93 insertions(+), 52 deletions(-)

diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 7e3b4c4413e..327bb361d61 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -71,9 +71,12 @@
 /*
  * Struct for sharing information to control slot synchronization.
  *
- * The slot sync worker's pid is needed by the startup process to shut it
- * down during promotion. The startup process shuts down the slot sync worker
- * and also sets stopSignaled=true to handle the race condition when the
+ * The pid is either the slot sync worker's pid or the backend's pid running
+ * the SQL function pg_sync_replication_slots(). When the startup process sets
+ * stopSignaled during promotion, it uses this pid to wake up the currently
+ * synchronizing process so that the process can immediately stop its
+ * synchronizing work on seeing stopSignaled set.
+ * Setting stopSignaled is also used to handle the race condition when the
  * postmaster has not noticed the promotion yet and thus may end up restarting
  * the slot sync worker. If stopSignaled is set, the worker will exit in such a
  * case. The SQL function pg_sync_replication_slots() will also error out if
@@ -1195,10 +1198,11 @@ ValidateSlotSyncParams(int elevel)
 }
 
 /*
- * Re-read the config file.
+ * Re-read the config file for slot synchronization.
+ *
+ * Exit or throw errors if relevant GUCs have changed depending on whether
+ * called from slotsync worker or from SQL function pg_sync_replication_slots()
  *
- * Exit if any of the slot sync GUCs have changed. The postmaster will
- * restart it.
  */
 static void
 slotsync_reread_config(void)
@@ -1209,45 +1213,77 @@ slotsync_reread_config(void)
 	bool		old_hot_standby_feedback = hot_standby_feedback;
 	bool		conninfo_changed;
 	bool		primary_slotname_changed;
+	bool		worker = AmLogicalSlotSyncWorkerProcess();
+	bool		parameter_changed = false;
 
-	Assert(sync_replication_slots);
+	if (worker)
+		Assert(sync_replication_slots);
 
 	ConfigReloadPending = false;
 	ProcessConfigFile(PGC_SIGHUP);
 
 	conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0;
 	primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0;
+
 	pfree(old_primary_conninfo);
 	pfree(old_primary_slotname);
 
+	/* Check for sync_replication_slots change */
 	if (old_sync_replication_slots != sync_replication_slots)
 	{
-		ereport(LOG,
-		/* translator: %s is a GUC variable name */
-				errmsg("replication slot synchronization worker will shut down because \"%s\" is disabled", "sync_replication_slots"));
-		proc_exit(0);
+		if (worker)
+		{
+			ereport(LOG,
+			/* translator: %s is a GUC variable name */
+					errmsg("replication slot synchronization worker will shut down because \"%s\" is disabled",
+						   "sync_replication_slots"));
+
+			proc_exit(0);
+		}
+
+		parameter_changed = true;
 	}
 
+	/* Check for parameter changes common to both API and worker */
 	if (conninfo_changed ||
 		primary_slotname_changed ||
 		(old_hot_standby_feedback != hot_standby_feedback))
 	{
-		ereport(LOG,
-				errmsg("replication slot synchronization worker will restart because of a parameter change"));
 
-		/*
-		 * Reset the last-start time for this worker so that the postmaster
-		 * can restart it without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
-		 */
-		SlotSyncCtx->last_start_time = 0;
+		if (worker)
+		{
+			ereport(LOG,
+					errmsg("replication slot synchronization worker will restart because of a parameter change"));
 
-		proc_exit(0);
+			/*
+			 * Reset the last-start time for this worker so that the
+			 * postmaster can restart it without waiting for
+			 * SLOTSYNC_RESTART_INTERVAL_SEC.
+			 */
+			SlotSyncCtx->last_start_time = 0;
+
+			proc_exit(0);
+		}
+
+		parameter_changed = true;
+	}
+
+	/*
+	 * If we have reached here with a parameter change, we must be running in
+	 * SQL function, emit error in such a case.
+	 */
+	if (parameter_changed)
+	{
+		Assert(!worker);
+		ereport(ERROR,
+				errmsg("replication slot synchronization will stop because of a parameter change"));
 	}
 
 }
 
 /*
- * Interrupt handler for main loop of slot sync worker.
+ * Interrupt handler for main loop of slot sync worker and
+ * SQL function pg_sync_replication_slots().
  */
 static void
 ProcessSlotSyncInterrupts(void)
@@ -1256,10 +1292,20 @@ ProcessSlotSyncInterrupts(void)
 
 	if (SlotSyncCtx->stopSignaled)
 	{
-		ereport(LOG,
-				errmsg("replication slot synchronization worker is shutting down because promotion is triggered"));
+		if (AmLogicalSlotSyncWorkerProcess())
+		{
+			ereport(LOG,
+					errmsg("replication slot synchronization worker is shutting down because promotion is triggered"));
 
-		proc_exit(0);
+			proc_exit(0);
+		}
+		else
+		{
+			/* For SQL function */
+			ereport(ERROR,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("replication slot synchronization will stop because promotion is triggered"));
+		}
 	}
 
 	if (ConfigReloadPending)
@@ -1366,25 +1412,10 @@ wait_for_slot_activity(bool some_slot_updated)
  * Otherwise, advertise that a sync is in progress.
  */
 static void
-check_and_set_sync_info(pid_t worker_pid)
+check_and_set_sync_info(pid_t sync_process_pid)
 {
 	SpinLockAcquire(&SlotSyncCtx->mutex);
 
-	/* The worker pid must not be already assigned in SlotSyncCtx */
-	Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid);
-
-	/*
-	 * Emit an error if startup process signaled the slot sync machinery to
-	 * stop. See comments atop SlotSyncCtxStruct.
-	 */
-	if (SlotSyncCtx->stopSignaled)
-	{
-		SpinLockRelease(&SlotSyncCtx->mutex);
-		ereport(ERROR,
-				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				errmsg("cannot synchronize replication slots when standby promotion is ongoing"));
-	}
-
 	if (SlotSyncCtx->syncing)
 	{
 		SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1393,13 +1424,16 @@ check_and_set_sync_info(pid_t worker_pid)
 				errmsg("cannot synchronize replication slots concurrently"));
 	}
 
+	/* The pid must not be already assigned in SlotSyncCtx */
+	Assert(SlotSyncCtx->pid == InvalidPid);
+
 	SlotSyncCtx->syncing = true;
 
 	/*
 	 * Advertise the required PID so that the startup process can kill the
-	 * slot sync worker on promotion.
+	 * slot sync process on promotion.
 	 */
-	SlotSyncCtx->pid = worker_pid;
+	SlotSyncCtx->pid = sync_process_pid;
 
 	SpinLockRelease(&SlotSyncCtx->mutex);
 
@@ -1414,6 +1448,7 @@ reset_syncing_flag(void)
 {
 	SpinLockAcquire(&SlotSyncCtx->mutex);
 	SlotSyncCtx->syncing = false;
+	SlotSyncCtx->pid = InvalidPid;
 	SpinLockRelease(&SlotSyncCtx->mutex);
 
 	syncing_slots = false;
@@ -1595,7 +1630,7 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
 
 	/*
 	 * The slot sync worker can't get here because it will only stop when it
-	 * receives a stop request from the startup process, or when there is an
+	 * because receives a stop request from the startup process, or when there is an
 	 * error.
 	 */
 	Assert(false);
@@ -1651,16 +1686,18 @@ update_synced_slots_inactive_since(void)
 }
 
 /*
- * Shut down the slot sync worker.
+ * Shut down slot synchronization.
  *
- * This function sends signal to shutdown slot sync worker, if required. It
- * also waits till the slot sync worker has exited or
+ * This function sets stopSignaled=true and wakes up the slot sync process
+ * (either worker or backend running SQL function pg_sync_replication_slots())
+ * so that worker can exit or SQL function pg_sync_replication_slots() can
+ * finish. It also waits till the slot sync worker has exited or
  * pg_sync_replication_slots() has finished.
  */
 void
 ShutDownSlotSync(void)
 {
-	pid_t		worker_pid;
+	pid_t		sync_process_pid;
 
 	SpinLockAcquire(&SlotSyncCtx->mutex);
 
@@ -1677,16 +1714,17 @@ ShutDownSlotSync(void)
 		return;
 	}
 
-	worker_pid = SlotSyncCtx->pid;
+	sync_process_pid = SlotSyncCtx->pid;
 
 	SpinLockRelease(&SlotSyncCtx->mutex);
 
 	/*
-	 * Signal slotsync worker if it was still running. The worker will stop
-	 * upon detecting that the stopSignaled flag is set to true.
+	 * Signal slotsync worker or backend process running pg_sync_replication_slots()
+	 * if running. The process will stop upon detecting that the stopSignaled
+	 * flag is set to true.
 	 */
-	if (worker_pid != InvalidPid)
-		kill(worker_pid, SIGUSR1);
+	if (sync_process_pid!= InvalidPid)
+		kill(sync_process_pid, SIGUSR1);
 
 	/* Wait for slot sync to end */
 	for (;;)
@@ -1835,7 +1873,10 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
 {
 	PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
 	{
-		check_and_set_sync_info(InvalidPid);
+		check_and_set_sync_info(MyProcPid);
+
+		/* Check for interrupts and config changes */
+		ProcessSlotSyncInterrupts();
 
 		validate_remote_info(wrconn);
 
-- 
2.47.3

