Hi,

On Tue, Mar 11, 2025 at 06:10:32AM +0000, Bertrand Drouvot wrote:
> WalSndWaitForWal() is being used only for logical walsender. So we'd need to
> find another location for the physical walsender case. One option is to keep the
> WalSndLoop() location and control the reports frequency.
I ended up with an idea in the same vein as your third one, but done in
WalSndLoop() instead, for both logical and physical walsenders. The idea is to
report the stats when the walsender is caught up or has pending data to send.
Also, we flush only periodically (based on PGSTAT_MIN_INTERVAL), so as not to
overload the walsender.

Remarks/Questions:

1. Maybe relying on PGSTAT_IDLE_INTERVAL would make more sense? In either case
(PGSTAT_MIN_INTERVAL or PGSTAT_IDLE_INTERVAL), I'm not sure there is a need to
update the related doc.

2. A bonus point of doing it here is that we don't need an extra call to
GetCurrentTimestamp(): we simply assign the one that is already being done to
"now".

3. I had to use poll_query_until(), as I've seen failures on the CI with this
new approach. I also moved the test far away from the stats reset, to minimize
the risk of polling for too long.

With the attached in place, the number of times the stats are being flushed
decreases drastically, but it still seems enough to get an accurate idea of the
walsender IO activity.

Thoughts?

Regards,

-- 
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
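PS: for readers skimming the thread, here is the throttling logic from the
attached patch, condensed into a standalone sketch. The helper name
flush_io_stats_throttled() is made up for illustration; the pgstat calls and
PGSTAT_MIN_INTERVAL are the ones the patch actually uses:

    #include "postgres.h"

    #include "pgstat.h"
    #include "utils/pgstat_internal.h"
    #include "utils/timestamp.h"

    /*
     * Condensed sketch (hypothetical helper) of the throttling added to
     * WalSndLoop() below: flush the IO statistics at most once per
     * PGSTAT_MIN_INTERVAL (1s), reusing the timestamp already taken for
     * the sleep-time computation, so no extra GetCurrentTimestamp() call
     * is needed.
     */
    static void
    flush_io_stats_throttled(TimestampTz now, TimestampTz *last_flush)
    {
        if (TimestampDifferenceExceeds(*last_flush, now, PGSTAT_MIN_INTERVAL))
        {
            /* Report IO statistics, without forcing the flush */
            pgstat_flush_io(false);
            (void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
            *last_flush = now;
        }
    }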
From 7978b1bce3d3f5cd47c983911431225b03d42efe Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Tue, 25 Feb 2025 10:18:05 +0000
Subject: [PATCH v4] Flush the IO statistics of active walsenders

The walsender does not flush its IO statistics until it exits.
The issue has been there since pg_stat_io was introduced in a9c70b46dbe.

This commit:

1. ensures the walsender does not wait until it exits to flush its IO
   statistics
2. flushes its IO statistics periodically, so as not to overload the WAL
   sender
3. adds a test for a physical walsender (a logical walsender had the same
   issue, but the fix is in the same code path)
---
 src/backend/replication/walsender.c   | 60 +++++++++++++++++++--------
 src/backend/utils/activity/pgstat.c   |  2 -
 src/include/pgstat.h                  |  2 +
 src/test/recovery/t/001_stream_rep.pl | 16 +++++++
 4 files changed, 60 insertions(+), 20 deletions(-)
  70.5% src/backend/replication/
   3.1% src/backend/utils/activity/
   3.6% src/include/
  22.6% src/test/recovery/t/

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 446d10c1a7d..9b44d4ae600 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -90,6 +90,7 @@
 #include "utils/guc.h"
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
+#include "utils/pgstat_internal.h"
 #include "utils/ps_status.h"
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
@@ -2740,6 +2741,8 @@ WalSndCheckTimeOut(void)
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
+	TimestampTz last_flush = 0;
+
 	/*
 	 * Initialize the last reply timestamp. That enables timeout processing
 	 * from hereon.
@@ -2834,30 +2837,51 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		 * WalSndWaitForWal() handle any other blocking; idle receivers need
 		 * its additional actions.  For physical replication, also block if
 		 * caught up; its send_data does not block.
+		 *
+		 * When the WAL sender is caught up or has pending data to send, we
+		 * also periodically report I/O statistics. It's done periodically to
+		 * not overload the WAL sender.
 		 */
-		if ((WalSndCaughtUp && send_data != XLogSendLogical &&
-			 !streamingDoneSending) ||
-			pq_is_send_pending())
+		if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
 		{
-			long		sleeptime;
-			int			wakeEvents;
+			TimestampTz now;

-			if (!streamingDoneReceiving)
-				wakeEvents = WL_SOCKET_READABLE;
-			else
-				wakeEvents = 0;
+			now = GetCurrentTimestamp();

-			/*
-			 * Use fresh timestamp, not last_processing, to reduce the chance
-			 * of reaching wal_sender_timeout before sending a keepalive.
-			 */
-			sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+			if (TimestampDifferenceExceeds(last_flush, now, PGSTAT_MIN_INTERVAL))
+			{
+				/*
+				 * Report IO statistics
+				 */
+				pgstat_flush_io(false);
+				(void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
+				last_flush = now;
+			}

-			if (pq_is_send_pending())
-				wakeEvents |= WL_SOCKET_WRITEABLE;
+			if (send_data != XLogSendLogical || pq_is_send_pending())
+			{
+				long		sleeptime;
+				int			wakeEvents;
+
+				if (!streamingDoneReceiving)
+					wakeEvents = WL_SOCKET_READABLE;
+				else
+					wakeEvents = 0;
+
+				/*
+				 * Use fresh timestamp, not last_processing, to reduce the
+				 * chance of reaching wal_sender_timeout before sending a
+				 * keepalive.
+				 */
+				sleeptime = WalSndComputeSleeptime(now);
+
+				if (pq_is_send_pending())
+					wakeEvents |= WL_SOCKET_WRITEABLE;
+
+				/* Sleep until something happens or we time out */
+				WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
+			}

-			/* Sleep until something happens or we time out */
-			WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
 		}
 	}
 }
diff --git a/src/backend/utils/activity/pgstat.c b/src/backend/utils/activity/pgstat.c
index 3168b825e25..1bf84cbf64e 100644
--- a/src/backend/utils/activity/pgstat.c
+++ b/src/backend/utils/activity/pgstat.c
@@ -123,8 +123,6 @@
  * ----------
  */

-/* minimum interval non-forced stats flushes.*/
-#define PGSTAT_MIN_INTERVAL			1000
 /* how long until to block flushing pending stats updates */
 #define PGSTAT_MAX_INTERVAL			60000
 /* when to call pgstat_report_stat() again, even when idle */
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index def6b370ac1..d1b15bf7757 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -22,6 +22,8 @@
 #include "utils/relcache.h"
 #include "utils/wait_event.h"	/* for backward compatibility */	/* IWYU pragma: export */

+/* minimum interval non-forced stats flushes.*/
+#define PGSTAT_MIN_INTERVAL			1000

 /* ----------
  * Paths for the statistics files (relative to installation's $PGDATA).
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index 3945f00ab88..3371895ab1d 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -42,6 +42,9 @@ $node_standby_2->init_from_backup($node_standby_1, $backup_name,
 	has_streaming => 1);
 $node_standby_2->start;

+# To check that an active walsender updates its IO statistics below.
+$node_primary->safe_psql('postgres', "SELECT pg_stat_reset_shared('io')");
+
 # Create some content on primary and check its presence in standby nodes
 $node_primary->safe_psql('postgres',
 	"CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
@@ -329,6 +332,19 @@ $node_primary->psql(

 note "switching to physical replication slot";

+# Wait for the walsender to update its IO statistics.
+# Has to be done before the next restart and far enough from the
+# pg_stat_reset_shared('io') to minimize the risk of polling for too long.
+$node_primary->poll_query_until(
+	'postgres',
+	qq[SELECT sum(reads) > 0
+	FROM pg_catalog.pg_stat_io
+	WHERE backend_type = 'walsender'
+	AND object = 'wal'])
+  or die
+  "Timed out while waiting for the walsender to update its IO statistics";
+
 # Switch to using a physical replication slot. We can do this without a new
 # backup since physical slots can go backwards if needed. Do so on both
 # standbys. Since we're going to be testing things that affect the slot state,
-- 
2.34.1