From fbfd2df94ee7738cccca73429e43d2a217c292bc Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Thu, 4 Dec 2025 19:12:21 +1100
Subject: [PATCH v30] Use SIGUSR1 to interrupt slot synchronization workers and
 backends.

Previously, during promotion, only the slot synchronization worker was
interrupted with SIGINT to shutdown for promotion. That meant backends
that perform slot synchronization via the pg_sync_replication_slots()
SQL function were not signalled at all because their PIDs were not
recorded in the slot-sync context.

This patch changes behaviour to:
1. Store the backend PID in SlotSyncCtxStruct so the backend performing
   slot synchronization can be signalled.
2. On promotion, send SIGUSR1 (not SIGINT) to the recorded PID - either
   the slot-sync worker or any backend currently syncing slots.
3. Backends invoking pg_sync_replication_slots() also calls
   ProcessSlotSyncInterrupts() to handle promotion signal as well any
   configuration changes that might result in stopping of
   synchronization.

This patch also acts as a base for a larger patch that improves
pg_sync_replication_slots() to wait for slots to be persisted before
exiting.

Author: Ajin Cherian <itsajin@gmail.com>
Discussion: https://www.postgresql.org/message-id/CAFPTHDZAA%2BgWDntpa5ucqKKba41%3DtXmoXqN3q4rpjO9cdxgQrw%40mail.gmail.com
---
 src/backend/replication/logical/slotsync.c | 148 +++++++++++++--------
 1 file changed, 95 insertions(+), 53 deletions(-)

diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 31d7cb3ca77..64457f71de0 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -71,9 +71,12 @@
 /*
  * Struct for sharing information to control slot synchronization.
  *
- * The slot sync worker's pid is needed by the startup process to shut it
- * down during promotion. The startup process shuts down the slot sync worker
- * and also sets stopSignaled=true to handle the race condition when the
+ * The pid is either the slot sync worker's pid or the backend's pid running
+ * the SQL function pg_sync_replication_slots(). When the startup process sets
+ * stopSignaled during promotion, it uses this pid to wake up the currently
+ * synchronizing process so that the process can immediately stop its
+ * synchronizing work on seeing stopSignaled set.
+ * Setting stopSignaled is also used to handle the race condition when the
  * postmaster has not noticed the promotion yet and thus may end up restarting
  * the slot sync worker. If stopSignaled is set, the worker will exit in such a
  * case. The SQL function pg_sync_replication_slots() will also error out if
@@ -1195,10 +1198,11 @@ ValidateSlotSyncParams(int elevel)
 }
 
 /*
- * Re-read the config file.
+ * Re-read the config file for slot synchronization.
+ *
+ * Exit or throw errors if relevant GUCs have changed depending on whether
+ * called from slotsync worker or from SQL function pg_sync_replication_slots()
  *
- * Exit if any of the slot sync GUCs have changed. The postmaster will
- * restart it.
  */
 static void
 slotsync_reread_config(void)
@@ -1209,57 +1213,100 @@ slotsync_reread_config(void)
 	bool		old_hot_standby_feedback = hot_standby_feedback;
 	bool		conninfo_changed;
 	bool		primary_slotname_changed;
+	bool		worker = AmLogicalSlotSyncWorkerProcess();
+	bool		parameter_changed = false;
 
-	Assert(sync_replication_slots);
+	if (worker)
+		Assert(sync_replication_slots);
 
 	ConfigReloadPending = false;
 	ProcessConfigFile(PGC_SIGHUP);
 
 	conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0;
 	primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0;
+
 	pfree(old_primary_conninfo);
 	pfree(old_primary_slotname);
 
+	/* Check for sync_replication_slots change */
 	if (old_sync_replication_slots != sync_replication_slots)
 	{
-		ereport(LOG,
-		/* translator: %s is a GUC variable name */
-				errmsg("replication slot synchronization worker will shut down because \"%s\" is disabled", "sync_replication_slots"));
-		proc_exit(0);
+		if (worker)
+		{
+			ereport(LOG,
+			/* translator: %s is a GUC variable name */
+					errmsg("replication slot synchronization worker will shut down because \"%s\" is disabled",
+						   "sync_replication_slots"));
+
+			proc_exit(0);
+		}
+
+		parameter_changed = true;
 	}
 
+	/* Check for parameter changes common to both API and worker */
 	if (conninfo_changed ||
 		primary_slotname_changed ||
 		(old_hot_standby_feedback != hot_standby_feedback))
 	{
-		ereport(LOG,
-				errmsg("replication slot synchronization worker will restart because of a parameter change"));
 
-		/*
-		 * Reset the last-start time for this worker so that the postmaster
-		 * can restart it without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
-		 */
-		SlotSyncCtx->last_start_time = 0;
+		if (worker)
+		{
+			ereport(LOG,
+					errmsg("replication slot synchronization worker will restart because of a parameter change"));
 
-		proc_exit(0);
+			/*
+			 * Reset the last-start time for this worker so that the
+			 * postmaster can restart it without waiting for
+			 * SLOTSYNC_RESTART_INTERVAL_SEC.
+			 */
+			SlotSyncCtx->last_start_time = 0;
+
+			proc_exit(0);
+		}
+
+		parameter_changed = true;
+	}
+
+	/*
+	 * If we have reached here with a parameter change, we must be running in
+	 * SQL function, emit error in such a case.
+	 */
+	if (parameter_changed)
+	{
+		Assert(!worker);
+		ereport(ERROR,
+				errmsg("replication slot synchronization will stop because of a parameter change"));
 	}
 
 }
 
 /*
- * Interrupt handler for main loop of slot sync worker.
+ * Interrupt handler for main loop of slot sync worker and
+ * SQL function pg_sync_replication_slots().
  */
 static void
 ProcessSlotSyncInterrupts(void)
 {
 	CHECK_FOR_INTERRUPTS();
 
-	if (ShutdownRequestPending)
+	if (SlotSyncCtx->stopSignaled)
 	{
-		ereport(LOG,
-				errmsg("replication slot synchronization worker is shutting down on receiving SIGINT"));
+		if (AmLogicalSlotSyncWorkerProcess())
+		{
+			ereport(LOG,
+					errmsg("replication slot synchronization worker is shutting down as promotion is triggered"));
 
-		proc_exit(0);
+			proc_exit(0);
+		}
+		else
+		{
+			/* For SQL function */
+			ereport(ERROR,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("cannot continue replication slot synchronization"
+						   " as standby promotion is triggered"));
+		}
 	}
 
 	if (ConfigReloadPending)
@@ -1366,25 +1413,10 @@ wait_for_slot_activity(bool some_slot_updated)
  * Otherwise, advertise that a sync is in progress.
  */
 static void
-check_and_set_sync_info(pid_t worker_pid)
+check_and_set_sync_info(pid_t sync_process_pid)
 {
 	SpinLockAcquire(&SlotSyncCtx->mutex);
 
-	/* The worker pid must not be already assigned in SlotSyncCtx */
-	Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid);
-
-	/*
-	 * Emit an error if startup process signaled the slot sync machinery to
-	 * stop. See comments atop SlotSyncCtxStruct.
-	 */
-	if (SlotSyncCtx->stopSignaled)
-	{
-		SpinLockRelease(&SlotSyncCtx->mutex);
-		ereport(ERROR,
-				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				errmsg("cannot synchronize replication slots when standby promotion is ongoing"));
-	}
-
 	if (SlotSyncCtx->syncing)
 	{
 		SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1393,13 +1425,16 @@ check_and_set_sync_info(pid_t worker_pid)
 				errmsg("cannot synchronize replication slots concurrently"));
 	}
 
+	/* The pid must not be already assigned in SlotSyncCtx */
+	Assert(SlotSyncCtx->pid == InvalidPid);
+
 	SlotSyncCtx->syncing = true;
 
 	/*
 	 * Advertise the required PID so that the startup process can kill the
-	 * slot sync worker on promotion.
+	 * slot sync process on promotion.
 	 */
-	SlotSyncCtx->pid = worker_pid;
+	SlotSyncCtx->pid = sync_process_pid;
 
 	SpinLockRelease(&SlotSyncCtx->mutex);
 
@@ -1414,6 +1449,7 @@ reset_syncing_flag(void)
 {
 	SpinLockAcquire(&SlotSyncCtx->mutex);
 	SlotSyncCtx->syncing = false;
+	SlotSyncCtx->pid = InvalidPid;
 	SpinLockRelease(&SlotSyncCtx->mutex);
 
 	syncing_slots = false;
@@ -1488,7 +1524,6 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
 
 	/* Setup signal handling */
 	pqsignal(SIGHUP, SignalHandlerForConfigReload);
-	pqsignal(SIGINT, SignalHandlerForShutdownRequest);
 	pqsignal(SIGTERM, die);
 	pqsignal(SIGFPE, FloatExceptionHandler);
 	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
@@ -1595,7 +1630,8 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
 
 	/*
 	 * The slot sync worker can't get here because it will only stop when it
-	 * receives a SIGINT from the startup process, or when there is an error.
+	 * receives a stopSignaled from the startup process, or when there is an
+	 * error.
 	 */
 	Assert(false);
 }
@@ -1650,16 +1686,18 @@ update_synced_slots_inactive_since(void)
 }
 
 /*
- * Shut down the slot sync worker.
+ * Shut down slot synchronization.
  *
- * This function sends signal to shutdown slot sync worker, if required. It
- * also waits till the slot sync worker has exited or
- * pg_sync_replication_slots() has finished.
+ * This function wakes up the slot sync process (either worker or backend
+ * running SQL function pg_sync_replication_slots()) and sets
+ * stopSignaled=true so that worker can exit or SQL function
+ * pg_sync_replication_slots() can finish. It also waits tll the slot sync
+ * worker has exited or pg_sync_replication_slots() has finished.
  */
 void
 ShutDownSlotSync(void)
 {
-	pid_t		worker_pid;
+	pid_t		sync_process_pid;
 
 	SpinLockAcquire(&SlotSyncCtx->mutex);
 
@@ -1676,12 +1714,13 @@ ShutDownSlotSync(void)
 		return;
 	}
 
-	worker_pid = SlotSyncCtx->pid;
+	sync_process_pid = SlotSyncCtx->pid;
 
 	SpinLockRelease(&SlotSyncCtx->mutex);
 
-	if (worker_pid != InvalidPid)
-		kill(worker_pid, SIGINT);
+	/* Wake up slot sync process */
+	if (sync_process_pid != InvalidPid)
+		kill(sync_process_pid, SIGUSR1);
 
 	/* Wait for slot sync to end */
 	for (;;)
@@ -1830,7 +1869,10 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
 {
 	PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
 	{
-		check_and_set_sync_info(InvalidPid);
+		check_and_set_sync_info(MyProcPid);
+
+		/* Check for interrupts and config changes */
+		ProcessSlotSyncInterrupts();
 
 		validate_remote_info(wrconn);
 
-- 
2.47.3

