On 19/11/2023 02:04, Andres Freund wrote:
On 2023-11-17 11:37:21 +0100, Heikki Linnakangas wrote:
The new facility makes it easier to optimize bulk loading, as the
logic for buffering, WAL-logging, and syncing the relation only needs
to be implemented once. It's also less error-prone: We have had a
number of bugs in how a relation is fsync'd - or not - at the end of a
bulk loading operation. By centralizing that logic to one place, we
only need to write it correctly once.

One thing I'd like to use the centralized handling for is to track such
writes in pg_stat_io. I don't mean as part of the initial patch, just that
that's another reason I like the facility.

Oh I didn't realize they're not counted at the moment.

+       bulkw = bulkw_start_smgr(dst, forkNum, use_wal);
+
        nblocks = smgrnblocks(src, forkNum);
        for (blkno = 0; blkno < nblocks; blkno++)
        {
+               Page            page;
+
                /* If we got a cancel signal during the copy of the data, quit */
                CHECK_FOR_INTERRUPTS();
-               smgrread(src, forkNum, blkno, buf.data);
+               page = bulkw_alloc_buf(bulkw);
+               smgrread(src, forkNum, blkno, page);
                if (!PageIsVerifiedExtended(page, blkno,
                                            PIV_LOG_WARNING | PIV_REPORT_STAT))
@@ -511,30 +514,9 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
                 * page this is, so we have to log the full page including any unused
                 * space.
                 */
-               if (use_wal)
-                       log_newpage(&dst->smgr_rlocator.locator, forkNum, blkno, page, false);
-
-               PageSetChecksumInplace(page, blkno);
-
-               /*
-                * Now write the page.  We say skipFsync = true because there's no
-                * need for smgr to schedule an fsync for this write; we'll do it
-                * ourselves below.
-                */
-               smgrextend(dst, forkNum, blkno, buf.data, true);
+               bulkw_write(bulkw, blkno, page, false);

I wonder if bulkw_alloc_buf() is a good name - if you naively read this
change, it looks like it'll just leak memory. It also might be taken to be
valid until freed, which I don't think is the case?

Yeah, I'm not very happy with this interface. The model is that you get a buffer to write to by calling bulkw_alloc_buf(). Later, you hand it over to bulkw_write(), which takes ownership of it and frees it later. There is no other function to free it, although currently the buffer is just palloc'd so you could call pfree on it.

However, I'd like to not expose that detail to the callers. I'm imagining that in the future we might optimize further, by having a larger e.g. 1 MB buffer, and carve the 8kB blocks from that. Then opportunistically, if you fill the buffers sequentially, bulk_write.c could do one smgrextend() to write the whole 1 MB chunk.
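
To make that idea concrete, here is a rough standalone sketch of carving page-sized buffers out of one large allocation and checking whether they were filled with consecutive block numbers. None of this is in the patch: every sketch_* name, the 8192-byte block size and the 128-page chunk are assumptions for illustration, and it uses plain malloc() rather than palloc().

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

#define SKETCH_BLCKSZ      8192	/* assumed block size */
#define SKETCH_CHUNK_PAGES 128	/* 128 * 8 kB = 1 MB */

/* Hypothetical chunk state; not a structure from the patch. */
typedef struct SketchChunk
{
	char	   *base;			/* start of the 1 MB allocation */
	int			nused;			/* slots handed out so far */
	unsigned	blknos[SKETCH_CHUNK_PAGES]; /* block number per slot */
} SketchChunk;

/* Allocate the chunk; returns 0 if out of memory. */
int
sketch_chunk_init(SketchChunk *c)
{
	memset(c, 0, sizeof(*c));
	c->base = malloc((size_t) SKETCH_BLCKSZ * SKETCH_CHUNK_PAGES);
	return c->base != NULL;
}

/* Carve the next page-sized slot from the chunk, or NULL if it is full. */
char *
sketch_chunk_get_buf(SketchChunk *c)
{
	char	   *buf;

	if (c->nused >= SKETCH_CHUNK_PAGES)
		return NULL;
	buf = c->base + (size_t) SKETCH_BLCKSZ * (size_t) c->nused;
	c->nused++;
	return buf;
}

/* Remember which block number the caller assigned to a slot. */
void
sketch_chunk_set_blkno(SketchChunk *c, const char *buf, unsigned blkno)
{
	size_t		slot = (size_t) (buf - c->base) / SKETCH_BLCKSZ;

	assert(slot < (size_t) c->nused);
	c->blknos[slot] = blkno;
}

/*
 * True if slot i holds block blknos[0] + i for every used slot, i.e. the
 * used part of the chunk could go out as one large write.  Assumes every
 * slot handed out has had its block number set.
 */
int
sketch_chunk_is_contiguous(const SketchChunk *c)
{
	for (int i = 1; i < c->nused; i++)
		if (c->blknos[i] != c->blknos[0] + (unsigned) i)
			return 0;
	return c->nused > 0;
}
```

A caller would fall back to page-at-a-time writes whenever sketch_chunk_is_contiguous() returns 0. The point is only that callers never see where the memory comes from, which is why I don't want them calling pfree() on it.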

I renamed it to bulkw_get_buf() now, and made it return a new BulkWriteBuffer typedef instead of a plain Page. The point of the new typedef is to distinguish a buffer returned by bulkw_get_buf() from a Page or char[BLCKSZ] that you might palloc on your own. That indeed revealed some latent bugs in gistbuild.c where I had mixed up buffers returned by bulkw_alloc_buf() and palloc'd buffers.
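
As an illustration of why a separate type helps, here is a minimal standalone sketch. It is not the definition from the patch: the sketch_* names, the struct layout and the use of calloc()/free() are assumptions. The only point is that a pointer to a wrapper struct is not interchangeable with a plain char pointer, so the compiler has something to complain about.

```c
#include <assert.h>
#include <stdlib.h>

#define SKETCH_BLCKSZ 8192		/* assumed block size */

typedef char *SketchPage;		/* stand-in for Page */

/* Wrapping the block in a struct gives the pointer its own type. */
typedef struct SketchBulkBufData
{
	char		data[SKETCH_BLCKSZ];
} SketchBulkBufData;

typedef SketchBulkBufData *SketchBulkBuf;

/* Hand out a zeroed buffer that is owned by the (hypothetical) writer. */
SketchBulkBuf
sketch_get_buf(void)
{
	return calloc(1, sizeof(SketchBulkBufData));
}

/*
 * Take ownership of the buffer and release it.  Returns the first byte of
 * the page so that the effect is observable in this sketch.
 */
int
sketch_write(SketchBulkBuf buf)
{
	int			first;

	assert(buf != NULL);
	first = buf->data[0];
	free(buf);
	return first;
}
```

Filling the page still works through an explicit cast, e.g. SketchPage page = (SketchPage) buf, much like the (Page) casts in the patch. Passing a buffer you allocated yourself as a char pointer to sketch_write() draws an incompatible-pointer-type diagnostic, which is the kind of mix-up I had in gistbuild.c.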

(The previous version of this patch called a different struct BulkWriteBuffer, but I renamed that to PendingWrite; see below. Don't be confused!)

I think this helps a little, but I'm still not very happy with it. I'll give it some more thought after sleeping on it, but in the meantime, I'm all ears for suggestions.

+/*-------------------------------------------------------------------------
+ *
+ * bulk_write.c
+ *       Efficiently and reliably populate a new relation
+ *
+ * The assumption is that no other backends access the relation while we are
+ * loading it, so we can take some shortcuts.  Alternatively, you can use the
+ * buffer manager as usual, if performance is not critical, but you must not
+ * mix operations through the buffer manager and the bulk loading interface at
+ * the same time.

From "Alternatively" onward this is somewhat confusing.

Rewrote that to just "Do not mix operations through the regular buffer manager and the bulk loading interface!"

+ * One tricky point is that because we bypass the buffer manager, we need to
+ * register the relation for fsyncing at the next checkpoint ourselves, and
+ * make sure that the relation is correctly fsync by us or the checkpointer
+ * even if a checkpoint happens concurrently.

"fsync'ed" or such? Otherwise this reads awkwardly for me.

Yep, fixed.

+typedef struct BulkWriteBuffer
+{
+       Page            page;
+       BlockNumber blkno;
+       bool            page_std;
+       int16           order;
+} BulkWriteBuffer;
+

The name makes it sound like this struct itself contains a buffer - but it's
just pointing to one. *BufferRef or such maybe?

I was wondering how you dealt with the alignment of buffers given the struct
definition, which is what led me to look at this...

I renamed this to PendingWrite, and the field that holds these to "pending_writes". Think of it as a queue of writes that haven't been performed yet.

+/*
+ * Bulk writer state for one relation fork.
+ */
+typedef struct BulkWriteState
+{
+       /* Information about the target relation we're writing */
+       SMgrRelation smgr;

Isn't there a danger of this becoming a dangling pointer? At least until
https://postgr.es/m/CA%2BhUKGJ8NTvqLHz6dqbQnt2c8XCki4r2QvXjBQcXpVwxTY_pvA%40mail.gmail.com
is merged?

Good point. I just added a FIXME comment as a reminder, hoping that that patch gets merged soon. If not, I'll come up with a different fix.

+       ForkNumber      forknum;
+       bool            use_wal;
+
+       /* We keep several pages buffered, and WAL-log them in batches */
+       int                     nbuffered;
+       BulkWriteBuffer buffers[MAX_BUFFERED_PAGES];
+
+       /* Current size of the relation */
+       BlockNumber pages_written;
+
+       /* The RedoRecPtr at the time that the bulk operation started */
+       XLogRecPtr      start_RedoRecPtr;
+
+       Page            zeropage;               /* workspace for filling zeroes */

We really should just have one such page in shared memory somewhere... For WAL
writes as well.

But until then - why do you allocate the page? Seems like we could just use a
static global variable?

I made it a static global variable for now. (The palloc way was copied over from nbtsort.c)

+/*
+ * Write all buffered pages to disk.
+ */
+static void
+bulkw_flush(BulkWriteState *bulkw)
+{
+       int                     nbuffered = bulkw->nbuffered;
+       BulkWriteBuffer *buffers = bulkw->buffers;
+
+       if (nbuffered == 0)
+               return;
+
+       if (nbuffered > 1)
+       {
+               int                     o;
+
+               qsort(buffers, nbuffered, sizeof(BulkWriteBuffer), buffer_cmp);
+
+               /*
+                * Eliminate duplicates, keeping the last write of each block.
+                * (buffer_cmp uses 'order' as the last sort key)
+                */

Huh - which use cases would actually cause duplicate writes?

Hmm, nothing anymore I guess. Many AMs used to write zero pages as a placeholder and come back to fill them in later, but bulk_write.c handles that now.

Removed that, and replaced it with an assertion in buffer_cmp() that there are no duplicates.
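
Roughly, the flush now sorts the pending writes by block number and treats a duplicate as a caller bug. The standalone sketch below shows that shape; it is not the code from the patch, and the sketch_* names and struct are made up for illustration. Unlike the patch, it checks for duplicates in a pass after sorting rather than inside the comparator, because as far as I know a libc qsort() is not guaranteed never to compare an element with itself.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for PendingWrite; only blkno matters here. */
typedef struct SketchPendingWrite
{
	unsigned	blkno;
	void	   *buf;
} SketchPendingWrite;

/* Order pending writes by ascending block number. */
int
sketch_pending_cmp(const void *a, const void *b)
{
	const SketchPendingWrite *pa = a;
	const SketchPendingWrite *pb = b;

	return (pa->blkno > pb->blkno) - (pa->blkno < pb->blkno);
}

/* Sort the queue, then assert that no block was queued twice. */
void
sketch_sort_pending(SketchPendingWrite *writes, size_t n)
{
	if (n > 1)
		qsort(writes, n, sizeof(SketchPendingWrite), sketch_pending_cmp);

	for (size_t i = 1; i < n; i++)
		assert(writes[i - 1].blkno != writes[i].blkno);
}
```

With the deduplication gone, the 'order' tie-breaker is not needed as a sort key in this sketch either.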

--
Heikki Linnakangas
Neon (https://neon.tech)
From 50b19da1a6d4c96084fc8388e64c1c646e9b5378 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Tue, 19 Sep 2023 18:09:34 +0300
Subject: [PATCH v3 1/1] Introduce a new bulk loading facility.

The new facility makes it easier to optimize bulk loading, as the
logic for buffering, WAL-logging, and syncing the relation only needs
to be implemented once. It's also less error-prone: We have had a
number of bugs in how a relation is fsync'd - or not - at the end of a
bulk loading operation. By centralizing that logic to one place, we
only need to write it correctly once.

The new facility is faster for small relations: Instead of calling
smgrimmedsync(), we register the fsync to happen at next checkpoint,
which avoids the fsync latency. That can make a big difference if you
are e.g. restoring a schema-only dump with lots of relations.

It is also slightly more efficient with large relations, as the WAL
logging is performed multiple pages at a time. That avoids some WAL
header overhead. The sorted GiST index build did that already; this
moves the buffering to the new facility.

The changes to the pageinspect GiST test need an explanation: Before this
patch, the sorted GiST index build set the LSN on every page to the
special GistBuildLSN value, not the LSN of the WAL record, even though
they were WAL-logged. There was no particular need for it, it just
happened naturally when we wrote out the pages before WAL-logging
them. Now we WAL-log the pages first, like in B-tree build, so the
pages are stamped with the record's real LSN. When the build is not
WAL-logged, we still use GistBuildLSN. To make the test output
predictable, use an unlogged index.

Reviewed-by: Andres Freund
Discussion: https://www.postgresql.org/message-id/30e8f366-58b3-b239-c521-422122dd5150%40iki.fi
---
 contrib/pageinspect/expected/gist.out |  14 +-
 contrib/pageinspect/sql/gist.sql      |  16 +-
 src/backend/access/gist/gistbuild.c   | 122 +++--------
 src/backend/access/heap/rewriteheap.c |  75 ++-----
 src/backend/access/nbtree/nbtree.c    |  33 +--
 src/backend/access/nbtree/nbtsort.c   | 134 ++++--------
 src/backend/access/spgist/spginsert.c |  49 ++---
 src/backend/catalog/storage.c         |  46 ++--
 src/backend/storage/smgr/Makefile     |   1 +
 src/backend/storage/smgr/bulk_write.c | 301 ++++++++++++++++++++++++++
 src/backend/storage/smgr/md.c         |  43 ++++
 src/backend/storage/smgr/meson.build  |   1 +
 src/backend/storage/smgr/smgr.c       |  31 +++
 src/include/storage/bulk_write.h      |  38 ++++
 src/include/storage/md.h              |   1 +
 src/include/storage/smgr.h            |   1 +
 src/tools/pgindent/typedefs.list      |   3 +
 17 files changed, 556 insertions(+), 353 deletions(-)
 create mode 100644 src/backend/storage/smgr/bulk_write.c
 create mode 100644 src/include/storage/bulk_write.h

diff --git a/contrib/pageinspect/expected/gist.out b/contrib/pageinspect/expected/gist.out
index d1adbab8ae2..2b1d54a6279 100644
--- a/contrib/pageinspect/expected/gist.out
+++ b/contrib/pageinspect/expected/gist.out
@@ -1,13 +1,6 @@
--- The gist_page_opaque_info() function prints the page's LSN. Normally,
--- that's constant 1 (GistBuildLSN) on every page of a freshly built GiST
--- index. But with wal_level=minimal, the whole relation is dumped to WAL at
--- the end of the transaction if it's smaller than wal_skip_threshold, which
--- updates the LSNs. Wrap the tests on gist_page_opaque_info() in the
--- same transaction with the CREATE INDEX so that we see the LSNs before
--- they are possibly overwritten at end of transaction.
-BEGIN;
--- Create a test table and GiST index.
-CREATE TABLE test_gist AS SELECT point(i,i) p, i::text t FROM
+-- The gist_page_opaque_info() function prints the page's LSN.
+-- Use an unlogged index, so that the LSN is predictable.
+CREATE UNLOGGED TABLE test_gist AS SELECT point(i,i) p, i::text t FROM
     generate_series(1,1000) i;
 CREATE INDEX test_gist_idx ON test_gist USING gist (p);
 -- Page 0 is the root, the rest are leaf pages
@@ -29,7 +22,6 @@ SELECT * FROM gist_page_opaque_info(get_raw_page('test_gist_idx', 2));
  0/1 | 0/0 |         1 | {leaf}
 (1 row)
 
-COMMIT;
 SELECT * FROM gist_page_items(get_raw_page('test_gist_idx', 0), 'test_gist_idx');
  itemoffset |   ctid    | itemlen | dead |             keys              
 ------------+-----------+---------+------+-------------------------------
diff --git a/contrib/pageinspect/sql/gist.sql b/contrib/pageinspect/sql/gist.sql
index d263542ba15..85bc44b8000 100644
--- a/contrib/pageinspect/sql/gist.sql
+++ b/contrib/pageinspect/sql/gist.sql
@@ -1,14 +1,6 @@
--- The gist_page_opaque_info() function prints the page's LSN. Normally,
--- that's constant 1 (GistBuildLSN) on every page of a freshly built GiST
--- index. But with wal_level=minimal, the whole relation is dumped to WAL at
--- the end of the transaction if it's smaller than wal_skip_threshold, which
--- updates the LSNs. Wrap the tests on gist_page_opaque_info() in the
--- same transaction with the CREATE INDEX so that we see the LSNs before
--- they are possibly overwritten at end of transaction.
-BEGIN;
-
--- Create a test table and GiST index.
-CREATE TABLE test_gist AS SELECT point(i,i) p, i::text t FROM
+-- The gist_page_opaque_info() function prints the page's LSN.
+-- Use an unlogged index, so that the LSN is predictable.
+CREATE UNLOGGED TABLE test_gist AS SELECT point(i,i) p, i::text t FROM
     generate_series(1,1000) i;
 CREATE INDEX test_gist_idx ON test_gist USING gist (p);
 
@@ -17,8 +9,6 @@ SELECT * FROM gist_page_opaque_info(get_raw_page('test_gist_idx', 0));
 SELECT * FROM gist_page_opaque_info(get_raw_page('test_gist_idx', 1));
 SELECT * FROM gist_page_opaque_info(get_raw_page('test_gist_idx', 2));
 
-COMMIT;
-
 SELECT * FROM gist_page_items(get_raw_page('test_gist_idx', 0), 'test_gist_idx');
 SELECT * FROM gist_page_items(get_raw_page('test_gist_idx', 1), 'test_gist_idx') LIMIT 5;
 
diff --git a/src/backend/access/gist/gistbuild.c b/src/backend/access/gist/gistbuild.c
index a45e2fe3755..5b7cefdcaaf 100644
--- a/src/backend/access/gist/gistbuild.c
+++ b/src/backend/access/gist/gistbuild.c
@@ -43,7 +43,8 @@
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
 #include "storage/bufmgr.h"
-#include "storage/smgr.h"
+#include "storage/bulk_write.h"
+
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/tuplesort.h"
@@ -106,11 +107,8 @@ typedef struct
 	Tuplesortstate *sortstate;	/* state data for tuplesort.c */
 
 	BlockNumber pages_allocated;
-	BlockNumber pages_written;
 
-	int			ready_num_pages;
-	BlockNumber ready_blknos[XLR_MAX_BLOCK_ID];
-	Page		ready_pages[XLR_MAX_BLOCK_ID];
+	BulkWriteState *bulkw;
 } GISTBuildState;
 
 #define GIST_SORTED_BUILD_PAGE_NUM 4
@@ -142,7 +140,6 @@ static void gist_indexsortbuild_levelstate_add(GISTBuildState *state,
 											   IndexTuple itup);
 static void gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
 												 GistSortedBuildLevelState *levelstate);
-static void gist_indexsortbuild_flush_ready_pages(GISTBuildState *state);
 
 static void gistInitBuffering(GISTBuildState *buildstate);
 static int	calculatePagesPerBuffer(GISTBuildState *buildstate, int levelStep);
@@ -405,27 +402,18 @@ gist_indexsortbuild(GISTBuildState *state)
 {
 	IndexTuple	itup;
 	GistSortedBuildLevelState *levelstate;
-	Page		page;
+	BulkWriteBuffer rootbuf;
 
-	state->pages_allocated = 0;
-	state->pages_written = 0;
-	state->ready_num_pages = 0;
+	/* Reserve block 0 for the root page */
+	state->pages_allocated = 1;
 
-	/*
-	 * Write an empty page as a placeholder for the root page. It will be
-	 * replaced with the real root page at the end.
-	 */
-	page = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, MCXT_ALLOC_ZERO);
-	smgrextend(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, GIST_ROOT_BLKNO,
-			   page, true);
-	state->pages_allocated++;
-	state->pages_written++;
+	state->bulkw = bulkw_start_rel(state->indexrel, MAIN_FORKNUM);
 
 	/* Allocate a temporary buffer for the first leaf page batch. */
 	levelstate = palloc0(sizeof(GistSortedBuildLevelState));
-	levelstate->pages[0] = page;
+	levelstate->pages[0] = palloc(BLCKSZ);
 	levelstate->parent = NULL;
-	gistinitpage(page, F_LEAF);
+	gistinitpage(levelstate->pages[0], F_LEAF);
 
 	/*
 	 * Fill index pages with tuples in the sorted order.
@@ -455,31 +443,16 @@ gist_indexsortbuild(GISTBuildState *state)
 		levelstate = parent;
 	}
 
-	gist_indexsortbuild_flush_ready_pages(state);
-
 	/* Write out the root */
 	PageSetLSN(levelstate->pages[0], GistBuildLSN);
-	PageSetChecksumInplace(levelstate->pages[0], GIST_ROOT_BLKNO);
-	smgrwrite(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, GIST_ROOT_BLKNO,
-			  levelstate->pages[0], true);
-	if (RelationNeedsWAL(state->indexrel))
-		log_newpage(&state->indexrel->rd_locator, MAIN_FORKNUM, GIST_ROOT_BLKNO,
-					levelstate->pages[0], true);
-
-	pfree(levelstate->pages[0]);
+
+	rootbuf = bulkw_get_buf(state->bulkw);
+	memcpy(rootbuf, levelstate->pages[0], BLCKSZ);
+	bulkw_write(state->bulkw, GIST_ROOT_BLKNO, rootbuf, true);
+
 	pfree(levelstate);
 
-	/*
-	 * When we WAL-logged index pages, we must nonetheless fsync index files.
-	 * Since we're building outside shared buffers, a CHECKPOINT occurring
-	 * during the build has no way to flush the previously written data to
-	 * disk (indeed it won't know the index even exists).  A crash later on
-	 * would replay WAL from the checkpoint, therefore it wouldn't replay our
-	 * earlier WAL entries. If we do not fsync those pages here, they might
-	 * still not be on disk when the crash occurs.
-	 */
-	if (RelationNeedsWAL(state->indexrel))
-		smgrimmedsync(RelationGetSmgr(state->indexrel), MAIN_FORKNUM);
+	bulkw_finish(state->bulkw);
 }
 
 /*
@@ -509,8 +482,7 @@ gist_indexsortbuild_levelstate_add(GISTBuildState *state,
 			levelstate->current_page++;
 
 		if (levelstate->pages[levelstate->current_page] == NULL)
-			levelstate->pages[levelstate->current_page] =
-				palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
+			levelstate->pages[levelstate->current_page] = palloc0(BLCKSZ);
 
 		newPage = levelstate->pages[levelstate->current_page];
 		gistinitpage(newPage, old_page_flags);
@@ -573,6 +545,7 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
 	for (; dist != NULL; dist = dist->next)
 	{
 		char	   *data;
+		BulkWriteBuffer buf;
 		Page		target;
 
 		/* check once per page */
@@ -580,7 +553,8 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
 
 		/* Create page and copy data */
 		data = (char *) (dist->list);
-		target = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, MCXT_ALLOC_ZERO);
+		buf = bulkw_get_buf(state->bulkw);
+		target = (Page) buf;
 		gistinitpage(target, isleaf ? F_LEAF : 0);
 		for (int i = 0; i < dist->block.num; i++)
 		{
@@ -593,20 +567,6 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
 		}
 		union_tuple = dist->itup;
 
-		if (state->ready_num_pages == XLR_MAX_BLOCK_ID)
-			gist_indexsortbuild_flush_ready_pages(state);
-
-		/*
-		 * The page is now complete. Assign a block number to it, and add it
-		 * to the list of finished pages. (We don't write it out immediately,
-		 * because we want to WAL-log the pages in batches.)
-		 */
-		blkno = state->pages_allocated++;
-		state->ready_blknos[state->ready_num_pages] = blkno;
-		state->ready_pages[state->ready_num_pages] = target;
-		state->ready_num_pages++;
-		ItemPointerSetBlockNumber(&(union_tuple->t_tid), blkno);
-
 		/*
 		 * Set the right link to point to the previous page. This is just for
 		 * debugging purposes: GiST only follows the right link if a page is
@@ -621,6 +581,15 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
 		 */
 		if (levelstate->last_blkno)
 			GistPageGetOpaque(target)->rightlink = levelstate->last_blkno;
+
+		/*
+		 * The page is now complete. Assign a block number to it, and pass it
+		 * to the bulk writer.
+		 */
+		blkno = state->pages_allocated++;
+		PageSetLSN(target, GistBuildLSN);
+		bulkw_write(state->bulkw, blkno, buf, true);
+		ItemPointerSetBlockNumber(&(union_tuple->t_tid), blkno);
 		levelstate->last_blkno = blkno;
 
 		/*
@@ -631,7 +600,7 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
 		if (parent == NULL)
 		{
 			parent = palloc0(sizeof(GistSortedBuildLevelState));
-			parent->pages[0] = (Page) palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
+			parent->pages[0] = palloc(BLCKSZ);
 			parent->parent = NULL;
 			gistinitpage(parent->pages[0], 0);
 
@@ -641,39 +610,6 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
 	}
 }
 
-static void
-gist_indexsortbuild_flush_ready_pages(GISTBuildState *state)
-{
-	if (state->ready_num_pages == 0)
-		return;
-
-	for (int i = 0; i < state->ready_num_pages; i++)
-	{
-		Page		page = state->ready_pages[i];
-		BlockNumber blkno = state->ready_blknos[i];
-
-		/* Currently, the blocks must be buffered in order. */
-		if (blkno != state->pages_written)
-			elog(ERROR, "unexpected block number to flush GiST sorting build");
-
-		PageSetLSN(page, GistBuildLSN);
-		PageSetChecksumInplace(page, blkno);
-		smgrextend(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, blkno, page,
-				   true);
-
-		state->pages_written++;
-	}
-
-	if (RelationNeedsWAL(state->indexrel))
-		log_newpages(&state->indexrel->rd_locator, MAIN_FORKNUM, state->ready_num_pages,
-					 state->ready_blknos, state->ready_pages, true);
-
-	for (int i = 0; i < state->ready_num_pages; i++)
-		pfree(state->ready_pages[i]);
-
-	state->ready_num_pages = 0;
-}
-
 
 /*-------------------------------------------------------------------------
  * Routines for non-sorted build
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 424958912c7..ad848e31b1d 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -87,8 +87,8 @@
  * is optimized for bulk inserting a lot of tuples, knowing that we have
  * exclusive access to the heap.  raw_heap_insert builds new pages in
  * local storage.  When a page is full, or at the end of the process,
- * we insert it to WAL as a single record and then write it to disk
- * directly through smgr.  Note, however, that any data sent to the new
+ * we insert it to WAL as a single record and then write it to disk with
+ * the bulk smgr writer.  Note, however, that any data sent to the new
  * heap's TOAST table will go through the normal bufmgr.
  *
  *
@@ -119,9 +119,9 @@
 #include "replication/logical.h"
 #include "replication/slot.h"
 #include "storage/bufmgr.h"
+#include "storage/bulk_write.h"
 #include "storage/fd.h"
 #include "storage/procarray.h"
-#include "storage/smgr.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
@@ -133,9 +133,11 @@ typedef struct RewriteStateData
 {
 	Relation	rs_old_rel;		/* source heap */
 	Relation	rs_new_rel;		/* destination heap */
-	Page		rs_buffer;		/* page currently being built */
+
+	BulkWriteState *rs_bulkw;
+
+	BulkWriteBuffer rs_buffer;	/* page currently being built */
 	BlockNumber rs_blockno;		/* block where page will go */
-	bool		rs_buffer_valid;	/* T if any tuples in buffer */
 	bool		rs_logical_rewrite; /* do we need to do logical rewriting */
 	TransactionId rs_oldest_xmin;	/* oldest xmin used by caller to determine
 									 * tuple visibility */
@@ -255,15 +257,16 @@ begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xm
 
 	state->rs_old_rel = old_heap;
 	state->rs_new_rel = new_heap;
-	state->rs_buffer = (Page) palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
+	state->rs_buffer = NULL;
 	/* new_heap needn't be empty, just locked */
 	state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
-	state->rs_buffer_valid = false;
 	state->rs_oldest_xmin = oldest_xmin;
 	state->rs_freeze_xid = freeze_xid;
 	state->rs_cutoff_multi = cutoff_multi;
 	state->rs_cxt = rw_cxt;
 
+	state->rs_bulkw = bulkw_start_rel(new_heap, MAIN_FORKNUM);
+
 	/* Initialize hash tables used to track update chains */
 	hash_ctl.keysize = sizeof(TidHashKey);
 	hash_ctl.entrysize = sizeof(UnresolvedTupData);
@@ -314,30 +317,13 @@ end_heap_rewrite(RewriteState state)
 	}
 
 	/* Write the last page, if any */
-	if (state->rs_buffer_valid)
+	if (state->rs_buffer)
 	{
-		if (RelationNeedsWAL(state->rs_new_rel))
-			log_newpage(&state->rs_new_rel->rd_locator,
-						MAIN_FORKNUM,
-						state->rs_blockno,
-						state->rs_buffer,
-						true);
-
-		PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
-
-		smgrextend(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
-				   state->rs_blockno, state->rs_buffer, true);
+		bulkw_write(state->rs_bulkw, state->rs_blockno, state->rs_buffer, true);
+		state->rs_buffer = NULL;
 	}
 
-	/*
-	 * When we WAL-logged rel pages, we must nonetheless fsync them.  The
-	 * reason is the same as in storage.c's RelationCopyStorage(): we're
-	 * writing data that's not in shared buffers, and so a CHECKPOINT
-	 * occurring during the rewriteheap operation won't have fsync'd data we
-	 * wrote before the checkpoint.
-	 */
-	if (RelationNeedsWAL(state->rs_new_rel))
-		smgrimmedsync(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM);
+	bulkw_finish(state->rs_bulkw);
 
 	logical_end_heap_rewrite(state);
 
@@ -611,7 +597,7 @@ rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)
 static void
 raw_heap_insert(RewriteState state, HeapTuple tup)
 {
-	Page		page = state->rs_buffer;
+	Page		page;
 	Size		pageFreeSpace,
 				saveFreeSpace;
 	Size		len;
@@ -664,7 +650,8 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
 												   HEAP_DEFAULT_FILLFACTOR);
 
 	/* Now we can check to see if there's enough free space already. */
-	if (state->rs_buffer_valid)
+	page = (Page) state->rs_buffer;
+	if (page)
 	{
 		pageFreeSpace = PageGetHeapFreeSpace(page);
 
@@ -675,35 +662,19 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
 			 * contains a tuple.  Hence, unlike RelationGetBufferForTuple(),
 			 * enforce saveFreeSpace unconditionally.
 			 */
-
-			/* XLOG stuff */
-			if (RelationNeedsWAL(state->rs_new_rel))
-				log_newpage(&state->rs_new_rel->rd_locator,
-							MAIN_FORKNUM,
-							state->rs_blockno,
-							page,
-							true);
-
-			/*
-			 * Now write the page. We say skipFsync = true because there's no
-			 * need for smgr to schedule an fsync for this write; we'll do it
-			 * ourselves in end_heap_rewrite.
-			 */
-			PageSetChecksumInplace(page, state->rs_blockno);
-
-			smgrextend(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
-					   state->rs_blockno, page, true);
-
+			bulkw_write(state->rs_bulkw, state->rs_blockno, state->rs_buffer, true);
+			state->rs_buffer = NULL;
+			page = NULL;
 			state->rs_blockno++;
-			state->rs_buffer_valid = false;
 		}
 	}
 
-	if (!state->rs_buffer_valid)
+	if (!page)
 	{
 		/* Initialize a new empty page */
+		state->rs_buffer = bulkw_get_buf(state->rs_bulkw);
+		page = (Page) state->rs_buffer;
 		PageInit(page, BLCKSZ, 0);
-		state->rs_buffer_valid = true;
 	}
 
 	/* And now we can insert the tuple into the page */
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index a88b36a589a..38f957e6f4c 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -29,11 +29,11 @@
 #include "nodes/execnodes.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
+#include "storage/bulk_write.h"
 #include "storage/condition_variable.h"
 #include "storage/indexfsm.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
-#include "storage/smgr.h"
 #include "utils/builtins.h"
 #include "utils/index_selfuncs.h"
 #include "utils/memutils.h"
@@ -152,32 +152,17 @@ void
 btbuildempty(Relation index)
 {
 	bool		allequalimage = _bt_allequalimage(index, false);
-	Buffer		metabuf;
-	Page		metapage;
+	BulkWriteState *bulkw;
+	BulkWriteBuffer metabuf;
 
-	/*
-	 * Initalize the metapage.
-	 *
-	 * Regular index build bypasses the buffer manager and uses smgr functions
-	 * directly, with an smgrimmedsync() call at the end.  That makes sense
-	 * when the index is large, but for an empty index, it's better to use the
-	 * buffer cache to avoid the smgrimmedsync().
-	 */
-	metabuf = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
-	Assert(BufferGetBlockNumber(metabuf) == BTREE_METAPAGE);
-	_bt_lockbuf(index, metabuf, BT_WRITE);
-
-	START_CRIT_SECTION();
-
-	metapage = BufferGetPage(metabuf);
-	_bt_initmetapage(metapage, P_NONE, 0, allequalimage);
-	MarkBufferDirty(metabuf);
-	log_newpage_buffer(metabuf, true);
+	bulkw = bulkw_start_rel(index, INIT_FORKNUM);
 
-	END_CRIT_SECTION();
+	/* Construct metapage. */
+	metabuf = bulkw_get_buf(bulkw);
+	_bt_initmetapage((Page) metabuf, P_NONE, 0, allequalimage);
+	bulkw_write(bulkw, BTREE_METAPAGE, metabuf, true);
 
-	_bt_unlockbuf(index, metabuf);
-	ReleaseBuffer(metabuf);
+	bulkw_finish(bulkw);
 }
 
 /*
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index c2665fce411..6034d7f61b2 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -23,13 +23,8 @@
  * many upper pages if the keys are reasonable-size) without risking a lot of
  * cascading splits during early insertions.
  *
- * Formerly the index pages being built were kept in shared buffers, but
- * that is of no value (since other backends have no interest in them yet)
- * and it created locking problems for CHECKPOINT, because the upper-level
- * pages were held exclusive-locked for long periods.  Now we just build
- * the pages in local memory and smgrwrite or smgrextend them as we finish
- * them.  They will need to be re-read into shared buffers on first use after
- * the build finishes.
+ * We use the bulk smgr loading facility to bypass the buffer cache and
+ * WAL log the pages efficiently.
  *
  * This code isn't concerned about the FSM at all. The caller is responsible
  * for initializing that.
@@ -57,7 +52,7 @@
 #include "executor/instrument.h"
 #include "miscadmin.h"
 #include "pgstat.h"
-#include "storage/smgr.h"
+#include "storage/bulk_write.h"
 #include "tcop/tcopprot.h"		/* pgrminclude ignore */
 #include "utils/rel.h"
 #include "utils/sortsupport.h"
@@ -234,7 +229,7 @@ typedef struct BTBuildState
  */
 typedef struct BTPageState
 {
-	Page		btps_page;		/* workspace for page building */
+	BulkWriteBuffer btps_buf;	/* workspace for page building */
 	BlockNumber btps_blkno;		/* block # to write this page at */
 	IndexTuple	btps_lowkey;	/* page's strict lower bound pivot tuple */
 	OffsetNumber btps_lastoff;	/* last item offset loaded */
@@ -251,11 +246,9 @@ typedef struct BTWriteState
 {
 	Relation	heap;
 	Relation	index;
+	BulkWriteState *bulkw;
 	BTScanInsert inskey;		/* generic insertion scankey */
-	bool		btws_use_wal;	/* dump pages to WAL? */
 	BlockNumber btws_pages_alloced; /* # pages allocated */
-	BlockNumber btws_pages_written; /* # pages written out */
-	Page		btws_zeropage;	/* workspace for filling zeroes */
 } BTWriteState;
 
 
@@ -267,7 +260,7 @@ static void _bt_spool(BTSpool *btspool, ItemPointer self,
 static void _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2);
 static void _bt_build_callback(Relation index, ItemPointer tid, Datum *values,
 							   bool *isnull, bool tupleIsAlive, void *state);
-static Page _bt_blnewpage(uint32 level);
+static BulkWriteBuffer _bt_blnewpage(BTWriteState *wstate, uint32 level);
 static BTPageState *_bt_pagestate(BTWriteState *wstate, uint32 level);
 static void _bt_slideleft(Page rightmostpage);
 static void _bt_sortaddtup(Page page, Size itemsize,
@@ -569,16 +562,17 @@ _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2)
 	wstate.inskey = _bt_mkscankey(wstate.index, NULL);
 	/* _bt_mkscankey() won't set allequalimage without metapage */
 	wstate.inskey->allequalimage = _bt_allequalimage(wstate.index, true);
-	wstate.btws_use_wal = RelationNeedsWAL(wstate.index);
 
 	/* reserve the metapage */
 	wstate.btws_pages_alloced = BTREE_METAPAGE + 1;
-	wstate.btws_pages_written = 0;
-	wstate.btws_zeropage = NULL;	/* until needed */
+
+	wstate.bulkw = bulkw_start_rel(wstate.index, MAIN_FORKNUM);
 
 	pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE,
 								 PROGRESS_BTREE_PHASE_LEAF_LOAD);
 	_bt_load(&wstate, btspool, btspool2);
+
+	bulkw_finish(wstate.bulkw);
 }
 
 /*
@@ -613,13 +607,15 @@ _bt_build_callback(Relation index,
 /*
  * allocate workspace for a new, clean btree page, not linked to any siblings.
  */
-static Page
-_bt_blnewpage(uint32 level)
+static BulkWriteBuffer
+_bt_blnewpage(BTWriteState *wstate, uint32 level)
 {
+	BulkWriteBuffer buf;
 	Page		page;
 	BTPageOpaque opaque;
 
-	page = (Page) palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
+	buf = bulkw_get_buf(wstate->bulkw);
+	page = (Page) buf;
 
 	/* Zero the page and set up standard page header info */
 	_bt_pageinit(page, BLCKSZ);
@@ -634,63 +630,17 @@ _bt_blnewpage(uint32 level)
 	/* Make the P_HIKEY line pointer appear allocated */
 	((PageHeader) page)->pd_lower += sizeof(ItemIdData);
 
-	return page;
+	return buf;
 }
 
 /*
  * emit a completed btree page, and release the working storage.
  */
 static void
-_bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno)
+_bt_blwritepage(BTWriteState *wstate, BulkWriteBuffer buf, BlockNumber blkno)
 {
-	/* XLOG stuff */
-	if (wstate->btws_use_wal)
-	{
-		/* We use the XLOG_FPI record type for this */
-		log_newpage(&wstate->index->rd_locator, MAIN_FORKNUM, blkno, page, true);
-	}
-
-	/*
-	 * If we have to write pages nonsequentially, fill in the space with
-	 * zeroes until we come back and overwrite.  This is not logically
-	 * necessary on standard Unix filesystems (unwritten space will read as
-	 * zeroes anyway), but it should help to avoid fragmentation. The dummy
-	 * pages aren't WAL-logged though.
-	 */
-	while (blkno > wstate->btws_pages_written)
-	{
-		if (!wstate->btws_zeropage)
-			wstate->btws_zeropage = (Page) palloc_aligned(BLCKSZ,
-														  PG_IO_ALIGN_SIZE,
-														  MCXT_ALLOC_ZERO);
-		/* don't set checksum for all-zero page */
-		smgrextend(RelationGetSmgr(wstate->index), MAIN_FORKNUM,
-				   wstate->btws_pages_written++,
-				   wstate->btws_zeropage,
-				   true);
-	}
-
-	PageSetChecksumInplace(page, blkno);
-
-	/*
-	 * Now write the page.  There's no need for smgr to schedule an fsync for
-	 * this write; we'll do it ourselves before ending the build.
-	 */
-	if (blkno == wstate->btws_pages_written)
-	{
-		/* extending the file... */
-		smgrextend(RelationGetSmgr(wstate->index), MAIN_FORKNUM, blkno,
-				   page, true);
-		wstate->btws_pages_written++;
-	}
-	else
-	{
-		/* overwriting a block we zero-filled before */
-		smgrwrite(RelationGetSmgr(wstate->index), MAIN_FORKNUM, blkno,
-				  page, true);
-	}
-
-	pfree(page);
+	bulkw_write(wstate->bulkw, blkno, buf, true);
+	/* bulkw_write took ownership of 'buf' */
 }
 
 /*
@@ -703,7 +653,7 @@ _bt_pagestate(BTWriteState *wstate, uint32 level)
 	BTPageState *state = (BTPageState *) palloc0(sizeof(BTPageState));
 
 	/* create initial page for level */
-	state->btps_page = _bt_blnewpage(level);
+	state->btps_buf = _bt_blnewpage(wstate, level);
 
 	/* and assign it a page position */
 	state->btps_blkno = wstate->btws_pages_alloced++;
@@ -839,6 +789,7 @@ static void
 _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup,
 			 Size truncextra)
 {
+	BulkWriteBuffer nbuf;
 	Page		npage;
 	BlockNumber nblkno;
 	OffsetNumber last_off;
@@ -853,7 +804,8 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup,
 	 */
 	CHECK_FOR_INTERRUPTS();
 
-	npage = state->btps_page;
+	nbuf = state->btps_buf;
+	npage = (Page) nbuf;
 	nblkno = state->btps_blkno;
 	last_off = state->btps_lastoff;
 	last_truncextra = state->btps_lastextra;
@@ -909,6 +861,7 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup,
 		/*
 		 * Finish off the page and write it out.
 		 */
+		BulkWriteBuffer obuf = nbuf;
 		Page		opage = npage;
 		BlockNumber oblkno = nblkno;
 		ItemId		ii;
@@ -916,7 +869,8 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup,
 		IndexTuple	oitup;
 
 		/* Create new page of same level */
-		npage = _bt_blnewpage(state->btps_level);
+		nbuf = _bt_blnewpage(wstate, state->btps_level);
+		npage = (Page) nbuf;
 
 		/* and assign it a page position */
 		nblkno = wstate->btws_pages_alloced++;
@@ -1028,10 +982,10 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup,
 		}
 
 		/*
-		 * Write out the old page.  We never need to touch it again, so we can
-		 * free the opage workspace too.
+		 * Write out the old page. _bt_blwritepage takes ownership of the
+		 * 'obuf' buffer.
 		 */
-		_bt_blwritepage(wstate, opage, oblkno);
+		_bt_blwritepage(wstate, obuf, oblkno);
 
 		/*
 		 * Reset last_off to point to new page
@@ -1064,7 +1018,7 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup,
 	_bt_sortaddtup(npage, itupsz, itup, last_off,
 				   !isleaf && last_off == P_FIRSTKEY);
 
-	state->btps_page = npage;
+	state->btps_buf = nbuf;
 	state->btps_blkno = nblkno;
 	state->btps_lastoff = last_off;
 }
@@ -1116,7 +1070,7 @@ _bt_uppershutdown(BTWriteState *wstate, BTPageState *state)
 	BTPageState *s;
 	BlockNumber rootblkno = P_NONE;
 	uint32		rootlevel = 0;
-	Page		metapage;
+	BulkWriteBuffer metabuf;
 
 	/*
 	 * Each iteration of this loop completes one more level of the tree.
@@ -1127,7 +1081,7 @@ _bt_uppershutdown(BTWriteState *wstate, BTPageState *state)
 		BTPageOpaque opaque;
 
 		blkno = s->btps_blkno;
-		opaque = BTPageGetOpaque(s->btps_page);
+		opaque = BTPageGetOpaque((Page) s->btps_buf);
 
 		/*
 		 * We have to link the last page on this level to somewhere.
@@ -1161,9 +1115,9 @@ _bt_uppershutdown(BTWriteState *wstate, BTPageState *state)
 		 * This is the rightmost page, so the ItemId array needs to be slid
 		 * back one slot.  Then we can dump out the page.
 		 */
-		_bt_slideleft(s->btps_page);
-		_bt_blwritepage(wstate, s->btps_page, s->btps_blkno);
-		s->btps_page = NULL;	/* writepage freed the workspace */
+		_bt_slideleft((Page) s->btps_buf);
+		_bt_blwritepage(wstate, s->btps_buf, s->btps_blkno);
+		s->btps_buf = NULL;		/* writepage took ownership of the buffer */
 	}
 
 	/*
@@ -1172,10 +1126,10 @@ _bt_uppershutdown(BTWriteState *wstate, BTPageState *state)
 	 * set to point to "P_NONE").  This changes the index to the "valid" state
 	 * by filling in a valid magic number in the metapage.
 	 */
-	metapage = (Page) palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
-	_bt_initmetapage(metapage, rootblkno, rootlevel,
+	metabuf = bulkw_get_buf(wstate->bulkw);
+	_bt_initmetapage((Page) metabuf, rootblkno, rootlevel,
 					 wstate->inskey->allequalimage);
-	_bt_blwritepage(wstate, metapage, BTREE_METAPAGE);
+	_bt_blwritepage(wstate, metabuf, BTREE_METAPAGE);
 }
 
 /*
@@ -1422,18 +1376,6 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 
 	/* Close down final pages and write the metapage */
 	_bt_uppershutdown(wstate, state);
-
-	/*
-	 * When we WAL-logged index pages, we must nonetheless fsync index files.
-	 * Since we're building outside shared buffers, a CHECKPOINT occurring
-	 * during the build has no way to flush the previously written data to
-	 * disk (indeed it won't know the index even exists).  A crash later on
-	 * would replay WAL from the checkpoint, therefore it wouldn't replay our
-	 * earlier WAL entries. If we do not fsync those pages here, they might
-	 * still not be on disk when the crash occurs.
-	 */
-	if (wstate->btws_use_wal)
-		smgrimmedsync(RelationGetSmgr(wstate->index), MAIN_FORKNUM);
 }
 
 /*
diff --git a/src/backend/access/spgist/spginsert.c b/src/backend/access/spgist/spginsert.c
index 4443f1918df..31b9d5244db 100644
--- a/src/backend/access/spgist/spginsert.c
+++ b/src/backend/access/spgist/spginsert.c
@@ -25,7 +25,7 @@
 #include "catalog/index.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
-#include "storage/smgr.h"
+#include "storage/bulk_write.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
@@ -155,42 +155,27 @@ spgbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 void
 spgbuildempty(Relation index)
 {
-	Buffer		metabuffer,
-				rootbuffer,
-				nullbuffer;
-
-	/*
-	 * Initialize the meta page and root pages
-	 */
-	metabuffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
-	LockBuffer(metabuffer, BUFFER_LOCK_EXCLUSIVE);
-	rootbuffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
-	LockBuffer(rootbuffer, BUFFER_LOCK_EXCLUSIVE);
-	nullbuffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
-	LockBuffer(nullbuffer, BUFFER_LOCK_EXCLUSIVE);
-
-	Assert(BufferGetBlockNumber(metabuffer) == SPGIST_METAPAGE_BLKNO);
-	Assert(BufferGetBlockNumber(rootbuffer) == SPGIST_ROOT_BLKNO);
-	Assert(BufferGetBlockNumber(nullbuffer) == SPGIST_NULL_BLKNO);
+	BulkWriteState *bulkw;
+	BulkWriteBuffer buf;
 
-	START_CRIT_SECTION();
+	bulkw = bulkw_start_rel(index, INIT_FORKNUM);
 
-	SpGistInitMetapage(BufferGetPage(metabuffer));
-	MarkBufferDirty(metabuffer);
-	SpGistInitBuffer(rootbuffer, SPGIST_LEAF);
-	MarkBufferDirty(rootbuffer);
-	SpGistInitBuffer(nullbuffer, SPGIST_LEAF | SPGIST_NULLS);
-	MarkBufferDirty(nullbuffer);
+	/* Construct metapage. */
+	buf = bulkw_get_buf(bulkw);
+	SpGistInitMetapage((Page) buf);
+	bulkw_write(bulkw, SPGIST_METAPAGE_BLKNO, buf, true);
 
-	log_newpage_buffer(metabuffer, true);
-	log_newpage_buffer(rootbuffer, true);
-	log_newpage_buffer(nullbuffer, true);
+	/* Likewise for the root page. */
+	buf = bulkw_get_buf(bulkw);
+	SpGistInitPage((Page) buf, SPGIST_LEAF);
+	bulkw_write(bulkw, SPGIST_ROOT_BLKNO, buf, true);
 
-	END_CRIT_SECTION();
+	/* Likewise for the null-tuples root page. */
+	buf = bulkw_get_buf(bulkw);
+	SpGistInitPage((Page) buf, SPGIST_LEAF | SPGIST_NULLS);
+	bulkw_write(bulkw, SPGIST_NULL_BLKNO, buf, true);
 
-	UnlockReleaseBuffer(metabuffer);
-	UnlockReleaseBuffer(rootbuffer);
-	UnlockReleaseBuffer(nullbuffer);
+	bulkw_finish(bulkw);
 }
 
 /*
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index 93f07e49b72..c2d5b3ecb28 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -28,6 +28,7 @@
 #include "catalog/storage.h"
 #include "catalog/storage_xlog.h"
 #include "miscadmin.h"
+#include "storage/bulk_write.h"
 #include "storage/freespace.h"
 #include "storage/smgr.h"
 #include "utils/hsearch.h"
@@ -451,14 +452,11 @@ void
 RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
 					ForkNumber forkNum, char relpersistence)
 {
-	PGIOAlignedBlock buf;
-	Page		page;
 	bool		use_wal;
 	bool		copying_initfork;
 	BlockNumber nblocks;
 	BlockNumber blkno;
-
-	page = (Page) buf.data;
+	BulkWriteState *bulkw;
 
 	/*
 	 * The init fork for an unlogged relation in many respects has to be
@@ -477,16 +475,21 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
 	use_wal = XLogIsNeeded() &&
 		(relpersistence == RELPERSISTENCE_PERMANENT || copying_initfork);
 
+	bulkw = bulkw_start_smgr(dst, forkNum, use_wal);
+
 	nblocks = smgrnblocks(src, forkNum);
 
 	for (blkno = 0; blkno < nblocks; blkno++)
 	{
+		BulkWriteBuffer buf;
+
 		/* If we got a cancel signal during the copy of the data, quit */
 		CHECK_FOR_INTERRUPTS();
 
-		smgrread(src, forkNum, blkno, buf.data);
+		buf = bulkw_get_buf(bulkw);
+		smgrread(src, forkNum, blkno, (Page) buf);
 
-		if (!PageIsVerifiedExtended(page, blkno,
+		if (!PageIsVerifiedExtended((Page) buf, blkno,
 									PIV_LOG_WARNING | PIV_REPORT_STAT))
 		{
 			/*
@@ -507,34 +510,13 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
 		}
 
 		/*
-		 * WAL-log the copied page. Unfortunately we don't know what kind of a
-		 * page this is, so we have to log the full page including any unused
-		 * space.
-		 */
-		if (use_wal)
-			log_newpage(&dst->smgr_rlocator.locator, forkNum, blkno, page, false);
-
-		PageSetChecksumInplace(page, blkno);
-
-		/*
-		 * Now write the page.  We say skipFsync = true because there's no
-		 * need for smgr to schedule an fsync for this write; we'll do it
-		 * ourselves below.
+		 * Queue the page for WAL-logging and writing out.  Unfortunately we
+		 * don't know what kind of a page this is, so we have to log the full
+		 * page including any unused space.
 		 */
-		smgrextend(dst, forkNum, blkno, buf.data, true);
+		bulkw_write(bulkw, blkno, buf, false);
 	}
-
-	/*
-	 * When we WAL-logged rel pages, we must nonetheless fsync them.  The
-	 * reason is that since we're copying outside shared buffers, a CHECKPOINT
-	 * occurring during the copy has no way to flush the previously written
-	 * data to disk (indeed it won't know the new rel even exists).  A crash
-	 * later on would replay WAL from the checkpoint, therefore it wouldn't
-	 * replay our earlier WAL entries. If we do not fsync those pages here,
-	 * they might still not be on disk when the crash occurs.
-	 */
-	if (use_wal || copying_initfork)
-		smgrimmedsync(dst, forkNum);
+	bulkw_finish(bulkw);
 }
 
 /*
diff --git a/src/backend/storage/smgr/Makefile b/src/backend/storage/smgr/Makefile
index 596b564656f..1d0b98764f9 100644
--- a/src/backend/storage/smgr/Makefile
+++ b/src/backend/storage/smgr/Makefile
@@ -13,6 +13,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = \
+	bulk_write.o \
 	md.o \
 	smgr.o
 
diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
new file mode 100644
index 00000000000..1913c39bf21
--- /dev/null
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -0,0 +1,301 @@
+/*-------------------------------------------------------------------------
+ *
+ * bulk_write.c
+ *	  Efficiently and reliably populate a new relation
+ *
+ * The assumption is that no other backends access the relation while we are
+ * loading it, so we can take some shortcuts.  Do not mix operations through
+ * the regular buffer manager and the bulk loading interface!
+ *
+ * We bypass the buffer manager to avoid the locking overhead, and call
+ * smgrextend() directly.  A downside is that the pages will need to be
+ * re-read into shared buffers on first use after the build finishes.  That's
+ * usually a good tradeoff for large relations, and for small relations, the
+ * overhead isn't very significant compared to creating the relation in the
+ * first place.
+ *
+ * The pages are WAL-logged if needed.  To save on WAL header overhead, we
+ * WAL-log several pages in one record.
+ *
+ * One tricky point is that because we bypass the buffer manager, we need to
+ * register the relation for fsyncing at the next checkpoint ourselves, and
+ * make sure that the relation is correctly fsync'd by us or the checkpointer
+ * even if a checkpoint happens concurrently.
+ *
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/storage/smgr/bulk_write.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/xloginsert.h"
+#include "access/xlogrecord.h"
+#include "storage/bufmgr.h"
+#include "storage/bufpage.h"
+#include "storage/bulk_write.h"
+#include "storage/proc.h"
+#include "storage/smgr.h"
+#include "utils/rel.h"
+
+#define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
+
+static const PGIOAlignedBlock zero_buffer = {{0}};	/* worth BLCKSZ */
+
+typedef struct PendingWrite
+{
+	BulkWriteBuffer buf;
+	BlockNumber blkno;
+	bool		page_std;
+} PendingWrite;
+
+/*
+ * Bulk writer state for one relation fork.
+ */
+typedef struct BulkWriteState
+{
+	/* Information about the target relation we're writing */
+	/*
+	 * FIXME: 'smgr' might get invalidated. Hopefully
+	 * https://postgr.es/m/CA%2BhUKGJ8NTvqLHz6dqbQnt2c8XCki4r2QvXjBQcXpVwxTY_pvA%40mail.gmail.com
+	 * is merged before this.
+	 */
+	SMgrRelation smgr;
+	ForkNumber	forknum;
+	bool		use_wal;
+
+	/* We keep several writes queued, and WAL-log them in batches */
+	int			npending;
+	PendingWrite pending_writes[MAX_PENDING_WRITES];
+
+	/* Current size of the relation */
+	BlockNumber pages_written;
+
+	/* The RedoRecPtr at the time that the bulk operation started */
+	XLogRecPtr	start_RedoRecPtr;
+
+	MemoryContext memcxt;
+} BulkWriteState;
+
+static void bulkw_flush(BulkWriteState *bulkw);
+
+/*
+ * Start a bulk write operation on a relation fork.
+ */
+BulkWriteState *
+bulkw_start_rel(Relation rel, ForkNumber forknum)
+{
+	return bulkw_start_smgr(RelationGetSmgr(rel),
+							forknum,
+							RelationNeedsWAL(rel) || forknum == INIT_FORKNUM);
+}
+
+/*
+ * Start a bulk write operation on a relation fork.
+ *
+ * This is like bulkw_start_rel, but can be used without a relcache entry.
+ */
+BulkWriteState *
+bulkw_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
+{
+	BulkWriteState *bulkw;
+
+	bulkw = palloc(sizeof(BulkWriteState));
+	bulkw->smgr = smgr;
+	bulkw->forknum = forknum;
+	bulkw->use_wal = use_wal;
+
+	bulkw->npending = 0;
+	bulkw->pages_written = 0;
+
+	bulkw->start_RedoRecPtr = GetRedoRecPtr();
+
+	/*
+	 * Remember the memory context.  We will use it to allocate all the
+	 * buffers later.
+	 */
+	bulkw->memcxt = CurrentMemoryContext;
+
+	return bulkw;
+}
+
+/*
+ * Finish bulk write operation.
+ *
+ * This WAL-logs and flushes any remaining pending writes to disk, and fsyncs
+ * the relation if needed.
+ */
+void
+bulkw_finish(BulkWriteState *bulkw)
+{
+	/* WAL-log and flush any remaining pages */
+	bulkw_flush(bulkw);
+
+	/*
+	 * When we wrote out the pages, we passed skipFsync=true to avoid the
+	 * overhead of registering all the writes with the checkpointer.  Register
+	 * the whole relation now.
+	 *
+	 * There is one hole in that idea: If a checkpoint occurred while we were
+	 * writing the pages, it already missed fsyncing the pages we had written
+	 * before the checkpoint started.  A crash later on would replay the WAL
+	 * starting from the checkpoint, therefore it wouldn't replay our earlier
+	 * WAL records.  So if a checkpoint started after the bulk write began,
+	 * fsync the files now.
+	 */
+	if (!SmgrIsTemp(bulkw->smgr))
+	{
+		/*
+		 * Prevent a checkpoint from starting between the GetRedoRecPtr() and
+		 * smgrregistersync() calls.
+		 */
+		Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
+		MyProc->delayChkptFlags |= DELAY_CHKPT_START;
+
+		if (bulkw->start_RedoRecPtr != GetRedoRecPtr())
+		{
+			/*
+			 * A checkpoint occurred and it didn't know about our writes, so
+			 * fsync() the relation ourselves.
+			 */
+			MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
+			smgrimmedsync(bulkw->smgr, bulkw->forknum);
+			elog(DEBUG1, "flushed relation because a checkpoint occurred concurrently");
+		}
+		else
+		{
+			smgrregistersync(bulkw->smgr, bulkw->forknum);
+			MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
+		}
+	}
+}
+
+static int
+buffer_cmp(const void *a, const void *b)
+{
+	const PendingWrite *bufa = (const PendingWrite *) a;
+	const PendingWrite *bufb = (const PendingWrite *) b;
+
+	/* We should not see duplicated writes for the same block */
+	Assert(bufa->blkno != bufb->blkno);
+	if (bufa->blkno > bufb->blkno)
+		return 1;
+	else
+		return -1;
+}
+
+/*
+ * Finish all the pending writes.
+ */
+static void
+bulkw_flush(BulkWriteState *bulkw)
+{
+	int			npending = bulkw->npending;
+	PendingWrite *pending_writes = bulkw->pending_writes;
+
+	if (npending == 0)
+		return;
+
+	if (npending > 1)
+		qsort(pending_writes, npending, sizeof(PendingWrite), buffer_cmp);
+
+	if (bulkw->use_wal)
+	{
+		BlockNumber blknos[MAX_PENDING_WRITES];
+		Page		pages[MAX_PENDING_WRITES];
+		bool		page_std = true;
+
+		for (int i = 0; i < npending; i++)
+		{
+			blknos[i] = pending_writes[i].blkno;
+			pages[i] = pending_writes[i].buf->data;
+
+			/*
+			 * If any of the pages use !page_std, we log them all as such.
+			 * That's a bit wasteful, but in practice, a mix of standard and
+			 * non-standard page layout is rare.  None of the built-in AMs do
+			 * that.
+			 */
+			if (!pending_writes[i].page_std)
+				page_std = false;
+		}
+		log_newpages(&bulkw->smgr->smgr_rlocator.locator, bulkw->forknum,
+					 npending, blknos, pages, page_std);
+	}
+
+	for (int i = 0; i < npending; i++)
+	{
+		BlockNumber blkno = pending_writes[i].blkno;
+		Page		page = pending_writes[i].buf->data;
+
+		PageSetChecksumInplace(page, blkno);
+
+		if (blkno >= bulkw->pages_written)
+		{
+			/*
+			 * If we have to write pages nonsequentially, fill in the space
+			 * with zeroes until we come back and overwrite.  This is not
+			 * logically necessary on standard Unix filesystems (unwritten
+			 * space will read as zeroes anyway), but it should help to avoid
+			 * fragmentation.  The dummy pages aren't WAL-logged though.
+			 */
+			while (blkno > bulkw->pages_written)
+			{
+				/* don't set checksum for all-zero page */
+				smgrextend(bulkw->smgr, bulkw->forknum,
+						   bulkw->pages_written++,
+						   &zero_buffer,
+						   true);
+			}
+
+			smgrextend(bulkw->smgr, bulkw->forknum, blkno, page, true);
+			bulkw->pages_written = pending_writes[i].blkno + 1;
+		}
+		else
+			smgrwrite(bulkw->smgr, bulkw->forknum, blkno, page, true);
+		pfree(page);
+	}
+
+	bulkw->npending = 0;
+}
+
+/*
+ * Queue write of 'buf'.
+ *
+ * NB: this takes ownership of 'buf'!
+ *
+ * You are only allowed to write a given block once as part of one bulk write
+ * operation.
+ */
+void
+bulkw_write(BulkWriteState *bulkw, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
+{
+	bulkw->pending_writes[bulkw->npending].buf = buf;
+	bulkw->pending_writes[bulkw->npending].blkno = blocknum;
+	bulkw->pending_writes[bulkw->npending].page_std = page_std;
+
+	bulkw->npending++;
+
+	if (bulkw->npending == MAX_PENDING_WRITES)
+		bulkw_flush(bulkw);
+}
+
+/*
+ * Allocate a new buffer which can later be written with bulkw_write().
+ *
+ * There is no function to free a buffer.  When you pass it to bulkw_write(),
+ * it takes ownership and frees it when it's no longer needed.
+ *
+ * This is currently implemented as a simple palloc, but could be implemented
+ * using a ring buffer or larger chunks in the future, so don't rely on it.
+ */
+BulkWriteBuffer
+bulkw_get_buf(BulkWriteState *bulkw)
+{
+	return MemoryContextAllocAligned(bulkw->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
+}
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index fdecbad1709..343ee51048e 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -1082,6 +1082,49 @@ mdtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
 	}
 }
 
+/*
+ * mdregistersync() -- Mark whole relation as needing fsync
+ */
+void
+mdregistersync(SMgrRelation reln, ForkNumber forknum)
+{
+	int			segno;
+	int			min_inactive_seg;
+
+	/*
+	 * NOTE: mdnblocks makes sure we have opened all active segments, so that
+	 * the loop below will get them all!
+	 */
+	mdnblocks(reln, forknum);
+
+	min_inactive_seg = segno = reln->md_num_open_segs[forknum];
+
+	/*
+	 * Temporarily open inactive segments, then close them once they have
+	 * been registered.  Some inactive segments may be left open after an
+	 * error, but that is harmless.  We don't try to clean them up, as that
+	 * would risk further trouble.  The next mdclose() will close them.
+	 */
+	while (_mdfd_openseg(reln, forknum, segno, 0) != NULL)
+		segno++;
+
+	while (segno > 0)
+	{
+		MdfdVec    *v = &reln->md_seg_fds[forknum][segno - 1];
+
+		register_dirty_segment(reln, forknum, v);
+
+		/* Close inactive segments immediately */
+		if (segno > min_inactive_seg)
+		{
+			FileClose(v->mdfd_vfd);
+			_fdvec_resize(reln, forknum, segno - 1);
+		}
+
+		segno--;
+	}
+}
+
 /*
  * mdimmedsync() -- Immediately sync a relation to stable storage.
  *
diff --git a/src/backend/storage/smgr/meson.build b/src/backend/storage/smgr/meson.build
index e1ba6ed74b8..133622a6528 100644
--- a/src/backend/storage/smgr/meson.build
+++ b/src/backend/storage/smgr/meson.build
@@ -1,6 +1,7 @@
 # Copyright (c) 2022-2023, PostgreSQL Global Development Group
 
 backend_sources += files(
+  'bulk_write.c',
   'md.c',
   'smgr.c',
 )
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 5d0f3d515c3..9f7405e3c88 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -65,6 +65,7 @@ typedef struct f_smgr
 	void		(*smgr_truncate) (SMgrRelation reln, ForkNumber forknum,
 								  BlockNumber nblocks);
 	void		(*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum);
+	void		(*smgr_registersync) (SMgrRelation reln, ForkNumber forknum);
 } f_smgr;
 
 static const f_smgr smgrsw[] = {
@@ -86,6 +87,7 @@ static const f_smgr smgrsw[] = {
 		.smgr_nblocks = mdnblocks,
 		.smgr_truncate = mdtruncate,
 		.smgr_immedsync = mdimmedsync,
+		.smgr_registersync = mdregistersync,
 	}
 };
 
@@ -576,6 +578,14 @@ smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
  * on disk at return, only dumped out to the kernel.  However,
  * provisions will be made to fsync the write before the next checkpoint.
  *
+ * NB: The mechanism to ensure fsync at next checkpoint assumes that there is
+ * something that prevents a concurrent checkpoint from "racing ahead" of the
+ * write.  One way to prevent that is by holding a lock on the buffer; the
+ * buffer manager's writes are protected by that.  The bulk writer facility in
+ * bulk_write.c checks the redo pointer and calls smgrimmedsync() if a
+ * checkpoint happened; that relies on the fact that no other backend can
+ * concurrently modify the page.
+ *
  * skipFsync indicates that the caller will make other provisions to
  * fsync the relation, so we needn't bother.  Temporary relations also
  * do not require fsync.
@@ -694,6 +704,24 @@ smgrtruncate(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber *nb
 	}
 }
 
+/*
+ * smgrregistersync() -- Request a relation to be sync'd at next checkpoint
+ *
+ * This can be used after calling smgrwrite() or smgrextend() with skipFsync =
+ * true, to register the fsyncs that were skipped earlier.
+ *
+ * Note: be mindful that a checkpoint could already have happened between the
+ * smgrwrite or smgrextend calls and this!  In that case, the checkpoint
+ * already missed fsyncing this relation, and you should use smgrimmedsync
+ * instead.  Most callers should use the bulk loading facility in bulk_write.c
+ * instead, which handles all that.
+ */
+void
+smgrregistersync(SMgrRelation reln, ForkNumber forknum)
+{
+	smgrsw[reln->smgr_which].smgr_registersync(reln, forknum);
+}
+
 /*
  * smgrimmedsync() -- Force the specified relation to stable storage.
  *
@@ -716,6 +744,9 @@ smgrtruncate(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber *nb
  * Note that you need to do FlushRelationBuffers() first if there is
  * any possibility that there are dirty buffers for the relation;
  * otherwise the sync is not very meaningful.
+ *
+ * Most callers should use the bulk loading facility in bulk_write.c
+ * instead of calling this directly.
  */
 void
 smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
diff --git a/src/include/storage/bulk_write.h b/src/include/storage/bulk_write.h
new file mode 100644
index 00000000000..4defaf20125
--- /dev/null
+++ b/src/include/storage/bulk_write.h
@@ -0,0 +1,38 @@
+/*-------------------------------------------------------------------------
+ *
+ * bulk_write.h
+ *	  Efficiently and reliably populate a new relation
+ *
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/bulk_write.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef BULK_WRITE_H
+#define BULK_WRITE_H
+
+typedef struct BulkWriteState BulkWriteState;
+
+/*
+ * Temporary buffer to hold a page until it's written out. Use
+ * bulkw_get_buf() to reserve one of these.  This is a separate typedef to
+ * distinguish it from other block-sized buffers passed around in the
+ * system.
+ */
+typedef PGIOAlignedBlock *BulkWriteBuffer;
+
+/* forward declared from smgr.h */
+struct SMgrRelationData;
+
+extern BulkWriteState *bulkw_start_rel(Relation rel, ForkNumber forknum);
+extern BulkWriteState *bulkw_start_smgr(struct SMgrRelationData *smgr, ForkNumber forknum, bool use_wal);
+
+extern BulkWriteBuffer bulkw_get_buf(BulkWriteState *bulkw);
+extern void bulkw_write(BulkWriteState *bulkw, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std);
+
+extern void bulkw_finish(BulkWriteState *bulkw);
+
+#endif							/* BULK_WRITE_H */
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index 941879ee6a8..225701271d2 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -42,6 +42,7 @@ extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
 extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
 					   BlockNumber nblocks);
 extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum);
+extern void mdregistersync(SMgrRelation reln, ForkNumber forknum);
 
 extern void ForgetDatabaseSyncRequests(Oid dbid);
 extern void DropRelationFiles(RelFileLocator *delrels, int ndelrels, bool isRedo);
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index a9a179aabac..cc5a91dc624 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -107,6 +107,7 @@ extern BlockNumber smgrnblocks_cached(SMgrRelation reln, ForkNumber forknum);
 extern void smgrtruncate(SMgrRelation reln, ForkNumber *forknum,
 						 int nforks, BlockNumber *nblocks);
 extern void smgrimmedsync(SMgrRelation reln, ForkNumber forknum);
+extern void smgrregistersync(SMgrRelation reln, ForkNumber forknum);
 extern void AtEOXact_SMgr(void);
 extern bool ProcessBarrierSmgrRelease(void);
 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index dba3498a13e..85b8491fd49 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -327,6 +327,8 @@ BuildAccumulator
 BuiltinScript
 BulkInsertState
 BulkInsertStateData
+BulkWriteBuffer
+BulkWriteState
 CACHESIGN
 CAC_state
 CCFastEqualFN
@@ -2005,6 +2007,7 @@ PendingFsyncEntry
 PendingRelDelete
 PendingRelSync
 PendingUnlinkEntry
+PendingWrite
 PendingWriteback
 PerLockTagEntry
 PerlInterpreter
-- 
2.39.2
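
For readers who want to poke at the queueing logic in bulkw_write() and
bulkw_flush() outside the backend, here is a rough standalone model of it.
This is only a sketch, not part of the patch: every toy_* name, BLOCK_SIZE,
MAX_PENDING and MAX_BLOCKS is made up for illustration, the "file" is just an
in-memory array, and WAL-logging, checksums and fsync handling are left out
entirely. It only shows the sort-by-block-number step, the zero-filling of
gaps, and the extend-versus-overwrite decision.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define BLOCK_SIZE  16			/* tiny stand-in for BLCKSZ (assumption) */
#define MAX_PENDING 4			/* stand-in for MAX_PENDING_WRITES */
#define MAX_BLOCKS  64			/* capacity of the simulated file */

/* One queued write: the caller's buffer and its target block. */
typedef struct ToyPending
{
	char	   *buf;
	unsigned	blkno;
} ToyPending;

/* Hypothetical model of BulkWriteState, with counters for inspection. */
typedef struct ToyBulkWriter
{
	char		file[MAX_BLOCKS][BLOCK_SIZE];	/* simulated relation fork */
	unsigned	pages_written;	/* current size of the simulated file */
	int			npending;
	ToyPending	pending[MAX_PENDING];
	int			nextends;		/* writes that grew the file */
	int			noverwrites;	/* writes over an existing block */
	int			nzerofills;		/* dummy zero pages written to fill gaps */
} ToyBulkWriter;

static void
toy_start(ToyBulkWriter *w)
{
	memset(w, 0, sizeof(*w));
}

/* Allocate a zeroed block; ownership passes to toy_write() later. */
static char *
toy_get_buf(void)
{
	char	   *buf = calloc(1, BLOCK_SIZE);

	assert(buf != NULL);
	return buf;
}

static int
toy_cmp(const void *a, const void *b)
{
	const ToyPending *pa = a;
	const ToyPending *pb = b;

	return (pa->blkno > pb->blkno) - (pa->blkno < pb->blkno);
}

/* Write out all queued blocks in block-number order and free them. */
static void
toy_flush(ToyBulkWriter *w)
{
	if (w->npending > 1)
		qsort(w->pending, (size_t) w->npending, sizeof(ToyPending), toy_cmp);

	for (int i = 0; i < w->npending; i++)
	{
		unsigned	blkno = w->pending[i].blkno;
		char	   *buf = w->pending[i].buf;

		assert(blkno < MAX_BLOCKS);
		if (blkno >= w->pages_written)
		{
			/* Fill any gap below blkno with zero pages, then extend. */
			while (blkno > w->pages_written)
			{
				memset(w->file[w->pages_written++], 0, BLOCK_SIZE);
				w->nzerofills++;
			}
			memcpy(w->file[blkno], buf, BLOCK_SIZE);
			w->pages_written = blkno + 1;
			w->nextends++;
		}
		else
		{
			/* Overwrite a block that already exists (e.g. zero-filled). */
			memcpy(w->file[blkno], buf, BLOCK_SIZE);
			w->noverwrites++;
		}
		free(buf);
	}
	w->npending = 0;
}

/* Queue a write; takes ownership of 'buf' and flushes when the queue fills. */
static void
toy_write(ToyBulkWriter *w, unsigned blkno, char *buf)
{
	w->pending[w->npending].buf = buf;
	w->pending[w->npending].blkno = blkno;
	w->npending++;

	if (w->npending == MAX_PENDING)
		toy_flush(w);
}

static void
toy_finish(ToyBulkWriter *w)
{
	toy_flush(w);
}
```

Mimicking the nbtree build, a caller would queue blocks 1 to 5 with
toy_write() and then block 0 (the "metapage") last, before calling
toy_finish(). With MAX_PENDING of 4, the first flush zero-fills block 0 and
extends the file with blocks 1 to 4; the final flush overwrites block 0 and
extends with block 5.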
