On Wed, Nov 12, 2025 at 12:19 PM Thomas Munro <[email protected]> wrote:
> On Wed, Nov 12, 2025 at 11:52 AM Melanie Plageman
> <[email protected]> wrote:
> > If we are worried about regressing other extensions using
> > read_stream_reset(), we could make the read stream reset which
> > preserves the distance a different function in backbranches.

Here is a draft patch along those lines that tries to be as small as
possible.  I'm trying out the name read_stream_resume().
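
To illustrate the intended difference between resetting and resuming, here
is a rough toy model.  It is not the real read_stream.c: ToyStream, the
toy_* functions, the max_distance cap and the doubling ramp-up are all
hypothetical simplifications invented for this sketch.  Only the two
distance fields and the "distance == 0 means end-of-stream" convention
come from the patch.

```c
#include <assert.h>
#include <stdint.h>

/* Toy stand-in for the fields the patch touches; NOT the real ReadStream. */
typedef struct ToyStream
{
	int16_t		distance;			/* 0 represents end-of-stream */
	int16_t		resume_distance;	/* distance saved at end-of-stream */
	int16_t		max_distance;		/* hypothetical cap on look-ahead */
} ToyStream;

/* Start with a look-ahead distance of one, as a freshly begun stream might. */
static void
toy_begin(ToyStream *s, int16_t max_distance)
{
	assert(max_distance >= 1);
	s->distance = 1;
	s->resume_distance = 1;
	s->max_distance = max_distance;
}

/* Assumed ramp-up rule for this sketch: double the distance, up to the cap. */
static void
toy_ramp_up(ToyStream *s)
{
	int			doubled = s->distance * 2;

	assert(s->distance > 0);
	s->distance = (int16_t) (doubled > s->max_distance ? s->max_distance : doubled);
}

/* Callback ran out of block numbers: remember the distance, then stop. */
static void
toy_end_of_stream(ToyStream *s)
{
	s->resume_distance = s->distance;
	s->distance = 0;
}

/* Models the behavior the commit message complains about: back to one. */
static void
toy_reset(ToyStream *s)
{
	s->distance = 1;
}

/* Models the new function: pick up at the distance saved at end-of-stream. */
static void
toy_resume(ToyStream *s)
{
	s->distance = s->resume_distance;
}
```

In this model, a stream that has ramped up to a distance of 8 and then hits
end-of-stream continues at 8 after toy_resume(), whereas toy_reset() puts it
back at 1 and it has to ramp up all over again.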
From ab494c6e5ccf93563dcf7059f7eec7d4252294f6 Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Sat, 15 Jun 2024 14:37:26 +1200
Subject: [PATCH 1/2] Fix overloaded remit of read_stream_reset().

Some extensions need to examine page contents to find more block numbers
to stream.  That currently means that they have to signal end-of-stream
when data runs out, but later "reset" the stream to resume with new
information.

As an unfortunate by-product of that API economy, the look-ahead
distance would start out at one again, and then need to ramp back up to
find a useful concurrency level.  This created a significant loss of
performance for self-referential block streams with a high degree of
fan-out, as discovered by the pgvector project.

Split out a separate function read_stream_resume() that is explicitly
intended for that usage pattern.  Since distance == 0 is the internal
representation of end-of-stream, all it has to do is restore the
distance recorded when end-of-stream was signaled, so that look-ahead
resumes at the same I/O concurrency level after a page jump.

We don't have any self-referential streaming in PostgreSQL itself yet,
but we decided to call this a bug all the same and back-patch a fix.
The intention was to support that exact usage pattern; the API just
missed the mark by trying to handle too many use cases.  Such extensions
should be able to benefit from streaming, and the behavior change is
isolated to code that calls the new variant, so a separation of duties
seems warranted and safe to back-patch.

The problem was reported as extremely poor look-ahead performance in v18
using AIO for prefetching, but it also affects the fadvise-based
prefetching in v17, where read_stream.c arrived.

Reviewed-by:
Tested-by:
Backpatch-through: 17
Discussion: https://postgr.es/m/CA%2BhUKGL-3mBtkA9RTbLFHuSS5cviuv0ko7nBhCg9KM7Q-GSEkw%40mail.gmail.com
---
 src/backend/storage/aio/read_stream.c | 15 +++++++++++++++
 src/include/storage/read_stream.h     |  1 +
 2 files changed, 16 insertions(+)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 031fde9f4cb..beae8ef325a 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -100,6 +100,7 @@ struct ReadStream
 	int16		pinned_buffers;
 	int16		distance;
 	int16		initialized_buffers;
+	int16		resume_distance;
 	int			read_buffers_flags;
 	bool		sync_mode;		/* using io_method=sync */
 	bool		batch_mode;		/* READ_STREAM_USE_BATCHING */
@@ -464,6 +465,7 @@ read_stream_look_ahead(ReadStream *stream)
 		if (blocknum == InvalidBlockNumber)
 		{
 			/* End of stream. */
+			stream->resume_distance = stream->distance;
 			stream->distance = 0;
 			break;
 		}
@@ -711,6 +713,7 @@ read_stream_begin_impl(int flags,
 		stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
 	else
 		stream->distance = 1;
+	stream->resume_distance = stream->distance;
 
 	/*
 	 * Since we always access the same relation, we can initialize parts of
@@ -862,6 +865,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		else
 		{
 			/* No more blocks, end of stream. */
+			stream->resume_distance = stream->distance;
 			stream->distance = 0;
 			stream->oldest_buffer_index = stream->next_buffer_index;
 			stream->pinned_buffers = 0;
@@ -1034,6 +1038,17 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
 	return read_stream_get_block(stream, NULL);
 }
 
+/*
+ * Resume looking ahead after the block number callback reported end-of-stream.
+ * This is useful for streams of self-referential blocks, after a buffer needed
+ * to be consumed and examined to find more block numbers.
+ */
+void
+read_stream_resume(ReadStream *stream)
+{
+	stream->distance = stream->resume_distance;
+}
+
 /*
  * Reset a read stream by releasing any queued up buffers, allowing the stream
  * to be used again for different blocks.  This can be used to clear an
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
index 9b0d65161d0..e29ac50fc9e 100644
--- a/src/include/storage/read_stream.h
+++ b/src/include/storage/read_stream.h
@@ -99,6 +99,7 @@ extern ReadStream *read_stream_begin_smgr_relation(int flags,
 												   ReadStreamBlockNumberCB callback,
 												   void *callback_private_data,
 												   size_t per_buffer_data_size);
+extern void read_stream_resume(ReadStream *stream);
 extern void read_stream_reset(ReadStream *stream);
 extern void read_stream_end(ReadStream *stream);
 
-- 
2.51.1

From a3c0520aa382874c7028c42d7d720415d4b3b26d Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Wed, 12 Nov 2025 16:49:57 +1300
Subject: [PATCH 2/2] Add smoke test for read_stream_resume().

XXX Not sure if it's worth a permanent test for this, but since the new
function is not exercised in the tree it seemed worth writing a simple
demo...
---
 src/test/modules/test_aio/Makefile           |  3 +-
 src/test/modules/test_aio/meson.build        |  1 +
 src/test/modules/test_aio/t/001_aio.pl       | 21 +++++
 src/test/modules/test_aio/test_aio--1.0.sql  |  9 ++
 src/test/modules/test_aio/test_read_stream.c | 89 ++++++++++++++++++++
 src/tools/pgindent/typedefs.list             |  1 +
 6 files changed, 123 insertions(+), 1 deletion(-)
 create mode 100644 src/test/modules/test_aio/test_read_stream.c

diff --git a/src/test/modules/test_aio/Makefile b/src/test/modules/test_aio/Makefile
index f53cc64671a..465eb09ee4f 100644
--- a/src/test/modules/test_aio/Makefile
+++ b/src/test/modules/test_aio/Makefile
@@ -5,7 +5,8 @@ PGFILEDESC = "test_aio - test code for AIO"
 MODULE_big = test_aio
 OBJS = \
 	$(WIN32RES) \
-	test_aio.o
+	test_aio.o \
+	test_read_stream.o
 
 EXTENSION = test_aio
 DATA = test_aio--1.0.sql
diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index 73d2fd68eaa..6e6fcbfdad9 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -2,6 +2,7 @@
 
 test_aio_sources = files(
   'test_aio.c',
+  'test_read_stream.c',
 )
 
 if host_system == 'windows'
diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl
index 3f0453619e8..cd1fd633bd8 100644
--- a/src/test/modules/test_aio/t/001_aio.pl
+++ b/src/test/modules/test_aio/t/001_aio.pl
@@ -1489,6 +1489,26 @@ SELECT read_rel_block_ll('tbl_cs_fail', 3, nblocks=>1, zero_on_error=>true);),
 	$psql->quit();
 }
 
+# Read stream tests
+sub test_read_stream
+{
+	my $io_method = shift;
+	my $node = shift;
+	my ($ret, $output);
+
+	my $psql = $node->background_psql('postgres', on_error_stop => 0);
+
+	$psql->query_safe(
+		qq(
+CREATE TEMPORARY TABLE tmp_read_stream(data int not null);
+INSERT INTO tmp_read_stream SELECT generate_series(1, 10000);
+SELECT test_read_stream_resume('tmp_read_stream', 0);
+DROP TABLE tmp_read_stream;
+));
+	$psql->quit();
+}
+
+
 
 # Run all tests that are supported for all io_methods
 sub test_generic
@@ -1525,6 +1545,7 @@ CHECKPOINT;
 	test_checksum($io_method, $node);
 	test_ignore_checksum($io_method, $node);
 	test_checksum_createdb($io_method, $node);
+	test_read_stream($io_method, $node);
 
   SKIP:
 	{
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index e495481c41e..dde8de35bc7 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -106,3 +106,12 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
 CREATE FUNCTION inj_io_reopen_detach()
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
+
+
+
+/*
+ * Read stream related functions
+ */
+CREATE FUNCTION test_read_stream_resume(rel regclass, blockno int4)
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_aio/test_read_stream.c b/src/test/modules/test_aio/test_read_stream.c
new file mode 100644
index 00000000000..bf608db8a72
--- /dev/null
+++ b/src/test/modules/test_aio/test_read_stream.c
@@ -0,0 +1,89 @@
+/*-------------------------------------------------------------------------
+ *
+ * test_read_stream.c
+ *		Helpers to write tests for read_stream.c
+ *
+ * Copyright (c) 2020-2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/test/modules/test_aio/test_read_stream.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/relation.h"
+#include "fmgr.h"
+#include "storage/bufmgr.h"
+#include "storage/read_stream.h"
+
+typedef struct
+{
+	BlockNumber blkno;
+	int			count;
+} test_read_stream_resume_state;
+
+static BlockNumber
+test_read_stream_resume_cb(ReadStream *stream,
+						   void *callback_private_data,
+						   void *per_buffer_data)
+{
+	test_read_stream_resume_state *state = callback_private_data;
+
+	/* Periodic end-of-stream. */
+	if (++state->count % 3 == 0)
+		return InvalidBlockNumber;
+
+	return state->blkno;
+}
+
+/*
+ * Test read_stream_resume(), allowing a stream to end temporarily and then
+ * continue where it left off.
+ */
+PG_FUNCTION_INFO_V1(test_read_stream_resume);
+Datum
+test_read_stream_resume(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	BlockNumber blkno = PG_GETARG_UINT32(1);
+	Relation	rel;
+	Buffer		buf;
+	ReadStream *stream;
+	test_read_stream_resume_state state = {.blkno = blkno};
+
+	rel = relation_open(relid, AccessShareLock);
+	stream = read_stream_begin_relation(READ_STREAM_DEFAULT,
+										NULL,
+										rel,
+										MAIN_FORKNUM,
+										test_read_stream_resume_cb,
+										&state,
+										0);
+
+	for (int i = 0; i < 3; ++i)
+	{
+		/* Same block twice. */
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(BufferGetBlockNumber(buf) == blkno);
+		ReleaseBuffer(buf);
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(BufferGetBlockNumber(buf) == blkno);
+		ReleaseBuffer(buf);
+
+		/* End-of-stream. */
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(buf == InvalidBuffer);
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(buf == InvalidBuffer);
+
+		/* Resume. */
+		read_stream_resume(stream);
+	}
+
+	read_stream_end(stream);
+	relation_close(rel, NoLock);
+
+	PG_RETURN_VOID();
+}
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 432509277c9..ca4c071ded2 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -4144,6 +4144,7 @@ td_entry
 teSection
 temp_tablespaces_extra
 test_re_flags
+test_read_stream_resume_state
 test_regex_ctx
 test_shm_mq_header
 test_spec
-- 
2.51.1
