On Thu, 4 Apr 2024 at 16:45, David Rowley <dgrowle...@gmail.com> wrote:
> I've pushed the v9-0001 with that rename done.

I've now just pushed the 0002 patch with some revisions:

1. The function declarations you added for heapgettup_advance_block()
and heapgettup_initial_block() didn't match the properties of their
definitions.  You'd declared both of these static inline, but neither
of them was.
2. I felt inclined to rename heapfetchbuf() to heapfetchnextbuf() as
that's effectively what it does with v8-0002, however, that's just too
many words all shoved together, so I renamed it to
heap_fetch_next_buffer().
3. I changed heapgettup_initial_block() to pg_noinline as it both
makes more sense to have this out of line and it also fixed a small
performance regression.

Looks like I also failed to grep for all the remaining instances of
"heapgetpage" in 44086b097.  Those are now fixed by 3a4a3537a.

I also rebased the 0003 patch which I've attached as a raw patch.

FWIW, here are some results using Heikki's test in [1], with a pg_prewarm
each time after restarting the instance and no parallel aggregate.

drowley@amd3990x:~$ cat bench.sql
 select count(*) from giga;

drowley@amd3990x:~$ pgbench -n -f bench.sql -T 120 postgres | grep latency

44086b097~1
latency average = 34.323 ms
latency average = 34.332 ms

44086b097
latency average = 34.811 ms
latency average = 34.862 ms

3a4a3537a
latency average = 34.497 ms
latency average = 34.538 ms

3a4a3537a + read_stream_for_seqscans.patch
latency average = 40.923 ms
latency average = 41.415 ms

i.e. no meaningful change from the refactor, but with the read stream
patch there is a regression for a cached workload that changes the page
often without doing much work in between.

I'm happy to run further benchmarks, but for the remainder of the
committer responsibility for the next patches, I'm going to leave that
to Thomas.

David

[1] 
https://www.postgresql.org/message-id/3b0f3701-addd-4629-9257-cf28e1a6e...@iki.fi
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index dada2ecd1e..f7946a39fd 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -223,6 +223,25 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 
1] =
  * ----------------------------------------------------------------
  */
 
+static BlockNumber
+heap_scan_stream_read_next(ReadStream *pgsr, void *private_data,
+                                                  void *per_buffer_data)
+{
+       HeapScanDesc scan = (HeapScanDesc) private_data;
+
+       if (unlikely(!scan->rs_inited))
+       {
+               scan->rs_prefetch_block = heapgettup_initial_block(scan, 
scan->rs_dir);
+               scan->rs_inited = true;
+       }
+       else
+               scan->rs_prefetch_block = heapgettup_advance_block(scan,
+                                                                               
                                   scan->rs_prefetch_block,
+                                                                               
                                   scan->rs_dir);
+
+       return scan->rs_prefetch_block;
+}
+
 /* ----------------
  *             initscan - scan code common to heap_beginscan and heap_rescan
  * ----------------
@@ -325,6 +344,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool 
keep_startblock)
        scan->rs_cbuf = InvalidBuffer;
        scan->rs_cblock = InvalidBlockNumber;
 
+       /*
+        * Initialize to ForwardScanDirection because it is most common and heap
+        * scans usually must go forwards before going backward.
+        */
+       scan->rs_dir = ForwardScanDirection;
+       scan->rs_prefetch_block = InvalidBlockNumber;
+
        /* page-at-a-time fields are always invalid when not rs_inited */
 
        /*
@@ -462,12 +488,14 @@ heap_prepare_pagescan(TableScanDesc sscan)
 /*
  * heap_fetch_next_buffer - read and pin the next block from MAIN_FORKNUM.
  *
- * Read the next block of the scan relation into a buffer and pin that buffer
- * before saving it in the scan descriptor.
+ * Read the next block of the scan relation from the read stream and pin that
+ * buffer before saving it in the scan descriptor.
  */
 static inline void
 heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
 {
+       Assert(scan->rs_read_stream);
+
        /* release previous scan buffer, if any */
        if (BufferIsValid(scan->rs_cbuf))
        {
@@ -482,25 +510,23 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection 
dir)
         */
        CHECK_FOR_INTERRUPTS();
 
-       if (unlikely(!scan->rs_inited))
+       /*
+        * If the scan direction is changing, reset the prefetch block to the
+        * current block. Otherwise, we will incorrectly prefetch the blocks
+        * between the prefetch block and the current block again before
+        * prefetching blocks in the new, correct scan direction.
+        */
+       if (unlikely(scan->rs_dir != dir))
        {
-               scan->rs_cblock = heapgettup_initial_block(scan, dir);
+               scan->rs_prefetch_block = scan->rs_cblock;
+               read_stream_reset(scan->rs_read_stream);
+       }
 
-               /* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
-               Assert(scan->rs_cblock != InvalidBlockNumber ||
-                          !BufferIsValid(scan->rs_cbuf));
+       scan->rs_dir = dir;
 
-               scan->rs_inited = true;
-       }
-       else
-               scan->rs_cblock = heapgettup_advance_block(scan, 
scan->rs_cblock,
-                                                                               
                   dir);
-
-       /* read block if valid */
-       if (BlockNumberIsValid(scan->rs_cblock))
-               scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, 
MAIN_FORKNUM,
-                                                                               
   scan->rs_cblock, RBM_NORMAL,
-                                                                               
   scan->rs_strategy);
+       scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL);
+       if (BufferIsValid(scan->rs_cbuf))
+               scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf);
 }
 
 /*
@@ -833,6 +859,7 @@ continue_page:
 
        scan->rs_cbuf = InvalidBuffer;
        scan->rs_cblock = InvalidBlockNumber;
+       scan->rs_prefetch_block = InvalidBlockNumber;
        tuple->t_data = NULL;
        scan->rs_inited = false;
 }
@@ -928,6 +955,7 @@ continue_page:
                ReleaseBuffer(scan->rs_cbuf);
        scan->rs_cbuf = InvalidBuffer;
        scan->rs_cblock = InvalidBlockNumber;
+       scan->rs_prefetch_block = InvalidBlockNumber;
        tuple->t_data = NULL;
        scan->rs_inited = false;
 }
@@ -1021,6 +1049,28 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 
        initscan(scan, key, false);
 
+       scan->rs_read_stream = NULL;
+
+       /*
+        * For sequential scans and TID range scans, we will set up a read 
stream.
+        * We do not know the scan direction yet. If the scan does not end up
+        * being a forward scan, the read stream will be freed. This should be
+        * done after initscan() because initscan() allocates the
+        * BufferAccessStrategy object.
+        */
+       if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN ||
+               scan->rs_base.rs_flags & SO_TYPE_TIDRANGESCAN)
+       {
+               scan->rs_read_stream = 
read_stream_begin_relation(READ_STREAM_SEQUENTIAL,
+                                                                               
                                  scan->rs_strategy,
+                                                                               
                                  scan->rs_base.rs_rd,
+                                                                               
                                  MAIN_FORKNUM,
+                                                                               
                                  heap_scan_stream_read_next,
+                                                                               
                                  scan,
+                                                                               
                                  0);
+       }
+
+
        return (TableScanDesc) scan;
 }
 
@@ -1055,6 +1105,14 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool 
set_params,
        if (BufferIsValid(scan->rs_cbuf))
                ReleaseBuffer(scan->rs_cbuf);
 
+       /*
+        * The read stream is reset on rescan. This must be done before
+        * initscan(), as some state referred to by read_stream_reset() is reset
+        * in initscan().
+        */
+       if (scan->rs_read_stream)
+               read_stream_reset(scan->rs_read_stream);
+
        /*
         * reinitialize scan descriptor
         */
@@ -1074,6 +1132,12 @@ heap_endscan(TableScanDesc sscan)
        if (BufferIsValid(scan->rs_cbuf))
                ReleaseBuffer(scan->rs_cbuf);
 
+       /*
+        * Must free the read stream before freeing the BufferAccessStrategy.
+        */
+       if (scan->rs_read_stream)
+               read_stream_end(scan->rs_read_stream);
+
        /*
         * decrement relation reference count and free scan descriptor storage
         */
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 2765efc4e5..332a7faa8d 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -25,6 +25,7 @@
 #include "storage/bufpage.h"
 #include "storage/dsm.h"
 #include "storage/lockdefs.h"
+#include "storage/read_stream.h"
 #include "storage/shm_toc.h"
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
@@ -70,6 +71,20 @@ typedef struct HeapScanDescData
 
        HeapTupleData rs_ctup;          /* current tuple in scan, if any */
 
+       /* For scans that stream reads */
+       ReadStream *rs_read_stream;
+
+       /*
+        * For sequential scans and TID range scans to stream reads. The read
+        * stream is allocated at the beginning of the scan and reset on rescan 
or
+        * when the scan direction changes. The scan direction is saved each 
time
+        * a new page is requested. If the scan direction changes from one page 
to
+        * the next, the read stream releases all previously pinned buffers and
+        * resets the prefetch block.
+        */
+       ScanDirection rs_dir;
+       BlockNumber rs_prefetch_block;
+
        /*
         * For parallel scans to store page allocation data.  NULL when not
         * performing a parallel scan.

Reply via email to