From 29b4a4f2301184665710fbd09fcdb077fd419125 Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Fri, 5 Apr 2024 10:16:03 +0530
Subject: [PATCH v2] Handle stopSignaled during sync function call.

Currently, promotion related handling is missing in the slot sync SQL
function pg_sync_replication_slots(). Here is the background on how
it is done in slot sync worker:
During promotion, the startup process in order to shut down the
slot-sync worker, sets the 'stopSignaled' flag, sends the shut-down
signal, and waits for slot sync worker to exit. Meanwhile if the
postmaster has not noticed the promotion yet, it may end up restarting
slot sync worker. In such a case, the worker exits if 'stopSignaled'
is set.

Since there is a chance that the user (or any of his scripts/tools)
may execute SQL function pg_sync_replication_slots() in parallel to
promotion, such handling is needed in this SQL function as well, The
attached patch attempts to implement the same. Changes are:

1) If pg_sync_replication_slots() is already running when the
promotion is triggered, ShutDownSlotSync() checks the
'SlotSyncCtx->syncing' flag as well and waits for it to become false
i.e. waits till parallel running SQL function is finished.

2) If pg_sync_replication_slots() is invoked when promotion is
already in progress, pg_sync_replication_slots() respects the
'stopSignaled' flag set by the startup process and becomes a no-op.
---
 src/backend/replication/logical/slotsync.c | 38 +++++++++++++++++++---
 1 file changed, 33 insertions(+), 5 deletions(-)

diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index d18e2c7342..0064578e9c 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -1395,6 +1395,7 @@ update_synced_slots_inactive_since(void)
 		if (s->in_use && s->data.synced)
 		{
 			Assert(SlotIsLogical(s));
+			Assert(s->active_pid == 0);
 
 			/* Use the same inactive_since time for all the slots. */
 			if (now == 0)
@@ -1411,6 +1412,10 @@ update_synced_slots_inactive_since(void)
 
 /*
  * Shut down the slot sync worker.
+ *
+ * It sends signal to shutdown slot sync worker. It also waits till
+ * the slot sync worker has exited and pg_sync_replication_slots()
+ * has finished.
  */
 void
 ShutDownSlotSync(void)
@@ -1419,7 +1424,11 @@ ShutDownSlotSync(void)
 
 	SlotSyncCtx->stopSignaled = true;
 
-	if (SlotSyncCtx->pid == InvalidPid)
+	/*
+	 * Return if neither the slot sync worker is running nor the function
+	 * pg_sync_replication_slots() is executing.
+	 */
+	if ((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing)
 	{
 		SpinLockRelease(&SlotSyncCtx->mutex);
 		update_synced_slots_inactive_since();
@@ -1427,9 +1436,10 @@ ShutDownSlotSync(void)
 	}
 	SpinLockRelease(&SlotSyncCtx->mutex);
 
-	kill(SlotSyncCtx->pid, SIGINT);
+	if (SlotSyncCtx->pid != InvalidPid)
+		kill(SlotSyncCtx->pid, SIGINT);
 
-	/* Wait for it to die */
+	/* Wait for worker to exit and SQL function to finish */
 	for (;;)
 	{
 		int			rc;
@@ -1447,8 +1457,11 @@ ShutDownSlotSync(void)
 
 		SpinLockAcquire(&SlotSyncCtx->mutex);
 
-		/* Is it gone? */
-		if (SlotSyncCtx->pid == InvalidPid)
+		/*
+		 * Confirm that both the worker and the function
+		 * pg_sync_replication_slots() are done.
+		 */
+		if ((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing)
 			break;
 
 		SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1557,6 +1570,21 @@ slotsync_failure_callback(int code, Datum arg)
 void
 SyncReplicationSlots(WalReceiverConn *wrconn)
 {
+	/*
+	 * Startup process signaled the slot sync to stop, so if meanwhile user
+	 * has invoked slot sync SQL function, simply return.
+	 */
+	SpinLockAcquire(&SlotSyncCtx->mutex);
+	if (SlotSyncCtx->stopSignaled)
+	{
+		ereport(LOG,
+				errmsg("skipping slot synchronization as slot sync shutdown is signaled during promotion"));
+
+		SpinLockRelease(&SlotSyncCtx->mutex);
+		return;
+	}
+	SpinLockRelease(&SlotSyncCtx->mutex);
+
 	PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
 	{
 		validate_remote_info(wrconn);
-- 
2.34.1

