Hi, I was trying to learn enough about the new bulk_write.c to figure out what might be going wrong over at [1], and ended up doing this exercise, which is experiment-quality but passes basic tests. It's a bit like v1-0013 and v1-0014's experimental vectored checkpointing from [2] (which are not currently proposed themselves; they too were in the experiment category), but this usage is a lot simpler and might be worth considering. Presumably both things would eventually end up being done by the (not yet proposed) streaming write, but they could also be done directly in this simple case.
This way, CREATE INDEX generates 128kB pwritev() calls instead of 8kB pwrite() calls. (There's a magic 16 in there; we'd probably need to think harder about that.) It'd be better if bulk_write.c's memory management were improved: if buffers were mostly contiguous neighbours instead of being separately palloc'd objects, you'd probably often get 128kB pwrite() instead of pwritev(), which might be marginally more efficient. This made me wonder why smgrwritev() and smgrextendv() shouldn't be backed by the same implementation, since they are essentially the same operation. The differences are some assertions, which might as well be moved up to the smgr.c level (they must surely apply to any future smgr implementation too, right?), and the segment file creation policy, which can be controlled with a new argument. I tried that here. An alternative would be for md.c to have a workhorse function that both mdextendv() and mdwritev() call, but I'm not sure if there's much point in that. While thinking about that, I realised that an existing write-or-extend assertion in master is wrong because it doesn't add nblocks (it checks only the first block number rather than blocknum + nblocks). Hmm, it's a bit weird that we have nblocks as int or BlockNumber in various places, which I think should probably be fixed.

[1] https://www.postgresql.org/message-id/flat/CA%2BhUKGK%2B5DOmLaBp3Z7C4S-Yv6yoROvr1UncjH2S1ZbPT8D%2BZg%40mail.gmail.com
[2] https://www.postgresql.org/message-id/CA%2BhUKGJkOiOCa%2Bmag4BF%2BzHo7qo%3Do9CFheB8%3Dg6uT5TUm2gkvA%40mail.gmail.com
From 4611cb121bbfa787ddbba4bc0e80ac6c732345d0 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Sat, 9 Mar 2024 16:04:21 +1300 Subject: [PATCH 1/2] Provide vectored variant of smgrextend(). Since mdwrite() and mdextend() were basically the same, merge them. They had different assertions, but those would surely apply to any implementation of smgr, so move them up to the smgr.c wrapper level. The other difference was the policy on segment creation, but that can be captured by having smgrwritev() and smgrextendv() call a single mdwritev() function with a new "extend" flag. --- src/backend/storage/smgr/md.c | 91 ++++----------------------------- src/backend/storage/smgr/smgr.c | 28 ++++++---- src/include/storage/md.h | 3 +- src/include/storage/smgr.h | 12 ++++- 4 files changed, 40 insertions(+), 94 deletions(-) diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index bf0f3ca76d..80716f11e1 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -447,74 +447,6 @@ mdunlinkfork(RelFileLocatorBackend rlocator, ForkNumber forknum, bool isRedo) pfree(path); } -/* - * mdextend() -- Add a block to the specified relation. - * - * The semantics are nearly the same as mdwrite(): write at the - * specified position. However, this is to be used for the case of - * extending a relation (i.e., blocknum is at or beyond the current - * EOF). Note that we assume writing a block beyond current EOF - * causes intervening file space to become filled with zeroes. - */ -void -mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, - const void *buffer, bool skipFsync) -{ - off_t seekpos; - int nbytes; - MdfdVec *v; - - /* If this build supports direct I/O, the buffer must be I/O aligned. */ - if (PG_O_DIRECT != 0 && PG_IO_ALIGN_SIZE <= BLCKSZ) - Assert((uintptr_t) buffer == TYPEALIGN(PG_IO_ALIGN_SIZE, buffer)); - - /* This assert is too expensive to have on normally ... 
*/ -#ifdef CHECK_WRITE_VS_EXTEND - Assert(blocknum >= mdnblocks(reln, forknum)); -#endif - - /* - * If a relation manages to grow to 2^32-1 blocks, refuse to extend it any - * more --- we mustn't create a block whose number actually is - * InvalidBlockNumber. (Note that this failure should be unreachable - * because of upstream checks in bufmgr.c.) - */ - if (blocknum == InvalidBlockNumber) - ereport(ERROR, - (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), - errmsg("cannot extend file \"%s\" beyond %u blocks", - relpath(reln->smgr_rlocator, forknum), - InvalidBlockNumber))); - - v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, EXTENSION_CREATE); - - seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); - - Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); - - if ((nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_EXTEND)) != BLCKSZ) - { - if (nbytes < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not extend file \"%s\": %m", - FilePathName(v->mdfd_vfd)), - errhint("Check free disk space."))); - /* short write: complain appropriately */ - ereport(ERROR, - (errcode(ERRCODE_DISK_FULL), - errmsg("could not extend file \"%s\": wrote only %d of %d bytes at block %u", - FilePathName(v->mdfd_vfd), - nbytes, BLCKSZ, blocknum), - errhint("Check free disk space."))); - } - - if (!skipFsync && !SmgrIsTemp(reln)) - register_dirty_segment(reln, forknum, v); - - Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE)); -} - /* * mdzeroextend() -- Add new zeroed out blocks to the specified relation. * @@ -920,19 +852,14 @@ mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, /* * mdwritev() -- Write the supplied blocks at the appropriate location. * - * This is to be used only for updating already-existing blocks of a - * relation (ie, those before the current EOF). To extend a relation, - * use mdextend(). 
+ * Note that smgrextendv() and smgrwritev() are different operations, but both + * are handled here. */ void mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, - const void **buffers, BlockNumber nblocks, bool skipFsync) + const void **buffers, BlockNumber nblocks, + bool extend, bool skipFsync) { - /* This assert is too expensive to have on normally ... */ -#ifdef CHECK_WRITE_VS_EXTEND - Assert(blocknum < mdnblocks(reln, forknum)); -#endif - while (nblocks > 0) { struct iovec iov[PG_IOV_MAX]; @@ -945,6 +872,8 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, size_t size_this_segment; v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, + extend ? + EXTENSION_CREATE : EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY); seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE)); @@ -1638,7 +1567,7 @@ _mdfd_getseg(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno, { /* * Normally we will create new segments only if authorized by the - * caller (i.e., we are doing mdextend()). But when doing WAL + * caller (i.e., we are doing smgrextend()). But when doing WAL * recovery, create segments anyway; this allows cases such as * replaying WAL data that has a write into a high-numbered * segment of a relation that was later deleted. 
We want to go @@ -1655,9 +1584,9 @@ _mdfd_getseg(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno, char *zerobuf = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, MCXT_ALLOC_ZERO); - mdextend(reln, forknum, - nextsegno * ((BlockNumber) RELSEG_SIZE) - 1, - zerobuf, skipFsync); + smgrextend(reln, forknum, + nextsegno * ((BlockNumber) RELSEG_SIZE) - 1, + zerobuf, skipFsync); pfree(zerobuf); } flags = O_CREAT; diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c index a5b18328b8..3287fc26d1 100644 --- a/src/backend/storage/smgr/smgr.c +++ b/src/backend/storage/smgr/smgr.c @@ -82,8 +82,6 @@ typedef struct f_smgr bool (*smgr_exists) (SMgrRelation reln, ForkNumber forknum); void (*smgr_unlink) (RelFileLocatorBackend rlocator, ForkNumber forknum, bool isRedo); - void (*smgr_extend) (SMgrRelation reln, ForkNumber forknum, - BlockNumber blocknum, const void *buffer, bool skipFsync); void (*smgr_zeroextend) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, int nblocks, bool skipFsync); bool (*smgr_prefetch) (SMgrRelation reln, ForkNumber forknum, @@ -94,7 +92,7 @@ typedef struct f_smgr void (*smgr_writev) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const void **buffers, BlockNumber nblocks, - bool skipFsync); + bool extend, bool skipFsync); void (*smgr_writeback) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks); BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum); @@ -114,7 +112,6 @@ static const f_smgr smgrsw[] = { .smgr_create = mdcreate, .smgr_exists = mdexists, .smgr_unlink = mdunlink, - .smgr_extend = mdextend, .smgr_zeroextend = mdzeroextend, .smgr_prefetch = mdprefetch, .smgr_readv = mdreadv, @@ -525,7 +522,7 @@ smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo) /* - * smgrextend() -- Add a new block to a file. + * smgrextendv() -- Add new blocks to a file. * * The semantics are nearly the same as smgrwrite(): write at the * specified position. 
However, this is to be used for the case of @@ -534,11 +531,16 @@ smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo) * causes intervening file space to become filled with zeroes. */ void -smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, - const void *buffer, bool skipFsync) +smgrextendv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + const void **buffers, BlockNumber nblocks, bool skipFsync) { - smgrsw[reln->smgr_which].smgr_extend(reln, forknum, blocknum, - buffer, skipFsync); + /* This assert is too expensive to have on normally ... */ +#ifdef CHECK_WRITE_VS_EXTEND + Assert(blocknum >= smgrnblocks(reln, forknum)); +#endif + + smgrsw[reln->smgr_which].smgr_writev(reln, forknum, blocknum, + buffers, nblocks, true, skipFsync); /* * Normally we expect this to increase nblocks by one, but if the cached @@ -633,8 +635,14 @@ void smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const void **buffers, BlockNumber nblocks, bool skipFsync) { + /* This assert is too expensive to have on normally ... 
*/ +#ifdef CHECK_WRITE_VS_EXTEND + Assert(blocknum + nblocks <= smgrnblocks(reln, forknum)); +#endif + smgrsw[reln->smgr_which].smgr_writev(reln, forknum, blocknum, - buffers, nblocks, skipFsync); + buffers, nblocks, false, + skipFsync); } /* diff --git a/src/include/storage/md.h b/src/include/storage/md.h index 620f10abde..8746a48a5f 100644 --- a/src/include/storage/md.h +++ b/src/include/storage/md.h @@ -36,7 +36,8 @@ extern void mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, void **buffers, BlockNumber nblocks); extern void mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, - const void **buffers, BlockNumber nblocks, bool skipFsync); + const void **buffers, BlockNumber nblocks, + bool extend, bool skipFsync); extern void mdwriteback(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks); extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum); diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h index fc5f883ce1..7ffea4332d 100644 --- a/src/include/storage/smgr.h +++ b/src/include/storage/smgr.h @@ -86,8 +86,9 @@ extern void smgrreleaserellocator(RelFileLocatorBackend rlocator); extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo); extern void smgrdosyncall(SMgrRelation *rels, int nrels); extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo); -extern void smgrextend(SMgrRelation reln, ForkNumber forknum, - BlockNumber blocknum, const void *buffer, bool skipFsync); +extern void smgrextendv(SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, + const void **buffers, BlockNumber nblocks, bool skipFsync); extern void smgrzeroextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, int nblocks, bool skipFsync); extern bool smgrprefetch(SMgrRelation reln, ForkNumber forknum, @@ -124,4 +125,11 @@ smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, smgrwritev(reln, forknum, blocknum, &buffer, 1, 
skipFsync); } +static inline void +smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + const void *buffer, bool skipFsync) +{ + smgrextendv(reln, forknum, blocknum, &buffer, 1, skipFsync); +} + #endif /* SMGR_H */ -- 2.43.2
From 3823d8a25fbf41ea2abf396329ce465cca74727b Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Sat, 9 Mar 2024 16:54:56 +1300 Subject: [PATCH 2/2] Use vectored I/O for bulk writes. Zero-extend using smgrzeroextend(). Extend using smgrextendv(). Write using smgrwritev(). --- src/backend/storage/smgr/bulk_write.c | 83 +++++++++++++++++++-------- 1 file changed, 58 insertions(+), 25 deletions(-) diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c index 4a10ece4c3..500a14fe30 100644 --- a/src/backend/storage/smgr/bulk_write.c +++ b/src/backend/storage/smgr/bulk_write.c @@ -8,7 +8,7 @@ * the regular buffer manager and the bulk loading interface! * * We bypass the buffer manager to avoid the locking overhead, and call - * smgrextend() directly. A downside is that the pages will need to be + * smgrextendv() directly. A downside is that the pages will need to be * re-read into shared buffers on first use after the build finishes. That's * usually a good tradeoff for large relations, and for small relations, the * overhead isn't very significant compared to creating the relation in the @@ -45,8 +45,6 @@ #define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID -static const PGIOAlignedBlock zero_buffer = {{0}}; /* worth BLCKSZ */ - typedef struct PendingWrite { BulkWriteBuffer buf; @@ -225,35 +223,70 @@ smgr_bulk_flush(BulkWriteState *bulkstate) for (int i = 0; i < npending; i++) { - BlockNumber blkno = pending_writes[i].blkno; - Page page = pending_writes[i].buf->data; - + Page page; + const void *pages[16]; + BlockNumber blkno; + int nblocks; + int max_nblocks; + + /* Prepare to write the first block. */ + blkno = pending_writes[i].blkno; + page = pending_writes[i].buf->data; PageSetChecksumInplace(page, blkno); + pages[0] = page; + nblocks = 1; - if (blkno >= bulkstate->pages_written) + /* Zero-extend any missing space before the first block. 
*/ + if (blkno > bulkstate->pages_written) + { + int nzeroblocks; + + nzeroblocks = blkno - bulkstate->pages_written; + smgrzeroextend(bulkstate->smgr, bulkstate->forknum, + bulkstate->pages_written, nzeroblocks, true); + bulkstate->pages_written += nzeroblocks; + } + + if (blkno < bulkstate->pages_written) { /* - * If we have to write pages nonsequentially, fill in the space - * with zeroes until we come back and overwrite. This is not - * logically necessary on standard Unix filesystems (unwritten - * space will read as zeroes anyway), but it should help to avoid - * fragmentation. The dummy pages aren't WAL-logged though. + * We're overwriting. Clamp at the existing size, because we can't + * mix writing and extending in a single operation. */ - while (blkno > bulkstate->pages_written) - { - /* don't set checksum for all-zero page */ - smgrextend(bulkstate->smgr, bulkstate->forknum, - bulkstate->pages_written++, - &zero_buffer, - true); - } - - smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true); - bulkstate->pages_written = pending_writes[i].blkno + 1; + max_nblocks = Min(lengthof(pages), + bulkstate->pages_written - blkno); + } + else + { + /* We're extending. */ + Assert(blkno == bulkstate->pages_written); + max_nblocks = lengthof(pages); + } + + /* Find as many consecutive blocks as we can. */ + while (i + 1 < npending && + pending_writes[i + 1].blkno == blkno + nblocks && + nblocks < max_nblocks) + { + page = pending_writes[++i].buf->data; + PageSetChecksumInplace(page, pending_writes[i].blkno); + pages[nblocks++] = page; + } + + /* Extend or overwrite. 
*/ + if (blkno == bulkstate->pages_written) + { + smgrextendv(bulkstate->smgr, bulkstate->forknum, blkno, pages, nblocks, true); + bulkstate->pages_written += nblocks; } else - smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true); - pfree(page); + { + Assert(blkno + nblocks <= bulkstate->pages_written); + smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno, pages, nblocks, true); + } + + for (int j = 0; j < nblocks; ++j) + pfree(pending_writes[i - j].buf->data); } bulkstate->npending = 0; -- 2.43.2