On Thu, 4 Apr 2024 at 16:45, David Rowley <dgrowle...@gmail.com> wrote:
> I've pushed the v9-0001 with that rename done.
I've now just pushed the 0002 patch with some revisions:

1. The function declarations you added for heapgettup_advance_block()
and heapgettup_initial_block() didn't match the properties of their
definitions. You'd declared both of these static inline, but neither
definition was.
2. I felt inclined to rename heapfetchbuf() to heapfetchnextbuf() as
that's effectively what it does with v8-0002; however, that's just too
many words shoved together, so I renamed it to heap_fetch_next_buffer().
3. I changed heapgettup_initial_block() to pg_noinline, since it makes
more sense to have this function out of line, and doing so also fixed a
small performance regression. (See the sketch below.)

It looks like I also failed to grep for all the remaining instances of
"heapgetpage" in 44086b097. Those are now fixed by 3a4a3537a.

I also rebased the 0003 patch, which I've attached as a raw patch.

FWIW, here are results using Heikki's test from [1], with a pg_prewarm
run each time after restarting the instance. No parallel aggregate.

drowley@amd3990x:~$ cat bench.sql
select count(*) from giga;

drowley@amd3990x:~$ pgbench -n -f bench.sql -T 120 postgres | grep latency

44086b097~1
latency average = 34.323 ms
latency average = 34.332 ms

44086b097
latency average = 34.811 ms
latency average = 34.862 ms

3a4a3537a
latency average = 34.497 ms
latency average = 34.538 ms

3a4a3537a + read_stream_for_seqscans.patch
latency average = 40.923 ms
latency average = 41.415 ms

i.e. no meaningful change from the refactor, but with the read stream
patch, a regression on a cached workload that changes page often
without doing much work in between.

I'm happy to run further benchmarks, but I'm going to leave committer
responsibility for the remaining patches to Thomas.

David

[1] https://www.postgresql.org/message-id/3b0f3701-addd-4629-9257-cf28e1a6e...@iki.fi
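To spell out what (3) amounts to: pg_noinline is the attribute macro
from c.h that keeps a function out of line. A sketch of the declaration
after that change (the signature is inferred from the callers visible
in the patch below, not copied from the committed code):

static pg_noinline BlockNumber heapgettup_initial_block(HeapScanDesc scan,
														ScanDirection dir);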
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index dada2ecd1e..f7946a39fd 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -223,6 +223,25 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
  * ----------------------------------------------------------------
  */
 
+static BlockNumber
+heap_scan_stream_read_next(ReadStream *pgsr, void *private_data,
+						   void *per_buffer_data)
+{
+	HeapScanDesc scan = (HeapScanDesc) private_data;
+
+	if (unlikely(!scan->rs_inited))
+	{
+		scan->rs_prefetch_block = heapgettup_initial_block(scan, scan->rs_dir);
+		scan->rs_inited = true;
+	}
+	else
+		scan->rs_prefetch_block = heapgettup_advance_block(scan,
+														   scan->rs_prefetch_block,
+														   scan->rs_dir);
+
+	return scan->rs_prefetch_block;
+}
+
 /* ----------------
  *		initscan - scan code common to heap_beginscan and heap_rescan
  * ----------------
@@ -325,6 +344,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
 	scan->rs_cbuf = InvalidBuffer;
 	scan->rs_cblock = InvalidBlockNumber;
 
+	/*
+	 * Initialize to ForwardScanDirection because it is most common and heap
+	 * scans usually must go forwards before going backward.
+	 */
+	scan->rs_dir = ForwardScanDirection;
+	scan->rs_prefetch_block = InvalidBlockNumber;
+
 	/* page-at-a-time fields are always invalid when not rs_inited */
 
 	/*
@@ -462,12 +488,14 @@ heap_prepare_pagescan(TableScanDesc sscan)
 /*
  * heap_fetch_next_buffer - read and pin the next block from MAIN_FORKNUM.
  *
- * Read the next block of the scan relation into a buffer and pin that buffer
- * before saving it in the scan descriptor.
+ * Read the next block of the scan relation from the read stream and pin that
+ * buffer before saving it in the scan descriptor.
  */
 static inline void
 heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
 {
+	Assert(scan->rs_read_stream);
+
 	/* release previous scan buffer, if any */
 	if (BufferIsValid(scan->rs_cbuf))
 	{
@@ -482,25 +510,23 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
 	 */
 	CHECK_FOR_INTERRUPTS();
 
-	if (unlikely(!scan->rs_inited))
+	/*
+	 * If the scan direction is changing, reset the prefetch block to the
+	 * current block. Otherwise, we will incorrectly prefetch the blocks
+	 * between the prefetch block and the current block again before
+	 * prefetching blocks in the new, correct scan direction.
+	 */
+	if (unlikely(scan->rs_dir != dir))
 	{
-		scan->rs_cblock = heapgettup_initial_block(scan, dir);
+		scan->rs_prefetch_block = scan->rs_cblock;
+		read_stream_reset(scan->rs_read_stream);
+	}
 
-		/* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
-		Assert(scan->rs_cblock != InvalidBlockNumber ||
-			   !BufferIsValid(scan->rs_cbuf));
+	scan->rs_dir = dir;
 
-		scan->rs_inited = true;
-	}
-	else
-		scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock,
-												   dir);
-
-	/* read block if valid */
-	if (BlockNumberIsValid(scan->rs_cblock))
-		scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM,
-										   scan->rs_cblock, RBM_NORMAL,
-										   scan->rs_strategy);
+	scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL);
+	if (BufferIsValid(scan->rs_cbuf))
+		scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf);
 }
 
 /*
@@ -833,6 +859,7 @@ continue_page:
 		scan->rs_cbuf = InvalidBuffer;
 		scan->rs_cblock = InvalidBlockNumber;
+		scan->rs_prefetch_block = InvalidBlockNumber;
 		tuple->t_data = NULL;
 		scan->rs_inited = false;
 	}
@@ -928,6 +955,7 @@ continue_page:
 		ReleaseBuffer(scan->rs_cbuf);
 		scan->rs_cbuf = InvalidBuffer;
 		scan->rs_cblock = InvalidBlockNumber;
+		scan->rs_prefetch_block = InvalidBlockNumber;
 		tuple->t_data = NULL;
 		scan->rs_inited = false;
 	}
@@ -1021,6 +1049,28 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 	initscan(scan, key, false);
 
+	scan->rs_read_stream = NULL;
+
+	/*
+	 * For sequential scans and TID range scans, we will set up a read
+	 * stream. We do not know the scan direction yet. If the scan does not
+	 * end up being a forward scan, the read stream will be freed. This
+	 * should be done after initscan() because initscan() allocates the
+	 * BufferAccessStrategy object.
+	 */
+	if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN ||
+		scan->rs_base.rs_flags & SO_TYPE_TIDRANGESCAN)
+	{
+		scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL,
+														  scan->rs_strategy,
+														  scan->rs_base.rs_rd,
+														  MAIN_FORKNUM,
+														  heap_scan_stream_read_next,
+														  scan,
+														  0);
+	}
+
 	return (TableScanDesc) scan;
 }
@@ -1055,6 +1105,14 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
 	if (BufferIsValid(scan->rs_cbuf))
 		ReleaseBuffer(scan->rs_cbuf);
 
+	/*
+	 * The read stream is reset on rescan. This must be done before
+	 * initscan(), as some state referred to by read_stream_reset() is reset
+	 * in initscan().
+	 */
+	if (scan->rs_read_stream)
+		read_stream_reset(scan->rs_read_stream);
+
 	/*
 	 * reinitialize scan descriptor
 	 */
@@ -1074,6 +1132,12 @@ heap_endscan(TableScanDesc sscan)
 	if (BufferIsValid(scan->rs_cbuf))
 		ReleaseBuffer(scan->rs_cbuf);
 
+	/*
+	 * Must free the read stream before freeing the BufferAccessStrategy.
+	 */
+	if (scan->rs_read_stream)
+		read_stream_end(scan->rs_read_stream);
+
 	/*
 	 * decrement relation reference count and free scan descriptor storage
 	 */
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 2765efc4e5..332a7faa8d 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -25,6 +25,7 @@
 #include "storage/bufpage.h"
 #include "storage/dsm.h"
 #include "storage/lockdefs.h"
+#include "storage/read_stream.h"
 #include "storage/shm_toc.h"
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
@@ -70,6 +71,20 @@ typedef struct HeapScanDescData
 	HeapTupleData rs_ctup;		/* current tuple in scan, if any */
 
+	/* For scans that stream reads */
+	ReadStream *rs_read_stream;
+
+	/*
+	 * For sequential scans and TID range scans to stream reads. The read
+	 * stream is allocated at the beginning of the scan and reset on rescan
+	 * or when the scan direction changes. The scan direction is saved each
+	 * time a new page is requested. If the scan direction changes from one
+	 * page to the next, the read stream releases all previously pinned
+	 * buffers and resets the prefetch block.
+	 */
+	ScanDirection rs_dir;
+	BlockNumber rs_prefetch_block;
+
 	/*
 	 * For parallel scans to store page allocation data. NULL when not
 	 * performing a parallel scan.
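For anyone who hasn't looked at read_stream.h yet, the pattern the
patch relies on is: a callback hands back block numbers one at a time,
and read_stream_next_buffer() returns the corresponding pinned buffers
in the same order. Here's a minimal, hypothetical consumer to
illustrate; WholeRelState, whole_rel_read_next() and
scan_whole_relation() are made-up names for this sketch, not anything
in the patch:

#include "postgres.h"

#include "storage/bufmgr.h"
#include "storage/read_stream.h"
#include "utils/rel.h"

/* Private callback state: walk blocks 0..nblocks-1 of the relation. */
typedef struct WholeRelState
{
	BlockNumber next_block;
	BlockNumber nblocks;
} WholeRelState;

/* Callback: return the next block to read, or InvalidBlockNumber at EOF. */
static BlockNumber
whole_rel_read_next(ReadStream *stream, void *private_data,
					void *per_buffer_data)
{
	WholeRelState *state = (WholeRelState *) private_data;

	if (state->next_block >= state->nblocks)
		return InvalidBlockNumber;	/* stream exhausted */

	return state->next_block++;
}

/* Drive the stream: consume pinned buffers until the callback runs dry. */
static void
scan_whole_relation(Relation rel)
{
	WholeRelState state;
	ReadStream *stream;
	Buffer		buf;

	state.next_block = 0;
	state.nblocks = RelationGetNumberOfBlocks(rel);

	stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL,
										NULL,	/* no BufferAccessStrategy */
										rel,
										MAIN_FORKNUM,
										whole_rel_read_next,
										&state,
										0);		/* no per-buffer data */

	while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
	{
		/* ... inspect the page in buf here ... */
		ReleaseBuffer(buf);
	}

	read_stream_end(stream);
}

The same shape is visible in the patch: heap_scan_stream_read_next() is
the callback, heap_fetch_next_buffer() is the consumer, and
read_stream_reset() discards already-queued blocks on rescan or when
the scan direction changes.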