On 24/03/2024 15:02, Thomas Munro wrote:
On Wed, Mar 20, 2024 at 4:04 AM Heikki Linnakangas <hlinn...@iki.fi> wrote:
Maybe 'pg_streaming_read_next_buffer' or just 'pg_streaming_read_next',
for a shorter name.
Hmm. The idea of 'buffer' appearing in a couple of names is that
there are conceptually other kinds of I/O that we might want to
stream, like raw files or buffers other than the buffer pool, maybe
even sockets, so this would be part of a family of similar interfaces.
I think it needs to be clear that this variant gives you buffers. I'm
OK with removing "get" but I guess it would be better to keep the
words in the same order across the three functions? What about these?
streaming_read_buffer_begin();
streaming_read_buffer_next();
streaming_read_buffer_end();
Tried like that in this version. Other ideas would be to make
"stream" the main noun, buffered_read_stream_begin() or something.
Ideas welcome.
Works for me, although "streaming_read_buffer" is a pretty long prefix.
The flags like "STREAMING_READ_MAINTENANCE" probably ought to be
"STREAMING_READ_BUFFER_MAINTENANCE" as well.
Maybe "buffer_stream_next()"?
Here are some other changes:
* I'm fairly happy with the ABC adaptive distance algorithm so far, I
think, but I spent more time tidying up the way it is implemented. I
didn't like the way each 'range' had buffer[MAX_BUFFERS_PER_TRANSFER],
so I created a new dense array stream->buffers that behaved as a
second circular queue.
* The above also made it trivial for MAX_BUFFERS_PER_TRANSFER to
become the GUC that it always wanted to be: buffer_io_size defaulting
to 128kB. Seems like a reasonable thing to have? Could also
influence things like bulk write? (The main problem I have with the
GUC currently is choosing a category, async resources is wrong....)
* By analogy, it started to look a bit funny that each range had room
for a ReadBuffersOperation, and we had enough ranges for
max_pinned_buffers * 1 block range. So I booted that out to another
dense array, of size max_ios.
* At the same time, Bilal and Andres had been complaining privately
about 'range' management overheads showing up in perf and creating a
regression against master on fully cached scans that do nothing else
(eg pg_prewarm, where we lookup, pin, unpin every page and do no I/O
and no CPU work with the page, a somewhat extreme case but a
reasonable way to isolate the management costs); having made the above
change, it suddenly seemed obvious that I should make the buffers
array the 'main' circular queue, pointing off to another place for
information required for dealing with misses. In this version, there
are no more range objects. This feels better and occupies and touches
less memory. See pictures below.
+1 for all that. Much better!
* Various indexes and sizes that couldn't quite fit in uint8_t but
couldn't possibly exceed a few thousand because they are bounded by
numbers deriving from range-limited GUCs are now int16_t (while I was
looking for low hanging opportunities to reduce memory usage...)
Is int16 enough though? It seems so, because:
max_pinned_buffers = Max(max_ios * 4, buffer_io_size);
and max_ios is constrained by the GUC's maximum MAX_IO_CONCURRENCY, and
buffer_io_size is constrained by MAX_BUFFER_IO_SIZE == PG_IOV_MAX == 32.
If someone changes those constants though, int16 might overflow and fail
in weird ways. I'd suggest being more careful here and explicitly clamp
max_pinned_buffers at PG_INT16_MAX or have a static assertion or
something. (I think it needs to be somewhat less than PG_INT16_MAX,
because of the extra "overflow buffers" stuff and some other places
where you do arithmetic.)
/*
* We gave a contiguous range of buffer space to StartReadBuffers(), but
* we want it to wrap around at max_pinned_buffers. Move values that
* overflowed into the extra space. At the same time, put -1 in the I/O
* slots for the rest of the buffers to indicate no I/O. They are
covered
* by the head buffer's I/O, if there is one. We avoid a % operator.
*/
overflow = (stream->next_buffer_index + nblocks) -
stream->max_pinned_buffers;
if (overflow > 0)
{
memmove(&stream->buffers[0],
&stream->buffers[stream->max_pinned_buffers],
sizeof(stream->buffers[0]) * overflow);
for (int i = 0; i < overflow; ++i)
stream->buffer_io_indexes[i] = -1;
for (int i = 1; i < nblocks - overflow; ++i)
stream->buffer_io_indexes[stream->next_buffer_index +
i] = -1;
}
else
{
for (int i = 1; i < nblocks; ++i)
stream->buffer_io_indexes[stream->next_buffer_index +
i] = -1;
}
Instead of clearing buffer_io_indexes here, it might be cheaper/simpler
to initialize the array to -1 in streaming_read_buffer_begin(), and
reset buffer_io_indexes[io_index] = -1 in streaming_read_buffer_next(),
after the WaitReadBuffers() call. In other words, except when an I/O is
in progress, keep all the elements at -1, even the elements that are not
currently in use.
Alternatively, you could remember the first buffer that the I/O applies
to in the 'ios' array. In other words, instead of pointing from buffer
to the I/O that it depends on, point from the I/O to the buffer that
depends on it. The last attached patch implements that approach. I'm not
wedded to it, but it feels a little simpler.
if (stream->ios[io_index].flags & READ_BUFFERS_ISSUE_ADVICE)
{
/* Distance ramps up fast (behavior C). */
...
}
else
{
/* No advice; move towards full I/O size (behavior B).
*/
...
}
The comment on ReadBuffersOperation says "Declared in public header only
to allow inclusion in other structs, but contents should not be
accessed", but here you access the 'flags' field.
You also mentioned that the StartReadBuffers() argument list is too
long. Perhaps the solution is to redefine ReadBuffersOperation so that
it consists of two parts: 1st part is filled in by the caller, and
contains the arguments, and 2nd part is private to bufmgr.c. The
signature for StartReadBuffers() would then be just:
bool StartReadBuffers(ReadBuffersOperation *operation);
That would make it OK to read the 'flags' field. It would also allow
reusing the same ReadBuffersOperation struct for multiple I/Os for the
same relation; you only need to change the changing parts of the struct
on each operation.
In the attached patch set, the first three patches are your v9 with no
changes. The last patch refactors away 'buffer_io_indexes' like I
mentioned above. The others are fixes for some other trivial things that
caught my eye.
--
Heikki Linnakangas
Neon (https://neon.tech)
From 359dcac7725f5bf35f5b135fe1b7fe9d8e050436 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Mon, 26 Feb 2024 23:48:31 +1300
Subject: [PATCH v9.heikki 1/9] Provide vectored variant of ReadBuffer().
Break ReadBuffer() up into two steps: StartReadBuffers() and
WaitReadBuffers(). This has two advantages:
1. Multiple consecutive blocks can be read with one system call.
2. Advice (hints of future reads) can optionally be issued to the kernel.
The traditional ReadBuffer() function is now implemented in terms of
those functions, to avoid duplication. For now we still only read a
block at a time so there is no change to generated system calls yet, but
later commits will provide infrastructure to help build up larger calls.
Callers should respect the new GUC buffer_io_size, and the limit on
per-backend pins which is now exposed as a public interface.
With some more infrastructure in later work, StartReadBuffers() could
be extended to start real asynchronous I/O instead of advice.
Reviewed-by: Melanie Plageman <melanieplage...@gmail.com>
Reviewed-by: Heikki Linnakangas <hlinn...@iki.fi>
Reviewed-by: Nazir Bilal Yavuz <byavu...@gmail.com>
Reviewed-by: Dilip Kumar <dilipbal...@gmail.com>
Reviewed-by: Andres Freund <and...@anarazel.de>
Discussion: https://postgr.es/m/CA+hUKGJkOiOCa+mag4BF+zHo7qo=o9CFheB8=g6ut5tum2g...@mail.gmail.com
---
doc/src/sgml/config.sgml | 14 +
src/backend/storage/buffer/bufmgr.c | 658 ++++++++++++------
src/backend/storage/buffer/localbuf.c | 14 +-
src/backend/utils/misc/guc_tables.c | 14 +
src/backend/utils/misc/postgresql.conf.sample | 1 +
src/include/storage/bufmgr.h | 45 +-
src/tools/pgindent/typedefs.list | 1 +
7 files changed, 535 insertions(+), 212 deletions(-)
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 65a6e6c4086..3af86c59384 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2719,6 +2719,20 @@ include_dir 'conf.d'
</listitem>
</varlistentry>
+ <varlistentry id="guc-buffer-io-size" xreflabel="buffer_io_size">
+ <term><varname>buffer_io_size</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>buffer_io_size</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Controls the target I/O size in operations that coalesce buffer I/O.
+ The default is 128kB.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="guc-max-worker-processes" xreflabel="max_worker_processes">
<term><varname>max_worker_processes</varname> (<type>integer</type>)
<indexterm>
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index f0f8d4259c5..b5347678726 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -19,6 +19,11 @@
* and pin it so that no one can destroy it while this process
* is using it.
*
+ * StartReadBuffers() -- as above, but for multiple contiguous blocks in
+ * two steps.
+ *
+ * WaitReadBuffers() -- second step of StartReadBuffers().
+ *
* ReleaseBuffer() -- unpin a buffer
*
* MarkBufferDirty() -- mark a pinned buffer's contents as "dirty".
@@ -160,6 +165,12 @@ int checkpoint_flush_after = DEFAULT_CHECKPOINT_FLUSH_AFTER;
int bgwriter_flush_after = DEFAULT_BGWRITER_FLUSH_AFTER;
int backend_flush_after = DEFAULT_BACKEND_FLUSH_AFTER;
+/*
+ * How many buffers should be coalesced into single I/O operations where
+ * possible.
+ */
+int buffer_io_size = DEFAULT_BUFFER_IO_SIZE;
+
/* local state for LockBufferForCleanup */
static BufferDesc *PinCountWaitBuf = NULL;
@@ -471,10 +482,9 @@ ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref)
)
-static Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence,
+static Buffer ReadBuffer_common(BufferManagerRelation bmr,
ForkNumber forkNum, BlockNumber blockNum,
- ReadBufferMode mode, BufferAccessStrategy strategy,
- bool *hit);
+ ReadBufferMode mode, BufferAccessStrategy strategy);
static BlockNumber ExtendBufferedRelCommon(BufferManagerRelation bmr,
ForkNumber fork,
BufferAccessStrategy strategy,
@@ -500,7 +510,7 @@ static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
static int SyncOneBuffer(int buf_id, bool skip_recently_used,
WritebackContext *wb_context);
static void WaitIO(BufferDesc *buf);
-static bool StartBufferIO(BufferDesc *buf, bool forInput);
+static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
uint32 set_flag_bits, bool forget_owner);
static void AbortBufferIO(Buffer buffer);
@@ -781,7 +791,6 @@ Buffer
ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
ReadBufferMode mode, BufferAccessStrategy strategy)
{
- bool hit;
Buffer buf;
/*
@@ -794,15 +803,9 @@ ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot access temporary tables of other sessions")));
- /*
- * Read the buffer, and update pgstat counters to reflect a cache hit or
- * miss.
- */
- pgstat_count_buffer_read(reln);
- buf = ReadBuffer_common(RelationGetSmgr(reln), reln->rd_rel->relpersistence,
- forkNum, blockNum, mode, strategy, &hit);
- if (hit)
- pgstat_count_buffer_hit(reln);
+ buf = ReadBuffer_common(BMR_REL(reln),
+ forkNum, blockNum, mode, strategy);
+
return buf;
}
@@ -822,13 +825,12 @@ ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum,
BlockNumber blockNum, ReadBufferMode mode,
BufferAccessStrategy strategy, bool permanent)
{
- bool hit;
-
SMgrRelation smgr = smgropen(rlocator, INVALID_PROC_NUMBER);
- return ReadBuffer_common(smgr, permanent ? RELPERSISTENCE_PERMANENT :
- RELPERSISTENCE_UNLOGGED, forkNum, blockNum,
- mode, strategy, &hit);
+ return ReadBuffer_common(BMR_SMGR(smgr, permanent ? RELPERSISTENCE_PERMANENT :
+ RELPERSISTENCE_UNLOGGED),
+ forkNum, blockNum,
+ mode, strategy);
}
/*
@@ -994,35 +996,66 @@ ExtendBufferedRelTo(BufferManagerRelation bmr,
*/
if (buffer == InvalidBuffer)
{
- bool hit;
-
Assert(extended_by == 0);
- buffer = ReadBuffer_common(bmr.smgr, bmr.relpersistence,
- fork, extend_to - 1, mode, strategy,
- &hit);
+ buffer = ReadBuffer_common(bmr, fork, extend_to - 1, mode, strategy);
}
return buffer;
}
+/*
+ * Zero a buffer and lock it, as part of the implementation of
+ * RBM_ZERO_AND_LOCK or RBM_ZERO_AND_CLEANUP_LOCK. The buffer must be already
+ * pinned. It does not have to be valid, but it is valid and locked on
+ * return.
+ */
+static void
+ZeroBuffer(Buffer buffer, ReadBufferMode mode)
+{
+ BufferDesc *bufHdr;
+ uint32 buf_state;
+
+ Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK);
+
+ if (BufferIsLocal(buffer))
+ bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+ else
+ {
+ bufHdr = GetBufferDescriptor(buffer - 1);
+ if (mode == RBM_ZERO_AND_LOCK)
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+ else
+ LockBufferForCleanup(buffer);
+ }
+
+ memset(BufferGetPage(buffer), 0, BLCKSZ);
+
+ if (BufferIsLocal(buffer))
+ {
+ buf_state = pg_atomic_read_u32(&bufHdr->state);
+ buf_state |= BM_VALID;
+ pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+ }
+ else
+ {
+ buf_state = LockBufHdr(bufHdr);
+ buf_state |= BM_VALID;
+ UnlockBufHdr(bufHdr, buf_state);
+ }
+}
+
/*
* ReadBuffer_common -- common logic for all ReadBuffer variants
- *
- * *hit is set to true if the request was satisfied from shared buffer cache.
*/
static Buffer
-ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
+ReadBuffer_common(BufferManagerRelation bmr, ForkNumber forkNum,
BlockNumber blockNum, ReadBufferMode mode,
- BufferAccessStrategy strategy, bool *hit)
+ BufferAccessStrategy strategy)
{
- BufferDesc *bufHdr;
- Block bufBlock;
- bool found;
- IOContext io_context;
- IOObject io_object;
- bool isLocalBuf = SmgrIsTemp(smgr);
-
- *hit = false;
+ ReadBuffersOperation operation;
+ Buffer buffer;
+ int nblocks;
+ int flags;
/*
* Backward compatibility path, most code should use ExtendBufferedRel()
@@ -1041,181 +1074,413 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
flags |= EB_LOCK_FIRST;
- return ExtendBufferedRel(BMR_SMGR(smgr, relpersistence),
- forkNum, strategy, flags);
+ return ExtendBufferedRel(bmr, forkNum, strategy, flags);
}
- TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
- smgr->smgr_rlocator.locator.spcOid,
- smgr->smgr_rlocator.locator.dbOid,
- smgr->smgr_rlocator.locator.relNumber,
- smgr->smgr_rlocator.backend);
+ nblocks = 1;
+ if (mode == RBM_ZERO_ON_ERROR)
+ flags = READ_BUFFERS_ZERO_ON_ERROR;
+ else
+ flags = 0;
+ if (StartReadBuffers(bmr,
+ &buffer,
+ forkNum,
+ blockNum,
+ &nblocks,
+ strategy,
+ flags,
+ &operation))
+ WaitReadBuffers(&operation);
+ Assert(nblocks == 1); /* single block can't be short */
+
+ if (mode == RBM_ZERO_AND_CLEANUP_LOCK || mode == RBM_ZERO_AND_LOCK)
+ ZeroBuffer(buffer, mode);
+
+ return buffer;
+}
+
+/*
+ * Pin a buffer for a given block. *foundPtr is set to true if the block was
+ * already present, or false if more work is required to either read it in or
+ * zero it.
+ */
+static inline Buffer
+PinBufferForBlock(BufferManagerRelation bmr,
+ ForkNumber forkNum,
+ BlockNumber blockNum,
+ BufferAccessStrategy strategy,
+ bool *foundPtr)
+{
+ BufferDesc *bufHdr;
+ bool isLocalBuf;
+ IOContext io_context;
+ IOObject io_object;
+ Assert(blockNum != P_NEW);
+
+ Assert(bmr.smgr);
+
+ isLocalBuf = SmgrIsTemp(bmr.smgr);
if (isLocalBuf)
{
- /*
- * We do not use a BufferAccessStrategy for I/O of temporary tables.
- * However, in some cases, the "strategy" may not be NULL, so we can't
- * rely on IOContextForStrategy() to set the right IOContext for us.
- * This may happen in cases like CREATE TEMPORARY TABLE AS...
- */
io_context = IOCONTEXT_NORMAL;
io_object = IOOBJECT_TEMP_RELATION;
- bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
- if (found)
- pgBufferUsage.local_blks_hit++;
- else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
- mode == RBM_ZERO_ON_ERROR)
- pgBufferUsage.local_blks_read++;
}
else
{
- /*
- * lookup the buffer. IO_IN_PROGRESS is set if the requested block is
- * not currently in memory.
- */
io_context = IOContextForStrategy(strategy);
io_object = IOOBJECT_RELATION;
- bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
- strategy, &found, io_context);
- if (found)
- pgBufferUsage.shared_blks_hit++;
- else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
- mode == RBM_ZERO_ON_ERROR)
- pgBufferUsage.shared_blks_read++;
}
- /* At this point we do NOT hold any locks. */
+ TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
+ bmr.smgr->smgr_rlocator.locator.spcOid,
+ bmr.smgr->smgr_rlocator.locator.dbOid,
+ bmr.smgr->smgr_rlocator.locator.relNumber,
+ bmr.smgr->smgr_rlocator.backend);
- /* if it was already in the buffer pool, we're done */
- if (found)
+ if (isLocalBuf)
+ {
+ bufHdr = LocalBufferAlloc(bmr.smgr, forkNum, blockNum, foundPtr);
+ if (*foundPtr)
+ pgBufferUsage.local_blks_hit++;
+ }
+ else
+ {
+ bufHdr = BufferAlloc(bmr.smgr, bmr.relpersistence, forkNum, blockNum,
+ strategy, foundPtr, io_context);
+ if (*foundPtr)
+ pgBufferUsage.shared_blks_hit++;
+ }
+ if (bmr.rel)
+ {
+ /*
+ * While pgBufferUsage's "read" counter isn't bumped unless we reach
+ * WaitReadBuffers() (so, not for hits, and not for buffers that are
+ * zeroed instead), the per-relation stats always count them.
+ */
+ pgstat_count_buffer_read(bmr.rel);
+ if (*foundPtr)
+ pgstat_count_buffer_hit(bmr.rel);
+ }
+ if (*foundPtr)
{
- /* Just need to update stats before we exit */
- *hit = true;
VacuumPageHit++;
pgstat_count_io_op(io_object, io_context, IOOP_HIT);
-
if (VacuumCostActive)
VacuumCostBalance += VacuumCostPageHit;
TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
- smgr->smgr_rlocator.locator.spcOid,
- smgr->smgr_rlocator.locator.dbOid,
- smgr->smgr_rlocator.locator.relNumber,
- smgr->smgr_rlocator.backend,
- found);
+ bmr.smgr->smgr_rlocator.locator.spcOid,
+ bmr.smgr->smgr_rlocator.locator.dbOid,
+ bmr.smgr->smgr_rlocator.locator.relNumber,
+ bmr.smgr->smgr_rlocator.backend,
+ true);
+ }
- /*
- * In RBM_ZERO_AND_LOCK mode the caller expects the page to be locked
- * on return.
- */
- if (!isLocalBuf)
+ return BufferDescriptorGetBuffer(bufHdr);
+}
+
+/*
+ * Begin reading a range of blocks beginning at blockNum and extending for
+ * *nblocks. On return, up to *nblocks pinned buffers holding those blocks
+ * are written into the buffers array, and *nblocks is updated to contain the
+ * actual number, which may be fewer than requested.
+ *
+ * If false is returned, no I/O is necessary and WaitReadBuffers() is not
+ * necessary. If true is returned, one I/O has been started, and
+ * WaitReadBuffers() must be called with the same operation object before the
+ * buffers are accessed. Along with the operation object, the caller-supplied
+ * array of buffers must remain valid until WaitReadBuffers() is called.
+ *
+ * Currently the I/O is only started with optional operating system advice,
+ * and the real I/O happens in WaitReadBuffers(). In future work, true I/O
+ * could be initiated here.
+ */
+bool
+StartReadBuffers(BufferManagerRelation bmr,
+ Buffer *buffers,
+ ForkNumber forkNum,
+ BlockNumber blockNum,
+ int *nblocks,
+ BufferAccessStrategy strategy,
+ int flags,
+ ReadBuffersOperation *operation)
+{
+ int actual_nblocks = *nblocks;
+ int io_buffers_len = 0;
+
+ Assert(*nblocks > 0);
+ Assert(*nblocks <= MAX_BUFFER_IO_SIZE);
+
+ if (bmr.rel)
+ {
+ bmr.smgr = RelationGetSmgr(bmr.rel);
+ bmr.relpersistence = bmr.rel->rd_rel->relpersistence;
+ }
+
+ for (int i = 0; i < actual_nblocks; ++i)
+ {
+ bool found;
+
+ buffers[i] = PinBufferForBlock(bmr,
+ forkNum,
+ blockNum + i,
+ strategy,
+ &found);
+
+ if (found)
+ {
+ /*
+ * Terminate the read as soon as we get a hit. It could be a
+ * single buffer hit, or it could be a hit that follows a readable
+ * range. We don't want to create more than one readable range,
+ * so we stop here.
+ */
+ actual_nblocks = operation->nblocks = *nblocks = i + 1;
+ break;
+ }
+ else
+ {
+ /* Extend the readable range to cover this block. */
+ io_buffers_len++;
+ }
+ }
+
+ if (io_buffers_len > 0)
+ {
+ /* Populate extra information needed for I/O. */
+ operation->io_buffers_len = io_buffers_len;
+ operation->blocknum = blockNum;
+ operation->buffers = buffers;
+ operation->nblocks = actual_nblocks;
+ operation->bmr = bmr;
+ operation->forknum = forkNum;
+ operation->strategy = strategy;
+ operation->flags = flags;
+
+ if (flags & READ_BUFFERS_ISSUE_ADVICE)
{
- if (mode == RBM_ZERO_AND_LOCK)
- LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
- LW_EXCLUSIVE);
- else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
- LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
+ /*
+ * In theory we should only do this if PinBufferForBlock() had to
+ * allocate new buffers above. That way, if two calls to
+ * StartReadBuffers() were made for the same blocks before
+ * WaitReadBuffers(), only the first would issue the advice.
+ * That'd be a better simulation of true asynchronous I/O, which
+ * would only start the I/O once, but isn't done here for
+ * simplicity. Note also that the following call might actually
+ * issue two advice calls if we cross a segment boundary; in a
+ * true asynchronous version we might choose to process only one
+ * real I/O at a time in that case.
+ */
+ smgrprefetch(bmr.smgr, forkNum, blockNum, operation->io_buffers_len);
}
- return BufferDescriptorGetBuffer(bufHdr);
+ /* Indicate that WaitReadBuffers() should be called. */
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+}
+
+static inline bool
+WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
+{
+ if (BufferIsLocal(buffer))
+ {
+ BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+
+ return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0;
}
+ else
+ return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
+}
+
+void
+WaitReadBuffers(ReadBuffersOperation *operation)
+{
+ BufferManagerRelation bmr;
+ Buffer *buffers;
+ int nblocks;
+ BlockNumber blocknum;
+ ForkNumber forknum;
+ bool isLocalBuf;
+ IOContext io_context;
+ IOObject io_object;
/*
- * if we have gotten to this point, we have allocated a buffer for the
- * page but its contents are not yet valid. IO_IN_PROGRESS is set for it,
- * if it's a shared buffer.
+ * Currently operations are only allowed to include a read of some range,
+ * with an optional extra buffer that is already pinned at the end. So
+ * nblocks can be at most one more than io_buffers_len.
*/
- Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */
+ Assert((operation->nblocks == operation->io_buffers_len) ||
+ (operation->nblocks == operation->io_buffers_len + 1));
- bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
+ /* Find the range of the physical read we need to perform. */
+ nblocks = operation->io_buffers_len;
+ if (nblocks == 0)
+ return; /* nothing to do */
+
+ buffers = &operation->buffers[0];
+ blocknum = operation->blocknum;
+ forknum = operation->forknum;
+ bmr = operation->bmr;
+
+ isLocalBuf = SmgrIsTemp(bmr.smgr);
+ if (isLocalBuf)
+ {
+ io_context = IOCONTEXT_NORMAL;
+ io_object = IOOBJECT_TEMP_RELATION;
+ }
+ else
+ {
+ io_context = IOContextForStrategy(operation->strategy);
+ io_object = IOOBJECT_RELATION;
+ }
/*
- * Read in the page, unless the caller intends to overwrite it and just
- * wants us to allocate a buffer.
+ * We count all these blocks as read by this backend. This is traditional
+ * behavior, but might turn out to be not true if we find that someone
+ * else has beaten us and completed the read of some of these blocks. In
+ * that case the system globally double-counts, but we traditionally don't
+ * count this as a "hit", and we don't have a separate counter for "miss,
+ * but another backend completed the read".
*/
- if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
- MemSet((char *) bufBlock, 0, BLCKSZ);
+ if (isLocalBuf)
+ pgBufferUsage.local_blks_read += nblocks;
else
+ pgBufferUsage.shared_blks_read += nblocks;
+
+ for (int i = 0; i < nblocks; ++i)
{
- instr_time io_start = pgstat_prepare_io_time(track_io_timing);
+ int io_buffers_len;
+ Buffer io_buffers[MAX_BUFFER_IO_SIZE];
+ void *io_pages[MAX_BUFFER_IO_SIZE];
+ instr_time io_start;
+ BlockNumber io_first_block;
+
+ /*
+ * Skip this block if someone else has already completed it. If an
+ * I/O is already in progress in another backend, this will wait for
+ * the outcome: either done, or something went wrong and we will
+ * retry.
+ */
+ if (!WaitReadBuffersCanStartIO(buffers[i], false))
+ {
+ /*
+ * Report this as a 'hit' for this backend, even though it must
+ * have started out as a miss in PinBufferForBlock().
+ */
+ TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + i,
+ bmr.smgr->smgr_rlocator.locator.spcOid,
+ bmr.smgr->smgr_rlocator.locator.dbOid,
+ bmr.smgr->smgr_rlocator.locator.relNumber,
+ bmr.smgr->smgr_rlocator.backend,
+ true);
+ continue;
+ }
+
+ /* We found a buffer that we need to read in. */
+ io_buffers[0] = buffers[i];
+ io_pages[0] = BufferGetBlock(buffers[i]);
+ io_first_block = blocknum + i;
+ io_buffers_len = 1;
- smgrread(smgr, forkNum, blockNum, bufBlock);
+ /*
+ * How many neighboring-on-disk blocks can we can scatter-read into
+ * other buffers at the same time? In this case we don't wait if we
+ * see an I/O already in progress. We already hold BM_IO_IN_PROGRESS
+ * for the head block, so we should get on with that I/O as soon as
+ * possible. We'll come back to this block again, above.
+ */
+ while ((i + 1) < nblocks &&
+ WaitReadBuffersCanStartIO(buffers[i + 1], true))
+ {
+ /* Must be consecutive block numbers. */
+ Assert(BufferGetBlockNumber(buffers[i + 1]) ==
+ BufferGetBlockNumber(buffers[i]) + 1);
- pgstat_count_io_op_time(io_object, io_context,
- IOOP_READ, io_start, 1);
+ io_buffers[io_buffers_len] = buffers[++i];
+ io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
+ }
- /* check for garbage data */
- if (!PageIsVerifiedExtended((Page) bufBlock, blockNum,
- PIV_LOG_WARNING | PIV_REPORT_STAT))
+ io_start = pgstat_prepare_io_time(track_io_timing);
+ smgrreadv(bmr.smgr, forknum, io_first_block, io_pages, io_buffers_len);
+ pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start,
+ io_buffers_len);
+
+ /* Verify each block we read, and terminate the I/O. */
+ for (int j = 0; j < io_buffers_len; ++j)
{
- if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
+ BufferDesc *bufHdr;
+ Block bufBlock;
+
+ if (isLocalBuf)
{
- ereport(WARNING,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("invalid page in block %u of relation %s; zeroing out page",
- blockNum,
- relpath(smgr->smgr_rlocator, forkNum))));
- MemSet((char *) bufBlock, 0, BLCKSZ);
+ bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1);
+ bufBlock = LocalBufHdrGetBlock(bufHdr);
}
else
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("invalid page in block %u of relation %s",
- blockNum,
- relpath(smgr->smgr_rlocator, forkNum))));
- }
- }
-
- /*
- * In RBM_ZERO_AND_LOCK / RBM_ZERO_AND_CLEANUP_LOCK mode, grab the buffer
- * content lock before marking the page as valid, to make sure that no
- * other backend sees the zeroed page before the caller has had a chance
- * to initialize it.
- *
- * Since no-one else can be looking at the page contents yet, there is no
- * difference between an exclusive lock and a cleanup-strength lock. (Note
- * that we cannot use LockBuffer() or LockBufferForCleanup() here, because
- * they assert that the buffer is already valid.)
- */
- if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
- !isLocalBuf)
- {
- LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
- }
+ {
+ bufHdr = GetBufferDescriptor(io_buffers[j] - 1);
+ bufBlock = BufHdrGetBlock(bufHdr);
+ }
- if (isLocalBuf)
- {
- /* Only need to adjust flags */
- uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
+ /* check for garbage data */
+ if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j,
+ PIV_LOG_WARNING | PIV_REPORT_STAT))
+ {
+ if ((operation->flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages)
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("invalid page in block %u of relation %s; zeroing out page",
+ io_first_block + j,
+ relpath(bmr.smgr->smgr_rlocator, forknum))));
+ memset(bufBlock, 0, BLCKSZ);
+ }
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("invalid page in block %u of relation %s",
+ io_first_block + j,
+ relpath(bmr.smgr->smgr_rlocator, forknum))));
+ }
- buf_state |= BM_VALID;
- pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
- }
- else
- {
- /* Set BM_VALID, terminate IO, and wake up any waiters */
- TerminateBufferIO(bufHdr, false, BM_VALID, true);
- }
+ /* Terminate I/O and set BM_VALID. */
+ if (isLocalBuf)
+ {
+ uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
- VacuumPageMiss++;
- if (VacuumCostActive)
- VacuumCostBalance += VacuumCostPageMiss;
+ buf_state |= BM_VALID;
+ pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+ }
+ else
+ {
+ /* Set BM_VALID, terminate IO, and wake up any waiters */
+ TerminateBufferIO(bufHdr, false, BM_VALID, true);
+ }
- TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
- smgr->smgr_rlocator.locator.spcOid,
- smgr->smgr_rlocator.locator.dbOid,
- smgr->smgr_rlocator.locator.relNumber,
- smgr->smgr_rlocator.backend,
- found);
+ /* Report I/Os as completing individually. */
+ TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,
+ bmr.smgr->smgr_rlocator.locator.spcOid,
+ bmr.smgr->smgr_rlocator.locator.dbOid,
+ bmr.smgr->smgr_rlocator.locator.relNumber,
+ bmr.smgr->smgr_rlocator.backend,
+ false);
+ }
- return BufferDescriptorGetBuffer(bufHdr);
+ VacuumPageMiss += io_buffers_len;
+ if (VacuumCostActive)
+ VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
+ }
}
/*
- * BufferAlloc -- subroutine for ReadBuffer. Handles lookup of a shared
- * buffer. If no buffer exists already, selects a replacement
- * victim and evicts the old page, but does NOT read in new page.
+ * BufferAlloc -- subroutine for PinBufferForBlock. Handles lookup of a shared
+ * buffer. If no buffer exists already, selects a replacement victim and
+ * evicts the old page, but does NOT read in new page.
*
* "strategy" can be a buffer replacement strategy object, or NULL for
* the default strategy. The selected buffer's usage_count is advanced when
@@ -1223,11 +1488,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
*
* The returned buffer is pinned and is already marked as holding the
* desired page. If it already did have the desired page, *foundPtr is
- * set true. Otherwise, *foundPtr is set false and the buffer is marked
- * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it.
- *
- * *foundPtr is actually redundant with the buffer's BM_VALID flag, but
- * we keep it for simplicity in ReadBuffer.
+ * set true. Otherwise, *foundPtr is set false.
*
* io_context is passed as an output parameter to avoid calling
* IOContextForStrategy() when there is a shared buffers hit and no IO
@@ -1286,19 +1547,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
{
/*
* We can only get here if (a) someone else is still reading in
- * the page, or (b) a previous read attempt failed. We have to
- * wait for any active read attempt to finish, and then set up our
- * own read attempt if the page is still not BM_VALID.
- * StartBufferIO does it all.
+ * the page, (b) a previous read attempt failed, or (c) someone
+ * called StartReadBuffers() but not yet WaitReadBuffers().
*/
- if (StartBufferIO(buf, true))
- {
- /*
- * If we get here, previous attempts to read the buffer must
- * have failed ... but we shall bravely try again.
- */
- *foundPtr = false;
- }
+ *foundPtr = false;
}
return buf;
@@ -1363,19 +1615,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
{
/*
* We can only get here if (a) someone else is still reading in
- * the page, or (b) a previous read attempt failed. We have to
- * wait for any active read attempt to finish, and then set up our
- * own read attempt if the page is still not BM_VALID.
- * StartBufferIO does it all.
+ * the page, (b) a previous read attempt failed, or (c) someone
+ * called StartReadBuffers() but not yet WaitReadBuffers().
*/
- if (StartBufferIO(existing_buf_hdr, true))
- {
- /*
- * If we get here, previous attempts to read the buffer must
- * have failed ... but we shall bravely try again.
- */
- *foundPtr = false;
- }
+ *foundPtr = false;
}
return existing_buf_hdr;
@@ -1407,15 +1650,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
LWLockRelease(newPartitionLock);
/*
- * Buffer contents are currently invalid. Try to obtain the right to
- * start I/O. If StartBufferIO returns false, then someone else managed
- * to read it before we did, so there's nothing left for BufferAlloc() to
- * do.
+ * Buffer contents are currently invalid.
*/
- if (StartBufferIO(victim_buf_hdr, true))
- *foundPtr = false;
- else
- *foundPtr = true;
+ *foundPtr = false;
return victim_buf_hdr;
}
@@ -1769,7 +2006,7 @@ again:
* pessimistic, but outside of toy-sized shared_buffers it should allow
* sufficient pins.
*/
-static void
+void
LimitAdditionalPins(uint32 *additional_pins)
{
uint32 max_backends;
@@ -2034,7 +2271,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
buf_state &= ~BM_VALID;
UnlockBufHdr(existing_hdr, buf_state);
- } while (!StartBufferIO(existing_hdr, true));
+ } while (!StartBufferIO(existing_hdr, true, false));
}
else
{
@@ -2057,7 +2294,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
LWLockRelease(partition_lock);
/* XXX: could combine the locked operations in it with the above */
- StartBufferIO(victim_buf_hdr, true);
+ StartBufferIO(victim_buf_hdr, true, false);
}
}
@@ -2372,7 +2609,12 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
else
{
/*
- * If we previously pinned the buffer, it must surely be valid.
+ * If we previously pinned the buffer, it is likely to be valid, but
+ * it may not be if StartReadBuffers() was called and
+ * WaitReadBuffers() hasn't been called yet. We'll check by loading
+ * the flags without locking. This is racy, but it's OK to return
+ * false spuriously: when WaitReadBuffers() calls StartBufferIO(),
+ * it'll see that it's now valid.
*
* Note: We deliberately avoid a Valgrind client request here.
* Individual access methods can optionally superimpose buffer page
@@ -2381,7 +2623,7 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
* that the buffer page is legitimately non-accessible here. We
* cannot meddle with that.
*/
- result = true;
+ result = (pg_atomic_read_u32(&buf->state) & BM_VALID) != 0;
}
ref->refcount++;
@@ -3449,7 +3691,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
* someone else flushed the buffer before we could, so we need not do
* anything.
*/
- if (!StartBufferIO(buf, false))
+ if (!StartBufferIO(buf, false, false))
return;
/* Setup error traceback support for ereport() */
@@ -5184,9 +5426,15 @@ WaitIO(BufferDesc *buf)
*
* Returns true if we successfully marked the buffer as I/O busy,
* false if someone else already did the work.
+ *
+ * If nowait is true, then we don't wait for an I/O to be finished by another
+ * backend. In that case, false indicates either that the I/O was already
+ * finished, or is still in progress. This is useful for callers that want to
+ * find out if they can perform the I/O as part of a larger operation, without
+ * waiting for the answer or distinguishing the reasons why not.
*/
static bool
-StartBufferIO(BufferDesc *buf, bool forInput)
+StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
{
uint32 buf_state;
@@ -5199,6 +5447,8 @@ StartBufferIO(BufferDesc *buf, bool forInput)
if (!(buf_state & BM_IO_IN_PROGRESS))
break;
UnlockBufHdr(buf, buf_state);
+ if (nowait)
+ return false;
WaitIO(buf);
}
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index fcfac335a57..985a2c7049c 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -108,10 +108,9 @@ PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
* LocalBufferAlloc -
* Find or create a local buffer for the given page of the given relation.
*
- * API is similar to bufmgr.c's BufferAlloc, except that we do not need
- * to do any locking since this is all local. Also, IO_IN_PROGRESS
- * does not get set. Lastly, we support only default access strategy
- * (hence, usage_count is always advanced).
+ * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
+ * any locking since this is all local. We support only default access
+ * strategy (hence, usage_count is always advanced).
*/
BufferDesc *
LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
@@ -287,7 +286,7 @@ GetLocalVictimBuffer(void)
}
/* see LimitAdditionalPins() */
-static void
+void
LimitAdditionalLocalPins(uint32 *additional_pins)
{
uint32 max_pins;
@@ -297,9 +296,10 @@ LimitAdditionalLocalPins(uint32 *additional_pins)
/*
* In contrast to LimitAdditionalPins() other backends don't play a role
- * here. We can allow up to NLocBuffer pins in total.
+ * here. We can allow up to NLocBuffer pins in total, but it might not be
+ * initialized yet so read num_temp_buffers.
*/
- max_pins = (NLocBuffer - NLocalPinnedBuffers);
+ max_pins = (num_temp_buffers - NLocalPinnedBuffers);
if (*additional_pins >= max_pins)
*additional_pins = max_pins;
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 1e71e7db4a0..71889471266 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -3112,6 +3112,20 @@ struct config_int ConfigureNamesInt[] =
NULL
},
+ {
+ {"buffer_io_size",
+ PGC_USERSET,
+ RESOURCES_ASYNCHRONOUS,
+ gettext_noop("Target size for coalescing reads and writes of buffered data blocks."),
+ NULL,
+ GUC_UNIT_BLOCKS
+ },
+ &buffer_io_size,
+ DEFAULT_BUFFER_IO_SIZE,
+ 1, MAX_BUFFER_IO_SIZE,
+ NULL, NULL, NULL
+ },
+
{
{"backend_flush_after", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
gettext_noop("Number of pages after which previously performed writes are flushed to disk."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 2244ee52f79..b7a4143df21 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -203,6 +203,7 @@
#backend_flush_after = 0 # measured in pages, 0 disables
#effective_io_concurrency = 1 # 1-1000; 0 disables prefetching
#maintenance_io_concurrency = 10 # 1-1000; 0 disables prefetching
+#buffer_io_size = 128kB
#max_worker_processes = 8 # (change requires restart)
#max_parallel_workers_per_gather = 2 # limited by max_parallel_workers
#max_parallel_maintenance_workers = 2 # limited by max_parallel_workers
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index d51d46d3353..1cc198bde21 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -14,6 +14,7 @@
#ifndef BUFMGR_H
#define BUFMGR_H
+#include "port/pg_iovec.h"
#include "storage/block.h"
#include "storage/buf.h"
#include "storage/bufpage.h"
@@ -133,6 +134,10 @@ extern PGDLLIMPORT bool track_io_timing;
extern PGDLLIMPORT int effective_io_concurrency;
extern PGDLLIMPORT int maintenance_io_concurrency;
+#define MAX_BUFFER_IO_SIZE PG_IOV_MAX
+#define DEFAULT_BUFFER_IO_SIZE Min(MAX_BUFFER_IO_SIZE, (128 * 1024) / BLCKSZ)
+extern PGDLLIMPORT int buffer_io_size;
+
extern PGDLLIMPORT int checkpoint_flush_after;
extern PGDLLIMPORT int backend_flush_after;
extern PGDLLIMPORT int bgwriter_flush_after;
@@ -158,7 +163,6 @@ extern PGDLLIMPORT int32 *LocalRefCount;
#define BUFFER_LOCK_SHARE 1
#define BUFFER_LOCK_EXCLUSIVE 2
-
/*
* prototypes for functions in bufmgr.c
*/
@@ -177,6 +181,42 @@ extern Buffer ReadBufferWithoutRelcache(RelFileLocator rlocator,
ForkNumber forkNum, BlockNumber blockNum,
ReadBufferMode mode, BufferAccessStrategy strategy,
bool permanent);
+
+#define READ_BUFFERS_ZERO_ON_ERROR 0x01
+#define READ_BUFFERS_ISSUE_ADVICE 0x02
+
+/*
+ * Private state used by StartReadBuffers() and WaitReadBuffers(). Declared
+ * in public header only to allow inclusion in other structs, but contents
+ * should not be accessed.
+ */
+struct ReadBuffersOperation
+{
+ /* Parameters passed in to StartReadBuffers(). */
+ BufferManagerRelation bmr;
+ Buffer *buffers;
+ ForkNumber forknum;
+ BlockNumber blocknum;
+ int16 nblocks;
+ BufferAccessStrategy strategy;
+ int flags;
+
+ /* Range of buffers, if we need to perform a read. */
+ int16 io_buffers_len;
+};
+
+typedef struct ReadBuffersOperation ReadBuffersOperation;
+
+extern bool StartReadBuffers(BufferManagerRelation bmr,
+ Buffer *buffers,
+ ForkNumber forknum,
+ BlockNumber blocknum,
+ int *nblocks,
+ BufferAccessStrategy strategy,
+ int flags,
+ ReadBuffersOperation *operation);
+extern void WaitReadBuffers(ReadBuffersOperation *operation);
+
extern void ReleaseBuffer(Buffer buffer);
extern void UnlockReleaseBuffer(Buffer buffer);
extern bool BufferIsExclusiveLocked(Buffer buffer);
@@ -250,6 +290,9 @@ extern bool HoldingBufferPinThatDelaysRecovery(void);
extern bool BgBufferSync(struct WritebackContext *wb_context);
+extern void LimitAdditionalPins(uint32 *additional_pins);
+extern void LimitAdditionalLocalPins(uint32 *additional_pins);
+
/* in buf_init.c */
extern void InitBufferPool(void);
extern Size BufferShmemSize(void);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index cfa9d5aaeac..97edd1388e9 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2286,6 +2286,7 @@ ReInitializeDSMForeignScan_function
ReScanForeignScan_function
ReadBufPtrType
ReadBufferMode
+ReadBuffersOperation
ReadBytePtrType
ReadExtraTocPtrType
ReadFunc
--
2.39.2
From 8c6d754275af5fcf2f0a638a2770e2ad1a60659b Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Tue, 27 Feb 2024 00:01:42 +1300
Subject: [PATCH v9.heikki 2/9] Provide API for "streaming" reads of relations.
"Streaming reads" can be used as a more efficient replacement for
a series of ReadBuffer() calls.
The client code supplies a callback that can say which block to read
next, and then consumes individual buffers one at a time. This division
allows streaming_read.c to build up large calls to StartReadBuffers(),
and issue fadvise() advice about future random reads in a systematic
way.
This API is based on an idea proposed by Andres Freund, to pave the way
for asynchronous I/O in future work as required to support direct I/O.
The longer term aim is to create an abstraction that insulates client
code from future improvements to the I/O subsystem. In the short term,
this mechanism allows improvements and simplification even with
traditional synchronous I/O.
An extended API may be necessary in future for more complicated cases
(for example recovery, which has a related mechanism LsnReadQueue in
xlogprefetcher.c that could eventually be replaced by this), but this
basic API is thought to be sufficient for many common usage patterns
involving predictable access to a single relation fork.
Reviewed-by: Heikki Linnakangas <hlinn...@iki.fi>
Reviewed-by: Melanie Plageman <melanieplage...@gmail.com>
Reviewed-by: Nazir Bilal Yavuz <byavu...@gmail.com>
Reviewed-by: Andres Freund <and...@anarazel.de>
Discussion: https://postgr.es/m/CA+hUKGJkOiOCa+mag4BF+zHo7qo=o9CFheB8=g6ut5tum2g...@mail.gmail.com
---
src/backend/storage/Makefile | 2 +-
src/backend/storage/aio/Makefile | 14 +
src/backend/storage/aio/meson.build | 5 +
src/backend/storage/aio/streaming_read.c | 678 +++++++++++++++++++++++
src/backend/storage/meson.build | 1 +
src/include/storage/streaming_read.h | 50 ++
src/tools/pgindent/typedefs.list | 1 +
7 files changed, 750 insertions(+), 1 deletion(-)
create mode 100644 src/backend/storage/aio/Makefile
create mode 100644 src/backend/storage/aio/meson.build
create mode 100644 src/backend/storage/aio/streaming_read.c
create mode 100644 src/include/storage/streaming_read.h
diff --git a/src/backend/storage/Makefile b/src/backend/storage/Makefile
index 8376cdfca20..eec03f6f2b4 100644
--- a/src/backend/storage/Makefile
+++ b/src/backend/storage/Makefile
@@ -8,6 +8,6 @@ subdir = src/backend/storage
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-SUBDIRS = buffer file freespace ipc large_object lmgr page smgr sync
+SUBDIRS = aio buffer file freespace ipc large_object lmgr page smgr sync
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile
new file mode 100644
index 00000000000..bcab44c802f
--- /dev/null
+++ b/src/backend/storage/aio/Makefile
@@ -0,0 +1,14 @@
+#
+# Makefile for storage/aio
+#
+# src/backend/storage/aio/Makefile
+#
+
+subdir = src/backend/storage/aio
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = \
+ streaming_read.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build
new file mode 100644
index 00000000000..39aef2a84a2
--- /dev/null
+++ b/src/backend/storage/aio/meson.build
@@ -0,0 +1,5 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+backend_sources += files(
+ 'streaming_read.c',
+)
diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c
new file mode 100644
index 00000000000..760a231500a
--- /dev/null
+++ b/src/backend/storage/aio/streaming_read.c
@@ -0,0 +1,678 @@
+/*-------------------------------------------------------------------------
+ *
+ * streaming_read.c
+ * Mechanism for buffer access with look-ahead
+ *
+ * Portions Copyright (c) 2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * Code that needs to access relation data typically pins blocks one at a
+ * time, often in a predictable order that might be sequential or data-driven.
+ * Calling the simple ReadBuffer() function for each block is inefficient,
+ * because blocks that are not yet in the buffer pool require I/O operations
+ * that are small and might stall waiting for storage. This mechanism looks
+ * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
+ * neighboring blocks together and ahead of time, with an adaptive look-ahead
+ * distance.
+ *
+ * A user-provided callback generates a stream of block numbers that is used
+ * to form reads of up to size buffer_io_size, by attempting to merge them
+ * with a pending read. When that isn't possible, the existing pending read
+ * is sent to StartReadBuffers() so that a new one can begin to form.
+ *
+ * The algorithm for controlling the look-ahead distance tries to classify the
+ * stream into three ideal behaviors:
+ *
+ * A) No I/O is necessary, because the requested blocks are fully cached
+ * already. There is no benefit to looking ahead more than one block, so
+ * distance is 1. This is the default initial assumption.
+ *
+ * B) I/O is necessary, but fadvise is undesirable because the access is
+ * sequential, or impossible because direct I/O is enabled or the system
+ * doesn't support advice. There is no benefit in looking ahead more than
+ * buffer_io_size (the GUC controlling physical read size), because in this
+ * case only goal is larger read system calls. Looking further ahead would
+ * pin many buffers and perform speculative work looking ahead for no benefit.
+ *
+ * C) I/O is necesssary, it appears random, and this system supports fadvise.
+ * We'll look further ahead in order to reach the configured level of I/O
+ * concurrency.
+ *
+ * The distance increases rapidly and decays slowly, so that it moves towards
+ * those levels as different I/O patterns are discovered. For example, a
+ * sequential scan of fully cached data doesn't bother looking ahead, but a
+ * sequential scan that hits a region of uncached blocks will start issuing
+ * increasingly wide read calls until it plateaus at buffer_io_size.
+ *
+ * The main data structure is a circular queue of buffers of size
+ * max_pinned_buffers, ready to be returned by streaming_read_buffer_next().
+ * Each buffer also has an optional variable sized object that is passed from
+ * the callback to the consumer of buffers. A third array records whether
+ * WaitReadBuffers() must be called before returning the buffer, and if so,
+ * points to the relevant ReadBuffersOperation object.
+ *
+ * For example, if the callback return block numbers 10, 42, 43, 60 in
+ * successive calls, then these data structures might appear as follows:
+ *
+ * buffers buf/data buf/io ios
+ *
+ * +----+ +-----+ +---+ +--------+
+ * | | | | | | +---->| 42..44 |
+ * +----+ +-----+ +---+ | +--------+
+ * oldest_buffer_index -> | 10 | | ? | | | | +-->| 60..60 |
+ * +----+ +-----+ +---+ | | +--------+
+ * | 42 | | ? | | 0 +--+ | | |
+ * +----+ +-----+ +---+ | +--------+
+ * | 43 | | ? | | | | | |
+ * +----+ +-----+ +---+ | +--------+
+ * | 44 | | ? | | | | | |
+ * +----+ +-----+ +---+ | +--------+
+ * | 60 | | ? | | 1 +----+
+ * +----+ +-----+ +---+
+ * next_buffer_index -> | | | | | |
+ * +----+ +-----+ +---+
+ *
+ * In the example, 5 buffers are pinned, and the next buffer to be streamed to
+ * the client is block 10. Block 10 was a hit and has no associated I/O, but
+ * the range 42..44 requires an I/O wait before its buffers are returned, as
+ * does block 60.
+ *
+ * IDENTIFICATION
+ * src/backend/storage/storage/aio/streaming_read.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "catalog/pg_tablespace.h"
+#include "miscadmin.h"
+#include "storage/streaming_read.h"
+#include "utils/rel.h"
+#include "utils/spccache.h"
+
+/*
+ * Streaming read object.
+ */
+struct StreamingRead
+{
+ int16 max_ios;
+ int16 ios_in_progress;
+ int16 max_pinned_buffers;
+ int16 pinned_buffers;
+ int16 distance;
+ bool started;
+ bool finished;
+ bool advice_enabled;
+
+ /*
+ * The callback that will tell us which block numbers to read, and an
+ * opaque pointer that will be pass to it for its own purposes.
+ */
+ StreamingReadBufferCB callback;
+ void *callback_private_data;
+
+ /* The relation we will read. */
+ BufferAccessStrategy strategy;
+ BufferManagerRelation bmr;
+ ForkNumber forknum;
+
+ /* Sometimes we need to buffer one block for flow control. */
+ BlockNumber unget_blocknum;
+ void *unget_per_buffer_data;
+
+ /* Next expected block, for detecting sequential access. */
+ BlockNumber seq_blocknum;
+
+ /* The read operation we are currently preparing. */
+ BlockNumber pending_read_blocknum;
+ int16 pending_read_nblocks;
+
+ /* Space for buffers and optional per-buffer private data. */
+ Buffer *buffers;
+ size_t per_buffer_data_size;
+ void *per_buffer_data;
+ int16 *buffer_io_indexes;
+
+ /* Read operations that have been started by not waited for yet. */
+ ReadBuffersOperation *ios;
+ int16 next_io_index;
+
+ /* Head and tail of the circular queue of buffers. */
+ int16 oldest_buffer_index; /* Next pinned buffer to return */
+ int16 next_buffer_index; /* Index of next buffer to pin */
+};
+
+/*
+ * Return a pointer to the per-buffer data by index.
+ */
+static void *
+get_per_buffer_data(StreamingRead *stream, int16 buffer_index)
+{
+ return (char *) stream->per_buffer_data +
+ stream->per_buffer_data_size * buffer_index;
+}
+
+/*
+ * Ask the callback which block it would like us to read next, with a small
+ * buffer in front to allow streaming_unget_block() to work.
+ */
+static BlockNumber
+streaming_read_get_block(StreamingRead *stream, void *per_buffer_data)
+{
+ BlockNumber result;
+
+ if (unlikely(stream->unget_blocknum != InvalidBlockNumber))
+ {
+ /*
+ * If we had to unget a block, now it is time to return that one
+ * again.
+ */
+ result = stream->unget_blocknum;
+ stream->unget_blocknum = InvalidBlockNumber;
+
+ /*
+ * The same per_buffer_data element must have been used, and still
+ * contains whatever data the callback wrote into it. So we just
+ * sanity-check that we were called with the value that
+ * streaming_unget_block() pushed back.
+ */
+ Assert(per_buffer_data == stream->unget_per_buffer_data);
+ }
+ else
+ {
+ /* Use the installed callback directly. */
+ result = stream->callback(stream,
+ stream->callback_private_data,
+ per_buffer_data);
+ }
+
+ return result;
+}
+
+/*
+ * In order to deal with short reads in StartReadBuffers(), we sometimes need
+ * to defer handling of a block until later. This *must* be called with the
+ * last value returned by streaming_get_block().
+ */
+static void
+streaming_read_unget_block(StreamingRead *stream, BlockNumber blocknum, void *per_buffer_data)
+{
+ Assert(stream->unget_blocknum == InvalidBlockNumber);
+ stream->unget_blocknum = blocknum;
+ stream->unget_per_buffer_data = per_buffer_data;
+}
+
+static void
+streaming_read_start_pending_read(StreamingRead *stream)
+{
+ bool need_wait;
+ int nblocks;
+ int16 io_index;
+ int16 overflow;
+ int flags;
+
+ /* This should only be called with a pending read. */
+ Assert(stream->pending_read_nblocks > 0);
+ Assert(stream->pending_read_nblocks <= buffer_io_size);
+
+ /* We had better not exceed the pin limit by starting this read. */
+ Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
+ stream->max_pinned_buffers);
+
+ /* We had better not be overwriting an existing pinned buffer. */
+ if (stream->pinned_buffers > 0)
+ Assert(stream->next_buffer_index != stream->oldest_buffer_index);
+ else
+ Assert(stream->next_buffer_index == stream->oldest_buffer_index);
+
+ /*
+ * If advice hasn't been suppressed, and this system supports it, this
+ * isn't a strictly sequential pattern, then we'll issue advice.
+ */
+ if (stream->advice_enabled &&
+ stream->started &&
+ stream->pending_read_blocknum != stream->seq_blocknum)
+ flags = READ_BUFFERS_ISSUE_ADVICE;
+ else
+ flags = 0;
+
+ /* Suppress advice on the first call, because it's too late to benefit. */
+ if (!stream->started)
+ stream->started = true;
+
+ /* We say how many blocks we want to read, but may be smaller on return. */
+ nblocks = stream->pending_read_nblocks;
+ need_wait =
+ StartReadBuffers(stream->bmr,
+ &stream->buffers[stream->next_buffer_index],
+ stream->forknum,
+ stream->pending_read_blocknum,
+ &nblocks,
+ stream->strategy,
+ flags,
+ &stream->ios[stream->next_io_index]);
+ stream->pinned_buffers += nblocks;
+
+ /* Remember whether we need to wait before returning this buffer. */
+ if (!need_wait)
+ {
+ io_index = -1;
+
+ /* Look-ahead distance decays, no I/O necessary (behavior A). */
+ if (stream->distance > 1)
+ stream->distance--;
+ }
+ else
+ {
+ /*
+ * Remember to call WaitReadBuffers() before returning head buffer.
+ * Look-ahead distance will be adjusted after waiting.
+ */
+ io_index = stream->next_io_index;
+ if (++stream->next_io_index == stream->max_ios)
+ stream->next_io_index = 0;
+
+ Assert(stream->ios_in_progress < stream->max_ios);
+ stream->ios_in_progress++;
+ }
+
+ /* Set up the pointer to the I/O for the head buffer, if there is one. */
+ stream->buffer_io_indexes[stream->next_buffer_index] = io_index;
+
+ /*
+ * We gave a contiguous range of buffer space to StartReadBuffers(), but
+ * we want it to wrap around at max_pinned_buffers. Move values that
+ * overflowed into the extra space. At the same time, put -1 in the I/O
+ * slots for the rest of the buffers to indicate no I/O. They are covered
+ * by the head buffer's I/O, if there is one. We avoid a % operator.
+ */
+ overflow = (stream->next_buffer_index + nblocks) - stream->max_pinned_buffers;
+ if (overflow > 0)
+ {
+ memmove(&stream->buffers[0],
+ &stream->buffers[stream->max_pinned_buffers],
+ sizeof(stream->buffers[0]) * overflow);
+ for (int i = 0; i < overflow; ++i)
+ stream->buffer_io_indexes[i] = -1;
+ for (int i = 1; i < nblocks - overflow; ++i)
+ stream->buffer_io_indexes[stream->next_buffer_index + i] = -1;
+ }
+ else
+ {
+ for (int i = 1; i < nblocks; ++i)
+ stream->buffer_io_indexes[stream->next_buffer_index + i] = -1;
+ }
+
+ /*
+ * Remember where the next block would be after that, so we can detect
+ * sequential access next time and suppress advice.
+ */
+ stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
+
+ /* Compute location of start of next read, without using % operator. */
+ stream->next_buffer_index += nblocks;
+ if (stream->next_buffer_index >= stream->max_pinned_buffers)
+ stream->next_buffer_index -= stream->max_pinned_buffers;
+ Assert(stream->next_buffer_index >= 0);
+ Assert(stream->next_buffer_index < stream->max_pinned_buffers);
+
+ /* Adjust the pending read to cover the remaining portion, if any. */
+ stream->pending_read_blocknum += nblocks;
+ stream->pending_read_nblocks -= nblocks;
+}
+
+static void
+streaming_read_look_ahead(StreamingRead *stream)
+{
+ while (!stream->finished &&
+ stream->ios_in_progress < stream->max_ios &&
+ stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
+ {
+ BlockNumber blocknum;
+ int16 buffer_index;
+ void *per_buffer_data;
+
+ if (stream->pending_read_nblocks == buffer_io_size)
+ {
+ streaming_read_start_pending_read(stream);
+ continue;
+ }
+
+ /*
+ * See which block the callback wants next in the stream. We need to
+ * compute the index of the Nth block of the pending read including
+ * wrap-around, but we don't want to use the expensive % operator.
+ */
+ buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
+ if (buffer_index > stream->max_pinned_buffers)
+ buffer_index -= stream->max_pinned_buffers;
+ per_buffer_data = get_per_buffer_data(stream, buffer_index);
+ blocknum = streaming_read_get_block(stream, per_buffer_data);
+ if (blocknum == InvalidBlockNumber)
+ {
+ stream->finished = true;
+ continue;
+ }
+
+ /* Can we merge it with the pending read? */
+ if (stream->pending_read_nblocks > 0 &&
+ stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
+ {
+ stream->pending_read_nblocks++;
+ continue;
+ }
+
+ /* We have to start the pending read before we can build another. */
+ if (stream->pending_read_nblocks > 0)
+ {
+ streaming_read_start_pending_read(stream);
+ if (stream->ios_in_progress == stream->max_ios)
+ {
+ /* And we've hit the limit. Rewind, and stop here. */
+ streaming_read_unget_block(stream, blocknum, per_buffer_data);
+ return;
+ }
+ }
+
+ /* This is the start of a new pending read. */
+ stream->pending_read_blocknum = blocknum;
+ stream->pending_read_nblocks = 1;
+ }
+
+ /*
+ * Normally we don't start the pending read just because we've hit a
+ * limit, preferring to give it another chance to grow to a larger size
+ * once more buffers have been consumed. However, in cases where that
+ * can't possibly happen, we might as well start the read immediately.
+ */
+ if (((stream->pending_read_nblocks > 0 && stream->finished) ||
+ (stream->pending_read_nblocks == stream->distance)) &&
+ stream->ios_in_progress < stream->max_ios)
+ streaming_read_start_pending_read(stream);
+}
+
+/*
+ * Create a new streaming read object that can be used to perform the
+ * equivalent of a series of ReadBuffer() calls for one fork of one relation.
+ * Internally, it generates larger vectored reads where possible by looking
+ * ahead. The callback should return block numbers or InvalidBlockNumber to
+ * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
+ * write extra data for each block into the space provided to it. It will
+ * also receive callback_private_data for its own purposes.
+ */
+StreamingRead *
+streaming_read_buffer_begin(int flags,
+ BufferAccessStrategy strategy,
+ BufferManagerRelation bmr,
+ ForkNumber forknum,
+ StreamingReadBufferCB callback,
+ void *callback_private_data,
+ size_t per_buffer_data_size)
+{
+ StreamingRead *stream;
+ int16 max_ios;
+ uint32 max_pinned_buffers;
+ Oid tablespace_id;
+
+ /*
+ * Make sure our bmr's smgr and persistent are populated. The caller
+ * asserts that the storage manager will remain valid.
+ */
+ if (!bmr.smgr)
+ {
+ bmr.smgr = RelationGetSmgr(bmr.rel);
+ bmr.relpersistence = bmr.rel->rd_rel->relpersistence;
+ }
+
+ /*
+ * Decide how many assumed I/Os we will allow to run concurrently. That
+ * is, advice to the kernel to tell it that we will soon read. This
+ * number also affects how far we look ahead for opportunities to start
+ * more I/Os.
+ */
+ tablespace_id = bmr.smgr->smgr_rlocator.locator.spcOid;
+ if (!OidIsValid(MyDatabaseId) ||
+ (bmr.rel && IsCatalogRelation(bmr.rel)) ||
+ IsCatalogRelationOid(bmr.smgr->smgr_rlocator.locator.relNumber))
+ {
+ /*
+ * Avoid circularity while trying to look up tablespace settings or
+ * before spccache.c is ready.
+ */
+ max_ios = effective_io_concurrency;
+ }
+ else if (flags & STREAMING_READ_MAINTENANCE)
+ max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
+ else
+ max_ios = get_tablespace_io_concurrency(tablespace_id);
+
+ /*
+ * Choose a maximum number of buffers we're prepared to pin. We try to
+ * pin fewer if we can, though. We clamp it to at least buffer_io_size so
+ * that we can have a chance to build up a full sized read, even when
+ * max_ios is zero.
+ */
+ max_pinned_buffers = Max(max_ios * 4, buffer_io_size);
+
+ /* Don't allow this backend to pin more than its share of buffers. */
+ if (SmgrIsTemp(bmr.smgr))
+ LimitAdditionalLocalPins(&max_pinned_buffers);
+ else
+ LimitAdditionalPins(&max_pinned_buffers);
+ Assert(max_pinned_buffers > 0);
+
+ stream = (StreamingRead *) palloc0(sizeof(StreamingRead));
+
+#ifdef USE_PREFETCH
+
+ /*
+ * This system supports prefetching advice. We can use it as long as
+ * direct I/O isn't enabled, the caller hasn't promised sequential access
+ * (overriding our detection heuristics), and max_ios hasn't been set to
+ * zero.
+ */
+ if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
+ (flags & STREAMING_READ_SEQUENTIAL) == 0 &&
+ max_ios > 0)
+ stream->advice_enabled = true;
+#endif
+
+ /*
+ * For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled
+ * above. If we had real asynchronous I/O we might need a slightly
+ * different definition.
+ */
+ if (max_ios == 0)
+ max_ios = 1;
+
+ stream->max_ios = max_ios;
+ stream->per_buffer_data_size = per_buffer_data_size;
+ stream->max_pinned_buffers = max_pinned_buffers;
+ stream->strategy = strategy;
+
+ stream->bmr = bmr;
+ stream->forknum = forknum;
+ stream->callback = callback;
+ stream->callback_private_data = callback_private_data;
+
+ stream->unget_blocknum = InvalidBlockNumber;
+
+ /*
+ * Skip the initial ramp-up phase if the caller says we're going to be
+ * reading the whole relation. This way we start out doing full-sized
+ * reads.
+ */
+ if (flags & STREAMING_READ_FULL)
+ stream->distance = stream->max_pinned_buffers;
+ else
+ stream->distance = 1;
+
+ /*
+ * Space for the buffers we pin. Though we never pin more than
+ * max_pinned_buffers, we want to be able to assume that all the buffers
+ * for a single read are contiguous (i.e. don't wrap around halfway
+ * through), so we let the final one run past that position temporarily by
+ * allocating an extra buffer_io_size - 1 elements.
+ */
+ stream->buffers = palloc((max_pinned_buffers + buffer_io_size - 1) *
+ sizeof(stream->buffers[0]));
+
+ /* Space for per-buffer data, if configured. */
+ if (per_buffer_data_size)
+ stream->per_buffer_data =
+ palloc(per_buffer_data_size * (max_pinned_buffers +
+ buffer_io_size - 1));
+
+ /* Space for the IOs that we might run. */
+ stream->buffer_io_indexes = palloc(max_pinned_buffers * sizeof(stream->buffer_io_indexes[0]));
+ stream->ios = palloc(max_ios * sizeof(ReadBuffersOperation));
+
+ return stream;
+}
+
+/*
+ * Pull one pinned buffer out of a stream created with
+ * streaming_read_buffer_begin(). Each call returns successive blocks in the
+ * order specified by the callback. If per_buffer_data_size was set to a
+ * non-zero size, *per_buffer_data receives a pointer to the extra per-buffer
+ * data that the callback had a chance to populate. When the stream runs out
+ * of data, InvalidBuffer is returned. The caller may decide to end the
+ * stream early at any time by calling streaming_read_end().
+ */
+Buffer
+streaming_read_buffer_next(StreamingRead *stream, void **per_buffer_data)
+{
+ Buffer buffer;
+ int16 io_index;
+ int16 oldest_buffer_index;
+
+ if (unlikely(stream->pinned_buffers == 0))
+ {
+ Assert(stream->oldest_buffer_index == stream->next_buffer_index);
+
+ if (stream->finished)
+ return InvalidBuffer;
+
+ /*
+ * The usual order of operations is that we look ahead at the bottom
+ * of this function after potentially finishing an I/O and making
+ * space for more, but we need a special case to prime the stream when
+ * we're getting started.
+ */
+ Assert(!stream->started);
+ streaming_read_look_ahead(stream);
+ if (stream->pinned_buffers == 0)
+ return InvalidBuffer;
+ }
+
+ /* Grab the oldest pinned buffer and associated per-buffer data. */
+ oldest_buffer_index = stream->oldest_buffer_index;
+ Assert(oldest_buffer_index >= 0 &&
+ oldest_buffer_index < stream->max_pinned_buffers);
+ buffer = stream->buffers[oldest_buffer_index];
+ if (per_buffer_data)
+ *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
+
+ Assert(BufferIsValid(buffer));
+
+ /* Do we have to wait for an associated I/O first? */
+ io_index = stream->buffer_io_indexes[oldest_buffer_index];
+ Assert(io_index >= -1 && io_index < stream->max_ios);
+ if (io_index >= 0)
+ {
+ int distance;
+
+ /* Sanity check that we still agree on the buffers. */
+ Assert(stream->ios[io_index].buffers == &stream->buffers[oldest_buffer_index]);
+
+ WaitReadBuffers(&stream->ios[io_index]);
+
+ Assert(stream->ios_in_progress > 0);
+ stream->ios_in_progress--;
+
+ if (stream->ios[io_index].flags & READ_BUFFERS_ISSUE_ADVICE)
+ {
+ /* Distance ramps up fast (behavior C). */
+ distance = stream->distance * 2;
+ distance = Min(distance, stream->max_pinned_buffers);
+ stream->distance = distance;
+ }
+ else
+ {
+ /* No advice; move towards full I/O size (behavior B). */
+ if (stream->distance > buffer_io_size)
+ {
+ stream->distance--;
+ }
+ else
+ {
+ distance = stream->distance * 2;
+ distance = Min(distance, buffer_io_size);
+ distance = Min(distance, stream->max_pinned_buffers);
+ stream->distance = distance;
+ }
+ }
+ }
+
+ /* Advance the oldest buffer, but clobber it first for debugging. */
+#ifdef USE_ASSERT_CHECKING
+ stream->buffers[oldest_buffer_index] = InvalidBuffer;
+ stream->buffer_io_indexes[oldest_buffer_index] = -1;
+ if (stream->per_buffer_data)
+ memset(get_per_buffer_data(stream, oldest_buffer_index),
+ 0xff,
+ stream->per_buffer_data_size);
+#endif
+ if (++stream->oldest_buffer_index == stream->max_pinned_buffers)
+ stream->oldest_buffer_index = 0;
+
+ /* We are transferring ownership of the pin to the caller. */
+ Assert(stream->pinned_buffers > 0);
+ stream->pinned_buffers--;
+
+ /*
+ * When distance is minimal, we finish up with no queued buffers. As a
+ * micro-optimization, we can then reset our circular queues, so that
+ * all-cached streams re-use the same elements instead of rotating through
+ * memory.
+ */
+ if (stream->pinned_buffers == 0)
+ {
+ Assert(stream->oldest_buffer_index == stream->next_buffer_index);
+ stream->oldest_buffer_index = 0;
+ stream->next_buffer_index = 0;
+ stream->next_io_index = 0;
+ }
+
+ /* Prepare for the next call. */
+ streaming_read_look_ahead(stream);
+
+ return buffer;
+}
+
+/*
+ * Finish streaming blocks and release all resources.
+ */
+void
+streaming_read_buffer_end(StreamingRead *stream)
+{
+ Buffer buffer;
+
+ /* Stop looking ahead. */
+ stream->finished = true;
+
+ /* Unpin anything that wasn't consumed. */
+ while ((buffer = streaming_read_buffer_next(stream, NULL)) != InvalidBuffer)
+ ReleaseBuffer(buffer);
+
+ Assert(stream->pinned_buffers == 0);
+ Assert(stream->ios_in_progress == 0);
+
+ /* Release memory. */
+ pfree(stream->buffers);
+ if (stream->per_buffer_data)
+ pfree(stream->per_buffer_data);
+ pfree(stream->ios);
+
+ pfree(stream);
+}
diff --git a/src/backend/storage/meson.build b/src/backend/storage/meson.build
index 40345bdca27..739d13293fb 100644
--- a/src/backend/storage/meson.build
+++ b/src/backend/storage/meson.build
@@ -1,5 +1,6 @@
# Copyright (c) 2022-2024, PostgreSQL Global Development Group
+subdir('aio')
subdir('buffer')
subdir('file')
subdir('freespace')
diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h
new file mode 100644
index 00000000000..7991402631a
--- /dev/null
+++ b/src/include/storage/streaming_read.h
@@ -0,0 +1,50 @@
+#ifndef STREAMING_READ_H
+#define STREAMING_READ_H
+
+#include "storage/bufmgr.h"
+#include "storage/fd.h"
+#include "storage/smgr.h"
+
+/* Default tuning, reasonable for many users. */
+#define STREAMING_READ_DEFAULT 0x00
+
+/*
+ * I/O streams that are performing maintenance work on behalf of potentially
+ * many users.
+ */
+#define STREAMING_READ_MAINTENANCE 0x01
+
+/*
+ * We usually avoid issuing prefetch advice automatically when sequential
+ * access is detected, but this flag explicitly disables it, for cases that
+ * might not be correctly detected. Explicit advice is known to perform worse
+ * than letting the kernel (at least Linux) detect sequential access.
+ */
+#define STREAMING_READ_SEQUENTIAL 0x02
+
+/*
+ * We usually ramp up from smaller reads to larger ones, to support users who
+ * don't know if it's worth reading lots of buffers yet. This flag disables
+ * that, declaring ahead of time that we'll be reading all available buffers.
+ */
+#define STREAMING_READ_FULL 0x04
+
+struct StreamingRead;
+typedef struct StreamingRead StreamingRead;
+
+/* Callback that returns the next block number to read. */
+typedef BlockNumber (*StreamingReadBufferCB) (StreamingRead *stream,
+ void *callback_private_data,
+ void *per_buffer_data);
+
+extern StreamingRead *streaming_read_buffer_begin(int flags,
+ BufferAccessStrategy strategy,
+ BufferManagerRelation bmr,
+ ForkNumber forknum,
+ StreamingReadBufferCB callback,
+ void *callback_private_data,
+ size_t per_buffer_data_size);
+extern Buffer streaming_read_buffer_next(StreamingRead *stream, void **per_buffer_private);
+extern void streaming_read_buffer_end(StreamingRead *stream);
+
+#endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 97edd1388e9..783d20ba058 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2693,6 +2693,7 @@ StopList
StrategyNumber
StreamCtl
StreamStopReason
+StreamingRead
String
StringInfo
StringInfoData
--
2.39.2
From 464367523da22f615607c11231c7c56871b4da9b Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sun, 23 Jul 2023 09:28:42 +1200
Subject: [PATCH v9.heikki 3/9] Use streaming reads in pg_prewarm.
Instead of calling ReadBuffer() repeatedly, use streaming reads. This
provides a simple example of such a transformation, and generates fewer
system calls.
Reviewed-by:
Discussion: https://postgr.es/m/CA+hUKGJkOiOCa+mag4BF+zHo7qo=o9CFheB8=g6ut5tum2g...@mail.gmail.com
---
contrib/pg_prewarm/pg_prewarm.c | 40 ++++++++++++++++++++++++++++++++-
1 file changed, 39 insertions(+), 1 deletion(-)
diff --git a/contrib/pg_prewarm/pg_prewarm.c b/contrib/pg_prewarm/pg_prewarm.c
index 8541e4d6e46..cd2e15aa3d8 100644
--- a/contrib/pg_prewarm/pg_prewarm.c
+++ b/contrib/pg_prewarm/pg_prewarm.c
@@ -20,6 +20,7 @@
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/smgr.h"
+#include "storage/streaming_read.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
@@ -38,6 +39,25 @@ typedef enum
static PGIOAlignedBlock blockbuffer;
+struct pg_prewarm_streaming_read_private
+{
+ BlockNumber blocknum;
+ int64 last_block;
+};
+
+static BlockNumber
+pg_prewarm_streaming_read_next(StreamingRead *stream,
+ void *user_data,
+ void *per_buffer_data)
+{
+ struct pg_prewarm_streaming_read_private *p = user_data;
+
+ if (p->blocknum <= p->last_block)
+ return p->blocknum++;
+
+ return InvalidBlockNumber;
+}
+
/*
* pg_prewarm(regclass, mode text, fork text,
* first_block int8, last_block int8)
@@ -183,18 +203,36 @@ pg_prewarm(PG_FUNCTION_ARGS)
}
else if (ptype == PREWARM_BUFFER)
{
+ struct pg_prewarm_streaming_read_private p;
+ StreamingRead *stream;
+
/*
* In buffer mode, we actually pull the data into shared_buffers.
*/
+
+ /* Set up the private state for our streaming buffer read callback. */
+ p.blocknum = first_block;
+ p.last_block = last_block;
+
+ stream = streaming_read_buffer_begin(STREAMING_READ_FULL,
+ NULL,
+ BMR_REL(rel),
+ forkNumber,
+ pg_prewarm_streaming_read_next,
+ &p,
+ 0);
+
for (block = first_block; block <= last_block; ++block)
{
Buffer buf;
CHECK_FOR_INTERRUPTS();
- buf = ReadBufferExtended(rel, forkNumber, block, RBM_NORMAL, NULL);
+ buf = streaming_read_buffer_next(stream, NULL);
ReleaseBuffer(buf);
++blocks_done;
}
+ Assert(streaming_read_buffer_next(stream, NULL) == InvalidBuffer);
+ streaming_read_buffer_end(stream);
}
/* Close relation, release lock. */
--
2.39.2
From 7f3a341dcd13a0271016922da899123fc5eb8892 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Tue, 26 Mar 2024 11:35:22 +0200
Subject: [PATCH v9.heikki 4/9] Cleanup boilerplate file header comments
---
src/backend/storage/aio/streaming_read.c | 9 +++++----
src/include/storage/streaming_read.h | 15 ++++++++++++++-
2 files changed, 19 insertions(+), 5 deletions(-)
diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c
index 760a231500a..e530a6da0ed 100644
--- a/src/backend/storage/aio/streaming_read.c
+++ b/src/backend/storage/aio/streaming_read.c
@@ -3,9 +3,6 @@
* streaming_read.c
* Mechanism for buffer access with look-ahead
*
- * Portions Copyright (c) 2024, PostgreSQL Global Development Group
- * Portions Copyright (c) 1994, Regents of the University of California
- *
* Code that needs to access relation data typically pins blocks one at a
* time, often in a predictable order that might be sequential or data-driven.
* Calling the simple ReadBuffer() function for each block is inefficient,
@@ -77,8 +74,12 @@
* the range 42..44 requires an I/O wait before its buffers are returned, as
* does block 60.
*
+ *
+ * Portions Copyright (c) 2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
* IDENTIFICATION
- * src/backend/storage/storage/aio/streaming_read.c
+ * src/backend/storage/aio/streaming_read.c
*
*-------------------------------------------------------------------------
*/
diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h
index 7991402631a..f0ce3e45956 100644
--- a/src/include/storage/streaming_read.h
+++ b/src/include/storage/streaming_read.h
@@ -1,3 +1,16 @@
+/*-------------------------------------------------------------------------
+ *
+ * streaming_read.h
+ * Mechanism for buffer access with look-ahead
+ *
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/streaming_read.h
+ *
+ *-------------------------------------------------------------------------
+ */
#ifndef STREAMING_READ_H
#define STREAMING_READ_H
@@ -47,4 +60,4 @@ extern StreamingRead *streaming_read_buffer_begin(int flags,
extern Buffer streaming_read_buffer_next(StreamingRead *stream, void **per_buffer_private);
extern void streaming_read_buffer_end(StreamingRead *stream);
-#endif
+#endif /* STREAMING_READ_H */
--
2.39.2
From 0526b6c98afaec357ebac2b8f9ac94a8a30378fc Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Tue, 26 Mar 2024 12:54:25 +0200
Subject: [PATCH v9.heikki 5/9] Tidy up #includes
Nothing in the prototypes defined in streaming_read.h depend on smgr.h
or fd.h.
---
src/backend/storage/aio/streaming_read.c | 2 ++
src/include/storage/streaming_read.h | 2 --
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c
index e530a6da0ed..82a2227d4d5 100644
--- a/src/backend/storage/aio/streaming_read.c
+++ b/src/backend/storage/aio/streaming_read.c
@@ -87,6 +87,8 @@
#include "catalog/pg_tablespace.h"
#include "miscadmin.h"
+#include "storage/fd.h"
+#include "storage/smgr.h"
#include "storage/streaming_read.h"
#include "utils/rel.h"
#include "utils/spccache.h"
diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h
index f0ce3e45956..74053d704ff 100644
--- a/src/include/storage/streaming_read.h
+++ b/src/include/storage/streaming_read.h
@@ -15,8 +15,6 @@
#define STREAMING_READ_H
#include "storage/bufmgr.h"
-#include "storage/fd.h"
-#include "storage/smgr.h"
/* Default tuning, reasonable for many users. */
#define STREAMING_READ_DEFAULT 0x00
--
2.39.2
From 5a4841a008b4ebaa52f0d42a85a76e45e50bbdea Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Tue, 26 Mar 2024 11:35:32 +0200
Subject: [PATCH v9.heikki 6/9] Mention that STREAMING_READ_MAINTENANCE is used
by VACUUM and CREATE INDEX
It took me a while to understand what a maintenance operation is. An
example helps.
Maybe we should even explicitly mention here that it means that
maintenance_io_concurrency is used instead of
effective_io_concurrency?
---
src/include/storage/streaming_read.h | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h
index 74053d704ff..b48be02d4bf 100644
--- a/src/include/storage/streaming_read.h
+++ b/src/include/storage/streaming_read.h
@@ -21,7 +21,7 @@
/*
* I/O streams that are performing maintenance work on behalf of potentially
- * many users.
+ * many users. For example, VACUUM or CREATE INDEX.
*/
#define STREAMING_READ_MAINTENANCE 0x01
--
2.39.2
From 55ccdfc0273b32eac850b5cc170578b5306dff08 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Tue, 26 Mar 2024 12:46:48 +0200
Subject: [PATCH v9.heikki 7/9] Trivial comment fixes
---
src/backend/storage/aio/streaming_read.c | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c
index 82a2227d4d5..76ce9cef53e 100644
--- a/src/backend/storage/aio/streaming_read.c
+++ b/src/backend/storage/aio/streaming_read.c
@@ -48,7 +48,7 @@
* WaitReadBuffers() must be called before returning the buffer, and if so,
* points to the relevant ReadBuffersOperation object.
*
- * For example, if the callback return block numbers 10, 42, 43, 60 in
+ * For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
* successive calls, then these data structures might appear as follows:
*
* buffers buf/data buf/io ios
@@ -136,7 +136,7 @@ struct StreamingRead
void *per_buffer_data;
int16 *buffer_io_indexes;
- /* Read operations that have been started by not waited for yet. */
+ /* Read operations that have been started but not waited for yet. */
ReadBuffersOperation *ios;
int16 next_io_index;
--
2.39.2
From 8be3ae3eddce73846692943912e57520e7df032a Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Tue, 26 Mar 2024 12:47:41 +0200
Subject: [PATCH v9.heikki 8/9] Use wipe_mem() for callback data that should be
considered invalid
wipe_mem() includes Valgrind hints, which is nice.
---
src/backend/storage/aio/streaming_read.c | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c
index 76ce9cef53e..0ed81017b78 100644
--- a/src/backend/storage/aio/streaming_read.c
+++ b/src/backend/storage/aio/streaming_read.c
@@ -90,6 +90,7 @@
#include "storage/fd.h"
#include "storage/smgr.h"
#include "storage/streaming_read.h"
+#include "utils/memdebug.h"
#include "utils/rel.h"
#include "utils/spccache.h"
@@ -618,13 +619,12 @@ streaming_read_buffer_next(StreamingRead *stream, void **per_buffer_data)
}
/* Advance the oldest buffer, but clobber it first for debugging. */
-#ifdef USE_ASSERT_CHECKING
+#ifdef CLOBBER_FREED_MEMORY
stream->buffers[oldest_buffer_index] = InvalidBuffer;
stream->buffer_io_indexes[oldest_buffer_index] = -1;
if (stream->per_buffer_data)
- memset(get_per_buffer_data(stream, oldest_buffer_index),
- 0xff,
- stream->per_buffer_data_size);
+ wipe_mem(get_per_buffer_data(stream, oldest_buffer_index),
+ stream->per_buffer_data_size);
#endif
if (++stream->oldest_buffer_index == stream->max_pinned_buffers)
stream->oldest_buffer_index = 0;
--
2.39.2
From 76f60b29bae4cd90515b7b942272858df35acaca Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Tue, 26 Mar 2024 13:03:06 +0200
Subject: [PATCH v9.heikki 9/9] Replace 'buffer_io_indexes' array with ref from
IO to buffer.
When the block pattern is sequential so that we manage to build large
I/Os, this avoids having to repeatedly reset the buffer_io_indexes
array elements to -1, when most of them already are -1. That could be
slightly more efficient, although I didn't try to measure that and it
probably doesn't make any difference in practice either way.
---
src/backend/storage/aio/streaming_read.c | 91 +++++++++++-------------
1 file changed, 43 insertions(+), 48 deletions(-)
diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c
index 0ed81017b78..00b92284a47 100644
--- a/src/backend/storage/aio/streaming_read.c
+++ b/src/backend/storage/aio/streaming_read.c
@@ -44,30 +44,32 @@
* The main data structure is a circular queue of buffers of size
* max_pinned_buffers, ready to be returned by streaming_read_buffer_next().
* Each buffer also has an optional variable sized object that is passed from
- * the callback to the consumer of buffers. A third array records whether
- * WaitReadBuffers() must be called before returning the buffer, and if so,
- * points to the relevant ReadBuffersOperation object.
+ * the callback to the consumer of buffers.
+ *
+ * Parallel to the queue of buffers, there is a circular queue of in-progress
+ * I/Os that have been started with StartReadBuffers(), and for which
+ * WaitReadBuffers() must be called before returning the buffer.
*
* For example, if the callback returns block numbers 10, 42, 43, 44, 60 in
* successive calls, then these data structures might appear as follows:
*
- * buffers buf/data buf/io ios
+ * buffers buf/data ios
*
- * +----+ +-----+ +---+ +--------+
- * | | | | | | +---->| 42..44 |
- * +----+ +-----+ +---+ | +--------+
- * oldest_buffer_index -> | 10 | | ? | | | | +-->| 60..60 |
- * +----+ +-----+ +---+ | | +--------+
- * | 42 | | ? | | 0 +--+ | | |
- * +----+ +-----+ +---+ | +--------+
- * | 43 | | ? | | | | | |
- * +----+ +-----+ +---+ | +--------+
- * | 44 | | ? | | | | | |
- * +----+ +-----+ +---+ | +--------+
- * | 60 | | ? | | 1 +----+
- * +----+ +-----+ +---+
- * next_buffer_index -> | | | | | |
- * +----+ +-----+ +---+
+ * +----+ +-----+ +--------+
+ * | | | | +-----+ 42..44 | <- oldest_io_index
+ * +----+ +-----+ | +--------+
+ * oldest_buffer_index -> | 10 | | ? | | +-->| 60..60 |
+ * +----+ +-----+ | | +--------+
+ * | 42 | | ? | <------+ | | | <- next_io_index
+ * +----+ +-----+ | +--------+
+ * | 43 | | ? | | | |
+ * +----+ +-----+ | +--------+
+ * | 44 | | ? | | | |
+ * +----+ +-----+ | +--------+
+ * | 60 | | ? | <--------+
+ * +----+ +-----+
+ * next_buffer_index -> | | | |
+ * +----+ +-----+
*
* In the example, 5 buffers are pinned, and the next buffer to be streamed to
* the client is block 10. Block 10 was a hit and has no associated I/O, but
@@ -94,6 +96,12 @@
#include "utils/rel.h"
#include "utils/spccache.h"
+typedef struct
+{
+ int16 buffer_index;
+ ReadBuffersOperation op;
+} InProgressIO;
+
/*
* Streaming read object.
*/
@@ -135,10 +143,10 @@ struct StreamingRead
Buffer *buffers;
size_t per_buffer_data_size;
void *per_buffer_data;
- int16 *buffer_io_indexes;
/* Read operations that have been started but not waited for yet. */
- ReadBuffersOperation *ios;
+ InProgressIO *ios;
+ int16 oldest_io_index;
int16 next_io_index;
/* Head and tail of the circular queue of buffers. */
@@ -211,7 +219,6 @@ streaming_read_start_pending_read(StreamingRead *stream)
{
bool need_wait;
int nblocks;
- int16 io_index;
int16 overflow;
int flags;
@@ -254,14 +261,12 @@ streaming_read_start_pending_read(StreamingRead *stream)
&nblocks,
stream->strategy,
flags,
- &stream->ios[stream->next_io_index]);
+ &stream->ios[stream->next_io_index].op);
stream->pinned_buffers += nblocks;
/* Remember whether we need to wait before returning this buffer. */
if (!need_wait)
{
- io_index = -1;
-
/* Look-ahead distance decays, no I/O necessary (behavior A). */
if (stream->distance > 1)
stream->distance--;
@@ -272,7 +277,7 @@ streaming_read_start_pending_read(StreamingRead *stream)
* Remember to call WaitReadBuffers() before returning head buffer.
* Look-ahead distance will be adjusted after waiting.
*/
- io_index = stream->next_io_index;
+ stream->ios[stream->next_io_index].buffer_index = stream->next_buffer_index;
if (++stream->next_io_index == stream->max_ios)
stream->next_io_index = 0;
@@ -280,9 +285,6 @@ streaming_read_start_pending_read(StreamingRead *stream)
stream->ios_in_progress++;
}
- /* Set up the pointer to the I/O for the head buffer, if there is one. */
- stream->buffer_io_indexes[stream->next_buffer_index] = io_index;
-
/*
* We gave a contiguous range of buffer space to StartReadBuffers(), but
* we want it to wrap around at max_pinned_buffers. Move values that
@@ -296,15 +298,6 @@ streaming_read_start_pending_read(StreamingRead *stream)
memmove(&stream->buffers[0],
&stream->buffers[stream->max_pinned_buffers],
sizeof(stream->buffers[0]) * overflow);
- for (int i = 0; i < overflow; ++i)
- stream->buffer_io_indexes[i] = -1;
- for (int i = 1; i < nblocks - overflow; ++i)
- stream->buffer_io_indexes[stream->next_buffer_index + i] = -1;
- }
- else
- {
- for (int i = 1; i < nblocks; ++i)
- stream->buffer_io_indexes[stream->next_buffer_index + i] = -1;
}
/*
@@ -528,8 +521,7 @@ streaming_read_buffer_begin(int flags,
buffer_io_size - 1));
/* Space for the IOs that we might run. */
- stream->buffer_io_indexes = palloc(max_pinned_buffers * sizeof(stream->buffer_io_indexes[0]));
- stream->ios = palloc(max_ios * sizeof(ReadBuffersOperation));
+ stream->ios = palloc(max_ios * sizeof(InProgressIO));
return stream;
}
@@ -547,7 +539,6 @@ Buffer
streaming_read_buffer_next(StreamingRead *stream, void **per_buffer_data)
{
Buffer buffer;
- int16 io_index;
int16 oldest_buffer_index;
if (unlikely(stream->pinned_buffers == 0))
@@ -580,21 +571,23 @@ streaming_read_buffer_next(StreamingRead *stream, void **per_buffer_data)
Assert(BufferIsValid(buffer));
/* Do we have to wait for an associated I/O first? */
- io_index = stream->buffer_io_indexes[oldest_buffer_index];
- Assert(io_index >= -1 && io_index < stream->max_ios);
- if (io_index >= 0)
+ if (stream->ios_in_progress > 0 &&
+ stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
{
+ int16 io_index = stream->oldest_io_index;
int distance;
/* Sanity check that we still agree on the buffers. */
- Assert(stream->ios[io_index].buffers == &stream->buffers[oldest_buffer_index]);
+ Assert(stream->ios[io_index].op.buffers == &stream->buffers[oldest_buffer_index]);
- WaitReadBuffers(&stream->ios[io_index]);
+ WaitReadBuffers(&stream->ios[io_index].op);
Assert(stream->ios_in_progress > 0);
stream->ios_in_progress--;
+ if (++stream->oldest_io_index == stream->max_ios)
+ stream->oldest_io_index = 0;
- if (stream->ios[io_index].flags & READ_BUFFERS_ISSUE_ADVICE)
+ if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
{
/* Distance ramps up fast (behavior C). */
distance = stream->distance * 2;
@@ -621,7 +614,6 @@ streaming_read_buffer_next(StreamingRead *stream, void **per_buffer_data)
/* Advance the oldest buffer, but clobber it first for debugging. */
#ifdef CLOBBER_FREED_MEMORY
stream->buffers[oldest_buffer_index] = InvalidBuffer;
- stream->buffer_io_indexes[oldest_buffer_index] = -1;
if (stream->per_buffer_data)
wipe_mem(get_per_buffer_data(stream, oldest_buffer_index),
stream->per_buffer_data_size);
@@ -644,7 +636,10 @@ streaming_read_buffer_next(StreamingRead *stream, void **per_buffer_data)
Assert(stream->oldest_buffer_index == stream->next_buffer_index);
stream->oldest_buffer_index = 0;
stream->next_buffer_index = 0;
+ Assert(stream->next_io_index == stream->oldest_io_index);
+ Assert(stream->ios_in_progress == 0);
stream->next_io_index = 0;
+ stream->oldest_io_index = 0;
}
/* Prepare for the next call. */
--
2.39.2