Thanks for the review!

On Thu, Feb 5, 2026 at 11:56 AM Nazir Bilal Yavuz <[email protected]> wrote:
>
> On Sat, 24 Jan 2026 at 00:04, Melanie Plageman
> <[email protected]> wrote:
>
> > - In the 004_read_stream tests, I wonder if there is a way to test
> > that we don't wait for foreign IO until WaitReadBuffers(). We have
> > tests for the stream accessing the same block, which in some cases
> > will exercise the foreign IO path. But it doesn't distinguish between
> > the old behavior -- waiting for the IO to complete when starting read
> > IO on it -- and the new behavior -- not waiting until
> > WaitReadBuffers(). That may not be possible to test, though.
>
> Won't 'stream accessing the same block test' almost always test the
> new behavior (not waiting until WaitReadBuffers())? Having dedicated
> tests for both cases would be helpful, though.

Yea, I was thinking something like testing that if session A is
blocked completing read of block 2 and session B is requesting blocks
2-4 that buffers containing blocks 3 and 4 are valid when session B is
waiting on block 2 to finish.

I started working on something but it needed some new infrastructure
to check if the buffer is valid, and I wanted to see what others
thought first.

I did finally review Andres' test patches and have included my review
feedback here as well.

"aio: Refactor tests in preparation for more tests" (v4-0001) looks
good to me as well. I included one tiny idea AI suggested to me in a
follow-on patch (v4-0002).

> diff --git a/src/test/modules/test_aio/t/004_read_stream.pl
> b/src/test/modules/test_aio/t/004_read_stream.pl
> +foreach my $method (TestAio::supported_io_methods())
> +{
> +    $node->adjust_conf('postgresql.conf', 'io_method', 'worker');
> +    $node->start();
> +    test_io_method($method, $node);
> +    $node->stop();
> +}
>
> This seems wrong, we always test io_method=worker. I think we need to
> replace 'worker' with the $method. Also, we can add check below to the
> test_io_method function in the 004_read_stream.pl:
> ```
>     is($node->safe_psql('postgres', 'SHOW io_method'),
>         $io_method, "$io_method: io_method set correctly");

Good catch. Fixed. I also found a few other small things in this patch
(v4-0003) which I put in v4-0004.

Some ideas I had that I didn't include in v4-0003 because its Andres
patch and is subjective:

For test_repeated_blocks, the first test:

    # test miss of the same block twice in a row
    $psql->query_safe(
        qq/
SELECT evict_rel('largeish');
/);
    $psql->query_safe(
        qq/
SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 2, 4, 4]);
/);
    ok(1, "$io_method: stream missing the same block repeatedly");

It says that it will miss the same block repeatedly, is that because
we won't start a read for any of the blocks until after
read_stream_get_block has returned all of them? If so, could be
clearer in the comment. Not everyone understands all the read stream
internals.

I know a lot of other tests do this, but I find it so hard to read the
test with the SQL is totally left-aligned like that -- especially with
comments interspersed. You can easily flow the queries on multiple
lines and indent it more.

For test_repeated_blocks, the second test:

    # test hit of the same block twice in a row
    $psql->query_safe(
        qq/
SELECT evict_rel('largeish');
/);
    $psql->query_safe(
        qq/
SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 1, 2, 3, 4,
5, 6, 5, 4, 3, 2, 1, 0]);
/);
    ok(1, "$io_method: stream accessing same block");

I assume that the second access of 2 is a hit because we actually did
IO for the first one (unlike in the earlier case)?

For test_inject_foreign:

In general, I am not ramped up enough on injection point stuff to know
if the actual new injection point stuff you added in test_aio.c is is
correct and optimal, but I did review the read stream additions to
test_aio.c and the tests added to 004_read_stream.pl.

For test_inject_foreign, the 3rd test:

    # Test read stream encountering two buffers that are undergoing the same
    # IO, started by another backend

I see that psql_b is requesting 3 blocks which can be combined into 1
IO, which makes it different than the 1st foreign IO test case:

    ###
    # Test read stream encountering buffers undergoing IO in another backend,
    # with the other backend's reads succeeding.
    ###

where psql_b only requests 1 but I don't really see how these are
covering different code. Maybe if the read stream one (psql_a) is
doing a combined IO it might exercise slightly different code, but
otherwise I don't get it.

> Nitpick but I prefer something like TrackBufferHit() or
> CountBufferHit() as a function name instead of ProcessBufferHit().
> ProcessBufferHit() gives the impression that the function is doing a
> job more than it currently does. Other than that, 0004 LGTM.

I changed this in attached v4.

- Melanie
From 1d2f564b211a59fc6ea483fbcddd5fa788b3534c Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Tue, 9 Sep 2025 10:14:34 -0400
Subject: [PATCH v4 1/6] aio: Refactor tests in preparation for more tests

In a future commit more AIO related tests are due to be introduced. However
001_aio.pl already is fairly large.

This commit introduces a new TestAio package with helpers for writing AIO
related tests. Then it uses the new helpers to simplify the existing
001_aio.pl by iterating over all supported io_methods. This will be
particularly helpful because additional methods already have been submitted.

Additionally this commit splits out testing of initdb using a non-default
method into its own test. While that test is somewhat important, it's fairly
slow and doesn't break that often. For development velocity it's helpful for
001_aio.pl to be faster.

While particularly the latter could benefit from being its own commit, it
seems to introduce more back-and-forth than it's worth.

Author: Andres Freund <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Reviewed-by: Melanie Plageman <[email protected]>
Discussion: https://postgr.es/m/
---
 src/test/modules/test_aio/meson.build     |   1 +
 src/test/modules/test_aio/t/001_aio.pl    | 140 +++++++---------------
 src/test/modules/test_aio/t/003_initdb.pl |  71 +++++++++++
 src/test/modules/test_aio/t/TestAio.pm    |  90 ++++++++++++++
 4 files changed, 203 insertions(+), 99 deletions(-)
 create mode 100644 src/test/modules/test_aio/t/003_initdb.pl
 create mode 100644 src/test/modules/test_aio/t/TestAio.pm

diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index fefa25bc5ab..18a797f3a3b 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -32,6 +32,7 @@ tests += {
     'tests': [
       't/001_aio.pl',
       't/002_io_workers.pl',
+      't/003_initdb.pl',
     ],
   },
 }
diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl
index 5c634ec3ca9..27ee96898e0 100644
--- a/src/test/modules/test_aio/t/001_aio.pl
+++ b/src/test/modules/test_aio/t/001_aio.pl
@@ -7,126 +7,55 @@ use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
 
+use FindBin;
+use lib $FindBin::RealBin;
 
-###
-# Test io_method=worker
-###
-my $node_worker = create_node('worker');
-$node_worker->start();
-
-test_generic('worker', $node_worker);
-SKIP:
-{
-	skip 'Injection points not supported by this build', 1
-	  unless $ENV{enable_injection_points} eq 'yes';
-	test_inject_worker('worker', $node_worker);
-}
+use TestAio;
 
-$node_worker->stop();
+my %nodes;
 
 
 ###
-# Test io_method=io_uring
+# Create and configure one instance for each io_method
 ###
 
-if (have_io_uring())
+foreach my $method (TestAio::supported_io_methods())
 {
-	my $node_uring = create_node('io_uring');
-	$node_uring->start();
-	test_generic('io_uring', $node_uring);
-	$node_uring->stop();
-}
-
-
-###
-# Test io_method=sync
-###
-
-my $node_sync = create_node('sync');
+	my $node = PostgreSQL::Test::Cluster->new($method);
 
-# just to have one test not use the default auto-tuning
+	$nodes{$method} = $node;
+	$node->init();
+	$node->append_conf('postgresql.conf', "io_method=$method");
+	TestAio::configure($node);
+}
 
-$node_sync->append_conf(
+# Just to have one test not use the default auto-tuning
+$nodes{'sync'}->append_conf(
 	'postgresql.conf', qq(
-io_max_concurrency=4
+ io_max_concurrency=4
 ));
 
-$node_sync->start();
-test_generic('sync', $node_sync);
-$node_sync->stop();
-
-done_testing();
-
 
 ###
-# Test Helpers
+# Execute the tests for each io_method
 ###
 
-sub create_node
+foreach my $method (TestAio::supported_io_methods())
 {
-	local $Test::Builder::Level = $Test::Builder::Level + 1;
-
-	my $io_method = shift;
+	my $node = $nodes{$method};
 
-	my $node = PostgreSQL::Test::Cluster->new($io_method);
-
-	# Want to test initdb for each IO method, otherwise we could just reuse
-	# the cluster.
-	#
-	# Unfortunately Cluster::init() puts PG_TEST_INITDB_EXTRA_OPTS after the
-	# options specified by ->extra, if somebody puts -c io_method=xyz in
-	# PG_TEST_INITDB_EXTRA_OPTS it would break this test. Fix that up if we
-	# detect it.
-	local $ENV{PG_TEST_INITDB_EXTRA_OPTS} = $ENV{PG_TEST_INITDB_EXTRA_OPTS};
-	if (defined $ENV{PG_TEST_INITDB_EXTRA_OPTS}
-		&& $ENV{PG_TEST_INITDB_EXTRA_OPTS} =~ m/io_method=/)
-	{
-		$ENV{PG_TEST_INITDB_EXTRA_OPTS} .= " -c io_method=$io_method";
-	}
-
-	$node->init(extra => [ '-c', "io_method=$io_method" ]);
-
-	$node->append_conf(
-		'postgresql.conf', qq(
-shared_preload_libraries=test_aio
-log_min_messages = 'DEBUG3'
-log_statement=all
-log_error_verbosity=default
-restart_after_crash=false
-temp_buffers=100
-));
+	$node->start();
+	test_io_method($method, $node);
+	$node->stop();
+}
 
-	# Even though we used -c io_method=... above, if TEMP_CONFIG sets
-	# io_method, it'd override the setting persisted at initdb time. While
-	# using (and later verifying) the setting from initdb provides some
-	# verification of having used the io_method during initdb, it's probably
-	# not worth the complication of only appending if the variable is set in
-	# in TEMP_CONFIG.
-	$node->append_conf(
-		'postgresql.conf', qq(
-io_method=$io_method
-));
+done_testing();
 
-	ok(1, "$io_method: initdb");
 
-	return $node;
-}
+###
+# Test Helpers
+###
 
-sub have_io_uring
-{
-	# To detect if io_uring is supported, we look at the error message for
-	# assigning an invalid value to an enum GUC, which lists all the valid
-	# options. We need to use -C to deal with running as administrator on
-	# windows, the superuser check is omitted if -C is used.
-	my ($stdout, $stderr) =
-	  run_command [qw(postgres -C invalid -c io_method=invalid)];
-	die "can't determine supported io_method values"
-	  unless $stderr =~ m/Available values: ([^\.]+)\./;
-	my $methods = $1;
-	note "supported io_method values are: $methods";
-
-	return ($methods =~ m/io_uring/) ? 1 : 0;
-}
 
 sub psql_like
 {
@@ -1490,8 +1419,8 @@ SELECT read_rel_block_ll('tbl_cs_fail', 3, nblocks=>1, zero_on_error=>true);),
 }
 
 
-# Run all tests that are supported for all io_methods
-sub test_generic
+# Run all tests that for the specified node / io_method
+sub test_io_method
 {
 	my $io_method = shift;
 	my $node = shift;
@@ -1526,10 +1455,23 @@ CHECKPOINT;
 	test_ignore_checksum($io_method, $node);
 	test_checksum_createdb($io_method, $node);
 
+	# generic injection tests
   SKIP:
 	{
 		skip 'Injection points not supported by this build', 1
 		  unless $ENV{enable_injection_points} eq 'yes';
 		test_inject($io_method, $node);
 	}
+
+	# worker specific injection tests
+	if ($io_method eq 'worker')
+	{
+	  SKIP:
+		{
+			skip 'Injection points not supported by this build', 1
+			  unless $ENV{enable_injection_points} eq 'yes';
+
+			test_inject_worker($io_method, $node);
+		}
+	}
 }
diff --git a/src/test/modules/test_aio/t/003_initdb.pl b/src/test/modules/test_aio/t/003_initdb.pl
new file mode 100644
index 00000000000..c03ae58d00a
--- /dev/null
+++ b/src/test/modules/test_aio/t/003_initdb.pl
@@ -0,0 +1,71 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+#
+# Test initdb for each IO method. This is done separately from 001_aio.pl, as
+# it isn't fast. This way the more commonly failing / hacked-on 001_aio.pl can
+# be iterated on more quickly.
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+use FindBin;
+use lib $FindBin::RealBin;
+
+use TestAio;
+
+
+foreach my $method (TestAio::supported_io_methods())
+{
+	test_create_node($method);
+}
+
+done_testing();
+
+
+sub test_create_node
+{
+	local $Test::Builder::Level = $Test::Builder::Level + 1;
+
+	my $io_method = shift;
+
+	my $node = PostgreSQL::Test::Cluster->new($io_method);
+
+	# Want to test initdb for each IO method, otherwise we could just reuse
+	# the cluster.
+	#
+	# Unfortunately Cluster::init() puts PG_TEST_INITDB_EXTRA_OPTS after the
+	# options specified by ->extra, if somebody puts -c io_method=xyz in
+	# PG_TEST_INITDB_EXTRA_OPTS it would break this test. Fix that up if we
+	# detect it.
+	local $ENV{PG_TEST_INITDB_EXTRA_OPTS} = $ENV{PG_TEST_INITDB_EXTRA_OPTS};
+	if (defined $ENV{PG_TEST_INITDB_EXTRA_OPTS}
+		&& $ENV{PG_TEST_INITDB_EXTRA_OPTS} =~ m/io_method=/)
+	{
+		$ENV{PG_TEST_INITDB_EXTRA_OPTS} .= " -c io_method=$io_method";
+	}
+
+	$node->init(extra => [ '-c', "io_method=$io_method" ]);
+
+	TestAio::configure($node);
+
+	# Even though we used -c io_method=... above, if TEMP_CONFIG sets
+	# io_method, it'd override the setting persisted at initdb time. While
+	# using (and later verifying) the setting from initdb provides some
+	# verification of having used the io_method during initdb, it's probably
+	# not worth the complication of only appending if the variable is set in
+	# in TEMP_CONFIG.
+	$node->append_conf(
+		'postgresql.conf', qq(
+io_method=$io_method
+));
+
+	ok(1, "$io_method: initdb");
+
+	$node->start();
+	$node->stop();
+	ok(1, "$io_method: start & stop");
+
+	return $node;
+}
diff --git a/src/test/modules/test_aio/t/TestAio.pm b/src/test/modules/test_aio/t/TestAio.pm
new file mode 100644
index 00000000000..5bc80a9b130
--- /dev/null
+++ b/src/test/modules/test_aio/t/TestAio.pm
@@ -0,0 +1,90 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+=pod
+
+=head1 NAME
+
+TestAio - helpers for writing AIO related tests
+
+=cut
+
+package TestAio;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+
+=pod
+
+=head1 METHODS
+
+=over
+
+=item TestAio::supported_io_methods()
+
+Return an array of all the supported values for the io_method GUC
+
+=cut
+
+sub supported_io_methods()
+{
+	my @io_methods = ('worker');
+
+	push(@io_methods, "io_uring") if have_io_uring();
+
+	# Return sync last, as it will least commonly fail
+	push(@io_methods, "sync");
+
+	return @io_methods;
+}
+
+
+=item TestAio::configure()
+
+Prepare a cluster for AIO test
+
+=cut
+
+sub configure
+{
+	my $node = shift;
+
+	$node->append_conf(
+		'postgresql.conf', qq(
+shared_preload_libraries=test_aio
+log_min_messages = 'DEBUG3'
+log_statement=all
+log_error_verbosity=default
+restart_after_crash=false
+temp_buffers=100
+));
+
+}
+
+
+=pod
+
+=item TestAio::have_io_uring()
+
+Return if io_uring is supported
+
+=cut
+
+sub have_io_uring
+{
+	# To detect if io_uring is supported, we look at the error message for
+	# assigning an invalid value to an enum GUC, which lists all the valid
+	# options. We need to use -C to deal with running as administrator on
+	# windows, the superuser check is omitted if -C is used.
+	my ($stdout, $stderr) =
+	  run_command [qw(postgres -C invalid -c io_method=invalid)];
+	die "can't determine supported io_method values"
+	  unless $stderr =~ m/Available values: ([^\.]+)\./;
+	my $methods = $1;
+	note "supported io_method values are: $methods";
+
+	return ($methods =~ m/io_uring/) ? 1 : 0;
+}
+
+1;
-- 
2.43.0

From a4c32be0224bb18bc55e98d7789135258824463d Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Tue, 3 Mar 2026 12:27:10 -0500
Subject: [PATCH v4 2/6] small optimization for test refactor

---
 src/test/modules/test_aio/t/001_aio.pl | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl
index 27ee96898e0..e18b2a2b8ae 100644
--- a/src/test/modules/test_aio/t/001_aio.pl
+++ b/src/test/modules/test_aio/t/001_aio.pl
@@ -12,6 +12,7 @@ use lib $FindBin::RealBin;
 
 use TestAio;
 
+my @methods = TestAio::supported_io_methods();
 my %nodes;
 
 
@@ -19,7 +20,7 @@ my %nodes;
 # Create and configure one instance for each io_method
 ###
 
-foreach my $method (TestAio::supported_io_methods())
+foreach my $method (@methods)
 {
 	my $node = PostgreSQL::Test::Cluster->new($method);
 
@@ -40,7 +41,7 @@ $nodes{'sync'}->append_conf(
 # Execute the tests for each io_method
 ###
 
-foreach my $method (TestAio::supported_io_methods())
+foreach my $method (@methods)
 {
 	my $node = $nodes{$method};
 
-- 
2.43.0

From afc102665a6b2989f557c58bb8ae3d03eb3192cc Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Wed, 10 Sep 2025 14:00:02 -0400
Subject: [PATCH v4 3/6] test_aio: Add read_stream test infrastructure & tests

Author: Andres Freund <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Reviewed-by: Melanie Plageman <[email protected]>
Discussion: https://postgr.es/m/
---
 src/test/modules/test_aio/meson.build         |   1 +
 .../modules/test_aio/t/004_read_stream.pl     | 282 ++++++++++++++
 src/test/modules/test_aio/test_aio--1.0.sql   |  26 +-
 src/test/modules/test_aio/test_aio.c          | 344 +++++++++++++++---
 src/tools/pgindent/typedefs.list              |   1 +
 5 files changed, 602 insertions(+), 52 deletions(-)
 create mode 100644 src/test/modules/test_aio/t/004_read_stream.pl

diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index 18a797f3a3b..909f81d96c1 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -33,6 +33,7 @@ tests += {
       't/001_aio.pl',
       't/002_io_workers.pl',
       't/003_initdb.pl',
+      't/004_read_stream.pl',
     ],
   },
 }
diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl
new file mode 100644
index 00000000000..89cfabbb1d3
--- /dev/null
+++ b/src/test/modules/test_aio/t/004_read_stream.pl
@@ -0,0 +1,282 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+use FindBin;
+use lib $FindBin::RealBin;
+
+use TestAio;
+
+
+my $node = PostgreSQL::Test::Cluster->new('test');
+$node->init();
+
+$node->append_conf(
+	'postgresql.conf', qq(
+shared_preload_libraries=test_aio
+log_min_messages = 'DEBUG3'
+log_statement=all
+log_error_verbosity=default
+restart_after_crash=false
+temp_buffers=100
+max_connections=8
+io_method=worker
+));
+
+$node->start();
+test_setup($node);
+$node->stop();
+
+
+foreach my $method (TestAio::supported_io_methods())
+{
+	$node->adjust_conf('postgresql.conf', 'io_method', 'worker');
+	$node->start();
+	test_io_method($method, $node);
+	$node->stop();
+}
+
+done_testing();
+
+
+sub test_setup
+{
+	my $node = shift;
+
+	$node->safe_psql(
+		'postgres', qq(
+CREATE EXTENSION test_aio;
+
+CREATE TABLE largeish(k int not null) WITH (FILLFACTOR=10);
+INSERT INTO largeish(k) SELECT generate_series(1, 10000);
+));
+	ok(1, "setup");
+}
+
+
+sub test_repeated_blocks
+{
+	my $io_method = shift;
+	my $node = shift;
+
+	my $psql = $node->background_psql('postgres', on_error_stop => 0);
+
+	# Preventing larger reads makes testing easier
+	$psql->query_safe(
+		qq/
+SET io_combine_limit = 1;
+/);
+
+	# test miss of the same block twice in a row
+	$psql->query_safe(
+		qq/
+SELECT evict_rel('largeish');
+/);
+	$psql->query_safe(
+		qq/
+SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 2, 4, 4]);
+/);
+	ok(1, "$io_method: stream missing the same block repeatedly");
+
+	$psql->query_safe(
+		qq/
+SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 2, 4, 4]);
+/);
+	ok(1, "$io_method: stream hitting the same block repeatedly");
+
+	# test hit of the same block twice in a row
+	$psql->query_safe(
+		qq/
+SELECT evict_rel('largeish');
+/);
+	$psql->query_safe(
+		qq/
+SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 0]);
+/);
+	ok(1, "$io_method: stream accessing same block");
+
+	$psql->quit();
+}
+
+
+sub test_inject_foreign
+{
+	my $io_method = shift;
+	my $node = shift;
+
+	my $psql_a = $node->background_psql('postgres', on_error_stop => 0);
+	my $psql_b = $node->background_psql('postgres', on_error_stop => 0);
+
+	my $pid_a = $psql_a->query_safe(qq/SELECT pg_backend_pid();/);
+
+
+	###
+	# Test read stream encountering buffers undergoing IO in another backend,
+	# with the other backend's reads succeeding.
+	###
+	$psql_a->query_safe(
+		qq/
+SELECT evict_rel('largeish');
+/);
+
+	$psql_b->query_safe(
+		qq/
+SELECT inj_io_completion_wait(pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish'));
+/);
+
+	$psql_b->{stdin} .= qq/
+SELECT read_rel_block_ll('largeish', blockno=>5, nblocks=>1);
+/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until(
+		'postgres', qq/
+SELECT wait_event FROM pg_stat_activity WHERE wait_event = 'completion_wait';
+/,
+		'completion_wait');
+
+	$psql_a->{stdin} .= qq/
+SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);
+/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+
+	ok(1,
+		qq/$io_method: read stream encounters succeeding IO by another backend/
+	);
+
+
+	###
+	# Test read stream encountering buffers undergoing IO in another backend,
+	# with the other backend's reads failing.
+	###
+	$psql_a->query_safe(
+		qq/
+SELECT evict_rel('largeish');
+/);
+
+	$psql_b->query_safe(
+		qq/
+SELECT inj_io_completion_wait(pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish'));
+/);
+
+	$psql_b->query_safe(
+		qq/
+SELECT inj_io_short_read_attach(-errno_from_string('EIO'), pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish'));
+/);
+
+	$psql_b->{stdin} .= qq/
+SELECT read_rel_block_ll('largeish', blockno=>5, nblocks=>1);
+/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until(
+		'postgres', qq/
+SELECT wait_event FROM pg_stat_activity WHERE wait_event = 'completion_wait';
+/,
+		'completion_wait');
+
+	$psql_a->{stdin} .= qq/
+SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);
+/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+
+	$psql_b->{run}->pump_nb();
+	like(
+		$psql_b->{stderr},
+		qr/.*ERROR.*could not read blocks 5..5.*$/,
+		"$io_method: injected error occurred");
+	$psql_b->{stderr} = '';
+	$psql_b->query_safe(qq/SELECT inj_io_short_read_detach();/);
+
+
+	ok(1,
+		qq/$io_method: read stream encounters failing IO by another backend/);
+
+
+	###
+	# Test read stream encountering two buffers that are undergoing the same
+	# IO, started by another backend
+	###
+	$psql_a->query_safe(
+		qq/
+SELECT evict_rel('largeish');
+/);
+
+	$psql_b->query_safe(
+		qq/
+SELECT inj_io_completion_wait(pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish'));
+/);
+
+	$psql_b->{stdin} .= qq/
+SELECT read_rel_block_ll('largeish', blockno=>2, nblocks=>3);
+/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until(
+		'postgres', qq/
+SELECT wait_event FROM pg_stat_activity WHERE wait_event = 'completion_wait';
+/,
+		'completion_wait');
+
+	$psql_a->{stdin} .= qq/
+SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 4]);
+/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,4\}/);
+
+	ok(1, qq/$io_method: read stream encounters two buffer read in one IO/);
+
+
+	$psql_a->quit();
+	$psql_b->quit();
+}
+
+
+sub test_io_method
+{
+	my $io_method = shift;
+	my $node = shift;
+
+	test_repeated_blocks($io_method, $node);
+
+  SKIP:
+	{
+		skip 'Injection points not supported by this build', 1
+		  unless $ENV{enable_injection_points} eq 'yes';
+		test_inject_foreign($io_method, $node);
+	}
+}
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index e495481c41e..da7cc03829a 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -33,6 +33,10 @@ CREATE FUNCTION read_rel_block_ll(
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
+CREATE FUNCTION evict_rel(rel regclass)
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
 CREATE FUNCTION invalidate_rel_block(rel regclass, blockno int)
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
@@ -41,7 +45,7 @@ CREATE FUNCTION buffer_create_toy(rel regclass, blockno int4)
 RETURNS pg_catalog.int4 STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
-CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool)
+CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool, assign_io bool DEFAULT false)
 RETURNS pg_catalog.bool STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
@@ -50,6 +54,14 @@ RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
 
+/*
+ * Read stream related functions
+ */
+CREATE FUNCTION read_stream_for_blocks(rel regclass, blocks int4[], OUT blockoff int4, OUT blocknum int4, OUT buf int4)
+RETURNS SETOF record STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+
 
 /*
  * Handle related functions
@@ -91,8 +103,16 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
 /*
  * Injection point related functions
  */
-CREATE FUNCTION inj_io_short_read_attach(result int)
-RETURNS pg_catalog.void STRICT
+CREATE FUNCTION inj_io_completion_wait(pid int DEFAULT NULL, relfilenode oid DEFAULT 0)
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_io_completion_continue()
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_io_short_read_attach(result int, pid int DEFAULT NULL, relfilenode oid DEFAULT 0)
+RETURNS pg_catalog.void
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
 CREATE FUNCTION inj_io_short_read_detach()
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index b1aa8af9ec0..911a7102a34 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -20,16 +20,23 @@
 
 #include "access/relation.h"
 #include "fmgr.h"
+#include "funcapi.h"
 #include "storage/aio.h"
 #include "storage/aio_internal.h"
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/checksum.h"
+#include "storage/condition_variable.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/procnumber.h"
+#include "storage/read_stream.h"
+#include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/injection_point.h"
 #include "utils/rel.h"
+#include "utils/wait_event.h"
 
 
 PG_MODULE_MAGIC;
@@ -37,13 +44,30 @@ PG_MODULE_MAGIC;
 
 typedef struct InjIoErrorState
 {
+	ConditionVariable cv;
+
 	bool		enabled_short_read;
 	bool		enabled_reopen;
 
+	bool		enabled_completion_wait;
+	Oid			completion_wait_relfilenode;
+	pid_t		completion_wait_pid;
+	uint32		completion_wait_event;
+
 	bool		short_read_result_set;
+	Oid			short_read_relfilenode;
+	pid_t		short_read_pid;
 	int			short_read_result;
 } InjIoErrorState;
 
+typedef struct BlocksReadStreamData
+{
+	int			nblocks;
+	int			curblock;
+	uint32	   *blocks;
+} BlocksReadStreamData;
+
+
 static InjIoErrorState *inj_io_error_state;
 
 /* Shared memory init callbacks */
@@ -85,10 +109,13 @@ test_aio_shmem_startup(void)
 		inj_io_error_state->enabled_short_read = false;
 		inj_io_error_state->enabled_reopen = false;
 
+		ConditionVariableInit(&inj_io_error_state->cv);
+		inj_io_error_state->completion_wait_event = WaitEventInjectionPointNew("completion_wait");
+
 #ifdef USE_INJECTION_POINTS
 		InjectionPointAttach("aio-process-completion-before-shared",
 							 "test_aio",
-							 "inj_io_short_read",
+							 "inj_io_completion_hook",
 							 NULL,
 							 0);
 		InjectionPointLoad("aio-process-completion-before-shared");
@@ -384,7 +411,7 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
 	if (nblocks <= 0 || nblocks > PG_IOV_MAX)
 		elog(ERROR, "nblocks is out of range");
 
-	rel = relation_open(relid, AccessExclusiveLock);
+	rel = relation_open(relid, AccessShareLock);
 
 	for (int i = 0; i < nblocks; i++)
 	{
@@ -458,6 +485,27 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+PG_FUNCTION_INFO_V1(evict_rel);
+Datum
+evict_rel(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	Relation	rel;
+	int32		buffers_evicted,
+				buffers_flushed,
+				buffers_skipped;
+
+	rel = relation_open(relid, AccessExclusiveLock);
+
+	EvictRelUnpinnedBuffers(rel, &buffers_evicted, &buffers_flushed,
+							&buffers_skipped);
+
+	relation_close(rel, AccessExclusiveLock);
+
+
+	PG_RETURN_VOID();
+}
+
 PG_FUNCTION_INFO_V1(invalidate_rel_block);
 Datum
 invalidate_rel_block(PG_FUNCTION_ARGS)
@@ -610,6 +658,86 @@ buffer_call_terminate_io(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+
+static BlockNumber
+read_stream_for_blocks_cb(ReadStream *stream,
+						  void *callback_private_data,
+						  void *per_buffer_data)
+{
+	BlocksReadStreamData *stream_data = callback_private_data;
+
+	if (stream_data->curblock >= stream_data->nblocks)
+		return InvalidBlockNumber;
+	return stream_data->blocks[stream_data->curblock++];
+}
+
+PG_FUNCTION_INFO_V1(read_stream_for_blocks);
+Datum
+read_stream_for_blocks(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	ArrayType  *blocksarray = PG_GETARG_ARRAYTYPE_P(1);
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	Relation	rel;
+	BlocksReadStreamData stream_data;
+	ReadStream *stream;
+
+	InitMaterializedSRF(fcinfo, 0);
+
+	/*
+	 * We expect the input to be an N-element int4 array; verify that. We
+	 * don't need to use deconstruct_array() since the array data is just
+	 * going to look like a C array of N int4 values.
+	 */
+	if (ARR_NDIM(blocksarray) != 1 ||
+		ARR_HASNULL(blocksarray) ||
+		ARR_ELEMTYPE(blocksarray) != INT4OID)
+		elog(ERROR, "expected 1 dimensional int4 array");
+
+	stream_data.curblock = 0;
+	stream_data.nblocks = ARR_DIMS(blocksarray)[0];
+	stream_data.blocks = (uint32 *) ARR_DATA_PTR(blocksarray);
+
+	rel = relation_open(relid, AccessShareLock);
+
+	stream = read_stream_begin_relation(READ_STREAM_FULL,
+										NULL,
+										rel,
+										MAIN_FORKNUM,
+										read_stream_for_blocks_cb,
+										&stream_data,
+										0);
+
+	for (int i = 0; i < stream_data.nblocks; i++)
+	{
+		Buffer		buf = read_stream_next_buffer(stream, NULL);
+		Datum		values[3] = {0};
+		bool		nulls[3] = {0};
+
+		if (!BufferIsValid(buf))
+			elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly invalid", i);
+
+		values[0] = Int32GetDatum(i);
+		values[1] = UInt32GetDatum(stream_data.blocks[i]);
+		values[2] = UInt32GetDatum(buf);
+
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+
+		ReleaseBuffer(buf);
+	}
+
+	if (read_stream_next_buffer(stream, NULL) != InvalidBuffer)
+		elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly valid",
+			 stream_data.nblocks + 1);
+
+	read_stream_end(stream);
+
+	relation_close(rel, NoLock);
+
+	return (Datum) 0;
+}
+
+
 PG_FUNCTION_INFO_V1(handle_get);
 Datum
 handle_get(PG_FUNCTION_ARGS)
@@ -680,15 +808,98 @@ batch_end(PG_FUNCTION_ARGS)
 }
 
 #ifdef USE_INJECTION_POINTS
-extern PGDLLEXPORT void inj_io_short_read(const char *name,
-										  const void *private_data,
-										  void *arg);
+extern PGDLLEXPORT void inj_io_completion_hook(const char *name,
+											   const void *private_data,
+											   void *arg);
 extern PGDLLEXPORT void inj_io_reopen(const char *name,
 									  const void *private_data,
 									  void *arg);
 
-void
-inj_io_short_read(const char *name, const void *private_data, void *arg)
+static bool
+inj_io_short_read_matches(PgAioHandle *ioh)
+{
+	PGPROC	   *owner_proc;
+	int32		owner_pid;
+	PgAioTargetData *td;
+
+	if (!inj_io_error_state->enabled_short_read)
+		return false;
+
+	if (!inj_io_error_state->short_read_result_set)
+		return false;
+
+	owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
+	owner_pid = owner_proc->pid;
+
+	if (inj_io_error_state->short_read_pid != 0 &&
+		inj_io_error_state->short_read_pid != owner_pid)
+		return false;
+
+	td = pgaio_io_get_target_data(ioh);
+
+	if (inj_io_error_state->short_read_relfilenode != InvalidOid &&
+		td->smgr.rlocator.relNumber != inj_io_error_state->short_read_relfilenode)
+		return false;
+
+	/*
+	 * Only shorten reads that are actually longer than the target size,
+	 * otherwise we can trigger over-reads.
+	 */
+	if (inj_io_error_state->short_read_result >= ioh->result)
+		return false;
+
+	return true;
+}
+
+static bool
+inj_io_completion_wait_matches(PgAioHandle *ioh)
+{
+	PGPROC	   *owner_proc;
+	int32		owner_pid;
+	PgAioTargetData *td;
+
+	if (!inj_io_error_state->enabled_completion_wait)
+		return false;
+
+	owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
+	owner_pid = owner_proc->pid;
+
+	if (inj_io_error_state->completion_wait_pid != owner_pid)
+		return false;
+
+	td = pgaio_io_get_target_data(ioh);
+
+	if (inj_io_error_state->completion_wait_relfilenode != InvalidOid &&
+		td->smgr.rlocator.relNumber != inj_io_error_state->completion_wait_relfilenode)
+		return false;
+
+	return true;
+}
+
+static void
+inj_io_completion_wait_hook(const char *name, const void *private_data, void *arg)
+{
+	PgAioHandle *ioh = (PgAioHandle *) arg;
+
+	if (!inj_io_completion_wait_matches(ioh))
+		return;
+
+	ConditionVariablePrepareToSleep(&inj_io_error_state->cv);
+
+	while (true)
+	{
+		if (!inj_io_completion_wait_matches(ioh))
+			break;
+
+		ConditionVariableSleep(&inj_io_error_state->cv,
+							   inj_io_error_state->completion_wait_event);
+	}
+
+	ConditionVariableCancelSleep();
+}
+
+static void
+inj_io_short_read_hook(const char *name, const void *private_data, void *arg)
 {
 	PgAioHandle *ioh = (PgAioHandle *) arg;
 
@@ -697,58 +908,56 @@ inj_io_short_read(const char *name, const void *private_data, void *arg)
 				   inj_io_error_state->enabled_reopen),
 			errhidestmt(true), errhidecontext(true));
 
-	if (inj_io_error_state->enabled_short_read)
+	if (inj_io_short_read_matches(ioh))
 	{
+		struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
+		int32		old_result = ioh->result;
+		int32		new_result = inj_io_error_state->short_read_result;
+		int32		processed = 0;
+
+		ereport(LOG,
+				errmsg("short read inject point, changing result from %d to %d",
+					   old_result, new_result),
+				errhidestmt(true), errhidecontext(true));
+
 		/*
-		 * Only shorten reads that are actually longer than the target size,
-		 * otherwise we can trigger over-reads.
+		 * The underlying IO actually completed OK, and thus the "invalid"
+		 * portion of the IOV actually contains valid data. That can hide a
+		 * lot of problems, e.g. if we were to wrongly mark a buffer, that
+		 * wasn't read according to the shortened-read, IO as valid, the
+		 * contents would look valid and we might miss a bug.
+		 *
+		 * To avoid that, iterate through the IOV and zero out the "failed"
+		 * portion of the IO.
 		 */
-		if (inj_io_error_state->short_read_result_set
-			&& ioh->op == PGAIO_OP_READV
-			&& inj_io_error_state->short_read_result <= ioh->result)
+		for (int i = 0; i < ioh->op_data.read.iov_length; i++)
 		{
-			struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
-			int32		old_result = ioh->result;
-			int32		new_result = inj_io_error_state->short_read_result;
-			int32		processed = 0;
-
-			ereport(LOG,
-					errmsg("short read inject point, changing result from %d to %d",
-						   old_result, new_result),
-					errhidestmt(true), errhidecontext(true));
-
-			/*
-			 * The underlying IO actually completed OK, and thus the "invalid"
-			 * portion of the IOV actually contains valid data. That can hide
-			 * a lot of problems, e.g. if we were to wrongly mark a buffer,
-			 * that wasn't read according to the shortened-read, IO as valid,
-			 * the contents would look valid and we might miss a bug.
-			 *
-			 * To avoid that, iterate through the IOV and zero out the
-			 * "failed" portion of the IO.
-			 */
-			for (int i = 0; i < ioh->op_data.read.iov_length; i++)
+			if (processed + iov[i].iov_len <= new_result)
+				processed += iov[i].iov_len;
+			else if (processed <= new_result)
 			{
-				if (processed + iov[i].iov_len <= new_result)
-					processed += iov[i].iov_len;
-				else if (processed <= new_result)
-				{
-					uint32		ok_part = new_result - processed;
-
-					memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
-					processed += iov[i].iov_len;
-				}
-				else
-				{
-					memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
-				}
-			}
+				uint32		ok_part = new_result - processed;
 
-			ioh->result = new_result;
+				memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
+				processed += iov[i].iov_len;
+			}
+			else
+			{
+				memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
+			}
 		}
+
+		ioh->result = new_result;
 	}
 }
 
+void
+inj_io_completion_hook(const char *name, const void *private_data, void *arg)
+{
+	inj_io_completion_wait_hook(name, private_data, arg);
+	inj_io_short_read_hook(name, private_data, arg);
+}
+
 void
 inj_io_reopen(const char *name, const void *private_data, void *arg)
 {
@@ -762,6 +971,39 @@ inj_io_reopen(const char *name, const void *private_data, void *arg)
 }
 #endif
 
+PG_FUNCTION_INFO_V1(inj_io_completion_wait);
+Datum
+inj_io_completion_wait(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+	inj_io_error_state->enabled_completion_wait = true;
+	inj_io_error_state->completion_wait_pid =
+		PG_ARGISNULL(0) ? 0 : PG_GETARG_INT32(0);
+	inj_io_error_state->completion_wait_relfilenode =
+		PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1);
+#else
+	elog(ERROR, "injection points not supported");
+#endif
+
+	PG_RETURN_VOID();
+}
+
+PG_FUNCTION_INFO_V1(inj_io_completion_continue);
+Datum
+inj_io_completion_continue(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+	inj_io_error_state->enabled_completion_wait = false;
+	inj_io_error_state->completion_wait_pid = 0;
+	inj_io_error_state->completion_wait_relfilenode = InvalidOid;
+	ConditionVariableBroadcast(&inj_io_error_state->cv);
+#else
+	elog(ERROR, "injection points not supported");
+#endif
+
+	PG_RETURN_VOID();
+}
+
 PG_FUNCTION_INFO_V1(inj_io_short_read_attach);
 Datum
 inj_io_short_read_attach(PG_FUNCTION_ARGS)
@@ -771,6 +1013,10 @@ inj_io_short_read_attach(PG_FUNCTION_ARGS)
 	inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0);
 	if (inj_io_error_state->short_read_result_set)
 		inj_io_error_state->short_read_result = PG_GETARG_INT32(0);
+	inj_io_error_state->short_read_pid =
+		PG_ARGISNULL(1) ? 0 : PG_GETARG_INT32(1);
+	inj_io_error_state->short_read_relfilenode =
+		PG_ARGISNULL(2) ? 0 : PG_GETARG_OID(2);
 #else
 	elog(ERROR, "injection points not supported");
 #endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 77e3c04144e..668faaa5615 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -303,6 +303,7 @@ BlockSampler
 BlockSamplerData
 BlockedProcData
 BlockedProcsData
+BlocksReadStreamData
 BlocktableEntry
 BloomBuildState
 BloomFilter
-- 
2.43.0

From d1eb014b043d71702bff6d1ba11e90c1e7f0c17a Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Tue, 3 Mar 2026 13:27:51 -0500
Subject: [PATCH v4 4/6] test fixes

The pump_until change is needed for the test to work reliably on BSD.
---
 src/test/modules/test_aio/t/004_read_stream.pl | 15 ++++++++-------
 src/test/modules/test_aio/test_aio--1.0.sql    |  2 +-
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl
index 89cfabbb1d3..f3fa018dd1c 100644
--- a/src/test/modules/test_aio/t/004_read_stream.pl
+++ b/src/test/modules/test_aio/t/004_read_stream.pl
@@ -35,7 +35,7 @@ $node->stop();
 
 foreach my $method (TestAio::supported_io_methods())
 {
-	$node->adjust_conf('postgresql.conf', 'io_method', 'worker');
+	$node->adjust_conf('postgresql.conf', 'io_method', $method);
 	$node->start();
 	test_io_method($method, $node);
 	$node->stop();
@@ -205,15 +205,13 @@ SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 5
 		$psql_a->{run}, $psql_a->{timeout},
 		\$psql_a->{stdout}, qr/\{0,2,5,7\}/);
 
-	$psql_b->{run}->pump_nb();
-	like(
-		$psql_b->{stderr},
-		qr/.*ERROR.*could not read blocks 5..5.*$/,
-		"$io_method: injected error occurred");
+	pump_until(
+		$psql_b->{run}, $psql_b->{timeout},
+		\$psql_b->{stderr}, qr/ERROR.*could not read blocks 5\.\.5/);
+	ok(1, "$io_method: injected error occurred");
 	$psql_b->{stderr} = '';
 	$psql_b->query_safe(qq/SELECT inj_io_short_read_detach();/);
 
-
 	ok(1,
 		qq/$io_method: read stream encounters failing IO by another backend/);
 
@@ -271,6 +269,9 @@ sub test_io_method
 	my $io_method = shift;
 	my $node = shift;
 
+	is($node->safe_psql('postgres', 'SHOW io_method'),
+		$io_method, "$io_method: io_method set correctly");
+
 	test_repeated_blocks($io_method, $node);
 
   SKIP:
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index da7cc03829a..1cc4734a746 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -45,7 +45,7 @@ CREATE FUNCTION buffer_create_toy(rel regclass, blockno int4)
 RETURNS pg_catalog.int4 STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
-CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool, assign_io bool DEFAULT false)
+CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool)
 RETURNS pg_catalog.bool STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
-- 
2.43.0

From a893d82fe0accd77c807ca3b791713954e319a2c Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Fri, 23 Jan 2026 13:54:02 -0500
Subject: [PATCH v4 5/6] Make buffer hit helper

Already two places count buffer hits, requiring quite a few lines of
code since we do accounting in so many places. Future commits will add
more locations, so refactor into a helper.

Reviewed-by: Nazir Bilal Yavuz <[email protected]>
---
 src/backend/storage/buffer/bufmgr.c | 111 ++++++++++++++--------------
 1 file changed, 56 insertions(+), 55 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index d1babaff023..a749971ba7e 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -639,6 +639,10 @@ static inline BufferDesc *BufferAlloc(SMgrRelation smgr,
 									  bool *foundPtr, IOContext io_context);
 static bool AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress);
 static void CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete);
+
+static pg_attribute_always_inline void CountBufferHit(BufferAccessStrategy strategy,
+													  Relation rel, char persistence, SMgrRelation smgr,
+													  ForkNumber forknum, BlockNumber blocknum);
 static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
 static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
 								IOObject io_object, IOContext io_context);
@@ -1217,8 +1221,6 @@ PinBufferForBlock(Relation rel,
 				  bool *foundPtr)
 {
 	BufferDesc *bufHdr;
-	IOContext	io_context;
-	IOObject	io_object;
 
 	Assert(blockNum != P_NEW);
 
@@ -1227,17 +1229,6 @@ PinBufferForBlock(Relation rel,
 			persistence == RELPERSISTENCE_PERMANENT ||
 			persistence == RELPERSISTENCE_UNLOGGED));
 
-	if (persistence == RELPERSISTENCE_TEMP)
-	{
-		io_context = IOCONTEXT_NORMAL;
-		io_object = IOOBJECT_TEMP_RELATION;
-	}
-	else
-	{
-		io_context = IOContextForStrategy(strategy);
-		io_object = IOOBJECT_RELATION;
-	}
-
 	TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
 									   smgr->smgr_rlocator.locator.spcOid,
 									   smgr->smgr_rlocator.locator.dbOid,
@@ -1245,18 +1236,11 @@ PinBufferForBlock(Relation rel,
 									   smgr->smgr_rlocator.backend);
 
 	if (persistence == RELPERSISTENCE_TEMP)
-	{
 		bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, foundPtr);
-		if (*foundPtr)
-			pgBufferUsage.local_blks_hit++;
-	}
 	else
-	{
 		bufHdr = BufferAlloc(smgr, persistence, forkNum, blockNum,
-							 strategy, foundPtr, io_context);
-		if (*foundPtr)
-			pgBufferUsage.shared_blks_hit++;
-	}
+							 strategy, foundPtr, IOContextForStrategy(strategy));
+
 	if (rel)
 	{
 		/*
@@ -1265,22 +1249,10 @@ PinBufferForBlock(Relation rel,
 		 * zeroed instead), the per-relation stats always count them.
 		 */
 		pgstat_count_buffer_read(rel);
-		if (*foundPtr)
-			pgstat_count_buffer_hit(rel);
 	}
-	if (*foundPtr)
-	{
-		pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0);
-		if (VacuumCostActive)
-			VacuumCostBalance += VacuumCostPageHit;
 
-		TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
-										  smgr->smgr_rlocator.locator.spcOid,
-										  smgr->smgr_rlocator.locator.dbOid,
-										  smgr->smgr_rlocator.locator.relNumber,
-										  smgr->smgr_rlocator.backend,
-										  true);
-	}
+	if (*foundPtr)
+		CountBufferHit(strategy, rel, persistence, smgr, forkNum, blockNum);
 
 	return BufferDescriptorGetBuffer(bufHdr);
 }
@@ -1686,6 +1658,51 @@ ReadBuffersCanStartIO(Buffer buffer, bool nowait)
 	return ReadBuffersCanStartIOOnce(buffer, nowait);
 }
 
+/*
+ * We track various stats related to buffer hits. Because this is done in a
+ * few separate places, this helper exists for convenience.
+ */
+static pg_attribute_always_inline void
+CountBufferHit(BufferAccessStrategy strategy,
+			   Relation rel, char persistence, SMgrRelation smgr,
+			   ForkNumber forknum, BlockNumber blocknum)
+{
+	IOContext	io_context;
+	IOObject	io_object;
+
+	if (persistence == RELPERSISTENCE_TEMP)
+	{
+		io_context = IOCONTEXT_NORMAL;
+		io_object = IOOBJECT_TEMP_RELATION;
+	}
+	else
+	{
+		io_context = IOContextForStrategy(strategy);
+		io_object = IOOBJECT_RELATION;
+	}
+
+	TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum,
+									  blocknum,
+									  smgr->smgr_rlocator.locator.spcOid,
+									  smgr->smgr_rlocator.locator.dbOid,
+									  smgr->smgr_rlocator.locator.relNumber,
+									  smgr->smgr_rlocator.backend,
+									  true);
+
+	if (persistence == RELPERSISTENCE_TEMP)
+		pgBufferUsage.local_blks_hit += 1;
+	else
+		pgBufferUsage.shared_blks_hit += 1;
+
+	if (rel)
+		pgstat_count_buffer_hit(rel);
+
+	pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0);
+
+	if (VacuumCostActive)
+		VacuumCostBalance += VacuumCostPageHit;
+}
+
 /*
  * Helper for WaitReadBuffers() that processes the results of a readv
  * operation, raising an error if necessary.
@@ -1981,25 +1998,9 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 		 * must have started out as a miss in PinBufferForBlock(). The other
 		 * backend will track this as a 'read'.
 		 */
-		TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + operation->nblocks_done,
-										  operation->smgr->smgr_rlocator.locator.spcOid,
-										  operation->smgr->smgr_rlocator.locator.dbOid,
-										  operation->smgr->smgr_rlocator.locator.relNumber,
-										  operation->smgr->smgr_rlocator.backend,
-										  true);
-
-		if (persistence == RELPERSISTENCE_TEMP)
-			pgBufferUsage.local_blks_hit += 1;
-		else
-			pgBufferUsage.shared_blks_hit += 1;
-
-		if (operation->rel)
-			pgstat_count_buffer_hit(operation->rel);
-
-		pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0);
-
-		if (VacuumCostActive)
-			VacuumCostBalance += VacuumCostPageHit;
+		CountBufferHit(operation->strategy, operation->rel, persistence,
+					   operation->smgr, forknum,
+					   blocknum + operation->nblocks_done);
 	}
 	else
 	{
-- 
2.43.0

From d6a2d6d3316f33f5a7dfdd1a7084ea230d26ae3b Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Fri, 23 Jan 2026 14:00:31 -0500
Subject: [PATCH v4 6/6] Don't wait for already in-progress IO
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When a backend attempts to start a read on a buffer and finds that I/O
is already in progress, it previously waited for that I/O to complete
before initiating reads for any other buffers. Although the backend must
still wait for the I/O to finish when later acquiring the buffer, it
should not need to wait at read start time. Other buffers may be
available for I/O, and in some workloads this waiting significantly
reduces concurrency.

For example, index scans may repeatedly request the same heap block. If
the backend waits each time it encounters an in-progress read, the
access pattern effectively degenerates into synchronous I/O. By
introducing the concept of foreign I/O operations, a backend can record
the buffer’s wait reference and defer waiting until WaitReadBuffers()
when it actually acquires the buffer.

In rare cases, a backend may still need to wait when starting a read if
it encounters a buffer after another backend has set BM_IO_IN_PROGRESS
but before the buffer descriptor’s wait reference has been set. Such
windows should be brief and uncommon.

Author: Melanie Plageman <[email protected]>
Reviewed-by: Andres Freund <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
---
 src/backend/storage/buffer/bufmgr.c | 485 ++++++++++++++++++----------
 src/include/storage/bufmgr.h        |   1 +
 src/tools/pgindent/typedefs.list    |   1 +
 3 files changed, 324 insertions(+), 163 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index a749971ba7e..f8205c3b845 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -170,6 +170,21 @@ typedef struct SMgrSortArray
 	SMgrRelation srel;
 } SMgrSortArray;
 
+
+/*
+ * In AsyncReadBuffers(), when preparing a buffer for reading and setting
+ * BM_IO_IN_PROGRESS, the buffer may already have I/O in progress or may
+ * already contain the desired block. AsyncReadBuffers() must distinguish
+ * between these cases (and the case where it should initiate I/O) so it can
+ * mark an in-progress buffer as foreign I/O rather than waiting on it.
+ */
+typedef enum PrepareReadBuffer_Status
+{
+	READ_BUFFER_ALREADY_DONE,
+	READ_BUFFER_IN_PROGRESS,
+	READ_BUFFER_READY_FOR_IO,
+} PrepareReadBuffer_Status;
+
 /* GUC variables */
 bool		zero_damaged_pages = false;
 int			bgwriter_lru_maxpages = 100;
@@ -1619,45 +1634,6 @@ CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete)
 #endif
 }
 
-/* helper for ReadBuffersCanStartIO(), to avoid repetition */
-static inline bool
-ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait)
-{
-	if (BufferIsLocal(buffer))
-		return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1),
-								  true, nowait);
-	else
-		return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
-}
-
-/*
- * Helper for AsyncReadBuffers that tries to get the buffer ready for IO.
- */
-static inline bool
-ReadBuffersCanStartIO(Buffer buffer, bool nowait)
-{
-	/*
-	 * If this backend currently has staged IO, we need to submit the pending
-	 * IO before waiting for the right to issue IO, to avoid the potential for
-	 * deadlocks (and, more commonly, unnecessary delays for other backends).
-	 */
-	if (!nowait && pgaio_have_staged())
-	{
-		if (ReadBuffersCanStartIOOnce(buffer, true))
-			return true;
-
-		/*
-		 * Unfortunately StartBufferIO() returning false doesn't allow to
-		 * distinguish between the buffer already being valid and IO already
-		 * being in progress. Since IO already being in progress is quite
-		 * rare, this approach seems fine.
-		 */
-		pgaio_submit_staged();
-	}
-
-	return ReadBuffersCanStartIOOnce(buffer, nowait);
-}
-
 /*
  * We track various stats related to buffer hits. Because this is done in a
  * few separate places, this helper exists for convenience.
@@ -1807,7 +1783,7 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 			 *
 			 * we first check if we already know the IO is complete.
 			 */
-			if (aio_ret->result.status == PGAIO_RS_UNKNOWN &&
+			if ((operation->foreign_io || aio_ret->result.status == PGAIO_RS_UNKNOWN) &&
 				!pgaio_wref_check_done(&operation->io_wref))
 			{
 				instr_time	io_start = pgstat_prepare_io_time(track_io_timing);
@@ -1826,11 +1802,33 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 				Assert(pgaio_wref_check_done(&operation->io_wref));
 			}
 
-			/*
-			 * We now are sure the IO completed. Check the results. This
-			 * includes reporting on errors if there were any.
-			 */
-			ProcessReadBuffersResult(operation);
+			if (unlikely(operation->foreign_io))
+			{
+				Buffer		buffer = operation->buffers[operation->nblocks_done];
+				BufferDesc *desc = BufferIsLocal(buffer) ?
+					GetLocalBufferDescriptor(-buffer - 1) :
+					GetBufferDescriptor(buffer - 1);
+				uint32		buf_state = pg_atomic_read_u64(&desc->state);
+
+				if (buf_state & BM_VALID)
+				{
+					operation->nblocks_done += 1;
+					Assert(operation->nblocks_done <= operation->nblocks);
+
+					CountBufferHit(operation->strategy,
+								   operation->rel, operation->persistence,
+								   operation->smgr, operation->forknum,
+								   operation->blocknum + operation->nblocks_done);
+				}
+			}
+			else
+			{
+				/*
+				 * We now are sure the IO completed. Check the results. This
+				 * includes reporting on errors if there were any.
+				 */
+				ProcessReadBuffersResult(operation);
+			}
 		}
 
 		/*
@@ -1861,6 +1859,163 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 	/* NB: READ_DONE tracepoint was already executed in completion callback */
 }
 
+/*
+ * Local version of PrepareNewReadBufferIO(). Here instead of localbuf.c to
+ * avoid an external function call.
+ */
+static PrepareReadBuffer_Status
+PrepareNewLocalReadBufferIO(ReadBuffersOperation *operation,
+							Buffer buffer)
+{
+	BufferDesc *desc = GetLocalBufferDescriptor(-buffer - 1);
+	uint64		buf_state = pg_atomic_read_u64(&desc->state);
+
+	/* Already valid, no work to do */
+	if (buf_state & BM_VALID)
+	{
+		pgaio_wref_clear(&operation->io_wref);
+		return READ_BUFFER_ALREADY_DONE;
+	}
+
+	pgaio_submit_staged();
+
+	if (pgaio_wref_valid(&desc->io_wref))
+	{
+		operation->io_wref = desc->io_wref;
+		operation->foreign_io = true;
+		return READ_BUFFER_IN_PROGRESS;
+	}
+
+	/*
+	 * While it is possible for a buffer to have been prepared for IO but not
+	 * yet had its wait reference set, there's no way for us to know that for
+	 * temporary buffers. Thus, we'll prepare for own IO on this buffer.
+	 */
+	return READ_BUFFER_READY_FOR_IO;
+}
+
+/*
+ * Try to start IO on the first buffer in a new run of blocks. If AIO is in
+ * progress, be it in this backend or another backend, we just associate the
+ * wait reference with the operation and wait in WaitReadBuffers(). This turns
+ * out to be important for performance in two workloads:
+ *
+ * 1) A read stream that has to read the same block multiple times within the
+ *    readahead distance. This can happen e.g. for the table accesses of an
+ *    index scan.
+ *
+ * 2) Concurrent scans by multiple backends on the same relation.
+ *
+ * If we were to synchronously wait for the in-progress IO, we'd not be able
+ * to keep enough I/O in flight.
+ *
+ * If we do find there is ongoing I/O for the buffer, we set up a 1-block
+ * ReadBuffersOperation that WaitReadBuffers then can wait on.
+ *
+ * It's possible that another backend has started IO on the buffer but not yet
+ * set its wait reference. In this case, we have no choice but to wait for
+ * either the wait reference to be valid or the IO to be done.
+ */
+static PrepareReadBuffer_Status
+PrepareNewReadBufferIO(ReadBuffersOperation *operation,
+					   Buffer buffer)
+{
+	uint64		buf_state;
+	BufferDesc *desc;
+
+	if (BufferIsLocal(buffer))
+		return PrepareNewLocalReadBufferIO(operation, buffer);
+
+	ResourceOwnerEnlarge(CurrentResourceOwner);
+	desc = GetBufferDescriptor(buffer - 1);
+
+	for (;;)
+	{
+		buf_state = LockBufHdr(desc);
+
+		/* Already valid, no work to do */
+		if (buf_state & BM_VALID)
+		{
+			UnlockBufHdr(desc);
+			pgaio_wref_clear(&operation->io_wref);
+			return READ_BUFFER_ALREADY_DONE;
+		}
+
+		if (buf_state & BM_IO_IN_PROGRESS)
+		{
+			/* Join existing read */
+			if (pgaio_wref_valid(&desc->io_wref))
+			{
+				operation->io_wref = desc->io_wref;
+				operation->foreign_io = true;
+				UnlockBufHdr(desc);
+				return READ_BUFFER_IN_PROGRESS;
+			}
+
+			/*
+			 * If the wait ref is not valid but the IO is in progress, someone
+			 * else started IO but hasn't set the wait ref yet. We have no
+			 * choice but to wait until the IO completes.
+			 */
+			UnlockBufHdr(desc);
+			pgaio_submit_staged();
+			WaitIO(desc);
+			continue;
+		}
+
+		/*
+		 * No IO in progress and not already valid; We will start IO. It's
+		 * possible that the IO was in progress and never became valid because
+		 * the IO errored out. We'll do the IO ourselves.
+		 */
+		UnlockBufHdrExt(desc, buf_state, BM_IO_IN_PROGRESS, 0, 0);
+		ResourceOwnerRememberBufferIO(CurrentResourceOwner,
+									  BufferDescriptorGetBuffer(desc));
+
+		return READ_BUFFER_READY_FOR_IO;
+	}
+}
+
+
+/*
+ * When building a new IO from multiple buffers, we won't include buffers
+ * that are already valid or already in progress. This function should only be
+ * used for additional adjacent buffers following the head buffer in a new IO.
+ *
+ * Returns true if the buffer was successfully prepared for IO and false if it
+ * is rejected and the read IO should not include this buffer.
+*/
+static bool
+PrepareAdditionalReadBuffer(Buffer buffer)
+{
+	uint64		buf_state;
+	BufferDesc *desc;
+
+	if (BufferIsLocal(buffer))
+	{
+		desc = GetLocalBufferDescriptor(-buffer - 1);
+		buf_state = pg_atomic_read_u64(&desc->state);
+		/* Local buffers don't use BM_IO_IN_PROGRESS */
+		if (buf_state & BM_VALID || pgaio_wref_valid(&desc->io_wref))
+			return false;
+	}
+	else
+	{
+		ResourceOwnerEnlarge(CurrentResourceOwner);
+		desc = GetBufferDescriptor(buffer - 1);
+		buf_state = LockBufHdr(desc);
+		if (buf_state & (BM_VALID | BM_IO_IN_PROGRESS))
+		{
+			UnlockBufHdr(desc);
+			return false;
+		}
+		UnlockBufHdrExt(desc, buf_state, BM_IO_IN_PROGRESS, 0, 0);
+		ResourceOwnerRememberBufferIO(CurrentResourceOwner, buffer);
+	}
+
+	return true;
+}
+
 /*
  * Initiate IO for the ReadBuffersOperation
  *
@@ -1894,7 +2049,75 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	void	   *io_pages[MAX_IO_COMBINE_LIMIT];
 	IOContext	io_context;
 	IOObject	io_object;
-	bool		did_start_io;
+	instr_time	io_start;
+	PrepareReadBuffer_Status status;
+
+	/*
+	 * We must get an IO handle before StartNewBufferReadIO(), as
+	 * pgaio_io_acquire() might block, which we don't want after setting
+	 * IO_IN_PROGRESS. If we don't need to do the IO, we'll release the
+	 * handle.
+	 *
+	 * If we need to wait for IO before we can get a handle, submit
+	 * already-staged IO first, so that other backends don't need to wait.
+	 * There wouldn't be a deadlock risk, as pgaio_io_acquire() just needs to
+	 * wait for already submitted IO, which doesn't require additional locks,
+	 * but it could still cause undesirable waits.
+	 *
+	 * A secondary benefit is that this would allow us to measure the time in
+	 * pgaio_io_acquire() without causing undue timer overhead in the common,
+	 * non-blocking, case.  However, currently the pgstats infrastructure
+	 * doesn't really allow that, as it a) asserts that an operation can't
+	 * have time without operations b) doesn't have an API to report
+	 * "accumulated" time.
+	 */
+	ioh = pgaio_io_acquire_nb(CurrentResourceOwner, &operation->io_return);
+	if (unlikely(!ioh))
+	{
+		pgaio_submit_staged();
+		ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return);
+	}
+
+	operation->foreign_io = false;
+
+	/* Check if we can start IO on the first to-be-read buffer */
+	if ((status = PrepareNewReadBufferIO(operation, buffers[nblocks_done])) <
+		READ_BUFFER_READY_FOR_IO)
+	{
+		pgaio_io_release(ioh);
+		*nblocks_progress = 1;
+		if (status == READ_BUFFER_ALREADY_DONE)
+		{
+			/*
+			 * Someone else has already completed this block, we're done.
+			 *
+			 * When IO is necessary, ->nblocks_done is updated in
+			 * ProcessReadBuffersResult(), but that is not called if no IO is
+			 * necessary. Thus update here.
+			 */
+			operation->nblocks_done += 1;
+			Assert(operation->nblocks_done <= operation->nblocks);
+
+			/*
+			 * Report and track this as a 'hit' for this backend, even though
+			 * it must have started out as a miss in PinBufferForBlock(). The
+			 * other backend will track this as a 'read'.
+			 */
+			CountBufferHit(operation->strategy,
+						   operation->rel, operation->persistence,
+						   operation->smgr, operation->forknum,
+						   operation->blocknum + operation->nblocks_done);
+			return false;
+		}
+
+		/* The IO is already in-progress */
+		Assert(status == READ_BUFFER_IN_PROGRESS);
+		CheckReadBuffersOperation(operation, false);
+		return true;
+	}
+
+	/* We can read in at least the head buffer . */
+	Assert(status == READ_BUFFER_READY_FOR_IO);
 
 	/*
 	 * When this IO is executed synchronously, either because the caller will
@@ -1945,138 +2168,74 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	 */
 	pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid);
 
-	/*
-	 * Get IO handle before ReadBuffersCanStartIO(), as pgaio_io_acquire()
-	 * might block, which we don't want after setting IO_IN_PROGRESS.
-	 *
-	 * If we need to wait for IO before we can get a handle, submit
-	 * already-staged IO first, so that other backends don't need to wait.
-	 * There wouldn't be a deadlock risk, as pgaio_io_acquire() just needs to
-	 * wait for already submitted IO, which doesn't require additional locks,
-	 * but it could still cause undesirable waits.
-	 *
-	 * A secondary benefit is that this would allow us to measure the time in
-	 * pgaio_io_acquire() without causing undue timer overhead in the common,
-	 * non-blocking, case.  However, currently the pgstats infrastructure
-	 * doesn't really allow that, as it a) asserts that an operation can't
-	 * have time without operations b) doesn't have an API to report
-	 * "accumulated" time.
-	 */
-	ioh = pgaio_io_acquire_nb(CurrentResourceOwner, &operation->io_return);
-	if (unlikely(!ioh))
-	{
-		pgaio_submit_staged();
-
-		ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return);
-	}
+	Assert(io_buffers[0] == buffers[nblocks_done]);
+	io_pages[0] = BufferGetBlock(buffers[nblocks_done]);
+	io_buffers_len = 1;
 
 	/*
-	 * Check if we can start IO on the first to-be-read buffer.
-	 *
-	 * If an I/O is already in progress in another backend, we want to wait
-	 * for the outcome: either done, or something went wrong and we will
-	 * retry.
+	 * How many neighboring-on-disk blocks can we scatter-read into other
+	 * buffers at the same time?  In this case we don't wait if we see an I/O
+	 * already in progress.  We already set BM_IO_IN_PROGRESS for the head
+	 * block, so we should get on with that I/O as soon as possible.
 	 */
-	if (!ReadBuffersCanStartIO(buffers[nblocks_done], false))
+	for (int i = nblocks_done + 1; i < operation->nblocks; i++)
 	{
-		/*
-		 * Someone else has already completed this block, we're done.
-		 *
-		 * When IO is necessary, ->nblocks_done is updated in
-		 * ProcessReadBuffersResult(), but that is not called if no IO is
-		 * necessary. Thus update here.
-		 */
-		operation->nblocks_done += 1;
-		*nblocks_progress = 1;
-
-		pgaio_io_release(ioh);
-		pgaio_wref_clear(&operation->io_wref);
-		did_start_io = false;
+		if (!PrepareAdditionalReadBuffer(buffers[i]))
+			break;
+		/* Must be consecutive block numbers. */
+		Assert(BufferGetBlockNumber(buffers[i - 1]) ==
+			   BufferGetBlockNumber(buffers[i]) - 1);
+		Assert(io_buffers[io_buffers_len] == buffers[i]);
 
-		/*
-		 * Report and track this as a 'hit' for this backend, even though it
-		 * must have started out as a miss in PinBufferForBlock(). The other
-		 * backend will track this as a 'read'.
-		 */
-		CountBufferHit(operation->strategy, operation->rel, persistence,
-					   operation->smgr, forknum,
-					   blocknum + operation->nblocks_done);
+		io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
 	}
-	else
-	{
-		instr_time	io_start;
-
-		/* We found a buffer that we need to read in. */
-		Assert(io_buffers[0] == buffers[nblocks_done]);
-		io_pages[0] = BufferGetBlock(buffers[nblocks_done]);
-		io_buffers_len = 1;
-
-		/*
-		 * How many neighboring-on-disk blocks can we scatter-read into other
-		 * buffers at the same time?  In this case we don't wait if we see an
-		 * I/O already in progress.  We already set BM_IO_IN_PROGRESS for the
-		 * head block, so we should get on with that I/O as soon as possible.
-		 */
-		for (int i = nblocks_done + 1; i < operation->nblocks; i++)
-		{
-			if (!ReadBuffersCanStartIO(buffers[i], true))
-				break;
-			/* Must be consecutive block numbers. */
-			Assert(BufferGetBlockNumber(buffers[i - 1]) ==
-				   BufferGetBlockNumber(buffers[i]) - 1);
-			Assert(io_buffers[io_buffers_len] == buffers[i]);
 
-			io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
-		}
+	/* get a reference to wait for in WaitReadBuffers() */
+	pgaio_io_get_wref(ioh, &operation->io_wref);
 
-		/* get a reference to wait for in WaitReadBuffers() */
-		pgaio_io_get_wref(ioh, &operation->io_wref);
+	/* provide the list of buffers to the completion callbacks */
+	pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len);
 
-		/* provide the list of buffers to the completion callbacks */
-		pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len);
+	pgaio_io_register_callbacks(ioh,
+								persistence == RELPERSISTENCE_TEMP ?
+								PGAIO_HCB_LOCAL_BUFFER_READV :
+								PGAIO_HCB_SHARED_BUFFER_READV,
+								flags);
 
-		pgaio_io_register_callbacks(ioh,
-									persistence == RELPERSISTENCE_TEMP ?
-									PGAIO_HCB_LOCAL_BUFFER_READV :
-									PGAIO_HCB_SHARED_BUFFER_READV,
-									flags);
+	pgaio_io_set_flag(ioh, ioh_flags);
 
-		pgaio_io_set_flag(ioh, ioh_flags);
+	/* ---
+	* Even though we're trying to issue IO asynchronously, track the time
+	* in smgrstartreadv():
+	* - if io_method == IOMETHOD_SYNC, we will always perform the IO
+	*   immediately
+	* - the io method might not support the IO (e.g. worker IO for a temp
+	*   table)
+	* ---
+	*/
+	io_start = pgstat_prepare_io_time(track_io_timing);
+	smgrstartreadv(ioh, operation->smgr, forknum,
+				   blocknum + nblocks_done,
+				   io_pages, io_buffers_len);
+	pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
+							io_start, 1, io_buffers_len * BLCKSZ);
 
-		/* ---
-		 * Even though we're trying to issue IO asynchronously, track the time
-		 * in smgrstartreadv():
-		 * - if io_method == IOMETHOD_SYNC, we will always perform the IO
-		 *   immediately
-		 * - the io method might not support the IO (e.g. worker IO for a temp
-		 *   table)
-		 * ---
-		 */
-		io_start = pgstat_prepare_io_time(track_io_timing);
-		smgrstartreadv(ioh, operation->smgr, forknum,
-					   blocknum + nblocks_done,
-					   io_pages, io_buffers_len);
-		pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
-								io_start, 1, io_buffers_len * BLCKSZ);
-
-		if (persistence == RELPERSISTENCE_TEMP)
-			pgBufferUsage.local_blks_read += io_buffers_len;
-		else
-			pgBufferUsage.shared_blks_read += io_buffers_len;
+	if (persistence == RELPERSISTENCE_TEMP)
+		pgBufferUsage.local_blks_read += io_buffers_len;
+	else
+		pgBufferUsage.shared_blks_read += io_buffers_len;
 
-		/*
-		 * Track vacuum cost when issuing IO, not after waiting for it.
-		 * Otherwise we could end up issuing a lot of IO in a short timespan,
-		 * despite a low cost limit.
-		 */
-		if (VacuumCostActive)
-			VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
+	/*
+	 * Track vacuum cost when issuing IO, not after waiting for it. Otherwise
+	 * we could end up issuing a lot of IO in a short timespan, despite a low
+	 * cost limit.
+	 */
+	if (VacuumCostActive)
+		VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
 
-		*nblocks_progress = io_buffers_len;
-		did_start_io = true;
-	}
+	*nblocks_progress = io_buffers_len;
 
-	return did_start_io;
+	return true;
 }
 
 /*
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index a40adf6b2a8..1358fc7fa64 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -147,6 +147,7 @@ struct ReadBuffersOperation
 	int			flags;
 	int16		nblocks;
 	int16		nblocks_done;
+	bool		foreign_io;
 	PgAioWaitRef io_wref;
 	PgAioReturn io_return;
 };
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 668faaa5615..a656bbf9110 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2345,6 +2345,7 @@ PredicateLockData
 PredicateLockTargetType
 PrefetchBufferResult
 PrepParallelRestorePtrType
+PrepareReadBuffer_Status
 PrepareStmt
 PreparedStatement
 PresortedKeyData
-- 
2.43.0

Reply via email to