On Fri, Apr 04, 2025 at 09:33:46PM +0530, vignesh C wrote:
> The new test added currently passes even without the patch. It would
> be ideal to have a test that fails without the patch and passes once
> the patch is applied.

Right.  The subscription test for logical WAL senders passes without
the patch, because we are able to catch some WAL activity through
pgoutput.  The recovery test for the physical WAL sender fails on a
timeout without the patch.

We would need something more advanced here for the logical case, where
we could use pg_recvlogical started in the background with a hardcoded
endpos, or just kill the pg_recvlogical command once we have checked
the state of the stats (see the sketch below).  I am not sure if this
is worth the cycles spent on it, TBH, so I would be happy with checking
just the physical case in TAP; it is simpler, because streaming
replication makes that easy to work with.
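
For reference, here is a minimal, untested sketch of what that could
look like in a TAP test.  It assumes a logical slot named 'test_slot'
already created on $node_primary and the usual test setup; the slot
name and surrounding variables are only placeholders.

use IPC::Run qw(start);

# Record a stopping point for pg_recvlogical.
my $endpos = $node_primary->safe_psql('postgres',
	"SELECT pg_current_wal_lsn()");

# Start pg_recvlogical in the background, streaming changes from the
# pre-created logical slot until the hardcoded endpos is reached.
my ($stdout, $stderr) = ('', '');
my $recv = start(
	[
		'pg_recvlogical', '-d', $node_primary->connstr('postgres'),
		'--slot', 'test_slot', '--start', '--endpos', $endpos,
		'-f', '-'
	],
	'>', \$stdout, '2>', \$stderr);

# Poll pg_stat_io for the logical walsender here, then stop
# pg_recvlogical once the stats have been checked.
$recv->kill_kill;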

One thing that I'm a bit unhappy about in the patch is the choice to
do the stats updates in WalSndLoop() for the logical WAL sender case,
because this forces an extra GetCurrentTimestamp() call for each loop,
and that's never a cheap system call in what can be a hot code path.
How about doing the calculations in WalSndWaitForWal() for the logical
part, relying on the existing GetCurrentTimestamp() done there?
That's also where the waits are handled for the logical part, so there
may be a good point in keeping this code more symmetric for now,
rather than splitting it.

That said, here is a version 7 with all that included, which is
simpler to read.
--
Michael
From 9f6dadc679cdc0da7c005d005a06dad6e81020ad Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Tue, 25 Feb 2025 10:18:05 +0000
Subject: [PATCH v7] Flush the IO statistics of active walsenders

The walsender does not flush its IO statistics until it exits.
The issue has existed since pg_stat_io was introduced in a9c70b46dbe.
This commit:

1. ensures the walsender does not wait until exit to flush its IO statistics
2. flushes its IO statistics periodically, so as not to overload the walsender
3. adds a test for a physical walsender
---
 src/backend/replication/walsender.c   | 36 +++++++++++++++++++++++++--
 src/test/recovery/t/001_stream_rep.pl | 15 +++++++++++
 2 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1028919aecb1..216baeda5cd2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -91,10 +91,14 @@
 #include "utils/guc.h"
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
+#include "utils/pgstat_internal.h"
 #include "utils/ps_status.h"
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
 
+/* Minimum interval used by walsender for stats flushes, in ms */
+#define WALSENDER_STATS_FLUSH_INTERVAL         1000
+
 /*
  * Maximum data payload in a WAL data message.  Must be >= XLOG_BLCKSZ.
  *
@@ -1797,6 +1801,7 @@ WalSndWaitForWal(XLogRecPtr loc)
 	int			wakeEvents;
 	uint32		wait_event = 0;
 	static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+	TimestampTz last_flush = 0;
 
 	/*
 	 * Fast path to avoid acquiring the spinlock in case we already know we
@@ -1817,6 +1822,7 @@ WalSndWaitForWal(XLogRecPtr loc)
 	{
 		bool		wait_for_standby_at_stop = false;
 		long		sleeptime;
+		TimestampTz now;
 
 		/* Clear any already-pending wakeups */
 		ResetLatch(MyLatch);
@@ -1927,7 +1933,8 @@ WalSndWaitForWal(XLogRecPtr loc)
 		 * new WAL to be generated.  (But if we have nothing to send, we don't
 		 * want to wake on socket-writable.)
 		 */
-		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+		now = GetCurrentTimestamp();
+		sleeptime = WalSndComputeSleeptime(now);
 
 		wakeEvents = WL_SOCKET_READABLE;
 
@@ -1936,6 +1943,15 @@ WalSndWaitForWal(XLogRecPtr loc)
 
 		Assert(wait_event != 0);
 
+		/* Report IO statistics, if needed */
+		if (TimestampDifferenceExceeds(last_flush, now,
+									   WALSENDER_STATS_FLUSH_INTERVAL))
+		{
+			pgstat_flush_io(false);
+			(void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
+			last_flush = now;
+		}
+
 		WalSndWait(wakeEvents, sleeptime, wait_event);
 	}
 
@@ -2742,6 +2758,8 @@ WalSndCheckTimeOut(void)
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
+	TimestampTz last_flush = 0;
+
 	/*
 	 * Initialize the last reply timestamp. That enables timeout processing
 	 * from hereon.
@@ -2836,6 +2854,9 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		 * WalSndWaitForWal() handle any other blocking; idle receivers need
 		 * its additional actions.  For physical replication, also block if
 		 * caught up; its send_data does not block.
+		 *
+		 * The IO statistics are reported in WalSndWaitForWal() for the
+		 * logical WAL senders.
 		 */
 		if ((WalSndCaughtUp && send_data != XLogSendLogical &&
 			 !streamingDoneSending) ||
@@ -2843,6 +2864,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		{
 			long		sleeptime;
 			int			wakeEvents;
+			TimestampTz now;
 
 			if (!streamingDoneReceiving)
 				wakeEvents = WL_SOCKET_READABLE;
@@ -2853,11 +2875,21 @@ WalSndLoop(WalSndSendDataCallback send_data)
 			 * Use fresh timestamp, not last_processing, to reduce the chance
 			 * of reaching wal_sender_timeout before sending a keepalive.
 			 */
-			sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+			now = GetCurrentTimestamp();
+			sleeptime = WalSndComputeSleeptime(now);
 
 			if (pq_is_send_pending())
 				wakeEvents |= WL_SOCKET_WRITEABLE;
 
+			/* Report IO statistics, if needed */
+			if (TimestampDifferenceExceeds(last_flush, now,
+										   WALSENDER_STATS_FLUSH_INTERVAL))
+			{
+				pgstat_flush_io(false);
+				(void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
+				last_flush = now;
+			}
+
 			/* Sleep until something happens or we time out */
 			WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
 		}
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index ccd8417d449f..e55d8ec0ec17 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -42,6 +42,9 @@ $node_standby_2->init_from_backup($node_standby_1, $backup_name,
 	has_streaming => 1);
 $node_standby_2->start;
 
+# Reset IO statistics, for the WAL sender check with pg_stat_io.
+$node_primary->safe_psql('postgres', "SELECT pg_stat_reset_shared('io')");
+
 # Create some content on primary and check its presence in standby nodes
 $node_primary->safe_psql('postgres',
 	"CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
@@ -333,6 +336,18 @@ $node_primary->psql(
 
 note "switching to physical replication slot";
 
+# Wait for the walsender to update its IO statistics.  This is done before
+# the next restart and far enough from the reset done above.
+$node_primary->poll_query_until(
+	'postgres',
+	qq[SELECT sum(reads) > 0
+       FROM pg_catalog.pg_stat_io
+       WHERE backend_type = 'walsender'
+       AND object = 'wal']
+  )
+  or die
+  "Timed out while waiting for the walsender to update its IO statistics";
+
 # Switch to using a physical replication slot. We can do this without a new
 # backup since physical slots can go backwards if needed. Do so on both
 # standbys. Since we're going to be testing things that affect the slot state,
-- 
2.49.0
