Here is a more fleshed-out version of this concept patch, now that we
have lots of streams wired up to try it with.  Bitmap Heap Scan seemed
like a good candidate.

postgres=# create table t (a int unique, b int unique);
CREATE TABLE
postgres=# insert into t select generate_series(1, 100000),
generate_series(1, 100000);
INSERT 0 100000
postgres=# select ctid from t where (a between 1 and 8000 or b between
1 and 8000) and ctid::text like '%,1)';
  ctid
--------
 (0,1)
 (1,1)
 (2,1)
... pages in order ...
 (33,1)
 (34,1)
 (35,1)
(36 rows)
postgres=# select pg_buffercache_evict(bufferid) from pg_buffercache
where relfilenode = pg_relation_filenode('t'::regclass) and
relblocknumber % 3 != 2;
... some pages being kicked out to create a mixed hit/miss scan ...
postgres=# select ctid from t where (a between 1 and 8000 or b between
1 and 8000) and ctid::text like '%,1)';
  ctid
--------
 (0,1)
 (1,1)
 (2,1)
... order during early distance ramp-up ...
 (12,1)
 (20,1) <-- chaos reigns
 (23,1)
 (17,1)
 (26,1)
 (13,1)
 (29,1)
...
 (34,1)
(36 rows)

One weird issue, not specific to reordering, is that read_stream.c's
look-ahead distance cools off too easily with some simple test
patterns.  It starts at 1, doubles on a miss, and subtracts one on a
hit; repeat that, and you can be trapped alternating between 1 and 2
when you'd certainly benefit from IO concurrency and also from
reordering.  It may need a longer memory.  That seemed like too
artificial a problem to worry about for v18, but it's why I used
relblocknumber % 3 != 2 and not e.g. relblocknumber % 2 != 1 above.
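
To see it, here's a throwaway simulation of that update rule (not the
real code, just "start at 1, double on miss, subtract one on hit" with
an arbitrary cap standing in for the real pin and queue limits):

#include <stdio.h>

#define MAX_DISTANCE 64		/* stand-in for the real limits */

static void
simulate(int modulus)
{
	int		distance = 1;

	printf("hit every %d blocks:", modulus);
	for (int block = 0; block < 24; block++)
	{
		if (block % modulus == modulus - 1)	/* cached block: hit */
			distance = (distance > 1) ? distance - 1 : 1;
		else								/* evicted block: miss */
			distance = (distance * 2 <= MAX_DISTANCE) ?
				distance * 2 : MAX_DISTANCE;
		printf(" %d", distance);
	}
	printf("\n");
}

int
main(void)
{
	simulate(2);	/* like relblocknumber % 2 != 1: 2 1 2 1 2 1 ... */
	simulate(3);	/* like relblocknumber % 3 != 2: 2 4 3 6 12 11 ... */
	return 0;
}

The % 2 pattern never escapes 1 and 2, while the % 3 pattern climbs to
the cap and stays there, which is the behaviour described above.
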
From 3b460cb102c8af49a0af0d3b08bf86a436564fd3 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sat, 15 Feb 2025 14:47:25 +1300
Subject: [PATCH v2 1/4] Move read stream modulo arithmetic into functions.

Several places have open-coded circular index arithmetic.  Make some
common functions for better readability and consistent assertion
checking.  This avoids adding yet more open-coding in later patches.
---
 src/backend/storage/aio/read_stream.c | 93 +++++++++++++++++++++------
 1 file changed, 74 insertions(+), 19 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 0e7f5557f5c..0d3dd65bfed 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -216,6 +216,67 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
 	stream->buffered_blocknum = blocknum;
 }
 
+/*
+ * Increment buffer index, wrapping around at queue size.
+ */
+static inline void
+read_stream_advance_buffer(ReadStream *stream, int16 *index)
+{
+	Assert(*index >= 0);
+	Assert(*index < stream->queue_size);
+
+	*index += 1;
+	if (*index == stream->queue_size)
+		*index = 0;
+}
+
+/*
+ * Increment buffer index by n, wrapping around at queue size.
+ */
+static inline void
+read_stream_advance_buffer_n(ReadStream *stream, int16 *index, int16 n)
+{
+	Assert(*index >= 0);
+	Assert(*index < stream->queue_size);
+	Assert(n <= stream->io_combine_limit);
+
+	*index += n;
+	if (*index >= stream->queue_size)
+		*index -= stream->queue_size;
+
+	Assert(*index >= 0);
+	Assert(*index < stream->queue_size);
+}
+
+/*
+ * Decrement buffer index, wrapping around at queue size.
+ */
+static inline void
+read_stream_retreat_buffer(ReadStream *stream, int16 *index)
+{
+	Assert(*index >= 0);
+	Assert(*index < stream->queue_size);
+
+	if (*index == 0)
+		*index = stream->queue_size - 1;
+	else
+		*index -= 1;
+}
+
+/*
+ * Increment IO index, wrapping around at queue size.
+ */
+static inline void
+read_stream_advance_io(ReadStream *stream, int16 *index)
+{
+	Assert(*index >= 0);
+	Assert(*index < stream->max_ios);
+
+	*index += 1;
+	if (*index == stream->max_ios)
+		*index = 0;
+}
+
 /*
  * Start as much of the current pending read as we can.  If we have to split it
  * because of the per-backend buffer limit, or the buffer manager decides to
@@ -353,8 +414,7 @@ read_stream_start_pending_read(ReadStream *stream)
 		 * Look-ahead distance will be adjusted after waiting.
 		 */
 		stream->ios[io_index].buffer_index = buffer_index;
-		if (++stream->next_io_index == stream->max_ios)
-			stream->next_io_index = 0;
+		read_stream_advance_io(stream, &stream->next_io_index);
 		Assert(stream->ios_in_progress < stream->max_ios);
 		stream->ios_in_progress++;
 		stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
@@ -390,11 +450,8 @@ read_stream_start_pending_read(ReadStream *stream)
 			   sizeof(stream->buffers[0]) * overflow);
 	}
 
-	/* Compute location of start of next read, without using % operator. */
-	buffer_index += nblocks;
-	if (buffer_index >= stream->queue_size)
-		buffer_index -= stream->queue_size;
-	Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
+	/* Move to the location of start of next read. */
+	read_stream_advance_buffer_n(stream, &buffer_index, nblocks);
 	stream->next_buffer_index = buffer_index;
 
 	/* Adjust the pending read to cover the remaining portion, if any. */
@@ -432,12 +489,12 @@ read_stream_look_ahead(ReadStream *stream)
 		/*
 		 * See which block the callback wants next in the stream.  We need to
 		 * compute the index of the Nth block of the pending read including
-		 * wrap-around, but we don't want to use the expensive % operator.
+		 * wrap-around.
 		 */
-		buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
-		if (buffer_index >= stream->queue_size)
-			buffer_index -= stream->queue_size;
-		Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
+		buffer_index = stream->next_buffer_index;
+		read_stream_advance_buffer_n(stream,
+									 &buffer_index,
+									 stream->pending_read_nblocks);
 		per_buffer_data = get_per_buffer_data(stream, buffer_index);
 		blocknum = read_stream_get_block(stream, per_buffer_data);
 		if (blocknum == InvalidBlockNumber)
@@ -940,12 +997,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	 */
 	if (stream->per_buffer_data)
 	{
+		int16		index;
 		void	   *per_buffer_data;
 
-		per_buffer_data = get_per_buffer_data(stream,
-											  oldest_buffer_index == 0 ?
-											  stream->queue_size - 1 :
-											  oldest_buffer_index - 1);
+		index = oldest_buffer_index;
+		read_stream_retreat_buffer(stream, &index);
+		per_buffer_data = get_per_buffer_data(stream, index);
 
 #if defined(CLOBBER_FREED_MEMORY)
 		/* This also tells Valgrind the memory is "noaccess". */
@@ -963,9 +1020,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	stream->pinned_buffers--;
 
 	/* Advance oldest buffer, with wrap-around. */
-	stream->oldest_buffer_index++;
-	if (stream->oldest_buffer_index == stream->queue_size)
-		stream->oldest_buffer_index = 0;
+	read_stream_advance_buffer(stream, &stream->oldest_buffer_index);
 
 	/* Prepare for the next call. */
 	read_stream_look_ahead(stream);
-- 
2.39.5

From ff29856d80c1c3778cc99a35ed8f2ba8c642d37f Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Wed, 9 Apr 2025 15:16:31 +1200
Subject: [PATCH v2 2/4] Improve read_stream.c program flow.

Commit 55918f79 removed the space-based reason for calling
read_stream_look_ahead() at the top and bottom of
read_stream_next_buffer().

Commit 7ea8cd15 removed the need to call it with a different flag in
each spot.

We can delete a couple of dozen lines and just call it at the top now.
That's much tidier, but also required for a later patch that reorders
the queue.
---
 src/backend/storage/aio/read_stream.c | 29 +++++----------------------
 1 file changed, 5 insertions(+), 24 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 0d3dd65bfed..a98213d4df2 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -909,28 +909,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	}
 #endif
 
-	if (unlikely(stream->pinned_buffers == 0))
+	/* Look ahead and check for end of stream. */
+	read_stream_look_ahead(stream);
+	if (stream->pinned_buffers == 0)
 	{
-		Assert(stream->oldest_buffer_index == stream->next_buffer_index);
-
-		/* End of stream reached?  */
-		if (stream->distance == 0)
-			return InvalidBuffer;
-
-		/*
-		 * The usual order of operations is that we look ahead at the bottom
-		 * of this function after potentially finishing an I/O and making
-		 * space for more, but if we're just starting up we'll need to crank
-		 * the handle to get started.
-		 */
-		read_stream_look_ahead(stream);
-
-		/* End of stream reached? */
-		if (stream->pinned_buffers == 0)
-		{
-			Assert(stream->distance == 0);
-			return InvalidBuffer;
-		}
+		Assert(stream->distance == 0);
+		return InvalidBuffer;
 	}
 
 	/* Grab the oldest pinned buffer and associated per-buffer data. */
@@ -1022,9 +1006,6 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	/* Advance oldest buffer, with wrap-around. */
 	read_stream_advance_buffer(stream, &stream->oldest_buffer_index);
 
-	/* Prepare for the next call. */
-	read_stream_look_ahead(stream);
-
 #ifndef READ_STREAM_DISABLE_FAST_PATH
 	/* See if we can take the fast path for all-cached scans next time. */
 	if (stream->ios_in_progress == 0 &&
-- 
2.39.5

From d4a86ee55c0a1d4e77a5de06df9f625e37863371 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sat, 6 Apr 2024 13:28:28 +1300
Subject: [PATCH v2 3/4] Add READ_STREAM_OUT_OF_ORDER.

If the stream consumer authorizes it, already-cached buffers can jump
the queue and be emitted in LIFO order while IOs are running.  This
gives the storage more time to complete IOs, and the consumer something
to work on immediately.
---
 src/backend/storage/aio/read_stream.c | 214 +++++++++++++++++++++++---
 src/include/storage/read_stream.h     |   8 +
 2 files changed, 202 insertions(+), 20 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index a98213d4df2..e90db1546fd 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -105,6 +105,7 @@ struct ReadStream
 	bool		batch_mode;		/* READ_STREAM_USE_BATCHING */
 	bool		advice_enabled;
 	bool		temporary;
+	bool		out_of_order_enabled;
 
 	/*
 	 * One-block buffer to support 'ungetting' a block number, to resolve flow
@@ -154,6 +155,27 @@ get_per_buffer_data(ReadStream *stream, int16 buffer_index)
 		stream->per_buffer_data_size * buffer_index;
 }
 
+/*
+ * To help catch bugs, wipe per-buffer data when it shouldn't be accessed
+ * again by client code.  This has no effect unless CLOBBER_FREED_MEMORY or
+ * USE_VALGRIND is defined.  After clobbering, this per-buffer data object
+ * must be explicitly marked as undefined or defined before it is accessed
+ * again.
+ */
+static void
+clobber_per_buffer_data(ReadStream *stream, int16 buffer_index)
+{
+#ifdef CLOBBER_FREED_MEMORY
+	/* This also tells Valgrind the memory is "noaccess". */
+	wipe_mem(get_per_buffer_data(stream, buffer_index),
+			 stream->per_buffer_data_size);
+#else
+	/* Tell it ourselves. */
+	VALGRIND_MAKE_MEM_NOACCESS(get_per_buffer_data(stream, buffer_index),
+							   stream->per_buffer_data_size);
+#endif
+}
+
 /*
  * General-use ReadStreamBlockNumberCB for block range scans.  Loops over the
  * blocks [current_blocknum, last_exclusive).
@@ -277,6 +299,158 @@ read_stream_advance_io(ReadStream *stream, int16 *index)
 		*index = 0;
 }
 
+/*
+ * Relocate a single valid buffer that has been stored in FIFO position to
+ * LIFO position so it will be consumed next.
+ *
+ * When there is no per-buffer data, reordering the buffer queue itself is
+ * cheap, as we just relocate a buffer and adjust an index.  Per-buffer data
+ * is more expensive: we often have more of those ahead of us corresponding to
+ * the blocks of the current pending read, and their size is user-controlled
+ * but expected to be very small.  (It's not yet clear if a more complex data
+ * structure is worth the overheads to avoid this.)
+ */
+static void
+read_stream_reorder(ReadStream *stream)
+{
+	int16		gap_index = stream->next_buffer_index;
+	int16		new_index = stream->oldest_buffer_index;	/* retreated below */
+	int16		per_buffer_data_size = stream->per_buffer_data_size;
+	int16		per_buffer_data_count = 0;
+	int16		forwarded_buffers = stream->forwarded_buffers;
+
+	Assert(BufferIsValid(stream->buffers[gap_index]));
+	Assert(stream->ios_in_progress > 0);
+	Assert(stream->out_of_order_enabled);
+
+	/* Move "gap" entry to "new" entry. */
+	read_stream_retreat_buffer(stream, &new_index);
+	Assert(new_index >= stream->initialized_buffers ||
+		   stream->buffers[new_index] == InvalidBuffer);
+	stream->buffers[new_index] = stream->buffers[gap_index];
+	stream->buffers[gap_index] = InvalidBuffer;
+	stream->oldest_buffer_index = new_index;
+
+	/* If there are forwarded buffers (rare), then we'll have to shift them. */
+	if (forwarded_buffers > 0)
+	{
+		int16		zap_index = gap_index + forwarded_buffers;
+
+		Assert(forwarded_buffers < stream->io_combine_limit);
+
+		/* Shift to fill in the gap created above, and zap the final one. */
+		memmove(&stream->buffers[gap_index],
+				&stream->buffers[gap_index + 1],
+				sizeof(stream->buffers[gap_index]) * stream->forwarded_buffers);
+		stream->buffers[zap_index] = InvalidBuffer;
+
+		/*
+		 * If some of them are in the overflow zone then we also need to shift
+		 * the copies that begin at index 0 and zap their trailing element
+		 * too.
+		 */
+		if (zap_index >= stream->queue_size)
+		{
+			int16		copies = zap_index - stream->queue_size;
+
+			memmove(&stream->buffers[0], &stream->buffers[1], copies * sizeof(Buffer));
+			stream->buffers[copies] = InvalidBuffer;
+		}
+	}
+
+	/*
+	 * If there is per-buffer data, we also need to relocate one element from
+	 * the "gap" to the "new" index, to keep it in sync with the buffer index.
+	 */
+	if (per_buffer_data_size > 0)
+	{
+		void	   *src = get_per_buffer_data(stream, gap_index);
+		void	   *dst = get_per_buffer_data(stream, new_index);
+
+		/*
+		 * We don't know how much of the per-buffer data was filled in by the
+		 * callback (that's a private matter between the callback and the
+		 * consumer).  We falsely claim that we know the source is entirely
+		 * initialized to avoid Valgrind errors from memcpy(), and also mark
+		 * the destination as defined, because it was previously unused and
+		 * clobbered ("noaccess").
+		 */
+		VALGRIND_MAKE_MEM_DEFINED(src, per_buffer_data_size);
+		VALGRIND_MAKE_MEM_DEFINED(dst, per_buffer_data_size);
+		memcpy(dst, src, per_buffer_data_size);
+
+		/*
+		 * We also have per-buffer data for every block of the current pending
+		 * read, filled in by the callback while it was being built up.
+		 */
+		per_buffer_data_count = stream->pending_read_nblocks;
+
+		/*
+		 * We might also have one extra object beyond that, if we had to
+		 * "unget" a block that couldn't be combined.
+		 */
+		if (stream->buffered_blocknum != InvalidBlockNumber)
+			per_buffer_data_count++;
+
+		Assert(forwarded_buffers < stream->io_combine_limit);
+		Assert(forwarded_buffers <= per_buffer_data_count);
+	}
+
+	/* Are there additional per-buffer objects to shift? */
+	if (per_buffer_data_count > 0)
+	{
+		int16		remaining = per_buffer_data_count;
+		int16		contiguous;
+		void	   *src;
+		void	   *dst;
+
+		/*
+		 * This first memmove() should often shift all of them.  (See note
+		 * above about Valgrind vs partial initialization.)
+		 */
+		contiguous = Min(remaining, stream->queue_size - (gap_index + 1));
+		dst = get_per_buffer_data(stream, gap_index);
+		src = get_per_buffer_data(stream, gap_index + 1);
+		VALGRIND_MAKE_MEM_DEFINED(src, contiguous * per_buffer_data_size);
+		memmove(dst, src, contiguous * per_buffer_data_size);
+		remaining -= contiguous;
+
+		/*
+		 * If it doesn't, then we need to deal with wraparound.  Start by
+		 * rotating one object from the physical start to the physical end.
+		 */
+		if (remaining > 0)
+		{
+			dst = get_per_buffer_data(stream, stream->queue_size - 1);
+			src = get_per_buffer_data(stream, 0);
+			VALGRIND_MAKE_MEM_DEFINED(src, per_buffer_data_size);
+			memcpy(dst, src, per_buffer_data_size);
+			remaining -= 1;
+
+			/*
+			 * If any are left after that, they must be contiguous at the
+			 * physical start of the queue, so shift them too.
+			 */
+			if (remaining > 0)
+			{
+				dst = get_per_buffer_data(stream, 0);
+				src = get_per_buffer_data(stream, 1);
+				VALGRIND_MAKE_MEM_DEFINED(src, remaining * per_buffer_data_size);
+				memmove(dst, src, remaining * per_buffer_data_size);
+			}
+		}
+	}
+
+	/* Clobber the final per-buffer-data object. */
+	if (per_buffer_data_size > 0)
+	{
+		int16		zap_index = gap_index;
+
+		read_stream_advance_buffer_n(stream, &zap_index, per_buffer_data_count);
+		clobber_per_buffer_data(stream, zap_index);
+	}
+}
+
 /*
  * Start as much of the current pending read as we can.  If we have to split it
  * because of the per-backend buffer limit, or the buffer manager decides to
@@ -450,9 +624,21 @@ read_stream_start_pending_read(ReadStream *stream)
 			   sizeof(stream->buffers[0]) * overflow);
 	}
 
-	/* Move to the location of start of next read. */
-	read_stream_advance_buffer_n(stream, &buffer_index, nblocks);
-	stream->next_buffer_index = buffer_index;
+	if (stream->out_of_order_enabled &&
+		stream->ios_in_progress > 0 &&
+		!need_wait &&
+		nblocks == 1)
+	{
+		/* Promote cached block to LIFO order, jumping in front of IOs. */
+		read_stream_reorder(stream);
+	}
+	else
+	{
+		/* Advance to the position of next read, maintaining FIFO order. */
+		read_stream_advance_buffer_n(stream,
+									 &stream->next_buffer_index,
+									 nblocks);
+	}
 
 	/* Adjust the pending read to cover the remaining portion, if any. */
 	stream->pending_read_blocknum += nblocks;
@@ -709,6 +895,9 @@ read_stream_begin_impl(int flags,
 		stream->advice_enabled = true;
 #endif
 
+	if (flags & READ_STREAM_OUT_OF_ORDER)
+		stream->out_of_order_enabled = true;
+
 	/*
 	 * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
 	 * we still need to allocate space to combine and run one I/O.  Bump it up
@@ -925,7 +1114,6 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	buffer = stream->buffers[oldest_buffer_index];
 	if (per_buffer_data)
 		*per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
-
 	Assert(BufferIsValid(buffer));
 
 	/* Do we have to wait for an associated I/O first? */
@@ -971,8 +1159,6 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		stream->buffers[stream->queue_size + oldest_buffer_index] =
 			InvalidBuffer;
 
-#if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
-
 	/*
 	 * The caller will get access to the per-buffer data, until the next call.
 	 * We wipe the one before, which is never occupied because queue_size
@@ -981,23 +1167,11 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	 */
 	if (stream->per_buffer_data)
 	{
-		int16		index;
-		void	   *per_buffer_data;
+		int16		index = oldest_buffer_index;
 
-		index = oldest_buffer_index;
 		read_stream_retreat_buffer(stream, &index);
-		per_buffer_data = get_per_buffer_data(stream, index);
-
-#if defined(CLOBBER_FREED_MEMORY)
-		/* This also tells Valgrind the memory is "noaccess". */
-		wipe_mem(per_buffer_data, stream->per_buffer_data_size);
-#elif defined(USE_VALGRIND)
-		/* Tell it ourselves. */
-		VALGRIND_MAKE_MEM_NOACCESS(per_buffer_data,
-								   stream->per_buffer_data_size);
-#endif
+		clobber_per_buffer_data(stream, index);
 	}
-#endif
 
 	/* Pin transferred to caller. */
 	Assert(stream->pinned_buffers > 0);
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
index 9b0d65161d0..5af2f1d0ec7 100644
--- a/src/include/storage/read_stream.h
+++ b/src/include/storage/read_stream.h
@@ -63,6 +63,14 @@
  */
 #define READ_STREAM_USE_BATCHING 0x08
 
+/*
+ * Blocks are usually streamed in FIFO order.  This flag allows reordering,
+ * for consumers that can deal with out-of-order buffers.  Whenever IOs are
+ * running, any already-cached buffers found in the look-ahead window jump
+ * directly to the front of the queue, ready to be consumed immediately.
+ */
+#define READ_STREAM_OUT_OF_ORDER 0x10
+
 struct ReadStream;
 typedef struct ReadStream ReadStream;
 
-- 
2.39.5
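
To be clear about what consumers of the new flag sign up for: the
consumption API doesn't change at all, only the order the buffers come
out in.  A hypothetical consumer loop, the same with or without the
flag:

	Buffer		buffer;
	void	   *per_buffer_data;

	while ((buffer = read_stream_next_buffer(stream,
											 &per_buffer_data)) != InvalidBuffer)
	{
		/* may be any block in the stream now, not just the oldest */
		ReleaseBuffer(buffer);
	}
	read_stream_end(stream);

Any consumer that doesn't assume blocks arrive in callback order can
opt in, which is what the final patch does for Bitmap Heap Scan.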

From e458382a471831a759885a2dcdbe58422d2c365a Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Wed, 9 Apr 2025 16:10:33 +1200
Subject: [PATCH v2 4/4] Use READ_STREAM_OUT_OF_ORDER for Bitmap Heap Scan.

Bitmap Heap Scan can process blocks in any order, so we should
prioritize cached data while IOs are running.
---
 src/backend/access/heap/heapam.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index ed2e3021799..cd4f8967a27 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1205,6 +1205,7 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 	else if (scan->rs_base.rs_flags & SO_TYPE_BITMAPSCAN)
 	{
 		scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_DEFAULT |
+														  READ_STREAM_OUT_OF_ORDER |
 														  READ_STREAM_USE_BATCHING,
 														  scan->rs_strategy,
 														  scan->rs_base.rs_rd,
-- 
2.39.5
