On Sun, May 3, 2020 at 3:12 AM Dmitry Dolgov <9erthali...@gmail.com> wrote:
> I've finally performed couple of tests involving more IO. The
> not-that-big dataset of 1.5 GB for the replica with the memory allowing
> fitting ~ 1/6 of it, default prefetching parameters and an update
> workload with uniform distribution. Rather a small setup, but causes
> stable reading into the page cache on the replica and allows to see a
> visible influence of the patch (more measurement samples tend to happen
> at lower latencies):

Thanks for these tests Dmitry.  You didn't mention the details of the
workload, but one thing I'd recommend for a uniform/random workload
that's generating a lot of misses on the primary server using N
backends is to make sure that maintenance_io_concurrency is set to a
number like N*2 or higher, and to look at the queue depth on both
systems with iostat -x 1.  Then you can experiment with ALTER SYSTEM
SET maintenance_io_concurrency = X; SELECT pg_reload_conf(); to try to
understand the way it works; there is a point where you've set it high
enough and the replica is able to handle the same rate of concurrent
I/Os as the primary.  The default of 10 is actually pretty low unless
you've only got ~4 backends generating random updates on the primary.
That's with full_page_writes=off; if you leave it on, it takes a while
to get into a scenario where it has much effect.

Here's a rebase, after the recent XLogReader refactoring.
From a7fd3f728d64c3c94387e9e424dba507b166bcab Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sat, 28 Mar 2020 11:42:59 +1300
Subject: [PATCH v9 1/3] Add pg_atomic_unlocked_add_fetch_XXX().

Add a variant of pg_atomic_add_fetch_XXX with no barrier semantics, for
cases where you only want to avoid the possibility that a concurrent
pg_atomic_read_XXX() sees a torn/partial value.

Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com
---
 src/include/port/atomics.h         | 24 ++++++++++++++++++++++
 src/include/port/atomics/generic.h | 33 ++++++++++++++++++++++++++++++
 2 files changed, 57 insertions(+)

diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h
index 4956ec55cb..2abb852893 100644
--- a/src/include/port/atomics.h
+++ b/src/include/port/atomics.h
@@ -389,6 +389,21 @@ pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
 	return pg_atomic_add_fetch_u32_impl(ptr, add_);
 }
 
+/*
+ * pg_atomic_unlocked_add_fetch_u32 - atomically add to variable
+ *
+ * Like pg_atomic_unlocked_write_u32, guarantees only that partial values
+ * cannot be observed.
+ *
+ * No barrier semantics.
+ */
+static inline uint32
+pg_atomic_unlocked_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
+{
+	AssertPointerAlignment(ptr, 4);
+	return pg_atomic_unlocked_add_fetch_u32_impl(ptr, add_);
+}
+
 /*
  * pg_atomic_sub_fetch_u32 - atomically subtract from variable
  *
@@ -519,6 +534,15 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
 	return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
 }
 
+static inline uint64
+pg_atomic_unlocked_add_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 add_)
+{
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(ptr, 8);
+#endif
+	return pg_atomic_unlocked_add_fetch_u64_impl(ptr, add_);
+}
+
 #undef INSIDE_ATOMICS_H
 
 #endif							/* ATOMICS_H */
diff --git a/src/include/port/atomics/generic.h b/src/include/port/atomics/generic.h
index d3ba89a58f..1683653ca6 100644
--- a/src/include/port/atomics/generic.h
+++ b/src/include/port/atomics/generic.h
@@ -234,6 +234,16 @@ pg_atomic_add_fetch_u32_impl(volatile pg_atomic_uint32 *ptr, int32 add_)
 }
 #endif
 
+#if !defined(PG_HAVE_ATOMIC_UNLOCKED_ADD_FETCH_U32)
+#define PG_HAVE_ATOMIC_UNLOCKED_ADD_FETCH_U32
+static inline uint32
+pg_atomic_unlocked_add_fetch_u32_impl(volatile pg_atomic_uint32 *ptr, int32 add_)
+{
+	ptr->value += add_;
+	return ptr->value;
+}
+#endif
+
 #if !defined(PG_HAVE_ATOMIC_SUB_FETCH_U32) && defined(PG_HAVE_ATOMIC_FETCH_SUB_U32)
 #define PG_HAVE_ATOMIC_SUB_FETCH_U32
 static inline uint32
@@ -399,3 +409,26 @@ pg_atomic_sub_fetch_u64_impl(volatile pg_atomic_uint64 *ptr, int64 sub_)
 	return pg_atomic_fetch_sub_u64_impl(ptr, sub_) - sub_;
 }
 #endif
+
+#if defined(PG_HAVE_8BYTE_SINGLE_COPY_ATOMICITY) && \
+	!defined(PG_HAVE_ATOMIC_U64_SIMULATION)
+
+#ifndef PG_HAVE_ATOMIC_UNLOCKED_ADD_FETCH_U64
+#define PG_HAVE_ATOMIC_UNLOCKED_ADD_FETCH_U64
+static inline uint64
+pg_atomic_unlocked_add_fetch_u64_impl(volatile pg_atomic_uint64 *ptr, uint64 val)
+{
+	ptr->value += val;
+	return ptr->value;
+}
+#endif
+
+#else
+
+static inline uint64
+pg_atomic_unlocked_add_fetch_u64_impl(volatile pg_atomic_uint64 *ptr, uint64 val)
+{
+	return pg_atomic_add_fetch_u64_impl(ptr, val);
+}
+
+#endif
-- 
2.20.1

From 6ed95fffba6751ddc9607659183c072cb11fa4a8 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Tue, 7 Apr 2020 22:56:27 +1200
Subject: [PATCH v9 2/3] Allow XLogReadRecord() to be non-blocking.

Extend read_local_xlog_page() to support non-blocking modes:

1. Reading as far as the WAL receiver has written so far.
2. Reading all the way to the end, when the end LSN is unknown.

Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com
---
 src/backend/access/transam/xlogreader.c |  37 ++++--
 src/backend/access/transam/xlogutils.c  | 149 +++++++++++++++++-------
 src/backend/replication/walsender.c     |   2 +-
 src/include/access/xlogreader.h         |  14 ++-
 src/include/access/xlogutils.h          |  26 +++++
 5 files changed, 173 insertions(+), 55 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 5995798b58..897efaf682 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -259,6 +259,9 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
  * If the reading fails for some other reason, NULL is also returned, and
  * *errormsg is set to a string with details of the failure.
  *
+ * If the read_page callback is one that returns XLOGPAGEREAD_WOULDBLOCK rather
+ * than waiting for WAL to arrive, NULL is also returned in that case.
+ *
  * The returned pointer (or *errormsg) points to an internal buffer that's
  * valid until the next call to XLogReadRecord.
  */
@@ -548,10 +551,11 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 err:
 
 	/*
-	 * Invalidate the read state. We might read from a different source after
-	 * failure.
+	 * Invalidate the read state, if this was an error. We might read from a
+	 * different source after failure.
 	 */
-	XLogReaderInvalReadState(state);
+	if (readOff != XLOGPAGEREAD_WOULDBLOCK)
+		XLogReaderInvalReadState(state);
 
 	if (state->errormsg_buf[0] != '\0')
 		*errormsg = state->errormsg_buf;
@@ -563,8 +567,9 @@ err:
  * Read a single xlog page including at least [pageptr, reqLen] of valid data
  * via the page_read() callback.
  *
- * Returns -1 if the required page cannot be read for some reason; errormsg_buf
- * is set in that case (unless the error occurs in the page_read callback).
+ * Returns XLOGPAGEREAD_ERROR or XLOGPAGEREAD_WOULDBLOCK if the required page
+ * cannot be read for some reason; errormsg_buf is set in the former case
+ * (unless the error occurs in the page_read callback).
  *
  * We fetch the page from a reader-local cache if we know we have the required
  * data and if there hasn't been any error since caching the data.
@@ -661,8 +666,11 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	return readLen;
 
 err:
+	if (readLen == XLOGPAGEREAD_WOULDBLOCK)
+		return XLOGPAGEREAD_WOULDBLOCK;
+
 	XLogReaderInvalReadState(state);
-	return -1;
+	return XLOGPAGEREAD_ERROR;
 }
 
 /*
@@ -941,6 +949,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 	XLogRecPtr	found = InvalidXLogRecPtr;
 	XLogPageHeader header;
 	char	   *errormsg;
+	int			readLen;
 
 	Assert(!XLogRecPtrIsInvalid(RecPtr));
 
@@ -954,7 +963,6 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 		XLogRecPtr	targetPagePtr;
 		int			targetRecOff;
 		uint32		pageHeaderSize;
-		int			readLen;
 
 		/*
 		 * Compute targetRecOff. It should typically be equal or greater than
@@ -1035,7 +1043,8 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
 	}
 
 err:
-	XLogReaderInvalReadState(state);
+	if (readLen != XLOGPAGEREAD_WOULDBLOCK)
+		XLogReaderInvalReadState(state);
 
 	return InvalidXLogRecPtr;
 }
@@ -1094,8 +1103,16 @@ WALRead(XLogReaderState *state,
 			XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
 			state->routine.segment_open(state, nextSegNo, &tli);
 
-			/* This shouldn't happen -- indicates a bug in segment_open */
-			Assert(state->seg.ws_file >= 0);
+			/* callback reported that there was no such file */
+			if (state->seg.ws_file < 0)
+			{
+				errinfo->wre_errno = errno;
+				errinfo->wre_req = 0;
+				errinfo->wre_read = 0;
+				errinfo->wre_off = startoff;
+				errinfo->wre_seg = state->seg;
+				return false;
+			}
 
 			/* Update the current segment info. */
 			state->seg.ws_tli = tli;
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 322b0e8ff5..18aa499831 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -25,6 +25,7 @@
 #include "access/xlogutils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/walreceiver.h"
 #include "storage/smgr.h"
 #include "utils/guc.h"
 #include "utils/hsearch.h"
@@ -808,6 +809,29 @@ wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
 						path)));
 }
 
+/*
+ * XLogReaderRoutine->segment_open callback that reports missing files rather
+ * than raising an error.
+ */
+void
+wal_segment_try_open(XLogReaderState *state, XLogSegNo nextSegNo,
+					 TimeLineID *tli_p)
+{
+	TimeLineID	tli = *tli_p;
+	char		path[MAXPGPATH];
+
+	XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize);
+	state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+	if (state->seg.ws_file >= 0)
+		return;
+
+	if (errno != ENOENT)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open file \"%s\": %m",
+						path)));
+}
+
 /* stock XLogReaderRoutine->segment_close callback */
 void
 wal_segment_close(XLogReaderState *state)
@@ -823,6 +847,10 @@ wal_segment_close(XLogReaderState *state)
  * Public because it would likely be very helpful for someone writing another
  * output method outside walsender, e.g. in a bgworker.
  *
+ * A pointer to an XLogReadLocalOptions struct may be passed in as
+ * XLogReaderRoutine->page_read_private to control the behavior of this
+ * function.
+ *
  * TODO: The walsender has its own version of this, but it relies on the
  * walsender's latch being set whenever WAL is flushed. No such infrastructure
  * exists for normal backends, so we have to do a check/sleep/repeat style of
@@ -837,58 +865,89 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	TimeLineID	tli;
 	int			count;
 	WALReadError errinfo;
+	XLogReadLocalOptions *options =
+		(XLogReadLocalOptions *) state->routine.page_read_private;
 
 	loc = targetPagePtr + reqLen;
 
 	/* Loop waiting for xlog to be available if necessary */
 	while (1)
 	{
-		/*
-		 * Determine the limit of xlog we can currently read to, and what the
-		 * most recent timeline is.
-		 *
-		 * RecoveryInProgress() will update ThisTimeLineID when it first
-		 * notices recovery finishes, so we only have to maintain it for the
-		 * local process until recovery ends.
-		 */
-		if (!RecoveryInProgress())
-			read_upto = GetFlushRecPtr();
-		else
-			read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
-		tli = ThisTimeLineID;
+		switch (options ? options->read_upto_policy : -1)
+		{
+		case XLRO_WALRCV_WRITTEN:
+			/*
+			 * We'll try to read as far as has been written by the WAL
+			 * receiver, on the requested timeline.  When we run out of valid
+			 * data, we'll return an error.  This is used by xlogprefetch.c
+			 * while streaming.
+			 */
+			read_upto = GetWalRcvWriteRecPtr();
+			state->currTLI = tli = options->tli;
+			break;
 
-		/*
-		 * Check which timeline to get the record from.
-		 *
-		 * We have to do it each time through the loop because if we're in
-		 * recovery as a cascading standby, the current timeline might've
-		 * become historical. We can't rely on RecoveryInProgress() because in
-		 * a standby configuration like
-		 *
-		 * A => B => C
-		 *
-		 * if we're a logical decoding session on C, and B gets promoted, our
-		 * timeline will change while we remain in recovery.
-		 *
-		 * We can't just keep reading from the old timeline as the last WAL
-		 * archive in the timeline will get renamed to .partial by
-		 * StartupXLOG().
-		 *
-		 * If that happens after our caller updated ThisTimeLineID but before
-		 * we actually read the xlog page, we might still try to read from the
-		 * old (now renamed) segment and fail. There's not much we can do
-		 * about this, but it can only happen when we're a leaf of a cascading
-		 * standby whose master gets promoted while we're decoding, so a
-		 * one-off ERROR isn't too bad.
-		 */
-		XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+		case XLRO_END:
+			/*
+			 * We'll try to read as far as we can on one timeline.  This is
+			 * used by xlogprefetch.c for crash recovery.
+			 */
+			read_upto = (XLogRecPtr) -1;
+			state->currTLI = tli = options->tli;
+			break;
+
+		default:
+			/*
+			 * Determine the limit of xlog we can currently read to, and what the
+			 * most recent timeline is.
+			 *
+			 * RecoveryInProgress() will update ThisTimeLineID when it first
+			 * notices recovery finishes, so we only have to maintain it for
+			 * the local process until recovery ends.
+			 */
+			if (!RecoveryInProgress())
+				read_upto = GetFlushRecPtr();
+			else
+				read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
+			tli = ThisTimeLineID;
+
+			/*
+			 * Check which timeline to get the record from.
+			 *
+			 * We have to do it each time through the loop because if we're in
+			 * recovery as a cascading standby, the current timeline might've
+			 * become historical. We can't rely on RecoveryInProgress()
+			 * because in a standby configuration like
+			 *
+			 * A => B => C
+			 *
+			 * if we're a logical decoding session on C, and B gets promoted,
+			 * our timeline will change while we remain in recovery.
+			 *
+			 * We can't just keep reading from the old timeline as the last
+			 * WAL archive in the timeline will get renamed to .partial by
+			 * StartupXLOG().
+			 *
+			 * If that happens after our caller updated ThisTimeLineID but
+			 * before we actually read the xlog page, we might still try to
+			 * read from the old (now renamed) segment and fail. There's not
+			 * much we can do about this, but it can only happen when we're a
+			 * leaf of a cascading standby whose master gets promoted while
+			 * we're decoding, so a one-off ERROR isn't too bad.
+			 */
+			XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
+			break;
+		}
 
-		if (state->currTLI == ThisTimeLineID)
+		if (state->currTLI == tli)
 		{
 
 			if (loc <= read_upto)
 				break;
 
+			/* not enough data there, but we were asked not to wait */
+			if (options && options->nowait)
+				return XLOGPAGEREAD_WOULDBLOCK;
+
 			CHECK_FOR_INTERRUPTS();
 			pg_usleep(1000L);
 		}
@@ -930,7 +989,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	else if (targetPagePtr + reqLen > read_upto)
 	{
 		/* not enough data there */
-		return -1;
+		return XLOGPAGEREAD_ERROR;
 	}
 	else
 	{
@@ -945,7 +1004,17 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	 */
 	if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli,
 				 &errinfo))
+	{
+		/*
+		 * When not following timeline changes, we may read past the end of
+		 * available segments.  Report a missing file by returning
+		 * XLOGPAGEREAD_ERROR rather than raising an error.
+		 */
+		if (errinfo.wre_errno == ENOENT)
+			return XLOGPAGEREAD_ERROR;
+
 		WALReadRaiseError(&errinfo);
+	}
 
 	/* number of valid bytes in the buffer */
 	return count;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 86847cbb54..448c83b684 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -835,7 +835,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 
 	/* fail if not (implies we are going to shut down) */
 	if (flushptr < targetPagePtr + reqLen)
-		return -1;
+		return XLOGPAGEREAD_ERROR;
 
 	if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
 		count = XLOG_BLCKSZ;	/* more than one block available */
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index d930fe957d..3a5ab4b3ce 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -57,6 +57,10 @@ typedef struct WALSegmentContext
 
 typedef struct XLogReaderState XLogReaderState;
 
+/* Special negative return values for XLogPageReadCB functions */
+#define XLOGPAGEREAD_ERROR		-1
+#define XLOGPAGEREAD_WOULDBLOCK	-2
+
 /* Function type definitions for various xlogreader interactions */
 typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
 							   XLogRecPtr targetPagePtr,
@@ -76,10 +80,11 @@ typedef struct XLogReaderRoutine
 	 * This callback shall read at least reqLen valid bytes of the xlog page
 	 * starting at targetPagePtr, and store them in readBuf.  The callback
 	 * shall return the number of bytes read (never more than XLOG_BLCKSZ), or
-	 * -1 on failure.  The callback shall sleep, if necessary, to wait for the
-	 * requested bytes to become available.  The callback will not be invoked
-	 * again for the same page unless more than the returned number of bytes
-	 * are needed.
+	 * XLOGPAGEREAD_ERROR on failure.  The callback shall either sleep, if
+	 * necessary, to wait for the requested bytes to become available, or
+	 * return XLOGPAGEREAD_WOULDBLOCK.  The callback will not be invoked again
+	 * for the same page unless more than the returned number of bytes are
+	 * needed.
 	 *
 	 * targetRecPtr is the position of the WAL record we're reading.  Usually
 	 * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs
@@ -91,6 +96,7 @@ typedef struct XLogReaderRoutine
 	 * read from.
 	 */
 	XLogPageReadCB page_read;
+	void	   *page_read_private;
 
 	/*
 	 * Callback to open the specified WAL segment for reading.  ->seg.ws_file
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index e59b6cf3a9..6325c23dc2 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -47,12 +47,38 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
 extern void FreeFakeRelcacheEntry(Relation fakerel);
 
+/*
+ * A pointer to an XLogReadLocalOptions struct can be supplied as the private
+ * data for an XLogReader, to modify the behavior of read_local_xlog_page().
+ */
+typedef struct XLogReadLocalOptions
+{
+	/* Don't block waiting for new WAL to arrive. */
+	bool		nowait;
+
+	/*
+	 * For XLRO_WALRCV_WRITTEN and XLRO_END modes, the timeline ID must be
+	 * provided.
+	 */
+	TimeLineID	tli;
+
+	/* How far to read. */
+	enum {
+		XLRO_STANDARD,
+		XLRO_WALRCV_WRITTEN,
+		XLRO_END
+	} read_upto_policy;
+} XLogReadLocalOptions;
+
 extern int	read_local_xlog_page(XLogReaderState *state,
 								 XLogRecPtr targetPagePtr, int reqLen,
 								 XLogRecPtr targetRecPtr, char *cur_page);
 extern void wal_segment_open(XLogReaderState *state,
 							 XLogSegNo nextSegNo,
 							 TimeLineID *tli_p);
+extern void wal_segment_try_open(XLogReaderState *state,
+								 XLogSegNo nextSegNo,
+								 TimeLineID *tli_p);
 extern void wal_segment_close(XLogReaderState *state);
 
 extern void XLogReadDetermineTimeline(XLogReaderState *state,
-- 
2.20.1

From 68cbfa9e553359a57a4806cab8af60b0450f7e5b Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Wed, 8 Apr 2020 23:04:51 +1200
Subject: [PATCH v9 3/3] Prefetch referenced blocks during recovery.

Introduce a new GUC max_recovery_prefetch_distance.  If it is set to a
positive number of bytes, then read ahead in the WAL at most that
distance, and initiate asynchronous reading of referenced blocks.  The
goal is to avoid I/O stalls and benefit from concurrent I/O.  The number
of concurrent asynchronous reads is capped by the existing
maintenance_io_concurrency GUC.  The feature is enabled by default for
now, but we might reconsider that before release.

Reviewed-by: Alvaro Herrera <alvhe...@2ndquadrant.com>
Reviewed-by: Andres Freund <and...@anarazel.de>
Reviewed-by: Tomas Vondra <tomas.von...@2ndquadrant.com>
Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com
---
 doc/src/sgml/config.sgml                      |  45 +
 doc/src/sgml/monitoring.sgml                  |  85 +-
 doc/src/sgml/wal.sgml                         |  13 +
 src/backend/access/transam/Makefile           |   1 +
 src/backend/access/transam/xlog.c             |  16 +
 src/backend/access/transam/xlogprefetch.c     | 910 ++++++++++++++++++
 src/backend/catalog/system_views.sql          |  14 +
 src/backend/postmaster/pgstat.c               |  96 +-
 src/backend/storage/ipc/ipci.c                |   3 +
 src/backend/utils/misc/guc.c                  |  47 +-
 src/backend/utils/misc/postgresql.conf.sample |   5 +
 src/include/access/xlogprefetch.h             |  85 ++
 src/include/catalog/pg_proc.dat               |   8 +
 src/include/pgstat.h                          |  27 +
 src/include/utils/guc.h                       |   4 +
 src/test/regress/expected/rules.out           |  11 +
 16 files changed, 1366 insertions(+), 4 deletions(-)
 create mode 100644 src/backend/access/transam/xlogprefetch.c
 create mode 100644 src/include/access/xlogprefetch.h

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index a2694e548a..0c9842b0f9 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3121,6 +3121,51 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-max-recovery-prefetch-distance" xreflabel="max_recovery_prefetch_distance">
+      <term><varname>max_recovery_prefetch_distance</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>max_recovery_prefetch_distance</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        The maximum distance to look ahead in the WAL during recovery, to find
+        blocks to prefetch.  Prefetching blocks that will soon be needed can
+        reduce I/O wait times.  The number of concurrent prefetches is limited
+        by this setting as well as
+        <xref linkend="guc-maintenance-io-concurrency"/>.  Setting it too high
+        might be counterproductive, if it means that data falls out of the
+        kernel cache before it is needed.  If this value is specified without
+        units, it is taken as bytes.  A setting of -1 disables prefetching
+        during recovery.
+        The default is 256kB on systems that support
+        <function>posix_fadvise</function>, and otherwise -1.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry id="guc-recovery-prefetch-fpw" xreflabel="recovery_prefetch_fpw">
+      <term><varname>recovery_prefetch_fpw</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>recovery_prefetch_fpw</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Whether to prefetch blocks that were logged with full page images,
+        during recovery.  Often this doesn't help, since such blocks will not
+        be read the first time they are needed and might remain in the buffer
+        pool after that.  However, on file systems with a block size larger
+        than
+        <productname>PostgreSQL</productname>'s, prefetching can avoid a
+        costly read-before-write when blocks are later written.  This
+        setting has no effect unless
+        <xref linkend="guc-max-recovery-prefetch-distance"/> is set to a positive
+        number.  The default is off.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
      </sect2>
      <sect2 id="runtime-config-wal-archiving">
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 49d4bb13b9..0ab278e087 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -320,6 +320,13 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       </entry>
      </row>
 
+     <row>
+      <entry><structname>pg_stat_prefetch_recovery</structname><indexterm><primary>pg_stat_prefetch_recovery</primary></indexterm></entry>
+      <entry>Only one row, showing statistics about blocks prefetched during recovery.
+       See <xref linkend="pg-stat-prefetch-recovery-view"/> for details.
+      </entry>
+     </row>
+
      <row>
       <entry><structname>pg_stat_subscription</structname><indexterm><primary>pg_stat_subscription</primary></indexterm></entry>
       <entry>At least one row per subscription, showing information about
@@ -2674,6 +2681,78 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
    connected server.
   </para>
 
+  <table id="pg-stat-prefetch-recovery-view" xreflabel="pg_stat_prefetch_recovery">
+   <title><structname>pg_stat_prefetch_recovery</structname> View</title>
+   <tgroup cols="3">
+    <thead>
+    <row>
+      <entry>Column</entry>
+      <entry>Type</entry>
+      <entry>Description</entry>
+     </row>
+    </thead>
+
+   <tbody>
+    <row>
+     <entry><structfield>prefetch</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Number of blocks prefetched because they were not in the buffer pool</entry>
+    </row>
+    <row>
+     <entry><structfield>skip_hit</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Number of blocks not prefetched because they were already in the buffer pool</entry>
+    </row>
+    <row>
+     <entry><structfield>skip_new</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Number of blocks not prefetched because they were new (usually relation extension)</entry>
+    </row>
+    <row>
+     <entry><structfield>skip_fpw</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Number of blocks not prefetched because a full page image was included in the WAL and <xref linkend="guc-recovery-prefetch-fpw"/> was set to <literal>off</literal></entry>
+    </row>
+    <row>
+     <entry><structfield>skip_seq</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Number of blocks not prefetched because of repeated or sequential access</entry>
+    </row>
+    <row>
+     <entry><structfield>distance</structfield></entry>
+     <entry><type>integer</type></entry>
+     <entry>How far ahead of recovery the prefetcher is currently reading, in bytes</entry>
+    </row>
+    <row>
+     <entry><structfield>queue_depth</structfield></entry>
+     <entry><type>integer</type></entry>
+     <entry>How many prefetches have been initiated but are not yet known to have completed</entry>
+    </row>
+    <row>
+     <entry><structfield>avg_distance</structfield></entry>
+     <entry><type>float4</type></entry>
+     <entry>How far ahead of recovery the prefetcher is on average, while recovery is not idle</entry>
+    </row>
+    <row>
+     <entry><structfield>avg_queue_depth</structfield></entry>
+     <entry><type>float4</type></entry>
+     <entry>Average number of prefetches in flight while recovery is not idle</entry>
+    </row>
+    </tbody>
+   </tgroup>
+  </table>
+
+  <para>
+   The <structname>pg_stat_prefetch_recovery</structname> view will contain only
+   one row.  It is filled with nulls if recovery is not running or WAL
+   prefetching is not enabled.  See <xref linkend="guc-max-recovery-prefetch-distance"/>
+   for more information.  The counters in this view are reset whenever the
+   <xref linkend="guc-max-recovery-prefetch-distance"/>,
+   <xref linkend="guc-recovery-prefetch-fpw"/> or
+   <xref linkend="guc-maintenance-io-concurrency"/> setting is changed and
+   the server configuration is reloaded.
+  </para>
+
   <table id="pg-stat-subscription" xreflabel="pg_stat_subscription">
    <title><structname>pg_stat_subscription</structname> View</title>
    <tgroup cols="1">
@@ -4494,8 +4573,10 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
         argument.  The argument can be <literal>bgwriter</literal> to reset
         all the counters shown in
         the <structname>pg_stat_bgwriter</structname>
-        view,or <literal>archiver</literal> to reset all the counters shown in
-        the <structname>pg_stat_archiver</structname> view.
+        view, <literal>archiver</literal> to reset all the counters shown in
+        the <structname>pg_stat_archiver</structname> view, or
+        <literal>prefetch_recovery</literal> to reset all the counters shown
+        in the <structname>pg_stat_prefetch_recovery</structname> view.
        </para>
        <para>
         This function is restricted to superusers by default, but other users
diff --git a/doc/src/sgml/wal.sgml b/doc/src/sgml/wal.sgml
index bd9fae544c..38fc8149a8 100644
--- a/doc/src/sgml/wal.sgml
+++ b/doc/src/sgml/wal.sgml
@@ -719,6 +719,19 @@
    <acronym>WAL</acronym> call being logged to the server log. This
    option might be replaced by a more general mechanism in the future.
   </para>
+
+  <para>
+   The <xref linkend="guc-max-recovery-prefetch-distance"/> parameter can
+   be used to improve I/O performance during recovery by instructing
+   <productname>PostgreSQL</productname> to initiate reads
+   of disk blocks that will soon be needed, in combination with the
+   <xref linkend="guc-maintenance-io-concurrency"/> parameter.  The
+   prefetching mechanism is most likely to be effective on systems
+   with <varname>full_page_writes</varname> set to
+   <varname>off</varname> (where that is safe), and where the working
+   set is larger than RAM.  By default, prefetching in recovery is enabled,
+   but it can be disabled by setting the distance to -1.
+  </para>
  </sect1>
 
  <sect1 id="wal-internals">
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 595e02de72..39f9d4e77d 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -31,6 +31,7 @@ OBJS = \
 	xlogarchive.o \
 	xlogfuncs.o \
 	xloginsert.o \
+	xlogprefetch.o \
 	xlogreader.o \
 	xlogutils.o
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index ca09d81b08..81147d5f59 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -35,6 +35,7 @@
 #include "access/xlog_internal.h"
 #include "access/xlogarchive.h"
 #include "access/xloginsert.h"
+#include "access/xlogprefetch.h"
 #include "access/xlogreader.h"
 #include "access/xlogutils.h"
 #include "catalog/catversion.h"
@@ -7169,6 +7170,7 @@ StartupXLOG(void)
 		{
 			ErrorContextCallback errcallback;
 			TimestampTz xtime;
+			XLogPrefetchState prefetch;
 
 			InRedo = true;
 
@@ -7176,6 +7178,9 @@ StartupXLOG(void)
 					(errmsg("redo starts at %X/%X",
 							(uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr)));
 
+			/* Prepare to prefetch, if configured. */
+			XLogPrefetchBegin(&prefetch);
+
 			/*
 			 * main redo apply loop
 			 */
@@ -7205,6 +7210,12 @@ StartupXLOG(void)
 				/* Handle interrupt signals of startup process */
 				HandleStartupProcInterrupts();
 
+				/* Perform WAL prefetching, if enabled. */
+				XLogPrefetch(&prefetch,
+							 ThisTimeLineID,
+							 xlogreader->ReadRecPtr,
+							 currentSource == XLOG_FROM_STREAM);
+
 				/*
 				 * Pause WAL replay, if requested by a hot-standby session via
 				 * SetRecoveryPause().
@@ -7376,6 +7387,9 @@ StartupXLOG(void)
 					 */
 					if (switchedTLI && AllowCascadeReplication())
 						WalSndWakeup();
+
+					/* Reset the prefetcher. */
+					XLogPrefetchReconfigure();
 				}
 
 				/* Exit loop if we reached inclusive recovery target */
@@ -7392,6 +7406,7 @@ StartupXLOG(void)
 			/*
 			 * end of main redo apply loop
 			 */
+			XLogPrefetchEnd(&prefetch);
 
 			if (reachedRecoveryTarget)
 			{
@@ -12138,6 +12153,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 					 */
 					currentSource = XLOG_FROM_STREAM;
 					startWalReceiver = true;
+					XLogPrefetchReconfigure();
 					break;
 
 				case XLOG_FROM_STREAM:
diff --git a/src/backend/access/transam/xlogprefetch.c b/src/backend/access/transam/xlogprefetch.c
new file mode 100644
index 0000000000..6d8cff12c6
--- /dev/null
+++ b/src/backend/access/transam/xlogprefetch.c
@@ -0,0 +1,910 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogprefetch.c
+ *		Prefetching support for recovery.
+ *
+ * Portions Copyright (c) 2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *		src/backend/access/transam/xlogprefetch.c
+ *
+ * The goal of this module is to read future WAL records and issue
+ * PrefetchSharedBuffer() calls for referenced blocks, so that we avoid I/O
+ * stalls in the main recovery loop.  Currently, this is achieved by using a
+ * separate XLogReader to read ahead.  In future, we should find a way to
+ * avoid reading and decoding each record twice.
+ *
+ * When examining a WAL record from the future, we need to consider that a
+ * referenced block or segment file might not exist on disk until this record
+ * or some earlier record has been replayed.  After a crash, a file might also
+ * be missing because it was dropped by a later WAL record; in that case, it
+ * will be recreated when this record is replayed.  These cases are handled by
+ * recognizing them and adding a "filter" that prevents all prefetching of a
+ * certain block range until the present WAL record has been replayed.  Blocks
+ * skipped for these reasons are counted as "skip_new" (that is, cases where we
+ * didn't try to prefetch "new" blocks).
+ *
+ * Blocks found in the buffer pool already are counted as "skip_hit".
+ * Repeated access to the same buffer is detected and skipped, and this is
+ * counted with "skip_seq".  Blocks that were logged with FPWs are skipped if
+ * recovery_prefetch_fpw is off, since on most systems there will be no I/O
+ * stall; this is counted with "skip_fpw".
+ *
+ * The only way we currently have to know that an I/O initiated with
+ * PrefetchSharedBuffer() has completed is to call ReadBuffer().  Therefore,
+ * we track the number of potentially in-flight I/Os by using a circular
+ * buffer of LSNs.  When it's full, we have to wait for recovery to replay
+ * records so that the queue depth can be reduced, before we can do any more
+ * prefetching.  Ideally, this keeps us the right distance ahead to respect
+ * maintenance_io_concurrency.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xlog.h"
+#include "access/xlogprefetch.h"
+#include "access/xlogreader.h"
+#include "access/xlogutils.h"
+#include "catalog/storage_xlog.h"
+#include "utils/fmgrprotos.h"
+#include "utils/timestamp.h"
+#include "funcapi.h"
+#include "pgstat.h"
+#include "miscadmin.h"
+#include "port/atomics.h"
+#include "storage/bufmgr.h"
+#include "storage/shmem.h"
+#include "storage/smgr.h"
+#include "utils/guc.h"
+#include "utils/hsearch.h"
+
+/*
+ * Sample the queue depth and distance every time we replay this much WAL.
+ * This is used to compute avg_queue_depth and avg_distance for the log
+ * message that appears at the end of crash recovery.  It's also used to send
+ * messages periodically to the stats collector, to save the counters on disk.
+ */
+#define XLOGPREFETCHER_SAMPLE_DISTANCE 0x40000
+
+/* GUCs */
+int			max_recovery_prefetch_distance = -1;
+bool		recovery_prefetch_fpw = false;
+
+int			XLogPrefetchReconfigureCount;
+
+/*
+ * A prefetcher object.  There is at most one of these in existence at a time,
+ * recreated whenever there is a configuration change.
+ */
+struct XLogPrefetcher
+{
+	/* Reader and current reading state. */
+	XLogReaderState *reader;
+	XLogReadLocalOptions options;
+	bool			have_record;
+	bool			shutdown;
+	int				next_block_id;
+
+	/* Details of last prefetch to skip repeats and seq scans. */
+	SMgrRelation	last_reln;
+	RelFileNode		last_rnode;
+	BlockNumber		last_blkno;
+
+	/* Online averages. */
+	uint64			samples;
+	double			avg_queue_depth;
+	double			avg_distance;
+	XLogRecPtr		next_sample_lsn;
+
+	/* Book-keeping required to avoid accessing non-existing blocks. */
+	HTAB		   *filter_table;
+	dlist_head		filter_queue;
+
+	/* Book-keeping required to limit concurrent prefetches. */
+	int				prefetch_head;
+	int				prefetch_tail;
+	int				prefetch_queue_size;
+	XLogRecPtr		prefetch_queue[FLEXIBLE_ARRAY_MEMBER];
+};
+
+/*
+ * A temporary filter used to track block ranges that haven't been created
+ * yet, whole relations that haven't been created yet, and whole relations
+ * that we must assume have already been dropped.
+ */
+typedef struct XLogPrefetcherFilter
+{
+	RelFileNode		rnode;
+	XLogRecPtr		filter_until_replayed;
+	BlockNumber		filter_from_block;
+	dlist_node		link;
+} XLogPrefetcherFilter;
+
+/*
+ * Counters exposed in shared memory for pg_stat_prefetch_recovery.
+ */
+typedef struct XLogPrefetchStats
+{
+	pg_atomic_uint64 reset_time; /* Time of last reset. */
+	pg_atomic_uint64 prefetch;	/* Prefetches initiated. */
+	pg_atomic_uint64 skip_hit;	/* Blocks already buffered. */
+	pg_atomic_uint64 skip_new;	/* New/missing blocks filtered. */
+	pg_atomic_uint64 skip_fpw;	/* FPWs skipped. */
+	pg_atomic_uint64 skip_seq;	/* Repeat blocks skipped. */
+	float			avg_distance;
+	float			avg_queue_depth;
+
+	/* Reset counters */
+	pg_atomic_uint32 reset_request;
+	uint32			reset_handled;
+
+	/* Dynamic values */
+	int				distance;	/* Number of bytes ahead in the WAL. */
+	int				queue_depth; /* Number of I/Os possibly in progress. */
+} XLogPrefetchStats;
+
+static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
+										   RelFileNode rnode,
+										   BlockNumber blockno,
+										   XLogRecPtr lsn);
+static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
+											RelFileNode rnode,
+											BlockNumber blockno);
+static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
+												 XLogRecPtr replaying_lsn);
+static inline void XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher,
+											 XLogRecPtr prefetching_lsn);
+static inline void XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher,
+											 XLogRecPtr replaying_lsn);
+static inline bool XLogPrefetcherSaturated(XLogPrefetcher *prefetcher);
+static void XLogPrefetcherScanRecords(XLogPrefetcher *prefetcher,
+									  XLogRecPtr replaying_lsn);
+static bool XLogPrefetcherScanBlocks(XLogPrefetcher *prefetcher);
+static void XLogPrefetchSaveStats(void);
+static void XLogPrefetchRestoreStats(void);
+
+static XLogPrefetchStats *Stats;
+
+size_t
+XLogPrefetchShmemSize(void)
+{
+	return sizeof(XLogPrefetchStats);
+}
+
+static void
+XLogPrefetchResetStats(void)
+{
+	pg_atomic_write_u64(&Stats->reset_time, GetCurrentTimestamp());
+	pg_atomic_write_u64(&Stats->prefetch, 0);
+	pg_atomic_write_u64(&Stats->skip_hit, 0);
+	pg_atomic_write_u64(&Stats->skip_new, 0);
+	pg_atomic_write_u64(&Stats->skip_fpw, 0);
+	pg_atomic_write_u64(&Stats->skip_seq, 0);
+	Stats->avg_distance = 0;
+	Stats->avg_queue_depth = 0;
+}
+
+void
+XLogPrefetchShmemInit(void)
+{
+	bool		found;
+
+	Stats = (XLogPrefetchStats *)
+		ShmemInitStruct("XLogPrefetchStats",
+						sizeof(XLogPrefetchStats),
+						&found);
+	if (!found)
+	{
+		pg_atomic_init_u32(&Stats->reset_request, 0);
+		Stats->reset_handled = 0;
+		pg_atomic_init_u64(&Stats->reset_time, GetCurrentTimestamp());
+		pg_atomic_init_u64(&Stats->prefetch, 0);
+		pg_atomic_init_u64(&Stats->skip_hit, 0);
+		pg_atomic_init_u64(&Stats->skip_new, 0);
+		pg_atomic_init_u64(&Stats->skip_fpw, 0);
+		pg_atomic_init_u64(&Stats->skip_seq, 0);
+		Stats->avg_distance = 0;
+		Stats->avg_queue_depth = 0;
+		Stats->distance = 0;
+		Stats->queue_depth = 0;
+	}
+}
+
+/*
+ * Called when any GUC is changed that affects prefetching.
+ */
+void
+XLogPrefetchReconfigure(void)
+{
+	XLogPrefetchReconfigureCount++;
+}
+
+/*
+ * Called by any backend to request that the stats be reset.
+ */
+void
+XLogPrefetchRequestResetStats(void)
+{
+	pg_atomic_fetch_add_u32(&Stats->reset_request, 1);
+}
+
+/*
+ * Tell the stats collector to serialize the shared memory counters into the
+ * stats file.
+ */
+static void
+XLogPrefetchSaveStats(void)
+{
+	PgStat_RecoveryPrefetchStats serialized = {
+		.prefetch = pg_atomic_read_u64(&Stats->prefetch),
+		.skip_hit = pg_atomic_read_u64(&Stats->skip_hit),
+		.skip_new = pg_atomic_read_u64(&Stats->skip_new),
+		.skip_fpw = pg_atomic_read_u64(&Stats->skip_fpw),
+		.skip_seq = pg_atomic_read_u64(&Stats->skip_seq),
+		.stat_reset_timestamp = pg_atomic_read_u64(&Stats->reset_time)
+	};
+
+	pgstat_send_recoveryprefetch(&serialized);
+}
+
+/*
+ * Try to restore the shared memory counters from the stats file.
+ */
+static void
+XLogPrefetchRestoreStats(void)
+{
+	PgStat_RecoveryPrefetchStats *serialized = pgstat_fetch_recoveryprefetch();
+
+	if (serialized->stat_reset_timestamp != 0)
+	{
+		pg_atomic_write_u64(&Stats->prefetch, serialized->prefetch);
+		pg_atomic_write_u64(&Stats->skip_hit, serialized->skip_hit);
+		pg_atomic_write_u64(&Stats->skip_new, serialized->skip_new);
+		pg_atomic_write_u64(&Stats->skip_fpw, serialized->skip_fpw);
+		pg_atomic_write_u64(&Stats->skip_seq, serialized->skip_seq);
+		pg_atomic_write_u64(&Stats->reset_time, serialized->stat_reset_timestamp);
+	}
+}
+
+/*
+ * Initialize an XLogPrefetchState object and restore the last saved
+ * statistics from disk.
+ */
+void
+XLogPrefetchBegin(XLogPrefetchState *state)
+{
+	XLogPrefetchRestoreStats();
+
+	/* We'll reconfigure on the first call to XLogPrefetch(). */
+	state->prefetcher = NULL;
+	state->reconfigure_count = XLogPrefetchReconfigureCount - 1;
+}
+
+/*
+ * Shut down the prefetching infrastructure, if configured.
+ */
+void
+XLogPrefetchEnd(XLogPrefetchState *state)
+{
+	XLogPrefetchSaveStats();
+
+	if (state->prefetcher)
+		XLogPrefetcherFree(state->prefetcher);
+	state->prefetcher = NULL;
+
+	Stats->queue_depth = 0;
+	Stats->distance = 0;
+}
+
+/*
+ * Create a prefetcher that is ready to begin prefetching blocks referenced by
+ * WAL that is ahead of the given lsn.
+ */
+XLogPrefetcher *
+XLogPrefetcherAllocate(TimeLineID tli, XLogRecPtr lsn, bool streaming)
+{
+	XLogPrefetcher *prefetcher;
+	static HASHCTL hash_table_ctl = {
+		.keysize = sizeof(RelFileNode),
+		.entrysize = sizeof(XLogPrefetcherFilter)
+	};
+	XLogReaderRoutine reader_routines = {
+		.page_read = read_local_xlog_page,
+		.segment_open = wal_segment_try_open,
+		.segment_close = wal_segment_close
+	};
+
+	/*
+	 * The size of the queue is based on the maintenance_io_concurrency
+	 * setting.  In theory we might have a separate queue for each tablespace,
+	 * but it's not clear how that should work, so for now we'll just use the
+	 * general GUC to rate-limit all prefetching.  We add one to the size
+	 * because our circular buffer has a gap between head and tail when full.
+	 */
+	prefetcher = palloc0(offsetof(XLogPrefetcher, prefetch_queue) +
+						 sizeof(XLogRecPtr) * (maintenance_io_concurrency + 1));
+	prefetcher->prefetch_queue_size = maintenance_io_concurrency + 1;
+	prefetcher->options.tli = tli;
+	prefetcher->options.nowait = true;
+	if (streaming)
+	{
+		/*
+		 * We're only allowed to read as far as the WAL receiver has written.
+		 * We don't have to wait for it to be flushed, though, as recovery
+		 * does, so that gives us a chance to get a bit further ahead.
+		 */
+		prefetcher->options.read_upto_policy = XLRO_WALRCV_WRITTEN;
+	}
+	else
+	{
+		/* Read as far as we can. */
+		prefetcher->options.read_upto_policy = XLRO_END;
+	}
+	reader_routines.page_read_private = &prefetcher->options;
+	prefetcher->reader = XLogReaderAllocate(wal_segment_size,
+											NULL,
+											&reader_routines,
+											NULL);
+	prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
+										   &hash_table_ctl,
+										   HASH_ELEM | HASH_BLOBS);
+	dlist_init(&prefetcher->filter_queue);
+
+	/* Log the start LSN as high/low 32-bit halves, then begin reading there. */
+	ereport(LOG,
+			(errmsg("recovery started prefetching on timeline %u at %X/%X",
+					tli,
+					(uint32) (lsn >> 32), (uint32) lsn)));
+	XLogBeginRead(prefetcher->reader, lsn);
+
+	Stats->queue_depth = 0;
+	Stats->distance = 0;
+
+	return prefetcher;
+}
+
+/*
+ * Destroy a prefetcher and release all resources.
+ */
+void
+XLogPrefetcherFree(XLogPrefetcher *prefetcher)
+{
+	/* Log final statistics; the LSN is printed as high/low 32-bit halves. */
+	ereport(LOG,
+			(errmsg("recovery finished prefetching at %X/%X; "
+					"prefetch = " UINT64_FORMAT ", "
+					"skip_hit = " UINT64_FORMAT ", "
+					"skip_new = " UINT64_FORMAT ", "
+					"skip_fpw = " UINT64_FORMAT ", "
+					"skip_seq = " UINT64_FORMAT ", "
+					"avg_distance = %f, "
+					"avg_queue_depth = %f",
+			 (uint32) (prefetcher->reader->EndRecPtr >> 32),
+			 (uint32) (prefetcher->reader->EndRecPtr),
+			 pg_atomic_read_u64(&Stats->prefetch),
+			 pg_atomic_read_u64(&Stats->skip_hit),
+			 pg_atomic_read_u64(&Stats->skip_new),
+			 pg_atomic_read_u64(&Stats->skip_fpw),
+			 pg_atomic_read_u64(&Stats->skip_seq),
+			 Stats->avg_distance,
+			 Stats->avg_queue_depth)));
+	XLogReaderFree(prefetcher->reader);
+	hash_destroy(prefetcher->filter_table);
+	pfree(prefetcher);
+}
+
+/*
+ * Called when recovery is replaying a new LSN, to check if we can read ahead.
+ */
+void
+XLogPrefetcherReadAhead(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+	uint32		reset_request;
+
+	/* If an error has occurred or we've hit the end of the WAL, do nothing. */
+	if (prefetcher->shutdown)
+		return;
+
+	/*
+	 * Have any in-flight prefetches definitely completed, judging by the LSN
+	 * that is currently being replayed?
+	 */
+	XLogPrefetcherCompletedIO(prefetcher, replaying_lsn);
+
+	/*
+	 * Do we already have the maximum permitted number of I/Os running
+	 * (according to the information we have)?  If so, we have to wait for at
+	 * least one to complete, so give up early and let recovery catch up.
+	 */
+	if (XLogPrefetcherSaturated(prefetcher))
+		return;
+
+	/*
+	 * Can we drop any filters yet?  This happens when the LSN that is
+	 * currently being replayed has moved past a record that prevents
+	 * prefetching of a block range, such as relation extension.
+	 */
+	XLogPrefetcherCompleteFilters(prefetcher, replaying_lsn);
+
+	/*
+	 * Have we been asked to reset our stats counters?  This is checked with
+	 * an unsynchronized memory read, but we'll see it eventually and we'll be
+	 * accessing that cache line anyway.
+	 */
+	reset_request = pg_atomic_read_u32(&Stats->reset_request);
+	if (reset_request != Stats->reset_handled)
+	{
+		XLogPrefetchResetStats();
+		Stats->reset_handled = reset_request;
+		prefetcher->avg_distance = 0;
+		prefetcher->avg_queue_depth = 0;
+		prefetcher->samples = 0;
+	}
+
+	/* OK, we can now try reading ahead. */
+	XLogPrefetcherScanRecords(prefetcher, replaying_lsn);
+}
+
+/*
+ * Read ahead as far as we are allowed to, considering the LSN that recovery
+ * is currently replaying.
+ */
+static void
+XLogPrefetcherScanRecords(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+	XLogReaderState *reader = prefetcher->reader;
+
+	Assert(!XLogPrefetcherSaturated(prefetcher));
+
+	for (;;)
+	{
+		char *error;
+		int64		distance;
+
+		/* If we don't already have a record, then try to read one. */
+		if (!prefetcher->have_record)
+		{
+			if (!XLogReadRecord(reader, &error))
+			{
+				/* If we got an error, log it and give up. */
+				if (error)
+				{
+					ereport(LOG, (errmsg("recovery no longer prefetching: %s", error)));
+					prefetcher->shutdown = true;
+					Stats->queue_depth = 0;
+					Stats->distance = 0;
+				}
+				/* Otherwise, we'll try again later when more data is here. */
+				return;
+			}
+			prefetcher->have_record = true;
+			prefetcher->next_block_id = 0;
+		}
+
+		/* How far ahead of replay are we now? */
+		distance = prefetcher->reader->ReadRecPtr - replaying_lsn;
+
+		/* Update distance shown in shm. */
+		Stats->distance = distance;
+
+		/* Periodically recompute some statistics. */
+		if (unlikely(replaying_lsn >= prefetcher->next_sample_lsn))
+		{
+			/* Compute online averages. */
+			prefetcher->samples++;
+			if (prefetcher->samples == 1)
+			{
+				prefetcher->avg_distance = Stats->distance;
+				prefetcher->avg_queue_depth = Stats->queue_depth;
+			}
+			else
+			{
+				prefetcher->avg_distance +=
+					(Stats->distance - prefetcher->avg_distance) /
+					prefetcher->samples;
+				prefetcher->avg_queue_depth +=
+					(Stats->queue_depth - prefetcher->avg_queue_depth) /
+					prefetcher->samples;
+			}
+
+			/* Expose it in shared memory. */
+			Stats->avg_distance = prefetcher->avg_distance;
+			Stats->avg_queue_depth = prefetcher->avg_queue_depth;
+
+			/* Also periodically save the simple counters. */
+			XLogPrefetchSaveStats();
+
+			prefetcher->next_sample_lsn =
+				replaying_lsn + XLOGPREFETCHER_SAMPLE_DISTANCE;
+		}
+
+		/* Are we too far ahead of replay? */
+		if (distance >= max_recovery_prefetch_distance)
+			break;
+
+		/* Are we not far enough ahead? */
+		if (distance <= 0)
+		{
+			prefetcher->have_record = false;	/* skip this record */
+			continue;
+		}
+
+		/*
+		 * If this is a record that creates a new SMGR relation, we'll avoid
+		 * prefetching anything from that rnode until it has been replayed.
+		 */
+		if (replaying_lsn < reader->ReadRecPtr &&
+			XLogRecGetRmid(reader) == RM_SMGR_ID &&
+			(XLogRecGetInfo(reader) & ~XLR_INFO_MASK) == XLOG_SMGR_CREATE)
+		{
+			xl_smgr_create *xlrec = (xl_smgr_create *) XLogRecGetData(reader);
+
+			XLogPrefetcherAddFilter(prefetcher, xlrec->rnode, 0,
+									reader->ReadRecPtr);
+		}
+
+		/* Scan the record's block references. */
+		if (!XLogPrefetcherScanBlocks(prefetcher))
+			return;
+
+		/* Advance to the next record. */
+		prefetcher->have_record = false;
+	}
+}
+
+/*
+ * Scan the current record for block references, and consider prefetching.
+ *
+ * Return true if we processed the current record to completion and still have
+ * queue space to process a new record, and false if we saturated the I/O
+ * queue and need to wait for recovery to advance before we continue.
+ */
+static bool
+XLogPrefetcherScanBlocks(XLogPrefetcher *prefetcher)
+{
+	XLogReaderState *reader = prefetcher->reader;
+
+	Assert(!XLogPrefetcherSaturated(prefetcher));
+
+	/*
+	 * We might already have been partway through processing this record when
+	 * our queue became saturated, so we need to start where we left off.
+	 */
+	for (int block_id = prefetcher->next_block_id;
+		 block_id <= reader->max_block_id;
+		 ++block_id)
+	{
+		PrefetchBufferResult prefetch;
+		DecodedBkpBlock *block = &reader->blocks[block_id];
+		SMgrRelation reln;
+
+		/* Ignore everything but the main fork for now. */
+		if (block->forknum != MAIN_FORKNUM)
+			continue;
+
+		/*
+		 * If there is a full page image attached, we won't be reading the
+		 * page, so you might think we should skip it.  However, if the
+		 * underlying filesystem uses larger logical blocks than us, it
+		 * might still need to perform a read-before-write some time later.
+		 * Therefore, only prefetch if configured to do so.
+		 */
+		if (block->has_image && !recovery_prefetch_fpw)
+		{
+			pg_atomic_unlocked_add_fetch_u64(&Stats->skip_fpw, 1);
+			continue;
+		}
+
+		/*
+		 * If this block will initialize a new page then it's probably an
+		 * extension.  Since it might create a new segment, we can't try
+		 * to prefetch this block until the record has been replayed, or we
+		 * might try to open a file that doesn't exist yet.
+		 */
+		if (block->flags & BKPBLOCK_WILL_INIT)
+		{
+			XLogPrefetcherAddFilter(prefetcher, block->rnode, block->blkno,
+									reader->ReadRecPtr);
+			pg_atomic_unlocked_add_fetch_u64(&Stats->skip_new, 1);
+			continue;
+		}
+
+		/* Should we skip this block due to a filter? */
+		if (XLogPrefetcherIsFiltered(prefetcher, block->rnode, block->blkno))
+		{
+			pg_atomic_unlocked_add_fetch_u64(&Stats->skip_new, 1);
+			continue;
+		}
+
+		/* Fast path for repeated references to the same relation. */
+		if (RelFileNodeEquals(block->rnode, prefetcher->last_rnode))
+		{
+			/*
+			 * If this is a repeat access to the same block, then skip it.
+			 *
+			 * XXX We could also check for last_blkno + 1 too, and also update
+			 * last_blkno; it's not clear if the kernel would do a better job
+			 * of sequential prefetching.
+			 */
+			if (block->blkno == prefetcher->last_blkno)
+			{
+				pg_atomic_unlocked_add_fetch_u64(&Stats->skip_seq, 1);
+				continue;
+			}
+
+			/* We can avoid calling smgropen(). */
+			reln = prefetcher->last_reln;
+		}
+		else
+		{
+			/* Otherwise we have to open it. */
+			reln = smgropen(block->rnode, InvalidBackendId);
+			prefetcher->last_rnode = block->rnode;
+			prefetcher->last_reln = reln;
+		}
+		prefetcher->last_blkno = block->blkno;
+
+		/* Try to prefetch this block! */
+		prefetch = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
+		if (BufferIsValid(prefetch.recent_buffer))
+		{
+			/*
+			 * It was already cached, so do nothing.  Perhaps in future we
+			 * could remember the buffer so that recovery doesn't have to look
+			 * it up again.
+			 */
+			pg_atomic_unlocked_add_fetch_u64(&Stats->skip_hit, 1);
+		}
+		else if (prefetch.initiated_io)
+		{
+			/*
+			 * I/O has possibly been initiated (though we don't know if it
+			 * was already cached by the kernel, so we just have to assume
+			 * that it has due to lack of better information).  Record
+			 * this as an I/O in progress until eventually we replay this
+			 * LSN.
+			 */
+			pg_atomic_unlocked_add_fetch_u64(&Stats->prefetch, 1);
+			XLogPrefetcherInitiatedIO(prefetcher, reader->ReadRecPtr);
+			/*
+			 * If the queue is now full, we'll have to wait before processing
+			 * any more blocks from this record, or move to a new record if
+			 * that was the last block.
+			 */
+			if (XLogPrefetcherSaturated(prefetcher))
+			{
+				prefetcher->next_block_id = block_id + 1;
+				return false;
+			}
+		}
+		else
+		{
+			/*
+			 * Neither cached nor initiated.  The underlying segment file
+			 * doesn't exist.  Presumably it will be unlinked by a later WAL
+			 * record.  When recovery reads this block, it will use the
+			 * EXTENSION_CREATE_RECOVERY flag.  We certainly don't want to do
+			 * that sort of thing while merely prefetching, so let's just
+			 * ignore references to this relation until this record is
+			 * replayed, and let recovery create the dummy file or complain if
+			 * something is wrong.
+			 */
+			XLogPrefetcherAddFilter(prefetcher, block->rnode, 0,
+									reader->ReadRecPtr);
+			pg_atomic_unlocked_add_fetch_u64(&Stats->skip_new, 1);
+		}
+	}
+
+	return true;
+}
+
+/*
+ * Expose statistics about recovery prefetching.
+ */
+Datum
+pg_stat_get_prefetch_recovery(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_PREFETCH_RECOVERY_COLS 10
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc	tupdesc;
+	Tuplestorestate *tupstore;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+	Datum		values[PG_STAT_GET_PREFETCH_RECOVERY_COLS];
+	bool		nulls[PG_STAT_GET_PREFETCH_RECOVERY_COLS];
+
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not allowed in this context")));
+
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	if (pg_atomic_read_u32(&Stats->reset_request) != Stats->reset_handled)
+	{
+		/* There's an unhandled reset request, so just show NULLs */
+		for (int i = 0; i < PG_STAT_GET_PREFETCH_RECOVERY_COLS; ++i)
+			nulls[i] = true;
+	}
+	else
+	{
+		for (int i = 0; i < PG_STAT_GET_PREFETCH_RECOVERY_COLS; ++i)
+			nulls[i] = false;
+	}
+
+	values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&Stats->reset_time));
+	values[1] = Int64GetDatum(pg_atomic_read_u64(&Stats->prefetch));
+	values[2] = Int64GetDatum(pg_atomic_read_u64(&Stats->skip_hit));
+	values[3] = Int64GetDatum(pg_atomic_read_u64(&Stats->skip_new));
+	values[4] = Int64GetDatum(pg_atomic_read_u64(&Stats->skip_fpw));
+	values[5] = Int64GetDatum(pg_atomic_read_u64(&Stats->skip_seq));
+	values[6] = Int32GetDatum(Stats->distance);
+	values[7] = Int32GetDatum(Stats->queue_depth);
+	values[8] = Float4GetDatum(Stats->avg_distance);
+	values[9] = Float4GetDatum(Stats->avg_queue_depth);
+	tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+	tuplestore_donestoring(tupstore);
+
+	return (Datum) 0;
+}
+
+/*
+ * Don't prefetch any blocks >= 'blockno' from a given 'rnode', until 'lsn'
+ * has been replayed.
+ */
+static inline void
+XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileNode rnode,
+						BlockNumber blockno, XLogRecPtr lsn)
+{
+	XLogPrefetcherFilter *filter;
+	bool		found;
+
+	filter = hash_search(prefetcher->filter_table, &rnode, HASH_ENTER, &found);
+	if (!found)
+	{
+		/*
+		 * Don't allow any prefetching of this block or higher until replayed.
+		 */
+		filter->filter_until_replayed = lsn;
+		filter->filter_from_block = blockno;
+		dlist_push_head(&prefetcher->filter_queue, &filter->link);
+	}
+	else
+	{
+		/*
+		 * We were already filtering this rnode.  Extend the filter's lifetime
+		 * to cover this WAL record, but leave the (presumably lower) block
+		 * number there because we don't want to have to track individual
+		 * blocks.
+		 */
+		filter->filter_until_replayed = lsn;
+		dlist_delete(&filter->link);
+		dlist_push_head(&prefetcher->filter_queue, &filter->link);
+	}
+}
+
+/*
+ * Have we replayed the records that caused us to begin filtering a block
+ * range?  That means that relations should have been created, extended or
+ * dropped as required, so we can drop relevant filters.
+ */
+static inline void
+XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+	while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
+	{
+		XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
+														  link,
+														  &prefetcher->filter_queue);
+
+		if (filter->filter_until_replayed >= replaying_lsn)
+			break;
+		dlist_delete(&filter->link);
+		hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
+	}
+}
+
+/*
+ * Check if a given block should be skipped due to a filter.
+ */
+static inline bool
+XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileNode rnode,
+						 BlockNumber blockno)
+{
+	/*
+	 * Test for empty queue first, because we expect it to be empty most of the
+	 * time and we can avoid the hash table lookup in that case.
+	 */
+	if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
+	{
+		XLogPrefetcherFilter *filter = hash_search(prefetcher->filter_table, &rnode,
+												   HASH_FIND, NULL);
+
+		if (filter && filter->filter_from_block <= blockno)
+			return true;
+	}
+
+	return false;
+}
+
+/*
+ * Insert an LSN into the queue.  The queue must not be full already.  This
+ * tracks the fact that we have (to the best of our knowledge) initiated an
+ * I/O, so that we can impose a cap on concurrent prefetching.
+ */
+static inline void
+XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher,
+						  XLogRecPtr prefetching_lsn)
+{
+	Assert(!XLogPrefetcherSaturated(prefetcher));
+	prefetcher->prefetch_queue[prefetcher->prefetch_head++] = prefetching_lsn;
+	prefetcher->prefetch_head %= prefetcher->prefetch_queue_size;
+	Stats->queue_depth++;
+	Assert(Stats->queue_depth <= prefetcher->prefetch_queue_size);
+}
+
+/*
+ * Have we replayed the records that caused us to initiate the oldest
+ * prefetches yet?  That means that they're definitely finished, so we can
+ * forget about them and allow ourselves to initiate more prefetches.  For now
+ * we don't have any awareness of when I/O really completes.
+ */
+static inline void
+XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+	while (prefetcher->prefetch_head != prefetcher->prefetch_tail &&
+		   prefetcher->prefetch_queue[prefetcher->prefetch_tail] < replaying_lsn)
+	{
+		prefetcher->prefetch_tail++;
+		prefetcher->prefetch_tail %= prefetcher->prefetch_queue_size;
+		Stats->queue_depth--;
+		Assert(Stats->queue_depth >= 0);
+	}
+}
+
+/*
+ * Check if the maximum allowed number of I/Os is already in flight.
+ */
+static inline bool
+XLogPrefetcherSaturated(XLogPrefetcher *prefetcher)
+{
+	return (prefetcher->prefetch_head + 1) % prefetcher->prefetch_queue_size ==
+		prefetcher->prefetch_tail;
+}
+
+void
+assign_max_recovery_prefetch_distance(int new_value, void *extra)
+{
+	/* Reconfigure prefetching, because a setting it depends on changed. */
+	max_recovery_prefetch_distance = new_value;
+	if (AmStartupProcess())
+		XLogPrefetchReconfigure();
+}
+
+void
+assign_recovery_prefetch_fpw(bool new_value, void *extra)
+{
+	/* Reconfigure prefetching, because a setting it depends on changed. */
+	recovery_prefetch_fpw = new_value;
+	if (AmStartupProcess())
+		XLogPrefetchReconfigure();
+}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 56420bbc9d..6c39b9ad48 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -826,6 +826,20 @@ CREATE VIEW pg_stat_wal_receiver AS
     FROM pg_stat_get_wal_receiver() s
     WHERE s.pid IS NOT NULL;
 
+CREATE VIEW pg_stat_prefetch_recovery AS
+    SELECT
+            s.stats_reset,
+            s.prefetch,
+            s.skip_hit,
+            s.skip_new,
+            s.skip_fpw,
+            s.skip_seq,
+            s.distance,
+            s.queue_depth,
+	    s.avg_distance,
+	    s.avg_queue_depth
+     FROM pg_stat_get_prefetch_recovery() s;
+
 CREATE VIEW pg_stat_subscription AS
     SELECT
             su.oid AS subid,
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index d7f99d9944..5ac3fed4c6 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -38,6 +38,7 @@
 #include "access/transam.h"
 #include "access/twophase_rmgr.h"
 #include "access/xact.h"
+#include "access/xlogprefetch.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_proc.h"
 #include "common/ip.h"
@@ -282,6 +283,7 @@ static int	localNumBackends = 0;
 static PgStat_ArchiverStats archiverStats;
 static PgStat_GlobalStats globalStats;
 static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
+static PgStat_RecoveryPrefetchStats recoveryPrefetchStats;
 
 /*
  * List of OIDs of databases we need to write out.  If an entry is InvalidOid,
@@ -354,6 +356,7 @@ static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
 static void pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len);
 static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len);
 static void pgstat_recv_slru(PgStat_MsgSLRU *msg, int len);
+static void pgstat_recv_recoveryprefetch(PgStat_MsgRecoveryPrefetch *msg, int len);
 static void pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len);
 static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len);
 static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len);
@@ -1370,11 +1373,20 @@ pgstat_reset_shared_counters(const char *target)
 		msg.m_resettarget = RESET_ARCHIVER;
 	else if (strcmp(target, "bgwriter") == 0)
 		msg.m_resettarget = RESET_BGWRITER;
+	else if (strcmp(target, "prefetch_recovery") == 0)
+	{
+		/*
+		 * We can't ask the stats collector to do this for us as it is not
+		 * attached to shared memory.
+		 */
+		XLogPrefetchRequestResetStats();
+		return;
+	}
 	else
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 				 errmsg("unrecognized reset target: \"%s\"", target),
-				 errhint("Target must be \"archiver\" or \"bgwriter\".")));
+				 errhint("Target must be \"archiver\", \"bgwriter\" or \"prefetch_recovery\".")));
 
 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSHAREDCOUNTER);
 	pgstat_send(&msg, sizeof(msg));
@@ -2696,6 +2708,22 @@ pgstat_fetch_slru(void)
 }
 
 
+/*
+ * ---------
+ * pgstat_fetch_recoveryprefetch() -
+ *
+ *	Support function for restoring the counters managed by xlogprefetch.c.
+ * ---------
+ */
+PgStat_RecoveryPrefetchStats *
+pgstat_fetch_recoveryprefetch(void)
+{
+	backend_read_statsfile();
+
+	return &recoveryPrefetchStats;
+}
+
+
 /* ------------------------------------------------------------
  * Functions for management of the shared-memory PgBackendStatus array
  * ------------------------------------------------------------
@@ -4444,6 +4472,23 @@ pgstat_send_slru(void)
 }
 
 
+/* ----------
+ * pgstat_send_recoveryprefetch() -
+ *
+ *		Send recovery prefetch statistics to the collector
+ * ----------
+ */
+void
+pgstat_send_recoveryprefetch(PgStat_RecoveryPrefetchStats *stats)
+{
+	PgStat_MsgRecoveryPrefetch msg;
+
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RECOVERYPREFETCH);
+	msg.m_stats = *stats;
+	pgstat_send(&msg, sizeof(msg));
+}
+
+
 /* ----------
  * PgstatCollectorMain() -
  *
@@ -4640,6 +4685,10 @@ PgstatCollectorMain(int argc, char *argv[])
 					pgstat_recv_slru(&msg.msg_slru, len);
 					break;
 
+				case PGSTAT_MTYPE_RECOVERYPREFETCH:
+					pgstat_recv_recoveryprefetch(&msg.msg_recoveryprefetch, len);
+					break;
+
 				case PGSTAT_MTYPE_FUNCSTAT:
 					pgstat_recv_funcstat(&msg.msg_funcstat, len);
 					break;
@@ -4915,6 +4964,13 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
 	rc = fwrite(slruStats, sizeof(slruStats), 1, fpout);
 	(void) rc;					/* we'll check for error with ferror */
 
+	/*
+	 * Write recovery prefetch stats struct
+	 */
+	rc = fwrite(&recoveryPrefetchStats, sizeof(recoveryPrefetchStats), 1,
+				fpout);
+	(void) rc;					/* we'll check for error with ferror */
+
 	/*
 	 * Walk through the database table.
 	 */
@@ -5174,6 +5230,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 	memset(&globalStats, 0, sizeof(globalStats));
 	memset(&archiverStats, 0, sizeof(archiverStats));
 	memset(&slruStats, 0, sizeof(slruStats));
+	memset(&recoveryPrefetchStats, 0, sizeof(recoveryPrefetchStats));
 
 	/*
 	 * Set the current timestamp (will be kept only in case we can't load an
@@ -5261,6 +5318,18 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 		goto done;
 	}
 
+	/*
+	 * Read recoveryPrefetchStats struct
+	 */
+	if (fread(&recoveryPrefetchStats, 1, sizeof(recoveryPrefetchStats),
+			  fpin) != sizeof(recoveryPrefetchStats))
+	{
+		ereport(pgStatRunningInCollector ? LOG : WARNING,
+				(errmsg("corrupted statistics file \"%s\"", statfile)));
+		memset(&recoveryPrefetchStats, 0, sizeof(recoveryPrefetchStats));
+		goto done;
+	}
+
 	/*
 	 * We found an existing collector stats file. Read it and put all the
 	 * hashtable entries into place.
@@ -5560,6 +5629,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
 	PgStat_GlobalStats myGlobalStats;
 	PgStat_ArchiverStats myArchiverStats;
 	PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
+	PgStat_RecoveryPrefetchStats myRecoveryPrefetchStats;
 	FILE	   *fpin;
 	int32		format_id;
 	const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
@@ -5625,6 +5695,18 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
 		return false;
 	}
 
+	/*
+	 * Read recovery prefetch stats struct
+	 */
+	if (fread(&myRecoveryPrefetchStats, 1, sizeof(myRecoveryPrefetchStats),
+			  fpin) != sizeof(myRecoveryPrefetchStats))
+	{
+		ereport(pgStatRunningInCollector ? LOG : WARNING,
+				(errmsg("corrupted statistics file \"%s\"", statfile)));
+		FreeFile(fpin);
+		return false;
+	}
+
 	/* By default, we're going to return the timestamp of the global file. */
 	*ts = myGlobalStats.stats_timestamp;
 
@@ -6422,6 +6504,18 @@ pgstat_recv_slru(PgStat_MsgSLRU *msg, int len)
 	slruStats[msg->m_index].truncate += msg->m_truncate;
 }
 
+/* ----------
+ * pgstat_recv_recoveryprefetch() -
+ *
+ *	Process a recovery prefetch message by replacing the stored counters.
+ * ----------
+ */
+static void
+pgstat_recv_recoveryprefetch(PgStat_MsgRecoveryPrefetch *msg, int len)
+{
+	recoveryPrefetchStats = msg->m_stats;
+}
+
 /* ----------
  * pgstat_recv_recoveryconflict() -
  *
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 427b0d59cd..221081bddc 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -21,6 +21,7 @@
 #include "access/nbtree.h"
 #include "access/subtrans.h"
 #include "access/twophase.h"
+#include "access/xlogprefetch.h"
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -124,6 +125,7 @@ CreateSharedMemoryAndSemaphores(void)
 		size = add_size(size, PredicateLockShmemSize());
 		size = add_size(size, ProcGlobalShmemSize());
 		size = add_size(size, XLOGShmemSize());
+		size = add_size(size, XLogPrefetchShmemSize());
 		size = add_size(size, CLOGShmemSize());
 		size = add_size(size, CommitTsShmemSize());
 		size = add_size(size, SUBTRANSShmemSize());
@@ -212,6 +214,7 @@ CreateSharedMemoryAndSemaphores(void)
 	 * Set up xlog, clog, and buffers
 	 */
 	XLOGShmemInit();
+	XLogPrefetchShmemInit();
 	CLOGShmemInit();
 	CommitTsShmemInit();
 	SUBTRANSShmemInit();
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 2f3e0a70e0..2fea5f3dcd 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -34,6 +34,7 @@
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
+#include "access/xlogprefetch.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_authid.h"
 #include "catalog/storage.h"
@@ -198,6 +199,7 @@ static bool check_max_wal_senders(int *newval, void **extra, GucSource source);
 static bool check_autovacuum_work_mem(int *newval, void **extra, GucSource source);
 static bool check_effective_io_concurrency(int *newval, void **extra, GucSource source);
 static bool check_maintenance_io_concurrency(int *newval, void **extra, GucSource source);
+static void assign_maintenance_io_concurrency(int newval, void *extra);
 static void assign_pgstat_temp_directory(const char *newval, void *extra);
 static bool check_application_name(char **newval, void **extra, GucSource source);
 static void assign_application_name(const char *newval, void *extra);
@@ -1272,6 +1274,18 @@ static struct config_bool ConfigureNamesBool[] =
 		true,
 		NULL, NULL, NULL
 	},
+	{
+		{"recovery_prefetch_fpw", PGC_SIGHUP, WAL_SETTINGS,
+			gettext_noop("Prefetch blocks that have full page images in the WAL."),
+			gettext_noop("On some systems, there is no benefit to prefetching pages that will be "
+						 "entirely overwritten, but if the logical page size of the filesystem is "
+						 "larger than PostgreSQL's, this can be beneficial.  This option has no "
+						 "effect unless max_recovery_prefetch_distance is set to a positive number.")
+		},
+		&recovery_prefetch_fpw,
+		false,
+		NULL, assign_recovery_prefetch_fpw, NULL
+	},
 
 	{
 		{"wal_log_hints", PGC_POSTMASTER, WAL_SETTINGS,
@@ -2649,6 +2663,22 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_recovery_prefetch_distance", PGC_SIGHUP, WAL_ARCHIVE_RECOVERY,
+			gettext_noop("Maximum number of bytes to read ahead in the WAL to prefetch referenced blocks."),
+			gettext_noop("Set to -1 to disable prefetching during recovery."),
+			GUC_UNIT_BYTE
+		},
+		&max_recovery_prefetch_distance,
+#ifdef USE_PREFETCH
+		256 * 1024,
+#else
+		-1,
+#endif
+		-1, INT_MAX,
+		NULL, assign_max_recovery_prefetch_distance, NULL
+	},
+
 	{
 		{"wal_keep_segments", PGC_SIGHUP, REPLICATION_SENDING,
 			gettext_noop("Sets the number of WAL files held for standby servers."),
@@ -2968,7 +2998,8 @@ static struct config_int ConfigureNamesInt[] =
 		0,
 #endif
 		0, MAX_IO_CONCURRENCY,
-		check_maintenance_io_concurrency, NULL, NULL
+		check_maintenance_io_concurrency, assign_maintenance_io_concurrency,
+		NULL
 	},
 
 	{
@@ -11586,6 +11617,20 @@ check_maintenance_io_concurrency(int *newval, void **extra, GucSource source)
 	return true;
 }
 
+static void
+assign_maintenance_io_concurrency(int newval, void *extra)
+{
+#ifdef USE_PREFETCH
+	/*
+	 * Reconfigure recovery prefetching, because a setting it depends on
+	 * changed.
+	 */
+	maintenance_io_concurrency = newval;
+	if (AmStartupProcess())
+		XLogPrefetchReconfigure();
+#endif
+}
+
 static void
 assign_pgstat_temp_directory(const char *newval, void *extra)
 {
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 81055edde7..38763f88b0 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -230,6 +230,11 @@
 #checkpoint_flush_after = 0		# measured in pages, 0 disables
 #checkpoint_warning = 30s		# 0 disables
 
+# - Prefetching during recovery -
+
+#max_recovery_prefetch_distance = 256kB	# -1 disables prefetching
+#recovery_prefetch_fpw = off	# whether to prefetch pages logged with FPW
+
 # - Archiving -
 
 #archive_mode = off		# enables archiving; off, on, or always
diff --git a/src/include/access/xlogprefetch.h b/src/include/access/xlogprefetch.h
new file mode 100644
index 0000000000..d8e2e1ca50
--- /dev/null
+++ b/src/include/access/xlogprefetch.h
@@ -0,0 +1,85 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogprefetch.h
+ *		Declarations for the recovery prefetching module.
+ *
+ * Portions Copyright (c) 2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *		src/include/access/xlogprefetch.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef XLOGPREFETCH_H
+#define XLOGPREFETCH_H
+
+#include "access/xlogdefs.h"
+
+/* GUCs */
+extern int	max_recovery_prefetch_distance;
+extern bool recovery_prefetch_fpw;
+
+struct XLogPrefetcher;
+typedef struct XLogPrefetcher XLogPrefetcher;
+
+extern int XLogPrefetchReconfigureCount;
+
+typedef struct XLogPrefetchState
+{
+	XLogPrefetcher *prefetcher;	/* NULL when no prefetcher is set up */
+	int			reconfigure_count;	/* XLogPrefetchReconfigureCount last seen */
+} XLogPrefetchState;
+
+extern size_t XLogPrefetchShmemSize(void);
+extern void XLogPrefetchShmemInit(void);
+
+extern void XLogPrefetchReconfigure(void);
+extern void XLogPrefetchRequestResetStats(void);
+
+extern void XLogPrefetchBegin(XLogPrefetchState *state);
+extern void XLogPrefetchEnd(XLogPrefetchState *state);
+
+/* Functions exposed only for the use of XLogPrefetch(). */
+extern XLogPrefetcher *XLogPrefetcherAllocate(TimeLineID tli,
+											  XLogRecPtr lsn,
+											  bool streaming);
+extern void XLogPrefetcherFree(XLogPrefetcher *prefetcher);
+extern void XLogPrefetcherReadAhead(XLogPrefetcher *prefetch,
+									XLogRecPtr replaying_lsn);
+
+/*
+ * Tell the prefetching module that we are now replaying a given LSN, so that
+ * it can decide how far ahead to read in the WAL, if configured.
+ */
+static inline void
+XLogPrefetch(XLogPrefetchState *state,
+			 TimeLineID replaying_tli,
+			 XLogRecPtr replaying_lsn,
+			 bool from_stream)
+{
+	/*
+	 * Handle any configuration changes.  Rather than trying to deal with
+	 * various parameter changes, we just tear down and set up a new
+	 * prefetcher if anything we depend on changes.
+	 */
+	if (unlikely(state->reconfigure_count != XLogPrefetchReconfigureCount))
+	{
+		/* If we had a prefetcher, tear it down. */
+		if (state->prefetcher)
+		{
+			XLogPrefetcherFree(state->prefetcher);
+			state->prefetcher = NULL;
+		}
+		/* If we want a prefetcher, set it up. */
+		if (max_recovery_prefetch_distance > 0)
+			state->prefetcher = XLogPrefetcherAllocate(replaying_tli,
+													   replaying_lsn,
+													   from_stream);
+		state->reconfigure_count = XLogPrefetchReconfigureCount;
+	}
+
+	if (state->prefetcher)
+		XLogPrefetcherReadAhead(state->prefetcher, replaying_lsn);
+}
+
+#endif							/* XLOGPREFETCH_H */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 61f2c2f5b4..56b48bf2ad 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6136,6 +6136,14 @@
   prorettype => 'bool', proargtypes => '',
   prosrc => 'pg_is_wal_replay_paused' },
 
+{ oid => '9085', descr => 'statistics: information about WAL prefetching',
+  proname => 'pg_stat_get_prefetch_recovery', prorows => '1', provolatile => 'v',
+  proretset => 't', prorettype => 'record', proargtypes => '',
+  proallargtypes => '{timestamptz,int8,int8,int8,int8,int8,int4,int4,float4,float4}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{stats_reset,prefetch,skip_hit,skip_new,skip_fpw,skip_seq,distance,queue_depth,avg_distance,avg_queue_depth}',
+  prosrc => 'pg_stat_get_prefetch_recovery' },
+
 { oid => '2621', descr => 'reload configuration files',
   proname => 'pg_reload_conf', provolatile => 'v', prorettype => 'bool',
   proargtypes => '', prosrc => 'pg_reload_conf' },
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index c55dc1481c..0dcd3c377a 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -62,6 +62,7 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_ARCHIVER,
 	PGSTAT_MTYPE_BGWRITER,
 	PGSTAT_MTYPE_SLRU,
+	PGSTAT_MTYPE_RECOVERYPREFETCH,
 	PGSTAT_MTYPE_FUNCSTAT,
 	PGSTAT_MTYPE_FUNCPURGE,
 	PGSTAT_MTYPE_RECOVERYCONFLICT,
@@ -182,6 +183,19 @@ typedef struct PgStat_TableXactStatus
 	struct PgStat_TableXactStatus *next;	/* next of same subxact */
 } PgStat_TableXactStatus;
 
+/*
+ * Recovery prefetching statistics persisted on disk by pgstat.c, but kept in
+ * shared memory by xlogprefetch.c.
+ */
+typedef struct PgStat_RecoveryPrefetchStats
+{
+	PgStat_Counter prefetch;
+	PgStat_Counter skip_hit;
+	PgStat_Counter skip_new;
+	PgStat_Counter skip_fpw;
+	PgStat_Counter skip_seq;
+	TimestampTz stat_reset_timestamp;
+} PgStat_RecoveryPrefetchStats;
 
 /* ------------------------------------------------------------
  * Message formats follow
@@ -453,6 +467,16 @@ typedef struct PgStat_MsgSLRU
 	PgStat_Counter m_truncate;
 } PgStat_MsgSLRU;
 
+/* ----------
+ * PgStat_MsgRecoveryPrefetch			Sent by XLogPrefetch to save statistics.
+ * ----------
+ */
+typedef struct PgStat_MsgRecoveryPrefetch
+{
+	PgStat_MsgHdr m_hdr;
+	PgStat_RecoveryPrefetchStats m_stats;
+} PgStat_MsgRecoveryPrefetch;
+
 /* ----------
  * PgStat_MsgRecoveryConflict	Sent by the backend upon recovery conflict
  * ----------
@@ -597,6 +621,7 @@ typedef union PgStat_Msg
 	PgStat_MsgArchiver msg_archiver;
 	PgStat_MsgBgWriter msg_bgwriter;
 	PgStat_MsgSLRU msg_slru;
+	PgStat_MsgRecoveryPrefetch msg_recoveryprefetch;
 	PgStat_MsgFuncstat msg_funcstat;
 	PgStat_MsgFuncpurge msg_funcpurge;
 	PgStat_MsgRecoveryConflict msg_recoveryconflict;
@@ -1458,6 +1483,7 @@ extern void pgstat_twophase_postabort(TransactionId xid, uint16 info,
 
 extern void pgstat_send_archiver(const char *xlog, bool failed);
 extern void pgstat_send_bgwriter(void);
+extern void pgstat_send_recoveryprefetch(PgStat_RecoveryPrefetchStats *stats);
 
 /* ----------
  * Support functions for the SQL-callable functions to
@@ -1473,6 +1499,7 @@ extern int	pgstat_fetch_stat_numbackends(void);
 extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void);
 extern PgStat_GlobalStats *pgstat_fetch_global(void);
 extern PgStat_SLRUStats *pgstat_fetch_slru(void);
+extern PgStat_RecoveryPrefetchStats *pgstat_fetch_recoveryprefetch(void);
 
 extern void pgstat_count_slru_page_zeroed(int slru_idx);
 extern void pgstat_count_slru_page_hit(int slru_idx);
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index 2819282181..976cf8b116 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -440,4 +440,8 @@ extern void assign_search_path(const char *newval, void *extra);
 extern bool check_wal_buffers(int *newval, void **extra, GucSource source);
 extern void assign_xlog_sync_method(int new_sync_method, void *extra);
 
+/* in access/transam/xlogprefetch.c */
+extern void assign_max_recovery_prefetch_distance(int new_value, void *extra);
+extern void assign_recovery_prefetch_fpw(bool new_value, void *extra);
+
 #endif							/* GUC_H */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index b813e32215..74dd8c604c 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1857,6 +1857,17 @@ pg_stat_gssapi| SELECT s.pid,
     s.gss_enc AS encrypted
    FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
   WHERE (s.client_port IS NOT NULL);
+pg_stat_prefetch_recovery| SELECT s.stats_reset,
+    s.prefetch,
+    s.skip_hit,
+    s.skip_new,
+    s.skip_fpw,
+    s.skip_seq,
+    s.distance,
+    s.queue_depth,
+    s.avg_distance,
+    s.avg_queue_depth
+   FROM pg_stat_get_prefetch_recovery() s(stats_reset, prefetch, skip_hit, skip_new, skip_fpw, skip_seq, distance, queue_depth, avg_distance, avg_queue_depth);
 pg_stat_progress_analyze| SELECT s.pid,
     s.datid,
     d.datname,
-- 
2.20.1

Reply via email to