> We’ve prepared two test patches on top of current master to address both issues:
> ...
> * 0002-Implement-batching-for-cascade-replication.patch – test patch to implement possible batching approach in xlogreceiver.c with timer. Currently it uses GUC variables to allow testing of different batch sizes and timeout values.

I've played with the second patch a little more and made some adjustments to it:
1. Set up the timer only if we actually have applied messages which are (potentially) not yet signaled to walsenders.
2. Notify logical walsenders without delay if the timeline has changed.

Modified patch is attached.

Thanks,
Alexey
From c691724332dd5a78b98d606188383d6b37c98021 Mon Sep 17 00:00:00 2001
From: Rustam Khamidullin <r.khamidul...@postgrespro.ru>
Date: Fri, 14 Mar 2025 18:18:34 +0700
Subject: [PATCH 2/2] Implement batching for WAL records notification during
 cascade replication

Currently a standby server notifies walsenders after applying each WAL
record during cascade replication. With a large number of sender processes,
the WalSndWakeup invocation becomes a bottleneck. This change introduces
batching for such notifications, which are now sent either after a certain
number of applied records or after a specified time interval (whichever
comes first).

Co-authored-by: Alexey Makhmutov <a.makhmu...@postgrespro.ru>
---
 src/backend/access/transam/xlogrecovery.c | 61 ++++++++++++++++++++++-
 src/backend/postmaster/startup.c          |  1 +
 src/backend/utils/misc/guc_tables.c       | 22 ++++++++
 src/include/access/xlogrecovery.h         |  4 ++
 src/include/utils/timeout.h               |  1 +
 5 files changed, 87 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 0aa3ab59085..3e3e43d5888 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -64,6 +64,7 @@
 #include "utils/pg_lsn.h"
 #include "utils/ps_status.h"
 #include "utils/pg_rusage.h"
+#include "utils/timeout.h"
 
 /* Unsupported old recovery command file names (relative to $PGDATA) */
 #define RECOVERY_COMMAND_FILE	"recovery.conf"
@@ -147,6 +148,8 @@ bool		InArchiveRecovery = false;
 static bool StandbyModeRequested = false;
 bool		StandbyMode = false;
 
+#define StandbyWithCascadeReplication() (AmStartupProcess() && StandbyMode && AllowCascadeReplication())
+
 /* was a signal file present at startup? */
 static bool standby_signal_file_found = false;
 static bool recovery_signal_file_found = false;
@@ -298,6 +301,14 @@ bool		reachedConsistency = false;
 static char *replay_image_masked = NULL;
 static char *primary_image_masked = NULL;
 
+/* Maximum number of applied records in batch before notifying walsender during cascade replication */
+int cascadeReplicationMaxBatchSize;
+
+/* Maximum batching delay before notifying walsender during cascade replication */
+int cascadeReplicationMaxBatchDelay;
+
+/* Counter for collected records during cascade replication */
+static volatile sig_atomic_t appliedRecords = 0;
 
 /*
  * Shared-memory state for WAL recovery.
@@ -1839,6 +1850,17 @@ PerformWalRecovery(void)
 		 * end of main redo apply loop
 		 */
 
+		/* Ensure that notification for batched messages is sent */
+		if (StandbyWithCascadeReplication() &&
+				cascadeReplicationMaxBatchSize > 1 &&
+				appliedRecords > 0)
+		{
+			if (cascadeReplicationMaxBatchDelay > 0)
+				disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false);
+			appliedRecords = 0;
+			WalSndWakeup(false, true);
+		}
+
 		if (reachedRecoveryTarget)
 		{
 			if (!reachedConsistency)
@@ -2037,8 +2059,30 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 	 *    be created otherwise)
 	 * ------
 	 */
-	if (AllowCascadeReplication())
-		WalSndWakeup(switchedTLI, true);
+
+	if (StandbyWithCascadeReplication())
+	{
+		if (cascadeReplicationMaxBatchSize <= 1)
+			WalSndWakeup(switchedTLI, true);
+		else
+		{
+			bool batchFlushRequired = ++appliedRecords >=
+					cascadeReplicationMaxBatchSize;
+			if (batchFlushRequired)
+			{
+				appliedRecords = 0;
+				if (cascadeReplicationMaxBatchDelay > 0)
+					disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false);
+			}
+
+			WalSndWakeup(switchedTLI, batchFlushRequired);
+
+			/* Setup timeout to limit maximum delay for notifications */
+			if (appliedRecords == 1 && cascadeReplicationMaxBatchDelay > 0)
+				enable_timeout_after(STANDBY_CASCADE_WAL_SEND_TIMEOUT,
+						cascadeReplicationMaxBatchDelay);
+			}
+	}
 
 	/*
 	 * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
@@ -5064,3 +5108,16 @@ assign_recovery_target_xid(const char *newval, void *extra)
 	else
 		recoveryTarget = RECOVERY_TARGET_UNSET;
 }
+
+/*
+ * Timeout handler for cascade replication: wakes up walsenders if batched records are pending
+ */
+void
+StandbyWalSendTimeoutHandler(void)
+{
+	if (appliedRecords > 0)
+	{
+		appliedRecords = 0;
+		WalSndWakeup(false, true);	/* NOTE(review): confirm safe from a timeout handler */
+	}
+}
diff --git a/src/backend/postmaster/startup.c b/src/backend/postmaster/startup.c
index 27e86cf393f..185a58f94fd 100644
--- a/src/backend/postmaster/startup.c
+++ b/src/backend/postmaster/startup.c
@@ -246,6 +246,7 @@ StartupProcessMain(const void *startup_data, size_t startup_data_len)
 	RegisterTimeout(STANDBY_DEADLOCK_TIMEOUT, StandbyDeadLockHandler);
 	RegisterTimeout(STANDBY_TIMEOUT, StandbyTimeoutHandler);
 	RegisterTimeout(STANDBY_LOCK_TIMEOUT, StandbyLockTimeoutHandler);
+	RegisterTimeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, StandbyWalSendTimeoutHandler);
 
 	/*
 	 * Unblock signals (they were blocked when the postmaster forked us)
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 76c7c6bb4b1..9b69015f375 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2306,6 +2306,28 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"cascade_replication_batch_size", PGC_POSTMASTER, REPLICATION_STANDBY,
+			gettext_noop("Sets the maximum number of applied WAL records before cascade walsenders are notified on standby."),
+			gettext_noop("0 or 1 disables record batching in cascade replication"),
+			GUC_NOT_IN_SAMPLE
+		},
+		&cascadeReplicationMaxBatchSize,
+		500, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"cascade_replication_batch_delay", PGC_POSTMASTER, REPLICATION_STANDBY,
+			gettext_noop("Sets the maximum time before cascade walsenders are notified on standby about applied records."),
+			gettext_noop("0 disables timed notifications"),
+			GUC_NOT_IN_SAMPLE | GUC_UNIT_MS
+		},
+		&cascadeReplicationMaxBatchDelay,
+		500, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
 			gettext_noop("Sets the maximum number of concurrent connections."),
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 91446303024..efb97213a07 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -57,6 +57,8 @@ extern PGDLLIMPORT char *PrimarySlotName;
 extern PGDLLIMPORT char *recoveryRestoreCommand;
 extern PGDLLIMPORT char *recoveryEndCommand;
 extern PGDLLIMPORT char *archiveCleanupCommand;
+extern PGDLLIMPORT int cascadeReplicationMaxBatchSize;
+extern PGDLLIMPORT int cascadeReplicationMaxBatchDelay;
 
 /* indirectly set via GUC system */
 extern PGDLLIMPORT TransactionId recoveryTargetXid;
@@ -155,4 +157,6 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue,
 
 extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
 
+extern void StandbyWalSendTimeoutHandler(void);
+
 #endif							/* XLOGRECOVERY_H */
diff --git a/src/include/utils/timeout.h b/src/include/utils/timeout.h
index 7b19beafdc9..0062cb562b9 100644
--- a/src/include/utils/timeout.h
+++ b/src/include/utils/timeout.h
@@ -36,6 +36,7 @@ typedef enum TimeoutId
 	IDLE_STATS_UPDATE_TIMEOUT,
 	CLIENT_CONNECTION_CHECK_TIMEOUT,
 	STARTUP_PROGRESS_TIMEOUT,
+	STANDBY_CASCADE_WAL_SEND_TIMEOUT,
 	/* First user-definable timeout reason */
 	USER_TIMEOUT,
 	/* Maximum number of timeout reasons */
-- 
2.49.0

Reply via email to