Hi, On Mon, 19 Feb 2024 at 18:13, Nazir Bilal Yavuz <byavu...@gmail.com> wrote: > > I worked on using the currently proposed streaming read API [1] in ANALYZE. > The patch is attached. 0001 is the not yet merged streaming read API code > changes that can be applied to the master, 0002 is the actual code. > > The blocks to analyze are obtained by using the streaming read API now. > > - Since streaming read API is already doing prefetch, I removed the #ifdef > USE_PREFETCH code from acquire_sample_rows(). > > - Changed 'while (BlockSampler_HasMore(&bs))' to 'while (nblocks)' because > the prefetch mechanism in the streaming read API will advance 'bs' before > returning buffers. > > - Removed BlockNumber and BufferAccessStrategy from the declaration of > scan_analyze_next_block(), passing pgsr (PgStreamingRead) instead of them. > > I counted syscalls of analyzing ~5GB table. It can be seen that the patched > version did ~1300 less read calls. > > Patched: > > % time seconds usecs/call calls errors syscall > ------ ----------- ----------- --------- --------- ---------------- > 39.67 0.012128 0 29809 pwrite64 > 36.96 0.011299 0 28594 pread64 > 23.24 0.007104 0 27611 fadvise64 > > Master (21a71648d3): > > % time seconds usecs/call calls errors syscall > ------ ----------- ----------- --------- --------- ---------------- > 38.94 0.016457 0 29816 pwrite64 > 36.79 0.015549 0 29850 pread64 > 23.91 0.010106 0 29848 fadvise64 > > > Any kind of feedback would be appreciated. > > [1]: > https://www.postgresql.org/message-id/CA%2BhUKGJkOiOCa%2Bmag4BF%2BzHo7qo%3Do9CFheB8%3Dg6uT5TUm2gkvA%40mail.gmail.com
A new version of the streaming read API [1] has been posted. I updated the streaming read API changes patch (0001); the patch that uses the streaming read API in ANALYZE (0002) remains the same. This should make it easier to review, as it can be applied on top of master. [1]: https://www.postgresql.org/message-id/CA%2BhUKGJtLyxcAEvLhVUhgD4fMQkOu3PDaj8Qb9SR_UsmzgsBpQ%40mail.gmail.com -- Regards, Nazir Bilal Yavuz Microsoft
From 21d9043501284c6bae996522ff2f3ac693f81986 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Mon, 26 Feb 2024 23:48:31 +1300 Subject: [PATCH v2 1/2] Streaming read API changes that are not committed to master yet Discussion: https://www.postgresql.org/message-id/CA%2BhUKGJkOiOCa%2Bmag4BF%2BzHo7qo%3Do9CFheB8%3Dg6uT5TUm2gkvA%40mail.gmail.com --- src/include/storage/bufmgr.h | 45 ++ src/include/storage/streaming_read.h | 52 ++ src/backend/storage/Makefile | 2 +- src/backend/storage/aio/Makefile | 14 + src/backend/storage/aio/meson.build | 5 + src/backend/storage/aio/streaming_read.c | 612 ++++++++++++++++++++ src/backend/storage/buffer/bufmgr.c | 687 +++++++++++++++-------- src/backend/storage/buffer/localbuf.c | 14 +- src/backend/storage/meson.build | 1 + src/tools/pgindent/typedefs.list | 3 + 10 files changed, 1202 insertions(+), 233 deletions(-) create mode 100644 src/include/storage/streaming_read.h create mode 100644 src/backend/storage/aio/Makefile create mode 100644 src/backend/storage/aio/meson.build create mode 100644 src/backend/storage/aio/streaming_read.c diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index d51d46d3353..b57f71f97e3 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -14,6 +14,7 @@ #ifndef BUFMGR_H #define BUFMGR_H +#include "port/pg_iovec.h" #include "storage/block.h" #include "storage/buf.h" #include "storage/bufpage.h" @@ -158,6 +159,11 @@ extern PGDLLIMPORT int32 *LocalRefCount; #define BUFFER_LOCK_SHARE 1 #define BUFFER_LOCK_EXCLUSIVE 2 +/* + * Maximum number of buffers for multi-buffer I/O functions. This is set to + * allow 128kB transfers, unless BLCKSZ and IOV_MAX imply a a smaller maximum. 
+ */ +#define MAX_BUFFERS_PER_TRANSFER Min(PG_IOV_MAX, (128 * 1024) / BLCKSZ) /* * prototypes for functions in bufmgr.c @@ -177,6 +183,42 @@ extern Buffer ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy, bool permanent); + +#define READ_BUFFERS_ZERO_ON_ERROR 0x01 +#define READ_BUFFERS_ISSUE_ADVICE 0x02 + +/* + * Private state used by StartReadBuffers() and WaitReadBuffers(). Declared + * in public header only to allow inclusion in other structs, but contents + * should not be accessed. + */ +struct ReadBuffersOperation +{ + /* Parameters passed in to StartReadBuffers(). */ + BufferManagerRelation bmr; + Buffer *buffers; + ForkNumber forknum; + BlockNumber blocknum; + int nblocks; + BufferAccessStrategy strategy; + int flags; + + /* Range of buffers, if we need to perform a read. */ + int io_buffers_len; +}; + +typedef struct ReadBuffersOperation ReadBuffersOperation; + +extern bool StartReadBuffers(BufferManagerRelation bmr, + Buffer *buffers, + ForkNumber forknum, + BlockNumber blocknum, + int *nblocks, + BufferAccessStrategy strategy, + int flags, + ReadBuffersOperation *operation); +extern void WaitReadBuffers(ReadBuffersOperation *operation); + extern void ReleaseBuffer(Buffer buffer); extern void UnlockReleaseBuffer(Buffer buffer); extern bool BufferIsExclusiveLocked(Buffer buffer); @@ -250,6 +292,9 @@ extern bool HoldingBufferPinThatDelaysRecovery(void); extern bool BgBufferSync(struct WritebackContext *wb_context); +extern void LimitAdditionalPins(uint32 *additional_pins); +extern void LimitAdditionalLocalPins(uint32 *additional_pins); + /* in buf_init.c */ extern void InitBufferPool(void); extern Size BufferShmemSize(void); diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h new file mode 100644 index 00000000000..c4d3892bb26 --- /dev/null +++ b/src/include/storage/streaming_read.h @@ -0,0 +1,52 @@ +#ifndef STREAMING_READ_H 
+#define STREAMING_READ_H + +#include "storage/bufmgr.h" +#include "storage/fd.h" +#include "storage/smgr.h" + +/* Default tuning, reasonable for many users. */ +#define PGSR_FLAG_DEFAULT 0x00 + +/* + * I/O streams that are performing maintenance work on behalf of potentially + * many users. + */ +#define PGSR_FLAG_MAINTENANCE 0x01 + +/* + * We usually avoid issuing prefetch advice automatically when sequential + * access is detected, but this flag explicitly disables it, for cases that + * might not be correctly detected. Explicit advice is known to perform worse + * than letting the kernel (at least Linux) detect sequential access. + */ +#define PGSR_FLAG_SEQUENTIAL 0x02 + +/* + * We usually ramp up from smaller reads to larger ones, to support users who + * don't know if it's worth reading lots of buffers yet. This flag disables + * that, declaring ahead of time that we'll be reading all available buffers. + */ +#define PGSR_FLAG_FULL 0x04 + +struct PgStreamingRead; +typedef struct PgStreamingRead PgStreamingRead; + +/* Callback that returns the next block number to read. */ +typedef BlockNumber (*PgStreamingReadBufferCB) (PgStreamingRead *pgsr, + void *pgsr_private, + void *per_buffer_private); + +extern PgStreamingRead *pg_streaming_read_buffer_alloc(int flags, + void *pgsr_private, + size_t per_buffer_private_size, + BufferAccessStrategy strategy, + BufferManagerRelation bmr, + ForkNumber forknum, + PgStreamingReadBufferCB next_block_cb); + +extern void pg_streaming_read_prefetch(PgStreamingRead *pgsr); +extern Buffer pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_private); +extern void pg_streaming_read_free(PgStreamingRead *pgsr); + +#endif diff --git a/src/backend/storage/Makefile b/src/backend/storage/Makefile index 8376cdfca20..eec03f6f2b4 100644 --- a/src/backend/storage/Makefile +++ b/src/backend/storage/Makefile @@ -8,6 +8,6 @@ subdir = src/backend/storage top_builddir = ../../.. 
include $(top_builddir)/src/Makefile.global -SUBDIRS = buffer file freespace ipc large_object lmgr page smgr sync +SUBDIRS = aio buffer file freespace ipc large_object lmgr page smgr sync include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile new file mode 100644 index 00000000000..bcab44c802f --- /dev/null +++ b/src/backend/storage/aio/Makefile @@ -0,0 +1,14 @@ +# +# Makefile for storage/aio +# +# src/backend/storage/aio/Makefile +# + +subdir = src/backend/storage/aio +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +OBJS = \ + streaming_read.o + +include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build new file mode 100644 index 00000000000..39aef2a84a2 --- /dev/null +++ b/src/backend/storage/aio/meson.build @@ -0,0 +1,5 @@ +# Copyright (c) 2024, PostgreSQL Global Development Group + +backend_sources += files( + 'streaming_read.c', +) diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c new file mode 100644 index 00000000000..71f2c4a70b6 --- /dev/null +++ b/src/backend/storage/aio/streaming_read.c @@ -0,0 +1,612 @@ +#include "postgres.h" + +#include "storage/streaming_read.h" +#include "utils/rel.h" + +/* + * Element type for PgStreamingRead's circular array of block ranges. + */ +typedef struct PgStreamingReadRange +{ + bool need_wait; + bool advice_issued; + BlockNumber blocknum; + int nblocks; + int per_buffer_data_index; + Buffer buffers[MAX_BUFFERS_PER_TRANSFER]; + ReadBuffersOperation operation; +} PgStreamingReadRange; + +/* + * Streaming read object. 
+ */ +struct PgStreamingRead +{ + int max_ios; + int ios_in_progress; + int max_pinned_buffers; + int pinned_buffers; + int pinned_buffers_trigger; + int next_tail_buffer; + int ramp_up_pin_limit; + int ramp_up_pin_stall; + bool finished; + bool advice_enabled; + void *pgsr_private; + PgStreamingReadBufferCB callback; + + BufferAccessStrategy strategy; + BufferManagerRelation bmr; + ForkNumber forknum; + + /* Sometimes we need to buffer one block for flow control. */ + BlockNumber unget_blocknum; + void *unget_per_buffer_data; + + /* Next expected block, for detecting sequential access. */ + BlockNumber seq_blocknum; + + /* Space for optional per-buffer private data. */ + size_t per_buffer_data_size; + void *per_buffer_data; + + /* Circular buffer of ranges. */ + int size; + int head; + int tail; + PgStreamingReadRange ranges[FLEXIBLE_ARRAY_MEMBER]; +}; + +static PgStreamingRead * +pg_streaming_read_buffer_alloc_internal(int flags, + void *pgsr_private, + size_t per_buffer_data_size, + BufferAccessStrategy strategy) +{ + PgStreamingRead *pgsr; + int size; + int max_ios; + uint32 max_pinned_buffers; + + + /* + * Decide how many assumed I/Os we will allow to run concurrently. That + * is, advice to the kernel to tell it that we will soon read. This + * number also affects how far we look ahead for opportunities to start + * more I/Os. + */ + if (flags & PGSR_FLAG_MAINTENANCE) + max_ios = maintenance_io_concurrency; + else + max_ios = effective_io_concurrency; + + /* + * The desired level of I/O concurrency controls how far ahead we are + * willing to look ahead. We also clamp it to at least + * MAX_BUFFERS_PER_TRANSFER so that we can have a chance to build up a full + * sized read, even when max_ios is zero. + */ + max_pinned_buffers = Max(max_ios * 4, MAX_BUFFERS_PER_TRANSFER); + + /* + * The *_io_concurrency GUCs might be set to 0, but we want to allow at + * least one, to keep our gating logic simple. 
+ */ + max_ios = Max(max_ios, 1); + + /* + * Don't allow this backend to pin too many buffers. For now we'll apply + * the limit for the shared buffer pool and the local buffer pool, without + * worrying which it is. + */ + LimitAdditionalPins(&max_pinned_buffers); + LimitAdditionalLocalPins(&max_pinned_buffers); + Assert(max_pinned_buffers > 0); + + /* + * pgsr->ranges is a circular buffer. When it is empty, head == tail. + * When it is full, there is an empty element between head and tail. Head + * can also be empty (nblocks == 0), therefore we need two extra elements + * for non-occupied ranges, on top of max_pinned_buffers to allow for the + * maximum possible number of occupied ranges of the smallest possible + * size of one. + */ + size = max_pinned_buffers + 2; + + pgsr = (PgStreamingRead *) + palloc0(offsetof(PgStreamingRead, ranges) + + sizeof(pgsr->ranges[0]) * size); + + pgsr->max_ios = max_ios; + pgsr->per_buffer_data_size = per_buffer_data_size; + pgsr->max_pinned_buffers = max_pinned_buffers; + pgsr->pgsr_private = pgsr_private; + pgsr->strategy = strategy; + pgsr->size = size; + + pgsr->unget_blocknum = InvalidBlockNumber; + +#ifdef USE_PREFETCH + + /* + * This system supports prefetching advice. As long as direct I/O isn't + * enabled, and the caller hasn't promised sequential access, we can use + * it. + */ + if ((io_direct_flags & IO_DIRECT_DATA) == 0 && + (flags & PGSR_FLAG_SEQUENTIAL) == 0) + pgsr->advice_enabled = true; +#endif + + /* + * We start off building small ranges, but double that quickly, for the + * benefit of users that don't know how far ahead they'll read. This can + * be disabled by users that already know they'll read all the way. + */ + if (flags & PGSR_FLAG_FULL) + pgsr->ramp_up_pin_limit = INT_MAX; + else + pgsr->ramp_up_pin_limit = 1; + + /* + * We want to avoid creating ranges that are smaller than they could be + * just because we hit max_pinned_buffers. 
We only look ahead when the + * number of pinned buffers falls below this trigger number, or put + * another way, we stop looking ahead when we wouldn't be able to build a + * "full sized" range. + */ + pgsr->pinned_buffers_trigger = + Max(1, (int) max_pinned_buffers - MAX_BUFFERS_PER_TRANSFER); + + /* Space for the callback to store extra data along with each block. */ + if (per_buffer_data_size) + pgsr->per_buffer_data = palloc(per_buffer_data_size * max_pinned_buffers); + + return pgsr; +} + +/* + * Create a new streaming read object that can be used to perform the + * equivalent of a series of ReadBuffer() calls for one fork of one relation. + * Internally, it generates larger vectored reads where possible by looking + * ahead. + */ +PgStreamingRead * +pg_streaming_read_buffer_alloc(int flags, + void *pgsr_private, + size_t per_buffer_data_size, + BufferAccessStrategy strategy, + BufferManagerRelation bmr, + ForkNumber forknum, + PgStreamingReadBufferCB next_block_cb) +{ + PgStreamingRead *result; + + result = pg_streaming_read_buffer_alloc_internal(flags, + pgsr_private, + per_buffer_data_size, + strategy); + result->callback = next_block_cb; + result->bmr = bmr; + result->forknum = forknum; + + return result; +} + +/* + * Find the per-buffer data index for the Nth block of a range. + */ +static int +get_per_buffer_data_index(PgStreamingRead *pgsr, PgStreamingReadRange *range, int n) +{ + int result; + + /* + * Find slot in the circular buffer of per-buffer data, without using the + * expensive % operator. + */ + result = range->per_buffer_data_index + n; + if (result >= pgsr->max_pinned_buffers) + result -= pgsr->max_pinned_buffers; + Assert(result == (range->per_buffer_data_index + n) % pgsr->max_pinned_buffers); + + return result; +} + +/* + * Return a pointer to the per-buffer data by index. 
+ */ +static void * +get_per_buffer_data_by_index(PgStreamingRead *pgsr, int per_buffer_data_index) +{ + return (char *) pgsr->per_buffer_data + + pgsr->per_buffer_data_size * per_buffer_data_index; +} + +/* + * Return a pointer to the per-buffer data for the Nth block of a range. + */ +static void * +get_per_buffer_data(PgStreamingRead *pgsr, PgStreamingReadRange *range, int n) +{ + return get_per_buffer_data_by_index(pgsr, + get_per_buffer_data_index(pgsr, + range, + n)); +} + +/* + * Start reading the head range, and create a new head range. The new head + * range is returned. It may not be empty, if StartReadBuffers() couldn't + * start the entire range; in that case the returned range contains the + * remaining portion of the range. + */ +static PgStreamingReadRange * +pg_streaming_read_start_head_range(PgStreamingRead *pgsr) +{ + PgStreamingReadRange *head_range; + PgStreamingReadRange *new_head_range; + int nblocks_pinned; + int flags; + + /* Caller should make sure we never exceed max_ios. */ + Assert(pgsr->ios_in_progress < pgsr->max_ios); + + /* Should only call if the head range has some blocks to read. */ + head_range = &pgsr->ranges[pgsr->head]; + Assert(head_range->nblocks > 0); + + /* + * If advice hasn't been suppressed, and this system supports it, this + * isn't a strictly sequential pattern, then we'll issue advice. + */ + if (pgsr->advice_enabled && head_range->blocknum != pgsr->seq_blocknum) + flags = READ_BUFFERS_ISSUE_ADVICE; + else + flags = 0; + + + /* Start reading as many blocks as we can from the head range. */ + nblocks_pinned = head_range->nblocks; + head_range->need_wait = + StartReadBuffers(pgsr->bmr, + head_range->buffers, + pgsr->forknum, + head_range->blocknum, + &nblocks_pinned, + pgsr->strategy, + flags, + &head_range->operation); + + /* Did that start an I/O? 
*/ + if (head_range->need_wait && (flags & READ_BUFFERS_ISSUE_ADVICE)) + { + head_range->advice_issued = true; + pgsr->ios_in_progress++; + Assert(pgsr->ios_in_progress <= pgsr->max_ios); + } + + /* + * StartReadBuffers() might have pinned fewer blocks than we asked it to, + * but always at least one. + */ + Assert(nblocks_pinned <= head_range->nblocks); + Assert(nblocks_pinned >= 1); + pgsr->pinned_buffers += nblocks_pinned; + + /* + * Remember where the next block would be after that, so we can detect + * sequential access next time. + */ + pgsr->seq_blocknum = head_range->blocknum + nblocks_pinned; + + /* + * Create a new head range. There must be space, because we have enough + * elements for every range to hold just one block, up to the pin limit. + */ + Assert(pgsr->size > pgsr->max_pinned_buffers); + Assert((pgsr->head + 1) % pgsr->size != pgsr->tail); + if (++pgsr->head == pgsr->size) + pgsr->head = 0; + new_head_range = &pgsr->ranges[pgsr->head]; + new_head_range->nblocks = 0; + new_head_range->advice_issued = false; + + /* + * If we didn't manage to start the whole read above, we split the range, + * moving the remainder into the new head range. + */ + if (nblocks_pinned < head_range->nblocks) + { + int nblocks_remaining = head_range->nblocks - nblocks_pinned; + + head_range->nblocks = nblocks_pinned; + + new_head_range->blocknum = head_range->blocknum + nblocks_pinned; + new_head_range->nblocks = nblocks_remaining; + } + + /* The new range has per-buffer data starting after the previous range. */ + new_head_range->per_buffer_data_index = + get_per_buffer_data_index(pgsr, head_range, nblocks_pinned); + + return new_head_range; +} + +/* + * Ask the callback which block it would like us to read next, with a small + * buffer in front to allow pg_streaming_unget_block() to work. 
+ */ +static BlockNumber +pg_streaming_get_block(PgStreamingRead *pgsr, void *per_buffer_data) +{ + BlockNumber result; + + if (unlikely(pgsr->unget_blocknum != InvalidBlockNumber)) + { + /* + * If we had to unget a block, now it is time to return that one + * again. + */ + result = pgsr->unget_blocknum; + pgsr->unget_blocknum = InvalidBlockNumber; + + /* + * The same per_buffer_data element must have been used, and still + * contains whatever data the callback wrote into it. So we just + * sanity-check that we were called with the value that + * pg_streaming_unget_block() pushed back. + */ + Assert(per_buffer_data == pgsr->unget_per_buffer_data); + } + else + { + /* Use the installed callback directly. */ + result = pgsr->callback(pgsr, pgsr->pgsr_private, per_buffer_data); + } + + return result; +} + +/* + * In order to deal with short reads in StartReadBuffers(), we sometimes need + * to defer handling of a block until later. This *must* be called with the + * last value returned by pg_streaming_get_block(). + */ +static void +pg_streaming_unget_block(PgStreamingRead *pgsr, BlockNumber blocknum, void *per_buffer_data) +{ + Assert(pgsr->unget_blocknum == InvalidBlockNumber); + pgsr->unget_blocknum = blocknum; + pgsr->unget_per_buffer_data = per_buffer_data; +} + +static void +pg_streaming_read_look_ahead(PgStreamingRead *pgsr) +{ + PgStreamingReadRange *range; + + /* + * If we're still ramping up, we may have to stall to wait for buffers to + * be consumed first before we do any more prefetching. + */ + if (pgsr->ramp_up_pin_stall > 0) + { + Assert(pgsr->pinned_buffers > 0); + return; + } + + /* + * If we're finished or can't start more I/O, then don't look ahead. + */ + if (pgsr->finished || pgsr->ios_in_progress == pgsr->max_ios) + return; + + /* + * We'll also wait until the number of pinned buffers falls below our + * trigger level, so that we have the chance to create a full range. 
+ */ + if (pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger) + return; + + do + { + BlockNumber blocknum; + void *per_buffer_data; + + /* Do we have a full-sized range? */ + range = &pgsr->ranges[pgsr->head]; + if (range->nblocks == lengthof(range->buffers)) + { + /* Start as much of it as we can. */ + range = pg_streaming_read_start_head_range(pgsr); + + /* If we're now at the I/O limit, stop here. */ + if (pgsr->ios_in_progress == pgsr->max_ios) + return; + + /* + * If we couldn't form a full range, then stop here to avoid + * creating small I/O. + */ + if (pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger) + return; + + /* + * That might have only been partially started, but always + * processes at least one so that'll do for now. + */ + Assert(range->nblocks < lengthof(range->buffers)); + } + + /* Find per-buffer data slot for the next block. */ + per_buffer_data = get_per_buffer_data(pgsr, range, range->nblocks); + + /* Find out which block the callback wants to read next. */ + blocknum = pg_streaming_get_block(pgsr, per_buffer_data); + if (blocknum == InvalidBlockNumber) + { + /* End of stream. */ + pgsr->finished = true; + break; + } + + /* + * Is there a head range that we cannot extend, because the requested + * block is not consecutive? + */ + if (range->nblocks > 0 && + range->blocknum + range->nblocks != blocknum) + { + /* Yes. Start it, so we can begin building a new one. */ + range = pg_streaming_read_start_head_range(pgsr); + + /* + * It's possible that it was only partially started, and we have a + * new range with the remainder. Keep starting I/Os until we get + * it all out of the way, or we hit the I/O limit. + */ + while (range->nblocks > 0 && pgsr->ios_in_progress < pgsr->max_ios) + range = pg_streaming_read_start_head_range(pgsr); + + /* + * We have to 'unget' the block returned by the callback if we + * don't have enough I/O capacity left to start something. 
+ */ + if (pgsr->ios_in_progress == pgsr->max_ios) + { + pg_streaming_unget_block(pgsr, blocknum, per_buffer_data); + return; + } + } + + /* If we have a new, empty range, initialize the start block. */ + if (range->nblocks == 0) + { + range->blocknum = blocknum; + } + + /* This block extends the range by one. */ + Assert(range->blocknum + range->nblocks == blocknum); + range->nblocks++; + + } while (pgsr->pinned_buffers + range->nblocks < pgsr->max_pinned_buffers && + pgsr->pinned_buffers + range->nblocks < pgsr->ramp_up_pin_limit); + + /* If we've hit the ramp-up limit, insert a stall. */ + if (pgsr->pinned_buffers + range->nblocks >= pgsr->ramp_up_pin_limit) + { + /* Can't get here if an earlier stall hasn't finished. */ + Assert(pgsr->ramp_up_pin_stall == 0); + /* Don't do any more prefetching until these buffers are consumed. */ + pgsr->ramp_up_pin_stall = pgsr->ramp_up_pin_limit; + /* Double it. It will soon be out of the way. */ + pgsr->ramp_up_pin_limit *= 2; + } + + /* Start as much as we can. */ + while (range->nblocks > 0) + { + range = pg_streaming_read_start_head_range(pgsr); + if (pgsr->ios_in_progress == pgsr->max_ios) + break; + } +} + +Buffer +pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_data) +{ + pg_streaming_read_look_ahead(pgsr); + + /* See if we have one buffer to return. */ + while (pgsr->tail != pgsr->head) + { + PgStreamingReadRange *tail_range; + + tail_range = &pgsr->ranges[pgsr->tail]; + + /* + * Do we need to perform an I/O before returning the buffers from this + * range? + */ + if (tail_range->need_wait) + { + WaitReadBuffers(&tail_range->operation); + tail_range->need_wait = false; + + /* + * We don't really know if the kernel generated a physical I/O + * when we issued advice, let alone when it finished, but it has + * certainly finished now because we've performed the read. 
+ */ + if (tail_range->advice_issued) + { + Assert(pgsr->ios_in_progress > 0); + pgsr->ios_in_progress--; + } + } + + /* Are there more buffers available in this range? */ + if (pgsr->next_tail_buffer < tail_range->nblocks) + { + int buffer_index; + Buffer buffer; + + buffer_index = pgsr->next_tail_buffer++; + buffer = tail_range->buffers[buffer_index]; + + Assert(BufferIsValid(buffer)); + + /* We are giving away ownership of this pinned buffer. */ + Assert(pgsr->pinned_buffers > 0); + pgsr->pinned_buffers--; + + if (pgsr->ramp_up_pin_stall > 0) + pgsr->ramp_up_pin_stall--; + + if (per_buffer_data) + *per_buffer_data = get_per_buffer_data(pgsr, tail_range, buffer_index); + + return buffer; + } + + /* Advance tail to next range, if there is one. */ + if (++pgsr->tail == pgsr->size) + pgsr->tail = 0; + pgsr->next_tail_buffer = 0; + + /* + * If tail crashed into head, and head is not empty, then it is time + * to start that range. + */ + if (pgsr->tail == pgsr->head && + pgsr->ranges[pgsr->head].nblocks > 0) + pg_streaming_read_start_head_range(pgsr); + } + + Assert(pgsr->pinned_buffers == 0); + + return InvalidBuffer; +} + +void +pg_streaming_read_free(PgStreamingRead *pgsr) +{ + Buffer buffer; + + /* Stop looking ahead. */ + pgsr->finished = true; + + /* Unpin anything that wasn't consumed. */ + while ((buffer = pg_streaming_read_buffer_get_next(pgsr, NULL)) != InvalidBuffer) + ReleaseBuffer(buffer); + + Assert(pgsr->pinned_buffers == 0); + Assert(pgsr->ios_in_progress == 0); + + /* Release memory. */ + if (pgsr->per_buffer_data) + pfree(pgsr->per_buffer_data); + + pfree(pgsr); +} diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index bdf89bbc4dc..3b1b0ad99df 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -19,6 +19,11 @@ * and pin it so that no one can destroy it while this process * is using it. 
* + * StartReadBuffers() -- as above, but for multiple contiguous blocks in + * two steps. + * + * WaitReadBuffers() -- second step of StartReadBuffers(). + * * ReleaseBuffer() -- unpin a buffer * * MarkBufferDirty() -- mark a pinned buffer's contents as "dirty". @@ -472,10 +477,9 @@ ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref) ) -static Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence, +static Buffer ReadBuffer_common(BufferManagerRelation bmr, ForkNumber forkNum, BlockNumber blockNum, - ReadBufferMode mode, BufferAccessStrategy strategy, - bool *hit); + ReadBufferMode mode, BufferAccessStrategy strategy); static BlockNumber ExtendBufferedRelCommon(BufferManagerRelation bmr, ForkNumber fork, BufferAccessStrategy strategy, @@ -501,7 +505,7 @@ static uint32 WaitBufHdrUnlocked(BufferDesc *buf); static int SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context); static void WaitIO(BufferDesc *buf); -static bool StartBufferIO(BufferDesc *buf, bool forInput); +static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait); static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits, bool forget_owner); static void AbortBufferIO(Buffer buffer); @@ -782,7 +786,6 @@ Buffer ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy) { - bool hit; Buffer buf; /* @@ -795,15 +798,9 @@ ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot access temporary tables of other sessions"))); - /* - * Read the buffer, and update pgstat counters to reflect a cache hit or - * miss. 
- */ - pgstat_count_buffer_read(reln); - buf = ReadBuffer_common(RelationGetSmgr(reln), reln->rd_rel->relpersistence, - forkNum, blockNum, mode, strategy, &hit); - if (hit) - pgstat_count_buffer_hit(reln); + buf = ReadBuffer_common(BMR_REL(reln), + forkNum, blockNum, mode, strategy); + return buf; } @@ -823,13 +820,12 @@ ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy, bool permanent) { - bool hit; - SMgrRelation smgr = smgropen(rlocator, InvalidBackendId); - return ReadBuffer_common(smgr, permanent ? RELPERSISTENCE_PERMANENT : - RELPERSISTENCE_UNLOGGED, forkNum, blockNum, - mode, strategy, &hit); + return ReadBuffer_common(BMR_SMGR(smgr, permanent ? RELPERSISTENCE_PERMANENT : + RELPERSISTENCE_UNLOGGED), + forkNum, blockNum, + mode, strategy); } /* @@ -995,35 +991,68 @@ ExtendBufferedRelTo(BufferManagerRelation bmr, */ if (buffer == InvalidBuffer) { - bool hit; - Assert(extended_by == 0); - buffer = ReadBuffer_common(bmr.smgr, bmr.relpersistence, - fork, extend_to - 1, mode, strategy, - &hit); + buffer = ReadBuffer_common(bmr, fork, extend_to - 1, mode, strategy); } return buffer; } +/* + * Zero a buffer and lock it, as part of the implementation of + * RBM_ZERO_AND_LOCK or RBM_ZERO_AND_CLEANUP_LOCK. The buffer must be already + * pinned. It does not have to be valid, but it is valid and locked on + * return. 
+ */ +static void +ZeroBuffer(Buffer buffer, ReadBufferMode mode) +{ + BufferDesc *bufHdr; + uint32 buf_state; + + Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK); + + if (BufferIsLocal(buffer)) + bufHdr = GetLocalBufferDescriptor(-buffer - 1); + else + { + bufHdr = GetBufferDescriptor(buffer - 1); + if (mode == RBM_ZERO_AND_LOCK) + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + else + LockBufferForCleanup(buffer); + } + + memset(BufferGetPage(buffer), 0, BLCKSZ); + + if (BufferIsLocal(buffer)) + { + buf_state = pg_atomic_read_u32(&bufHdr->state); + buf_state |= BM_VALID; + pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); + } + else + { + buf_state = LockBufHdr(bufHdr); + buf_state |= BM_VALID; + UnlockBufHdr(bufHdr, buf_state); + } +} + /* * ReadBuffer_common -- common logic for all ReadBuffer variants * * *hit is set to true if the request was satisfied from shared buffer cache. */ static Buffer -ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, +ReadBuffer_common(BufferManagerRelation bmr, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, - BufferAccessStrategy strategy, bool *hit) + BufferAccessStrategy strategy) { - BufferDesc *bufHdr; - Block bufBlock; - bool found; - IOContext io_context; - IOObject io_object; - bool isLocalBuf = SmgrIsTemp(smgr); - - *hit = false; + ReadBuffersOperation operation; + Buffer buffer; + int nblocks; + int flags; /* * Backward compatibility path, most code should use ExtendBufferedRel() @@ -1042,181 +1071,404 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) flags |= EB_LOCK_FIRST; - return ExtendBufferedRel(BMR_SMGR(smgr, relpersistence), - forkNum, strategy, flags); + return ExtendBufferedRel(bmr, forkNum, strategy, flags); } - TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum, - smgr->smgr_rlocator.locator.spcOid, - smgr->smgr_rlocator.locator.dbOid, - 
smgr->smgr_rlocator.locator.relNumber, - smgr->smgr_rlocator.backend); + nblocks = 1; + if (mode == RBM_ZERO_ON_ERROR) + flags = READ_BUFFERS_ZERO_ON_ERROR; + else + flags = 0; + if (StartReadBuffers(bmr, + &buffer, + forkNum, + blockNum, + &nblocks, + strategy, + flags, + &operation)) + WaitReadBuffers(&operation); + Assert(nblocks == 1); /* single block can't be short */ + if (mode == RBM_ZERO_AND_CLEANUP_LOCK || mode == RBM_ZERO_AND_LOCK) + ZeroBuffer(buffer, mode); + + return buffer; +} + +static Buffer +PrepareReadBuffer(BufferManagerRelation bmr, + ForkNumber forkNum, + BlockNumber blockNum, + BufferAccessStrategy strategy, + bool *foundPtr) +{ + BufferDesc *bufHdr; + bool isLocalBuf; + IOContext io_context; + IOObject io_object; + + Assert(blockNum != P_NEW); + + Assert(bmr.smgr); + + isLocalBuf = SmgrIsTemp(bmr.smgr); if (isLocalBuf) { - /* - * We do not use a BufferAccessStrategy for I/O of temporary tables. - * However, in some cases, the "strategy" may not be NULL, so we can't - * rely on IOContextForStrategy() to set the right IOContext for us. - * This may happen in cases like CREATE TEMPORARY TABLE AS... - */ io_context = IOCONTEXT_NORMAL; io_object = IOOBJECT_TEMP_RELATION; - bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found); - if (found) - pgBufferUsage.local_blks_hit++; - else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG || - mode == RBM_ZERO_ON_ERROR) - pgBufferUsage.local_blks_read++; } else { - /* - * lookup the buffer. IO_IN_PROGRESS is set if the requested block is - * not currently in memory. - */ io_context = IOContextForStrategy(strategy); io_object = IOOBJECT_RELATION; - bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum, - strategy, &found, io_context); - if (found) - pgBufferUsage.shared_blks_hit++; - else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG || - mode == RBM_ZERO_ON_ERROR) - pgBufferUsage.shared_blks_read++; } - /* At this point we do NOT hold any locks. 
*/ + TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum, + bmr.smgr->smgr_rlocator.locator.spcOid, + bmr.smgr->smgr_rlocator.locator.dbOid, + bmr.smgr->smgr_rlocator.locator.relNumber, + bmr.smgr->smgr_rlocator.backend); - /* if it was already in the buffer pool, we're done */ - if (found) + ResourceOwnerEnlarge(CurrentResourceOwner); + if (isLocalBuf) + { + bufHdr = LocalBufferAlloc(bmr.smgr, forkNum, blockNum, foundPtr); + if (*foundPtr) + pgBufferUsage.local_blks_hit++; + } + else + { + bufHdr = BufferAlloc(bmr.smgr, bmr.relpersistence, forkNum, blockNum, + strategy, foundPtr, io_context); + if (*foundPtr) + pgBufferUsage.shared_blks_hit++; + } + if (bmr.rel) + { + /* + * While pgBufferUsage's "read" counter isn't bumped unless we reach + * WaitReadBuffers() (so, not for hits, and not for buffers that are + * zeroed instead), the per-relation stats always count them. + */ + pgstat_count_buffer_read(bmr.rel); + if (*foundPtr) + pgstat_count_buffer_hit(bmr.rel); + } + if (*foundPtr) { - /* Just need to update stats before we exit */ - *hit = true; VacuumPageHit++; pgstat_count_io_op(io_object, io_context, IOOP_HIT); - if (VacuumCostActive) VacuumCostBalance += VacuumCostPageHit; TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum, - smgr->smgr_rlocator.locator.spcOid, - smgr->smgr_rlocator.locator.dbOid, - smgr->smgr_rlocator.locator.relNumber, - smgr->smgr_rlocator.backend, - found); - - /* - * In RBM_ZERO_AND_LOCK mode the caller expects the page to be locked - * on return. 
- */ - if (!isLocalBuf) - { - if (mode == RBM_ZERO_AND_LOCK) - LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), - LW_EXCLUSIVE); - else if (mode == RBM_ZERO_AND_CLEANUP_LOCK) - LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr)); - } - - return BufferDescriptorGetBuffer(bufHdr); + bmr.smgr->smgr_rlocator.locator.spcOid, + bmr.smgr->smgr_rlocator.locator.dbOid, + bmr.smgr->smgr_rlocator.locator.relNumber, + bmr.smgr->smgr_rlocator.backend, + true); } - /* - * if we have gotten to this point, we have allocated a buffer for the - * page but its contents are not yet valid. IO_IN_PROGRESS is set for it, - * if it's a shared buffer. - */ - Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */ - - bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); - - /* - * Read in the page, unless the caller intends to overwrite it and just - * wants us to allocate a buffer. - */ - if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) - MemSet((char *) bufBlock, 0, BLCKSZ); - else - { - instr_time io_start = pgstat_prepare_io_time(track_io_timing); - - smgrread(smgr, forkNum, blockNum, bufBlock); - - pgstat_count_io_op_time(io_object, io_context, - IOOP_READ, io_start, 1); - - /* check for garbage data */ - if (!PageIsVerifiedExtended((Page) bufBlock, blockNum, - PIV_LOG_WARNING | PIV_REPORT_STAT)) - { - if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages) - { - ereport(WARNING, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("invalid page in block %u of relation %s; zeroing out page", - blockNum, - relpath(smgr->smgr_rlocator, forkNum)))); - MemSet((char *) bufBlock, 0, BLCKSZ); - } - else - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("invalid page in block %u of relation %s", - blockNum, - relpath(smgr->smgr_rlocator, forkNum)))); - } - } - - /* - * In RBM_ZERO_AND_LOCK / RBM_ZERO_AND_CLEANUP_LOCK mode, grab the buffer - * content lock before marking the page as valid, to make sure that no - * 
other backend sees the zeroed page before the caller has had a chance - * to initialize it. - * - * Since no-one else can be looking at the page contents yet, there is no - * difference between an exclusive lock and a cleanup-strength lock. (Note - * that we cannot use LockBuffer() or LockBufferForCleanup() here, because - * they assert that the buffer is already valid.) - */ - if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) && - !isLocalBuf) - { - LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE); - } - - if (isLocalBuf) - { - /* Only need to adjust flags */ - uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); - - buf_state |= BM_VALID; - pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); - } - else - { - /* Set BM_VALID, terminate IO, and wake up any waiters */ - TerminateBufferIO(bufHdr, false, BM_VALID, true); - } - - VacuumPageMiss++; - if (VacuumCostActive) - VacuumCostBalance += VacuumCostPageMiss; - - TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum, - smgr->smgr_rlocator.locator.spcOid, - smgr->smgr_rlocator.locator.dbOid, - smgr->smgr_rlocator.locator.relNumber, - smgr->smgr_rlocator.backend, - found); - return BufferDescriptorGetBuffer(bufHdr); } /* - * BufferAlloc -- subroutine for ReadBuffer. Handles lookup of a shared - * buffer. If no buffer exists already, selects a replacement - * victim and evicts the old page, but does NOT read in new page. + * Begin reading a range of blocks beginning at blockNum and extending for + * *nblocks. On return, up to *nblocks pinned buffers holding those blocks + * are written into the buffers array, and *nblocks is updated to contain the + * actual number, which may be fewer than requested. + * + * If false is returned, no I/O is necessary and WaitReadBuffers() is not + * necessary. If true is returned, one I/O has been started, and + * WaitReadBuffers() must be called with the same operation object before the + * buffers are accessed. 
Along with the operation object, the caller-supplied + * array of buffers must remain valid until WaitReadBuffers() is called. + * + * Currently the I/O is only started with optional operating system advice, + * and the real I/O happens in WaitReadBuffers(). In future work, true I/O + * could be initiated here. + */ +bool +StartReadBuffers(BufferManagerRelation bmr, + Buffer *buffers, + ForkNumber forkNum, + BlockNumber blockNum, + int *nblocks, + BufferAccessStrategy strategy, + int flags, + ReadBuffersOperation *operation) +{ + int actual_nblocks = *nblocks; + + if (bmr.rel) + { + bmr.smgr = RelationGetSmgr(bmr.rel); + bmr.relpersistence = bmr.rel->rd_rel->relpersistence; + } + + operation->bmr = bmr; + operation->forknum = forkNum; + operation->blocknum = blockNum; + operation->buffers = buffers; + operation->nblocks = actual_nblocks; + operation->strategy = strategy; + operation->flags = flags; + + operation->io_buffers_len = 0; + + for (int i = 0; i < actual_nblocks; ++i) + { + bool found; + + buffers[i] = PrepareReadBuffer(bmr, + forkNum, + blockNum + i, + strategy, + &found); + + if (found) + { + /* + * Terminate the read as soon as we get a hit. It could be a + * single buffer hit, or it could be a hit that follows a readable + * range. We don't want to create more than one readable range, + * so we stop here. + */ + actual_nblocks = operation->nblocks = *nblocks = i + 1; + } + else + { + /* Extend the readable range to cover this block. */ + operation->io_buffers_len++; + } + } + + if (operation->io_buffers_len > 0) + { + if (flags & READ_BUFFERS_ISSUE_ADVICE) + { + /* + * In theory we should only do this if PrepareReadBuffer() had to + * allocate new buffers above. That way, if two calls to + * StartReadBuffers() were made for the same blocks before + * WaitReadBuffers(), only the first would issue the advice. + * That'd be a better simulation of true asynchronous I/O, which + * would only start the I/O once, but isn't done here for + * simplicity.
Note also that the following call might actually + * issue two advice calls if we cross a segment boundary; in a + * true asynchronous version we might choose to process only one + * real I/O at a time in that case. + */ + smgrprefetch(bmr.smgr, forkNum, blockNum, operation->io_buffers_len); + } + + /* Indicate that WaitReadBuffers() should be called. */ + return true; + } + else + { + return false; + } +} + +static inline bool +WaitReadBuffersCanStartIO(Buffer buffer, bool nowait) +{ + if (BufferIsLocal(buffer)) + { + BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1); + + return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0; + } + else + return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait); +} + +void +WaitReadBuffers(ReadBuffersOperation *operation) +{ + BufferManagerRelation bmr; + Buffer *buffers; + int nblocks; + BlockNumber blocknum; + ForkNumber forknum; + bool isLocalBuf; + IOContext io_context; + IOObject io_object; + + /* + * Currently operations are only allowed to include a read of some range, + * with an optional extra buffer that is already pinned at the end. So + * nblocks can be at most one more than io_buffers_len. + */ + Assert((operation->nblocks == operation->io_buffers_len) || + (operation->nblocks == operation->io_buffers_len + 1)); + + /* Find the range of the physical read we need to perform. */ + nblocks = operation->io_buffers_len; + if (nblocks == 0) + return; /* nothing to do */ + + buffers = &operation->buffers[0]; + blocknum = operation->blocknum; + forknum = operation->forknum; + bmr = operation->bmr; + + isLocalBuf = SmgrIsTemp(bmr.smgr); + if (isLocalBuf) + { + io_context = IOCONTEXT_NORMAL; + io_object = IOOBJECT_TEMP_RELATION; + } + else + { + io_context = IOContextForStrategy(operation->strategy); + io_object = IOOBJECT_RELATION; + } + + /* + * We count all these blocks as read by this backend. 
This is traditional + * behavior, but might turn out to be not true if we find that someone + * else has beaten us and completed the read of some of these blocks. In + * that case the system globally double-counts, but we traditionally don't + * count this as a "hit", and we don't have a separate counter for "miss, + * but another backend completed the read". + */ + if (isLocalBuf) + pgBufferUsage.local_blks_read += nblocks; + else + pgBufferUsage.shared_blks_read += nblocks; + + for (int i = 0; i < nblocks; ++i) + { + int io_buffers_len; + Buffer io_buffers[MAX_BUFFERS_PER_TRANSFER]; + void *io_pages[MAX_BUFFERS_PER_TRANSFER]; + instr_time io_start; + BlockNumber io_first_block; + + /* + * Skip this block if someone else has already completed it. If an + * I/O is already in progress in another backend, this will wait for + * the outcome: either done, or something went wrong and we will + * retry. + */ + if (!WaitReadBuffersCanStartIO(buffers[i], false)) + { + /* + * Report this as a 'hit' for this backend, even though it must + * have started out as a miss in PrepareReadBuffer(). + */ + TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + i, + bmr.smgr->smgr_rlocator.locator.spcOid, + bmr.smgr->smgr_rlocator.locator.dbOid, + bmr.smgr->smgr_rlocator.locator.relNumber, + bmr.smgr->smgr_rlocator.backend, + true); + continue; + } + + /* We found a buffer that we need to read in. */ + io_buffers[0] = buffers[i]; + io_pages[0] = BufferGetBlock(buffers[i]); + io_first_block = blocknum + i; + io_buffers_len = 1; + + /* + * How many neighboring-on-disk blocks can we scatter-read into + * other buffers at the same time? In this case we don't wait if we + * see an I/O already in progress. We already hold BM_IO_IN_PROGRESS + * for the head block, so we should get on with that I/O as soon as + * possible. We'll come back to this block again, above.
+ */ + while ((i + 1) < nblocks && + WaitReadBuffersCanStartIO(buffers[i + 1], true)) + { + /* Must be consecutive block numbers. */ + Assert(BufferGetBlockNumber(buffers[i + 1]) == + BufferGetBlockNumber(buffers[i]) + 1); + + io_buffers[io_buffers_len] = buffers[++i]; + io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]); + } + + io_start = pgstat_prepare_io_time(track_io_timing); + smgrreadv(bmr.smgr, forknum, io_first_block, io_pages, io_buffers_len); + pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start, + io_buffers_len); + + /* Verify each block we read, and terminate the I/O. */ + for (int j = 0; j < io_buffers_len; ++j) + { + BufferDesc *bufHdr; + Block bufBlock; + + if (isLocalBuf) + { + bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1); + bufBlock = LocalBufHdrGetBlock(bufHdr); + } + else + { + bufHdr = GetBufferDescriptor(io_buffers[j] - 1); + bufBlock = BufHdrGetBlock(bufHdr); + } + + /* check for garbage data */ + if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j, + PIV_LOG_WARNING | PIV_REPORT_STAT)) + { + if ((operation->flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages) + { + ereport(WARNING, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid page in block %u of relation %s; zeroing out page", + io_first_block + j, + relpath(bmr.smgr->smgr_rlocator, forknum)))); + memset(bufBlock, 0, BLCKSZ); + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid page in block %u of relation %s", + io_first_block + j, + relpath(bmr.smgr->smgr_rlocator, forknum)))); + } + + /* Terminate I/O and set BM_VALID. */ + if (isLocalBuf) + { + uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + + buf_state |= BM_VALID; + pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); + } + else + { + /* Set BM_VALID, terminate IO, and wake up any waiters */ + TerminateBufferIO(bufHdr, false, BM_VALID, true); + } + + /* Report I/Os as completing individually. 
*/ + TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j, + bmr.smgr->smgr_rlocator.locator.spcOid, + bmr.smgr->smgr_rlocator.locator.dbOid, + bmr.smgr->smgr_rlocator.locator.relNumber, + bmr.smgr->smgr_rlocator.backend, + false); + } + + VacuumPageMiss += io_buffers_len; + if (VacuumCostActive) + VacuumCostBalance += VacuumCostPageMiss * io_buffers_len; + } +} + +/* + * BufferAlloc -- subroutine for StartReadBuffers. Handles lookup of a shared + * buffer. If no buffer exists already, selects a replacement victim and + * evicts the old page, but does NOT read in new page. * * "strategy" can be a buffer replacement strategy object, or NULL for * the default strategy. The selected buffer's usage_count is advanced when @@ -1224,11 +1476,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * * The returned buffer is pinned and is already marked as holding the * desired page. If it already did have the desired page, *foundPtr is - * set true. Otherwise, *foundPtr is set false and the buffer is marked - * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it. - * - * *foundPtr is actually redundant with the buffer's BM_VALID flag, but - * we keep it for simplicity in ReadBuffer. + * set true. Otherwise, *foundPtr is set false. * * io_context is passed as an output parameter to avoid calling * IOContextForStrategy() when there is a shared buffers hit and no IO @@ -1287,19 +1535,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, { /* * We can only get here if (a) someone else is still reading in - * the page, or (b) a previous read attempt failed. We have to - * wait for any active read attempt to finish, and then set up our - * own read attempt if the page is still not BM_VALID. - * StartBufferIO does it all. + * the page, (b) a previous read attempt failed, or (c) someone + * called StartReadBuffers() but not yet WaitReadBuffers(). 
*/ - if (StartBufferIO(buf, true)) - { - /* - * If we get here, previous attempts to read the buffer must - * have failed ... but we shall bravely try again. - */ - *foundPtr = false; - } + *foundPtr = false; } return buf; @@ -1364,19 +1603,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, { /* * We can only get here if (a) someone else is still reading in - * the page, or (b) a previous read attempt failed. We have to - * wait for any active read attempt to finish, and then set up our - * own read attempt if the page is still not BM_VALID. - * StartBufferIO does it all. + * the page, (b) a previous read attempt failed, or (c) someone + * called StartReadBuffers() but not yet WaitReadBuffers(). */ - if (StartBufferIO(existing_buf_hdr, true)) - { - /* - * If we get here, previous attempts to read the buffer must - * have failed ... but we shall bravely try again. - */ - *foundPtr = false; - } + *foundPtr = false; } return existing_buf_hdr; @@ -1408,15 +1638,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, LWLockRelease(newPartitionLock); /* - * Buffer contents are currently invalid. Try to obtain the right to - * start I/O. If StartBufferIO returns false, then someone else managed - * to read it before we did, so there's nothing left for BufferAlloc() to - * do. + * Buffer contents are currently invalid. */ - if (StartBufferIO(victim_buf_hdr, true)) - *foundPtr = false; - else - *foundPtr = true; + *foundPtr = false; return victim_buf_hdr; } @@ -1770,7 +1994,7 @@ again: * pessimistic, but outside of toy-sized shared_buffers it should allow * sufficient pins. 
*/ -static void +void LimitAdditionalPins(uint32 *additional_pins) { uint32 max_backends; @@ -2035,7 +2259,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, buf_state &= ~BM_VALID; UnlockBufHdr(existing_hdr, buf_state); - } while (!StartBufferIO(existing_hdr, true)); + } while (!StartBufferIO(existing_hdr, true, false)); } else { @@ -2058,7 +2282,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, LWLockRelease(partition_lock); /* XXX: could combine the locked operations in it with the above */ - StartBufferIO(victim_buf_hdr, true); + StartBufferIO(victim_buf_hdr, true, false); } } @@ -2373,7 +2597,12 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) else { /* - * If we previously pinned the buffer, it must surely be valid. + * If we previously pinned the buffer, it is likely to be valid, but + * it may not be if StartReadBuffers() was called and + * WaitReadBuffers() hasn't been called yet. We'll check by loading + * the flags without locking. This is racy, but it's OK to return + * false spuriously: when WaitReadBuffers() calls StartBufferIO(), + * it'll see that it's now valid. * * Note: We deliberately avoid a Valgrind client request here. * Individual access methods can optionally superimpose buffer page @@ -2382,7 +2611,7 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) * that the buffer page is legitimately non-accessible here. We * cannot meddle with that. */ - result = true; + result = (pg_atomic_read_u32(&buf->state) & BM_VALID) != 0; } ref->refcount++; @@ -3450,7 +3679,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, * someone else flushed the buffer before we could, so we need not do * anything. */ - if (!StartBufferIO(buf, false)) + if (!StartBufferIO(buf, false, false)) return; /* Setup error traceback support for ereport() */ @@ -5185,9 +5414,15 @@ WaitIO(BufferDesc *buf) * * Returns true if we successfully marked the buffer as I/O busy, * false if someone else already did the work. 
+ * + * If nowait is true, then we don't wait for an I/O to be finished by another + * backend. In that case, false indicates either that the I/O was already + * finished, or is still in progress. This is useful for callers that want to + * find out if they can perform the I/O as part of a larger operation, without + * waiting for the answer or distinguishing the reasons why not. */ static bool -StartBufferIO(BufferDesc *buf, bool forInput) +StartBufferIO(BufferDesc *buf, bool forInput, bool nowait) { uint32 buf_state; @@ -5200,6 +5435,8 @@ StartBufferIO(BufferDesc *buf, bool forInput) if (!(buf_state & BM_IO_IN_PROGRESS)) break; UnlockBufHdr(buf, buf_state); + if (nowait) + return false; WaitIO(buf); } diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index 1f02fed250e..6956d4e5b49 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -109,10 +109,9 @@ PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum, * LocalBufferAlloc - * Find or create a local buffer for the given page of the given relation. * - * API is similar to bufmgr.c's BufferAlloc, except that we do not need - * to do any locking since this is all local. Also, IO_IN_PROGRESS - * does not get set. Lastly, we support only default access strategy - * (hence, usage_count is always advanced). + * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do + * any locking since this is all local. We support only default access + * strategy (hence, usage_count is always advanced). */ BufferDesc * LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, @@ -288,7 +287,7 @@ GetLocalVictimBuffer(void) } /* see LimitAdditionalPins() */ -static void +void LimitAdditionalLocalPins(uint32 *additional_pins) { uint32 max_pins; @@ -298,9 +297,10 @@ LimitAdditionalLocalPins(uint32 *additional_pins) /* * In contrast to LimitAdditionalPins() other backends don't play a role - * here. 
We can allow up to NLocBuffer pins in total. + * here. We can allow up to NLocBuffer pins in total, but it might not be + * initialized yet so read num_temp_buffers. */ - max_pins = (NLocBuffer - NLocalPinnedBuffers); + max_pins = (num_temp_buffers - NLocalPinnedBuffers); if (*additional_pins >= max_pins) *additional_pins = max_pins; diff --git a/src/backend/storage/meson.build b/src/backend/storage/meson.build index 40345bdca27..739d13293fb 100644 --- a/src/backend/storage/meson.build +++ b/src/backend/storage/meson.build @@ -1,5 +1,6 @@ # Copyright (c) 2022-2024, PostgreSQL Global Development Group +subdir('aio') subdir('buffer') subdir('file') subdir('freespace') diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index fc8b15d0cf2..cfb58cf4836 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2097,6 +2097,8 @@ PgStat_TableCounts PgStat_TableStatus PgStat_TableXactStatus PgStat_WalStats +PgStreamingRead +PgStreamingReadRange PgXmlErrorContext PgXmlStrictness Pg_finfo_record @@ -2267,6 +2269,7 @@ ReInitializeDSMForeignScan_function ReScanForeignScan_function ReadBufPtrType ReadBufferMode +ReadBuffersOperation ReadBytePtrType ReadExtraTocPtrType ReadFunc -- 2.43.0
From 6bca02c986e29e86bcda21675f285a99149b6f74 Mon Sep 17 00:00:00 2001 From: Nazir Bilal Yavuz <byavu...@gmail.com> Date: Mon, 19 Feb 2024 14:30:47 +0300 Subject: [PATCH v2 2/2] Use streaming read API in ANALYZE ANALYZE command gets random tuples using BlockSampler algorithm. Use streaming reads to get these tuples by using BlockSampler algorithm in streaming read API prefetch logic. --- src/include/access/tableam.h | 16 ++-- src/backend/access/heap/heapam_handler.c | 11 +-- src/backend/commands/analyze.c | 97 ++++++++---------------- 3 files changed, 45 insertions(+), 79 deletions(-) diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 5f8474871d2..7e6e99ba71d 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -21,6 +21,7 @@ #include "access/sdir.h" #include "access/xact.h" #include "executor/tuptable.h" +#include "storage/streaming_read.h" #include "utils/rel.h" #include "utils/snapshot.h" @@ -648,9 +649,9 @@ typedef struct TableAmRoutine BufferAccessStrategy bstrategy); /* - * Prepare to analyze block `blockno` of `scan`. The scan has been started - * with table_beginscan_analyze(). See also - * table_scan_analyze_next_block(). + * Prepare to analyze next block of `scan`. Next block is decided by + * callback function of `pgsr`. The scan has been started with + * table_beginscan_analyze(). See also table_scan_analyze_next_block(). * * The callback may acquire resources like locks that are held until * table_scan_analyze_next_tuple() returns false. It e.g. can make sense @@ -665,8 +666,7 @@ typedef struct TableAmRoutine * isn't one yet. */ bool (*scan_analyze_next_block) (TableScanDesc scan, - BlockNumber blockno, - BufferAccessStrategy bstrategy); + PgStreamingRead *pgsr); /* * See table_scan_analyze_next_tuple(). @@ -1714,11 +1714,9 @@ table_relation_vacuum(Relation rel, struct VacuumParams *params, * Returns false if block is unsuitable for sampling, true otherwise. 
*/ static inline bool -table_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno, - BufferAccessStrategy bstrategy) +table_scan_analyze_next_block(TableScanDesc scan, PgStreamingRead *pgsr) { - return scan->rs_rd->rd_tableam->scan_analyze_next_block(scan, blockno, - bstrategy); + return scan->rs_rd->rd_tableam->scan_analyze_next_block(scan, pgsr); } /* diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index 680a50bf8b1..7ffaf8ac402 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -993,10 +993,10 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, } static bool -heapam_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno, - BufferAccessStrategy bstrategy) +heapam_scan_analyze_next_block(TableScanDesc scan, PgStreamingRead *pgsr) { HeapScanDesc hscan = (HeapScanDesc) scan; + BlockNumber *current_block; /* * We must maintain a pin on the target page's buffer to ensure that @@ -1007,10 +1007,11 @@ heapam_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno, * doing much work per tuple, the extra lock traffic is probably better * avoided. 
- */ - hscan->rs_cblock = blockno; + hscan->rs_cbuf = pg_streaming_read_buffer_get_next(pgsr, (void **) &current_block); + hscan->rs_cblock = *current_block; hscan->rs_cindex = FirstOffsetNumber; - hscan->rs_cbuf = ReadBufferExtended(scan->rs_rd, MAIN_FORKNUM, - blockno, RBM_NORMAL, bstrategy); + + Assert(BufferIsValid(hscan->rs_cbuf)); LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_SHARE); /* in heap all blocks can contain tuples, so always return true */ diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c index a03495d6c95..7576afcf655 100644 --- a/src/backend/commands/analyze.c +++ b/src/backend/commands/analyze.c @@ -1112,6 +1112,26 @@ examine_attribute(Relation onerel, int attnum, Node *index_expr) return stats; } +/* + * Prefetch callback function to get next block number while using + * BlockSampling algorithm + */ +static BlockNumber +pg_block_sampling_streaming_read_next(PgStreamingRead *pgsr, + void *pgsr_private, + void *per_io_data) +{ + BlockSamplerData *bs = pgsr_private; + BlockNumber *current_block = per_io_data; + + if (BlockSampler_HasMore(bs)) + *current_block = BlockSampler_Next(bs); + else + *current_block = InvalidBlockNumber; + + return *current_block; +} + /* * acquire_sample_rows -- acquire a random sample of rows from the table * @@ -1164,10 +1184,7 @@ acquire_sample_rows(Relation onerel, int elevel, TableScanDesc scan; BlockNumber nblocks; BlockNumber blksdone = 0; -#ifdef USE_PREFETCH - int prefetch_maximum = 0; /* blocks to prefetch if enabled */ - BlockSamplerData prefetch_bs; -#endif + PgStreamingRead *pgsr = NULL; Assert(targrows > 0); @@ -1180,13 +1197,6 @@ acquire_sample_rows(Relation onerel, int elevel, randseed = pg_prng_uint32(&pg_global_prng_state); nblocks = BlockSampler_Init(&bs, totalblocks, targrows, randseed); -#ifdef USE_PREFETCH - prefetch_maximum = get_tablespace_maintenance_io_concurrency(onerel->rd_rel->reltablespace); - /* Create another BlockSampler, using the same seed, for prefetching */ - if
(prefetch_maximum) - (void) BlockSampler_Init(&prefetch_bs, totalblocks, targrows, randseed); -#endif - /* Report sampling block numbers */ pgstat_progress_update_param(PROGRESS_ANALYZE_BLOCKS_TOTAL, nblocks); @@ -1197,68 +1207,23 @@ acquire_sample_rows(Relation onerel, int elevel, scan = table_beginscan_analyze(onerel); slot = table_slot_create(onerel, NULL); -#ifdef USE_PREFETCH - - /* - * If we are doing prefetching, then go ahead and tell the kernel about - * the first set of pages we are going to want. This also moves our - * iterator out ahead of the main one being used, where we will keep it so - * that we're always pre-fetching out prefetch_maximum number of blocks - * ahead. - */ - if (prefetch_maximum) - { - for (int i = 0; i < prefetch_maximum; i++) - { - BlockNumber prefetch_block; - - if (!BlockSampler_HasMore(&prefetch_bs)) - break; - - prefetch_block = BlockSampler_Next(&prefetch_bs); - PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, prefetch_block); - } - } -#endif + pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_DEFAULT, + &bs, + sizeof(BlockSamplerData), + vac_strategy, + BMR_REL(scan->rs_rd), + MAIN_FORKNUM, + pg_block_sampling_streaming_read_next); /* Outer loop over blocks to sample */ - while (BlockSampler_HasMore(&bs)) + while (nblocks) { bool block_accepted; - BlockNumber targblock = BlockSampler_Next(&bs); -#ifdef USE_PREFETCH - BlockNumber prefetch_targblock = InvalidBlockNumber; - - /* - * Make sure that every time the main BlockSampler is moved forward - * that our prefetch BlockSampler also gets moved forward, so that we - * always stay out ahead. 
- */ - if (prefetch_maximum && BlockSampler_HasMore(&prefetch_bs)) - prefetch_targblock = BlockSampler_Next(&prefetch_bs); -#endif vacuum_delay_point(); - block_accepted = table_scan_analyze_next_block(scan, targblock, vac_strategy); + block_accepted = table_scan_analyze_next_block(scan, pgsr); -#ifdef USE_PREFETCH - - /* - * When pre-fetching, after we get a block, tell the kernel about the - * next one we will want, if there's any left. - * - * We want to do this even if the table_scan_analyze_next_block() call - * above decides against analyzing the block it picked. - */ - if (prefetch_maximum && prefetch_targblock != InvalidBlockNumber) - PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, prefetch_targblock); -#endif - - /* - * Don't analyze if table_scan_analyze_next_block() indicated this - * block is unsuitable for analyzing. - */ if (!block_accepted) continue; @@ -1309,7 +1274,9 @@ acquire_sample_rows(Relation onerel, int elevel, pgstat_progress_update_param(PROGRESS_ANALYZE_BLOCKS_DONE, ++blksdone); + nblocks--; } + pg_streaming_read_free(pgsr); ExecDropSingleTupleTableSlot(slot); table_endscan(scan); -- 2.43.0