After playing with the patch a little more, we’ve come to the conclusion, that the idea to signal CV broadcast from the timer handler is a risky one, as it creates a large number of execution paths, which are not present in current code base. It's hard to prove correctness of application behavior in each such case, so we've decided to use a different approach.

In the new version of the patch we use timer handler only to set the flag, which is then checked in the ProcessStartupProcInterrupts function. This allow us to send signal on timeout if the startup process is waiting for the arrival of new WAL records (in ReadRecord), but the notification may be delayed while record is being applied (during redo handler invocation from ApplyWalRecord). This could increase delay for some corner cases with non-trivial WAL records like ‘drop database’, but this should be a rare case and walsender process have its own limit on the wait time, so the delay won’t be indefinite even in this case.

A new variant of the patch is attached in case anybody else wants to play with this patch and approach. A slightly improved test case and formatted CV patch (which is not strictly required anymore for this case) are attached as well.

Thanks,
Alexey

From b6749b210cceabcc0c382935ec412182b047fbb4 Mon Sep 17 00:00:00 2001
From: Alexey Makhmutov <a.makhmu...@postgrespro.ru>
Date: Fri, 28 Mar 2025 16:16:36 +0300
Subject: [PATCH 1/2] Ensure that content of the wait list on CV is aligned
 with cv_sleep_target value

Wait on the ConditionVariable could be interleaved with interruption handlers,
such as timers. If such handler uses CV calls (i.e. ConditionVariableBroadcast),
then value of the cv_sleep_target could be null or point to a different CV after
wakeup. In this case we should not try to add ourselves to the wakeup wait list,
as subsequent call to ConditionVariableCancelSleep won't be able to find our CV
to clear its list. We should instead return control to the client, so the exit
condition for the CV external wait loop could be checked. If wait is still
required, then next invocation of ConditionVariableTimedSleep will perform the
required setup procedures for CV. Behavior before this change results in the
mismatch between cv_sleep_target value and content of the wakeup list, so the
next invocation of ConditionVariableBroadcast method may fail due to presence of
current process in the list.
---
 src/backend/storage/lmgr/condition_variable.c | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index 228303e77f7..ab4ad7e5641 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -165,6 +165,17 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
 		/* Reset latch before examining the state of the wait list. */
 		ResetLatch(MyLatch);
 
+		/*
+		 * First, check that we are still expected to wait on this variable.
+		 * If sleep target has changed, then we need to return spuriously to
+		 * allow caller to check the exit condition and invoke the wait
+		 * function again to properly prepare for the next sleep. Note, that
+		 * we don't need to change wait list in this case, as we should be
+		 * deleted from the list in case of cv_sleep_target change.
+		 */
+		if (cv != cv_sleep_target)
+			return false;
+
 		/*
 		 * If this process has been taken out of the wait list, then we know
 		 * that it has been signaled by ConditionVariableSignal (or
-- 
2.49.0

From 6eaeb7a7fbb40fc3a8fb04a2ea22df9a54747a0a Mon Sep 17 00:00:00 2001
From: Rustam Khamidullin <r.khamidul...@postgrespro.ru>
Date: Fri, 14 Mar 2025 18:18:34 +0700
Subject: [PATCH 2/2] Implement batching for WAL records notification during
 cascade replication

Currently standby server notifies walsenders after applying of each WAL
record during cascade replication. This creates a bottleneck in case of large
number of sender processes during WalSndWakeup invocation. This change
introduces batching for such notifications, which are now sent either after
certain number of applied records or specified time interval (whichever comes
first).

Co-authored-by: Alexey Makhmutov <a.makhmu...@postgrespro.ru>
---
 src/backend/access/transam/xlogrecovery.c | 99 ++++++++++++++++++++++-
 src/backend/postmaster/startup.c          |  5 ++
 src/backend/utils/misc/guc_tables.c       | 22 +++++
 src/include/access/xlogrecovery.h         |  8 ++
 src/include/utils/timeout.h               |  1 +
 5 files changed, 133 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 6ce979f2d8b..43e95602223 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -64,6 +64,7 @@
 #include "utils/pg_lsn.h"
 #include "utils/ps_status.h"
 #include "utils/pg_rusage.h"
+#include "utils/timeout.h"
 
 /* Unsupported old recovery command file names (relative to $PGDATA) */
 #define RECOVERY_COMMAND_FILE	"recovery.conf"
@@ -147,6 +148,13 @@ bool		InArchiveRecovery = false;
 static bool StandbyModeRequested = false;
 bool		StandbyMode = false;
 
+/*
+ * Whether we are currently in process of processing recovery records while
+ * allowing downstream replication instances
+ */
+#define StandbyWithCascadeReplication() \
+		(AmStartupProcess() && StandbyMode && AllowCascadeReplication())
+
 /* was a signal file present at startup? */
 static bool standby_signal_file_found = false;
 static bool recovery_signal_file_found = false;
@@ -303,6 +311,25 @@ bool		reachedConsistency = false;
 static char *replay_image_masked = NULL;
 static char *primary_image_masked = NULL;
 
+/*
+ * Maximum number of applied records in batch before notifying walsender during
+ * cascade replication
+ */
+int			cascadeReplicationMaxBatchSize;
+
+/*
+ * Maximum batching delay before notifying walsender during cascade replication
+ */
+int			cascadeReplicationMaxBatchDelay;
+
+/* Counter for applied records which are not yet signaled to walsenders */
+static int	appliedRecords = 0;
+
+/*
+ * True if downstream walsenders need to be notified about pending WAL records,
+ * set by timeout handler.
+ */
+volatile sig_atomic_t replicationNotificationPending = false;
 
 /*
  * Shared-memory state for WAL recovery.
@@ -1844,6 +1871,17 @@ PerformWalRecovery(void)
 		 * end of main redo apply loop
 		 */
 
+		/* Send notification for batched messages once loop is ended */
+		if (StandbyWithCascadeReplication() &&
+			cascadeReplicationMaxBatchSize > 1 &&
+			appliedRecords > 0)
+		{
+			if (cascadeReplicationMaxBatchDelay > 0)
+				disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false);
+			appliedRecords = 0;
+			WalSndWakeup(false, true);
+		}
+
 		if (reachedRecoveryTarget)
 		{
 			if (!reachedConsistency)
@@ -2042,8 +2080,40 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 	 *    be created otherwise)
 	 * ------
 	 */
-	if (AllowCascadeReplication())
-		WalSndWakeup(switchedTLI, true);
+
+	if (StandbyWithCascadeReplication())
+	{
+		if (cascadeReplicationMaxBatchSize <= 1)
+			WalSndWakeup(switchedTLI, true);
+		else
+		{
+			/*
+			 * If time line has switched, then we do not want to delay the
+			 * notification, otherwise we will wait until we apply specified
+			 * number of records before notifying downstream logical
+			 * walsenders.
+			 */
+			bool		batchFlushRequired =
+				++appliedRecords >= cascadeReplicationMaxBatchSize ||
+				replicationNotificationPending ||
+				switchedTLI;
+
+			if (batchFlushRequired)
+			{
+				if (appliedRecords > 0 && cascadeReplicationMaxBatchDelay > 0)
+					disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false);
+				appliedRecords = 0;
+				replicationNotificationPending = false;
+			}
+
+			WalSndWakeup(switchedTLI, batchFlushRequired);
+
+			/* Setup timeout to limit maximum delay for notifications */
+			if (appliedRecords == 1 && cascadeReplicationMaxBatchDelay > 0)
+				enable_timeout_after(STANDBY_CASCADE_WAL_SEND_TIMEOUT,
+									 cascadeReplicationMaxBatchDelay);
+		}
+	}
 
 	/*
 	 * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
@@ -5070,3 +5140,28 @@ assign_recovery_target_xid(const char *newval, void *extra)
 	else
 		recoveryTarget = RECOVERY_TARGET_UNSET;
 }
+
+/*
+ * Send notifications to downstream walsenders
+ */
+void
+StandbyWalCheckSendNotify(void)
+{
+	if (appliedRecords > 0)
+	{
+		WalSndWakeup(false, true);
+		appliedRecords = 0;
+	}
+	replicationNotificationPending = false;
+}
+
+/*
+ * Timer handler for batch notifications in cascade replication
+ */
+void
+StandbyWalSendTimeoutHandler(void)
+{
+	replicationNotificationPending = true;
+	/* Most likely process is waiting for arrival of WAL records */
+	SetLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+}
diff --git a/src/backend/postmaster/startup.c b/src/backend/postmaster/startup.c
index 7149a67fcbc..94a892776a7 100644
--- a/src/backend/postmaster/startup.c
+++ b/src/backend/postmaster/startup.c
@@ -189,6 +189,10 @@ ProcessStartupProcInterrupts(void)
 	if (ProcSignalBarrierPending)
 		ProcessProcSignalBarrier();
 
+	/* Send a notification to downstream walsenders if required */
+	if (replicationNotificationPending)
+		StandbyWalCheckSendNotify();
+
 	/* Perform logging of memory contexts of this process */
 	if (LogMemoryContextPending)
 		ProcessLogMemoryContextInterrupt();
@@ -250,6 +254,7 @@ StartupProcessMain(const void *startup_data, size_t startup_data_len)
 	RegisterTimeout(STANDBY_DEADLOCK_TIMEOUT, StandbyDeadLockHandler);
 	RegisterTimeout(STANDBY_TIMEOUT, StandbyTimeoutHandler);
 	RegisterTimeout(STANDBY_LOCK_TIMEOUT, StandbyLockTimeoutHandler);
+	RegisterTimeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, StandbyWalSendTimeoutHandler);
 
 	/*
 	 * Unblock signals (they were blocked when the postmaster forked us)
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 60b12446a1c..dcb5f97fffd 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2318,6 +2318,28 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"cascade_replication_batch_size", PGC_POSTMASTER, REPLICATION_STANDBY,
+			gettext_noop("Set the maximum number of applied WAL records before cascade walsenders are notified on standby."),
+			gettext_noop("0 disabled records batching in cascade replication"),
+			GUC_NOT_IN_SAMPLE
+		},
+		&cascadeReplicationMaxBatchSize,
+		500, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"cascade_replication_batch_delay", PGC_POSTMASTER, REPLICATION_STANDBY,
+			gettext_noop("Sets the maximum time before cascade walsenders are notified on standby about applied records."),
+			gettext_noop("0 disables timed notifications"),
+			GUC_NOT_IN_SAMPLE | GUC_UNIT_MS
+		},
+		&cascadeReplicationMaxBatchDelay,
+		500, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
 			gettext_noop("Sets the maximum number of concurrent connections."),
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 91446303024..a4e714365fe 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -11,6 +11,8 @@
 #ifndef XLOGRECOVERY_H
 #define XLOGRECOVERY_H
 
+#include <signal.h>
+
 #include "access/xlogreader.h"
 #include "catalog/pg_control.h"
 #include "lib/stringinfo.h"
@@ -57,6 +59,8 @@ extern PGDLLIMPORT char *PrimarySlotName;
 extern PGDLLIMPORT char *recoveryRestoreCommand;
 extern PGDLLIMPORT char *recoveryEndCommand;
 extern PGDLLIMPORT char *archiveCleanupCommand;
+extern PGDLLIMPORT int cascadeReplicationMaxBatchSize;
+extern PGDLLIMPORT int cascadeReplicationMaxBatchDelay;
 
 /* indirectly set via GUC system */
 extern PGDLLIMPORT TransactionId recoveryTargetXid;
@@ -155,4 +159,8 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue,
 
 extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
 
+extern PGDLLIMPORT volatile sig_atomic_t replicationNotificationPending;
+extern void StandbyWalCheckSendNotify(void);
+extern void StandbyWalSendTimeoutHandler(void);
+
 #endif							/* XLOGRECOVERY_H */
diff --git a/src/include/utils/timeout.h b/src/include/utils/timeout.h
index 7b19beafdc9..0062cb562b9 100644
--- a/src/include/utils/timeout.h
+++ b/src/include/utils/timeout.h
@@ -36,6 +36,7 @@ typedef enum TimeoutId
 	IDLE_STATS_UPDATE_TIMEOUT,
 	CLIENT_CONNECTION_CHECK_TIMEOUT,
 	STARTUP_PROGRESS_TIMEOUT,
+	STANDBY_CASCADE_WAL_SEND_TIMEOUT,
 	/* First user-definable timeout reason */
 	USER_TIMEOUT,
 	/* Maximum number of timeout reasons */
-- 
2.49.0

<<attachment: test_scenario.zip>>

Reply via email to