On Wed, Nov 12, 2025 at 12:19 PM Thomas Munro <[email protected]> wrote:
> On Wed, Nov 12, 2025 at 11:52 AM Melanie Plageman
> <[email protected]> wrote:
> > If we are worried about regressing other extensions using
> > read_stream_reset(), we could make the read stream reset which
> > preserves the distance a different function in backbranches.

Here is a draft patch along those lines that tries to be as small as
possible. I'm trying out the name read_stream_resume().

From ab494c6e5ccf93563dcf7059f7eec7d4252294f6 Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Sat, 15 Jun 2024 14:37:26 +1200
Subject: [PATCH 1/2] Fix overloaded remit of read_stream_reset().

Some extensions need to examine page contents to find more block numbers
to stream. That currently means that they have to signal end-of-stream
when data runs out, but later "reset" the stream to resume with new
information. As an unfortunate by-product of that API economy, the
look-ahead distance would start out at one again, and then need to ramp
back up to find a useful concurrency level. This created a significant
loss of performance for self-referential block streams with a high
degree of fan-out, as discovered by the pgvector project.

Split out a separate function read_stream_resume() that is explicitly
intended for that usage pattern. Since distance == 0 is the internal
representation of end-of-stream, all it has to do is restore the
distance recorded at the time end-of-stream was signaled to resume
looking ahead with the same I/O concurrency level after a page jump.

We don't have any self-referential streaming in PostgreSQL itself yet,
but we decided to call this a bug all the same and back-patch a fix.
The intention was to support that exact usage pattern, the API just
missed the mark by trying to handle too many use cases. Such extensions
should be able to benefit from streaming, and the behavior change is
isolated to code that calls the new variant, so a separation of duties
seems warranted and safe to back-patch.

The problem was reported as extremely poor look-ahead performance in v18
using AIO for prefetching, but it also affects the fadvise-based
prefetching in v17, where read_stream.c arrived.

Reviewed-by:
Tested-by:
Backpatch-through: 17
Discussion: https://postgr.es/m/CA%2BhUKGL-3mBtkA9RTbLFHuSS5cviuv0ko7nBhCg9KM7Q-GSEkw%40mail.gmail.com
---
 src/backend/storage/aio/read_stream.c | 15 +++++++++++++++
 src/include/storage/read_stream.h     |  1 +
 2 files changed, 16 insertions(+)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 031fde9f4cb..beae8ef325a 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -100,6 +100,7 @@ struct ReadStream
 	int16		pinned_buffers;
 	int16		distance;
 	int16		initialized_buffers;
+	int16		resume_distance;
 	int			read_buffers_flags;
 	bool		sync_mode;		/* using io_method=sync */
 	bool		batch_mode;		/* READ_STREAM_USE_BATCHING */
@@ -464,6 +465,7 @@ read_stream_look_ahead(ReadStream *stream)
 		if (blocknum == InvalidBlockNumber)
 		{
 			/* End of stream. */
+			stream->resume_distance = stream->distance;
 			stream->distance = 0;
 			break;
 		}
@@ -711,6 +713,7 @@ read_stream_begin_impl(int flags,
 		stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
 	else
 		stream->distance = 1;
+	stream->resume_distance = stream->distance;
 
 	/*
 	 * Since we always access the same relation, we can initialize parts of
@@ -862,6 +865,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			else
 			{
 				/* No more blocks, end of stream. */
+				stream->resume_distance = stream->distance;
 				stream->distance = 0;
 				stream->oldest_buffer_index = stream->next_buffer_index;
 				stream->pinned_buffers = 0;
@@ -1034,6 +1038,17 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
 	return read_stream_get_block(stream, NULL);
 }
 
+/*
+ * Resume looking ahead after the block number callback reported end-of-stream.
+ * This is useful for streams of self-referential blocks, after a buffer needed
+ * to be consumed and examined to find more block numbers.
+ */
+void
+read_stream_resume(ReadStream *stream)
+{
+	stream->distance = stream->resume_distance;
+}
+
 /*
  * Reset a read stream by releasing any queued up buffers, allowing the stream
  * to be used again for different blocks. This can be used to clear an
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
index 9b0d65161d0..e29ac50fc9e 100644
--- a/src/include/storage/read_stream.h
+++ b/src/include/storage/read_stream.h
@@ -99,6 +99,7 @@ extern ReadStream *read_stream_begin_smgr_relation(int flags,
 													   ReadStreamBlockNumberCB callback,
 													   void *callback_private_data,
 													   size_t per_buffer_data_size);
+extern void read_stream_resume(ReadStream *stream);
 extern void read_stream_reset(ReadStream *stream);
 extern void read_stream_end(ReadStream *stream);
 
-- 
2.51.1

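To make the difference between the two calls concrete, here is a small standalone sketch of just the distance bookkeeping that patch 1 describes. It is a toy model, not the real ReadStream: the ToyStream type, the toy_* function names, the max_distance cap, and the doubling ramp-up are all assumptions made for illustration. Only the three behaviors taken from the commit message are meant to be faithful: end-of-stream is represented as distance == 0, the previous distance is saved at that moment, and a reset starts again at one while a resume restores the saved value.

```c
/*
 * Toy model of the look-ahead distance bookkeeping.  This is NOT
 * read_stream.c; every name here is hypothetical.
 */
typedef struct ToyStream
{
	int			distance;			/* 0 means end-of-stream was signaled */
	int			resume_distance;	/* distance saved at end-of-stream */
	int			max_distance;		/* assumed cap on look-ahead */
} ToyStream;

/* Start with a distance of one, as a freshly begun stream would. */
static void
toy_begin(ToyStream *s, int max_distance)
{
	s->distance = 1;
	s->resume_distance = s->distance;
	s->max_distance = max_distance;
}

/* Assumed ramp-up rule: double the distance, clamped to the cap. */
static void
toy_ramp_up(ToyStream *s)
{
	s->distance *= 2;
	if (s->distance > s->max_distance)
		s->distance = s->max_distance;
}

/* The callback ran out of block numbers: remember where we were. */
static void
toy_end_of_stream(ToyStream *s)
{
	s->resume_distance = s->distance;
	s->distance = 0;
}

/* Models the old behavior: look-ahead starts out at one again. */
static void
toy_reset(ToyStream *s)
{
	s->distance = 1;
}

/* Models the new behavior: continue at the saved concurrency level. */
static void
toy_resume(ToyStream *s)
{
	s->distance = s->resume_distance;
}
```

Calling toy_begin() with a cap of 16, ramping up five times, and then signaling end-of-stream leaves the toy stream at distance 0 with 16 saved. From there toy_resume() continues at 16, whereas toy_reset() would have to climb back up from 1, which is the ramp-up cost the commit message attributes to self-referential streams.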
From a3c0520aa382874c7028c42d7d720415d4b3b26d Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Wed, 12 Nov 2025 16:49:57 +1300
Subject: [PATCH 2/2] Add smoke test for read_stream_resume().

XXX Not sure if it's worth a permanent test for this, but since the new
function is not exercised in the tree it seemed worth writing a simple
demo...
---
 src/test/modules/test_aio/Makefile           |  3 +-
 src/test/modules/test_aio/meson.build        |  1 +
 src/test/modules/test_aio/t/001_aio.pl       | 21 +++++
 src/test/modules/test_aio/test_aio--1.0.sql  |  9 ++
 src/test/modules/test_aio/test_read_stream.c | 89 ++++++++++++++++++++
 src/tools/pgindent/typedefs.list             |  1 +
 6 files changed, 123 insertions(+), 1 deletion(-)
 create mode 100644 src/test/modules/test_aio/test_read_stream.c

diff --git a/src/test/modules/test_aio/Makefile b/src/test/modules/test_aio/Makefile
index f53cc64671a..465eb09ee4f 100644
--- a/src/test/modules/test_aio/Makefile
+++ b/src/test/modules/test_aio/Makefile
@@ -5,7 +5,8 @@ PGFILEDESC = "test_aio - test code for AIO"
 MODULE_big = test_aio
 OBJS = \
 	$(WIN32RES) \
-	test_aio.o
+	test_aio.o \
+	test_read_stream.o
 
 EXTENSION = test_aio
 DATA = test_aio--1.0.sql
diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index 73d2fd68eaa..6e6fcbfdad9 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -2,6 +2,7 @@
 
 test_aio_sources = files(
   'test_aio.c',
+  'test_read_stream.c',
 )
 
 if host_system == 'windows'
diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl
index 3f0453619e8..cd1fd633bd8 100644
--- a/src/test/modules/test_aio/t/001_aio.pl
+++ b/src/test/modules/test_aio/t/001_aio.pl
@@ -1489,6 +1489,26 @@ SELECT read_rel_block_ll('tbl_cs_fail', 3, nblocks=>1, zero_on_error=>true);),
 	$psql->quit();
 }
 
+# Read stream tests
+sub test_read_stream
+{
+	my $io_method = shift;
+	my $node = shift;
+	my ($ret, $output);
+
+	my $psql = $node->background_psql('postgres', on_error_stop => 0);
+
+	$psql->query_safe(
+		qq(
+CREATE TEMPORARY TABLE tmp_read_stream(data int not null);
+INSERT INTO tmp_read_stream SELECT generate_series(1, 10000);
+SELECT test_read_stream_resume('tmp_read_stream', 0);
+DROP TABLE tmp_read_stream;
+));
+	$psql->quit();
+}
+
+
 
 # Run all tests that are supported for all io_methods
 sub test_generic
@@ -1525,6 +1545,7 @@ CHECKPOINT;
 	test_checksum($io_method, $node);
 	test_ignore_checksum($io_method, $node);
 	test_checksum_createdb($io_method, $node);
+	test_read_stream($io_method, $node);
 
   SKIP:
 	{
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index e495481c41e..dde8de35bc7 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -106,3 +106,12 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
 CREATE FUNCTION inj_io_reopen_detach()
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
+
+
+
+/*
+ * Read stream related functions
+ */
+CREATE FUNCTION test_read_stream_resume(rel regclass, blockno int4)
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_aio/test_read_stream.c b/src/test/modules/test_aio/test_read_stream.c
new file mode 100644
index 00000000000..bf608db8a72
--- /dev/null
+++ b/src/test/modules/test_aio/test_read_stream.c
@@ -0,0 +1,89 @@
+/*-------------------------------------------------------------------------
+ *
+ * test_read_stream.c
+ *		Helpers to write tests for read_stream.c
+ *
+ * Copyright (c) 2020-2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/test/modules/test_aio/test_read_stream.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/relation.h"
+#include "fmgr.h"
+#include "storage/bufmgr.h"
+#include "storage/read_stream.h"
+
+typedef struct
+{
+	BlockNumber blkno;
+	int			count;
+} test_read_stream_resume_state;
+
+static BlockNumber
+test_read_stream_resume_cb(ReadStream *stream,
+						   void *callback_private_data,
+						   void *per_buffer_data)
+{
+	test_read_stream_resume_state *state = callback_private_data;
+
+	/* Periodic end-of-stream. */
+	if (++state->count % 3 == 0)
+		return InvalidBlockNumber;
+
+	return state->blkno;
+}
+
+/*
+ * Test read_stream_resume(), allowing a stream to end temporarily and then
+ * continue where it left off.
+ */
+PG_FUNCTION_INFO_V1(test_read_stream_resume);
+Datum
+test_read_stream_resume(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	BlockNumber blkno = PG_GETARG_UINT32(1);
+	Relation	rel;
+	Buffer		buf;
+	ReadStream *stream;
+	test_read_stream_resume_state state = {.blkno = blkno};
+
+	rel = relation_open(relid, AccessShareLock);
+	stream = read_stream_begin_relation(READ_STREAM_DEFAULT,
+										NULL,
+										rel,
+										MAIN_FORKNUM,
+										test_read_stream_resume_cb,
+										&state,
+										0);
+
+	for (int i = 0; i < 3; ++i)
+	{
+		/* Same block twice. */
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(BufferGetBlockNumber(buf) == blkno);
+		ReleaseBuffer(buf);
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(BufferGetBlockNumber(buf) == blkno);
+		ReleaseBuffer(buf);
+
+		/* End-of-stream. */
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(buf == InvalidBuffer);
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(buf == InvalidBuffer);
+
+		/* Resume. */
+		read_stream_resume(stream);
+	}
+
+	read_stream_end(stream);
+	relation_close(rel, NoLock);
+
+	PG_RETURN_VOID();
+}
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 432509277c9..ca4c071ded2 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -4144,6 +4144,7 @@ td_entry
 teSection
 temp_tablespaces_extra
 test_re_flags
+test_read_stream_resume_state
 test_regex_ctx
 test_shm_mq_header
 test_spec
-- 
2.51.1
