Hi,

On Mon, Dec 15, 2025 at 12:14 PM Noah Misch <[email protected]> wrote:
>
> On Sun, Dec 14, 2025 at 06:17:34PM +0800, Xuneng Zhou wrote:
> > On Sun, Dec 14, 2025 at 4:55 PM Xuneng Zhou <[email protected]> wrote:
> > > On Sun, Dec 14, 2025 at 1:14 PM Noah Misch <[email protected]> wrote:
> > > > > V2 makes the transition from WALRCV_CONNECTING to STREAMING only when
> > > > > the first valid WAL record is processed by the startup process. A new
> > > > > function WalRcvSetStreaming is introduced to enable the transition.
> > > >
> > > > The original patch set STREAMING in XLogWalRcvFlush().  XLogWalRcvFlush()
> > > > callee XLogWalRcvSendReply() already fetches applyPtr to send a status
> > > > message.  So I would try the following before involving the startup process
> > > > like v2 does:
> > > >
> > > > 1. store the applyPtr when we enter CONNECTING
> > > > 2. force a status message as long as we remain in CONNECTING
> > > > 3. become STREAMING when applyPtr differs from the one stored at (1)
> > >
> > > Thanks for the suggestion. Using XLogWalRcvSendReply() for the
> > > transition could make sense. My earlier concern was latency in a
> > > rare case: if the first flush completes but applyPtr hasn't advanced
> > > by the time of the check, and the flush then stalls, we might wait up
> > > to wal_receiver_status_interval (default 10s) before the next check,
> > > or indefinitely if wal_receiver_status_interval <= 0.
> > > This could be mitigated by shortening the wakeup interval while in
> > > CONNECTING (step 2), which reduces worst-case latency to ~1 second.
> > > Given that monitoring typically doesn't require sub-second precision,
> > > this approach could be feasible.
> > >
> > >     case WALRCV_WAKEUP_REPLY:
> > >         if (WalRcv->walRcvState == WALRCV_CONNECTING)
> > >         {
> > >             /* Poll frequently while CONNECTING to avoid long latency */
> > >             wakeup[reason] = TimestampTzPlusMilliseconds(now, 1000);
> > >         }
> > >
> > > > A possible issue with all patch versions: when the primary is writing no WAL
> > > > and the standby was caught up before this walreceiver started, CONNECTING
> > > > could persist for an unbounded amount of time.  Only actual primary WAL
> > > > generation would move the walreceiver to STREAMING.  This relates to your
> > > > above point about high latency.  If that's a concern, perhaps this change
> > > > deserves a total of two new states, CONNECTING and a state that represents
> > > > "connection exists, no WAL yet applied"?
> > >
> > > Yes, this could be an issue. Using two states would help address it.
> > > That said, when the primary is idle in this case, we might end up
> > > repeatedly polling the apply status in the pre-streaming state if we
> > > implement the 1s short-interval check described above, which could be
> > > costly. However, if we do not implement it, wal_receiver_status_interval
> > > is set to <= 0, and the flush stalls, the walreceiver could stay in the
> > > pre-streaming state indefinitely even though streaming did occur, which
> > > violates the state's semantics. Do you think this is a valid concern or
> > > just an artificial edge case?
> >
> > After looking more closely, I found that a truly indefinite wait
> > requires ALL of the following:
> >
> > - wal_receiver_status_interval <= 0 (disables status updates)
> > - wal_receiver_timeout <= 0
> > - Primary sends no keepalives
> > - No more WAL arrives after the first failed-check flush
> > - Startup never sets force_reply
> >
> > That combination is highly unlikely and artificial. Sorry for the noise.
>
> Even if indefinite wait is a negligible concern, you identified a lot of
> intricacy that I hadn't pictured.  That makes your startup-process-driven
> version potentially more attractive.  Forcing status messages like I was
> thinking may also yield an unwanted flurry of them if the startup process is
> slow.  Let's see what the patch reviewer thinks.

OK, I have attached both approaches for review.  Adding two states to
avoid the misleading status during the stall you described earlier
seems reasonable to me, so I adopted it in v3.
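
For reviewers comparing the two variants, here is a minimal standalone
sketch of the state transitions. It is only an illustration, not code
from either patch: the Sketch* types and sketch_* functions are
hypothetical stand-ins, and locking, timelines, and the real
WalRcvData layout are left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for illustration; not the PostgreSQL definitions. */
typedef enum
{
	SK_CONNECTING,				/* walreceiver started, connection pending */
	SK_CONNECTED,				/* START_REPLICATION succeeded, no WAL applied */
	SK_STREAMING				/* end-to-end WAL flow observed */
} SketchState;

typedef struct
{
	SketchState state;
	uint64_t	baseline_apply; /* apply position captured at CONNECTED */
} SketchWalRcv;

/* Walreceiver startup: enter CONNECTING. */
static void
sketch_start(SketchWalRcv *w)
{
	w->state = SK_CONNECTING;
	w->baseline_apply = 0;
}

/* START_REPLICATION succeeded: CONNECTING -> CONNECTED, remember baseline. */
static void
sketch_start_replication_ok(SketchWalRcv *w, uint64_t apply_ptr)
{
	if (w->state == SK_CONNECTING)
	{
		w->state = SK_CONNECTED;
		w->baseline_apply = apply_ptr;
	}
}

/*
 * Variant A (startup-driven): called when the startup process has read its
 * first valid record from the stream.  Returns true if the state changed.
 */
static bool
sketch_set_streaming(SketchWalRcv *w)
{
	if (w->state != SK_CONNECTED)
		return false;
	w->state = SK_STREAMING;
	return true;
}

/*
 * Variant B (reply-driven): called whenever a status reply is prepared.
 * Promotes once the apply position differs from the stored baseline.
 */
static bool
sketch_check_apply(SketchWalRcv *w, uint64_t apply_ptr)
{
	if (w->state == SK_CONNECTED && apply_ptr != w->baseline_apply)
	{
		w->state = SK_STREAMING;
		return true;
	}
	return false;
}
```

sketch_set_streaming() plays the role of WalRcvSetStreaming() in the
first patch, and sketch_check_apply() plays the role of the check added
to XLogWalRcvSendReply() in the second.  In both variants an idle
primary leaves the state at CONNECTED until some new WAL is applied.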


--
Best,
Xuneng
From b4114789d2fb1fbbe585feedfc2b419d768057d7 Mon Sep 17 00:00:00 2001
From: alterego655 <[email protected]>
Date: Mon, 15 Dec 2025 15:51:46 +0800
Subject: [PATCH v3] Add WALRCV_CONNECTING and WALRCV_CONNECTED states to
 walreceiver

Previously, walreceiver set status='streaming' early in startup before
receiving any WAL, making it unreliable for health monitoring.

Introduce two intermediate states:

- WALRCV_CONNECTING: walreceiver enters CONNECTING on startup
- WALRCV_CONNECTED: walreceiver transitions from CONNECTING to
  CONNECTED after START_REPLICATION succeeds, awaiting the first WAL
  record

The final transition from CONNECTED to STREAMING occurs when the startup
process validates the first record, confirming end-to-end data flow. This
allows monitoring tools to distinguish connection establishment from
active WAL streaming.
---
 src/backend/access/transam/xlogrecovery.c  | 17 +++++++++++++
 src/backend/replication/walreceiver.c      | 17 +++++++++++--
 src/backend/replication/walreceiverfuncs.c | 29 +++++++++++++++++++++-
 src/include/replication/walreceiver.h      |  4 +++
 4 files changed, 64 insertions(+), 3 deletions(-)

diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index ae2398d6975..89af2909063 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -250,6 +250,9 @@ static XLogSource currentSource = XLOG_FROM_ANY;
 static bool lastSourceFailed = false;
 static bool pendingWalRcvRestart = false;
 
+/* Guard to update walreceiver state only once per streaming session. */
+static bool walrcv_streaming_set = false;
+
 /*
  * These variables track when we last obtained some WAL data to process,
  * and where we got it from.  (XLogReceiptSource is initially the same as
@@ -1827,6 +1830,17 @@ PerformWalRecovery(void)
 					recoveryPausesHere(false);
 			}
 
+			/*
+			 * If we are reading from the stream, this is the first valid
+			 * record we have successfully parsed. Now we can verify the
+			 * connection is truly streaming valid WAL.
+			 */
+			if (!walrcv_streaming_set && readSource == XLOG_FROM_STREAM)
+			{
+				if (WalRcvSetStreaming())
+					walrcv_streaming_set = true;
+			}
+
 			/*
 			 * Apply the record
 			 */
@@ -3692,6 +3706,9 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 					 * one can hope...
 					 */
 
+					/* Reset our "streaming active" guard flag */
+					walrcv_streaming_set = false;
+
 					/*
 					 * We should be able to move to XLOG_FROM_STREAM only in
 					 * standby mode.
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index ac802ae85b4..fc8d0cd7804 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -205,6 +205,8 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 			break;
 
 		case WALRCV_WAITING:
+		case WALRCV_CONNECTING:
+		case WALRCV_CONNECTED:
 		case WALRCV_STREAMING:
 		case WALRCV_RESTARTING:
 		default:
@@ -214,7 +216,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 	}
 	/* Advertise our PID so that the startup process can kill us */
 	walrcv->pid = MyProcPid;
-	walrcv->walRcvState = WALRCV_STREAMING;
+	walrcv->walRcvState = WALRCV_CONNECTING;
 
 	/* Fetch information required to start streaming */
 	walrcv->ready_to_display = false;
@@ -394,6 +396,11 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 							   LSN_FORMAT_ARGS(startpoint), startpointTLI));
 			first_stream = false;
 
+			SpinLockAcquire(&walrcv->mutex);
+			if (walrcv->walRcvState == WALRCV_CONNECTING)
+				walrcv->walRcvState = WALRCV_CONNECTED;
+			SpinLockRelease(&walrcv->mutex);
+
 			/* Initialize LogstreamResult and buffers for processing messages */
 			LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
 			initStringInfo(&reply_message);
@@ -688,7 +695,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 			 */
 			*startpoint = walrcv->receiveStart;
 			*startpointTLI = walrcv->receiveStartTLI;
-			walrcv->walRcvState = WALRCV_STREAMING;
+			walrcv->walRcvState = WALRCV_CONNECTING;
 			SpinLockRelease(&walrcv->mutex);
 			break;
 		}
@@ -791,6 +798,8 @@ WalRcvDie(int code, Datum arg)
 	/* Mark ourselves inactive in shared memory */
 	SpinLockAcquire(&walrcv->mutex);
 	Assert(walrcv->walRcvState == WALRCV_STREAMING ||
+		   walrcv->walRcvState == WALRCV_CONNECTING ||
+		   walrcv->walRcvState == WALRCV_CONNECTED ||
 		   walrcv->walRcvState == WALRCV_RESTARTING ||
 		   walrcv->walRcvState == WALRCV_STARTING ||
 		   walrcv->walRcvState == WALRCV_WAITING ||
@@ -1373,6 +1382,10 @@ WalRcvGetStateString(WalRcvState state)
 			return "stopped";
 		case WALRCV_STARTING:
 			return "starting";
+		case WALRCV_CONNECTING:
+			return "connecting";
+		case WALRCV_CONNECTED:
+			return "connected";
 		case WALRCV_STREAMING:
 			return "streaming";
 		case WALRCV_WAITING:
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 822645748a7..62482830ba2 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -179,12 +179,37 @@ WalRcvStreaming(void)
 	}
 
 	if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
-		state == WALRCV_RESTARTING)
+		state == WALRCV_RESTARTING || state == WALRCV_CONNECTING ||
+		state == WALRCV_CONNECTED)
 		return true;
 	else
 		return false;
 }
 
+/*
+ * Transition from CONNECTED to STREAMING state.
+ *
+ * This is called by the startup process when the first WAL record from
+ * the walreceiver is processed, indicating that the connection is fully
+ * established and data is flowing.
+ */
+bool
+WalRcvSetStreaming(void)
+{
+	WalRcvData *walrcv = WalRcv;
+	bool		set = false;
+
+	SpinLockAcquire(&walrcv->mutex);
+	if (walrcv->walRcvState == WALRCV_CONNECTED)
+	{
+		walrcv->walRcvState = WALRCV_STREAMING;
+		set = true;
+	}
+	SpinLockRelease(&walrcv->mutex);
+
+	return set;
+}
+
 /*
  * Stop walreceiver (if running) and wait for it to die.
  * Executed by the Startup process.
@@ -211,6 +236,8 @@ ShutdownWalRcv(void)
 			stopped = true;
 			break;
 
+		case WALRCV_CONNECTING:
+		case WALRCV_CONNECTED:
 		case WALRCV_STREAMING:
 		case WALRCV_WAITING:
 		case WALRCV_RESTARTING:
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index e5557d21fa8..656206e8d81 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -47,6 +47,9 @@ typedef enum
 	WALRCV_STOPPED,				/* stopped and mustn't start up again */
 	WALRCV_STARTING,			/* launched, but the process hasn't
 								 * initialized yet */
+	WALRCV_CONNECTING,			/* connection starting, but not established yet */
+	WALRCV_CONNECTED,			/* replication connection established, but no WAL
+								 * streamed yet */
 	WALRCV_STREAMING,			/* walreceiver is streaming */
 	WALRCV_WAITING,				/* stopped streaming, waiting for orders */
 	WALRCV_RESTARTING,			/* asked to restart streaming */
@@ -492,6 +495,7 @@ extern void WalRcvForceReply(void);
 /* prototypes for functions in walreceiverfuncs.c */
 extern Size WalRcvShmemSize(void);
 extern void WalRcvShmemInit(void);
+extern bool WalRcvSetStreaming(void);
 extern void ShutdownWalRcv(void);
 extern bool WalRcvStreaming(void);
 extern bool WalRcvRunning(void);
-- 
2.51.0

From 51120c0c46533a062fb6be61ca58d100dc0059a0 Mon Sep 17 00:00:00 2001
From: alterego655 <[email protected]>
Date: Mon, 15 Dec 2025 16:37:07 +0800
Subject: [PATCH v3] Add CONNECTING/CONNECTED states to walreceiver

Previously, walreceiver reported WALRCV_STREAMING immediately after startup,
before it had demonstrated end-to-end WAL flow. Monitoring tools therefore
could not distinguish an established connection from active WAL streaming.

Introduce two intermediate states:
- WALRCV_CONNECTING: walreceiver is starting up
- WALRCV_CONNECTED: START_REPLICATION succeeded, but replay progress has not
  yet been observed

Capture a baseline apply pointer when START_REPLICATION succeeds, and promote
to WALRCV_STREAMING once applyPtr advances beyond that baseline.
---
 src/backend/replication/walreceiver.c      | 43 ++++++++++++++++++++--
 src/backend/replication/walreceiverfuncs.c |  5 ++-
 src/include/replication/walreceiver.h      |  3 ++
 3 files changed, 46 insertions(+), 5 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index ac802ae85b4..345deed9093 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -130,6 +130,7 @@ typedef enum WalRcvWakeupReason
 static TimestampTz wakeup[NUM_WALRCV_WAKEUPS];
 
 static StringInfoData reply_message;
+static XLogRecPtr initialApplyPtr = InvalidXLogRecPtr;
 
 /* Prototypes for private functions */
 static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
@@ -204,6 +205,8 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 			/* The usual case */
 			break;
 
+		case WALRCV_CONNECTING:
+		case WALRCV_CONNECTED:
 		case WALRCV_WAITING:
 		case WALRCV_STREAMING:
 		case WALRCV_RESTARTING:
@@ -214,7 +217,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 	}
 	/* Advertise our PID so that the startup process can kill us */
 	walrcv->pid = MyProcPid;
-	walrcv->walRcvState = WALRCV_STREAMING;
+	walrcv->walRcvState = WALRCV_CONNECTING;
 
 	/* Fetch information required to start streaming */
 	walrcv->ready_to_display = false;
@@ -398,6 +401,13 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 			LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
 			initStringInfo(&reply_message);
 
+			/* We are connected, but have not yet applied WAL from this stream */
+			initialApplyPtr = LogstreamResult.Write;
+			SpinLockAcquire(&walrcv->mutex);
+			if (walrcv->walRcvState == WALRCV_CONNECTING)
+				walrcv->walRcvState = WALRCV_CONNECTED;
+			SpinLockRelease(&walrcv->mutex);
+
 			/* Initialize nap wakeup times. */
 			now = GetCurrentTimestamp();
 			for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
@@ -688,8 +698,9 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 			 */
 			*startpoint = walrcv->receiveStart;
 			*startpointTLI = walrcv->receiveStartTLI;
-			walrcv->walRcvState = WALRCV_STREAMING;
+			walrcv->walRcvState = WALRCV_CONNECTING;
 			SpinLockRelease(&walrcv->mutex);
+			initialApplyPtr = InvalidXLogRecPtr;
 			break;
 		}
 		if (walrcv->walRcvState == WALRCV_STOPPING)
@@ -790,7 +801,9 @@ WalRcvDie(int code, Datum arg)
 
 	/* Mark ourselves inactive in shared memory */
 	SpinLockAcquire(&walrcv->mutex);
-	Assert(walrcv->walRcvState == WALRCV_STREAMING ||
+	Assert(walrcv->walRcvState == WALRCV_CONNECTING ||
+		   walrcv->walRcvState == WALRCV_CONNECTED ||
+		   walrcv->walRcvState == WALRCV_STREAMING ||
 		   walrcv->walRcvState == WALRCV_RESTARTING ||
 		   walrcv->walRcvState == WALRCV_STARTING ||
 		   walrcv->walRcvState == WALRCV_WAITING ||
@@ -989,6 +1002,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
 	if (LogstreamResult.Flush < LogstreamResult.Write)
 	{
 		WalRcvData *walrcv = WalRcv;
+		bool		force_reply = false;
 
 		issue_xlog_fsync(recvFile, recvSegNo, tli);
 
@@ -1001,6 +1015,8 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
 			walrcv->latestChunkStart = walrcv->flushedUpto;
 			walrcv->flushedUpto = LogstreamResult.Flush;
 			walrcv->receivedTLI = tli;
+			if (walrcv->walRcvState == WALRCV_CONNECTED)
+				force_reply = true;
 		}
 		SpinLockRelease(&walrcv->mutex);
 
@@ -1022,7 +1038,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
 		/* Also let the primary know that we made some progress */
 		if (!dying)
 		{
-			XLogWalRcvSendReply(false, false);
+			XLogWalRcvSendReply(force_reply, false);
 			XLogWalRcvSendHSFeedback(false);
 		}
 	}
@@ -1129,6 +1145,21 @@ XLogWalRcvSendReply(bool force, bool requestReply)
 	flushPtr = LogstreamResult.Flush;
 	applyPtr = GetXLogReplayRecPtr(NULL);
 
+	/*
+	 * If we've established the replication connection but have not yet proven
+	 * WAL is flowing end-to-end, flip to STREAMING once applyPtr advances
+	 * beyond the baseline captured when START_REPLICATION succeeded.
+	 */
+	if (WalRcv->walRcvState == WALRCV_CONNECTED &&
+		XLogRecPtrIsValid(initialApplyPtr) &&
+		applyPtr != initialApplyPtr)
+	{
+		SpinLockAcquire(&WalRcv->mutex);
+		if (WalRcv->walRcvState == WALRCV_CONNECTED)
+			WalRcv->walRcvState = WALRCV_STREAMING;
+		SpinLockRelease(&WalRcv->mutex);
+	}
+
 	resetStringInfo(&reply_message);
 	pq_sendbyte(&reply_message, PqReplMsg_StandbyStatusUpdate);
 	pq_sendint64(&reply_message, writePtr);
@@ -1373,6 +1404,10 @@ WalRcvGetStateString(WalRcvState state)
 			return "stopped";
 		case WALRCV_STARTING:
 			return "starting";
+		case WALRCV_CONNECTING:
+			return "connecting";
+		case WALRCV_CONNECTED:
+			return "connected";
 		case WALRCV_STREAMING:
 			return "streaming";
 		case WALRCV_WAITING:
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 822645748a7..b0b6e8314b5 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -179,7 +179,8 @@ WalRcvStreaming(void)
 	}
 
 	if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
-		state == WALRCV_RESTARTING)
+		state == WALRCV_RESTARTING || state == WALRCV_CONNECTING ||
+		state == WALRCV_CONNECTED)
 		return true;
 	else
 		return false;
@@ -211,6 +212,8 @@ ShutdownWalRcv(void)
 			stopped = true;
 			break;
 
+		case WALRCV_CONNECTING:
+		case WALRCV_CONNECTED:
 		case WALRCV_STREAMING:
 		case WALRCV_WAITING:
 		case WALRCV_RESTARTING:
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index e5557d21fa8..70b2947f2b4 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -47,6 +47,9 @@ typedef enum
 	WALRCV_STOPPED,				/* stopped and mustn't start up again */
 	WALRCV_STARTING,			/* launched, but the process hasn't
 								 * initialized yet */
+	WALRCV_CONNECTING,			/* connection starting, but not established yet */
+	WALRCV_CONNECTED,			/* replication connection established, but no WAL
+								 * streamed yet */
 	WALRCV_STREAMING,			/* walreceiver is streaming */
 	WALRCV_WAITING,				/* stopped streaming, waiting for orders */
 	WALRCV_RESTARTING,			/* asked to restart streaming */
-- 
2.51.0
