On Fri, Nov 21, 2025 at 4:28 AM Melanie Plageman
<[email protected]> wrote:
> I'm not totally opposed to this. My rationale for making it an error
> is that the developer could have test cases where all the buffers are
> consumed but the code is written such that that won't always happen.
> Then if a real production query doesn't consume all the buffers, it
> could return wrong results (I think). That will mean the user can't
> complete their query until the extension author releases a new version
> of their code. But I'm not sure what the right answer is here.

Focusing on making sure v19 has a good interface for this, and
abandoning thoughts of back-patching a bandaid, and the constraints
that leads to, for now...

I think it'd be better if that were the consumer's choice.  I don't
want the consumer to be required to drain the stream before resuming,
as that'd be an unprincipled stall.  For example, if new WAL arrives
over the network then I think it should be possible for recovery's
WAL-powered stream of heap pages to resume looking ahead even if
recovery hasn't drained the existing stream completely.
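Here is a toy, single-threaded model in plain C that makes this consumer-driven resumption concrete, using the pause/resume pair proposed in point 1 below. It is not read_stream.c: there is no I/O, a "buffer" is just a block number, and every name in it (toy_stream, toy_pause(), toy_resume(), toy_next(), toy_wal) is hypothetical. It assumes only the semantics described in this message. Pausing saves the look-ahead distance and reports a temporary end. Blocks already queued are still handed out. The consumer sees an invalid result only if it drains the queue and asks again before resuming.

```c
#include <assert.h>

/*
 * Toy model of read_stream_pause()/read_stream_resume() semantics.  Not
 * PostgreSQL code: no I/O, a "buffer" is just a block number, and all names
 * here are hypothetical.
 */
#define TOY_INVALID (-1)		/* plays the role of InvalidBlockNumber/InvalidBuffer */
#define TOY_QUEUE_SIZE 16		/* distance must never exceed this */

typedef struct toy_stream
{
	int			distance;		/* 0 = not looking ahead (ended or paused) */
	int			resume_distance;	/* saved by toy_pause() */
	int			queue[TOY_QUEUE_SIZE];	/* looked-ahead blocks not yet consumed */
	int			head;			/* next entry to hand to the consumer */
	int			tail;			/* next free entry */
	int			(*callback) (struct toy_stream *stream, void *arg);
	void	   *arg;
} toy_stream;

/* Like read_stream_pause(): remember the distance and stop looking ahead. */
static int
toy_pause(toy_stream *stream)
{
	stream->resume_distance = stream->distance;
	stream->distance = 0;
	return TOY_INVALID;
}

/* Like read_stream_resume(): restore the distance saved by toy_pause(). */
static void
toy_resume(toy_stream *stream)
{
	stream->distance = stream->resume_distance;
}

/* Ask the callback for blocks until 'distance' are queued or it stops. */
static void
toy_look_ahead(toy_stream *stream)
{
	while (stream->distance > 0 &&
		   stream->tail - stream->head < stream->distance)
	{
		int			blk = stream->callback(stream, stream->arg);

		if (blk == TOY_INVALID)
		{
			stream->distance = 0;	/* end of stream, or a pause */
			break;
		}
		stream->queue[stream->tail++ % TOY_QUEUE_SIZE] = blk;
	}
}

/* Like read_stream_next_buffer(): invalid only when stopped and empty. */
static int
toy_next(toy_stream *stream)
{
	int			blk;

	if (stream->head == stream->tail)
		toy_look_ahead(stream);
	if (stream->head == stream->tail)
		return TOY_INVALID;
	blk = stream->queue[stream->head++ % TOY_QUEUE_SIZE];
	toy_look_ahead(stream);
	return blk;
}

/* Hypothetical WAL-driven callback: it can only return blocks it knows. */
typedef struct
{
	int			known[8];
	int			nknown;			/* grows as "new WAL arrives" */
	int			next;
} toy_wal;

static int
toy_wal_cb(toy_stream *stream, void *arg)
{
	toy_wal    *wal = arg;

	if (wal->next == wal->nknown)
		return toy_pause(stream);	/* temporary lack of information */
	return wal->known[wal->next++];
}
```

In the first scenario of the test below, the consumer drains the stream and asks once more, so it observes the temporary end before new "WAL" arrives. In the second, toy_resume() is called while a block is still queued, so the consumer never sees the pause.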

Peter G (CC'd) and I discussed some problems he had in the index
prefetching work, and I tried to extend this a bit to give the
semantics he wanted, in point 2 below.  It's simple in itself, but it
might lead to some tricky questions higher up.  Posted for experimentation.
It'll be interesting to see if this goes somewhere.

1.  read_stream_resume() as before, but with a new explicit
read_stream_pause(): if a block number callback would like to report a
temporary lack of information, it should return
read_stream_pause(stream), not InvalidBlockNumber.  Then after
read_stream_resume(stream) is called, the next
read_stream_next_buffer() enters the lookahead loop again.  While
paused, if the consumer drains all the existing buffers in the stream
and then one more, it will receive InvalidBuffer, but if the _resume()
call is made sooner, the consumer won't ever know about the temporary
lack of buffers in the stream.

2.  read_stream_yield(): while streaming heap pages that come from
TIDs on index pages, Peter didn't like that the executor lost control
of how much work was done by the lookahead loop underneath
read_stream_next_buffer().  The consumer might have a heap page with
some tuples that could be emitted right now, but the block number
callback might be evaluating arbitrarily expensive filter qual
expressions far ahead, and the consumer might prefer to emit those
tuples now rather than wait for an unbounded amount of work to find
more.  This interface
allows some limited coroutine-like multitasking, where the block
number callback can return read_stream_yield(stream) to return control
back to the consumer periodically if it knows the consumer could
already do something else.  It works by pausing the stream and
resuming it in the next read_stream_next_buffer() call, but that's an
internal detail.
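The yield variant in point 2 can be modelled the same way. The toy below is plain C, not read_stream.c, and all its names are hypothetical. It treats yielding as a pause plus a flag. When the consumer next asks for a block and the look-ahead queue is empty, the stream resumes by itself instead of reporting an end. As noted above, exactly where read_stream.c resumes is an internal detail, and this model does not try to reproduce it.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy model of read_stream_yield().  Not PostgreSQL code; hypothetical names. */
#define TOY_INVALID (-1)
#define TOY_QUEUE_SIZE 16

typedef struct toy_stream
{
	int			distance;		/* 0 = not looking ahead */
	int			resume_distance;
	bool		yielded;		/* set by toy_yield(), cleared on auto-resume */
	int			queue[TOY_QUEUE_SIZE];
	int			head;
	int			tail;
	int			(*callback) (struct toy_stream *stream, void *arg);
	void	   *arg;
} toy_stream;

/* Like read_stream_yield(): pause (saving the distance) and set a flag. */
static int
toy_yield(toy_stream *stream)
{
	stream->resume_distance = stream->distance;
	stream->distance = 0;
	stream->yielded = true;
	return TOY_INVALID;
}

static void
toy_look_ahead(toy_stream *stream)
{
	while (stream->distance > 0 &&
		   stream->tail - stream->head < stream->distance)
	{
		int			blk = stream->callback(stream, stream->arg);

		if (blk == TOY_INVALID)
		{
			stream->distance = 0;
			break;
		}
		stream->queue[stream->tail++ % TOY_QUEUE_SIZE] = blk;
	}
}

static int
toy_next(toy_stream *stream)
{
	int			blk;

	if (stream->head == stream->tail)
	{
		if (stream->distance == 0 && stream->yielded)
		{
			/* The callback yielded, so resume instead of reporting the end. */
			stream->yielded = false;
			stream->distance = stream->resume_distance;
		}
		toy_look_ahead(stream);
		if (stream->head == stream->tail)
			return TOY_INVALID;
	}
	blk = stream->queue[stream->head++ % TOY_QUEUE_SIZE];
	toy_look_ahead(stream);
	return blk;
}

/* Hypothetical callback that hands control back after every block it finds. */
typedef struct
{
	const int  *blocks;
	int			nblocks;
	int			next;
	int			calls;			/* callback invocations, i.e. work done */
	bool		found_one;		/* found a block since the last yield */
} toy_scan;

static int
toy_scan_cb(toy_stream *stream, void *arg)
{
	toy_scan   *scan = arg;

	scan->calls++;
	if (scan->next == scan->nblocks)
		return TOY_INVALID;		/* real end of stream */
	if (scan->found_one)
	{
		scan->found_one = false;
		return toy_yield(stream);	/* consumer has work: let it run */
	}
	scan->found_one = true;
	return scan->blocks[scan->next++];
}
```

Here the callback does a bounded amount of work per toy_next() call, and the consumer never has to call a resume function itself. It sees the invalid result only once the callback reports a real end.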

Some half-baked thoughts about the resulting flow control:

Yielding control periodically just when it happens to be possible
within the constraints of the volcano executor is an interesting thing
to think about.  You can only yield if you already have a tuple to
emit.  There is no saying when control will return to you, and the
node you yield to might immediately block on I/O and yet you could
have been doing useful CPU work.  You probably need an event-driven
node-hopping executor to fix that in general, but on the flip side, I
can think of one bet that I'd take: if you already have a tuple to
emit AND if index scans themselves (not only referenced heap pages)
were also streamed AND if a hypothetical
read_stream_next_buffer_no_wait(btree_stream) said the next index page
you need is not ready yet, then you should yield.  You're gambling
that other plan nodes will have better luck running without an I/O
stall, since you have a ~0% chance yourself.
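The one bet described above reduces to a three-way conjunction. The following is a hypothetical C helper, not part of the patch. Its struct fields stand in for executor state, and next_index_page_ready assumes the answer comes from something like the hypothetical read_stream_next_buffer_no_wait(btree_stream) mentioned above. The sketch takes that answer as an input rather than calling any real function.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical executor state consulted by the yield heuristic. */
typedef struct
{
	bool		have_tuple_to_emit;	/* volcano model: can only yield with one */
	bool		index_is_streamed;	/* index pages also come from a read stream */
	bool		next_index_page_ready;	/* false means we would stall on I/O */
} toy_scan_state;

static bool
toy_should_yield(const toy_scan_state *st)
{
	/* Nothing to hand to the parent node, so yielding isn't possible. */
	if (!st->have_tuple_to_emit)
		return false;
	/* Without a streamed index we can't ask whether the next page is ready. */
	if (!st->index_is_streamed)
		return false;
	/* We would stall ourselves, so let other plan nodes try their luck. */
	return !st->next_index_page_ready;
}
```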

Yielding just because you've scanned N index pages/tuples/whatever is
harder to think about.  The stream shouldn't get far ahead unless it's
recently been useful for I/O concurrency (though optimal distance
heuristics are an open problem), but in this case a single invocation
of the block number callback can call ReadBuffer() an arbitrary number
of times, filtering out all the index tuples as it rampages through
the whole index IIUC.  I see why you might want to yield periodically
if you can, but I also wonder how much that can really help if you
still have to pick up where you left off next time.  I guess it
depends on the distribution of matches.  It's also clear that any
cold-cache testing done with direct I/O enabled will stall abominably
as long as the index level still calls ReadBuffer() synchronously,
possibly confusing matters.
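A budgeted callback that yields after examining N index tuples and then picks up where it left off can be sketched without any read-stream machinery. The toy below is plain C and every name in it is hypothetical. TOY_YIELD marks the point where a real callback would return read_stream_yield(stream), and the signature is simplified compared to ReadStreamBlockNumberCB. The saved `pos` cursor makes resumption cheap. It also illustrates the catch raised above. The total number of filter evaluations is the same with or without yielding, so yielding changes when the work happens, not how much of it there is.

```c
#include <assert.h>
#include <stdbool.h>

#define TOY_END		(-1)		/* stands in for a real end-of-stream */
#define TOY_YIELD	(-2)		/* stands in for returning read_stream_yield() */

/* Hypothetical state for a callback that walks index tuples. */
typedef struct
{
	const int  *heap_blocks;	/* heap block referenced by each index tuple */
	const bool *matches;		/* precomputed result of the filter qual */
	int			ntuples;
	int			pos;			/* resume point: next index tuple to examine */
	int			budget;			/* max tuples examined per callback call */
	int			examined;		/* total filter evaluations so far */
} toy_index_scan;

/* Return the next matching heap block, TOY_YIELD if over budget, or TOY_END. */
static int
toy_index_cb(toy_index_scan *scan)
{
	for (int i = 0; i < scan->budget; i++)
	{
		int			pos;

		if (scan->pos == scan->ntuples)
			return TOY_END;
		pos = scan->pos++;
		scan->examined++;		/* the "expensive" qual evaluation */
		if (scan->matches[pos])
			return scan->heap_blocks[pos];
	}
	return TOY_YIELD;			/* budget spent; scan->pos remembers our place */
}
```

With a budget of two tuples and a single match at the fifth index tuple, the first two calls yield, the third returns block 5, and the fourth reports the end. Every index tuple is still examined exactly once.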
From ff6da44df5418d73ad9ec1911c87b66897f3b086 Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Sat, 15 Jun 2024 14:37:26 +1200
Subject: [PATCH 1/2] Introduce read_stream_{pause,resume,yield}().

---
 src/backend/storage/aio/read_stream.c | 50 ++++++++++++++++++++++++++-
 src/include/storage/read_stream.h     |  3 ++
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 031fde9f4cb..964e1aa281c 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -100,11 +100,13 @@ struct ReadStream
 	int16		pinned_buffers;
 	int16		distance;
 	int16		initialized_buffers;
+	int16		resume_distance;
 	int			read_buffers_flags;
 	bool		sync_mode;		/* using io_method=sync */
 	bool		batch_mode;		/* READ_STREAM_USE_BATCHING */
 	bool		advice_enabled;
 	bool		temporary;
+	bool		yielded;
 
 	/*
 	 * One-block buffer to support 'ungetting' a block number, to resolve flow
@@ -879,7 +881,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 
 		/* End of stream reached?  */
 		if (stream->distance == 0)
-			return InvalidBuffer;
+		{
+			if (!stream->yielded)
+				return InvalidBuffer;
+
+			/* The callback yielded.  Resume. */
+			stream->yielded = false;
+			read_stream_resume(stream);
+			Assert(stream->distance != 0);
+		}
 
 		/*
 		 * The usual order of operations is that we look ahead at the bottom
@@ -1034,6 +1044,44 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
 	return read_stream_get_block(stream, NULL);
 }
 
+/*
+ * Temporarily stop consuming block numbers from the block number callback.  If
+ * called inside the block number callback, its return value should be
+ * returned by the callback.
+ */
+BlockNumber
+read_stream_pause(ReadStream *stream)
+{
+	stream->resume_distance = stream->distance;
+	stream->distance = 0;
+	return InvalidBlockNumber;
+}
+
+/*
+ * Resume looking ahead after the stream was paused with read_stream_pause().
+ * This is useful for streams of self-referential blocks, after a buffer needed
+ * to be consumed and examined to find more block numbers.
+ */
+void
+read_stream_resume(ReadStream *stream)
+{
+	stream->distance = stream->resume_distance;
+}
+
+/*
+ * Called from inside a block number callback, to return control to the caller
+ * of read_stream_next_buffer() without looking further ahead.  Its return
+ * value should be returned by the callback.  This is equivalent to pausing and
+ * resuming automatically at the next call to read_stream_next_buffer().
+ */
+BlockNumber
+read_stream_yield(ReadStream *stream)
+{
+	read_stream_pause(stream);
+	stream->yielded = true;
+	return InvalidBlockNumber;
+}
+
 /*
  * Reset a read stream by releasing any queued up buffers, allowing the stream
  * to be used again for different blocks.  This can be used to clear an
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
index 9b0d65161d0..8ac53d2902d 100644
--- a/src/include/storage/read_stream.h
+++ b/src/include/storage/read_stream.h
@@ -99,6 +99,9 @@ extern ReadStream *read_stream_begin_smgr_relation(int flags,
 												   ReadStreamBlockNumberCB callback,
 												   void *callback_private_data,
 												   size_t per_buffer_data_size);
+extern BlockNumber read_stream_pause(ReadStream *stream);
+extern void read_stream_resume(ReadStream *stream);
+extern BlockNumber read_stream_yield(ReadStream *stream);
 extern void read_stream_reset(ReadStream *stream);
 extern void read_stream_end(ReadStream *stream);
 
-- 
2.51.2

From de8a800b48829ef619200a71dba9028be3ea09e9 Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Wed, 12 Nov 2025 16:49:57 +1300
Subject: [PATCH 2/2] Add tests for read_stream_{pause,resume,yield}().

---
 src/test/modules/test_aio/Makefile           |   3 +-
 src/test/modules/test_aio/meson.build        |   1 +
 src/test/modules/test_aio/t/001_aio.pl       |  30 +++
 src/test/modules/test_aio/test_aio--1.0.sql  |  13 ++
 src/test/modules/test_aio/test_read_stream.c | 181 +++++++++++++++++++
 src/tools/pgindent/typedefs.list             |   1 +
 6 files changed, 228 insertions(+), 1 deletion(-)
 create mode 100644 src/test/modules/test_aio/test_read_stream.c

diff --git a/src/test/modules/test_aio/Makefile b/src/test/modules/test_aio/Makefile
index f53cc64671a..465eb09ee4f 100644
--- a/src/test/modules/test_aio/Makefile
+++ b/src/test/modules/test_aio/Makefile
@@ -5,7 +5,8 @@ PGFILEDESC = "test_aio - test code for AIO"
 MODULE_big = test_aio
 OBJS = \
 	$(WIN32RES) \
-	test_aio.o
+	test_aio.o \
+	test_read_stream.o
 
 EXTENSION = test_aio
 DATA = test_aio--1.0.sql
diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index 73d2fd68eaa..6e6fcbfdad9 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -2,6 +2,7 @@
 
 test_aio_sources = files(
   'test_aio.c',
+  'test_read_stream.c',
 )
 
 if host_system == 'windows'
diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl
index 3f0453619e8..2a2c6523a6b 100644
--- a/src/test/modules/test_aio/t/001_aio.pl
+++ b/src/test/modules/test_aio/t/001_aio.pl
@@ -1489,6 +1489,35 @@ SELECT read_rel_block_ll('tbl_cs_fail', 3, nblocks=>1, zero_on_error=>true);),
 	$psql->quit();
 }
 
+# Read stream tests
+sub test_read_stream
+{
+	my $io_method = shift;
+	my $node = shift;
+	my ($ret, $output);
+
+	my $psql = $node->background_psql('postgres', on_error_stop => 0);
+
+	$psql->query_safe(
+		qq(
+CREATE TEMPORARY TABLE tmp_read_stream(data int not null);
+INSERT INTO tmp_read_stream SELECT generate_series(1, 10000);
+SELECT test_read_stream_resume('tmp_read_stream', 0);
+DROP TABLE tmp_read_stream;
+));
+
+	$psql->query_safe(
+		qq(
+CREATE TEMPORARY TABLE tmp_read_stream(data int not null);
+INSERT INTO tmp_read_stream SELECT generate_series(1, 10000);
+SELECT test_read_stream_yield('tmp_read_stream', 0);
+DROP TABLE tmp_read_stream;
+));
+
+	$psql->quit();
+}
+
+
 
 # Run all tests that are supported for all io_methods
 sub test_generic
@@ -1525,6 +1554,7 @@ CHECKPOINT;
 	test_checksum($io_method, $node);
 	test_ignore_checksum($io_method, $node);
 	test_checksum_createdb($io_method, $node);
+	test_read_stream($io_method, $node);
 
   SKIP:
 	{
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index e495481c41e..e37810b7273 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -106,3 +106,16 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
 CREATE FUNCTION inj_io_reopen_detach()
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
+
+
+
+/*
+ * Read stream related functions
+ */
+CREATE FUNCTION test_read_stream_resume(rel regclass, blockno int4)
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION test_read_stream_yield(rel regclass, blockno int4)
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_aio/test_read_stream.c b/src/test/modules/test_aio/test_read_stream.c
new file mode 100644
index 00000000000..d1d436a90b7
--- /dev/null
+++ b/src/test/modules/test_aio/test_read_stream.c
@@ -0,0 +1,181 @@
+/*-------------------------------------------------------------------------
+ *
+ * test_read_stream.c
+ *		Helpers to write tests for read_stream.c
+ *
+ * Copyright (c) 2020-2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/test/modules/test_aio/test_read_stream.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/relation.h"
+#include "fmgr.h"
+#include "storage/bufmgr.h"
+#include "storage/read_stream.h"
+
+typedef struct
+{
+	BlockNumber blkno;
+	int			count;
+} test_read_stream_resume_state;
+
+static BlockNumber
+test_read_stream_resume_cb(ReadStream *stream,
+						   void *callback_private_data,
+						   void *per_buffer_data)
+{
+	test_read_stream_resume_state *state = callback_private_data;
+
+	/* Periodic end-of-stream. */
+	if (++state->count % 3 == 0)
+		return read_stream_pause(stream);
+
+	return state->blkno;
+}
+
+/*
+ * Test read_stream_resume(), allowing a stream to end temporarily and then
+ * continue where it left off.
+ */
+PG_FUNCTION_INFO_V1(test_read_stream_resume);
+Datum
+test_read_stream_resume(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	BlockNumber blkno = PG_GETARG_UINT32(1);
+	Relation	rel;
+	Buffer		buf;
+	ReadStream *stream;
+	test_read_stream_resume_state state = {.blkno = blkno};
+
+	rel = relation_open(relid, AccessShareLock);
+	stream = read_stream_begin_relation(READ_STREAM_DEFAULT,
+										NULL,
+										rel,
+										MAIN_FORKNUM,
+										test_read_stream_resume_cb,
+										&state,
+										0);
+
+	for (int i = 0; i < 3; ++i)
+	{
+		/* Same block twice. */
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(BufferGetBlockNumber(buf) == blkno);
+		ReleaseBuffer(buf);
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(BufferGetBlockNumber(buf) == blkno);
+		ReleaseBuffer(buf);
+
+		/* End-of-stream. */
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(buf == InvalidBuffer);
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(buf == InvalidBuffer);
+
+		/* Resume. */
+		read_stream_resume(stream);
+	}
+
+	read_stream_end(stream);
+	relation_close(rel, NoLock);
+
+	PG_RETURN_VOID();
+}
+
+typedef struct
+{
+	BlockNumber blkno;
+	int			count;
+	int			yields;
+	int			blocks;
+} test_read_stream_yield_state;
+
+static BlockNumber
+test_read_stream_yield_cb(ReadStream *stream,
+						  void *callback_private_data,
+						  void *per_buffer_data)
+{
+	test_read_stream_yield_state *state = callback_private_data;
+
+	/* Yield every third call. */
+	if (++state->count % 3 == 2)
+	{
+		state->yields++;
+		return read_stream_yield(stream);
+	}
+
+	state->blocks++;
+	return state->blkno;
+}
+
+/*
+ * Test read_stream_yield(), allowing control to be yielded temporarily from
+ * the lookahead loop and returned to the caller of read_stream_next_buffer().
+ */
+PG_FUNCTION_INFO_V1(test_read_stream_yield);
+Datum
+test_read_stream_yield(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	BlockNumber blkno = PG_GETARG_UINT32(1);
+	Relation	rel;
+	Buffer		buf;
+	ReadStream *stream;
+	test_read_stream_yield_state state = {.blkno = blkno};
+
+	rel = relation_open(relid, AccessShareLock);
+	stream = read_stream_begin_relation(READ_STREAM_DEFAULT,
+										NULL,
+										rel,
+										MAIN_FORKNUM,
+										test_read_stream_yield_cb,
+										&state,
+										0);
+
+	buf = read_stream_next_buffer(stream, NULL);
+	Assert(BufferGetBlockNumber(buf) == blkno);
+	ReleaseBuffer(buf);
+	Assert(state.blocks == 1);
+	Assert(state.yields == 1);
+
+	buf = read_stream_next_buffer(stream, NULL);
+	Assert(BufferGetBlockNumber(buf) == blkno);
+	ReleaseBuffer(buf);
+	Assert(state.blocks == 3);
+	Assert(state.yields == 1);
+
+	buf = read_stream_next_buffer(stream, NULL);
+	Assert(BufferGetBlockNumber(buf) == blkno);
+	ReleaseBuffer(buf);
+	Assert(state.blocks == 3);
+	Assert(state.yields == 2);
+
+	buf = read_stream_next_buffer(stream, NULL);
+	Assert(BufferGetBlockNumber(buf) == blkno);
+	ReleaseBuffer(buf);
+	Assert(state.blocks == 5);
+	Assert(state.yields == 2);
+
+	buf = read_stream_next_buffer(stream, NULL);
+	Assert(BufferGetBlockNumber(buf) == blkno);
+	ReleaseBuffer(buf);
+	Assert(state.blocks == 5);
+	Assert(state.yields == 3);
+
+	buf = read_stream_next_buffer(stream, NULL);
+	Assert(BufferGetBlockNumber(buf) == blkno);
+	ReleaseBuffer(buf);
+	Assert(state.blocks == 7);
+	Assert(state.yields == 3);
+
+	read_stream_end(stream);
+	relation_close(rel, NoLock);
+
+	PG_RETURN_VOID();
+}
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index cf3f6a7dafd..7396e9ce14b 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -4158,6 +4158,8 @@ td_entry
 teSection
 temp_tablespaces_extra
 test_re_flags
+test_read_stream_resume_state
+test_read_stream_yield_state
 test_regex_ctx
 test_shm_mq_header
 test_spec
-- 
2.51.2
