Hi,

The attached patch set implements checkpointer write combining, which makes immediate checkpoints at least 20% faster in my tests. Checkpointer achieves higher write throughput and higher write IOPS with the patch.

Besides the immediate performance gain, we will eventually need all writers to do write combining if we want to use direct IO. Additionally, I think the general shape I refactored BufferSync() into will be useful for AIO-ifying checkpointer.

The patch set has preliminary patches (0001-0004) that implement eager flushing and write combining for bulkwrites (like COPY FROM). The functions used to flush a batch of writes for bulkwrites (see 0004) are reused for the checkpointer. The eager flushing component of this patch set has been discussed elsewhere [1].

0005 implements a fix for XLogNeedsFlush() when called by checkpointer during an end-of-crash-recovery checkpoint. I've already started another thread about this [2], but the patch is required for the patch set to pass tests.

One outstanding action item is to test whether spread checkpoints benefit as well.

More on how I measured the performance benefit to immediate checkpoints: I tuned checkpoint_completion_target, checkpoint_timeout, min_wal_size, and max_wal_size to ensure no other checkpoints were initiated. With 16 GB shared buffers and io_combine_limit 128, I created a 15 GB table. To get consistent results, I used pg_prewarm to read the table into shared buffers, issued a checkpoint, then used Bilal's patch [3] to mark all the buffers as dirty again and issued another checkpoint. On a fast local SSD, this proved to be a consistent 20%+ speedup (~6.5 seconds to ~5 seconds).

- Melanie

[1] https://www.postgresql.org/message-id/caakru_yjn4mvn9nbxtmscqsgwup45coa4e05nhr7adp-v0w...@mail.gmail.com
[2] https://www.postgresql.org/message-id/CAAKRu_a1vZRZRWO3_jv_X13RYoqLRVipGO0237g5PKzPa2YX6g%40mail.gmail.com
[3] https://www.postgresql.org/message-id/flat/CAN55FZ0h_YoSqqutxV6DES1RW8ig6wcA8CR9rJk358YRMxZFmw%40mail.gmail.com
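For readers unfamiliar with the term, write combining as used in the message above can be pictured with a small standalone sketch. It is not code from the patch set: the function name count_combined_writes and its parameters are made up for illustration, and it assumes the dirty block numbers of one relation fork are already sorted. It only counts how many write calls result when adjacent blocks are merged into IOs of at most combine_limit blocks (the role io_combine_limit plays above).

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Illustrative only (hypothetical helper, not part of the patch set).
 *
 * Count the write calls needed for the given blocks when runs of adjacent
 * block numbers are combined into a single IO of at most combine_limit
 * blocks. blocks[] must be sorted ascending and contain no duplicates.
 */
size_t
count_combined_writes(const uint32_t *blocks, size_t nblocks,
                      uint32_t combine_limit)
{
    size_t      nwrites = 0;
    size_t      i = 0;

    assert(combine_limit >= 1);

    while (i < nblocks)
    {
        uint32_t    run_len = 1;

        /* Extend the run while the next block is adjacent and fits. */
        while (i + run_len < nblocks &&
               run_len < combine_limit &&
               blocks[i + run_len] == blocks[i] + run_len)
            run_len++;

        nwrites++;              /* one vectored write covers the whole run */
        i += run_len;
    }

    return nwrites;
}
```

With a limit of 1 every block is its own write, which corresponds to the behavior without combining. Larger limits only help where dirty blocks really are adjacent on disk.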
From 3b57dbff6412f3864633eecd0d153d862e1737af Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Tue, 2 Sep 2025 10:01:17 -0400
Subject: [PATCH v1 5/9] Fix XLogNeedsFlush() for checkpointer

XLogNeedsFlush() takes an LSN and compares it to either the flush
pointer or the min recovery point, depending on whether it is in normal
operation or recovery.

Even though it is technically recovery, the checkpointer must flush WAL
during an end-of-recovery checkpoint, so in this case, it should compare
the provided LSN to the flush pointer and not the min recovery point.

If it compares the LSN to the min recovery point when the control file's
min recovery point has been updated to an incorrect value,
XLogNeedsFlush() can return an incorrect result of true -- even after
just having flushed WAL.

Change this to only compare the LSN to min recovery point -- and,
potentially, update the local copy of min recovery point -- when xlog
inserts are not allowed. Inserts are allowed for the checkpointer during
an end-of-recovery checkpoint, but not otherwise during crash recovery.

Author: Melanie Plageman <melanieplage...@gmail.com>
Reported-by: Melanie Plageman <melanieplage...@gmail.com>
Discussion: https://postgr.es/m/CAAKRu_a1vZRZRWO3_jv_X13RYoqLRVipGO0237g5PKzPa2YX6g%40mail.gmail.com
---
 src/backend/access/transam/xlog.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 7ffb2179151..16ef6d2cd64 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -3115,7 +3115,7 @@ XLogNeedsFlush(XLogRecPtr record)
 	 * instead. So "needs flush" is taken to mean whether minRecoveryPoint
 	 * would need to be updated.
 	 */
-	if (RecoveryInProgress())
+	if (RecoveryInProgress() && !XLogInsertAllowed())
 	{
 		/*
 		 * An invalid minRecoveryPoint means that we need to recover all the
-- 
2.43.0
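The effect of the one-line change in the patch above can be shown with a simplified model. This sketch is an assumption-laden illustration, not the xlog.c code: WalState, needs_flush_old and needs_flush_new are hypothetical names, the booleans stand in for RecoveryInProgress() and XLogInsertAllowed(), and the invalid-minRecoveryPoint case, cached pointers, and locking are all left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;           /* stand-in for XLogRecPtr */

/* Hypothetical snapshot of the state the real function consults. */
typedef struct WalState
{
    bool        recovery_in_progress;   /* models RecoveryInProgress() */
    bool        insert_allowed;         /* models XLogInsertAllowed() */
    Lsn         flush_ptr;              /* WAL is flushed up to here */
    Lsn         min_recovery_point;     /* may be stale at end of recovery */
} WalState;

/* Before the patch: any process in recovery compares to minRecoveryPoint. */
bool
needs_flush_old(const WalState *s, Lsn record)
{
    if (s->recovery_in_progress)
        return record > s->min_recovery_point;
    return record > s->flush_ptr;
}

/*
 * With the patch: minRecoveryPoint is only consulted when WAL inserts are
 * not allowed. A process that may insert WAL (such as the checkpointer
 * during an end-of-recovery checkpoint) is compared to the flush pointer.
 */
bool
needs_flush_new(const WalState *s, Lsn record)
{
    if (s->recovery_in_progress && !s->insert_allowed)
        return record > s->min_recovery_point;
    return record > s->flush_ptr;
}
```

In this model, a checkpointer that has just flushed WAL past a record but sees a stale min recovery point gets true from the old predicate and false from the new one, which is the misreport the commit message describes.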
From 8d874b737771dbb9b2cb6968d79376a1b1276491 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <melanieplage...@gmail.com> Date: Tue, 2 Sep 2025 11:00:44 -0400 Subject: [PATCH v1 1/9] Refactor goto into for loop in GetVictimBuffer() GetVictimBuffer() implemented a loop to optimistically lock a clean victim buffer using a goto. Future commits will add batch flushing functionality to GetVictimBuffer. The new logic works better with a regular for loop flow control. This commit is only a refactor and does not introduce any new functionality. --- src/backend/storage/buffer/bufmgr.c | 200 ++++++++++++-------------- src/backend/storage/buffer/freelist.c | 17 +++ src/include/storage/buf_internals.h | 5 + 3 files changed, 116 insertions(+), 106 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 350cc0402aa..c0f0e052135 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -68,10 +68,6 @@ #include "utils/timestamp.h" -/* Note: these two macros only work on shared buffers, not local ones! */ -#define BufHdrGetBlock(bufHdr) ((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ)) -#define BufferGetLSN(bufHdr) (PageGetLSN(BufHdrGetBlock(bufHdr))) - /* Note: this macro only works on local buffers, not shared ones! */ #define LocalBufHdrGetBlock(bufHdr) \ LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)] @@ -2356,130 +2352,122 @@ GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context) ReservePrivateRefCountEntry(); ResourceOwnerEnlarge(CurrentResourceOwner); - /* we return here if a prospective victim buffer gets used concurrently */ -again: - - /* - * Select a victim buffer. The buffer is returned with its header - * spinlock still held! 
- */ - buf_hdr = StrategyGetBuffer(strategy, &buf_state, &from_ring); - buf = BufferDescriptorGetBuffer(buf_hdr); - - Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 0); - - /* Pin the buffer and then release the buffer spinlock */ - PinBuffer_Locked(buf_hdr); - - /* - * We shouldn't have any other pins for this buffer. - */ - CheckBufferIsPinnedOnce(buf); - - /* - * If the buffer was dirty, try to write it out. There is a race - * condition here, in that someone might dirty it after we released the - * buffer header lock above, or even while we are writing it out (since - * our share-lock won't prevent hint-bit updates). We will recheck the - * dirty bit after re-locking the buffer header. - */ - if (buf_state & BM_DIRTY) + /* Select a victim buffer using an optimistic locking scheme. */ + for (;;) { - LWLock *content_lock; + /* + * Attempt to claim a victim buffer. The buffer is returned with its + * header spinlock still held! + */ + buf_hdr = StrategyGetBuffer(strategy, &buf_state, &from_ring); + buf = BufferDescriptorGetBuffer(buf_hdr); - Assert(buf_state & BM_TAG_VALID); - Assert(buf_state & BM_VALID); + Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 0); + + /* Pin the buffer and then release the buffer spinlock */ + PinBuffer_Locked(buf_hdr); /* - * We need a share-lock on the buffer contents to write it out (else - * we might write invalid data, eg because someone else is compacting - * the page contents while we write). We must use a conditional lock - * acquisition here to avoid deadlock. Even though the buffer was not - * pinned (and therefore surely not locked) when StrategyGetBuffer - * returned it, someone else could have pinned and exclusive-locked it - * by the time we get here. If we try to get the lock unconditionally, - * we'd block waiting for them; if they later block waiting for us, - * deadlock ensues. 
(This has been observed to happen when two - * backends are both trying to split btree index pages, and the second - * one just happens to be trying to split the page the first one got - * from StrategyGetBuffer.) + * We shouldn't have any other pins for this buffer. */ - content_lock = BufferDescriptorGetContentLock(buf_hdr); - if (!LWLockConditionalAcquire(content_lock, LW_SHARED)) - { - /* - * Someone else has locked the buffer, so give it up and loop back - * to get another one. - */ - UnpinBuffer(buf_hdr); - goto again; - } + CheckBufferIsPinnedOnce(buf); /* - * If using a nondefault strategy, and writing the buffer would - * require a WAL flush, let the strategy decide whether to go ahead - * and write/reuse the buffer or to choose another victim. We need a - * lock to inspect the page LSN, so this can't be done inside - * StrategyGetBuffer. + * If the buffer was dirty, try to write it out. There is a race + * condition here, in that someone might dirty it after we released + * the buffer header lock above, or even while we are writing it out + * (since our share-lock won't prevent hint-bit updates). We will + * recheck the dirty bit after re-locking the buffer header. */ - if (strategy != NULL) + if (buf_state & BM_DIRTY) { - XLogRecPtr lsn; + LWLock *content_lock; - /* Read the LSN while holding buffer header lock */ - buf_state = LockBufHdr(buf_hdr); - lsn = BufferGetLSN(buf_hdr); - UnlockBufHdr(buf_hdr, buf_state); + Assert(buf_state & BM_TAG_VALID); + Assert(buf_state & BM_VALID); - if (XLogNeedsFlush(lsn) - && StrategyRejectBuffer(strategy, buf_hdr, from_ring)) + /* + * We need a share-lock on the buffer contents to write it out + * (else we might write invalid data, eg because someone else is + * compacting the page contents while we write). We must use a + * conditional lock acquisition here to avoid deadlock. 
Even + * though the buffer was not pinned (and therefore surely not + * locked) when StrategyGetBuffer returned it, someone else could + * have pinned and exclusive-locked it by the time we get here. If + * we try to get the lock unconditionally, we'd block waiting for + * them; if they later block waiting for us, deadlock ensues. + * (This has been observed to happen when two backends are both + * trying to split btree index pages, and the second one just + * happens to be trying to split the page the first one got from + * StrategyGetBuffer.) + */ + content_lock = BufferDescriptorGetContentLock(buf_hdr); + if (!LWLockConditionalAcquire(content_lock, LW_SHARED)) + { + /* + * Someone else has locked the buffer, so give it up and loop + * back to get another one. + */ + UnpinBuffer(buf_hdr); + continue; + } + + /* + * If using a nondefault strategy, and writing the buffer would + * require a WAL flush, let the strategy decide whether to go + * ahead and write/reuse the buffer or to choose another victim. + * We need the content lock to inspect the page LSN, so this can't + * be done inside StrategyGetBuffer. + */ + if (StrategyRejectBuffer(strategy, buf_hdr, from_ring)) { LWLockRelease(content_lock); UnpinBuffer(buf_hdr); - goto again; + continue; } - } - /* OK, do the I/O */ - FlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context); - LWLockRelease(content_lock); + /* OK, do the I/O */ + FlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context); + LWLockRelease(content_lock); - ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, - &buf_hdr->tag); - } + ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, + &buf_hdr->tag); + } + if (buf_state & BM_VALID) + { + /* + * When a BufferAccessStrategy is in use, blocks evicted from + * shared buffers are counted as IOOP_EVICT in the corresponding + * context (e.g. IOCONTEXT_BULKWRITE). 
Shared buffers are evicted + * by a strategy in two cases: 1) while initially claiming buffers + * for the strategy ring 2) to replace an existing strategy ring + * buffer because it is pinned or in use and cannot be reused. + * + * Blocks evicted from buffers already in the strategy ring are + * counted as IOOP_REUSE in the corresponding strategy context. + * + * At this point, we can accurately count evictions and reuses, + * because we have successfully claimed the valid buffer. + * Previously, we may have been forced to release the buffer due + * to concurrent pinners or erroring out. + */ + pgstat_count_io_op(IOOBJECT_RELATION, io_context, + from_ring ? IOOP_REUSE : IOOP_EVICT, 1, 0); + } - if (buf_state & BM_VALID) - { /* - * When a BufferAccessStrategy is in use, blocks evicted from shared - * buffers are counted as IOOP_EVICT in the corresponding context - * (e.g. IOCONTEXT_BULKWRITE). Shared buffers are evicted by a - * strategy in two cases: 1) while initially claiming buffers for the - * strategy ring 2) to replace an existing strategy ring buffer - * because it is pinned or in use and cannot be reused. - * - * Blocks evicted from buffers already in the strategy ring are - * counted as IOOP_REUSE in the corresponding strategy context. - * - * At this point, we can accurately count evictions and reuses, - * because we have successfully claimed the valid buffer. Previously, - * we may have been forced to release the buffer due to concurrent - * pinners or erroring out. + * If the buffer has an entry in the buffer mapping table, delete it. + * This can fail because another backend could have pinned or dirtied + * the buffer. Then loop around and try again. */ - pgstat_count_io_op(IOOBJECT_RELATION, io_context, - from_ring ? IOOP_REUSE : IOOP_EVICT, 1, 0); - } + if ((buf_state & BM_TAG_VALID) && !InvalidateVictimBuffer(buf_hdr)) + { + UnpinBuffer(buf_hdr); + continue; + } - /* - * If the buffer has an entry in the buffer mapping table, delete it. 
This - * can fail because another backend could have pinned or dirtied the - * buffer. - */ - if ((buf_state & BM_TAG_VALID) && !InvalidateVictimBuffer(buf_hdr)) - { - UnpinBuffer(buf_hdr); - goto again; + break; } /* a final set of sanity checks */ diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c index 01909be0272..f695ce43224 100644 --- a/src/backend/storage/buffer/freelist.c +++ b/src/backend/storage/buffer/freelist.c @@ -15,6 +15,7 @@ */ #include "postgres.h" +#include "access/xlog.h" #include "pgstat.h" #include "port/atomics.h" #include "storage/buf_internals.h" @@ -833,12 +834,21 @@ IOContextForStrategy(BufferAccessStrategy strategy) * be written out and doing so would require flushing WAL too. This gives us * a chance to choose a different victim. * + * The buffer must be pinned and content locked, and the buffer header + * spinlock must not be held. We must have the content lock to examine the LSN. + * * Returns true if buffer manager should ask for a new victim, and false * if this buffer should be written and re-used. */ bool StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf, bool from_ring) { + uint32 buf_state; + XLogRecPtr lsn; + + if (!strategy) + return false; + /* We only do this in bulkread mode */ if (strategy->btype != BAS_BULKREAD) return false; @@ -848,6 +858,13 @@ StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf, bool from_r strategy->buffers[strategy->current] != BufferDescriptorGetBuffer(buf)) return false; + buf_state = LockBufHdr(buf); + lsn = BufferGetLSN(buf); + UnlockBufHdr(buf, buf_state); + + if (!XLogNeedsFlush(lsn)) + return false; + /* * Remove the dirty buffer from the ring; necessary to prevent infinite * loop if all ring members are dirty. 
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 52a71b138f7..ed65ed84034 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -428,6 +428,11 @@ ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer) /* * Internal buffer management routines */ + +/* Note: these two macros only work on shared buffers, not local ones! */ +#define BufHdrGetBlock(bufHdr) ((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ)) +#define BufferGetLSN(bufHdr) (PageGetLSN(BufHdrGetBlock(bufHdr))) + /* bufmgr.c */ extern void WritebackContextInit(WritebackContext *context, int *max_pending); extern void IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context); -- 2.43.0
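The control-flow change in the patch above (replacing "goto again" with a for loop that uses continue and break) can be seen in isolation in the toy below. It is only a sketch under stated assumptions: ToyBuffer and toy_get_victim are hypothetical, the busy flag stands in for a failed conditional lock acquisition, and unlike GetVictimBuffer() the toy gives up after one pass instead of looping indefinitely.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a shared buffer; not a PostgreSQL type. */
typedef struct ToyBuffer
{
    bool        busy;           /* someone else holds the content lock */
    bool        dirty;          /* needs writing before reuse */
} ToyBuffer;

/*
 * Pick a victim starting at *hand, skipping buffers that cannot be locked.
 * Each "continue" is where the old code would have said "goto again";
 * "break" is the single exit for a successfully claimed buffer.
 *
 * Returns the index of the victim, or -1 if every buffer was busy.
 */
int
toy_get_victim(ToyBuffer *bufs, size_t nbufs, size_t *hand)
{
    int         victim = -1;

    for (size_t attempts = 0; attempts < nbufs; attempts++)
    {
        size_t      candidate = *hand;

        *hand = (*hand + 1) % nbufs;

        /* Could not lock it: give it up and try the next candidate. */
        if (bufs[candidate].busy)
            continue;

        /* Stands in for flushing the dirty buffer. */
        if (bufs[candidate].dirty)
            bufs[candidate].dirty = false;

        victim = (int) candidate;
        break;
    }

    return victim;
}
```

Having one loop body with explicit retry points is what makes it straightforward for later patches to add batch flushing inside the loop.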
From 8cd1a72128e25a9fccc9ed4551498f13e650fc97 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <melanieplage...@gmail.com> Date: Tue, 2 Sep 2025 12:56:38 -0400 Subject: [PATCH v1 4/9] Write combining for BAS_BULKWRITE Implement write combining for users of the bulkwrite buffer access strategy (e.g. COPY FROM). When the buffer access strategy needs to clean a buffer for reuse, it already opportunistically flushes some other buffers. Now, combine any contiguous blocks from the same relation into larger writes and issue them with smgrwritev(). The performance benefit for COPY FROM is mostly noticeable for multiple concurrent COPY FROMs because a single COPY FROM is either CPU bound or bound by WAL writes. The infrastructure for flushing larger batches of IOs will be reused by checkpointer and other processes doing writes of dirty data. --- src/backend/storage/buffer/bufmgr.c | 198 ++++++++++++++++++++++++-- src/backend/storage/buffer/freelist.c | 26 ++++ src/backend/storage/page/bufpage.c | 20 +++ src/backend/utils/probes.d | 2 + src/include/storage/buf_internals.h | 32 +++++ src/include/storage/bufpage.h | 1 + src/tools/pgindent/typedefs.list | 1 + 7 files changed, 269 insertions(+), 11 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index a38f1247135..80122abd9aa 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -539,6 +539,8 @@ static BufferDesc *PrepareOrRejectEagerFlushBuffer(Buffer bufnum, BlockNumber re RelFileLocator *rlocator, bool skip_pinned, XLogRecPtr *max_lsn); +static void FindFlushAdjacents(BufferAccessStrategy strategy, BufferDesc *start, + uint32 max_batch_size, BufWriteBatch *batch); static void CleanVictimBuffer(BufferAccessStrategy strategy, BufferDesc *bufdesc, uint32 *buf_state, bool from_ring); static void FindAndDropRelationBuffers(RelFileLocator rlocator, @@ -4281,10 +4283,73 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, 
DoFlushBuffer(buf, reln, io_object, io_context, lsn); } + +/* + * Given a buffer descriptor, start, from a strategy ring, strategy, that + * supports eager flushing, find additional buffers from the ring that can be + * combined into a single write batch with this buffer. + * + * max_batch_size is the maximum number of blocks that can be combined into a + * single write in general. This function, based on the block number of start, + * will determine the maximum IO size for this particular write given how much + * of the file remains. max_batch_size is provided by the caller so it doesn't + * have to be recalculated for each write. + * + * batch is an output parameter that this function will fill with the needed + * information to write this IO. + * + * This function will pin and content lock all of the buffers that it + * assembles for the IO batch. The caller is responsible for issuing the IO. + */ +static void +FindFlushAdjacents(BufferAccessStrategy strategy, BufferDesc *start, + uint32 max_batch_size, BufWriteBatch *batch) +{ + BlockNumber limit; + uint32 buf_state; + + Assert(start); + batch->bufdescs[0] = start; + + buf_state = LockBufHdr(start); + batch->max_lsn = BufferGetLSN(start); + UnlockBufHdr(start, buf_state); + + batch->start = batch->bufdescs[0]->tag.blockNum; + batch->forkno = BufTagGetForkNum(&batch->bufdescs[0]->tag); + batch->rlocator = BufTagGetRelFileLocator(&batch->bufdescs[0]->tag); + batch->reln = smgropen(batch->rlocator, INVALID_PROC_NUMBER); + + Assert(BlockNumberIsValid(batch->start)); + + limit = smgrmaxcombine(batch->reln, batch->forkno, batch->start); + limit = Max(limit, 1); + limit = Min(max_batch_size, limit); + + /* Now assemble a run of blocks to write out. 
*/ + for (batch->n = 1; batch->n < limit; batch->n++) + { + Buffer bufnum; + + if ((bufnum = StrategySweepNextBuffer(strategy)) == InvalidBuffer) + break; + + /* Stop when we encounter a buffer that will break the run */ + if ((batch->bufdescs[batch->n] = + PrepareOrRejectEagerFlushBuffer(bufnum, + batch->start + batch->n, + &batch->rlocator, + true, + &batch->max_lsn)) == NULL) + break; + } +} + /* * Returns the buffer descriptor of the buffer containing the next block we * should eagerly flush or NULL when there are no further buffers to - * consider writing out. + * consider writing out. This will be the start of a new batch of buffers to + * write out. */ static BufferDesc * next_strat_buf_to_flush(BufferAccessStrategy strategy, @@ -4316,7 +4381,6 @@ CleanVictimBuffer(BufferAccessStrategy strategy, XLogRecPtr max_lsn = InvalidXLogRecPtr; LWLock *content_lock; - bool first_buffer = true; IOContext io_context = IOContextForStrategy(strategy); Assert(*buf_state & BM_DIRTY); @@ -4327,19 +4391,22 @@ CleanVictimBuffer(BufferAccessStrategy strategy, if (from_ring && strategy_supports_eager_flush(strategy)) { + uint32 max_batch_size = max_write_batch_size_for_strategy(strategy); + + /* Pin our victim again so it stays ours even after batch released */ + ReservePrivateRefCountEntry(); + ResourceOwnerEnlarge(CurrentResourceOwner); + IncrBufferRefCount(BufferDescriptorGetBuffer(bufdesc)); + /* Clean victim buffer and find more to flush opportunistically */ StartStrategySweep(strategy); do { - DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn); - content_lock = BufferDescriptorGetContentLock(bufdesc); - LWLockRelease(content_lock); - ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, - &bufdesc->tag); - /* We leave the first buffer pinned for the caller */ - if (!first_buffer) - UnpinBuffer(bufdesc); - first_buffer = false; + BufWriteBatch batch; + + FindFlushAdjacents(strategy, bufdesc, max_batch_size, &batch); + 
FlushBufferBatch(&batch, io_context); + CompleteWriteBatchIO(&batch, &BackendWritebackContext, io_context); } while ((bufdesc = next_strat_buf_to_flush(strategy, &max_lsn)) != NULL); } else @@ -4461,6 +4528,73 @@ except_unlock_header: return NULL; } +/* + * Given a prepared batch of buffers write them out as a vector. + */ +void +FlushBufferBatch(BufWriteBatch *batch, + IOContext io_context) +{ + BlockNumber blknums[MAX_IO_COMBINE_LIMIT]; + Block blocks[MAX_IO_COMBINE_LIMIT]; + instr_time io_start; + ErrorContextCallback errcallback = + { + .callback = shared_buffer_write_error_callback, + .previous = error_context_stack, + }; + + error_context_stack = &errcallback; + + if (!XLogRecPtrIsInvalid(batch->max_lsn)) + XLogFlush(batch->max_lsn); + + if (batch->reln == NULL) + batch->reln = smgropen(batch->rlocator, INVALID_PROC_NUMBER); + +#ifdef USE_ASSERT_CHECKING + for (uint32 i = 0; i < batch->n; i++) + { + BufferDesc *bufdesc = batch->bufdescs[i]; + uint32 buf_state = LockBufHdr(bufdesc); + XLogRecPtr lsn = BufferGetLSN(bufdesc); + + UnlockBufHdr(bufdesc, buf_state); + Assert(!(buf_state & BM_PERMANENT) || !XLogNeedsFlush(lsn)); + } +#endif + + TRACE_POSTGRESQL_BUFFER_BATCH_FLUSH_START(batch->forkno, + batch->reln->smgr_rlocator.locator.spcOid, + batch->reln->smgr_rlocator.locator.dbOid, + batch->reln->smgr_rlocator.locator.relNumber, + batch->reln->smgr_rlocator.backend, + batch->n); + + /* + * XXX: All blocks should be copied and then checksummed but doing so + * takes a lot of extra memory and a future patch will eliminate this + * requirement. 
+ */ + for (BlockNumber i = 0; i < batch->n; i++) + { + blknums[i] = batch->start + i; + blocks[i] = BufHdrGetBlock(batch->bufdescs[i]); + } + + PageSetBatchChecksumInplace((Page *) blocks, blknums, batch->n); + + io_start = pgstat_prepare_io_time(track_io_timing); + + smgrwritev(batch->reln, batch->forkno, + batch->start, (const void **) blocks, batch->n, false); + + pgstat_count_io_op_time(IOOBJECT_RELATION, io_context, IOOP_WRITE, + io_start, batch->n, BLCKSZ); + + error_context_stack = errcallback.previous; +} + /* * Prepare the buffer with bufdesc for writing. buf_state and lsn are output * parameters. Returns true if the buffer actually needs writing and false @@ -4606,6 +4740,48 @@ DoFlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, error_context_stack = errcallback.previous; } +/* + * Given a previously initialized batch with buffers that have already been + * flushed, terminate the IO on each buffer and then unlock and unpin them. + * This assumes all the buffers were locked and pinned. wb_context will be + * modified. + */ +void +CompleteWriteBatchIO(BufWriteBatch *batch, + WritebackContext *wb_context, IOContext io_context) +{ + ErrorContextCallback errcallback = + { + .callback = shared_buffer_write_error_callback, + .previous = error_context_stack, + }; + + error_context_stack = &errcallback; + pgBufferUsage.shared_blks_written += batch->n; + + for (uint32 i = 0; i < batch->n; i++) + { + Buffer buffer = BufferDescriptorGetBuffer(batch->bufdescs[i]); + + errcallback.arg = batch->bufdescs[i]; + + /* Mark the buffer as clean and end the BM_IO_IN_PROGRESS state. 
*/ + TerminateBufferIO(batch->bufdescs[i], true, 0, true, false); + LWLockRelease(BufferDescriptorGetContentLock(batch->bufdescs[i])); + ReleaseBuffer(buffer); + ScheduleBufferTagForWriteback(wb_context, io_context, + &batch->bufdescs[i]->tag); + } + + TRACE_POSTGRESQL_BUFFER_BATCH_FLUSH_DONE(batch->forkno, + batch->reln->smgr_rlocator.locator.spcOid, + batch->reln->smgr_rlocator.locator.dbOid, + batch->reln->smgr_rlocator.locator.relNumber, + batch->reln->smgr_rlocator.backend, + batch->n, batch->start); + error_context_stack = errcallback.previous; +} + /* * RelationGetNumberOfBlocksInFork * Determines the current number of pages in the specified relation fork. diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c index af75c02723d..4ce70de11c9 100644 --- a/src/backend/storage/buffer/freelist.c +++ b/src/backend/storage/buffer/freelist.c @@ -843,6 +843,32 @@ GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state) return NULL; } + +/* + * Determine the largest IO we can assemble from the given strategy ring given + * strategy-specific as well as global constraints on the number of pinned + * buffers and max IO size. 
+ */ +uint32 +max_write_batch_size_for_strategy(BufferAccessStrategy strategy) +{ + uint32 max_possible_buffer_limit; + uint32 max_write_batch_size; + int strategy_pin_limit; + + max_write_batch_size = io_combine_limit; + + strategy_pin_limit = GetAccessStrategyPinLimit(strategy); + max_possible_buffer_limit = GetPinLimit(); + + max_write_batch_size = Min(strategy_pin_limit, max_write_batch_size); + max_write_batch_size = Min(max_possible_buffer_limit, max_write_batch_size); + max_write_batch_size = Max(1, max_write_batch_size); + max_write_batch_size = Min(max_write_batch_size, io_combine_limit); + Assert(max_write_batch_size <= MAX_IO_COMBINE_LIMIT); + return max_write_batch_size; +} + /* * AddBufferToRing -- add a buffer to the buffer ring * diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c index dbb49ed9197..fc749dd5a50 100644 --- a/src/backend/storage/page/bufpage.c +++ b/src/backend/storage/page/bufpage.c @@ -1546,3 +1546,23 @@ PageSetChecksumInplace(Page page, BlockNumber blkno) ((PageHeader) page)->pd_checksum = pg_checksum_page(page, blkno); } + +/* + * A helper to set multiple blocks' checksums. 
+ */ +void +PageSetBatchChecksumInplace(Page *pages, BlockNumber *blknos, uint32 length) +{ + /* If we don't need a checksum, just return */ + if (!DataChecksumsEnabled()) + return; + + for (uint32 i = 0; i < length; i++) + { + Page page = pages[i]; + + if (PageIsNew(page)) + continue; + ((PageHeader) page)->pd_checksum = pg_checksum_page(page, blknos[i]); + } +} diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d index e9e413477ba..36dd4f8375b 100644 --- a/src/backend/utils/probes.d +++ b/src/backend/utils/probes.d @@ -61,6 +61,8 @@ provider postgresql { probe buffer__flush__done(ForkNumber, BlockNumber, Oid, Oid, Oid); probe buffer__extend__start(ForkNumber, Oid, Oid, Oid, int, unsigned int); probe buffer__extend__done(ForkNumber, Oid, Oid, Oid, int, unsigned int, BlockNumber); + probe buffer__batch__flush__start(ForkNumber, Oid, Oid, Oid, int, unsigned int); + probe buffer__batch__flush__done(ForkNumber, Oid, Oid, Oid, int, unsigned int, BlockNumber); probe buffer__checkpoint__start(int); probe buffer__checkpoint__sync__start(); diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 49914f8b46f..586e52cd01b 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -425,6 +425,34 @@ ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer) ResourceOwnerForget(owner, Int32GetDatum(buffer), &buffer_io_resowner_desc); } +/* + * Used to write out multiple blocks at a time in a combined IO. bufdescs + * contains buffer descriptors for buffers containing adjacent blocks of the + * same fork of the same relation. + */ +typedef struct BufWriteBatch +{ + RelFileLocator rlocator; + ForkNumber forkno; + SMgrRelation reln; + + /* + * The BlockNumber of the first block in the run of contiguous blocks to + * be written out as a single IO. 
+ */ + BlockNumber start; + + /* + * While assembling the buffers, we keep track of the maximum LSN so that + * we can flush WAL through this LSN before flushing the buffers. + */ + XLogRecPtr max_lsn; + + /* The number of valid buffers in bufdescs */ + uint32 n; + BufferDesc *bufdescs[MAX_IO_COMBINE_LIMIT]; +} BufWriteBatch; + /* * Internal buffer management routines */ @@ -438,6 +466,7 @@ extern void WritebackContextInit(WritebackContext *context, int *max_pending); extern void IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context); extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context, IOContext io_context, BufferTag *tag); +extern void FlushBufferBatch(BufWriteBatch *batch, IOContext io_context); /* solely to make it easier to write tests */ extern bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait); @@ -447,8 +476,11 @@ extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag /* freelist.c */ extern bool strategy_supports_eager_flush(BufferAccessStrategy strategy); +extern uint32 max_write_batch_size_for_strategy(BufferAccessStrategy strategy); extern Buffer StrategySweepNextBuffer(BufferAccessStrategy strategy); extern void StartStrategySweep(BufferAccessStrategy strategy); +extern void CompleteWriteBatchIO(BufWriteBatch *batch, WritebackContext *wb_context, + IOContext io_context); extern IOContext IOContextForStrategy(BufferAccessStrategy strategy); extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_ring); diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h index aeb67c498c5..1020cb3ac78 100644 --- a/src/include/storage/bufpage.h +++ b/src/include/storage/bufpage.h @@ -507,5 +507,6 @@ extern bool PageIndexTupleOverwrite(Page page, OffsetNumber offnum, Item newtup, Size newsize); extern char *PageSetChecksumCopy(Page page, BlockNumber blkno); extern void PageSetChecksumInplace(Page page, BlockNumber blkno); +extern 
void PageSetBatchChecksumInplace(Page *pages, BlockNumber *blknos, uint32 length); #endif /* BUFPAGE_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index a13e8162890..9492adeee58 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -349,6 +349,7 @@ BufferManagerRelation BufferStrategyControl BufferTag BufferUsage +BufWriteBatch BuildAccumulator BuiltinScript BulkInsertState -- 2.43.0
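The size of a single combined write in the patch above is bounded in two places: max_write_batch_size_for_strategy() clamps by io_combine_limit and the pin limits, and FindFlushAdjacents() clamps again by what smgrmaxcombine() reports for the starting block. The sketch below folds both into one hypothetical function, toy_write_batch_limit. It assumes the storage manager limit is the number of blocks left in the current segment file, which is an assumption about md.c behavior and is not stated in the patch. The value 131072 used in the usage note assumes a default build (1 GB segments, 8 kB blocks).

```c
#include <assert.h>
#include <stdint.h>

static uint32_t
min_u32(uint32_t a, uint32_t b)
{
    return a < b ? a : b;
}

/*
 * Hypothetical illustration, not patch code: how many blocks may go into
 * one combined write that starts at start_block.
 */
uint32_t
toy_write_batch_limit(uint32_t io_combine_limit,
                      uint32_t strategy_pin_limit,
                      uint32_t global_pin_limit,
                      uint32_t start_block,
                      uint32_t blocks_per_segment)
{
    uint32_t    limit;
    uint32_t    left_in_segment;

    assert(blocks_per_segment > 0);

    /* Never pin more buffers than the strategy or the backend allows. */
    limit = min_u32(io_combine_limit, strategy_pin_limit);
    limit = min_u32(limit, global_pin_limit);

    /* A batch always contains at least the starting block. */
    if (limit < 1)
        limit = 1;

    /* Assumed smgr constraint: a write must not cross a segment boundary. */
    left_in_segment = blocks_per_segment - (start_block % blocks_per_segment);

    return min_u32(limit, left_in_segment);
}
```

For example, toy_write_batch_limit(128, 2048, 1000, 131070, 131072) yields 2, because only two blocks remain before the segment boundary even though every other limit would allow 128.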
From 62b718b0d3adbb95151ebbe8ef6d621f103458e9 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Tue, 2 Sep 2025 12:43:24 -0400
Subject: [PATCH v1 3/9] Eagerly flush bulkwrite strategy ring

Operations using BAS_BULKWRITE (COPY FROM and createdb) will inevitably
need to flush buffers in the strategy ring in order to reuse them. By
eagerly flushing the buffers in a larger batch, we encourage larger
writes at the kernel level and less interleaving of WAL flushes and data
file writes. The effect is mainly noticeable with multiple parallel COPY
FROMs. In this case, client backends achieve higher write throughput and
end up spending less time waiting to acquire the lock to flush WAL.
Larger flush operations also mean less time waiting for flush operations
at the kernel level.

The heuristic for eager eviction is to flush only those buffers in the
strategy ring that can be written out without first flushing WAL.

This patch is also a stepping stone toward AIO writes.

Earlier version
Reviewed-by: Kirill Reshke <reshkekir...@gmail.com>
Discussion: https://postgr.es/m/flat/CAAKRu_Yjn4mvN9NBxtmsCQSGwup45CoA4e05nhR7ADP-v0WCig%40mail.gmail.com
---
 src/backend/storage/buffer/bufmgr.c   | 166 +++++++++++++++++++++++++-
 src/backend/storage/buffer/freelist.c |  62 ++++++++++
 src/include/storage/buf_internals.h   |   3 +
 3 files changed, 228 insertions(+), 3 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index a0077a3f662..a38f1247135 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -534,6 +534,11 @@ static void DoFlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object
 						  IOContext io_context, XLogRecPtr buffer_lsn);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 						IOObject io_object, IOContext io_context);
+static BufferDesc *next_strat_buf_to_flush(BufferAccessStrategy strategy, XLogRecPtr *lsn);
+static BufferDesc *PrepareOrRejectEagerFlushBuffer(Buffer bufnum, BlockNumber require,
+												   RelFileLocator *rlocator,
+												   bool skip_pinned,
+												   XLogRecPtr *max_lsn);
 static void CleanVictimBuffer(BufferAccessStrategy strategy,
 							  BufferDesc *bufdesc, uint32 *buf_state, bool from_ring);
 static void FindAndDropRelationBuffers(RelFileLocator rlocator,
@@ -4276,6 +4281,31 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 		DoFlushBuffer(buf, reln, io_object, io_context, lsn);
 }
 
+/*
+ * Returns the buffer descriptor of the buffer containing the next block we
+ * should eagerly flush or NULL when there are no further buffers to
+ * consider writing out.
+ */
+static BufferDesc *
+next_strat_buf_to_flush(BufferAccessStrategy strategy,
+						XLogRecPtr *lsn)
+{
+	Buffer		bufnum;
+	BufferDesc *bufdesc;
+
+	while ((bufnum = StrategySweepNextBuffer(strategy)) != InvalidBuffer)
+	{
+		if ((bufdesc = PrepareOrRejectEagerFlushBuffer(bufnum,
+													   InvalidBlockNumber,
+													   NULL,
+													   true,
+													   lsn)) != NULL)
+			return bufdesc;
+	}
+
+	return NULL;
+}
+
 /*
  * Prepare to write and write a dirty victim buffer.
  */
@@ -4286,6 +4316,7 @@ CleanVictimBuffer(BufferAccessStrategy strategy,
 
 	XLogRecPtr	max_lsn = InvalidXLogRecPtr;
 	LWLock	   *content_lock;
+	bool		first_buffer = true;
 	IOContext	io_context = IOContextForStrategy(strategy);
 
 	Assert(*buf_state & BM_DIRTY);
@@ -4294,11 +4325,140 @@ CleanVictimBuffer(BufferAccessStrategy strategy,
 	if (!PrepareFlushBuffer(bufdesc, buf_state, &max_lsn))
 		return;
 
-	DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn);
+	if (from_ring && strategy_supports_eager_flush(strategy))
+	{
+		/* Clean victim buffer and find more to flush opportunistically */
+		StartStrategySweep(strategy);
+		do
+		{
+			DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn);
+			content_lock = BufferDescriptorGetContentLock(bufdesc);
+			LWLockRelease(content_lock);
+			ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
+										  &bufdesc->tag);
+			/* We leave the first buffer pinned for the caller */
+			if (!first_buffer)
+				UnpinBuffer(bufdesc);
+			first_buffer = false;
+		} while ((bufdesc = next_strat_buf_to_flush(strategy, &max_lsn)) != NULL);
+	}
+	else
+	{
+		DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn);
+		content_lock = BufferDescriptorGetContentLock(bufdesc);
+		LWLockRelease(content_lock);
+		ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
+									  &bufdesc->tag);
+	}
+}
+
+/*
+ * Prepare bufdesc for eager flushing.
+ *
+ * Given bufnum, returns the buffer descriptor of the buffer which we will
+ * opportunistically flush, or NULL if this buffer does not contain a block
+ * that should be flushed.
+ *
+ * require is the BlockNumber required by the caller. Some callers may require
+ * a specific BlockNumber to be in bufnum because they are assembling a
+ * contiguous run of blocks.
+ *
+ * If the caller needs the block to be from a specific relation, rlocator will
+ * be provided.
+ */
+BufferDesc *
+PrepareOrRejectEagerFlushBuffer(Buffer bufnum, BlockNumber require,
+								RelFileLocator *rlocator, bool skip_pinned,
+								XLogRecPtr *max_lsn)
+{
+	BufferDesc *bufdesc;
+	uint32		buf_state;
+	XLogRecPtr	lsn;
+	BlockNumber blknum;
+	LWLock	   *content_lock;
+
+	if (!BufferIsValid(bufnum))
+		return NULL;
+
+	Assert(!BufferIsLocal(bufnum));
+
+	bufdesc = GetBufferDescriptor(bufnum - 1);
+
+	/* Block may need to be in a specific relation */
+	if (rlocator &&
+		!RelFileLocatorEquals(BufTagGetRelFileLocator(&bufdesc->tag),
+							  *rlocator))
+		return NULL;
+
+	/* Must do this before taking the buffer header spinlock. */
+	ResourceOwnerEnlarge(CurrentResourceOwner);
+	ReservePrivateRefCountEntry();
+
+	buf_state = LockBufHdr(bufdesc);
+
+	if (!(buf_state & BM_DIRTY) || !(buf_state & BM_VALID))
+		goto except_unlock_header;
+
+	/* We don't include used buffers in batches */
+	if (skip_pinned &&
+		(BUF_STATE_GET_REFCOUNT(buf_state) > 0 ||
+		 BUF_STATE_GET_USAGECOUNT(buf_state) > 1))
+		goto except_unlock_header;
+
+	/* Get page LSN while holding header lock */
+	lsn = BufferGetLSN(bufdesc);
+
+	PinBuffer_Locked(bufdesc);
+	CheckBufferIsPinnedOnce(bufnum);
+
+	blknum = BufferGetBlockNumber(bufnum);
+	Assert(BlockNumberIsValid(blknum));
+
+	/* If we'll have to flush WAL to flush the block, we're done */
+	if (buf_state & BM_PERMANENT && XLogNeedsFlush(lsn))
+		goto except_unpin_buffer;
+
+	/* We only include contiguous blocks in the run */
+	if (BlockNumberIsValid(require) && blknum != require)
+		goto except_unpin_buffer;
 	content_lock = BufferDescriptorGetContentLock(bufdesc);
+	if (!LWLockConditionalAcquire(content_lock, LW_SHARED))
+		goto except_unpin_buffer;
+
+	/*
+	 * Now that we have the content lock, we need to recheck if we need to
+	 * flush WAL.
+	 */
+	buf_state = LockBufHdr(bufdesc);
+	lsn = BufferGetLSN(bufdesc);
+	UnlockBufHdr(bufdesc, buf_state);
+
+	if (buf_state & BM_PERMANENT && XLogNeedsFlush(lsn))
+		goto except_unlock_content;
+
+	/* Try to start an I/O operation. */
+	if (!StartBufferIO(bufdesc, false, true))
+		goto except_unlock_content;
+
+	if (lsn > *max_lsn)
+		*max_lsn = lsn;
+	buf_state = LockBufHdr(bufdesc);
+	buf_state &= ~BM_JUST_DIRTIED;
+	UnlockBufHdr(bufdesc, buf_state);
+
+	return bufdesc;
+
+except_unlock_content:
 	LWLockRelease(content_lock);
-	ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
-								  &bufdesc->tag);
+
+except_unpin_buffer:
+	UnpinBuffer(bufdesc);
+	return NULL;
+
+except_unlock_header:
+	UnlockBufHdr(bufdesc, buf_state);
+	return NULL;
 }
 
 /*
diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c
index f695ce43224..af75c02723d 100644
--- a/src/backend/storage/buffer/freelist.c
+++ b/src/backend/storage/buffer/freelist.c
@@ -83,6 +83,15 @@ typedef struct BufferAccessStrategyData
 	 */
 	int			current;
 
+	/*
+	 * If the strategy supports eager flushing, we may initiate a sweep of the
+	 * strategy ring, flushing all the dirty buffers we can cheaply flush.
+	 * sweep_start and sweep_current keep track of a given sweep so we don't
+	 * loop around the ring infinitely.
+	 */
+	int			sweep_start;
+	int			sweep_current;
+
 	/*
 	 * Array of buffer numbers. InvalidBuffer (that is, zero) indicates we
 	 * have not yet selected a buffer for this ring slot. For allocation
@@ -181,6 +190,31 @@ have_free_buffer(void)
 		return false;
 }
 
+/*
+ * Some BufferAccessStrategies support eager flushing -- which is flushing
+ * buffers in the ring before they are needed. This can lead to better I/O
+ * patterns than lazily flushing buffers directly before reusing them.
+ */
+bool
+strategy_supports_eager_flush(BufferAccessStrategy strategy)
+{
+	Assert(strategy);
+
+	switch (strategy->btype)
+	{
+		case BAS_BULKWRITE:
+			return true;
+		case BAS_VACUUM:
+		case BAS_NORMAL:
+		case BAS_BULKREAD:
+			return false;
+		default:
+			elog(ERROR, "unrecognized buffer access strategy: %d",
+				 (int) strategy->btype);
+			return false;
+	}
+}
+
 /*
  * StrategyGetBuffer
  *
@@ -357,6 +391,34 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
 	}
 }
 
+/*
+ * Return the next buffer in the ring or InvalidBuffer if the current sweep is
+ * over.
+ */
+Buffer
+StrategySweepNextBuffer(BufferAccessStrategy strategy)
+{
+	strategy->sweep_current++;
+	if (strategy->sweep_current >= strategy->nbuffers)
+		strategy->sweep_current = 0;
+
+	if (strategy->sweep_current == strategy->sweep_start)
+		return InvalidBuffer;
+
+	return strategy->buffers[strategy->sweep_current];
+}
+
+/*
+ * Start a sweep of the strategy ring.
+ */
+void
+StartStrategySweep(BufferAccessStrategy strategy)
+{
+	if (!strategy)
+		return;
+	strategy->sweep_start = strategy->sweep_current = strategy->current;
+}
+
 /*
  * StrategyFreeBuffer: put a buffer on the freelist
  */
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index ed65ed84034..49914f8b46f 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -446,6 +446,9 @@ extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag
 
 
 /* freelist.c */
+extern bool strategy_supports_eager_flush(BufferAccessStrategy strategy);
+extern Buffer StrategySweepNextBuffer(BufferAccessStrategy strategy);
+extern void StartStrategySweep(BufferAccessStrategy strategy);
 extern IOContext IOContextForStrategy(BufferAccessStrategy strategy);
 extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy,
 									 uint32 *buf_state, bool *from_ring);
-- 
2.43.0

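For anyone reading 0003 without the tree at hand, the sweep bookkeeping can be modeled in isolation. What follows is a minimal standalone sketch, not code from the patch: RingSweep, ring_sweep_start(), ring_sweep_next() and ring_sweep_count() are hypothetical names, and the model returns slot indexes, whereas StrategySweepNextBuffer() returns the Buffer stored in the slot.

```c
#include <assert.h>

#define INVALID_SLOT (-1)

/* Hypothetical stand-in for the sweep fields of BufferAccessStrategyData. */
typedef struct RingSweep
{
	int nbuffers;      /* number of slots in the ring */
	int current;       /* slot of the victim buffer being reused */
	int sweep_start;   /* slot at which the sweep began */
	int sweep_current; /* slot most recently visited by the sweep */
} RingSweep;

/* Modeled on StartStrategySweep(): begin at the current victim slot. */
static void
ring_sweep_start(RingSweep *ring)
{
	assert(ring->nbuffers > 0);
	assert(ring->current >= 0 && ring->current < ring->nbuffers);
	ring->sweep_start = ring->sweep_current = ring->current;
}

/*
 * Modeled on StrategySweepNextBuffer(): advance one slot, wrapping around,
 * and report INVALID_SLOT once the sweep is back where it started.
 */
static int
ring_sweep_next(RingSweep *ring)
{
	ring->sweep_current++;
	if (ring->sweep_current >= ring->nbuffers)
		ring->sweep_current = 0;
	if (ring->sweep_current == ring->sweep_start)
		return INVALID_SLOT;
	return ring->sweep_current;
}

/* Count how many slots one complete sweep visits. */
int
ring_sweep_count(RingSweep *ring)
{
	int visited = 0;

	ring_sweep_start(ring);
	while (ring_sweep_next(ring) != INVALID_SLOT)
		visited++;
	return visited;
}
```

In this model a sweep visits every slot except the one it started from, which matches the do/while loop in CleanVictimBuffer() flushing the victim buffer itself before asking for further candidates.
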
From 66804599c04512cf572921cea1af0e4b42a2e6c2 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Tue, 2 Sep 2025 11:32:24 -0400
Subject: [PATCH v1 2/9] Split FlushBuffer() into two parts

Before adding write combining to write a batch of blocks when flushing
dirty buffers, refactor FlushBuffer() into the preparatory step and
actual buffer flushing step. This provides better symmetry with the
batch flushing code.
---
 src/backend/storage/buffer/bufmgr.c | 103 ++++++++++++++++++++--------
 1 file changed, 76 insertions(+), 27 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index c0f0e052135..a0077a3f662 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -529,8 +529,13 @@ static inline BufferDesc *BufferAlloc(SMgrRelation smgr,
 static bool AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress);
 static void CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete);
 static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
+static bool PrepareFlushBuffer(BufferDesc *bufdesc, uint32 *buf_state, XLogRecPtr *lsn);
+static void DoFlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
+						  IOContext io_context, XLogRecPtr buffer_lsn);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 						IOObject io_object, IOContext io_context);
+static void CleanVictimBuffer(BufferAccessStrategy strategy,
+							  BufferDesc *bufdesc, uint32 *buf_state, bool from_ring);
 static void FindAndDropRelationBuffers(RelFileLocator rlocator,
 									   ForkNumber forkNum,
 									   BlockNumber nForkBlock,
@@ -2426,12 +2431,7 @@ GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context)
 			continue;
 		}
 
-		/* OK, do the I/O */
-		FlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context);
-		LWLockRelease(content_lock);
-
-		ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
-									  &buf_hdr->tag);
+		CleanVictimBuffer(strategy, buf_hdr, &buf_state, from_ring);
 	}
 
 	if (buf_state & BM_VALID)
@@ -4269,20 +4269,81 @@ static void
 FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 			IOContext io_context)
 {
-	XLogRecPtr	recptr;
-	ErrorContextCallback errcallback;
-	instr_time	io_start;
-	Block		bufBlock;
-	char	   *bufToWrite;
 	uint32		buf_state;
+	XLogRecPtr	lsn;
+
+	if (PrepareFlushBuffer(buf, &buf_state, &lsn))
+		DoFlushBuffer(buf, reln, io_object, io_context, lsn);
+}
+
+/*
+ * Prepare to write and write a dirty victim buffer.
+ */
+static void
+CleanVictimBuffer(BufferAccessStrategy strategy,
+				  BufferDesc *bufdesc, uint32 *buf_state, bool from_ring)
+{
+
+	XLogRecPtr	max_lsn = InvalidXLogRecPtr;
+	LWLock	   *content_lock;
+	IOContext	io_context = IOContextForStrategy(strategy);
+
+	Assert(*buf_state & BM_DIRTY);
+
+	/* Set up this victim buffer to be flushed */
+	if (!PrepareFlushBuffer(bufdesc, buf_state, &max_lsn))
+		return;
 
+	DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn);
+	content_lock = BufferDescriptorGetContentLock(bufdesc);
+	LWLockRelease(content_lock);
+	ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
+								  &bufdesc->tag);
+}
+
+/*
+ * Prepare the buffer with bufdesc for writing. buf_state and lsn are output
+ * parameters. Returns true if the buffer actually needs writing and false
+ * otherwise.
+ */
+static bool
+PrepareFlushBuffer(BufferDesc *bufdesc, uint32 *buf_state, XLogRecPtr *lsn)
+{
 	/*
 	 * Try to start an I/O operation. If StartBufferIO returns false, then
 	 * someone else flushed the buffer before we could, so we need not do
 	 * anything.
 	 */
-	if (!StartBufferIO(buf, false, false))
-		return;
+	if (!StartBufferIO(bufdesc, false, false))
+		return false;
+
+	*lsn = InvalidXLogRecPtr;
+	*buf_state = LockBufHdr(bufdesc);
+
+	/*
+	 * Run PageGetLSN while holding header lock, since we don't have the
+	 * buffer locked exclusively in all cases.
+	 */
+	if (*buf_state & BM_PERMANENT)
+		*lsn = BufferGetLSN(bufdesc);
+
+	/* To check if block content changes while flushing. - vadim 01/17/97 */
+	*buf_state &= ~BM_JUST_DIRTIED;
+	UnlockBufHdr(bufdesc, *buf_state);
+	return true;
+}
+
+/*
+ * Actually do the write I/O to clean a buffer.
+ */
+static void
+DoFlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
+			  IOContext io_context, XLogRecPtr buffer_lsn)
+{
+	ErrorContextCallback errcallback;
+	instr_time	io_start;
+	Block		bufBlock;
+	char	   *bufToWrite;
 
 	/* Setup error traceback support for ereport() */
 	errcallback.callback = shared_buffer_write_error_callback;
@@ -4300,18 +4361,6 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 										reln->smgr_rlocator.locator.dbOid,
 										reln->smgr_rlocator.locator.relNumber);
 
-	buf_state = LockBufHdr(buf);
-
-	/*
-	 * Run PageGetLSN while holding header lock, since we don't have the
-	 * buffer locked exclusively in all cases.
-	 */
-	recptr = BufferGetLSN(buf);
-
-	/* To check if block content changes while flushing. - vadim 01/17/97 */
-	buf_state &= ~BM_JUST_DIRTIED;
-	UnlockBufHdr(buf, buf_state);
-
 	/*
 	 * Force XLOG flush up to buffer's LSN. This implements the basic WAL
 	 * rule that log updates must hit disk before any of the data-file changes
@@ -4329,8 +4378,8 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	 * disastrous system-wide consequences. To make sure that can't happen,
 	 * skip the flush if the buffer isn't permanent.
 	 */
-	if (buf_state & BM_PERMANENT)
-		XLogFlush(recptr);
+	if (!XLogRecPtrIsInvalid(buffer_lsn))
+		XLogFlush(buffer_lsn);
 
 	/*
 	 * Now it's safe to write the buffer to disk. Note that no one else should
-- 
2.43.0

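The contract between the two halves in 0002 is that the preparatory step hands the write step either a real LSN or InvalidXLogRecPtr, and the write step flushes WAL only in the former case. The sketch below models that contract outside PostgreSQL. It is an illustration under assumptions, not the patch's code: IoLog, prepare_flush() and do_flush() are hypothetical names, and the "flush" merely advances a counter.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;
#define INVALID_LSN ((Lsn) 0)

/* Hypothetical model of WAL and data-file state; not PostgreSQL structures. */
typedef struct IoLog
{
	Lsn wal_flushed_upto; /* WAL is durable up to this LSN */
	int wal_flushes;      /* number of WAL flush requests issued */
	int data_writes;      /* number of data page writes issued */
} IoLog;

/*
 * Preparatory step, modeled on PrepareFlushBuffer(): report the LSN that
 * must be durable before the page is written, or INVALID_LSN when the
 * buffer is not permanent (for example, it belongs to an unlogged relation).
 */
Lsn
prepare_flush(bool is_permanent, Lsn page_lsn)
{
	return is_permanent ? page_lsn : INVALID_LSN;
}

/*
 * Write step, modeled on DoFlushBuffer(): flush WAL first when an LSN was
 * supplied, then write the page.
 */
void
do_flush(IoLog *log, Lsn buffer_lsn)
{
	if (buffer_lsn != INVALID_LSN)
	{
		log->wal_flushes++;
		if (buffer_lsn > log->wal_flushed_upto)
			log->wal_flushed_upto = buffer_lsn;
	}
	/* WAL-before-data: by now the WAL covering this page is durable. */
	assert(buffer_lsn == INVALID_LSN || log->wal_flushed_upto >= buffer_lsn);
	log->data_writes++;
}
```

Passing the LSN instead of re-reading buf_state in the write step is what lets a batch caller compute one maximum LSN for many buffers and flush WAL once.
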
From eda89d4b1491922315222773c739b5b04f44fa4a Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Tue, 2 Sep 2025 15:22:11 -0400
Subject: [PATCH v1 6/9] Add database Oid to CkptSortItem

This is useful for checkpointer write combining -- which will be added
in a future commit.
---
 src/backend/storage/buffer/bufmgr.c | 8 ++++++++
 src/include/storage/buf_internals.h | 1 +
 2 files changed, 9 insertions(+)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 80122abd9aa..ab0b9246759 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -3393,6 +3393,7 @@ BufferSync(int flags)
 			item = &CkptBufferIds[num_to_scan++];
 			item->buf_id = buf_id;
 			item->tsId = bufHdr->tag.spcOid;
+			item->db_id = bufHdr->tag.dbOid;
 			item->relNumber = BufTagGetRelNumber(&bufHdr->tag);
 			item->forkNum = BufTagGetForkNum(&bufHdr->tag);
 			item->blockNum = bufHdr->tag.blockNum;
@@ -6712,6 +6713,13 @@ ckpt_buforder_comparator(const CkptSortItem *a, const CkptSortItem *b)
 		return -1;
 	else if (a->tsId > b->tsId)
 		return 1;
+
+	/* compare database */
+	if (a->db_id < b->db_id)
+		return -1;
+	else if (a->db_id > b->db_id)
+		return 1;
+
 	/* compare relation */
 	if (a->relNumber < b->relNumber)
 		return -1;
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 586e52cd01b..3383a674c0c 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -391,6 +391,7 @@ UnlockBufHdr(BufferDesc *desc, uint32 buf_state)
 typedef struct CkptSortItem
 {
 	Oid			tsId;
+	Oid			db_id;
 	RelFileNumber relNumber;
 	ForkNumber	forkNum;
 	BlockNumber blockNum;
-- 
2.43.0

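A note on why the database has to be part of the sort key: as far as I understand it, two databases in the same tablespace can hold relations with the same relNumber, so without the database in the key their blocks could interleave in the sorted array and break up otherwise contiguous runs. Here is a minimal standalone sketch of the ordering, under that assumption. SortItem, sort_item_cmp() and items_are_adjacent() are hypothetical names, not the patch's definitions.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical mirror of the fields that make up the sort key. */
typedef struct SortItem
{
	uint32_t ts_id;      /* tablespace */
	uint32_t db_id;      /* database */
	uint32_t rel_number; /* relation file number */
	int      fork_num;   /* fork */
	uint32_t block_num;  /* block within the fork */
} SortItem;

static int
cmp_u32(uint32_t a, uint32_t b)
{
	return (a > b) - (a < b);
}

/* qsort comparator: tablespace, database, relation, fork, then block. */
int
sort_item_cmp(const void *pa, const void *pb)
{
	const SortItem *a = pa;
	const SortItem *b = pb;
	int c;

	if ((c = cmp_u32(a->ts_id, b->ts_id)) != 0)
		return c;
	if ((c = cmp_u32(a->db_id, b->db_id)) != 0)
		return c;
	if ((c = cmp_u32(a->rel_number, b->rel_number)) != 0)
		return c;
	if (a->fork_num != b->fork_num)
		return (a->fork_num > b->fork_num) - (a->fork_num < b->fork_num);
	return cmp_u32(a->block_num, b->block_num);
}

/* True when b is the block directly after a in the same file. */
int
items_are_adjacent(const SortItem *a, const SortItem *b)
{
	return a->ts_id == b->ts_id &&
		a->db_id == b->db_id &&
		a->rel_number == b->rel_number &&
		a->fork_num == b->fork_num &&
		b->block_num == a->block_num + 1;
}
```

With this ordering, every run of items that can be combined into one write sits in consecutive array positions, which is the property the batching loop in 0007 relies on.
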
From cdb40b2f12663bd687bae416962fdb95ff9252cc Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Tue, 2 Sep 2025 15:42:29 -0400
Subject: [PATCH v1 7/9] Implement checkpointer data write combining

When the checkpointer writes out dirty buffers, writing multiple
contiguous blocks as a single IO is a substantial performance
improvement. The checkpointer is usually bottlenecked on IO, so issuing
larger IOs leads to increased write throughput and faster checkpoints.
---
 src/backend/storage/buffer/bufmgr.c | 232 ++++++++++++++++++++++++----
 src/backend/utils/probes.d          |   2 +-
 2 files changed, 207 insertions(+), 27 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index ab0b9246759..a1d347b5966 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -512,6 +512,7 @@ static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy);
 static void PinBuffer_Locked(BufferDesc *buf);
 static void UnpinBuffer(BufferDesc *buf);
 static void UnpinBufferNoOwner(BufferDesc *buf);
+static uint32 checkpointer_max_batch_size(void);
 static void BufferSync(int flags);
 static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
 static int	SyncOneBuffer(int buf_id, bool skip_recently_used,
@@ -3335,7 +3336,6 @@ UnpinBufferNoOwner(BufferDesc *buf)
 static void
 BufferSync(int flags)
 {
-	uint32		buf_state;
 	int			buf_id;
 	int			num_to_scan;
 	int			num_spaces;
@@ -3347,6 +3347,8 @@ BufferSync(int flags)
 	int			i;
 	int			mask = BM_DIRTY;
 	WritebackContext wb_context;
+	uint32		max_batch_size;
+	BufWriteBatch batch;
 
 	/*
 	 * Unless this is a shutdown checkpoint or we have been explicitly told,
@@ -3377,6 +3379,7 @@ BufferSync(int flags)
 	for (buf_id = 0; buf_id < NBuffers; buf_id++)
 	{
 		BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
+		uint32		buf_state;
 
 		/*
 		 * Header spinlock is enough to examine BM_DIRTY, see comment in
@@ -3517,48 +3520,208 @@ BufferSync(int flags)
 	 */
 	num_processed = 0;
 	num_written = 0;
+	max_batch_size = checkpointer_max_batch_size();
 	while (!binaryheap_empty(ts_heap))
 	{
+		BlockNumber limit = max_batch_size;
 		BufferDesc *bufHdr = NULL;
 		CkptTsStatus *ts_stat = (CkptTsStatus *)
 			DatumGetPointer(binaryheap_first(ts_heap));
-
-		buf_id = CkptBufferIds[ts_stat->index].buf_id;
-		Assert(buf_id != -1);
-
-		bufHdr = GetBufferDescriptor(buf_id);
-
-		num_processed++;
+		int			ts_end = ts_stat->index - ts_stat->num_scanned + ts_stat->num_to_scan;
+		int			processed = 0;
 
 		/*
-		 * We don't need to acquire the lock here, because we're only looking
-		 * at a single bit. It's possible that someone else writes the buffer
-		 * and clears the flag right after we check, but that doesn't matter
-		 * since SyncOneBuffer will then do nothing. However, there is a
-		 * further race condition: it's conceivable that between the time we
-		 * examine the bit here and the time SyncOneBuffer acquires the lock,
-		 * someone else not only wrote the buffer but replaced it with another
-		 * page and dirtied it. In that improbable case, SyncOneBuffer will
-		 * write the buffer though we didn't need to. It doesn't seem worth
-		 * guarding against this, though.
+		 * Each batch will have exactly one start and one max lsn and one
+		 * length.
 		 */
-		if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
+		batch.start = InvalidBlockNumber;
+		batch.max_lsn = InvalidXLogRecPtr;
+		batch.n = 0;
+
+		while (batch.n < limit)
 		{
-			if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
+			uint32		buf_state;
+			XLogRecPtr	lsn = InvalidXLogRecPtr;
+			LWLock	   *content_lock;
+			CkptSortItem item;
+
+			if (ProcSignalBarrierPending)
+				ProcessProcSignalBarrier();
+
+			/* Check if we are done with this tablespace */
+			if (ts_stat->index + processed >= ts_end)
+				break;
+
+			item = CkptBufferIds[ts_stat->index + processed];
+
+			buf_id = item.buf_id;
+			Assert(buf_id != -1);
+
+			bufHdr = GetBufferDescriptor(buf_id);
+
+			/*
+			 * If this is the first block of the batch, then check if we need
+			 * to open a new relation. Open the relation now because we have
+			 * to determine the maximum IO size based on how many blocks
+			 * remain in the file.
+			 */
+			if (!BlockNumberIsValid(batch.start))
+			{
+				Assert(batch.max_lsn == InvalidXLogRecPtr && batch.n == 0);
+				batch.rlocator.spcOid = item.tsId;
+				batch.rlocator.dbOid = item.db_id;
+				batch.rlocator.relNumber = item.relNumber;
+				batch.forkno = item.forkNum;
+				batch.start = item.blockNum;
+				batch.reln = smgropen(batch.rlocator, INVALID_PROC_NUMBER);
+				limit = smgrmaxcombine(batch.reln, batch.forkno, batch.start);
+				limit = Max(1, limit);
+				limit = Min(limit, max_batch_size);
+			}
+
+			/*
+			 * Once we hit blocks from the next relation or fork of the
+			 * relation, break out of the loop and issue the IO we've built up
+			 * so far. It is important that we don't increment processed
+			 * because we want to start the next IO with this item.
+			 */
+			if (item.db_id != batch.rlocator.dbOid)
+				break;
+
+			if (item.relNumber != batch.rlocator.relNumber)
+				break;
+
+			if (item.forkNum != batch.forkno)
+				break;
+
+			/*
+			 * If the next block is not contiguous, we can't include it in the
+			 * IO we will issue. Break out of the loop and issue what we have
+			 * so far. Do not count this item as processed -- otherwise we
+			 * will end up skipping it.
+			 */
+			if (item.blockNum != batch.start + batch.n)
+				break;
+
+			/*
+			 * We don't need to acquire the lock here, because we're only
+			 * looking at a single bit. It's possible that someone else writes
+			 * the buffer and clears the flag right after we check, but that
+			 * doesn't matter since StartBufferIO will then return false. If
+			 * the buffer doesn't need checkpointing, don't include it in the
+			 * batch we are building. We're done with the item, so count it as
+			 * processed and break out of the loop to issue the IO we have
+			 * built so far.
+			 */
+			if (!(pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED))
+			{
+				processed++;
+				break;
+			}
+
+			ReservePrivateRefCountEntry();
+			ResourceOwnerEnlarge(CurrentResourceOwner);
+
+			buf_state = LockBufHdr(bufHdr);
+
+			/*
+			 * If the buffer doesn't need eviction, we're done with the item,
+			 * so count it as processed and break out of the loop to issue the
+			 * IO so far.
+			 */
+			if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
 			{
-				TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
-				PendingCheckpointerStats.buffers_written++;
-				num_written++;
+				processed++;
+				UnlockBufHdr(bufHdr, buf_state);
+				break;
+			}
+
+			PinBuffer_Locked(bufHdr);
+
+			/*
+			 * There is a race condition here: it's conceivable that between
+			 * the time we examine the buffer header for BM_CHECKPOINT_NEEDED
+			 * above and the time we acquire the lock here, someone else
+			 * not only wrote the buffer but replaced it with another page and
+			 * dirtied it. In that improbable case, we will write the buffer
+			 * though we didn't need to. It doesn't seem worth guarding
+			 * against this, though.
+			 */
+			content_lock = BufferDescriptorGetContentLock(bufHdr);
+
+			/*
+			 * We are willing to wait for the content lock on the first IO in
+			 * the batch. However, for subsequent IOs, waiting could lead to
+			 * deadlock. We have to eventually flush all eligible buffers,
+			 * though. So, if we fail to acquire the lock on a subsequent
+			 * buffer, we break out and issue the IO we've built up so far.
+			 * Then we come back and start a new IO with that buffer as the
+			 * starting buffer. As such, we must not count the item as
+			 * processed if we end up failing to acquire the content lock.
+			 */
+			if (batch.n == 0)
+				LWLockAcquire(content_lock, LW_SHARED);
+			else if (!LWLockConditionalAcquire(content_lock, LW_SHARED))
+			{
+				UnpinBuffer(bufHdr);
+				break;
+			}
+
+			/*
+			 * If the buffer doesn't need IO, count the item as processed,
+			 * release the buffer, and break out of the loop to issue the IO
+			 * we have built up so far.
+			 */
+			if (!StartBufferIO(bufHdr, false, true))
+			{
+				processed++;
+				LWLockRelease(content_lock);
+				UnpinBuffer(bufHdr);
+				break;
 			}
+
+			buf_state = LockBufHdr(bufHdr);
+			lsn = BufferGetLSN(bufHdr);
+			buf_state &= ~BM_JUST_DIRTIED;
+			UnlockBufHdr(bufHdr, buf_state);
+
+			/*
+			 * Keep track of the max LSN so that we can be sure to flush
+			 * enough WAL before flushing data from the buffers. See comment
+			 * in DoFlushBuffer() for more on why we don't consider the LSNs
+			 * of unlogged relations.
+			 */
+			if (buf_state & BM_PERMANENT && lsn > batch.max_lsn)
+				batch.max_lsn = lsn;
+
+			batch.bufdescs[batch.n++] = bufHdr;
+			processed++;
 		}
 
 		/*
 		 * Measure progress independent of actually having to flush the buffer
 		 * - otherwise writing become unbalanced.
 		 */
-		ts_stat->progress += ts_stat->progress_slice;
-		ts_stat->num_scanned++;
-		ts_stat->index++;
+		num_processed += processed;
+		ts_stat->progress += ts_stat->progress_slice * processed;
+		ts_stat->num_scanned += processed;
+		ts_stat->index += processed;
+
+		/*
+		 * If we built up an IO, issue it. There's a chance we didn't find any
+		 * items referencing buffers that needed flushing this time, but we
+		 * still want to check if we should update the heap if we examined and
+		 * processed the items.
+		 */
+		if (batch.n > 0)
+		{
+			FlushBufferBatch(&batch, IOCONTEXT_NORMAL);
+			CompleteWriteBatchIO(&batch, &wb_context, IOCONTEXT_NORMAL);
+
+			TRACE_POSTGRESQL_BUFFER_BATCH_SYNC_WRITTEN(batch.n);
+			PendingCheckpointerStats.buffers_written += batch.n;
+			num_written += batch.n;
+		}
 
 		/* Have all the buffers from the tablespace been processed? */
 		if (ts_stat->num_scanned == ts_stat->num_to_scan)
@@ -4284,6 +4447,23 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 		DoFlushBuffer(buf, reln, io_object, io_context, lsn);
 }
 
+/*
+ * The maximum number of blocks that can be written out in a single batch by
+ * the checkpointer.
+ */
+static uint32
+checkpointer_max_batch_size(void)
+{
+	uint32		result;
+	uint32		pin_limit = GetPinLimit();
+
+	result = Max(pin_limit, 1);
+	result = Min(result, io_combine_limit);
+	result = Max(result, 1);
+	Assert(result <= MAX_IO_COMBINE_LIMIT);
+	return result;
+}
+
 /*
  * Given a buffer descriptor, start, from a strategy ring, strategy, that
diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d
index 36dd4f8375b..d6970731ba9 100644
--- a/src/backend/utils/probes.d
+++ b/src/backend/utils/probes.d
@@ -68,7 +68,7 @@ provider postgresql {
 	probe buffer__checkpoint__sync__start();
 	probe buffer__checkpoint__done();
 	probe buffer__sync__start(int, int);
-	probe buffer__sync__written(int);
+	probe buffer__batch__sync__written(BlockNumber);
 	probe buffer__sync__done(int, int, int);
 
 	probe deadlock__found();
-- 
2.43.0
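
Two pieces of arithmetic in 0007 are easy to check in isolation: the clamp that yields the checkpointer's maximum batch size, and the rule that a batch only extends while block numbers stay contiguous. The following is a minimal standalone sketch of both, not the patch's code. clamp_batch_size() and contiguous_run_length() are hypothetical helpers, and hard_max stands in for MAX_IO_COMBINE_LIMIT on the assumption that the batch array has exactly that many slots.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Clamp a batch size to the pin limit and the combine limit, never below 1
 * and never above hard_max (the capacity of the batch array).
 */
uint32_t
clamp_batch_size(uint32_t pin_limit, uint32_t combine_limit, uint32_t hard_max)
{
	uint32_t result = pin_limit < combine_limit ? pin_limit : combine_limit;

	if (result > hard_max)
		result = hard_max;
	if (result < 1)
		result = 1;
	assert(result >= 1 && result <= hard_max);
	return result;
}

/*
 * Length of the contiguous run of block numbers that starts at blocks[0],
 * capped at limit. blocks is assumed to be sorted and to belong to a single
 * relation fork.
 */
size_t
contiguous_run_length(const uint32_t *blocks, size_t nblocks, uint32_t limit)
{
	size_t n = 0;

	while (n < nblocks && n < limit &&
		   blocks[n] == blocks[0] + (uint32_t) n)
		n++;
	return n;
}
```

For sorted blocks 10, 11, 12, 14, 15 the first batch covers three blocks, and the item for block 14 starts the next batch. That corresponds to the loop breaking out without counting the non-contiguous item as processed.
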