On Wed, Feb 12, 2020 at 7:52 PM Thomas Munro <thomas.mu...@gmail.com> wrote:
> 1. It now uses effective_io_concurrency to control how many concurrent
> prefetches to allow. It's possible that we should have a different GUC to
> control "maintenance" users of concurrency I/O as discussed elsewhere[1],
> but I'm staying out of that for now; if we agree to do that for VACUUM etc,
> we can change it easily here. Note that the value is percolated through the
> ComputeIoConcurrency() function which I think we should discuss, but again
> that's off topic, I just want to use the standard infrastructure here.
I started a separate thread[1] to discuss that GUC, because it's basically an
independent question. Meanwhile, here's a new version of the WAL prefetch
patch, with the following changes:

1. A monitoring view:

postgres=# select * from pg_stat_wal_prefetcher ;
 prefetch | skip_hit | skip_new | skip_fpw | skip_seq | distance | queue_depth
----------+----------+----------+----------+----------+----------+-------------
    95854 |   291458 |      435 |        0 |    26245 |   261800 |          10
(1 row)

That shows a bunch of counters for blocks prefetched and skipped for various
reasons. It also shows the current read-ahead distance (in bytes of WAL) and
queue depth (an approximation of how many I/Os might be in flight, used for
rate limiting; I'm struggling to come up with a better short name for this).
This can be used to see the effects of experiments with different settings, eg:

alter system set effective_io_concurrency = 20;
alter system set wal_prefetch_distance = '256kB';
select pg_reload_conf();

2. A log message when WAL prefetching begins and ends, so you can see what it
did during crash recovery:

LOG:  WAL prefetch finished at 0/C5E98758; prefetch = 1112628, skip_hit = 3607540, skip_new = 45592, skip_fpw = 0, skip_seq = 177049, avg_distance = 247907.942532, avg_queue_depth = 22.261352

3. A bit of general user documentation.

[1] https://www.postgresql.org/message-id/flat/CA%2BhUKGJUw08dPs_3EUcdO6M90GnjofPYrWp4YSLaBkgYwS-AqA%40mail.gmail.com
From a61b4e00c42ace5db1608e02165f89094bf86391 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Tue, 3 Dec 2019 17:13:40 +1300 Subject: [PATCH 1/5] Allow PrefetchBuffer() to be called with a SMgrRelation. Previously a Relation was required, but it's annoying to have to create a "fake" one in recovery. --- src/backend/storage/buffer/bufmgr.c | 77 ++++++++++++++++------------- src/include/storage/bufmgr.h | 3 ++ 2 files changed, 47 insertions(+), 33 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 5880054245..6e0875022c 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -519,6 +519,48 @@ ComputeIoConcurrency(int io_concurrency, double *target) return (new_prefetch_pages >= 0.0 && new_prefetch_pages < (double) INT_MAX); } +void +SharedPrefetchBuffer(SMgrRelation smgr_reln, ForkNumber forkNum, BlockNumber blockNum) +{ +#ifdef USE_PREFETCH + BufferTag newTag; /* identity of requested block */ + uint32 newHash; /* hash value for newTag */ + LWLock *newPartitionLock; /* buffer partition lock for it */ + int buf_id; + + Assert(BlockNumberIsValid(blockNum)); + + /* create a tag so we can lookup the buffer */ + INIT_BUFFERTAG(newTag, smgr_reln->smgr_rnode.node, + forkNum, blockNum); + + /* determine its hash code and partition lock ID */ + newHash = BufTableHashCode(&newTag); + newPartitionLock = BufMappingPartitionLock(newHash); + + /* see if the block is in the buffer pool already */ + LWLockAcquire(newPartitionLock, LW_SHARED); + buf_id = BufTableLookup(&newTag, newHash); + LWLockRelease(newPartitionLock); + + /* If not in buffers, initiate prefetch */ + if (buf_id < 0) + smgrprefetch(smgr_reln, forkNum, blockNum); + + /* + * If the block *is* in buffers, we do nothing. This is not really ideal: + * the block might be just about to be evicted, which would be stupid + * since we know we are going to need it soon. But the only easy answer + * is to bump the usage_count, which does not seem like a great solution: + * when the caller does ultimately touch the block, usage_count would get + * bumped again, resulting in too much favoritism for blocks that are + * involved in a prefetch sequence. A real fix would involve some + * additional per-buffer state, and it's not clear that there's enough of + * a problem to justify that. + */ +#endif +} + /* * PrefetchBuffer -- initiate asynchronous read of a block of a relation * @@ -550,39 +592,8 @@ PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum) } else { - BufferTag newTag; /* identity of requested block */ - uint32 newHash; /* hash value for newTag */ - LWLock *newPartitionLock; /* buffer partition lock for it */ - int buf_id; - - /* create a tag so we can lookup the buffer */ - INIT_BUFFERTAG(newTag, reln->rd_smgr->smgr_rnode.node, - forkNum, blockNum); - - /* determine its hash code and partition lock ID */ - newHash = BufTableHashCode(&newTag); - newPartitionLock = BufMappingPartitionLock(newHash); - - /* see if the block is in the buffer pool already */ - LWLockAcquire(newPartitionLock, LW_SHARED); - buf_id = BufTableLookup(&newTag, newHash); - LWLockRelease(newPartitionLock); - - /* If not in buffers, initiate prefetch */ - if (buf_id < 0) - smgrprefetch(reln->rd_smgr, forkNum, blockNum); - - /* - * If the block *is* in buffers, we do nothing. This is not really - * ideal: the block might be just about to be evicted, which would be - * stupid since we know we are going to need it soon. 
But the only - * easy answer is to bump the usage_count, which does not seem like a - * great solution: when the caller does ultimately touch the block, - * usage_count would get bumped again, resulting in too much - * favoritism for blocks that are involved in a prefetch sequence. A - * real fix would involve some additional per-buffer state, and it's - * not clear that there's enough of a problem to justify that. - */ + /* pass it to the shared buffer version */ + SharedPrefetchBuffer(reln->rd_smgr, forkNum, blockNum); } #endif /* USE_PREFETCH */ } diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 73c7e9ba38..89a47afec1 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -18,6 +18,7 @@ #include "storage/buf.h" #include "storage/bufpage.h" #include "storage/relfilenode.h" +#include "storage/smgr.h" #include "utils/relcache.h" #include "utils/snapmgr.h" @@ -162,6 +163,8 @@ extern PGDLLIMPORT int32 *LocalRefCount; * prototypes for functions in bufmgr.c */ extern bool ComputeIoConcurrency(int io_concurrency, double *target); +extern void SharedPrefetchBuffer(SMgrRelation smgr_reln, ForkNumber forkNum, + BlockNumber blockNum); extern void PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum); extern Buffer ReadBuffer(Relation reln, BlockNumber blockNum); -- 2.20.1
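To illustrate what patch 1 buys us (this snippet is not part of the patch, and
the helper name is invented for the example): recovery-side code can now ask
for a prefetch with nothing more than an SMgrRelation from smgropen(), so no
fake Relation has to be constructed.

#include "postgres.h"
#include "storage/bufmgr.h"
#include "storage/smgr.h"

/* Hypothetical caller: prefetch a block referenced by a decoded WAL record. */
static void
prefetch_referenced_block(RelFileNode rnode, BlockNumber blkno)
{
	SMgrRelation reln = smgropen(rnode, InvalidBackendId);

	/* Issues smgrprefetch() only if the block isn't already in shared buffers. */
	SharedPrefetchBuffer(reln, MAIN_FORKNUM, blkno);
}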
From acbff1444d0acce71b0218ce083df03992af1581 Mon Sep 17 00:00:00 2001 From: Thomas Munro <tmu...@postgresql.org> Date: Mon, 9 Dec 2019 17:10:17 +1300 Subject: [PATCH 2/5] Rename GetWalRcvWriteRecPtr() to GetWalRcvFlushRecPtr(). The new name better reflects the fact that the value it returns is updated only when received data has been flushed to disk. An upcoming patch will make use of the latest data that was written without waiting for it to be flushed, so use more precise function names. --- src/backend/access/transam/xlog.c | 4 ++-- src/backend/access/transam/xlogfuncs.c | 2 +- src/backend/replication/walreceiverfuncs.c | 4 ++-- src/backend/replication/walsender.c | 2 +- src/include/replication/walreceiver.h | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index d19408b3be..cc7072ba13 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -9283,7 +9283,7 @@ CreateRestartPoint(int flags) * Retreat _logSegNo using the current end of xlog replayed or received, * whichever is later. */ - receivePtr = GetWalRcvWriteRecPtr(NULL, NULL); + receivePtr = GetWalRcvFlushRecPtr(NULL, NULL); replayPtr = GetXLogReplayRecPtr(&replayTLI); endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr; KeepLogSeg(endptr, &_logSegNo); @@ -12104,7 +12104,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, { XLogRecPtr latestChunkStart; - receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI); + receivedUpto = GetWalRcvFlushRecPtr(&latestChunkStart, &receiveTLI); if (RecPtr < receivedUpto && receiveTLI == curFileTLI) { havedata = true; diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c index 20316539b6..e075c1c71b 100644 --- a/src/backend/access/transam/xlogfuncs.c +++ b/src/backend/access/transam/xlogfuncs.c @@ -398,7 +398,7 @@ pg_last_wal_receive_lsn(PG_FUNCTION_ARGS) { XLogRecPtr recptr; - recptr = GetWalRcvWriteRecPtr(NULL, NULL); + recptr = GetWalRcvFlushRecPtr(NULL, NULL); if (recptr == 0) PG_RETURN_NULL(); diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 89c903e45a..9bce63b534 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -286,7 +286,7 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, } /* - * Returns the last+1 byte position that walreceiver has written. + * Returns the last+1 byte position that walreceiver has flushed. * * Optionally, returns the previous chunk start, that is the first byte * written in the most recent walreceiver flush cycle. Callers not @@ -294,7 +294,7 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, * receiveTLI. */ XLogRecPtr -GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) +GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) { WalRcvData *walrcv = WalRcv; XLogRecPtr recptr; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index abb533b9d0..1079b3f8cb 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2903,7 +2903,7 @@ GetStandbyFlushRecPtr(void) * has streamed, but hasn't been replayed yet. 
*/ - receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI); + receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI); replayPtr = GetXLogReplayRecPtr(&replayTLI); ThisTimeLineID = replayTLI; diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index e08afc6548..147b374a26 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -322,7 +322,7 @@ extern bool WalRcvStreaming(void); extern bool WalRcvRunning(void); extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname); -extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); +extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); extern int GetReplicationApplyDelay(void); extern int GetReplicationTransferLatency(void); extern void WalRcvForceReply(void); -- 2.20.1
From d7fa7d82c5f68d0cccf441ce9e8dfa40f64d3e0d Mon Sep 17 00:00:00 2001 From: Thomas Munro <tmu...@postgresql.org> Date: Mon, 9 Dec 2019 17:22:07 +1300 Subject: [PATCH 3/5] Add WalRcvGetWriteRecPtr() (new definition). A later patch will read received WAL to prefetch referenced blocks, without waiting for the data to be flushed to disk. To do that, it needs to be able to see the write pointer advancing in shared memory. The function formerly bearing name was recently renamed to WalRcvGetFlushRecPtr(), which better described what it does. --- src/backend/replication/walreceiver.c | 5 +++++ src/backend/replication/walreceiverfuncs.c | 10 ++++++++++ src/include/replication/walreceiver.h | 9 +++++++++ 3 files changed, 24 insertions(+) diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 2ab15c3cbb..88a51ba35f 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -244,6 +244,8 @@ WalReceiverMain(void) SpinLockRelease(&walrcv->mutex); + pg_atomic_init_u64(&WalRcv->writtenUpto, 0); + /* Arrange to clean up at walreceiver exit */ on_shmem_exit(WalRcvDie, 0); @@ -985,6 +987,9 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) LogstreamResult.Write = recptr; } + + /* Update shared-memory status */ + pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write); } /* diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 9bce63b534..14e9a6245a 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -310,6 +310,16 @@ GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) return recptr; } +/* + * Returns the last+1 byte position that walreceiver has written. + * This returns a recently written value without taking a lock. + */ +XLogRecPtr +GetWalRcvWriteRecPtr(void) +{ + return pg_atomic_read_u64(&WalRcv->writtenUpto); +} + /* * Returns the replication apply delay in ms or -1 * if the apply delay info is not available diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 147b374a26..1e8f304dc4 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -16,6 +16,7 @@ #include "access/xlogdefs.h" #include "getaddrinfo.h" /* for NI_MAXHOST */ #include "pgtime.h" +#include "port/atomics.h" #include "replication/logicalproto.h" #include "replication/walsender.h" #include "storage/latch.h" @@ -83,6 +84,13 @@ typedef struct XLogRecPtr receivedUpto; TimeLineID receivedTLI; + /* + * Same as above, but advanced after writing and before flushing, without + * the need to acquire the spin lock. Data can be read by another process + * up to this point, but shouldn't be used for data integrity purposes. + */ + pg_atomic_uint64 writtenUpto; + /* * latestChunkStart is the starting byte position of the current "batch" * of received WAL. It's actually the same as the previous value of @@ -323,6 +331,7 @@ extern bool WalRcvRunning(void); extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname); extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); +extern XLogRecPtr GetWalRcvWriteRecPtr(void); extern int GetReplicationApplyDelay(void); extern int GetReplicationTransferLatency(void); extern void WalRcvForceReply(void); -- 2.20.1
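For clarity (again, not part of the patches): after patches 2 and 3 there are
two distinct positions available, and the gap between them is exactly what the
prefetcher will exploit. A minimal sketch, assuming it runs in the startup
process while streaming, with a made-up function name:

#include "postgres.h"
#include "replication/walreceiver.h"

static void
log_walrcv_positions(void)
{
	TimeLineID	tli;
	XLogRecPtr	flushed = GetWalRcvFlushRecPtr(NULL, &tli); /* spinlock-protected; replay may go this far */
	XLogRecPtr	written = GetWalRcvWriteRecPtr();           /* lock-free atomic read; prefetching may read this far */

	elog(DEBUG1, "walreceiver wrote to %X/%X, flushed to %X/%X",
		 (uint32) (written >> 32), (uint32) written,
		 (uint32) (flushed >> 32), (uint32) flushed);
}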
From f9a53985e0e30659caa41c95c85001c91b3deb5f Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Mon, 30 Dec 2019 16:43:50 +1300 Subject: [PATCH 4/5] Allow PrefetchBuffer() to report the outcome. Report when a relation's backing file is missing, to prepare for use during recovery. This will be used to handle cases of relations that are referenced in the WAL but have been unlinked already due to actions covered by WAL records that haven't been replayed yet, after a crash. Also report whether a prefetch was actually initiated, so that callers can limit the number of concurrent I/Os they try to issue, without counting the prefetch calls that did nothing because the page was already in our buffers. Author: Thomas Munro Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com --- src/backend/storage/buffer/bufmgr.c | 9 +++++++-- src/backend/storage/smgr/md.c | 9 +++++++-- src/backend/storage/smgr/smgr.c | 10 +++++++--- src/include/storage/bufmgr.h | 12 ++++++++++-- src/include/storage/md.h | 2 +- src/include/storage/smgr.h | 2 +- 6 files changed, 33 insertions(+), 11 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 6e0875022c..5dbbcf8111 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -519,7 +519,7 @@ ComputeIoConcurrency(int io_concurrency, double *target) return (new_prefetch_pages >= 0.0 && new_prefetch_pages < (double) INT_MAX); } -void +PrefetchBufferResult SharedPrefetchBuffer(SMgrRelation smgr_reln, ForkNumber forkNum, BlockNumber blockNum) { #ifdef USE_PREFETCH @@ -545,7 +545,11 @@ SharedPrefetchBuffer(SMgrRelation smgr_reln, ForkNumber forkNum, BlockNumber blo /* If not in buffers, initiate prefetch */ if (buf_id < 0) - smgrprefetch(smgr_reln, forkNum, blockNum); + { + if (!smgrprefetch(smgr_reln, forkNum, blockNum)) + return PREFETCH_BUFFER_NOREL; + return PREFETCH_BUFFER_MISS; + } /* * If the block *is* in buffers, we do nothing. This is not really ideal: @@ -559,6 +563,7 @@ SharedPrefetchBuffer(SMgrRelation smgr_reln, ForkNumber forkNum, BlockNumber blo * a problem to justify that. */ #endif + return PREFETCH_BUFFER_HIT; } /* diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index c5b771c531..ba12fc2077 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -525,14 +525,17 @@ mdclose(SMgrRelation reln, ForkNumber forknum) /* * mdprefetch() -- Initiate asynchronous read of the specified block of a relation */ -void +bool mdprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) { #ifdef USE_PREFETCH off_t seekpos; MdfdVec *v; - v = _mdfd_getseg(reln, forknum, blocknum, false, EXTENSION_FAIL); + v = _mdfd_getseg(reln, forknum, blocknum, false, + InRecovery ? 
EXTENSION_RETURN_NULL : EXTENSION_FAIL); + if (v == NULL) + return false; seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); @@ -540,6 +543,8 @@ mdprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) (void) FilePrefetch(v->mdfd_vfd, seekpos, BLCKSZ, WAIT_EVENT_DATA_FILE_PREFETCH); #endif /* USE_PREFETCH */ + + return true; } /* diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c index 360b5bf5bf..c39dd533e6 100644 --- a/src/backend/storage/smgr/smgr.c +++ b/src/backend/storage/smgr/smgr.c @@ -49,7 +49,7 @@ typedef struct f_smgr bool isRedo); void (*smgr_extend) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync); - void (*smgr_prefetch) (SMgrRelation reln, ForkNumber forknum, + bool (*smgr_prefetch) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum); void (*smgr_read) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer); @@ -489,11 +489,15 @@ smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, /* * smgrprefetch() -- Initiate asynchronous read of the specified block of a relation. + * + * In recovery only, this can return false to indicate that a file + * doesn't exist (presumably it has been dropped by a later WAL + * record). */ -void +bool smgrprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) { - smgrsw[reln->smgr_which].smgr_prefetch(reln, forknum, blocknum); + return smgrsw[reln->smgr_which].smgr_prefetch(reln, forknum, blocknum); } /* diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 89a47afec1..5d7a796ba0 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -159,12 +159,20 @@ extern PGDLLIMPORT int32 *LocalRefCount; */ #define BufferGetPage(buffer) ((Page)BufferGetBlock(buffer)) +typedef enum PrefetchBufferResult +{ + PREFETCH_BUFFER_HIT, + PREFETCH_BUFFER_MISS, + PREFETCH_BUFFER_NOREL +} PrefetchBufferResult; + /* * prototypes for functions in bufmgr.c */ extern bool ComputeIoConcurrency(int io_concurrency, double *target); -extern void SharedPrefetchBuffer(SMgrRelation smgr_reln, ForkNumber forkNum, - BlockNumber blockNum); +extern PrefetchBufferResult SharedPrefetchBuffer(SMgrRelation smgr_reln, + ForkNumber forkNum, + BlockNumber blockNum); extern void PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum); extern Buffer ReadBuffer(Relation reln, BlockNumber blockNum); diff --git a/src/include/storage/md.h b/src/include/storage/md.h index ec7630ce3b..07fd1bb7d0 100644 --- a/src/include/storage/md.h +++ b/src/include/storage/md.h @@ -28,7 +28,7 @@ extern bool mdexists(SMgrRelation reln, ForkNumber forknum); extern void mdunlink(RelFileNodeBackend rnode, ForkNumber forknum, bool isRedo); extern void mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync); -extern void mdprefetch(SMgrRelation reln, ForkNumber forknum, +extern bool mdprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum); extern void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer); diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h index 243822137c..dc740443e2 100644 --- a/src/include/storage/smgr.h +++ b/src/include/storage/smgr.h @@ -92,7 +92,7 @@ extern void smgrdounlink(SMgrRelation reln, bool isRedo); extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo); extern void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, 
char *buffer, bool skipFsync); -extern void smgrprefetch(SMgrRelation reln, ForkNumber forknum, +extern bool smgrprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum); extern void smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer); -- 2.20.1
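Here is a sketch of how a caller might consume the new return value; the real
consumer is the WAL prefetcher added in the next patch, and the helper below
is invented purely for illustration:

#include "postgres.h"
#include "storage/bufmgr.h"
#include "storage/smgr.h"

/* Returns true if an I/O was (possibly) initiated and should be counted as in flight. */
static bool
try_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno)
{
	switch (SharedPrefetchBuffer(reln, forknum, blkno))
	{
		case PREFETCH_BUFFER_HIT:
			/* Already in shared buffers; no I/O was issued. */
			return false;
		case PREFETCH_BUFFER_MISS:
			/* A prefetch was requested from the kernel; assume it's in progress. */
			return true;
		case PREFETCH_BUFFER_NOREL:
			/*
			 * The backing file is missing (only expected during recovery).
			 * The caller should stop prefetching from this relation until
			 * the relevant WAL record has been replayed.
			 */
			return false;
	}
	return false;				/* unreachable, keeps compilers quiet */
}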
From 6dc2cfa4b64ac25513c36538272e08b937bd46a4 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Mon, 2 Mar 2020 15:33:51 +1300 Subject: [PATCH 5/5] Prefetch referenced blocks during recovery. Introduce a new GUC wal_prefetch_distance. If it is set to a positive number of bytes, then read ahead in the WAL at most that distance, and initiate asynchronous reading of referenced blocks. The goal is to avoid I/O stalls and benefit from concurrent I/O. The number of concurrent asynchronous reads is limited by both effective_io_concurrency and wal_prefetch_distance. The feature is disabled by default. Author: Thomas Munro Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com --- doc/src/sgml/config.sgml | 38 ++ doc/src/sgml/monitoring.sgml | 69 +++ doc/src/sgml/wal.sgml | 12 + src/backend/access/transam/Makefile | 1 + src/backend/access/transam/xlog.c | 64 ++ src/backend/access/transam/xlogprefetcher.c | 653 ++++++++++++++++++++ src/backend/access/transam/xlogutils.c | 23 +- src/backend/catalog/system_views.sql | 11 + src/backend/replication/logical/logical.c | 2 +- src/backend/storage/ipc/ipci.c | 3 + src/backend/utils/misc/guc.c | 25 + src/include/access/xlog.h | 4 + src/include/access/xlogprefetcher.h | 28 + src/include/access/xlogutils.h | 20 + src/include/catalog/pg_proc.dat | 8 + src/include/storage/bufmgr.h | 5 + src/include/utils/guc.h | 2 + src/test/regress/expected/rules.out | 8 + 18 files changed, 974 insertions(+), 2 deletions(-) create mode 100644 src/backend/access/transam/xlogprefetcher.c create mode 100644 src/include/access/xlogprefetcher.h diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index c1128f89ec..415b0793e1 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3082,6 +3082,44 @@ include_dir 'conf.d' </listitem> </varlistentry> + <varlistentry id="guc-wal-prefetch-distance" xreflabel="wal_prefetch_distance"> + <term><varname>wal_prefetch_distance</varname> (<type>integer</type>) + <indexterm> + <primary><varname>wal_prefetch_distance</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + The maximum distance to look ahead in the WAL during recovery, to find + blocks to prefetch. Prefetching blocks that will soon be needed can + reduce I/O wait times. The number of concurrent prefetches is limited + by this setting as well as <xref linkend="guc-effective-io-concurrency"/>. + If this value is specified without units, it is taken as bytes. + The default is -1, meaning that WAL prefetching is disabled. + </para> + </listitem> + </varlistentry> + + <varlistentry id="guc-wal-prefetch-fpw" xreflabel="wal_prefetch_fpw"> + <term><varname>wal_prefetch_fpw</varname> (<type>boolean</type>) + <indexterm> + <primary><varname>wal_prefetch_fpw</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Whether to prefetch blocks with full page images during recovery. + Usually this doesn't help, since such blocks will not be read. However, + on file systems with a block size larger than + <productname>PostgreSQL</productname>'s, prefetching can avoid a costly + read-before-write when a blocks are later written. + This setting has no effect unless + <xref linkend="guc-wal-prefetch-distance"/> is set to a positive number. + The default is off. 
+ </para> + </listitem> + </varlistentry> + </variablelist> </sect2> <sect2 id="runtime-config-wal-archiving"> diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 87586a7b06..013537d2be 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -320,6 +320,13 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser </entry> </row> + <row> + <entry><structname>pg_stat_wal_prefetcher</structname><indexterm><primary>pg_stat_wal_prefetcher</primary></indexterm></entry> + <entry>Only one row, showing statistics about blocks prefetched during recovery. + See <xref linkend="pg-stat-wal-prefetcher-view"/> for details. + </entry> + </row> + <row> <entry><structname>pg_stat_subscription</structname><indexterm><primary>pg_stat_subscription</primary></indexterm></entry> <entry>At least one row per subscription, showing information about @@ -2184,6 +2191,68 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i connected server. </para> + <table id="pg-stat-wal-prefetcher-view" xreflabel="pg_stat_wal_prefetcher"> + <title><structname>pg_stat_wal_prefetcher</structname> View</title> + <tgroup cols="3"> + <thead> + <row> + <entry>Column</entry> + <entry>Type</entry> + <entry>Description</entry> + </row> + </thead> + + <tbody> + <row> + <entry><structfield>prefetch</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks prefetched because they were not in the buffer pool</entry> + </row> + <row> + <entry><structfield>skip_hit</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks not prefetched because they were already in the buffer pool</entry> + </row> + <row> + <entry><structfield>skip_new</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks not prefetched because they were new (usually relation extension)</entry> + </row> + <row> + <entry><structfield>skip_fpw</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks not prefetched because a full page image was included in the WAL and <xref linkend="guc-wal-prefetch-fpw"/> was set to <literal>off</literal></entry> + </row> + <row> + <entry><structfield>skip_seq</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks not prefetched because of repeated or sequential access</entry> + </row> + <row> + <entry><structfield>distance</structfield></entry> + <entry><type>integer</type></entry> + <entry>How far ahead of recovery the WAL prefetcher is currently reading, in bytes</entry> + </row> + <row> + <entry><structfield>queue_depth</structfield></entry> + <entry><type>integer</type></entry> + <entry>How many prefetches have been initiated but are not yet known to have completed</entry> + </row> + </tbody> + </tgroup> + </table> + + <para> + The <structname>pg_stat_wal_prefetcher</structname> view will contain only + one row. It is filled with nulls if recovery is not running or WAL + prefetching is not enabled. See <xref linkend="guc-wal-prefetch-distance"/> + for more information. The counters in this view are reset whenever the + <xref linkend="guc-wal-prefetch-distance"/>, + <xref linkend="guc-wal-prefetch-fpw"/> or + <xref linkend="guc-effective-io-concurrency"/> setting is changed and + the server configuration is reloaded. 
+ </para> + <table id="pg-stat-subscription" xreflabel="pg_stat_subscription"> <title><structname>pg_stat_subscription</structname> View</title> <tgroup cols="3"> diff --git a/doc/src/sgml/wal.sgml b/doc/src/sgml/wal.sgml index 4eb8feb903..943462ca05 100644 --- a/doc/src/sgml/wal.sgml +++ b/doc/src/sgml/wal.sgml @@ -719,6 +719,18 @@ <acronym>WAL</acronym> call being logged to the server log. This option might be replaced by a more general mechanism in the future. </para> + + <para> + The <xref linkend="guc-wal-prefetch-distance"/> parameter can be + used to improve I/O performance during recovery by instructing + <productname>PostgreSQL</productname> to initiate reads + of disk blocks that will soon be needed, in combination with the + <xref linkend="guc-effective-io-concurrency"/> parameter. The + prefetching mechanism is most likely to be effective on systems + with <varname>full_page_writes</varname> set to + <varname>off</varname> (where that is safe), and where the working + set is larger than RAM. By default, WAL prefetching is disabled. + </para> </sect1> <sect1 id="wal-internals"> diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 595e02de72..20e044c7c8 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -31,6 +31,7 @@ OBJS = \ xlogarchive.o \ xlogfuncs.o \ xloginsert.o \ + xlogprefetcher.o \ xlogreader.o \ xlogutils.o diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index cc7072ba13..d042ebeaf5 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -34,6 +34,7 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "access/xloginsert.h" +#include "access/xlogprefetcher.h" #include "access/xlogreader.h" #include "access/xlogutils.h" #include "catalog/catversion.h" @@ -104,6 +105,8 @@ int wal_level = WAL_LEVEL_MINIMAL; int CommitDelay = 0; /* precommit delay in microseconds */ int CommitSiblings = 5; /* # concurrent xacts needed to sleep */ int wal_retrieve_retry_interval = 5000; +int wal_prefetch_distance = -1; +bool wal_prefetch_fpw = false; #ifdef WAL_DEBUG bool XLOG_DEBUG = false; @@ -805,6 +808,7 @@ static XLogSource readSource = 0; /* XLOG_FROM_* code */ */ static XLogSource currentSource = 0; /* XLOG_FROM_* code */ static bool lastSourceFailed = false; +static bool reset_wal_prefetcher = false; typedef struct XLogPageReadPrivate { @@ -6212,6 +6216,7 @@ CheckRequiredParameterValues(void) } } + /* * This must be called ONCE during postmaster or standalone-backend startup */ @@ -7068,6 +7073,7 @@ StartupXLOG(void) { ErrorContextCallback errcallback; TimestampTz xtime; + XLogPrefetcher *prefetcher = NULL; InRedo = true; @@ -7075,6 +7081,9 @@ StartupXLOG(void) (errmsg("redo starts at %X/%X", (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr))); + /* the first time through, see if we need to enable prefetching */ + ResetWalPrefetcher(); + /* * main redo apply loop */ @@ -7104,6 +7113,31 @@ StartupXLOG(void) /* Handle interrupt signals of startup process */ HandleStartupProcInterrupts(); + /* + * The first time through, or if any relevant settings or the + * WAL source changes, we'll restart the prefetching machinery + * as appropriate. This is simpler than trying to handle + * various complicated state changes. + */ + if (unlikely(reset_wal_prefetcher)) + { + /* If we had one already, destroy it. */ + if (prefetcher) + { + XLogPrefetcherFree(prefetcher); + prefetcher = NULL; + } + /* If we want one, create it. 
*/ + if (wal_prefetch_distance > 0) + prefetcher = XLogPrefetcherAllocate(xlogreader->ReadRecPtr, + currentSource == XLOG_FROM_STREAM); + reset_wal_prefetcher = false; + } + + /* Peform WAL prefetching, if enabled. */ + if (prefetcher) + XLogPrefetcherReadAhead(prefetcher, xlogreader->ReadRecPtr); + /* * Pause WAL replay, if requested by a hot-standby session via * SetRecoveryPause(). @@ -7291,6 +7325,8 @@ StartupXLOG(void) /* * end of main redo apply loop */ + if (prefetcher) + XLogPrefetcherFree(prefetcher); if (reachedRecoveryTarget) { @@ -10150,6 +10186,24 @@ assign_xlog_sync_method(int new_sync_method, void *extra) } } +void +assign_wal_prefetch_distance(int new_value, void *extra) +{ + /* Reset the WAL prefetcher, because a setting it depends on changed. */ + wal_prefetch_distance = new_value; + if (AmStartupProcess()) + ResetWalPrefetcher(); +} + +void +assign_wal_prefetch_fpw(bool new_value, void *extra) +{ + /* Reset the WAL prefetcher, because a setting it depends on changed. */ + wal_prefetch_fpw = new_value; + if (AmStartupProcess()) + ResetWalPrefetcher(); +} + /* * Issue appropriate kind of fsync (if any) for an XLOG output file. @@ -11933,6 +11987,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, * and move on to the next state. */ currentSource = XLOG_FROM_STREAM; + ResetWalPrefetcher(); break; case XLOG_FROM_STREAM: @@ -12356,3 +12411,12 @@ XLogRequestWalReceiverReply(void) { doRequestWalReceiverReply = true; } + +/* + * Schedule a WAL prefetcher reset, on change of relevant settings. + */ +void +ResetWalPrefetcher(void) +{ + reset_wal_prefetcher = true; +} diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c new file mode 100644 index 0000000000..5b32522bb5 --- /dev/null +++ b/src/backend/access/transam/xlogprefetcher.c @@ -0,0 +1,653 @@ +/*------------------------------------------------------------------------- + * + * xlogprefetcher.c + * Prefetching support for PostgreSQL write-ahead log manager + * + * Portions Copyright (c) 2020, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/transam/xlogprefetcher.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xlog.h" +#include "access/xlogprefetcher.h" +#include "access/xlogreader.h" +#include "access/xlogutils.h" +#include "catalog/storage_xlog.h" +#include "utils/fmgrprotos.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "port/atomics.h" +#include "storage/shmem.h" +#include "utils/hsearch.h" + +/* + * Sample the queue depth and distance every time we replay this much WAL. + * This is used to compute avg_queue_depth and avg_distance for the log message + * that appears at the end of crash recovery. + */ +#define XLOGPREFETCHER_MONITORING_SAMPLE_STEP 32768 + +/* + * Internal state used for book-keeping. + */ +struct XLogPrefetcher +{ + /* Reader and current reading state. */ + XLogReaderState *reader; + XLogReadLocalOptions options; + bool have_record; + bool shutdown; + int next_block_id; + + /* Book-keeping required to avoid accessing non-existing blocks. */ + HTAB *filter_table; + dlist_head filter_queue; + + /* Book-keeping required to limit concurrent prefetches. */ + XLogRecPtr *prefetch_queue; + int prefetch_queue_size; + int prefetch_head; + int prefetch_tail; + + /* Details of last prefetch to skip repeats and seq scans. 
*/ + SMgrRelation last_reln; + RelFileNode last_rnode; + BlockNumber last_blkno; + + /* Counters used to compute avg_queue_depth and avg_distance. */ + double samples; + double queue_depth_sum; + double distance_sum; + XLogRecPtr next_sample_lsn; +}; + +/* + * A temporary filter used to track block ranges that haven't been created + * yet, whole relations that haven't been created yet, and whole relations + * that we must assume have already been dropped. + */ +typedef struct XLogPrefetcherFilter +{ + RelFileNode rnode; + XLogRecPtr filter_until_replayed; + BlockNumber filter_from_block; + dlist_node link; +} XLogPrefetcherFilter; + +/* + * Counters exposed in shared memory just for the benefit of monitoring + * functions. + */ +typedef struct XLogPrefetcherMonitoringStats +{ + pg_atomic_uint64 prefetch; /* Prefetches initiated. */ + pg_atomic_uint64 skip_hit; /* Blocks already buffered. */ + pg_atomic_uint64 skip_new; /* New/missing blocks filtered. */ + pg_atomic_uint64 skip_fpw; /* FPWs skipped. */ + pg_atomic_uint64 skip_seq; /* Sequential/repeat blocks skipped. */ + int distance; /* Number of bytes ahead in the WAL. */ + int queue_depth; /* Number of I/Os possibly in progress. */ +} XLogPrefetcherMonitoringStats; + +static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, + RelFileNode rnode, + BlockNumber blockno, + XLogRecPtr lsn); +static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, + RelFileNode rnode, + BlockNumber blockno); +static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, + XLogRecPtr replaying_lsn); +static inline void XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher, + XLogRecPtr prefetching_lsn); +static inline void XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher, + XLogRecPtr replaying_lsn); +static inline bool XLogPrefetcherSaturated(XLogPrefetcher *prefetcher); + +/* + * On modern systems this is really just *counter++. On some older systems + * there might be more to it, due to inability to read and write 64 bit values + * atomically. + */ +static inline void inc_counter(pg_atomic_uint64 *counter) +{ + pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1); +} + +static XLogPrefetcherMonitoringStats *MonitoringStats; + +size_t +XLogPrefetcherShmemSize(void) +{ + return sizeof(XLogPrefetcherMonitoringStats); +} + +static void +XLogPrefetcherResetMonitoringStats(void) +{ + pg_atomic_init_u64(&MonitoringStats->prefetch, 0); + pg_atomic_init_u64(&MonitoringStats->skip_hit, 0); + pg_atomic_init_u64(&MonitoringStats->skip_new, 0); + pg_atomic_init_u64(&MonitoringStats->skip_fpw, 0); + pg_atomic_init_u64(&MonitoringStats->skip_seq, 0); + MonitoringStats->distance = -1; + MonitoringStats->queue_depth = 0; +} + +void +XLogPrefetcherShmemInit(void) +{ + bool found; + + MonitoringStats = (XLogPrefetcherMonitoringStats *) + ShmemInitStruct("XLogPrefetcherMonitoringStats", + sizeof(XLogPrefetcherMonitoringStats), + &found); + if (!found) + XLogPrefetcherResetMonitoringStats(); +} + +/* + * Create a prefetcher that is ready to begin prefetching blocks referenced by + * WAL that is ahead of the given lsn. + */ +XLogPrefetcher * +XLogPrefetcherAllocate(XLogRecPtr lsn, bool streaming) +{ + static HASHCTL hash_table_ctl = { + .keysize = sizeof(RelFileNode), + .entrysize = sizeof(XLogPrefetcherFilter) + }; + XLogPrefetcher *prefetcher = palloc0(sizeof(*prefetcher)); + + prefetcher->options.nowait = true; + if (streaming) + { + /* + * We're only allowed to read as far as the WAL receiver has written. 
+ * We don't have to wait for it to be flushed, though, as recovery + * does, so that gives us a chance to get a bit further ahead. + */ + prefetcher->options.read_upto_policy = XLRO_WALRCV_WRITTEN; + } + else + { + /* We're allowed to read as far as we can. */ + prefetcher->options.read_upto_policy = XLRO_LSN; + prefetcher->options.lsn = (XLogRecPtr) -1; + } + prefetcher->reader = XLogReaderAllocate(wal_segment_size, + NULL, + read_local_xlog_page, + &prefetcher->options); + prefetcher->filter_table = hash_create("PrefetchFilterTable", 1024, + &hash_table_ctl, + HASH_ELEM | HASH_BLOBS); + dlist_init(&prefetcher->filter_queue); + + /* + * The size of the queue is determined by target_prefetch_pages, which is + * derived from effective_io_concurrency. In theory we might have a + * separate queue for each tablespace, but it's not clear how that should + * work, so for now we'll just use the system-wide GUC to rate-limit all + * prefetching. + */ + prefetcher->prefetch_queue_size = target_prefetch_pages; + prefetcher->prefetch_queue = palloc0(sizeof(XLogRecPtr) * prefetcher->prefetch_queue_size); + prefetcher->prefetch_head = prefetcher->prefetch_tail = 0; + + /* Prepare to read at the given LSN. */ + elog(LOG, "WAL prefetch started at %X/%X", + (uint32) (lsn << 32), (uint32) lsn); + XLogBeginRead(prefetcher->reader, lsn); + + XLogPrefetcherResetMonitoringStats(); + + return prefetcher; +} + +/* + * Destroy a prefetcher and release all resources. + */ +void +XLogPrefetcherFree(XLogPrefetcher *prefetcher) +{ + double avg_distance = 0; + double avg_queue_depth = 0; + + /* Log final statistics. */ + if (prefetcher->samples > 0) + { + avg_distance = prefetcher->distance_sum / prefetcher->samples; + avg_queue_depth = prefetcher->queue_depth_sum / prefetcher->samples; + } + elog(LOG, + "WAL prefetch finished at %X/%X; " + "prefetch = " UINT64_FORMAT ", " + "skip_hit = " UINT64_FORMAT ", " + "skip_new = " UINT64_FORMAT ", " + "skip_fpw = " UINT64_FORMAT ", " + "skip_seq = " UINT64_FORMAT ", " + "avg_distance = %f, " + "avg_queue_depth = %f", + (uint32) (prefetcher->reader->EndRecPtr << 32), + (uint32) (prefetcher->reader->EndRecPtr), + pg_atomic_read_u64(&MonitoringStats->prefetch), + pg_atomic_read_u64(&MonitoringStats->skip_hit), + pg_atomic_read_u64(&MonitoringStats->skip_new), + pg_atomic_read_u64(&MonitoringStats->skip_fpw), + pg_atomic_read_u64(&MonitoringStats->skip_seq), + avg_distance, + avg_queue_depth); + XLogReaderFree(prefetcher->reader); + hash_destroy(prefetcher->filter_table); + pfree(prefetcher->prefetch_queue); + pfree(prefetcher); + + XLogPrefetcherResetMonitoringStats(); +} + +/* + * Read ahead in the WAL, as far as we can within the limits set by the user. + * Begin fetching any referenced blocks that are not already in the buffer + * pool. + */ +void +XLogPrefetcherReadAhead(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) +{ + /* If an error has occurred or we've hit the end of the WAL, do nothing. */ + if (prefetcher->shutdown) + return; + + /* + * Have any in-flight prefetches definitely completed, judging by the LSN + * that is currently being replayed? + */ + XLogPrefetcherCompletedIO(prefetcher, replaying_lsn); + + /* + * Do we already have the maximum permitted number of I/Os running + * (according to the information we have)? If so, we have to wait for at + * least one to complete, so give up early. + */ + if (XLogPrefetcherSaturated(prefetcher)) + return; + + /* Can we drop any filters yet, due to problem records begin replayed? 
*/ + XLogPrefetcherCompleteFilters(prefetcher, replaying_lsn); + + /* Main prefetch loop. */ + for (;;) + { + XLogReaderState *reader = prefetcher->reader; + char *error; + int64 distance; + + /* If we don't already have a record, then try to read one. */ + if (!prefetcher->have_record) + { + if (!XLogReadRecord(reader, &error)) + { + /* If we got an error, log it and give up. */ + if (error) + { + elog(LOG, "WAL prefetch error: %s", error); + prefetcher->shutdown = true; + } + /* Otherwise, we'll try again later when more data is here. */ + return; + } + prefetcher->have_record = true; + prefetcher->next_block_id = 0; + } + + /* How far ahead of replay are we now? */ + distance = prefetcher->reader->ReadRecPtr - replaying_lsn; + + /* Update distance shown in shm. */ + MonitoringStats->distance = distance; + + /* Sample the averages so we can log them at end of recovery. */ + if (unlikely(replaying_lsn >= prefetcher->next_sample_lsn)) + { + prefetcher->distance_sum += MonitoringStats->distance; + prefetcher->queue_depth_sum += MonitoringStats->queue_depth; + prefetcher->samples += 1.0; + prefetcher->next_sample_lsn = + replaying_lsn + XLOGPREFETCHER_MONITORING_SAMPLE_STEP; + } + + /* Are we too far ahead of replay? */ + if (distance >= wal_prefetch_distance) + break; + + /* + * If this is a record that creates a new SMGR relation, we'll avoid + * prefetching anything from that rnode until it has been replayed. + */ + if (replaying_lsn < reader->ReadRecPtr && + XLogRecGetRmid(reader) == RM_SMGR_ID && + (XLogRecGetInfo(reader) & ~XLR_INFO_MASK) == XLOG_SMGR_CREATE) + { + xl_smgr_create *xlrec = (xl_smgr_create *) XLogRecGetData(reader); + + XLogPrefetcherAddFilter(prefetcher, xlrec->rnode, 0, + reader->ReadRecPtr); + } + + /* + * Scan the record for block references. We might already have been + * partway through processing this record when we hit maximum I/O + * concurrency, so start where we left off. + */ + for (int i = prefetcher->next_block_id; i <= reader->max_block_id; ++i) + { + DecodedBkpBlock *block = &reader->blocks[i]; + SMgrRelation reln; + + /* Ignore everything but the main fork for now. */ + if (block->forknum != MAIN_FORKNUM) + continue; + + /* + * If there is a full page image attached, we won't be reading the + * page, so you might thing we should skip it. However, if the + * underlying filesystem uses larger logical blocks than us, it + * might still need to perform a read-before-write some time later. + * Therefore, only prefetch if configured to do so. + */ + if (block->has_image && !wal_prefetch_fpw) + { + inc_counter(&MonitoringStats->skip_fpw); + continue; + } + + /* + * If this block will initialize a new page then it's probably an + * extension. Since it might create a new segment, we can't try + * to prefetch this block until the record has been replayed, or we + * might try to open a file that doesn't exist yet. + */ + if (block->flags & BKPBLOCK_WILL_INIT) + { + XLogPrefetcherAddFilter(prefetcher, block->rnode, block->blkno, + reader->ReadRecPtr); + inc_counter(&MonitoringStats->skip_new); + continue; + } + + /* Should we skip this block due to a filter? */ + if (XLogPrefetcherIsFiltered(prefetcher, block->rnode, + block->blkno)) + { + inc_counter(&MonitoringStats->skip_new); + continue; + } + + /* Fast path for repeated references to the same relation. */ + if (RelFileNodeEquals(block->rnode, prefetcher->last_rnode)) + { + /* + * If this is a repeat or sequential access, then skip it. 
We + * expect the kernel to detect sequential access on its own + * and do a better job than we could. + */ + if (block->blkno == prefetcher->last_blkno || + block->blkno == prefetcher->last_blkno + 1) + { + prefetcher->last_blkno = block->blkno; + inc_counter(&MonitoringStats->skip_seq); + continue; + } + + /* We can avoid calling smgropen(). */ + reln = prefetcher->last_reln; + } + else + { + /* Otherwise we have to open it. */ + reln = smgropen(block->rnode, InvalidBackendId); + prefetcher->last_rnode = block->rnode; + prefetcher->last_reln = reln; + } + prefetcher->last_blkno = block->blkno; + + /* Try to prefetch this block! */ + switch (SharedPrefetchBuffer(reln, block->forknum, block->blkno)) + { + case PREFETCH_BUFFER_HIT: + /* It's already cached, so do nothing. */ + inc_counter(&MonitoringStats->skip_hit); + break; + case PREFETCH_BUFFER_MISS: + /* + * I/O has possibly been initiated (though we don't know if it + * was already cached by the kernel, so we just have to assume + * that it has due to lack of better information). Record + * this as an I/O in progress until eventually we replay this + * LSN. + */ + inc_counter(&MonitoringStats->prefetch); + XLogPrefetcherInitiatedIO(prefetcher, reader->ReadRecPtr); + /* + * If the queue is now full, we'll have to wait before + * processing any more blocks from this record. + */ + if (XLogPrefetcherSaturated(prefetcher)) + { + prefetcher->next_block_id = i + 1; + return; + } + break; + case PREFETCH_BUFFER_NOREL: + /* + * The underlying segment file doesn't exist. Presumably it + * will be unlinked by a later WAL record. When recovery + * reads this block, it will use the EXTENSION_CREATE_RECOVERY + * flag. We certainly don't want to do that sort of thing + * while merely prefetching, so let's just ignore references + * to this relation until this record is replayed, and let + * recovery create the dummy file or complain if something is + * wrong. + */ + XLogPrefetcherAddFilter(prefetcher, block->rnode, 0, + reader->ReadRecPtr); + inc_counter(&MonitoringStats->skip_new); + break; + } + } + + /* Advance to the next record. */ + prefetcher->have_record = false; + } +} + +/* + * Expose statistics about WAL prefetching. 
+ */ +Datum +pg_stat_get_wal_prefetcher(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_WAL_PREFETCHER_COLS 7 + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + Datum values[PG_STAT_GET_WAL_PREFETCHER_COLS]; + bool nulls[PG_STAT_GET_WAL_PREFETCHER_COLS]; + + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mod required, but it is not allowed in this context"))); + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + MemoryContextSwitchTo(oldcontext); + + if (MonitoringStats->distance < 0) + { + for (int i = 0; i < PG_STAT_GET_WAL_PREFETCHER_COLS; ++i) + nulls[i] = true; + } + else + { + for (int i = 0; i < PG_STAT_GET_WAL_PREFETCHER_COLS; ++i) + nulls[i] = false; + values[0] = Int64GetDatum(pg_atomic_read_u64(&MonitoringStats->prefetch)); + values[1] = Int64GetDatum(pg_atomic_read_u64(&MonitoringStats->skip_hit)); + values[2] = Int64GetDatum(pg_atomic_read_u64(&MonitoringStats->skip_new)); + values[3] = Int64GetDatum(pg_atomic_read_u64(&MonitoringStats->skip_fpw)); + values[4] = Int64GetDatum(pg_atomic_read_u64(&MonitoringStats->skip_seq)); + values[5] = Int32GetDatum(MonitoringStats->distance); + values[6] = Int32GetDatum(MonitoringStats->queue_depth); + } + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + tuplestore_donestoring(tupstore); + + return (Datum) 0; +} + +/* + * Don't prefetch any blocks >= 'blockno' from a given 'rnode', until 'lsn' + * has been replayed. + */ +static inline void +XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileNode rnode, + BlockNumber blockno, XLogRecPtr lsn) +{ + XLogPrefetcherFilter *filter; + bool found; + + filter = hash_search(prefetcher->filter_table, &rnode, HASH_ENTER, &found); + if (!found) + { + /* + * Don't allow any prefetching of this block or higher until replayed. + */ + filter->filter_until_replayed = lsn; + filter->filter_from_block = blockno; + dlist_push_head(&prefetcher->filter_queue, &filter->link); + } + else + { + /* + * We were already filtering this rnode. Extend the filter's lifetime + * to cover this WAL record, but leave the (presumably lower) block + * number there because we don't want to have to track individual + * blocks. + */ + filter->filter_until_replayed = lsn; + dlist_delete(&filter->link); + dlist_push_head(&prefetcher->filter_queue, &filter->link); + } +} + +/* + * Have we replayed the records that caused us to begin filtering a block + * range? That means that relations should have been created, extended or + * dropped as required, so we can drop relevant filters. 
+ */ +static inline void +XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) +{ + while (unlikely(!dlist_is_empty(&prefetcher->filter_queue))) + { + XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter, + link, + &prefetcher->filter_queue); + + if (filter->filter_until_replayed >= replaying_lsn) + break; + dlist_delete(&filter->link); + hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL); + } +} + +/* + * Check if a given block should be skipped due to a filter. + */ +static inline bool +XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileNode rnode, + BlockNumber blockno) +{ + /* + * Test for empty queue first, because we expect it to be empty most of the + * time and we can avoid the hash table lookup in that case. + */ + if (unlikely(!dlist_is_empty(&prefetcher->filter_queue))) + { + XLogPrefetcherFilter *filter = hash_search(prefetcher->filter_table, &rnode, + HASH_FIND, NULL); + + if (filter && filter->filter_from_block <= blockno) + return true; + } + + return false; +} + +/* + * Insert an LSN into the queue. The queue must not be full already. This + * tracks the fact that we have (to the best of our knowledge) initiated an + * I/O, so that we can impose a cap on concurrent prefetching. + */ +static inline void +XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher, + XLogRecPtr prefetching_lsn) +{ + Assert(!XLogPrefetcherSaturated(prefetcher)); + prefetcher->prefetch_queue[prefetcher->prefetch_head++] = prefetching_lsn; + prefetcher->prefetch_head %= prefetcher->prefetch_queue_size; + MonitoringStats->queue_depth++; + Assert(MonitoringStats->queue_depth <= prefetcher->prefetch_queue_size); +} + +/* + * Have we replayed the records that caused us to initiate the oldest + * prefetches yet? That means that they're definitely finished, so we can can + * forget about them and allow ourselves to initiate more prefetches. For now + * we don't have any awareness of when I/O really completes. + */ +static inline void +XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) +{ + while (prefetcher->prefetch_head != prefetcher->prefetch_tail && + prefetcher->prefetch_queue[prefetcher->prefetch_tail] < replaying_lsn) + { + prefetcher->prefetch_tail++; + prefetcher->prefetch_tail %= prefetcher->prefetch_queue_size; + MonitoringStats->queue_depth--; + Assert(MonitoringStats->queue_depth >= 0); + } +} + +/* + * Check if the maximum allowed number of I/Os is already in flight. 
+ */ +static inline bool +XLogPrefetcherSaturated(XLogPrefetcher *prefetcher) +{ + return (prefetcher->prefetch_head + 1) % prefetcher->prefetch_queue_size == + prefetcher->prefetch_tail; +} diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index b217ffa52f..fad2acb514 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -25,6 +25,7 @@ #include "access/xlogutils.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/walreceiver.h" #include "storage/smgr.h" #include "utils/guc.h" #include "utils/hsearch.h" @@ -827,6 +828,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, TimeLineID tli; int count; WALReadError errinfo; + XLogReadLocalOptions *options = (XLogReadLocalOptions *) state->private_data; loc = targetPagePtr + reqLen; @@ -841,7 +843,23 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * notices recovery finishes, so we only have to maintain it for the * local process until recovery ends. */ - if (!RecoveryInProgress()) + if (options) + { + switch (options->read_upto_policy) + { + case XLRO_WALRCV_WRITTEN: + read_upto = GetWalRcvWriteRecPtr(); + break; + case XLRO_LSN: + read_upto = options->lsn; + break; + default: + read_upto = 0; + elog(ERROR, "unknown read_upto_policy value"); + break; + } + } + else if (!RecoveryInProgress()) read_upto = GetFlushRecPtr(); else read_upto = GetXLogReplayRecPtr(&ThisTimeLineID); @@ -879,6 +897,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, if (loc <= read_upto) break; + if (options && options->nowait) + break; + CHECK_FOR_INTERRUPTS(); pg_usleep(1000L); } diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index f681aafcf9..d0882e5f82 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -811,6 +811,17 @@ CREATE VIEW pg_stat_wal_receiver AS FROM pg_stat_get_wal_receiver() s WHERE s.pid IS NOT NULL; +CREATE VIEW pg_stat_wal_prefetcher AS + SELECT + s.prefetch, + s.skip_hit, + s.skip_new, + s.skip_fpw, + s.skip_seq, + s.distance, + s.queue_depth + FROM pg_stat_get_wal_prefetcher() s; + CREATE VIEW pg_stat_subscription AS SELECT su.oid AS subid, diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index e3da7d3625..34f3017871 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -169,7 +169,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->slot = slot; - ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx); + ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, NULL); if (!ctx->reader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 427b0d59cd..5ca98b8886 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -21,6 +21,7 @@ #include "access/nbtree.h" #include "access/subtrans.h" #include "access/twophase.h" +#include "access/xlogprefetcher.h" #include "commands/async.h" #include "miscadmin.h" #include "pgstat.h" @@ -124,6 +125,7 @@ CreateSharedMemoryAndSemaphores(void) size = add_size(size, PredicateLockShmemSize()); size = add_size(size, ProcGlobalShmemSize()); size = add_size(size, XLOGShmemSize()); + size = add_size(size, XLogPrefetcherShmemSize()); size = add_size(size, CLOGShmemSize()); size = add_size(size, CommitTsShmemSize()); size = 
add_size(size, SUBTRANSShmemSize()); @@ -212,6 +214,7 @@ CreateSharedMemoryAndSemaphores(void) * Set up xlog, clog, and buffers */ XLOGShmemInit(); + XLogPrefetcherShmemInit(); CLOGShmemInit(); CommitTsShmemInit(); SUBTRANSShmemInit(); diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 464f264d9a..893c9478d9 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1240,6 +1240,18 @@ static struct config_bool ConfigureNamesBool[] = true, NULL, NULL, NULL }, + { + {"wal_prefetch_fpw", PGC_SIGHUP, WAL_SETTINGS, + gettext_noop("Prefetch blocks that have full page images in the WAL"), + gettext_noop("On some systems, there is no benefit to prefetching pages that will be " + "entirely overwritten, but if the logical page size of the filesystem is " + "larger than PostgreSQL's, this can be beneficial. This option has no " + "effect unless wal_prefetch_distance is set to a positive number.") + }, + &wal_prefetch_fpw, + false, + NULL, assign_wal_prefetch_fpw, NULL + }, { {"wal_log_hints", PGC_POSTMASTER, WAL_SETTINGS, @@ -2626,6 +2638,17 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"wal_prefetch_distance", PGC_SIGHUP, WAL_ARCHIVE_RECOVERY, + gettext_noop("How many bytes to read ahead in the WAL to prefetch referenced blocks."), + gettext_noop("Set to -1 to disable WAL prefetching."), + GUC_UNIT_BYTE + }, + &wal_prefetch_distance, + -1, -1, INT_MAX, + NULL, assign_wal_prefetch_distance, NULL + }, + { {"wal_keep_segments", PGC_SIGHUP, REPLICATION_SENDING, gettext_noop("Sets the number of WAL files held for standby servers."), @@ -11484,6 +11507,8 @@ assign_effective_io_concurrency(int newval, void *extra) { #ifdef USE_PREFETCH target_prefetch_pages = *((int *) extra); + if (AmStartupProcess()) + ResetWalPrefetcher(); #endif /* USE_PREFETCH */ } diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 98b033fc20..0a31edfba4 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -111,6 +111,8 @@ extern int wal_keep_segments; extern int XLOGbuffers; extern int XLogArchiveTimeout; extern int wal_retrieve_retry_interval; +extern int wal_prefetch_distance; +extern bool wal_prefetch_fpw; extern char *XLogArchiveCommand; extern bool EnableHotStandby; extern bool fullPageWrites; @@ -319,6 +321,8 @@ extern void SetWalWriterSleeping(bool sleeping); extern void XLogRequestWalReceiverReply(void); +extern void ResetWalPrefetcher(void); + extern void assign_max_wal_size(int newval, void *extra); extern void assign_checkpoint_completion_target(double newval, void *extra); diff --git a/src/include/access/xlogprefetcher.h b/src/include/access/xlogprefetcher.h new file mode 100644 index 0000000000..585f5564a3 --- /dev/null +++ b/src/include/access/xlogprefetcher.h @@ -0,0 +1,28 @@ +/*------------------------------------------------------------------------- + * + * xlogprefetcher.h + * Declarations for the XLog prefetching facility + * + * Portions Copyright (c) 2020, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/access/xlogprefetcher.h + *------------------------------------------------------------------------- + */ +#ifndef XLOGPREFETCHER_H +#define XLOGPREFETCHER_H + +#include "access/xlogdefs.h" + +struct XLogPrefetcher; +typedef struct XLogPrefetcher XLogPrefetcher; + +extern XLogPrefetcher *XLogPrefetcherAllocate(XLogRecPtr lsn, bool streaming); +extern void XLogPrefetcherFree(XLogPrefetcher 
*prefetcher); +extern void XLogPrefetcherReadAhead(XLogPrefetcher *prefetch, XLogRecPtr replaying_lsn); + +extern size_t XLogPrefetcherShmemSize(void); +extern void XLogPrefetcherShmemInit(void); + +#endif diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 5181a077d9..1c8e67d74a 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -47,6 +47,26 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, extern Relation CreateFakeRelcacheEntry(RelFileNode rnode); extern void FreeFakeRelcacheEntry(Relation fakerel); +/* + * A pointer to an XLogReadLocalOptions struct can be supplied as the private + * data for an xlog reader, causing read_local_xlog_page to modify its + * behavior. + */ +typedef struct XLogReadLocalOptions +{ + /* Don't block waiting for new WAL to arrive. */ + bool nowait; + + /* How far to read. */ + enum { + XLRO_WALRCV_WRITTEN, + XLRO_LSN + } read_upto_policy; + + /* If read_upto_policy is XLRO_LSN, the LSN. */ + XLogRecPtr lsn; +} XLogReadLocalOptions; + extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 07a86c7b7b..0bd16c1b77 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -6082,6 +6082,14 @@ prorettype => 'bool', proargtypes => '', prosrc => 'pg_is_wal_replay_paused' }, +{ oid => '9085', descr => 'statistics: information about WAL prefetching', + proname => 'pg_stat_get_wal_prefetcher', prorows => '1', provolatile => 'v', + proretset => 't', prorettype => 'record', proargtypes => '', + proallargtypes => '{int8,int8,int8,int8,int8,int4,int4}', + proargmodes => '{o,o,o,o,o,o,o}', + proargnames => '{prefetch,skip_hit,skip_new,skip_fpw,skip_seq,distance,queue_depth}', + prosrc => 'pg_stat_get_wal_prefetcher' }, + { oid => '2621', descr => 'reload configuration files', proname => 'pg_reload_conf', provolatile => 'v', prorettype => 'bool', proargtypes => '', prosrc => 'pg_reload_conf' }, diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 5d7a796ba0..6e91c33f3d 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -159,6 +159,11 @@ extern PGDLLIMPORT int32 *LocalRefCount; */ #define BufferGetPage(buffer) ((Page)BufferGetBlock(buffer)) +/* + * When you try to prefetch a buffer, there are three possibilities: it's + * already cached in our buffer pool, it's not cached but we can tell the kernel + * we'll be loading it soon, or the relation file doesn't exist.
+ */ typedef enum PrefetchBufferResult { PREFETCH_BUFFER_HIT, diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index ce93ace76c..903b0ec02b 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -438,5 +438,7 @@ extern void assign_search_path(const char *newval, void *extra); /* in access/transam/xlog.c */ extern bool check_wal_buffers(int *newval, void **extra, GucSource source); extern void assign_xlog_sync_method(int new_sync_method, void *extra); +extern void assign_wal_prefetch_distance(int new_value, void *extra); +extern void assign_wal_prefetch_fpw(bool new_value, void *extra); #endif /* GUC_H */ diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 634f8256f7..62b1e0e113 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2087,6 +2087,14 @@ pg_stat_user_tables| SELECT pg_stat_all_tables.relid, pg_stat_all_tables.autoanalyze_count FROM pg_stat_all_tables WHERE ((pg_stat_all_tables.schemaname <> ALL (ARRAY['pg_catalog'::name, 'information_schema'::name])) AND (pg_stat_all_tables.schemaname !~ '^pg_toast'::text)); +pg_stat_wal_prefetcher| SELECT s.prefetch, + s.skip_hit, + s.skip_new, + s.skip_fpw, + s.skip_seq, + s.distance, + s.queue_depth + FROM pg_stat_get_wal_prefetcher() s(prefetch, skip_hit, skip_new, skip_fpw, skip_seq, distance, queue_depth); pg_stat_wal_receiver| SELECT s.pid, s.status, s.receive_start_lsn, -- 2.20.1
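
As an aside on the queue logic near the top of the patch: XLogPrefetcherSaturated() uses the usual ring-buffer convention of keeping one slot unused, so the queue reports full as soon as advancing the head would catch up with the tail, i.e. it holds at most prefetch_queue_size - 1 requests. A tiny standalone illustration with made-up names and values (QUEUE_SIZE, saturated(), main() are all invented for the example, not code from the patch):

/* Illustration only: the "one free slot" ring-buffer full test. */
#include <stdbool.h>
#include <stdio.h>

#define QUEUE_SIZE 4			/* holds at most QUEUE_SIZE - 1 entries */

static bool
saturated(int head, int tail)
{
	return (head + 1) % QUEUE_SIZE == tail;
}

int
main(void)
{
	int			head = 0;
	int			tail = 0;

	/* Insert until the queue reports full: three entries fit, not four. */
	while (!saturated(head, tail))
		head = (head + 1) % QUEUE_SIZE;

	printf("entries queued: %d\n", (head - tail + QUEUE_SIZE) % QUEUE_SIZE);
	return 0;
}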
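
To make the new xlogutils interface a bit more concrete, here is a rough sketch of how a caller could pass XLogReadLocalOptions as the reader's private_data to decode WAL ahead of the replay position without ever sleeping for more WAL. This is not part of the patch: scan_available_wal() is an invented name, and the XLogBeginRead()/XLogReadRecord() calling convention of current master is assumed.

/*
 * Hypothetical sketch, not patch code: decode whatever WAL the walreceiver
 * has already written, without blocking for more to arrive.
 */
#include "postgres.h"

#include "access/xlog.h"
#include "access/xlogreader.h"
#include "access/xlogutils.h"

static void
scan_available_wal(XLogRecPtr start_lsn)
{
	XLogReadLocalOptions options = {0};
	XLogReaderState *reader;
	XLogRecord *record;
	char	   *errormsg;

	/* Read only as far as the walreceiver has written, and never sleep. */
	options.nowait = true;
	options.read_upto_policy = XLRO_WALRCV_WRITTEN;

	reader = XLogReaderAllocate(wal_segment_size, NULL,
								read_local_xlog_page, &options);
	if (reader == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of memory")));

	/* Assumes the XLogBeginRead()/XLogReadRecord() interface. */
	XLogBeginRead(reader, start_lsn);

	/* Stop as soon as no further WAL is available right now. */
	while ((record = XLogReadRecord(reader, &errormsg)) != NULL)
	{
		/* ... examine the record's block references here ... */
	}

	XLogReaderFree(reader);
}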