Hi,

This is not for v18; I just wanted to share it with the community and
register it in the upcoming Commitfest 2025-07.

Here is the patch with amcheck support for BRIN indexes.

The patch uses the common amcheck infrastructure introduced in [1].
It works well, and it let me delete all the code I had initially
copied from the btree check.

During the check we hold ShareUpdateExclusiveLock, so we don't block
regular reads/inserts/updates. At the same time, range summarization
and desummarization are impossible during the check, which simplifies
the check logic. While checking we do ereport(ERROR) on the first
issue, the same way the btree and gin checks do.

There are two parts:

The first part is the 'index structure check':
1) Meta page check.
2) Revmap check. Walk the revmap and check that every valid revmap item
points to an index tuple with the expected range blkno, and that the
index tuple is consistent with the tuple descriptor. It's not
documented, but the brin code expects every empty range to have
allnulls = true and hasnulls = false, so this is checked here too.
3) Regular pages check. Walk the regular pages and check that every
index tuple has a corresponding revmap item that points to it. We
don't check index tuple structure here, because that was already done
in step 2.

The regular pages check is optional. An orphan index tuple reported by
this check doesn't necessarily mean that the index is corrupted, but
AFAIS brin is not supposed to have orphan index tuples now, so if we
encounter one then something is probably wrong with the index.
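
To make the two structure checks concrete, here is a toy model in plain
C. It is only a sketch under simplifying assumptions: the revmap is a
flat int array, index tuples live in one flat array instead of regular
pages, and ToyIndexTuple, toy_check_revmap and toy_find_orphan are
made-up names that do not appear in the patch. There is no locking or
retry logic here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Toy model only: these are NOT the patch's data structures. */
#define TOY_INVALID_SLOT (-1)	/* revmap item for an unsummarized range */

typedef struct ToyIndexTuple
{
	uint32_t	range_start;	/* first heap block of the range */
	bool		in_use;			/* false models an unused line pointer */
} ToyIndexTuple;

/*
 * Revmap check: item i describes the range starting at
 * i * pages_per_range. A valid item must point to an in-use tuple whose
 * range_start matches. Returns the first bad item, or -1 if none.
 */
int
toy_check_revmap(const int *revmap, size_t nitems,
				 const ToyIndexTuple *tuples, size_t ntuples,
				 uint32_t pages_per_range)
{
	assert(pages_per_range > 0);

	for (size_t i = 0; i < nitems; i++)
	{
		int			slot = revmap[i];

		if (slot == TOY_INVALID_SLOT)
			continue;			/* range is not summarized */
		if (slot < 0 || (size_t) slot >= ntuples)
			return (int) i;		/* points outside the index */
		if (!tuples[slot].in_use)
			return (int) i;		/* points to an unused tuple */
		if (tuples[slot].range_start != (uint32_t) i * pages_per_range)
			return (int) i;		/* tuple belongs to another range */
	}
	return -1;
}

/*
 * Regular pages check: every in-use tuple must be referenced by the
 * revmap item of its own range. Returns the first orphan, or -1 if none.
 */
int
toy_find_orphan(const int *revmap, size_t nitems,
				const ToyIndexTuple *tuples, size_t ntuples,
				uint32_t pages_per_range)
{
	assert(pages_per_range > 0);

	for (size_t t = 0; t < ntuples; t++)
	{
		size_t		item;

		if (!tuples[t].in_use)
			continue;
		item = tuples[t].range_start / pages_per_range;
		if (item >= nitems || revmap[item] != (int) t)
			return (int) t;
	}
	return -1;
}
```

With pages_per_range = 2, a revmap of {0, -1, 1} and tuples for ranges
0, 4 and 2, the revmap check passes, but the third tuple is an orphan
because revmap item 1 is invalid.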

The second part is the 'all consistent check':
We check that all heap tuples are consistent with the index. It's the
most expensive part, and it's optional. Here we call the consistent
function for every heap tuple. We also check that fields like
'has_nulls', 'all_nulls' and 'empty_range' are consistent with what we
see in the heap.
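
As a rough illustration of what "consistent with the index" means, here
is a toy version for a single min/max-style attribute checked with '='.
This is a sketch only: ToyRangeSummary, ToyHeapValue and
toy_value_is_consistent are hypothetical names, and the real check
calls the opclass consistent function with a scan key instead of
comparing integers directly.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Toy summary of one range for one attribute (min/max style). */
typedef struct ToyRangeSummary
{
	bool		empty_range;	/* no heap tuples were ever summarized */
	bool		all_nulls;		/* every summarized value was NULL */
	bool		has_nulls;		/* at least one summarized value was NULL */
	int64_t		min;
	int64_t		max;
} ToyRangeSummary;

/* Toy heap attribute value. */
typedef struct ToyHeapValue
{
	bool		is_null;
	int64_t		value;
} ToyHeapValue;

/*
 * Returns true if a heap value found inside the range agrees with the
 * range summary, false if the summary contradicts the heap.
 */
bool
toy_value_is_consistent(const ToyRangeSummary *summary,
						const ToyHeapValue *hv)
{
	assert(summary != NULL && hv != NULL);

	/* A heap tuple inside a range marked empty is a contradiction. */
	if (summary->empty_range)
		return false;

	/* A NULL in the heap requires all_nulls or has_nulls. */
	if (hv->is_null)
		return summary->all_nulls || summary->has_nulls;

	/* A non-NULL value contradicts all_nulls. */
	if (summary->all_nulls)
		return false;

	/* Stand-in for calling the consistent function with '='. */
	return summary->min <= hv->value && hv->value <= summary->max;
}
```

For a summary with min = 10 and max = 20, the value 15 is consistent,
25 is not, and a NULL is consistent only once has_nulls (or all_nulls)
is set.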


There are two patch files:

0001-brin-refactoring.patch

It's just two tiny changes in the brin code.
1) For the index tuple structure check we need to know what on-disk
index tuples look like. The function that provides this,
'brtuple_disk_tupdesc', is internal. This patch makes it extern.
2) Add a macro for BRIN_MAX_PAGES_PER_RANGE, needed for the meta page
check.

0002-amechek-brin-support.patch

It's the check functionality itself + regression tests + amcheck extension updates.


Some open questions:

1) How to get the correct strategy number for the scan key in the "all
heap consistent" check. The consistent function wants a scan key, and
to generate it we have to pick a strategy number. We can't just use
the same strategy number for all indexes, because its meaning differs
from opclass to opclass (for example, the equal strategy number is 1
in Bloom and 3 in min_max). We also can't pick one operator and use it
for every check, because opclasses have no requirements about which
operators they should support. The solution was to let the user
specify the check operator (parameter consistent_operator_names).
It's an array because we can have a multicolumn index. We can use '='
as the default check operator, because AFAIS '=' is supported by all
core opclasses except 'box_inclusion_ops', and IMHO it's the most
obvious candidate for such a check. So if 'consistent_operator_names'
is an empty array (the default), we use '=' for all attributes. In
most cases '=' does the job and you don't need to worry about the
consistent_operator_names param. I'm not sure about this, what do you
think?
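
To illustrate the defaulting rule and why one fixed strategy number
can't work, here is a small standalone sketch. All names
(ToyOpclassKind, toy_operator_for_attribute, toy_strategy_number) are
hypothetical, and the rule that a non-empty array must have exactly one
entry per attribute is my assumption for the sketch. The real code has
to resolve the operator through the catalogs; the hardcoded table below
only encodes the two numbers mentioned above.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Toy opclass kinds; only the two cases discussed above are modeled. */
typedef enum ToyOpclassKind
{
	TOY_MINMAX,
	TOY_BLOOM
} ToyOpclassKind;

/*
 * Pick the check operator name for attribute attno (0-based).
 * An empty array means "use '=' for every attribute". Assumption for
 * this sketch: a non-empty array must have one entry per attribute,
 * otherwise NULL is returned.
 */
const char *
toy_operator_for_attribute(const char *const *names, size_t nnames,
						   size_t natts, size_t attno)
{
	assert(attno < natts);

	if (nnames == 0)
		return "=";
	if (nnames != natts)
		return NULL;
	return names[attno];
}

/*
 * Map an operator name to a strategy number for the given opclass kind.
 * Returns 0 when this toy table has no entry for the operator.
 */
int
toy_strategy_number(ToyOpclassKind kind, const char *opname)
{
	assert(opname != NULL);

	if (strcmp(opname, "=") != 0)
		return 0;
	/* '=' is strategy 1 for bloom and 3 for min_max. */
	return kind == TOY_BLOOM ? 1 : 3;
}
```

The same operator name '=' resolves to 3 for TOY_MINMAX and 1 for
TOY_BLOOM, which is why the parameter takes operator names and not
strategy numbers.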

2) The current implementation of "all heap consistent" scans the
entire heap. So if we have a lot of unsummarized ranges, we do a lot
of wasted work, because we can't use the tuples that belong to
unsummarized ranges. Instead of one scan over the entire heap, we
could walk the revmap, take only the summarized ranges, and scan only
the pages that belong to those ranges, so we would have one scan per
range. This approach avoids touching heap tuples that we can't use for
index checks. But I'm not sure we should worry about that, because
every autovacuum summarizes all the unsummarized ranges, so I don't
expect a lot of unsummarized ranges on average.
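
The per-range alternative could look roughly like this. It is a
standalone sketch, not code from the patch: the revmap is reduced to an
array of "summarized" flags, and ToyBlockRange and
toy_collect_scan_ranges are made-up names. In the backend, each
resulting (start, nblocks) pair would presumably be passed to a
range-limited heap scan such as table_index_build_range_scan().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* One heap block interval to scan: [start, start + nblocks). */
typedef struct ToyBlockRange
{
	uint32_t	start;
	uint32_t	nblocks;
} ToyBlockRange;

/*
 * Walk the (toy) revmap and collect one scan interval per summarized
 * range. The last range is clipped to the heap size, and ranges that
 * start beyond the heap are ignored. "out" must have room for nranges
 * entries. Returns the number of intervals written.
 */
size_t
toy_collect_scan_ranges(const bool *summarized, size_t nranges,
						uint32_t pages_per_range, uint32_t heap_nblocks,
						ToyBlockRange *out)
{
	size_t		n = 0;

	assert(pages_per_range > 0);

	for (size_t i = 0; i < nranges; i++)
	{
		uint64_t	start = (uint64_t) i * pages_per_range;
		uint64_t	remaining;

		if (start >= heap_nblocks)
			break;				/* nothing to scan past the heap end */
		if (!summarized[i])
			continue;			/* tuples here can't be checked anyway */

		remaining = heap_nblocks - start;
		out[n].start = (uint32_t) start;
		out[n].nblocks = remaining < pages_per_range ?
			(uint32_t) remaining : pages_per_range;
		n++;
	}
	return n;
}
```

For flags {true, false, true, true}, pages_per_range = 4 and a 10-block
heap, this yields two intervals: blocks 0..3 and blocks 8..9.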

TODO list:

1) add TAP tests
2) update SGML docs for amcheck (I think it's better to do this after
the patch is reviewed and more or less finalized)
3) pg_amcheck integration

Big thanks to Tomas Vondra for the first patch idea and initial review.


[1] https://www.postgresql.org/message-id/45AC9B0A-2B45-40EE-B08F-BDCF5739D1E1%40yandex-team.ru


Best regards,
Arseniy Mukhin
From 7de60e8da824a97327a5c28ccd9d8a384c07b997 Mon Sep 17 00:00:00 2001
From: Arseniy Mukhin <arseniy.mukhin....@gmail.com>
Date: Tue, 22 Apr 2025 11:00:36 +0300
Subject: [PATCH 2/2] amcheck - brin support

---
 contrib/amcheck/Makefile                |    5 +-
 contrib/amcheck/amcheck--1.5--1.6.sql   |   20 +
 contrib/amcheck/amcheck.control         |    2 +-
 contrib/amcheck/expected/check_brin.out |  134 +++
 contrib/amcheck/meson.build             |    3 +
 contrib/amcheck/sql/check_brin.sql      |  102 ++
 contrib/amcheck/verify_brinam.c         | 1270 +++++++++++++++++++++++
 7 files changed, 1533 insertions(+), 3 deletions(-)
 create mode 100644 contrib/amcheck/amcheck--1.5--1.6.sql
 create mode 100644 contrib/amcheck/expected/check_brin.out
 create mode 100644 contrib/amcheck/sql/check_brin.sql
 create mode 100644 contrib/amcheck/verify_brinam.c

diff --git a/contrib/amcheck/Makefile b/contrib/amcheck/Makefile
index 1b7a63cbaa4..10cba6c8feb 100644
--- a/contrib/amcheck/Makefile
+++ b/contrib/amcheck/Makefile
@@ -6,11 +6,12 @@ OBJS = \
 	verify_common.o \
 	verify_gin.o \
 	verify_heapam.o \
-	verify_nbtree.o
+	verify_nbtree.o \
+	verify_brinam.o
 
 EXTENSION = amcheck
 DATA = amcheck--1.2--1.3.sql amcheck--1.1--1.2.sql amcheck--1.0--1.1.sql amcheck--1.0.sql \
-		amcheck--1.3--1.4.sql amcheck--1.4--1.5.sql
+		amcheck--1.3--1.4.sql amcheck--1.4--1.5.sql amcheck--1.5--1.6.sql
 PGFILEDESC = "amcheck - function for verifying relation integrity"
 
 REGRESS = check check_btree check_gin check_heap
diff --git a/contrib/amcheck/amcheck--1.5--1.6.sql b/contrib/amcheck/amcheck--1.5--1.6.sql
new file mode 100644
index 00000000000..0c850a97d16
--- /dev/null
+++ b/contrib/amcheck/amcheck--1.5--1.6.sql
@@ -0,0 +1,20 @@
+/* contrib/amcheck/amcheck--1.5--1.6.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "ALTER EXTENSION amcheck UPDATE TO '1.6'" to load this file. \quit
+
+
+--
+-- brin_index_check()
+--
+CREATE FUNCTION brin_index_check(index regclass,
+                                 regular_pages_check boolean default false,
+                                 heap_all_consistent boolean default false,
+                                 consistent_operator_names text[] default '{}'
+)
+    RETURNS VOID
+AS 'MODULE_PATHNAME', 'brin_index_check'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
+
+-- We don't want this to be available to public
+REVOKE ALL ON FUNCTION brin_index_check(regclass, boolean, boolean, text[]) FROM PUBLIC;
\ No newline at end of file
diff --git a/contrib/amcheck/amcheck.control b/contrib/amcheck/amcheck.control
index c8ba6d7c9bc..2f329ef2cf4 100644
--- a/contrib/amcheck/amcheck.control
+++ b/contrib/amcheck/amcheck.control
@@ -1,5 +1,5 @@
 # amcheck extension
 comment = 'functions for verifying relation integrity'
-default_version = '1.5'
+default_version = '1.6'
 module_pathname = '$libdir/amcheck'
 relocatable = true
diff --git a/contrib/amcheck/expected/check_brin.out b/contrib/amcheck/expected/check_brin.out
new file mode 100644
index 00000000000..2690d629723
--- /dev/null
+++ b/contrib/amcheck/expected/check_brin.out
@@ -0,0 +1,134 @@
+-- helper func
+CREATE OR REPLACE FUNCTION  random_string( INT ) RETURNS TEXT AS $$
+SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', ceil(random() * 30)::INTEGER, 1), '') FROM generate_series(1, $1);
+$$ LANGUAGE sql;
+-- empty table index should be valid
+CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+ brin_index_check 
+------------------
+ 
+(1 row)
+
+-- cleanup
+DROP TABLE brintest;
+-- multi attributes with varlena attribute test
+CREATE TABLE brintest (id BIGSERIAL, a TEXT) WITH (FILLFACTOR = 10);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a TEXT_minmax_ops, id int8_minmax_ops) WITH (PAGES_PER_RANGE = 2);
+INSERT INTO brintest (a) SELECT random_string((x % 100)) FROM generate_series(1,5000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE id > 2000 AND id < 4000;
+SELECT brin_index_check('brintest_a_idx'::REGCLASS);
+ brin_index_check 
+------------------
+ 
+(1 row)
+
+-- rebuild index
+DROP INDEX brintest_a_idx;
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a TEXT_minmax_ops) WITH (PAGES_PER_RANGE = 2);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS);
+ brin_index_check 
+------------------
+ 
+(1 row)
+
+-- cleanup
+DROP TABLE brintest;
+-- min_max opclass
+CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES_PER_RANGE = 2);
+INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE a > 20000 AND a < 40000;
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+ brin_index_check 
+------------------
+ 
+(1 row)
+
+-- rebuild index
+DROP INDEX brintest_a_idx;
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES_PER_RANGE = 2);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+ brin_index_check 
+------------------
+ 
+(1 row)
+
+-- cleanup
+DROP TABLE brintest;
+-- multi_min_max opclass
+CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH (PAGES_PER_RANGE = 2);
+INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE a > 20000 AND a < 40000;
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+ brin_index_check 
+------------------
+ 
+(1 row)
+
+-- rebuild index
+DROP INDEX brintest_a_idx;
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH (PAGES_PER_RANGE = 2);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+ brin_index_check 
+------------------
+ 
+(1 row)
+
+-- cleanup
+DROP TABLE brintest;
+-- bloom opclass
+CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_PER_RANGE = 2);
+INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE a > 20000 AND a < 40000;
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+ brin_index_check 
+------------------
+ 
+(1 row)
+
+-- rebuild index
+DROP INDEX brintest_a_idx;
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_PER_RANGE = 2);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+ brin_index_check 
+------------------
+ 
+(1 row)
+
+-- cleanup
+DROP TABLE brintest;
+-- inclusion opclass
+CREATE TABLE brintest (id SERIAL PRIMARY KEY, a BOX);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a BOX_INCLUSION_OPS) WITH (PAGES_PER_RANGE = 2);
+INSERT INTO brintest (a)
+SELECT BOX(point(random() * 1000, random() * 1000), point(random() * 1000, random() * 1000))
+FROM generate_series(1, 10000);
+-- create some empty ranges
+DELETE FROM brintest WHERE id > 2000 AND id < 4000;
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true, '{"@>"}');
+ brin_index_check 
+------------------
+ 
+(1 row)
+
+-- rebuild index
+DROP INDEX brintest_a_idx;
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a BOX_INCLUSION_OPS) WITH (PAGES_PER_RANGE = 2);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true, '{"@>"}');
+ brin_index_check 
+------------------
+ 
+(1 row)
+
+-- cleanup
+DROP TABLE brintest;
+-- cleanup
+DROP FUNCTION random_string;
diff --git a/contrib/amcheck/meson.build b/contrib/amcheck/meson.build
index b33e8c9b062..15c4d17a9bc 100644
--- a/contrib/amcheck/meson.build
+++ b/contrib/amcheck/meson.build
@@ -5,6 +5,7 @@ amcheck_sources = files(
   'verify_gin.c',
   'verify_heapam.c',
   'verify_nbtree.c',
+  'verify_brinam.c'
 )
 
 if host_system == 'windows'
@@ -27,6 +28,7 @@ install_data(
   'amcheck--1.2--1.3.sql',
   'amcheck--1.3--1.4.sql',
   'amcheck--1.4--1.5.sql',
+  'amcheck--1.5--1.6.sql',
   kwargs: contrib_data_args,
 )
 
@@ -40,6 +42,7 @@ tests += {
       'check_btree',
       'check_gin',
       'check_heap',
+      'check_brin'
     ],
   },
   'tap': {
diff --git a/contrib/amcheck/sql/check_brin.sql b/contrib/amcheck/sql/check_brin.sql
new file mode 100644
index 00000000000..36e091a6884
--- /dev/null
+++ b/contrib/amcheck/sql/check_brin.sql
@@ -0,0 +1,102 @@
+-- helper func
+CREATE OR REPLACE FUNCTION  random_string( INT ) RETURNS TEXT AS $$
+SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', ceil(random() * 30)::INTEGER, 1), '') FROM generate_series(1, $1);
+$$ LANGUAGE sql;
+
+
+-- empty table index should be valid
+CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+-- cleanup
+DROP TABLE brintest;
+
+
+-- multi attributes with varlena attribute test
+CREATE TABLE brintest (id BIGSERIAL, a TEXT) WITH (FILLFACTOR = 10);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a TEXT_minmax_ops, id int8_minmax_ops) WITH (PAGES_PER_RANGE = 2);
+INSERT INTO brintest (a) SELECT random_string((x % 100)) FROM generate_series(1,5000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE id > 2000 AND id < 4000;
+SELECT brin_index_check('brintest_a_idx'::REGCLASS);
+
+-- rebuild index
+DROP INDEX brintest_a_idx;
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a TEXT_minmax_ops) WITH (PAGES_PER_RANGE = 2);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS);
+-- cleanup
+DROP TABLE brintest;
+
+
+
+-- min_max opclass
+CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES_PER_RANGE = 2);
+INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE a > 20000 AND a < 40000;
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+
+-- rebuild index
+DROP INDEX brintest_a_idx;
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES_PER_RANGE = 2);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+-- cleanup
+DROP TABLE brintest;
+
+
+
+-- multi_min_max opclass
+CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH (PAGES_PER_RANGE = 2);
+INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE a > 20000 AND a < 40000;
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+
+-- rebuild index
+DROP INDEX brintest_a_idx;
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH (PAGES_PER_RANGE = 2);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+-- cleanup
+DROP TABLE brintest;
+
+
+
+-- bloom opclass
+CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_PER_RANGE = 2);
+INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE a > 20000 AND a < 40000;
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+
+-- rebuild index
+DROP INDEX brintest_a_idx;
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_PER_RANGE = 2);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+-- cleanup
+DROP TABLE brintest;
+
+
+-- inclusion opclass
+CREATE TABLE brintest (id SERIAL PRIMARY KEY, a BOX);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a BOX_INCLUSION_OPS) WITH (PAGES_PER_RANGE = 2);
+INSERT INTO brintest (a)
+SELECT BOX(point(random() * 1000, random() * 1000), point(random() * 1000, random() * 1000))
+FROM generate_series(1, 10000);
+-- create some empty ranges
+DELETE FROM brintest WHERE id > 2000 AND id < 4000;
+
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true, '{"@>"}');
+
+-- rebuild index
+DROP INDEX brintest_a_idx;
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a BOX_INCLUSION_OPS) WITH (PAGES_PER_RANGE = 2);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true, '{"@>"}');
+-- cleanup
+DROP TABLE brintest;
+
+
+-- cleanup
+DROP FUNCTION random_string;
\ No newline at end of file
diff --git a/contrib/amcheck/verify_brinam.c b/contrib/amcheck/verify_brinam.c
new file mode 100644
index 00000000000..d01b3a2ad64
--- /dev/null
+++ b/contrib/amcheck/verify_brinam.c
@@ -0,0 +1,1270 @@
+/*-------------------------------------------------------------------------
+ *
+ * verify_brinam.c
+ *	  Functions to check postgresql brin indexes for corruption
+ *
+ * Copyright (c) 2016-2024, PostgreSQL Global Development Group
+ *
+ *	  contrib/amcheck/verify_brinam.c
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "access/table.h"
+#include "access/tableam.h"
+#include "access/transam.h"
+#include "access/brin.h"
+#include "catalog/index.h"
+#include "catalog/pg_am_d.h"
+#include "catalog/pg_operator.h"
+#include "miscadmin.h"
+#include "storage/lmgr.h"
+#include "storage/smgr.h"
+#include "utils/guc.h"
+#include "utils/memutils.h"
+#include "access/brin_page.h"
+#include "access/brin_revmap.h"
+#include "utils/lsyscache.h"
+#include "verify_common.h"
+#include "utils/builtins.h"
+#include "utils/array.h"
+
+
+PG_FUNCTION_INFO_V1(brin_index_check);
+
+typedef struct BrinCheckState
+{
+
+	/* Check arguments */
+
+	bool		regular_pages_check;
+	bool		heap_all_consistent;
+	ArrayType  *heap_all_consistent_oper_names;
+
+	/* BRIN check common fields */
+
+	Relation	idxrel;
+	Relation	heaprel;
+	BrinDesc   *bdesc;
+	int			natts;
+	BlockNumber pagesPerRange;
+
+	/* Index structure check fields */
+
+	BufferAccessStrategy checkstrategy;
+	BlockNumber idxnblocks;
+	BlockNumber heapnblocks;
+	BlockNumber lastRevmapPage;
+	/* Current range blkno */
+	BlockNumber rangeBlkno;
+	/* Current revmap item */
+	BlockNumber revmapBlk;
+	Buffer		revmapbuf;
+	Page		revmappage;
+	uint32		revmapidx;
+	/* Current index tuple */
+	BlockNumber regpageBlk;
+	Buffer		regpagebuf;
+	Page		regpage;
+	OffsetNumber regpageoffset;
+
+	/* All heap consistent check fields */
+
+	String	  **operatorNames;
+	BrinRevmap *revmap;
+	Buffer		buf;
+	FmgrInfo   *consistentFn;
+	/* Scan keys for regular values */
+	ScanKey    *nonnull_sk;
+	/* Scan keys for null values */
+	ScanKey    *isnull_sk;
+	double		range_cnt;
+	/* first block of the next range */
+	BlockNumber nextrangeBlk;
+
+	/*
+	 * checkable_range shows whether the current range can be checked and dtup
+	 * contains a valid index tuple for the range. It can be false if the
+	 * current range is not summarized, or it is a placeholder, or the check
+	 * has just begun
+	 */
+	bool		checkable_range;
+	BrinMemTuple *dtup;
+	MemoryContext rangeCtx;
+	MemoryContext heaptupleCtx;
+}			BrinCheckState;
+
+static void brin_check(Relation idxrel, Relation heaprel, void *callback_state, bool readonly);
+
+static void check_brin_index_structure(BrinCheckState * pState);
+
+static void check_meta(BrinCheckState * state);
+
+static void check_revmap(BrinCheckState * state);
+
+static void check_revmap_item(BrinCheckState * state);
+
+static void check_index_tuple(BrinCheckState * state, BrinTuple *tuple, ItemId lp);
+
+static void check_regular_pages(BrinCheckState * state);
+
+static bool revmap_points_to_index_tuple(BrinCheckState * state);
+
+static ItemId PageGetItemIdCareful(BrinCheckState * state);
+
+static void check_all_heap_consistent(BrinCheckState * state);
+
+static void check_and_prepare_operator_names(BrinCheckState * state);
+
+static void brin_check_callback(Relation index,
+								ItemPointer tid,
+								Datum *values,
+								bool *isnull,
+								bool tupleIsAlive,
+								void *brstate);
+
+static void check_heap_tuple(BrinCheckState * state, const Datum *values, const bool *nulls, ItemPointer tid);
+
+static ScanKey prepare_nonnull_scan_key(const BrinCheckState * state, AttrNumber attno);
+
+static ScanKey prepare_isnull_scan_key(AttrNumber attno);
+
+static void brin_check_ereport(const char *fmt);
+
+static void revmap_item_ereport(BrinCheckState * state, const char *fmt);
+
+static void index_tuple_ereport(BrinCheckState * state, const char *fmt);
+
+static void index_tuple_only_ereport(BrinCheckState * state, const char *fmt);
+
+static void all_consist_ereport(const BrinCheckState * state, const ItemPointerData *tid, const char *message);
+
+Datum
+brin_index_check(PG_FUNCTION_ARGS)
+{
+	Oid			indrelid = PG_GETARG_OID(0);
+	BrinCheckState *state = palloc0(sizeof(BrinCheckState));
+
+	state->regular_pages_check = PG_GETARG_BOOL(1);
+	state->heap_all_consistent = PG_GETARG_BOOL(2);
+	state->heap_all_consistent_oper_names = PG_GETARG_ARRAYTYPE_P(3);
+
+	amcheck_lock_relation_and_check(indrelid,
+									BRIN_AM_OID,
+									brin_check,
+									ShareUpdateExclusiveLock,
+									state);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Main check function
+ */
+static void
+brin_check(Relation idxrel, Relation heaprel, void *callback_state, bool readonly)
+{
+	BrinCheckState *state = (BrinCheckState *) callback_state;
+
+	/* Initialize check common fields */
+	state->idxrel = idxrel;
+	state->heaprel = heaprel;
+	state->bdesc = brin_build_desc(idxrel);
+	state->natts = state->bdesc->bd_tupdesc->natts;
+
+	/*
+	 * We know how many attributes index has, so let's process operator names
+	 * array
+	 */
+	if (state->heap_all_consistent)
+	{
+		check_and_prepare_operator_names(state);
+	}
+
+	check_brin_index_structure(state);
+
+	if (state->heap_all_consistent)
+	{
+		check_all_heap_consistent(state);
+	}
+
+	brin_free_desc(state->bdesc);
+}
+
+/*
+ * Check that index has expected structure
+ *
+ *  Some check expectations:
+ * - we hold ShareUpdateExclusiveLock, so the revmap cannot be extended (i.e. no evacuation) during the check,
+ *   all regular pages stay regular, and ranges cannot be summarized or desummarized.
+ *   Nevertheless, concurrent updates could lead to new regular page allocations
+ *   and moving of index tuples.
+ * - if a revmap pointer is valid, there should be a valid index tuple it points to.
+ * - there are no orphan index tuples (if there is an index tuple, a revmap item that points to it must also exist)
+ * - it's possible to encounter placeholder tuples (as a result of a crash)
+ * - it's possible to encounter new pages instead of regular ones (as a result of a crash)
+ * - it's possible to encounter pages with the evacuation bit set (as a result of a crash)
+ *
+ */
+static void
+check_brin_index_structure(BrinCheckState * state)
+{
+	/* Index structure check fields initialization */
+	state->checkstrategy = GetAccessStrategy(BAS_BULKREAD);
+
+	check_meta(state);
+
+	/* Check revmap first, blocks: [1, lastRevmapPage] */
+	check_revmap(state);
+
+	/* Check regular pages, blocks: [lastRevmapPage + 1, idxnblocks - 1] */
+	check_regular_pages(state);
+}
+
+/* Check the meta page and save some data for the subsequent checks */
+static void
+check_meta(BrinCheckState * state)
+{
+	Buffer		metabuf;
+	Page		metapage;
+	BrinMetaPageData *metadata;
+
+	/* Meta page check */
+	metabuf = ReadBufferExtended(state->idxrel, MAIN_FORKNUM, BRIN_METAPAGE_BLKNO, RBM_NORMAL,
+								 state->checkstrategy);
+	LockBuffer(metabuf, BUFFER_LOCK_SHARE);
+	metapage = BufferGetPage(metabuf);
+	metadata = (BrinMetaPageData *) PageGetContents(metapage);
+	state->idxnblocks = RelationGetNumberOfBlocks(state->idxrel);
+
+
+	if (!BRIN_IS_META_PAGE(metapage) ||
+		metadata->brinMagic != BRIN_META_MAGIC ||
+		metadata->brinVersion != BRIN_CURRENT_VERSION ||
+		metadata->pagesPerRange < 1 || metadata->pagesPerRange > BRIN_MAX_PAGES_PER_RANGE ||
+		metadata->lastRevmapPage <= BRIN_METAPAGE_BLKNO || metadata->lastRevmapPage >= state->idxnblocks)
+	{
+		brin_check_ereport("metapage is corrupted");
+	}
+
+	state->lastRevmapPage = metadata->lastRevmapPage;
+	state->pagesPerRange = metadata->pagesPerRange;
+	UnlockReleaseBuffer(metabuf);
+}
+
+/*
+ * Walk revmap page by page from the beginning and check every revmap item.
+ * Also check that all pages within [1, lastRevmapPage] are revmap pages.
+ */
+static void
+check_revmap(BrinCheckState * state)
+{
+	Relation	idxrel = state->idxrel;
+	BlockNumber lastRevmapPage = state->lastRevmapPage;
+
+	state->rangeBlkno = 0;
+	state->regpagebuf = InvalidBuffer;
+	state->heapnblocks = RelationGetNumberOfBlocks(state->heaprel);
+
+	/* Walk each revmap page */
+	for (state->revmapBlk = BRIN_METAPAGE_BLKNO + 1; state->revmapBlk <= lastRevmapPage; state->revmapBlk++)
+	{
+
+		state->revmapbuf = ReadBufferExtended(idxrel, MAIN_FORKNUM, state->revmapBlk, RBM_NORMAL,
+											  state->checkstrategy);
+		LockBuffer(state->revmapbuf, BUFFER_LOCK_SHARE);
+		state->revmappage = BufferGetPage(state->revmapbuf);
+
+		/*
+		 * Pages with block numbers in [1, lastRevmapPage] should be revmap
+		 * pages
+		 */
+		if (!BRIN_IS_REVMAP_PAGE(state->revmappage))
+		{
+			brin_check_ereport(psprintf("revmap page is expected at block %u, last revmap page %u",
+										state->revmapBlk,
+										lastRevmapPage));
+		}
+		LockBuffer(state->revmapbuf, BUFFER_LOCK_UNLOCK);
+
+		/* Walk and check all brin tuples from the current revmap page */
+		state->revmapidx = 0;
+		while (state->revmapidx < REVMAP_PAGE_MAXITEMS)
+		{
+			CHECK_FOR_INTERRUPTS();
+
+			/* Check revmap item */
+			check_revmap_item(state);
+
+			state->rangeBlkno += state->pagesPerRange;
+			state->revmapidx++;
+		}
+
+		elog(DEBUG1, "Complete revmap page check: %u", state->revmapBlk);
+
+		ReleaseBuffer(state->revmapbuf);
+	}
+
+	if (BufferIsValid(state->regpagebuf))
+	{
+		ReleaseBuffer(state->regpagebuf);
+	}
+}
+
+/*
+ * Check revmap item.
+ *
+ * We check revmap item pointer itself and if it is ok we check the index tuple it points to.
+ *
+ * To avoid deadlock we need to unlock revmap page before locking regular page,
+ * so when we get the lock on the regular page our index tuple pointer may no longer be relevant.
+ * So for some checks before reporting an error we need to make sure that our pointer is still relevant and if it's not - retry.
+ */
+static void
+check_revmap_item(BrinCheckState * state)
+{
+	ItemPointerData *revmaptids;
+	RevmapContents *contents;
+	ItemPointerData *iptr;
+	ItemId		lp;
+	BrinTuple  *tup;
+	Relation	idxrel = state->idxrel;
+
+	/* Loop to retry revmap item check if there was a concurrent update. */
+	for (;;)
+	{
+		LockBuffer(state->revmapbuf, BUFFER_LOCK_SHARE);
+
+		contents = (RevmapContents *) PageGetContents(BufferGetPage(state->revmapbuf));
+		revmaptids = contents->rm_tids;
+		/* Pointer for the range with start at state->rangeBlkno */
+		iptr = revmaptids + state->revmapidx;
+
+		/* At first check revmap item pointer */
+
+		/*
+		 * An invalid tuple pointer means the range isn't summarized, so just
+		 * move on
+		 */
+		if (!ItemPointerIsValid(iptr))
+		{
+			elog(DEBUG1, "Range %u is not summarized", state->rangeBlkno);
+			LockBuffer(state->revmapbuf, BUFFER_LOCK_UNLOCK);
+			break;
+		}
+
+		/*
+		 * Pointer is valid, it should point to the index tuple for the range
+		 * with blkno rangeBlkno. Remember it and unlock revmap page to avoid
+		 * deadlock
+		 */
+		state->regpageBlk = ItemPointerGetBlockNumber(iptr);
+		state->regpageoffset = ItemPointerGetOffsetNumber(iptr);
+
+		LockBuffer(state->revmapbuf, BUFFER_LOCK_UNLOCK);
+
+		/*
+		 * Check if the regpage block number is greater than the relation
+		 * size. To avoid fetching the number of blocks for each tuple, use
+		 * cached value first
+		 */
+		if (state->regpageBlk >= state->idxnblocks)
+		{
+			/*
+			 * Regular pages may have been added, so refresh idxnblocks and
+			 * recheck
+			 */
+			state->idxnblocks = RelationGetNumberOfBlocks(idxrel);
+			if (state->regpageBlk >= state->idxnblocks)
+			{
+				revmap_item_ereport(state,
+									psprintf("revmap item points to a non existing block %u, index max block %u",
+											 state->regpageBlk,
+											 state->idxnblocks));
+			}
+		}
+
+		/*
+		 * To avoid some pin/unpin cycles we cache last used regular page.
+		 * Check if we need different regular page and fetch it.
+		 */
+		if (!BufferIsValid(state->regpagebuf) || BufferGetBlockNumber(state->regpagebuf) != state->regpageBlk)
+		{
+			if (BufferIsValid(state->regpagebuf))
+			{
+				ReleaseBuffer(state->regpagebuf);
+			}
+			state->regpagebuf = ReadBufferExtended(idxrel, MAIN_FORKNUM, state->regpageBlk, RBM_NORMAL,
+												   state->checkstrategy);
+		}
+
+		LockBuffer(state->regpagebuf, BUFFER_LOCK_SHARE);
+		state->regpage = BufferGetPage(state->regpagebuf);
+
+		/* Revmap should always point to a regular page */
+		if (!BRIN_IS_REGULAR_PAGE(state->regpage))
+		{
+			revmap_item_ereport(state,
+								psprintf("revmap item points to the page which is not regular (blkno: %u)",
+										 state->regpageBlk));
+
+		}
+
+		/* Check item offset is valid */
+		if (state->regpageoffset > PageGetMaxOffsetNumber(state->regpage))
+		{
+
+			/* If concurrent update moved our tuple we need to retry */
+			if (!revmap_points_to_index_tuple(state))
+			{
+				LockBuffer(state->regpagebuf, BUFFER_LOCK_UNLOCK);
+				continue;
+			}
+
+			revmap_item_ereport(state,
+								psprintf("revmap item offset number %u is greater than regular page %u max offset %u",
+										 state->regpageoffset,
+										 state->regpageBlk,
+										 PageGetMaxOffsetNumber(state->regpage)));
+		}
+
+		elog(DEBUG1, "Process range: %u, iptr: (%u,%u)", state->rangeBlkno, state->regpageBlk, state->regpageoffset);
+
+		/*
+		 * Revmap pointer is OK. It points to existing regular page, offset
+		 * also is ok. Let's check index tuple it points to.
+		 */
+
+		lp = PageGetItemIdCareful(state);
+
+		/* Revmap should point to NORMAL tuples only */
+		if (!ItemIdIsUsed(lp))
+		{
+
+			/* If concurrent update moved our tuple we need to retry */
+			if (!revmap_points_to_index_tuple(state))
+			{
+				LockBuffer(state->regpagebuf, BUFFER_LOCK_UNLOCK);
+				continue;
+			}
+
+			index_tuple_ereport(state, "revmap item points to unused index tuple");
+		}
+
+
+		tup = (BrinTuple *) PageGetItem(state->regpage, lp);
+
+		/* Check if range block number is as expected */
+		if (tup->bt_blkno != state->rangeBlkno)
+		{
+
+			/* If concurrent update moved our tuple we need to retry */
+			if (!revmap_points_to_index_tuple(state))
+			{
+				LockBuffer(state->regpagebuf, BUFFER_LOCK_UNLOCK);
+				continue;
+			}
+
+			index_tuple_ereport(state, psprintf("index tuple has invalid blkno %u", tup->bt_blkno));
+		}
+
+		/*
+		 * If the range is beyond the table size, the range must be empty.
+		 * This is currently a valid situation for an empty table.
+		 */
+		if (state->rangeBlkno >= state->heapnblocks)
+		{
+			if (!BrinTupleIsEmptyRange(tup))
+			{
+				index_tuple_ereport(state,
+									psprintf("the range is beyond the table size, "
+											 "but is not marked as empty, table size: %u blocks",
+											 state->heapnblocks));
+			}
+		}
+
+		/* Check index tuple itself */
+		check_index_tuple(state, tup, lp);
+
+		LockBuffer(state->regpagebuf, BUFFER_LOCK_UNLOCK);
+		break;
+	}
+}
+
+/*
+ * Check that index tuple has expected structure.
+ *
+ * This function follows the logic performed by brin_deform_tuple().
+ * After this check is complete we are sure that brin_deform_tuple can process it.
+ *
+ * For an empty range, check that every attribute has allnulls = true and hasnulls = false,
+ * which implies the tuple carries no data. All core opclasses expect allnulls = true for an empty range.
+ */
+static void
+check_index_tuple(BrinCheckState * state, BrinTuple *tuple, ItemId lp)
+{
+
+	char	   *tp;				/* tuple data */
+	Size		off;
+	bits8	   *nullbits;
+	TupleDesc	disktdesc;
+	int			stored;
+	bool		empty_range = BrinTupleIsEmptyRange(tuple);
+	bool		hasnullbitmap = BrinTupleHasNulls(tuple);
+	Size		hoff = BrinTupleDataOffset(tuple);
+	Size		tuplen = ItemIdGetLength(lp);
+
+
+	/* Check that header length is not greater than tuple length */
+	if (hoff > tuplen)
+	{
+		index_tuple_ereport(state, psprintf("index tuple header length %zu is greater than tuple len %zu", hoff, tuplen));
+	}
+
+	/* If tuple has null bitmap - initialize it */
+	if (hasnullbitmap)
+	{
+		nullbits = (bits8 *) ((char *) tuple + SizeOfBrinTuple);
+	}
+	else
+	{
+		nullbits = NULL;
+	}
+
+	/* Empty range index tuple checks */
+	if (empty_range)
+	{
+		/* Empty range tuple should have null bitmap */
+		if (!hasnullbitmap)
+		{
+			index_tuple_ereport(state, "empty range index tuple doesn't have null bitmap");
+		}
+
+		Assert(nullbits != NULL);
+
+		/* Check that every attribute has allnulls = true and hasnulls = false */
+		for (int attindex = 0; attindex < state->natts; ++attindex)
+		{
+
+			/* Attribute allnulls should be true for empty range */
+			if (att_isnull(attindex, nullbits))
+			{
+				index_tuple_ereport(state,
+									psprintf("empty range index tuple attribute %d with allnulls is false",
+											 attindex));
+			}
+
+			/* Attribute hasnulls should be false for empty range */
+			if (!att_isnull(state->natts + attindex, nullbits))
+			{
+				index_tuple_ereport(state,
+									psprintf("empty range index tuple attribute %d with hasnulls is true",
+											 attindex));
+			}
+		}
+
+		/* We are done with empty range tuple */
+		return;
+	}
+
+	/*
+	 * Range is marked as not empty so we can have some data in the tuple.
+	 * Walk all attributes and check that all stored values fit into the
+	 * tuple.
+	 */
+
+	tp = (char *) tuple + BrinTupleDataOffset(tuple);
+	stored = 0;
+	off = 0;
+
+	disktdesc = brtuple_disk_tupdesc(state->bdesc);
+
+	for (int attindex = 0; attindex < state->natts; ++attindex)
+	{
+		BrinOpcInfo *opclass = state->bdesc->bd_info[attindex];
+
+		/*
+		 * if allnulls is set we have no data for this attribute, move to the
+		 * next
+		 */
+		if (hasnullbitmap && !att_isnull(attindex, nullbits))
+		{
+			stored += opclass->oi_nstored;
+			continue;
+		}
+
+		/* Walk all stored values for the current attribute */
+		for (int datumno = 0; datumno < opclass->oi_nstored; datumno++)
+		{
+			CompactAttribute *thisatt = TupleDescCompactAttr(disktdesc, stored);
+
+			if (thisatt->attlen == -1)
+			{
+				off = att_pointer_alignby(off,
+										  thisatt->attalignby,
+										  -1,
+										  tp + off);
+			}
+			else
+			{
+				off = att_nominal_alignby(off, thisatt->attalignby);
+			}
+
+			/* Check that we are still in the tuple */
+			if (hoff + off > tuplen)
+			{
+				index_tuple_ereport(state,
+									psprintf("attribute %d stored value %d with length %d "
+											 "starts at offset %zu beyond total tuple length %zu",
+											 attindex, datumno, thisatt->attlen, off, tuplen));
+			}
+
+			off = att_addlength_pointer(off, thisatt->attlen, tp + off);
+
+			/* Check that we are still in the tuple */
+			if (hoff + off > tuplen)
+			{
+				index_tuple_ereport(state,
+									psprintf("attribute %d stored value %d with length %d "
+											 "ends at offset %zu beyond total tuple length %zu",
+											 attindex, datumno, thisatt->attlen, off, tuplen));
+			}
+			stored++;
+		}
+
+	}
+
+}
+
+/*
+ * Check that all pages within the range [lastRevmapPage + 1, idxnblocks) are new or regular pages
+ * and that the revmap has a pointer to each NORMAL index tuple.
+ */
+static void
+check_regular_pages(BrinCheckState * state)
+{
+	if (!state->regular_pages_check)
+	{
+		return;
+	}
+
+	/* reset state */
+	state->revmapBlk = InvalidBlockNumber;
+	state->revmapbuf = InvalidBuffer;
+	state->revmapidx = -1;
+	state->regpageBlk = InvalidBlockNumber;
+	state->regpagebuf = InvalidBuffer;
+	state->regpageoffset = InvalidOffsetNumber;
+	state->idxnblocks = RelationGetNumberOfBlocks(state->idxrel);
+
+	for (state->regpageBlk = state->lastRevmapPage + 1; state->regpageBlk < state->idxnblocks; state->regpageBlk++)
+	{
+		OffsetNumber maxoff;
+
+		state->regpagebuf = ReadBufferExtended(state->idxrel, MAIN_FORKNUM, state->regpageBlk, RBM_NORMAL,
+											   state->checkstrategy);
+		LockBuffer(state->regpagebuf, BUFFER_LOCK_SHARE);
+		state->regpage = BufferGetPage(state->regpagebuf);
+
+		/* Skip new pages */
+		if (PageIsNew(state->regpage))
+		{
+			UnlockReleaseBuffer(state->regpagebuf);
+			continue;
+		}
+
+		if (!BRIN_IS_REGULAR_PAGE(state->regpage))
+		{
+			brin_check_ereport(psprintf("expected new or regular page at block %u", state->regpageBlk));
+		}
+
+		/* Check that all NORMAL index tuples within the page are not orphans */
+		maxoff = PageGetMaxOffsetNumber(state->regpage);
+		for (state->regpageoffset = FirstOffsetNumber; state->regpageoffset <= maxoff; state->regpageoffset++)
+		{
+			ItemId		lp;
+			BrinTuple  *tup;
+			BlockNumber revmapBlk;
+
+			lp = PageGetItemIdCareful(state);
+
+			if (ItemIdIsUsed(lp))
+			{
+				tup = (BrinTuple *) PageGetItem(state->regpage, lp);
+
+				/* Get revmap block number for index tuple blkno */
+				revmapBlk = ((tup->bt_blkno / state->pagesPerRange) / REVMAP_PAGE_MAXITEMS) + 1;
+				if (revmapBlk > state->lastRevmapPage)
+				{
+					index_tuple_only_ereport(state, psprintf("no revmap page for the index tuple with blkno %u",
+															 tup->bt_blkno));
+				}
+
+				/* Fetch another revmap page if needed */
+				if (state->revmapBlk != revmapBlk)
+				{
+					if (BlockNumberIsValid(state->revmapBlk))
+					{
+						ReleaseBuffer(state->revmapbuf);
+					}
+					state->revmapBlk = revmapBlk;
+					state->revmapbuf = ReadBufferExtended(state->idxrel, MAIN_FORKNUM, state->revmapBlk, RBM_NORMAL,
+														  state->checkstrategy);
+				}
+
+				state->revmapidx = (tup->bt_blkno / state->pagesPerRange) % REVMAP_PAGE_MAXITEMS;
+				state->rangeBlkno = tup->bt_blkno;
+
+				/* check that revmap item points to index tuple */
+				if (!revmap_points_to_index_tuple(state))
+				{
+					index_tuple_ereport(state, "revmap doesn't point to index tuple");
+				}
+
+			}
+		}
+
+		UnlockReleaseBuffer(state->regpagebuf);
+	}
+
+	if (state->revmapbuf != InvalidBuffer)
+	{
+		ReleaseBuffer(state->revmapbuf);
+	}
+}
+
+/*
+ * Check if the revmap item points to the index tuple (regpageBlk, regpageoffset).
+ * The caller holds a lock on the regular page; the revmap page is locked here.
+ * Regular page first, then revmap page is a valid lock ordering, so no deadlock is possible.
+ */
+static bool
+revmap_points_to_index_tuple(BrinCheckState * state)
+{
+	ItemPointerData *revmaptids;
+	RevmapContents *contents;
+	ItemPointerData *tid;
+	bool		points;
+
+	LockBuffer(state->revmapbuf, BUFFER_LOCK_SHARE);
+	contents = (RevmapContents *) PageGetContents(BufferGetPage(state->revmapbuf));
+	revmaptids = contents->rm_tids;
+	tid = revmaptids + state->revmapidx;
+
+	points = ItemPointerGetBlockNumber(tid) == state->regpageBlk &&
+		ItemPointerGetOffsetNumber(tid) == state->regpageoffset;
+
+	LockBuffer(state->revmapbuf, BUFFER_LOCK_UNLOCK);
+	return points;
+}
+
+/*
+ * PageGetItemId() wrapper that validates returned line pointer.
+ *
+ * A line pointer in a BRIN index can only be UNUSED or NORMAL.
+ */
+static ItemId
+PageGetItemIdCareful(BrinCheckState * state)
+{
+	Page		page = state->regpage;
+	OffsetNumber offset = state->regpageoffset;
+	ItemId		itemid = PageGetItemId(page, offset);
+
+	if (ItemIdGetOffset(itemid) + ItemIdGetLength(itemid) >
+		BLCKSZ - MAXALIGN(sizeof(BrinSpecialSpace)))
+		index_tuple_ereport(state,
+							psprintf("line pointer points past end of tuple space in index. "
+									 "lp_off=%u, lp_len=%u lp_flags=%u",
+									 ItemIdGetOffset(itemid),
+									 ItemIdGetLength(itemid),
+									 ItemIdGetFlags(itemid)
+									 )
+			);
+
+	/* Verify that line pointer is LP_NORMAL or LP_UNUSED */
+	if (!((ItemIdIsNormal(itemid) && ItemIdGetLength(itemid) != 0) ||
+		  (!ItemIdIsUsed(itemid) && ItemIdGetLength(itemid) == 0)))
+	{
+		index_tuple_ereport(state,
+							psprintf("invalid line pointer storage in index. "
+									 "lp_off=%u, lp_len=%u lp_flags=%u",
+									 ItemIdGetOffset(itemid),
+									 ItemIdGetLength(itemid),
+									 ItemIdGetFlags(itemid)
+									 ));
+	}
+
+	return itemid;
+}
+
+/*
+ * Check that every heap tuple is consistent with the index.
+ *
+ * Also check that the fields 'empty_range', 'all_nulls' and 'has_nulls'
+ * are not too "narrow" for each range, i.e. report any of the following:
+ * 1) has_nulls = false, but we see a null value (only when oi_regular_nulls is true).
+ * 2) all_nulls = true, but we see a nonnull value.
+ * 3) empty_range = true, but we see a tuple within the range.
+ *
+ * We use allowSync = false so that the scan starts at block 0 and
+ * we process full ranges one by one from the first range.
+ * It's not necessary, but it makes the code simpler, and this way
+ * we need to fetch every index tuple only once.
+ */
+static void
+check_all_heap_consistent(BrinCheckState * state)
+{
+	Relation	idxrel = state->idxrel;
+	Relation	heaprel = state->heaprel;
+	double		reltuples;
+	IndexInfo  *indexInfo;
+
+	/* All heap consistent check fields initialization */
+
+	state->revmap = brinRevmapInitialize(idxrel, &state->pagesPerRange);
+	state->dtup = brin_new_memtuple(state->bdesc);
+	state->checkable_range = false;
+	state->consistentFn = palloc0_array(FmgrInfo, state->natts);
+	state->range_cnt = 0;
+	/* next range is the first range in the beginning */
+	state->nextrangeBlk = 0;
+	state->nonnull_sk = palloc0_array(ScanKey, state->natts);
+	state->isnull_sk = palloc0_array(ScanKey, state->natts);
+	state->rangeCtx = AllocSetContextCreate(CurrentMemoryContext,
+											"brin check range context",
+											ALLOCSET_DEFAULT_SIZES);
+	state->heaptupleCtx = AllocSetContextCreate(CurrentMemoryContext,
+												"brin check tuple context",
+												ALLOCSET_DEFAULT_SIZES);
+
+	/*
+	 * Prepare "non-null" and "is_null" scan keys and consistent fn for each
+	 * attribute
+	 */
+	for (AttrNumber attno = 1; attno <= state->natts; attno++)
+	{
+		FmgrInfo   *tmp;
+
+		tmp = index_getprocinfo(idxrel, attno, BRIN_PROCNUM_CONSISTENT);
+		fmgr_info_copy(&state->consistentFn[attno - 1], tmp, CurrentMemoryContext);
+
+		state->nonnull_sk[attno - 1] = prepare_nonnull_scan_key(state, attno);
+		state->isnull_sk[attno - 1] = prepare_isnull_scan_key(attno);
+	}
+
+	indexInfo = BuildIndexInfo(idxrel);
+
+	/*
+	 * Use a snapshot to check only those tuples that are guaranteed to be
+	 * indexed already. Using SnapshotAny would make it harder to tell
+	 * whether there is corruption or the checked tuple just hasn't been
+	 * indexed yet.
+	 */
+	indexInfo->ii_Concurrent = true;
+	reltuples = table_index_build_scan(heaprel, idxrel, indexInfo, false, true,
+									   brin_check_callback, (void *) state, NULL);
+
+	elog(DEBUG1, "ranges were checked: %f", state->range_cnt);
+	elog(DEBUG1, "scan total tuples: %f", reltuples);
+
+	if (state->buf != InvalidBuffer)
+		ReleaseBuffer(state->buf);
+
+	brinRevmapTerminate(state->revmap);
+	MemoryContextDelete(state->rangeCtx);
+	MemoryContextDelete(state->heaptupleCtx);
+}
+
+/*
+ * Check operator names array input parameter and convert it to array of strings
+ * Empty input array means we use "=" operator for every attribute
+ */
+static void
+check_and_prepare_operator_names(BrinCheckState * state)
+{
+	Oid			element_type = ARR_ELEMTYPE(state->heap_all_consistent_oper_names);
+	int16		typlen;
+	bool		typbyval;
+	char		typalign;
+	Datum	   *values;
+	bool	   *elem_nulls;
+	int			num_elems;
+
+	state->operatorNames = palloc(sizeof(String *) * state->natts);
+
+	get_typlenbyvalalign(element_type, &typlen, &typbyval, &typalign);
+	deconstruct_array(state->heap_all_consistent_oper_names, element_type, typlen, typbyval, typalign,
+					  &values, &elem_nulls, &num_elems);
+
+	/* If we have some input check it and convert to String** */
+	if (num_elems != 0)
+	{
+		if (num_elems != state->natts)
+		{
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("Operator names array length %d, but index has %d attributes",
+							num_elems, state->natts)));
+		}
+
+		for (int i = 0; i < num_elems; i++)
+		{
+			if (elem_nulls[i])
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("Operator names array contains NULL")));
+			}
+			state->operatorNames[i] = makeString(TextDatumGetCString(values[i]));
+		}
+	}
+	else
+	{
+		/* If there is no input just use "=" operator for all attributes */
+		for (int i = 0; i < state->natts; i++)
+		{
+			state->operatorNames[i] = makeString("=");
+		}
+	}
+}
+
+/*
+ * Prepare the nonnull ScanKey for an index attribute ("=" operator by default).
+ *
+ * Generated once and reused for all heap tuples.
+ * The argument field is filled in for every heap tuple before the
+ * consistent function is invoked, so leave it empty for now.
+ *
+ * The operator strategy number can vary from opclass to opclass, so we need to look it up for every attribute.
+ */
+static ScanKey
+prepare_nonnull_scan_key(const BrinCheckState * state, AttrNumber attno)
+{
+	ScanKey		scanKey;
+	Oid			opOid;
+	Oid			opFamilyOid;
+	bool		defined;
+	StrategyNumber strategy;
+	RegProcedure opRegProc;
+	List	   *operNameList;
+	int			attindex = attno - 1;
+	Form_pg_attribute attr = TupleDescAttr(state->bdesc->bd_tupdesc, attindex);
+	Oid			type = attr->atttypid;
+	String	   *opname = state->operatorNames[attno - 1];
+
+	opFamilyOid = state->idxrel->rd_opfamily[attindex];
+	operNameList = list_make1(opname);
+	opOid = OperatorLookup(operNameList, type, type, &defined);
+
+	if (opOid == InvalidOid)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_FUNCTION),
+				 errmsg("There is no operator %s for type %u",
+						opname->sval, type)));
+	}
+
+	strategy = get_op_opfamily_strategy(opOid, opFamilyOid);
+
+	if (strategy == 0)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("operator %s is not a member of operator family \"%s\"",
+						opname->sval,
+						get_opfamily_name(opFamilyOid, false))));
+	}
+
+	opRegProc = get_opcode(opOid);
+	scanKey = palloc0(sizeof(ScanKeyData));
+	ScanKeyEntryInitialize(
+						   scanKey,
+						   0,
+						   attno,
+						   strategy,
+						   type,
+						   attr->attcollation,
+						   opRegProc,
+						   (Datum) 0
+		);
+	list_free(operNameList);
+
+	return scanKey;
+}
+
+static ScanKey
+prepare_isnull_scan_key(AttrNumber attno)
+{
+	ScanKey		scanKey;
+
+	scanKey = palloc0(sizeof(ScanKeyData));
+	ScanKeyEntryInitialize(scanKey,
+						   SK_ISNULL | SK_SEARCHNULL,
+						   attno,
+						   InvalidStrategy,
+						   InvalidOid,
+						   InvalidOid,
+						   InvalidOid,
+						   (Datum) 0);
+	return scanKey;
+}
+
+/*
+ * We walk from the first range (blkno = 0) to the last as the scan proceeds.
+ * For every heap tuple we check whether we are done with the current range and need to move on
+ * to the current heap tuple's range. While moving to the next range we check that it's not empty (because
+ * we have at least one tuple for this range).
+ * Every heap tuple is checked to be consistent with the range it belongs to.
+ * For unsummarized ranges and placeholders we skip all checks.
+ *
+ * While moving, we may jump over some ranges,
+ * but it's okay because we would not be able to check them anyway.
+ * We also can't say whether skipped ranges should be marked as empty or not,
+ * since it's possible that there were some tuples before that are now deleted.
+ *
+ */
+static void
+brin_check_callback(Relation index, ItemPointer tid, Datum *values, bool *isnull, bool tupleIsAlive, void *brstate)
+{
+	BrinCheckState *state;
+	BlockNumber heapblk;
+
+	state = (BrinCheckState *) brstate;
+	heapblk = ItemPointerGetBlockNumber(tid);
+
+	/* If we went beyond the current range let's fetch new range */
+	if (heapblk >= state->nextrangeBlk)
+	{
+		BrinTuple  *tup;
+		BrinTuple  *tupcopy = NULL;
+		MemoryContext oldCtx;
+		OffsetNumber off;
+		Size		size;
+		Size		btupsz = 0;
+
+		MemoryContextReset(state->rangeCtx);
+		oldCtx = MemoryContextSwitchTo(state->rangeCtx);
+
+		state->range_cnt++;
+
+		/* Move to the range that contains current heap tuple */
+		tup = brinGetTupleForHeapBlock(state->revmap, heapblk, &state->buf,
+									   &off, &size, BUFFER_LOCK_SHARE);
+
+		if (tup)
+		{
+			tupcopy = brin_copy_tuple(tup, size, tupcopy, &btupsz);
+			LockBuffer(state->buf, BUFFER_LOCK_UNLOCK);
+			state->dtup = brin_deform_tuple(state->bdesc, tupcopy, state->dtup);
+
+			/* We can't check placeholder ranges */
+			state->checkable_range = !state->dtup->bt_placeholder;
+		}
+		else
+		{
+			/* We can't check unsummarized ranges. */
+			state->checkable_range = false;
+		}
+
+		/*
+		 * Update nextrangeBlk so we know when we are done with the current
+		 * range
+		 */
+		state->nextrangeBlk = (heapblk / state->pagesPerRange + 1) * state->pagesPerRange;
+
+		MemoryContextSwitchTo(oldCtx);
+
+		/* Range must not be empty */
+		if (state->checkable_range && state->dtup->bt_empty_range)
+		{
+			all_consist_ereport(state, tid, "range is marked as empty but contains qualified live tuples");
+		}
+
+	}
+
+	/* Check tuple is consistent with the index */
+	if (state->checkable_range)
+	{
+		check_heap_tuple(state, values, isnull, tid);
+	}
+
+}
+
+/*
+ * For null values with oi_regular_nulls = true, check the hasnulls and allnulls flags only.
+ * For nonnull values, check that allnulls is false whether or not oi_regular_nulls is set.
+ * In every case except the first, call consistentFn with the appropriate scanKey:
+ * - for null values with oi_regular_nulls = false use the 'isNull' scanKey,
+ * - for nonnull values use the 'nonnull' scanKey.
+ */
+static void
+check_heap_tuple(BrinCheckState * state, const Datum *values, const bool *nulls, ItemPointer tid)
+{
+	int			attindex;
+	BrinMemTuple *dtup = state->dtup;
+	BrinDesc   *bdesc = state->bdesc;
+	MemoryContext oldCtx;
+
+	Assert(state->checkable_range);
+
+	MemoryContextReset(state->heaptupleCtx);
+	oldCtx = MemoryContextSwitchTo(state->heaptupleCtx);
+
+	/* check every index attribute */
+	for (attindex = 0; attindex < state->natts; attindex++)
+	{
+		BrinValues *bval;
+		Datum		consistentFnResult;
+		bool		consistent;
+		ScanKey		scanKey;
+		bool		oi_regular_nulls = bdesc->bd_info[attindex]->oi_regular_nulls;
+
+		bval = &dtup->bt_columns[attindex];
+
+		if (nulls[attindex])
+		{
+			/*
+			 * Use the hasnulls flag when oi_regular_nulls is true. Otherwise,
+			 * delegate the check to consistentFn.
+			 */
+			if (oi_regular_nulls)
+			{
+				/* We have null value, so hasnulls or allnulls must be true */
+				if (!(bval->bv_hasnulls || bval->bv_allnulls))
+				{
+					all_consist_ereport(state, tid, "range hasnulls and allnulls are false, but contains a null value");
+				}
+				continue;
+			}
+
+			/*
+			 * In case of null and oi_regular_nulls = false we use isNull
+			 * scanKey for invocation of consistentFn
+			 */
+			scanKey = state->isnull_sk[attindex];
+		}
+		else
+		{
+			/* We have a nonnull value, so allnulls should be false */
+			if (bval->bv_allnulls)
+			{
+				all_consist_ereport(state, tid, "range allnulls is true, but contains nonnull value");
+			}
+
+			/* use "attr = value" scan key for nonnull values */
+			scanKey = state->nonnull_sk[attindex];
+			scanKey->sk_argument = values[attindex];
+		}
+
+		/* If oi_regular_nulls = true we should never get here with a null */
+		Assert(!oi_regular_nulls || !nulls[attindex]);
+
+		if (state->consistentFn[attindex].fn_nargs >= 4)
+		{
+			consistentFnResult = FunctionCall4Coll(&state->consistentFn[attindex],
+												   state->idxrel->rd_indcollation[attindex],
+												   PointerGetDatum(state->bdesc),
+												   PointerGetDatum(bval),
+												   PointerGetDatum(&scanKey),
+												   Int32GetDatum(1)
+				);
+		}
+		else
+		{
+			consistentFnResult = FunctionCall3Coll(&state->consistentFn[attindex],
+												   state->idxrel->rd_indcollation[attindex],
+												   PointerGetDatum(state->bdesc),
+												   PointerGetDatum(bval),
+												   PointerGetDatum(scanKey)
+				);
+		}
+
+		consistent = DatumGetBool(consistentFnResult);
+
+		if (!consistent)
+		{
+			all_consist_ereport(state, tid, "heap tuple inconsistent with index");
+		}
+
+	}
+
+	MemoryContextSwitchTo(oldCtx);
+}
+
+/* Report without any additional info */
+static void
+brin_check_ereport(const char *fmt)
+{
+	ereport(ERROR,
+			(errcode(ERRCODE_DATA_CORRUPTED),
+			 errmsg("index is corrupted - %s", fmt)));
+}
+
+/* Report with range blkno, revmap item info, index tuple info */
+void
+index_tuple_ereport(BrinCheckState * state, const char *fmt)
+{
+	Assert(state->rangeBlkno != InvalidBlockNumber);
+	Assert(state->revmapBlk != InvalidBlockNumber);
+	Assert(state->revmapidx >= 0 && state->revmapidx < REVMAP_PAGE_MAXITEMS);
+	Assert(state->regpageBlk != InvalidBlockNumber);
+	Assert(state->regpageoffset != InvalidOffsetNumber);
+
+	ereport(ERROR,
+			(errcode(ERRCODE_DATA_CORRUPTED),
+			 errmsg("index is corrupted - %s. Range blkno: %u, revmap item: (%u,%d), index tuple: (%u,%u)",
+					fmt,
+					state->rangeBlkno,
+					state->revmapBlk,
+					state->revmapidx,
+					state->regpageBlk,
+					state->regpageoffset)));
+}
+
+/* Report with index tuple info */
+void
+index_tuple_only_ereport(BrinCheckState * state, const char *fmt)
+{
+	Assert(state->regpageBlk != InvalidBlockNumber);
+	Assert(state->regpageoffset != InvalidOffsetNumber);
+
+	ereport(ERROR,
+			(errcode(ERRCODE_DATA_CORRUPTED),
+			 errmsg("index is corrupted - %s. Index tuple: (%u,%u)",
+					fmt,
+					state->regpageBlk,
+					state->regpageoffset)));
+}
+
+/* Report with range blkno, revmap item info */
+void
+revmap_item_ereport(BrinCheckState * state, const char *fmt)
+{
+	Assert(state->rangeBlkno != InvalidBlockNumber);
+	Assert(state->revmapBlk != InvalidBlockNumber);
+	Assert(state->revmapidx >= 0 && state->revmapidx < REVMAP_PAGE_MAXITEMS);
+
+	ereport(ERROR,
+			(errcode(ERRCODE_DATA_CORRUPTED),
+			 errmsg("index is corrupted - %s. Range blkno: %u, revmap item: (%u,%d).",
+					fmt,
+					state->rangeBlkno,
+					state->revmapBlk,
+					state->revmapidx)));
+}
+
+/* Report with range blkno, heap tuple info */
+static void
+all_consist_ereport(const BrinCheckState * state, const ItemPointerData *tid, const char *message)
+{
+	Assert(state->dtup != NULL);
+
+	ereport(ERROR,
+			(errcode(ERRCODE_DATA_CORRUPTED),
+			 errmsg("index is corrupted - %s. Range blkno: %u, heap tid (%u,%u)",
+					message,
+					state->dtup->bt_blkno,
+					ItemPointerGetBlockNumber(tid),
+					ItemPointerGetOffsetNumber(tid))));
+}
-- 
2.43.0

From 89ca3fb9ed992831de60f0bcc2246e28aa89a5ac Mon Sep 17 00:00:00 2001
From: Arseniy Mukhin <arseniy.mukhin....@gmail.com>
Date: Wed, 16 Apr 2025 11:26:45 +0300
Subject: [PATCH 1/2] brin refactoring

---
 src/backend/access/brin/brin_tuple.c   | 2 +-
 src/backend/access/common/reloptions.c | 3 ++-
 src/include/access/brin.h              | 1 +
 src/include/access/brin_tuple.h        | 2 ++
 4 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/brin/brin_tuple.c b/src/backend/access/brin/brin_tuple.c
index 861f397e6db..4d1d8d9addd 100644
--- a/src/backend/access/brin/brin_tuple.c
+++ b/src/backend/access/brin/brin_tuple.c
@@ -57,7 +57,7 @@ static inline void brin_deconstruct_tuple(BrinDesc *brdesc,
 /*
  * Return a tuple descriptor used for on-disk storage of BRIN tuples.
  */
-static TupleDesc
+TupleDesc
 brtuple_disk_tupdesc(BrinDesc *brdesc)
 {
 	/* We cache these in the BrinDesc */
diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index 46c1dce222d..72703ba150d 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -22,6 +22,7 @@
 #include "access/heaptoast.h"
 #include "access/htup_details.h"
 #include "access/nbtree.h"
+#include "access/brin.h"
 #include "access/reloptions.h"
 #include "access/spgist_private.h"
 #include "catalog/pg_type.h"
@@ -343,7 +344,7 @@ static relopt_int intRelOpts[] =
 			"Number of pages that each page range covers in a BRIN index",
 			RELOPT_KIND_BRIN,
 			AccessExclusiveLock
-		}, 128, 1, 131072
+		}, 128, 1, BRIN_MAX_PAGES_PER_RANGE
 	},
 	{
 		{
diff --git a/src/include/access/brin.h b/src/include/access/brin.h
index 821f1e02806..334ce973b67 100644
--- a/src/include/access/brin.h
+++ b/src/include/access/brin.h
@@ -37,6 +37,7 @@ typedef struct BrinStatsData
 
 
 #define BRIN_DEFAULT_PAGES_PER_RANGE	128
+#define BRIN_MAX_PAGES_PER_RANGE	131072
 #define BrinGetPagesPerRange(relation) \
 	(AssertMacro(relation->rd_rel->relkind == RELKIND_INDEX && \
 				 relation->rd_rel->relam == BRIN_AM_OID), \
diff --git a/src/include/access/brin_tuple.h b/src/include/access/brin_tuple.h
index 010ba4ea3c0..9472ca638dd 100644
--- a/src/include/access/brin_tuple.h
+++ b/src/include/access/brin_tuple.h
@@ -109,4 +109,6 @@ extern BrinMemTuple *brin_memtuple_initialize(BrinMemTuple *dtuple,
 extern BrinMemTuple *brin_deform_tuple(BrinDesc *brdesc,
 									   BrinTuple *tuple, BrinMemTuple *dMemtuple);
 
+extern TupleDesc brtuple_disk_tupdesc(BrinDesc *brdesc);
+
 #endif							/* BRIN_TUPLE_H */
-- 
2.43.0
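
For anyone following the revmap lookups in check_regular_pages(), here is a small standalone sketch of the addressing arithmetic that maps a heap block to its revmap item. It is not part of the patch. RevmapSlot and heap_block_to_revmap_slot() are hypothetical names, and the constant 1360 is only an assumed stand-in for REVMAP_PAGE_MAXITEMS, which PostgreSQL derives from the block size.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Assumption for illustration only: stand-in for REVMAP_PAGE_MAXITEMS,
 * which PostgreSQL computes from the block size.
 */
#define SKETCH_REVMAP_PAGE_MAXITEMS 1360u

/* Hypothetical helper type: where the revmap item of a range lives. */
typedef struct RevmapSlot
{
	uint32_t	revmap_blk;		/* index block number of the revmap page */
	uint32_t	revmap_idx;		/* item position within that revmap page */
} RevmapSlot;

/* Map a heap block to the revmap page and item that describe its range. */
static RevmapSlot
heap_block_to_revmap_slot(uint32_t heap_blk, uint32_t pages_per_range)
{
	RevmapSlot	slot;
	uint32_t	range_no;

	assert(pages_per_range > 0);

	/* sequential number of the page range containing heap_blk */
	range_no = heap_blk / pages_per_range;

	/* block 0 of the index is the metapage, so revmap pages start at 1 */
	slot.revmap_blk = range_no / SKETCH_REVMAP_PAGE_MAXITEMS + 1;
	slot.revmap_idx = range_no % SKETCH_REVMAP_PAGE_MAXITEMS;

	return slot;
}
```

Under these assumptions, with pages_per_range = 128, heap block 0 maps to item 0 of revmap block 1, and heap block 128 * 1360 maps to item 0 of revmap block 2. The check reports "no revmap page for the index tuple" when the computed block is greater than lastRevmapPage.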
