Hi,

On Tue, Mar 11, 2025 at 06:10:32AM +0000, Bertrand Drouvot wrote:
> WalSndWaitForWal() is being used only for logical walsender. So we'd need to
> find another location for the physical walsender case. One option is to keep
> the WalSndLoop() location and control the reports frequency.

I ended up with an idea in the same vein as your third one, but let's do it
in WalSndLoop() instead, so that it covers both logical and physical walsenders.
The idea is to report the stats when the walsender is caught up or has pending
data to send, and to flush them only periodically (based on PGSTAT_MIN_INTERVAL)
so as not to overload the walsender.
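
For reference, the gist of the WalSndLoop() change (condensed from the attached
patch, not meant as standalone code) is:

    if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
    {
        TimestampTz now = GetCurrentTimestamp();

        if (TimestampDifferenceExceeds(last_flush, now, PGSTAT_MIN_INTERVAL))
        {
            /* report IO statistics, at most once per PGSTAT_MIN_INTERVAL */
            pgstat_flush_io(false);
            (void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
            last_flush = now;
        }

        /* ... sleep handling follows, see the attached patch ... */
    }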

Remarks/Questions:

1. Maybe relying on PGSTAT_IDLE_INTERVAL would make more sense? In either case
(PGSTAT_MIN_INTERVAL or PGSTAT_IDLE_INTERVAL), I'm not sure there is a need to
update the related doc.

2. A bonus of doing it here is that we don't need an extra call to
GetCurrentTimestamp(): we store the one that is already done in "now" and
reuse it for WalSndComputeSleeptime().

3. I had to use poll_query_until() as I've seen failures on the CI with this
new approach. I also moved the test far away from the stats reset to minimize
the risk of polling for too long.

With the attached in place, the number of times the stats are flushed is
reduced drastically, but it still seems enough to get an accurate idea of the
walsender IO activity. Thoughts?
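
For the record, the walsender IO activity can be checked on the primary with a
query similar to the one added in the TAP test, for example:

    SELECT backend_type, object, context, reads, writes
    FROM pg_catalog.pg_stat_io
    WHERE backend_type = 'walsender'
    AND object = 'wal';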

Regards,

-- 
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 7978b1bce3d3f5cd47c983911431225b03d42efe Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Tue, 25 Feb 2025 10:18:05 +0000
Subject: [PATCH v4] Flush the IO statistics of active walsenders

The walsender does not flush its IO statistics until it exits.
The issue has been there since pg_stat_io was introduced in a9c70b46dbe.
This commit:

1. ensures the walsender does not wait until it exits to flush its IO statistics
2. flushes its IO statistics periodically so as not to overload the WAL sender
3. adds a test for a physical walsender (a logical walsender had the same issue,
but the fix is in the same code path)
---
 src/backend/replication/walsender.c   | 60 +++++++++++++++++++--------
 src/backend/utils/activity/pgstat.c   |  2 -
 src/include/pgstat.h                  |  2 +
 src/test/recovery/t/001_stream_rep.pl | 16 +++++++
 4 files changed, 60 insertions(+), 20 deletions(-)
  70.5% src/backend/replication/
   3.1% src/backend/utils/activity/
   3.6% src/include/
  22.6% src/test/recovery/t/

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 446d10c1a7d..9b44d4ae600 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -90,6 +90,7 @@
 #include "utils/guc.h"
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
+#include "utils/pgstat_internal.h"
 #include "utils/ps_status.h"
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
@@ -2740,6 +2741,8 @@ WalSndCheckTimeOut(void)
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
+	TimestampTz last_flush = 0;
+
 	/*
 	 * Initialize the last reply timestamp. That enables timeout processing
 	 * from hereon.
@@ -2834,30 +2837,51 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		 * WalSndWaitForWal() handle any other blocking; idle receivers need
 		 * its additional actions.  For physical replication, also block if
 		 * caught up; its send_data does not block.
+		 *
+		 * When the WAL sender is caught up or has pending data to send, we
+		 * also periodically report I/O statistics. It's done periodically to
+		 * not overload the WAL sender.
 		 */
-		if ((WalSndCaughtUp && send_data != XLogSendLogical &&
-			 !streamingDoneSending) ||
-			pq_is_send_pending())
+		if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
 		{
-			long		sleeptime;
-			int			wakeEvents;
+			TimestampTz now;
 
-			if (!streamingDoneReceiving)
-				wakeEvents = WL_SOCKET_READABLE;
-			else
-				wakeEvents = 0;
+			now = GetCurrentTimestamp();
 
-			/*
-			 * Use fresh timestamp, not last_processing, to reduce the chance
-			 * of reaching wal_sender_timeout before sending a keepalive.
-			 */
-			sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+			if (TimestampDifferenceExceeds(last_flush, now, PGSTAT_MIN_INTERVAL))
+			{
+				/*
+				 * Report IO statistics
+				 */
+				pgstat_flush_io(false);
+				(void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
+				last_flush = now;
+			}
 
-			if (pq_is_send_pending())
-				wakeEvents |= WL_SOCKET_WRITEABLE;
+			if (send_data != XLogSendLogical || pq_is_send_pending())
+			{
+				long		sleeptime;
+				int			wakeEvents;
+
+				if (!streamingDoneReceiving)
+					wakeEvents = WL_SOCKET_READABLE;
+				else
+					wakeEvents = 0;
+
+				/*
+				 * Use fresh timestamp, not last_processing, to reduce the
+				 * chance of reaching wal_sender_timeout before sending a
+				 * keepalive.
+				 */
+				sleeptime = WalSndComputeSleeptime(now);
+
+				if (pq_is_send_pending())
+					wakeEvents |= WL_SOCKET_WRITEABLE;
+
+				/* Sleep until something happens or we time out */
+				WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
+			}
 
-			/* Sleep until something happens or we time out */
-			WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
 		}
 	}
 }
diff --git a/src/backend/utils/activity/pgstat.c b/src/backend/utils/activity/pgstat.c
index 3168b825e25..1bf84cbf64e 100644
--- a/src/backend/utils/activity/pgstat.c
+++ b/src/backend/utils/activity/pgstat.c
@@ -123,8 +123,6 @@
  * ----------
  */
 
-/* minimum interval non-forced stats flushes.*/
-#define PGSTAT_MIN_INTERVAL			1000
 /* how long until to block flushing pending stats updates */
 #define PGSTAT_MAX_INTERVAL			60000
 /* when to call pgstat_report_stat() again, even when idle */
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index def6b370ac1..d1b15bf7757 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -22,6 +22,8 @@
 #include "utils/relcache.h"
 #include "utils/wait_event.h"	/* for backward compatibility */	/* IWYU pragma: export */
 
+/* minimum interval non-forced stats flushes.*/
+#define PGSTAT_MIN_INTERVAL                 1000
 
 /* ----------
  * Paths for the statistics files (relative to installation's $PGDATA).
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index 3945f00ab88..3371895ab1d 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -42,6 +42,9 @@ $node_standby_2->init_from_backup($node_standby_1, $backup_name,
 	has_streaming => 1);
 $node_standby_2->start;
 
+# To check that an active walsender updates its IO statistics below.
+$node_primary->safe_psql('postgres', "SELECT pg_stat_reset_shared('io')");
+
 # Create some content on primary and check its presence in standby nodes
 $node_primary->safe_psql('postgres',
 	"CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
@@ -329,6 +332,19 @@ $node_primary->psql(
 
 note "switching to physical replication slot";
 
+# Wait for the walsender to update its IO statistics.
+# Has to be done before the next restart and far enough from the
+# pg_stat_reset_shared('io') to minimize the risk of polling for too long.
+$node_primary->poll_query_until(
+	'postgres',
+	qq[SELECT sum(reads) > 0
+       FROM pg_catalog.pg_stat_io
+       WHERE backend_type = 'walsender'
+       AND object = 'wal']
+  )
+  or die
+  "Timed out while waiting for the walsender to update its IO statistics";
+
 # Switch to using a physical replication slot. We can do this without a new
 # backup since physical slots can go backwards if needed. Do so on both
 # standbys. Since we're going to be testing things that affect the slot state,
-- 
2.34.1
