On Sun, May 3, 2020 at 3:12 AM Dmitry Dolgov <9erthali...@gmail.com> wrote: > I've finally performed a couple of tests involving more I/O. The > not-that-big dataset of 1.5 GB for the replica, with memory allowing > ~ 1/6 of it to fit, default prefetching parameters and an update > workload with uniform distribution. Rather a small setup, but it causes > stable reading into the page cache on the replica and lets one see a > visible influence of the patch (more measurement samples tend to happen > at lower latencies):
Thanks for these tests, Dmitry. You didn't mention the details of the workload, but one thing I'd recommend for a uniform/random workload that's generating a lot of misses on the primary server with N backends is to make sure that maintenance_io_concurrency is set to a number like N*2 or higher, and to look at the queue depth on both systems with iostat -x 1. Then you can experiment with ALTER SYSTEM SET maintenance_io_concurrency = X; SELECT pg_reload_conf(); to get a feel for how it behaves: at some point you've set it high enough that the replica can sustain the same rate of concurrent I/Os as the primary. The default of 10 is actually pretty low unless you've only got ~4 backends generating random updates on the primary. That's with full_page_writes=off; if you leave it on, it takes a while to get into a scenario where prefetching has much effect. Here's a rebase, after the recent XLogReader refactoring.
From a7fd3f728d64c3c94387e9e424dba507b166bcab Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Sat, 28 Mar 2020 11:42:59 +1300 Subject: [PATCH v9 1/3] Add pg_atomic_unlocked_add_fetch_XXX(). Add a variant of pg_atomic_add_fetch_XXX with no barrier semantics, for cases where you only want to avoid the possibility that a concurrent pg_atomic_read_XXX() sees a torn/partial value. Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com --- src/include/port/atomics.h | 24 ++++++++++++++++++++++ src/include/port/atomics/generic.h | 33 ++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h index 4956ec55cb..2abb852893 100644 --- a/src/include/port/atomics.h +++ b/src/include/port/atomics.h @@ -389,6 +389,21 @@ pg_atomic_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_) return pg_atomic_add_fetch_u32_impl(ptr, add_); } +/* + * pg_atomic_unlocked_add_fetch_u32 - atomically add to variable + * + * Like pg_atomic_unlocked_write_u32, guarantees only that partial values + * cannot be observed. + * + * No barrier semantics. + */ +static inline uint32 +pg_atomic_unlocked_add_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 add_) +{ + AssertPointerAlignment(ptr, 4); + return pg_atomic_unlocked_add_fetch_u32_impl(ptr, add_); +} + /* * pg_atomic_sub_fetch_u32 - atomically subtract from variable * @@ -519,6 +534,15 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_) return pg_atomic_sub_fetch_u64_impl(ptr, sub_); } +static inline uint64 +pg_atomic_unlocked_add_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 add_) +{ +#ifndef PG_HAVE_ATOMIC_U64_SIMULATION + AssertPointerAlignment(ptr, 8); +#endif + return pg_atomic_unlocked_add_fetch_u64_impl(ptr, add_); +} + #undef INSIDE_ATOMICS_H #endif /* ATOMICS_H */ diff --git a/src/include/port/atomics/generic.h b/src/include/port/atomics/generic.h index d3ba89a58f..1683653ca6 100644 --- a/src/include/port/atomics/generic.h +++ b/src/include/port/atomics/generic.h @@ -234,6 +234,16 @@ pg_atomic_add_fetch_u32_impl(volatile pg_atomic_uint32 *ptr, int32 add_) } #endif +#if !defined(PG_HAVE_ATOMIC_UNLOCKED_ADD_FETCH_U32) +#define PG_HAVE_ATOMIC_UNLOCKED_ADD_FETCH_U32 +static inline uint32 +pg_atomic_unlocked_add_fetch_u32_impl(volatile pg_atomic_uint32 *ptr, int32 add_) +{ + ptr->value += add_; + return ptr->value; +} +#endif + #if !defined(PG_HAVE_ATOMIC_SUB_FETCH_U32) && defined(PG_HAVE_ATOMIC_FETCH_SUB_U32) #define PG_HAVE_ATOMIC_SUB_FETCH_U32 static inline uint32 @@ -399,3 +409,26 @@ pg_atomic_sub_fetch_u64_impl(volatile pg_atomic_uint64 *ptr, int64 sub_) return pg_atomic_fetch_sub_u64_impl(ptr, sub_) - sub_; } #endif + +#if defined(PG_HAVE_8BYTE_SINGLE_COPY_ATOMICITY) && \ + !defined(PG_HAVE_ATOMIC_U64_SIMULATION) + +#ifndef PG_HAVE_ATOMIC_UNLOCKED_ADD_FETCH_U64 +#define PG_HAVE_ATOMIC_UNLOCKED_ADD_FETCH_U64 +static inline uint64 +pg_atomic_unlocked_add_fetch_u64_impl(volatile pg_atomic_uint64 *ptr, uint64 val) +{ + ptr->value += val; + return ptr->value; +} +#endif + +#else + +static inline uint64 +pg_atomic_unlocked_add_fetch_u64_impl(volatile pg_atomic_uint64 *ptr, uint64 val) +{ + return pg_atomic_add_fetch_u64_impl(ptr, val); +} + +#endif -- 2.20.1
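As a usage illustration for the new unlocked add (this sketch is not part of the patch; DemoCounters, demo_count_prefetch and demo_read_prefetches are made-up names): the intended pattern is a shared-memory counter that only a single process ever updates, so an ordinary non-atomic add is sufficient as long as concurrent readers can never observe a torn value.

#include "postgres.h"

#include "port/atomics.h"

typedef struct DemoCounters
{
	pg_atomic_uint64 prefetches;	/* updated by one process only */
} DemoCounters;

static DemoCounters *counters;		/* assumed to point into shared memory */

/* Writer side: called only by the single owning process. */
static void
demo_count_prefetch(void)
{
	pg_atomic_unlocked_add_fetch_u64(&counters->prefetches, 1);
}

/* Reader side: any backend may call this; it sees an untorn, possibly stale value. */
static uint64
demo_read_prefetches(void)
{
	return pg_atomic_read_u64(&counters->prefetches);
}

That matches the way patch 3 uses these functions: the startup process is the only writer of the pg_stat_prefetch_recovery counters, and readers only need values that aren't torn, so no barriers or locked read-modify-write cycles are required.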
From 6ed95fffba6751ddc9607659183c072cb11fa4a8 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Tue, 7 Apr 2020 22:56:27 +1200 Subject: [PATCH v9 2/3] Allow XLogReadRecord() to be non-blocking. Extend read_local_xlog_page() to support non-blocking modes: 1. Reading as far as the WAL receiver has written so far. 2. Reading all the way to the end, when the end LSN is unknown. Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com --- src/backend/access/transam/xlogreader.c | 37 ++++-- src/backend/access/transam/xlogutils.c | 149 +++++++++++++++++------- src/backend/replication/walsender.c | 2 +- src/include/access/xlogreader.h | 14 ++- src/include/access/xlogutils.h | 26 +++++ 5 files changed, 173 insertions(+), 55 deletions(-) diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 5995798b58..897efaf682 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -259,6 +259,9 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) * If the reading fails for some other reason, NULL is also returned, and * *errormsg is set to a string with details of the failure. * + * If the read_page callback is one that returns XLOGPAGEREAD_WOULDBLOCK rather + * than waiting for WAL to arrive, NULL is also returned in that case. + * * The returned pointer (or *errormsg) points to an internal buffer that's * valid until the next call to XLogReadRecord. */ @@ -548,10 +551,11 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) err: /* - * Invalidate the read state. We might read from a different source after - * failure. + * Invalidate the read state, if this was an error. We might read from a + * different source after failure. */ - XLogReaderInvalReadState(state); + if (readOff != XLOGPAGEREAD_WOULDBLOCK) + XLogReaderInvalReadState(state); if (state->errormsg_buf[0] != '\0') *errormsg = state->errormsg_buf; @@ -563,8 +567,9 @@ err: * Read a single xlog page including at least [pageptr, reqLen] of valid data * via the page_read() callback. * - * Returns -1 if the required page cannot be read for some reason; errormsg_buf - * is set in that case (unless the error occurs in the page_read callback). + * Returns XLOGPAGEREAD_ERROR or XLOGPAGEREAD_WOULDBLOCK if the required page + * cannot be read for some reason; errormsg_buf is set in the former case + * (unless the error occurs in the page_read callback). * * We fetch the page from a reader-local cache if we know we have the required * data and if there hasn't been any error since caching the data. @@ -661,8 +666,11 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) return readLen; err: + if (readLen == XLOGPAGEREAD_WOULDBLOCK) + return XLOGPAGEREAD_WOULDBLOCK; + XLogReaderInvalReadState(state); - return -1; + return XLOGPAGEREAD_ERROR; } /* @@ -941,6 +949,7 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) XLogRecPtr found = InvalidXLogRecPtr; XLogPageHeader header; char *errormsg; + int readLen; Assert(!XLogRecPtrIsInvalid(RecPtr)); @@ -954,7 +963,6 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) XLogRecPtr targetPagePtr; int targetRecOff; uint32 pageHeaderSize; - int readLen; /* * Compute targetRecOff. 
It should typically be equal or greater than @@ -1035,7 +1043,8 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) } err: - XLogReaderInvalReadState(state); + if (readLen != XLOGPAGEREAD_WOULDBLOCK) + XLogReaderInvalReadState(state); return InvalidXLogRecPtr; } @@ -1094,8 +1103,16 @@ WALRead(XLogReaderState *state, XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize); state->routine.segment_open(state, nextSegNo, &tli); - /* This shouldn't happen -- indicates a bug in segment_open */ - Assert(state->seg.ws_file >= 0); + /* callback reported that there was no such file */ + if (state->seg.ws_file < 0) + { + errinfo->wre_errno = errno; + errinfo->wre_req = 0; + errinfo->wre_read = 0; + errinfo->wre_off = startoff; + errinfo->wre_seg = state->seg; + return false; + } /* Update the current segment info. */ state->seg.ws_tli = tli; diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 322b0e8ff5..18aa499831 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -25,6 +25,7 @@ #include "access/xlogutils.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/walreceiver.h" #include "storage/smgr.h" #include "utils/guc.h" #include "utils/hsearch.h" @@ -808,6 +809,29 @@ wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, path))); } +/* + * XLogReaderRoutine->segment_open callback that reports missing files rather + * than raising an error. + */ +void +wal_segment_try_open(XLogReaderState *state, XLogSegNo nextSegNo, + TimeLineID *tli_p) +{ + TimeLineID tli = *tli_p; + char path[MAXPGPATH]; + + XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize); + state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY); + if (state->seg.ws_file >= 0) + return; + + if (errno != ENOENT) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + path))); +} + /* stock XLogReaderRoutine->segment_close callback */ void wal_segment_close(XLogReaderState *state) @@ -823,6 +847,10 @@ wal_segment_close(XLogReaderState *state) * Public because it would likely be very helpful for someone writing another * output method outside walsender, e.g. in a bgworker. * + * A pointer to an XLogReadLocalOptions struct may be passed in as + * XLogReaderRouter->page_read_private to control the behavior of this + * function. + * * TODO: The walsender has its own version of this, but it relies on the * walsender's latch being set whenever WAL is flushed. No such infrastructure * exists for normal backends, so we have to do a check/sleep/repeat style of @@ -837,58 +865,89 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, TimeLineID tli; int count; WALReadError errinfo; + XLogReadLocalOptions *options = + (XLogReadLocalOptions *) state->routine.page_read_private; loc = targetPagePtr + reqLen; /* Loop waiting for xlog to be available if necessary */ while (1) { - /* - * Determine the limit of xlog we can currently read to, and what the - * most recent timeline is. - * - * RecoveryInProgress() will update ThisTimeLineID when it first - * notices recovery finishes, so we only have to maintain it for the - * local process until recovery ends. - */ - if (!RecoveryInProgress()) - read_upto = GetFlushRecPtr(); - else - read_upto = GetXLogReplayRecPtr(&ThisTimeLineID); - tli = ThisTimeLineID; + switch (options ? 
options->read_upto_policy : -1) + { + case XLRO_WALRCV_WRITTEN: + /* + * We'll try to read as far as has been written by the WAL + * receiver, on the requested timeline. When we run out of valid + * data, we'll return an error. This is used by xlogprefetch.c + * while streaming. + */ + read_upto = GetWalRcvWriteRecPtr(); + state->currTLI = tli = options->tli; + break; - /* - * Check which timeline to get the record from. - * - * We have to do it each time through the loop because if we're in - * recovery as a cascading standby, the current timeline might've - * become historical. We can't rely on RecoveryInProgress() because in - * a standby configuration like - * - * A => B => C - * - * if we're a logical decoding session on C, and B gets promoted, our - * timeline will change while we remain in recovery. - * - * We can't just keep reading from the old timeline as the last WAL - * archive in the timeline will get renamed to .partial by - * StartupXLOG(). - * - * If that happens after our caller updated ThisTimeLineID but before - * we actually read the xlog page, we might still try to read from the - * old (now renamed) segment and fail. There's not much we can do - * about this, but it can only happen when we're a leaf of a cascading - * standby whose master gets promoted while we're decoding, so a - * one-off ERROR isn't too bad. - */ - XLogReadDetermineTimeline(state, targetPagePtr, reqLen); + case XLRO_END: + /* + * We'll try to read as far as we can on one timeline. This is + * used by xlogprefetch.c for crash recovery. + */ + read_upto = (XLogRecPtr) -1; + state->currTLI = tli = options->tli; + break; + + default: + /* + * Determine the limit of xlog we can currently read to, and what the + * most recent timeline is. + * + * RecoveryInProgress() will update ThisTimeLineID when it first + * notices recovery finishes, so we only have to maintain it for + * the local process until recovery ends. + */ + if (!RecoveryInProgress()) + read_upto = GetFlushRecPtr(); + else + read_upto = GetXLogReplayRecPtr(&ThisTimeLineID); + tli = ThisTimeLineID; + + /* + * Check which timeline to get the record from. + * + * We have to do it each time through the loop because if we're in + * recovery as a cascading standby, the current timeline might've + * become historical. We can't rely on RecoveryInProgress() + * because in a standby configuration like + * + * A => B => C + * + * if we're a logical decoding session on C, and B gets promoted, + * our timeline will change while we remain in recovery. + * + * We can't just keep reading from the old timeline as the last + * WAL archive in the timeline will get renamed to .partial by + * StartupXLOG(). + * + * If that happens after our caller updated ThisTimeLineID but + * before we actually read the xlog page, we might still try to + * read from the old (now renamed) segment and fail. There's not + * much we can do about this, but it can only happen when we're a + * leaf of a cascading standby whose master gets promoted while + * we're decoding, so a one-off ERROR isn't too bad. 
+ */ + XLogReadDetermineTimeline(state, targetPagePtr, reqLen); + break; + } - if (state->currTLI == ThisTimeLineID) + if (state->currTLI == tli) { if (loc <= read_upto) break; + /* not enough data there, but we were asked not to wait */ + if (options && options->nowait) + return XLOGPAGEREAD_WOULDBLOCK; + CHECK_FOR_INTERRUPTS(); pg_usleep(1000L); } @@ -930,7 +989,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, else if (targetPagePtr + reqLen > read_upto) { /* not enough data there */ - return -1; + return XLOGPAGEREAD_ERROR; } else { @@ -945,7 +1004,17 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, */ if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &errinfo)) + { + /* + * When not following timeline changes, we may read past the end of + * available segments. Report lack of file as an error rather than + * raising an error. + */ + if (errinfo.wre_errno == ENOENT) + return XLOGPAGEREAD_ERROR; + WALReadRaiseError(&errinfo); + } /* number of valid bytes in the buffer */ return count; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 86847cbb54..448c83b684 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -835,7 +835,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req /* fail if not (implies we are going to shut down) */ if (flushptr < targetPagePtr + reqLen) - return -1; + return XLOGPAGEREAD_ERROR; if (targetPagePtr + XLOG_BLCKSZ <= flushptr) count = XLOG_BLCKSZ; /* more than one block available */ diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index d930fe957d..3a5ab4b3ce 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -57,6 +57,10 @@ typedef struct WALSegmentContext typedef struct XLogReaderState XLogReaderState; +/* Special negative return values for XLogPageReadCB functions */ +#define XLOGPAGEREAD_ERROR -1 +#define XLOGPAGEREAD_WOULDBLOCK -2 + /* Function type definitions for various xlogreader interactions */ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, @@ -76,10 +80,11 @@ typedef struct XLogReaderRoutine * This callback shall read at least reqLen valid bytes of the xlog page * starting at targetPagePtr, and store them in readBuf. The callback * shall return the number of bytes read (never more than XLOG_BLCKSZ), or - * -1 on failure. The callback shall sleep, if necessary, to wait for the - * requested bytes to become available. The callback will not be invoked - * again for the same page unless more than the returned number of bytes - * are needed. + * XLOGPAGEREAD_ERROR on failure. The callback shall either sleep, if + * necessary, to wait for the requested bytes to become available, or + * return XLOGPAGEREAD_WOULDBLOCK. The callback will not be invoked again + * for the same page unless more than the returned number of bytes are + * needed. * * targetRecPtr is the position of the WAL record we're reading. Usually * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs @@ -91,6 +96,7 @@ typedef struct XLogReaderRoutine * read from. */ XLogPageReadCB page_read; + void *page_read_private; /* * Callback to open the specified WAL segment for reading. 
->seg.ws_file diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index e59b6cf3a9..6325c23dc2 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -47,12 +47,38 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, extern Relation CreateFakeRelcacheEntry(RelFileNode rnode); extern void FreeFakeRelcacheEntry(Relation fakerel); +/* + * A pointer to an XLogReadLocalOptions struct can be supplied as the private data + * for an XLogReader, causing read_local_xlog_page() to modify its behavior. + */ +typedef struct XLogReadLocalOptions +{ + /* Don't block waiting for new WAL to arrive. */ + bool nowait; + + /* + * For XLRO_WALRCV_WRITTEN and XLRO_END modes, the timeline ID must be + * provided. + */ + TimeLineID tli; + + /* How far to read. */ + enum { + XLRO_STANDARD, + XLRO_WALRCV_WRITTEN, + XLRO_END + } read_upto_policy; +} XLogReadLocalOptions; + extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page); extern void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p); +extern void wal_segment_try_open(XLogReaderState *state, + XLogSegNo nextSegNo, + TimeLineID *tli_p); extern void wal_segment_close(XLogReaderState *state); extern void XLogReadDetermineTimeline(XLogReaderState *state, -- 2.20.1
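To show how a caller is expected to drive the new non-blocking mode, here's a minimal sketch (not part of the patch; demo_poll_wal and its arguments are invented for illustration, and error handling is simplified). It sets up an XLogReader the same way the prefetcher in patch 3 does, with nowait set, so that XLogReadRecord() returns NULL without an error message when it runs out of WAL instead of sleeping:

#include "postgres.h"

#include "access/xlog.h"
#include "access/xlogreader.h"
#include "access/xlogutils.h"

static void
demo_poll_wal(TimeLineID tli, XLogRecPtr start_lsn)
{
	XLogReadLocalOptions options = {
		.nowait = true,				/* never sleep waiting for more WAL */
		.tli = tli,
		.read_upto_policy = XLRO_WALRCV_WRITTEN	/* stop at the WAL receiver's write pointer */
	};
	XLogReaderRoutine routines = {
		.page_read = read_local_xlog_page,
		.page_read_private = &options,
		.segment_open = wal_segment_try_open,	/* a missing segment is not fatal */
		.segment_close = wal_segment_close
	};
	XLogReaderState *reader;
	XLogRecord *record;
	char	   *errormsg;

	reader = XLogReaderAllocate(wal_segment_size, NULL, &routines, NULL);
	XLogBeginRead(reader, start_lsn);

	for (;;)
	{
		record = XLogReadRecord(reader, &errormsg);
		if (record == NULL)
		{
			if (errormsg)
				elog(LOG, "stopping: %s", errormsg);	/* a real error */
			/* otherwise there's simply no more WAL yet; retry later */
			break;
		}
		/* ... examine the decoded record, e.g. via reader->max_block_id ... */
	}

	XLogReaderFree(reader);
}

The point is the three-way outcome of XLogReadRecord() under this configuration: a record, NULL with an error message (a genuine failure), or NULL with no message, which here means "would block" and the caller should simply try again once more WAL has been written.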
From 68cbfa9e553359a57a4806cab8af60b0450f7e5b Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Wed, 8 Apr 2020 23:04:51 +1200 Subject: [PATCH v9 3/3] Prefetch referenced blocks during recovery. Introduce a new GUC max_recovery_prefetch_distance. If it is set to a positive number of bytes, then read ahead in the WAL at most that distance, and initiate asynchronous reading of referenced blocks. The goal is to avoid I/O stalls and benefit from concurrent I/O. The number of concurrency asynchronous reads is capped by the existing maintenance_io_concurrency GUC. The feature is enabled by default for now, but we might reconsider that before release. Reviewed-by: Alvaro Herrera <alvhe...@2ndquadrant.com> Reviewed-by: Andres Freund <and...@anarazel.de> Reviewed-by: Tomas Vondra <tomas.von...@2ndquadrant.com> Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com --- doc/src/sgml/config.sgml | 45 + doc/src/sgml/monitoring.sgml | 85 +- doc/src/sgml/wal.sgml | 13 + src/backend/access/transam/Makefile | 1 + src/backend/access/transam/xlog.c | 16 + src/backend/access/transam/xlogprefetch.c | 910 ++++++++++++++++++ src/backend/catalog/system_views.sql | 14 + src/backend/postmaster/pgstat.c | 96 +- src/backend/storage/ipc/ipci.c | 3 + src/backend/utils/misc/guc.c | 47 +- src/backend/utils/misc/postgresql.conf.sample | 5 + src/include/access/xlogprefetch.h | 85 ++ src/include/catalog/pg_proc.dat | 8 + src/include/pgstat.h | 27 + src/include/utils/guc.h | 4 + src/test/regress/expected/rules.out | 11 + 16 files changed, 1366 insertions(+), 4 deletions(-) create mode 100644 src/backend/access/transam/xlogprefetch.c create mode 100644 src/include/access/xlogprefetch.h diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index a2694e548a..0c9842b0f9 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3121,6 +3121,51 @@ include_dir 'conf.d' </listitem> </varlistentry> + <varlistentry id="guc-max-recovery-prefetch-distance" xreflabel="max_recovery_prefetch_distance"> + <term><varname>max_recovery_prefetch_distance</varname> (<type>integer</type>) + <indexterm> + <primary><varname>max_recovery_prefetch_distance</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + The maximum distance to look ahead in the WAL during recovery, to find + blocks to prefetch. Prefetching blocks that will soon be needed can + reduce I/O wait times. The number of concurrent prefetches is limited + by this setting as well as + <xref linkend="guc-maintenance-io-concurrency"/>. Setting it too high + might be counterproductive, if it means that data falls out of the + kernel cache before it is needed. If this value is specified without + units, it is taken as bytes. A setting of -1 disables prefetching + during recovery. + The default is 256kB on systems that support + <function>posix_fadvise</function>, and otherwise -1. + </para> + </listitem> + </varlistentry> + + <varlistentry id="guc-recovery-prefetch-fpw" xreflabel="recovery_prefetch_fpw"> + <term><varname>recovery_prefetch_fpw</varname> (<type>boolean</type>) + <indexterm> + <primary><varname>recovery_prefetch_fpw</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Whether to prefetch blocks that were logged with full page images, + during recovery. Often this doesn't help, since such blocks will not + be read the first time they are needed and might remain in the buffer + pool after that. 
However, on file systems with a block size larger + than + <productname>PostgreSQL</productname>'s, prefetching can avoid a + costly read-before-write when a blocks are later written. This + setting has no effect unless + <xref linkend="guc-max-recovery-prefetch-distance"/> is set to a positive + number. The default is off. + </para> + </listitem> + </varlistentry> + </variablelist> </sect2> <sect2 id="runtime-config-wal-archiving"> diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 49d4bb13b9..0ab278e087 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -320,6 +320,13 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser </entry> </row> + <row> + <entry><structname>pg_stat_prefetch_recovery</structname><indexterm><primary>pg_stat_prefetch_recovery</primary></indexterm></entry> + <entry>Only one row, showing statistics about blocks prefetched during recovery. + See <xref linkend="pg-stat-prefetch-recovery-view"/> for details. + </entry> + </row> + <row> <entry><structname>pg_stat_subscription</structname><indexterm><primary>pg_stat_subscription</primary></indexterm></entry> <entry>At least one row per subscription, showing information about @@ -2674,6 +2681,78 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i connected server. </para> + <table id="pg-stat-prefetch-recovery-view" xreflabel="pg_stat_prefetch_recovery"> + <title><structname>pg_stat_prefetch_recovery</structname> View</title> + <tgroup cols="3"> + <thead> + <row> + <entry>Column</entry> + <entry>Type</entry> + <entry>Description</entry> + </row> + </thead> + + <tbody> + <row> + <entry><structfield>prefetch</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks prefetched because they were not in the buffer pool</entry> + </row> + <row> + <entry><structfield>skip_hit</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks not prefetched because they were already in the buffer pool</entry> + </row> + <row> + <entry><structfield>skip_new</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks not prefetched because they were new (usually relation extension)</entry> + </row> + <row> + <entry><structfield>skip_fpw</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks not prefetched because a full page image was included in the WAL and <xref linkend="guc-recovery-prefetch-fpw"/> was set to <literal>off</literal></entry> + </row> + <row> + <entry><structfield>skip_seq</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks not prefetched because of repeated or sequential access</entry> + </row> + <row> + <entry><structfield>distance</structfield></entry> + <entry><type>integer</type></entry> + <entry>How far ahead of recovery the prefetcher is currently reading, in bytes</entry> + </row> + <row> + <entry><structfield>queue_depth</structfield></entry> + <entry><type>integer</type></entry> + <entry>How many prefetches have been initiated but are not yet known to have completed</entry> + </row> + <row> + <entry><structfield>avg_distance</structfield></entry> + <entry><type>float4</type></entry> + <entry>How far ahead of recovery the prefetcher is on average, while recovery is not idle</entry> + </row> + <row> + <entry><structfield>avg_queue_depth</structfield></entry> + <entry><type>float4</type></entry> + <entry>Average number of prefetches in flight while recovery is not idle</entry> + 
</row> + </tbody> + </tgroup> + </table> + + <para> + The <structname>pg_stat_prefetch_recovery</structname> view will contain only + one row. It is filled with nulls if recovery is not running or WAL + prefetching is not enabled. See <xref linkend="guc-max-recovery-prefetch-distance"/> + for more information. The counters in this view are reset whenever the + <xref linkend="guc-max-recovery-prefetch-distance"/>, + <xref linkend="guc-recovery-prefetch-fpw"/> or + <xref linkend="guc-maintenance-io-concurrency"/> setting is changed and + the server configuration is reloaded. + </para> + <table id="pg-stat-subscription" xreflabel="pg_stat_subscription"> <title><structname>pg_stat_subscription</structname> View</title> <tgroup cols="1"> @@ -4494,8 +4573,10 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i argument. The argument can be <literal>bgwriter</literal> to reset all the counters shown in the <structname>pg_stat_bgwriter</structname> - view,or <literal>archiver</literal> to reset all the counters shown in - the <structname>pg_stat_archiver</structname> view. + view, <literal>archiver</literal> to reset all the counters shown in + the <structname>pg_stat_archiver</structname> view, and + <literal>prefetch_recovery</literal> to reset all the counters shown + in the <structname>pg_stat_prefetch_recovery</structname> view. </para> <para> This function is restricted to superusers by default, but other users diff --git a/doc/src/sgml/wal.sgml b/doc/src/sgml/wal.sgml index bd9fae544c..38fc8149a8 100644 --- a/doc/src/sgml/wal.sgml +++ b/doc/src/sgml/wal.sgml @@ -719,6 +719,19 @@ <acronym>WAL</acronym> call being logged to the server log. This option might be replaced by a more general mechanism in the future. </para> + + <para> + The <xref linkend="guc-max-recovery-prefetch-distance"/> parameter can + be used to improve I/O performance during recovery by instructing + <productname>PostgreSQL</productname> to initiate reads + of disk blocks that will soon be needed, in combination with the + <xref linkend="guc-maintenance-io-concurrency"/> parameter. The + prefetching mechanism is most likely to be effective on systems + with <varname>full_page_writes</varname> set to + <varname>off</varname> (where that is safe), and where the working + set is larger than RAM. By default, prefetching in recovery is enabled, + but it can be disabled by setting the distance to -1. + </para> </sect1> <sect1 id="wal-internals"> diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 595e02de72..39f9d4e77d 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -31,6 +31,7 @@ OBJS = \ xlogarchive.o \ xlogfuncs.o \ xloginsert.o \ + xlogprefetch.o \ xlogreader.o \ xlogutils.o diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index ca09d81b08..81147d5f59 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -35,6 +35,7 @@ #include "access/xlog_internal.h" #include "access/xlogarchive.h" #include "access/xloginsert.h" +#include "access/xlogprefetch.h" #include "access/xlogreader.h" #include "access/xlogutils.h" #include "catalog/catversion.h" @@ -7169,6 +7170,7 @@ StartupXLOG(void) { ErrorContextCallback errcallback; TimestampTz xtime; + XLogPrefetchState prefetch; InRedo = true; @@ -7176,6 +7178,9 @@ StartupXLOG(void) (errmsg("redo starts at %X/%X", (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr))); + /* Prepare to prefetch, if configured. 
*/ + XLogPrefetchBegin(&prefetch); + /* * main redo apply loop */ @@ -7205,6 +7210,12 @@ StartupXLOG(void) /* Handle interrupt signals of startup process */ HandleStartupProcInterrupts(); + /* Peform WAL prefetching, if enabled. */ + XLogPrefetch(&prefetch, + ThisTimeLineID, + xlogreader->ReadRecPtr, + currentSource == XLOG_FROM_STREAM); + /* * Pause WAL replay, if requested by a hot-standby session via * SetRecoveryPause(). @@ -7376,6 +7387,9 @@ StartupXLOG(void) */ if (switchedTLI && AllowCascadeReplication()) WalSndWakeup(); + + /* Reset the prefetcher. */ + XLogPrefetchReconfigure(); } /* Exit loop if we reached inclusive recovery target */ @@ -7392,6 +7406,7 @@ StartupXLOG(void) /* * end of main redo apply loop */ + XLogPrefetchEnd(&prefetch); if (reachedRecoveryTarget) { @@ -12138,6 +12153,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, */ currentSource = XLOG_FROM_STREAM; startWalReceiver = true; + XLogPrefetchReconfigure(); break; case XLOG_FROM_STREAM: diff --git a/src/backend/access/transam/xlogprefetch.c b/src/backend/access/transam/xlogprefetch.c new file mode 100644 index 0000000000..6d8cff12c6 --- /dev/null +++ b/src/backend/access/transam/xlogprefetch.c @@ -0,0 +1,910 @@ +/*------------------------------------------------------------------------- + * + * xlogprefetch.c + * Prefetching support for recovery. + * + * Portions Copyright (c) 2020, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/transam/xlogprefetch.c + * + * The goal of this module is to read future WAL records and issue + * PrefetchSharedBuffer() calls for referenced blocks, so that we avoid I/O + * stalls in the main recovery loop. Currently, this is achieved by using a + * separate XLogReader to read ahead. In future, we should find a way to + * avoid reading and decoding each record twice. + * + * When examining a WAL record from the future, we need to consider that a + * referenced block or segment file might not exist on disk until this record + * or some earlier record has been replayed. After a crash, a file might also + * be missing because it was dropped by a later WAL record; in that case, it + * will be recreated when this record is replayed. These cases are handled by + * recognizing them and adding a "filter" that prevents all prefetching of a + * certain block range until the present WAL record has been replayed. Blocks + * skipped for these reasons are counted as "skip_new" (that is, cases where we + * didn't try to prefetch "new" blocks). + * + * Blocks found in the buffer pool already are counted as "skip_hit". + * Repeated access to the same buffer is detected and skipped, and this is + * counted with "skip_seq". Blocks that were logged with FPWs are skipped if + * recovery_prefetch_fpw is off, since on most systems there will be no I/O + * stall; this is counted with "skip_fpw". + * + * The only way we currently have to know that an I/O initiated with + * PrefetchSharedBuffer() has completed is to call ReadBuffer(). Therefore, + * we track the number of potentially in-flight I/Os by using a circular + * buffer of LSNs. When it's full, we have to wait for recovery to replay + * records so that the queue depth can be reduced, before we can do any more + * prefetching. Ideally, this keeps us the right distance ahead to respect + * maintenance_io_concurrency. 
+ * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xlog.h" +#include "access/xlogprefetch.h" +#include "access/xlogreader.h" +#include "access/xlogutils.h" +#include "catalog/storage_xlog.h" +#include "utils/fmgrprotos.h" +#include "utils/timestamp.h" +#include "funcapi.h" +#include "pgstat.h" +#include "miscadmin.h" +#include "port/atomics.h" +#include "storage/bufmgr.h" +#include "storage/shmem.h" +#include "storage/smgr.h" +#include "utils/guc.h" +#include "utils/hsearch.h" + +/* + * Sample the queue depth and distance every time we replay this much WAL. + * This is used to compute avg_queue_depth and avg_distance for the log + * message that appears at the end of crash recovery. It's also used to send + * messages periodically to the stats collector, to save the counters on disk. + */ +#define XLOGPREFETCHER_SAMPLE_DISTANCE 0x40000 + +/* GUCs */ +int max_recovery_prefetch_distance = -1; +bool recovery_prefetch_fpw = false; + +int XLogPrefetchReconfigureCount; + +/* + * A prefetcher object. There is at most one of these in existence at a time, + * recreated whenever there is a configuration change. + */ +struct XLogPrefetcher +{ + /* Reader and current reading state. */ + XLogReaderState *reader; + XLogReadLocalOptions options; + bool have_record; + bool shutdown; + int next_block_id; + + /* Details of last prefetch to skip repeats and seq scans. */ + SMgrRelation last_reln; + RelFileNode last_rnode; + BlockNumber last_blkno; + + /* Online averages. */ + uint64 samples; + double avg_queue_depth; + double avg_distance; + XLogRecPtr next_sample_lsn; + + /* Book-keeping required to avoid accessing non-existing blocks. */ + HTAB *filter_table; + dlist_head filter_queue; + + /* Book-keeping required to limit concurrent prefetches. */ + int prefetch_head; + int prefetch_tail; + int prefetch_queue_size; + XLogRecPtr prefetch_queue[FLEXIBLE_ARRAY_MEMBER]; +}; + +/* + * A temporary filter used to track block ranges that haven't been created + * yet, whole relations that haven't been created yet, and whole relations + * that we must assume have already been dropped. + */ +typedef struct XLogPrefetcherFilter +{ + RelFileNode rnode; + XLogRecPtr filter_until_replayed; + BlockNumber filter_from_block; + dlist_node link; +} XLogPrefetcherFilter; + +/* + * Counters exposed in shared memory for pg_stat_prefetch_recovery. + */ +typedef struct XLogPrefetchStats +{ + pg_atomic_uint64 reset_time; /* Time of last reset. */ + pg_atomic_uint64 prefetch; /* Prefetches initiated. */ + pg_atomic_uint64 skip_hit; /* Blocks already buffered. */ + pg_atomic_uint64 skip_new; /* New/missing blocks filtered. */ + pg_atomic_uint64 skip_fpw; /* FPWs skipped. */ + pg_atomic_uint64 skip_seq; /* Repeat blocks skipped. */ + float avg_distance; + float avg_queue_depth; + + /* Reset counters */ + pg_atomic_uint32 reset_request; + uint32 reset_handled; + + /* Dynamic values */ + int distance; /* Number of bytes ahead in the WAL. */ + int queue_depth; /* Number of I/Os possibly in progress. 
*/ +} XLogPrefetchStats; + +static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, + RelFileNode rnode, + BlockNumber blockno, + XLogRecPtr lsn); +static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, + RelFileNode rnode, + BlockNumber blockno); +static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, + XLogRecPtr replaying_lsn); +static inline void XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher, + XLogRecPtr prefetching_lsn); +static inline void XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher, + XLogRecPtr replaying_lsn); +static inline bool XLogPrefetcherSaturated(XLogPrefetcher *prefetcher); +static void XLogPrefetcherScanRecords(XLogPrefetcher *prefetcher, + XLogRecPtr replaying_lsn); +static bool XLogPrefetcherScanBlocks(XLogPrefetcher *prefetcher); +static void XLogPrefetchSaveStats(void); +static void XLogPrefetchRestoreStats(void); + +static XLogPrefetchStats *Stats; + +size_t +XLogPrefetchShmemSize(void) +{ + return sizeof(XLogPrefetchStats); +} + +static void +XLogPrefetchResetStats(void) +{ + pg_atomic_write_u64(&Stats->reset_time, GetCurrentTimestamp()); + pg_atomic_write_u64(&Stats->prefetch, 0); + pg_atomic_write_u64(&Stats->skip_hit, 0); + pg_atomic_write_u64(&Stats->skip_new, 0); + pg_atomic_write_u64(&Stats->skip_fpw, 0); + pg_atomic_write_u64(&Stats->skip_seq, 0); + Stats->avg_distance = 0; + Stats->avg_queue_depth = 0; +} + +void +XLogPrefetchShmemInit(void) +{ + bool found; + + Stats = (XLogPrefetchStats *) + ShmemInitStruct("XLogPrefetchStats", + sizeof(XLogPrefetchStats), + &found); + if (!found) + { + pg_atomic_init_u32(&Stats->reset_request, 0); + Stats->reset_handled = 0; + pg_atomic_init_u64(&Stats->reset_time, GetCurrentTimestamp()); + pg_atomic_init_u64(&Stats->prefetch, 0); + pg_atomic_init_u64(&Stats->skip_hit, 0); + pg_atomic_init_u64(&Stats->skip_new, 0); + pg_atomic_init_u64(&Stats->skip_fpw, 0); + pg_atomic_init_u64(&Stats->skip_seq, 0); + Stats->avg_distance = 0; + Stats->avg_queue_depth = 0; + Stats->distance = 0; + Stats->queue_depth = 0; + } +} + +/* + * Called when any GUC is changed that affects prefetching. + */ +void +XLogPrefetchReconfigure(void) +{ + XLogPrefetchReconfigureCount++; +} + +/* + * Called by any backend to request that the stats be reset. + */ +void +XLogPrefetchRequestResetStats(void) +{ + pg_atomic_fetch_add_u32(&Stats->reset_request, 1); +} + +/* + * Tell the stats collector to serialize the shared memory counters into the + * stats file. + */ +static void +XLogPrefetchSaveStats(void) +{ + PgStat_RecoveryPrefetchStats serialized = { + .prefetch = pg_atomic_read_u64(&Stats->prefetch), + .skip_hit = pg_atomic_read_u64(&Stats->skip_hit), + .skip_new = pg_atomic_read_u64(&Stats->skip_new), + .skip_fpw = pg_atomic_read_u64(&Stats->skip_fpw), + .skip_seq = pg_atomic_read_u64(&Stats->skip_seq), + .stat_reset_timestamp = pg_atomic_read_u64(&Stats->reset_time) + }; + + pgstat_send_recoveryprefetch(&serialized); +} + +/* + * Try to restore the shared memory counters from the stats file. 
+ */ +static void +XLogPrefetchRestoreStats(void) +{ + PgStat_RecoveryPrefetchStats *serialized = pgstat_fetch_recoveryprefetch(); + + if (serialized->stat_reset_timestamp != 0) + { + pg_atomic_write_u64(&Stats->prefetch, serialized->prefetch); + pg_atomic_write_u64(&Stats->skip_hit, serialized->skip_hit); + pg_atomic_write_u64(&Stats->skip_new, serialized->skip_new); + pg_atomic_write_u64(&Stats->skip_fpw, serialized->skip_fpw); + pg_atomic_write_u64(&Stats->skip_seq, serialized->skip_seq); + pg_atomic_write_u64(&Stats->reset_time, serialized->stat_reset_timestamp); + } +} + +/* + * Initialize an XLogPrefetchState object and restore the last saved + * statistics from disk. + */ +void +XLogPrefetchBegin(XLogPrefetchState *state) +{ + XLogPrefetchRestoreStats(); + + /* We'll reconfigure on the first call to XLogPrefetch(). */ + state->prefetcher = NULL; + state->reconfigure_count = XLogPrefetchReconfigureCount - 1; +} + +/* + * Shut down the prefetching infrastructure, if configured. + */ +void +XLogPrefetchEnd(XLogPrefetchState *state) +{ + XLogPrefetchSaveStats(); + + if (state->prefetcher) + XLogPrefetcherFree(state->prefetcher); + state->prefetcher = NULL; + + Stats->queue_depth = 0; + Stats->distance = 0; +} + +/* + * Create a prefetcher that is ready to begin prefetching blocks referenced by + * WAL that is ahead of the given lsn. + */ +XLogPrefetcher * +XLogPrefetcherAllocate(TimeLineID tli, XLogRecPtr lsn, bool streaming) +{ + XLogPrefetcher *prefetcher; + static HASHCTL hash_table_ctl = { + .keysize = sizeof(RelFileNode), + .entrysize = sizeof(XLogPrefetcherFilter) + }; + XLogReaderRoutine reader_routines = { + .page_read = read_local_xlog_page, + .segment_open = wal_segment_try_open, + .segment_close = wal_segment_close + }; + + /* + * The size of the queue is based on the maintenance_io_concurrency + * setting. In theory we might have a separate queue for each tablespace, + * but it's not clear how that should work, so for now we'll just use the + * general GUC to rate-limit all prefetching. We add one to the size + * because our circular buffer has a gap between head and tail when full. + */ + prefetcher = palloc0(offsetof(XLogPrefetcher, prefetch_queue) + + sizeof(XLogRecPtr) * (maintenance_io_concurrency + 1)); + prefetcher->prefetch_queue_size = maintenance_io_concurrency + 1; + prefetcher->options.tli = tli; + prefetcher->options.nowait = true; + if (streaming) + { + /* + * We're only allowed to read as far as the WAL receiver has written. + * We don't have to wait for it to be flushed, though, as recovery + * does, so that gives us a chance to get a bit further ahead. + */ + prefetcher->options.read_upto_policy = XLRO_WALRCV_WRITTEN; + } + else + { + /* Read as far as we can. */ + prefetcher->options.read_upto_policy = XLRO_END; + } + reader_routines.page_read_private = &prefetcher->options; + prefetcher->reader = XLogReaderAllocate(wal_segment_size, + NULL, + &reader_routines, + NULL); + prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024, + &hash_table_ctl, + HASH_ELEM | HASH_BLOBS); + dlist_init(&prefetcher->filter_queue); + + /* Prepare to read at the given LSN. */ + ereport(LOG, + (errmsg("recovery started prefetching on timeline %u at %X/%X", + tli, + (uint32) (lsn << 32), (uint32) lsn))); + XLogBeginRead(prefetcher->reader, lsn); + + Stats->queue_depth = 0; + Stats->distance = 0; + + return prefetcher; +} + +/* + * Destroy a prefetcher and release all resources. 
+ */ +void +XLogPrefetcherFree(XLogPrefetcher *prefetcher) +{ + /* Log final statistics. */ + ereport(LOG, + (errmsg("recovery finished prefetching at %X/%X; " + "prefetch = " UINT64_FORMAT ", " + "skip_hit = " UINT64_FORMAT ", " + "skip_new = " UINT64_FORMAT ", " + "skip_fpw = " UINT64_FORMAT ", " + "skip_seq = " UINT64_FORMAT ", " + "avg_distance = %f, " + "avg_queue_depth = %f", + (uint32) (prefetcher->reader->EndRecPtr << 32), + (uint32) (prefetcher->reader->EndRecPtr), + pg_atomic_read_u64(&Stats->prefetch), + pg_atomic_read_u64(&Stats->skip_hit), + pg_atomic_read_u64(&Stats->skip_new), + pg_atomic_read_u64(&Stats->skip_fpw), + pg_atomic_read_u64(&Stats->skip_seq), + Stats->avg_distance, + Stats->avg_queue_depth))); + XLogReaderFree(prefetcher->reader); + hash_destroy(prefetcher->filter_table); + pfree(prefetcher); +} + +/* + * Called when recovery is replaying a new LSN, to check if we can read ahead. + */ +void +XLogPrefetcherReadAhead(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) +{ + uint32 reset_request; + + /* If an error has occurred or we've hit the end of the WAL, do nothing. */ + if (prefetcher->shutdown) + return; + + /* + * Have any in-flight prefetches definitely completed, judging by the LSN + * that is currently being replayed? + */ + XLogPrefetcherCompletedIO(prefetcher, replaying_lsn); + + /* + * Do we already have the maximum permitted number of I/Os running + * (according to the information we have)? If so, we have to wait for at + * least one to complete, so give up early and let recovery catch up. + */ + if (XLogPrefetcherSaturated(prefetcher)) + return; + + /* + * Can we drop any filters yet? This happens when the LSN that is + * currently being replayed has moved past a record that prevents + * pretching of a block range, such as relation extension. + */ + XLogPrefetcherCompleteFilters(prefetcher, replaying_lsn); + + /* + * Have we been asked to reset our stats counters? This is checked with + * an unsynchronized memory read, but we'll see it eventually and we'll be + * accessing that cache line anyway. + */ + reset_request = pg_atomic_read_u32(&Stats->reset_request); + if (reset_request != Stats->reset_handled) + { + XLogPrefetchResetStats(); + Stats->reset_handled = reset_request; + prefetcher->avg_distance = 0; + prefetcher->avg_queue_depth = 0; + prefetcher->samples = 0; + } + + /* OK, we can now try reading ahead. */ + XLogPrefetcherScanRecords(prefetcher, replaying_lsn); +} + +/* + * Read ahead as far as we are allowed to, considering the LSN that recovery + * is currently replaying. + */ +static void +XLogPrefetcherScanRecords(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) +{ + XLogReaderState *reader = prefetcher->reader; + + Assert(!XLogPrefetcherSaturated(prefetcher)); + + for (;;) + { + char *error; + int64 distance; + + /* If we don't already have a record, then try to read one. */ + if (!prefetcher->have_record) + { + if (!XLogReadRecord(reader, &error)) + { + /* If we got an error, log it and give up. */ + if (error) + { + ereport(LOG, (errmsg("recovery no longer prefetching: %s", error))); + prefetcher->shutdown = true; + Stats->queue_depth = 0; + Stats->distance = 0; + } + /* Otherwise, we'll try again later when more data is here. */ + return; + } + prefetcher->have_record = true; + prefetcher->next_block_id = 0; + } + + /* How far ahead of replay are we now? */ + distance = prefetcher->reader->ReadRecPtr - replaying_lsn; + + /* Update distance shown in shm. 
*/ + Stats->distance = distance; + + /* Periodically recompute some statistics. */ + if (unlikely(replaying_lsn >= prefetcher->next_sample_lsn)) + { + /* Compute online averages. */ + prefetcher->samples++; + if (prefetcher->samples == 1) + { + prefetcher->avg_distance = Stats->distance; + prefetcher->avg_queue_depth = Stats->queue_depth; + } + else + { + prefetcher->avg_distance += + (Stats->distance - prefetcher->avg_distance) / + prefetcher->samples; + prefetcher->avg_queue_depth += + (Stats->queue_depth - prefetcher->avg_queue_depth) / + prefetcher->samples; + } + + /* Expose it in shared memory. */ + Stats->avg_distance = prefetcher->avg_distance; + Stats->avg_queue_depth = prefetcher->avg_queue_depth; + + /* Also periodically save the simple counters. */ + XLogPrefetchSaveStats(); + + prefetcher->next_sample_lsn = + replaying_lsn + XLOGPREFETCHER_SAMPLE_DISTANCE; + } + + /* Are we too far ahead of replay? */ + if (distance >= max_recovery_prefetch_distance) + break; + + /* Are we not far enough ahead? */ + if (distance <= 0) + { + prefetcher->have_record = false; /* skip this record */ + continue; + } + + /* + * If this is a record that creates a new SMGR relation, we'll avoid + * prefetching anything from that rnode until it has been replayed. + */ + if (replaying_lsn < reader->ReadRecPtr && + XLogRecGetRmid(reader) == RM_SMGR_ID && + (XLogRecGetInfo(reader) & ~XLR_INFO_MASK) == XLOG_SMGR_CREATE) + { + xl_smgr_create *xlrec = (xl_smgr_create *) XLogRecGetData(reader); + + XLogPrefetcherAddFilter(prefetcher, xlrec->rnode, 0, + reader->ReadRecPtr); + } + + /* Scan the record's block references. */ + if (!XLogPrefetcherScanBlocks(prefetcher)) + return; + + /* Advance to the next record. */ + prefetcher->have_record = false; + } +} + +/* + * Scan the current record for block references, and consider prefetching. + * + * Return true if we processed the current record to completion and still have + * queue space to process a new record, and false if we saturated the I/O + * queue and need to wait for recovery to advance before we continue. + */ +static bool +XLogPrefetcherScanBlocks(XLogPrefetcher *prefetcher) +{ + XLogReaderState *reader = prefetcher->reader; + + Assert(!XLogPrefetcherSaturated(prefetcher)); + + /* + * We might already have been partway through processing this record when + * our queue became saturated, so we need to start where we left off. + */ + for (int block_id = prefetcher->next_block_id; + block_id <= reader->max_block_id; + ++block_id) + { + PrefetchBufferResult prefetch; + DecodedBkpBlock *block = &reader->blocks[block_id]; + SMgrRelation reln; + + /* Ignore everything but the main fork for now. */ + if (block->forknum != MAIN_FORKNUM) + continue; + + /* + * If there is a full page image attached, we won't be reading the + * page, so you might think we should skip it. However, if the + * underlying filesystem uses larger logical blocks than us, it + * might still need to perform a read-before-write some time later. + * Therefore, only prefetch if configured to do so. + */ + if (block->has_image && !recovery_prefetch_fpw) + { + pg_atomic_unlocked_add_fetch_u64(&Stats->skip_fpw, 1); + continue; + } + + /* + * If this block will initialize a new page then it's probably an + * extension. Since it might create a new segment, we can't try + * to prefetch this block until the record has been replayed, or we + * might try to open a file that doesn't exist yet. 
+ */ + if (block->flags & BKPBLOCK_WILL_INIT) + { + XLogPrefetcherAddFilter(prefetcher, block->rnode, block->blkno, + reader->ReadRecPtr); + pg_atomic_unlocked_add_fetch_u64(&Stats->skip_new, 1); + continue; + } + + /* Should we skip this block due to a filter? */ + if (XLogPrefetcherIsFiltered(prefetcher, block->rnode, block->blkno)) + { + pg_atomic_unlocked_add_fetch_u64(&Stats->skip_new, 1); + continue; + } + + /* Fast path for repeated references to the same relation. */ + if (RelFileNodeEquals(block->rnode, prefetcher->last_rnode)) + { + /* + * If this is a repeat access to the same block, then skip it. + * + * XXX We could also check for last_blkno + 1 too, and also update + * last_blkno; it's not clear if the kernel would do a better job + * of sequential prefetching. + */ + if (block->blkno == prefetcher->last_blkno) + { + pg_atomic_unlocked_add_fetch_u64(&Stats->skip_seq, 1); + continue; + } + + /* We can avoid calling smgropen(). */ + reln = prefetcher->last_reln; + } + else + { + /* Otherwise we have to open it. */ + reln = smgropen(block->rnode, InvalidBackendId); + prefetcher->last_rnode = block->rnode; + prefetcher->last_reln = reln; + } + prefetcher->last_blkno = block->blkno; + + /* Try to prefetch this block! */ + prefetch = PrefetchSharedBuffer(reln, block->forknum, block->blkno); + if (BufferIsValid(prefetch.recent_buffer)) + { + /* + * It was already cached, so do nothing. Perhaps in future we + * could remember the buffer so that recovery doesn't have to look + * it up again. + */ + pg_atomic_unlocked_add_fetch_u64(&Stats->skip_hit, 1); + } + else if (prefetch.initiated_io) + { + /* + * I/O has possibly been initiated (though we don't know if it + * was already cached by the kernel, so we just have to assume + * that it has due to lack of better information). Record + * this as an I/O in progress until eventually we replay this + * LSN. + */ + pg_atomic_unlocked_add_fetch_u64(&Stats->prefetch, 1); + XLogPrefetcherInitiatedIO(prefetcher, reader->ReadRecPtr); + /* + * If the queue is now full, we'll have to wait before processing + * any more blocks from this record, or move to a new record if + * that was the last block. + */ + if (XLogPrefetcherSaturated(prefetcher)) + { + prefetcher->next_block_id = block_id + 1; + return false; + } + } + else + { + /* + * Neither cached nor initiated. The underlying segment file + * doesn't exist. Presumably it will be unlinked by a later WAL + * record. When recovery reads this block, it will use the + * EXTENSION_CREATE_RECOVERY flag. We certainly don't want to do + * that sort of thing while merely prefetching, so let's just + * ignore references to this relation until this record is + * replayed, and let recovery create the dummy file or complain if + * something is wrong. + */ + XLogPrefetcherAddFilter(prefetcher, block->rnode, 0, + reader->ReadRecPtr); + pg_atomic_unlocked_add_fetch_u64(&Stats->skip_new, 1); + } + } + + return true; +} + +/* + * Expose statistics about recovery prefetching. 
+ */ +Datum +pg_stat_get_prefetch_recovery(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_PREFETCH_RECOVERY_COLS 10 + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + Datum values[PG_STAT_GET_PREFETCH_RECOVERY_COLS]; + bool nulls[PG_STAT_GET_PREFETCH_RECOVERY_COLS]; + + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mod required, but it is not allowed in this context"))); + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + MemoryContextSwitchTo(oldcontext); + + if (pg_atomic_read_u32(&Stats->reset_request) != Stats->reset_handled) + { + /* There's an unhandled reset request, so just show NULLs */ + for (int i = 0; i < PG_STAT_GET_PREFETCH_RECOVERY_COLS; ++i) + nulls[i] = true; + } + else + { + for (int i = 0; i < PG_STAT_GET_PREFETCH_RECOVERY_COLS; ++i) + nulls[i] = false; + } + + values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&Stats->reset_time)); + values[1] = Int64GetDatum(pg_atomic_read_u64(&Stats->prefetch)); + values[2] = Int64GetDatum(pg_atomic_read_u64(&Stats->skip_hit)); + values[3] = Int64GetDatum(pg_atomic_read_u64(&Stats->skip_new)); + values[4] = Int64GetDatum(pg_atomic_read_u64(&Stats->skip_fpw)); + values[5] = Int64GetDatum(pg_atomic_read_u64(&Stats->skip_seq)); + values[6] = Int32GetDatum(Stats->distance); + values[7] = Int32GetDatum(Stats->queue_depth); + values[8] = Float4GetDatum(Stats->avg_distance); + values[9] = Float4GetDatum(Stats->avg_queue_depth); + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + tuplestore_donestoring(tupstore); + + return (Datum) 0; +} + +/* + * Don't prefetch any blocks >= 'blockno' from a given 'rnode', until 'lsn' + * has been replayed. + */ +static inline void +XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileNode rnode, + BlockNumber blockno, XLogRecPtr lsn) +{ + XLogPrefetcherFilter *filter; + bool found; + + filter = hash_search(prefetcher->filter_table, &rnode, HASH_ENTER, &found); + if (!found) + { + /* + * Don't allow any prefetching of this block or higher until replayed. + */ + filter->filter_until_replayed = lsn; + filter->filter_from_block = blockno; + dlist_push_head(&prefetcher->filter_queue, &filter->link); + } + else + { + /* + * We were already filtering this rnode. Extend the filter's lifetime + * to cover this WAL record, but leave the (presumably lower) block + * number there because we don't want to have to track individual + * blocks. + */ + filter->filter_until_replayed = lsn; + dlist_delete(&filter->link); + dlist_push_head(&prefetcher->filter_queue, &filter->link); + } +} + +/* + * Have we replayed the records that caused us to begin filtering a block + * range? That means that relations should have been created, extended or + * dropped as required, so we can drop relevant filters. 
+ */ +static inline void +XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) +{ + while (unlikely(!dlist_is_empty(&prefetcher->filter_queue))) + { + XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter, + link, + &prefetcher->filter_queue); + + if (filter->filter_until_replayed >= replaying_lsn) + break; + dlist_delete(&filter->link); + hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL); + } +} + +/* + * Check if a given block should be skipped due to a filter. + */ +static inline bool +XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileNode rnode, + BlockNumber blockno) +{ + /* + * Test for empty queue first, because we expect it to be empty most of the + * time and we can avoid the hash table lookup in that case. + */ + if (unlikely(!dlist_is_empty(&prefetcher->filter_queue))) + { + XLogPrefetcherFilter *filter = hash_search(prefetcher->filter_table, &rnode, + HASH_FIND, NULL); + + if (filter && filter->filter_from_block <= blockno) + return true; + } + + return false; +} + +/* + * Insert an LSN into the queue. The queue must not be full already. This + * tracks the fact that we have (to the best of our knowledge) initiated an + * I/O, so that we can impose a cap on concurrent prefetching. + */ +static inline void +XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher, + XLogRecPtr prefetching_lsn) +{ + Assert(!XLogPrefetcherSaturated(prefetcher)); + prefetcher->prefetch_queue[prefetcher->prefetch_head++] = prefetching_lsn; + prefetcher->prefetch_head %= prefetcher->prefetch_queue_size; + Stats->queue_depth++; + Assert(Stats->queue_depth <= prefetcher->prefetch_queue_size); +} + +/* + * Have we replayed the records that caused us to initiate the oldest + * prefetches yet? That means that they're definitely finished, so we can can + * forget about them and allow ourselves to initiate more prefetches. For now + * we don't have any awareness of when I/O really completes. + */ +static inline void +XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) +{ + while (prefetcher->prefetch_head != prefetcher->prefetch_tail && + prefetcher->prefetch_queue[prefetcher->prefetch_tail] < replaying_lsn) + { + prefetcher->prefetch_tail++; + prefetcher->prefetch_tail %= prefetcher->prefetch_queue_size; + Stats->queue_depth--; + Assert(Stats->queue_depth >= 0); + } +} + +/* + * Check if the maximum allowed number of I/Os is already in flight. + */ +static inline bool +XLogPrefetcherSaturated(XLogPrefetcher *prefetcher) +{ + return (prefetcher->prefetch_head + 1) % prefetcher->prefetch_queue_size == + prefetcher->prefetch_tail; +} + +void +assign_max_recovery_prefetch_distance(int new_value, void *extra) +{ + /* Reconfigure prefetching, because a setting it depends on changed. */ + max_recovery_prefetch_distance = new_value; + if (AmStartupProcess()) + XLogPrefetchReconfigure(); +} + +void +assign_recovery_prefetch_fpw(bool new_value, void *extra) +{ + /* Reconfigure prefetching, because a setting it depends on changed. 
*/ + recovery_prefetch_fpw = new_value; + if (AmStartupProcess()) + XLogPrefetchReconfigure(); +} diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 56420bbc9d..6c39b9ad48 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -826,6 +826,20 @@ CREATE VIEW pg_stat_wal_receiver AS FROM pg_stat_get_wal_receiver() s WHERE s.pid IS NOT NULL; +CREATE VIEW pg_stat_prefetch_recovery AS + SELECT + s.stats_reset, + s.prefetch, + s.skip_hit, + s.skip_new, + s.skip_fpw, + s.skip_seq, + s.distance, + s.queue_depth, + s.avg_distance, + s.avg_queue_depth + FROM pg_stat_get_prefetch_recovery() s; + CREATE VIEW pg_stat_subscription AS SELECT su.oid AS subid, diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index d7f99d9944..5ac3fed4c6 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -38,6 +38,7 @@ #include "access/transam.h" #include "access/twophase_rmgr.h" #include "access/xact.h" +#include "access/xlogprefetch.h" #include "catalog/pg_database.h" #include "catalog/pg_proc.h" #include "common/ip.h" @@ -282,6 +283,7 @@ static int localNumBackends = 0; static PgStat_ArchiverStats archiverStats; static PgStat_GlobalStats globalStats; static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS]; +static PgStat_RecoveryPrefetchStats recoveryPrefetchStats; /* * List of OIDs of databases we need to write out. If an entry is InvalidOid, @@ -354,6 +356,7 @@ static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len); static void pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len); static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len); static void pgstat_recv_slru(PgStat_MsgSLRU *msg, int len); +static void pgstat_recv_recoveryprefetch(PgStat_MsgRecoveryPrefetch *msg, int len); static void pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len); static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len); static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len); @@ -1370,11 +1373,20 @@ pgstat_reset_shared_counters(const char *target) msg.m_resettarget = RESET_ARCHIVER; else if (strcmp(target, "bgwriter") == 0) msg.m_resettarget = RESET_BGWRITER; + else if (strcmp(target, "prefetch_recovery") == 0) + { + /* + * We can't ask the stats collector to do this for us as it is not + * attached to shared memory. + */ + XLogPrefetchRequestResetStats(); + return; + } else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("unrecognized reset target: \"%s\"", target), - errhint("Target must be \"archiver\" or \"bgwriter\"."))); + errhint("Target must be \"archiver\", \"bgwriter\" or \"prefetch_recovery\"."))); pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSHAREDCOUNTER); pgstat_send(&msg, sizeof(msg)); @@ -2696,6 +2708,22 @@ pgstat_fetch_slru(void) } +/* + * --------- + * pgstat_fetch_recoveryprefetch() - + * + * Support function for restoring the counters managed by xlogprefetch.c. 
+ * --------- + */ +PgStat_RecoveryPrefetchStats * +pgstat_fetch_recoveryprefetch(void) +{ + backend_read_statsfile(); + + return &recoveryPrefetchStats; +} + + /* ------------------------------------------------------------ * Functions for management of the shared-memory PgBackendStatus array * ------------------------------------------------------------ @@ -4444,6 +4472,23 @@ pgstat_send_slru(void) } +/* ---------- + * pgstat_send_recoveryprefetch() - + * + * Send recovery prefetch statistics to the collector + * ---------- + */ +void +pgstat_send_recoveryprefetch(PgStat_RecoveryPrefetchStats *stats) +{ + PgStat_MsgRecoveryPrefetch msg; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RECOVERYPREFETCH); + msg.m_stats = *stats; + pgstat_send(&msg, sizeof(msg)); +} + + /* ---------- * PgstatCollectorMain() - * @@ -4640,6 +4685,10 @@ PgstatCollectorMain(int argc, char *argv[]) pgstat_recv_slru(&msg.msg_slru, len); break; + case PGSTAT_MTYPE_RECOVERYPREFETCH: + pgstat_recv_recoveryprefetch(&msg.msg_recoveryprefetch, len); + break; + case PGSTAT_MTYPE_FUNCSTAT: pgstat_recv_funcstat(&msg.msg_funcstat, len); break; @@ -4915,6 +4964,13 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) rc = fwrite(slruStats, sizeof(slruStats), 1, fpout); (void) rc; /* we'll check for error with ferror */ + /* + * Write recovery prefetch stats struct + */ + rc = fwrite(&recoveryPrefetchStats, sizeof(recoveryPrefetchStats), 1, + fpout); + (void) rc; /* we'll check for error with ferror */ + /* * Walk through the database table. */ @@ -5174,6 +5230,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) memset(&globalStats, 0, sizeof(globalStats)); memset(&archiverStats, 0, sizeof(archiverStats)); memset(&slruStats, 0, sizeof(slruStats)); + memset(&recoveryPrefetchStats, 0, sizeof(recoveryPrefetchStats)); /* * Set the current timestamp (will be kept only in case we can't load an @@ -5261,6 +5318,18 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) goto done; } + /* + * Read recoveryPrefetchStats struct + */ + if (fread(&recoveryPrefetchStats, 1, sizeof(recoveryPrefetchStats), + fpin) != sizeof(recoveryPrefetchStats)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", statfile))); + memset(&recoveryPrefetchStats, 0, sizeof(recoveryPrefetchStats)); + goto done; + } + /* * We found an existing collector stats file. Read it and put all the * hashtable entries into place. @@ -5560,6 +5629,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, PgStat_GlobalStats myGlobalStats; PgStat_ArchiverStats myArchiverStats; PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS]; + PgStat_RecoveryPrefetchStats myRecoveryPrefetchStats; FILE *fpin; int32 format_id; const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; @@ -5625,6 +5695,18 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, return false; } + /* + * Read recovery prefetch stats struct + */ + if (fread(&myRecoveryPrefetchStats, 1, sizeof(myRecoveryPrefetchStats), + fpin) != sizeof(myRecoveryPrefetchStats)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", statfile))); + FreeFile(fpin); + return false; + } + /* By default, we're going to return the timestamp of the global file. 
*/ *ts = myGlobalStats.stats_timestamp; @@ -6422,6 +6504,18 @@ pgstat_recv_slru(PgStat_MsgSLRU *msg, int len) slruStats[msg->m_index].truncate += msg->m_truncate; } +/* ---------- + * pgstat_recv_recoveryprefetch() - + * + * Process a recovery prefetch message. + * ---------- + */ +static void +pgstat_recv_recoveryprefetch(PgStat_MsgRecoveryPrefetch *msg, int len) +{ + recoveryPrefetchStats = msg->m_stats; +} + /* ---------- * pgstat_recv_recoveryconflict() - * diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 427b0d59cd..221081bddc 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -21,6 +21,7 @@ #include "access/nbtree.h" #include "access/subtrans.h" #include "access/twophase.h" +#include "access/xlogprefetch.h" #include "commands/async.h" #include "miscadmin.h" #include "pgstat.h" @@ -124,6 +125,7 @@ CreateSharedMemoryAndSemaphores(void) size = add_size(size, PredicateLockShmemSize()); size = add_size(size, ProcGlobalShmemSize()); size = add_size(size, XLOGShmemSize()); + size = add_size(size, XLogPrefetchShmemSize()); size = add_size(size, CLOGShmemSize()); size = add_size(size, CommitTsShmemSize()); size = add_size(size, SUBTRANSShmemSize()); @@ -212,6 +214,7 @@ CreateSharedMemoryAndSemaphores(void) * Set up xlog, clog, and buffers */ XLOGShmemInit(); + XLogPrefetchShmemInit(); CLOGShmemInit(); CommitTsShmemInit(); SUBTRANSShmemInit(); diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 2f3e0a70e0..2fea5f3dcd 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -34,6 +34,7 @@ #include "access/twophase.h" #include "access/xact.h" #include "access/xlog_internal.h" +#include "access/xlogprefetch.h" #include "catalog/namespace.h" #include "catalog/pg_authid.h" #include "catalog/storage.h" @@ -198,6 +199,7 @@ static bool check_max_wal_senders(int *newval, void **extra, GucSource source); static bool check_autovacuum_work_mem(int *newval, void **extra, GucSource source); static bool check_effective_io_concurrency(int *newval, void **extra, GucSource source); static bool check_maintenance_io_concurrency(int *newval, void **extra, GucSource source); +static void assign_maintenance_io_concurrency(int newval, void *extra); static void assign_pgstat_temp_directory(const char *newval, void *extra); static bool check_application_name(char **newval, void **extra, GucSource source); static void assign_application_name(const char *newval, void *extra); @@ -1272,6 +1274,18 @@ static struct config_bool ConfigureNamesBool[] = true, NULL, NULL, NULL }, + { + {"recovery_prefetch_fpw", PGC_SIGHUP, WAL_SETTINGS, + gettext_noop("Prefetch blocks that have full page images in the WAL"), + gettext_noop("On some systems, there is no benefit to prefetching pages that will be " + "entirely overwritten, but if the logical page size of the filesystem is " + "larger than PostgreSQL's, this can be beneficial. 
This option has no " + "effect unless max_recovery_prefetch_distance is set to a positive number.") + }, + &recovery_prefetch_fpw, + false, + NULL, assign_recovery_prefetch_fpw, NULL + }, { {"wal_log_hints", PGC_POSTMASTER, WAL_SETTINGS, @@ -2649,6 +2663,22 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_recovery_prefetch_distance", PGC_SIGHUP, WAL_ARCHIVE_RECOVERY, + gettext_noop("Maximum number of bytes to read ahead in the WAL to prefetch referenced blocks."), + gettext_noop("Set to -1 to disable prefetching during recovery."), + GUC_UNIT_BYTE + }, + &max_recovery_prefetch_distance, +#ifdef USE_PREFETCH + 256 * 1024, +#else + -1, +#endif + -1, INT_MAX, + NULL, assign_max_recovery_prefetch_distance, NULL + }, + { {"wal_keep_segments", PGC_SIGHUP, REPLICATION_SENDING, gettext_noop("Sets the number of WAL files held for standby servers."), @@ -2968,7 +2998,8 @@ static struct config_int ConfigureNamesInt[] = 0, #endif 0, MAX_IO_CONCURRENCY, - check_maintenance_io_concurrency, NULL, NULL + check_maintenance_io_concurrency, assign_maintenance_io_concurrency, + NULL }, { @@ -11586,6 +11617,20 @@ check_maintenance_io_concurrency(int *newval, void **extra, GucSource source) return true; } +static void +assign_maintenance_io_concurrency(int newval, void *extra) +{ +#ifdef USE_PREFETCH + /* + * Reconfigure recovery prefetching, because a setting it depends on + * changed. + */ + maintenance_io_concurrency = newval; + if (AmStartupProcess()) + XLogPrefetchReconfigure(); +#endif +} + static void assign_pgstat_temp_directory(const char *newval, void *extra) { diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 81055edde7..38763f88b0 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -230,6 +230,11 @@ #checkpoint_flush_after = 0 # measured in pages, 0 disables #checkpoint_warning = 30s # 0 disables +# - Prefetching during recovery - + +#max_recovery_prefetch_distance = 256kB # -1 disables prefetching +#recovery_prefetch_fpw = off # whether to prefetch pages logged with FPW + # - Archiving - #archive_mode = off # enables archiving; off, on, or always diff --git a/src/include/access/xlogprefetch.h b/src/include/access/xlogprefetch.h new file mode 100644 index 0000000000..d8e2e1ca50 --- /dev/null +++ b/src/include/access/xlogprefetch.h @@ -0,0 +1,85 @@ +/*------------------------------------------------------------------------- + * + * xlogprefetch.h + * Declarations for the recovery prefetching module. 
+ * + * Portions Copyright (c) 2020, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/access/xlogprefetch.h + *------------------------------------------------------------------------- + */ +#ifndef XLOGPREFETCH_H +#define XLOGPREFETCH_H + +#include "access/xlogdefs.h" + +/* GUCs */ +extern int max_recovery_prefetch_distance; +extern bool recovery_prefetch_fpw; + +struct XLogPrefetcher; +typedef struct XLogPrefetcher XLogPrefetcher; + +extern int XLogPrefetchReconfigureCount; + +typedef struct XLogPrefetchState +{ + XLogPrefetcher *prefetcher; + int reconfigure_count; +} XLogPrefetchState; + +extern size_t XLogPrefetchShmemSize(void); +extern void XLogPrefetchShmemInit(void); + +extern void XLogPrefetchReconfigure(void); +extern void XLogPrefetchRequestResetStats(void); + +extern void XLogPrefetchBegin(XLogPrefetchState *state); +extern void XLogPrefetchEnd(XLogPrefetchState *state); + +/* Functions exposed only for the use of XLogPrefetch(). */ +extern XLogPrefetcher *XLogPrefetcherAllocate(TimeLineID tli, + XLogRecPtr lsn, + bool streaming); +extern void XLogPrefetcherFree(XLogPrefetcher *prefetcher); +extern void XLogPrefetcherReadAhead(XLogPrefetcher *prefetch, + XLogRecPtr replaying_lsn); + +/* + * Tell the prefetching module that we are now replaying a given LSN, so that + * it can decide how far ahead to read in the WAL, if configured. + */ +static inline void +XLogPrefetch(XLogPrefetchState *state, + TimeLineID replaying_tli, + XLogRecPtr replaying_lsn, + bool from_stream) +{ + /* + * Handle any configuration changes. Rather than trying to deal with + * various parameter changes, we just tear down and set up a new + * prefetcher if anything we depend on changes. + */ + if (unlikely(state->reconfigure_count != XLogPrefetchReconfigureCount)) + { + /* If we had a prefetcher, tear it down. */ + if (state->prefetcher) + { + XLogPrefetcherFree(state->prefetcher); + state->prefetcher = NULL; + } + /* If we want a prefetcher, set it up. 
*/ + if (max_recovery_prefetch_distance > 0) + state->prefetcher = XLogPrefetcherAllocate(replaying_tli, + replaying_lsn, + from_stream); + state->reconfigure_count = XLogPrefetchReconfigureCount; + } + + if (state->prefetcher) + XLogPrefetcherReadAhead(state->prefetcher, replaying_lsn); +} + +#endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 61f2c2f5b4..56b48bf2ad 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -6136,6 +6136,14 @@ prorettype => 'bool', proargtypes => '', prosrc => 'pg_is_wal_replay_paused' }, +{ oid => '9085', descr => 'statistics: information about WAL prefetching', + proname => 'pg_stat_get_prefetch_recovery', prorows => '1', provolatile => 'v', + proretset => 't', prorettype => 'record', proargtypes => '', + proallargtypes => '{timestamptz,int8,int8,int8,int8,int8,int4,int4,float4,float4}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o}', + proargnames => '{stats_reset,prefetch,skip_hit,skip_new,skip_fpw,skip_seq,distance,queue_depth,avg_distance,avg_queue_depth}', + prosrc => 'pg_stat_get_prefetch_recovery' }, + { oid => '2621', descr => 'reload configuration files', proname => 'pg_reload_conf', provolatile => 'v', prorettype => 'bool', proargtypes => '', prosrc => 'pg_reload_conf' }, diff --git a/src/include/pgstat.h b/src/include/pgstat.h index c55dc1481c..0dcd3c377a 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -62,6 +62,7 @@ typedef enum StatMsgType PGSTAT_MTYPE_ARCHIVER, PGSTAT_MTYPE_BGWRITER, PGSTAT_MTYPE_SLRU, + PGSTAT_MTYPE_RECOVERYPREFETCH, PGSTAT_MTYPE_FUNCSTAT, PGSTAT_MTYPE_FUNCPURGE, PGSTAT_MTYPE_RECOVERYCONFLICT, @@ -182,6 +183,19 @@ typedef struct PgStat_TableXactStatus struct PgStat_TableXactStatus *next; /* next of same subxact */ } PgStat_TableXactStatus; +/* + * Recovery prefetching statistics persisted on disk by pgstat.c, but kept in + * shared memory by xlogprefetch.c. + */ +typedef struct PgStat_RecoveryPrefetchStats +{ + PgStat_Counter prefetch; + PgStat_Counter skip_hit; + PgStat_Counter skip_new; + PgStat_Counter skip_fpw; + PgStat_Counter skip_seq; + TimestampTz stat_reset_timestamp; +} PgStat_RecoveryPrefetchStats; /* ------------------------------------------------------------ * Message formats follow @@ -453,6 +467,16 @@ typedef struct PgStat_MsgSLRU PgStat_Counter m_truncate; } PgStat_MsgSLRU; +/* ---------- + * PgStat_MsgRecoveryPrefetch Sent by XLogPrefetch to save statistics. 
+ * ---------- + */ +typedef struct PgStat_MsgRecoveryPrefetch +{ + PgStat_MsgHdr m_hdr; + PgStat_RecoveryPrefetchStats m_stats; +} PgStat_MsgRecoveryPrefetch; + /* ---------- * PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict * ---------- @@ -597,6 +621,7 @@ typedef union PgStat_Msg PgStat_MsgArchiver msg_archiver; PgStat_MsgBgWriter msg_bgwriter; PgStat_MsgSLRU msg_slru; + PgStat_MsgRecoveryPrefetch msg_recoveryprefetch; PgStat_MsgFuncstat msg_funcstat; PgStat_MsgFuncpurge msg_funcpurge; PgStat_MsgRecoveryConflict msg_recoveryconflict; @@ -1458,6 +1483,7 @@ extern void pgstat_twophase_postabort(TransactionId xid, uint16 info, extern void pgstat_send_archiver(const char *xlog, bool failed); extern void pgstat_send_bgwriter(void); +extern void pgstat_send_recoveryprefetch(PgStat_RecoveryPrefetchStats *stats); /* ---------- * Support functions for the SQL-callable functions to @@ -1473,6 +1499,7 @@ extern int pgstat_fetch_stat_numbackends(void); extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void); extern PgStat_GlobalStats *pgstat_fetch_global(void); extern PgStat_SLRUStats *pgstat_fetch_slru(void); +extern PgStat_RecoveryPrefetchStats *pgstat_fetch_recoveryprefetch(void); extern void pgstat_count_slru_page_zeroed(int slru_idx); extern void pgstat_count_slru_page_hit(int slru_idx); diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index 2819282181..976cf8b116 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -440,4 +440,8 @@ extern void assign_search_path(const char *newval, void *extra); extern bool check_wal_buffers(int *newval, void **extra, GucSource source); extern void assign_xlog_sync_method(int new_sync_method, void *extra); +/* in access/transam/xlogprefetch.c */ +extern void assign_max_recovery_prefetch_distance(int new_value, void *extra); +extern void assign_recovery_prefetch_fpw(bool new_value, void *extra); + #endif /* GUC_H */ diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index b813e32215..74dd8c604c 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1857,6 +1857,17 @@ pg_stat_gssapi| SELECT s.pid, s.gss_enc AS encrypted FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid) WHERE (s.client_port IS NOT NULL); +pg_stat_prefetch_recovery| SELECT s.stats_reset, + s.prefetch, + s.skip_hit, + s.skip_new, + s.skip_fpw, + s.skip_seq, + s.distance, + s.queue_depth, + s.avg_distance, + s.avg_queue_depth + FROM pg_stat_get_prefetch_recovery() s(stats_reset, prefetch, skip_hit, skip_new, skip_fpw, skip_seq, distance, queue_depth, avg_distance, avg_queue_depth); pg_stat_progress_analyze| SELECT s.pid, s.datid, d.datname, -- 2.20.1
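For anyone who wants to poke at the new instrumentation while testing, here is a quick sketch of how the view and reset target added by this patch can be exercised on the standby (nothing beyond what the patch itself exposes is assumed; the numbers will of course depend on the workload):

    -- observe prefetching activity during replay
    SELECT prefetch, skip_hit, skip_new, skip_fpw, skip_seq,
           distance, queue_depth, avg_distance, avg_queue_depth
    FROM pg_stat_prefetch_recovery;

    -- clear the counters before starting a new measurement run
    SELECT pg_stat_reset_shared('prefetch_recovery');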