From fa0d8433ed649841416b442a8c9897acf12eaa92 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 8 Nov 2023 07:07:49 +0000
Subject: [PATCH v16] Add support for collecting WAL read stats for walsenders

This commit adds code for collecting WAL read stats for
walsenders; both read from WAL buffers and WAL file and expose it
via pg_stat_replication.
---
 doc/src/sgml/monitoring.sgml                | 61 +++++++++++++++
 src/backend/access/transam/xlogreader.c     | 48 +++++++++++-
 src/backend/access/transam/xlogutils.c      |  2 +-
 src/backend/catalog/system_views.sql        |  8 +-
 src/backend/replication/walsender.c         | 86 ++++++++++++++++++++-
 src/bin/pg_waldump/pg_waldump.c             |  2 +-
 src/include/access/xlogreader.h             | 30 ++++++-
 src/include/catalog/pg_proc.dat             |  6 +-
 src/include/replication/walsender_private.h |  3 +
 src/test/regress/expected/rules.out         | 10 ++-
 10 files changed, 238 insertions(+), 18 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index e068f7e247..a0257fea0c 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1442,6 +1442,67 @@ description | Waiting for a newly initialized WAL file to reach durable storage
        Send time of last reply message received from standby server
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>wal_read</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times WAL data is read from disk
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>wal_read_bytes</structfield> <type>numeric</type>
+      </para>
+      <para>
+       Total amount of WAL read from disk in bytes
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>wal_read_time</structfield> <type>double precision</type>
+      </para>
+      <para>
+       Total amount of time spent reading WAL from disk via
+       <function>WALRead</function> request, in milliseconds
+       (if <xref linkend="guc-track-wal-io-timing"/> is enabled,
+       otherwise zero).
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>wal_read_buffers</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times WAL data is read from WAL buffers
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>wal_read_bytes_buffers</structfield> <type>numeric</type>
+      </para>
+      <para>
+       Total amount of WAL read from WAL buffers in bytes
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>wal_read_time_buffers</structfield> <type>double precision</type>
+      </para>
+      <para>
+       Total amount of time spent reading WAL from WAL buffers via
+       <function>WALRead</function> request, in milliseconds
+       (if <xref linkend="guc-track-wal-io-timing"/> is enabled,
+       otherwise zero).
+      </para></entry>
+     </row>
+
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 5820c5eedc..44aee42079 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -31,6 +31,7 @@
 #include "access/xlogrecord.h"
 #include "catalog/pg_control.h"
 #include "common/pg_lzcompress.h"
+#include "portability/instr_time.h"
 #include "replication/origin.h"
 
 #ifndef FRONTEND
@@ -1479,9 +1480,9 @@ err:
  * reducing read system calls or even disk IOs.
  */
 bool
-WALRead(XLogReaderState *state,
-		char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
-		WALReadError *errinfo)
+WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count,
+		TimeLineID tli, WALReadError *errinfo, WALReadStats * stats,
+		bool capture_wal_io_timing)
 {
 	char	   *p;
 	XLogRecPtr	recptr;
@@ -1489,9 +1490,13 @@ WALRead(XLogReaderState *state,
 #ifndef FRONTEND
 	Size		nread;
 #endif
+	instr_time	start;
 
 #ifndef FRONTEND
 
+	if (stats != NULL && capture_wal_io_timing)
+		INSTR_TIME_SET_CURRENT(start);
+
 	/*
 	 * Try reading WAL from WAL buffers. Frontend code has no idea of WAL
 	 * buffers.
@@ -1500,6 +1505,23 @@ WALRead(XLogReaderState *state,
 
 	Assert(nread >= 0);
 
+	/* Collect I/O stats if requested by the caller. */
+	if (stats != NULL)
+	{
+		stats->wal_read_buffers++;
+		stats->wal_read_bytes_buffers += nread;
+
+		/* Increment the I/O timing. */
+		if (capture_wal_io_timing)
+		{
+			instr_time	duration;
+
+			INSTR_TIME_SET_CURRENT(duration);
+			INSTR_TIME_SUBTRACT(duration, start);
+			stats->wal_read_time_buffers += INSTR_TIME_GET_MICROSEC(duration);
+		}
+	}
+
 	/*
 	 * Check if we have read fully (hit), partially (partial hit) or nothing
 	 * (miss) from WAL buffers. If we have read either partially or nothing,
@@ -1525,6 +1547,7 @@ WALRead(XLogReaderState *state,
 	p = buf;
 	recptr = startptr;
 	nbytes = count;
+	INSTR_TIME_SET_ZERO(start);
 
 	while (nbytes > 0)
 	{
@@ -1565,6 +1588,10 @@ WALRead(XLogReaderState *state,
 		else
 			segbytes = nbytes;
 
+		/* Measure I/O timing to read WAL data if requested by the caller. */
+		if (stats != NULL && capture_wal_io_timing)
+			INSTR_TIME_SET_CURRENT(start);
+
 #ifndef FRONTEND
 		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
 #endif
@@ -1587,6 +1614,21 @@ WALRead(XLogReaderState *state,
 			return false;
 		}
 
+		if (stats != NULL)
+		{
+			stats->wal_read++;
+			stats->wal_read_bytes += readbytes;
+
+			if (capture_wal_io_timing)
+			{
+				instr_time	duration;
+
+				INSTR_TIME_SET_CURRENT(duration);
+				INSTR_TIME_SUBTRACT(duration, start);
+				stats->wal_read_time += INSTR_TIME_GET_MICROSEC(duration);
+			}
+		}
+
 		/* Update state for read */
 		recptr += readbytes;
 		nbytes -= readbytes;
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 43f7b31205..c88aad35bb 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -1012,7 +1012,7 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	 * zero-padded up to the page boundary if it's incomplete.
 	 */
 	if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli,
-				 &errinfo))
+				 &errinfo, NULL, false))
 		WALReadRaiseError(&errinfo);
 
 	/* number of valid bytes in the buffer */
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index b65f6b5249..5e9fa587e7 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -893,7 +893,13 @@ CREATE VIEW pg_stat_replication AS
             W.replay_lag,
             W.sync_priority,
             W.sync_state,
-            W.reply_time
+            W.reply_time,
+            W.wal_read,
+            W.wal_read_bytes,
+            W.wal_read_time,
+            W.wal_read_buffers,
+            W.wal_read_bytes_buffers,
+            W.wal_read_time_buffers
     FROM pg_stat_get_activity(NULL) AS S
         JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e250b0567e..2bfee2b002 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -259,7 +259,7 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
 static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
 							  TimeLineID *tli_p);
-
+static void WalSndAccumulateWalReadStats(WALReadStats * stats);
 
 /* Initialize walsender process before entering the main command loop */
 void
@@ -907,6 +907,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	WALReadError errinfo;
 	XLogSegNo	segno;
 	TimeLineID	currTLI;
+	WALReadStats stats;
 
 	/*
 	 * Make sure we have enough WAL available before retrieving the current
@@ -943,6 +944,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	else
 		count = flushptr - targetPagePtr;	/* part of the page available */
 
+	MemSet(&stats, 0, sizeof(WALReadStats));
+
 	/* now actually read the data, we know it's there */
 	if (!WALRead(state,
 				 cur_page,
@@ -951,9 +954,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 				 currTLI,		/* Pass the current TLI because only
 								 * WalSndSegmentOpen controls whether new TLI
 								 * is needed. */
-				 &errinfo))
+				 &errinfo,
+				 &stats,
+				 track_wal_io_timing))
 		WALReadRaiseError(&errinfo);
 
+	WalSndAccumulateWalReadStats(&stats);
+
 	/*
 	 * After reading into the buffer, check that what we read was valid. We do
 	 * this after reading, because even though the segment was present when we
@@ -2630,6 +2637,13 @@ InitWalSenderSlot(void)
 			else
 				walsnd->kind = REPLICATION_KIND_LOGICAL;
 
+			walsnd->wal_read_stats.wal_read = 0;
+			walsnd->wal_read_stats.wal_read_bytes = 0;
+			walsnd->wal_read_stats.wal_read_time = 0;
+			walsnd->wal_read_stats.wal_read_buffers = 0;
+			walsnd->wal_read_stats.wal_read_bytes_buffers = 0;
+			walsnd->wal_read_stats.wal_read_time_buffers = 0;
+
 			SpinLockRelease(&walsnd->mutex);
 			/* don't need the lock anymore */
 			MyWalSnd = (WalSnd *) walsnd;
@@ -2750,6 +2764,7 @@ XLogSendPhysical(void)
 	Size		nbytes;
 	XLogSegNo	segno;
 	WALReadError errinfo;
+	WALReadStats stats;
 
 	/* If requested switch the WAL sender to the stopping state. */
 	if (got_STOPPING)
@@ -2965,6 +2980,8 @@ XLogSendPhysical(void)
 	enlargeStringInfo(&output_message, nbytes);
 
 retry:
+	MemSet(&stats, 0, sizeof(WALReadStats));
+
 	if (!WALRead(xlogreader,
 				 &output_message.data[output_message.len],
 				 startptr,
@@ -2972,9 +2989,13 @@ retry:
 				 xlogreader->seg.ws_tli,	/* Pass the current TLI because
 											 * only WalSndSegmentOpen controls
 											 * whether new TLI is needed. */
-				 &errinfo))
+				 &errinfo,
+				 &stats,
+				 track_wal_io_timing))
 		WALReadRaiseError(&errinfo);
 
+	WalSndAccumulateWalReadStats(&stats);
+
 	/* See logical_read_xlog_page(). */
 	XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
 	CheckXLogRemoved(segno, xlogreader->seg.ws_tli);
@@ -3523,7 +3544,7 @@ offset_to_interval(TimeOffset offset)
 Datum
 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_SENDERS_COLS	12
+#define PG_STAT_GET_WAL_SENDERS_COLS	18
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	SyncRepStandbyData *sync_standbys;
 	int			num_standbys;
@@ -3552,9 +3573,16 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		WalSndState state;
 		TimestampTz replyTime;
 		bool		is_sync_standby;
+		int64		wal_read;
+		uint64		wal_read_bytes;
+		int64		wal_read_time;
+		int64		wal_read_buffers;
+		uint64		wal_read_bytes_buffers;
+		int64		wal_read_time_buffers;
 		Datum		values[PG_STAT_GET_WAL_SENDERS_COLS];
 		bool		nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
 		int			j;
+		char		buf[256];
 
 		/* Collect data from shared memory */
 		SpinLockAcquire(&walsnd->mutex);
@@ -3574,6 +3602,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		applyLag = walsnd->applyLag;
 		priority = walsnd->sync_standby_priority;
 		replyTime = walsnd->replyTime;
+		wal_read = walsnd->wal_read_stats.wal_read;
+		wal_read_bytes = walsnd->wal_read_stats.wal_read_bytes;
+		wal_read_time = walsnd->wal_read_stats.wal_read_time;
+		wal_read_buffers = walsnd->wal_read_stats.wal_read_buffers;
+		wal_read_bytes_buffers = walsnd->wal_read_stats.wal_read_bytes_buffers;
+		wal_read_time_buffers = walsnd->wal_read_stats.wal_read_time_buffers;
 		SpinLockRelease(&walsnd->mutex);
 
 		/*
@@ -3670,6 +3704,31 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 				nulls[11] = true;
 			else
 				values[11] = TimestampTzGetDatum(replyTime);
+
+			values[12] = Int64GetDatum(wal_read);
+
+			/* Convert to numeric. */
+			snprintf(buf, sizeof buf, UINT64_FORMAT, wal_read_bytes);
+			values[13] = DirectFunctionCall3(numeric_in,
+											 CStringGetDatum(buf),
+											 ObjectIdGetDatum(0),
+											 Int32GetDatum(-1));
+
+			/* Convert counter from microsec to millisec for display. */
+			values[14] = Float8GetDatum(((double) wal_read_time) / 1000.0);
+
+			values[15] = Int64GetDatum(wal_read_buffers);
+
+			/* Convert to numeric. */
+			MemSet(buf, '\0', sizeof buf);
+			snprintf(buf, sizeof buf, UINT64_FORMAT, wal_read_bytes_buffers);
+			values[16] = DirectFunctionCall3(numeric_in,
+											 CStringGetDatum(buf),
+											 ObjectIdGetDatum(0),
+											 Int32GetDatum(-1));
+
+			/* Convert counter from microsec to millisec for display. */
+			values[17] = Float8GetDatum(((double) wal_read_time_buffers) / 1000.0);
 		}
 
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
@@ -3914,3 +3973,22 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
 	Assert(time != 0);
 	return now - time;
 }
+
+/*
+ * Function to accumulate WAL Read stats for WAL sender.
+ */
+static void
+WalSndAccumulateWalReadStats(WALReadStats * stats)
+{
+	/* Collect I/O stats for walsender. */
+	SpinLockAcquire(&MyWalSnd->mutex);
+	MyWalSnd->wal_read_stats.wal_read += stats->wal_read;
+	MyWalSnd->wal_read_stats.wal_read_bytes += stats->wal_read_bytes;
+	MyWalSnd->wal_read_stats.wal_read_time += stats->wal_read_time;
+	MyWalSnd->wal_read_stats.wal_read_buffers += stats->wal_read_buffers;
+	MyWalSnd->wal_read_stats.wal_read_bytes_buffers +=
+		stats->wal_read_bytes_buffers;
+	MyWalSnd->wal_read_stats.wal_read_time_buffers +=
+		stats->wal_read_time_buffers;
+	SpinLockRelease(&MyWalSnd->mutex);
+}
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index a3535bdfa9..5e1c14dd2e 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -407,7 +407,7 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 	}
 
 	if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline,
-				 &errinfo))
+				 &errinfo, NULL, false))
 	{
 		WALOpenSegment *seg = &errinfo.wre_seg;
 		char		fname[MAXPGPATH];
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 0813722715..44a3cd4591 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -388,9 +388,33 @@ typedef struct WALReadError
 	WALOpenSegment wre_seg;		/* Segment we tried to read from. */
 } WALReadError;
 
-extern bool WALRead(XLogReaderState *state,
-					char *buf, XLogRecPtr startptr, Size count,
-					TimeLineID tli, WALReadError *errinfo);
+/*
+ * WAL read stats from WALRead that the callers can use.
+ */
+typedef struct WALReadStats
+{
+	/* Number of times WAL read from disk. */
+	int64		wal_read;
+
+	/* Total amount of WAL read from disk in bytes. */
+	uint64		wal_read_bytes;
+
+	/* Total amount of time spent reading WAL from disk. */
+	int64		wal_read_time;
+
+	/* Number of times WAL read from WAL buffers. */
+	int64		wal_read_buffers;
+
+	/* Total amount of WAL read from WAL buffers in bytes. */
+	uint64		wal_read_bytes_buffers;
+
+	/* Total amount of time spent reading WAL from WAL buffers. */
+	int64		wal_read_time_buffers;
+}			WALReadStats;
+
+extern bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr,
+					Size count, TimeLineID tli, WALReadError *errinfo,
+					WALReadStats * stats, bool capture_wal_io_timing);
 
 /* Functions for decoding an XLogRecord */
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index f14aed422a..4af16a0f81 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5452,9 +5452,9 @@
   proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => '',
-  proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}',
+  proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz,int8,numeric,float8,int8,numeric,float8}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time,wal_read,wal_read_bytes,wal_read_time,wal_read_buffers,wal_read_bytes_buffers,wal_read_time_buffers}',
   prosrc => 'pg_stat_get_wal_senders' },
 { oid => '3317', descr => 'statistics: information about WAL receiver',
   proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's',
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 13fd5877a6..c21707098f 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -13,6 +13,7 @@
 #define _WALSENDER_PRIVATE_H
 
 #include "access/xlog.h"
+#include "access/xlogreader.h"
 #include "lib/ilist.h"
 #include "nodes/nodes.h"
 #include "nodes/replnodes.h"
@@ -83,6 +84,8 @@ typedef struct WalSnd
 	TimestampTz replyTime;
 
 	ReplicationKind kind;
+
+	WALReadStats wal_read_stats;
 } WalSnd;
 
 extern PGDLLIMPORT WalSnd *MyWalSnd;
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 1442c43d9c..40d7963707 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2078,9 +2078,15 @@ pg_stat_replication| SELECT s.pid,
     w.replay_lag,
     w.sync_priority,
     w.sync_state,
-    w.reply_time
+    w.reply_time,
+    w.wal_read,
+    w.wal_read_bytes,
+    w.wal_read_time,
+    w.wal_read_buffers,
+    w.wal_read_bytes_buffers,
+    w.wal_read_time_buffers
    FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, gss_delegation, leader_pid, query_id)
-     JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
+     JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time, wal_read, wal_read_bytes, wal_read_time, wal_read_buffers, wal_read_bytes_buffers, wal_read_time_buffers) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_replication_slots| SELECT s.slot_name,
     s.spill_txns,
-- 
2.34.1

