On Wed, Nov 12, 2025 at 5:47 AM Thomas Munro <[email protected]> wrote:
>
> I suppose an alternative design would be that _next_buffer() returns
> InvalidBuffer only once (= the block number callback returns
> InvalidBlock once) and then automatically resumes (= it restores the
> distance) and then you can call read_stream_next_buffer() again (= the
> block number callback will be called again to fill the stream up with
> new buffers before waiting for the first one to be ready to give to
> you if it isn't already).  That would have the advantage of not
> requiring a new function at all and make the patch even shorter, but I
> don't know, I guess I thought that would be a bit more fragile in some
> way, less explicit.  Hmm, would it actually be better if it worked
> like that?

We discussed this off-list and decided that changing existing behavior
in an unexpected way is undesirable, so it is better to stick with
adding read_stream_resume(). However, in talking about
read_stream_resume() further, Thomas and I also thought of potential
issues with it:

If read_stream_resume() is called before the read stream user callback
has ever returned InvalidBlockNumber:
1) The value of resume_distance will still be the original value of
distance set when the stream was created with
read_stream_begin_relation(). You don't want to reset the distance to
that value.
2) There may be in-flight or completed buffers that have yet to be
yielded, and they will be returned the next time
read_stream_next_buffer() is invoked. If the user resets the state the
callback uses to return blocks and expects the next invocation of
read_stream_next_buffer() to return buffers with those blocks, they
will be disappointed.
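
A toy model may make problem 1 concrete. The sketch below is not
PostgreSQL code: ToyStream and the toy_* functions are hypothetical
names, and the doubling in toy_ramp_up() is only an assumed stand-in
for however the real stream grows its distance. It keeps just the
distance and resume_distance fields and shows an unchecked resume
discarding a ramped-up distance.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for the two ReadStream fields discussed above. */
typedef struct ToyStream
{
	int16_t		distance;			/* current look-ahead distance */
	int16_t		resume_distance;	/* value a resume would restore */
} ToyStream;

/* Mimics stream creation: resume_distance starts as the initial distance. */
static void
toy_begin(ToyStream *stream)
{
	stream->distance = 1;
	stream->resume_distance = stream->distance;
}

/* Assumed ramp-up rule: double the distance, capped at max_distance. */
static void
toy_ramp_up(ToyStream *stream, int16_t max_distance)
{
	int			doubled = stream->distance * 2;

	assert(max_distance > 0);
	stream->distance = (int16_t) (doubled > max_distance ? max_distance : doubled);
}

/* A resume with no checks at all, as in problem 1. */
static void
toy_resume_unchecked(ToyStream *stream)
{
	stream->distance = stream->resume_distance;
}
```

In this model, resuming after the distance has grown to 8 but before
any end-of-stream drops it back to 1, because resume_distance was
never updated.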

If we try to address this by requiring that stream->distance is 0 when
read_stream_resume() is called, that won't work because while it is
set to 0 when the callback returns InvalidBlockNumber, there may still
be unreturned buffers in the stream.

If the user wants to use read_stream_reset() to exhaust the stream
before calling read_stream_resume(), read_stream_reset() sets
stream->distance to 1 at the end, so read_stream_resume() couldn't
tell whether read_stream_reset() was correctly called first or whether
the distance is > 0 because the stream is still in progress.
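
To illustrate why distance alone cannot separate these states, here is
a small sketch. It is a toy model, not PostgreSQL code: ToyStream and
the toy_* functions are hypothetical, and they copy only the distance
assignments described in the two paragraphs above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the relevant ReadStream fields. */
typedef struct ToyStream
{
	int16_t		distance;
	int16_t		pinned_buffers;
} ToyStream;

/* Callback returned InvalidBlockNumber: distance drops to 0, pins remain. */
static void
toy_end_of_stream(ToyStream *stream)
{
	assert(stream->distance > 0);	/* toy invariant: still looking ahead */
	stream->distance = 0;
}

/* Reset: release everything, then leave distance at 1 as described above. */
static void
toy_reset(ToyStream *stream)
{
	stream->distance = 0;
	stream->pinned_buffers = 0;
	stream->distance = 1;
}

/* A check based on distance alone. */
static bool
toy_distance_says_exhausted(const ToyStream *stream)
{
	return stream->distance == 0;
}
```

In this model the distance-only check says "exhausted" while buffers
are still pending after end-of-stream, and says "not exhausted" for a
stream that was just reset and holds no buffers at all.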

To make sure 1) distance isn't reset to a resume_distance from
read_stream_begin_relation() and 2) unexpected buffers aren't returned
from the read stream, we could error out in read_stream_resume() if
pinned_buffers > 0. And in read_stream_reset(), we would save distance
in resume_distance before clearing distance. That would allow calling
read_stream_resume() either if you called read_stream_reset() or if
you exhausted the stream yourself. See the rough attached patch for a
sketch of this.
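
The intended behavior can also be modeled outside the server. The
following is a toy sketch under stated assumptions, not the patch
itself: ToyStream and the toy_* functions are hypothetical, only three
fields are modeled, and toy_resume() returns false where the patch
would raise an ERROR.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the relevant ReadStream fields. */
typedef struct ToyStream
{
	int16_t		distance;
	int16_t		resume_distance;
	int16_t		pinned_buffers;
} ToyStream;

/* Callback reported end-of-stream: remember distance, stop looking ahead. */
static void
toy_end_of_stream(ToyStream *stream)
{
	stream->resume_distance = stream->distance;
	stream->distance = 0;
}

/* The consumer takes one pending buffer out of the stream. */
static void
toy_consume(ToyStream *stream)
{
	assert(stream->pinned_buffers > 0);
	stream->pinned_buffers--;
}

/* Reset: save a nonzero distance first, drop all buffers, end at 1. */
static void
toy_reset(ToyStream *stream)
{
	if (stream->distance > 0)
		stream->resume_distance = stream->distance;
	stream->distance = 0;
	stream->pinned_buffers = 0;
	stream->distance = 1;
}

/* Resume: refuse while buffers are still pending. */
static bool
toy_resume(ToyStream *stream)
{
	if (stream->pinned_buffers > 0)
		return false;
	stream->distance = stream->resume_distance;
	return true;
}
```

In this model both paths end up restoring the distance that was in
effect before the stream stopped: consuming every pending buffer after
end-of-stream, or calling toy_reset() at any point.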

It would be nicer if we could error out when read_stream_next_buffer()
has not yet returned InvalidBuffer, but we can't do that if we want to
allow calling read_stream_reset() followed by read_stream_resume(),
because distance won't be 0 after the reset.

I tried this with a modified pgvector that has an HNSW read stream
user, and it seemed to work correctly.

- Melanie
From cf41f56ba0d8c7f3bf92242020f742f201cf08d6 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Tue, 18 Nov 2025 14:24:12 -0500
Subject: [PATCH] resume

---
 src/backend/storage/aio/read_stream.c | 19 +++++++++++++++++++
 src/include/storage/read_stream.h     |  1 +
 2 files changed, 20 insertions(+)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 031fde9f4cb..75bf92dc683 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -100,6 +100,7 @@ struct ReadStream
 	int16		pinned_buffers;
 	int16		distance;
 	int16		initialized_buffers;
+	int16		resume_distance;
 	int			read_buffers_flags;
 	bool		sync_mode;		/* using io_method=sync */
 	bool		batch_mode;		/* READ_STREAM_USE_BATCHING */
@@ -464,6 +465,7 @@ read_stream_look_ahead(ReadStream *stream)
 		if (blocknum == InvalidBlockNumber)
 		{
 			/* End of stream. */
+			stream->resume_distance = stream->distance;
 			stream->distance = 0;
 			break;
 		}
@@ -711,6 +713,7 @@ read_stream_begin_impl(int flags,
 		stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
 	else
 		stream->distance = 1;
+	stream->resume_distance = stream->distance;
 
 	/*
 	 * Since we always access the same relation, we can initialize parts of
@@ -862,6 +865,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		else
 		{
 			/* No more blocks, end of stream. */
+			stream->resume_distance = stream->distance;
 			stream->distance = 0;
 			stream->oldest_buffer_index = stream->next_buffer_index;
 			stream->pinned_buffers = 0;
@@ -1034,6 +1038,19 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
 	return read_stream_get_block(stream, NULL);
 }
 
+/*
+ * Resume looking ahead after the block number callback reported end-of-stream.
+ * This is useful for streams of self-referential blocks, after a buffer needed
+ * to be consumed and examined to find more block numbers.
+ */
+void
+read_stream_resume(ReadStream *stream)
+{
+	if (stream->pinned_buffers > 0)
+		elog(ERROR, "read stream must be exhausted before resuming");
+	stream->distance = stream->resume_distance;
+}
+
 /*
  * Reset a read stream by releasing any queued up buffers, allowing the stream
  * to be used again for different blocks.  This can be used to clear an
@@ -1047,6 +1064,8 @@ read_stream_reset(ReadStream *stream)
 	Buffer		buffer;
 
 	/* Stop looking ahead. */
+	if (stream->distance > 0)
+		stream->resume_distance = stream->distance;
 	stream->distance = 0;
 
 	/* Forget buffered block number and fast path state. */
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
index 9b0d65161d0..e29ac50fc9e 100644
--- a/src/include/storage/read_stream.h
+++ b/src/include/storage/read_stream.h
@@ -99,6 +99,7 @@ extern ReadStream *read_stream_begin_smgr_relation(int flags,
 												   ReadStreamBlockNumberCB callback,
 												   void *callback_private_data,
 												   size_t per_buffer_data_size);
+extern void read_stream_resume(ReadStream *stream);
 extern void read_stream_reset(ReadStream *stream);
 extern void read_stream_end(ReadStream *stream);
 
-- 
2.47.3
