On Wed, Nov 12, 2025 at 5:47 AM Thomas Munro <[email protected]> wrote:
>
> I suppose an alternative design would be that _next_buffer() returns
> InvalidBuffer only once (= the block number callback returns
> InvalidBlock once) and then automatically resumes (= it restores the
> distance) and then you can call read_stream_next_buffer() again (= the
> block number callback will be called again to fill the stream up with
> new buffers before waiting for the first one to be ready to give to
> you if it isn't already). That would have the advantage of not
> requiring a new function at all and make the patch even shorter, but I
> don't know, I guess I thought that would be a bit more fragile in some
> way, less explicit. Hmm, would it actually be better if it worked
> like that?

We discussed off-list and decided that changing existing functionality
in an unexpected way is undesirable. So, it is better we stick with
adding read_stream_resume().

However, in talking about read_stream_resume() further, Thomas and I
also thought of potential issues with it.

If read_stream_resume() is called before the read stream user callback
has ever returned InvalidBlockNumber:

1) The value of resume_distance will be the original value of distance
from read_stream_begin_relation(). You don't want to reset the
distance to that value.

2) There may be inflight or completed buffers that have yet to be
yielded which will be returned the next time read_stream_next_buffer()
is invoked. If the user resets the state the callback is using to
return blocks and expects the next invocation of
read_stream_next_buffer() to return buffers with those blocks, they
will be disappointed.

If we try to address this by requiring that stream->distance is 0 when
read_stream_resume() is called, that won't work because while it is
set to 0 when the callback returns InvalidBlockNumber, there may still
be unreturned buffers in the stream.

If the user wants to use read_stream_reset() to exhaust the stream
before calling read_stream_resume(), read_stream_reset() sets
stream->distance to 1 at the end, so read_stream_resume() couldn't
detect if reset() was correctly called first or if the distance is > 0
because the stream is still in progress.

To make sure 1) distance isn't reset to a resume_distance from
read_stream_begin_relation() and 2) unexpected buffers aren't returned
from the read stream, we could error out in read_stream_resume() if
pinned_buffers > 0. And in read_stream_reset(), we would save distance
in resume_distance before clearing distance. That would allow calling
read_stream_resume() either if you called read_stream_reset() or if
you exhausted the stream yourself.

See rough attached patch for a sketch of this.

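To make the intended state transitions easier to follow, here is a
standalone toy model of just the three fields involved. It is not the
patch and it does not use any PostgreSQL headers: ToyStream and the
toy_*() functions are hypothetical names invented for illustration, and
toy_resume() returns false where the real function would elog(ERROR).
Treat it as a sketch of the rule described above, under those
assumptions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical toy model of the relevant ReadStream fields only. */
typedef struct ToyStream
{
	int16_t		pinned_buffers;		/* queued buffers not yet returned */
	int16_t		distance;			/* look-ahead distance, 0 = stopped */
	int16_t		resume_distance;	/* distance to restore on resume */
} ToyStream;

/* Models stream creation: both distances start at the initial value. */
static void
toy_begin(ToyStream *s, int16_t initial_distance)
{
	assert(initial_distance > 0);
	s->pinned_buffers = 0;
	s->distance = initial_distance;
	s->resume_distance = initial_distance;
}

/* Models the callback returning InvalidBlockNumber: save, then stop. */
static void
toy_end_of_stream(ToyStream *s)
{
	s->resume_distance = s->distance;
	s->distance = 0;
}

/* Models reset: save a live distance, drop queued buffers, restart at 1. */
static void
toy_reset(ToyStream *s)
{
	if (s->distance > 0)
		s->resume_distance = s->distance;
	s->distance = 0;
	s->pinned_buffers = 0;		/* stands in for releasing the buffers */
	s->distance = 1;
}

/* Models resume: refused (false) while unreturned buffers remain. */
static bool
toy_resume(ToyStream *s)
{
	if (s->pinned_buffers > 0)
		return false;
	s->distance = s->resume_distance;
	return true;
}
```

In this model, resuming is refused whenever buffers are still queued,
and it restores the ramped-up distance both after the stream was
drained by hand and after toy_reset().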
It would be nicer if we could error out if read_stream_next_buffer()
didn't return InvalidBuffer, but we can't do that if we want to allow
calling read_stream_reset() followed by read_stream_resume() because
distance won't be 0.

I tried this with a modified pgvector with an hnsw read stream user
and it seemed to work correctly.

- Melanie

From cf41f56ba0d8c7f3bf92242020f742f201cf08d6 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Tue, 18 Nov 2025 14:24:12 -0500
Subject: [PATCH] resume

---
 src/backend/storage/aio/read_stream.c | 19 +++++++++++++++++++
 src/include/storage/read_stream.h     |  1 +
 2 files changed, 20 insertions(+)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 031fde9f4cb..75bf92dc683 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -100,6 +100,7 @@ struct ReadStream
 	int16		pinned_buffers;
 	int16		distance;
 	int16		initialized_buffers;
+	int16		resume_distance;
 	int			read_buffers_flags;
 	bool		sync_mode;		/* using io_method=sync */
 	bool		batch_mode;		/* READ_STREAM_USE_BATCHING */
@@ -464,6 +465,7 @@ read_stream_look_ahead(ReadStream *stream)
 		if (blocknum == InvalidBlockNumber)
 		{
 			/* End of stream. */
+			stream->resume_distance = stream->distance;
 			stream->distance = 0;
 			break;
 		}
@@ -711,6 +713,7 @@ read_stream_begin_impl(int flags,
 		stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
 	else
 		stream->distance = 1;
+	stream->resume_distance = stream->distance;
 
 	/*
 	 * Since we always access the same relation, we can initialize parts of
@@ -862,6 +865,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			else
 			{
 				/* No more blocks, end of stream. */
+				stream->resume_distance = stream->distance;
 				stream->distance = 0;
 				stream->oldest_buffer_index = stream->next_buffer_index;
 				stream->pinned_buffers = 0;
@@ -1034,6 +1038,19 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
 	return read_stream_get_block(stream, NULL);
 }
 
+/*
+ * Resume looking ahead after the block number callback reported end-of-stream.
+ * This is useful for streams of self-referential blocks, after a buffer needed
+ * to be consumed and examined to find more block numbers.
+ */
+void
+read_stream_resume(ReadStream *stream)
+{
+	if (stream->pinned_buffers > 0)
+		elog(ERROR, "read stream must be exhausted before resuming");
+	stream->distance = stream->resume_distance;
+}
+
 /*
  * Reset a read stream by releasing any queued up buffers, allowing the stream
  * to be used again for different blocks. This can be used to clear an
@@ -1047,6 +1064,8 @@ read_stream_reset(ReadStream *stream)
 	Buffer		buffer;
 
 	/* Stop looking ahead. */
+	if (stream->distance > 0)
+		stream->resume_distance = stream->distance;
 	stream->distance = 0;
 
 	/* Forget buffered block number and fast path state. */
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
index 9b0d65161d0..e29ac50fc9e 100644
--- a/src/include/storage/read_stream.h
+++ b/src/include/storage/read_stream.h
@@ -99,6 +99,7 @@ extern ReadStream *read_stream_begin_smgr_relation(int flags,
 													   ReadStreamBlockNumberCB callback,
 													   void *callback_private_data,
 													   size_t per_buffer_data_size);
+extern void read_stream_resume(ReadStream *stream);
 extern void read_stream_reset(ReadStream *stream);
 extern void read_stream_end(ReadStream *stream);
 
-- 
2.47.3
