On Sat, Apr 6, 2024 at 9:25 PM Thomas Munro <thomas.mu...@gmail.com> wrote:
>
> I found a bug in read_stream.c that could be hit with Melanie's
> streaming seq scan patch with parallelism enabled and certain buffer
> pool conditions.  Short version: there is an edge case where an "if"
> needed to be a "while", or we could lose a few blocks.  Here's the fix
> for that, longer explanation in commit message.

Attached v13 0001 is your fix, and 0002 is a new version of the patch
that makes sequential scans use the streaming read API. Off-list, Andres
mentioned that I really ought to separate the parallel and serial
sequential scan users into two different callbacks. I've done that in
the attached patch. Even setting performance aside, it makes the code
used by the callbacks nicer and more readable, and I was also able to
measure a small performance difference.

I've also added a few new comments and improved some existing ones.

- Melanie
From eded321df22bf472f147bd8f94b596d465355c70 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 5 Apr 2024 13:32:14 +1300
Subject: [PATCH v13 2/2] Use streaming IO in heapam sequential and TID range
 scans

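Instead of calling ReadBuffer() for each block, heap sequential scans
and TID range scans now use the streaming read API introduced in
b5a9b18cd0.  Parallel and serial scans use separate read stream
callbacks, and the stream is reset on rescan and whenever the scan
direction changes.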

Author: Melanie Plageman <melanieplage...@gmail.com>
Reviewed-by: Andres Freund
Discussion: https://postgr.es/m/flat/CAAKRu_YtXJiYKQvb5JsA2SkwrsizYLugs4sSOZh3EAjKUg%3DgEQ%40mail.gmail.com
---
 src/backend/access/heap/heapam.c | 234 +++++++++++++++++++++----------
 src/include/access/heapam.h      |  15 ++
 2 files changed, 176 insertions(+), 73 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 01bb2f4cc16..9d10d42b69b 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -223,6 +223,66 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
  * ----------------------------------------------------------------
  */
 
+/*
+ * Streaming read API callback for parallel sequential scans. Returns the next
+ * block the caller wants from the read stream or InvalidBlockNumber when done.
+ */
+static BlockNumber
+heap_scan_stream_read_next_parallel(ReadStream *pgsr, void *private_data,
+									void *per_buffer_data)
+{
+	HeapScanDesc scan = (HeapScanDesc) private_data;
+
+	Assert(ScanDirectionIsForward(scan->rs_dir));
+	Assert(scan->rs_base.rs_parallel);
+
+	if (unlikely(!scan->rs_inited))
+	{
+		/* parallel scan */
+		table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
+												 scan->rs_parallelworkerdata,
+												 (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
+
+		/* may return InvalidBlockNumber if there are no more blocks */
+		scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+																	scan->rs_parallelworkerdata,
+																	(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
+		scan->rs_inited = true;
+	}
+	else
+	{
+		scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+																	scan->rs_parallelworkerdata, (ParallelBlockTableScanDesc)
+																	scan->rs_base.rs_parallel);
+	}
+
+	return scan->rs_prefetch_block;
+}
+
+/*
+ * Streaming read API callback for serial sequential and TID range scans.
+ * Returns the next block the caller wants from the read stream or
+ * InvalidBlockNumber when done.
+ */
+static BlockNumber
+heap_scan_stream_read_next_serial(ReadStream *pgsr, void *private_data,
+								  void *per_buffer_data)
+{
+	HeapScanDesc scan = (HeapScanDesc) private_data;
+
+	if (unlikely(!scan->rs_inited))
+	{
+		scan->rs_prefetch_block = heapgettup_initial_block(scan, scan->rs_dir);
+		scan->rs_inited = true;
+	}
+	else
+		scan->rs_prefetch_block = heapgettup_advance_block(scan,
+														   scan->rs_prefetch_block,
+														   scan->rs_dir);
+
+	return scan->rs_prefetch_block;
+}
+
 /* ----------------
  *		initscan - scan code common to heap_beginscan and heap_rescan
  * ----------------
@@ -325,6 +385,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
 	scan->rs_cbuf = InvalidBuffer;
 	scan->rs_cblock = InvalidBlockNumber;
 
+	/*
+	 * Initialize to ForwardScanDirection because it is most common and
+	 * because heap scans go forward before going backward (e.g. CURSORs).
+	 */
+	scan->rs_dir = ForwardScanDirection;
+	scan->rs_prefetch_block = InvalidBlockNumber;
+
 	/* page-at-a-time fields are always invalid when not rs_inited */
 
 	/*
@@ -462,12 +529,14 @@ heap_prepare_pagescan(TableScanDesc sscan)
 /*
  * heap_fetch_next_buffer - read and pin the next block from MAIN_FORKNUM.
  *
- * Read the next block of the scan relation into a buffer and pin that buffer
- * before saving it in the scan descriptor.
+ * Read the next block of the scan relation from the read stream and pin that
+ * buffer before saving it in the scan descriptor.
  */
 static inline void
 heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
 {
+	Assert(scan->rs_read_stream);
+
 	/* release previous scan buffer, if any */
 	if (BufferIsValid(scan->rs_cbuf))
 	{
@@ -482,25 +551,23 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
 	 */
 	CHECK_FOR_INTERRUPTS();
 
-	if (unlikely(!scan->rs_inited))
+	/*
+	 * If the scan direction is changing, reset the prefetch block to the
+	 * current block. Otherwise, we will incorrectly prefetch the blocks
+	 * between the prefetch block and the current block again before
+	 * prefetching blocks in the new, correct scan direction.
+	 */
+	if (unlikely(scan->rs_dir != dir))
 	{
-		scan->rs_cblock = heapgettup_initial_block(scan, dir);
+		scan->rs_prefetch_block = scan->rs_cblock;
+		read_stream_reset(scan->rs_read_stream);
+	}
 
-		/* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
-		Assert(scan->rs_cblock != InvalidBlockNumber ||
-			   !BufferIsValid(scan->rs_cbuf));
+	scan->rs_dir = dir;
 
-		scan->rs_inited = true;
-	}
-	else
-		scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock,
-												   dir);
-
-	/* read block if valid */
-	if (BlockNumberIsValid(scan->rs_cblock))
-		scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM,
-										   scan->rs_cblock, RBM_NORMAL,
-										   scan->rs_strategy);
+	scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL);
+	if (BufferIsValid(scan->rs_cbuf))
+		scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf);
 }
 
 /*
@@ -514,6 +581,7 @@ static pg_noinline BlockNumber
 heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir)
 {
 	Assert(!scan->rs_inited);
+	Assert(scan->rs_base.rs_parallel == NULL);
 
 	/* When there are no pages to scan, return InvalidBlockNumber */
 	if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0)
@@ -521,27 +589,10 @@ heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir)
 
 	if (ScanDirectionIsForward(dir))
 	{
-		/* serial scan */
-		if (scan->rs_base.rs_parallel == NULL)
-			return scan->rs_startblock;
-		else
-		{
-			/* parallel scan */
-			table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
-													 scan->rs_parallelworkerdata,
-													 (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
-
-			/* may return InvalidBlockNumber if there are no more blocks */
-			return table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
-													 scan->rs_parallelworkerdata,
-													 (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
-		}
+		return scan->rs_startblock;
 	}
 	else
 	{
-		/* backward parallel scan not supported */
-		Assert(scan->rs_base.rs_parallel == NULL);
-
 		/*
 		 * Disable reporting to syncscan logic in a backwards scan; it's not
 		 * very likely anyone else is doing the same thing at the same time,
@@ -653,50 +704,43 @@ heapgettup_continue_page(HeapScanDesc scan, ScanDirection dir, int *linesleft,
 static inline BlockNumber
 heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir)
 {
-	if (ScanDirectionIsForward(dir))
+	Assert(scan->rs_base.rs_parallel == NULL);
+
+	if (likely(ScanDirectionIsForward(dir)))
 	{
-		if (scan->rs_base.rs_parallel == NULL)
-		{
-			block++;
+		block++;
 
-			/* wrap back to the start of the heap */
-			if (block >= scan->rs_nblocks)
-				block = 0;
+		/* wrap back to the start of the heap */
+		if (block >= scan->rs_nblocks)
+			block = 0;
 
-			/*
-			 * Report our new scan position for synchronization purposes. We
-			 * don't do that when moving backwards, however. That would just
-			 * mess up any other forward-moving scanners.
-			 *
-			 * Note: we do this before checking for end of scan so that the
-			 * final state of the position hint is back at the start of the
-			 * rel.  That's not strictly necessary, but otherwise when you run
-			 * the same query multiple times the starting position would shift
-			 * a little bit backwards on every invocation, which is confusing.
-			 * We don't guarantee any specific ordering in general, though.
-			 */
-			if (scan->rs_base.rs_flags & SO_ALLOW_SYNC)
-				ss_report_location(scan->rs_base.rs_rd, block);
-
-			/* we're done if we're back at where we started */
-			if (block == scan->rs_startblock)
-				return InvalidBlockNumber;
+		/*
+		 * Report our new scan position for synchronization purposes. We don't
+		 * do that when moving backwards, however. That would just mess up any
+		 * other forward-moving scanners.
+		 *
+		 * Note: we do this before checking for end of scan so that the final
+		 * state of the position hint is back at the start of the rel.  That's
+		 * not strictly necessary, but otherwise when you run the same query
+		 * multiple times the starting position would shift a little bit
+		 * backwards on every invocation, which is confusing. We don't
+		 * guarantee any specific ordering in general, though.
+		 */
+		if (scan->rs_base.rs_flags & SO_ALLOW_SYNC)
+			ss_report_location(scan->rs_base.rs_rd, block);
 
-			/* check if the limit imposed by heap_setscanlimits() is met */
-			if (scan->rs_numblocks != InvalidBlockNumber)
-			{
-				if (--scan->rs_numblocks == 0)
-					return InvalidBlockNumber;
-			}
+		/* we're done if we're back at where we started */
+		if (block == scan->rs_startblock)
+			return InvalidBlockNumber;
 
-			return block;
-		}
-		else
+		/* check if the limit imposed by heap_setscanlimits() is met */
+		if (scan->rs_numblocks != InvalidBlockNumber)
 		{
-			return table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
-													 scan->rs_parallelworkerdata, (ParallelBlockTableScanDesc)
-													 scan->rs_base.rs_parallel);
+			if (--scan->rs_numblocks == 0)
+				return InvalidBlockNumber;
 		}
+
+		return block;
 	}
 	else
 	{
@@ -833,6 +877,7 @@ continue_page:
 
 	scan->rs_cbuf = InvalidBuffer;
 	scan->rs_cblock = InvalidBlockNumber;
+	scan->rs_prefetch_block = InvalidBlockNumber;
 	tuple->t_data = NULL;
 	scan->rs_inited = false;
 }
@@ -928,6 +973,7 @@ continue_page:
 		ReleaseBuffer(scan->rs_cbuf);
 	scan->rs_cbuf = InvalidBuffer;
 	scan->rs_cblock = InvalidBlockNumber;
+	scan->rs_prefetch_block = InvalidBlockNumber;
 	tuple->t_data = NULL;
 	scan->rs_inited = false;
 }
@@ -1023,6 +1069,34 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 
 	initscan(scan, key, false);
 
+	scan->rs_read_stream = NULL;
+
+	/*
+	 * Set up a read stream for sequential scans and TID range scans. This
+	 * should be done after initscan() because initscan() allocates the
+	 * BufferAccessStrategy object passed to the streaming read API.
+	 */
+	if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN ||
+		scan->rs_base.rs_flags & SO_TYPE_TIDRANGESCAN)
+	{
+		BlockNumber (*cb) (ReadStream *pgsr, void *private_data,
+						   void *per_buffer_data);
+
+		if (scan->rs_base.rs_parallel)
+			cb = heap_scan_stream_read_next_parallel;
+		else
+			cb = heap_scan_stream_read_next_serial;
+
+		scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL,
+														  scan->rs_strategy,
+														  scan->rs_base.rs_rd,
+														  MAIN_FORKNUM,
+														  cb,
+														  scan,
+														  0);
+	}
+
+
 	return (TableScanDesc) scan;
 }
 
@@ -1065,6 +1139,14 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
 
 	Assert(scan->rs_empty_tuples_pending == 0);
 
+	/*
+	 * The read stream is reset on rescan. This must be done before
+	 * initscan(), as some state referred to by read_stream_reset() is reset
+	 * in initscan().
+	 */
+	if (scan->rs_read_stream)
+		read_stream_reset(scan->rs_read_stream);
+
 	/*
 	 * reinitialize scan descriptor
 	 */
@@ -1089,6 +1171,12 @@ heap_endscan(TableScanDesc sscan)
 
 	Assert(scan->rs_empty_tuples_pending == 0);
 
+	/*
+	 * Must free the read stream before freeing the BufferAccessStrategy.
+	 */
+	if (scan->rs_read_stream)
+		read_stream_end(scan->rs_read_stream);
+
 	/*
 	 * decrement relation reference count and free scan descriptor storage
 	 */
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 750ea30852e..48936826bcc 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -25,6 +25,7 @@
 #include "storage/bufpage.h"
 #include "storage/dsm.h"
 #include "storage/lockdefs.h"
+#include "storage/read_stream.h"
 #include "storage/shm_toc.h"
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
@@ -70,6 +71,20 @@ typedef struct HeapScanDescData
 
 	HeapTupleData rs_ctup;		/* current tuple in scan, if any */
 
+	/* For scans that stream reads */
+	ReadStream *rs_read_stream;
+
+	/*
+	 * For sequential scans and TID range scans to stream reads. The read
+	 * stream is allocated at the beginning of the scan and reset on rescan or
+	 * when the scan direction changes. The scan direction is saved each time
+	 * a new page is requested. If the scan direction changes from one page to
+	 * the next, the read stream releases all previously pinned buffers and
+	 * resets the prefetch block.
+	 */
+	ScanDirection rs_dir;
+	BlockNumber rs_prefetch_block;
+
 	/*
 	 * For parallel scans to store page allocation data.  NULL when not
 	 * performing a parallel scan.
-- 
2.40.1

From d3d7d4d09d804a8a8d00ecea080de5d63d21159c Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sun, 7 Apr 2024 12:36:44 +1200
Subject: [PATCH v13 1/2] Fix bug in read_stream.c.

When we determine that a wanted block can't be combined with the current
pending read, it's time to start that pending read to get it out of the
way.  An "if" in that code path should have been a "while", because
starting the pending read might take more than one call to
read_stream_start_pending_read().  Otherwise the remaining part of a
partially started read could be clobbered and we could lose some blocks.
This was only broken for smaller ranges, as the more common case of
io_combine_limit-sized ranges is handled earlier in the code and knows
how to loop.

Discovered while testing parallel sequential scans of partially cached
tables.  Parallel scans have a ramp-down phase that hands out ever
smaller ranges of contiguous blocks, so that the remaining work is
shared fairly among the workers as it runs out.

Defect in commit b5a9b18c.
---
 src/backend/storage/aio/read_stream.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 9a70a81f7ae..f54dacdd914 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -363,7 +363,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 		}
 
 		/* We have to start the pending read before we can build another. */
-		if (stream->pending_read_nblocks > 0)
+		while (stream->pending_read_nblocks > 0)
 		{
 			read_stream_start_pending_read(stream, suppress_advice);
 			suppress_advice = false;
-- 
2.40.1

Reply via email to