On Tue, Dec 9, 2025 at 4:42 PM Melanie Plageman
<[email protected]> wrote:
>
> > 1.  read_stream_resume() as before, but with a new explicit
> > read_stream_pause(): if a block number callback would like to report a
> > temporary lack of information, it should return
> > read_stream_pause(stream), not InvalidBlockNumber.  Then after
> > read_stream_resume(stream) is called, the next
> > read_stream_next_buffer() enters the lookahead loop again.  While
> > paused, if the consumer drains all the existing buffers in the stream
> > and then one more, it will receive InvalidBuffer, but if the _resume()
> > call is made sooner, the consumer won't ever know about the temporary
> > lack of buffers in the stream.

I ended up committing read_stream_resume() in 38229cb905165fe but
without the tests because 1f6f200cab67e6, which added other read
stream tests, was imminent. I'd like to add the read_stream_resume()
test back now -- especially because we didn't end up adding another
user of read_stream_resume() in this release.

Attached 0001 is the test Thomas wrote ported over to be in the new
0004_read_stream.pl. It uses asserts instead of comparing output of
the SQL function to expected output, so I included a potential
alternative version of it in 0002 that uses that pattern. Note that
0002 is a diff from 0001, not an independent alternative patch. I
think the test needs more work either way, but I wanted to get the
ball rolling.

- Melanie
From 36668cc39dd8a6e249ece3ed53201cb3ea17073e Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Wed, 8 Apr 2026 13:40:16 -0400
Subject: [PATCH 1/2] Add test for read_stream_resume()

---
 .../modules/test_aio/t/004_read_stream.pl     | 22 ++++++
 src/test/modules/test_aio/test_aio--1.0.sql   |  9 +++
 src/test/modules/test_aio/test_aio.c          | 71 +++++++++++++++++++
 src/tools/pgindent/typedefs.list              |  1 +
 4 files changed, 103 insertions(+)

diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl
index 32311c07ac0..28aedd7d163 100644
--- a/src/test/modules/test_aio/t/004_read_stream.pl
+++ b/src/test/modules/test_aio/t/004_read_stream.pl
@@ -115,6 +115,27 @@ sub test_repeated_blocks
 }
 
 
+sub test_read_stream_resume
+{
+	my $io_method = shift;
+	my $node = shift;
+
+	my $psql = $node->background_psql('postgres', on_error_stop => 0);
+
+	$psql->query_safe(
+		qq(
+CREATE TEMPORARY TABLE tmp_read_stream(data int not null);
+INSERT INTO tmp_read_stream SELECT generate_series(1, 10000);
+SELECT test_read_stream_resume('tmp_read_stream', 0);
+DROP TABLE tmp_read_stream;
+));
+
+	ok(1, "$io_method: read_stream_resume");
+
+	$psql->quit();
+}
+
+
 sub test_inject_foreign
 {
 	my $io_method = shift;
@@ -268,6 +289,7 @@ sub test_io_method
 		$io_method, "$io_method: io_method set correctly");
 
 	test_repeated_blocks($io_method, $node);
+	test_read_stream_resume($io_method, $node);
 
   SKIP:
 	{
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index 762ac29512f..48caad25bda 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -128,3 +128,12 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
 CREATE FUNCTION inj_io_reopen_detach()
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
+
+
+
+/*
+ * Read stream related functions
+ */
+CREATE FUNCTION test_read_stream_resume(rel regclass, blockno int4)
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index 35efba1a5e3..9d2d32ad1a2 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -916,6 +916,77 @@ read_stream_for_blocks(PG_FUNCTION_ARGS)
 }
 
 
+typedef struct
+{
+	BlockNumber blkno;
+	int			count;
+} test_read_stream_resume_state;
+
+static BlockNumber
+test_read_stream_resume_cb(ReadStream *stream,
+						   void *callback_private_data,
+						   void *per_buffer_data)
+{
+	test_read_stream_resume_state *state = callback_private_data;
+
+	/* Periodic end-of-stream. */
+	if (++state->count % 3 == 0)
+		return read_stream_pause(stream);
+
+	return state->blkno;
+}
+
+/*
+ * Test read_stream_resume(), allowing a stream to end temporarily and then
+ * continue where it left off.
+ */
+PG_FUNCTION_INFO_V1(test_read_stream_resume);
+Datum
+test_read_stream_resume(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	BlockNumber blkno = PG_GETARG_UINT32(1);
+	Relation	rel;
+	Buffer		buf;
+	ReadStream *stream;
+	test_read_stream_resume_state state = {.blkno = blkno};
+
+	rel = relation_open(relid, AccessShareLock);
+	stream = read_stream_begin_relation(READ_STREAM_DEFAULT,
+										NULL,
+										rel,
+										MAIN_FORKNUM,
+										test_read_stream_resume_cb,
+										&state,
+										0);
+
+	for (int i = 0; i < 3; ++i)
+	{
+		/* Same block twice. */
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(BufferGetBlockNumber(buf) == blkno);
+		ReleaseBuffer(buf);
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(BufferGetBlockNumber(buf) == blkno);
+		ReleaseBuffer(buf);
+
+		/* End-of-stream. */
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(buf == InvalidBuffer);
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(buf == InvalidBuffer);
+
+		/* Resume. */
+		read_stream_resume(stream);
+	}
+
+	read_stream_end(stream);
+	relation_close(rel, NoLock);
+
+	PG_RETURN_VOID();
+}
+
+
 PG_FUNCTION_INFO_V1(handle_get);
 Datum
 handle_get(PG_FUNCTION_ARGS)
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index ea95e7984bc..4236afefa2d 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -4329,6 +4329,7 @@ teSection
 temp_tablespaces_extra
 test128
 test_re_flags
+test_read_stream_resume_state
 test_regex_ctx
 test_shm_mq_header
 test_spec
-- 
2.43.0

From 8e8b0a3d967c1e9ae1be64257616d3a02f6f64b4 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Wed, 8 Apr 2026 13:55:05 -0400
Subject: [PATCH 2/2] alternative approach for test read_stream_resume

---
 .../modules/test_aio/t/004_read_stream.pl     | 21 +++++----
 src/test/modules/test_aio/test_aio--1.0.sql   |  4 +-
 src/test/modules/test_aio/test_aio.c          | 45 +++++++++++++++----
 3 files changed, 50 insertions(+), 20 deletions(-)

diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl
index 28aedd7d163..68d3be452e2 100644
--- a/src/test/modules/test_aio/t/004_read_stream.pl
+++ b/src/test/modules/test_aio/t/004_read_stream.pl
@@ -122,15 +122,18 @@ sub test_read_stream_resume
 
 	my $psql = $node->background_psql('postgres', on_error_stop => 0);
 
-	$psql->query_safe(
-		qq(
-CREATE TEMPORARY TABLE tmp_read_stream(data int not null);
-INSERT INTO tmp_read_stream SELECT generate_series(1, 10000);
-SELECT test_read_stream_resume('tmp_read_stream', 0);
-DROP TABLE tmp_read_stream;
-));
-
-	ok(1, "$io_method: read_stream_resume");
+	# The callback returns block 0 twice then pauses.  We resume 3 times.
+	# -1 means read_stream_next_buffer() returned InvalidBuffer (paused).
+	my @one_cycle = (0, 0, -1);
+	my $expected = '{' . join(',', @one_cycle, @one_cycle, @one_cycle) . '}';
+
+	my $result = $psql->query_safe(
+		qq(SELECT array_agg(blocknum ORDER BY call_index)
+		   FROM test_read_stream_resume('largeish', 0);));
+	chomp($result);
+
+	is($result, $expected,
+		"$io_method: read_stream_resume");
 
 	$psql->quit();
 }
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index 48caad25bda..d3c941d5221 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -134,6 +134,6 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
 /*
  * Read stream related functions
  */
-CREATE FUNCTION test_read_stream_resume(rel regclass, blockno int4)
-RETURNS pg_catalog.void STRICT
+CREATE FUNCTION test_read_stream_resume(rel regclass, blockno int4, OUT call_index int4, OUT blocknum int4)
+RETURNS SETOF record STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index 9d2d32ad1a2..abdfa1cb448 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -939,6 +939,11 @@ test_read_stream_resume_cb(ReadStream *stream,
 /*
  * Test read_stream_resume(), allowing a stream to end temporarily and then
  * continue where it left off.
+ *
+ * Returns a result set of (call_index int4, blocknum int4) rows so that the
+ * caller can validate the exact sequence.  A blocknum of -1 indicates that
+ * read_stream_next_buffer() returned InvalidBuffer (i.e. the stream was
+ * paused).
  */
 PG_FUNCTION_INFO_V1(test_read_stream_resume);
 Datum
@@ -946,10 +951,14 @@ test_read_stream_resume(PG_FUNCTION_ARGS)
 {
 	Oid			relid = PG_GETARG_OID(0);
 	BlockNumber blkno = PG_GETARG_UINT32(1);
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	Relation	rel;
 	Buffer		buf;
 	ReadStream *stream;
 	test_read_stream_resume_state state = {.blkno = blkno};
+	int			call_index = 0;
+
+	InitMaterializedSRF(fcinfo, 0);
 
 	rel = relation_open(relid, AccessShareLock);
 	stream = read_stream_begin_relation(READ_STREAM_DEFAULT,
@@ -962,19 +971,37 @@ test_read_stream_resume(PG_FUNCTION_ARGS)
 
 	for (int i = 0; i < 3; ++i)
 	{
+		Datum		values[2] = {0};
+		bool		nulls[2] = {0};
+
 		/* Same block twice. */
 		buf = read_stream_next_buffer(stream, NULL);
-		Assert(BufferGetBlockNumber(buf) == blkno);
-		ReleaseBuffer(buf);
-		buf = read_stream_next_buffer(stream, NULL);
-		Assert(BufferGetBlockNumber(buf) == blkno);
-		ReleaseBuffer(buf);
+		values[0] = Int32GetDatum(call_index++);
+		values[1] = BufferIsValid(buf) ?
+			Int32GetDatum((int32) BufferGetBlockNumber(buf)) :
+			Int32GetDatum(-1);
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+		if (BufferIsValid(buf))
+			ReleaseBuffer(buf);
 
-		/* End-of-stream. */
 		buf = read_stream_next_buffer(stream, NULL);
-		Assert(buf == InvalidBuffer);
+		values[0] = Int32GetDatum(call_index++);
+		values[1] = BufferIsValid(buf) ?
+			Int32GetDatum((int32) BufferGetBlockNumber(buf)) :
+			Int32GetDatum(-1);
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+		if (BufferIsValid(buf))
+			ReleaseBuffer(buf);
+
+		/* End-of-stream (paused). */
 		buf = read_stream_next_buffer(stream, NULL);
-		Assert(buf == InvalidBuffer);
+		values[0] = Int32GetDatum(call_index++);
+		values[1] = BufferIsValid(buf) ?
+			Int32GetDatum((int32) BufferGetBlockNumber(buf)) :
+			Int32GetDatum(-1);
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+		if (BufferIsValid(buf))
+			ReleaseBuffer(buf);
 
 		/* Resume. */
 		read_stream_resume(stream);
@@ -983,7 +1010,7 @@ test_read_stream_resume(PG_FUNCTION_ARGS)
 	read_stream_end(stream);
 	relation_close(rel, NoLock);
 
-	PG_RETURN_VOID();
+	return (Datum) 0;
 }
 
 
-- 
2.43.0

Reply via email to