On Wed, Mar 18, 2020 at 2:47 PM Alvaro Herrera <alvhe...@2ndquadrant.com> wrote: > On 2020-Mar-17, Thomas Munro wrote: > > I didn't want counters that wrap at ~4 billion, but I did want to be > > able to read and write concurrently without tearing. Instructions > > like "lock xadd" would provide more guarantees that I don't need, > > since only one thread is doing all the writing and there's no ordering > > requirement. It's basically just counter++, but some platforms need a > > spinlock to perform atomic read and write of 64 bit wide numbers, so > > more hoop jumping is required. > > Ah, I see, you don't want lock xadd ... That's non-obvious. I suppose > the function could use more commentary on *why* you're doing it that way > then.
I updated the comment: +/* + * On modern systems this is really just *counter++. On some older systems + * there might be more to it, due to inability to read and write 64 bit values + * atomically. The counters will only be written to by one process, and there + * is no ordering requirement, so there's no point in using higher overhead + * pg_atomic_fetch_add_u64(). + */ +static inline void inc_counter(pg_atomic_uint64 *counter) > > > Umm, I would keep the return values of both these functions in sync. > > > It's really strange that PrefetchBuffer does not return > > > PrefetchBufferResult, don't you think? > > > > Agreed, and changed. I suspect that other users of the main > > PrefetchBuffer() call will eventually want that, to do a better job of > > keeping the request queue full, for example bitmap heap scan and > > (hypothetical) btree scan with prefetch. > > LGTM. Here's a new version that changes that part just a bit more, after a brief chat with Andres about his async I/O plans. It seems clear that returning an enum isn't very extensible, so I decided to try making PrefetchBufferResult a struct whose contents can be extended in the future. In this patch set it's still just used to distinguish 3 cases (hit, miss, no file), but it's now expressed as a buffer and a flag to indicate whether I/O was initiated. You could imagine that the second thing might be replaced by a pointer to an async I/O handle you can wait on or some other magical thing from the future. The concept here is that eventually we'll have just one XLogReader for both read ahead and recovery, and we could attach the prefetch results to the decoded records, and then recovery would try to use already looked up buffers to avoid a bit of work (and then recheck). In other words, the WAL would be decoded only once, and the buffers would hopefully be looked up only once, so you'd claw back all of the overheads of this patch. For now that's not done, and the buffer in the result is only compared with InvalidBuffer to check if there was a hit or not. Similar things could be done for bitmap heap scan and btree prefetch with this interface: their prefetch machinery could hold onto these results in their block arrays and try to avoid a more expensive ReadBuffer() call if they already have a buffer (though as before, there's a small chance it turns out to be the wrong one and they need to fall back to ReadBuffer()). > As before, I didn't get to reading 0005 in depth. Updated to account for the above-mentioned change, and with a couple of elog() calls changed to ereport().
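To make the intended caller-side usage concrete, here is a rough sketch (illustrative only; the loop variables and the in-flight I/O book-keeping are made up, and nothing rechecks the unpinned buffer yet) of how prefetch machinery like bitmap heap scan's might consume a PrefetchBufferResult:

	while (ios_in_flight < maintenance_io_concurrency &&
		   next_prefetch_block <= last_block)
	{
		PrefetchBufferResult r;

		r = PrefetchBuffer(rel, MAIN_FORKNUM, next_prefetch_block++);
		if (r.initiated_io)
			ios_in_flight++;	/* miss: an asynchronous read may now be running */
		else if (BufferIsValid(r.buffer))
		{
			/*
			 * Hit: the block was already in shared buffers.  The buffer is
			 * not pinned, so it could be evicted at any time; a caller that
			 * remembers it must recheck it before use, or simply fall back
			 * to ReadBuffer().
			 */
		}
	}

The point being that only initiated_io counts against the concurrency budget, while a hit hands back a buffer that a later ReadBuffer-like call could try to reuse after rechecking.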
From 94df05846b155dfc68997f17899ddb34637d868a Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Tue, 17 Mar 2020 15:25:55 +1300 Subject: [PATCH 1/5] Allow PrefetchBuffer() to be called with an SMgrRelation. Previously a Relation was required, but it's annoying to have to create a "fake" one in recovery. A new function PrefetchSharedBuffer() is provided that works with SMgrRelation, and LocalPrefetchBuffer() is renamed to PrefetchLocalBuffer() to fit with that more natural naming scheme. Reviewed-by: Alvaro Herrera <alvhe...@2ndquadrant.com> Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com --- src/backend/storage/buffer/bufmgr.c | 84 ++++++++++++++++----------- src/backend/storage/buffer/localbuf.c | 4 +- src/include/storage/buf_internals.h | 2 +- src/include/storage/bufmgr.h | 6 ++ 4 files changed, 59 insertions(+), 37 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index e05e2b3456..d30aed6fd9 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -466,6 +466,53 @@ static int ckpt_buforder_comparator(const void *pa, const void *pb); static int ts_ckpt_progress_comparator(Datum a, Datum b, void *arg); +/* + * Implementation of PrefetchBuffer() for shared buffers. + */ +void +PrefetchSharedBuffer(struct SMgrRelationData *smgr_reln, + ForkNumber forkNum, + BlockNumber blockNum) +{ +#ifdef USE_PREFETCH + BufferTag newTag; /* identity of requested block */ + uint32 newHash; /* hash value for newTag */ + LWLock *newPartitionLock; /* buffer partition lock for it */ + int buf_id; + + Assert(BlockNumberIsValid(blockNum)); + + /* create a tag so we can lookup the buffer */ + INIT_BUFFERTAG(newTag, smgr_reln->smgr_rnode.node, + forkNum, blockNum); + + /* determine its hash code and partition lock ID */ + newHash = BufTableHashCode(&newTag); + newPartitionLock = BufMappingPartitionLock(newHash); + + /* see if the block is in the buffer pool already */ + LWLockAcquire(newPartitionLock, LW_SHARED); + buf_id = BufTableLookup(&newTag, newHash); + LWLockRelease(newPartitionLock); + + /* If not in buffers, initiate prefetch */ + if (buf_id < 0) + smgrprefetch(smgr_reln, forkNum, blockNum); + + /* + * If the block *is* in buffers, we do nothing. This is not really ideal: + * the block might be just about to be evicted, which would be stupid + * since we know we are going to need it soon. But the only easy answer + * is to bump the usage_count, which does not seem like a great solution: + * when the caller does ultimately touch the block, usage_count would get + * bumped again, resulting in too much favoritism for blocks that are + * involved in a prefetch sequence. A real fix would involve some + * additional per-buffer state, and it's not clear that there's enough of + * a problem to justify that. 
+ */ +#endif /* USE_PREFETCH */ +} + /* * PrefetchBuffer -- initiate asynchronous read of a block of a relation * @@ -493,43 +540,12 @@ PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum) errmsg("cannot access temporary tables of other sessions"))); /* pass it off to localbuf.c */ - LocalPrefetchBuffer(reln->rd_smgr, forkNum, blockNum); + PrefetchLocalBuffer(reln->rd_smgr, forkNum, blockNum); } else { - BufferTag newTag; /* identity of requested block */ - uint32 newHash; /* hash value for newTag */ - LWLock *newPartitionLock; /* buffer partition lock for it */ - int buf_id; - - /* create a tag so we can lookup the buffer */ - INIT_BUFFERTAG(newTag, reln->rd_smgr->smgr_rnode.node, - forkNum, blockNum); - - /* determine its hash code and partition lock ID */ - newHash = BufTableHashCode(&newTag); - newPartitionLock = BufMappingPartitionLock(newHash); - - /* see if the block is in the buffer pool already */ - LWLockAcquire(newPartitionLock, LW_SHARED); - buf_id = BufTableLookup(&newTag, newHash); - LWLockRelease(newPartitionLock); - - /* If not in buffers, initiate prefetch */ - if (buf_id < 0) - smgrprefetch(reln->rd_smgr, forkNum, blockNum); - - /* - * If the block *is* in buffers, we do nothing. This is not really - * ideal: the block might be just about to be evicted, which would be - * stupid since we know we are going to need it soon. But the only - * easy answer is to bump the usage_count, which does not seem like a - * great solution: when the caller does ultimately touch the block, - * usage_count would get bumped again, resulting in too much - * favoritism for blocks that are involved in a prefetch sequence. A - * real fix would involve some additional per-buffer state, and it's - * not clear that there's enough of a problem to justify that. - */ + /* pass it to the shared buffer version */ + PrefetchSharedBuffer(reln->rd_smgr, forkNum, blockNum); } #endif /* USE_PREFETCH */ } diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index cac08e1b1a..b528bc9553 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -54,14 +54,14 @@ static Block GetLocalBufferStorage(void); /* - * LocalPrefetchBuffer - + * PrefetchLocalBuffer - * initiate asynchronous read of a block of a relation * * Do PrefetchBuffer's work for temporary relations. * No-op if prefetching isn't compiled in. 
*/ void -LocalPrefetchBuffer(SMgrRelation smgr, ForkNumber forkNum, +PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum) { #ifdef USE_PREFETCH diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index bf3b8ad340..166fe334c7 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -327,7 +327,7 @@ extern int BufTableInsert(BufferTag *tagPtr, uint32 hashcode, int buf_id); extern void BufTableDelete(BufferTag *tagPtr, uint32 hashcode); /* localbuf.c */ -extern void LocalPrefetchBuffer(SMgrRelation smgr, ForkNumber forkNum, +extern void PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum); extern BufferDesc *LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, bool *foundPtr); diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index d2a5b52f6e..e00dd3ffb7 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -49,6 +49,9 @@ typedef enum /* forward declared, to avoid having to expose buf_internals.h here */ struct WritebackContext; +/* forward declared, to avoid including smgr.h */ +struct SMgrRelationData; + /* in globals.c ... this duplicates miscadmin.h */ extern PGDLLIMPORT int NBuffers; @@ -159,6 +162,9 @@ extern PGDLLIMPORT int32 *LocalRefCount; /* * prototypes for functions in bufmgr.c */ +extern void PrefetchSharedBuffer(struct SMgrRelationData *smgr_reln, + ForkNumber forkNum, + BlockNumber blockNum); extern void PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum); extern Buffer ReadBuffer(Relation reln, BlockNumber blockNum); -- 2.20.1
From 02a03ee9767fbb2ef6fc62bdf1e64c0fe24eccfa Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Tue, 17 Mar 2020 15:28:08 +1300 Subject: [PATCH 2/5] Rename GetWalRcvWriteRecPtr() to GetWalRcvFlushRecPtr(). The new name better reflects the fact that the value it returns is updated only when received data has been flushed to disk. Also rename a couple of variables relating to this value. An upcoming patch will make use of the latest data that was written without waiting for it to be flushed, so let's use more precise function names. Reviewed-by: Alvaro Herrera <alvhe...@2ndquadrant.com> Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com --- src/backend/access/transam/xlog.c | 20 ++++++++++---------- src/backend/access/transam/xlogfuncs.c | 2 +- src/backend/replication/README | 2 +- src/backend/replication/walreceiver.c | 10 +++++----- src/backend/replication/walreceiverfuncs.c | 12 ++++++------ src/backend/replication/walsender.c | 2 +- src/include/replication/walreceiver.h | 8 ++++---- 7 files changed, 28 insertions(+), 28 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index de2d4ee582..abb227ce66 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -205,8 +205,8 @@ HotStandbyState standbyState = STANDBY_DISABLED; static XLogRecPtr LastRec; -/* Local copy of WalRcv->receivedUpto */ -static XLogRecPtr receivedUpto = 0; +/* Local copy of WalRcv->flushedUpto */ +static XLogRecPtr flushedUpto = 0; static TimeLineID receiveTLI = 0; /* @@ -9288,7 +9288,7 @@ CreateRestartPoint(int flags) * Retreat _logSegNo using the current end of xlog replayed or received, * whichever is later. */ - receivePtr = GetWalRcvWriteRecPtr(NULL, NULL); + receivePtr = GetWalRcvFlushRecPtr(NULL, NULL); replayPtr = GetXLogReplayRecPtr(&replayTLI); endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr; KeepLogSeg(endptr, &_logSegNo); @@ -11682,7 +11682,7 @@ retry: /* See if we need to retrieve more data */ if (readFile < 0 || (readSource == XLOG_FROM_STREAM && - receivedUpto < targetPagePtr + reqLen)) + flushedUpto < targetPagePtr + reqLen)) { if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, private->randAccess, @@ -11713,10 +11713,10 @@ retry: */ if (readSource == XLOG_FROM_STREAM) { - if (((targetPagePtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ)) + if (((targetPagePtr) / XLOG_BLCKSZ) != (flushedUpto / XLOG_BLCKSZ)) readLen = XLOG_BLCKSZ; else - readLen = XLogSegmentOffset(receivedUpto, wal_segment_size) - + readLen = XLogSegmentOffset(flushedUpto, wal_segment_size) - targetPageOff; } else @@ -11952,7 +11952,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, curFileTLI = tli; RequestXLogStreaming(tli, ptr, PrimaryConnInfo, PrimarySlotName); - receivedUpto = 0; + flushedUpto = 0; } /* @@ -12132,14 +12132,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, * XLogReceiptTime will not advance, so the grace time * allotted to conflicting queries will decrease. 
*/ - if (RecPtr < receivedUpto) + if (RecPtr < flushedUpto) havedata = true; else { XLogRecPtr latestChunkStart; - receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI); - if (RecPtr < receivedUpto && receiveTLI == curFileTLI) + flushedUpto = GetWalRcvFlushRecPtr(&latestChunkStart, &receiveTLI); + if (RecPtr < flushedUpto && receiveTLI == curFileTLI) { havedata = true; if (latestChunkStart <= RecPtr) diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c index 20316539b6..e075c1c71b 100644 --- a/src/backend/access/transam/xlogfuncs.c +++ b/src/backend/access/transam/xlogfuncs.c @@ -398,7 +398,7 @@ pg_last_wal_receive_lsn(PG_FUNCTION_ARGS) { XLogRecPtr recptr; - recptr = GetWalRcvWriteRecPtr(NULL, NULL); + recptr = GetWalRcvFlushRecPtr(NULL, NULL); if (recptr == 0) PG_RETURN_NULL(); diff --git a/src/backend/replication/README b/src/backend/replication/README index 0cbb990613..8ccdd86e74 100644 --- a/src/backend/replication/README +++ b/src/backend/replication/README @@ -54,7 +54,7 @@ and WalRcvData->slotname, and initializes the starting point in WalRcvData->receiveStart. As walreceiver receives WAL from the master server, and writes and flushes -it to disk (in pg_wal), it updates WalRcvData->receivedUpto and signals +it to disk (in pg_wal), it updates WalRcvData->flushedUpto and signals the startup process to know how far WAL replay can advance. Walreceiver sends information about replication progress to the master server diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 25e0333c9e..0bdd0c3074 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -12,7 +12,7 @@ * in the primary server), and then keeps receiving XLOG records and * writing them to the disk as long as the connection is alive. As XLOG * records are received and flushed to disk, it updates the - * WalRcv->receivedUpto variable in shared memory, to inform the startup + * WalRcv->flushedUpto variable in shared memory, to inform the startup * process of how far it can proceed with XLOG replay. 
* * If the primary server ends streaming, but doesn't disconnect, walreceiver @@ -1006,10 +1006,10 @@ XLogWalRcvFlush(bool dying) /* Update shared-memory status */ SpinLockAcquire(&walrcv->mutex); - if (walrcv->receivedUpto < LogstreamResult.Flush) + if (walrcv->flushedUpto < LogstreamResult.Flush) { - walrcv->latestChunkStart = walrcv->receivedUpto; - walrcv->receivedUpto = LogstreamResult.Flush; + walrcv->latestChunkStart = walrcv->flushedUpto; + walrcv->flushedUpto = LogstreamResult.Flush; walrcv->receivedTLI = ThisTimeLineID; } SpinLockRelease(&walrcv->mutex); @@ -1362,7 +1362,7 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) state = WalRcv->walRcvState; receive_start_lsn = WalRcv->receiveStart; receive_start_tli = WalRcv->receiveStartTLI; - received_lsn = WalRcv->receivedUpto; + received_lsn = WalRcv->flushedUpto; received_tli = WalRcv->receivedTLI; last_send_time = WalRcv->lastMsgSendTime; last_receipt_time = WalRcv->lastMsgReceiptTime; diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 89c903e45a..31025f97e3 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -264,11 +264,11 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, /* * If this is the first startup of walreceiver (on this timeline), - * initialize receivedUpto and latestChunkStart to the starting point. + * initialize flushedUpto and latestChunkStart to the starting point. */ if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli) { - walrcv->receivedUpto = recptr; + walrcv->flushedUpto = recptr; walrcv->receivedTLI = tli; walrcv->latestChunkStart = recptr; } @@ -286,7 +286,7 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, } /* - * Returns the last+1 byte position that walreceiver has written. + * Returns the last+1 byte position that walreceiver has flushed. * * Optionally, returns the previous chunk start, that is the first byte * written in the most recent walreceiver flush cycle. Callers not @@ -294,13 +294,13 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, * receiveTLI. */ XLogRecPtr -GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) +GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) { WalRcvData *walrcv = WalRcv; XLogRecPtr recptr; SpinLockAcquire(&walrcv->mutex); - recptr = walrcv->receivedUpto; + recptr = walrcv->flushedUpto; if (latestChunkStart) *latestChunkStart = walrcv->latestChunkStart; if (receiveTLI) @@ -327,7 +327,7 @@ GetReplicationApplyDelay(void) TimestampTz chunkReplayStartTime; SpinLockAcquire(&walrcv->mutex); - receivePtr = walrcv->receivedUpto; + receivePtr = walrcv->flushedUpto; SpinLockRelease(&walrcv->mutex); replayPtr = GetXLogReplayRecPtr(NULL); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 76ec3c7dd0..928a27dbaf 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2913,7 +2913,7 @@ GetStandbyFlushRecPtr(void) * has streamed, but hasn't been replayed yet. 
*/ - receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI); + receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI); replayPtr = GetXLogReplayRecPtr(&replayTLI); ThisTimeLineID = replayTLI; diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index e08afc6548..9ed71139ce 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -74,19 +74,19 @@ typedef struct TimeLineID receiveStartTLI; /* - * receivedUpto-1 is the last byte position that has already been + * flushedUpto-1 is the last byte position that has already been * received, and receivedTLI is the timeline it came from. At the first * startup of walreceiver, these are set to receiveStart and * receiveStartTLI. After that, walreceiver updates these whenever it * flushes the received WAL to disk. */ - XLogRecPtr receivedUpto; + XLogRecPtr flushedUpto; TimeLineID receivedTLI; /* * latestChunkStart is the starting byte position of the current "batch" * of received WAL. It's actually the same as the previous value of - * receivedUpto before the last flush to disk. Startup process can use + * flushedUpto before the last flush to disk. Startup process can use * this to detect whether it's keeping up or not. */ XLogRecPtr latestChunkStart; @@ -322,7 +322,7 @@ extern bool WalRcvStreaming(void); extern bool WalRcvRunning(void); extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname); -extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); +extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); extern int GetReplicationApplyDelay(void); extern int GetReplicationTransferLatency(void); extern void WalRcvForceReply(void); -- 2.20.1
From 1b03eb5ada24c3b23ab8ca6db50e0c5d90d38259 Mon Sep 17 00:00:00 2001 From: Thomas Munro <tmu...@postgresql.org> Date: Mon, 9 Dec 2019 17:22:07 +1300 Subject: [PATCH 3/5] Add GetWalRcvWriteRecPtr() (new definition). A later patch will read received WAL to prefetch referenced blocks, without waiting for the data to be flushed to disk. To do that, it needs to be able to see the write pointer advancing in shared memory. The function formerly bearing this name was recently renamed to GetWalRcvFlushRecPtr(), which better describes what it does. Reviewed-by: Alvaro Herrera <alvhe...@2ndquadrant.com> Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com --- src/backend/replication/walreceiver.c | 5 +++++ src/backend/replication/walreceiverfuncs.c | 12 ++++++++++++ src/include/replication/walreceiver.h | 10 ++++++++++ 3 files changed, 27 insertions(+) diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 0bdd0c3074..e250f5583c 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -245,6 +245,8 @@ WalReceiverMain(void) SpinLockRelease(&walrcv->mutex); + pg_atomic_init_u64(&WalRcv->writtenUpto, 0); + /* Arrange to clean up at walreceiver exit */ on_shmem_exit(WalRcvDie, 0); @@ -985,6 +987,9 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) LogstreamResult.Write = recptr; } + + /* Update shared-memory status */ + pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write); } /* diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 31025f97e3..96b44e2c88 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -310,6 +310,18 @@ GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) return recptr; } +/* + * Returns the last+1 byte position that walreceiver has written. + * This returns a recently written value without taking a lock. + */ +XLogRecPtr +GetWalRcvWriteRecPtr(void) +{ + WalRcvData *walrcv = WalRcv; + + return pg_atomic_read_u64(&walrcv->writtenUpto); +} + /* * Returns the replication apply delay in ms or -1 * if the apply delay info is not available diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 9ed71139ce..914e6e3d44 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -16,6 +16,7 @@ #include "access/xlogdefs.h" #include "getaddrinfo.h" /* for NI_MAXHOST */ #include "pgtime.h" +#include "port/atomics.h" #include "replication/logicalproto.h" #include "replication/walsender.h" #include "storage/latch.h" @@ -142,6 +143,14 @@ typedef struct slock_t mutex; /* locks shared variables shown above */ + /* + * Like flushedUpto, but advanced after writing and before flushing, + * without the need to acquire the spin lock. Data can be read by another + * process up to this point, but shouldn't be used for data integrity + * purposes. + */ + pg_atomic_uint64 writtenUpto; + /* * force walreceiver reply? This doesn't need to be locked; memory * barriers for ordering are sufficient. 
But we do need atomic fetch and @@ -323,6 +332,7 @@ extern bool WalRcvRunning(void); extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname); extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); +extern XLogRecPtr GetWalRcvWriteRecPtr(void); extern int GetReplicationApplyDelay(void); extern int GetReplicationTransferLatency(void); extern void WalRcvForceReply(void); -- 2.20.1
From c62fde23f70ff06833d743a1c85716e15f3c813c Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Tue, 17 Mar 2020 17:26:41 +1300 Subject: [PATCH 4/5] Allow PrefetchBuffer() to report what happened. Report whether a prefetch was actually initiated due to a cache miss, so that callers can limit the number of concurrent I/Os they try to issue, without counting the prefetch calls that did nothing because the page was already in our buffers. If the requested block was already cached, return a valid buffer. This might enable future code to avoid a buffer mapping lookup, though it will need to recheck the buffer before using it because it's not pinned so could be reclaimed at any time. Report neither hit nor miss when a relation's backing file is missing, to prepare for use during recovery. This will be used to handle cases of relations that are referenced in the WAL but have been unlinked already due to actions covered by WAL records that haven't been replayed yet, after a crash. Reviewed-by: Alvaro Herrera <alvhe...@2ndquadrant.com> Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com --- src/backend/storage/buffer/bufmgr.c | 38 +++++++++++++++++++++++---- src/backend/storage/buffer/localbuf.c | 17 ++++++++---- src/backend/storage/smgr/md.c | 9 +++++-- src/backend/storage/smgr/smgr.c | 10 ++++--- src/include/storage/buf_internals.h | 5 ++-- src/include/storage/bufmgr.h | 19 ++++++++++---- src/include/storage/md.h | 2 +- src/include/storage/smgr.h | 2 +- 8 files changed, 78 insertions(+), 24 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index d30aed6fd9..4ceb40a856 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -469,11 +469,13 @@ static int ts_ckpt_progress_comparator(Datum a, Datum b, void *arg); /* * Implementation of PrefetchBuffer() for shared buffers. */ -void +PrefetchBufferResult PrefetchSharedBuffer(struct SMgrRelationData *smgr_reln, ForkNumber forkNum, BlockNumber blockNum) { + PrefetchBufferResult result = { InvalidBuffer, false }; + #ifdef USE_PREFETCH BufferTag newTag; /* identity of requested block */ uint32 newHash; /* hash value for newTag */ @@ -497,7 +499,23 @@ PrefetchSharedBuffer(struct SMgrRelationData *smgr_reln, /* If not in buffers, initiate prefetch */ if (buf_id < 0) - smgrprefetch(smgr_reln, forkNum, blockNum); + { + /* + * Try to initiate an asynchronous read. This returns false in + * recovery if the relation file doesn't exist. + */ + if (smgrprefetch(smgr_reln, forkNum, blockNum)) + result.initiated_io = true; + } + else + { + /* + * Report the buffer it was in at that time. The caller may be able + * to avoid a buffer table lookup, but it's not pinned and it must be + * rechecked! + */ + result.buffer = buf_id + 1; + } /* * If the block *is* in buffers, we do nothing. This is not really ideal: @@ -511,6 +529,8 @@ PrefetchSharedBuffer(struct SMgrRelationData *smgr_reln, * a problem to justify that. */ #endif /* USE_PREFETCH */ + + return result; } /* @@ -520,8 +540,12 @@ PrefetchSharedBuffer(struct SMgrRelationData *smgr_reln, * buffer. Instead it tries to ensure that a future ReadBuffer for the given * block will not be delayed by the I/O. Prefetching is optional. * No-op if prefetching isn't compiled in. 
+ * + * If the block is already cached, the result includes a valid buffer that can + * be used by the caller to avoid the need for a later buffer lookup, but it's + * not pinned, so the caller must recheck it. */ -void +PrefetchBufferResult PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum) { #ifdef USE_PREFETCH @@ -540,13 +564,17 @@ PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum) errmsg("cannot access temporary tables of other sessions"))); /* pass it off to localbuf.c */ - PrefetchLocalBuffer(reln->rd_smgr, forkNum, blockNum); + return PrefetchLocalBuffer(reln->rd_smgr, forkNum, blockNum); } else { /* pass it to the shared buffer version */ - PrefetchSharedBuffer(reln->rd_smgr, forkNum, blockNum); + return PrefetchSharedBuffer(reln->rd_smgr, forkNum, blockNum); } +#else + PrefetchBufferResult result = { InvalidBuffer, false }; + + return result; #endif /* USE_PREFETCH */ } diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index b528bc9553..18a8614e9b 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -60,10 +60,12 @@ static Block GetLocalBufferStorage(void); * Do PrefetchBuffer's work for temporary relations. * No-op if prefetching isn't compiled in. */ -void +PrefetchBufferResult PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum) { + PrefetchBufferResult result = { InvalidBuffer, false }; + #ifdef USE_PREFETCH BufferTag newTag; /* identity of requested block */ LocalBufferLookupEnt *hresult; @@ -81,12 +83,17 @@ PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum, if (hresult) { /* Yes, so nothing to do */ - return; + result.buffer = -hresult->id - 1; + } + else + { + /* Not in buffers, so initiate prefetch */ + smgrprefetch(smgr, forkNum, blockNum); + result.initiated_io = true; } - - /* Not in buffers, so initiate prefetch */ - smgrprefetch(smgr, forkNum, blockNum); #endif /* USE_PREFETCH */ + + return result; } diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index c5b771c531..ba12fc2077 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -525,14 +525,17 @@ mdclose(SMgrRelation reln, ForkNumber forknum) /* * mdprefetch() -- Initiate asynchronous read of the specified block of a relation */ -void +bool mdprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) { #ifdef USE_PREFETCH off_t seekpos; MdfdVec *v; - v = _mdfd_getseg(reln, forknum, blocknum, false, EXTENSION_FAIL); + v = _mdfd_getseg(reln, forknum, blocknum, false, + InRecovery ? 
EXTENSION_RETURN_NULL : EXTENSION_FAIL); + if (v == NULL) + return false; seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); @@ -540,6 +543,8 @@ mdprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) (void) FilePrefetch(v->mdfd_vfd, seekpos, BLCKSZ, WAIT_EVENT_DATA_FILE_PREFETCH); #endif /* USE_PREFETCH */ + + return true; } /* diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c index 360b5bf5bf..c39dd533e6 100644 --- a/src/backend/storage/smgr/smgr.c +++ b/src/backend/storage/smgr/smgr.c @@ -49,7 +49,7 @@ typedef struct f_smgr bool isRedo); void (*smgr_extend) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync); - void (*smgr_prefetch) (SMgrRelation reln, ForkNumber forknum, + bool (*smgr_prefetch) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum); void (*smgr_read) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer); @@ -489,11 +489,15 @@ smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, /* * smgrprefetch() -- Initiate asynchronous read of the specified block of a relation. + * + * In recovery only, this can return false to indicate that a file + * doesn't exist (presumably it has been dropped by a later WAL + * record). */ -void +bool smgrprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) { - smgrsw[reln->smgr_which].smgr_prefetch(reln, forknum, blocknum); + return smgrsw[reln->smgr_which].smgr_prefetch(reln, forknum, blocknum); } /* diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 166fe334c7..e57f84ee9c 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -327,8 +327,9 @@ extern int BufTableInsert(BufferTag *tagPtr, uint32 hashcode, int buf_id); extern void BufTableDelete(BufferTag *tagPtr, uint32 hashcode); /* localbuf.c */ -extern void PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum, - BlockNumber blockNum); +extern PrefetchBufferResult PrefetchLocalBuffer(SMgrRelation smgr, + ForkNumber forkNum, + BlockNumber blockNum); extern BufferDesc *LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, bool *foundPtr); extern void MarkLocalBufferDirty(Buffer buffer); diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index e00dd3ffb7..64b643569f 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -46,6 +46,15 @@ typedef enum * replay; otherwise same as RBM_NORMAL */ } ReadBufferMode; +/* + * Type returned by PrefetchBuffer(). + */ +typedef struct PrefetchBufferResult +{ + Buffer buffer; /* If valid, a hit (recheck needed!) 
*/ + bool initiated_io; /* If true, a miss resulting in async I/O */ +} PrefetchBufferResult; + /* forward declared, to avoid having to expose buf_internals.h here */ struct WritebackContext; @@ -162,11 +171,11 @@ extern PGDLLIMPORT int32 *LocalRefCount; /* * prototypes for functions in bufmgr.c */ -extern void PrefetchSharedBuffer(struct SMgrRelationData *smgr_reln, - ForkNumber forkNum, - BlockNumber blockNum); -extern void PrefetchBuffer(Relation reln, ForkNumber forkNum, - BlockNumber blockNum); +extern PrefetchBufferResult PrefetchSharedBuffer(struct SMgrRelationData *smgr_reln, + ForkNumber forkNum, + BlockNumber blockNum); +extern PrefetchBufferResult PrefetchBuffer(Relation reln, ForkNumber forkNum, + BlockNumber blockNum); extern Buffer ReadBuffer(Relation reln, BlockNumber blockNum); extern Buffer ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, diff --git a/src/include/storage/md.h b/src/include/storage/md.h index ec7630ce3b..07fd1bb7d0 100644 --- a/src/include/storage/md.h +++ b/src/include/storage/md.h @@ -28,7 +28,7 @@ extern bool mdexists(SMgrRelation reln, ForkNumber forknum); extern void mdunlink(RelFileNodeBackend rnode, ForkNumber forknum, bool isRedo); extern void mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync); -extern void mdprefetch(SMgrRelation reln, ForkNumber forknum, +extern bool mdprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum); extern void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer); diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h index 243822137c..dc740443e2 100644 --- a/src/include/storage/smgr.h +++ b/src/include/storage/smgr.h @@ -92,7 +92,7 @@ extern void smgrdounlink(SMgrRelation reln, bool isRedo); extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo); extern void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync); -extern void smgrprefetch(SMgrRelation reln, ForkNumber forknum, +extern bool smgrprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum); extern void smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer); -- 2.20.1
From 42ba0a89260d46230ac0df791fae18bfdca0092f Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Wed, 18 Mar 2020 16:35:27 +1300 Subject: [PATCH 5/5] Prefetch referenced blocks during recovery. Introduce a new GUC max_wal_prefetch_distance. If it is set to a positive number of bytes, then read ahead in the WAL at most that distance, and initiate asynchronous reading of referenced blocks. The goal is to avoid I/O stalls and benefit from concurrent I/O. The number of concurrent asynchronous reads is capped by the existing maintenance_io_concurrency GUC. The feature is disabled by default. Reviewed-by: Tomas Vondra <tomas.von...@2ndquadrant.com> Reviewed-by: Alvaro Herrera <alvhe...@2ndquadrant.com> Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com --- doc/src/sgml/config.sgml | 38 ++ doc/src/sgml/monitoring.sgml | 69 ++ doc/src/sgml/wal.sgml | 12 + src/backend/access/transam/Makefile | 1 + src/backend/access/transam/xlog.c | 64 ++ src/backend/access/transam/xlogprefetcher.c | 663 ++++++++++++++++++++ src/backend/access/transam/xlogutils.c | 23 +- src/backend/catalog/system_views.sql | 11 + src/backend/replication/logical/logical.c | 2 +- src/backend/storage/buffer/bufmgr.c | 2 +- src/backend/storage/ipc/ipci.c | 3 + src/backend/utils/misc/guc.c | 38 +- src/include/access/xlog.h | 4 + src/include/access/xlogprefetcher.h | 28 + src/include/access/xlogutils.h | 20 + src/include/catalog/pg_proc.dat | 8 + src/include/utils/guc.h | 2 + src/test/regress/expected/rules.out | 8 + 18 files changed, 992 insertions(+), 4 deletions(-) create mode 100644 src/backend/access/transam/xlogprefetcher.c create mode 100644 src/include/access/xlogprefetcher.h diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 672bf6f1ee..8249ec0139 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3102,6 +3102,44 @@ include_dir 'conf.d' </listitem> </varlistentry> + <varlistentry id="guc-max-wal-prefetch-distance" xreflabel="max_wal_prefetch_distance"> + <term><varname>max_wal_prefetch_distance</varname> (<type>integer</type>) + <indexterm> + <primary><varname>max_wal_prefetch_distance</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + The maximum distance to look ahead in the WAL during recovery, to find + blocks to prefetch. Prefetching blocks that will soon be needed can + reduce I/O wait times. The number of concurrent prefetches is limited + by this setting as well as <xref linkend="guc-maintenance-io-concurrency"/>. + If this value is specified without units, it is taken as bytes. + The default is -1, meaning that WAL prefetching is disabled. + </para> + </listitem> + </varlistentry> + + <varlistentry id="guc-wal-prefetch-fpw" xreflabel="wal_prefetch_fpw"> + <term><varname>wal_prefetch_fpw</varname> (<type>boolean</type>) + <indexterm> + <primary><varname>wal_prefetch_fpw</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Whether to prefetch blocks with full page images during recovery. + Usually this doesn't help, since such blocks will not be read. However, + on file systems with a block size larger than + <productname>PostgreSQL</productname>'s, prefetching can avoid a costly + read-before-write when blocks are later written. + This setting has no effect unless + <xref linkend="guc-max-wal-prefetch-distance"/> is set to a positive number. + The default is off. 
+ </para> + </listitem> + </varlistentry> + </variablelist> </sect2> <sect2 id="runtime-config-wal-archiving"> diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 987580d6df..df4291092b 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -320,6 +320,13 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser </entry> </row> + <row> + <entry><structname>pg_stat_wal_prefetcher</structname><indexterm><primary>pg_stat_wal_prefetcher</primary></indexterm></entry> + <entry>Only one row, showing statistics about blocks prefetched during recovery. + See <xref linkend="pg-stat-wal-prefetcher-view"/> for details. + </entry> + </row> + <row> <entry><structname>pg_stat_subscription</structname><indexterm><primary>pg_stat_subscription</primary></indexterm></entry> <entry>At least one row per subscription, showing information about @@ -2192,6 +2199,68 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i connected server. </para> + <table id="pg-stat-wal-prefetcher-view" xreflabel="pg_stat_wal_prefetcher"> + <title><structname>pg_stat_wal_prefetcher</structname> View</title> + <tgroup cols="3"> + <thead> + <row> + <entry>Column</entry> + <entry>Type</entry> + <entry>Description</entry> + </row> + </thead> + + <tbody> + <row> + <entry><structfield>prefetch</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks prefetched because they were not in the buffer pool</entry> + </row> + <row> + <entry><structfield>skip_hit</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks not prefetched because they were already in the buffer pool</entry> + </row> + <row> + <entry><structfield>skip_new</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks not prefetched because they were new (usually relation extension)</entry> + </row> + <row> + <entry><structfield>skip_fpw</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks not prefetched because a full page image was included in the WAL and <xref linkend="guc-wal-prefetch-fpw"/> was set to <literal>off</literal></entry> + </row> + <row> + <entry><structfield>skip_seq</structfield></entry> + <entry><type>bigint</type></entry> + <entry>Number of blocks not prefetched because of repeated or sequential access</entry> + </row> + <row> + <entry><structfield>distance</structfield></entry> + <entry><type>integer</type></entry> + <entry>How far ahead of recovery the WAL prefetcher is currently reading, in bytes</entry> + </row> + <row> + <entry><structfield>queue_depth</structfield></entry> + <entry><type>integer</type></entry> + <entry>How many prefetches have been initiated but are not yet known to have completed</entry> + </row> + </tbody> + </tgroup> + </table> + + <para> + The <structname>pg_stat_wal_prefetcher</structname> view will contain only + one row. It is filled with nulls if recovery is not running or WAL + prefetching is not enabled. See <xref linkend="guc-max-wal-prefetch-distance"/> + for more information. The counters in this view are reset whenever the + <xref linkend="guc-max-wal-prefetch-distance"/>, + <xref linkend="guc-wal-prefetch-fpw"/> or + <xref linkend="guc-maintenance-io-concurrency"/> setting is changed and + the server configuration is reloaded. 
+ </para> + <table id="pg-stat-subscription" xreflabel="pg_stat_subscription"> <title><structname>pg_stat_subscription</structname> View</title> <tgroup cols="3"> diff --git a/doc/src/sgml/wal.sgml b/doc/src/sgml/wal.sgml index bd9fae544c..9e956ad2a1 100644 --- a/doc/src/sgml/wal.sgml +++ b/doc/src/sgml/wal.sgml @@ -719,6 +719,18 @@ <acronym>WAL</acronym> call being logged to the server log. This option might be replaced by a more general mechanism in the future. </para> + + <para> + The <xref linkend="guc-max-wal-prefetch-distance"/> parameter can be + used to improve I/O performance during recovery by instructing + <productname>PostgreSQL</productname> to initiate reads + of disk blocks that will soon be needed, in combination with the + <xref linkend="guc-maintenance-io-concurrency"/> parameter. The + prefetching mechanism is most likely to be effective on systems + with <varname>full_page_writes</varname> set to + <literal>off</literal> (where that is safe), and where the working + set is larger than RAM. By default, WAL prefetching is disabled. + </para> </sect1> <sect1 id="wal-internals"> diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 595e02de72..20e044c7c8 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -31,6 +31,7 @@ OBJS = \ xlogarchive.o \ xlogfuncs.o \ xloginsert.o \ + xlogprefetcher.o \ xlogreader.o \ xlogutils.o diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index abb227ce66..85f36ef6f4 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -34,6 +34,7 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "access/xloginsert.h" +#include "access/xlogprefetcher.h" #include "access/xlogreader.h" #include "access/xlogutils.h" #include "catalog/catversion.h" @@ -105,6 +106,8 @@ int wal_level = WAL_LEVEL_MINIMAL; int CommitDelay = 0; /* precommit delay in microseconds */ int CommitSiblings = 5; /* # concurrent xacts needed to sleep */ int wal_retrieve_retry_interval = 5000; +int max_wal_prefetch_distance = -1; +bool wal_prefetch_fpw = false; #ifdef WAL_DEBUG bool XLOG_DEBUG = false; @@ -806,6 +809,7 @@ static XLogSource readSource = XLOG_FROM_ANY; */ static XLogSource currentSource = XLOG_FROM_ANY; static bool lastSourceFailed = false; +static bool reset_wal_prefetcher = false; typedef struct XLogPageReadPrivate { @@ -6213,6 +6217,7 @@ CheckRequiredParameterValues(void) } } + /* * This must be called ONCE during postmaster or standalone-backend startup */ @@ -7069,6 +7074,7 @@ StartupXLOG(void) { ErrorContextCallback errcallback; TimestampTz xtime; + XLogPrefetcher *prefetcher = NULL; InRedo = true; @@ -7076,6 +7082,9 @@ StartupXLOG(void) (errmsg("redo starts at %X/%X", (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr))); + /* the first time through, see if we need to enable prefetching */ + ResetWalPrefetcher(); + /* * main redo apply loop */ @@ -7105,6 +7114,31 @@ StartupXLOG(void) /* Handle interrupt signals of startup process */ HandleStartupProcInterrupts(); + /* + * The first time through, or if any relevant settings or the + * WAL source changes, we'll restart the prefetching machinery + * as appropriate. This is simpler than trying to handle + * various complicated state changes. + */ + if (unlikely(reset_wal_prefetcher)) + { + /* If we had one already, destroy it. */ + if (prefetcher) + { + XLogPrefetcherFree(prefetcher); + prefetcher = NULL; + } + /* If we want one, create it. 
*/ + if (max_wal_prefetch_distance > 0) + prefetcher = XLogPrefetcherAllocate(xlogreader->ReadRecPtr, + currentSource == XLOG_FROM_STREAM); + reset_wal_prefetcher = false; + } + + /* Perform WAL prefetching, if enabled. */ + if (prefetcher) + XLogPrefetcherReadAhead(prefetcher, xlogreader->ReadRecPtr); + /* * Pause WAL replay, if requested by a hot-standby session via * SetRecoveryPause(). @@ -7292,6 +7326,8 @@ StartupXLOG(void) /* * end of main redo apply loop */ + if (prefetcher) + XLogPrefetcherFree(prefetcher); if (reachedRecoveryTarget) { @@ -10155,6 +10191,24 @@ assign_xlog_sync_method(int new_sync_method, void *extra) } } +void +assign_max_wal_prefetch_distance(int new_value, void *extra) +{ + /* Reset the WAL prefetcher, because a setting it depends on changed. */ + max_wal_prefetch_distance = new_value; + if (AmStartupProcess()) + ResetWalPrefetcher(); +} + +void +assign_wal_prefetch_fpw(bool new_value, void *extra) +{ + /* Reset the WAL prefetcher, because a setting it depends on changed. */ + wal_prefetch_fpw = new_value; + if (AmStartupProcess()) + ResetWalPrefetcher(); +} + /* * Issue appropriate kind of fsync (if any) for an XLOG output file. @@ -11961,6 +12015,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, * and move on to the next state. */ currentSource = XLOG_FROM_STREAM; + ResetWalPrefetcher(); break; case XLOG_FROM_STREAM: @@ -12390,3 +12445,12 @@ XLogRequestWalReceiverReply(void) { doRequestWalReceiverReply = true; } + +/* + * Schedule a WAL prefetcher reset, on change of relevant settings. + */ +void +ResetWalPrefetcher(void) +{ + reset_wal_prefetcher = true; +} diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c new file mode 100644 index 0000000000..715552b428 --- /dev/null +++ b/src/backend/access/transam/xlogprefetcher.c @@ -0,0 +1,663 @@ +/*------------------------------------------------------------------------- + * + * xlogprefetcher.c + * Prefetching support for PostgreSQL write-ahead log manager + * + * Portions Copyright (c) 2020, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/transam/xlogprefetcher.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xlog.h" +#include "access/xlogprefetcher.h" +#include "access/xlogreader.h" +#include "access/xlogutils.h" +#include "catalog/storage_xlog.h" +#include "utils/fmgrprotos.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "port/atomics.h" +#include "storage/bufmgr.h" +#include "storage/shmem.h" +#include "storage/smgr.h" +#include "utils/hsearch.h" + +/* + * Sample the queue depth and distance every time we replay this much WAL. + * This is used to compute avg_queue_depth and avg_distance for the log message + * that appears at the end of crash recovery. + */ +#define XLOGPREFETCHER_MONITORING_SAMPLE_STEP 32768 + +/* + * Internal state used for book-keeping. + */ +struct XLogPrefetcher +{ + /* Reader and current reading state. */ + XLogReaderState *reader; + XLogReadLocalOptions options; + bool have_record; + bool shutdown; + int next_block_id; + + /* Book-keeping required to avoid accessing non-existing blocks. */ + HTAB *filter_table; + dlist_head filter_queue; + + /* Book-keeping required to limit concurrent prefetches. 
*/ + XLogRecPtr *prefetch_queue; + int prefetch_queue_size; + int prefetch_head; + int prefetch_tail; + + /* Details of last prefetch to skip repeats and seq scans. */ + SMgrRelation last_reln; + RelFileNode last_rnode; + BlockNumber last_blkno; + + /* Counters used to compute avg_queue_depth and avg_distance. */ + double samples; + double queue_depth_sum; + double distance_sum; + XLogRecPtr next_sample_lsn; +}; + +/* + * A temporary filter used to track block ranges that haven't been created + * yet, whole relations that haven't been created yet, and whole relations + * that we must assume have already been dropped. + */ +typedef struct XLogPrefetcherFilter +{ + RelFileNode rnode; + XLogRecPtr filter_until_replayed; + BlockNumber filter_from_block; + dlist_node link; +} XLogPrefetcherFilter; + +/* + * Counters exposed in shared memory just for the benefit of monitoring + * functions. + */ +typedef struct XLogPrefetcherMonitoringStats +{ + pg_atomic_uint64 prefetch; /* Prefetches initiated. */ + pg_atomic_uint64 skip_hit; /* Blocks already buffered. */ + pg_atomic_uint64 skip_new; /* New/missing blocks filtered. */ + pg_atomic_uint64 skip_fpw; /* FPWs skipped. */ + pg_atomic_uint64 skip_seq; /* Sequential/repeat blocks skipped. */ + int distance; /* Number of bytes ahead in the WAL. */ + int queue_depth; /* Number of I/Os possibly in progress. */ +} XLogPrefetcherMonitoringStats; + +static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, + RelFileNode rnode, + BlockNumber blockno, + XLogRecPtr lsn); +static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, + RelFileNode rnode, + BlockNumber blockno); +static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, + XLogRecPtr replaying_lsn); +static inline void XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher, + XLogRecPtr prefetching_lsn); +static inline void XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher, + XLogRecPtr replaying_lsn); +static inline bool XLogPrefetcherSaturated(XLogPrefetcher *prefetcher); + +/* + * On modern systems this is really just *counter++. On some older systems + * there might be more to it, due to inability to read and write 64 bit values + * atomically. The counters will only be written to by one process, and there + * is no ordering requirement, so there's no point in using higher overhead + * pg_atomic_fetch_add_u64(). + */ +static inline void inc_counter(pg_atomic_uint64 *counter) +{ + pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1); +} + +static XLogPrefetcherMonitoringStats *MonitoringStats; + +size_t +XLogPrefetcherShmemSize(void) +{ + return sizeof(XLogPrefetcherMonitoringStats); +} + +static void +XLogPrefetcherResetMonitoringStats(void) +{ + pg_atomic_init_u64(&MonitoringStats->prefetch, 0); + pg_atomic_init_u64(&MonitoringStats->skip_hit, 0); + pg_atomic_init_u64(&MonitoringStats->skip_new, 0); + pg_atomic_init_u64(&MonitoringStats->skip_fpw, 0); + pg_atomic_init_u64(&MonitoringStats->skip_seq, 0); + MonitoringStats->distance = -1; + MonitoringStats->queue_depth = 0; +} + +void +XLogPrefetcherShmemInit(void) +{ + bool found; + + MonitoringStats = (XLogPrefetcherMonitoringStats *) + ShmemInitStruct("XLogPrefetcherMonitoringStats", + sizeof(XLogPrefetcherMonitoringStats), + &found); + if (!found) + XLogPrefetcherResetMonitoringStats(); +} + +/* + * Create a prefetcher that is ready to begin prefetching blocks referenced by + * WAL that is ahead of the given lsn. 
+ */ +XLogPrefetcher * +XLogPrefetcherAllocate(XLogRecPtr lsn, bool streaming) +{ + static HASHCTL hash_table_ctl = { + .keysize = sizeof(RelFileNode), + .entrysize = sizeof(XLogPrefetcherFilter) + }; + XLogPrefetcher *prefetcher = palloc0(sizeof(*prefetcher)); + + prefetcher->options.nowait = true; + if (streaming) + { + /* + * We're only allowed to read as far as the WAL receiver has written. + * We don't have to wait for it to be flushed, though, as recovery + * does, so that gives us a chance to get a bit further ahead. + */ + prefetcher->options.read_upto_policy = XLRO_WALRCV_WRITTEN; + } + else + { + /* We're allowed to read as far as we can. */ + prefetcher->options.read_upto_policy = XLRO_LSN; + prefetcher->options.lsn = (XLogRecPtr) -1; + } + prefetcher->reader = XLogReaderAllocate(wal_segment_size, + NULL, + read_local_xlog_page, + &prefetcher->options); + prefetcher->filter_table = hash_create("PrefetchFilterTable", 1024, + &hash_table_ctl, + HASH_ELEM | HASH_BLOBS); + dlist_init(&prefetcher->filter_queue); + + /* + * The size of the queue is based on the maintenance_io_concurrency + * setting. In theory we might have a separate queue for each tablespace, + * but it's not clear how that should work, so for now we'll just use the + * general GUC to rate-limit all prefetching. + */ + prefetcher->prefetch_queue_size = maintenance_io_concurrency; + prefetcher->prefetch_queue = palloc0(sizeof(XLogRecPtr) * prefetcher->prefetch_queue_size); + prefetcher->prefetch_head = prefetcher->prefetch_tail = 0; + + /* Prepare to read at the given LSN. */ + ereport(LOG, + (errmsg("WAL prefetch started at %X/%X", + (uint32) (lsn >> 32), (uint32) lsn))); + XLogBeginRead(prefetcher->reader, lsn); + + XLogPrefetcherResetMonitoringStats(); + + return prefetcher; +} + +/* + * Destroy a prefetcher and release all resources. + */ +void +XLogPrefetcherFree(XLogPrefetcher *prefetcher) +{ + double avg_distance = 0; + double avg_queue_depth = 0; + + /* Log final statistics. */ + if (prefetcher->samples > 0) + { + avg_distance = prefetcher->distance_sum / prefetcher->samples; + avg_queue_depth = prefetcher->queue_depth_sum / prefetcher->samples; + } + ereport(LOG, + (errmsg("WAL prefetch finished at %X/%X; " + "prefetch = " UINT64_FORMAT ", " + "skip_hit = " UINT64_FORMAT ", " + "skip_new = " UINT64_FORMAT ", " + "skip_fpw = " UINT64_FORMAT ", " + "skip_seq = " UINT64_FORMAT ", " + "avg_distance = %f, " + "avg_queue_depth = %f", + (uint32) (prefetcher->reader->EndRecPtr >> 32), + (uint32) (prefetcher->reader->EndRecPtr), + pg_atomic_read_u64(&MonitoringStats->prefetch), + pg_atomic_read_u64(&MonitoringStats->skip_hit), + pg_atomic_read_u64(&MonitoringStats->skip_new), + pg_atomic_read_u64(&MonitoringStats->skip_fpw), + pg_atomic_read_u64(&MonitoringStats->skip_seq), + avg_distance, + avg_queue_depth))); + XLogReaderFree(prefetcher->reader); + hash_destroy(prefetcher->filter_table); + pfree(prefetcher->prefetch_queue); + pfree(prefetcher); + + XLogPrefetcherResetMonitoringStats(); +} + +/* + * Read ahead in the WAL, as far as we can within the limits set by the user. + * Begin fetching any referenced blocks that are not already in the buffer + * pool. + */ +void +XLogPrefetcherReadAhead(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) +{ + /* If an error has occurred or we've hit the end of the WAL, do nothing. */ + if (prefetcher->shutdown) + return; + + /* + * Have any in-flight prefetches definitely completed, judging by the LSN + * that is currently being replayed? 
+ */ + XLogPrefetcherCompletedIO(prefetcher, replaying_lsn); + + /* + * Do we already have the maximum permitted number of I/Os running + * (according to the information we have)? If so, we have to wait for at + * least one to complete, so give up early. + */ + if (XLogPrefetcherSaturated(prefetcher)) + return; + + /* Can we drop any filters yet, due to problem records being replayed? */ + XLogPrefetcherCompleteFilters(prefetcher, replaying_lsn); + + /* Main prefetch loop. */ + for (;;) + { + XLogReaderState *reader = prefetcher->reader; + char *error; + int64 distance; + + /* If we don't already have a record, then try to read one. */ + if (!prefetcher->have_record) + { + if (!XLogReadRecord(reader, &error)) + { + /* If we got an error, log it and give up. */ + if (error) + { + ereport(LOG, (errmsg("WAL prefetch error: %s", error))); + prefetcher->shutdown = true; + } + /* Otherwise, we'll try again later when more data is here. */ + return; + } + prefetcher->have_record = true; + prefetcher->next_block_id = 0; + } + + /* How far ahead of replay are we now? */ + distance = prefetcher->reader->ReadRecPtr - replaying_lsn; + + /* Update distance shown in shm. */ + MonitoringStats->distance = distance; + + /* Sample the averages so we can log them at end of recovery. */ + if (unlikely(replaying_lsn >= prefetcher->next_sample_lsn)) + { + prefetcher->distance_sum += MonitoringStats->distance; + prefetcher->queue_depth_sum += MonitoringStats->queue_depth; + prefetcher->samples += 1.0; + prefetcher->next_sample_lsn = + replaying_lsn + XLOGPREFETCHER_MONITORING_SAMPLE_STEP; + } + + /* Are we too far ahead of replay? */ + if (distance >= max_wal_prefetch_distance) + break; + + /* + * If this is a record that creates a new SMGR relation, we'll avoid + * prefetching anything from that rnode until it has been replayed. + */ + if (replaying_lsn < reader->ReadRecPtr && + XLogRecGetRmid(reader) == RM_SMGR_ID && + (XLogRecGetInfo(reader) & ~XLR_INFO_MASK) == XLOG_SMGR_CREATE) + { + xl_smgr_create *xlrec = (xl_smgr_create *) XLogRecGetData(reader); + + XLogPrefetcherAddFilter(prefetcher, xlrec->rnode, 0, + reader->ReadRecPtr); + } + + /* + * Scan the record for block references. We might already have been + * partway through processing this record when we hit maximum I/O + * concurrency, so start where we left off. + */ + for (int i = prefetcher->next_block_id; i <= reader->max_block_id; ++i) + { + PrefetchBufferResult prefetch; + DecodedBkpBlock *block = &reader->blocks[i]; + SMgrRelation reln; + + /* Ignore everything but the main fork for now. */ + if (block->forknum != MAIN_FORKNUM) + continue; + + /* + * If there is a full page image attached, we won't be reading the + * page, so you might think we should skip it. However, if the + * underlying filesystem uses larger logical blocks than us, it + * might still need to perform a read-before-write some time later. + * Therefore, only prefetch if configured to do so. + */ + if (block->has_image && !wal_prefetch_fpw) + { + inc_counter(&MonitoringStats->skip_fpw); + continue; + } + + /* + * If this block will initialize a new page then it's probably an + * extension. Since it might create a new segment, we can't try + * to prefetch this block until the record has been replayed, or we + * might try to open a file that doesn't exist yet. 
+			 */
+			if (block->flags & BKPBLOCK_WILL_INIT)
+			{
+				XLogPrefetcherAddFilter(prefetcher, block->rnode, block->blkno,
+										reader->ReadRecPtr);
+				inc_counter(&MonitoringStats->skip_new);
+				continue;
+			}
+
+			/* Should we skip this block due to a filter? */
+			if (XLogPrefetcherIsFiltered(prefetcher, block->rnode,
+										 block->blkno))
+			{
+				inc_counter(&MonitoringStats->skip_new);
+				continue;
+			}
+
+			/* Fast path for repeated references to the same relation. */
+			if (RelFileNodeEquals(block->rnode, prefetcher->last_rnode))
+			{
+				/*
+				 * If this is a repeat or sequential access, then skip it.  We
+				 * expect the kernel to detect sequential access on its own
+				 * and do a better job than we could.
+				 */
+				if (block->blkno == prefetcher->last_blkno ||
+					block->blkno == prefetcher->last_blkno + 1)
+				{
+					prefetcher->last_blkno = block->blkno;
+					inc_counter(&MonitoringStats->skip_seq);
+					continue;
+				}
+
+				/* We can avoid calling smgropen(). */
+				reln = prefetcher->last_reln;
+			}
+			else
+			{
+				/* Otherwise we have to open it. */
+				reln = smgropen(block->rnode, InvalidBackendId);
+				prefetcher->last_rnode = block->rnode;
+				prefetcher->last_reln = reln;
+			}
+			prefetcher->last_blkno = block->blkno;
+
+			/* Try to prefetch this block! */
+			prefetch = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
+			if (BufferIsValid(prefetch.buffer))
+			{
+				/*
+				 * It was already cached, so do nothing.  Perhaps in future we
+				 * could remember the buffer so that recovery doesn't have to
+				 * look it up again.
+				 */
+				inc_counter(&MonitoringStats->skip_hit);
+			}
+			else if (prefetch.initiated_io)
+			{
+				/*
+				 * I/O has possibly been initiated (we don't know whether the
+				 * kernel already had the page cached, so for lack of better
+				 * information we just assume an I/O was started).  Record
+				 * this as an I/O in progress until we eventually replay this
+				 * LSN.
+				 */
+				inc_counter(&MonitoringStats->prefetch);
+				XLogPrefetcherInitiatedIO(prefetcher, reader->ReadRecPtr);
+
+				/*
+				 * If the queue is now full, we'll have to wait before
+				 * processing any more blocks from this record.
+				 */
+				if (XLogPrefetcherSaturated(prefetcher))
+				{
+					prefetcher->next_block_id = i + 1;
+					return;
+				}
+			}
+			else
+			{
+				/*
+				 * Neither cached nor initiated.  The underlying segment file
+				 * doesn't exist.  Presumably it will be unlinked by a later
+				 * WAL record.  When recovery reads this block, it will use
+				 * the EXTENSION_CREATE_RECOVERY flag.  We certainly don't
+				 * want to do that sort of thing while merely prefetching, so
+				 * let's just ignore references to this relation until this
+				 * record is replayed, and let recovery create the dummy file
+				 * or complain if something is wrong.
+				 */
+				XLogPrefetcherAddFilter(prefetcher, block->rnode, 0,
+										reader->ReadRecPtr);
+				inc_counter(&MonitoringStats->skip_new);
+			}
+		}
+
+		/* Advance to the next record. */
+		prefetcher->have_record = false;
+	}
+}
+
+/*
+ * Expose statistics about WAL prefetching.
+ */
+Datum
+pg_stat_get_wal_prefetcher(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_WAL_PREFETCHER_COLS 7
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc	tupdesc;
+	Tuplestorestate *tupstore;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+	Datum		values[PG_STAT_GET_WAL_PREFETCHER_COLS];
+	bool		nulls[PG_STAT_GET_WAL_PREFETCHER_COLS];
+
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not allowed in this context")));
+
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	if (MonitoringStats->distance < 0)
+	{
+		for (int i = 0; i < PG_STAT_GET_WAL_PREFETCHER_COLS; ++i)
+			nulls[i] = true;
+	}
+	else
+	{
+		for (int i = 0; i < PG_STAT_GET_WAL_PREFETCHER_COLS; ++i)
+			nulls[i] = false;
+		values[0] = Int64GetDatum(pg_atomic_read_u64(&MonitoringStats->prefetch));
+		values[1] = Int64GetDatum(pg_atomic_read_u64(&MonitoringStats->skip_hit));
+		values[2] = Int64GetDatum(pg_atomic_read_u64(&MonitoringStats->skip_new));
+		values[3] = Int64GetDatum(pg_atomic_read_u64(&MonitoringStats->skip_fpw));
+		values[4] = Int64GetDatum(pg_atomic_read_u64(&MonitoringStats->skip_seq));
+		values[5] = Int32GetDatum(MonitoringStats->distance);
+		values[6] = Int32GetDatum(MonitoringStats->queue_depth);
+	}
+	tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+	tuplestore_donestoring(tupstore);
+
+	return (Datum) 0;
+}
+
+/*
+ * Don't prefetch any blocks >= 'blockno' from a given 'rnode' until 'lsn'
+ * has been replayed.
+ */
+static inline void
+XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileNode rnode,
+						BlockNumber blockno, XLogRecPtr lsn)
+{
+	XLogPrefetcherFilter *filter;
+	bool		found;
+
+	filter = hash_search(prefetcher->filter_table, &rnode, HASH_ENTER, &found);
+	if (!found)
+	{
+		/*
+		 * Don't allow any prefetching of this block or higher until replayed.
+		 */
+		filter->filter_until_replayed = lsn;
+		filter->filter_from_block = blockno;
+		dlist_push_head(&prefetcher->filter_queue, &filter->link);
+	}
+	else
+	{
+		/*
+		 * We were already filtering this rnode.  Extend the filter's lifetime
+		 * to cover this WAL record, but leave the (presumably lower) block
+		 * number there because we don't want to have to track individual
+		 * blocks.
+		 */
+		filter->filter_until_replayed = lsn;
+		dlist_delete(&filter->link);
+		dlist_push_head(&prefetcher->filter_queue, &filter->link);
+	}
+}
+
+/*
+ * Have we replayed the records that caused us to begin filtering a block
+ * range?  If so, relations should have been created, extended or dropped as
+ * required by now, so we can drop the relevant filters.
+ */
+static inline void
+XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+	while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
+	{
+		XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
+														  link,
+														  &prefetcher->filter_queue);
+
+		if (filter->filter_until_replayed >= replaying_lsn)
+			break;
+		dlist_delete(&filter->link);
+		hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
+	}
+}
+
+/*
+ * Check if a given block should be skipped due to a filter.
+ */
+static inline bool
+XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileNode rnode,
+						 BlockNumber blockno)
+{
+	/*
+	 * Test for empty queue first, because we expect it to be empty most of
+	 * the time and we can avoid the hash table lookup in that case.
+	 */
+	if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
+	{
+		XLogPrefetcherFilter *filter = hash_search(prefetcher->filter_table, &rnode,
+												   HASH_FIND, NULL);
+
+		if (filter && filter->filter_from_block <= blockno)
+			return true;
+	}
+
+	return false;
+}
+
+/*
+ * Insert an LSN into the queue.  The queue must not be full already.  This
+ * tracks the fact that we have (to the best of our knowledge) initiated an
+ * I/O, so that we can impose a cap on concurrent prefetching.
+ */
+static inline void
+XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher,
+						  XLogRecPtr prefetching_lsn)
+{
+	Assert(!XLogPrefetcherSaturated(prefetcher));
+	prefetcher->prefetch_queue[prefetcher->prefetch_head++] = prefetching_lsn;
+	prefetcher->prefetch_head %= prefetcher->prefetch_queue_size;
+	MonitoringStats->queue_depth++;
+	Assert(MonitoringStats->queue_depth <= prefetcher->prefetch_queue_size);
+}
+
+/*
+ * Have we replayed the records that caused us to initiate the oldest
+ * prefetches yet?  If so, they're definitely finished, so we can forget
+ * about them and allow ourselves to initiate more prefetches.  For now we
+ * don't have any awareness of when I/O really completes.
+ */
+static inline void
+XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+	while (prefetcher->prefetch_head != prefetcher->prefetch_tail &&
+		   prefetcher->prefetch_queue[prefetcher->prefetch_tail] < replaying_lsn)
+	{
+		prefetcher->prefetch_tail++;
+		prefetcher->prefetch_tail %= prefetcher->prefetch_queue_size;
+		MonitoringStats->queue_depth--;
+		Assert(MonitoringStats->queue_depth >= 0);
+	}
+}
+
+/*
+ * Check if the maximum allowed number of I/Os is already in flight.
+ */ +static inline bool +XLogPrefetcherSaturated(XLogPrefetcher *prefetcher) +{ + return (prefetcher->prefetch_head + 1) % prefetcher->prefetch_queue_size == + prefetcher->prefetch_tail; +} diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index b217ffa52f..fad2acb514 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -25,6 +25,7 @@ #include "access/xlogutils.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/walreceiver.h" #include "storage/smgr.h" #include "utils/guc.h" #include "utils/hsearch.h" @@ -827,6 +828,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, TimeLineID tli; int count; WALReadError errinfo; + XLogReadLocalOptions *options = (XLogReadLocalOptions *) state->private_data; loc = targetPagePtr + reqLen; @@ -841,7 +843,23 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * notices recovery finishes, so we only have to maintain it for the * local process until recovery ends. */ - if (!RecoveryInProgress()) + if (options) + { + switch (options->read_upto_policy) + { + case XLRO_WALRCV_WRITTEN: + read_upto = GetWalRcvWriteRecPtr(); + break; + case XLRO_LSN: + read_upto = options->lsn; + break; + default: + read_upto = 0; + elog(ERROR, "unknown read_upto_policy value"); + break; + } + } + else if (!RecoveryInProgress()) read_upto = GetFlushRecPtr(); else read_upto = GetXLogReplayRecPtr(&ThisTimeLineID); @@ -879,6 +897,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, if (loc <= read_upto) break; + if (options && options->nowait) + break; + CHECK_FOR_INTERRUPTS(); pg_usleep(1000L); } diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index b8a3f46912..7b27ac4805 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -811,6 +811,17 @@ CREATE VIEW pg_stat_wal_receiver AS FROM pg_stat_get_wal_receiver() s WHERE s.pid IS NOT NULL; +CREATE VIEW pg_stat_wal_prefetcher AS + SELECT + s.prefetch, + s.skip_hit, + s.skip_new, + s.skip_fpw, + s.skip_seq, + s.distance, + s.queue_depth + FROM pg_stat_get_wal_prefetcher() s; + CREATE VIEW pg_stat_subscription AS SELECT su.oid AS subid, diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 5adf253583..792d90ef4c 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -169,7 +169,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->slot = slot; - ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx); + ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, NULL); if (!ctx->reader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 4ceb40a856..4fc391a6e4 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -572,7 +572,7 @@ PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum) return PrefetchSharedBuffer(reln->rd_smgr, forkNum, blockNum); } #else - PrefetchBuffer result = { InvalidBuffer, false }; + PrefetchBufferResult result = { InvalidBuffer, false }; return result; #endif /* USE_PREFETCH */ diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 427b0d59cd..5ca98b8886 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -21,6 +21,7 @@ 
#include "access/nbtree.h" #include "access/subtrans.h" #include "access/twophase.h" +#include "access/xlogprefetcher.h" #include "commands/async.h" #include "miscadmin.h" #include "pgstat.h" @@ -124,6 +125,7 @@ CreateSharedMemoryAndSemaphores(void) size = add_size(size, PredicateLockShmemSize()); size = add_size(size, ProcGlobalShmemSize()); size = add_size(size, XLOGShmemSize()); + size = add_size(size, XLogPrefetcherShmemSize()); size = add_size(size, CLOGShmemSize()); size = add_size(size, CommitTsShmemSize()); size = add_size(size, SUBTRANSShmemSize()); @@ -212,6 +214,7 @@ CreateSharedMemoryAndSemaphores(void) * Set up xlog, clog, and buffers */ XLOGShmemInit(); + XLogPrefetcherShmemInit(); CLOGShmemInit(); CommitTsShmemInit(); SUBTRANSShmemInit(); diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 68082315ac..a2a9f62160 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -197,6 +197,7 @@ static bool check_max_wal_senders(int *newval, void **extra, GucSource source); static bool check_autovacuum_work_mem(int *newval, void **extra, GucSource source); static bool check_effective_io_concurrency(int *newval, void **extra, GucSource source); static bool check_maintenance_io_concurrency(int *newval, void **extra, GucSource source); +static void assign_maintenance_io_concurrency(int newval, void *extra); static void assign_pgstat_temp_directory(const char *newval, void *extra); static bool check_application_name(char **newval, void **extra, GucSource source); static void assign_application_name(const char *newval, void *extra); @@ -1241,6 +1242,18 @@ static struct config_bool ConfigureNamesBool[] = true, NULL, NULL, NULL }, + { + {"wal_prefetch_fpw", PGC_SIGHUP, WAL_SETTINGS, + gettext_noop("Prefetch blocks that have full page images in the WAL"), + gettext_noop("On some systems, there is no benefit to prefetching pages that will be " + "entirely overwritten, but if the logical page size of the filesystem is " + "larger than PostgreSQL's, this can be beneficial. This option has no " + "effect unless max_wal_prefetch_distance is set to a positive number.") + }, + &wal_prefetch_fpw, + false, + NULL, assign_wal_prefetch_fpw, NULL + }, { {"wal_log_hints", PGC_POSTMASTER, WAL_SETTINGS, @@ -2627,6 +2640,17 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_wal_prefetch_distance", PGC_SIGHUP, WAL_ARCHIVE_RECOVERY, + gettext_noop("Maximum number of bytes to read ahead in the WAL to prefetch referenced blocks."), + gettext_noop("Set to -1 to disable WAL prefetching."), + GUC_UNIT_BYTE + }, + &max_wal_prefetch_distance, + -1, -1, INT_MAX, + NULL, assign_max_wal_prefetch_distance, NULL + }, + { {"wal_keep_segments", PGC_SIGHUP, REPLICATION_SENDING, gettext_noop("Sets the number of WAL files held for standby servers."), @@ -2900,7 +2924,8 @@ static struct config_int ConfigureNamesInt[] = 0, #endif 0, MAX_IO_CONCURRENCY, - check_maintenance_io_concurrency, NULL, NULL + check_maintenance_io_concurrency, assign_maintenance_io_concurrency, + NULL }, { @@ -11498,6 +11523,17 @@ check_maintenance_io_concurrency(int *newval, void **extra, GucSource source) return true; } +static void +assign_maintenance_io_concurrency(int newval, void *extra) +{ +#ifdef USE_PREFETCH + /* Reset the WAL prefetcher, because a setting it depends on changed. 
+	 */
+	maintenance_io_concurrency = newval;
+	if (AmStartupProcess())
+		ResetWalPrefetcher();
+#endif
+}
+
 static void
 assign_pgstat_temp_directory(const char *newval, void *extra)
 {
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 98b033fc20..82829d7854 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -111,6 +111,8 @@ extern int	wal_keep_segments;
 extern int	XLOGbuffers;
 extern int	XLogArchiveTimeout;
 extern int	wal_retrieve_retry_interval;
+extern int	max_wal_prefetch_distance;
+extern bool wal_prefetch_fpw;
 extern char *XLogArchiveCommand;
 extern bool EnableHotStandby;
 extern bool fullPageWrites;
@@ -319,6 +321,8 @@ extern void SetWalWriterSleeping(bool sleeping);
 
 extern void XLogRequestWalReceiverReply(void);
 
+extern void ResetWalPrefetcher(void);
+
 extern void assign_max_wal_size(int newval, void *extra);
 extern void assign_checkpoint_completion_target(double newval, void *extra);
 
diff --git a/src/include/access/xlogprefetcher.h b/src/include/access/xlogprefetcher.h
new file mode 100644
index 0000000000..585f5564a3
--- /dev/null
+++ b/src/include/access/xlogprefetcher.h
@@ -0,0 +1,28 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogprefetcher.h
+ *		Declarations for the XLog prefetching facility
+ *
+ * Portions Copyright (c) 2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *		src/include/access/xlogprefetcher.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef XLOGPREFETCHER_H
+#define XLOGPREFETCHER_H
+
+#include "access/xlogdefs.h"
+
+struct XLogPrefetcher;
+typedef struct XLogPrefetcher XLogPrefetcher;
+
+extern XLogPrefetcher *XLogPrefetcherAllocate(XLogRecPtr lsn, bool streaming);
+extern void XLogPrefetcherFree(XLogPrefetcher *prefetcher);
+extern void XLogPrefetcherReadAhead(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn);
+
+extern size_t XLogPrefetcherShmemSize(void);
+extern void XLogPrefetcherShmemInit(void);
+
+#endif							/* XLOGPREFETCHER_H */
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 5181a077d9..1c8e67d74a 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -47,6 +47,26 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
 extern void FreeFakeRelcacheEntry(Relation fakerel);
 
+/*
+ * A pointer to an XLogReadLocalOptions struct can be supplied as the private
+ * data for an xlog reader, causing read_local_xlog_page to modify its
+ * behavior.
+ */
+typedef struct XLogReadLocalOptions
+{
+	/* Don't block waiting for new WAL to arrive. */
+	bool		nowait;
+
+	/* How far to read. */
+	enum {
+		XLRO_WALRCV_WRITTEN,
+		XLRO_LSN
+	} read_upto_policy;
+
+	/* If read_upto_policy is XLRO_LSN, the LSN.
*/ + XLogRecPtr lsn; +} XLogReadLocalOptions; + extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 7fb574f9dc..742741afa1 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -6082,6 +6082,14 @@ prorettype => 'bool', proargtypes => '', prosrc => 'pg_is_wal_replay_paused' }, +{ oid => '9085', descr => 'statistics: information about WAL prefetching', + proname => 'pg_stat_get_wal_prefetcher', prorows => '1', provolatile => 'v', + proretset => 't', prorettype => 'record', proargtypes => '', + proallargtypes => '{int8,int8,int8,int8,int8,int4,int4}', + proargmodes => '{o,o,o,o,o,o,o}', + proargnames => '{prefetch,skip_hit,skip_new,skip_fpw,skip_seq,distance,queue_depth}', + prosrc => 'pg_stat_get_wal_prefetcher' }, + { oid => '2621', descr => 'reload configuration files', proname => 'pg_reload_conf', provolatile => 'v', prorettype => 'bool', proargtypes => '', prosrc => 'pg_reload_conf' }, diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index ce93ace76c..7d076a9743 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -438,5 +438,7 @@ extern void assign_search_path(const char *newval, void *extra); /* in access/transam/xlog.c */ extern bool check_wal_buffers(int *newval, void **extra, GucSource source); extern void assign_xlog_sync_method(int new_sync_method, void *extra); +extern void assign_max_wal_prefetch_distance(int new_value, void *extra); +extern void assign_wal_prefetch_fpw(bool new_value, void *extra); #endif /* GUC_H */ diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index c7304611c3..63bbb796fc 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2102,6 +2102,14 @@ pg_stat_user_tables| SELECT pg_stat_all_tables.relid, pg_stat_all_tables.autoanalyze_count FROM pg_stat_all_tables WHERE ((pg_stat_all_tables.schemaname <> ALL (ARRAY['pg_catalog'::name, 'information_schema'::name])) AND (pg_stat_all_tables.schemaname !~ '^pg_toast'::text)); +pg_stat_wal_prefetcher| SELECT s.prefetch, + s.skip_hit, + s.skip_new, + s.skip_fpw, + s.skip_seq, + s.distance, + s.queue_depth + FROM pg_stat_get_wal_prefetcher() s(prefetch, skip_hit, skip_new, skip_fpw, skip_seq, distance, queue_depth); pg_stat_wal_receiver| SELECT s.pid, s.status, s.receive_start_lsn, -- 2.20.1
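
In case anyone wants to see this in action, here's roughly how I'd expect
it to be driven on a streaming standby.  This is just an illustrative
sketch, not a recommendation: the 256kB distance is an arbitrary number I
picked for the example.  Both GUCs are PGC_SIGHUP, so a reload is enough
to change them at runtime:

    # postgresql.conf on the standby
    max_wal_prefetch_distance = 256kB   # -1 (the default) disables prefetching
    wal_prefetch_fpw = off              # the default; see the GUC description

    -- then, while recovery is replaying WAL:
    SELECT * FROM pg_stat_wal_prefetcher;

Note that the view shows all-NULL columns while the prefetcher isn't
active, since the distance tracked in shared memory is negative in that
state, so you have to query it during recovery to see anything.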