Thanks for the review!
On Thu, Feb 5, 2026 at 11:56 AM Nazir Bilal Yavuz <[email protected]> wrote:
>
> On Sat, 24 Jan 2026 at 00:04, Melanie Plageman
> <[email protected]> wrote:
>
> > - In the 004_read_stream tests, I wonder if there is a way to test
> > that we don't wait for foreign IO until WaitReadBuffers(). We have
> > tests for the stream accessing the same block, which in some cases
> > will exercise the foreign IO path. But it doesn't distinguish between
> > the old behavior -- waiting for the IO to complete when starting read
> > IO on it -- and the new behavior -- not waiting until
> > WaitReadBuffers(). That may not be possible to test, though.
>
> Won't 'stream accessing the same block test' almost always test the
> new behavior (not waiting until WaitReadBuffers())? Having dedicated
> tests for both cases would be helpful, though.
Yeah, I was thinking of something like this: if session A is blocked
completing a read of block 2 and session B requests blocks 2-4, test
that the buffers containing blocks 3 and 4 are valid while session B
is waiting for block 2 to finish.
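To make the distinction such a test would need to observe a bit more concrete, here is a toy C model. It is not bufmgr or read stream code: the buffer states, toy_start_reads() and the wait_at_start flag are all hypothetical, and reads that session B issues itself are assumed to complete immediately. In this model, the old behavior stops at block 2 before touching blocks 3 and 4, while the new behavior defers the wait, so blocks 3 and 4 are already valid while B is blocked on block 2.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical per-buffer states for the toy model. */
typedef enum ToyBufState
{
	TOY_INVALID,				/* block not read yet */
	TOY_FOREIGN_IO,				/* another session's read is in progress */
	TOY_VALID					/* block contents are valid */
} ToyBufState;

/*
 * Toy model of session B starting reads for blocks [first, first + n).
 * Returns the block B ends up waiting on, or -1 if it never waits.
 * wait_at_start = true models waiting for foreign IO as soon as it is
 * encountered; false models deferring that wait to a later step (the
 * WaitReadBuffers() step in the real code).  Reads B issues itself are
 * modeled as completing immediately, a simplifying assumption.
 */
static int
toy_start_reads(ToyBufState *bufs, int first, int n, bool wait_at_start)
{
	int			wait_on = -1;

	assert(first >= 0 && n >= 0);

	for (int b = first; b < first + n; b++)
	{
		if (bufs[b] == TOY_FOREIGN_IO)
		{
			if (wait_at_start)
				return b;		/* blocks here; later blocks are untouched */
			if (wait_on < 0)
				wait_on = b;	/* remember it, but keep issuing reads */
			continue;
		}
		if (bufs[b] == TOY_INVALID)
			bufs[b] = TOY_VALID;
	}
	return wait_on;
}
```

Starting from {INVALID, INVALID, FOREIGN_IO, INVALID, INVALID} and calling toy_start_reads(bufs, 2, 3, ...), both variants return 2, but only the deferred variant leaves blocks 3 and 4 valid. That difference is the property the proposed test would check.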
I started working on something but it needed some new infrastructure
to check if the buffer is valid, and I wanted to see what others
thought first.
I did finally review Andres' test patches and have included my review
feedback here as well.
"aio: Refactor tests in preparation for more tests" (v4-0001) looks
good to me as well. I included one tiny idea that an AI suggested to
me in a follow-on patch (v4-0002).
> diff --git a/src/test/modules/test_aio/t/004_read_stream.pl
> b/src/test/modules/test_aio/t/004_read_stream.pl
> +foreach my $method (TestAio::supported_io_methods())
> +{
> + $node->adjust_conf('postgresql.conf', 'io_method', 'worker');
> + $node->start();
> + test_io_method($method, $node);
> + $node->stop();
> +}
>
> This seems wrong, we always test io_method=worker. I think we need to
> replace 'worker' with the $method. Also, we can add check below to the
> test_io_method function in the 004_read_stream.pl:
> ```
> is($node->safe_psql('postgres', 'SHOW io_method'),
> $io_method, "$io_method: io_method set correctly");
> ```
Good catch. Fixed. I also found a few other small things in this patch
(v4-0003) which I put in v4-0004.
Some ideas I had that I didn't include in v4-0003, because it's
Andres' patch and they are subjective:
For test_repeated_blocks, the first test:
# test miss of the same block twice in a row
$psql->query_safe(
qq/
SELECT evict_rel('largeish');
/);
$psql->query_safe(
qq/
SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 2, 4, 4]);
/);
ok(1, "$io_method: stream missing the same block repeatedly");
It says that it will miss the same block repeatedly. Is that because
we won't start a read for any of the blocks until after
read_stream_get_block has returned all of them? If so, that could be
clearer in the comment. Not everyone understands all the read stream
internals.
I know a lot of other tests do this, but I find it so hard to read a
test when the SQL is totally left-aligned like that, especially with
comments interspersed. You can easily flow the queries over multiple
lines and indent them more.
For test_repeated_blocks, the second test:
# test hit of the same block twice in a row
$psql->query_safe(
qq/
SELECT evict_rel('largeish');
/);
$psql->query_safe(
qq/
SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 1, 2, 3, 4,
5, 6, 5, 4, 3, 2, 1, 0]);
/);
ok(1, "$io_method: stream accessing same block");
I assume that the second access of 2 is a hit because we actually did
IO for the first one (unlike in the earlier case)?
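One way to frame both of these questions is a toy model, sketched here in C under loudly simplified assumptions. It is not read_stream.c, all names are hypothetical, and it uses a fixed lookahead distance, whereas the real read stream adjusts its distance dynamically. A request is a miss the first time a block appears, a hit if the earlier read had already been consumed when the request was looked up, and "in progress" if it was looked up while that earlier read was still pending.

```c
#include <assert.h>

#define TOY_MAX_BLOCK 64

/* Outcome counts for the toy model (hypothetical names). */
typedef struct ToyCounts
{
	int			misses;			/* first request for a block: a read starts */
	int			in_progress;	/* repeated while the earlier read is pending */
	int			hits;			/* repeated after the earlier read was consumed */
} ToyCounts;

/*
 * Toy model, not read_stream.c: request i is looked up once requests
 * 0 .. i - distance - 1 have been consumed (i.e. their reads completed).
 * A fixed lookahead distance is a simplifying assumption.
 */
static ToyCounts
toy_classify(const int *blocks, int n, int distance)
{
	int			first_seen[TOY_MAX_BLOCK];
	ToyCounts	c = {0, 0, 0};

	for (int b = 0; b < TOY_MAX_BLOCK; b++)
		first_seen[b] = -1;

	for (int i = 0; i < n; i++)
	{
		int			blk = blocks[i];
		int			j;

		assert(blk >= 0 && blk < TOY_MAX_BLOCK);
		j = first_seen[blk];

		if (j < 0)
		{
			c.misses++;
			first_seen[blk] = i;
		}
		else if (j < i - distance)
			c.hits++;			/* earlier read already consumed */
		else
			c.in_progress++;	/* earlier read still pending */
	}
	return c;
}
```

In this model, with distance = 1, {0, 2, 2, 4, 4} yields 3 misses, 2 in-progress lookups and no hits, while {0, 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 0} yields 7 misses and 6 hits. The descending half only hits because the first reads of those blocks have already finished.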
For test_inject_foreign:
In general, I am not ramped up enough on injection point stuff to know
if the new injection point code you added in test_aio.c is correct
and optimal, but I did review the read stream additions to test_aio.c
and the tests added to 004_read_stream.pl.
For test_inject_foreign, the 3rd test:
# Test read stream encountering two buffers that are undergoing the same
# IO, started by another backend
I see that psql_b is requesting 3 blocks, which can be combined into 1
IO. That makes it different from the 1st foreign IO test case:
###
# Test read stream encountering buffers undergoing IO in another backend,
# with the other backend's reads succeeding.
###
where psql_b only requests 1 block, but I don't really see how these
cover different code. Maybe if the read stream side (psql_a) were
doing a combined IO, it might exercise slightly different code, but
otherwise I don't get it.
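To illustrate the point, here is a toy count of how many reads each side issues when consecutive block numbers are combined, up to a limit that plays the role of io_combine_limit. It is a hypothetical simplification written for this discussion: toy_count_ios() is not a PostgreSQL function, and the model ignores buffer state entirely.

```c
#include <assert.h>

/*
 * Toy model of IO combining, not read_stream.c: count the reads needed
 * for a sequence of block numbers if each block that directly follows
 * the previous one (b, b + 1, ...) is merged into the current read, up
 * to combine_limit blocks per read.
 */
static int
toy_count_ios(const unsigned *blocks, int n, int combine_limit)
{
	int			ios = 0;
	int			run_len = 0;

	assert(combine_limit >= 1);

	for (int i = 0; i < n; i++)
	{
		if (run_len > 0 && run_len < combine_limit &&
			blocks[i] == blocks[i - 1] + 1)
			run_len++;			/* extend the current read */
		else
		{
			ios++;				/* start a new read */
			run_len = 1;
		}
	}
	return ios;
}
```

Under this model, psql_b's blocks {2, 3, 4} form a single read, but psql_a's block lists {0, 2, 5, 7} and {0, 2, 4} contain no adjacent blocks. As a result, psql_a only ever issues single-block reads in both test cases.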
> Nitpick but I prefer something like TrackBufferHit() or
> CountBufferHit() as a function name instead of ProcessBufferHit().
> ProcessBufferHit() gives the impression that the function is doing a
> job more than it currently does. Other than that, 0004 LGTM.
I changed this in the attached v4.
- Melanie
From 1d2f564b211a59fc6ea483fbcddd5fa788b3534c Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Tue, 9 Sep 2025 10:14:34 -0400
Subject: [PATCH v4 1/6] aio: Refactor tests in preparation for more tests
More AIO related tests are due to be introduced in a future commit. However,
001_aio.pl is already fairly large.
This commit introduces a new TestAio package with helpers for writing AIO
related tests. Then it uses the new helpers to simplify the existing
001_aio.pl by iterating over all supported io_methods. This will be
particularly helpful because additional methods have already been submitted.
Additionally this commit splits out testing of initdb using a non-default
method into its own test. While that test is somewhat important, it's fairly
slow and doesn't break that often. For development velocity it's helpful for
001_aio.pl to be faster.
While the latter in particular could benefit from being its own commit,
splitting it out seems to introduce more back-and-forth than it's worth.
Author: Andres Freund <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Reviewed-by: Melanie Plageman <[email protected]>
Discussion: https://postgr.es/m/
---
src/test/modules/test_aio/meson.build | 1 +
src/test/modules/test_aio/t/001_aio.pl | 140 +++++++---------------
src/test/modules/test_aio/t/003_initdb.pl | 71 +++++++++++
src/test/modules/test_aio/t/TestAio.pm | 90 ++++++++++++++
4 files changed, 203 insertions(+), 99 deletions(-)
create mode 100644 src/test/modules/test_aio/t/003_initdb.pl
create mode 100644 src/test/modules/test_aio/t/TestAio.pm
diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index fefa25bc5ab..18a797f3a3b 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -32,6 +32,7 @@ tests += {
'tests': [
't/001_aio.pl',
't/002_io_workers.pl',
+ 't/003_initdb.pl',
],
},
}
diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl
index 5c634ec3ca9..27ee96898e0 100644
--- a/src/test/modules/test_aio/t/001_aio.pl
+++ b/src/test/modules/test_aio/t/001_aio.pl
@@ -7,126 +7,55 @@ use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
+use FindBin;
+use lib $FindBin::RealBin;
-###
-# Test io_method=worker
-###
-my $node_worker = create_node('worker');
-$node_worker->start();
-
-test_generic('worker', $node_worker);
-SKIP:
-{
- skip 'Injection points not supported by this build', 1
- unless $ENV{enable_injection_points} eq 'yes';
- test_inject_worker('worker', $node_worker);
-}
+use TestAio;
-$node_worker->stop();
+my %nodes;
###
-# Test io_method=io_uring
+# Create and configure one instance for each io_method
###
-if (have_io_uring())
+foreach my $method (TestAio::supported_io_methods())
{
- my $node_uring = create_node('io_uring');
- $node_uring->start();
- test_generic('io_uring', $node_uring);
- $node_uring->stop();
-}
-
-
-###
-# Test io_method=sync
-###
-
-my $node_sync = create_node('sync');
+ my $node = PostgreSQL::Test::Cluster->new($method);
-# just to have one test not use the default auto-tuning
+ $nodes{$method} = $node;
+ $node->init();
+ $node->append_conf('postgresql.conf', "io_method=$method");
+ TestAio::configure($node);
+}
-$node_sync->append_conf(
+# Just to have one test not use the default auto-tuning
+$nodes{'sync'}->append_conf(
'postgresql.conf', qq(
-io_max_concurrency=4
+ io_max_concurrency=4
));
-$node_sync->start();
-test_generic('sync', $node_sync);
-$node_sync->stop();
-
-done_testing();
-
###
-# Test Helpers
+# Execute the tests for each io_method
###
-sub create_node
+foreach my $method (TestAio::supported_io_methods())
{
- local $Test::Builder::Level = $Test::Builder::Level + 1;
-
- my $io_method = shift;
+ my $node = $nodes{$method};
- my $node = PostgreSQL::Test::Cluster->new($io_method);
-
- # Want to test initdb for each IO method, otherwise we could just reuse
- # the cluster.
- #
- # Unfortunately Cluster::init() puts PG_TEST_INITDB_EXTRA_OPTS after the
- # options specified by ->extra, if somebody puts -c io_method=xyz in
- # PG_TEST_INITDB_EXTRA_OPTS it would break this test. Fix that up if we
- # detect it.
- local $ENV{PG_TEST_INITDB_EXTRA_OPTS} = $ENV{PG_TEST_INITDB_EXTRA_OPTS};
- if (defined $ENV{PG_TEST_INITDB_EXTRA_OPTS}
- && $ENV{PG_TEST_INITDB_EXTRA_OPTS} =~ m/io_method=/)
- {
- $ENV{PG_TEST_INITDB_EXTRA_OPTS} .= " -c io_method=$io_method";
- }
-
- $node->init(extra => [ '-c', "io_method=$io_method" ]);
-
- $node->append_conf(
- 'postgresql.conf', qq(
-shared_preload_libraries=test_aio
-log_min_messages = 'DEBUG3'
-log_statement=all
-log_error_verbosity=default
-restart_after_crash=false
-temp_buffers=100
-));
+ $node->start();
+ test_io_method($method, $node);
+ $node->stop();
+}
- # Even though we used -c io_method=... above, if TEMP_CONFIG sets
- # io_method, it'd override the setting persisted at initdb time. While
- # using (and later verifying) the setting from initdb provides some
- # verification of having used the io_method during initdb, it's probably
- # not worth the complication of only appending if the variable is set in
- # in TEMP_CONFIG.
- $node->append_conf(
- 'postgresql.conf', qq(
-io_method=$io_method
-));
+done_testing();
- ok(1, "$io_method: initdb");
- return $node;
-}
+###
+# Test Helpers
+###
-sub have_io_uring
-{
- # To detect if io_uring is supported, we look at the error message for
- # assigning an invalid value to an enum GUC, which lists all the valid
- # options. We need to use -C to deal with running as administrator on
- # windows, the superuser check is omitted if -C is used.
- my ($stdout, $stderr) =
- run_command [qw(postgres -C invalid -c io_method=invalid)];
- die "can't determine supported io_method values"
- unless $stderr =~ m/Available values: ([^\.]+)\./;
- my $methods = $1;
- note "supported io_method values are: $methods";
-
- return ($methods =~ m/io_uring/) ? 1 : 0;
-}
sub psql_like
{
@@ -1490,8 +1419,8 @@ SELECT read_rel_block_ll('tbl_cs_fail', 3, nblocks=>1, zero_on_error=>true);),
}
-# Run all tests that are supported for all io_methods
-sub test_generic
+# Run all tests for the specified node / io_method
+sub test_io_method
{
my $io_method = shift;
my $node = shift;
@@ -1526,10 +1455,23 @@ CHECKPOINT;
test_ignore_checksum($io_method, $node);
test_checksum_createdb($io_method, $node);
+ # generic injection tests
SKIP:
{
skip 'Injection points not supported by this build', 1
unless $ENV{enable_injection_points} eq 'yes';
test_inject($io_method, $node);
}
+
+ # worker specific injection tests
+ if ($io_method eq 'worker')
+ {
+ SKIP:
+ {
+ skip 'Injection points not supported by this build', 1
+ unless $ENV{enable_injection_points} eq 'yes';
+
+ test_inject_worker($io_method, $node);
+ }
+ }
}
diff --git a/src/test/modules/test_aio/t/003_initdb.pl b/src/test/modules/test_aio/t/003_initdb.pl
new file mode 100644
index 00000000000..c03ae58d00a
--- /dev/null
+++ b/src/test/modules/test_aio/t/003_initdb.pl
@@ -0,0 +1,71 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+#
+# Test initdb for each IO method. This is done separately from 001_aio.pl, as
+# it isn't fast. This way the more commonly failing / hacked-on 001_aio.pl can
+# be iterated on more quickly.
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+use FindBin;
+use lib $FindBin::RealBin;
+
+use TestAio;
+
+
+foreach my $method (TestAio::supported_io_methods())
+{
+ test_create_node($method);
+}
+
+done_testing();
+
+
+sub test_create_node
+{
+ local $Test::Builder::Level = $Test::Builder::Level + 1;
+
+ my $io_method = shift;
+
+ my $node = PostgreSQL::Test::Cluster->new($io_method);
+
+ # Want to test initdb for each IO method, otherwise we could just reuse
+ # the cluster.
+ #
+ # Unfortunately Cluster::init() puts PG_TEST_INITDB_EXTRA_OPTS after the
+ # options specified by ->extra, if somebody puts -c io_method=xyz in
+ # PG_TEST_INITDB_EXTRA_OPTS it would break this test. Fix that up if we
+ # detect it.
+ local $ENV{PG_TEST_INITDB_EXTRA_OPTS} = $ENV{PG_TEST_INITDB_EXTRA_OPTS};
+ if (defined $ENV{PG_TEST_INITDB_EXTRA_OPTS}
+ && $ENV{PG_TEST_INITDB_EXTRA_OPTS} =~ m/io_method=/)
+ {
+ $ENV{PG_TEST_INITDB_EXTRA_OPTS} .= " -c io_method=$io_method";
+ }
+
+ $node->init(extra => [ '-c', "io_method=$io_method" ]);
+
+ TestAio::configure($node);
+
+ # Even though we used -c io_method=... above, if TEMP_CONFIG sets
+ # io_method, it'd override the setting persisted at initdb time. While
+ # using (and later verifying) the setting from initdb provides some
+ # verification of having used the io_method during initdb, it's probably
+ # not worth the complication of only appending if the variable is set in
+ # TEMP_CONFIG.
+ $node->append_conf(
+ 'postgresql.conf', qq(
+io_method=$io_method
+));
+
+ ok(1, "$io_method: initdb");
+
+ $node->start();
+ $node->stop();
+ ok(1, "$io_method: start & stop");
+
+ return $node;
+}
diff --git a/src/test/modules/test_aio/t/TestAio.pm b/src/test/modules/test_aio/t/TestAio.pm
new file mode 100644
index 00000000000..5bc80a9b130
--- /dev/null
+++ b/src/test/modules/test_aio/t/TestAio.pm
@@ -0,0 +1,90 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+=pod
+
+=head1 NAME
+
+TestAio - helpers for writing AIO related tests
+
+=cut
+
+package TestAio;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+
+=pod
+
+=head1 METHODS
+
+=over
+
+=item TestAio::supported_io_methods()
+
+Return an array of all the supported values for the io_method GUC
+
+=cut
+
+sub supported_io_methods()
+{
+ my @io_methods = ('worker');
+
+ push(@io_methods, "io_uring") if have_io_uring();
+
+ # Return sync last, as it will least commonly fail
+ push(@io_methods, "sync");
+
+ return @io_methods;
+}
+
+
+=item TestAio::configure()
+
+Prepare a cluster for AIO tests
+
+=cut
+
+sub configure
+{
+ my $node = shift;
+
+ $node->append_conf(
+ 'postgresql.conf', qq(
+shared_preload_libraries=test_aio
+log_min_messages = 'DEBUG3'
+log_statement=all
+log_error_verbosity=default
+restart_after_crash=false
+temp_buffers=100
+));
+
+}
+
+
+=pod
+
+=item TestAio::have_io_uring()
+
+Return whether io_uring is supported
+
+=cut
+
+sub have_io_uring
+{
+ # To detect if io_uring is supported, we look at the error message for
+ # assigning an invalid value to an enum GUC, which lists all the valid
+ # options. We need to use -C to deal with running as administrator on
+ # windows, the superuser check is omitted if -C is used.
+ my ($stdout, $stderr) =
+ run_command [qw(postgres -C invalid -c io_method=invalid)];
+ die "can't determine supported io_method values"
+ unless $stderr =~ m/Available values: ([^\.]+)\./;
+ my $methods = $1;
+ note "supported io_method values are: $methods";
+
+ return ($methods =~ m/io_uring/) ? 1 : 0;
+}
+
+1;
--
2.43.0
From a4c32be0224bb18bc55e98d7789135258824463d Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Tue, 3 Mar 2026 12:27:10 -0500
Subject: [PATCH v4 2/6] small optimization for test refactor
---
src/test/modules/test_aio/t/001_aio.pl | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl
index 27ee96898e0..e18b2a2b8ae 100644
--- a/src/test/modules/test_aio/t/001_aio.pl
+++ b/src/test/modules/test_aio/t/001_aio.pl
@@ -12,6 +12,7 @@ use lib $FindBin::RealBin;
use TestAio;
+my @methods = TestAio::supported_io_methods();
my %nodes;
@@ -19,7 +20,7 @@ my %nodes;
# Create and configure one instance for each io_method
###
-foreach my $method (TestAio::supported_io_methods())
+foreach my $method (@methods)
{
my $node = PostgreSQL::Test::Cluster->new($method);
@@ -40,7 +41,7 @@ $nodes{'sync'}->append_conf(
# Execute the tests for each io_method
###
-foreach my $method (TestAio::supported_io_methods())
+foreach my $method (@methods)
{
my $node = $nodes{$method};
--
2.43.0
From afc102665a6b2989f557c58bb8ae3d03eb3192cc Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Wed, 10 Sep 2025 14:00:02 -0400
Subject: [PATCH v4 3/6] test_aio: Add read_stream test infrastructure & tests
Author: Andres Freund <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Reviewed-by: Melanie Plageman <[email protected]>
Discussion: https://postgr.es/m/
---
src/test/modules/test_aio/meson.build | 1 +
.../modules/test_aio/t/004_read_stream.pl | 282 ++++++++++++++
src/test/modules/test_aio/test_aio--1.0.sql | 26 +-
src/test/modules/test_aio/test_aio.c | 344 +++++++++++++++---
src/tools/pgindent/typedefs.list | 1 +
5 files changed, 602 insertions(+), 52 deletions(-)
create mode 100644 src/test/modules/test_aio/t/004_read_stream.pl
diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index 18a797f3a3b..909f81d96c1 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -33,6 +33,7 @@ tests += {
't/001_aio.pl',
't/002_io_workers.pl',
't/003_initdb.pl',
+ 't/004_read_stream.pl',
],
},
}
diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl
new file mode 100644
index 00000000000..89cfabbb1d3
--- /dev/null
+++ b/src/test/modules/test_aio/t/004_read_stream.pl
@@ -0,0 +1,282 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+use FindBin;
+use lib $FindBin::RealBin;
+
+use TestAio;
+
+
+my $node = PostgreSQL::Test::Cluster->new('test');
+$node->init();
+
+$node->append_conf(
+ 'postgresql.conf', qq(
+shared_preload_libraries=test_aio
+log_min_messages = 'DEBUG3'
+log_statement=all
+log_error_verbosity=default
+restart_after_crash=false
+temp_buffers=100
+max_connections=8
+io_method=worker
+));
+
+$node->start();
+test_setup($node);
+$node->stop();
+
+
+foreach my $method (TestAio::supported_io_methods())
+{
+ $node->adjust_conf('postgresql.conf', 'io_method', 'worker');
+ $node->start();
+ test_io_method($method, $node);
+ $node->stop();
+}
+
+done_testing();
+
+
+sub test_setup
+{
+ my $node = shift;
+
+ $node->safe_psql(
+ 'postgres', qq(
+CREATE EXTENSION test_aio;
+
+CREATE TABLE largeish(k int not null) WITH (FILLFACTOR=10);
+INSERT INTO largeish(k) SELECT generate_series(1, 10000);
+));
+ ok(1, "setup");
+}
+
+
+sub test_repeated_blocks
+{
+ my $io_method = shift;
+ my $node = shift;
+
+ my $psql = $node->background_psql('postgres', on_error_stop => 0);
+
+ # Preventing larger reads makes testing easier
+ $psql->query_safe(
+ qq/
+SET io_combine_limit = 1;
+/);
+
+ # test miss of the same block twice in a row
+ $psql->query_safe(
+ qq/
+SELECT evict_rel('largeish');
+/);
+ $psql->query_safe(
+ qq/
+SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 2, 4, 4]);
+/);
+ ok(1, "$io_method: stream missing the same block repeatedly");
+
+ $psql->query_safe(
+ qq/
+SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 2, 4, 4]);
+/);
+ ok(1, "$io_method: stream hitting the same block repeatedly");
+
+ # test hit of the same block twice in a row
+ $psql->query_safe(
+ qq/
+SELECT evict_rel('largeish');
+/);
+ $psql->query_safe(
+ qq/
+SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 0]);
+/);
+ ok(1, "$io_method: stream accessing same block");
+
+ $psql->quit();
+}
+
+
+sub test_inject_foreign
+{
+ my $io_method = shift;
+ my $node = shift;
+
+ my $psql_a = $node->background_psql('postgres', on_error_stop => 0);
+ my $psql_b = $node->background_psql('postgres', on_error_stop => 0);
+
+ my $pid_a = $psql_a->query_safe(qq/SELECT pg_backend_pid();/);
+
+
+ ###
+ # Test read stream encountering buffers undergoing IO in another backend,
+ # with the other backend's reads succeeding.
+ ###
+ $psql_a->query_safe(
+ qq/
+SELECT evict_rel('largeish');
+/);
+
+ $psql_b->query_safe(
+ qq/
+SELECT inj_io_completion_wait(pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish'));
+/);
+
+ $psql_b->{stdin} .= qq/
+SELECT read_rel_block_ll('largeish', blockno=>5, nblocks=>1);
+/;
+ $psql_b->{run}->pump_nb();
+
+ $node->poll_query_until(
+ 'postgres', qq/
+SELECT wait_event FROM pg_stat_activity WHERE wait_event = 'completion_wait';
+/,
+ 'completion_wait');
+
+ $psql_a->{stdin} .= qq/
+SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);
+/;
+ $psql_a->{run}->pump_nb();
+
+ $node->poll_query_until('postgres',
+ qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+ 'AioIoCompletion');
+
+ $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+ pump_until(
+ $psql_a->{run}, $psql_a->{timeout},
+ \$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+
+ ok(1,
+ qq/$io_method: read stream encounters succeeding IO by another backend/
+ );
+
+
+ ###
+ # Test read stream encountering buffers undergoing IO in another backend,
+ # with the other backend's reads failing.
+ ###
+ $psql_a->query_safe(
+ qq/
+SELECT evict_rel('largeish');
+/);
+
+ $psql_b->query_safe(
+ qq/
+SELECT inj_io_completion_wait(pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish'));
+/);
+
+ $psql_b->query_safe(
+ qq/
+SELECT inj_io_short_read_attach(-errno_from_string('EIO'), pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish'));
+/);
+
+ $psql_b->{stdin} .= qq/
+SELECT read_rel_block_ll('largeish', blockno=>5, nblocks=>1);
+/;
+ $psql_b->{run}->pump_nb();
+
+ $node->poll_query_until(
+ 'postgres', qq/
+SELECT wait_event FROM pg_stat_activity WHERE wait_event = 'completion_wait';
+/,
+ 'completion_wait');
+
+ $psql_a->{stdin} .= qq/
+SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);
+/;
+ $psql_a->{run}->pump_nb();
+
+ $node->poll_query_until('postgres',
+ qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+ 'AioIoCompletion');
+
+ $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+ pump_until(
+ $psql_a->{run}, $psql_a->{timeout},
+ \$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+
+ $psql_b->{run}->pump_nb();
+ like(
+ $psql_b->{stderr},
+ qr/.*ERROR.*could not read blocks 5..5.*$/,
+ "$io_method: injected error occurred");
+ $psql_b->{stderr} = '';
+ $psql_b->query_safe(qq/SELECT inj_io_short_read_detach();/);
+
+
+ ok(1,
+ qq/$io_method: read stream encounters failing IO by another backend/);
+
+
+ ###
+ # Test read stream encountering two buffers that are undergoing the same
+ # IO, started by another backend
+ ###
+ $psql_a->query_safe(
+ qq/
+SELECT evict_rel('largeish');
+/);
+
+ $psql_b->query_safe(
+ qq/
+SELECT inj_io_completion_wait(pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish'));
+/);
+
+ $psql_b->{stdin} .= qq/
+SELECT read_rel_block_ll('largeish', blockno=>2, nblocks=>3);
+/;
+ $psql_b->{run}->pump_nb();
+
+ $node->poll_query_until(
+ 'postgres', qq/
+SELECT wait_event FROM pg_stat_activity WHERE wait_event = 'completion_wait';
+/,
+ 'completion_wait');
+
+ $psql_a->{stdin} .= qq/
+SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 4]);
+/;
+ $psql_a->{run}->pump_nb();
+
+ $node->poll_query_until('postgres',
+ qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+ 'AioIoCompletion');
+
+ $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+ pump_until(
+ $psql_a->{run}, $psql_a->{timeout},
+ \$psql_a->{stdout}, qr/\{0,2,4\}/);
+
+ ok(1, qq/$io_method: read stream encounters two buffers read in one IO/);
+
+
+ $psql_a->quit();
+ $psql_b->quit();
+}
+
+
+sub test_io_method
+{
+ my $io_method = shift;
+ my $node = shift;
+
+ test_repeated_blocks($io_method, $node);
+
+ SKIP:
+ {
+ skip 'Injection points not supported by this build', 1
+ unless $ENV{enable_injection_points} eq 'yes';
+ test_inject_foreign($io_method, $node);
+ }
+}
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index e495481c41e..da7cc03829a 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -33,6 +33,10 @@ CREATE FUNCTION read_rel_block_ll(
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
+CREATE FUNCTION evict_rel(rel regclass)
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
CREATE FUNCTION invalidate_rel_block(rel regclass, blockno int)
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
@@ -41,7 +45,7 @@ CREATE FUNCTION buffer_create_toy(rel regclass, blockno int4)
RETURNS pg_catalog.int4 STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
-CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool)
+CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool, assign_io bool DEFAULT false)
RETURNS pg_catalog.bool STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
@@ -50,6 +54,14 @@ RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
+/*
+ * Read stream related functions
+ */
+CREATE FUNCTION read_stream_for_blocks(rel regclass, blocks int4[], OUT blockoff int4, OUT blocknum int4, OUT buf int4)
+RETURNS SETOF record STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+
/*
* Handle related functions
@@ -91,8 +103,16 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
/*
* Injection point related functions
*/
-CREATE FUNCTION inj_io_short_read_attach(result int)
-RETURNS pg_catalog.void STRICT
+CREATE FUNCTION inj_io_completion_wait(pid int DEFAULT NULL, relfilenode oid DEFAULT 0)
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_io_completion_continue()
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_io_short_read_attach(result int, pid int DEFAULT NULL, relfilenode oid DEFAULT 0)
+RETURNS pg_catalog.void
AS 'MODULE_PATHNAME' LANGUAGE C;
CREATE FUNCTION inj_io_short_read_detach()
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index b1aa8af9ec0..911a7102a34 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -20,16 +20,23 @@
#include "access/relation.h"
#include "fmgr.h"
+#include "funcapi.h"
#include "storage/aio.h"
#include "storage/aio_internal.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/checksum.h"
+#include "storage/condition_variable.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/procnumber.h"
+#include "storage/read_stream.h"
+#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/injection_point.h"
#include "utils/rel.h"
+#include "utils/wait_event.h"
PG_MODULE_MAGIC;
@@ -37,13 +44,30 @@ PG_MODULE_MAGIC;
typedef struct InjIoErrorState
{
+ ConditionVariable cv;
+
bool enabled_short_read;
bool enabled_reopen;
+ bool enabled_completion_wait;
+ Oid completion_wait_relfilenode;
+ pid_t completion_wait_pid;
+ uint32 completion_wait_event;
+
bool short_read_result_set;
+ Oid short_read_relfilenode;
+ pid_t short_read_pid;
int short_read_result;
} InjIoErrorState;
+typedef struct BlocksReadStreamData
+{
+ int nblocks;
+ int curblock;
+ uint32 *blocks;
+} BlocksReadStreamData;
+
+
static InjIoErrorState *inj_io_error_state;
/* Shared memory init callbacks */
@@ -85,10 +109,13 @@ test_aio_shmem_startup(void)
inj_io_error_state->enabled_short_read = false;
inj_io_error_state->enabled_reopen = false;
+ ConditionVariableInit(&inj_io_error_state->cv);
+ inj_io_error_state->completion_wait_event = WaitEventInjectionPointNew("completion_wait");
+
#ifdef USE_INJECTION_POINTS
InjectionPointAttach("aio-process-completion-before-shared",
"test_aio",
- "inj_io_short_read",
+ "inj_io_completion_hook",
NULL,
0);
InjectionPointLoad("aio-process-completion-before-shared");
@@ -384,7 +411,7 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
if (nblocks <= 0 || nblocks > PG_IOV_MAX)
elog(ERROR, "nblocks is out of range");
- rel = relation_open(relid, AccessExclusiveLock);
+ rel = relation_open(relid, AccessShareLock);
for (int i = 0; i < nblocks; i++)
{
@@ -458,6 +485,27 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
+PG_FUNCTION_INFO_V1(evict_rel);
+Datum
+evict_rel(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ Relation rel;
+ int32 buffers_evicted,
+ buffers_flushed,
+ buffers_skipped;
+
+ rel = relation_open(relid, AccessExclusiveLock);
+
+ EvictRelUnpinnedBuffers(rel, &buffers_evicted, &buffers_flushed,
+ &buffers_skipped);
+
+ relation_close(rel, AccessExclusiveLock);
+
+
+ PG_RETURN_VOID();
+}
+
PG_FUNCTION_INFO_V1(invalidate_rel_block);
Datum
invalidate_rel_block(PG_FUNCTION_ARGS)
@@ -610,6 +658,86 @@ buffer_call_terminate_io(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
+
+static BlockNumber
+read_stream_for_blocks_cb(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data)
+{
+ BlocksReadStreamData *stream_data = callback_private_data;
+
+ if (stream_data->curblock >= stream_data->nblocks)
+ return InvalidBlockNumber;
+ return stream_data->blocks[stream_data->curblock++];
+}
+
+PG_FUNCTION_INFO_V1(read_stream_for_blocks);
+Datum
+read_stream_for_blocks(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ ArrayType *blocksarray = PG_GETARG_ARRAYTYPE_P(1);
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ Relation rel;
+ BlocksReadStreamData stream_data;
+ ReadStream *stream;
+
+ InitMaterializedSRF(fcinfo, 0);
+
+ /*
+ * We expect the input to be an N-element int4 array; verify that. We
+ * don't need to use deconstruct_array() since the array data is just
+ * going to look like a C array of N int4 values.
+ */
+ if (ARR_NDIM(blocksarray) != 1 ||
+ ARR_HASNULL(blocksarray) ||
+ ARR_ELEMTYPE(blocksarray) != INT4OID)
+ elog(ERROR, "expected 1 dimensional int4 array");
+
+ stream_data.curblock = 0;
+ stream_data.nblocks = ARR_DIMS(blocksarray)[0];
+ stream_data.blocks = (uint32 *) ARR_DATA_PTR(blocksarray);
+
+ rel = relation_open(relid, AccessShareLock);
+
+ stream = read_stream_begin_relation(READ_STREAM_FULL,
+ NULL,
+ rel,
+ MAIN_FORKNUM,
+ read_stream_for_blocks_cb,
+ &stream_data,
+ 0);
+
+ for (int i = 0; i < stream_data.nblocks; i++)
+ {
+ Buffer buf = read_stream_next_buffer(stream, NULL);
+ Datum values[3] = {0};
+ bool nulls[3] = {0};
+
+ if (!BufferIsValid(buf))
+ elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly invalid", i);
+
+ values[0] = Int32GetDatum(i);
+ values[1] = UInt32GetDatum(stream_data.blocks[i]);
+ values[2] = UInt32GetDatum(buf);
+
+ tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+
+ ReleaseBuffer(buf);
+ }
+
+ if (read_stream_next_buffer(stream, NULL) != InvalidBuffer)
+ elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly valid",
+ stream_data.nblocks + 1);
+
+ read_stream_end(stream);
+
+ relation_close(rel, NoLock);
+
+ return (Datum) 0;
+}
+
+
PG_FUNCTION_INFO_V1(handle_get);
Datum
handle_get(PG_FUNCTION_ARGS)
@@ -680,15 +808,98 @@ batch_end(PG_FUNCTION_ARGS)
}
#ifdef USE_INJECTION_POINTS
-extern PGDLLEXPORT void inj_io_short_read(const char *name,
- const void *private_data,
- void *arg);
+extern PGDLLEXPORT void inj_io_completion_hook(const char *name,
+ const void *private_data,
+ void *arg);
extern PGDLLEXPORT void inj_io_reopen(const char *name,
const void *private_data,
void *arg);
-void
-inj_io_short_read(const char *name, const void *private_data, void *arg)
+static bool
+inj_io_short_read_matches(PgAioHandle *ioh)
+{
+ PGPROC *owner_proc;
+ int32 owner_pid;
+ PgAioTargetData *td;
+
+ if (!inj_io_error_state->enabled_short_read || ioh->op != PGAIO_OP_READV)
+ return false;
+
+ if (!inj_io_error_state->short_read_result_set)
+ return false;
+
+ owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
+ owner_pid = owner_proc->pid;
+
+ if (inj_io_error_state->short_read_pid != 0 &&
+ inj_io_error_state->short_read_pid != owner_pid)
+ return false;
+
+ td = pgaio_io_get_target_data(ioh);
+
+ if (inj_io_error_state->short_read_relfilenode != InvalidOid &&
+ td->smgr.rlocator.relNumber != inj_io_error_state->short_read_relfilenode)
+ return false;
+
+ /*
+ * Only shorten reads that are actually longer than the target size,
+ * otherwise we can trigger over-reads.
+ */
+ if (inj_io_error_state->short_read_result >= ioh->result)
+ return false;
+
+ return true;
+}
+
+static bool
+inj_io_completion_wait_matches(PgAioHandle *ioh)
+{
+ PGPROC *owner_proc;
+ int32 owner_pid;
+ PgAioTargetData *td;
+
+ if (!inj_io_error_state->enabled_completion_wait)
+ return false;
+
+ owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
+ owner_pid = owner_proc->pid;
+
+ if (inj_io_error_state->completion_wait_pid != owner_pid)
+ return false;
+
+ td = pgaio_io_get_target_data(ioh);
+
+ if (inj_io_error_state->completion_wait_relfilenode != InvalidOid &&
+ td->smgr.rlocator.relNumber != inj_io_error_state->completion_wait_relfilenode)
+ return false;
+
+ return true;
+}
+
+static void
+inj_io_completion_wait_hook(const char *name, const void *private_data, void *arg)
+{
+ PgAioHandle *ioh = (PgAioHandle *) arg;
+
+ if (!inj_io_completion_wait_matches(ioh))
+ return;
+
+ ConditionVariablePrepareToSleep(&inj_io_error_state->cv);
+
+ while (true)
+ {
+ if (!inj_io_completion_wait_matches(ioh))
+ break;
+
+ ConditionVariableSleep(&inj_io_error_state->cv,
+ inj_io_error_state->completion_wait_event);
+ }
+
+ ConditionVariableCancelSleep();
+}
+
+static void
+inj_io_short_read_hook(const char *name, const void *private_data, void *arg)
{
PgAioHandle *ioh = (PgAioHandle *) arg;
@@ -697,58 +908,56 @@ inj_io_short_read(const char *name, const void *private_data, void *arg)
inj_io_error_state->enabled_reopen),
errhidestmt(true), errhidecontext(true));
- if (inj_io_error_state->enabled_short_read)
+ if (inj_io_short_read_matches(ioh))
{
+ struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
+ int32 old_result = ioh->result;
+ int32 new_result = inj_io_error_state->short_read_result;
+ int32 processed = 0;
+
+ ereport(LOG,
+ errmsg("short read inject point, changing result from %d to %d",
+ old_result, new_result),
+ errhidestmt(true), errhidecontext(true));
+
/*
- * Only shorten reads that are actually longer than the target size,
- * otherwise we can trigger over-reads.
+ * The underlying IO actually completed OK, and thus the "invalid"
+ * portion of the IOV actually contains valid data. That can hide a
+ * lot of problems, e.g. if we were to wrongly mark as valid a buffer
+ * that, according to the shortened read, wasn't read, its contents
+ * would look valid and we might miss a bug.
+ *
+ * To avoid that, iterate through the IOV and zero out the "failed"
+ * portion of the IO.
*/
- if (inj_io_error_state->short_read_result_set
- && ioh->op == PGAIO_OP_READV
- && inj_io_error_state->short_read_result <= ioh->result)
+ for (int i = 0; i < ioh->op_data.read.iov_length; i++)
{
- struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
- int32 old_result = ioh->result;
- int32 new_result = inj_io_error_state->short_read_result;
- int32 processed = 0;
-
- ereport(LOG,
- errmsg("short read inject point, changing result from %d to %d",
- old_result, new_result),
- errhidestmt(true), errhidecontext(true));
-
- /*
- * The underlying IO actually completed OK, and thus the "invalid"
- * portion of the IOV actually contains valid data. That can hide
- * a lot of problems, e.g. if we were to wrongly mark a buffer,
- * that wasn't read according to the shortened-read, IO as valid,
- * the contents would look valid and we might miss a bug.
- *
- * To avoid that, iterate through the IOV and zero out the
- * "failed" portion of the IO.
- */
- for (int i = 0; i < ioh->op_data.read.iov_length; i++)
+ if (processed + iov[i].iov_len <= new_result)
+ processed += iov[i].iov_len;
+ else if (processed <= new_result)
{
- if (processed + iov[i].iov_len <= new_result)
- processed += iov[i].iov_len;
- else if (processed <= new_result)
- {
- uint32 ok_part = new_result - processed;
-
- memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
- processed += iov[i].iov_len;
- }
- else
- {
- memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
- }
- }
+ uint32 ok_part = new_result - processed;
- ioh->result = new_result;
+ memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
+ processed += iov[i].iov_len;
+ }
+ else
+ {
+ memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
+ }
}
+
+ ioh->result = new_result;
}
}
+void
+inj_io_completion_hook(const char *name, const void *private_data, void *arg)
+{
+ inj_io_completion_wait_hook(name, private_data, arg);
+ inj_io_short_read_hook(name, private_data, arg);
+}
+
void
inj_io_reopen(const char *name, const void *private_data, void *arg)
{
@@ -762,6 +971,39 @@ inj_io_reopen(const char *name, const void *private_data, void *arg)
}
#endif
+PG_FUNCTION_INFO_V1(inj_io_completion_wait);
+Datum
+inj_io_completion_wait(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+ inj_io_error_state->enabled_completion_wait = true;
+ inj_io_error_state->completion_wait_pid =
+ PG_ARGISNULL(0) ? 0 : PG_GETARG_INT32(0);
+ inj_io_error_state->completion_wait_relfilenode =
+ PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1);
+#else
+ elog(ERROR, "injection points not supported");
+#endif
+
+ PG_RETURN_VOID();
+}
+
+PG_FUNCTION_INFO_V1(inj_io_completion_continue);
+Datum
+inj_io_completion_continue(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+ inj_io_error_state->enabled_completion_wait = false;
+ inj_io_error_state->completion_wait_pid = 0;
+ inj_io_error_state->completion_wait_relfilenode = InvalidOid;
+ ConditionVariableBroadcast(&inj_io_error_state->cv);
+#else
+ elog(ERROR, "injection points not supported");
+#endif
+
+ PG_RETURN_VOID();
+}
+
PG_FUNCTION_INFO_V1(inj_io_short_read_attach);
Datum
inj_io_short_read_attach(PG_FUNCTION_ARGS)
@@ -771,6 +1013,10 @@ inj_io_short_read_attach(PG_FUNCTION_ARGS)
inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0);
if (inj_io_error_state->short_read_result_set)
inj_io_error_state->short_read_result = PG_GETARG_INT32(0);
+ inj_io_error_state->short_read_pid =
+ PG_ARGISNULL(1) ? 0 : PG_GETARG_INT32(1);
+ inj_io_error_state->short_read_relfilenode =
+ PG_ARGISNULL(2) ? InvalidOid : PG_GETARG_OID(2);
#else
elog(ERROR, "injection points not supported");
#endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 77e3c04144e..668faaa5615 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -303,6 +303,7 @@ BlockSampler
BlockSamplerData
BlockedProcData
BlockedProcsData
+BlocksReadStreamData
BlocktableEntry
BloomBuildState
BloomFilter
--
2.43.0
From d1eb014b043d71702bff6d1ba11e90c1e7f0c17a Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Tue, 3 Mar 2026 13:27:51 -0500
Subject: [PATCH v4 4/6] test fixes
The pump_until change is needed for the test to work reliably on BSD.
---
src/test/modules/test_aio/t/004_read_stream.pl | 15 ++++++++-------
src/test/modules/test_aio/test_aio--1.0.sql | 2 +-
2 files changed, 9 insertions(+), 8 deletions(-)
diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl
index 89cfabbb1d3..f3fa018dd1c 100644
--- a/src/test/modules/test_aio/t/004_read_stream.pl
+++ b/src/test/modules/test_aio/t/004_read_stream.pl
@@ -35,7 +35,7 @@ $node->stop();
foreach my $method (TestAio::supported_io_methods())
{
- $node->adjust_conf('postgresql.conf', 'io_method', 'worker');
+ $node->adjust_conf('postgresql.conf', 'io_method', $method);
$node->start();
test_io_method($method, $node);
$node->stop();
@@ -205,15 +205,13 @@ SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 5
$psql_a->{run}, $psql_a->{timeout},
\$psql_a->{stdout}, qr/\{0,2,5,7\}/);
- $psql_b->{run}->pump_nb();
- like(
- $psql_b->{stderr},
- qr/.*ERROR.*could not read blocks 5..5.*$/,
- "$io_method: injected error occurred");
+ pump_until(
+ $psql_b->{run}, $psql_b->{timeout},
+ \$psql_b->{stderr}, qr/ERROR.*could not read blocks 5\.\.5/);
+ ok(1, "$io_method: injected error occurred");
$psql_b->{stderr} = '';
$psql_b->query_safe(qq/SELECT inj_io_short_read_detach();/);
-
ok(1,
qq/$io_method: read stream encounters failing IO by another backend/);
@@ -271,6 +269,9 @@ sub test_io_method
my $io_method = shift;
my $node = shift;
+ is($node->safe_psql('postgres', 'SHOW io_method'),
+ $io_method, "$io_method: io_method set correctly");
+
test_repeated_blocks($io_method, $node);
SKIP:
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index da7cc03829a..1cc4734a746 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -45,7 +45,7 @@ CREATE FUNCTION buffer_create_toy(rel regclass, blockno int4)
RETURNS pg_catalog.int4 STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
-CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool, assign_io bool DEFAULT false)
+CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool)
RETURNS pg_catalog.bool STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
--
2.43.0
From a893d82fe0accd77c807ca3b791713954e319a2c Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Fri, 23 Jan 2026 13:54:02 -0500
Subject: [PATCH v4 5/6] Make buffer hit helper
Two places already count buffer hits, and each requires quite a few lines
of code because a single hit is accounted for in several places (buffer
usage, pgstats, vacuum cost, tracepoint). Future commits will add more
callers, so refactor this into a helper.
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
---
src/backend/storage/buffer/bufmgr.c | 111 ++++++++++++++--------------
1 file changed, 56 insertions(+), 55 deletions(-)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index d1babaff023..a749971ba7e 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -639,6 +639,10 @@ static inline BufferDesc *BufferAlloc(SMgrRelation smgr,
bool *foundPtr, IOContext io_context);
static bool AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress);
static void CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete);
+
+static pg_attribute_always_inline void CountBufferHit(BufferAccessStrategy strategy,
+ Relation rel, char persistence, SMgrRelation smgr,
+ ForkNumber forknum, BlockNumber blocknum);
static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
IOObject io_object, IOContext io_context);
@@ -1217,8 +1221,6 @@ PinBufferForBlock(Relation rel,
bool *foundPtr)
{
BufferDesc *bufHdr;
- IOContext io_context;
- IOObject io_object;
Assert(blockNum != P_NEW);
@@ -1227,17 +1229,6 @@ PinBufferForBlock(Relation rel,
persistence == RELPERSISTENCE_PERMANENT ||
persistence == RELPERSISTENCE_UNLOGGED));
- if (persistence == RELPERSISTENCE_TEMP)
- {
- io_context = IOCONTEXT_NORMAL;
- io_object = IOOBJECT_TEMP_RELATION;
- }
- else
- {
- io_context = IOContextForStrategy(strategy);
- io_object = IOOBJECT_RELATION;
- }
-
TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
smgr->smgr_rlocator.locator.spcOid,
smgr->smgr_rlocator.locator.dbOid,
@@ -1245,18 +1236,11 @@ PinBufferForBlock(Relation rel,
smgr->smgr_rlocator.backend);
if (persistence == RELPERSISTENCE_TEMP)
- {
bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, foundPtr);
- if (*foundPtr)
- pgBufferUsage.local_blks_hit++;
- }
else
- {
bufHdr = BufferAlloc(smgr, persistence, forkNum, blockNum,
- strategy, foundPtr, io_context);
- if (*foundPtr)
- pgBufferUsage.shared_blks_hit++;
- }
+ strategy, foundPtr, IOContextForStrategy(strategy));
+
if (rel)
{
/*
@@ -1265,22 +1249,10 @@ PinBufferForBlock(Relation rel,
* zeroed instead), the per-relation stats always count them.
*/
pgstat_count_buffer_read(rel);
- if (*foundPtr)
- pgstat_count_buffer_hit(rel);
}
- if (*foundPtr)
- {
- pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0);
- if (VacuumCostActive)
- VacuumCostBalance += VacuumCostPageHit;
- TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
- smgr->smgr_rlocator.locator.spcOid,
- smgr->smgr_rlocator.locator.dbOid,
- smgr->smgr_rlocator.locator.relNumber,
- smgr->smgr_rlocator.backend,
- true);
- }
+ if (*foundPtr)
+ CountBufferHit(strategy, rel, persistence, smgr, forkNum, blockNum);
return BufferDescriptorGetBuffer(bufHdr);
}
@@ -1686,6 +1658,51 @@ ReadBuffersCanStartIO(Buffer buffer, bool nowait)
return ReadBuffersCanStartIOOnce(buffer, nowait);
}
+/*
+ * We track various stats related to buffer hits. Because this is done in a
+ * few separate places, this helper exists for convenience.
+ */
+static pg_attribute_always_inline void
+CountBufferHit(BufferAccessStrategy strategy,
+ Relation rel, char persistence, SMgrRelation smgr,
+ ForkNumber forknum, BlockNumber blocknum)
+{
+ IOContext io_context;
+ IOObject io_object;
+
+ if (persistence == RELPERSISTENCE_TEMP)
+ {
+ io_context = IOCONTEXT_NORMAL;
+ io_object = IOOBJECT_TEMP_RELATION;
+ }
+ else
+ {
+ io_context = IOContextForStrategy(strategy);
+ io_object = IOOBJECT_RELATION;
+ }
+
+ TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum,
+ blocknum,
+ smgr->smgr_rlocator.locator.spcOid,
+ smgr->smgr_rlocator.locator.dbOid,
+ smgr->smgr_rlocator.locator.relNumber,
+ smgr->smgr_rlocator.backend,
+ true);
+
+ if (persistence == RELPERSISTENCE_TEMP)
+ pgBufferUsage.local_blks_hit += 1;
+ else
+ pgBufferUsage.shared_blks_hit += 1;
+
+ if (rel)
+ pgstat_count_buffer_hit(rel);
+
+ pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0);
+
+ if (VacuumCostActive)
+ VacuumCostBalance += VacuumCostPageHit;
+}
+
/*
* Helper for WaitReadBuffers() that processes the results of a readv
* operation, raising an error if necessary.
@@ -1981,25 +1998,9 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
* must have started out as a miss in PinBufferForBlock(). The other
* backend will track this as a 'read'.
*/
- TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + operation->nblocks_done,
- operation->smgr->smgr_rlocator.locator.spcOid,
- operation->smgr->smgr_rlocator.locator.dbOid,
- operation->smgr->smgr_rlocator.locator.relNumber,
- operation->smgr->smgr_rlocator.backend,
- true);
-
- if (persistence == RELPERSISTENCE_TEMP)
- pgBufferUsage.local_blks_hit += 1;
- else
- pgBufferUsage.shared_blks_hit += 1;
-
- if (operation->rel)
- pgstat_count_buffer_hit(operation->rel);
-
- pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0);
-
- if (VacuumCostActive)
- VacuumCostBalance += VacuumCostPageHit;
+ CountBufferHit(operation->strategy, operation->rel, persistence,
+ operation->smgr, forknum,
+ blocknum + operation->nblocks_done);
}
else
{
--
2.43.0
From d6a2d6d3316f33f5a7dfdd1a7084ea230d26ae3b Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Fri, 23 Jan 2026 14:00:31 -0500
Subject: [PATCH v4 6/6] Don't wait for already in-progress IO
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
When a backend attempts to start a read on a buffer and finds that I/O
is already in progress, it previously waited for that I/O to complete
before initiating reads for any other buffers. Although the backend must
still wait for the I/O to finish when later acquiring the buffer, it
should not need to wait at read start time. Other buffers may be
available for I/O, and in some workloads this waiting significantly
reduces concurrency.
For example, index scans may repeatedly request the same heap block. If
the backend waits each time it encounters an in-progress read, the
access pattern effectively degenerates into synchronous I/O. By
introducing the concept of foreign I/O operations, a backend can record
the buffer’s wait reference and defer waiting until WaitReadBuffers()
when it actually acquires the buffer.
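As an illustration only, the difference between waiting at read-start time
and deferring the wait can be sketched with a toy model in C. All names
(ToyBufState, toy_prepare(), toy_run_stream(), ...) are hypothetical and not
part of bufmgr.c; there is no real concurrency, and an in-flight read is
simply assumed to have finished once we wait on it.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical toy buffer states, not PostgreSQL's real flags. */
typedef enum
{
	TOY_INVALID,				/* no one has started reading this block */
	TOY_IN_PROGRESS,			/* a read of this block is in flight */
	TOY_VALID					/* block contents are present */
} ToyBufState;

/* Loosely mirrors the three outcomes distinguished by this patch. */
typedef enum
{
	TOY_ALREADY_DONE,			/* nothing to do */
	TOY_IN_FLIGHT,				/* IO already running; wait for it */
	TOY_READY_FOR_IO			/* we own the IO now */
} ToyPrepareStatus;

typedef struct
{
	int			reads_issued;	/* IOs this backend started */
	int			start_waits;	/* stalls at read-start time */
	int			deferred_waits; /* waits postponed to WaitReadBuffers() */
} ToyCounts;

static ToyPrepareStatus
toy_prepare(ToyBufState *state)
{
	if (*state == TOY_VALID)
		return TOY_ALREADY_DONE;
	if (*state == TOY_IN_PROGRESS)
		return TOY_IN_FLIGHT;
	*state = TOY_IN_PROGRESS;	/* analogous to setting BM_IO_IN_PROGRESS */
	return TOY_READY_FOR_IO;
}

/*
 * Walk a stream of block requests. With defer = false, an in-flight read is
 * waited on immediately (old behavior); with defer = true, only the need to
 * wait is recorded (new behavior). states[] must cover every block number.
 */
static ToyCounts
toy_run_stream(const int *blocks, int nblocks, ToyBufState *states,
			   int nstates, bool defer)
{
	ToyCounts	c = {0, 0, 0};

	for (int i = 0; i < nblocks; i++)
	{
		assert(blocks[i] >= 0 && blocks[i] < nstates);

		switch (toy_prepare(&states[blocks[i]]))
		{
			case TOY_ALREADY_DONE:
				break;
			case TOY_READY_FOR_IO:
				c.reads_issued++;
				break;
			case TOY_IN_FLIGHT:
				if (defer)
					c.deferred_waits++;
				else
				{
					/* synchronous wait: the IO completes before we go on */
					c.start_waits++;
					states[blocks[i]] = TOY_VALID;
				}
				break;
		}
	}
	return c;
}
```

For a request sequence such as {0, 2, 0, 2, 5} with all blocks initially
unread, both modes issue three reads, but the old mode stalls twice at
read-start time while the deferred mode instead records two waits to be
performed later.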
In rare cases, a backend may still need to wait when starting a read if
it encounters a buffer after another backend has set BM_IO_IN_PROGRESS
but before the buffer descriptor’s wait reference has been set. Such
windows should be brief and uncommon.
Author: Melanie Plageman <[email protected]>
Reviewed-by: Andres Freund <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
---
src/backend/storage/buffer/bufmgr.c | 485 ++++++++++++++++++----------
src/include/storage/bufmgr.h | 1 +
src/tools/pgindent/typedefs.list | 1 +
3 files changed, 324 insertions(+), 163 deletions(-)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index a749971ba7e..f8205c3b845 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -170,6 +170,21 @@ typedef struct SMgrSortArray
SMgrRelation srel;
} SMgrSortArray;
+
+/*
+ * In AsyncReadBuffers(), when preparing a buffer for reading and setting
+ * BM_IO_IN_PROGRESS, the buffer may already have I/O in progress or may
+ * already contain the desired block. AsyncReadBuffers() must distinguish
+ * between these cases (and the case where it should initiate I/O) so it can
+ * mark an in-progress buffer as foreign I/O rather than waiting on it.
+ */
+typedef enum PrepareReadBuffer_Status
+{
+ READ_BUFFER_ALREADY_DONE,
+ READ_BUFFER_IN_PROGRESS,
+ READ_BUFFER_READY_FOR_IO,
+} PrepareReadBuffer_Status;
+
/* GUC variables */
bool zero_damaged_pages = false;
int bgwriter_lru_maxpages = 100;
@@ -1619,45 +1634,6 @@ CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete)
#endif
}
-/* helper for ReadBuffersCanStartIO(), to avoid repetition */
-static inline bool
-ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait)
-{
- if (BufferIsLocal(buffer))
- return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1),
- true, nowait);
- else
- return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
-}
-
-/*
- * Helper for AsyncReadBuffers that tries to get the buffer ready for IO.
- */
-static inline bool
-ReadBuffersCanStartIO(Buffer buffer, bool nowait)
-{
- /*
- * If this backend currently has staged IO, we need to submit the pending
- * IO before waiting for the right to issue IO, to avoid the potential for
- * deadlocks (and, more commonly, unnecessary delays for other backends).
- */
- if (!nowait && pgaio_have_staged())
- {
- if (ReadBuffersCanStartIOOnce(buffer, true))
- return true;
-
- /*
- * Unfortunately StartBufferIO() returning false doesn't allow to
- * distinguish between the buffer already being valid and IO already
- * being in progress. Since IO already being in progress is quite
- * rare, this approach seems fine.
- */
- pgaio_submit_staged();
- }
-
- return ReadBuffersCanStartIOOnce(buffer, nowait);
-}
-
/*
* We track various stats related to buffer hits. Because this is done in a
* few separate places, this helper exists for convenience.
@@ -1807,7 +1783,7 @@ WaitReadBuffers(ReadBuffersOperation *operation)
*
* we first check if we already know the IO is complete.
*/
- if (aio_ret->result.status == PGAIO_RS_UNKNOWN &&
+ if ((operation->foreign_io || aio_ret->result.status == PGAIO_RS_UNKNOWN) &&
!pgaio_wref_check_done(&operation->io_wref))
{
instr_time io_start = pgstat_prepare_io_time(track_io_timing);
@@ -1826,11 +1802,33 @@ WaitReadBuffers(ReadBuffersOperation *operation)
Assert(pgaio_wref_check_done(&operation->io_wref));
}
- /*
- * We now are sure the IO completed. Check the results. This
- * includes reporting on errors if there were any.
- */
- ProcessReadBuffersResult(operation);
+ if (unlikely(operation->foreign_io))
+ {
+ Buffer buffer = operation->buffers[operation->nblocks_done];
+ BufferDesc *desc = BufferIsLocal(buffer) ?
+ GetLocalBufferDescriptor(-buffer - 1) :
+ GetBufferDescriptor(buffer - 1);
+ uint64 buf_state = pg_atomic_read_u64(&desc->state);
+
+ if (buf_state & BM_VALID)
+ {
+ operation->nblocks_done += 1;
+ Assert(operation->nblocks_done <= operation->nblocks);
+
+ CountBufferHit(operation->strategy,
+ operation->rel, operation->persistence,
+ operation->smgr, operation->forknum,
+ operation->blocknum + operation->nblocks_done - 1);
+ }
+ }
+ else
+ {
+ /*
+ * We now are sure the IO completed. Check the results. This
+ * includes reporting on errors if there were any.
+ */
+ ProcessReadBuffersResult(operation);
+ }
}
/*
@@ -1861,6 +1859,163 @@ WaitReadBuffers(ReadBuffersOperation *operation)
/* NB: READ_DONE tracepoint was already executed in completion callback */
}
+/*
+ * Local version of PrepareNewReadBufferIO(). Here instead of localbuf.c to
+ * avoid an external function call.
+ */
+static PrepareReadBuffer_Status
+PrepareNewLocalReadBufferIO(ReadBuffersOperation *operation,
+ Buffer buffer)
+{
+ BufferDesc *desc = GetLocalBufferDescriptor(-buffer - 1);
+ uint64 buf_state = pg_atomic_read_u64(&desc->state);
+
+ /* Already valid, no work to do */
+ if (buf_state & BM_VALID)
+ {
+ pgaio_wref_clear(&operation->io_wref);
+ return READ_BUFFER_ALREADY_DONE;
+ }
+
+ pgaio_submit_staged();
+
+ if (pgaio_wref_valid(&desc->io_wref))
+ {
+ operation->io_wref = desc->io_wref;
+ operation->foreign_io = true;
+ return READ_BUFFER_IN_PROGRESS;
+ }
+
+ /*
+ * While it is possible for a buffer to have been prepared for IO but not
+ * yet had its wait reference set, there's no way for us to know that for
+ * temporary buffers. Thus, prepare to do the IO on this buffer ourselves.
+ */
+ return READ_BUFFER_READY_FOR_IO;
+}
+
+/*
+ * Try to start IO on the first buffer in a new run of blocks. If AIO is in
+ * progress, be it in this backend or another backend, we just associate the
+ * wait reference with the operation and wait in WaitReadBuffers(). This turns
+ * out to be important for performance in two workloads:
+ *
+ * 1) A read stream that has to read the same block multiple times within the
+ * readahead distance. This can happen e.g. for the table accesses of an
+ * index scan.
+ *
+ * 2) Concurrent scans by multiple backends on the same relation.
+ *
+ * If we were to synchronously wait for the in-progress IO, we'd not be able
+ * to keep enough I/O in flight.
+ *
+ * If we do find there is ongoing I/O for the buffer, we set up a 1-block
+ * ReadBuffersOperation that WaitReadBuffers() can then wait on.
+ *
+ * It's possible that another backend has started IO on the buffer but not yet
+ * set its wait reference. In this case, we have no choice but to wait for
+ * either the wait reference to be valid or the IO to be done.
+ */
+static PrepareReadBuffer_Status
+PrepareNewReadBufferIO(ReadBuffersOperation *operation,
+ Buffer buffer)
+{
+ uint64 buf_state;
+ BufferDesc *desc;
+
+ if (BufferIsLocal(buffer))
+ return PrepareNewLocalReadBufferIO(operation, buffer);
+
+ ResourceOwnerEnlarge(CurrentResourceOwner);
+ desc = GetBufferDescriptor(buffer - 1);
+
+ for (;;)
+ {
+ buf_state = LockBufHdr(desc);
+
+ /* Already valid, no work to do */
+ if (buf_state & BM_VALID)
+ {
+ UnlockBufHdr(desc);
+ pgaio_wref_clear(&operation->io_wref);
+ return READ_BUFFER_ALREADY_DONE;
+ }
+
+ if (buf_state & BM_IO_IN_PROGRESS)
+ {
+ /* Join existing read */
+ if (pgaio_wref_valid(&desc->io_wref))
+ {
+ operation->io_wref = desc->io_wref;
+ operation->foreign_io = true;
+ UnlockBufHdr(desc);
+ return READ_BUFFER_IN_PROGRESS;
+ }
+
+ /*
+ * If the wait ref is not valid but the IO is in progress, someone
+ * else started IO but hasn't set the wait ref yet. We have no
+ * choice but to wait until the IO completes.
+ */
+ UnlockBufHdr(desc);
+ pgaio_submit_staged();
+ WaitIO(desc);
+ continue;
+ }
+
+ /*
+ * No IO in progress and not already valid; we will start IO. It's
+ * possible that IO was in progress but the buffer never became valid
+ * because the IO errored out. We'll do the IO ourselves.
+ */
+ UnlockBufHdrExt(desc, buf_state, BM_IO_IN_PROGRESS, 0, 0);
+ ResourceOwnerRememberBufferIO(CurrentResourceOwner,
+ BufferDescriptorGetBuffer(desc));
+
+ return READ_BUFFER_READY_FOR_IO;
+ }
+}
+
+
+/*
+ * When building a new IO from multiple buffers, we won't include buffers
+ * that are already valid or already in progress. This function should only be
+ * used for additional adjacent buffers following the head buffer in a new IO.
+ *
+ * Returns true if the buffer was successfully prepared for IO and false if it
+ * is rejected and the read IO should not include this buffer.
+ */
+static bool
+PrepareAdditionalReadBuffer(Buffer buffer)
+{
+ uint64 buf_state;
+ BufferDesc *desc;
+
+ if (BufferIsLocal(buffer))
+ {
+ desc = GetLocalBufferDescriptor(-buffer - 1);
+ buf_state = pg_atomic_read_u64(&desc->state);
+ /* Local buffers don't use BM_IO_IN_PROGRESS */
+ if (buf_state & BM_VALID || pgaio_wref_valid(&desc->io_wref))
+ return false;
+ }
+ else
+ {
+ ResourceOwnerEnlarge(CurrentResourceOwner);
+ desc = GetBufferDescriptor(buffer - 1);
+ buf_state = LockBufHdr(desc);
+ if (buf_state & (BM_VALID | BM_IO_IN_PROGRESS))
+ {
+ UnlockBufHdr(desc);
+ return false;
+ }
+ UnlockBufHdrExt(desc, buf_state, BM_IO_IN_PROGRESS, 0, 0);
+ ResourceOwnerRememberBufferIO(CurrentResourceOwner, buffer);
+ }
+
+ return true;
+}
+
/*
* Initiate IO for the ReadBuffersOperation
*
@@ -1894,7 +2049,75 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
void *io_pages[MAX_IO_COMBINE_LIMIT];
IOContext io_context;
IOObject io_object;
- bool did_start_io;
+ instr_time io_start;
+ PrepareReadBuffer_Status status;
+
+ /*
+ * We must get an IO handle before PrepareNewReadBufferIO(), as
+ * pgaio_io_acquire() might block, which we don't want after setting
+ * BM_IO_IN_PROGRESS. If we don't need to do the IO, we'll release the
+ * handle.
+ *
+ * If we need to wait for IO before we can get a handle, submit
+ * already-staged IO first, so that other backends don't need to wait.
+ * There wouldn't be a deadlock risk, as pgaio_io_acquire() just needs to
+ * wait for already submitted IO, which doesn't require additional locks,
+ * but it could still cause undesirable waits.
+ *
+ * A secondary benefit is that this would allow us to measure the time in
+ * pgaio_io_acquire() without causing undue timer overhead in the common,
+ * non-blocking, case. However, currently the pgstats infrastructure
+ * doesn't really allow that, as it a) asserts that an operation can't
+ * have time without operations b) doesn't have an API to report
+ * "accumulated" time.
+ */
+ ioh = pgaio_io_acquire_nb(CurrentResourceOwner, &operation->io_return);
+ if (unlikely(!ioh))
+ {
+ pgaio_submit_staged();
+ ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return);
+ }
+
+ operation->foreign_io = false;
+
+ /* Check if we can start IO on the first to-be-read buffer */
+ if ((status = PrepareNewReadBufferIO(operation, buffers[nblocks_done])) <
+ READ_BUFFER_READY_FOR_IO)
+ {
+ pgaio_io_release(ioh);
+ *nblocks_progress = 1;
+ if (status == READ_BUFFER_ALREADY_DONE)
+ {
+ /*
+ * Someone else has already completed this block, we're done.
+ *
+ * When IO is necessary, ->nblocks_done is updated in
+ * ProcessReadBuffersResult(), but that is not called if no IO is
+ * necessary. Thus update here.
+ */
+ operation->nblocks_done += 1;
+ Assert(operation->nblocks_done <= operation->nblocks);
+
+ /*
+ * Report and track this as a 'hit' for this backend, even though
+ * it must have started out as a miss in PinBufferForBlock(). The
+ * other backend will track this as a 'read'.
+ */
+ CountBufferHit(operation->strategy,
+ operation->rel, operation->persistence,
+ operation->smgr, operation->forknum,
+ operation->blocknum + operation->nblocks_done - 1);
+ return false;
+ }
+
+ /* The IO is already in-progress */
+ Assert(status == READ_BUFFER_IN_PROGRESS);
+ CheckReadBuffersOperation(operation, false);
+ return true;
+ }
+
+ /* We can read in at least the head buffer. */
+ Assert(status == READ_BUFFER_READY_FOR_IO);
/*
* When this IO is executed synchronously, either because the caller will
@@ -1945,138 +2168,74 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
*/
pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid);
- /*
- * Get IO handle before ReadBuffersCanStartIO(), as pgaio_io_acquire()
- * might block, which we don't want after setting IO_IN_PROGRESS.
- *
- * If we need to wait for IO before we can get a handle, submit
- * already-staged IO first, so that other backends don't need to wait.
- * There wouldn't be a deadlock risk, as pgaio_io_acquire() just needs to
- * wait for already submitted IO, which doesn't require additional locks,
- * but it could still cause undesirable waits.
- *
- * A secondary benefit is that this would allow us to measure the time in
- * pgaio_io_acquire() without causing undue timer overhead in the common,
- * non-blocking, case. However, currently the pgstats infrastructure
- * doesn't really allow that, as it a) asserts that an operation can't
- * have time without operations b) doesn't have an API to report
- * "accumulated" time.
- */
- ioh = pgaio_io_acquire_nb(CurrentResourceOwner, &operation->io_return);
- if (unlikely(!ioh))
- {
- pgaio_submit_staged();
-
- ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return);
- }
+ Assert(io_buffers[0] == buffers[nblocks_done]);
+ io_pages[0] = BufferGetBlock(buffers[nblocks_done]);
+ io_buffers_len = 1;
/*
- * Check if we can start IO on the first to-be-read buffer.
- *
- * If an I/O is already in progress in another backend, we want to wait
- * for the outcome: either done, or something went wrong and we will
- * retry.
+ * How many neighboring-on-disk blocks can we scatter-read into other
+ * buffers at the same time? In this case we don't wait if we see an I/O
+ * already in progress. We already set BM_IO_IN_PROGRESS for the head
+ * block, so we should get on with that I/O as soon as possible.
*/
- if (!ReadBuffersCanStartIO(buffers[nblocks_done], false))
+ for (int i = nblocks_done + 1; i < operation->nblocks; i++)
{
- /*
- * Someone else has already completed this block, we're done.
- *
- * When IO is necessary, ->nblocks_done is updated in
- * ProcessReadBuffersResult(), but that is not called if no IO is
- * necessary. Thus update here.
- */
- operation->nblocks_done += 1;
- *nblocks_progress = 1;
-
- pgaio_io_release(ioh);
- pgaio_wref_clear(&operation->io_wref);
- did_start_io = false;
+ if (!PrepareAdditionalReadBuffer(buffers[i]))
+ break;
+ /* Must be consecutive block numbers. */
+ Assert(BufferGetBlockNumber(buffers[i - 1]) ==
+ BufferGetBlockNumber(buffers[i]) - 1);
+ Assert(io_buffers[io_buffers_len] == buffers[i]);
- /*
- * Report and track this as a 'hit' for this backend, even though it
- * must have started out as a miss in PinBufferForBlock(). The other
- * backend will track this as a 'read'.
- */
- CountBufferHit(operation->strategy, operation->rel, persistence,
- operation->smgr, forknum,
- blocknum + operation->nblocks_done);
+ io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
}
- else
- {
- instr_time io_start;
-
- /* We found a buffer that we need to read in. */
- Assert(io_buffers[0] == buffers[nblocks_done]);
- io_pages[0] = BufferGetBlock(buffers[nblocks_done]);
- io_buffers_len = 1;
-
- /*
- * How many neighboring-on-disk blocks can we scatter-read into other
- * buffers at the same time? In this case we don't wait if we see an
- * I/O already in progress. We already set BM_IO_IN_PROGRESS for the
- * head block, so we should get on with that I/O as soon as possible.
- */
- for (int i = nblocks_done + 1; i < operation->nblocks; i++)
- {
- if (!ReadBuffersCanStartIO(buffers[i], true))
- break;
- /* Must be consecutive block numbers. */
- Assert(BufferGetBlockNumber(buffers[i - 1]) ==
- BufferGetBlockNumber(buffers[i]) - 1);
- Assert(io_buffers[io_buffers_len] == buffers[i]);
- io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
- }
+ /* get a reference to wait for in WaitReadBuffers() */
+ pgaio_io_get_wref(ioh, &operation->io_wref);
- /* get a reference to wait for in WaitReadBuffers() */
- pgaio_io_get_wref(ioh, &operation->io_wref);
+ /* provide the list of buffers to the completion callbacks */
+ pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len);
- /* provide the list of buffers to the completion callbacks */
- pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len);
+ pgaio_io_register_callbacks(ioh,
+ persistence == RELPERSISTENCE_TEMP ?
+ PGAIO_HCB_LOCAL_BUFFER_READV :
+ PGAIO_HCB_SHARED_BUFFER_READV,
+ flags);
- pgaio_io_register_callbacks(ioh,
- persistence == RELPERSISTENCE_TEMP ?
- PGAIO_HCB_LOCAL_BUFFER_READV :
- PGAIO_HCB_SHARED_BUFFER_READV,
- flags);
+ pgaio_io_set_flag(ioh, ioh_flags);
- pgaio_io_set_flag(ioh, ioh_flags);
+ /* ---
+ * Even though we're trying to issue IO asynchronously, track the time
+ * in smgrstartreadv():
+ * - if io_method == IOMETHOD_SYNC, we will always perform the IO
+ * immediately
+ * - the io method might not support the IO (e.g. worker IO for a temp
+ * table)
+ * ---
+ */
+ io_start = pgstat_prepare_io_time(track_io_timing);
+ smgrstartreadv(ioh, operation->smgr, forknum,
+ blocknum + nblocks_done,
+ io_pages, io_buffers_len);
+ pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
+ io_start, 1, io_buffers_len * BLCKSZ);
- /* ---
- * Even though we're trying to issue IO asynchronously, track the time
- * in smgrstartreadv():
- * - if io_method == IOMETHOD_SYNC, we will always perform the IO
- * immediately
- * - the io method might not support the IO (e.g. worker IO for a temp
- * table)
- * ---
- */
- io_start = pgstat_prepare_io_time(track_io_timing);
- smgrstartreadv(ioh, operation->smgr, forknum,
- blocknum + nblocks_done,
- io_pages, io_buffers_len);
- pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
- io_start, 1, io_buffers_len * BLCKSZ);
-
- if (persistence == RELPERSISTENCE_TEMP)
- pgBufferUsage.local_blks_read += io_buffers_len;
- else
- pgBufferUsage.shared_blks_read += io_buffers_len;
+ if (persistence == RELPERSISTENCE_TEMP)
+ pgBufferUsage.local_blks_read += io_buffers_len;
+ else
+ pgBufferUsage.shared_blks_read += io_buffers_len;
- /*
- * Track vacuum cost when issuing IO, not after waiting for it.
- * Otherwise we could end up issuing a lot of IO in a short timespan,
- * despite a low cost limit.
- */
- if (VacuumCostActive)
- VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
+ /*
+ * Track vacuum cost when issuing IO, not after waiting for it. Otherwise
+ * we could end up issuing a lot of IO in a short timespan, despite a low
+ * cost limit.
+ */
+ if (VacuumCostActive)
+ VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
- *nblocks_progress = io_buffers_len;
- did_start_io = true;
- }
+ *nblocks_progress = io_buffers_len;
- return did_start_io;
+ return true;
}
/*
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index a40adf6b2a8..1358fc7fa64 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -147,6 +147,7 @@ struct ReadBuffersOperation
int flags;
int16 nblocks;
int16 nblocks_done;
+ bool foreign_io;
PgAioWaitRef io_wref;
PgAioReturn io_return;
};
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 668faaa5615..a656bbf9110 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2345,6 +2345,7 @@ PredicateLockData
PredicateLockTargetType
PrefetchBufferResult
PrepParallelRestorePtrType
+PrepareReadBuffer_Status
PrepareStmt
PreparedStatement
PresortedKeyData
--
2.43.0
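For anyone reviewing the first hunk above, here is the control flow of the new scatter-read loop reduced to a standalone model. This is a minimal sketch, not the patch's implementation. `SketchBufState`, `sketch_prepare_additional()` and `sketch_io_len()` are hypothetical names invented for illustration. The sketch assumes only what the hunk's comment states: once the head block has been claimed, each following buffer joins the same vectored read if it can be prepared without waiting, and the run stops at the first buffer that is already valid or already has I/O in progress. It does not model buffer header locking, BM_IO_IN_PROGRESS, or the foreign-IO tracking the patch adds to `ReadBuffersOperation`.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for a buffer's state; not PostgreSQL's BufferDesc. */
typedef enum
{
	SKETCH_NEEDS_IO,			/* not valid, nobody is reading it */
	SKETCH_IO_IN_PROGRESS,		/* some backend owns the read */
	SKETCH_VALID				/* already in the buffer pool */
} SketchBufState;

/*
 * Hypothetical analogue of PrepareAdditionalReadBuffer(): claim the buffer
 * for our I/O only if it is neither valid nor being read by someone else.
 * It never waits; it just reports whether the buffer can join the I/O.
 */
static bool
sketch_prepare_additional(SketchBufState *state)
{
	if (*state != SKETCH_NEEDS_IO)
		return false;
	*state = SKETCH_IO_IN_PROGRESS;	/* we now own this block's read */
	return true;
}

/*
 * Given a head block the caller has already claimed, return how many
 * consecutive blocks one vectored read can cover.  Stops at the first
 * block that cannot be claimed without waiting, like the "break" in the
 * patch's loop.
 */
static int
sketch_io_len(SketchBufState *states, int head, int nblocks)
{
	int			len = 1;		/* the head block is always included */

	/* Precondition: the caller claimed the head block before calling. */
	assert(states[head] == SKETCH_IO_IN_PROGRESS);

	for (int i = head + 1; i < nblocks; i++)
	{
		if (!sketch_prepare_additional(&states[i]))
			break;
		len++;
	}
	return len;
}
```

With states `{IN_PROGRESS (claimed head), NEEDS_IO, NEEDS_IO, VALID, NEEDS_IO}` and `head = 0`, `sketch_io_len()` returns 3. The valid block ends the run, so the last block is left for a later call rather than waited on. That matches the hunk's intent of getting the head block's I/O started as soon as possible.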