On Wed, Mar 12, 2025 at 8:29 AM Andres Freund <and...@anarazel.de> wrote: > On 2025-03-12 07:35:46 +1300, Thomas Munro wrote: > > On Thu, Feb 27, 2025 at 11:20 PM Andres Freund <and...@anarazel.de> wrote: > > > On 2025-02-27 11:19:55 +1300, Thomas Munro wrote: > > I wonder if we should use temp_buffers - 100? Then leave the minimum GUC > > value at 100 still, so you have an easy way to test with 0, 1, > > ... additional buffers? > > I think that just makes it harder to test the exhaustion scenario without > really fixing anything?
Not sure I agree yet but I'll come back to this in a bit (I think there might be something worth thinking about some more here but it's not in the way of committing these patches). > > +/* see GetAdditionalPinLimit() */ > > +uint32 > > +GetAdditionalLocalPinLimit(void) > > +{ > > + Assert(NLocalPinnedBuffers <= num_temp_buffers); > > + return num_temp_buffers - NLocalPinnedBuffers; > > +} > > This doesn't behave quite the way GetAdditionalPinLimit() does - it can return > 0. Which makes some sense, pinning an additional buffer will always > fail. Perhaps worth calling out though? No, GetAdditionalPinLimit() works that way too. It's only LimitAdditional[Local]PinLimit() that has the special "you can always have one" logic that I needed to escape from. But yes I should highlight that in a comment: done above GetAdditionalPinLimit(). GetAdditionalLocalPinLimit() just tells you to see the shared version. > > static void > > read_stream_look_ahead(ReadStream *stream, bool suppress_advice) > > { > > while (stream->ios_in_progress < stream->max_ios && > > - stream->pinned_buffers + stream->pending_read_nblocks < > > stream->distance) > > + ((stream->pinned_buffers == 0 && stream->distance > 0) || > > + stream->pinned_buffers + stream->pending_read_nblocks > > < stream->distance)) > > What does the new "stream->pinned_buffers == 0 && stream->distance > 0" really > mean? And when would it be true when the pre-existing condition wouldn't > already be true? Well the reason is basically that the distance can get chomped lower than pending_read_nblocks if you recently hit the pin limit, and we have to be able to start your I/O anyway (at least one block of it) to make progress. But I realised that was a stupid way to handle that, and actually I had screwed up further down, and the right way is just: @@ -382,15 +435,25 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) * io_combine_limit size once more buffers have been consumed. However, * if we've already reached io_combine_limit, or we've reached the * distance limit and there isn't anything pinned yet, or the callback has - * signaled end-of-stream, we start the read immediately. + * signaled end-of-stream, we start the read immediately. Note that the + * pending read could even exceed the distance goal, if the latter was + * reduced on buffer limit exhaustion. */ if (stream->pending_read_nblocks > 0 && (stream->pending_read_nblocks == stream->io_combine_limit || - (stream->pending_read_nblocks == stream->distance && + (stream->pending_read_nblocks >= stream->distance && stream->pinned_buffers == 0) || stream->distance == 0) && stream->ios_in_progress < stream->max_ios) read_stream_start_pending_read(stream, suppress_advice); > > { > > BlockNumber blocknum; > > int16 buffer_index; > > void *per_buffer_data; > > > > - if (stream->pending_read_nblocks == io_combine_limit) > > + /* If have a pending read that can't be extended, start it > > now. */ > > + Assert(stream->pinned_buffers + stream->pending_read_nblocks > > <= > > + stream->max_pinned_buffers); > > + if (stream->pending_read_nblocks == io_combine_limit || > > + (stream->pinned_buffers == 0 && > > + stream->pending_read_nblocks == > > stream->max_pinned_buffers)) > > { > > read_stream_start_pending_read(stream, > > suppress_advice); > > suppress_advice = false; > > @@ -360,14 +409,15 @@ read_stream_look_ahead(ReadStream *stream, bool > > suppress_advice) > > /* We have to start the pending read before we can build > > another. */ > > while (stream->pending_read_nblocks > 0) > > { > > - read_stream_start_pending_read(stream, > > suppress_advice); > > - suppress_advice = false; > > - if (stream->ios_in_progress == stream->max_ios) > > + if (!read_stream_start_pending_read(stream, > > suppress_advice) || > > + stream->ios_in_progress == stream->max_ios) > > { > > - /* And we've hit the limit. Rewind, and stop > > here. */ > > + /* And we've hit a buffer or I/O limit. > > Rewind and wait. */ > > read_stream_unget_block(stream, blocknum); > > return; > > } > > + > > + suppress_advice = false; > > } > > If read_stream_start_pending_read() returns false because we hit the pin > limit, does it really help to call read_stream_unget_block()? IIUC that'll > defer one block for later - but what about the other buffers in a multi-block > read? They are already represented by (pending_read_blocknum, pending_read_nblocks). We were unable to append this block number to the pending read because it's already full-sized or the newly acquired block number isn't consecutive, but we were also unable to start the pending read to get it out of the way. That was a pre-existing edge case that you could already hit if it turned out to be a short read: ie the remaining part of the pending read is still in your way, and now you've reached stream->max_ios so you can't start it. So we had to put it aside for later. This change piggy-backs on that approach for buffer starvation: read_stream_start_buffers() can now decline to start even a partial read. In fact it usually declines, unless it is forced to accept a short read because stream->pinned_buffers == 0 (ie, we have to do whatever we can to make progress). It's OK that pending_read_nblocks exceeds what we can start right now, we still remember it, and we can always start it one block at a time if it comes to it. Make sense? > > @@ -260,16 +261,30 @@ read_stream_start_pending_read(ReadStream *stream, > > bool suppress_advice) > > else > > Assert(stream->next_buffer_index == > > stream->oldest_buffer_index); > > > > - /* > > - * If advice hasn't been suppressed, this system supports it, and this > > - * isn't a strictly sequential pattern, then we'll issue advice. > > - */ > > - if (!suppress_advice && > > - stream->advice_enabled && > > - stream->pending_read_blocknum != stream->seq_blocknum) > > + /* Do we need to issue read-ahead advice? */ > > + flags = 0; > > + if (stream->advice_enabled) > > + { > > flags = READ_BUFFERS_ISSUE_ADVICE; > > - else > > - flags = 0; > > + > > + if (stream->pending_read_blocknum == stream->seq_blocknum) > > + { > > + /* > > + * Suppress advice if our WaitReadBuffers() calls > > have caught up > > + * with the first advice we issued for this > > sequential run. > > + */ > > + if (stream->seq_start == InvalidBlockNumber) > > + suppress_advice = true; > > + } > > + else > > + { > > + /* Random jump, so start a new sequential run. */ > > + stream->seq_start = stream->pending_read_blocknum; > > + } > > + > > + if (suppress_advice) > > + flags = 0; > > + } > > Seems a bit confusing to first set > flags = READ_BUFFERS_ISSUE_ADVICE > to then later unset it again. Maybe just set it in if (!suppress_advice)? Yeah that was a bit too tangly. I found a better expression of that logic, which also removed that annoying suppress_advice function argument. I hope this is much clearer. > > * Skip the initial ramp-up phase if the caller says we're going to be > > @@ -825,6 +842,15 @@ read_stream_next_buffer(ReadStream *stream, void > > **per_buffer_data) > > distance = stream->distance * 2; > > distance = Min(distance, stream->max_pinned_buffers); > > stream->distance = distance; > > + > > + /* > > + * If we've caught up with the first advice issued > > for the current > > + * sequential run, cancel further advice until the > > next random > > + * jump. The kernel should be able to see the > > pattern now that > > + * we're issuing sequential preadv() calls. > > + */ > > + if (stream->ios[io_index].op.blocknum == > > stream->seq_start) > > + stream->seq_start = InvalidBlockNumber; > > So stream->seq_start doesn't really denote the start of sequentialness, it > denotes up to where the caller needs to process before we disable sequential > access. Maybe add a comment to it and rename it to something like > ->seq_until_processed? WFM. > Other than this the approach seems to make sense! Cool, so I'm planning to start pushing the earlier ones tomorrow. Here also are the buffer forwarding ones, rebased on top. Here's some strace art showing the old and new advice for patch 0003. I traced ANALYZE io_combine_limit=8 and used different default_statistics_targets to find interesting test cases. The "bracket" on the right shows a sequential range of blocks. Master only calls fadvise once per sequential chunk: fadvise ●──────────────────────╮││ 3 0.000006 ││╰─► pread 1 676..676 2 0.000007 fadvise ●─────────────────────╮││ 3 0.000006 ││╰──► pread 1 678..678 2 0.000007 fadvise ●────────────────────╮││ 3 0.000007 ││╰───► pread 3 680..682 2 0.000031 │╰────► pread 6 684..689 1 0.000015 ╰─────► pread 8 691..698 ─╮ 0 0.000018 pread 8 699..706 │ 0 0.000016 fadvise ●────────────────────────╮ │ 1 0.000007 │ pread 8 707..714 │ 1 0.000019 │ pread 7 715..721 ─╯ 1 0.000017 ╰─► pread 8 723..730 ─╮ 0 0.000016 pread 8 731..738 │ 0 0.000019 fadvise ●────────────────────────╮ │ 1 0.000007 │ pread 8 739..746 │ 1 0.000018 │ pread 5 747..751 ─╯ 1 0.000013 The patch can call three times when that's the configured I/O concurrency level, because that controls when the pread() calls catch up with the first block: fadvise ●────────────────────╮││ 3 0.000007 ││╰───► pread 2 255..256 2 0.000014 fadvise ●───────────────────╮││ 3 0.000007 ││╰────► pread 8 258..265 ─╮ 2 0.000035 │╰─────► preadv 8 266..273 │ 1 0.000021 ╰──────► pread 8 274..281 │ 0 0.000017 fadvise ●────────────────────────╮ │ 1 0.000007 │ pread 8 282..289 │ 1 0.000017 fadvise ●───────────────────────╮│ │ 2 0.000007 ││ pread 6 290..295 ─╯ 2 0.000015 fadvise ●──────────────────────╮││ 3 0.000007 ││╰─► pread 8 297..304 ─╮ 2 0.000015 fadvise ●─────────────────────╮││ │ 3 0.000007 ││╰──► pread 8 305..312 │ 2 0.000017 Purely sequential streams still get none: pread 1 0..0 ─╮ 0 0.000016 pread 2 1..2 │ 0 0.000014 pread 4 3..6 │ 0 0.000021 pread 8 7..14 │ 0 0.000034 ... blah blah blah ... pread 8 4455..4462 │ 0 0.000029 pread 8 4463..4470 │ 0 0.000026 pread 8 4471..4478 │ 0 0.000020 pread 1 4479..4479 ─╯ 0 0.000010
From a493ef860337641678ab1c39c48758d038481196 Mon Sep 17 00:00:00 2001 From: Thomas Munro <tmu...@postgresql.org> Date: Thu, 27 Feb 2025 21:03:39 +1300 Subject: [PATCH v3 1/6] Improve buffer manager API for backend pin limits. Previously the support functions assumed that the caller needed one pin to make progress, and could optionally use some more. Add a couple more functions for callers that want to know: * what the maximum possible number could be irrespective of currently held pins, for space planning purposes, called the "soft pin limit" * how many additional pins they could acquire right now, without the special case allowing one pin, for users that already hold pins and could make progress even if zero extra pins are available These APIs are better suited to read_stream.c, which will be improved in a follow-up patch. Also compute MaxProportionalPins up front, to avoid performing division whenever we check the balance. Reviewed-by: Andres Freund <and...@anarazel.de> Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com --- src/backend/storage/buffer/bufmgr.c | 85 +++++++++++++++++++-------- src/backend/storage/buffer/localbuf.c | 16 +++++ src/include/storage/bufmgr.h | 4 ++ 3 files changed, 80 insertions(+), 25 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 7915ed624c1..a6138e79306 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -211,6 +211,8 @@ static int32 PrivateRefCountOverflowed = 0; static uint32 PrivateRefCountClock = 0; static PrivateRefCountEntry *ReservedRefCountEntry = NULL; +static uint32 MaxProportionalPins; + static void ReservePrivateRefCountEntry(void); static PrivateRefCountEntry *NewPrivateRefCountEntry(Buffer buffer); static PrivateRefCountEntry *GetPrivateRefCountEntry(Buffer buffer, bool do_move); @@ -2097,43 +2099,67 @@ again: return buf; } +/* + * Return the maximum number of buffer than this backend should try to pin at + * once, to avoid pinning more than its fair share. This is the highest value + * that GetAdditionalPinLimit() and LimitAdditionalPins() could ever return. + * + * It's called a soft limit because nothing stops a backend from trying to + * acquire more pins than this if it needs them to make progress, but code that + * wants optional extra buffers for optimizations should respect this + * per-backend limit. + */ +uint32 +GetSoftPinLimit(void) +{ + return MaxProportionalPins; +} + +/* + * Return the maximum number of additional buffers that this backend should + * pin if it wants to stay under the per-backend soft limit, considering the + * number of buffers it has already pinned. Unlike LimitAdditionalPins(), the + * result can be zero, so the caller is expected to adjust it if required to + * make progress. + */ +uint32 +GetAdditionalPinLimit(void) +{ + uint32 estimated_pins_held; + + /* + * We get the number of "overflowed" pins for free, but don't know the + * number of pins in PrivateRefCountArray. The cost of calculating that + * exactly doesn't seem worth it, so just assume the max. + */ + estimated_pins_held = PrivateRefCountOverflowed + REFCOUNT_ARRAY_ENTRIES; + + /* Is this backend already holding more than its fair share? */ + if (estimated_pins_held > MaxProportionalPins) + return 0; + + return MaxProportionalPins - estimated_pins_held; +} + /* * Limit the number of pins a batch operation may additionally acquire, to * avoid running out of pinnable buffers. * - * One additional pin is always allowed, as otherwise the operation likely - * cannot be performed at all. - * - * The number of allowed pins for a backend is computed based on - * shared_buffers and the maximum number of connections possible. That's very - * pessimistic, but outside of toy-sized shared_buffers it should allow - * sufficient pins. + * One additional pin is always allowed, on the assumption that the operation + * requires at least one to make progress. */ void LimitAdditionalPins(uint32 *additional_pins) { - uint32 max_backends; - int max_proportional_pins; + uint32 limit; if (*additional_pins <= 1) return; - max_backends = MaxBackends + NUM_AUXILIARY_PROCS; - max_proportional_pins = NBuffers / max_backends; - - /* - * Subtract the approximate number of buffers already pinned by this - * backend. We get the number of "overflowed" pins for free, but don't - * know the number of pins in PrivateRefCountArray. The cost of - * calculating that exactly doesn't seem worth it, so just assume the max. - */ - max_proportional_pins -= PrivateRefCountOverflowed + REFCOUNT_ARRAY_ENTRIES; - - if (max_proportional_pins <= 0) - max_proportional_pins = 1; - - if (*additional_pins > max_proportional_pins) - *additional_pins = max_proportional_pins; + limit = GetAdditionalPinLimit(); + limit = Max(limit, 1); + if (limit < *additional_pins) + *additional_pins = limit; } /* @@ -3575,6 +3601,15 @@ InitBufferManagerAccess(void) { HASHCTL hash_ctl; + /* + * The soft limit on the number of pins each backend should respect, based + * on shared_buffers and the maximum number of connections possible. + * That's very pessimistic, but outside toy-sized shared_buffers it should + * allow plenty of pins. LimitAdditionalPins() or GetAdditionalPinLimit() + * can be used to check the remaining balance. + */ + MaxProportionalPins = NBuffers / (MaxBackends + NUM_AUXILIARY_PROCS); + memset(&PrivateRefCountArray, 0, sizeof(PrivateRefCountArray)); hash_ctl.keysize = sizeof(int32); diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index 80b83444eb2..5378ba84316 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -286,6 +286,22 @@ GetLocalVictimBuffer(void) return BufferDescriptorGetBuffer(bufHdr); } +/* see GetSoftPinLimit() */ +uint32 +GetSoftLocalPinLimit(void) +{ + /* Every backend has its own temporary buffers, and can pin them all. */ + return num_temp_buffers; +} + +/* see GetAdditionalPinLimit() */ +uint32 +GetAdditionalLocalPinLimit(void) +{ + Assert(NLocalPinnedBuffers <= num_temp_buffers); + return num_temp_buffers - NLocalPinnedBuffers; +} + /* see LimitAdditionalPins() */ void LimitAdditionalLocalPins(uint32 *additional_pins) diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index b204e4731c1..74b5afe8a1a 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -290,6 +290,10 @@ extern bool HoldingBufferPinThatDelaysRecovery(void); extern bool BgBufferSync(struct WritebackContext *wb_context); +extern uint32 GetSoftPinLimit(void); +extern uint32 GetSoftLocalPinLimit(void); +extern uint32 GetAdditionalPinLimit(void); +extern uint32 GetAdditionalLocalPinLimit(void); extern void LimitAdditionalPins(uint32 *additional_pins); extern void LimitAdditionalLocalPins(uint32 *additional_pins); -- 2.39.5
From b168a9a7369302bcd92288d48cde8e0b6792760d Mon Sep 17 00:00:00 2001 From: Thomas Munro <tmu...@postgresql.org> Date: Thu, 27 Feb 2025 21:42:05 +1300 Subject: [PATCH v3 2/6] Respect pin limits accurately in read_stream.c. To avoid pinning too much of the buffer pool at once, we previously used LimitAdditionalBuffers(). The coding was naive, and only considered the available buffers at stream construction time. This commit checks at the time of use with new buffer manager APIs. The result might change dynamically due to pins acquired outside this stream by the same backend. No extra CPU cycles are added to the all-buffered fast-path code, but the I/O-starting path now considers the up-to-date remaining buffer limit when making look-ahead decisions. In practice it was very difficult to exceed the limit in v17, so no back-patch, but changes due to land soon make it easy. Per code review from Andres, in the course of testing his AIO patches. Reviewed-by: Andres Freund <and...@anarazel.de> Reported-by: Andres Freund <and...@anarazel.de> Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com --- src/backend/storage/aio/read_stream.c | 104 ++++++++++++++++++++++---- 1 file changed, 90 insertions(+), 14 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 36fb9fe152c..11ee16ec228 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -116,6 +116,7 @@ struct ReadStream int16 pinned_buffers; int16 distance; bool advice_enabled; + bool temporary; /* * One-block buffer to support 'ungetting' a block number, to resolve flow @@ -225,7 +226,17 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum) stream->buffered_blocknum = blocknum; } -static void +/* + * Start as much of the current pending read as we can. If we have to split it + * because of the per-backend buffer limit, or the buffer manager decides to + * split it, then the pending read is adjusted to hold the remaining portion. + * + * We can always start a read of at least size one if we have no progress yet. + * Otherwise it's possible that we can't start a read at all because of a lack + * of buffers, and then false is returned. Buffer shortages also reduce the + * distance to a level that prevents look-ahead until buffers are released. + */ +static bool read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) { bool need_wait; @@ -234,12 +245,13 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) int16 io_index; int16 overflow; int16 buffer_index; + int16 buffer_limit; /* This should only be called with a pending read. */ Assert(stream->pending_read_nblocks > 0); Assert(stream->pending_read_nblocks <= stream->io_combine_limit); - /* We had better not exceed the pin limit by starting this read. */ + /* We had better not exceed the per-stream buffer limit with this read. */ Assert(stream->pinned_buffers + stream->pending_read_nblocks <= stream->max_pinned_buffers); @@ -260,10 +272,39 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) else flags = 0; - /* We say how many blocks we want to read, but may be smaller on return. */ + /* Compute the remaining portion of the per-backend buffer limit. */ + if (stream->temporary) + buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX); + else + buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX); + if (buffer_limit == 0 && stream->pinned_buffers == 0) + buffer_limit = 1; /* guarantee progress */ + + /* Does the per-backend buffer limit affect this read? */ + nblocks = stream->pending_read_nblocks; + if (buffer_limit < nblocks) + { + int16 new_distance; + + /* Shrink distance: no more look-ahead until buffers are released. */ + new_distance = stream->pinned_buffers + buffer_limit; + if (stream->distance > new_distance) + stream->distance = new_distance; + + /* If we've already made progress, just give up and wait for buffers. */ + if (stream->pinned_buffers > 0) + return false; + + /* A short read is required to make progress. */ + nblocks = buffer_limit; + } + + /* + * We say how many blocks we want to read, but it may be smaller on return + * if the buffer manager decides it needs a short read at its level. + */ buffer_index = stream->next_buffer_index; io_index = stream->next_io_index; - nblocks = stream->pending_read_nblocks; need_wait = StartReadBuffers(&stream->ios[io_index].op, &stream->buffers[buffer_index], stream->pending_read_blocknum, @@ -313,6 +354,8 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) /* Adjust the pending read to cover the remaining portion, if any. */ stream->pending_read_blocknum += nblocks; stream->pending_read_nblocks -= nblocks; + + return true; } static void @@ -361,14 +404,15 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) /* We have to start the pending read before we can build another. */ while (stream->pending_read_nblocks > 0) { - read_stream_start_pending_read(stream, suppress_advice); - suppress_advice = false; - if (stream->ios_in_progress == stream->max_ios) + if (!read_stream_start_pending_read(stream, suppress_advice) || + stream->ios_in_progress == stream->max_ios) { - /* And we've hit the limit. Rewind, and stop here. */ + /* And we've hit a buffer or I/O limit. Rewind and wait. */ read_stream_unget_block(stream, blocknum); return; } + + suppress_advice = false; } /* This is the start of a new pending read. */ @@ -382,15 +426,25 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) * io_combine_limit size once more buffers have been consumed. However, * if we've already reached io_combine_limit, or we've reached the * distance limit and there isn't anything pinned yet, or the callback has - * signaled end-of-stream, we start the read immediately. + * signaled end-of-stream, we start the read immediately. Note that the + * pending read could even exceed the distance goal, if the latter was + * reduced on buffer limit exhaustion. */ if (stream->pending_read_nblocks > 0 && (stream->pending_read_nblocks == stream->io_combine_limit || - (stream->pending_read_nblocks == stream->distance && + (stream->pending_read_nblocks >= stream->distance && stream->pinned_buffers == 0) || stream->distance == 0) && stream->ios_in_progress < stream->max_ios) read_stream_start_pending_read(stream, suppress_advice); + + /* + * There should always be something pinned when we leave this function, + * whether started by this call or not, unless we've hit the end of the + * stream. In the worst case we can always make progress one buffer at a + * time. + */ + Assert(stream->pinned_buffers > 0 || stream->distance == 0); } /* @@ -420,6 +474,7 @@ read_stream_begin_impl(int flags, int max_ios; int strategy_pin_limit; uint32 max_pinned_buffers; + uint32 max_possible_buffer_limit; Oid tablespace_id; /* @@ -475,12 +530,23 @@ read_stream_begin_impl(int flags, strategy_pin_limit = GetAccessStrategyPinLimit(strategy); max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers); - /* Don't allow this backend to pin more than its share of buffers. */ + /* + * Also limit our queue to the maximum number of pins we could possibly + * ever be allowed to acquire according to the buffer manager. We may not + * really be able to use them all due to other pins held by this backend, + * but we'll check that later in read_stream_start_pending_read(). + */ if (SmgrIsTemp(smgr)) - LimitAdditionalLocalPins(&max_pinned_buffers); + max_possible_buffer_limit = GetSoftLocalPinLimit(); else - LimitAdditionalPins(&max_pinned_buffers); - Assert(max_pinned_buffers > 0); + max_possible_buffer_limit = GetSoftPinLimit(); + max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit); + + /* + * The soft limit might be zero on a system configured with more + * connections than buffers. We need at least one to make progress. + */ + max_pinned_buffers = Max(1, max_pinned_buffers); /* * We need one extra entry for buffers and per-buffer data, because users @@ -546,6 +612,7 @@ read_stream_begin_impl(int flags, stream->callback = callback; stream->callback_private_data = callback_private_data; stream->buffered_blocknum = InvalidBlockNumber; + stream->temporary = SmgrIsTemp(smgr); /* * Skip the initial ramp-up phase if the caller says we're going to be @@ -674,6 +741,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) * arbitrary I/O entry (they're all free). We don't have to * adjust pinned_buffers because we're transferring one to caller * but pinning one more. + * + * In the fast path we don't need to check the pin limit. We're + * always allowed at least one pin so that progress can be made, + * and that's all we need here. Although two pins are momentarily + * held at the same time, the model used here is that the stream + * holds only one, and the other now belongs to the caller. */ if (likely(!StartReadBuffer(&stream->ios[0].op, &stream->buffers[oldest_buffer_index], @@ -874,6 +947,9 @@ read_stream_reset(ReadStream *stream) stream->buffered_blocknum = InvalidBlockNumber; stream->fast_path = false; + /* There is no point in reading whatever was pending. */ + stream->pending_read_nblocks = 0; + /* Unpin anything that wasn't consumed. */ while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer) ReleaseBuffer(buffer); -- 2.39.5
From 2dd3d12fa75248bf8473b4b69884dd056bdc9163 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Tue, 18 Feb 2025 15:59:13 +1300 Subject: [PATCH v3 3/6] Improve read stream advice for large random chunks. read_stream.c tries not to issue advice when it thinks the kernel's readahead should be active, ie when using buffered I/O and reading sequential blocks. It previously gave up a little too easily: it should issue advice until it has started running sequential pread() calls, not just when it's planning to. The simpler strategy worked for random chunks of size <= io_combine_limit and entirely sequential streams, but so not well when reading random chunks > io_combine limit. For example, a 256kB chunk of sequential data would benefit from only one fadvise(), but (assuming io_combine_limit=128kB) could suffer an I/O stall for the second half of it. Keep issuing advice until the pread() calls catch up with the start of the region we're currently issuing advice for, if ever. In practice, if there are any jumps in the lookahead window, we'll never stop issuing advice, and if the whole lookahead window becomes sequential we'll finally stop issuing advice. Discovered by Tomas Vondra's regression testing of many data clustering patterns using Melanie Plageman's streaming Bitmap Heap Scan patch, with analysis of the I/O stall-producing pattern from Andres Freund. Reviewed-by: Andres Freund <and...@anarazel.de> Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com Discussion: https://postgr.es/m/CA%2BhUKGJ3HSWciQCz8ekP1Zn7N213RfA4nbuotQawfpq23%2Bw-5Q%40mail.gmail.com --- src/backend/storage/aio/read_stream.c | 71 +++++++++++++++++++-------- 1 file changed, 50 insertions(+), 21 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 11ee16ec228..a8a96baf8c1 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -133,6 +133,7 @@ struct ReadStream /* Next expected block, for detecting sequential access. */ BlockNumber seq_blocknum; + BlockNumber seq_until_processed; /* The read operation we are currently preparing. */ BlockNumber pending_read_blocknum; @@ -237,11 +238,11 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum) * distance to a level that prevents look-ahead until buffers are released. */ static bool -read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) +read_stream_start_pending_read(ReadStream *stream) { bool need_wait; int nblocks; - int flags; + int flags = 0; int16 io_index; int16 overflow; int16 buffer_index; @@ -261,16 +262,36 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) else Assert(stream->next_buffer_index == stream->oldest_buffer_index); - /* - * If advice hasn't been suppressed, this system supports it, and this - * isn't a strictly sequential pattern, then we'll issue advice. - */ - if (!suppress_advice && - stream->advice_enabled && - stream->pending_read_blocknum != stream->seq_blocknum) - flags = READ_BUFFERS_ISSUE_ADVICE; - else - flags = 0; + /* Do we need to issue read-ahead advice? */ + if (stream->advice_enabled) + { + bool no_wait; + + /* + * We only issue advice if we won't immediately have to call + * WaitReadBuffers(). + */ + no_wait = stream->pinned_buffers > 0 || + stream->pending_read_nblocks < stream->distance; + + if (stream->pending_read_blocknum == stream->seq_blocknum) + { + /* + * Sequential: issue advice only until the WaitReadBuffers() calls + * catch up with the first advice issued for this sequential + * region, so the kernel can see sequential access. + */ + if (stream->seq_until_processed != InvalidBlockNumber && no_wait) + flags = READ_BUFFERS_ISSUE_ADVICE; + } + else + { + /* Random jump: start tracking new region. */ + stream->seq_until_processed = stream->pending_read_blocknum; + if (no_wait) + flags = READ_BUFFERS_ISSUE_ADVICE; + } + } /* Compute the remaining portion of the per-backend buffer limit. */ if (stream->temporary) @@ -359,7 +380,7 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) } static void -read_stream_look_ahead(ReadStream *stream, bool suppress_advice) +read_stream_look_ahead(ReadStream *stream) { while (stream->ios_in_progress < stream->max_ios && stream->pinned_buffers + stream->pending_read_nblocks < stream->distance) @@ -370,8 +391,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) if (stream->pending_read_nblocks == stream->io_combine_limit) { - read_stream_start_pending_read(stream, suppress_advice); - suppress_advice = false; + read_stream_start_pending_read(stream); continue; } @@ -404,15 +424,13 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) /* We have to start the pending read before we can build another. */ while (stream->pending_read_nblocks > 0) { - if (!read_stream_start_pending_read(stream, suppress_advice) || + if (!read_stream_start_pending_read(stream) || stream->ios_in_progress == stream->max_ios) { /* And we've hit a buffer or I/O limit. Rewind and wait. */ read_stream_unget_block(stream, blocknum); return; } - - suppress_advice = false; } /* This is the start of a new pending read. */ @@ -436,7 +454,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) stream->pinned_buffers == 0) || stream->distance == 0) && stream->ios_in_progress < stream->max_ios) - read_stream_start_pending_read(stream, suppress_advice); + read_stream_start_pending_read(stream); /* * There should always be something pinned when we leave this function, @@ -612,6 +630,8 @@ read_stream_begin_impl(int flags, stream->callback = callback; stream->callback_private_data = callback_private_data; stream->buffered_blocknum = InvalidBlockNumber; + stream->seq_blocknum = InvalidBlockNumber; + stream->seq_until_processed = InvalidBlockNumber; stream->temporary = SmgrIsTemp(smgr); /* @@ -792,7 +812,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) * space for more, but if we're just starting up we'll need to crank * the handle to get started. */ - read_stream_look_ahead(stream, true); + read_stream_look_ahead(stream); /* End of stream reached? */ if (stream->pinned_buffers == 0) @@ -837,6 +857,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) distance = stream->distance * 2; distance = Min(distance, stream->max_pinned_buffers); stream->distance = distance; + + /* + * If we've caught up with the first advice issued for the current + * sequential region, cancel further advice until the next random + * jump. The kernel should be able to see the pattern now that + * we're actually making sequential preadv() calls. + */ + if (stream->ios[io_index].op.blocknum == stream->seq_until_processed) + stream->seq_until_processed = InvalidBlockNumber; } else { @@ -898,7 +927,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->oldest_buffer_index = 0; /* Prepare for the next call. */ - read_stream_look_ahead(stream, false); + read_stream_look_ahead(stream); #ifndef READ_STREAM_DISABLE_FAST_PATH /* See if we can take the fast path for all-cached scans next time. */ -- 2.39.5
From 5457ef3b17fd28be63c1ba31fcfc1d845a3010ca Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Wed, 19 Feb 2025 01:25:40 +1300 Subject: [PATCH v3 4/6] Look ahead more when sequential in read_stream.c. Previously, sequential reads would cause the look-ahead distance to fall back to io_combine_limit, on the basis that kernel read-ahead should start helping. It also meant that we'd have to ramp the distance back up when a sequential region was followed by a burst of random jumps, with little hope of avoiding a stall, which is not a good trade-off and is incompatible with AIO plans (you have to look ahead if you have to start real I/O). Simplify the algorithm: now only cache hits make the look-ahead distance drop off, and cache misses still make it grow rapidly. Random vs sequential heuristics are no longer taken into consideration while making that decision. Reviewed-by: Andres Freund <and...@anarazel.de> Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com --- src/backend/storage/aio/read_stream.c | 92 ++++++++++----------------- 1 file changed, 33 insertions(+), 59 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index a8a96baf8c1..57cde89cfdc 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -17,30 +17,12 @@ * pending read. When that isn't possible, the existing pending read is sent * to StartReadBuffers() so that a new one can begin to form. * - * The algorithm for controlling the look-ahead distance tries to classify the - * stream into three ideal behaviors: + * The algorithm for controlling the look-ahead distance is based on recent + * cache hits and misses: * - * A) No I/O is necessary, because the requested blocks are fully cached - * already. There is no benefit to looking ahead more than one block, so - * distance is 1. This is the default initial assumption. - * - * B) I/O is necessary, but read-ahead advice is undesirable because the - * access is sequential and we can rely on the kernel's read-ahead heuristics, - * or impossible because direct I/O is enabled, or the system doesn't support - * read-ahead advice. There is no benefit in looking ahead more than - * io_combine_limit, because in this case the only goal is larger read system - * calls. Looking further ahead would pin many buffers and perform - * speculative work for no benefit. - * - * C) I/O is necessary, it appears to be random, and this system supports - * read-ahead advice. We'll look further ahead in order to reach the - * configured level of I/O concurrency. - * - * The distance increases rapidly and decays slowly, so that it moves towards - * those levels as different I/O patterns are discovered. For example, a - * sequential scan of fully cached data doesn't bother looking ahead, but a - * sequential scan that hits a region of uncached blocks will start issuing - * increasingly wide read calls until it plateaus at io_combine_limit. + * When no I/O is necessary, there is no point in looking ahead more than one + * block. This is the default initial assumption. Otherwise rapidly increase + * the distance to try to benefit from I/O combining and I/O concurrency. * * The main data structure is a circular queue of buffers of size * max_pinned_buffers plus some extra space for technical reasons, ready to be @@ -336,7 +318,7 @@ read_stream_start_pending_read(ReadStream *stream) /* Remember whether we need to wait before returning this buffer. */ if (!need_wait) { - /* Look-ahead distance decays, no I/O necessary (behavior A). */ + /* Look-ahead distance decays, no I/O necessary. */ if (stream->distance > 1) stream->distance--; } @@ -517,6 +499,15 @@ read_stream_begin_impl(int flags, else max_ios = get_tablespace_io_concurrency(tablespace_id); + /* + * XXX Since we don't have asynchronous I/O yet, if direct I/O is enabled + * then just behave as though I/O concurrency is set to 0. Otherwise we + * would look ahead pinning many buffers for no benefit, for lack of + * advice and AIO. + */ + if (io_direct_flags & IO_DIRECT_DATA) + max_ios = 0; + /* Cap to INT16_MAX to avoid overflowing below */ max_ios = Min(max_ios, PG_INT16_MAX); @@ -637,7 +628,7 @@ read_stream_begin_impl(int flags, /* * Skip the initial ramp-up phase if the caller says we're going to be * reading the whole relation. This way we start out assuming we'll be - * doing full io_combine_limit sized reads (behavior B). + * doing full io_combine_limit sized reads. */ if (flags & READ_STREAM_FULL) stream->distance = Min(max_pinned_buffers, stream->io_combine_limit); @@ -728,10 +719,10 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) #ifndef READ_STREAM_DISABLE_FAST_PATH /* - * A fast path for all-cached scans (behavior A). This is the same as the - * usual algorithm, but it is specialized for no I/O and no per-buffer - * data, so we can skip the queue management code, stay in the same buffer - * slot and use singular StartReadBuffer(). + * A fast path for all-cached scans. This is the same as the usual + * algorithm, but it is specialized for no I/O and no per-buffer data, so + * we can skip the queue management code, stay in the same buffer slot and + * use singular StartReadBuffer(). */ if (likely(stream->fast_path)) { @@ -851,37 +842,20 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) if (++stream->oldest_io_index == stream->max_ios) stream->oldest_io_index = 0; - if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE) - { - /* Distance ramps up fast (behavior C). */ - distance = stream->distance * 2; - distance = Min(distance, stream->max_pinned_buffers); - stream->distance = distance; + /* Look-ahead distance ramps up quickly after we do I/O. */ + distance = stream->distance * 2; + distance = Min(distance, stream->max_pinned_buffers); + stream->distance = distance; - /* - * If we've caught up with the first advice issued for the current - * sequential region, cancel further advice until the next random - * jump. The kernel should be able to see the pattern now that - * we're actually making sequential preadv() calls. - */ - if (stream->ios[io_index].op.blocknum == stream->seq_until_processed) - stream->seq_until_processed = InvalidBlockNumber; - } - else - { - /* No advice; move towards io_combine_limit (behavior B). */ - if (stream->distance > stream->io_combine_limit) - { - stream->distance--; - } - else - { - distance = stream->distance * 2; - distance = Min(distance, stream->io_combine_limit); - distance = Min(distance, stream->max_pinned_buffers); - stream->distance = distance; - } - } + /* + * If we've caught up with the first advice issued for the current + * sequential region, cancel further advice until the next random + * jump. The kernel should be able to see the pattern now that we're + * actually making sequential preadv() calls. + */ + if (stream->advice_enabled && + stream->ios[io_index].op.blocknum == stream->seq_until_processed) + stream->seq_until_processed = InvalidBlockNumber; } #ifdef CLOBBER_FREED_MEMORY -- 2.39.5
From b8573757a4c61d72004e805974dc2a8900ed3eef Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Thu, 30 Jan 2025 11:42:03 +1300 Subject: [PATCH v3 5/6] Support buffer forwarding in read_stream.c. In preparation for a following change to the buffer manager, teach read stream to keep track of buffers that were "forwarded" from one call to StartReadBuffers() to the next. Since StartReadBuffers() buffers argument will become an in/out argument, we need to initialize the buffer queue entries with InvalidBuffer. We don't want to do that up front, because we try to keep stream initialization cheap and code that uses the fast path stays in one single buffer queue element. Satisfy both goals by initializing the queue incrementally on the first cycle. Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com --- src/backend/storage/aio/read_stream.c | 102 +++++++++++++++++++++++--- 1 file changed, 92 insertions(+), 10 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 57cde89cfdc..51c15330117 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -95,8 +95,10 @@ struct ReadStream int16 ios_in_progress; int16 queue_size; int16 max_pinned_buffers; + int16 forwarded_buffers; int16 pinned_buffers; int16 distance; + int16 initialized_buffers; bool advice_enabled; bool temporary; @@ -223,8 +225,10 @@ static bool read_stream_start_pending_read(ReadStream *stream) { bool need_wait; + int requested_nblocks; int nblocks; int flags = 0; + int forwarded; int16 io_index; int16 overflow; int16 buffer_index; @@ -275,11 +279,19 @@ read_stream_start_pending_read(ReadStream *stream) } } - /* Compute the remaining portion of the per-backend buffer limit. */ + /* + * Compute the remaining portion of the per-backend buffer limit. If we + * already have some forwarded buffers, we can certainly use those. They + * are already pinned, and are mapped to the starting blocks of the pending + * read, they just don't have any I/O started yet and are not counted in + * stream->pinned_buffers. + */ if (stream->temporary) buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX); else buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX); + Assert(stream->forwarded_buffers <= stream->pending_read_nblocks); + buffer_limit += stream->forwarded_buffers; if (buffer_limit == 0 && stream->pinned_buffers == 0) buffer_limit = 1; /* guarantee progress */ @@ -306,8 +318,31 @@ read_stream_start_pending_read(ReadStream *stream) * We say how many blocks we want to read, but it may be smaller on return * if the buffer manager decides it needs a short read at its level. */ + requested_nblocks = Min(buffer_limit, stream->pending_read_nblocks); + nblocks = requested_nblocks; buffer_index = stream->next_buffer_index; io_index = stream->next_io_index; + + /* + * The first time around the queue we initialize it as we go, including + * the overflow zone, because otherwise the entries would appear as + * forwarded buffers. This avoids initializing the whole queue up front + * in cases where it is large but we don't ever use it due to the + * all-cached fast path or small scans. + */ + while (stream->initialized_buffers < buffer_index + nblocks) + stream->buffers[stream->initialized_buffers++] = InvalidBuffer; + + /* + * Start the I/O. Any buffers that are not InvalidBuffer will be + * interpreted as already pinned, forwarded by an earlier call to + * StartReadBuffers(), and must map to the expected blocks. The nblocks + * value may be smaller on return indicating the size of the I/O that + * could be started. Buffers beyond the output nblocks number may also + * have been pinned without starting I/O due to various edge cases. In + * that case we'll just leave them in the queue ahead of us, "forwarded" + * to the next call, avoiding the need to unpin/repin. + */ need_wait = StartReadBuffers(&stream->ios[io_index].op, &stream->buffers[buffer_index], stream->pending_read_blocknum, @@ -336,16 +371,35 @@ read_stream_start_pending_read(ReadStream *stream) stream->seq_blocknum = stream->pending_read_blocknum + nblocks; } + /* + * How many pins were acquired but forwarded to the next call? These need + * to be passed to the next StartReadBuffers() call, or released if the + * stream ends early. We need the number for accounting purposes, since + * they are not counted in stream->pinned_buffers but we already hold + * them. + */ + forwarded = 0; + while (nblocks + forwarded < requested_nblocks && + stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer) + forwarded++; + stream->forwarded_buffers = forwarded; + /* * We gave a contiguous range of buffer space to StartReadBuffers(), but - * we want it to wrap around at queue_size. Slide overflowing buffers to - * the front of the array. + * we want it to wrap around at queue_size. Copy overflowing buffers to + * the front of the array where they'll be consumed, but also leave a copy + * in the overflow zone which the I/O operation has a pointer to (it needs + * a contiguous array). Both copies will be cleared when the buffers are + * handed to the consumer. */ - overflow = (buffer_index + nblocks) - stream->queue_size; + overflow = (buffer_index + nblocks + forwarded) - stream->queue_size; if (overflow > 0) - memmove(&stream->buffers[0], - &stream->buffers[stream->queue_size], - sizeof(stream->buffers[0]) * overflow); + { + Assert(overflow < stream->queue_size); /* can't overlap */ + memcpy(&stream->buffers[0], + &stream->buffers[stream->queue_size], + sizeof(stream->buffers[0]) * overflow); + } /* Compute location of start of next read, without using % operator. */ buffer_index += nblocks; @@ -730,10 +784,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) /* Fast path assumptions. */ Assert(stream->ios_in_progress == 0); + Assert(stream->forwarded_buffers == 0); Assert(stream->pinned_buffers == 1); Assert(stream->distance == 1); Assert(stream->pending_read_nblocks == 0); Assert(stream->per_buffer_data_size == 0); + Assert(stream->initialized_buffers > stream->oldest_buffer_index); /* We're going to return the buffer we pinned last time. */ oldest_buffer_index = stream->oldest_buffer_index; @@ -782,6 +838,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->distance = 0; stream->oldest_buffer_index = stream->next_buffer_index; stream->pinned_buffers = 0; + stream->buffers[oldest_buffer_index] = InvalidBuffer; } stream->fast_path = false; @@ -858,10 +915,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->seq_until_processed = InvalidBlockNumber; } -#ifdef CLOBBER_FREED_MEMORY - /* Clobber old buffer for debugging purposes. */ + /* + * We must zap this queue entry, or else it would appear as a forwarded + * buffer. If it's potentially in the overflow zone (ie it wrapped around + * the queue), also zap that copy. + */ stream->buffers[oldest_buffer_index] = InvalidBuffer; -#endif + if (oldest_buffer_index < io_combine_limit - 1) + stream->buffers[stream->queue_size + oldest_buffer_index] = + InvalidBuffer; #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND) @@ -906,6 +968,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) #ifndef READ_STREAM_DISABLE_FAST_PATH /* See if we can take the fast path for all-cached scans next time. */ if (stream->ios_in_progress == 0 && + stream->forwarded_buffers == 0 && stream->pinned_buffers == 1 && stream->distance == 1 && stream->pending_read_nblocks == 0 && @@ -941,6 +1004,7 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy) void read_stream_reset(ReadStream *stream) { + int16 index; Buffer buffer; /* Stop looking ahead. */ @@ -957,6 +1021,24 @@ read_stream_reset(ReadStream *stream) while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer) ReleaseBuffer(buffer); + /* Unpin any unused forwarded buffers. */ + index = stream->next_buffer_index; + while (index < stream->initialized_buffers && + (buffer = stream->buffers[index]) != InvalidBuffer) + { + Assert(stream->forwarded_buffers > 0); + stream->forwarded_buffers--; + ReleaseBuffer(buffer); + + stream->buffers[index] = InvalidBuffer; + if (index < io_combine_limit - 1) + stream->buffers[stream->queue_size + index] = InvalidBuffer; + + if (++index == stream->queue_size) + index = 0; + } + + Assert(stream->forwarded_buffers == 0); Assert(stream->pinned_buffers == 0); Assert(stream->ios_in_progress == 0); -- 2.39.5
From 02cd8f3f977de017760ff8f4b89fcc83c5aa36ab Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Mon, 10 Feb 2025 21:55:40 +1300 Subject: [PATCH v3 6/6] Support buffer forwarding in StartReadBuffers(). Sometimes we have to perform a short read because we hit a cached block that ends a contiguous run of blocks requiring I/O. We don't want StartReadBuffers() to have to start more than one I/O, so we stop there. We also don't want to have to unpin the cached block (and repin it later), so previously we'd silently pretend the hit was part of the I/O, and just leave it out of the read from disk. Now, we'll "forward" it to the next call. We still write it to the buffers[] array for the caller to pass back to us later, but it's not included in *nblocks. This policy means that we no longer mix hits and misses in a single operation's results, so we avoid the requirement to call WaitReadBuffers(), which might stall, before the caller can make use of the hits. The caller will get the hit in the next call instead, and know that it doesn't have to wait. That's important for later work on out-of-order read streams that minimize I/O stalls. This also makes life easier for proposed work on true AIO, which occasionally needs to split a large I/O after pinning all the buffers, while the current coding only ever forwards a single bookending hit. This API is natural for read_stream.c: it just leaves forwarded buffers where they are in its circular queue, where the next call will pick them up and continue, minimizing pin churn. If we ever think of a good reason to disable this feature, i.e. for other users of StartReadBuffers() that don't want to deal with forwarded buffers, then we could add a flag for that. For now read_steam.c is the only user. Discussion: https://postgr.es/m/CA%2BhUKGK_%3D4CVmMHvsHjOVrK6t4F%3DLBpFzsrr3R%2BaJYN8kcTfWg%40mail.gmail.com --- src/backend/storage/buffer/bufmgr.c | 128 ++++++++++++++++++++-------- src/include/storage/bufmgr.h | 1 - 2 files changed, 91 insertions(+), 38 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index a6138e79306..d56bff96cec 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1257,10 +1257,10 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, Buffer *buffers, BlockNumber blockNum, int *nblocks, - int flags) + int flags, + bool allow_forwarding) { int actual_nblocks = *nblocks; - int io_buffers_len = 0; int maxcombine = 0; Assert(*nblocks > 0); @@ -1270,30 +1270,80 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, { bool found; - buffers[i] = PinBufferForBlock(operation->rel, - operation->smgr, - operation->persistence, - operation->forknum, - blockNum + i, - operation->strategy, - &found); + if (allow_forwarding && buffers[i] != InvalidBuffer) + { + BufferDesc *bufHdr; + + /* + * This is a buffer that was pinned by an earlier call to + * StartReadBuffers(), but couldn't be handled in one operation at + * that time. The operation was split, and the caller has passed + * an already pinned buffer back to us to handle the rest of the + * operation. It must continue at the expected block number. + */ + Assert(BufferGetBlockNumber(buffers[i]) == blockNum + i); + + /* + * It might be an already valid buffer (a hit) that followed the + * final contiguous block of an earlier I/O (a miss) marking the + * end of it, or a buffer that some other backend has since made + * valid by performing the I/O for us, in which case we can handle + * it as a hit now. It is safe to check for a BM_VALID flag with + * a relaxed load, because we got a fresh view of it while pinning + * it in the previous call. + * + * On the other hand if we don't see BM_VALID yet, it must be an + * I/O that was split by the previous call and we need to try to + * start a new I/O from this block. We're also racing against any + * other backend that might start the I/O or even manage to mark + * it BM_VALID after this check, BM_VALID after this check, but + * StartBufferIO() will handle those cases. + */ + if (BufferIsLocal(buffers[i])) + bufHdr = GetLocalBufferDescriptor(-buffers[i] - 1); + else + bufHdr = GetBufferDescriptor(buffers[i] - 1); + found = pg_atomic_read_u32(&bufHdr->state) & BM_VALID; + } + else + { + buffers[i] = PinBufferForBlock(operation->rel, + operation->smgr, + operation->persistence, + operation->forknum, + blockNum + i, + operation->strategy, + &found); + } if (found) { /* - * Terminate the read as soon as we get a hit. It could be a - * single buffer hit, or it could be a hit that follows a readable - * range. We don't want to create more than one readable range, - * so we stop here. + * We have a hit. If it's the first block in the requested range, + * we can return it immediately and report that WaitReadBuffers() + * does not need to be called. If the initial value of *nblocks + * was larger, the caller will have to call again for the rest. */ - actual_nblocks = i + 1; + if (i == 0) + { + *nblocks = 1; + return false; + } + + /* + * Otherwise we already have an I/O to perform, but this block + * can't be included as it is already valid. Split the I/O here. + * There may or may not be more blocks requiring I/O after this + * one, we haven't checked, but it can't be contiguous with this + * hit in the way. We'll leave this buffer pinned, forwarding it + * to the next call, avoiding the need to unpin it here and re-pin + * it in the next call. + */ + actual_nblocks = i; break; } else { - /* Extend the readable range to cover this block. */ - io_buffers_len++; - /* * Check how many blocks we can cover with the same IO. The smgr * implementation might e.g. be limited due to a segment boundary. @@ -1314,15 +1364,11 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, } *nblocks = actual_nblocks; - if (likely(io_buffers_len == 0)) - return false; - /* Populate information needed for I/O. */ operation->buffers = buffers; operation->blocknum = blockNum; operation->flags = flags; operation->nblocks = actual_nblocks; - operation->io_buffers_len = io_buffers_len; if (flags & READ_BUFFERS_ISSUE_ADVICE) { @@ -1337,7 +1383,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, smgrprefetch(operation->smgr, operation->forknum, blockNum, - operation->io_buffers_len); + actual_nblocks); } /* Indicate that WaitReadBuffers() should be called. */ @@ -1351,11 +1397,21 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, * actual number, which may be fewer than requested. Caller sets some of the * members of operation; see struct definition. * + * The initial contents of the elements of buffers up to *nblocks should + * either be InvalidBuffer or an already-pinned buffer that was left by an + * preceding call to StartReadBuffers() that had to be split. On return, some + * elements of buffers may hold pinned buffers beyond the number indicated by + * the updated value of *nblocks. Operations are split on boundaries known to + * smgr (eg md.c segment boundaries that require crossing into a different + * underlying file), or when already cached blocks are found in the buffer + * that prevent the formation of a contiguous read. + * * If false is returned, no I/O is necessary. If true is returned, one I/O * has been started, and WaitReadBuffers() must be called with the same * operation object before the buffers are accessed. Along with the operation * object, the caller-supplied array of buffers must remain valid until - * WaitReadBuffers() is called. + * WaitReadBuffers() is called, and any forwarded buffers must also be + * preserved for a future call unless explicitly released. * * Currently the I/O is only started with optional operating system advice if * requested by the caller with READ_BUFFERS_ISSUE_ADVICE, and the real I/O @@ -1369,13 +1425,18 @@ StartReadBuffers(ReadBuffersOperation *operation, int *nblocks, int flags) { - return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags); + return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags, + true /* expect forwarded buffers */ ); } /* * Single block version of the StartReadBuffers(). This might save a few * instructions when called from another translation unit, because it is * specialized for nblocks == 1. + * + * This version does not support "forwarded" buffers: they cannot be created + * by reading only one block, and the current contents of *buffer is ignored + * on entry. */ bool StartReadBuffer(ReadBuffersOperation *operation, @@ -1386,7 +1447,8 @@ StartReadBuffer(ReadBuffersOperation *operation, int nblocks = 1; bool result; - result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags); + result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags, + false /* single block, no forwarding */ ); Assert(nblocks == 1); /* single block can't be short */ return result; @@ -1416,24 +1478,16 @@ WaitReadBuffers(ReadBuffersOperation *operation) IOObject io_object; char persistence; - /* - * Currently operations are only allowed to include a read of some range, - * with an optional extra buffer that is already pinned at the end. So - * nblocks can be at most one more than io_buffers_len. - */ - Assert((operation->nblocks == operation->io_buffers_len) || - (operation->nblocks == operation->io_buffers_len + 1)); - /* Find the range of the physical read we need to perform. */ - nblocks = operation->io_buffers_len; - if (nblocks == 0) - return; /* nothing to do */ - + nblocks = operation->nblocks; buffers = &operation->buffers[0]; blocknum = operation->blocknum; forknum = operation->forknum; persistence = operation->persistence; + Assert(nblocks > 0); + Assert(nblocks <= MAX_IO_COMBINE_LIMIT); + if (persistence == RELPERSISTENCE_TEMP) { io_context = IOCONTEXT_NORMAL; diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 74b5afe8a1a..307f36af384 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -130,7 +130,6 @@ struct ReadBuffersOperation BlockNumber blocknum; int flags; int16 nblocks; - int16 io_buffers_len; }; typedef struct ReadBuffersOperation ReadBuffersOperation; -- 2.39.5