From ccdf02bfdcca1807d9fe6bd1e39b0b185f81e5e6 Mon Sep 17 00:00:00 2001
From: alterego665 <824662526@qq.com>
Date: Thu, 28 Aug 2025 15:38:47 +0800
Subject: [PATCH v3 2/2] Improve read_local_xlog_page_guts by replacing polling
 with latch-based waiting

Replace inefficient polling loops in read_local_xlog_page_guts with latch-based waiting
when WAL data is not yet available.  This eliminates CPU-intensive busy waiting and improves
responsiveness by waking processes immediately when their target LSN becomes available.
---
 src/backend/access/transam/xlog.c             | 18 ++++
 src/backend/access/transam/xlogutils.c        | 48 ++++++++--
 src/backend/access/transam/xlogwait.c         | 93 ++++++++++++++++---
 src/backend/replication/walsender.c           |  4 -
 .../utils/activity/wait_event_names.txt       |  1 +
 src/include/access/xlogwait.h                 |  1 +
 6 files changed, 142 insertions(+), 23 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f5257dfa689..4af8f22f166 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2913,6 +2913,15 @@ XLogFlush(XLogRecPtr record)
 	/* wake up walsenders now that we've released heavily contended locks */
 	WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
+	/*
+	 * If we flushed an LSN that someone was waiting for then walk
+	 * over the shared memory array and set latches to notify the
+	 * waiters.
+	 */
+	if (waitLSNState &&
+		(LogwrtResult.Flush >= pg_atomic_read_u64(&waitLSNState->minWaitedLSN)))
+		WaitLSNWakeup(LogwrtResult.Flush);
+
 	/*
 	 * If we still haven't flushed to the request point then we have a
 	 * problem; most likely, the requested flush point is past end of XLOG.
@@ -3088,6 +3097,15 @@ XLogBackgroundFlush(void)
 	/* wake up walsenders now that we've released heavily contended locks */
 	WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
+	/*
+	 * If we flushed an LSN that someone was waiting for then walk
+	 * over the shared memory array and set latches to notify the
+	 * waiters.
+	 */
+	if (waitLSNState &&
+		(LogwrtResult.Flush >= pg_atomic_read_u64(&waitLSNState->minWaitedLSN)))
+		WaitLSNWakeup(LogwrtResult.Flush);
+
 	/*
 	 * Great, done. To take some work off the critical path, try to initialize
 	 * as many of the no-longer-needed WAL buffers for future use as we can.
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 27ea52fdfee..b7ba3f6a737 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -23,6 +23,7 @@
 #include "access/xlogrecovery.h"
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
+#include "access/xlogwait.h"
 #include "miscadmin.h"
 #include "storage/fd.h"
 #include "storage/smgr.h"
@@ -880,12 +881,7 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	loc = targetPagePtr + reqLen;
 
 	/*
-	 * Loop waiting for xlog to be available if necessary
-	 *
-	 * TODO: The walsender has its own version of this function, which uses a
-	 * condition variable to wake up whenever WAL is flushed. We could use the
-	 * same infrastructure here, instead of the check/sleep/repeat style of
-	 * loop.
+	 * Waiting for xlog to be available if necessary.
 	 */
 	while (1)
 	{
@@ -927,7 +923,6 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
 
 		if (state->currTLI == currTLI)
 		{
-
 			if (loc <= read_upto)
 				break;
 
@@ -947,7 +942,44 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
 			}
 
 			CHECK_FOR_INTERRUPTS();
-			pg_usleep(1000L);
+
+			/*
+			 * Wait for LSN using appropriate method based on server state.
+			 */
+			if (!RecoveryInProgress())
+			{
+				/* Primary: wait for flush */
+				WaitForLSNFlush(loc);
+			}
+			else
+			{
+				/* Standby: wait for replay */
+				WaitLSNResult result = WaitForLSNReplay(loc, 0);
+
+				switch (result)
+				{
+					case WAIT_LSN_RESULT_SUCCESS:
+						/* LSN was replayed, loop back to recheck timeline */
+						break;
+
+					case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
+						/*
+						 * Promoted while waiting. This is the tricky case.
+						 * We're now a primary, so loop back and use flush
+						 * logic instead of replay logic.
+						 */
+						break;
+
+					default:
+						/* Shouldn't happen without timeout */
+						elog(ERROR, "unexpected wait result");
+				}
+			}
+
+			/*
+			 * Loop back to recheck everything.
+			 * Timeline might have changed during our wait.
+			 */
 		}
 		else
 		{
diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c
index 2cc9312e836..70241bd384f 100644
--- a/src/backend/access/transam/xlogwait.c
+++ b/src/backend/access/transam/xlogwait.c
@@ -1,8 +1,8 @@
 /*-------------------------------------------------------------------------
  *
  * xlogwait.c
- *	  Implements waiting for the given replay LSN, which is used in
- *	  WAIT FOR lsn '...'
+ *	  Implements waiting for WAL operations to reach specific LSNs.
+ *	  Used by WAIT FOR lsn '...' and internal WAL reading operations.
  *
  * Copyright (c) 2025, PostgreSQL Global Development Group
  *
@@ -10,10 +10,11 @@
  *	  src/backend/access/transam/xlogwait.c
  *
  * NOTES
- *		This file implements waiting for the replay of the given LSN on a
- *		physical standby.  The core idea is very small: every backend that
- *		wants to wait publishes the LSN it needs to the shared memory, and
- *		the startup process wakes it once that LSN has been replayed.
+ *		This file implements waiting for WAL operations to reach specific LSNs
+ *		on both physical standby and primary servers. The core idea is simple:
+ *		every process that wants to wait publishes the LSN it needs to the
+ *		shared memory, and the appropriate process (startup on standby, or
+ *		WAL writer/backend on primary) wakes it once that LSN has been reached.
  *
  *		The shared memory used by this module comprises a procInfos
  *		per-backend array with the information of the awaited LSN for each
@@ -23,14 +24,18 @@
  *
  *		In addition, the least-awaited LSN is cached as minWaitedLSN.  The
  *		waiter process publishes information about itself to the shared
- *		memory and waits on the latch before it wakens up by a startup
+ *		memory and waits on the latch before it wakens up by the appropriate
  *		process, timeout is reached, standby is promoted, or the postmaster
  *		dies.  Then, it cleans information about itself in the shared memory.
  *
- *		After replaying a WAL record, the startup process first performs a
- *		fast path check minWaitedLSN > replayLSN.  If this check is negative,
- *		it checks waitersHeap and wakes up the backend whose awaited LSNs
- *		are reached.
+ *		On standby servers: After replaying a WAL record, the startup process
+ *		first performs a fast path check minWaitedLSN > replayLSN.  If this
+ *		check is negative, it checks waitersHeap and wakes up the backend
+ *		whose awaited LSNs are reached.
+ *
+ *		On primary servers: After flushing WAL, the WAL writer or backend
+ *		process performs a similar check against the flush LSN and wakes up
+ *		waiters whose target flush LSNs have been reached.
  *
  *-------------------------------------------------------------------------
  */
@@ -386,3 +391,69 @@ WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout)
 
 	return WAIT_LSN_RESULT_SUCCESS;
 }
+
+/*
+ * Wait for LSN to be flushed on primary server.
+ * Returns WAIT_LSN_RESULT_SUCCESS if target LSN was replayed
+ */
+void
+WaitForLSNFlush(XLogRecPtr targetLSN)
+{
+	XLogRecPtr	currentLSN;
+	int			wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
+
+	/* Shouldn't be called when shmem isn't initialized */
+	Assert(waitLSNState);
+
+	/* Should have a valid proc number */
+	Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS);
+
+	/* We can only wait for flush when we are not in recovery */
+	Assert(!RecoveryInProgress());
+
+	/* Quick exit if already flushed */
+	currentLSN = GetFlushRecPtr(NULL);
+	if (targetLSN <= currentLSN)
+		return;
+
+	/* Add to waiters */
+	addLSNWaiter(targetLSN);
+
+	/* Wait loop */
+	for (;;)
+	{
+		int			rc;
+
+		/* Check if the waited LSN has been flushed */
+		currentLSN = GetFlushRecPtr(NULL);
+		if (targetLSN <= currentLSN)
+			break;
+
+		CHECK_FOR_INTERRUPTS();
+
+		rc = WaitLatch(MyLatch, wake_events, -1,
+					   WAIT_EVENT_WAIT_FOR_WAL_FLUSH);	
+
+		/*
+		 * Emergency bailout if postmaster has died. This is to avoid the
+		 * necessity for manual cleanup of all postmaster children.
+		 */
+		if (rc & WL_POSTMASTER_DEATH)
+			ereport(FATAL,
+					(errcode(ERRCODE_ADMIN_SHUTDOWN),
+					 errmsg("terminating connection due to unexpected postmaster exit"),
+					 errcontext("while waiting for LSN flush")));
+
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+	}
+
+	/*
+	 * Delete our process from the shared memory pairing heap. We might
+	 * already be deleted by the waker process. The 'inHeap' flag prevents
+	 * us from the double deletion.
+	 */
+	deleteLSNWaiter();
+
+	return;
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 0855bae3535..5fb74088fcd 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1021,10 +1021,6 @@ StartReplication(StartReplicationCmd *cmd)
 /*
  * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
  * walsender process.
- *
- * Inside the walsender we can do better than read_local_xlog_page,
- * which has to do a plain sleep/busy loop, because the walsender's latch gets
- * set every time WAL is flushed.
  */
 static int
 logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index ee20a48b2c5..b2266666c02 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -89,6 +89,7 @@ LIBPQWALRECEIVER_CONNECT	"Waiting in WAL receiver to establish connection to rem
 LIBPQWALRECEIVER_RECEIVE	"Waiting in WAL receiver to receive data from remote server."
 SSL_OPEN_SERVER	"Waiting for SSL while attempting connection."
 WAIT_FOR_STANDBY_CONFIRMATION	"Waiting for WAL to be received and flushed by the physical standby."
+WAIT_FOR_WAL_FLUSH	"Waiting for WAL flush to reach a target LSN on a primary."
 WAIT_FOR_WAL_REPLAY	"Waiting for WAL replay to reach a target LSN on a standby."
 WAL_SENDER_WAIT_FOR_WAL	"Waiting for WAL to be flushed in WAL sender process."
 WAL_SENDER_WRITE_DATA	"Waiting for any activity when processing replies from WAL receiver in WAL sender process."
diff --git a/src/include/access/xlogwait.h b/src/include/access/xlogwait.h
index 72be2f76293..742c47609ae 100644
--- a/src/include/access/xlogwait.h
+++ b/src/include/access/xlogwait.h
@@ -86,5 +86,6 @@ extern void WaitLSNShmemInit(void);
 extern void WaitLSNWakeup(XLogRecPtr currentLSN);
 extern void WaitLSNCleanup(void);
 extern WaitLSNResult WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout);
+extern void WaitForLSNFlush(XLogRecPtr targetLSN);
 
 #endif							/* XLOG_WAIT_H */
-- 
2.49.0

