On Fri, Nov 21, 2025 at 4:28 AM Melanie Plageman <[email protected]> wrote:
> I'm not totally opposed to this. My rationale for making it an error
> is that the developer could have test cases where all the buffers are
> consumed but the code is written such that that won't always happen.
> Then if a real production query doesn't consume all the buffers, it
> could return wrong results (I think). That will mean the user can't
> complete their query until the extension author releases a new version
> of their code. But I'm not sure what the right answer is here.
Focusing on making sure v19 has a good interface for this, and setting aside (for now) thoughts of back-patching a band-aid and the constraints that would bring... I think it'd be better if that were the consumer's choice. I don't want the consumer to be required to drain the stream before resuming, as that'd be an unprincipled stall. For example, if new WAL arrives over the network, I think it should be possible for recovery's WAL-powered stream of heap pages to resume looking ahead even if recovery hasn't completely drained the existing stream.

Peter G (CC'd) and I discussed some problems he had in the index prefetching work, and I tried to extend this a bit to give the semantics he wanted, in point 2 below. It's simple in itself, but might lead to some tricky questions higher up. Posted for experimentation; it'll be interesting to see if this goes somewhere.

1. read_stream_resume() as before, but with a new explicit read_stream_pause(): if a block number callback would like to report a temporary lack of information, it should return read_stream_pause(stream), not InvalidBlockNumber. After read_stream_resume(stream) is called, the next read_stream_next_buffer() call enters the lookahead loop again. While paused, if the consumer drains all the existing buffers in the stream and then asks for one more, it will receive InvalidBuffer; but if the _resume() call is made sooner, the consumer will never know about the temporary lack of buffers in the stream.

2. read_stream_yield(): while streaming heap pages that come from TIDs on index pages, Peter didn't like that the executor lost control of how much work was done by the lookahead loop underneath read_stream_next_buffer(). The consumer might have a heap page with some tuples that could be emitted right now, but the block number callback might be evaluating arbitrarily expensive filter qual expressions far ahead, and the consumer might prefer to emit more tuples now before doing an unbounded amount of work finding more.
This interface allows some limited coroutine-like multitasking: the block number callback can return read_stream_yield(stream) to hand control back to the consumer periodically, if it knows the consumer could already do something else. It works by pausing the stream and resuming it in the next read_stream_next_buffer() call, but that's an internal detail.

Some half-baked thoughts about the resulting flow control:

Yielding control periodically, just when it happens to be possible within the constraints of the volcano executor, is an interesting thing to think about. You can only yield if you already have a tuple to emit. There is no saying when control will return to you, and the node you yield to might immediately block on I/O even though you could have been doing useful CPU work. You probably need an event-driven, node-hopping executor to fix that in general, but on the flip side, I can think of one bet that I'd take: if you already have a tuple to emit, AND index scans themselves (not only the referenced heap pages) were also streamed, AND a hypothetical read_stream_next_buffer_no_wait(btree_stream) said the next index page you need is not ready yet, then you should yield. You're gambling that other plan nodes will have better luck running without an I/O stall, but you yourself have ~0% chance of avoiding one.

Yielding just because you've scanned N index pages/tuples/whatever is harder to think about. The stream shouldn't get far ahead unless it's recently been useful for I/O concurrency (though optimal distance heuristics are an open problem), but in this case a single invocation of the block number callback can call ReadBuffer() an arbitrary number of times, filtering out all the index tuples as it rampages through the whole index, IIUC. I see why you might want to yield periodically if you can, but I also wonder how much that can really help if you still have to pick up where you left off next time. I guess it depends on the distribution of matches.
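The one bet described above is just a conjunction of three conditions, so it can be written as a predicate. This is a minimal sketch only: `YieldInputs` and `should_yield()` are hypothetical names, and the `next_index_page_ready` field stands for the answer a hypothetical read_stream_next_buffer_no_wait(btree_stream) would give. Nothing here exists in PostgreSQL or in the attached patches.

```c
#include <stdbool.h>

/*
 * Hypothetical inputs to the yield decision; none of these exist in
 * PostgreSQL.  next_index_page_ready stands in for the answer that a
 * hypothetical read_stream_next_buffer_no_wait(btree_stream) would give.
 */
typedef struct {
    bool have_tuple_to_emit;     /* a tuple could be returned upward right now */
    bool index_pages_streamed;   /* index pages come from their own read stream */
    bool next_index_page_ready;  /* the no-wait probe found the next page ready */
} YieldInputs;

/*
 * Yield only if we have something to emit AND carrying on would certainly
 * stall on index I/O.  Periodic "every N pages" yielding is not modeled.
 */
bool should_yield(const YieldInputs *in) {
    return in->have_tuple_to_emit &&
           in->index_pages_streamed &&
           !in->next_index_page_ready;
}
```

Every other combination keeps scanning: without a tuple there is nothing to hand upward, and if the next index page is already available, continuing does not stall on it.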
It's also clear that any cold-cache testing done with direct I/O enabled will stall abominably as long as the index-scanning level inside the block number callback calls ReadBuffer(), possibly confusing matters.
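To make the semantics of points 1 and 2 concrete without a PostgreSQL build, here is a toy, single-threaded model of the flow control in the first patch. It is a sketch under loud assumptions: `ToyStream`, `toy_pause()`, `toy_resume()`, `toy_yield()`, `toy_next_buffer()`, the two callbacks and `toy_demo()` are all made-up names; there is no I/O, pinning, or adaptive distance; and a "buffer" is just its block number. Following the patch's hunk in read_stream_next_buffer(), a yielded stream is resumed when the consumer asks for a buffer and none is queued, and the yield callback uses the same `% 3 == 2` pattern as the second patch's test.

```c
#include <assert.h>
#include <stdbool.h>

#define TOY_INVALID (-1)   /* plays both InvalidBlockNumber and InvalidBuffer */
#define TOY_QSIZE 16

typedef struct ToyStream {
    int distance;          /* lookahead limit; 0 means "end of stream" */
    int resume_distance;   /* restored by toy_resume() */
    bool yielded;          /* set by toy_yield() */
    int queue[TOY_QSIZE];  /* queued "buffers", which are just block numbers */
    int head, count;
    int (*callback)(struct ToyStream *s, void *arg);
    void *arg;
} ToyStream;

int toy_pause(ToyStream *s) {
    s->resume_distance = s->distance;
    s->distance = 0;
    return TOY_INVALID;
}

void toy_resume(ToyStream *s) { s->distance = s->resume_distance; }

int toy_yield(ToyStream *s) {
    toy_pause(s);
    s->yielded = true;
    return TOY_INVALID;
}

/* Ask the callback for blocks until 'distance' are queued or it stops us. */
static void toy_look_ahead(ToyStream *s) {
    while (s->count < s->distance) {
        int blk = s->callback(s, s->arg);
        if (blk == TOY_INVALID) {
            s->distance = 0;   /* end of stream, possibly temporary */
            return;
        }
        s->queue[(s->head + s->count) % TOY_QSIZE] = blk;
        s->count++;
    }
}

int toy_next_buffer(ToyStream *s) {
    if (s->count == 0) {
        if (s->distance == 0) {
            if (!s->yielded)
                return TOY_INVALID;    /* paused, or really finished */
            s->yielded = false;        /* the callback yielded: resume */
            toy_resume(s);
        }
        toy_look_ahead(s);
        if (s->count == 0)
            return TOY_INVALID;
    }
    int blk = s->queue[s->head];
    s->head = (s->head + 1) % TOY_QSIZE;
    s->count--;
    toy_look_ahead(s);                 /* top up the queue unless paused */
    return blk;
}

/* Hands out blocks 0..available-1, pausing when it runs out of information. */
typedef struct { int next, available; } PauseState;

static int pause_cb(ToyStream *s, void *arg) {
    PauseState *p = arg;
    return p->next < p->available ? p->next++ : toy_pause(s);
}

/* Like the yield test in the second patch: yields on calls 2, 5, 8, ... */
typedef struct { int calls, next; } YieldState;

static int yield_cb(ToyStream *s, void *arg) {
    YieldState *y = arg;
    return ++y->calls % 3 == 2 ? toy_yield(s) : y->next++;
}

void toy_demo(void) {
    PauseState ps = {0, 2};
    YieldState ys = {0, 0};
    ToyStream a = {.distance = 4, .callback = pause_cb, .arg = &ps};
    ToyStream b = {.distance = 4, .callback = yield_cb, .arg = &ys};

    /* Paused: drain both buffers; one more call reports end-of-stream. */
    assert(toy_next_buffer(&a) == 0);
    assert(toy_next_buffer(&a) == 1);
    assert(toy_next_buffer(&a) == TOY_INVALID);
    /* More information arrives; resuming continues where it left off. */
    ps.available = 4;
    toy_resume(&a);
    assert(toy_next_buffer(&a) == 2);
    /* Resuming before the queue drains hides the gap from the consumer. */
    ps.available = 6;
    toy_resume(&a);
    for (int i = 3; i < 6; i++)
        assert(toy_next_buffer(&a) == i);
    /* Yielding hands control back early, but the consumer never sees a gap. */
    for (int i = 0; i < 10; i++)
        assert(toy_next_buffer(&b) == i);
}
```

As in the patch, yielding is just a pause plus a flag, so the only extra logic lives in `toy_next_buffer()`, which turns what would otherwise be an end-of-stream report into an automatic resume.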
From ff6da44df5418d73ad9ec1911c87b66897f3b086 Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Sat, 15 Jun 2024 14:37:26 +1200
Subject: [PATCH 1/2] Introduce read_stream_{pause,resume,yield}().

---
 src/backend/storage/aio/read_stream.c | 50 ++++++++++++++++++++++++++-
 src/include/storage/read_stream.h     |  3 ++
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 031fde9f4cb..964e1aa281c 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -100,11 +100,13 @@ struct ReadStream
 	int16		pinned_buffers;
 	int16		distance;
 	int16		initialized_buffers;
+	int16		resume_distance;
 	int			read_buffers_flags;
 	bool		sync_mode;		/* using io_method=sync */
 	bool		batch_mode;		/* READ_STREAM_USE_BATCHING */
 	bool		advice_enabled;
 	bool		temporary;
+	bool		yielded;
 
 	/*
 	 * One-block buffer to support 'ungetting' a block number, to resolve flow
@@ -879,7 +881,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 
 		/* End of stream reached? */
 		if (stream->distance == 0)
-			return InvalidBuffer;
+		{
+			if (!stream->yielded)
+				return InvalidBuffer;
+
+			/* The callback yielded. Resume. */
+			stream->yielded = false;
+			read_stream_resume(stream);
+			Assert(stream->distance != 0);
+		}
 
 		/*
 		 * The usual order of operations is that we look ahead at the bottom
@@ -1034,6 +1044,44 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
 	return read_stream_get_block(stream, NULL);
 }
 
+/*
+ * Temporarily stop consuming block numbers from the block number callback. If
+ * called inside the block number callback, its return value should be
+ * returned by the callback.
+ */
+BlockNumber
+read_stream_pause(ReadStream *stream)
+{
+	stream->resume_distance = stream->distance;
+	stream->distance = 0;
+	return InvalidBlockNumber;
+}
+
+/*
+ * Resume looking ahead after the block number callback reported end-of-stream.
+ * This is useful for streams of self-referential blocks, after a buffer needed
+ * to be consumed and examined to find more block numbers.
+ */
+void
+read_stream_resume(ReadStream *stream)
+{
+	stream->distance = stream->resume_distance;
+}
+
+/*
+ * Called from inside a block number callback, to return control to the caller
+ * of read_stream_next_buffer() without looking further ahead. Its return
+ * value should be returned by the callback. This is equivalent to pausing and
+ * resuming automatically at the next call to read_stream_next_buffer().
+ */
+BlockNumber
+read_stream_yield(ReadStream *stream)
+{
+	read_stream_pause(stream);
+	stream->yielded = true;
+	return InvalidBlockNumber;
+}
+
 /*
  * Reset a read stream by releasing any queued up buffers, allowing the stream
  * to be used again for different blocks. This can be used to clear an
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
index 9b0d65161d0..8ac53d2902d 100644
--- a/src/include/storage/read_stream.h
+++ b/src/include/storage/read_stream.h
@@ -99,6 +99,9 @@ extern ReadStream *read_stream_begin_smgr_relation(int flags,
 													ReadStreamBlockNumberCB callback,
 													void *callback_private_data,
 													size_t per_buffer_data_size);
+extern BlockNumber read_stream_pause(ReadStream *stream);
+extern void read_stream_resume(ReadStream *stream);
+extern BlockNumber read_stream_yield(ReadStream *stream);
 extern void read_stream_reset(ReadStream *stream);
 extern void read_stream_end(ReadStream *stream);
 
-- 
2.51.2
From de8a800b48829ef619200a71dba9028be3ea09e9 Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Wed, 12 Nov 2025 16:49:57 +1300
Subject: [PATCH 2/2] Add tests for read_stream_{pause,resume,yield}().

---
 src/test/modules/test_aio/Makefile           |   3 +-
 src/test/modules/test_aio/meson.build        |   1 +
 src/test/modules/test_aio/t/001_aio.pl       |  30 +++
 src/test/modules/test_aio/test_aio--1.0.sql  |  13 ++
 src/test/modules/test_aio/test_read_stream.c | 181 +++++++++++++++++++
 src/tools/pgindent/typedefs.list             |   1 +
 6 files changed, 228 insertions(+), 1 deletion(-)
 create mode 100644 src/test/modules/test_aio/test_read_stream.c

diff --git a/src/test/modules/test_aio/Makefile b/src/test/modules/test_aio/Makefile
index f53cc64671a..465eb09ee4f 100644
--- a/src/test/modules/test_aio/Makefile
+++ b/src/test/modules/test_aio/Makefile
@@ -5,7 +5,8 @@ PGFILEDESC = "test_aio - test code for AIO"
 MODULE_big = test_aio
 OBJS = \
 	$(WIN32RES) \
-	test_aio.o
+	test_aio.o \
+	test_read_stream.o
 
 EXTENSION = test_aio
 DATA = test_aio--1.0.sql
diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index 73d2fd68eaa..6e6fcbfdad9 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -2,6 +2,7 @@
 
 test_aio_sources = files(
   'test_aio.c',
+  'test_read_stream.c',
 )
 
 if host_system == 'windows'
diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl
index 3f0453619e8..2a2c6523a6b 100644
--- a/src/test/modules/test_aio/t/001_aio.pl
+++ b/src/test/modules/test_aio/t/001_aio.pl
@@ -1489,6 +1489,35 @@ SELECT read_rel_block_ll('tbl_cs_fail', 3, nblocks=>1, zero_on_error=>true);),
 	$psql->quit();
 }
 
+# Read stream tests
+sub test_read_stream
+{
+	my $io_method = shift;
+	my $node = shift;
+	my ($ret, $output);
+
+	my $psql = $node->background_psql('postgres', on_error_stop => 0);
+
+	$psql->query_safe(
+		qq(
+CREATE TEMPORARY TABLE tmp_read_stream(data int not null);
+INSERT INTO tmp_read_stream SELECT generate_series(1, 10000);
+SELECT test_read_stream_resume('tmp_read_stream', 0);
+DROP TABLE tmp_read_stream;
+));
+
+	$psql->query_safe(
+		qq(
+CREATE TEMPORARY TABLE tmp_read_stream(data int not null);
+INSERT INTO tmp_read_stream SELECT generate_series(1, 10000);
+SELECT test_read_stream_yield('tmp_read_stream', 0);
+DROP TABLE tmp_read_stream;
+));
+
+	$psql->quit();
+}
+
+
 
 # Run all tests that are supported for all io_methods
 sub test_generic
@@ -1525,6 +1554,7 @@ CHECKPOINT;
 	test_checksum($io_method, $node);
 	test_ignore_checksum($io_method, $node);
 	test_checksum_createdb($io_method, $node);
+	test_read_stream($io_method, $node);
 
 	SKIP:
 	{
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index e495481c41e..e37810b7273 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -106,3 +106,16 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
 CREATE FUNCTION inj_io_reopen_detach()
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
+
+
+
+/*
+ * Read stream related functions
+ */
+CREATE FUNCTION test_read_stream_resume(rel regclass, blockno int4)
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION test_read_stream_yield(rel regclass, blockno int4)
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_aio/test_read_stream.c b/src/test/modules/test_aio/test_read_stream.c
new file mode 100644
index 00000000000..d1d436a90b7
--- /dev/null
+++ b/src/test/modules/test_aio/test_read_stream.c
@@ -0,0 +1,181 @@
+/*-------------------------------------------------------------------------
+ *
+ * test_read_stream.c
+ *	  Helpers to write tests for read_stream.c
+ *
+ * Copyright (c) 2020-2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/test/modules/test_aio/test_read_stream.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/relation.h"
+#include "fmgr.h"
+#include "storage/bufmgr.h"
+#include "storage/read_stream.h"
+
+typedef struct
+{
+	BlockNumber blkno;
+	int			count;
+} test_read_stream_resume_state;
+
+static BlockNumber
+test_read_stream_resume_cb(ReadStream *stream,
+						   void *callback_private_data,
+						   void *per_buffer_data)
+{
+	test_read_stream_resume_state *state = callback_private_data;
+
+	/* Periodic end-of-stream. */
+	if (++state->count % 3 == 0)
+		return read_stream_pause(stream);
+
+	return state->blkno;
+}
+
+/*
+ * Test read_stream_resume(), allowing a stream to end temporarily and then
+ * continue where it left off.
+ */
+PG_FUNCTION_INFO_V1(test_read_stream_resume);
+Datum
+test_read_stream_resume(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	BlockNumber blkno = PG_GETARG_UINT32(1);
+	Relation	rel;
+	Buffer		buf;
+	ReadStream *stream;
+	test_read_stream_resume_state state = {.blkno = blkno};
+
+	rel = relation_open(relid, AccessShareLock);
+	stream = read_stream_begin_relation(READ_STREAM_DEFAULT,
+										NULL,
+										rel,
+										MAIN_FORKNUM,
+										test_read_stream_resume_cb,
+										&state,
+										0);
+
+	for (int i = 0; i < 3; ++i)
+	{
+		/* Same block twice. */
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(BufferGetBlockNumber(buf) == blkno);
+		ReleaseBuffer(buf);
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(BufferGetBlockNumber(buf) == blkno);
+		ReleaseBuffer(buf);
+
+		/* End-of-stream. */
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(buf == InvalidBuffer);
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(buf == InvalidBuffer);
+
+		/* Resume. */
+		read_stream_resume(stream);
+	}
+
+	read_stream_end(stream);
+	relation_close(rel, NoLock);
+
+	PG_RETURN_VOID();
+}
+
+typedef struct
+{
+	BlockNumber blkno;
+	int			count;
+	int			yields;
+	int			blocks;
+} test_read_stream_yield_state;
+
+static BlockNumber
+test_read_stream_yield_cb(ReadStream *stream,
+						  void *callback_private_data,
+						  void *per_buffer_data)
+{
+	test_read_stream_yield_state *state = callback_private_data;
+
+	/* Yield every third call. */
+	if (++state->count % 3 == 2)
+	{
+		state->yields++;
+		return read_stream_yield(stream);
+	}
+
+	state->blocks++;
+	return state->blkno;
+}
+
+/*
+ * Test read_stream_yield(), allowing control to be yielded temporarily from
+ * the lookahead loop and returned to the caller of read_stream_next_buffer().
+ */
+PG_FUNCTION_INFO_V1(test_read_stream_yield);
+Datum
+test_read_stream_yield(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	BlockNumber blkno = PG_GETARG_UINT32(1);
+	Relation	rel;
+	Buffer		buf;
+	ReadStream *stream;
+	test_read_stream_yield_state state = {.blkno = blkno};
+
+	rel = relation_open(relid, AccessShareLock);
+	stream = read_stream_begin_relation(READ_STREAM_DEFAULT,
+										NULL,
+										rel,
+										MAIN_FORKNUM,
+										test_read_stream_yield_cb,
+										&state,
+										0);
+
+	buf = read_stream_next_buffer(stream, NULL);
+	Assert(BufferGetBlockNumber(buf) == blkno);
+	ReleaseBuffer(buf);
+	Assert(state.blocks == 1);
+	Assert(state.yields == 1);
+
+	buf = read_stream_next_buffer(stream, NULL);
+	Assert(BufferGetBlockNumber(buf) == blkno);
+	ReleaseBuffer(buf);
+	Assert(state.blocks == 3);
+	Assert(state.yields == 1);
+
+	buf = read_stream_next_buffer(stream, NULL);
+	Assert(BufferGetBlockNumber(buf) == blkno);
+	ReleaseBuffer(buf);
+	Assert(state.blocks == 3);
+	Assert(state.yields == 2);
+
+	buf = read_stream_next_buffer(stream, NULL);
+	Assert(BufferGetBlockNumber(buf) == blkno);
+	ReleaseBuffer(buf);
+	Assert(state.blocks == 5);
+	Assert(state.yields == 2);
+
+	buf = read_stream_next_buffer(stream, NULL);
+	Assert(BufferGetBlockNumber(buf) == blkno);
+	ReleaseBuffer(buf);
+	Assert(state.blocks == 5);
+	Assert(state.yields == 3);
+
+	buf = read_stream_next_buffer(stream, NULL);
+	Assert(BufferGetBlockNumber(buf) == blkno);
+	ReleaseBuffer(buf);
+	Assert(state.blocks == 7);
+	Assert(state.yields == 3);
+
+	read_stream_end(stream);
+	relation_close(rel, NoLock);
+
+	PG_RETURN_VOID();
+}
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index cf3f6a7dafd..7396e9ce14b 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -4158,6 +4158,7 @@ td_entry
 teSection
 temp_tablespaces_extra
 test_re_flags
+test_read_stream_resume_state
 test_regex_ctx
 test_shm_mq_header
 test_spec
-- 
2.51.2
