On Wed, Jun 18, 2025 at 2:39 PM Andrey Borodin <x4...@yandex-team.ru> wrote:
>
> > On 18 Jun 2025, at 11:33, Arseniy Mukhin <arseniy.mukhin....@gmail.com> wrote:
> >
> > Interesting, I used btree check as reference when I started
> > writing brin check, and in btree check there are 53
> > ERRCODE_INDEX_CORRUPTED ereports and only 1 ERRCODE_DATA_CORRUPTED
> > ereport. So it was very hard to do, but I managed to pick the wrong
> > one. I wonder if this btree check ereport should also be changed to
> > ERRCODE_INDEX_CORRUPTED?
>
> It's there in the case of a heapallindexed failure. I concur that
> ERRCODE_INDEX_CORRUPTED is more appropriate in that case in verify_nbtree.c.
> But I recollect Peter explained this code before somewhere in pgsql-hackers.
> And the reasoning was something like "if you lack a tuple in unique
> constraints - it's almost certainly subsequent constraint violation and data
> loss". But I'm not sure.
> And I could not find this discussion in the archives.
>
There is a thread about the heapallindexed feature [1]; I guess this is the one you mentioned? It also turned out that this error code is explained in a code comment:

 * Since the overall structure of the index has already been verified, the most
 * likely explanation for error here is a corrupt heap page (could be logical
 * or physical corruption). ...

Now I wonder whether we need to use ERRCODE_DATA_CORRUPTED in the 'heap all consistent' part. It is similar to the btree heapallindexed check: we have a structurally consistent index, but for some reason it is not consistent with the heap. It seems to me that it is impossible to say which of the two is to blame here, so I am leaving ERRCODE_INDEX_CORRUPTED for now.

I noticed that the fixes for the year and the error codes did not make it into the last version for some reason, so here is a new version with those fixes. I also changed the 'heap all consistent' error message: "Index %s is corrupted" -> "Index %s is not consistent with the heap". The new message is less misleading, as we don't know where the problem is.

Thanks!

[1] https://www.postgresql.org/message-id/CAH2-WzmVKiwcNrhYFH9CTLLcmQTMH_xjW%3DAvxfDKAftmY47QKw%40mail.gmail.com

Best regards,
Arseniy Mukhin
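To make the error-code question above concrete, here is a minimal standalone sketch. It is not code from the patch: the CheckFailure enum and both helper functions are hypothetical names invented for illustration, and the sketch does not use the PostgreSQL ereport() machinery. The only facts taken from PostgreSQL are the two SQLSTATE values (XX001 for ERRCODE_DATA_CORRUPTED, XX002 for ERRCODE_INDEX_CORRUPTED) and the message text quoted in this mail.

```c
#include <stdio.h>

/* SQLSTATE values that PostgreSQL assigns to the two error codes. */
#define SQLSTATE_DATA_CORRUPTED  "XX001"	/* ERRCODE_DATA_CORRUPTED */
#define SQLSTATE_INDEX_CORRUPTED "XX002"	/* ERRCODE_INDEX_CORRUPTED */

/* Hypothetical classification of amcheck failures (illustration only). */
typedef enum CheckFailure
{
	FAILURE_INDEX_STRUCTURE,		/* bad metapage, revmap item, index tuple */
	FAILURE_BTREE_HEAPALLINDEXED,	/* heap tuple missing from a verified btree */
	FAILURE_BRIN_HEAP_CONSISTENCY	/* BRIN index disagrees with the heap */
} CheckFailure;

/*
 * Map a failure kind to the SQLSTATE discussed in this thread: btree
 * heapallindexed blames the heap, while the BRIN 'heap all consistent'
 * check keeps the index error code for now.
 */
static const char *
failure_sqlstate(CheckFailure failure)
{
	switch (failure)
	{
		case FAILURE_BTREE_HEAPALLINDEXED:
			return SQLSTATE_DATA_CORRUPTED;
		case FAILURE_INDEX_STRUCTURE:
		case FAILURE_BRIN_HEAP_CONSISTENCY:
			return SQLSTATE_INDEX_CORRUPTED;
	}
	return SQLSTATE_INDEX_CORRUPTED;
}

/* Build the reworded message, which no longer claims the index is at fault. */
static int
format_heap_consistency_message(char *buf, size_t buflen, const char *index_name)
{
	return snprintf(buf, buflen, "Index %s is not consistent with the heap",
					index_name);
}
```

The point of the sketch is only that the SQLSTATE still names the index, while the message text stays neutral about whether the index or the heap is damaged.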
From 6cb2e5f42326d079645f47873b37ccd9549bc818 Mon Sep 17 00:00:00 2001
From: Arseniy Mukhin <arseniy.mukhin....@gmail.com>
Date: Mon, 16 Jun 2025 18:11:27 +0300
Subject: [PATCH v4 2/3] amcheck: brin_index_check() - index structure check

Adds a new function brin_index_check() for validating BRIN indexes.
It includes the following checks:

- meta page checks
- revmap pointers are valid and point to index tuples with the expected range blkno
- index tuples have the expected format
- some special checks for empty_ranges
- every index tuple has a corresponding revmap item that points to it (optional)
---
 contrib/amcheck/Makefile                |   5 +-
 contrib/amcheck/amcheck--1.5--1.6.sql   |  18 +
 contrib/amcheck/amcheck.control         |   2 +-
 contrib/amcheck/expected/check_brin.out | 134 ++++
 contrib/amcheck/meson.build             |   4 +
 contrib/amcheck/sql/check_brin.sql      | 102 +++
 contrib/amcheck/t/007_verify_brin.pl    | 291 +++++++++
 contrib/amcheck/verify_brin.c           | 807 ++++++++++++++++++++++++
 8 files changed, 1360 insertions(+), 3 deletions(-)
 create mode 100644 contrib/amcheck/amcheck--1.5--1.6.sql
 create mode 100644 contrib/amcheck/expected/check_brin.out
 create mode 100644 contrib/amcheck/sql/check_brin.sql
 create mode 100644 contrib/amcheck/t/007_verify_brin.pl
 create mode 100644 contrib/amcheck/verify_brin.c

diff --git a/contrib/amcheck/Makefile b/contrib/amcheck/Makefile
index 1b7a63cbaa4..bdfb274c89c 100644
--- a/contrib/amcheck/Makefile
+++ b/contrib/amcheck/Makefile
@@ -6,11 +6,12 @@ OBJS = \
 	verify_common.o \
 	verify_gin.o \
 	verify_heapam.o \
-	verify_nbtree.o
+	verify_nbtree.o \
+	verify_brin.o
 
 EXTENSION = amcheck
 DATA = amcheck--1.2--1.3.sql amcheck--1.1--1.2.sql amcheck--1.0--1.1.sql amcheck--1.0.sql \
-		amcheck--1.3--1.4.sql amcheck--1.4--1.5.sql
+		amcheck--1.3--1.4.sql amcheck--1.4--1.5.sql amcheck--1.5--1.6.sql
 PGFILEDESC = "amcheck - function for verifying relation integrity"
 
 REGRESS = check check_btree check_gin check_heap
diff --git a/contrib/amcheck/amcheck--1.5--1.6.sql
b/contrib/amcheck/amcheck--1.5--1.6.sql new file mode 100644 index 00000000000..9ec046bb1cf --- /dev/null +++ b/contrib/amcheck/amcheck--1.5--1.6.sql @@ -0,0 +1,18 @@ +/* contrib/amcheck/amcheck--1.5--1.6.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "ALTER EXTENSION amcheck UPDATE TO '1.6'" to load this file. \quit + + +-- +-- brin_index_check() +-- +CREATE FUNCTION brin_index_check(index regclass, + regular_pages_check boolean default false +) + RETURNS VOID +AS 'MODULE_PATHNAME', 'brin_index_check' +LANGUAGE C STRICT PARALLEL RESTRICTED; + +-- We don't want this to be available to public +REVOKE ALL ON FUNCTION brin_index_check(regclass, boolean) FROM PUBLIC; \ No newline at end of file diff --git a/contrib/amcheck/amcheck.control b/contrib/amcheck/amcheck.control index c8ba6d7c9bc..2f329ef2cf4 100644 --- a/contrib/amcheck/amcheck.control +++ b/contrib/amcheck/amcheck.control @@ -1,5 +1,5 @@ # amcheck extension comment = 'functions for verifying relation integrity' -default_version = '1.5' +default_version = '1.6' module_pathname = '$libdir/amcheck' relocatable = true diff --git a/contrib/amcheck/expected/check_brin.out b/contrib/amcheck/expected/check_brin.out new file mode 100644 index 00000000000..bebca93d32f --- /dev/null +++ b/contrib/amcheck/expected/check_brin.out @@ -0,0 +1,134 @@ +-- helper func +CREATE OR REPLACE FUNCTION random_string( INT ) RETURNS TEXT AS $$ +SELECT string_agg(substring('0123456789abcdefghijklmnopqrstuvwxyz', ceil(random() * 36)::INTEGER, 1), '') FROM generate_series(1, $1); +$$ LANGUAGE sql; +-- empty table index should be valid +CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10); +CREATE INDEX brintest_idx ON brintest USING BRIN (a); +SELECT brin_index_check('brintest_idx'::REGCLASS, true); + brin_index_check +------------------ + +(1 row) + +-- cleanup +DROP TABLE brintest; +-- multi attributes with varlena attribute test +CREATE TABLE brintest (id BIGSERIAL, a TEXT) WITH 
(FILLFACTOR = 10); +CREATE INDEX brintest_idx ON brintest USING BRIN (a TEXT_minmax_ops, id int8_minmax_ops) WITH (PAGES_PER_RANGE = 2); +INSERT INTO brintest (a) SELECT random_string((x % 100)) FROM generate_series(1,5000) x; +-- create some empty ranges +DELETE FROM brintest WHERE id > 2000 AND id < 4000; +SELECT brin_index_check('brintest_idx'::REGCLASS); + brin_index_check +------------------ + +(1 row) + +-- rebuild index +DROP INDEX brintest_idx; +CREATE INDEX brintest_idx ON brintest USING BRIN (a TEXT_minmax_ops) WITH (PAGES_PER_RANGE = 2); +SELECT brin_index_check('brintest_idx'::REGCLASS); + brin_index_check +------------------ + +(1 row) + +-- cleanup +DROP TABLE brintest; +-- min_max opclass +CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10); +CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES_PER_RANGE = 2); +INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; +-- create some empty ranges +DELETE FROM brintest WHERE a > 20000 AND a < 40000; +SELECT brin_index_check('brintest_idx'::REGCLASS, true); + brin_index_check +------------------ + +(1 row) + +-- rebuild index +DROP INDEX brintest_idx; +CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES_PER_RANGE = 2); +SELECT brin_index_check('brintest_idx'::REGCLASS, true); + brin_index_check +------------------ + +(1 row) + +-- cleanup +DROP TABLE brintest; +-- multi_min_max opclass +CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10); +CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH (PAGES_PER_RANGE = 2); +INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; +-- create some empty ranges +DELETE FROM brintest WHERE a > 20000 AND a < 40000; +SELECT brin_index_check('brintest_idx'::REGCLASS, true); + brin_index_check +------------------ + +(1 row) + +-- rebuild index +DROP INDEX brintest_idx; +CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH 
(PAGES_PER_RANGE = 2); +SELECT brin_index_check('brintest_idx'::REGCLASS, true); + brin_index_check +------------------ + +(1 row) + +-- cleanup +DROP TABLE brintest; +-- bloom opclass +CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10); +CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_PER_RANGE = 2); +INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; +-- create some empty ranges +DELETE FROM brintest WHERE a > 20000 AND a < 40000; +SELECT brin_index_check('brintest_idx'::REGCLASS, true); + brin_index_check +------------------ + +(1 row) + +-- rebuild index +DROP INDEX brintest_idx; +CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_PER_RANGE = 2); +SELECT brin_index_check('brintest_idx'::REGCLASS, true); + brin_index_check +------------------ + +(1 row) + +-- cleanup +DROP TABLE brintest; +-- inclusion opclass +CREATE TABLE brintest (id SERIAL PRIMARY KEY, a BOX); +CREATE INDEX brintest_idx ON brintest USING BRIN (a BOX_INCLUSION_OPS) WITH (PAGES_PER_RANGE = 2); +INSERT INTO brintest (a) +SELECT BOX(point(random() * 1000, random() * 1000), point(random() * 1000, random() * 1000)) +FROM generate_series(1, 10000); +-- create some empty ranges +DELETE FROM brintest WHERE id > 2000 AND id < 4000; +SELECT brin_index_check('brintest_idx'::REGCLASS, true); + brin_index_check +------------------ + +(1 row) + +-- rebuild index +DROP INDEX brintest_idx; +CREATE INDEX brintest_idx ON brintest USING BRIN (a BOX_INCLUSION_OPS) WITH (PAGES_PER_RANGE = 2); +SELECT brin_index_check('brintest_idx'::REGCLASS, true); + brin_index_check +------------------ + +(1 row) + +-- cleanup +DROP TABLE brintest; +-- cleanup +DROP FUNCTION random_string; diff --git a/contrib/amcheck/meson.build b/contrib/amcheck/meson.build index 1f0c347ed54..ba816c2faf0 100644 --- a/contrib/amcheck/meson.build +++ b/contrib/amcheck/meson.build @@ -5,6 +5,7 @@ amcheck_sources = files( 'verify_gin.c', 'verify_heapam.c', 
'verify_nbtree.c', + 'verify_brin.c' ) if host_system == 'windows' @@ -27,6 +28,7 @@ install_data( 'amcheck--1.2--1.3.sql', 'amcheck--1.3--1.4.sql', 'amcheck--1.4--1.5.sql', + 'amcheck--1.5--1.6.sql', kwargs: contrib_data_args, ) @@ -40,6 +42,7 @@ tests += { 'check_btree', 'check_gin', 'check_heap', + 'check_brin' ], }, 'tap': { @@ -50,6 +53,7 @@ tests += { 't/004_verify_nbtree_unique.pl', 't/005_pitr.pl', 't/006_verify_gin.pl', + 't/007_verify_brin.pl', ], }, } diff --git a/contrib/amcheck/sql/check_brin.sql b/contrib/amcheck/sql/check_brin.sql new file mode 100644 index 00000000000..0a5e26ea8f5 --- /dev/null +++ b/contrib/amcheck/sql/check_brin.sql @@ -0,0 +1,102 @@ +-- helper func +CREATE OR REPLACE FUNCTION random_string( INT ) RETURNS TEXT AS $$ +SELECT string_agg(substring('0123456789abcdefghijklmnopqrstuvwxyz', ceil(random() * 36)::INTEGER, 1), '') FROM generate_series(1, $1); +$$ LANGUAGE sql; + + +-- empty table index should be valid +CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10); +CREATE INDEX brintest_idx ON brintest USING BRIN (a); +SELECT brin_index_check('brintest_idx'::REGCLASS, true); +-- cleanup +DROP TABLE brintest; + + +-- multi attributes with varlena attribute test +CREATE TABLE brintest (id BIGSERIAL, a TEXT) WITH (FILLFACTOR = 10); +CREATE INDEX brintest_idx ON brintest USING BRIN (a TEXT_minmax_ops, id int8_minmax_ops) WITH (PAGES_PER_RANGE = 2); +INSERT INTO brintest (a) SELECT random_string((x % 100)) FROM generate_series(1,5000) x; +-- create some empty ranges +DELETE FROM brintest WHERE id > 2000 AND id < 4000; +SELECT brin_index_check('brintest_idx'::REGCLASS); + +-- rebuild index +DROP INDEX brintest_idx; +CREATE INDEX brintest_idx ON brintest USING BRIN (a TEXT_minmax_ops) WITH (PAGES_PER_RANGE = 2); +SELECT brin_index_check('brintest_idx'::REGCLASS); +-- cleanup +DROP TABLE brintest; + + + +-- min_max opclass +CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10); +CREATE INDEX brintest_idx ON brintest USING BRIN (a 
int8_minmax_ops) WITH (PAGES_PER_RANGE = 2); +INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; +-- create some empty ranges +DELETE FROM brintest WHERE a > 20000 AND a < 40000; +SELECT brin_index_check('brintest_idx'::REGCLASS, true); + +-- rebuild index +DROP INDEX brintest_idx; +CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES_PER_RANGE = 2); +SELECT brin_index_check('brintest_idx'::REGCLASS, true); +-- cleanup +DROP TABLE brintest; + + + +-- multi_min_max opclass +CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10); +CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH (PAGES_PER_RANGE = 2); +INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; +-- create some empty ranges +DELETE FROM brintest WHERE a > 20000 AND a < 40000; +SELECT brin_index_check('brintest_idx'::REGCLASS, true); + +-- rebuild index +DROP INDEX brintest_idx; +CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH (PAGES_PER_RANGE = 2); +SELECT brin_index_check('brintest_idx'::REGCLASS, true); +-- cleanup +DROP TABLE brintest; + + + +-- bloom opclass +CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10); +CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_PER_RANGE = 2); +INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; +-- create some empty ranges +DELETE FROM brintest WHERE a > 20000 AND a < 40000; +SELECT brin_index_check('brintest_idx'::REGCLASS, true); + +-- rebuild index +DROP INDEX brintest_idx; +CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_PER_RANGE = 2); +SELECT brin_index_check('brintest_idx'::REGCLASS, true); +-- cleanup +DROP TABLE brintest; + + +-- inclusion opclass +CREATE TABLE brintest (id SERIAL PRIMARY KEY, a BOX); +CREATE INDEX brintest_idx ON brintest USING BRIN (a BOX_INCLUSION_OPS) WITH (PAGES_PER_RANGE = 2); +INSERT INTO brintest (a) +SELECT BOX(point(random() * 1000, 
random() * 1000), point(random() * 1000, random() * 1000)) +FROM generate_series(1, 10000); +-- create some empty ranges +DELETE FROM brintest WHERE id > 2000 AND id < 4000; + +SELECT brin_index_check('brintest_idx'::REGCLASS, true); + +-- rebuild index +DROP INDEX brintest_idx; +CREATE INDEX brintest_idx ON brintest USING BRIN (a BOX_INCLUSION_OPS) WITH (PAGES_PER_RANGE = 2); +SELECT brin_index_check('brintest_idx'::REGCLASS, true); +-- cleanup +DROP TABLE brintest; + + +-- cleanup +DROP FUNCTION random_string; \ No newline at end of file diff --git a/contrib/amcheck/t/007_verify_brin.pl b/contrib/amcheck/t/007_verify_brin.pl new file mode 100644 index 00000000000..2c62b76cc70 --- /dev/null +++ b/contrib/amcheck/t/007_verify_brin.pl @@ -0,0 +1,291 @@ +# Copyright (c) 2021-2025, PostgreSQL Global Development Group + +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; + +use Test::More; + +my $node; +my $blksize; +my $meta_page_blkno = 0; + +# +# Test set-up +# +$node = PostgreSQL::Test::Cluster->new('test'); +$node->init(no_data_checksums => 1); +$node->append_conf('postgresql.conf', 'autovacuum=off'); +$node->start; +$blksize = int($node->safe_psql('postgres', 'SHOW block_size;')); +$node->safe_psql('postgres', q(CREATE EXTENSION amcheck)); + +# Tests +my @tests = ( + { + # invalid meta page type + + find => pack('S', 0xF091), + replace => pack('S', 0xAAAA), + blkno => $meta_page_blkno, + expected => wrap('metapage is corrupted') + }, + { + # invalid meta page magic word + + find => pack('L', 0xA8109CFA), + replace => pack('L', 0xBB109CFB), + blkno => $meta_page_blkno, + expected => wrap('metapage is corrupted'), + }, + { + # invalid meta page index version + + find => pack('L*', 0xA8109CFA, 1), + replace => pack('L*', 0xA8109CFA, 2), + blkno => $meta_page_blkno, + expected => wrap('metapage is corrupted') + }, + { + # pages_per_range below lower limit + + find => pack('L*', 0xA8109CFA, 1, 128), + replace => 
pack('L*', 0xA8109CFA, 1, 0), + blkno => $meta_page_blkno, + expected => wrap('metapage is corrupted') + }, + { + # pages_per_range above upper limit + + find => pack('L*', 0xA8109CFA, 1, 128), + replace => pack('L*', 0xA8109CFA, 1, 131073), + blkno => $meta_page_blkno, + expected => wrap('metapage is corrupted') + }, + { + # last_revmap_page below lower limit + + find => pack('L*', 0xA8109CFA, 1, 128, 1), + replace => pack('L*', 0xA8109CFA, 1, 128, 0), + blkno => $meta_page_blkno, + expected => wrap('metapage is corrupted'), + }, + { + + # last_revmap_page beyond index relation size + + find => pack('L*', 0xA8109CFA, 1, 128, 1), + replace => pack('L*', 0xA8109CFA, 1, 128, 100), + blkno => $meta_page_blkno, + expected => wrap('metapage is corrupted'), + }, + { + # invalid revmap page type + + find => pack('S', 0xF092), + replace => pack('S', 0xAAAA), + blkno => 1, # revmap page + expected => wrap('revmap page is expected at block 1, last revmap page 1'), + }, + { + # revmap item points beyond index relation size + # replace (2,1) with (100,1) + + find => pack('S*', 0, 2, 1), + replace => pack('S*', 0, 100, 1), + blkno => 1, # revmap page + expected => wrap('revmap item points to a non existing block 100, ' + . 'index max block 2. Range blkno: 0, revmap item: (1,0)') + }, + { + # invalid regular page type + + find => pack('S', 0xF093), + replace => pack('S', 0xAAAA), + blkno => 2, # regular page + expected => wrap('revmap item points to the page which is not regular (blkno: 2). ' + . 'Range blkno: 0, revmap item: (1,0)') + }, + { + # revmap item points beyond regular page max offset + # replace (2,1) with (2,2) + + find => pack('S*', 0, 2, 1), + replace => pack('S*', 0, 2, 2), + blkno => 1, # revmap page + expected => wrap('revmap item offset number 2 is greater than regular page 2 max offset 1. ' + . 
'Range blkno: 0, revmap item: (1,0)') + }, + { + # invalid index tuple range blkno + + find => pack('LCC', 0, 0xA8, 0x01), + replace => pack('LCC', 1, 0xA8, 0x01), + blkno => 2, # regular page + expected => wrap('index tuple has invalid blkno 1. Range blkno: 0, revmap item: (1,0), index tuple: (2,1)') + }, + { + # range beyond the table size and is not empty + + find => pack('LCC', 0, 0xA8, 0x01), + replace => pack('LCC', 0, 0x88, 0x01), + blkno => 2, # regular page + expected => wrap('the range is beyond the table size, but is not marked as empty, table size: 0 blocks. ' + . 'Range blkno: 0, revmap item: (1,0), index tuple: (2,1)') + }, + { + # corrupt index tuple data offset + # here 0x00, 0x00, 0x00 is padding and '.' is varlena len byte + + find => pack('LCCCC', 0, 0x08, 0x00, 0x00, 0x00) . '(.)' . 'aaaaa', + replace => pack('LCCCC', 0, 0x1F, 0x00, 0x00, 0x00) . '$1' . 'aaaaa', + blkno => 2, # regular page + table_data => sub { + my ($test_struct) = @_; + return qq(INSERT INTO $test_struct->{table_name} (a) VALUES ('aaaaa');); + }, + expected => qr/index tuple header length 31 is greater than tuple len ..\. \QRange blkno: 0, revmap item: (1,0), index tuple: (2,1)\E/ + }, + { + # empty range index tuple doesn't have null bitmap + + find => pack('LCC', 0, 0xA8, 0x01), + replace => pack('LCC', 0, 0x28, 0x01), + blkno => 2, # regular page + expected => wrap('empty range index tuple doesn\'t have null bitmap. ' + . 'Range blkno: 0, revmap item: (1,0), index tuple: (2,1)') + }, + { + # empty range index tuple all_nulls -> false + + find => pack('LCC', 0, 0xA8, 0x01), + replace => pack('LCC', 0, 0xA8, 0x00), + blkno => 2, # regular page + expected => wrap('empty range index tuple attribute 0 with allnulls is false. ' + . 
'Range blkno: 0, revmap item: (1,0), index tuple: (2,1)') + }, + { + # empty range index tuple has_nulls -> true + + find => pack('LCC', 0, 0xA8, 0x01), + replace => pack('LCC', 0, 0xA8, 0x03), + blkno => 2, # regular page + expected => wrap('empty range index tuple attribute 0 with hasnulls is true. ' + . 'Range blkno: 0, revmap item: (1,0), index tuple: (2,1)') + }, + { + # invalid index tuple data + # replace varlena len with FF - should work with any endianness + + find => pack('LCCCC', 0, 0x08, 0x00, 0x00, 0x00) . '.' . 'aaaaa', + replace => pack('LCCCCC', 0, 0x08, 0x00, 0x00, 0x00, 0xFF) . 'aaaaa', + blkno => 2, # regular page + table_data => sub { + my ($test_struct) = @_; + return qq(INSERT INTO $test_struct->{table_name} (a) VALUES ('aaaaa');); + }, + expected => qr/attribute 0 stored value 0 with length -1 ends at offset 127 beyond total tuple length ..\.\Q Range blkno: 0, revmap item: (1,0), index tuple: (2,1)\E/ + }, + { + # orphan index tuple + # replace valid revmap item with (0,0) + + find => pack('S*', 0, 2, 1), + replace => pack('S*', 0, 0, 0), + blkno => 1, # revmap page + table_data => sub { + my ($test_struct) = @_; + return qq(INSERT INTO $test_struct->{table_name} (a) VALUES ('aaaaa');); + }, + expected => wrap("revmap doesn't point to index tuple. Range blkno: 0, revmap item: (1,0), index tuple: (2,1)") + } +); + + +# init test data +my $i = 1; +foreach my $test_struct (@tests) { + + $test_struct->{table_name} = 't' . $i++; + $test_struct->{index_name} = $test_struct->{table_name} . 
'_brin_idx'; + + my $test_data_sql = ''; + if (exists $test_struct->{table_data}) { + $test_data_sql = $test_struct->{table_data}->($test_struct); + } + + $node->safe_psql('postgres', qq( + CREATE TABLE $test_struct->{table_name} (a TEXT); + $test_data_sql + CREATE INDEX $test_struct->{index_name} ON $test_struct->{table_name} USING BRIN (a); + )); + + $test_struct->{relpath} = relation_filepath($test_struct->{index_name}); +} + +# corrupt index +$node->stop; + +foreach my $test_struct (@tests) { + string_replace_block( + $test_struct->{relpath}, + $test_struct->{find}, + $test_struct->{replace}, + $test_struct->{blkno} + ); +} + +# assertions +$node->start; + +foreach my $test_struct (@tests) { + my ($result, $stdout, $stderr) = $node->psql('postgres', qq(SELECT brin_index_check('$test_struct->{index_name}', true))); + like($stderr, $test_struct->{expected}); +} + + +# Helpers + +# Returns the filesystem path for the named relation. +sub relation_filepath { + my ($relname) = @_; + + my $pgdata = $node->data_dir; + my $rel = $node->safe_psql('postgres', + qq(SELECT pg_relation_filepath('$relname'))); + die "path not found for relation $relname" unless defined $rel; + return "$pgdata/$rel"; +} + +sub string_replace_block { + my ($filename, $find, $replace, $blkno) = @_; + + my $fh; + open($fh, '+<', $filename) or BAIL_OUT("open failed: $!"); + binmode $fh; + + my $offset = $blkno * $blksize; + my $buffer; + + sysseek($fh, $offset, 0) or BAIL_OUT("seek failed: $!"); + sysread($fh, $buffer, $blksize) or BAIL_OUT("read failed: $!"); + + $buffer =~ s/$find/'"' . $replace . 
'"'/gee; + + sysseek($fh, $offset, 0) or BAIL_OUT("seek failed: $!"); + syswrite($fh, $buffer) or BAIL_OUT("write failed: $!"); + + close($fh) or BAIL_OUT("close failed: $!"); + + return; +} + +sub wrap +{ + my ($input) = @_; + return qr/\Q$input\E/; +} + +done_testing(); \ No newline at end of file diff --git a/contrib/amcheck/verify_brin.c b/contrib/amcheck/verify_brin.c new file mode 100644 index 00000000000..9d5a1caebf6 --- /dev/null +++ b/contrib/amcheck/verify_brin.c @@ -0,0 +1,807 @@ +/*------------------------------------------------------------------------- + * + * verify_brin.c + * Functions to check postgresql brin indexes for corruption + * + * Copyright (c) 2016-2025, PostgreSQL Global Development Group + * + * contrib/amcheck/verify_brin.c + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/htup_details.h" +#include "access/table.h" +#include "access/tableam.h" +#include "access/transam.h" +#include "access/brin.h" +#include "catalog/index.h" +#include "catalog/pg_am_d.h" +#include "catalog/pg_operator.h" +#include "miscadmin.h" +#include "storage/lmgr.h" +#include "storage/smgr.h" +#include "utils/guc.h" +#include "utils/memutils.h" +#include "access/brin_page.h" +#include "access/brin_revmap.h" +#include "utils/lsyscache.h" +#include "verify_common.h" +#include "utils/builtins.h" +#include "utils/array.h" + + +PG_FUNCTION_INFO_V1(brin_index_check); + +typedef struct BrinCheckState +{ + + /* Check arguments */ + + bool regular_pages_check; + + /* BRIN check common fields */ + + Relation idxrel; + Relation heaprel; + BrinDesc *bdesc; + int natts; + BlockNumber pagesPerRange; + + /* Index structure check fields */ + + BufferAccessStrategy checkstrategy; + BlockNumber idxnblocks; + BlockNumber heapnblocks; + BlockNumber lastRevmapPage; + /* Current range blkno */ + BlockNumber rangeBlkno; + /* Current revmap item */ + BlockNumber revmapBlk; + Buffer revmapbuf; + Page revmappage; + 
uint32 revmapidx; + /* Current index tuple */ + BlockNumber regpageBlk; + Buffer regpagebuf; + Page regpage; + OffsetNumber regpageoffset; + +} BrinCheckState; + +static void brin_check(Relation idxrel, Relation heaprel, void *callback_state, bool readonly); + +static void check_brin_index_structure(BrinCheckState * pState); + +static void check_meta(BrinCheckState * state); + +static void check_revmap(BrinCheckState * state); + +static void check_revmap_item(BrinCheckState * state); + +static void check_index_tuple(BrinCheckState * state, BrinTuple *tuple, ItemId lp); + +static void check_regular_pages(BrinCheckState * state); + +static bool revmap_points_to_index_tuple(BrinCheckState * state); + +static ItemId PageGetItemIdCareful(BrinCheckState * state); + +static void brin_check_ereport(BrinCheckState * state, const char *fmt); + +static void revmap_item_ereport(BrinCheckState * state, const char *fmt); + +static void index_tuple_ereport(BrinCheckState * state, const char *fmt); + +static void index_tuple_only_ereport(BrinCheckState * state, const char *fmt); + + +Datum +brin_index_check(PG_FUNCTION_ARGS) +{ + Oid indrelid = PG_GETARG_OID(0); + BrinCheckState *state = palloc0(sizeof(BrinCheckState)); + + state->regular_pages_check = PG_GETARG_BOOL(1); + + amcheck_lock_relation_and_check(indrelid, + BRIN_AM_OID, + brin_check, + ShareUpdateExclusiveLock, + state); + + PG_RETURN_VOID(); +} + +/* + * Main check function + */ +static void +brin_check(Relation idxrel, Relation heaprel, void *callback_state, bool readonly) +{ + BrinCheckState *state = (BrinCheckState *) callback_state; + + /* Initialize check common fields */ + state->idxrel = idxrel; + state->heaprel = heaprel; + state->bdesc = brin_build_desc(idxrel); + state->natts = state->bdesc->bd_tupdesc->natts; + + + check_brin_index_structure(state); + + + brin_free_desc(state->bdesc); +} + +/* + * Check that index has expected structure + * + * Some check expectations: + * - we hold ShareUpdateExclusiveLock, 
so revmap could not be extended (i.e. no evacuation) while check as well as + * all regular pages should stay regular and ranges could not be summarized and desummarized. + * Nevertheless, concurrent updates could lead to new regular page allocations + * and moving of index tuples. + * - if revmap pointer is valid there should be valid index tuple it points to. + * - there are no orphan index tuples (if there is an index tuple, the revmap item points to this tuple also must exist) + * - it's possible to encounter placeholder tuples (as a result of crash) + * - it's possible to encounter new pages instead of regular (as a result of crash) + * - it's possible to encounter pages with evacuation bit (as a result of crash) + * + */ +static void +check_brin_index_structure(BrinCheckState * state) +{ + /* Index structure check fields initialization */ + state->checkstrategy = GetAccessStrategy(BAS_BULKREAD); + + check_meta(state); + + /* Check revmap first, blocks: [1, lastRevmapPage] */ + check_revmap(state); + + /* Check regular pages, blocks: [lastRevmapPage + 1, idxnblocks] */ + check_regular_pages(state); +} + +/* Meta page check and save some data for the further check */ +static void +check_meta(BrinCheckState * state) +{ + Buffer metabuf; + Page metapage; + BrinMetaPageData *metadata; + + /* Meta page check */ + metabuf = ReadBufferExtended(state->idxrel, MAIN_FORKNUM, BRIN_METAPAGE_BLKNO, RBM_NORMAL, + state->checkstrategy); + LockBuffer(metabuf, BUFFER_LOCK_SHARE); + metapage = BufferGetPage(metabuf); + metadata = (BrinMetaPageData *) PageGetContents(metapage); + state->idxnblocks = RelationGetNumberOfBlocks(state->idxrel); + + + if (!BRIN_IS_META_PAGE(metapage) || + metadata->brinMagic != BRIN_META_MAGIC || + metadata->brinVersion != BRIN_CURRENT_VERSION || + metadata->pagesPerRange < 1 || metadata->pagesPerRange > BRIN_MAX_PAGES_PER_RANGE || + metadata->lastRevmapPage <= BRIN_METAPAGE_BLKNO || metadata->lastRevmapPage >= state->idxnblocks) + { + 
brin_check_ereport(state, "metapage is corrupted"); + } + + state->lastRevmapPage = metadata->lastRevmapPage; + state->pagesPerRange = metadata->pagesPerRange; + UnlockReleaseBuffer(metabuf); +} + +/* + * Walk revmap page by page from the beginning and check every revmap item. + * Also check that all pages within [1, lastRevmapPage] are revmap pages. + */ +static void +check_revmap(BrinCheckState * state) +{ + Relation idxrel = state->idxrel; + BlockNumber lastRevmapPage = state->lastRevmapPage; + + state->rangeBlkno = 0; + state->regpagebuf = InvalidBuffer; + state->heapnblocks = RelationGetNumberOfBlocks(state->heaprel); + + /* Walk each revmap page */ + for (state->revmapBlk = BRIN_METAPAGE_BLKNO + 1; state->revmapBlk <= lastRevmapPage; state->revmapBlk++) + { + + state->revmapbuf = ReadBufferExtended(idxrel, MAIN_FORKNUM, state->revmapBlk, RBM_NORMAL, + state->checkstrategy); + LockBuffer(state->revmapbuf, BUFFER_LOCK_SHARE); + state->revmappage = BufferGetPage(state->revmapbuf); + + /* + * Pages with block numbers in [1, lastRevmapPage] should be revmap + * pages + */ + if (!BRIN_IS_REVMAP_PAGE(state->revmappage)) + { + brin_check_ereport(state, psprintf("revmap page is expected at block %u, last revmap page %u", + state->revmapBlk, + lastRevmapPage)); + } + LockBuffer(state->revmapbuf, BUFFER_LOCK_UNLOCK); + + /* Walk and check all brin tuples from the current revmap page */ + state->revmapidx = 0; + while (state->revmapidx < REVMAP_PAGE_MAXITEMS) + { + CHECK_FOR_INTERRUPTS(); + + /* Check revmap item */ + check_revmap_item(state); + + state->rangeBlkno += state->pagesPerRange; + state->revmapidx++; + } + + elog(DEBUG3, "Complete revmap page check: %u", state->revmapBlk); + + ReleaseBuffer(state->revmapbuf); + } + + if (BufferIsValid(state->regpagebuf)) + { + ReleaseBuffer(state->regpagebuf); + } +} + +/* + * Check revmap item. + * + * We check revmap item pointer itself and if it is ok we check the index tuple it points to. 
+ * + * To avoid deadlock we need to unlock revmap page before locking regular page, + * so when we get the lock on the regular page our index tuple pointer may no longer be relevant. + * So for some checks before reporting an error we need to make sure that our pointer is still relevant and if it's not - retry. + */ +static void +check_revmap_item(BrinCheckState * state) +{ + ItemPointerData *revmaptids; + RevmapContents *contents; + ItemPointerData *iptr; + ItemId lp; + BrinTuple *tup; + Relation idxrel = state->idxrel; + + /* Loop to retry revmap item check if there was a concurrent update. */ + for (;;) + { + LockBuffer(state->revmapbuf, BUFFER_LOCK_SHARE); + + contents = (RevmapContents *) PageGetContents(BufferGetPage(state->revmapbuf)); + revmaptids = contents->rm_tids; + /* Pointer for the range with start at state->rangeBlkno */ + iptr = revmaptids + state->revmapidx; + + /* At first check revmap item pointer */ + + /* + * Tuple pointer is invalid means range isn't summarized, just move + * further + */ + if (!ItemPointerIsValid(iptr)) + { + elog(DEBUG3, "Range %u is not summarized", state->rangeBlkno); + LockBuffer(state->revmapbuf, BUFFER_LOCK_UNLOCK); + break; + } + + /* + * Pointer is valid, it should points to index tuple for the range + * with blkno rangeBlkno. Remember it and unlock revmap page to avoid + * deadlock + */ + state->regpageBlk = ItemPointerGetBlockNumber(iptr); + state->regpageoffset = ItemPointerGetOffsetNumber(iptr); + + LockBuffer(state->revmapbuf, BUFFER_LOCK_UNLOCK); + + /* + * Check if the regpage block number is greater than the relation + * size. 
To avoid fetching the number of blocks for each tuple, use + * cached value first + */ + if (state->regpageBlk >= state->idxnblocks) + { + /* + * Regular pages may have been added, so refresh idxnblocks and + * recheck + */ + state->idxnblocks = RelationGetNumberOfBlocks(idxrel); + if (state->regpageBlk >= state->idxnblocks) + { + revmap_item_ereport(state, + psprintf("revmap item points to a non existing block %u, index max block %u", + state->regpageBlk, + state->idxnblocks - 1)); + } + } + + /* + * To avoid some pin/unpin cycles we cache last used regular page. + * Check if we need different regular page and fetch it. + */ + if (!BufferIsValid(state->regpagebuf) || BufferGetBlockNumber(state->regpagebuf) != state->regpageBlk) + { + if (BufferIsValid(state->regpagebuf)) + { + ReleaseBuffer(state->regpagebuf); + } + state->regpagebuf = ReadBufferExtended(idxrel, MAIN_FORKNUM, state->regpageBlk, RBM_NORMAL, + state->checkstrategy); + } + + LockBuffer(state->regpagebuf, BUFFER_LOCK_SHARE); + state->regpage = BufferGetPage(state->regpagebuf); + + /* Revmap should always point to a regular page */ + if (!BRIN_IS_REGULAR_PAGE(state->regpage)) + { + revmap_item_ereport(state, + psprintf("revmap item points to the page which is not regular (blkno: %u)", + state->regpageBlk)); + + } + + /* Check item offset is valid */ + if (state->regpageoffset > PageGetMaxOffsetNumber(state->regpage)) + { + + /* If concurrent update moved our tuple we need to retry */ + if (!revmap_points_to_index_tuple(state)) + { + LockBuffer(state->regpagebuf, BUFFER_LOCK_UNLOCK); + continue; + } + + revmap_item_ereport(state, + psprintf("revmap item offset number %u is greater than regular page %u max offset %u", + state->regpageoffset, + state->regpageBlk, + PageGetMaxOffsetNumber(state->regpage))); + } + + elog(DEBUG3, "Process range: %u, iptr: (%u,%u)", state->rangeBlkno, state->regpageBlk, state->regpageoffset); + + /* + * Revmap pointer is OK. 
It points to existing regular page, offset + * also is ok. Let's check index tuple it points to. + */ + + lp = PageGetItemIdCareful(state); + + /* Revmap should point to NORMAL tuples only */ + if (!ItemIdIsUsed(lp)) + { + + /* If concurrent update moved our tuple we need to retry */ + if (!revmap_points_to_index_tuple(state)) + { + LockBuffer(state->regpagebuf, BUFFER_LOCK_UNLOCK); + continue; + } + + index_tuple_ereport(state, "revmap item points to unused index tuple"); + } + + + tup = (BrinTuple *) PageGetItem(state->regpage, lp); + + /* Check if range block number is as expected */ + if (tup->bt_blkno != state->rangeBlkno) + { + + /* If concurrent update moved our tuple we need to retry */ + if (!revmap_points_to_index_tuple(state)) + { + LockBuffer(state->regpagebuf, BUFFER_LOCK_UNLOCK); + continue; + } + + index_tuple_ereport(state, psprintf("index tuple has invalid blkno %u", tup->bt_blkno)); + } + + /* + * If the range is beyond the table size - the range must be empty. + * It's valid situation for empty table now. + */ + if (state->rangeBlkno >= state->heapnblocks) + { + if (!BrinTupleIsEmptyRange(tup)) + { + index_tuple_ereport(state, + psprintf("the range is beyond the table size, " + "but is not marked as empty, table size: %u blocks", + state->heapnblocks)); + } + } + + /* Check index tuple itself */ + check_index_tuple(state, tup, lp); + + LockBuffer(state->regpagebuf, BUFFER_LOCK_UNLOCK); + break; + } +} + +/* + * Check that index tuple has expected structure. + * + * This function follows the logic performed by brin_deform_tuple(). + * After this check is complete we are sure that brin_deform_tuple can process it. + * + * In case of empty range check that for all attributes allnulls are true, hasnulls are false and + * there is no data. All core opclasses expect allnulls is true for empty range. 
+ */ +static void +check_index_tuple(BrinCheckState * state, BrinTuple *tuple, ItemId lp) +{ + + char *tp; /* tuple data */ + uint16 off; + bits8 *nullbits; + TupleDesc disktdesc; + int stored; + bool empty_range = BrinTupleIsEmptyRange(tuple); + bool hasnullbitmap = BrinTupleHasNulls(tuple); + uint8 hoff = BrinTupleDataOffset(tuple); + uint16 tuplen = ItemIdGetLength(lp); + + + /* Check that header length is not greater than tuple length */ + if (hoff > tuplen) + { + index_tuple_ereport(state, psprintf("index tuple header length %u is greater than tuple len %u", hoff, tuplen)); + } + + /* If tuple has null bitmap - initialize it */ + if (hasnullbitmap) + { + nullbits = (bits8 *) ((char *) tuple + SizeOfBrinTuple); + } + else + { + nullbits = NULL; + } + + /* Empty range index tuple checks */ + if (empty_range) + { + /* Empty range tuple should have null bitmap */ + if (!hasnullbitmap) + { + index_tuple_ereport(state, "empty range index tuple doesn't have null bitmap"); + } + + Assert(nullbits != NULL); + + /* Check every attribute has allnulls is true and hasnulls is false */ + for (int attindex = 0; attindex < state->natts; ++attindex) + { + + /* Attribute allnulls should be true for empty range */ + if (att_isnull(attindex, nullbits)) + { + index_tuple_ereport(state, + psprintf("empty range index tuple attribute %d with allnulls is false", + attindex)); + } + + /* Attribute hasnulls should be false for empty range */ + if (!att_isnull(state->natts + attindex, nullbits)) + { + index_tuple_ereport(state, + psprintf("empty range index tuple attribute %d with hasnulls is true", + attindex)); + } + } + + /* We are done with empty range tuple */ + return; + } + + /* + * Range is marked as not empty so we can have some data in the tuple. 
+ * Walk all attributes and checks that all stored values fit into the + * tuple + */ + + tp = (char *) tuple + BrinTupleDataOffset(tuple); + stored = 0; + off = 0; + + disktdesc = brtuple_disk_tupdesc(state->bdesc); + + for (int attindex = 0; attindex < state->natts; ++attindex) + { + BrinOpcInfo *opclass = state->bdesc->bd_info[attindex]; + + /* + * if allnulls is set we have no data for this attribute, move to the + * next + */ + if (hasnullbitmap && !att_isnull(attindex, nullbits)) + { + stored += opclass->oi_nstored; + continue; + } + + /* Walk all stored values for the current attribute */ + for (int datumno = 0; datumno < opclass->oi_nstored; datumno++) + { + CompactAttribute *thisatt = TupleDescCompactAttr(disktdesc, stored); + + if (thisatt->attlen == -1) + { + off = att_pointer_alignby(off, + thisatt->attalignby, + -1, + tp + off); + } + else + { + off = att_nominal_alignby(off, thisatt->attalignby); + } + + /* Check that we are still in the tuple */ + if (hoff + off > tuplen) + { + index_tuple_ereport(state, + psprintf("attribute %u stored value %u with length %d " + "starts at offset %u beyond total tuple length %u", + attindex, datumno, thisatt->attlen, off, tuplen)); + } + + off = att_addlength_pointer(off, thisatt->attlen, tp + off); + + /* Check that we are still in the tuple */ + if (hoff + off > tuplen) + { + index_tuple_ereport(state, + psprintf("attribute %u stored value %u with length %d " + "ends at offset %u beyond total tuple length %u", + attindex, datumno, thisatt->attlen, off, tuplen)); + } + stored++; + } + + } + +} + +/* + * Check all pages within the range [lastRevmapPage + 1, indexnblocks] are regular pages or new + * and there is a pointer in revmap to each NORMAL index tuple. 
+ */ +static void +check_regular_pages(BrinCheckState * state) +{ + if (!state->regular_pages_check) + { + return; + } + + /* reset state */ + state->revmapBlk = InvalidBlockNumber; + state->revmapbuf = InvalidBuffer; + state->revmapidx = -1; + state->regpageBlk = InvalidBlockNumber; + state->regpagebuf = InvalidBuffer; + state->regpageoffset = InvalidOffsetNumber; + state->idxnblocks = RelationGetNumberOfBlocks(state->idxrel); + + for (state->regpageBlk = state->lastRevmapPage + 1; state->regpageBlk < state->idxnblocks; state->regpageBlk++) + { + OffsetNumber maxoff; + + state->regpagebuf = ReadBufferExtended(state->idxrel, MAIN_FORKNUM, state->regpageBlk, RBM_NORMAL, + state->checkstrategy); + LockBuffer(state->regpagebuf, BUFFER_LOCK_SHARE); + state->regpage = BufferGetPage(state->regpagebuf); + + /* Skip new pages */ + if (PageIsNew(state->regpage)) + { + UnlockReleaseBuffer(state->regpagebuf); + continue; + } + + if (!BRIN_IS_REGULAR_PAGE(state->regpage)) + { + brin_check_ereport(state, psprintf("expected new or regular page at block %u", state->regpageBlk)); + } + + /* Check that all NORMAL index tuples within the page are not orphans */ + maxoff = PageGetMaxOffsetNumber(state->regpage); + for (state->regpageoffset = FirstOffsetNumber; state->regpageoffset <= maxoff; state->regpageoffset++) + { + ItemId lp; + BrinTuple *tup; + BlockNumber revmapBlk; + + lp = PageGetItemIdCareful(state); + + if (ItemIdIsUsed(lp)) + { + tup = (BrinTuple *) PageGetItem(state->regpage, lp); + + /* Get revmap block number for index tuple blkno */ + revmapBlk = ((tup->bt_blkno / state->pagesPerRange) / REVMAP_PAGE_MAXITEMS) + 1; + if (revmapBlk > state->lastRevmapPage) + { + index_tuple_only_ereport(state, psprintf("no revmap page for the index tuple with blkno %u", + tup->bt_blkno)); + } + + /* Fetch another revmap page if needed */ + if (state->revmapBlk != revmapBlk) + { + if (BlockNumberIsValid(state->revmapBlk)) + { + ReleaseBuffer(state->revmapbuf); + } + state->revmapBlk = 
revmapBlk; + state->revmapbuf = ReadBufferExtended(state->idxrel, MAIN_FORKNUM, state->revmapBlk, RBM_NORMAL, + state->checkstrategy); + } + + state->revmapidx = (tup->bt_blkno / state->pagesPerRange) % REVMAP_PAGE_MAXITEMS; + state->rangeBlkno = tup->bt_blkno; + + /* check that revmap item points to index tuple */ + if (!revmap_points_to_index_tuple(state)) + { + index_tuple_ereport(state, psprintf("revmap doesn't point to index tuple")); + } + + } + } + + UnlockReleaseBuffer(state->regpagebuf); + } + + if (state->revmapbuf != InvalidBuffer) + { + ReleaseBuffer(state->revmapbuf); + } +} + +/* + * Check if the revmap item points to the index tuple (regpageBlk, regpageoffset). + * We have locked reg page, and lock revmap page here. + * It's a valid lock ordering, so no deadlock is possible. + */ +static bool +revmap_points_to_index_tuple(BrinCheckState * state) +{ + ItemPointerData *revmaptids; + RevmapContents *contents; + ItemPointerData *tid; + bool points; + + LockBuffer(state->revmapbuf, BUFFER_LOCK_SHARE); + contents = (RevmapContents *) PageGetContents(BufferGetPage(state->revmapbuf)); + revmaptids = contents->rm_tids; + tid = revmaptids + state->revmapidx; + + points = ItemPointerGetBlockNumberNoCheck(tid) == state->regpageBlk && + ItemPointerGetOffsetNumberNoCheck(tid) == state->regpageoffset; + + LockBuffer(state->revmapbuf, BUFFER_LOCK_UNLOCK); + return points; +} + +/* + * PageGetItemId() wrapper that validates returned line pointer. + * + * itemId in brin index could be UNUSED or NORMAL. + */ +static ItemId +PageGetItemIdCareful(BrinCheckState * state) +{ + Page page = state->regpage; + OffsetNumber offset = state->regpageoffset; + ItemId itemid = PageGetItemId(page, offset); + + if (ItemIdGetOffset(itemid) + ItemIdGetLength(itemid) > + BLCKSZ - MAXALIGN(sizeof(BrinSpecialSpace))) + index_tuple_ereport(state, + psprintf("line pointer points past end of tuple space in index. 
" + "lp_off=%u, lp_len=%u lp_flags=%u", + ItemIdGetOffset(itemid), + ItemIdGetLength(itemid), + ItemIdGetFlags(itemid) + ) + ); + + /* Verify that line pointer is LP_NORMAL or LP_UNUSED */ + if (!((ItemIdIsNormal(itemid) && ItemIdHasStorage(itemid)) || + (!ItemIdIsUsed(itemid) && !ItemIdHasStorage(itemid)))) + { + index_tuple_ereport(state, + psprintf("invalid line pointer storage in index. " + "lp_off=%u, lp_len=%u lp_flags=%u", + ItemIdGetOffset(itemid), + ItemIdGetLength(itemid), + ItemIdGetFlags(itemid) + )); + } + + return itemid; +} + + +/* Report without any additional info */ +static void +brin_check_ereport(BrinCheckState * state, const char *fmt) +{ + ereport(ERROR, + (errcode(ERRCODE_INDEX_CORRUPTED), + errmsg("Index %s is corrupted - %s", RelationGetRelationName(state->idxrel), fmt))); +} + +/* Report with range blkno, revmap item info, index tuple info */ +void +index_tuple_ereport(BrinCheckState * state, const char *fmt) +{ + Assert(state->rangeBlkno != InvalidBlockNumber); + Assert(state->revmapBlk != InvalidBlockNumber); + Assert(state->revmapidx >= 0 && state->revmapidx < REVMAP_PAGE_MAXITEMS); + Assert(state->regpageBlk != InvalidBlockNumber); + Assert(state->regpageoffset != InvalidOffsetNumber); + + ereport(ERROR, + (errcode(ERRCODE_INDEX_CORRUPTED), + errmsg("Index %s is corrupted - %s. Range blkno: %u, revmap item: (%u,%u), index tuple: (%u,%u)", + RelationGetRelationName(state->idxrel), + fmt, + state->rangeBlkno, + state->revmapBlk, + state->revmapidx, + state->regpageBlk, + state->regpageoffset))); +} + +/* Report with index tuple info */ +void +index_tuple_only_ereport(BrinCheckState * state, const char *fmt) +{ + Assert(state->regpageBlk != InvalidBlockNumber); + Assert(state->regpageoffset != InvalidOffsetNumber); + + ereport(ERROR, + (errcode(ERRCODE_INDEX_CORRUPTED), + errmsg("Index %s is corrupted - %s. 
Index tuple: (%u,%u)", + RelationGetRelationName(state->idxrel), + fmt, + state->regpageBlk, + state->regpageoffset))); +} + +/* Report with range blkno, revmap item info */ +void +revmap_item_ereport(BrinCheckState * state, const char *fmt) +{ + Assert(state->rangeBlkno != InvalidBlockNumber); + Assert(state->revmapBlk != InvalidBlockNumber); + Assert(state->revmapidx >= 0 && state->revmapidx < REVMAP_PAGE_MAXITEMS); + + ereport(ERROR, + (errcode(ERRCODE_INDEX_CORRUPTED), + errmsg("Index %s is corrupted - %s. Range blkno: %u, revmap item: (%u,%u).", + RelationGetRelationName(state->idxrel), + fmt, + state->rangeBlkno, + state->revmapBlk, + state->revmapidx))); +} -- 2.43.0
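For readers following check_regular_pages() in the patch above: the revmap slot for an index tuple is derived from its bt_blkno by simple integer arithmetic. Below is a minimal standalone sketch of that calculation. It is not part of the patch; the sketch_/SKETCH_ names are hypothetical, and the 1360 items-per-page value is an assumption for the default 8kB block size (the real REVMAP_PAGE_MAXITEMS comes from the BRIN headers).

```c
#include <assert.h>
#include <stdint.h>

/*
 * Assumed value for illustration only. In PostgreSQL the real constant is
 * REVMAP_PAGE_MAXITEMS and depends on the configured block size.
 */
#define SKETCH_REVMAP_PAGE_MAXITEMS 1360u

/* Hypothetical helper type: where a range's revmap item lives. */
typedef struct SketchRevmapSlot
{
	uint32_t	revmap_blk;		/* index block number of the revmap page */
	uint32_t	revmap_idx;		/* item position within that revmap page */
} SketchRevmapSlot;

/*
 * Map the first heap block of a range to its revmap slot, following the
 * same arithmetic as check_regular_pages():
 *   revmapBlk = ((blkno / pagesPerRange) / REVMAP_PAGE_MAXITEMS) + 1
 *   revmapidx =  (blkno / pagesPerRange) % REVMAP_PAGE_MAXITEMS
 * The "+ 1" skips the metapage, which occupies block 0.
 */
static SketchRevmapSlot
sketch_locate_revmap_slot(uint32_t heap_blkno, uint32_t pages_per_range)
{
	SketchRevmapSlot slot;
	uint32_t	range_no;

	assert(pages_per_range > 0);

	range_no = heap_blkno / pages_per_range;
	slot.revmap_blk = range_no / SKETCH_REVMAP_PAGE_MAXITEMS + 1;
	slot.revmap_idx = range_no % SKETCH_REVMAP_PAGE_MAXITEMS;
	return slot;
}
```

If the computed revmap block is greater than lastRevmapPage, the tuple has no revmap page at all, which is the case the patch reports as "no revmap page for the index tuple".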
From c933716493ee47742c047b6997f742dcc51d5334 Mon Sep 17 00:00:00 2001 From: Arseniy Mukhin <arseniy.mukhin....@gmail.com> Date: Mon, 16 Jun 2025 18:41:34 +0300 Subject: [PATCH v4 3/3] amcheck: brin_index_check() - heap all consistent This commit extends functionality of brin_index_check() with heap_all_consistent check: we validate every index range tuple against every heap tuple within the range using consistentFn. Also, we check here that fields 'has_nulls', 'all_nulls' and 'empty_range' are consistent with the range heap data. It's the most expensive part of the brin_index_check(), so it's optional. --- contrib/amcheck/amcheck--1.5--1.6.sql | 6 +- contrib/amcheck/expected/check_brin.out | 18 +- contrib/amcheck/sql/check_brin.sql | 18 +- contrib/amcheck/t/007_verify_brin.pl | 51 ++- contrib/amcheck/verify_brin.c | 482 ++++++++++++++++++++++++ 5 files changed, 554 insertions(+), 21 deletions(-) diff --git a/contrib/amcheck/amcheck--1.5--1.6.sql b/contrib/amcheck/amcheck--1.5--1.6.sql index 9ec046bb1cf..0c850a97d16 100644 --- a/contrib/amcheck/amcheck--1.5--1.6.sql +++ b/contrib/amcheck/amcheck--1.5--1.6.sql @@ -8,11 +8,13 @@ -- brin_index_check() -- CREATE FUNCTION brin_index_check(index regclass, - regular_pages_check boolean default false + regular_pages_check boolean default false, + heap_all_consistent boolean default false, + consistent_operator_names text[] default '{}' ) RETURNS VOID AS 'MODULE_PATHNAME', 'brin_index_check' LANGUAGE C STRICT PARALLEL RESTRICTED; -- We don't want this to be available to public -REVOKE ALL ON FUNCTION brin_index_check(regclass, boolean) FROM PUBLIC; \ No newline at end of file +REVOKE ALL ON FUNCTION brin_index_check(regclass, boolean, boolean, text[]) FROM PUBLIC; \ No newline at end of file diff --git a/contrib/amcheck/expected/check_brin.out b/contrib/amcheck/expected/check_brin.out index bebca93d32f..0aa90dafa20 100644 --- a/contrib/amcheck/expected/check_brin.out +++ b/contrib/amcheck/expected/check_brin.out @@ -5,7 
+5,7 @@ $$ LANGUAGE sql; -- empty table index should be valid CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10); CREATE INDEX brintest_idx ON brintest USING BRIN (a); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -42,7 +42,7 @@ CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -51,7 +51,7 @@ SELECT brin_index_check('brintest_idx'::REGCLASS, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -65,7 +65,7 @@ CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -74,7 +74,7 @@ SELECT brin_index_check('brintest_idx'::REGCLASS, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -88,7 +88,7 @@ CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_ INSERT 
INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -97,7 +97,7 @@ SELECT brin_index_check('brintest_idx'::REGCLASS, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -113,7 +113,7 @@ SELECT BOX(point(random() * 1000, random() * 1000), point(random() * 1000, rando FROM generate_series(1, 10000); -- create some empty ranges DELETE FROM brintest WHERE id > 2000 AND id < 4000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true, '{"@>"}'); brin_index_check ------------------ @@ -122,7 +122,7 @@ SELECT brin_index_check('brintest_idx'::REGCLASS, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a BOX_INCLUSION_OPS) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true, '{"@>"}'); brin_index_check ------------------ diff --git a/contrib/amcheck/sql/check_brin.sql b/contrib/amcheck/sql/check_brin.sql index 0a5e26ea8f5..0f58567f76f 100644 --- a/contrib/amcheck/sql/check_brin.sql +++ b/contrib/amcheck/sql/check_brin.sql @@ -7,7 +7,7 @@ $$ LANGUAGE sql; -- empty table index should be valid CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10); CREATE INDEX brintest_idx ON brintest USING BRIN (a); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- cleanup DROP TABLE brintest; @@ -35,12 +35,12 @@ 
CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- cleanup DROP TABLE brintest; @@ -52,12 +52,12 @@ CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- cleanup DROP TABLE brintest; @@ -69,12 +69,12 @@ CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_ INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- cleanup DROP TABLE brintest; @@ -88,12 +88,12 @@ 
FROM generate_series(1, 10000); -- create some empty ranges DELETE FROM brintest WHERE id > 2000 AND id < 4000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true, '{"@>"}'); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a BOX_INCLUSION_OPS) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true, '{"@>"}'); -- cleanup DROP TABLE brintest; diff --git a/contrib/amcheck/t/007_verify_brin.pl b/contrib/amcheck/t/007_verify_brin.pl index 2c62b76cc70..51bfed7e273 100644 --- a/contrib/amcheck/t/007_verify_brin.pl +++ b/contrib/amcheck/t/007_verify_brin.pl @@ -200,6 +200,55 @@ my @tests = ( return qq(INSERT INTO $test_struct->{table_name} (a) VALUES ('aaaaa');); }, expected => wrap("revmap doesn't point to index tuple. Range blkno: 0, revmap item: (1,0), index tuple: (2,1)") + }, + { + # range is marked as empty_range, but heap has some data for the range + + find => pack('LCC', 0, 0x88, 0x03), + replace => pack('LCC', 0, 0xA8, 0x01), + blkno => 2, # regular page + table_data => sub { + my ($test_struct) = @_; + return qq(INSERT INTO $test_struct->{table_name} (a) VALUES (null);); + }, + expected => wrap('range is marked as empty but contains qualified live tuples. Range blkno: 0, heap tid (0,1)') + }, + { + # range hasnulls & allnulls are false, but heap contains null values for the range + + find => pack('LCC', 0, 0x88, 0x02), + replace => pack('LCC', 0, 0x88, 0x00), + blkno => 2, # regular page + table_data => sub { + my ($test_struct) = @_; + return qq(INSERT INTO $test_struct->{table_name} (a) VALUES (null), ('aaaaa');); + }, + expected => wrap('range hasnulls and allnulls are false, but contains a null value. 
Range blkno: 0, heap tid (0,1)') + }, + { + # range allnulls is true, but heap contains non-null values for the range + + find => pack('LCC', 0, 0x88, 0x02), + replace => pack('LCC', 0, 0x88, 0x01), + blkno => 2, # regular page + table_data => sub { + my ($test_struct) = @_; + return qq(INSERT INTO $test_struct->{table_name} (a) VALUES (null), ('aaaaa');); + }, + expected => wrap('range allnulls is true, but contains nonnull value. Range blkno: 0, heap tid (0,2)') + }, + { + # consistent function return FALSE for the valid heap value + # replace "ccccc" with "bbbbb" so that min_max index was too narrow + + find => 'ccccc', + replace => 'bbbbb', + blkno => 2, # regular page + table_data => sub { + my ($test_struct) = @_; + return qq(INSERT INTO $test_struct->{table_name} (a) VALUES ('aaaaa'), ('ccccc');); + }, + expected => wrap('heap tuple inconsistent with index. Range blkno: 0, heap tid (0,2)') } ); @@ -241,7 +290,7 @@ foreach my $test_struct (@tests) { $node->start; foreach my $test_struct (@tests) { - my ($result, $stdout, $stderr) = $node->psql('postgres', qq(SELECT brin_index_check('$test_struct->{index_name}', true))); + my ($result, $stdout, $stderr) = $node->psql('postgres', qq(SELECT brin_index_check('$test_struct->{index_name}', true, true))); like($stderr, $test_struct->{expected}); } diff --git a/contrib/amcheck/verify_brin.c b/contrib/amcheck/verify_brin.c index 9d5a1caebf6..020cba50638 100644 --- a/contrib/amcheck/verify_brin.c +++ b/contrib/amcheck/verify_brin.c @@ -39,6 +39,8 @@ typedef struct BrinCheckState /* Check arguments */ bool regular_pages_check; + bool heap_all_consistent; + ArrayType *heap_all_consistent_oper_names; /* BRIN check common fields */ @@ -67,6 +69,30 @@ typedef struct BrinCheckState Page regpage; OffsetNumber regpageoffset; + /* Heap all consistent check fields */ + + String **operatorNames; + BrinRevmap *revmap; + Buffer buf; + FmgrInfo *consistentFn; + /* Scan keys for regular values */ + ScanKey *nonnull_sk; + /* Scan keys 
for null values */ + ScanKey *isnull_sk; + double range_cnt; + /* first block of the next range */ + BlockNumber nextrangeBlk; + + /* + * checkable_range shows if current range could be checked and dtup + * contains valid index tuple for the range. It could be false if the + * current range is not summarized, or it's placeholder, or it's just a + * beginning of the check + */ + bool checkable_range; + BrinMemTuple *dtup; + MemoryContext rangeCtx; + MemoryContext heaptupleCtx; } BrinCheckState; static void brin_check(Relation idxrel, Relation heaprel, void *callback_state, bool readonly); @@ -87,6 +113,23 @@ static bool revmap_points_to_index_tuple(BrinCheckState * state); static ItemId PageGetItemIdCareful(BrinCheckState * state); +static void check_heap_all_consistent(BrinCheckState * state); + +static void check_and_prepare_operator_names(BrinCheckState * state); + +static void brin_check_callback(Relation index, + ItemPointer tid, + Datum *values, + bool *isnull, + bool tupleIsAlive, + void *brstate); + +static void check_heap_tuple(BrinCheckState * state, const Datum *values, const bool *nulls, ItemPointer tid); + +static ScanKey prepare_nonnull_scan_key(const BrinCheckState * state, AttrNumber attno); + +static ScanKey prepare_isnull_scan_key(AttrNumber attno); + static void brin_check_ereport(BrinCheckState * state, const char *fmt); static void revmap_item_ereport(BrinCheckState * state, const char *fmt); @@ -95,6 +138,7 @@ static void index_tuple_ereport(BrinCheckState * state, const char *fmt); static void index_tuple_only_ereport(BrinCheckState * state, const char *fmt); +static void all_consist_ereport(const BrinCheckState * state, const ItemPointerData *tid, const char *message); Datum brin_index_check(PG_FUNCTION_ARGS) @@ -103,6 +147,8 @@ brin_index_check(PG_FUNCTION_ARGS) BrinCheckState *state = palloc0(sizeof(BrinCheckState)); state->regular_pages_check = PG_GETARG_BOOL(1); + state->heap_all_consistent = PG_GETARG_BOOL(2); + 
state->heap_all_consistent_oper_names = PG_GETARG_ARRAYTYPE_P(3); amcheck_lock_relation_and_check(indrelid, BRIN_AM_OID, @@ -127,9 +173,21 @@ brin_check(Relation idxrel, Relation heaprel, void *callback_state, bool readonl state->bdesc = brin_build_desc(idxrel); state->natts = state->bdesc->bd_tupdesc->natts; + /* + * We know how many attributes index has, so let's process operator names + * array + */ + if (state->heap_all_consistent) + { + check_and_prepare_operator_names(state); + } check_brin_index_structure(state); + if (state->heap_all_consistent) + { + check_heap_all_consistent(state); + } brin_free_desc(state->bdesc); } @@ -740,6 +798,414 @@ PageGetItemIdCareful(BrinCheckState * state) return itemid; } +/* + * Check that every heap tuple are consistent with the index. + * + * Here we generate ScanKey for every heap tuple and test it against + * appropriate range using consistentFn (for ScanKey generation logic look 'prepare_nonnull_scan_key') + * + * Also, we check that fields 'empty_range', 'all_nulls' and 'has_nulls' + * are not too "narrow" for each range, which means: + * 1) has_nulls = false, but we see null value (only for oi_regular_nulls is true) + * 2) all_nulls = true, but we see nonnull value. + * 3) empty_range = true, but we see tuple within the range. + * + * We use allowSync = false, because this way + * we process full ranges one by one from the first range. + * It's not necessary, but makes the code simpler and this way + * we need to fetch every index tuple only once. 
+ */ +static void +check_heap_all_consistent(BrinCheckState * state) +{ + Relation idxrel = state->idxrel; + Relation heaprel = state->heaprel; + double reltuples; + IndexInfo *indexInfo; + + /* Heap all consistent check fields initialization */ + + state->revmap = brinRevmapInitialize(idxrel, &state->pagesPerRange); + state->dtup = brin_new_memtuple(state->bdesc); + state->checkable_range = false; + state->consistentFn = palloc0_array(FmgrInfo, state->natts); + state->range_cnt = 0; + /* next range is the first range in the beginning */ + state->nextrangeBlk = 0; + state->nonnull_sk = palloc0_array(ScanKey, state->natts); + state->isnull_sk = palloc0_array(ScanKey, state->natts); + state->rangeCtx = AllocSetContextCreate(CurrentMemoryContext, + "brin check range context", + ALLOCSET_DEFAULT_SIZES); + state->heaptupleCtx = AllocSetContextCreate(CurrentMemoryContext, + "brin check tuple context", + ALLOCSET_DEFAULT_SIZES); + + /* + * Prepare "non-null" and "is_null" scan keys and consistent fn for each + * attribute + */ + for (AttrNumber attno = 1; attno <= state->natts; attno++) + { + FmgrInfo *tmp; + + tmp = index_getprocinfo(idxrel, attno, BRIN_PROCNUM_CONSISTENT); + fmgr_info_copy(&state->consistentFn[attno - 1], tmp, CurrentMemoryContext); + + state->nonnull_sk[attno - 1] = prepare_nonnull_scan_key(state, attno); + state->isnull_sk[attno - 1] = prepare_isnull_scan_key(attno); + } + + indexInfo = BuildIndexInfo(idxrel); + + /* + * Use snapshot to check only those tuples that are guaranteed to be + * indexed already. Using SnapshotAny would make it more difficult to say + * if there is a corruption or checked tuple just haven't been indexed + * yet. 
+ */ + indexInfo->ii_Concurrent = true; + reltuples = table_index_build_scan(heaprel, idxrel, indexInfo, false, true, + brin_check_callback, (void *) state, NULL); + + elog(DEBUG3, "ranges were checked: %f", state->range_cnt); + elog(DEBUG3, "scan total tuples: %f", reltuples); + + if (state->buf != InvalidBuffer) + ReleaseBuffer(state->buf); + + brinRevmapTerminate(state->revmap); + MemoryContextDelete(state->rangeCtx); + MemoryContextDelete(state->heaptupleCtx); +} + +/* + * Check operator names array input parameter and convert it to array of strings. + * Empty input array means we use "=" operator for every attribute. + */ +static void +check_and_prepare_operator_names(BrinCheckState * state) +{ + Oid element_type = ARR_ELEMTYPE(state->heap_all_consistent_oper_names); + int16 typlen; + bool typbyval; + char typalign; + Datum *values; + bool *elem_nulls; + int num_elems; + + state->operatorNames = palloc(sizeof(String *) * state->natts); + + get_typlenbyvalalign(element_type, &typlen, &typbyval, &typalign); + deconstruct_array(state->heap_all_consistent_oper_names, element_type, typlen, typbyval, typalign, + &values, &elem_nulls, &num_elems); + + /* If we have some input check it and convert to String** */ + if (num_elems != 0) + { + if (num_elems != state->natts) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Operator names array length %d, but index has %d attributes", + num_elems, state->natts))); + } + + for (int i = 0; i < num_elems; i++) + { + if (elem_nulls[i]) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Operator names array contains NULL"))); + } + state->operatorNames[i] = makeString(TextDatumGetCString(values[i])); + } + } + else + { + /* If there is no input just use "=" operator for all attributes */ + for (int i = 0; i < state->natts; i++) + { + state->operatorNames[i] = makeString("="); + } + } +} + +/* + * Prepare ScanKey for index attribute.
+ *
+ * ConsistentFn requires a ScanKey, so we need to generate a ScanKey for every
+ * attribute. We want a ScanKey that yields TRUE for every heap tuple within
+ * the range when the tuple's indexed value is used as sk_argument.
+ * To generate such a ScanKey we need the right operand type and the strategy number.
+ * The right operand type is the type of the data the index is built on, i.e. 'opcintype'.
+ * There is no strategy number that we can always use,
+ * because every opclass defines its own set of supported operators, and the strategy number
+ * for the same operator can differ from opclass to opclass.
+ * So to get the strategy number we look up an operator that gives us the desired behavior
+ * and whose operand types are both 'opcintype', and then retrieve its strategy number.
+ * Most of the time we can use '='. We let the user specify the operator name in case the
+ * opclass doesn't support '='. If no such operator exists, we can't proceed with the check.
+ *
+ * The ScanKey is generated once and reused for all heap tuples.
+ * The argument field is filled in for every heap tuple before the
+ * consistent function is invoked, so it is left unset for now.
+ *
+ */
+static ScanKey
+prepare_nonnull_scan_key(const BrinCheckState * state, AttrNumber attno)
+{
+    ScanKey     scanKey;
+    Oid         opOid;
+    Oid         opFamilyOid;
+    bool        defined;
+    StrategyNumber strategy;
+    RegProcedure opRegProc;
+    List       *operNameList;
+    int         attindex = attno - 1;
+    Form_pg_attribute attr = TupleDescAttr(state->bdesc->bd_tupdesc, attindex);
+    Oid         type = state->idxrel->rd_opcintype[attindex];
+    String     *opname = state->operatorNames[attno - 1];
+
+    opFamilyOid = state->idxrel->rd_opfamily[attindex];
+    operNameList = list_make1(opname);
+    opOid = OperatorLookup(operNameList, type, type, &defined);
+
+    if (opOid == InvalidOid)
+    {
+        ereport(ERROR,
+                (errcode(ERRCODE_UNDEFINED_FUNCTION),
+                 errmsg("There is no operator %s for type %u",
+                        opname->sval, type)));
+    }
+
+    strategy = get_op_opfamily_strategy(opOid, opFamilyOid);
+
+    if (strategy == 0)
+    {
+        ereport(ERROR,
+                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                 errmsg("operator %s is not a member of operator family \"%s\"",
+                        opname->sval,
+                        get_opfamily_name(opFamilyOid, false))));
+    }
+
+    opRegProc = get_opcode(opOid);
+    scanKey = palloc0(sizeof(ScanKeyData));
+    ScanKeyEntryInitialize(
+                           scanKey,
+                           0,
+                           attno,
+                           strategy,
+                           type,
+                           attr->attcollation,
+                           opRegProc,
+                           (Datum) 0
+        );
+    list_free(operNameList);
+
+    return scanKey;
+}
+
+static ScanKey
+prepare_isnull_scan_key(AttrNumber attno)
+{
+    ScanKey     scanKey;
+
+    scanKey = palloc0(sizeof(ScanKeyData));
+    ScanKeyEntryInitialize(scanKey,
+                           SK_ISNULL | SK_SEARCHNULL,
+                           attno,
+                           InvalidStrategy,
+                           InvalidOid,
+                           InvalidOid,
+                           InvalidOid,
+                           (Datum) 0);
+    return scanKey;
+}
+
+/*
+ * We walk from the first range (blkno = 0) to the last as the scan proceeds.
+ * For every heap tuple we check whether we are done with the current range and need to move on
+ * to the range of the current heap tuple. When moving to a new range we check that it's not marked empty (because
+ * we have at least one tuple in this range).
+ * Every heap tuple is checked for consistency with the range it belongs to.
+ * For unsummarized ranges and placeholders we skip all checks.
+ *
+ * While moving, we may jump over some ranges,
+ * but that's okay because we would not be able to check them anyway.
+ * We also can't say whether skipped ranges should be marked as empty or not,
+ * since it's possible that they contained tuples that have since been deleted.
+ *
+ */
+static void
+brin_check_callback(Relation index, ItemPointer tid, Datum *values, bool *isnull, bool tupleIsAlive, void *brstate)
+{
+    BrinCheckState *state;
+    BlockNumber heapblk;
+
+    state = (BrinCheckState *) brstate;
+    heapblk = ItemPointerGetBlockNumber(tid);
+
+    /* If we went beyond the current range, fetch the new range */
+    if (heapblk >= state->nextrangeBlk)
+    {
+        BrinTuple  *tup;
+        BrinTuple  *tupcopy = NULL;
+        MemoryContext oldCtx;
+        OffsetNumber off;
+        Size        size;
+        Size        btupsz = 0;
+
+        MemoryContextReset(state->rangeCtx);
+        oldCtx = MemoryContextSwitchTo(state->rangeCtx);
+
+        state->range_cnt++;
+
+        /* Move to the range that contains the current heap tuple */
+        tup = brinGetTupleForHeapBlock(state->revmap, heapblk, &state->buf,
+                                       &off, &size, BUFFER_LOCK_SHARE);
+
+        if (tup)
+        {
+            tupcopy = brin_copy_tuple(tup, size, tupcopy, &btupsz);
+            LockBuffer(state->buf, BUFFER_LOCK_UNLOCK);
+            state->dtup = brin_deform_tuple(state->bdesc, tupcopy, state->dtup);
+
+            /* We can't check placeholder ranges */
+            state->checkable_range = !state->dtup->bt_placeholder;
+        }
+        else
+        {
+            /* We can't check unsummarized ranges. */
+            state->checkable_range = false;
+        }
+
+        /*
+         * Update nextrangeBlk so we know when we are done with the current
+         * range
+         */
+        state->nextrangeBlk = (heapblk / state->pagesPerRange + 1) * state->pagesPerRange;
+
+        MemoryContextSwitchTo(oldCtx);
+
+        /* Range must not be empty */
+        if (state->checkable_range && state->dtup->bt_empty_range)
+        {
+            all_consist_ereport(state, tid, "range is marked as empty but contains qualified live tuples");
+        }
+
+    }
+
+    /* Check that the tuple is consistent with the index */
+    if (state->checkable_range)
+    {
+        check_heap_tuple(state, values, isnull, tid);
+    }
+
+}
+
+/*
+ * For null values with oi_regular_nulls = true we check the hasnulls/allnulls flags,
+ * and for nonnull values we check that allnulls is false, whether or not oi_regular_nulls is set.
+ * In all other cases we call consistentFn with the appropriate scanKey:
+ * - for null values with oi_regular_nulls = false we use the 'isNull' scanKey,
+ * - for nonnull values we use the 'nonnull' scanKey
+ */
+static void
+check_heap_tuple(BrinCheckState * state, const Datum *values, const bool *nulls, ItemPointer tid)
+{
+    int         attindex;
+    BrinMemTuple *dtup = state->dtup;
+    BrinDesc   *bdesc = state->bdesc;
+    MemoryContext oldCtx;
+
+    Assert(state->checkable_range);
+
+    MemoryContextReset(state->heaptupleCtx);
+    oldCtx = MemoryContextSwitchTo(state->heaptupleCtx);
+
+    /* check every index attribute */
+    for (attindex = 0; attindex < state->natts; attindex++)
+    {
+        BrinValues *bval;
+        Datum       consistentFnResult;
+        bool        consistent;
+        ScanKey     scanKey;
+        bool        oi_regular_nulls = bdesc->bd_info[attindex]->oi_regular_nulls;
+
+        bval = &dtup->bt_columns[attindex];
+
+        if (nulls[attindex])
+        {
+            /*
+             * Use the hasnulls flag if oi_regular_nulls is true. Otherwise,
+             * delegate the check to consistentFn
+             */
+            if (oi_regular_nulls)
+            {
+                /* We have a null value, so hasnulls or allnulls must be true */
+                if (!(bval->bv_hasnulls || bval->bv_allnulls))
+                {
+                    all_consist_ereport(state, tid, "range hasnulls and allnulls are false, but contains a null value");
+                }
+                continue;
+            }
+
+            /*
+             * In case of null and oi_regular_nulls = false we use the isNull
+             * scanKey to invoke consistentFn
+             */
+            scanKey = state->isnull_sk[attindex];
+        }
+        else
+        {
+            /* We have a nonnull value, so allnulls should be false */
+            if (bval->bv_allnulls)
+            {
+                all_consist_ereport(state, tid, "range allnulls is true, but contains nonnull value");
+            }
+
+            /* use "attr = value" scan key for nonnull values */
+            scanKey = state->nonnull_sk[attindex];
+            scanKey->sk_argument = values[attindex];
+        }
+
+        /* If oi_regular_nulls = true we should never get here with a null */
+        Assert(!oi_regular_nulls || !nulls[attindex]);
+
+        if (state->consistentFn[attindex].fn_nargs >= 4)
+        {
+            consistentFnResult = FunctionCall4Coll(&state->consistentFn[attindex],
+                                                   state->idxrel->rd_indcollation[attindex],
+                                                   PointerGetDatum(state->bdesc),
+                                                   PointerGetDatum(bval),
+                                                   PointerGetDatum(&scanKey),
+                                                   Int32GetDatum(1)
+                );
+        }
+        else
+        {
+            consistentFnResult = FunctionCall3Coll(&state->consistentFn[attindex],
+                                                   state->idxrel->rd_indcollation[attindex],
+                                                   PointerGetDatum(state->bdesc),
+                                                   PointerGetDatum(bval),
+                                                   PointerGetDatum(scanKey)
+                );
+        }
+
+        consistent = DatumGetBool(consistentFnResult);
+
+        if (!consistent)
+        {
+            all_consist_ereport(state, tid, "heap tuple inconsistent with index");
+        }
+
+    }
+
+    MemoryContextSwitchTo(oldCtx);
+}
 
 /* Report without any additional info */
 static void
@@ -805,3 +1271,19 @@ revmap_item_ereport(BrinCheckState * state, const char *fmt)
                     state->revmapBlk,
                     state->revmapidx)));
 }
+
+/* Report with range blkno, heap tuple info */
+static void
+all_consist_ereport(const BrinCheckState * state, const ItemPointerData *tid, const char *message)
+{
+    Assert(state->rangeBlkno != InvalidBlockNumber);
+
+    ereport(ERROR,
+            (errcode(ERRCODE_INDEX_CORRUPTED),
+             errmsg("Index %s is not consistent with the heap - %s. Range blkno: %u, heap tid (%u,%u)",
+                    RelationGetRelationName(state->idxrel),
+                    message,
+                    state->dtup->bt_blkno,
+                    ItemPointerGetBlockNumber(tid),
+                    ItemPointerGetOffsetNumber(tid))));
+}
-- 
2.43.0
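The range walk in brin_check_callback depends on one piece of integer arithmetic: the update of nextrangeBlk. The following is a minimal standalone sketch of that arithmetic, not part of the patch. BlockNumber is redefined locally as a stand-in for the PostgreSQL type, and range_start, next_range_start and count_visited_ranges are hypothetical helper names introduced only for illustration. It assumes, as in the heap scan, that block numbers arrive in nondecreasing order.

```c
#include <assert.h>
#include <stdint.h>

/* Stand-in for PostgreSQL's BlockNumber (assumption: 32-bit unsigned). */
typedef uint32_t BlockNumber;

/* First heap block of the range that contains heapblk. */
static BlockNumber
range_start(BlockNumber heapblk, BlockNumber pages_per_range)
{
    assert(pages_per_range > 0);
    return (heapblk / pages_per_range) * pages_per_range;
}

/*
 * First heap block of the range after the one containing heapblk.
 * Same formula as the nextrangeBlk update in the patch.
 */
static BlockNumber
next_range_start(BlockNumber heapblk, BlockNumber pages_per_range)
{
    assert(pages_per_range > 0);
    return (heapblk / pages_per_range + 1) * pages_per_range;
}

/*
 * Count how many ranges would be fetched for a nondecreasing stream of
 * heap block numbers. Ranges without any tuple are jumped over and are
 * never counted, which mirrors the "we may jump over some ranges" note.
 */
static int
count_visited_ranges(const BlockNumber *blocks, int nblocks,
                     BlockNumber pages_per_range)
{
    BlockNumber next = 0;   /* like nextrangeBlk: starts at the first range */
    int         visited = 0;

    for (int i = 0; i < nblocks; i++)
    {
        if (blocks[i] >= next)
        {
            visited++;
            next = next_range_start(blocks[i], pages_per_range);
        }
    }
    return visited;
}
```

For example, with pages_per_range = 4 and tuples on heap blocks 0, 1, 3, 9, 9, 17, this walk fetches three ranges (starting at blocks 0, 8 and 16) and never looks at the ranges starting at blocks 4 and 12.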
From 894520865c4fa697ba6104beb79d64a630f03b3a Mon Sep 17 00:00:00 2001
From: Arseniy Mukhin <arseniy.mukhin....@gmail.com>
Date: Wed, 16 Apr 2025 11:26:45 +0300
Subject: [PATCH v4 1/3] brin refactoring

To add BRIN index support to amcheck we need a few small changes in the
BRIN core code:

* We need the tuple descriptor for on-disk storage of BRIN tuples.
  It is a public field 'bd_disktdesc' in BrinDesc, but accessing it
  requires the function 'brtuple_disk_tupdesc', which is internal.
  This commit makes it extern.

* For the meta page check we need to know the upper limit of
  pages_per_range. It is currently hardcoded. This commit moves the value
  to the macro BRIN_MAX_PAGES_PER_RANGE so that amcheck can use it too.
---
 src/backend/access/brin/brin_tuple.c   | 2 +-
 src/backend/access/common/reloptions.c | 3 ++-
 src/include/access/brin.h              | 1 +
 src/include/access/brin_tuple.h        | 2 ++
 4 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/brin/brin_tuple.c b/src/backend/access/brin/brin_tuple.c
index 861f397e6db..4d1d8d9addd 100644
--- a/src/backend/access/brin/brin_tuple.c
+++ b/src/backend/access/brin/brin_tuple.c
@@ -57,7 +57,7 @@ static inline void brin_deconstruct_tuple(BrinDesc *brdesc,
 /*
  * Return a tuple descriptor used for on-disk storage of BRIN tuples.
  */
-static TupleDesc
+TupleDesc
 brtuple_disk_tupdesc(BrinDesc *brdesc)
 {
     /* We cache these in the BrinDesc */
diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index 50747c16396..bc494847341 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -22,6 +22,7 @@
 #include "access/heaptoast.h"
 #include "access/htup_details.h"
 #include "access/nbtree.h"
+#include "access/brin.h"
 #include "access/reloptions.h"
 #include "access/spgist_private.h"
 #include "catalog/pg_type.h"
@@ -343,7 +344,7 @@ static relopt_int intRelOpts[] =
             "Number of pages that each page range covers in a BRIN index",
             RELOPT_KIND_BRIN,
             AccessExclusiveLock
-        }, 128, 1, 131072
+        }, 128, 1, BRIN_MAX_PAGES_PER_RANGE
     },
     {
         {
diff --git a/src/include/access/brin.h b/src/include/access/brin.h
index 821f1e02806..334ce973b67 100644
--- a/src/include/access/brin.h
+++ b/src/include/access/brin.h
@@ -37,6 +37,7 @@ typedef struct BrinStatsData
 
 
 #define BRIN_DEFAULT_PAGES_PER_RANGE    128
+#define BRIN_MAX_PAGES_PER_RANGE    131072
 #define BrinGetPagesPerRange(relation) \
     (AssertMacro(relation->rd_rel->relkind == RELKIND_INDEX && \
                  relation->rd_rel->relam == BRIN_AM_OID), \
diff --git a/src/include/access/brin_tuple.h b/src/include/access/brin_tuple.h
index 010ba4ea3c0..9472ca638dd 100644
--- a/src/include/access/brin_tuple.h
+++ b/src/include/access/brin_tuple.h
@@ -109,4 +109,6 @@ extern BrinMemTuple *brin_memtuple_initialize(BrinMemTuple *dtuple,
 extern BrinMemTuple *brin_deform_tuple(BrinDesc *brdesc,
                                        BrinTuple *tuple, BrinMemTuple *dMemtuple);
 
+extern TupleDesc brtuple_disk_tupdesc(BrinDesc *brdesc);
+
 #endif                          /* BRIN_TUPLE_H */
-- 
2.43.0
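
The commit message above says the meta page check needs the upper limit of pages_per_range. A minimal standalone sketch of such a bounds check might look like the following. It is an illustration only: the actual amcheck code is not shown here, pages_per_range_is_valid is a hypothetical name, and the lower bound of 1 is taken from the reloptions entry in the diff (default 128, minimum 1, maximum BRIN_MAX_PAGES_PER_RANGE).

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Value introduced by the patch in src/include/access/brin.h. */
#define BRIN_MAX_PAGES_PER_RANGE 131072

/*
 * Hypothetical helper: true if a pages_per_range value read from a BRIN
 * meta page lies within the limits the reloption allows (1 .. max).
 */
static bool
pages_per_range_is_valid(uint32_t pages_per_range)
{
    return pages_per_range >= 1 &&
           pages_per_range <= BRIN_MAX_PAGES_PER_RANGE;
}
```

Sharing one macro between reloptions.c and the checker keeps the two limits from drifting apart if the maximum ever changes.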