On Tue, Dec 9, 2025 at 4:42 PM Melanie Plageman <[email protected]> wrote: > > > 1. read_stream_resume() as before, but with a new explicit > > read_stream_pause(): if a block number callback would like to report a > > temporary lack of information, it should return > > read_stream_pause(stream), not InvalidBlockNumber. Then after > > read_stream_resume(stream) is called, the next > > read_stream_next_buffer() enters the lookahead loop again. While > > paused, if the consumer drains all the existing buffers in the stream > > and then one more, it will receive InvalidBuffer, but if the _resume() > > call is made sooner, the consumer won't ever know about the temporary > > lack of buffers in the stream.
I ended up committing read_stream_resume() in 38229cb905165fe, but without the tests, because 1f6f200cab67e6, which added other read stream tests, was imminent. I'd like to add the read_stream_resume() test back now -- especially because we didn't end up adding another user of read_stream_resume() in this release. Attached 0001 is the test Thomas wrote, ported over to the new 004_read_stream.pl. It uses asserts instead of comparing the output of the SQL function to expected output, so I included a potential alternative version in 0002 that instead returns the observed sequence from the SQL function and compares it to expected output. Note that 0002 is a diff on top of 0001, not an independent alternative patch. I think the test needs more work either way, but I wanted to get the ball rolling. - Melanie
From 36668cc39dd8a6e249ece3ed53201cb3ea17073e Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Wed, 8 Apr 2026 13:40:16 -0400 Subject: [PATCH 1/2] Add test for read_stream_resume() --- .../modules/test_aio/t/004_read_stream.pl | 22 ++++++ src/test/modules/test_aio/test_aio--1.0.sql | 9 +++ src/test/modules/test_aio/test_aio.c | 71 +++++++++++++++++++ src/tools/pgindent/typedefs.list | 1 + 4 files changed, 103 insertions(+) diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl index 32311c07ac0..28aedd7d163 100644 --- a/src/test/modules/test_aio/t/004_read_stream.pl +++ b/src/test/modules/test_aio/t/004_read_stream.pl @@ -115,6 +115,27 @@ sub test_repeated_blocks } +sub test_read_stream_resume +{ + my $io_method = shift; + my $node = shift; + + my $psql = $node->background_psql('postgres', on_error_stop => 0); + + $psql->query_safe( + qq( +CREATE TEMPORARY TABLE tmp_read_stream(data int not null); +INSERT INTO tmp_read_stream SELECT generate_series(1, 10000); +SELECT test_read_stream_resume('tmp_read_stream', 0); +DROP TABLE tmp_read_stream; +)); + + ok(1, "$io_method: read_stream_resume"); + + $psql->quit(); +} + + sub test_inject_foreign { my $io_method = shift; @@ -268,6 +289,7 @@ sub test_io_method $io_method, "$io_method: io_method set correctly"); test_repeated_blocks($io_method, $node); + test_read_stream_resume($io_method, $node); SKIP: { diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql index 762ac29512f..48caad25bda 100644 --- a/src/test/modules/test_aio/test_aio--1.0.sql +++ b/src/test/modules/test_aio/test_aio--1.0.sql @@ -128,3 +128,12 @@ AS 'MODULE_PATHNAME' LANGUAGE C; CREATE FUNCTION inj_io_reopen_detach() RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME' LANGUAGE C; + + + +/* + * Read stream related functions + */ +CREATE FUNCTION test_read_stream_resume(rel regclass, blockno int4) +RETURNS pg_catalog.void STRICT +AS 
'MODULE_PATHNAME' LANGUAGE C; diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c index 35efba1a5e3..9d2d32ad1a2 100644 --- a/src/test/modules/test_aio/test_aio.c +++ b/src/test/modules/test_aio/test_aio.c @@ -916,6 +916,77 @@ read_stream_for_blocks(PG_FUNCTION_ARGS) } +typedef struct +{ + BlockNumber blkno; + int count; +} test_read_stream_resume_state; + +static BlockNumber +test_read_stream_resume_cb(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data) +{ + test_read_stream_resume_state *state = callback_private_data; + + /* Periodic end-of-stream. */ + if (++state->count % 3 == 0) + return read_stream_pause(stream); + + return state->blkno; +} + +/* + * Test read_stream_resume(), allowing a stream to end temporarily and then + * continue where it left off. + */ +PG_FUNCTION_INFO_V1(test_read_stream_resume); +Datum +test_read_stream_resume(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + BlockNumber blkno = PG_GETARG_UINT32(1); + Relation rel; + Buffer buf; + ReadStream *stream; + test_read_stream_resume_state state = {.blkno = blkno}; + + rel = relation_open(relid, AccessShareLock); + stream = read_stream_begin_relation(READ_STREAM_DEFAULT, + NULL, + rel, + MAIN_FORKNUM, + test_read_stream_resume_cb, + &state, + 0); + + for (int i = 0; i < 3; ++i) + { + /* Same block twice. */ + buf = read_stream_next_buffer(stream, NULL); + Assert(BufferGetBlockNumber(buf) == blkno); + ReleaseBuffer(buf); + buf = read_stream_next_buffer(stream, NULL); + Assert(BufferGetBlockNumber(buf) == blkno); + ReleaseBuffer(buf); + + /* End-of-stream. */ + buf = read_stream_next_buffer(stream, NULL); + Assert(buf == InvalidBuffer); + buf = read_stream_next_buffer(stream, NULL); + Assert(buf == InvalidBuffer); + + /* Resume. 
*/ + read_stream_resume(stream); + } + + read_stream_end(stream); + relation_close(rel, NoLock); + + PG_RETURN_VOID(); +} + + PG_FUNCTION_INFO_V1(handle_get); Datum handle_get(PG_FUNCTION_ARGS) diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index ea95e7984bc..4236afefa2d 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -4329,6 +4329,7 @@ teSection temp_tablespaces_extra test128 test_re_flags +test_read_stream_resume_state test_regex_ctx test_shm_mq_header test_spec -- 2.43.0
From 8e8b0a3d967c1e9ae1be64257616d3a02f6f64b4 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Wed, 8 Apr 2026 13:55:05 -0400 Subject: [PATCH 2/2] alternative approach for test read_stream_resume --- .../modules/test_aio/t/004_read_stream.pl | 21 +++++---- src/test/modules/test_aio/test_aio--1.0.sql | 4 +- src/test/modules/test_aio/test_aio.c | 45 +++++++++++++++---- 3 files changed, 50 insertions(+), 20 deletions(-) diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl index 28aedd7d163..68d3be452e2 100644 --- a/src/test/modules/test_aio/t/004_read_stream.pl +++ b/src/test/modules/test_aio/t/004_read_stream.pl @@ -122,15 +122,18 @@ sub test_read_stream_resume my $psql = $node->background_psql('postgres', on_error_stop => 0); - $psql->query_safe( - qq( -CREATE TEMPORARY TABLE tmp_read_stream(data int not null); -INSERT INTO tmp_read_stream SELECT generate_series(1, 10000); -SELECT test_read_stream_resume('tmp_read_stream', 0); -DROP TABLE tmp_read_stream; -)); - - ok(1, "$io_method: read_stream_resume"); + # The callback returns block 0 twice then pauses. We resume 3 times. + # -1 means read_stream_next_buffer() returned InvalidBuffer (paused). + my @one_cycle = (0, 0, -1); + my $expected = '{' . join(',', @one_cycle, @one_cycle, @one_cycle) . 
'}'; + + my $result = $psql->query_safe( + qq(SELECT array_agg(blocknum ORDER BY call_index) + FROM test_read_stream_resume('largeish', 0);)); + chomp($result); + + is($result, $expected, + "$io_method: read_stream_resume"); $psql->quit(); } diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql index 48caad25bda..d3c941d5221 100644 --- a/src/test/modules/test_aio/test_aio--1.0.sql +++ b/src/test/modules/test_aio/test_aio--1.0.sql @@ -134,6 +134,6 @@ AS 'MODULE_PATHNAME' LANGUAGE C; /* * Read stream related functions */ -CREATE FUNCTION test_read_stream_resume(rel regclass, blockno int4) -RETURNS pg_catalog.void STRICT +CREATE FUNCTION test_read_stream_resume(rel regclass, blockno int4, OUT call_index int4, OUT blocknum int4) +RETURNS SETOF record STRICT AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c index 9d2d32ad1a2..abdfa1cb448 100644 --- a/src/test/modules/test_aio/test_aio.c +++ b/src/test/modules/test_aio/test_aio.c @@ -939,6 +939,11 @@ test_read_stream_resume_cb(ReadStream *stream, /* * Test read_stream_resume(), allowing a stream to end temporarily and then * continue where it left off. + * + * Returns a result set of (call_index int4, blocknum int4) rows so that the + * caller can validate the exact sequence. A blocknum of -1 indicates that + * read_stream_next_buffer() returned InvalidBuffer (i.e. the stream was + * paused). 
*/ PG_FUNCTION_INFO_V1(test_read_stream_resume); Datum @@ -946,10 +951,14 @@ test_read_stream_resume(PG_FUNCTION_ARGS) { Oid relid = PG_GETARG_OID(0); BlockNumber blkno = PG_GETARG_UINT32(1); + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; Relation rel; Buffer buf; ReadStream *stream; test_read_stream_resume_state state = {.blkno = blkno}; + int call_index = 0; + + InitMaterializedSRF(fcinfo, 0); rel = relation_open(relid, AccessShareLock); stream = read_stream_begin_relation(READ_STREAM_DEFAULT, @@ -962,19 +971,37 @@ test_read_stream_resume(PG_FUNCTION_ARGS) for (int i = 0; i < 3; ++i) { + Datum values[2] = {0}; + bool nulls[2] = {0}; + /* Same block twice. */ buf = read_stream_next_buffer(stream, NULL); - Assert(BufferGetBlockNumber(buf) == blkno); - ReleaseBuffer(buf); - buf = read_stream_next_buffer(stream, NULL); - Assert(BufferGetBlockNumber(buf) == blkno); - ReleaseBuffer(buf); + values[0] = Int32GetDatum(call_index++); + values[1] = BufferIsValid(buf) ? + Int32GetDatum((int32) BufferGetBlockNumber(buf)) : + Int32GetDatum(-1); + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); + if (BufferIsValid(buf)) + ReleaseBuffer(buf); - /* End-of-stream. */ buf = read_stream_next_buffer(stream, NULL); - Assert(buf == InvalidBuffer); + values[0] = Int32GetDatum(call_index++); + values[1] = BufferIsValid(buf) ? + Int32GetDatum((int32) BufferGetBlockNumber(buf)) : + Int32GetDatum(-1); + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); + if (BufferIsValid(buf)) + ReleaseBuffer(buf); + + /* End-of-stream (paused). */ buf = read_stream_next_buffer(stream, NULL); - Assert(buf == InvalidBuffer); + values[0] = Int32GetDatum(call_index++); + values[1] = BufferIsValid(buf) ? + Int32GetDatum((int32) BufferGetBlockNumber(buf)) : + Int32GetDatum(-1); + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); + if (BufferIsValid(buf)) + ReleaseBuffer(buf); /* Resume. 
*/ read_stream_resume(stream); @@ -983,7 +1010,7 @@ test_read_stream_resume(PG_FUNCTION_ARGS) read_stream_end(stream); relation_close(rel, NoLock); - PG_RETURN_VOID(); + return (Datum) 0; } -- 2.43.0
