On Fri, Apr 04, 2025 at 09:33:46PM +0530, vignesh C wrote:
> The new test added currently passes even without the patch. It would
> be ideal to have a test that fails without the patch and passes once
> the patch is applied.
Right.  The subscription test and the logical WAL sender case pass
without the patch, because we are able to catch some WAL activity
through pgoutput.  The recovery test for the physical WAL sender fails
without the patch, on timeout.

We would need something more advanced for the logical case, where we
could use pg_recvlogical started in the background with a hardcoded
endpos (see the sketch below), or just kill the pg_recvlogical command
once we have checked the state of the stats.  I am not sure that this
is worth the cycles spent on it, TBH, so I would be happy with just
the physical case checked in TAP, which is simpler because streaming
replication makes that easy to work with.

One thing that I'm a bit unhappy about in the patch is the choice to
do the stats updates in WalSndLoop() for the logical WAL sender case,
because this forces an extra GetCurrentTimestamp() call for each loop,
and that's never a cheap system call in what can be a hot code path.
How about doing the calculations in WalSndWaitForWal() for the logical
part, relying on the existing GetCurrentTimestamp() call done there?
That's also where the waits are handled for the logical part, so there
is a good argument for keeping this code symmetric for now rather than
splitting it.

Saying that, here is a version 7 with all that included, which is
simpler to read.
--
Michael
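P.S.: For the archives, here is a minimal sketch of the pg_recvlogical
idea mentioned above, not included in v7.  It assumes wal_level =
logical on the primary node, and reuses $node_primary from
001_stream_rep.pl; the slot name "test_slot", the table used to
generate WAL, and the far-future endpos are only for illustration:

# Sketch only: check that a logical WAL sender flushes its IO
# statistics while still running.  Requires wal_level = logical.
use IPC::Run qw(start);

# Create a logical slot and generate some WAL for its walsender to read.
$node_primary->safe_psql('postgres',
	"SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding')");
$node_primary->safe_psql('postgres',
	"CREATE TABLE wal_activity AS SELECT generate_series(1,1000) AS a");

# Start pg_recvlogical in the background.  The hardcoded, far-future
# endpos keeps it (and its walsender) alive until it is killed below.
my ($out, $err);
my $recv = start(
	[
		'pg_recvlogical', '-d', $node_primary->connstr('postgres'),
		'--slot', 'test_slot', '--endpos', 'FFFFFFFF/FFFFFFFF',
		'--no-loop', '--file', '-', '--start'
	],
	'>', \$out, '2>', \$err);

# Wait for the logical WAL sender to report its IO statistics.
$node_primary->poll_query_until(
	'postgres',
	qq[SELECT sum(reads) > 0
	   FROM pg_catalog.pg_stat_io
	   WHERE backend_type = 'walsender'
	   AND object = 'wal'])
  or die "timed out waiting for logical walsender IO statistics";

# Stats checked, kill pg_recvlogical rather than waiting for endpos.
$recv->kill_kill;

The hardcoded endpos keeps pg_recvlogical, and hence its logical WAL
sender, alive while pg_stat_io is being polled; killing it at the end
avoids the flush-at-exit path that would make such a test pass even
without the patch.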
From 9f6dadc679cdc0da7c005d005a06dad6e81020ad Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Tue, 25 Feb 2025 10:18:05 +0000
Subject: [PATCH v7] Flush the IO statistics of active walsenders

The walsender does not flush its IO statistics until it exits.  The
issue has been there since pg_stat_io was introduced in a9c70b46dbe.

This commit:
1. ensures that the walsender does not wait until exit to flush its
   IO statistics
2. flushes its IO statistics periodically, to not overload the
   walsender
3. adds a test for a physical walsender and a test for a logical
   walsender
---
 src/backend/replication/walsender.c   | 36 +++++++++++++++++++++++++--
 src/test/recovery/t/001_stream_rep.pl | 15 +++++++++++
 2 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1028919aecb1..216baeda5cd2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -91,10 +91,14 @@
 #include "utils/guc.h"
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
+#include "utils/pgstat_internal.h"
 #include "utils/ps_status.h"
 #include "utils/timeout.h"
 #include "utils/timestamp.h"

+/* Minimum interval used by walsender for stats flushes, in ms */
+#define WALSENDER_STATS_FLUSH_INTERVAL 1000
+
 /*
  * Maximum data payload in a WAL data message.  Must be >= XLOG_BLCKSZ.
  *
@@ -1797,6 +1801,7 @@ WalSndWaitForWal(XLogRecPtr loc)
 	int			wakeEvents;
 	uint32		wait_event = 0;
 	static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+	TimestampTz last_flush = 0;

 	/*
 	 * Fast path to avoid acquiring the spinlock in case we already know we
@@ -1817,6 +1822,7 @@
 	{
 		bool		wait_for_standby_at_stop = false;
 		long		sleeptime;
+		TimestampTz now;

 		/* Clear any already-pending wakeups */
 		ResetLatch(MyLatch);
@@ -1927,7 +1933,8 @@
 		 * new WAL to be generated.  (But if we have nothing to send, we don't
 		 * want to wake on socket-writable.)
 		 */
-		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+		now = GetCurrentTimestamp();
+		sleeptime = WalSndComputeSleeptime(now);

 		wakeEvents = WL_SOCKET_READABLE;

@@ -1936,6 +1943,15 @@

 		Assert(wait_event != 0);

+		/* Report IO statistics, if needed */
+		if (TimestampDifferenceExceeds(last_flush, now,
+									   WALSENDER_STATS_FLUSH_INTERVAL))
+		{
+			pgstat_flush_io(false);
+			(void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
+			last_flush = now;
+		}
+
 		WalSndWait(wakeEvents, sleeptime, wait_event);
 	}

@@ -2742,6 +2758,8 @@ WalSndCheckTimeOut(void)
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
+	TimestampTz last_flush = 0;
+
 	/*
 	 * Initialize the last reply timestamp. That enables timeout processing
 	 * from hereon.
@@ -2836,6 +2854,9 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		 * WalSndWaitForWal() handle any other blocking; idle receivers need
 		 * its additional actions.  For physical replication, also block if
 		 * caught up; its send_data does not block.
+		 *
+		 * The IO statistics are reported in WalSndWaitForWal() for the
+		 * logical WAL senders.
 		 */
 		if ((WalSndCaughtUp && send_data != XLogSendLogical &&
 			 !streamingDoneSending) ||
@@ -2843,6 +2864,7 @@
 		{
 			long		sleeptime;
 			int			wakeEvents;
+			TimestampTz now;

 			if (!streamingDoneReceiving)
 				wakeEvents = WL_SOCKET_READABLE;
@@ -2853,11 +2875,21 @@
 			 * Use fresh timestamp, not last_processing, to reduce the chance
 			 * of reaching wal_sender_timeout before sending a keepalive.
 			 */
-			sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+			now = GetCurrentTimestamp();
+			sleeptime = WalSndComputeSleeptime(now);

 			if (pq_is_send_pending())
 				wakeEvents |= WL_SOCKET_WRITEABLE;

+			/* Report IO statistics, if needed */
+			if (TimestampDifferenceExceeds(last_flush, now,
+										   WALSENDER_STATS_FLUSH_INTERVAL))
+			{
+				pgstat_flush_io(false);
+				(void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
+				last_flush = now;
+			}
+
 			/* Sleep until something happens or we time out */
 			WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
 		}
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index ccd8417d449f..e55d8ec0ec17 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -42,6 +42,9 @@ $node_standby_2->init_from_backup($node_standby_1, $backup_name,
 	has_streaming => 1);
 $node_standby_2->start;

+# Reset IO statistics, for the WAL sender check with pg_stat_io.
+$node_primary->safe_psql('postgres', "SELECT pg_stat_reset_shared('io')");
+
 # Create some content on primary and check its presence in standby nodes
 $node_primary->safe_psql('postgres',
 	"CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
@@ -333,6 +336,18 @@ $node_primary->psql(

 note "switching to physical replication slot";

+# Wait for the walsender to update its IO statistics.  This is done before
+# the next restart and far enough from the reset done above.
+$node_primary->poll_query_until(
+	'postgres',
+	qq[SELECT sum(reads) > 0
+	   FROM pg_catalog.pg_stat_io
+	   WHERE backend_type = 'walsender'
+	   AND object = 'wal']
+)
+  or die
+  "Timed out while waiting for the walsender to update its IO statistics";
+
 # Switch to using a physical replication slot. We can do this without a new
 # backup since physical slots can go backwards if needed. Do so on both
 # standbys. Since we're going to be testing things that affect the slot state,
-- 
2.49.0