Hi,

Here are some patches that address some of Andres's feedback since the
AIO v2 rebase[1], anticipate out-of-order streams, and make some other
minor improvements.  They are independent of the main AIO patch set
and apply to master, hence separate thread.

0001-Refactor-read_stream.c-s-circular-arithmetic.patch

This just replaces open-coded arithmetic with inline functions.  They
will be used a lot more in later work, and provide central places to
put assertions that were not checked as uniformly as I would like.
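
For example, a call site that used to look like this (taken from the
patch below):

    buffer_index += nblocks;
    if (buffer_index >= stream->queue_size)
        buffer_index -= stream->queue_size;
    Assert(buffer_index >= 0 && buffer_index < stream->queue_size);

becomes:

    read_stream_index_advance_n(stream, &buffer_index, nblocks);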

0002-Allow-more-buffers-for-sequential-read-streams.patch

Currently we're only generating I/O concurrency for random I/O, and I
originally guesstimated that random I/O wouldn't typically be able to
combine more than about 4 blocks on average when capping the buffer
queue.  Debatable perhaps, but soon obsolete: for true asynchronous
I/O, we'll need full concurrency * full combine limit, so we'll
potentially want to pin more buffers.  This just changes that
hard-coded 4 to io_combine_limit.
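
In code terms, the sizing arithmetic changes roughly like this (condensed
from the patch below; the existing clamp against PG_INT16_MAX is
unchanged):

    /* Before: assume random I/Os combine ~4 blocks on average. */
    max_pinned_buffers = Max(max_ios * 4, io_combine_limit);

    /* After: leave room for max_ios full-sized combined reads. */
    max_pinned_buffers = Max(max_ios, 1) * io_combine_limit;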

0003-Improve-buffer-pool-API-for-per-backend-pin-limits.patch

In preparation for the next patch, provide some new bufmgr APIs.

0004-Respect-pin-limits-accurately-in-read_stream.c.patch

The current coding only computes the remaining "fair share" of the
buffer pool for this backend at stream initialisation.  It's hard, but
not impossible, to get one backend to pin more than 1/max_connections
of the buffer pool (the intended cap), when using multiple streams at
the same time in one backend.  This patch moves the time of check to
the time of use, so it respects the limit strictly.  I avoid adding
any changes to the fast path for all-cached streams, which only pin
one buffer at a time.
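
Sketched very roughly, the change looks like this (condensed from patches
0003 and 0004 below, not the exact code):

    /* Before: a single check at stream creation time. */
    LimitAdditionalPins(&max_pinned_buffers);

    /* After: clamp at creation time only by the static soft limit... */
    max_pinned_buffers = Min(max_pinned_buffers, GetSoftPinLimit());

    /* ...and re-check what is currently allowed whenever starting I/O. */
    buffer_limit = Min(GetAdditionalPinLimit(), INT16_MAX);
    nblocks = Min(buffer_limit, stream->pending_read_nblocks);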

This is basically a TOCTOU bug, and could in theory be back-patched,
but I don't personally think it's necessary yet, since it's so hard to
actually break anything with v17's rather limited use of streams.  The
only way I can think of right now to see a concrete problem would be
to get many backends all to create many CURSORs that stream cold data,
and FETCH in a particular order only after they've all been
constructed, which is also a recipe for running out of buffers without
using streams, albeit not quite as quickly.

0005-Support-buffer-forwarding-in-read_stream.c.patch
0006-Support-buffer-forwarding-in-StartReadBuffers.patch

Background:  StartReadBuffers() reports a short read when it finds a
cached block that divides a potential I/O.  For example, if you ask to
read 8 blocks, and the 6th one turns out to be a hit, ie already valid
in the buffer pool, then we only need to read the first 5 from disk.
Perhaps the 7th and 8th blocks will also need I/O, but we don't want
StartReadBuffers() to start *two* I/Os, so the 6th block terminates
our search.  The question is what to do with that 6th block.
Currently we'll tell the user that the size of the operation is 6
blocks -- that is, we'll silently include that hit that prevented
further combining, even though we'll only actually do the read for the
first 5 blocks, and if someone else manages to make any buffers valid
before you call WaitReadBuffers(), we'll just silently skip over them
and loop.
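
To make that contract concrete, here's a hypothetical fragment ("op" is a
ReadBuffersOperation assumed to be set up already, error handling elided,
block numbers invented):

    Buffer      buffers[8];
    int         nblocks = 8;
    bool        need_wait;

    /* Ask for blocks 10..17; suppose block 15 is already valid. */
    need_wait = StartReadBuffers(&op, buffers, 10, &nblocks, 0);

    /*
     * Today: nblocks comes back as 6, quietly including the hit, and
     * WaitReadBuffers() only reads the first 5 blocks from disk.  With
     * patch 0006: nblocks comes back as 5, and buffers[5] still holds the
     * pinned hit, "forwarded" to a later StartReadBuffers() call (which
     * also requires the unused entries to start out as InvalidBuffer).
     */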

That was simple enough, but having hits that are invisibly mixed up
with misses prevents us from implementing fully general reordering of
hits, an important optimisation that reduces I/O stalls for consumers
that can cope with out-of-order data (I'll post a new patch for that
separately).

A simple solution we rejected was to unpin such buffers and then repin
later, but that would waste cycles, allow eviction and potentially
mess up the usage count.  The AIO patch set also introduces another
case involving potentially many buffers: it moves the
BM_IO_IN_PROGRESS negotiation into StartReadBuffers(), and reports a
short read there too for blocks in the middle, but by then it has
already acquired all the pins.

The solution we agreed on is to introduce a way for StartReadBuffers()
to communicate with future calls, and "forward" pinned buffers between
calls.  The function arguments don't change, but its "buffers"
argument becomes an in/out array: one StartReadBuffers() call can
leave extra pinned buffers after the ones that were included in a
short read (*nblocks), and then when you retry (or possibly extend)
the rest of the read, you have to pass them back in.  That is easy for
the read stream code, as it can just leave them in its circular queue
for the next call to take as input.  It only needs to be aware of them
for pin limit accounting and stream reset (including early shutdown).
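
For a caller outside read_stream.c, the protocol could look something like
this minimal sketch ("op", "first_block" and "total_blocks" are
placeholders, and op's relation, fork and strategy fields are assumed to be
set up in the usual way):

    Buffer      buffers[MAX_IO_COMBINE_LIMIT];
    BlockNumber next_block = first_block;
    int         remaining = total_blocks;

    /* Entries must start out invalid; forwarded pins appear here later. */
    for (int i = 0; i < MAX_IO_COMBINE_LIMIT; i++)
        buffers[i] = InvalidBuffer;

    while (remaining > 0)
    {
        int         nblocks = Min(remaining, io_combine_limit);

        /* Forwarded buffers from the last call are passed straight back in. */
        if (StartReadBuffers(&op, buffers, next_block, &nblocks, 0))
            WaitReadBuffers(&op);

        /* ... consume and release buffers[0..nblocks-1] here ... */

        /* Slide any forwarded buffers to the front for the next call. */
        memmove(&buffers[0], &buffers[nblocks],
                sizeof(buffers[0]) * (MAX_IO_COMBINE_LIMIT - nblocks));
        for (int i = MAX_IO_COMBINE_LIMIT - nblocks; i < MAX_IO_COMBINE_LIMIT; i++)
            buffers[i] = InvalidBuffer;

        next_block += nblocks;
        remaining -= nblocks;
    }

The stream code avoids the memmove entirely: forwarded buffers simply stay
where they are in the circular queue and become the start of the next read.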

One goal was to avoid introducing any new instructions into
ReadBuffer() or the read stream fast path, so StartReadBuffer(), the
single-block specialisation, doesn't support forwarding (it already
can't split reads, since they are only one block, but it also doesn't
support receiving forwarded buffers).

I plan to share some new C-level tests and illustrations of the
internal states of read_stream.c separately, as the complexity is
obviously increasing a bit (especially with out-of-order streams,
about which more soon).

[1] 
https://www.postgresql.org/message-id/CA%2BhUKGLxH1tsUgzZfng4BU6GqnS6bKF2ThvxH1_w5c7-sLRKQw%40mail.gmail.com
From 57cc7f12e4a7799c581d4ced054dfbb2663c5b81 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sat, 15 Feb 2025 14:47:25 +1300
Subject: [PATCH v1 1/6] Refactor read_stream.c's circular arithmetic.

Several places have open-coded circular index arithmetic.  Make some
common functions for better readability and consistent assertion
checking.

This avoids adding yet more open-coded duplication in later patches, and
standardizes on the vocabulary "advance" and "retreat" as used elsewhere
in PostgreSQL.
---
 src/backend/storage/aio/read_stream.c | 78 +++++++++++++++++++++------
 1 file changed, 61 insertions(+), 17 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 4b499dfb441..eadfe88c35a 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -224,6 +224,55 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
 	stream->buffered_blocknum = blocknum;
 }
 
+/*
+ * Increment index, wrapping around at queue size.
+ */
+static inline void
+read_stream_index_advance(ReadStream *stream, int16 *index)
+{
+	Assert(*index >= 0);
+	Assert(*index < stream->queue_size);
+
+	*index += 1;
+	if (*index == stream->queue_size)
+		*index = 0;
+}
+
+/*
+ * Increment index by n, wrapping around at queue size.
+ */
+static inline void
+read_stream_index_advance_n(ReadStream *stream, int16 *index, int16 n)
+{
+	Assert(*index >= 0);
+	Assert(*index < stream->queue_size);
+	Assert(n <= MAX_IO_COMBINE_LIMIT);
+
+	*index += n;
+	if (*index >= stream->queue_size)
+		*index -= stream->queue_size;
+
+	Assert(*index >= 0);
+	Assert(*index < stream->queue_size);
+}
+
+#if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
+/*
+ * Decrement index, wrapping around at queue size.
+ */
+static inline void
+read_stream_index_retreat(ReadStream *stream, int16 *index)
+{
+	Assert(*index >= 0);
+	Assert(*index < stream->queue_size);
+
+	if (*index == 0)
+		*index = stream->queue_size - 1;
+	else
+		*index -= 1;
+}
+#endif
+
 static void
 read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 {
@@ -302,11 +351,8 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 				&stream->buffers[stream->queue_size],
 				sizeof(stream->buffers[0]) * overflow);
 
-	/* Compute location of start of next read, without using % operator. */
-	buffer_index += nblocks;
-	if (buffer_index >= stream->queue_size)
-		buffer_index -= stream->queue_size;
-	Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
+	/* Move to the location of start of next read. */
+	read_stream_index_advance_n(stream, &buffer_index, nblocks);
 	stream->next_buffer_index = buffer_index;
 
 	/* Adjust the pending read to cover the remaining portion, if any. */
@@ -334,12 +380,12 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 		/*
 		 * See which block the callback wants next in the stream.  We need to
 		 * compute the index of the Nth block of the pending read including
-		 * wrap-around, but we don't want to use the expensive % operator.
+		 * wrap-around.
 		 */
-		buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
-		if (buffer_index >= stream->queue_size)
-			buffer_index -= stream->queue_size;
-		Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
+		buffer_index = stream->next_buffer_index;
+		read_stream_index_advance_n(stream,
+									&buffer_index,
+									stream->pending_read_nblocks);
 		per_buffer_data = get_per_buffer_data(stream, buffer_index);
 		blocknum = read_stream_get_block(stream, per_buffer_data);
 		if (blocknum == InvalidBlockNumber)
@@ -777,12 +823,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	 */
 	if (stream->per_buffer_data)
 	{
+		int16		index;
 		void	   *per_buffer_data;
 
-		per_buffer_data = get_per_buffer_data(stream,
-											  oldest_buffer_index == 0 ?
-											  stream->queue_size - 1 :
-											  oldest_buffer_index - 1);
+		index = oldest_buffer_index;
+		read_stream_index_retreat(stream, &index);
+		per_buffer_data = get_per_buffer_data(stream, index);
 
 #if defined(CLOBBER_FREED_MEMORY)
 		/* This also tells Valgrind the memory is "noaccess". */
@@ -800,9 +846,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	stream->pinned_buffers--;
 
 	/* Advance oldest buffer, with wrap-around. */
-	stream->oldest_buffer_index++;
-	if (stream->oldest_buffer_index == stream->queue_size)
-		stream->oldest_buffer_index = 0;
+	read_stream_index_advance(stream, &stream->oldest_buffer_index);
 
 	/* Prepare for the next call. */
 	read_stream_look_ahead(stream, false);
-- 
2.48.1

From cb597bdb53904a82cd27d2f67b8bad9873066f14 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Tue, 21 Jan 2025 08:08:08 +1300
Subject: [PATCH v1 2/6] Allow more buffers for sequential read streams.

Read streams currently only start concurrent I/Os (via read-ahead
advice) for random access, with a hard-coded guesstimate that their
average size is likely to be at most 4 blocks when planning the size of
the buffer queue.  Sequential streams benefit from kernel readahead when
using buffered I/O, and read-ahead advice doesn't exist for direct I/O
by definition, so we didn't need to look ahead more than
io_combine_limit in that case.

Proposed patches need more buffers to be able to start multiple
asynchronous I/O operations even for sequential access.  Adjust the
arithmetic in preparation, replacing "4" with io_combine_limit, though
there is no benefit yet, just some wasted queue space.

As of the time of writing, the maximum GUC values for
effective_io_concurrency (1000) and io_combine_limit (32) imply a queue
with around 32K entries (slightly more for technical reasons), though
those numbers are likely to change.  That requires a wider type in one
place that has an intermediate value that might overflow before clamping.
---
 src/backend/storage/aio/read_stream.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index eadfe88c35a..cb308948b6e 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -499,7 +499,7 @@ read_stream_begin_impl(int flags,
 	 * overflow (even though that's not possible with the current GUC range
 	 * limits), allowing also for the spare entry and the overflow space.
 	 */
-	max_pinned_buffers = Max(max_ios * 4, io_combine_limit);
+	max_pinned_buffers = Max(max_ios, 1) * io_combine_limit;
 	max_pinned_buffers = Min(max_pinned_buffers,
 							 PG_INT16_MAX - io_combine_limit - 1);
 
@@ -771,7 +771,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
 	{
 		int16		io_index = stream->oldest_io_index;
-		int16		distance;
+		int32		distance;	/* wider temporary value, clamped below */
 
 		/* Sanity check that we still agree on the buffers. */
 		Assert(stream->ios[io_index].op.buffers ==
-- 
2.48.1

From d98dbce9475d4640305bee2e6f536512de2d20d3 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 24 Jan 2025 10:59:39 +1300
Subject: [PATCH v1 3/6] Improve buffer pool API for per-backend pin limits.

Previously the support functions assumed that you needed one additional
pin to make progress, and could optionally use some more.  Add a couple
more functions for callers that want to know:

* what the maximum possible number could be, for space planning
  purposes, called the "soft pin limit"

* how many additional pins they could acquire right now, without the
  special case allowing one pin (ie for users that already hold pins and
  can already make progress even if zero extra pins are available now)

These APIs are better suited to read_stream.c, which will be adjusted in
a follow-up patch.  Also move the computation of each backend's fair
share of the buffer pool to backend initialization time, since the
answer doesn't change and we don't want to perform a division operation
every time we compute availability.
---
 src/backend/storage/buffer/bufmgr.c   | 75 ++++++++++++++++++++-------
 src/backend/storage/buffer/localbuf.c | 16 ++++++
 src/include/storage/bufmgr.h          |  4 ++
 3 files changed, 77 insertions(+), 18 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 80b0d0c5ded..2ca641204fb 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -211,6 +211,8 @@ static int32 PrivateRefCountOverflowed = 0;
 static uint32 PrivateRefCountClock = 0;
 static PrivateRefCountEntry *ReservedRefCountEntry = NULL;
 
+static uint32 MaxProportionalPins;
+
 static void ReservePrivateRefCountEntry(void);
 static PrivateRefCountEntry *NewPrivateRefCountEntry(Buffer buffer);
 static PrivateRefCountEntry *GetPrivateRefCountEntry(Buffer buffer, bool do_move);
@@ -2097,6 +2099,46 @@ again:
 	return buf;
 }
 
+/*
+ * Return the maximum number of buffers that this backend should try to pin at
+ * once, to avoid pinning more than its fair share.  This is the highest value
+ * that GetAdditionalPinLimit() and LimitAdditionalPins() could ever return.
+ *
+ * It's called a soft limit because nothing stops a backend from trying to
+ * acquire more pins than this with ReadBuffer(), but code that wants more
+ * for I/O optimizations should respect this per-backend limit when it can
+ * still make progress without them.
+ */
+uint32
+GetSoftPinLimit(void)
+{
+	return MaxProportionalPins;
+}
+
+/*
+ * Return the maximum number of additional buffers that this backend should
+ * pin if it wants to stay under the per-backend soft limit, considering the
+ * number of buffers it has already pinned.
+ */
+uint32
+GetAdditionalPinLimit(void)
+{
+	uint32		estimated_pins_held;
+
+	/*
+	 * We get the number of "overflowed" pins for free, but don't know the
+	 * number of pins in PrivateRefCountArray.  The cost of calculating that
+	 * exactly doesn't seem worth it, so just assume the max.
+	 */
+	estimated_pins_held = PrivateRefCountOverflowed + REFCOUNT_ARRAY_ENTRIES;
+
+	/* Is this backend already holding more than its fair share? */
+	if (estimated_pins_held > MaxProportionalPins)
+		return 0;
+
+	return MaxProportionalPins - estimated_pins_held;
+}
+
 /*
  * Limit the number of pins a batch operation may additionally acquire, to
  * avoid running out of pinnable buffers.
@@ -2112,28 +2154,15 @@ again:
 void
 LimitAdditionalPins(uint32 *additional_pins)
 {
-	uint32		max_backends;
-	int			max_proportional_pins;
+	uint32		limit;
 
 	if (*additional_pins <= 1)
 		return;
 
-	max_backends = MaxBackends + NUM_AUXILIARY_PROCS;
-	max_proportional_pins = NBuffers / max_backends;
-
-	/*
-	 * Subtract the approximate number of buffers already pinned by this
-	 * backend. We get the number of "overflowed" pins for free, but don't
-	 * know the number of pins in PrivateRefCountArray. The cost of
-	 * calculating that exactly doesn't seem worth it, so just assume the max.
-	 */
-	max_proportional_pins -= PrivateRefCountOverflowed + REFCOUNT_ARRAY_ENTRIES;
-
-	if (max_proportional_pins <= 0)
-		max_proportional_pins = 1;
-
-	if (*additional_pins > max_proportional_pins)
-		*additional_pins = max_proportional_pins;
+	limit = GetAdditionalPinLimit();
+	limit = Max(limit, 1);
+	if (limit < *additional_pins)
+		*additional_pins = limit;
 }
 
 /*
@@ -3574,6 +3603,16 @@ InitBufferManagerAccess(void)
 {
 	HASHCTL		hash_ctl;
 
+	/*
+	 * The soft limit on the number of pins each backend should respect, based
+	 * on shared_buffers and the maximum number of connections possible.
+	 * That's very pessimistic, but outside toy-sized shared_buffers it should
+	 * allow plenty of pins.  Higher level code that pins non-trivial numbers
+	 * of buffers should use LimitAdditionalPins() or GetAdditionalPinLimit()
+	 * to stay under this limit.
+	 */
+	MaxProportionalPins = NBuffers / (MaxBackends + NUM_AUXILIARY_PROCS);
+
 	memset(&PrivateRefCountArray, 0, sizeof(PrivateRefCountArray));
 
 	hash_ctl.keysize = sizeof(int32);
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 64931efaa75..3c055f6ec8b 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -286,6 +286,22 @@ GetLocalVictimBuffer(void)
 	return BufferDescriptorGetBuffer(bufHdr);
 }
 
+/* see GetSoftPinLimit() */
+uint32
+GetSoftLocalPinLimit(void)
+{
+	/* Every backend has its own temporary buffers, and can pin them all. */
+	return num_temp_buffers;
+}
+
+/* see GetAdditionalPinLimit() */
+uint32
+GetAdditionalLocalPinLimit(void)
+{
+	Assert(NLocalPinnedBuffers <= num_temp_buffers);
+	return num_temp_buffers - NLocalPinnedBuffers;
+}
+
 /* see LimitAdditionalPins() */
 void
 LimitAdditionalLocalPins(uint32 *additional_pins)
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 7c1e4316dde..597ecb97897 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -290,6 +290,10 @@ extern bool HoldingBufferPinThatDelaysRecovery(void);
 
 extern bool BgBufferSync(struct WritebackContext *wb_context);
 
+extern uint32 GetSoftPinLimit(void);
+extern uint32 GetSoftLocalPinLimit(void);
+extern uint32 GetAdditionalPinLimit(void);
+extern uint32 GetAdditionalLocalPinLimit(void);
 extern void LimitAdditionalPins(uint32 *additional_pins);
 extern void LimitAdditionalLocalPins(uint32 *additional_pins);
 
-- 
2.48.1

From b71e09f174c61adf475355e5fff4c01b6a0af399 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 24 Jan 2025 23:52:53 +1300
Subject: [PATCH v1 4/6] Respect pin limits accurately in read_stream.c.

Read streams pin multiple buffers at once as required to combine I/O.
This also avoids having to unpin and repin later when issuing read-ahead
advice, and will be needed for proposed work that starts "real"
asynchronous I/O.

To avoid pinning too much of the buffer pool at once, we previously used
LimitAdditionalPins() to avoid pinning more than this backend's fair
share of the pool.  The coding was naive and only checked the cap once
at stream initialization.

This commit moves the check to the time of use with new bufmgr APIs from
an earlier commit, since the result might change later due to pins
acquired later outside this stream.  No extra CPU cycles are added to
the all-buffered fast-path code (it only pins one buffer at a time), but
the I/O-starting path now re-checks the limit every time using simple
arithmetic.

In practice it was difficult to exceed the limit, but you could contrive
a workload to do it using multiple CURSORs and FETCHing from sequential
scans in round-robin fashion, so that each underlying stream computes
its limit before all the others have ramped up to their full look-ahead
distance.  Therefore, no back-patch for now.

Per code review from Andres, in the course of his AIO work.

Reported-by: Andres Freund <and...@anarazel.de>
---
 src/backend/storage/aio/read_stream.c | 111 ++++++++++++++++++++++----
 1 file changed, 95 insertions(+), 16 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index cb308948b6e..dc5ae60a089 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -115,6 +115,7 @@ struct ReadStream
 	int16		pinned_buffers;
 	int16		distance;
 	bool		advice_enabled;
+	bool		temporary;
 
 	/*
 	 * One-block buffer to support 'ungetting' a block number, to resolve flow
@@ -274,7 +275,9 @@ read_stream_index_retreat(ReadStream *stream, int16 *index)
 #endif
 
 static void
-read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
+read_stream_start_pending_read(ReadStream *stream,
+							   int16 buffer_limit,
+							   bool suppress_advice)
 {
 	bool		need_wait;
 	int			nblocks;
@@ -308,10 +311,14 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 	else
 		flags = 0;
 
-	/* We say how many blocks we want to read, but may be smaller on return. */
+	/*
+	 * We say how many blocks we want to read, but may be smaller on return.
+	 * On memory-constrained systems we may also have to ask for a smaller
+	 * read ourselves.
+	 */
 	buffer_index = stream->next_buffer_index;
 	io_index = stream->next_io_index;
-	nblocks = stream->pending_read_nblocks;
+	nblocks = Min(buffer_limit, stream->pending_read_nblocks);
 	need_wait = StartReadBuffers(&stream->ios[io_index].op,
 								 &stream->buffers[buffer_index],
 								 stream->pending_read_blocknum,
@@ -360,11 +367,60 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 	stream->pending_read_nblocks -= nblocks;
 }
 
+/*
+ * How many more buffers could we use, while respecting the soft limit?
+ */
+static int16
+read_stream_get_buffer_limit(ReadStream *stream)
+{
+	uint32		buffers;
+
+	/* Check how many local or shared pins we could acquire. */
+	if (stream->temporary)
+		buffers = GetAdditionalLocalPinLimit();
+	else
+		buffers = GetAdditionalPinLimit();
+
+	/*
+	 * Each stream is always allowed to try to acquire one pin if it doesn't
+	 * hold one already.  This is needed to guarantee progress, and just like
+	 * the simple ReadBuffer() operation in code that is not using this stream
+	 * API, if a buffer can't be pinned we'll raise an error when trying to
+	 * pin, ie the buffer pool is simply too small for the workload.
+	 */
+	if (buffers == 0 && stream->pinned_buffers == 0)
+		return 1;
+
+	/*
+	 * Otherwise, see how many additional pins the backend can currently pin,
+	 * which may be zero.  As above, this only guarantees that this backend
+	 * won't use more than its fair share if all backends can respect the soft
+	 * limit, not that a pin can actually be acquired without error.
+	 */
+	return Min(buffers, INT16_MAX);
+}
+
 static void
 read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 {
+	int16		buffer_limit;
+
+	/*
+	 * Check how many pins we could acquire now.  We do this here rather than
+	 * pushing it down into read_stream_start_pending_read(), because it
+	 * allows more flexibility in behavior when we run out of allowed pins.
+	 * Currently the policy is to start an I/O when we've run out of allowed
+	 * pins only if we have to to make progress, and otherwise to stop looking
+	 * ahead until more pins become available, so that we don't start issuing
+	 * a lot of smaller I/Os, prefering to build the largest ones we can. This
+	 * choice is debatable, but it should only really come up with the buffer
+	 * pool/connection ratio is very constrained.
+	 */
+	buffer_limit = read_stream_get_buffer_limit(stream);
+
 	while (stream->ios_in_progress < stream->max_ios &&
-		   stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
+		   stream->pinned_buffers + stream->pending_read_nblocks <
+		   Min(stream->distance, buffer_limit))
 	{
 		BlockNumber blocknum;
 		int16		buffer_index;
@@ -372,7 +428,9 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 
 		if (stream->pending_read_nblocks == io_combine_limit)
 		{
-			read_stream_start_pending_read(stream, suppress_advice);
+			read_stream_start_pending_read(stream, buffer_limit,
+										   suppress_advice);
+			buffer_limit = read_stream_get_buffer_limit(stream);
 			suppress_advice = false;
 			continue;
 		}
@@ -406,11 +464,12 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 		/* We have to start the pending read before we can build another. */
 		while (stream->pending_read_nblocks > 0)
 		{
-			read_stream_start_pending_read(stream, suppress_advice);
+			read_stream_start_pending_read(stream, buffer_limit, suppress_advice);
+			buffer_limit = read_stream_get_buffer_limit(stream);
 			suppress_advice = false;
-			if (stream->ios_in_progress == stream->max_ios)
+			if (stream->ios_in_progress == stream->max_ios || buffer_limit == 0)
 			{
-				/* And we've hit the limit.  Rewind, and stop here. */
+				/* And we've hit a limit.  Rewind, and stop here. */
 				read_stream_unget_block(stream, blocknum);
 				return;
 			}
@@ -426,16 +485,17 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 	 * limit, preferring to give it another chance to grow to full
 	 * io_combine_limit size once more buffers have been consumed.  However,
 	 * if we've already reached io_combine_limit, or we've reached the
-	 * distance limit and there isn't anything pinned yet, or the callback has
-	 * signaled end-of-stream, we start the read immediately.
+	 * distance limit or buffer limit and there isn't anything pinned yet, or
+	 * the callback has signaled end-of-stream, we start the read immediately.
 	 */
 	if (stream->pending_read_nblocks > 0 &&
 		(stream->pending_read_nblocks == io_combine_limit ||
-		 (stream->pending_read_nblocks == stream->distance &&
+		 ((stream->pending_read_nblocks == stream->distance ||
+		   stream->pending_read_nblocks == buffer_limit) &&
 		  stream->pinned_buffers == 0) ||
 		 stream->distance == 0) &&
 		stream->ios_in_progress < stream->max_ios)
-		read_stream_start_pending_read(stream, suppress_advice);
+		read_stream_start_pending_read(stream, buffer_limit, suppress_advice);
 }
 
 /*
@@ -464,6 +524,7 @@ read_stream_begin_impl(int flags,
 	int			max_ios;
 	int			strategy_pin_limit;
 	uint32		max_pinned_buffers;
+	uint32		max_possible_buffer_limit;
 	Oid			tablespace_id;
 
 	/*
@@ -507,12 +568,23 @@ read_stream_begin_impl(int flags,
 	strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
 	max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
 
-	/* Don't allow this backend to pin more than its share of buffers. */
+	/*
+	 * Also limit by the maximum possible number of pins we could be allowed
+	 * to acquire according to bufmgr.  We may not be able to use them all due
+	 * to other pins held by this backend, but we'll enforce the dynamic limit
+	 * later when starting I/O.
+	 */
 	if (SmgrIsTemp(smgr))
-		LimitAdditionalLocalPins(&max_pinned_buffers);
+		max_possible_buffer_limit = GetSoftLocalPinLimit();
 	else
-		LimitAdditionalPins(&max_pinned_buffers);
-	Assert(max_pinned_buffers > 0);
+		max_possible_buffer_limit = GetSoftPinLimit();
+	max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
+
+	/*
+	 * The soft limit might be zero on a system configured with more
+	 * connections than buffers.  We need at least one.
+	 */
+	max_pinned_buffers = Max(1, max_pinned_buffers);
 
 	/*
 	 * We need one extra entry for buffers and per-buffer data, because users
@@ -572,6 +644,7 @@ read_stream_begin_impl(int flags,
 	stream->callback = callback;
 	stream->callback_private_data = callback_private_data;
 	stream->buffered_blocknum = InvalidBlockNumber;
+	stream->temporary = SmgrIsTemp(smgr);
 
 	/*
 	 * Skip the initial ramp-up phase if the caller says we're going to be
@@ -700,6 +773,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			 * arbitrary I/O entry (they're all free).  We don't have to
 			 * adjust pinned_buffers because we're transferring one to caller
 			 * but pinning one more.
+			 *
+			 * In the fast path we don't need to check the pin limit.  We're
+			 * always allowed at least one pin so that progress can be made,
+			 * and that's all we need here.  Although two pins are momentarily
+			 * held at the same time, the model used here is that the stream
+			 * holds only one, and the other now belongs to the caller.
 			 */
 			if (likely(!StartReadBuffer(&stream->ios[0].op,
 										&stream->buffers[oldest_buffer_index],
-- 
2.48.1

From 4e2250ad385572c560c89ed0cf3e345c47651b8c Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Thu, 30 Jan 2025 11:42:03 +1300
Subject: [PATCH v1 5/6] Support buffer forwarding in read_stream.c.

In preparation for a following change to the buffer manager, teach read
stream to keep track of buffers that were "forwarded" from one call to
StartReadBuffers() to the next.

Since StartReadBuffers()'s buffers argument will become an in/out
argument, we need to initialize the buffer queue entries to
InvalidBuffer.  We don't want to do that up front, because we try to
keep stream initialization cheap, and code that uses the fast path only
ever touches a single buffer queue element.  Satisfy both goals by
initializing the queue incrementally on the first cycle around it.
---
 src/backend/storage/aio/read_stream.c | 108 ++++++++++++++++++++++----
 1 file changed, 94 insertions(+), 14 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index dc5ae60a089..049fda98257 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -112,8 +112,10 @@ struct ReadStream
 	int16		ios_in_progress;
 	int16		queue_size;
 	int16		max_pinned_buffers;
+	int16		forwarded_buffers;
 	int16		pinned_buffers;
 	int16		distance;
+	int16		initialized_buffers;
 	bool		advice_enabled;
 	bool		temporary;
 
@@ -280,7 +282,9 @@ read_stream_start_pending_read(ReadStream *stream,
 							   bool suppress_advice)
 {
 	bool		need_wait;
+	int			requested_nblocks;
 	int			nblocks;
+	int			forwarded;
 	int			flags;
 	int16		io_index;
 	int16		overflow;
@@ -312,13 +316,34 @@ read_stream_start_pending_read(ReadStream *stream,
 		flags = 0;
 
 	/*
-	 * We say how many blocks we want to read, but may be smaller on return.
-	 * On memory-constrained systems we may also have to ask for a smaller
-	 * read ourselves.
+	 * On buffer-constrained systems we may need to limit the I/O size by the
+	 * available pin count.
 	 */
+	requested_nblocks = Min(buffer_limit, stream->pending_read_nblocks);
+	nblocks = requested_nblocks;
 	buffer_index = stream->next_buffer_index;
 	io_index = stream->next_io_index;
-	nblocks = Min(buffer_limit, stream->pending_read_nblocks);
+
+	/*
+	 * The first time around the queue we initialize it as we go, including
+	 * the overflow zone, because otherwise the entries would appear as
+	 * forwarded buffers.  This avoids initializing the whole queue up front
+	 * in cases where it is large but we don't ever use it due to the
+	 * all-cached fast path or small scans.
+	 */
+	while (stream->initialized_buffers < buffer_index + nblocks)
+		stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
+
+	/*
+	 * Start the I/O.  Any buffers that are not InvalidBuffer will be
+	 * interpreted as already pinned, forwarded by an earlier call to
+	 * StartReadBuffers(), and must map to the expected blocks.  The nblocks
+	 * value may be smaller on return indicating the size of the I/O that
+	 * could be started.  Buffers beyond the output nblocks number may also
+	 * have been pinned without starting I/O due to various edge cases.  In
+	 * that case we'll just leave them in the queue ahead of us, "forwarded"
+	 * to the next call, avoiding the need to unpin/repin.
+	 */
 	need_wait = StartReadBuffers(&stream->ios[io_index].op,
 								 &stream->buffers[buffer_index],
 								 stream->pending_read_blocknum,
@@ -347,16 +372,35 @@ read_stream_start_pending_read(ReadStream *stream,
 		stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
 	}
 
+	/*
+	 * How many pins were acquired but forwarded to the next call?  These need
+	 * to be passed to the next StartReadBuffers() call, or released if the
+	 * stream ends early.  We need the number for accounting purposes, since
+	 * they are not counted in stream->pinned_buffers but we already hold
+	 * them.
+	 */
+	forwarded = 0;
+	while (nblocks + forwarded < requested_nblocks &&
+		   stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
+		forwarded++;
+	stream->forwarded_buffers = forwarded;
+
 	/*
 	 * We gave a contiguous range of buffer space to StartReadBuffers(), but
-	 * we want it to wrap around at queue_size.  Slide overflowing buffers to
-	 * the front of the array.
+	 * we want it to wrap around at queue_size.  Copy overflowing buffers to
+	 * the front of the array where they'll be consumed, but also leave a copy
+	 * in the overflow zone which the I/O operation has a pointer to (it needs
+	 * a contiguous array).  Both copies will be cleared when the buffers are
+	 * handed to the consumer.
 	 */
-	overflow = (buffer_index + nblocks) - stream->queue_size;
+	overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
 	if (overflow > 0)
-		memmove(&stream->buffers[0],
-				&stream->buffers[stream->queue_size],
-				sizeof(stream->buffers[0]) * overflow);
+	{
+		Assert(overflow < stream->queue_size);	/* can't overlap */
+		memcpy(&stream->buffers[0],
+			   &stream->buffers[stream->queue_size],
+			   sizeof(stream->buffers[0]) * overflow);
+	}
 
 	/* Move to the location of start of next read. */
 	read_stream_index_advance_n(stream, &buffer_index, nblocks);
@@ -381,6 +425,15 @@ read_stream_get_buffer_limit(ReadStream *stream)
 	else
 		buffers = GetAdditionalPinLimit();
 
+	/*
+	 * If we already have some forwarded buffers, we can certainly use those.
+	 * They are already pinned, and are mapped to the starting blocks of the
+	 * pending read; they just don't have any I/O started yet and are not
+	 * counted in stream->pinned_buffers.
+	 */
+	Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
+	buffers += stream->forwarded_buffers;
+
 	/*
 	 * Each stream is always allowed to try to acquire one pin if it doesn't
 	 * hold one already.  This is needed to guarantee progress, and just like
@@ -389,7 +442,7 @@ read_stream_get_buffer_limit(ReadStream *stream)
 	 * pin, ie the buffer pool is simply too small for the workload.
 	 */
 	if (buffers == 0 && stream->pinned_buffers == 0)
-		return 1;
+		buffers = 1;
 
 	/*
 	 * Otherwise, see how many additional pins the backend can currently pin,
@@ -751,10 +804,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 
 		/* Fast path assumptions. */
 		Assert(stream->ios_in_progress == 0);
+		Assert(stream->forwarded_buffers == 0);
 		Assert(stream->pinned_buffers == 1);
 		Assert(stream->distance == 1);
 		Assert(stream->pending_read_nblocks == 0);
 		Assert(stream->per_buffer_data_size == 0);
+		Assert(stream->initialized_buffers > stream->oldest_buffer_index);
 
 		/* We're going to return the buffer we pinned last time. */
 		oldest_buffer_index = stream->oldest_buffer_index;
@@ -803,6 +858,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			stream->distance = 0;
 			stream->oldest_buffer_index = stream->next_buffer_index;
 			stream->pinned_buffers = 0;
+			stream->buffers[oldest_buffer_index] = InvalidBuffer;
 		}
 
 		stream->fast_path = false;
@@ -887,10 +943,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		}
 	}
 
-#ifdef CLOBBER_FREED_MEMORY
-	/* Clobber old buffer for debugging purposes. */
+	/*
+	 * We must zap this queue entry, or else it would appear as a forwarded
+	 * buffer.  If it's potentially in the overflow zone (ie it wrapped around
+	 * the queue), also zap that copy.
+	 */
 	stream->buffers[oldest_buffer_index] = InvalidBuffer;
-#endif
+	if (oldest_buffer_index < io_combine_limit - 1)
+		stream->buffers[stream->queue_size + oldest_buffer_index] =
+			InvalidBuffer;
 
 #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
 
@@ -933,6 +994,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 #ifndef READ_STREAM_DISABLE_FAST_PATH
 	/* See if we can take the fast path for all-cached scans next time. */
 	if (stream->ios_in_progress == 0 &&
+		stream->forwarded_buffers == 0 &&
 		stream->pinned_buffers == 1 &&
 		stream->distance == 1 &&
 		stream->pending_read_nblocks == 0 &&
@@ -968,6 +1030,7 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
 void
 read_stream_reset(ReadStream *stream)
 {
+	int16		index;
 	Buffer		buffer;
 
 	/* Stop looking ahead. */
@@ -981,6 +1044,23 @@ read_stream_reset(ReadStream *stream)
 	while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 		ReleaseBuffer(buffer);
 
+	/* Unpin any unused forwarded buffers. */
+	index = stream->next_buffer_index;
+	while (index < stream->initialized_buffers &&
+		   (buffer = stream->buffers[index]) != InvalidBuffer)
+	{
+		Assert(stream->forwarded_buffers > 0);
+		stream->forwarded_buffers--;
+		ReleaseBuffer(buffer);
+
+		stream->buffers[index] = InvalidBuffer;
+		if (index < io_combine_limit - 1)
+			stream->buffers[stream->queue_size + index] = InvalidBuffer;
+
+		read_stream_index_advance(stream, &index);
+	}
+
+	Assert(stream->forwarded_buffers == 0);
 	Assert(stream->pinned_buffers == 0);
 	Assert(stream->ios_in_progress == 0);
 
-- 
2.48.1

From 37316501bfbe9fc6fc86f72351e18d6f103b9077 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Mon, 10 Feb 2025 21:55:40 +1300
Subject: [PATCH v1 6/6] Support buffer forwarding in StartReadBuffers().

Sometimes we have to perform a short read because we hit a cached block
that ends a contiguous run of blocks requiring I/O.  We don't want
StartReadBuffers() to have to start more than one I/O, so we stop there.
We also don't want to have to unpin the cached block (and repin it
later), so previously we'd silently pretend the hit was part of the I/O,
and just leave it out of the read from disk.  Now, we'll "forward" it to
the next call.  We still write it to the buffers[] array for the caller
to pass back to us later, but it's not included in *nblocks.

This policy means that we no longer mix hits and misses in a single
operation's results, so we avoid the requirement to call
WaitReadBuffers(), which might stall, before the caller can make use of
the hits.  The caller will get the hit in the next call instead, and
know that it doesn't have to wait.  That's important for later work on
out-of-order read streams that minimize I/O stalls.

This also makes life easier for proposed work on true AIO, which
occasionally needs to split a large I/O after pinning all the buffers,
while the current coding only ever forwards a single bookending hit.

This API is natural for read_stream.c: it just leaves forwarded buffers
where they are in its circular queue, where the next call will pick them
up and continue, minimizing pin churn.

If we ever think of a good reason to disable this feature, i.e. for
other users of StartReadBuffers() that don't want to deal with forwarded
buffers, then we could add a flag for that.  For now read_steam.c is the
only user.
---
 src/backend/storage/buffer/bufmgr.c | 128 ++++++++++++++++++++--------
 src/include/storage/bufmgr.h        |   1 -
 2 files changed, 91 insertions(+), 38 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 2ca641204fb..6fe70328e38 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1257,10 +1257,10 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 					 Buffer *buffers,
 					 BlockNumber blockNum,
 					 int *nblocks,
-					 int flags)
+					 int flags,
+					 bool allow_forwarding)
 {
 	int			actual_nblocks = *nblocks;
-	int			io_buffers_len = 0;
 	int			maxcombine = 0;
 
 	Assert(*nblocks > 0);
@@ -1270,30 +1270,80 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 	{
 		bool		found;
 
-		buffers[i] = PinBufferForBlock(operation->rel,
-									   operation->smgr,
-									   operation->persistence,
-									   operation->forknum,
-									   blockNum + i,
-									   operation->strategy,
-									   &found);
+		if (allow_forwarding && buffers[i] != InvalidBuffer)
+		{
+			BufferDesc *bufHdr;
+
+			/*
+			 * This is a buffer that was pinned by an earlier call to
+			 * StartReadBuffers(), but couldn't be handled in one operation at
+			 * that time.  The operation was split, and the caller has passed
+			 * an already pinned buffer back to us to handle the rest of the
+			 * operation.  It must continue at the expected block number.
+			 */
+			Assert(BufferGetBlockNumber(buffers[i]) == blockNum + i);
+
+			/*
+			 * It might be an already valid buffer (a hit) that followed the
+			 * final contiguous block of an earlier I/O (a miss) marking the
+			 * end of it, or a buffer that some other backend has since made
+			 * valid by performing the I/O for us, in which case we can handle
+			 * it as a hit now.  It is safe to check for a BM_VALID flag with
+			 * a relaxed load, because we got a fresh view of it while pinning
+			 * it in the previous call.
+			 *
+			 * On the other hand if we don't see BM_VALID yet, it must be an
+			 * I/O that was split by the previous call and we need to try to
+			 * start a new I/O from this block.  We're also racing against any
+			 * other backend that might start the I/O or even manage to mark
+			 * it BM_VALID after this check, BM_VALID after this check, but
+			 * StartBufferIO() will handle those cases.
+			 */
+			if (BufferIsLocal(buffers[i]))
+				bufHdr = GetLocalBufferDescriptor(-buffers[i] - 1);
+			else
+				bufHdr = GetBufferDescriptor(buffers[i] - 1);
+			found = pg_atomic_read_u32(&bufHdr->state) & BM_VALID;
+		}
+		else
+		{
+			buffers[i] = PinBufferForBlock(operation->rel,
+										   operation->smgr,
+										   operation->persistence,
+										   operation->forknum,
+										   blockNum + i,
+										   operation->strategy,
+										   &found);
+		}
 
 		if (found)
 		{
 			/*
-			 * Terminate the read as soon as we get a hit.  It could be a
-			 * single buffer hit, or it could be a hit that follows a readable
-			 * range.  We don't want to create more than one readable range,
-			 * so we stop here.
+			 * We have a hit.  If it's the first block in the requested range,
+			 * we can return it immediately and report that WaitReadBuffers()
+			 * does not need to be called.  If the initial value of *nblocks
+			 * was larger, the caller will have to call again for the rest.
 			 */
-			actual_nblocks = i + 1;
+			if (i == 0)
+			{
+				*nblocks = 1;
+				return false;
+			}
+
+			/*
+			 * Otherwise we already have an I/O to perform, but this block
+			 * can't be included as it is already valid.  Split the I/O here.
+			 * There may or may not be more blocks requiring I/O after this
+			 * one (we haven't checked), but any such read can't be merged
+			 * with this one because this hit is in the way.  We'll leave this
+			 * buffer pinned, forwarding it to the next call and avoiding the
+			 * need to unpin it here and re-pin it there.
+			 */
+			actual_nblocks = i;
 			break;
 		}
 		else
 		{
-			/* Extend the readable range to cover this block. */
-			io_buffers_len++;
-
 			/*
 			 * Check how many blocks we can cover with the same IO. The smgr
 			 * implementation might e.g. be limited due to a segment boundary.
@@ -1314,15 +1364,11 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 	}
 	*nblocks = actual_nblocks;
 
-	if (likely(io_buffers_len == 0))
-		return false;
-
 	/* Populate information needed for I/O. */
 	operation->buffers = buffers;
 	operation->blocknum = blockNum;
 	operation->flags = flags;
 	operation->nblocks = actual_nblocks;
-	operation->io_buffers_len = io_buffers_len;
 
 	if (flags & READ_BUFFERS_ISSUE_ADVICE)
 	{
@@ -1337,7 +1383,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 		smgrprefetch(operation->smgr,
 					 operation->forknum,
 					 blockNum,
-					 operation->io_buffers_len);
+					 actual_nblocks);
 	}
 
 	/* Indicate that WaitReadBuffers() should be called. */
@@ -1351,11 +1397,21 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
  * actual number, which may be fewer than requested.  Caller sets some of the
  * members of operation; see struct definition.
  *
+ * The initial contents of the elements of buffers up to *nblocks should
+ * either be InvalidBuffer or an already-pinned buffer that was left by a
+ * preceding call to StartReadBuffers() that had to be split.  On return, some
+ * elements of buffers may hold pinned buffers beyond the number indicated by
+ * the updated value of *nblocks.  Operations are split on boundaries known to
+ * smgr (eg md.c segment boundaries that require crossing into a different
+ * underlying file), or when already-cached blocks are found in the buffer
+ * pool that prevent the formation of a contiguous read.
+ *
  * If false is returned, no I/O is necessary.  If true is returned, one I/O
  * has been started, and WaitReadBuffers() must be called with the same
  * operation object before the buffers are accessed.  Along with the operation
  * object, the caller-supplied array of buffers must remain valid until
- * WaitReadBuffers() is called.
+ * WaitReadBuffers() is called, and any forwarded buffers must also be
+ * preserved for a future call unless explicitly released.
  *
  * Currently the I/O is only started with optional operating system advice if
  * requested by the caller with READ_BUFFERS_ISSUE_ADVICE, and the real I/O
@@ -1369,13 +1425,18 @@ StartReadBuffers(ReadBuffersOperation *operation,
 				 int *nblocks,
 				 int flags)
 {
-	return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags);
+	return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags,
+								true /* expect forwarded buffers */ );
 }
 
 /*
  * Single block version of the StartReadBuffers().  This might save a few
  * instructions when called from another translation unit, because it is
  * specialized for nblocks == 1.
+ *
+ * This version does not support "forwarded" buffers: they cannot be created
+ * by reading only one block, and the current contents of *buffer are ignored
+ * on entry.
  */
 bool
 StartReadBuffer(ReadBuffersOperation *operation,
@@ -1386,7 +1447,8 @@ StartReadBuffer(ReadBuffersOperation *operation,
 	int			nblocks = 1;
 	bool		result;
 
-	result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags);
+	result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags,
+								  false /* single block, no forwarding */ );
 	Assert(nblocks == 1);		/* single block can't be short */
 
 	return result;
@@ -1416,24 +1478,16 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 	IOObject	io_object;
 	char		persistence;
 
-	/*
-	 * Currently operations are only allowed to include a read of some range,
-	 * with an optional extra buffer that is already pinned at the end.  So
-	 * nblocks can be at most one more than io_buffers_len.
-	 */
-	Assert((operation->nblocks == operation->io_buffers_len) ||
-		   (operation->nblocks == operation->io_buffers_len + 1));
-
 	/* Find the range of the physical read we need to perform. */
-	nblocks = operation->io_buffers_len;
-	if (nblocks == 0)
-		return;					/* nothing to do */
-
+	nblocks = operation->nblocks;
 	buffers = &operation->buffers[0];
 	blocknum = operation->blocknum;
 	forknum = operation->forknum;
 	persistence = operation->persistence;
 
+	Assert(nblocks > 0);
+	Assert(nblocks <= MAX_IO_COMBINE_LIMIT);
+
 	if (persistence == RELPERSISTENCE_TEMP)
 	{
 		io_context = IOCONTEXT_NORMAL;
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 597ecb97897..4a035f59a7d 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -130,7 +130,6 @@ struct ReadBuffersOperation
 	BlockNumber blocknum;
 	int			flags;
 	int16		nblocks;
-	int16		io_buffers_len;
 };
 
 typedef struct ReadBuffersOperation ReadBuffersOperation;
-- 
2.48.1
