Hi,

Attached is an unfortunately long series of patches implementing what I
described upthread.

0001 Add very basic test for kill_prior_tuples

    We currently don't exercise this code path for gist and hash, which seems
    somewhat criminal.


0002 heapam: Move logic to handle HEAP_MOVED into a helper function

    A prep patch, which consolidates HEAP_MOVED handling into a helper. This
    isn't strictly necessary, but I got bothered by it while writing this patch
    series.


0003 bufmgr: Add BufferLockHeldByMe()

    Useful for writing assertions in later patches.
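
    For example, later patches add assertions along these lines (this is the
    pattern used in 0004):

        Assert(BufferIsLocal(buffer) ||
               BufferLockHeldByMe(buffer, BUFFER_LOCK_EXCLUSIVE));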


0004 heapam: Use exclusive lock on old page in CLUSTER

    When I started looking into this I wanted to make sure that it's actually
    safe to skip setting hint bits in all places. One place that's *not* safe
    is cluster - we assume that all hint bits are set in rewrite_heap_tuple().

    This might not strictly be required, but it seemed very fragile as is.
    While using an exclusive lock does make it a bit safer, it's not a very
    good fix, as we still dirty the old buffer, which seems like a shame when
    using VACUUM FULL.


0005 heapam: Only set tuple's block once per page in pagemode

    Minor, but measurable, optimization.


0006 heapam: Add batch mode mvcc check and use it in page mode

    This is a good bit faster on its own, but it is also required to avoid a
    performance regression later, when hint bits are only set while no IO is
    going on at the same time.


0007 bufmgr: Make it easier to change number of buffer state bits

    This just makes it easier to change the division of bits in
    BufferDesc->state.


0008 bufmgr: Add interface to acquire right to set hint bits

    The main change. Adds BufferPrepareToSetHintBits() which, if true is
    returned, gives the right to modify a buffer without an exclusive
    lock. Once the modification is done BufferFinishSetHintBits() needs to be
    called.

    I dithered a bit over the name; we don't really call them hint bits in all
    the places that modify buffers without an exclusive lock.

    The new infra is unused as of this commit.
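
    To illustrate the intended calling pattern (the exact signatures may
    differ from this sketch):

        if (BufferPrepareToSetHintBits(buffer))
        {
            /* the actual hint bit update, e.g. in SetHintBits() */
            tuple->t_infomask |= infomask;
            BufferFinishSetHintBits(buffer);
        }
        else
        {
            /*
             * Didn't get the right to modify the buffer, e.g. because it is
             * being written out; just skip setting the hint bits this time.
             */
        }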


0009 heapam: Acquire right to set hint bits

    Moves over heapam to the new interface.


0010 Acquire right to set hint bits in the remaining places

    Moves indexams and freespace/ over to the new interface.


0011 bufmgr: Detect some missing BufferPrepareToSetHintBits() calls

    Now that we don't set hint bits without calling
    BufferPrepareToSetHintBits() first, we can detect many missing calls by
    adding an assertion to MarkBufferDirtyHint().
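
    Roughly speaking, the assertion amounts to something like the following
    sketch (the exact condition depends on how the hint bit right is tracked;
    BufferSetHintBitsInProgress() is a hypothetical helper for illustration,
    not the actual code):

        void
        MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
        {
            /* caller must have acquired the right to set hint bits */
            Assert(BufferIsLocal(buffer) ||
                   BufferLockHeldByMe(buffer, BUFFER_LOCK_EXCLUSIVE) ||
                   BufferSetHintBitsInProgress(buffer));
            ...
        }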


0012 bufmgr: Don't copy pages while writing out

    Finally remove the copy of the buffer while writing it out.


0013 WIP: bufmgr: Detect some bad buffer accesses

    This is a lot more thorough infrastructure for detecting modifications of
    buffers without appropriate locks, by using mprotect() to detect such
    cases. Only works when not using huge pages, not on Windows (not sure how
    mprotect() is spelled there), and not on ARM macOS (due to its 16kB pages).

    Has two modes: one just disallows modifications and is reasonably cheap;
    the more expensive mode disallows any access while the buffer is
    unpinned. The commit message has some performance numbers.

    This has proven to be a good investment as far as this thread goes - I
    wasn't initially aware that freespace/ modifies buffers without a lock,
    and this quickly found that.
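
    A very rough, self-contained sketch of the underlying mechanism (not the
    actual patch; it assumes the buffer pool is allocated OS-page aligned and
    that BLCKSZ is a multiple of the OS page size, which is why huge pages and
    16kB-page platforms don't work):

        #include <stdbool.h>
        #include <stdio.h>
        #include <sys/mman.h>

        /* cheap mode: buffer stays readable, writes trap */
        static void
        buffer_forbid_writes(void *page, size_t blcksz)
        {
            if (mprotect(page, blcksz, PROT_READ) != 0)
                perror("mprotect");
        }

        /* expensive mode: any access to an unpinned buffer traps */
        static void
        buffer_forbid_access(void *page, size_t blcksz)
        {
            if (mprotect(page, blcksz, PROT_NONE) != 0)
                perror("mprotect");
        }

        /* re-allow normal access, e.g. when pinning/locking the buffer */
        static void
        buffer_allow_access(void *page, size_t blcksz)
        {
            if (mprotect(page, blcksz, PROT_READ | PROT_WRITE) != 0)
                perror("mprotect");
        }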


Greetings,

Andres Freund
>From 13ce1a3c5840e3d76a25fdcbeeda24fe4d2d4017 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 28 Oct 2024 15:48:50 -0400
Subject: [PATCH v1 01/14] Add very basic test for kill_prior_tuples

Previously our tests did not exercise kill_prior_tuples for hash and gist. The
latter only because it does not work if all the dead tuples are on one
page (or something like that).

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 .../isolation/expected/index-killtuples.out   | 338 ++++++++++++++++++
 .../isolation/specs/index-killtuples.spec     | 124 +++++++
 2 files changed, 462 insertions(+)
 create mode 100644 src/test/isolation/expected/index-killtuples.out
 create mode 100644 src/test/isolation/specs/index-killtuples.spec

diff --git a/src/test/isolation/expected/index-killtuples.out b/src/test/isolation/expected/index-killtuples.out
new file mode 100644
index 00000000000..1361b60c267
--- /dev/null
+++ b/src/test/isolation/expected/index-killtuples.out
@@ -0,0 +1,338 @@
+unused step name: delete_1
+Parsed test spec with 1 sessions
+
+starting permutation: create_table fill_500 create_btree flush disable_seq disable_bitmap measure access flush result measure access flush result delete flush measure access flush result measure access flush result drop_table
+step create_table: CREATE TEMPORARY TABLE kill_prior_tuple(key int not null, cat text not null);
+step fill_500: INSERT INTO kill_prior_tuple(key, cat) SELECT g.i, 'a' FROM generate_series(1, 500) g(i);
+step create_btree: CREATE INDEX kill_prior_tuple_btree ON kill_prior_tuple USING btree (key);
+step flush: SELECT FROM pg_stat_force_next_flush();
+step disable_seq: SET enable_seqscan = false;
+step disable_bitmap: SET enable_bitmapscan = false;
+step measure: UPDATE counter SET heap_accesses = (SELECT heap_blks_read + heap_blks_hit FROM pg_statio_all_tables WHERE relname = 'kill_prior_tuple');
+step access: EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF) SELECT * FROM kill_prior_tuple WHERE key = 1;
+QUERY PLAN                                                                         
+-----------------------------------------------------------------------------------
+Index Scan using kill_prior_tuple_btree on kill_prior_tuple (actual rows=1 loops=1)
+  Index Cond: (key = 1)                                                            
+(2 rows)
+
+step flush: SELECT FROM pg_stat_force_next_flush();
+step result: SELECT heap_blks_read + heap_blks_hit - counter.heap_accesses AS new_heap_accesses FROM counter, pg_statio_all_tables WHERE relname = 'kill_prior_tuple';
+new_heap_accesses
+-----------------
+                1
+(1 row)
+
+step measure: UPDATE counter SET heap_accesses = (SELECT heap_blks_read + heap_blks_hit FROM pg_statio_all_tables WHERE relname = 'kill_prior_tuple');
+step access: EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF) SELECT * FROM kill_prior_tuple WHERE key = 1;
+QUERY PLAN                                                                         
+-----------------------------------------------------------------------------------
+Index Scan using kill_prior_tuple_btree on kill_prior_tuple (actual rows=1 loops=1)
+  Index Cond: (key = 1)                                                            
+(2 rows)
+
+step flush: SELECT FROM pg_stat_force_next_flush();
+step result: SELECT heap_blks_read + heap_blks_hit - counter.heap_accesses AS new_heap_accesses FROM counter, pg_statio_all_tables WHERE relname = 'kill_prior_tuple';
+new_heap_accesses
+-----------------
+                1
+(1 row)
+
+step delete: DELETE FROM kill_prior_tuple;
+step flush: SELECT FROM pg_stat_force_next_flush();
+step measure: UPDATE counter SET heap_accesses = (SELECT heap_blks_read + heap_blks_hit FROM pg_statio_all_tables WHERE relname = 'kill_prior_tuple');
+step access: EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF) SELECT * FROM kill_prior_tuple WHERE key = 1;
+QUERY PLAN                                                                         
+-----------------------------------------------------------------------------------
+Index Scan using kill_prior_tuple_btree on kill_prior_tuple (actual rows=0 loops=1)
+  Index Cond: (key = 1)                                                            
+(2 rows)
+
+step flush: SELECT FROM pg_stat_force_next_flush();
+step result: SELECT heap_blks_read + heap_blks_hit - counter.heap_accesses AS new_heap_accesses FROM counter, pg_statio_all_tables WHERE relname = 'kill_prior_tuple';
+new_heap_accesses
+-----------------
+                1
+(1 row)
+
+step measure: UPDATE counter SET heap_accesses = (SELECT heap_blks_read + heap_blks_hit FROM pg_statio_all_tables WHERE relname = 'kill_prior_tuple');
+step access: EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF) SELECT * FROM kill_prior_tuple WHERE key = 1;
+QUERY PLAN                                                                         
+-----------------------------------------------------------------------------------
+Index Scan using kill_prior_tuple_btree on kill_prior_tuple (actual rows=0 loops=1)
+  Index Cond: (key = 1)                                                            
+(2 rows)
+
+step flush: SELECT FROM pg_stat_force_next_flush();
+step result: SELECT heap_blks_read + heap_blks_hit - counter.heap_accesses AS new_heap_accesses FROM counter, pg_statio_all_tables WHERE relname = 'kill_prior_tuple';
+new_heap_accesses
+-----------------
+                0
+(1 row)
+
+step drop_table: DROP TABLE IF EXISTS kill_prior_tuple;
+
+starting permutation: create_table fill_500 create_ext_btree_gist create_gist flush disable_seq disable_bitmap measure access flush result measure access flush result delete flush measure access flush result measure access flush result drop_table drop_ext_btree_gist
+step create_table: CREATE TEMPORARY TABLE kill_prior_tuple(key int not null, cat text not null);
+step fill_500: INSERT INTO kill_prior_tuple(key, cat) SELECT g.i, 'a' FROM generate_series(1, 500) g(i);
+step create_ext_btree_gist: CREATE EXTENSION btree_gist;
+step create_gist: CREATE INDEX kill_prior_tuple_gist ON kill_prior_tuple USING gist (key);
+step flush: SELECT FROM pg_stat_force_next_flush();
+step disable_seq: SET enable_seqscan = false;
+step disable_bitmap: SET enable_bitmapscan = false;
+step measure: UPDATE counter SET heap_accesses = (SELECT heap_blks_read + heap_blks_hit FROM pg_statio_all_tables WHERE relname = 'kill_prior_tuple');
+step access: EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF) SELECT * FROM kill_prior_tuple WHERE key = 1;
+QUERY PLAN                                                                        
+----------------------------------------------------------------------------------
+Index Scan using kill_prior_tuple_gist on kill_prior_tuple (actual rows=1 loops=1)
+  Index Cond: (key = 1)                                                           
+(2 rows)
+
+step flush: SELECT FROM pg_stat_force_next_flush();
+step result: SELECT heap_blks_read + heap_blks_hit - counter.heap_accesses AS new_heap_accesses FROM counter, pg_statio_all_tables WHERE relname = 'kill_prior_tuple';
+new_heap_accesses
+-----------------
+                1
+(1 row)
+
+step measure: UPDATE counter SET heap_accesses = (SELECT heap_blks_read + heap_blks_hit FROM pg_statio_all_tables WHERE relname = 'kill_prior_tuple');
+step access: EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF) SELECT * FROM kill_prior_tuple WHERE key = 1;
+QUERY PLAN                                                                        
+----------------------------------------------------------------------------------
+Index Scan using kill_prior_tuple_gist on kill_prior_tuple (actual rows=1 loops=1)
+  Index Cond: (key = 1)                                                           
+(2 rows)
+
+step flush: SELECT FROM pg_stat_force_next_flush();
+step result: SELECT heap_blks_read + heap_blks_hit - counter.heap_accesses AS new_heap_accesses FROM counter, pg_statio_all_tables WHERE relname = 'kill_prior_tuple';
+new_heap_accesses
+-----------------
+                1
+(1 row)
+
+step delete: DELETE FROM kill_prior_tuple;
+step flush: SELECT FROM pg_stat_force_next_flush();
+step measure: UPDATE counter SET heap_accesses = (SELECT heap_blks_read + heap_blks_hit FROM pg_statio_all_tables WHERE relname = 'kill_prior_tuple');
+step access: EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF) SELECT * FROM kill_prior_tuple WHERE key = 1;
+QUERY PLAN                                                                        
+----------------------------------------------------------------------------------
+Index Scan using kill_prior_tuple_gist on kill_prior_tuple (actual rows=0 loops=1)
+  Index Cond: (key = 1)                                                           
+(2 rows)
+
+step flush: SELECT FROM pg_stat_force_next_flush();
+step result: SELECT heap_blks_read + heap_blks_hit - counter.heap_accesses AS new_heap_accesses FROM counter, pg_statio_all_tables WHERE relname = 'kill_prior_tuple';
+new_heap_accesses
+-----------------
+                1
+(1 row)
+
+step measure: UPDATE counter SET heap_accesses = (SELECT heap_blks_read + heap_blks_hit FROM pg_statio_all_tables WHERE relname = 'kill_prior_tuple');
+step access: EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF) SELECT * FROM kill_prior_tuple WHERE key = 1;
+QUERY PLAN                                                                        
+----------------------------------------------------------------------------------
+Index Scan using kill_prior_tuple_gist on kill_prior_tuple (actual rows=0 loops=1)
+  Index Cond: (key = 1)                                                           
+(2 rows)
+
+step flush: SELECT FROM pg_stat_force_next_flush();
+step result: SELECT heap_blks_read + heap_blks_hit - counter.heap_accesses AS new_heap_accesses FROM counter, pg_statio_all_tables WHERE relname = 'kill_prior_tuple';
+new_heap_accesses
+-----------------
+                0
+(1 row)
+
+step drop_table: DROP TABLE IF EXISTS kill_prior_tuple;
+step drop_ext_btree_gist: DROP EXTENSION btree_gist;
+
+starting permutation: create_table fill_10 create_ext_btree_gist create_gist flush disable_seq disable_bitmap measure access flush result measure access flush result delete flush measure access flush result measure access flush result drop_table drop_ext_btree_gist
+step create_table: CREATE TEMPORARY TABLE kill_prior_tuple(key int not null, cat text not null);
+step fill_10: INSERT INTO kill_prior_tuple(key, cat) SELECT g.i, 'a' FROM generate_series(1, 10) g(i);
+step create_ext_btree_gist: CREATE EXTENSION btree_gist;
+step create_gist: CREATE INDEX kill_prior_tuple_gist ON kill_prior_tuple USING gist (key);
+step flush: SELECT FROM pg_stat_force_next_flush();
+step disable_seq: SET enable_seqscan = false;
+step disable_bitmap: SET enable_bitmapscan = false;
+step measure: UPDATE counter SET heap_accesses = (SELECT heap_blks_read + heap_blks_hit FROM pg_statio_all_tables WHERE relname = 'kill_prior_tuple');
+step access: EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF) SELECT * FROM kill_prior_tuple WHERE key = 1;
+QUERY PLAN                                                                        
+----------------------------------------------------------------------------------
+Index Scan using kill_prior_tuple_gist on kill_prior_tuple (actual rows=1 loops=1)
+  Index Cond: (key = 1)                                                           
+(2 rows)
+
+step flush: SELECT FROM pg_stat_force_next_flush();
+step result: SELECT heap_blks_read + heap_blks_hit - counter.heap_accesses AS new_heap_accesses FROM counter, pg_statio_all_tables WHERE relname = 'kill_prior_tuple';
+new_heap_accesses
+-----------------
+                1
+(1 row)
+
+step measure: UPDATE counter SET heap_accesses = (SELECT heap_blks_read + heap_blks_hit FROM pg_statio_all_tables WHERE relname = 'kill_prior_tuple');
+step access: EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF) SELECT * FROM kill_prior_tuple WHERE key = 1;
+QUERY PLAN                                                                        
+----------------------------------------------------------------------------------
+Index Scan using kill_prior_tuple_gist on kill_prior_tuple (actual rows=1 loops=1)
+  Index Cond: (key = 1)                                                           
+(2 rows)
+
+step flush: SELECT FROM pg_stat_force_next_flush();
+step result: SELECT heap_blks_read + heap_blks_hit - counter.heap_accesses AS new_heap_accesses FROM counter, pg_statio_all_tables WHERE relname = 'kill_prior_tuple';
+new_heap_accesses
+-----------------
+                1
+(1 row)
+
+step delete: DELETE FROM kill_prior_tuple;
+step flush: SELECT FROM pg_stat_force_next_flush();
+step measure: UPDATE counter SET heap_accesses = (SELECT heap_blks_read + heap_blks_hit FROM pg_statio_all_tables WHERE relname = 'kill_prior_tuple');
+step access: EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF) SELECT * FROM kill_prior_tuple WHERE key = 1;
+QUERY PLAN                                                                        
+----------------------------------------------------------------------------------
+Index Scan using kill_prior_tuple_gist on kill_prior_tuple (actual rows=0 loops=1)
+  Index Cond: (key = 1)                                                           
+(2 rows)
+
+step flush: SELECT FROM pg_stat_force_next_flush();
+step result: SELECT heap_blks_read + heap_blks_hit - counter.heap_accesses AS new_heap_accesses FROM counter, pg_statio_all_tables WHERE relname = 'kill_prior_tuple';
+new_heap_accesses
+-----------------
+                1
+(1 row)
+
+step measure: UPDATE counter SET heap_accesses = (SELECT heap_blks_read + heap_blks_hit FROM pg_statio_all_tables WHERE relname = 'kill_prior_tuple');
+step access: EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF) SELECT * FROM kill_prior_tuple WHERE key = 1;
+QUERY PLAN                                                                        
+----------------------------------------------------------------------------------
+Index Scan using kill_prior_tuple_gist on kill_prior_tuple (actual rows=0 loops=1)
+  Index Cond: (key = 1)                                                           
+(2 rows)
+
+step flush: SELECT FROM pg_stat_force_next_flush();
+step result: SELECT heap_blks_read + heap_blks_hit - counter.heap_accesses AS new_heap_accesses FROM counter, pg_statio_all_tables WHERE relname = 'kill_prior_tuple';
+new_heap_accesses
+-----------------
+                1
+(1 row)
+
+step drop_table: DROP TABLE IF EXISTS kill_prior_tuple;
+step drop_ext_btree_gist: DROP EXTENSION btree_gist;
+
+starting permutation: create_table fill_500 create_hash flush disable_seq disable_bitmap measure access flush result measure access flush result delete flush measure access flush result measure access flush result drop_table
+step create_table: CREATE TEMPORARY TABLE kill_prior_tuple(key int not null, cat text not null);
+step fill_500: INSERT INTO kill_prior_tuple(key, cat) SELECT g.i, 'a' FROM generate_series(1, 500) g(i);
+step create_hash: CREATE INDEX kill_prior_tuple_hash ON kill_prior_tuple USING hash (key);
+step flush: SELECT FROM pg_stat_force_next_flush();
+step disable_seq: SET enable_seqscan = false;
+step disable_bitmap: SET enable_bitmapscan = false;
+step measure: UPDATE counter SET heap_accesses = (SELECT heap_blks_read + heap_blks_hit FROM pg_statio_all_tables WHERE relname = 'kill_prior_tuple');
+step access: EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF) SELECT * FROM kill_prior_tuple WHERE key = 1;
+QUERY PLAN                                                                        
+----------------------------------------------------------------------------------
+Index Scan using kill_prior_tuple_hash on kill_prior_tuple (actual rows=1 loops=1)
+  Index Cond: (key = 1)                                                           
+(2 rows)
+
+step flush: SELECT FROM pg_stat_force_next_flush();
+step result: SELECT heap_blks_read + heap_blks_hit - counter.heap_accesses AS new_heap_accesses FROM counter, pg_statio_all_tables WHERE relname = 'kill_prior_tuple';
+new_heap_accesses
+-----------------
+                1
+(1 row)
+
+step measure: UPDATE counter SET heap_accesses = (SELECT heap_blks_read + heap_blks_hit FROM pg_statio_all_tables WHERE relname = 'kill_prior_tuple');
+step access: EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF) SELECT * FROM kill_prior_tuple WHERE key = 1;
+QUERY PLAN                                                                        
+----------------------------------------------------------------------------------
+Index Scan using kill_prior_tuple_hash on kill_prior_tuple (actual rows=1 loops=1)
+  Index Cond: (key = 1)                                                           
+(2 rows)
+
+step flush: SELECT FROM pg_stat_force_next_flush();
+step result: SELECT heap_blks_read + heap_blks_hit - counter.heap_accesses AS new_heap_accesses FROM counter, pg_statio_all_tables WHERE relname = 'kill_prior_tuple';
+new_heap_accesses
+-----------------
+                1
+(1 row)
+
+step delete: DELETE FROM kill_prior_tuple;
+step flush: SELECT FROM pg_stat_force_next_flush();
+step measure: UPDATE counter SET heap_accesses = (SELECT heap_blks_read + heap_blks_hit FROM pg_statio_all_tables WHERE relname = 'kill_prior_tuple');
+step access: EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF) SELECT * FROM kill_prior_tuple WHERE key = 1;
+QUERY PLAN                                                                        
+----------------------------------------------------------------------------------
+Index Scan using kill_prior_tuple_hash on kill_prior_tuple (actual rows=0 loops=1)
+  Index Cond: (key = 1)                                                           
+(2 rows)
+
+step flush: SELECT FROM pg_stat_force_next_flush();
+step result: SELECT heap_blks_read + heap_blks_hit - counter.heap_accesses AS new_heap_accesses FROM counter, pg_statio_all_tables WHERE relname = 'kill_prior_tuple';
+new_heap_accesses
+-----------------
+                1
+(1 row)
+
+step measure: UPDATE counter SET heap_accesses = (SELECT heap_blks_read + heap_blks_hit FROM pg_statio_all_tables WHERE relname = 'kill_prior_tuple');
+step access: EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF) SELECT * FROM kill_prior_tuple WHERE key = 1;
+QUERY PLAN                                                                        
+----------------------------------------------------------------------------------
+Index Scan using kill_prior_tuple_hash on kill_prior_tuple (actual rows=0 loops=1)
+  Index Cond: (key = 1)                                                           
+(2 rows)
+
+step flush: SELECT FROM pg_stat_force_next_flush();
+step result: SELECT heap_blks_read + heap_blks_hit - counter.heap_accesses AS new_heap_accesses FROM counter, pg_statio_all_tables WHERE relname = 'kill_prior_tuple';
+new_heap_accesses
+-----------------
+                0
+(1 row)
+
+step drop_table: DROP TABLE IF EXISTS kill_prior_tuple;
+
+starting permutation: create_table fill_500 create_ext_btree_gin create_gin flush disable_seq delete flush measure access flush result measure access flush result drop_table drop_ext_btree_gin
+step create_table: CREATE TEMPORARY TABLE kill_prior_tuple(key int not null, cat text not null);
+step fill_500: INSERT INTO kill_prior_tuple(key, cat) SELECT g.i, 'a' FROM generate_series(1, 500) g(i);
+step create_ext_btree_gin: CREATE EXTENSION btree_gin;
+step create_gin: CREATE INDEX kill_prior_tuple_gin ON kill_prior_tuple USING gin (key);
+step flush: SELECT FROM pg_stat_force_next_flush();
+step disable_seq: SET enable_seqscan = false;
+step delete: DELETE FROM kill_prior_tuple;
+step flush: SELECT FROM pg_stat_force_next_flush();
+step measure: UPDATE counter SET heap_accesses = (SELECT heap_blks_read + heap_blks_hit FROM pg_statio_all_tables WHERE relname = 'kill_prior_tuple');
+step access: EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF) SELECT * FROM kill_prior_tuple WHERE key = 1;
+QUERY PLAN                                                             
+-----------------------------------------------------------------------
+Bitmap Heap Scan on kill_prior_tuple (actual rows=0 loops=1)           
+  Recheck Cond: (key = 1)                                              
+  Heap Blocks: exact=1                                                 
+  ->  Bitmap Index Scan on kill_prior_tuple_gin (actual rows=1 loops=1)
+        Index Cond: (key = 1)                                          
+(5 rows)
+
+step flush: SELECT FROM pg_stat_force_next_flush();
+step result: SELECT heap_blks_read + heap_blks_hit - counter.heap_accesses AS new_heap_accesses FROM counter, pg_statio_all_tables WHERE relname = 'kill_prior_tuple';
+new_heap_accesses
+-----------------
+                1
+(1 row)
+
+step measure: UPDATE counter SET heap_accesses = (SELECT heap_blks_read + heap_blks_hit FROM pg_statio_all_tables WHERE relname = 'kill_prior_tuple');
+step access: EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF) SELECT * FROM kill_prior_tuple WHERE key = 1;
+QUERY PLAN                                                             
+-----------------------------------------------------------------------
+Bitmap Heap Scan on kill_prior_tuple (actual rows=0 loops=1)           
+  Recheck Cond: (key = 1)                                              
+  Heap Blocks: exact=1                                                 
+  ->  Bitmap Index Scan on kill_prior_tuple_gin (actual rows=1 loops=1)
+        Index Cond: (key = 1)                                          
+(5 rows)
+
+step flush: SELECT FROM pg_stat_force_next_flush();
+step result: SELECT heap_blks_read + heap_blks_hit - counter.heap_accesses AS new_heap_accesses FROM counter, pg_statio_all_tables WHERE relname = 'kill_prior_tuple';
+new_heap_accesses
+-----------------
+                1
+(1 row)
+
+step drop_table: DROP TABLE IF EXISTS kill_prior_tuple;
+step drop_ext_btree_gin: DROP EXTENSION btree_gin;
diff --git a/src/test/isolation/specs/index-killtuples.spec b/src/test/isolation/specs/index-killtuples.spec
new file mode 100644
index 00000000000..d597be02c23
--- /dev/null
+++ b/src/test/isolation/specs/index-killtuples.spec
@@ -0,0 +1,124 @@
+# Basic testing of killtuples / kill_prior_tuples / all_dead handling
+# for various index AMs
+#
+# This tests just enough to ensure that the kill* routines are actually
+# executed and do something approximately reasonable. It's *not* sufficient
+# testing for adding killitems support to a new AM!
+
+setup
+{
+    CREATE TABLE counter(heap_accesses int);
+    INSERT INTO counter(heap_accesses) VALUES (0);
+}
+
+teardown
+{
+    DROP TABLE counter;
+}
+
+session s1
+# to ensure GUCs are reset
+setup { RESET ALL; }
+
+step disable_seq { SET enable_seqscan = false; }
+
+step disable_bitmap { SET enable_bitmapscan = false; }
+
+# use a temporary table to make sure no other session can interfere with
+# visibility determinations
+step create_table { CREATE TEMPORARY TABLE kill_prior_tuple(key int not null, cat text not null); }
+
+step fill_10 { INSERT INTO kill_prior_tuple(key, cat) SELECT g.i, 'a' FROM generate_series(1, 10) g(i); }
+
+step fill_500 { INSERT INTO kill_prior_tuple(key, cat) SELECT g.i, 'a' FROM generate_series(1, 500) g(i); }
+
+# column-less select to make output easier to read
+step flush { SELECT FROM pg_stat_force_next_flush(); }
+
+step measure { UPDATE counter SET heap_accesses = (SELECT heap_blks_read + heap_blks_hit FROM pg_statio_all_tables WHERE relname = 'kill_prior_tuple'); }
+
+step result { SELECT heap_blks_read + heap_blks_hit - counter.heap_accesses AS new_heap_accesses FROM counter, pg_statio_all_tables WHERE relname = 'kill_prior_tuple'; }
+
+step access { EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF) SELECT * FROM kill_prior_tuple WHERE key = 1; }
+
+step delete { DELETE FROM kill_prior_tuple; }
+step delete_1 { DELETE FROM kill_prior_tuple WHERE cat = 1; }
+
+step drop_table { DROP TABLE IF EXISTS kill_prior_tuple; }
+
+### steps for testing btree indexes ###
+step create_btree { CREATE INDEX kill_prior_tuple_btree ON kill_prior_tuple USING btree (key); }
+
+### steps for testing gist indexes ###
+# Creating the extensions takes time, so we don't want to do so when testing
+# other AMs
+step create_ext_btree_gist { CREATE EXTENSION btree_gist; }
+step drop_ext_btree_gist { DROP EXTENSION btree_gist; }
+step create_gist { CREATE INDEX kill_prior_tuple_gist ON kill_prior_tuple USING gist (key); }
+
+### steps for testing gin indexes ###
+# See create_ext_btree_gist
+step create_ext_btree_gin { CREATE EXTENSION btree_gin; }
+step drop_ext_btree_gin { DROP EXTENSION btree_gin; }
+step create_gin { CREATE INDEX kill_prior_tuple_gin ON kill_prior_tuple USING gin (key); }
+
+### steps for testing hash indexes ###
+step create_hash { CREATE INDEX kill_prior_tuple_hash ON kill_prior_tuple USING hash (key); }
+
+
+# test killtuples with btree index
+permutation
+  create_table fill_500 create_btree flush
+  disable_seq disable_bitmap
+  # show that each access to a non-deleted tuple increments heap_blks_*
+  measure access flush result
+  measure access flush result
+  delete flush
+  # first access after accessing deleted tuple still needs to access heap
+  measure access flush result
+  # but after kill_prior_tuple did its thing, we shouldn't access heap anymore
+  measure access flush result
+  drop_table
+
+# Same as first permutation, except testing gist
+permutation
+  create_table fill_500 create_ext_btree_gist create_gist flush
+  disable_seq disable_bitmap
+  measure access flush result
+  measure access flush result
+  delete flush
+  measure access flush result
+  measure access flush result
+  drop_table drop_ext_btree_gist
+
+# Test gist, but with fewer rows - shows that killitems doesn't work anymore!
+permutation
+  create_table fill_10 create_ext_btree_gist create_gist flush
+  disable_seq disable_bitmap
+  measure access flush result
+  measure access flush result
+  delete flush
+  measure access flush result
+  measure access flush result
+  drop_table drop_ext_btree_gist
+
+# Same as first permutation, except testing hash
+permutation
+  create_table fill_500 create_hash flush
+  disable_seq disable_bitmap
+  measure access flush result
+  measure access flush result
+  delete flush
+  measure access flush result
+  measure access flush result
+  drop_table
+
+# Similar to first permutation, except that gin does not have killtuples support
+permutation
+  create_table fill_500 create_ext_btree_gin create_gin flush
+  disable_seq
+  delete flush
+  measure access flush result
+  # will still fetch from heap
+  measure access flush result
+  drop_table drop_ext_btree_gin
-- 
2.46.0.519.g2e7b89e038

>From cddf2ae3d5d4300a99e8bff6a98ac8fee82fe169 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 23 Sep 2024 12:23:33 -0400
Subject: [PATCH v1 02/14] heapam: Move logic to handle HEAP_MOVED into a
 helper function

Before this change we dealt with this in six nearly identical copies and one
very similar one.

The helper function errors out when encountering a
HEAP_MOVED_OFF/HEAP_MOVED_IN tuple with xvac considered current or
in-progress. It'd be preferable to do that change separately, but otherwise
it'd not be possible to deduplicate the handling in
HeapTupleSatisfiesVacuum().

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/access/heap/heapam_visibility.c | 307 ++++----------------
 1 file changed, 61 insertions(+), 246 deletions(-)

diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c
index 9243feed01f..057b88767db 100644
--- a/src/backend/access/heap/heapam_visibility.c
+++ b/src/backend/access/heap/heapam_visibility.c
@@ -144,6 +144,55 @@ HeapTupleSetHintBits(HeapTupleHeader tuple, Buffer buffer,
 	SetHintBits(tuple, buffer, infomask, xid);
 }
 
+/*
+ * If HEAP_MOVED_OFF or HEAP_MOVED_IN are set on the tuple, remove them and
+ * adjust hint bits. See the comment for SetHintBits() for more background.
+ *
+ * This helper returns false if the row ought to be invisible, true otherwise.
+ */
+static inline bool
+HeapTupleCleanMoved(HeapTupleHeader tuple, Buffer buffer)
+{
+	TransactionId xvac;
+
+	/* only used by pre-9.0 binary upgrades */
+	if (likely(!(tuple->t_infomask & (HEAP_MOVED_OFF | HEAP_MOVED_IN))))
+		return true;
+
+	xvac = HeapTupleHeaderGetXvac(tuple);
+
+	if (TransactionIdIsCurrentTransactionId(xvac))
+		elog(ERROR, "encountered tuple with HEAP_MOVED considered current");
+
+	if (TransactionIdIsInProgress(xvac))
+		elog(ERROR, "encountered tuple with HEAP_MOVED considered in-progress");
+
+	if (tuple->t_infomask & HEAP_MOVED_OFF)
+	{
+		if (TransactionIdDidCommit(xvac))
+		{
+			SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
+						InvalidTransactionId);
+			return false;
+		}
+		SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
+					InvalidTransactionId);
+	}
+	else if (tuple->t_infomask & HEAP_MOVED_IN)
+	{
+		if (TransactionIdDidCommit(xvac))
+			SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
+						InvalidTransactionId);
+		else
+		{
+			SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
+						InvalidTransactionId);
+			return false;
+		}
+	}
+
+	return true;
+}
 
 /*
  * HeapTupleSatisfiesSelf
@@ -179,45 +228,8 @@ HeapTupleSatisfiesSelf(HeapTuple htup, Snapshot snapshot, Buffer buffer)
 		if (HeapTupleHeaderXminInvalid(tuple))
 			return false;
 
-		/* Used by pre-9.0 binary upgrades */
-		if (tuple->t_infomask & HEAP_MOVED_OFF)
-		{
-			TransactionId xvac = HeapTupleHeaderGetXvac(tuple);
-
-			if (TransactionIdIsCurrentTransactionId(xvac))
-				return false;
-			if (!TransactionIdIsInProgress(xvac))
-			{
-				if (TransactionIdDidCommit(xvac))
-				{
-					SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
-								InvalidTransactionId);
-					return false;
-				}
-				SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
-							InvalidTransactionId);
-			}
-		}
-		/* Used by pre-9.0 binary upgrades */
-		else if (tuple->t_infomask & HEAP_MOVED_IN)
-		{
-			TransactionId xvac = HeapTupleHeaderGetXvac(tuple);
-
-			if (!TransactionIdIsCurrentTransactionId(xvac))
-			{
-				if (TransactionIdIsInProgress(xvac))
-					return false;
-				if (TransactionIdDidCommit(xvac))
-					SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
-								InvalidTransactionId);
-				else
-				{
-					SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
-								InvalidTransactionId);
-					return false;
-				}
-			}
-		}
+		if (!HeapTupleCleanMoved(tuple, buffer))
+			return false;
 		else if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmin(tuple)))
 		{
 			if (tuple->t_infomask & HEAP_XMAX_INVALID)	/* xid invalid */
@@ -372,45 +384,8 @@ HeapTupleSatisfiesToast(HeapTuple htup, Snapshot snapshot,
 		if (HeapTupleHeaderXminInvalid(tuple))
 			return false;
 
-		/* Used by pre-9.0 binary upgrades */
-		if (tuple->t_infomask & HEAP_MOVED_OFF)
-		{
-			TransactionId xvac = HeapTupleHeaderGetXvac(tuple);
-
-			if (TransactionIdIsCurrentTransactionId(xvac))
-				return false;
-			if (!TransactionIdIsInProgress(xvac))
-			{
-				if (TransactionIdDidCommit(xvac))
-				{
-					SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
-								InvalidTransactionId);
-					return false;
-				}
-				SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
-							InvalidTransactionId);
-			}
-		}
-		/* Used by pre-9.0 binary upgrades */
-		else if (tuple->t_infomask & HEAP_MOVED_IN)
-		{
-			TransactionId xvac = HeapTupleHeaderGetXvac(tuple);
-
-			if (!TransactionIdIsCurrentTransactionId(xvac))
-			{
-				if (TransactionIdIsInProgress(xvac))
-					return false;
-				if (TransactionIdDidCommit(xvac))
-					SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
-								InvalidTransactionId);
-				else
-				{
-					SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
-								InvalidTransactionId);
-					return false;
-				}
-			}
-		}
+		if (!HeapTupleCleanMoved(tuple, buffer))
+			return false;
 
 		/*
 		 * An invalid Xmin can be left behind by a speculative insertion that
@@ -468,45 +443,8 @@ HeapTupleSatisfiesUpdate(HeapTuple htup, CommandId curcid,
 		if (HeapTupleHeaderXminInvalid(tuple))
 			return TM_Invisible;
 
-		/* Used by pre-9.0 binary upgrades */
-		if (tuple->t_infomask & HEAP_MOVED_OFF)
-		{
-			TransactionId xvac = HeapTupleHeaderGetXvac(tuple);
-
-			if (TransactionIdIsCurrentTransactionId(xvac))
-				return TM_Invisible;
-			if (!TransactionIdIsInProgress(xvac))
-			{
-				if (TransactionIdDidCommit(xvac))
-				{
-					SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
-								InvalidTransactionId);
-					return TM_Invisible;
-				}
-				SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
-							InvalidTransactionId);
-			}
-		}
-		/* Used by pre-9.0 binary upgrades */
-		else if (tuple->t_infomask & HEAP_MOVED_IN)
-		{
-			TransactionId xvac = HeapTupleHeaderGetXvac(tuple);
-
-			if (!TransactionIdIsCurrentTransactionId(xvac))
-			{
-				if (TransactionIdIsInProgress(xvac))
-					return TM_Invisible;
-				if (TransactionIdDidCommit(xvac))
-					SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
-								InvalidTransactionId);
-				else
-				{
-					SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
-								InvalidTransactionId);
-					return TM_Invisible;
-				}
-			}
-		}
+		else if (!HeapTupleCleanMoved(tuple, buffer))
+			return TM_Invisible;
 		else if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmin(tuple)))
 		{
 			if (HeapTupleHeaderGetCmin(tuple) >= curcid)
@@ -756,45 +694,8 @@ HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot,
 		if (HeapTupleHeaderXminInvalid(tuple))
 			return false;
 
-		/* Used by pre-9.0 binary upgrades */
-		if (tuple->t_infomask & HEAP_MOVED_OFF)
-		{
-			TransactionId xvac = HeapTupleHeaderGetXvac(tuple);
-
-			if (TransactionIdIsCurrentTransactionId(xvac))
-				return false;
-			if (!TransactionIdIsInProgress(xvac))
-			{
-				if (TransactionIdDidCommit(xvac))
-				{
-					SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
-								InvalidTransactionId);
-					return false;
-				}
-				SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
-							InvalidTransactionId);
-			}
-		}
-		/* Used by pre-9.0 binary upgrades */
-		else if (tuple->t_infomask & HEAP_MOVED_IN)
-		{
-			TransactionId xvac = HeapTupleHeaderGetXvac(tuple);
-
-			if (!TransactionIdIsCurrentTransactionId(xvac))
-			{
-				if (TransactionIdIsInProgress(xvac))
-					return false;
-				if (TransactionIdDidCommit(xvac))
-					SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
-								InvalidTransactionId);
-				else
-				{
-					SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
-								InvalidTransactionId);
-					return false;
-				}
-			}
-		}
+		if (!HeapTupleCleanMoved(tuple, buffer))
+			return false;
 		else if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmin(tuple)))
 		{
 			if (tuple->t_infomask & HEAP_XMAX_INVALID)	/* xid invalid */
@@ -970,45 +871,8 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
 		if (HeapTupleHeaderXminInvalid(tuple))
 			return false;
 
-		/* Used by pre-9.0 binary upgrades */
-		if (tuple->t_infomask & HEAP_MOVED_OFF)
-		{
-			TransactionId xvac = HeapTupleHeaderGetXvac(tuple);
-
-			if (TransactionIdIsCurrentTransactionId(xvac))
-				return false;
-			if (!XidInMVCCSnapshot(xvac, snapshot))
-			{
-				if (TransactionIdDidCommit(xvac))
-				{
-					SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
-								InvalidTransactionId);
-					return false;
-				}
-				SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
-							InvalidTransactionId);
-			}
-		}
-		/* Used by pre-9.0 binary upgrades */
-		else if (tuple->t_infomask & HEAP_MOVED_IN)
-		{
-			TransactionId xvac = HeapTupleHeaderGetXvac(tuple);
-
-			if (!TransactionIdIsCurrentTransactionId(xvac))
-			{
-				if (XidInMVCCSnapshot(xvac, snapshot))
-					return false;
-				if (TransactionIdDidCommit(xvac))
-					SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
-								InvalidTransactionId);
-				else
-				{
-					SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
-								InvalidTransactionId);
-					return false;
-				}
-			}
-		}
+		if (!HeapTupleCleanMoved(tuple, buffer))
+			return false;
 		else if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmin(tuple)))
 		{
 			if (HeapTupleHeaderGetCmin(tuple) >= snapshot->curcid)
@@ -1213,57 +1077,8 @@ HeapTupleSatisfiesVacuumHorizon(HeapTuple htup, Buffer buffer, TransactionId *de
 	{
 		if (HeapTupleHeaderXminInvalid(tuple))
 			return HEAPTUPLE_DEAD;
-		/* Used by pre-9.0 binary upgrades */
-		else if (tuple->t_infomask & HEAP_MOVED_OFF)
-		{
-			TransactionId xvac = HeapTupleHeaderGetXvac(tuple);
-
-			if (TransactionIdIsCurrentTransactionId(xvac))
-				return HEAPTUPLE_DELETE_IN_PROGRESS;
-			if (TransactionIdIsInProgress(xvac))
-				return HEAPTUPLE_DELETE_IN_PROGRESS;
-			if (TransactionIdDidCommit(xvac))
-			{
-				SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
-							InvalidTransactionId);
-				return HEAPTUPLE_DEAD;
-			}
-			SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
-						InvalidTransactionId);
-		}
-		/* Used by pre-9.0 binary upgrades */
-		else if (tuple->t_infomask & HEAP_MOVED_IN)
-		{
-			TransactionId xvac = HeapTupleHeaderGetXvac(tuple);
-
-			if (TransactionIdIsCurrentTransactionId(xvac))
-				return HEAPTUPLE_INSERT_IN_PROGRESS;
-			if (TransactionIdIsInProgress(xvac))
-				return HEAPTUPLE_INSERT_IN_PROGRESS;
-			if (TransactionIdDidCommit(xvac))
-				SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
-							InvalidTransactionId);
-			else
-			{
-				SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
-							InvalidTransactionId);
-				return HEAPTUPLE_DEAD;
-			}
-		}
-		else if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmin(tuple)))
-		{
-			if (tuple->t_infomask & HEAP_XMAX_INVALID)	/* xid invalid */
-				return HEAPTUPLE_INSERT_IN_PROGRESS;
-			/* only locked? run infomask-only check first, for performance */
-			if (HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask) ||
-				HeapTupleHeaderIsOnlyLocked(tuple))
-				return HEAPTUPLE_INSERT_IN_PROGRESS;
-			/* inserted and then deleted by same xact */
-			if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetUpdateXid(tuple)))
-				return HEAPTUPLE_DELETE_IN_PROGRESS;
-			/* deleting subtransaction must have aborted */
-			return HEAPTUPLE_INSERT_IN_PROGRESS;
-		}
+		else if (!HeapTupleCleanMoved(tuple, buffer))
+			return HEAPTUPLE_DEAD;
 		else if (TransactionIdIsInProgress(HeapTupleHeaderGetRawXmin(tuple)))
 		{
 			/*
-- 
2.46.0.519.g2e7b89e038

>From aa054c44b41669c44f238f127c20bd48dff3af11 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 23 Sep 2024 12:51:37 -0400
Subject: [PATCH v1 03/14] bufmgr: Add BufferLockHeldByMe()

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/include/storage/bufmgr.h        |  1 +
 src/backend/storage/buffer/bufmgr.c | 41 +++++++++++++++++++++++++++++
 2 files changed, 42 insertions(+)

diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index eb0fba4230b..ded46a57889 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -291,6 +291,7 @@ extern void LockBufferForCleanup(Buffer buffer);
 extern bool ConditionalLockBufferForCleanup(Buffer buffer);
 extern bool IsBufferCleanupOK(Buffer buffer);
 extern bool HoldingBufferPinThatDelaysRecovery(void);
+extern bool BufferLockHeldByMe(Buffer buffer, int mode);
 
 extern bool BgBufferSync(struct WritebackContext *wb_context);
 
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 0f02bf62fa3..88d18e85d64 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -5492,6 +5492,47 @@ IsBufferCleanupOK(Buffer buffer)
 	return false;
 }
 
+/*
+ * BufferLockHeldByMe - does the backend have the buffer locked?
+ *
+ * This likely should only be used for asserts etc.
+ *
+ * Note that this can only be called for non-temp buffers - there is no
+ * correct value to return for temporary buffers. One might think that just
+ * returning true for temp buffers would work, but the caller might assert
+ * that a lock is *not* held.
+ */
+bool
+BufferLockHeldByMe(Buffer buffer, int mode)
+{
+	BufferDesc *buf;
+	LWLockMode	lwmode;
+
+	/*
+	 * Can't hold a lock without a pin, there never should be uncertainty
+	 * about having a pin.
+	 */
+	Assert(BufferIsPinned(buffer));
+
+	/* there'd be no correct value to return */
+	Assert(!BufferIsLocal(buffer));
+
+	buf = GetBufferDescriptor(buffer - 1);
+
+	if (mode == BUFFER_LOCK_EXCLUSIVE)
+		lwmode = LW_EXCLUSIVE;
+	else if (mode == BUFFER_LOCK_SHARE)
+		lwmode = LW_SHARED;
+	else
+	{
+		Assert(false);
+		pg_unreachable();
+		lwmode = LW_EXCLUSIVE;  /* assuage compiler */
+	}
+
+	return LWLockHeldByMeInMode(BufferDescriptorGetContentLock(buf), lwmode);
+}
+
 
 /*
  *	Functions for buffer I/O handling
-- 
2.46.0.519.g2e7b89e038

>From 08c1b45cfd7c223b146d228e997d16624274d0b4 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 17 Oct 2024 11:45:36 -0400
Subject: [PATCH v1 04/14] heapam: Use exclusive lock on old page in CLUSTER

To be able to guarantee that we can set the hint bit, acquire an exclusive
lock on the old buffer. We need the hint bits to be set as otherwise
reform_and_rewrite_tuple() -> rewrite_heap_tuple() -> heap_freeze_tuple() will
get confused.

It'd be better if we somehow could avoid setting hint bits on the old
page. One reason to use VACUUM FULL is very bloated tables - rewriting most
of the old table during VACUUM FULL doesn't exactly help...

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/access/heap/heapam_handler.c    | 13 ++++++++++++-
 src/backend/access/heap/heapam_visibility.c |  7 +++++++
 src/backend/storage/buffer/bufmgr.c         |  2 +-
 3 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index a8d95e0f1c1..009601445aa 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -833,7 +833,18 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 		tuple = ExecFetchSlotHeapTuple(slot, false, NULL);
 		buf = hslot->buffer;
 
-		LockBuffer(buf, BUFFER_LOCK_SHARE);
+		/*
+		 * To be able to guarantee that we can set the hint bit, acquire an
+		 * exclusive lock on the old buffer. We need the hint bits to be set
+		 * as otherwise reform_and_rewrite_tuple() -> rewrite_heap_tuple() ->
+		 * heap_freeze_tuple() will get confused.
+		 *
+		 * It'd be better if we somehow could avoid setting hint bits on the
+		 * old page. One reason to use VACUUM FULL is very bloated tables -
+		 * rewriting most of the old table during VACUUM FULL doesn't
+		 * exactly help...
+		 */
+		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 
 		switch (HeapTupleSatisfiesVacuum(tuple, OldestXmin, buf))
 		{
diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c
index 057b88767db..b7aa8bb7a52 100644
--- a/src/backend/access/heap/heapam_visibility.c
+++ b/src/backend/access/heap/heapam_visibility.c
@@ -141,6 +141,13 @@ void
 HeapTupleSetHintBits(HeapTupleHeader tuple, Buffer buffer,
 					 uint16 infomask, TransactionId xid)
 {
+	/*
+	 * The uses from heapam.c rely on being able to perform the hint bit
+	 * updates, which can only be guaranteed if we are holding an exclusive
+	 * lock on the buffer - which all callers are doing.
+	 */
+	Assert(BufferIsLocal(buffer) || BufferLockHeldByMe(buffer, BUFFER_LOCK_EXCLUSIVE));
+
 	SetHintBits(tuple, buffer, infomask, xid);
 }
 
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 88d18e85d64..f56f7f9f441 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -5527,7 +5527,7 @@ BufferLockHeldByMe(Buffer buffer, int mode)
 	{
 		Assert(false);
 		pg_unreachable();
-		lwmode = LW_EXCLUSIVE;  /* assuage compiler */
+		lwmode = LW_EXCLUSIVE;	/* assuage compiler */
 	}
 
 	return LWLockHeldByMeInMode(BufferDescriptorGetContentLock(buf), lwmode);
-- 
2.46.0.519.g2e7b89e038

>From 9b56778c495de3ed7b179083c926598ab5fbaf57 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 17 Oct 2024 11:50:13 -0400
Subject: [PATCH v1 05/14] heapam: Only set tuple's block once per page in
 pagemode

Due to splitting the block id into two 16 bit integers, BlockIdSet() is more
expensive than one might think. Doing it once per returned tuple shows up as a
small but reliably reproducible cost. It's simple enough to just set the block
number once per block in pagemode, so do so.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/access/heap/heapam.c | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 1748eafa100..d0cb4b1e29b 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -981,6 +981,9 @@ heapgettup_pagemode(HeapScanDesc scan,
 		linesleft = scan->rs_ntuples;
 		lineindex = ScanDirectionIsForward(dir) ? 0 : linesleft - 1;
 
+		/* set block once per page, instead of doing so for every tuple */
+		BlockIdSet(&tuple->t_self.ip_blkid, scan->rs_cblock);
+
 		/* lineindex now references the next or previous visible tid */
 continue_page:
 
@@ -995,7 +998,7 @@ continue_page:
 
 			tuple->t_data = (HeapTupleHeader) PageGetItem(page, lpp);
 			tuple->t_len = ItemIdGetLength(lpp);
-			ItemPointerSet(&(tuple->t_self), scan->rs_cblock, lineoff);
+			tuple->t_self.ip_posid = lineoff;
 
 			/* skip any tuples that don't match the scan key */
 			if (key != NULL &&
-- 
2.46.0.519.g2e7b89e038

>From ed74f0f2c6dfc4e4afe31ac162200c0783bf2240 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 17 Oct 2024 13:16:36 -0400
Subject: [PATCH v1 06/14] heapam: Add batch mode mvcc check and use it in page
 mode

There are two reasons for doing so:

1) It is generally faster to perform checks in a batched fashion and making
   sequential scans faster is nice.

2) We would like to stop setting hint bits while pages are being written
   out. The necessary locking overhead becomes visible for page mode scans if
   it is incurred for every tuple. With batching, the overhead can be
   amortized to happen only once per page.

There are substantial further optimization opportunities along these
lines:

- Right now HeapTupleSatisfiesMVCCBatch() simply uses the single-tuple
  HeapTupleSatisfiesMVCC(), relying on the compiler to inline it. We could
  instead write an explicitly optimized version that avoids repeated xid
  tests.

- Introduce batched version of the serializability test

- Introduce batched version of HeapTupleSatisfiesVacuum

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/include/access/heapam.h                 |  28 +++++
 src/backend/access/heap/heapam.c            | 110 ++++++++++++++++----
 src/backend/access/heap/heapam_visibility.c |  50 +++++++++
 src/tools/pgindent/typedefs.list            |   1 +
 4 files changed, 166 insertions(+), 23 deletions(-)

diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 96cf82f97b7..d4a790251cc 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -417,6 +417,34 @@ extern bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple);
 extern bool HeapTupleIsSurelyDead(HeapTuple htup,
 								  struct GlobalVisState *vistest);
 
+/*
+ * FIXME: define to be removed
+ *
+ * Without this I see worse performance. But it's a bit ugly, so I thought
+ * it'd be useful to leave a way in for others to experiment with this.
+ */
+#define BATCHMVCC_FEWER_ARGS
+
+#ifdef BATCHMVCC_FEWER_ARGS
+typedef struct BatchMVCCState
+{
+	HeapTupleData tuples[MaxHeapTuplesPerPage];
+	bool		visible[MaxHeapTuplesPerPage];
+} BatchMVCCState;
+#endif
+
+extern int	HeapTupleSatisfiesMVCCBatch(Snapshot snapshot, Buffer buffer,
+										int ntups,
+#ifdef BATCHMVCC_FEWER_ARGS
+										BatchMVCCState *batchmvcc,
+#else
+										HeapTupleData *tuples,
+										bool *visible,
+#endif
+										OffsetNumber *vistuples_dense);
+
+
+
 /*
  * To avoid leaking too much knowledge about reorderbuffer implementation
  * details this is implemented in reorderbuffer.c not heapam_visibility.c
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index d0cb4b1e29b..99656a1aac5 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -437,42 +437,106 @@ page_collect_tuples(HeapScanDesc scan, Snapshot snapshot,
 					BlockNumber block, int lines,
 					bool all_visible, bool check_serializable)
 {
+	Oid			relid = RelationGetRelid(scan->rs_base.rs_rd);
+#ifdef BATCHMVCC_FEWER_ARGS
+	BatchMVCCState batchmvcc;
+#else
+	HeapTupleData tuples[MaxHeapTuplesPerPage];
+	bool		visible[MaxHeapTuplesPerPage];
+#endif
 	int			ntup = 0;
-	OffsetNumber lineoff;
+	int			nvis = 0;
 
-	for (lineoff = FirstOffsetNumber; lineoff <= lines; lineoff++)
+	/* page at a time should have been disabled otherwise */
+	Assert(IsMVCCSnapshot(snapshot));
+
+	/* first find all tuples on the page */
+	for (OffsetNumber lineoff = FirstOffsetNumber; lineoff <= lines; lineoff++)
 	{
 		ItemId		lpp = PageGetItemId(page, lineoff);
-		HeapTupleData loctup;
-		bool		valid;
+		HeapTuple	tup;
 
-		if (!ItemIdIsNormal(lpp))
+		if (unlikely(!ItemIdIsNormal(lpp)))
 			continue;
 
-		loctup.t_data = (HeapTupleHeader) PageGetItem(page, lpp);
-		loctup.t_len = ItemIdGetLength(lpp);
-		loctup.t_tableOid = RelationGetRelid(scan->rs_base.rs_rd);
-		ItemPointerSet(&(loctup.t_self), block, lineoff);
-
-		if (all_visible)
-			valid = true;
-		else
-			valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
-
-		if (check_serializable)
-			HeapCheckForSerializableConflictOut(valid, scan->rs_base.rs_rd,
-												&loctup, buffer, snapshot);
-
-		if (valid)
+		/*
+		 * If the page is not all-visible or we need to check serializability,
+		 * maintain enough state to be able to refind the tuple efficiently,
+		 * without again needing to extract it from the page.
+		 */
+		if (!all_visible || check_serializable)
 		{
-			scan->rs_vistuples[ntup] = lineoff;
-			ntup++;
+#ifdef BATCHMVCC_FEWER_ARGS
+			tup = &batchmvcc.tuples[ntup];
+#else
+			tup = &tuples[ntup];
+#endif
+
+			tup->t_data = (HeapTupleHeader) PageGetItem(page, lpp);
+			tup->t_len = ItemIdGetLength(lpp);
+			tup->t_tableOid = relid;
+			ItemPointerSet(&(tup->t_self), block, lineoff);
 		}
+
+		/*
+		 * If the page is all visible, these fields won't otherwise be
+		 * populated in the loop below.
+		 */
+		if (all_visible)
+		{
+			if (check_serializable)
+			{
+#ifdef BATCHMVCC_FEWER_ARGS
+				batchmvcc.visible[ntup] = true;
+#else
+				visible[ntup] = true;
+#endif
+			}
+			scan->rs_vistuples[ntup] = lineoff;
+		}
+
+		ntup++;
 	}
 
 	Assert(ntup <= MaxHeapTuplesPerPage);
 
-	return ntup;
+	/* unless the page is all visible, test visibility for all tuples in one go */
+	if (all_visible)
+		nvis = ntup;
+	else
+		nvis = HeapTupleSatisfiesMVCCBatch(snapshot, buffer,
+										   ntup,
+#ifdef BATCHMVCC_FEWER_ARGS
+										   &batchmvcc,
+#else
+										   tuples, visible,
+#endif
+										   scan->rs_vistuples
+			);
+
+	/*
+	 * So far we don't have a batch API for testing serializability, so do so
+	 * one-by-one.
+	 */
+	if (check_serializable)
+	{
+		for (int i = 0; i < ntup; i++)
+		{
+#ifdef BATCHMVCC_FEWER_ARGS
+			HeapCheckForSerializableConflictOut(batchmvcc.visible[i],
+												scan->rs_base.rs_rd,
+												&batchmvcc.tuples[i],
+												buffer, snapshot);
+#else
+			HeapCheckForSerializableConflictOut(visible[i],
+												scan->rs_base.rs_rd,
+												&tuples[i],
+												buffer, snapshot);
+#endif
+		}
+	}
+
+	return nvis;
 }
 
 /*
diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c
index b7aa8bb7a52..4baafc01a2c 100644
--- a/src/backend/access/heap/heapam_visibility.c
+++ b/src/backend/access/heap/heapam_visibility.c
@@ -1575,6 +1575,56 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot,
 		return true;
 }
 
+/*
+ * Perform HeapTupleSatisfiesMVCC() on each passed-in tuple. This is more
+ * efficient than doing HeapTupleSatisfiesMVCC() one-by-one.
+ *
+ * To be checked tuples are passed via BatchMVCCState->tuples. Each tuple's
+ * visibility is set in batchmvcc->visible[]. In addition, ->vistuples_dense
+ * is set to contain the offsets of visible tuples.
+ *
+ * Returns the number of visible tuples.
+ */
+int
+HeapTupleSatisfiesMVCCBatch(Snapshot snapshot, Buffer buffer,
+							int ntups,
+#ifdef BATCHMVCC_FEWER_ARGS
+							BatchMVCCState *batchmvcc,
+#else
+							HeapTupleData *tuples,
+							bool *visible,
+#endif
+							OffsetNumber *vistuples_dense)
+{
+	int			nvis = 0;
+
+	Assert(IsMVCCSnapshot(snapshot));
+
+	for (int i = 0; i < ntups; i++)
+	{
+		bool		valid;
+#ifdef BATCHMVCC_FEWER_ARGS
+		HeapTuple	tup = &batchmvcc->tuples[i];
+#else
+		HeapTuple	tup = &tuples[i];
+#endif
+
+		valid = HeapTupleSatisfiesMVCC(tup, snapshot, buffer);
+#ifdef BATCHMVCC_FEWER_ARGS
+		batchmvcc->visible[i] = valid;
+#else
+		visible[i] = valid;
+#endif
+		if (likely(valid))
+		{
+			vistuples_dense[nvis] = tup->t_self.ip_posid;
+			nvis++;
+		}
+	}
+
+	return nvis;
+}
+
 /*
  * HeapTupleSatisfiesVisibility
  *		True iff heap tuple satisfies a time qual.
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 171a7dd5d2b..15c7673465b 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -244,6 +244,7 @@ Barrier
 BaseBackupCmd
 BaseBackupTargetHandle
 BaseBackupTargetType
+BatchMVCCState
 BeginDirectModify_function
 BeginForeignInsert_function
 BeginForeignModify_function
-- 
2.46.0.519.g2e7b89e038

>From 7a3db51e47865fb2d3691e02f39cda790c9f6d10 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 28 Oct 2024 18:01:14 -0400
Subject: [PATCH v1 07/14] bufmgr: Make it easier to change number of buffer
 state bits

In an upcoming commit I'd like to change the number of bits for the usage
count (the current max is 5, fitting in three bits, but we reserve four
bits). Until now that required adjusting a bunch of magic constants; now the
constants are derived from the number of bits reserved.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/include/storage/buf_internals.h | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index eda6c699212..d8444651736 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -39,12 +39,19 @@
  *
  * The definition of buffer state components is below.
  */
+#define BUF_REFCOUNT_BITS 18
+#define BUF_USAGECOUNT_BITS 4
+#define BUF_FLAG_BITS 10
+
+StaticAssertDecl(BUF_REFCOUNT_BITS + BUF_USAGECOUNT_BITS + BUF_FLAG_BITS == 32,
+				 "buffer bit counts need to equal 32");
+
 #define BUF_REFCOUNT_ONE 1
-#define BUF_REFCOUNT_MASK ((1U << 18) - 1)
-#define BUF_USAGECOUNT_MASK 0x003C0000U
-#define BUF_USAGECOUNT_ONE (1U << 18)
-#define BUF_USAGECOUNT_SHIFT 18
-#define BUF_FLAG_MASK 0xFFC00000U
+#define BUF_REFCOUNT_MASK ((1U << BUF_REFCOUNT_BITS) - 1)
+#define BUF_USAGECOUNT_MASK (((1U << BUF_USAGECOUNT_BITS) - 1) << (BUF_REFCOUNT_BITS))
+#define BUF_USAGECOUNT_ONE (1U << BUF_REFCOUNT_BITS)
+#define BUF_USAGECOUNT_SHIFT BUF_REFCOUNT_BITS
+#define BUF_FLAG_MASK (((1U << BUF_FLAG_BITS) - 1) << (BUF_REFCOUNT_BITS + BUF_USAGECOUNT_BITS))
 
 /* Get refcount and usagecount from buffer state */
 #define BUF_STATE_GET_REFCOUNT(state) ((state) & BUF_REFCOUNT_MASK)
@@ -77,6 +84,9 @@
  */
 #define BM_MAX_USAGE_COUNT	5
 
+StaticAssertDecl(BM_MAX_USAGE_COUNT < ((1 << BUF_USAGECOUNT_BITS) - 1),
+				 "BM_MAX_USAGE_COUNT doesn't fit in BUF_USAGECOUNT_BITS bits");
+
 /*
  * Buffer tag identifies which disk block the buffer contains.
  *
-- 
2.46.0.519.g2e7b89e038

>From ab2a7ea025a32053f6f201d14ee764a235c480ce Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 17 Oct 2024 13:59:13 -0400
Subject: [PATCH v1 08/14] bufmgr: Add interface to acquire right to set hint
 bits

At the moment hint bits can be set with just a share lock on a page (and in
one place even without any lock). Because of this we need to copy pages while
writing them out, as otherwise the checksum could be corrupted.

The need to copy the page is problematic for the AIO patchset:

1) Instead of just needing a single buffer for a copied page we need one for
   each page that's potentially undergoing IO
2) To be able to use the "worker" AIO implementation the copied page needs to
   reside in shared memory.

Even without AIO, copying the page isn't free...

This commit starts to address that by adding BufferPrepareToSetHintBits(),
which needs to be called before setting hint bits on a buffer.
BufferPrepareToSetHintBits() only allows hint bits to be set if there's no
ongoing IO, and while hint bits are being set no IO is allowed to start.

To know that a buffer is undergoing IO a new BufferDesc state flag is used,
BM_SETTING_HINTS. Theoretically it'd be possible to reuse BM_IO_IN_PROGRESS,
but that'd make it harder to debug the system.

The new interface is not yet used; that will happen in subsequent commits, to
make review a bit easier. Therefore we cannot yet rely on not needing a copy
of the buffer during IO.
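
For review, the intended calling pattern looks roughly like this (a minimal
sketch with a hypothetical caller; only the
BufferPrepareToSetHintBits()/BufferFinishSetHintBits() bracketing is what
this commit adds, the hint update in the middle is just illustrative):

    /* buffer is pinned and (at least) share-locked by the caller */
    if (BufferPrepareToSetHintBits(buffer))
    {
        /* ... update hint-style state on the page, e.g. a tuple's infomask ... */
        MarkBufferDirtyHint(buffer, true);
        BufferFinishSetHintBits(buffer);
    }
    /*
     * else: IO is in progress or another backend is already setting hints;
     * just skip, the callers converted later in the series can always do so.
     */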

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/include/storage/buf_internals.h           |  22 ++-
 src/include/storage/bufmgr.h                  |   3 +
 src/backend/storage/buffer/README             |   6 +-
 src/backend/storage/buffer/bufmgr.c           | 177 +++++++++++++++++-
 .../utils/activity/wait_event_names.txt       |   1 +
 5 files changed, 198 insertions(+), 11 deletions(-)

diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index d8444651736..1f8a0482e90 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -31,8 +31,8 @@
  * Buffer state is a single 32-bit variable where following data is combined.
  *
  * - 18 bits refcount
- * - 4 bits usage count
- * - 10 bits of flags
+ * - 3 bits usage count
+ * - 11 bits of flags
  *
  * Combining these values allows to perform some operations without locking
  * the buffer header, by modifying them together with a CAS loop.
@@ -40,8 +40,8 @@
  * The definition of buffer state components is below.
  */
 #define BUF_REFCOUNT_BITS 18
-#define BUF_USAGECOUNT_BITS 4
-#define BUF_FLAG_BITS 10
+#define BUF_USAGECOUNT_BITS 3
+#define BUF_FLAG_BITS 11
 
 StaticAssertDecl(BUF_REFCOUNT_BITS + BUF_USAGECOUNT_BITS + BUF_FLAG_BITS == 32,
 				 "buffer bit counts need to equal 32");
@@ -63,6 +63,7 @@ StaticAssertDecl(BUF_REFCOUNT_BITS + BUF_USAGECOUNT_BITS + BUF_FLAG_BITS == 32,
  * Note: BM_TAG_VALID essentially means that there is a buffer hashtable
  * entry associated with the buffer's tag.
  */
+#define BM_SETTING_HINTS		(1U << 21)	/* hint bits are being set */
 #define BM_LOCKED				(1U << 22)	/* buffer header is locked */
 #define BM_DIRTY				(1U << 23)	/* data needs writing */
 #define BM_VALID				(1U << 24)	/* data is valid */
@@ -396,6 +397,7 @@ extern PGDLLIMPORT CkptSortItem *CkptBufferIds;
 /* ResourceOwner callbacks to hold buffer I/Os and pins */
 extern PGDLLIMPORT const ResourceOwnerDesc buffer_io_resowner_desc;
 extern PGDLLIMPORT const ResourceOwnerDesc buffer_pin_resowner_desc;
+extern PGDLLIMPORT const ResourceOwnerDesc buffer_setting_hints_resowner_desc;
 
 /* Convenience wrappers over ResourceOwnerRemember/Forget */
 static inline void
@@ -418,6 +420,18 @@ ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer)
 {
 	ResourceOwnerForget(owner, Int32GetDatum(buffer), &buffer_io_resowner_desc);
 }
+static inline void
+ResourceOwnerRememberBufferSettingHints(ResourceOwner owner, Buffer buffer)
+{
+	ResourceOwnerRemember(owner, Int32GetDatum(buffer),
+						  &buffer_setting_hints_resowner_desc);
+}
+static inline void
+ResourceOwnerForgetBufferSettingHints(ResourceOwner owner, Buffer buffer)
+{
+	ResourceOwnerForget(owner, Int32GetDatum(buffer),
+						&buffer_setting_hints_resowner_desc);
+}
 
 /*
  * Internal buffer management routines
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index ded46a57889..a37872377bc 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -300,6 +300,9 @@ extern void LimitAdditionalLocalPins(uint32 *additional_pins);
 
 extern bool EvictUnpinnedBuffer(Buffer buf);
 
+extern bool BufferPrepareToSetHintBits(Buffer buf);
+extern void BufferFinishSetHintBits(Buffer buf);
+
 /* in buf_init.c */
 extern void BufferManagerShmemInit(void);
 extern Size BufferManagerShmemSize(void);
diff --git a/src/backend/storage/buffer/README b/src/backend/storage/buffer/README
index 011af7aff3e..756ad542910 100644
--- a/src/backend/storage/buffer/README
+++ b/src/backend/storage/buffer/README
@@ -58,7 +58,8 @@ tuple while they are doing visibility checks.
 4. It is considered OK to update tuple commit status bits (ie, OR the
 values HEAP_XMIN_COMMITTED, HEAP_XMIN_INVALID, HEAP_XMAX_COMMITTED, or
 HEAP_XMAX_INVALID into t_infomask) while holding only a shared lock and
-pin on a buffer.  This is OK because another backend looking at the tuple
+pin on a buffer after calling BufferPrepareToSetHintBits().
+This is OK because another backend looking at the tuple
 at about the same time would OR the same bits into the field, so there
 is little or no risk of conflicting update; what's more, if there did
 manage to be a conflict it would merely mean that one bit-update would
@@ -68,6 +69,9 @@ great harm is done if they get reset to zero by conflicting updates.
 Note, however, that a tuple is frozen by setting both HEAP_XMIN_INVALID
 and HEAP_XMIN_COMMITTED; this is a critical update and accordingly requires
 an exclusive buffer lock (and it must also be WAL-logged).
+Calling BufferPrepareToSetHintBits() ensures that write IO cannot happen at
+the same time as hint bits are being modified, which could otherwise cause
+the checksum computed at the start of the write to no longer match.
 
 5. To physically remove a tuple or compact free space on a page, one
 must hold a pin and an exclusive lock, *and* observe while holding the
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index f56f7f9f441..ee8b898121c 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -222,6 +222,8 @@ static void ResOwnerReleaseBufferIO(Datum res);
 static char *ResOwnerPrintBufferIO(Datum res);
 static void ResOwnerReleaseBufferPin(Datum res);
 static char *ResOwnerPrintBufferPin(Datum res);
+static void ResOwnerReleaseBufferSettingHints(Datum res);
+static char *ResOwnerPrintBufferSettingHints(Datum res);
 
 const ResourceOwnerDesc buffer_io_resowner_desc =
 {
@@ -241,6 +243,15 @@ const ResourceOwnerDesc buffer_pin_resowner_desc =
 	.DebugPrint = ResOwnerPrintBufferPin
 };
 
+const ResourceOwnerDesc buffer_setting_hints_resowner_desc =
+{
+	.name = "buffer setting hints",
+	.release_phase = RESOURCE_RELEASE_BEFORE_LOCKS,
+	.release_priority = RELEASE_PRIO_BUFFER_IOS,
+	.ReleaseResource = ResOwnerReleaseBufferSettingHints,
+	.DebugPrint = ResOwnerPrintBufferSettingHints
+};
+
 /*
  * Ensure that the PrivateRefCountArray has sufficient space to store one more
  * entry. This has to be called before using NewPrivateRefCountEntry() to fill
@@ -1737,7 +1748,8 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 
 	/* some sanity checks while we hold the buffer header lock */
 	Assert(BUF_STATE_GET_REFCOUNT(victim_buf_state) == 1);
-	Assert(!(victim_buf_state & (BM_TAG_VALID | BM_VALID | BM_DIRTY | BM_IO_IN_PROGRESS)));
+	Assert(!(victim_buf_state & (BM_TAG_VALID | BM_VALID | BM_DIRTY
+								 | BM_IO_IN_PROGRESS | BM_SETTING_HINTS)));
 
 	victim_buf_hdr->tag = newTag;
 
@@ -2092,7 +2104,8 @@ again:
 	buf_state = pg_atomic_read_u32(&buf_hdr->state);
 
 	Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 1);
-	Assert(!(buf_state & (BM_TAG_VALID | BM_VALID | BM_DIRTY)));
+	Assert(!(buf_state & (BM_TAG_VALID | BM_VALID | BM_DIRTY
+						  | BM_IO_IN_PROGRESS | BM_SETTING_HINTS)));
 
 	CheckBufferIsPinnedOnce(buf);
 #endif
@@ -5544,7 +5557,8 @@ BufferLockHeldByMe(Buffer buffer, int mode)
  */
 
 /*
- * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
+ * WaitIO -- Block until the IO_IN_PROGRESS and SETTING_HINTS flags on 'buf'
+ * are cleared.
  */
 static void
 WaitIO(BufferDesc *buf)
@@ -5555,6 +5569,7 @@ WaitIO(BufferDesc *buf)
 	for (;;)
 	{
 		uint32		buf_state;
+		uint32		wait_event;
 
 		/*
 		 * It may not be necessary to acquire the spinlock to check the flag
@@ -5564,9 +5579,21 @@ WaitIO(BufferDesc *buf)
 		buf_state = LockBufHdr(buf);
 		UnlockBufHdr(buf, buf_state);
 
-		if (!(buf_state & BM_IO_IN_PROGRESS))
+		if (likely((buf_state & (BM_IO_IN_PROGRESS | BM_SETTING_HINTS)) == 0))
 			break;
-		ConditionVariableSleep(cv, WAIT_EVENT_BUFFER_IO);
+		else if (buf_state & BM_IO_IN_PROGRESS)
+		{
+			Assert(!(buf_state & BM_SETTING_HINTS));
+			wait_event = WAIT_EVENT_BUFFER_IO;
+		}
+		else
+		{
+			Assert(buf_state & BM_SETTING_HINTS);
+			Assert(!(buf_state & BM_IO_IN_PROGRESS));
+			wait_event = WAIT_EVENT_BUFFER_SETTING_HINTS;
+		}
+
+		ConditionVariableSleep(cv, wait_event);
 	}
 	ConditionVariableCancelSleep();
 }
@@ -5606,7 +5633,7 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
 	{
 		buf_state = LockBufHdr(buf);
 
-		if (!(buf_state & BM_IO_IN_PROGRESS))
+		if (!(buf_state & (BM_IO_IN_PROGRESS | BM_SETTING_HINTS)))
 			break;
 		UnlockBufHdr(buf, buf_state);
 		if (nowait)
@@ -5623,6 +5650,8 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
 		return false;
 	}
 
+	Assert(!(buf_state & BM_SETTING_HINTS));
+
 	buf_state |= BM_IO_IN_PROGRESS;
 	UnlockBufHdr(buf, buf_state);
 
@@ -5661,6 +5690,7 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
 	buf_state = LockBufHdr(buf);
 
 	Assert(buf_state & BM_IO_IN_PROGRESS);
+	Assert(!(buf_state & BM_SETTING_HINTS));
 
 	buf_state &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
 	if (clear_dirty && !(buf_state & BM_JUST_DIRTIED))
@@ -5697,6 +5727,7 @@ AbortBufferIO(Buffer buffer)
 
 	buf_state = LockBufHdr(buf_hdr);
 	Assert(buf_state & (BM_IO_IN_PROGRESS | BM_TAG_VALID));
+	Assert(!(buf_state & BM_SETTING_HINTS));
 
 	if (!(buf_state & BM_VALID))
 	{
@@ -6122,6 +6153,23 @@ ResOwnerPrintBufferPin(Datum res)
 	return DebugPrintBufferRefcount(DatumGetInt32(res));
 }
 
+
+static void
+ResOwnerReleaseBufferSettingHints(Datum res)
+{
+	Buffer		buffer = DatumGetInt32(res);
+
+	BufferFinishSetHintBits(buffer);
+}
+
+static char *
+ResOwnerPrintBufferSettingHints(Datum res)
+{
+	Buffer		buffer = DatumGetInt32(res);
+
+	return psprintf("lost track of setting hint bits on buffer %d", buffer);
+}
+
 /*
  * Try to evict the current block in a shared buffer.
  *
@@ -6184,3 +6232,120 @@ EvictUnpinnedBuffer(Buffer buf)
 
 	return result;
 }
+
+/*
+ * Try to acquire the right to set hint bits for buf.
+ *
+ * It's only permitted to set hint bits in a buffer if the buffer is not
+ * undergoing IO. Otherwise the page level checksum could be corrupted. This
+ * could happen both for PG's checksum and on the OS/filesystem
+ * level. E.g. btrfs with direct-io relies on the page to not change while IO
+ * is going on.
+ *
+ * We can't just check whether IO is going on at the time
+ * BufferPrepareToSetHintBits() is called; we also need to block IO from
+ * starting before we're done setting hints. This is achieved by setting
+ * BM_SETTING_HINTS for the buffer and having StartBufferIO()/WaitIO() wait
+ * for that.  We could combine BM_SETTING_HINTS and BM_IO_IN_PROGRESS into
+ * one, as they can never be set at the same time, but that seems
+ * unnecessarily confusing.
+ *
+ * Because we use only a single bit (BM_SETTING_HINTS) to track whether hint
+ * bits are currently being set, we cannot allow multiple backends to set
+ * hints at the same time - it'd be unknown whether BM_SETTING_HINTS would
+ * need to remain set when one backend finishes setting hint bits.  In almost
+ * all situations the two backends would just set the same hint bits anyway,
+ * so this is unlikely to be a problem.
+ *
+ * If allowed to set hint bits, the caller needs to call
+ * BufferFinishSetHintBits() once done setting hint bits. In case of an error
+ * occurring before BufferFinishSetHintBits() is reached, the cleanup will be
+ * done via resowner.c.
+ *
+ * It can make sense to amortize the cost of BufferPrepareToSetHintBits() +
+ * BufferFinishSetHintBits() over multiple hint bit sets on a page.
+ *
+ * Returns whether the caller is allowed to set hint bits.
+ */
+bool
+BufferPrepareToSetHintBits(Buffer buf)
+{
+	BufferDesc *desc;
+	uint32		old_buf_state;
+
+	Assert(BufferIsPinned(buf));
+
+	if (BufferIsLocal(buf))
+		return true;
+
+	ResourceOwnerEnlarge(CurrentResourceOwner);
+
+	desc = GetBufferDescriptor(buf - 1);
+
+	/*
+	 * We could use LockBufHdr() instead, but a CAS loop turns out to be
+	 * slightly faster and has better concurrency behaviour due to not
+	 * blocking other backends.
+	 */
+	old_buf_state = pg_atomic_read_u32(&desc->state);
+
+	for (;;)
+	{
+		uint32		new_buf_state;
+
+		if (unlikely(old_buf_state & BM_LOCKED))
+			old_buf_state = WaitBufHdrUnlocked(desc);
+
+		if (unlikely((old_buf_state & (BM_SETTING_HINTS | BM_IO_IN_PROGRESS)) != 0))
+			return false;
+
+		new_buf_state = old_buf_state;
+
+		new_buf_state |= BM_SETTING_HINTS;
+
+		if (pg_atomic_compare_exchange_u32(&desc->state, &old_buf_state,
+										   new_buf_state))
+		{
+			ResourceOwnerRememberBufferSettingHints(CurrentResourceOwner, buf);
+			return true;
+		}
+	}
+}
+
+/*
+ * Release the permission to set hint bits on the given buffer.
+ */
+void
+BufferFinishSetHintBits(Buffer buf)
+{
+	BufferDesc *desc;
+	uint32		buf_state;
+	uint32		new_buf_state;
+
+	if (BufferIsLocal(buf))
+		return;
+
+	desc = GetBufferDescriptor(buf - 1);
+
+	buf_state = LockBufHdr(desc);
+	new_buf_state = buf_state;
+	new_buf_state &= ~BM_SETTING_HINTS;
+	UnlockBufHdr(desc, new_buf_state);
+
+	Assert(buf_state & (BM_TAG_VALID));
+	Assert(buf_state & (BM_VALID));
+	Assert(buf_state & (BM_SETTING_HINTS));
+	Assert(!(buf_state & (BM_IO_IN_PROGRESS)));
+
+	ResourceOwnerForgetBufferSettingHints(CurrentResourceOwner, buf);
+
+	/*
+	 * Wake everyone that might be waiting for a chance to perform IO (i.e.
+	 * WaitIO()).
+	 *
+	 * XXX: Right now this is somewhat expensive, due to
+	 * ConditionVariableBroadcast() acquiring its spinlock unconditionally. I
+	 * don't see a good way to avoid that from the bufmgr.c side, we don't
+	 * know if there is somebody waiting.
+	 */
+	ConditionVariableBroadcast(BufferDescriptorGetIOCV(desc));
+}
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 8efb4044d6f..a1222a1cfbb 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -111,6 +111,7 @@ BGWORKER_SHUTDOWN	"Waiting for background worker to shut down."
 BGWORKER_STARTUP	"Waiting for background worker to start up."
 BTREE_PAGE	"Waiting for the page number needed to continue a parallel B-tree scan to become available."
 BUFFER_IO	"Waiting for buffer I/O to complete."
+BUFFER_SETTING_HINTS	"Waiting for hint bit setting to complete."
 CHECKPOINT_DELAY_COMPLETE	"Waiting for a backend that blocks a checkpoint from completing."
 CHECKPOINT_DELAY_START	"Waiting for a backend that blocks a checkpoint from starting."
 CHECKPOINT_DONE	"Waiting for a checkpoint to complete."
-- 
2.46.0.519.g2e7b89e038

>From 1271da941a9b093abb4565ddb2893f23eafbb33b Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 17 Oct 2024 14:05:15 -0400
Subject: [PATCH v1 09/14] heapam: Acquire right to set hint bits

This commit starts to use BufferPrepareToSetHintBits(), introduced in a
previous commit, for the reasons explained in said commit.

To amortize the cost of BufferPrepareToSetHintBits() for cases where hint bits
are set at a high frequency, HeapTupleSatisfiesMVCCBatch() uses the new
SetHintBitsExt() which defers BufferFinishSetHintBits() until all hint bits on
a page have been set.

It's likely worth introducing additional batch visibility routines, e.g. for
vacuuming, but I did not find a regression with the state as of this
commit. So that's left for later.
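
Condensed, the batched pattern used by HeapTupleSatisfiesMVCCBatch() looks
like this (a simplified sketch of the code added below, not additional API):

    SetHintBitsState state = SHB_INITIAL;

    for (int i = 0; i < ntups; i++)
    {
        /* may call SetHintBitsExt(..., &state), acquiring the right lazily */
        visible[i] = HeapTupleSatisfiesMVCC(&tuples[i], snapshot, buffer, &state);
    }

    /* release the right to set hint bits once per page, not once per tuple */
    if (state == SHB_ENABLED)
        BufferFinishSetHintBits(buffer);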

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/access/heap/heapam_visibility.c | 116 +++++++++++++++++---
 src/tools/pgindent/typedefs.list            |   1 +
 2 files changed, 99 insertions(+), 18 deletions(-)

diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c
index 4baafc01a2c..61fbac6e7f5 100644
--- a/src/backend/access/heap/heapam_visibility.c
+++ b/src/backend/access/heap/heapam_visibility.c
@@ -80,10 +80,35 @@
 
 
 /*
- * SetHintBits()
+ * To be allowed to set hint bits, SetHintBits() needs to call
+ * BufferPrepareToSetHintBits(). However, that's not free, and some callsites
+ * call SetHintBits() on numerous tuples in a row. For those it makes sense to
+ * amortize the cost of BufferPrepareToSetHintBits(). Additionally it's
+ * desirable to defer the cost of BufferPrepareToSetHintBits() until a hint
+ * bit actually needs to be set. This enum serves as the necessary state space
+ * passed to SetHintBitsExt().
+ */
+typedef enum SetHintBitsState
+{
+	/* not yet checked if hint bits may be set */
+	SHB_INITIAL,
+	/* failed to get permission to set hint bits, don't check again */
+	SHB_DISABLED,
+	/* allowed to set hint bits */
+	SHB_ENABLED,
+} SetHintBitsState;
+
+/*
+ * SetHintBitsExt()
  *
  * Set commit/abort hint bits on a tuple, if appropriate at this time.
  *
+ * To be allowed to set a hint bit on a tuple, the page must not be undergoing
+ * IO at this time (otherwise we could e.g. corrupt PG's page checksum or even
+ * the filesystem's, as is known to happen with btrfs). The right to set a
+ * hint bit is acquired on a page level with BufferPrepareToSetHintBits().
+ * Only a single backend gets the right to set hint bits at a time.
+ *
  * It is only safe to set a transaction-committed hint bit if we know the
  * transaction's commit record is guaranteed to be flushed to disk before the
  * buffer, or if the table is temporary or unlogged and will be obliterated by
@@ -111,9 +136,45 @@
  * InvalidTransactionId if no check is needed.
  */
 static inline void
-SetHintBits(HeapTupleHeader tuple, Buffer buffer,
-			uint16 infomask, TransactionId xid)
+SetHintBitsExt(HeapTupleHeader tuple, Buffer buffer,
+			   uint16 infomask, TransactionId xid, SetHintBitsState *state)
 {
+	/*
+	 * We are in batched mode and previously did not get permission to set
+	 * hint bits. Don't try again; in all likelihood IO is still going on.
+	 */
+	if (state && *state == SHB_DISABLED)
+		return;
+
+	/*
+	 * If not batching or this is the first hint that we'd like to set on the
+	 * page, check if we are allowed to do so.
+	 *
+	 * XXX: Should we defer this until after the TransactionIdIsValid() check
+	 * below?
+	 */
+	if ((!state || *state == SHB_INITIAL))
+	{
+		if (BufferPrepareToSetHintBits(buffer))
+		{
+			if (state)
+				*state = SHB_ENABLED;
+		}
+		else
+		{
+			/*
+			 * If we hold an exclusive lock, nobody else should be able to
+			 * prevent us from setting hint bits. This matters because a few
+			 * hint bit sets are required for correctness - all of those
+			 * happen with the buffer exclusively locked.
+			 */
+			Assert(!BufferLockHeldByMe(buffer, BUFFER_LOCK_EXCLUSIVE));
+			if (state)
+				*state = SHB_DISABLED;
+			return;
+		}
+	}
+
 	if (TransactionIdIsValid(xid))
 	{
 		/* NB: xid must be known committed here! */
@@ -123,12 +184,27 @@ SetHintBits(HeapTupleHeader tuple, Buffer buffer,
 			BufferGetLSNAtomic(buffer) < commitLSN)
 		{
 			/* not flushed and no LSN interlock, so don't set hint */
-			return;
+			goto out;
 		}
 	}
 
 	tuple->t_infomask |= infomask;
 	MarkBufferDirtyHint(buffer, true);
+
+out:
+	if (!state)
+		BufferFinishSetHintBits(buffer);
+}
+
+/*
+ * Simple wrapper around SetHintBitsExt(); use when operating on a single
+ * tuple.
+ */
+static inline void
+SetHintBits(HeapTupleHeader tuple, Buffer buffer,
+			uint16 infomask, TransactionId xid)
+{
+	SetHintBitsExt(tuple, buffer, infomask, xid, NULL);
 }
 
 /*
@@ -864,9 +940,9 @@ HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot,
  * inserting/deleting transaction was still running --- which was more cycles
  * and more contention on ProcArrayLock.
  */
-static bool
+static inline bool
 HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
-					   Buffer buffer)
+					   Buffer buffer, SetHintBitsState *state)
 {
 	HeapTupleHeader tuple = htup->t_data;
 
@@ -912,8 +988,8 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
 			if (!TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmax(tuple)))
 			{
 				/* deleting subtransaction must have aborted */
-				SetHintBits(tuple, buffer, HEAP_XMAX_INVALID,
-							InvalidTransactionId);
+				SetHintBitsExt(tuple, buffer, HEAP_XMAX_INVALID,
+							   InvalidTransactionId, state);
 				return true;
 			}
 
@@ -925,13 +1001,13 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
 		else if (XidInMVCCSnapshot(HeapTupleHeaderGetRawXmin(tuple), snapshot))
 			return false;
 		else if (TransactionIdDidCommit(HeapTupleHeaderGetRawXmin(tuple)))
-			SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
-						HeapTupleHeaderGetRawXmin(tuple));
+			SetHintBitsExt(tuple, buffer, HEAP_XMIN_COMMITTED,
+						   HeapTupleHeaderGetRawXmin(tuple), state);
 		else
 		{
 			/* it must have aborted or crashed */
-			SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
-						InvalidTransactionId);
+			SetHintBitsExt(tuple, buffer, HEAP_XMIN_INVALID,
+						   InvalidTransactionId, state);
 			return false;
 		}
 	}
@@ -994,14 +1070,14 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
 		if (!TransactionIdDidCommit(HeapTupleHeaderGetRawXmax(tuple)))
 		{
 			/* it must have aborted or crashed */
-			SetHintBits(tuple, buffer, HEAP_XMAX_INVALID,
-						InvalidTransactionId);
+			SetHintBitsExt(tuple, buffer, HEAP_XMAX_INVALID,
+						   InvalidTransactionId, state);
 			return true;
 		}
 
 		/* xmax transaction committed */
-		SetHintBits(tuple, buffer, HEAP_XMAX_COMMITTED,
-					HeapTupleHeaderGetRawXmax(tuple));
+		SetHintBitsExt(tuple, buffer, HEAP_XMAX_COMMITTED,
+					   HeapTupleHeaderGetRawXmax(tuple), state);
 	}
 	else
 	{
@@ -1597,6 +1673,7 @@ HeapTupleSatisfiesMVCCBatch(Snapshot snapshot, Buffer buffer,
 							OffsetNumber *vistuples_dense)
 {
 	int			nvis = 0;
+	SetHintBitsState state = SHB_INITIAL;
 
 	Assert(IsMVCCSnapshot(snapshot));
 
@@ -1609,7 +1686,7 @@ HeapTupleSatisfiesMVCCBatch(Snapshot snapshot, Buffer buffer,
 		HeapTuple	tup = &tuples[i];
 #endif
 
-		valid = HeapTupleSatisfiesMVCC(tup, snapshot, buffer);
+		valid = HeapTupleSatisfiesMVCC(tup, snapshot, buffer, &state);
 #ifdef BATCHMVCC_FEWER_ARGS
 		batchmvcc->visible[i] = valid;
 #else
@@ -1622,6 +1699,9 @@ HeapTupleSatisfiesMVCCBatch(Snapshot snapshot, Buffer buffer,
 		}
 	}
 
+	if (state == SHB_ENABLED)
+		BufferFinishSetHintBits(buffer);
+
 	return nvis;
 }
 
@@ -1641,7 +1721,7 @@ HeapTupleSatisfiesVisibility(HeapTuple htup, Snapshot snapshot, Buffer buffer)
 	switch (snapshot->snapshot_type)
 	{
 		case SNAPSHOT_MVCC:
-			return HeapTupleSatisfiesMVCC(htup, snapshot, buffer);
+			return HeapTupleSatisfiesMVCC(htup, snapshot, buffer, NULL);
 		case SNAPSHOT_SELF:
 			return HeapTupleSatisfiesSelf(htup, snapshot, buffer);
 		case SNAPSHOT_ANY:
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 15c7673465b..04b4f5cb240 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2604,6 +2604,7 @@ SetConstraintStateData
 SetConstraintTriggerData
 SetExprState
 SetFunctionReturnMode
+SetHintBitsState
 SetOp
 SetOpCmd
 SetOpPath
-- 
2.46.0.519.g2e7b89e038

>From 853d9e57765e89db711ac6d9751061a2327a571f Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 28 Oct 2024 16:44:11 -0400
Subject: [PATCH v1 10/14] Acquire right to set hint bits in the remaining
 places

Use BufferPrepareToSetHintBits() in the remaining places that modify buffers
without an exclusive lock. The remaining places are indexes with support for
kill_prior_tuples and the freespace code.

After this we no longer need to copy buffers to write them out. That change
is done separately, however.
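
The conversions all follow the same shape; schematically (condensed from the
gist/hash/nbtree changes below, not new code):

    /* buffer is pinned and share-locked */
    if (BufferPrepareToSetHintBits(buf))
    {
        /* ... mark known-dead index entries LP_DEAD, set page hint flags ... */
        MarkBufferDirtyHint(buf, true);
        BufferFinishSetHintBits(buf);
    }
    /* otherwise just skip - LP_DEAD hints are purely an optimization */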

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/access/gist/gistget.c         | 12 +++++----
 src/backend/access/hash/hashutil.c        |  6 +++++
 src/backend/access/nbtree/nbtinsert.c     | 31 +++++++++++++++--------
 src/backend/access/nbtree/nbtutils.c      |  7 +++++
 src/backend/storage/freespace/freespace.c | 14 ++++++----
 src/backend/storage/freespace/fsmpage.c   |  8 +++++-
 6 files changed, 57 insertions(+), 21 deletions(-)

diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c
index b35b8a97577..c0a39768fdf 100644
--- a/src/backend/access/gist/gistget.c
+++ b/src/backend/access/gist/gistget.c
@@ -57,17 +57,16 @@ gistkillitems(IndexScanDesc scan)
 	gistcheckpage(scan->indexRelation, buffer);
 	page = BufferGetPage(buffer);
 
+	if (!BufferPrepareToSetHintBits(buffer))
+		goto unlock;
+
 	/*
 	 * If page LSN differs it means that the page was modified since the last
 	 * read. killedItems could be not valid so LP_DEAD hints applying is not
 	 * safe.
 	 */
 	if (BufferGetLSNAtomic(buffer) != so->curPageLSN)
-	{
-		UnlockReleaseBuffer(buffer);
-		so->numKilled = 0;		/* reset counter */
-		return;
-	}
+		goto unlock;
 
 	Assert(GistPageIsLeaf(page));
 
@@ -89,6 +88,9 @@ gistkillitems(IndexScanDesc scan)
 		MarkBufferDirtyHint(buffer, true);
 	}
 
+	BufferFinishSetHintBits(buffer);
+
+unlock:
 	UnlockReleaseBuffer(buffer);
 
 	/*
diff --git a/src/backend/access/hash/hashutil.c b/src/backend/access/hash/hashutil.c
index 20028f5cd14..a9adf95bafa 100644
--- a/src/backend/access/hash/hashutil.c
+++ b/src/backend/access/hash/hashutil.c
@@ -572,6 +572,9 @@ _hash_kill_items(IndexScanDesc scan)
 	else
 		buf = _hash_getbuf(rel, blkno, HASH_READ, LH_OVERFLOW_PAGE);
 
+	if (!BufferPrepareToSetHintBits(so->currPos.buf))
+		goto unlock_page;
+
 	page = BufferGetPage(buf);
 	opaque = HashPageGetOpaque(page);
 	maxoff = PageGetMaxOffsetNumber(page);
@@ -613,6 +616,9 @@ _hash_kill_items(IndexScanDesc scan)
 		MarkBufferDirtyHint(buf, true);
 	}
 
+	BufferFinishSetHintBits(so->currPos.buf);
+
+unlock_page:
 	if (so->hashso_bucket_buf == so->currPos.buf ||
 		havePin)
 		LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index 99043da8412..eef498ada7c 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -679,20 +679,31 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
 				{
 					/*
 					 * The conflicting tuple (or all HOT chains pointed to by
-					 * all posting list TIDs) is dead to everyone, so mark the
-					 * index entry killed.
+					 * all posting list TIDs) is dead to everyone, so try to
+					 * mark the index entry killed. It's OK if we're not
+					 * allowed to; this isn't required for correctness.
 					 */
-					ItemIdMarkDead(curitemid);
-					opaque->btpo_flags |= BTP_HAS_GARBAGE;
+					Buffer		buf;
 
-					/*
-					 * Mark buffer with a dirty hint, since state is not
-					 * crucial. Be sure to mark the proper buffer dirty.
-					 */
+					/* Be sure to operate on the proper buffer */
 					if (nbuf != InvalidBuffer)
-						MarkBufferDirtyHint(nbuf, true);
+						buf = nbuf;
 					else
-						MarkBufferDirtyHint(insertstate->buf, true);
+						buf = insertstate->buf;
+
+					if (BufferPrepareToSetHintBits(buf))
+					{
+						ItemIdMarkDead(curitemid);
+						opaque->btpo_flags |= BTP_HAS_GARBAGE;
+
+						/*
+						 * Mark buffer with a dirty hint, since state is not
+						 * crucial.
+						 */
+						MarkBufferDirtyHint(buf, true);
+
+						BufferFinishSetHintBits(buf);
+					}
 				}
 
 				/*
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index 76be65123c8..ea618a5565d 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -4210,6 +4210,10 @@ _bt_killitems(IndexScanDesc scan)
 		}
 	}
 
+	/* If we're not able to set hint bits, there's no point in continuing */
+	if (!BufferPrepareToSetHintBits(so->currPos.buf))
+		goto unlock_page;
+
 	opaque = BTPageGetOpaque(page);
 	minoff = P_FIRSTDATAKEY(opaque);
 	maxoff = PageGetMaxOffsetNumber(page);
@@ -4329,6 +4333,9 @@ _bt_killitems(IndexScanDesc scan)
 		MarkBufferDirtyHint(so->currPos.buf, true);
 	}
 
+	BufferFinishSetHintBits(so->currPos.buf);
+
+unlock_page:
 	_bt_unlockbuf(scan->indexRelation, so->currPos.buf);
 }
 
diff --git a/src/backend/storage/freespace/freespace.c b/src/backend/storage/freespace/freespace.c
index b1262ac3b69..864c7e231ee 100644
--- a/src/backend/storage/freespace/freespace.c
+++ b/src/backend/storage/freespace/freespace.c
@@ -904,12 +904,16 @@ fsm_vacuum_page(Relation rel, FSMAddress addr,
 	max_avail = fsm_get_max_avail(page);
 
 	/*
-	 * Reset the next slot pointer. This encourages the use of low-numbered
-	 * pages, increasing the chances that a later vacuum can truncate the
-	 * relation.  We don't bother with a lock here, nor with marking the page
-	 * dirty if it wasn't already, since this is just a hint.
+	 * Try to reset the next slot pointer. This encourages the use of
+	 * low-numbered pages, increasing the chances that a later vacuum can
+	 * truncate the relation.  We don't bother with a lock here, nor with
+	 * marking the page dirty if it wasn't already, since this is just a hint.
 	 */
-	((FSMPage) PageGetContents(page))->fp_next_slot = 0;
+	if (BufferPrepareToSetHintBits(buf))
+	{
+		((FSMPage) PageGetContents(page))->fp_next_slot = 0;
+		BufferFinishSetHintBits(buf);
+	}
 
 	ReleaseBuffer(buf);
 
diff --git a/src/backend/storage/freespace/fsmpage.c b/src/backend/storage/freespace/fsmpage.c
index ba85cad0697..84bfa410dc0 100644
--- a/src/backend/storage/freespace/fsmpage.c
+++ b/src/backend/storage/freespace/fsmpage.c
@@ -300,7 +300,13 @@ restart:
 	 *
 	 * Wrap-around is handled at the beginning of this function.
 	 */
-	fsmpage->fp_next_slot = slot + (advancenext ? 1 : 0);
+	if (exclusive_lock_held || BufferPrepareToSetHintBits(buf))
+	{
+		fsmpage->fp_next_slot = slot + (advancenext ? 1 : 0);
+
+		if (!exclusive_lock_held)
+			BufferFinishSetHintBits(buf);
+	}
 
 	return slot;
 }
-- 
2.46.0.519.g2e7b89e038

>From 1199f480b9d4f4be9095307bbd41dee3d92d338e Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 28 Oct 2024 18:11:23 -0400
Subject: [PATCH v1 11/14] bufmgr: Detect some missing
 BufferPrepareToSetHintBits() calls

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/storage/buffer/bufmgr.c | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index ee8b898121c..5c7aa6662c5 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -5018,6 +5018,23 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 	/* here, either share or exclusive lock is OK */
 	Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr)));
 
+	/*
+	 * A share-locked buffer may only be marked dirty for hints after having
+	 * gotten permission to do so with BufferPrepareToSetHintBits(). It'd
+	 * perhaps be cheap enough to test this even outside of assert-enabled
+	 * builds, but LWLockHeldByMeInMode() says "debug support only".
+	 */
+#ifdef USE_ASSERT_CHECKING
+	{
+		uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+		if (!LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE))
+		{
+			Assert(buf_state & BM_SETTING_HINTS);
+		}
+	}
+#endif
+
 	/*
 	 * This routine might get called many times on the same page, if we are
 	 * making the first scan after commit of an xact that added/deleted many
-- 
2.46.0.519.g2e7b89e038

>From c8f3dd69d09a1e6bb2f393e091aff02edc47a5f2 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 17 Oct 2024 14:14:35 -0400
Subject: [PATCH v1 12/14] bufmgr: Don't copy pages while writing out

After the series of preceding commits introducing and using
BufferPrepareToSetHintBits(), hint bits cannot be set while IO is going on.
Therefore we no longer need to copy pages while they are being written out.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/include/storage/bufpage.h         |  3 +-
 src/backend/access/hash/hashpage.c    |  2 +-
 src/backend/storage/buffer/bufmgr.c   | 23 +++++--------
 src/backend/storage/buffer/localbuf.c |  2 +-
 src/backend/storage/page/bufpage.c    | 48 ++++-----------------------
 src/backend/storage/smgr/bulk_write.c |  2 +-
 6 files changed, 18 insertions(+), 62 deletions(-)

diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h
index 6222d46e535..e8cadc73aad 100644
--- a/src/include/storage/bufpage.h
+++ b/src/include/storage/bufpage.h
@@ -507,7 +507,6 @@ extern void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems);
 extern void PageIndexTupleDeleteNoCompact(Page page, OffsetNumber offnum);
 extern bool PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
 									Item newtup, Size newsize);
-extern char *PageSetChecksumCopy(Page page, BlockNumber blkno);
-extern void PageSetChecksumInplace(Page page, BlockNumber blkno);
+extern void PageSetChecksum(Page page, BlockNumber blkno);
 
 #endif							/* BUFPAGE_H */
diff --git a/src/backend/access/hash/hashpage.c b/src/backend/access/hash/hashpage.c
index d09c349e28f..2becf48bf06 100644
--- a/src/backend/access/hash/hashpage.c
+++ b/src/backend/access/hash/hashpage.c
@@ -1029,7 +1029,7 @@ _hash_alloc_buckets(Relation rel, BlockNumber firstblock, uint32 nblocks)
 					zerobuf.data,
 					true);
 
-	PageSetChecksumInplace(page, lastblock);
+	PageSetChecksum(page, lastblock);
 	smgrextend(RelationGetSmgr(rel), MAIN_FORKNUM, lastblock, zerobuf.data,
 			   false);
 
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 5c7aa6662c5..a123e4e7f40 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1996,9 +1996,8 @@ again:
 	/*
 	 * If the buffer was dirty, try to write it out.  There is a race
 	 * condition here, in that someone might dirty it after we released the
-	 * buffer header lock above, or even while we are writing it out (since
-	 * our share-lock won't prevent hint-bit updates).  We will recheck the
-	 * dirty bit after re-locking the buffer header.
+	 * buffer header lock above.  We will recheck the dirty bit after
+	 * re-locking the buffer header.
 	 */
 	if (buf_state & BM_DIRTY)
 	{
@@ -3786,9 +3785,8 @@ BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum,
  *
  * The caller must hold a pin on the buffer and have share-locked the
  * buffer contents.  (Note: a share-lock does not prevent updates of
- * hint bits in the buffer, so the page could change while the write
- * is in progress, but we assume that that will not invalidate the data
- * written.)
+ * hint bits in the buffer, but hint bits may no longer be set once
+ * StartBufferIO() has succeeded.)
  *
  * If the caller has an smgr reference for the buffer's relation, pass it
  * as the second parameter.  If not, pass NULL.
@@ -3801,7 +3799,6 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	ErrorContextCallback errcallback;
 	instr_time	io_start;
 	Block		bufBlock;
-	char	   *bufToWrite;
 	uint32		buf_state;
 
 	/*
@@ -3867,12 +3864,8 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	 */
 	bufBlock = BufHdrGetBlock(buf);
 
-	/*
-	 * Update page checksum if desired.  Since we have only shared lock on the
-	 * buffer, other processes might be updating hint bits in it, so we must
-	 * copy the page to private storage if we do checksumming.
-	 */
-	bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum);
+	/* Update page checksum if desired. */
+	PageSetChecksum((Page) bufBlock, buf->tag.blockNum);
 
 	io_start = pgstat_prepare_io_time(track_io_timing);
 
@@ -3882,7 +3875,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	smgrwrite(reln,
 			  BufTagGetForkNum(&buf->tag),
 			  buf->tag.blockNum,
-			  bufToWrite,
+			  bufBlock,
 			  false);
 
 	/*
@@ -4531,7 +4524,7 @@ FlushRelationBuffers(Relation rel)
 				errcallback.previous = error_context_stack;
 				error_context_stack = &errcallback;
 
-				PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
+				PageSetChecksum(localpage, bufHdr->tag.blockNum);
 
 				io_start = pgstat_prepare_io_time(track_io_timing);
 
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 8da7dd6c98a..e75a1960d10 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -242,7 +242,7 @@ GetLocalVictimBuffer(void)
 		/* Find smgr relation for buffer */
 		oreln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag), MyProcNumber);
 
-		PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
+		PageSetChecksum(localpage, bufHdr->tag.blockNum);
 
 		io_start = pgstat_prepare_io_time(track_io_timing);
 
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index be6f1f62d29..2930fd936cd 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -1495,51 +1495,15 @@ PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
 /*
  * Set checksum for a page in shared buffers.
  *
- * If checksums are disabled, or if the page is not initialized, just return
- * the input.  Otherwise, we must make a copy of the page before calculating
- * the checksum, to prevent concurrent modifications (e.g. setting hint bits)
- * from making the final checksum invalid.  It doesn't matter if we include or
- * exclude hints during the copy, as long as we write a valid page and
- * associated checksum.
+ * If checksums are disabled, or if the page is not initialized, just
+ * return. Otherwise compute and set the checksum.
  *
- * Returns a pointer to the block-sized data that needs to be written. Uses
- * statically-allocated memory, so the caller must immediately write the
- * returned page and not refer to it again.
- */
-char *
-PageSetChecksumCopy(Page page, BlockNumber blkno)
-{
-	static char *pageCopy = NULL;
-
-	/* If we don't need a checksum, just return the passed-in data */
-	if (PageIsNew(page) || !DataChecksumsEnabled())
-		return (char *) page;
-
-	/*
-	 * We allocate the copy space once and use it over on each subsequent
-	 * call.  The point of palloc'ing here, rather than having a static char
-	 * array, is first to ensure adequate alignment for the checksumming code
-	 * and second to avoid wasting space in processes that never call this.
-	 */
-	if (pageCopy == NULL)
-		pageCopy = MemoryContextAllocAligned(TopMemoryContext,
-											 BLCKSZ,
-											 PG_IO_ALIGN_SIZE,
-											 0);
-
-	memcpy(pageCopy, (char *) page, BLCKSZ);
-	((PageHeader) pageCopy)->pd_checksum = pg_checksum_page(pageCopy, blkno);
-	return pageCopy;
-}
-
-/*
- * Set checksum for a page in private memory.
- *
- * This must only be used when we know that no other process can be modifying
- * the page buffer.
+ * In the past this needed to be done on a copy of the page, due to the
+ * possibility of e.g. hint bits being set concurrently. However, this is not
+ * necessary anymore as hint bits won't be set while IO is going on.
  */
 void
-PageSetChecksumInplace(Page page, BlockNumber blkno)
+PageSetChecksum(Page page, BlockNumber blkno)
 {
 	/* If we don't need a checksum, just return */
 	if (PageIsNew(page) || !DataChecksumsEnabled())
diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
index f0a65bfe242..2c294ddab1f 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -277,7 +277,7 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 		BlockNumber blkno = pending_writes[i].blkno;
 		Page		page = pending_writes[i].buf->data;
 
-		PageSetChecksumInplace(page, blkno);
+		PageSetChecksum(page, blkno);
 
 		if (blkno >= bulkstate->pages_written)
 		{
-- 
2.46.0.519.g2e7b89e038

>From d4e63de7705ec094affa0f877c905713a7dc6c32 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 28 Oct 2024 18:03:53 -0400
Subject: [PATCH v1 13/14] WIP: bufmgr: Detect some bad buffer accesses

I wrote this mainly to ensure that I did not miss converting any hint bit sets
to BufferPrepareToSetHintBits(), but it seems like it might be more generally
useful.

If we do want to include it, it needs a bit more polishing.

On my workstation, the performance effect of this test infrastructure is as
follows:

base:
real	1m4.613s
user	4m31.409s
sys	3m20.445s

ENFORCE_BUFFER_PROT

real	1m11.912s
user	4m27.332s
sys	3m28.063s

ENFORCE_BUFFER_PROT + ENFORCE_BUFFER_PROT_READ
real	1m33.455s
user	4m32.188s
sys	3m41.275s
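
Both modes are configured in pg_config_manual.h: the cheaper mode is on by
default in assert-enabled builds, the more expensive mode additionally
requires uncommenting a define. Roughly (sketch of the knobs this patch adds):

    #if defined(USE_ASSERT_CHECKING) && !defined(WIN32)
    #define ENFORCE_BUFFER_PROT
    #endif

    #ifdef ENFORCE_BUFFER_PROT
    #define ENFORCE_BUFFER_PROT_READ	/* also disallow access when unpinned */
    #endif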

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/include/pg_config_manual.h        |  24 ++++
 src/include/storage/buf_internals.h   |   9 ++
 src/include/storage/pg_shmem.h        |   2 +
 src/backend/storage/buffer/buf_init.c |  10 ++
 src/backend/storage/buffer/bufmgr.c   | 154 ++++++++++++++++++++++++--
 src/backend/utils/misc/guc_tables.c   |   2 +-
 6 files changed, 191 insertions(+), 10 deletions(-)

diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index e49eb13e43c..8ea96bd1f1b 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -361,3 +361,27 @@
  * Enable tracing of syncscan operations (see also the trace_syncscan GUC var).
  */
 /* #define TRACE_SYNCSCAN */
+
+
+/*
+ * On some operating systems we know how to change whether pages are
+ * readable/writeable. We can use that to verify that we are following buffer
+ * locking rules, by making pages inaccessible or read-only when we don't
+ * hold sufficient locks etc. This obviously is fairly expensive, so by
+ * default we only do so in assert-enabled builds.
+ */
+#if defined(USE_ASSERT_CHECKING) && !defined(WIN32)
+#define ENFORCE_BUFFER_PROT
+#endif
+
+/*
+ * Protecting pages against being modified without an exclusive lock /
+ * BufferPrepareToSetHintBits() is reasonably cheap, neither happens *that*
+ * often. Pinning/unpinning buffers is a lot more common, making it more
+ * expensive to call mprotect() that often.
+ *
+ * Therefore disable this by default, even in assert enabled builds.
+ */
+#ifdef ENFORCE_BUFFER_PROT
+/* #define ENFORCE_BUFFER_PROT_READ */
+#endif
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 1f8a0482e90..f542c245e27 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -457,6 +457,15 @@ extern Size StrategyShmemSize(void);
 extern void StrategyInitialize(bool init);
 extern bool have_free_buffer(void);
 
+#ifdef ENFORCE_BUFFER_PROT
+extern void SetBufferProtection(Buffer buf, bool allow_reads, bool allow_writes);
+#else
+static inline void
+SetBufferProtection(Buffer buf, bool allow_reads, bool allow_writes)
+{
+}
+#endif
+
 /* buf_table.c */
 extern Size BufTableShmemSize(int size);
 extern void InitBufTable(int size);
diff --git a/src/include/storage/pg_shmem.h b/src/include/storage/pg_shmem.h
index 3065ff5be71..42a12688b9a 100644
--- a/src/include/storage/pg_shmem.h
+++ b/src/include/storage/pg_shmem.h
@@ -63,6 +63,8 @@ typedef enum
 	SHMEM_TYPE_MMAP,
 }			PGShmemType;
 
+extern PGDLLIMPORT int huge_pages_status;
+
 #ifndef WIN32
 extern PGDLLIMPORT unsigned long UsedShmemSegID;
 #else
diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c
index 56761a8eedc..0e02d8e1e9b 100644
--- a/src/backend/storage/buffer/buf_init.c
+++ b/src/backend/storage/buffer/buf_init.c
@@ -135,6 +135,16 @@ BufferManagerShmemInit(void)
 							 LWTRANCHE_BUFFER_CONTENT);
 
 			ConditionVariableInit(BufferDescriptorGetIOCV(buf));
+
+			/*
+			 * Unused buffers are inaccessible. But if we're not enforcing
+			 * that buffers are inaccessible without a pin, we won't mark
+			 * buffers as accessible during pinning, so we had better not
+			 * make them initially inaccessible.
+			 */
+#if defined(ENFORCE_BUFFER_PROT) && defined(ENFORCE_BUFFER_PROT_READ)
+			SetBufferProtection(i + 1, false, false);
+#endif
 		}
 
 		/* Correct last entry of linked list */
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index a123e4e7f40..50947b1021f 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -37,6 +37,10 @@
 #include <sys/file.h>
 #include <unistd.h>
 
+#ifdef ENFORCE_BUFFER_PROT
+#include <sys/mman.h>			/* for mprotect() */
+#endif							/* ENFORCE_BUFFER_PROT */
+
 #include "access/tableam.h"
 #include "access/xloginsert.h"
 #include "access/xlogutils.h"
@@ -53,6 +57,7 @@
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
+#include "storage/pg_shmem.h"
 #include "storage/proc.h"
 #include "storage/read_stream.h"
 #include "storage/smgr.h"
@@ -1065,8 +1070,6 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
 
 	if (need_to_zero)
 	{
-		memset(BufferGetPage(buffer), 0, BLCKSZ);
-
 		/*
 		 * Grab the buffer content lock before marking the page as valid, to
 		 * make sure that no other backend sees the zeroed page before the
@@ -1079,7 +1082,12 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
 		 * already valid.)
 		 */
 		if (!isLocalBuf)
+		{
 			LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
+			SetBufferProtection(buffer, true, true);
+		}
+
+		memset(BufferGetPage(buffer), 0, BLCKSZ);
 
 		if (isLocalBuf)
 		{
@@ -1505,6 +1513,9 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 		io_first_block = blocknum + i;
 		io_buffers_len = 1;
 
+		if (persistence != RELPERSISTENCE_TEMP)
+			SetBufferProtection(io_buffers[0], true, true);
+
 		/*
 		 * How many neighboring-on-disk blocks can we scatter-read into other
 		 * buffers at the same time?  In this case we don't wait if we see an
@@ -1520,7 +1531,13 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 				   BufferGetBlockNumber(buffers[i]) + 1);
 
 			io_buffers[io_buffers_len] = buffers[++i];
-			io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
+			io_pages[io_buffers_len] = BufferGetBlock(buffers[i]);
+
+			/* smgrreadv() needs to modify the buffer */
+			if (persistence != RELPERSISTENCE_TEMP)
+				SetBufferProtection(io_buffers[io_buffers_len], true, true);
+
+			io_buffers_len++;
 		}
 
 		io_start = pgstat_prepare_io_time(track_io_timing);
@@ -1580,6 +1597,9 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 				TerminateBufferIO(bufHdr, false, BM_VALID, true);
 			}
 
+			if (persistence != RELPERSISTENCE_TEMP)
+				SetBufferProtection(io_buffers[j], true, false);
+
 			/* Report I/Os as completing individually. */
 			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,
 											  operation->smgr->smgr_rlocator.locator.spcOid,
@@ -2233,7 +2253,9 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 		buf_block = BufHdrGetBlock(GetBufferDescriptor(buffers[i] - 1));
 
 		/* new buffers are zero-filled */
+		SetBufferProtection(buffers[i], true, true);
 		MemSet((char *) buf_block, 0, BLCKSZ);
+		SetBufferProtection(buffers[i], true, false);
 	}
 
 	/*
@@ -2460,7 +2482,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 		}
 
 		if (lock)
-			LWLockAcquire(BufferDescriptorGetContentLock(buf_hdr), LW_EXCLUSIVE);
+			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 
 		TerminateBufferIO(buf_hdr, false, BM_VALID, true);
 	}
@@ -2719,6 +2741,10 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
 				 * non-accessible in any case.
 				 */
 				VALGRIND_MAKE_MEM_DEFINED(BufHdrGetBlock(buf), BLCKSZ);
+
+#if defined(ENFORCE_BUFFER_PROT) && defined(ENFORCE_BUFFER_PROT_READ)
+				SetBufferProtection(BufferDescriptorGetBuffer(buf), true, false);
+#endif
 				break;
 			}
 		}
@@ -2791,6 +2817,10 @@ PinBuffer_Locked(BufferDesc *buf)
 	 */
 	VALGRIND_MAKE_MEM_DEFINED(BufHdrGetBlock(buf), BLCKSZ);
 
+#if defined(ENFORCE_BUFFER_PROT) && defined(ENFORCE_BUFFER_PROT_READ)
+	SetBufferProtection(BufferDescriptorGetBuffer(buf), true, false);
+#endif
+
 	/*
 	 * Since we hold the buffer spinlock, we can update the buffer state and
 	 * release the lock in one operation.
@@ -2850,6 +2880,10 @@ UnpinBufferNoOwner(BufferDesc *buf)
 		 */
 		VALGRIND_MAKE_MEM_NOACCESS(BufHdrGetBlock(buf), BLCKSZ);
 
+#if defined(ENFORCE_BUFFER_PROT) && defined(ENFORCE_BUFFER_PROT_READ)
+		SetBufferProtection(BufferDescriptorGetBuffer(buf), false, false);
+#endif
+
 		/* I'd better not still hold the buffer content lock */
 		Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf)));
 
@@ -3670,6 +3704,69 @@ CheckForBufferLeaks(void)
 #endif
 }
 
+/*
+ * To verify that we are following buffer locking rules, we can make pages
+ * inaccessible or read-only when we don't have sufficient locks etc.
+ *
+ * XXX: It might be possible to fold the VALGRIND_MAKE_MEM_NOACCESS() /
+ * VALGRIND_MAKE_MEM_DEFINED() calls into this.
+ */
+#ifdef ENFORCE_BUFFER_PROT
+void
+SetBufferProtection(Buffer buf, bool allow_reads, bool allow_writes)
+{
+	static long	pagesz = 0;
+	int			prot = PROT_NONE;
+	int			rc;
+
+	Assert(huge_pages_status != HUGE_PAGES_UNKNOWN &&
+		   huge_pages_status != HUGE_PAGES_TRY);
+	StaticAssertStmt(PROT_NONE == 0, "that can't be right");
+
+	if (unlikely(pagesz == 0))
+	{
+		pagesz = sysconf(_SC_PAGESIZE);
+
+		elog(DEBUG1, "sysconf(_SC_PAGESIZE) = %ld", pagesz);
+		if (pagesz == -1)
+		{
+			elog(ERROR, "sysconf(_SC_PAGESIZE) failed: %m");
+		}
+		else if (pagesz > BLCKSZ)
+		{
+			elog(DEBUG1, "pagesz > BLCKSZ, disabling buffer protection mode");
+			pagesz = -1;
+		}
+		else if (BLCKSZ % pagesz != 0)
+		{
+			elog(DEBUG1, "BLCKSZ %% pagesz != 0, disabling buffer protection mode");
+			pagesz = -1;
+		}
+		else if (huge_pages_status == HUGE_PAGES_ON)
+		{
+			/* can't set status in a granular enough way */
+			elog(DEBUG1, "huge pages enabled, disabling buffer protection mode");
+			pagesz = -1;
+		}
+	}
+
+	/* disabled */
+	if (pagesz == -1)
+		return;
+
+	if (allow_reads)
+		prot |= PROT_READ;
+
+	if (allow_writes)
+		prot |= PROT_WRITE;
+
+	rc = mprotect(BufferGetBlock(buf), BLCKSZ, prot);
+
+	if (rc != 0)
+		elog(ERROR, "mprotect(%d, %d) failed: %m", buf, prot);
+}
+#endif							/* ENFORCE_BUFFER_PROT */
+
 /*
  * Helper routine to issue warnings when a buffer is unexpectedly pinned
  */
@@ -3865,7 +3962,10 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	bufBlock = BufHdrGetBlock(buf);
 
 	/* Update page checksum if desired. */
+	SetBufferProtection(BufferDescriptorGetBuffer(buf), true, true);
 	PageSetChecksum((Page) bufBlock, buf->tag.blockNum);
+	/* FIXME: could theoretically be exclusively locked */
+	SetBufferProtection(BufferDescriptorGetBuffer(buf), true, false);
 
 	io_start = pgstat_prepare_io_time(track_io_timing);
 
@@ -5181,19 +5281,38 @@ void
 LockBuffer(Buffer buffer, int mode)
 {
 	BufferDesc *buf;
+	LWLock	   *content_lock;
 
 	Assert(BufferIsPinned(buffer));
 	if (BufferIsLocal(buffer))
 		return;					/* local buffers need no lock */
 
 	buf = GetBufferDescriptor(buffer - 1);
+	content_lock = BufferDescriptorGetContentLock(buf);
 
 	if (mode == BUFFER_LOCK_UNLOCK)
-		LWLockRelease(BufferDescriptorGetContentLock(buf));
+	{
+#ifdef ENFORCE_BUFFER_PROT
+		bool		was_exclusive;
+
+		was_exclusive = LWLockHeldByMeInMode(content_lock, LW_EXCLUSIVE);
+#endif							/* ENFORCE_BUFFER_PROT */
+
+		LWLockRelease(content_lock);
+
+#ifdef ENFORCE_BUFFER_PROT
+		if (was_exclusive)
+			SetBufferProtection(buffer, true, false);
+#endif							/* ENFORCE_BUFFER_PROT */
+	}
 	else if (mode == BUFFER_LOCK_SHARE)
-		LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_SHARED);
+		LWLockAcquire(content_lock, LW_SHARED);
 	else if (mode == BUFFER_LOCK_EXCLUSIVE)
-		LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_EXCLUSIVE);
+	{
+		LWLockAcquire(content_lock, LW_EXCLUSIVE);
+
+		SetBufferProtection(buffer, true, true);
+	}
 	else
 		elog(ERROR, "unrecognized buffer lock mode: %d", mode);
 }
@@ -5207,6 +5326,7 @@ bool
 ConditionalLockBuffer(Buffer buffer)
 {
 	BufferDesc *buf;
+	bool		ret;
 
 	Assert(BufferIsPinned(buffer));
 	if (BufferIsLocal(buffer))
@@ -5214,8 +5334,13 @@ ConditionalLockBuffer(Buffer buffer)
 
 	buf = GetBufferDescriptor(buffer - 1);
 
-	return LWLockConditionalAcquire(BufferDescriptorGetContentLock(buf),
-									LW_EXCLUSIVE);
+	ret = LWLockConditionalAcquire(BufferDescriptorGetContentLock(buf),
+								   LW_EXCLUSIVE);
+
+	if (ret)
+		SetBufferProtection(buffer, true, true);
+
+	return ret;
 }
 
 /*
@@ -6316,6 +6441,12 @@ BufferPrepareToSetHintBits(Buffer buf)
 										   new_buf_state))
 		{
 			ResourceOwnerRememberBufferSettingHints(CurrentResourceOwner, buf);
+
+#ifdef ENFORCE_BUFFER_PROT
+			if (!LWLockHeldByMeInMode(BufferDescriptorGetContentLock(desc), LW_EXCLUSIVE))
+				SetBufferProtection(buf, true, true);
+#endif							/* ENFORCE_BUFFER_PROT */
+
 			return true;
 		}
 	}
@@ -6358,4 +6489,9 @@ BufferFinishSetHintBits(Buffer buf)
 	 * know if there is somebody waiting.
 	 */
 	ConditionVariableBroadcast(BufferDescriptorGetIOCV(desc));
+
+#ifdef ENFORCE_BUFFER_PROT
+	if (!LWLockHeldByMeInMode(BufferDescriptorGetContentLock(desc), LW_EXCLUSIVE))
+		SetBufferProtection(buf, true, false);
+#endif							/* ENFORCE_BUFFER_PROT */
 }
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 8a67f01200c..4c94c20f0cb 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -561,7 +561,7 @@ static int	ssl_renegotiation_limit;
  */
 int			huge_pages = HUGE_PAGES_TRY;
 int			huge_page_size;
-static int	huge_pages_status = HUGE_PAGES_UNKNOWN;
+int			huge_pages_status = HUGE_PAGES_UNKNOWN;
 
 /*
  * These variables are all dummies that don't do anything, except in some
-- 
2.46.0.519.g2e7b89e038

Reply via email to