From ca10a52bd7a835b2873268236a4553fc911e2de3 Mon Sep 17 00:00:00 2001
From: alterego655 <824662526@qq.com>
Date: Tue, 25 Nov 2025 17:58:28 +0800
Subject: [PATCH v1 1/5] Extend xlogwait infrastructure with write and flush 
 wait types

Add support for waiting on WAL write and flush LSNs in addition to the
existing replay LSN wait type. This provides the foundation for
extending the WAIT FOR command with MODE parameter.

Key changes:
- Add WAIT_LSN_TYPE_WRITE and WAIT_LSN_TYPE_FLUSH_STANDBY to WaitLSNType
- Add GetCurrentLSNForWaitType() to retrieve current LSN
- Add new wait events WAIT_EVENT_WAIT_FOR_WAL_WRITE and
  WAIT_EVENT_WAIT_FOR_WAL_FLUSH for pg_stat_activity visibility
- Update WaitForLSN() to use GetCurrentLSNForWaitType() internally
---
 src/backend/access/transam/xlogwait.c         | 79 ++++++++++++++-----
 .../utils/activity/wait_event_names.txt       |  3 +-
 src/include/access/xlogwait.h                 |  7 +-
 3 files changed, 67 insertions(+), 22 deletions(-)

diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c
index 98aa5f1e4a2..86709e0df63 100644
--- a/src/backend/access/transam/xlogwait.c
+++ b/src/backend/access/transam/xlogwait.c
@@ -12,25 +12,30 @@
  *		This file implements waiting for WAL operations to reach specific LSNs
  *		on both physical standby and primary servers. The core idea is simple:
  *		every process that wants to wait publishes the LSN it needs to the
- *		shared memory, and the appropriate process (startup on standby, or
- *		WAL writer/backend on primary) wakes it once that LSN has been reached.
+ *		shared memory, and the appropriate process (startup on standby,
+ *		walreceiver on standby, or WAL writer/backend on primary) wakes it
+ *		once that LSN has been reached.
  *
  *		The shared memory used by this module comprises a procInfos
  *		per-backend array with the information of the awaited LSN for each
  *		of the backend processes.  The elements of that array are organized
- *		into a pairing heap waitersHeap, which allows for very fast finding
- *		of the least awaited LSN.
+ *		into pairing heaps (waitersHeap), one for each WaitLSNType, which
+ *		allows for very fast finding of the least awaited LSN for each type.
  *
- *		In addition, the least-awaited LSN is cached as minWaitedLSN.  The
- *		waiter process publishes information about itself to the shared
- *		memory and waits on the latch until it is woken up by the appropriate
- *		process, standby is promoted, or the postmaster	dies.  Then, it cleans
- *		information about itself in the shared memory.
+ *		In addition, the least-awaited LSN for each type is cached in the
+ *		minWaitedLSN array.  The waiter process publishes information about
+ *		itself to the shared memory and waits on the latch until it is woken
+ *		up by the appropriate process, standby is promoted, or the postmaster
+ *		dies.  Then, it cleans information about itself in the shared memory.
  *
- *		On standby servers: After replaying a WAL record, the startup process
- *		first performs a fast path check minWaitedLSN > replayLSN.  If this
- *		check is negative, it checks waitersHeap and wakes up the backend
- *		whose awaited LSNs are reached.
+ *		On standby servers:
+ *		- After replaying a WAL record, the startup process performs a fast
+ *		  path check minWaitedLSN[REPLAY] > replayLSN.  If this check is
+ *		  negative, it checks waitersHeap[REPLAY] and wakes up the backends
+ *		  whose awaited LSNs are reached.
+ *		- After receiving WAL, the walreceiver process performs similar checks
+ *		  against the flush and write LSNs, waking up waiters in the FLUSH
+ *		  and WRITE heaps respectively.
  *
  *		On primary servers: After flushing WAL, the WAL writer or backend
  *		process performs a similar check against the flush LSN and wakes up
@@ -49,6 +54,7 @@
 #include "access/xlogwait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/walreceiver.h"
 #include "storage/latch.h"
 #include "storage/proc.h"
 #include "storage/shmem.h"
@@ -62,6 +68,43 @@ static int	waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
 
 struct WaitLSNState *waitLSNState = NULL;
 
+/*
+ * Wait event for each WaitLSNType, used with WaitLatch() to report
+ * the wait in pg_stat_activity.
+ */
+static const uint32 WaitLSNWaitEvents[] = {
+	[WAIT_LSN_TYPE_REPLAY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY,
+	[WAIT_LSN_TYPE_WRITE] = WAIT_EVENT_WAIT_FOR_WAL_WRITE,
+	[WAIT_LSN_TYPE_FLUSH_STANDBY] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
+	[WAIT_LSN_TYPE_FLUSH_PRIMARY] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
+};
+
+/*
+ * Get the current LSN for the specified wait type.
+ */
+XLogRecPtr
+GetCurrentLSNForWaitType(WaitLSNType lsnType)
+{
+	switch (lsnType)
+	{
+		case WAIT_LSN_TYPE_REPLAY:
+			return GetXLogReplayRecPtr(NULL);
+
+		case WAIT_LSN_TYPE_WRITE:
+			return GetWalRcvWriteRecPtr();
+
+		case WAIT_LSN_TYPE_FLUSH_STANDBY:
+			return GetWalRcvFlushRecPtr(NULL, NULL);
+
+		case WAIT_LSN_TYPE_FLUSH_PRIMARY:
+			return GetFlushRecPtr(NULL);
+
+		default:
+			elog(ERROR, "invalid LSN wait type: %d", lsnType);
+			return InvalidXLogRecPtr;	/* keep compiler quiet */
+	}
+}
+
 /* Report the amount of shared memory space needed for WaitLSNState. */
 Size
 WaitLSNShmemSize(void)
@@ -341,13 +384,11 @@ WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
 		int			rc;
 		long		delay_ms = -1;
 
-		if (lsnType == WAIT_LSN_TYPE_REPLAY)
-			currentLSN = GetXLogReplayRecPtr(NULL);
-		else
-			currentLSN = GetFlushRecPtr(NULL);
+		/* Get current LSN for the wait type */
+		currentLSN = GetCurrentLSNForWaitType(lsnType);
 
 		/* Check that recovery is still in-progress */
-		if (lsnType == WAIT_LSN_TYPE_REPLAY && !RecoveryInProgress())
+		if (lsnType != WAIT_LSN_TYPE_FLUSH_PRIMARY && !RecoveryInProgress())
 		{
 			/*
 			 * Recovery was ended, but check if target LSN was already
@@ -376,7 +417,7 @@ WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
 		CHECK_FOR_INTERRUPTS();
 
 		rc = WaitLatch(MyLatch, wake_events, delay_ms,
-					   (lsnType == WAIT_LSN_TYPE_REPLAY) ? WAIT_EVENT_WAIT_FOR_WAL_REPLAY : WAIT_EVENT_WAIT_FOR_WAL_FLUSH);
+					   WaitLSNWaitEvents[lsnType]);
 
 		/*
 		 * Emergency bailout if postmaster has died.  This is to avoid the
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index c1ac71ff7f2..fbcdb92dcfb 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -89,8 +89,9 @@ LIBPQWALRECEIVER_CONNECT	"Waiting in WAL receiver to establish connection to rem
 LIBPQWALRECEIVER_RECEIVE	"Waiting in WAL receiver to receive data from remote server."
 SSL_OPEN_SERVER	"Waiting for SSL while attempting connection."
 WAIT_FOR_STANDBY_CONFIRMATION	"Waiting for WAL to be received and flushed by the physical standby."
-WAIT_FOR_WAL_FLUSH	"Waiting for WAL flush to reach a target LSN on a primary."
+WAIT_FOR_WAL_FLUSH	"Waiting for WAL flush to reach a target LSN on a primary or standby."
 WAIT_FOR_WAL_REPLAY	"Waiting for WAL replay to reach a target LSN on a standby."
+WAIT_FOR_WAL_WRITE	"Waiting for WAL write to reach a target LSN on a standby."
 WAL_SENDER_WAIT_FOR_WAL	"Waiting for WAL to be flushed in WAL sender process."
 WAL_SENDER_WRITE_DATA	"Waiting for any activity when processing replies from WAL receiver in WAL sender process."
 
diff --git a/src/include/access/xlogwait.h b/src/include/access/xlogwait.h
index e607441d618..64a2fb02eac 100644
--- a/src/include/access/xlogwait.h
+++ b/src/include/access/xlogwait.h
@@ -36,8 +36,10 @@ typedef enum
 typedef enum WaitLSNType
 {
 	WAIT_LSN_TYPE_REPLAY = 0,	/* Waiting for replay on standby */
-	WAIT_LSN_TYPE_FLUSH = 1,	/* Waiting for flush on primary */
-	WAIT_LSN_TYPE_COUNT = 2
+	WAIT_LSN_TYPE_FLUSH_STANDBY = 1,	/* Waiting for flush on standby */
+	WAIT_LSN_TYPE_WRITE = 2,	/* Waiting for write on standby */
+	WAIT_LSN_TYPE_FLUSH_PRIMARY = 3,	/* Waiting for flush on primary */
+	WAIT_LSN_TYPE_COUNT = 4
 } WaitLSNType;
 
 /*
@@ -96,6 +98,7 @@ extern PGDLLIMPORT WaitLSNState *waitLSNState;
 
 extern Size WaitLSNShmemSize(void);
 extern void WaitLSNShmemInit(void);
+extern XLogRecPtr GetCurrentLSNForWaitType(WaitLSNType lsnType);
 extern void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN);
 extern void WaitLSNCleanup(void);
 extern WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN,
-- 
2.51.0

