From 38880b9e62bc5d87b146acc9cbacbe07e9ad8dd0 Mon Sep 17 00:00:00 2001
From: Sunil S <sunil.s@broadcom.com>
Date: Mon, 7 Jul 2025 11:27:58 +0530
Subject: [PATCH v6 1/3] Introduce feature to start WAL receiver eagerly

This commit introduces a new GUC wal_receiver_start_condition which can
enable the standby to start it's WAL receiver at an earlier stage. The
GUC will default to starting the WAL receiver after WAL from archives
and pg_wal have been exhausted, designated by the value 'exhaust'.
The value of 'startup' indicates that the WAL receiver will be started
immediately on standby startup. Finally, the value of 'consistency'
indicates that the server will start after the standby has replayed up
to the consistency point.

If 'startup' or 'consistency' is specified, the starting point for the
WAL receiver will always be the end of all locally available WAL in
pg_wal. The end is determined by finding the latest WAL segment in
pg_wal and then iterating to the earliest segment. The iteration is
terminated as soon as a valid WAL segment is found. Streaming can then
commence from the start of that segment.

Archiving from the restore command does not holds the control lock
and enabling XLogCtl->InstallXLogFileSegmentActive for wal reciever early start
will create a race condition with the checkpointer process as fixed in cc2c7d65fc27e877c9f407587b0b92d46cd6dd16.
Hence skipping early start of the wal receiver in case of archive recovery.

Co-authors: Sunil S<sunilfeb26@gmail.com>, Soumyadeep Chakraborty <soumyadeep2007@gmail.com>, Ashwin Agrawal, Asim Praveen, Wu Hao, Konstantin Knizhnik
Discussion:https://www.postgresql.org/message-id/flat/CANXE4Tc3FNvZ_xAimempJWv_RH9pCvsZH7Yq93o1VuNLjUT-mQ%40mail.gmail.com
---
 doc/src/sgml/config.sgml                      |  33 ++++
 src/backend/access/transam/xlogrecovery.c     | 176 +++++++++++++++++-
 src/backend/replication/walreceiver.c         |   1 +
 src/backend/utils/misc/guc_tables.c           |  18 ++
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/replication/walreceiver.h         |  10 +
 6 files changed, 230 insertions(+), 9 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 59a0874528a..403a7e70395 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -5067,6 +5067,39 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
         the new setting.
        </para>
       </listitem>
+
+    </varlistentry>
+     <varlistentry id="guc-wal-receiver-start-condition" xreflabel="wal_receiver_start_at">
+      <term><varname>wal_receiver_start_at</varname> (<type>enum</type>)
+      <indexterm>
+       <primary><varname>wal_receiver_start_at</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies when the WAL receiver process will be started for a standby
+        server.
+        The allowed values of <varname>wal_receiver_start_at</varname>
+        are <literal>startup</literal> (start immediately when the standby starts),
+        <literal>consistency</literal> (start only after reaching consistency), and
+        <literal>exhaust</literal> (start only after all WAL from the archive and
+        pg_wal has been replayed)
+         The default setting is<literal>exhaust</literal>.
+       </para>
+
+       <para>
+        Traditionally, the WAL receiver process is started only after the
+        standby server has exhausted all WAL from the WAL archive and the local
+        pg_wal directory. In some environments there can be a significant volume
+        of local WAL left to replay, along with a large volume of yet to be
+        streamed WAL. Such environments can benefit from setting
+        <varname>wal_receiver_start_at</varname> to
+        <literal>startup</literal> or <literal>consistency</literal>. These
+        values will lead to the WAL receiver starting much earlier, and from
+        the end of locally available WAL. The network will be utilized to stream
+        WAL concurrently with replay, improving performance significantly.
+       </para>
+      </listitem>
      </varlistentry>
 
      <varlistentry id="guc-wal-receiver-status-interval" xreflabel="wal_receiver_status_interval">
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 23878b2dd91..b197247da70 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -494,6 +494,161 @@ EnableStandbyMode(void)
 	disable_startup_progress_timeout();
 }
 
+/*
+ * Start WAL receiver eagerly without waiting to play all WAL from the archive
+ * and pg_wal. First, find the last valid WAL segment in pg_wal and then request
+ * streaming to commence from it's beginning. startPoint signifies whether we
+ * are trying the eager start right at startup or once we have reached
+ * consistency.
+ */
+static void
+StartWALReceiverEagerlyIfPossible(WalRcvStartCondition startPoint,
+								  TimeLineID currentTLI)
+{
+	DIR           *dir;
+	struct dirent *de;
+	XLogSegNo     startsegno = -1;
+	XLogSegNo     endsegno   = -1;
+
+	/*
+	 * We should not be starting the walreceiver during bootstrap/init
+	 * processing.
+	 */
+	if (!IsNormalProcessingMode())
+		return;
+
+	/* Only the startup process can request an eager walreceiver start. */
+	Assert(AmStartupProcess());
+
+	/* Return if we are not set up to start the WAL receiver eagerly. */
+	if (wal_receiver_start_at == WAL_RCV_START_AT_EXHAUST)
+		return;
+
+	/*
+	 * Sanity checks: We must be in standby mode with primary_conninfo set up
+	 * for streaming replication, the WAL receiver should not already have
+	 * started and the intended startPoint must match the start condition GUC.
+	 *
+	 * Archiving from the restore command does not holds the control lock
+	 * and enabling XLogCtl->InstallXLogFileSegmentActive for wal reciever early start
+	 * will create a race condition with the checkpointer process as fixed in cc2c7d65fc27e877c9f407587b0b92d46cd6dd16.
+	 * Hence skipping early start of the wal receiver in case of archive recovery.
+	 */
+	if (!StandbyModeRequested || WalRcvStreaming() ||
+		!PrimaryConnInfo || strcmp(PrimaryConnInfo, "") == 0 ||
+		startPoint != wal_receiver_start_at ||
+		(ArchiveRecoveryRequested &&
+			recoveryRestoreCommand != NULL && strcmp(recoveryRestoreCommand, "") != 0))
+		return;
+
+	/*
+	 * We must have reached consistency if we wanted to start the walreceiver
+	 * at the consistency point.
+	 */
+	if (wal_receiver_start_at == WAL_RCV_START_AT_CONSISTENCY && !reachedConsistency)
+		return;
+
+	/* Find the latest and earliest WAL segments in pg_wal */
+	dir        = AllocateDir("pg_wal");
+	while ((de = ReadDir(dir, "pg_wal")) != NULL)
+	{
+		/* Does it look like a WAL segment? */
+		if (IsXLogFileName(de->d_name))
+		{
+			XLogSegNo logSegNo;
+			TimeLineID tli;
+
+			XLogFromFileName(de->d_name, &tli, &logSegNo, wal_segment_size);
+			if (tli != currentTLI)
+			{
+				/*
+				 * It seems wrong to stream WAL on a timeline different from
+				 * the one we are replaying on. So, bail in case a timeline
+				 * change is noticed.
+				 */
+				ereport(LOG,
+						(errmsg("could not start streaming WAL eagerly"),
+							errdetail("There are timeline changes in the locally available WAL files."),
+							errhint("WAL streaming will begin once all local WAL and archives are exhausted.")));
+				FreeDir(dir);
+				return;
+			}
+			startsegno = (startsegno == -1) ? logSegNo : Min(startsegno, logSegNo);
+			endsegno = (endsegno == -1) ? logSegNo : Max(endsegno, logSegNo);
+		}
+	}
+	FreeDir(dir);
+
+	/*
+	 * We should have at least one valid WAL segment in pg_wal. By this point,
+	 * we must have read at the segment that included the checkpoint record we
+	 * started replaying from.
+	 */
+	Assert(startsegno != -1 && endsegno != -1);
+
+	/* Find the latest valid WAL segment and request streaming from its start */
+	while (endsegno >= startsegno)
+	{
+		XLogReaderState * state;
+		XLogRecPtr   startptr;
+		WALReadError errinfo;
+		char         xlogfname[MAXFNAMELEN];
+
+		XLogSegNoOffsetToRecPtr(endsegno, 0, wal_segment_size, startptr);
+		XLogFileName(xlogfname, currentTLI, endsegno,
+					 wal_segment_size);
+
+		state = XLogReaderAllocate(wal_segment_size, NULL,
+								   XL_ROUTINE(.segment_open = wal_segment_open,
+											  .segment_close = wal_segment_close),
+								   NULL);
+		if (!state)
+			ereport(ERROR,
+					(errcode(ERRCODE_OUT_OF_MEMORY),
+						errmsg("out of memory"),
+						errdetail("Failed while allocating a WAL reading processor.")));
+
+		/*
+		 * Read the first page of the current WAL segment and validate it by
+		 * inspecting the page header. Once we find a valid WAL segment, we
+		 * can request WAL streaming from its beginning.
+		 */
+		XLogBeginRead(state, startptr);
+
+		if (!WALRead(state, state->readBuf, startptr, XLOG_BLCKSZ,
+					 currentTLI,
+					 &errinfo))
+			WALReadRaiseError(&errinfo);
+
+		if (XLogReaderValidatePageHeader(state, startptr, state->readBuf))
+		{
+			ereport(LOG,
+					errmsg("requesting stream from beginning of: \"%s\"", xlogfname));
+			XLogReaderFree(state);
+			SetInstallXLogFileSegmentActive();
+			RequestXLogStreaming(currentTLI,
+								 startptr,
+								 PrimaryConnInfo,
+								 PrimarySlotName,
+								 wal_receiver_create_temp_slot);
+			return;
+		}
+
+		ereport(LOG,
+				errmsg("invalid WAL segment found while calculating stream start: \"%s\". skipping..", xlogfname));
+
+		XLogReaderFree(state);
+		endsegno--;
+	}
+
+	/*
+	 * We should never reach here as we should have at least one valid WAL
+	 * segment in pg_wal. By this point, we must have read at the segment that
+	 * included the checkpoint record we started replaying from.
+	 */
+	Assert(false);
+}
+
 /*
  * Prepare the system for WAL recovery, if needed.
  *
@@ -805,6 +960,9 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
 		wasShutdown = ((record->xl_info & ~XLR_INFO_MASK) == XLOG_CHECKPOINT_SHUTDOWN);
 	}
 
+	/* Start WAL receiver eagerly if requested. */
+	StartWALReceiverEagerlyIfPossible(WAL_RCV_START_AT_STARTUP, recoveryTargetTLI);
+
 	if (ArchiveRecoveryRequested)
 	{
 		if (StandbyModeRequested)
@@ -2180,6 +2338,7 @@ CheckTablespaceDirectory(void)
  * Checks if recovery has reached a consistent state. When consistency is
  * reached and we have a valid starting standby snapshot, tell postmaster
  * that it can start accepting read-only connections.
+ * Also, attempt to start the WAL receiver eagerly if so configured.
  */
 static void
 CheckRecoveryConsistency(void)
@@ -2277,6 +2436,10 @@ CheckRecoveryConsistency(void)
 
 		SendPostmasterSignal(PMSIGNAL_BEGIN_HOT_STANDBY);
 	}
+
+	/* Start WAL receiver eagerly if requested. */
+	StartWALReceiverEagerlyIfPossible(WAL_RCV_START_AT_CONSISTENCY,
+									  lastReplayedTLI);
 }
 
 /*
@@ -3652,10 +3815,12 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 
 					/*
 					 * Move to XLOG_FROM_STREAM state, and set to start a
-					 * walreceiver if necessary.
+					 * walreceiver if necessary. The WAL receiver may have
+					 * already started (if it was configured to start
+					 * eagerly).
 					 */
 					currentSource = XLOG_FROM_STREAM;
-					startWalReceiver = true;
+					startWalReceiver = !WalRcvStreaming();;
 					break;
 
 				case XLOG_FROM_STREAM:
@@ -3769,13 +3934,6 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 		{
 			case XLOG_FROM_ARCHIVE:
 			case XLOG_FROM_PG_WAL:
-
-				/*
-				 * WAL receiver must not be running when reading WAL from
-				 * archive or pg_wal.
-				 */
-				Assert(!WalRcvStreaming());
-
 				/* Close any old file we might have open. */
 				if (readFile >= 0)
 				{
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b6281101711..b6bc6a15370 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -88,6 +88,7 @@
 int			wal_receiver_status_interval;
 int			wal_receiver_timeout;
 bool		hot_standby_feedback;
+int			wal_receiver_start_at = WAL_RCV_START_AT_EXHAUST;
 
 /* libpqwalreceiver connection */
 static WalReceiverConn *wrconn = NULL;
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 511dc32d519..86332e5f771 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -483,6 +483,13 @@ static const struct config_enum_entry wal_compression_options[] = {
 	{NULL, 0, false}
 };
 
+static const struct config_enum_entry wal_rcv_start_options[] = {
+	{"exhaust", WAL_RCV_START_AT_EXHAUST, false},
+	{"consistency", WAL_RCV_START_AT_CONSISTENCY, false},
+	{"startup", WAL_RCV_START_AT_STARTUP, false},
+	{NULL, 0, false}
+};
+
 static const struct config_enum_entry file_copy_method_options[] = {
 	{"copy", FILE_COPY_METHOD_COPY, false},
 #if defined(HAVE_COPYFILE) && defined(COPYFILE_CLONE_FORCE) || defined(HAVE_COPY_FILE_RANGE)
@@ -5418,6 +5425,17 @@ struct config_enum ConfigureNamesEnum[] =
 		NULL, assign_io_method, NULL
 	},
 
+	{
+		{"wal_receiver_start_at", PGC_POSTMASTER, REPLICATION_STANDBY,
+		 	gettext_noop("When to start WAL receiver."),
+		 	NULL,
+		 },
+		 &wal_receiver_start_at,
+		 WAL_RCV_START_AT_EXHAUST,
+		 wal_rcv_start_options,
+		 NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 341f88adc87..b49b098d5b1 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -385,6 +385,7 @@
 					# retrieve WAL after a failed attempt
 #recovery_min_apply_delay = 0		# minimum delay for applying changes during recovery
 #sync_replication_slots = off		# enables slot synchronization on the physical standby from the primary
+#wal_receiver_start_at = 'exhaust'#	'exhaust', 'consistency', or 'startup'			# (change requires restart)
 
 # - Subscribers -
 
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 89f63f908f8..0bf1f96c5ca 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -28,6 +28,7 @@
 extern PGDLLIMPORT int wal_receiver_status_interval;
 extern PGDLLIMPORT int wal_receiver_timeout;
 extern PGDLLIMPORT bool hot_standby_feedback;
+extern PGDLLIMPORT int	wal_receiver_start_at;
 
 /*
  * MAXCONNINFO: maximum size of a connection string.
@@ -53,6 +54,15 @@ typedef enum
 	WALRCV_STOPPING,			/* requested to stop, but still running */
 } WalRcvState;
 
+typedef enum
+{
+	WAL_RCV_START_AT_STARTUP,	/* start a WAL receiver immediately at startup */
+	WAL_RCV_START_AT_CONSISTENCY,	/* start a WAL receiver once consistency
+									 * has been reached */
+	WAL_RCV_START_AT_EXHAUST,	/* start a WAL receiver after WAL from archive
+								 * and pg_wal has been replayed (default) */
+} WalRcvStartCondition;
+
 /* Shared memory area for management of walreceiver process */
 typedef struct
 {
-- 
2.49.0

