Hi,

Here are some patches that address some of Andres's feedback since the AIO v2 rebase[1], anticipate out-of-order streams, and make some other minor improvements. They are independent of the main AIO patch set and apply to master, hence the separate thread.
0001-Refactor-read_stream.c-s-circular-arithmetic.patch

This just replaces open-coded arithmetic with inline functions. They will be used a lot more in later work, and provide central places to put assertions that were not checked as uniformly as I would like.

0002-Allow-more-buffers-for-sequential-read-streams.patch

Currently we're only generating I/O concurrency for random I/O, and I originally guesstimated that random I/O wouldn't typically be able to combine more than about 4 blocks on average when capping the buffer queue. Debatable perhaps, but soon obsolete: for true asynchronous I/O we'll need full concurrency * full combine limit, so we'll potentially want to pin more buffers. This just changes that hard-coded 4 to io_combine_limit.

0003-Improve-buffer-pool-API-for-per-backend-pin-limits.patch

In preparation for the next patch, provide some new bufmgr APIs.

0004-Respect-pin-limits-accurately-in-read_stream.c.patch

The current coding only computes the remaining "fair share" of the buffer pool for this backend at stream initialisation. It's hard, but not impossible, to get one backend to pin more than 1/max_connections of the buffer pool (the intended cap) when using multiple streams at the same time in one backend. This patch moves the time of check to the time of use, so the limit is respected strictly. I avoided adding any changes to the fast path for all-cached streams, which only pin one buffer at a time.

This is basically a TOCTOU bug and could in theory be back-patched, but I don't personally think that's necessary yet, since it's so hard to actually break anything with v17's rather limited use of streams. The only way I can think of right now to see a concrete problem would be to get many backends all to create many CURSORs that stream cold data, and FETCH in a particular order only after they've all been constructed -- which is also a recipe for running out of buffers without using streams, albeit not quite as quickly.

0005-Support-buffer-forwarding-in-read_stream.c.patch
0006-Support-buffer-forwarding-in-StartReadBuffers.patch

Background: StartReadBuffers() reports a short read when it finds a cached block that divides a potential I/O. For example, if you ask to read 8 blocks and the 6th one turns out to be a hit, ie already valid in the buffer pool, then we only need to read the first 5 from disk. Perhaps the 7th and 8th blocks will also need I/O, but we don't want StartReadBuffers() to start *two* I/Os, so the 6th block terminates our search.

The question is what to do with that 6th block. Currently we tell the user that the size of the operation is 6 blocks -- that is, we silently include the hit that prevented further combining, even though we'll only actually do the read for the first 5 blocks. And if someone else manages to make any buffers valid before you call WaitReadBuffers(), we just silently skip over them and loop. That was simple enough, but having hits invisibly mixed up with misses prevents us from implementing fully general reordering of hits, an important optimisation that reduces I/O stalls for consumers that can cope with out-of-order data (I'll post a new patch for that separately). A simple solution we rejected was to unpin such buffers and repin them later, but that would waste cycles, allow eviction and potentially mess up the usage count.
The AIO patch set also introduces another case involving potentially many buffers: it moves the BM_IO_IN_PROGRESS negotiation into StartReadBuffers(), and reports a short read then too for middle blocks, but by then it has already acquired all the pins.

The solution we agreed on is to introduce a way for StartReadBuffers() to communicate with future calls, and "forward" pinned buffers between calls. The function arguments don't change, but its "buffers" argument becomes an in/out array: one StartReadBuffers() call can leave extra pinned buffers after the ones that were included in a short read (*nblocks), and when you retry (or possibly extend) the rest of the read, you have to pass them back in. That is easy for the read stream code, as it can just leave them in its circular queue for the next call to take as input. It only needs to be aware of them for pin limit accounting and stream reset (including early shutdown).

One goal was to avoid introducing any new instructions into ReadBuffer() or the read stream fast path, so StartReadBuffer(), the single-block specialisation, doesn't support forwarding: it already can't split reads, since they are only one block, but it also doesn't accept forwarded buffers as input.

I plan to share some new C-level tests and illustrations of the internal states of read_stream.c separately, as the complexity is obviously increasing a bit (especially with out-of-order streams, about which more soon).

[1] https://www.postgresql.org/message-id/CA%2BhUKGLxH1tsUgzZfng4BU6GqnS6bKF2ThvxH1_w5c7-sLRKQw%40mail.gmail.com
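To make the forwarding contract concrete, here is a minimal caller-side sketch. The block numbers and the slot-zero shuffle are invented for illustration and the ReadBuffersOperation setup is elided; only the StartReadBuffers()/WaitReadBuffers() calls and the in/out behaviour of buffers[] follow the description above.

    Buffer      buffers[8];
    ReadBuffersOperation op;    /* rel/smgr/forknum etc. assumed set up elsewhere */
    int         nblocks;

    /* Entries must start out invalid so they aren't taken for forwarded pins. */
    for (int i = 0; i < 8; i++)
        buffers[i] = InvalidBuffer;

    /* Ask for blocks 10..17; suppose 10..14 need I/O and 15 is already valid. */
    nblocks = 8;
    if (StartReadBuffers(&op, buffers, 10, &nblocks, 0))
        WaitReadBuffers(&op);            /* the I/O covers blocks 10..14 only */
    Assert(nblocks == 5);                /* short read: split at the hit */
    Assert(buffers[5] != InvalidBuffer); /* forwarded pin on block 15 */

    /* Retry the remainder, passing the forwarded pin back in at slot zero. */
    buffers[0] = buffers[5];
    buffers[5] = InvalidBuffer;
    nblocks = 3;                         /* blocks 15..17 */
    if (StartReadBuffers(&op, buffers, 15, &nblocks, 0))
        WaitReadBuffers(&op);
    else
        Assert(nblocks == 1);            /* block 15 comes back as a plain hit */

read_stream.c doesn't need the slot-zero shuffle, because the forwarded pins are already sitting in the right place in its circular queue for the next call.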
From 57cc7f12e4a7799c581d4ced054dfbb2663c5b81 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Sat, 15 Feb 2025 14:47:25 +1300 Subject: [PATCH v1 1/6] Refactor read_stream.c's circular arithmetic. Several places have open-coded circular index arithmetic. Make some common functions for better readability and consistent assertion checking. This avoids adding yet more open-coded duplication in later patches, and standardizes on the vocabulary "advance" and "retreat" as used elsewhere in PostgreSQL. --- src/backend/storage/aio/read_stream.c | 78 +++++++++++++++++++++------ 1 file changed, 61 insertions(+), 17 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 4b499dfb441..eadfe88c35a 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -224,6 +224,55 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum) stream->buffered_blocknum = blocknum; } +/* + * Increment index, wrapping around at queue size. + */ +static inline void +read_stream_index_advance(ReadStream *stream, int16 *index) +{ + Assert(*index >= 0); + Assert(*index < stream->queue_size); + + *index += 1; + if (*index == stream->queue_size) + *index = 0; +} + +/* + * Increment index by n, wrapping around at queue size. + */ +static inline void +read_stream_index_advance_n(ReadStream *stream, int16 *index, int16 n) +{ + Assert(*index >= 0); + Assert(*index < stream->queue_size); + Assert(n <= MAX_IO_COMBINE_LIMIT); + + *index += n; + if (*index >= stream->queue_size) + *index -= stream->queue_size; + + Assert(*index >= 0); + Assert(*index < stream->queue_size); +} + +#if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND) +/* + * Decrement index, wrapping around at queue size. + */ +static inline void +read_stream_index_retreat(ReadStream *stream, int16 *index) +{ + Assert(*index >= 0); + Assert(*index < stream->queue_size); + + if (*index == 0) + *index = stream->queue_size - 1; + else + *index -= 1; +} +#endif + static void read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) { @@ -302,11 +351,8 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) &stream->buffers[stream->queue_size], sizeof(stream->buffers[0]) * overflow); - /* Compute location of start of next read, without using % operator. */ - buffer_index += nblocks; - if (buffer_index >= stream->queue_size) - buffer_index -= stream->queue_size; - Assert(buffer_index >= 0 && buffer_index < stream->queue_size); + /* Move to the location of start of next read. */ + read_stream_index_advance_n(stream, &buffer_index, nblocks); stream->next_buffer_index = buffer_index; /* Adjust the pending read to cover the remaining portion, if any. */ @@ -334,12 +380,12 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) /* * See which block the callback wants next in the stream. We need to * compute the index of the Nth block of the pending read including - * wrap-around, but we don't want to use the expensive % operator. + * wrap-around. 
*/ - buffer_index = stream->next_buffer_index + stream->pending_read_nblocks; - if (buffer_index >= stream->queue_size) - buffer_index -= stream->queue_size; - Assert(buffer_index >= 0 && buffer_index < stream->queue_size); + buffer_index = stream->next_buffer_index; + read_stream_index_advance_n(stream, + &buffer_index, + stream->pending_read_nblocks); per_buffer_data = get_per_buffer_data(stream, buffer_index); blocknum = read_stream_get_block(stream, per_buffer_data); if (blocknum == InvalidBlockNumber) @@ -777,12 +823,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) */ if (stream->per_buffer_data) { + int16 index; void *per_buffer_data; - per_buffer_data = get_per_buffer_data(stream, - oldest_buffer_index == 0 ? - stream->queue_size - 1 : - oldest_buffer_index - 1); + index = oldest_buffer_index; + read_stream_index_retreat(stream, &index); + per_buffer_data = get_per_buffer_data(stream, index); #if defined(CLOBBER_FREED_MEMORY) /* This also tells Valgrind the memory is "noaccess". */ @@ -800,9 +846,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->pinned_buffers--; /* Advance oldest buffer, with wrap-around. */ - stream->oldest_buffer_index++; - if (stream->oldest_buffer_index == stream->queue_size) - stream->oldest_buffer_index = 0; + read_stream_index_advance(stream, &stream->oldest_buffer_index); /* Prepare for the next call. */ read_stream_look_ahead(stream, false); -- 2.48.1
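For review purposes, the wrap-around rule these helpers centralize is the usual conditional-subtraction replacement for the % operator; a simplified standalone sketch (plain int instead of the stream's int16, not taken from the patch):

    /*
     * Equivalent to (index + n) % queue_size when 0 <= index < queue_size and
     * 0 <= n <= queue_size: the sum stays below 2 * queue_size, so at most one
     * subtraction is needed and the division behind % is avoided.
     */
    static inline int
    advance(int index, int n, int queue_size)
    {
        index += n;
        if (index >= queue_size)
            index -= queue_size;
        return index;
    }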
From cb597bdb53904a82cd27d2f67b8bad9873066f14 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Tue, 21 Jan 2025 08:08:08 +1300 Subject: [PATCH v1 2/6] Allow more buffers for sequential read streams. Read streams currently only start concurrent I/Os (via read-ahead advice) for random access, with a hard-coded guesstimate that their average size is likely to be at most 4 blocks when planning the size of the buffer queue. Sequential streams benefit from kernel readahead when using buffered I/O, and read-ahead advice doesn't exist for direct I/O by definition, so we didn't need to look ahead more than io_combine_limit in that case. Proposed patches need more buffers to be able to start multiple asynchronous I/O operations even for sequential access. Adjust the arithmetic in preparation, replacing "4" with io_combine_limit, though there is no benefit yet, just some wasted queue space. As of the time of writing, the maximum GUC values for effective_io_concurrency (1000) and io_combine_limit (32) imply a queue with around 32K entries (slightly more for technical reasons), though those numbers are likely to change. That requires a wider type in one place that has an intermediate value that might overflow before clamping. --- src/backend/storage/aio/read_stream.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index eadfe88c35a..cb308948b6e 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -499,7 +499,7 @@ read_stream_begin_impl(int flags, * overflow (even though that's not possible with the current GUC range * limits), allowing also for the spare entry and the overflow space. */ - max_pinned_buffers = Max(max_ios * 4, io_combine_limit); + max_pinned_buffers = Max(max_ios, 1) * io_combine_limit; max_pinned_buffers = Min(max_pinned_buffers, PG_INT16_MAX - io_combine_limit - 1); @@ -771,7 +771,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index) { int16 io_index = stream->oldest_io_index; - int16 distance; + int32 distance; /* wider temporary value, clamped below */ /* Sanity check that we still agree on the buffers. */ Assert(stream->ios[io_index].op.buffers == -- 2.48.1
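To spell the new arithmetic out with the maxima mentioned above (today's values, likely to change): max_ios = effective_io_concurrency = 1000 and io_combine_limit = 32 give Max(1000, 1) * 32 = 32000 pinnable entries, just under the clamp at PG_INT16_MAX - io_combine_limit - 1 = 32734, and the queue adds a spare entry plus overflow space on top of that. With numbers that close to the int16 range, an intermediate value such as the doubled look-ahead distance can exceed INT16_MAX before it is clamped, hence the int32 temporary in the second hunk.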
From d98dbce9475d4640305bee2e6f536512de2d20d3 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Fri, 24 Jan 2025 10:59:39 +1300 Subject: [PATCH v1 3/6] Improve buffer pool API for per-backend pin limits. Previously the support functions assumed that you needed one additional pin to make progress, and could optionally use some more. Add a couple more functions for callers that want to know: * what the maximum possible number could be, for space planning purposes, called the "soft pin limit" * how many additional pins they could acquire right now, without the special case allowing one pin (ie for users that already hold pins and can already make progress even if zero extra pins are available now) These APIs are better suited to read_stream.c, which will be adjusted in a follow-up patch. Also move the computation of each backend's fair share of the buffer pool to backend initialization time, since the answer doesn't change and we don't want to perform a division operation every time we compute availability. --- src/backend/storage/buffer/bufmgr.c | 75 ++++++++++++++++++++------- src/backend/storage/buffer/localbuf.c | 16 ++++++ src/include/storage/bufmgr.h | 4 ++ 3 files changed, 77 insertions(+), 18 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 80b0d0c5ded..2ca641204fb 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -211,6 +211,8 @@ static int32 PrivateRefCountOverflowed = 0; static uint32 PrivateRefCountClock = 0; static PrivateRefCountEntry *ReservedRefCountEntry = NULL; +static uint32 MaxProportionalPins; + static void ReservePrivateRefCountEntry(void); static PrivateRefCountEntry *NewPrivateRefCountEntry(Buffer buffer); static PrivateRefCountEntry *GetPrivateRefCountEntry(Buffer buffer, bool do_move); @@ -2097,6 +2099,46 @@ again: return buf; } +/* + * Return the maximum number of buffers that this backend should try to pin at + * once, to avoid pinning more than its fair share. This is the highest value + * that GetAdditionalPinLimit() and LimitAdditionalPins() could ever return. + * + * It's called a soft limit because nothing stops a backend from trying to + * acquire more pins than this with ReadBuffer(), but code that wants more + * for I/O optimizations should respect this per-backend limit when it can + * still make progress without them. + */ +uint32 +GetSoftPinLimit(void) +{ + return MaxProportionalPins; +} + +/* + * Return the maximum number of additional buffers that this backend should + * pin if it wants to stay under the per-backend soft limit, considering the + * number of buffers it has already pinned. + */ +uint32 +GetAdditionalPinLimit(void) +{ + uint32 estimated_pins_held; + + /* + * We get the number of "overflowed" pins for free, but don't know the + * number of pins in PrivateRefCountArray. The cost of calculating that + * exactly doesn't seem worth it, so just assume the max. + */ + estimated_pins_held = PrivateRefCountOverflowed + REFCOUNT_ARRAY_ENTRIES; + + /* Is this backend already holding more than its fair share? */ + if (estimated_pins_held > MaxProportionalPins) + return 0; + + return MaxProportionalPins - estimated_pins_held; +} + /* * Limit the number of pins a batch operation may additionally acquire, to * avoid running out of pinnable buffers. 
@@ -2112,28 +2154,15 @@ again: void LimitAdditionalPins(uint32 *additional_pins) { - uint32 max_backends; - int max_proportional_pins; + uint32 limit; if (*additional_pins <= 1) return; - max_backends = MaxBackends + NUM_AUXILIARY_PROCS; - max_proportional_pins = NBuffers / max_backends; - - /* - * Subtract the approximate number of buffers already pinned by this - * backend. We get the number of "overflowed" pins for free, but don't - * know the number of pins in PrivateRefCountArray. The cost of - * calculating that exactly doesn't seem worth it, so just assume the max. - */ - max_proportional_pins -= PrivateRefCountOverflowed + REFCOUNT_ARRAY_ENTRIES; - - if (max_proportional_pins <= 0) - max_proportional_pins = 1; - - if (*additional_pins > max_proportional_pins) - *additional_pins = max_proportional_pins; + limit = GetAdditionalPinLimit(); + limit = Max(limit, 1); + if (limit < *additional_pins) + *additional_pins = limit; } /* @@ -3574,6 +3603,16 @@ InitBufferManagerAccess(void) { HASHCTL hash_ctl; + /* + * The soft limit on the number of pins each backend should respect, based + * on shared_buffers and the maximum number of connections possible. + * That's very pessimistic, but outside toy-sized shared_buffers it should + * allow plenty of pins. Higher level code that pins non-trivial numbers + * of buffers should use LimitAdditionalPins() or GetAdditionalPinLimit() + * to stay under this limit. + */ + MaxProportionalPins = NBuffers / (MaxBackends + NUM_AUXILIARY_PROCS); + memset(&PrivateRefCountArray, 0, sizeof(PrivateRefCountArray)); hash_ctl.keysize = sizeof(int32); diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index 64931efaa75..3c055f6ec8b 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -286,6 +286,22 @@ GetLocalVictimBuffer(void) return BufferDescriptorGetBuffer(bufHdr); } +/* see GetSoftPinLimit() */ +uint32 +GetSoftLocalPinLimit(void) +{ + /* Every backend has its own temporary buffers, and can pin them all. */ + return num_temp_buffers; +} + +/* see GetAdditionalPinLimit() */ +uint32 +GetAdditionalLocalPinLimit(void) +{ + Assert(NLocalPinnedBuffers <= num_temp_buffers); + return num_temp_buffers - NLocalPinnedBuffers; +} + /* see LimitAdditionalPins() */ void LimitAdditionalLocalPins(uint32 *additional_pins) diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 7c1e4316dde..597ecb97897 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -290,6 +290,10 @@ extern bool HoldingBufferPinThatDelaysRecovery(void); extern bool BgBufferSync(struct WritebackContext *wb_context); +extern uint32 GetSoftPinLimit(void); +extern uint32 GetSoftLocalPinLimit(void); +extern uint32 GetAdditionalPinLimit(void); +extern uint32 GetAdditionalLocalPinLimit(void); extern void LimitAdditionalPins(uint32 *additional_pins); extern void LimitAdditionalLocalPins(uint32 *additional_pins); -- 2.48.1
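A hedged sketch of how a caller other than read_stream.c might use the new shared-buffer APIs (the caller and its batch size are invented; only the bufmgr function names and the semantics described above come from the patch):

    uint32      batch = 16;     /* buffers we'd like to pin together */
    uint32      extra;

    /*
     * A caller that needs at least one pin to make progress clamps its batch
     * in place; the result is never clamped below one.
     */
    LimitAdditionalPins(&batch);
    Assert(batch >= 1);

    /*
     * A caller that already holds pins and only wants optional extras (e.g.
     * for I/O combining) asks for the current headroom instead; it may be
     * zero, in which case it simply carries on without the extra pins.
     */
    extra = Min(batch, GetAdditionalPinLimit());

GetSoftPinLimit() (and GetSoftLocalPinLimit() for temporary buffers) gives the fixed per-backend upper bound, which is what the next patch uses when sizing a stream's buffer queue.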
From b71e09f174c61adf475355e5fff4c01b6a0af399 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Fri, 24 Jan 2025 23:52:53 +1300 Subject: [PATCH v1 4/6] Respect pin limits accurately in read_stream.c. Read streams pin multiple buffers at once as required to combine I/O. This also avoids having to unpin and repin later when issuing read-ahead advice, and will be needed for proposed work that starts "real" asynchronous I/O. To avoid pinning too much of the buffer pool at once, we previously used LimitAdditionalPins() to cap pins at this backend's fair share of the pool. The coding was naive and only checked the cap once at stream initialization. This commit moves the check to the time of use with new bufmgr APIs from an earlier commit, since the result might change due to pins acquired later outside this stream. No extra CPU cycles are added to the all-buffered fast-path code (it only pins one buffer at a time), but the I/O-starting path now re-checks the limit every time using simple arithmetic. In practice it was difficult to exceed the limit, but you could contrive a workload to do it using multiple CURSORs and FETCHing from sequential scans in round-robin fashion, so that each underlying stream computes its limit before all the others have ramped up to their full look-ahead distance. Therefore, no back-patch for now. Per code review from Andres, in the course of his AIO work. Reported-by: Andres Freund <and...@anarazel.de> --- src/backend/storage/aio/read_stream.c | 111 ++++++++++++++++++++++---- 1 file changed, 95 insertions(+), 16 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index cb308948b6e..dc5ae60a089 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -115,6 +115,7 @@ struct ReadStream int16 pinned_buffers; int16 distance; bool advice_enabled; + bool temporary; /* * One-block buffer to support 'ungetting' a block number, to resolve flow @@ -274,7 +275,9 @@ read_stream_index_retreat(ReadStream *stream, int16 *index) #endif static void -read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) +read_stream_start_pending_read(ReadStream *stream, + int16 buffer_limit, + bool suppress_advice) { bool need_wait; int nblocks; @@ -308,10 +311,14 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) else flags = 0; - /* We say how many blocks we want to read, but may be smaller on return. */ + /* + * We say how many blocks we want to read, but may be smaller on return. + * On memory-constrained systems we may be also have to ask for a smaller + * read ourselves. + */ buffer_index = stream->next_buffer_index; io_index = stream->next_io_index; - nblocks = stream->pending_read_nblocks; + nblocks = Min(buffer_limit, stream->pending_read_nblocks); need_wait = StartReadBuffers(&stream->ios[io_index].op, &stream->buffers[buffer_index], stream->pending_read_blocknum, @@ -360,11 +367,60 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) stream->pending_read_nblocks -= nblocks; } +/* + * How many more buffers could we use, while respecting the soft limit? + */ +static int16 +read_stream_get_buffer_limit(ReadStream *stream) +{ + uint32 buffers; + + /* Check how many local or shared pins we could acquire. 
*/ + if (stream->temporary) + buffers = GetAdditionalLocalPinLimit(); + else + buffers = GetAdditionalPinLimit(); + + /* + * Each stream is always allowed to try to acquire one pin if it doesn't + * hold one already. This is needed to guarantee progress, and just like + * the simple ReadBuffer() operation in code that is not using this stream + * API, if a buffer can't be pinned we'll raise an error when trying to + * pin, ie the buffer pool is simply too small for the workload. + */ + if (buffers == 0 && stream->pinned_buffers == 0) + return 1; + + /* + * Otherwise, see how many additional pins the backend can currently pin, + * which may be zero. As above, this only guarantees that this backend + * won't use more than its fair share if all backends can respect the soft + * limit, not that a pin can actually be acquired without error. + */ + return Min(buffers, INT16_MAX); +} + static void read_stream_look_ahead(ReadStream *stream, bool suppress_advice) { + int16 buffer_limit; + + /* + * Check how many pins we could acquire now. We do this here rather than + * pushing it down into read_stream_start_pending_read(), because it + * allows more flexibility in behavior when we run out of allowed pins. + * Currently the policy is to start an I/O when we've run out of allowed + * pins only if we have to in order to make progress, and otherwise to stop looking + * ahead until more pins become available, so that we don't start issuing + * a lot of smaller I/Os, preferring to build the largest ones we can. This + * choice is debatable, but it should only really come up when the buffer + * pool/connection ratio is very constrained. + */ + buffer_limit = read_stream_get_buffer_limit(stream); + while (stream->ios_in_progress < stream->max_ios && - stream->pinned_buffers + stream->pending_read_nblocks < stream->distance) + stream->pinned_buffers + stream->pending_read_nblocks < + Min(stream->distance, buffer_limit)) { BlockNumber blocknum; int16 buffer_index; @@ -372,7 +428,9 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) if (stream->pending_read_nblocks == io_combine_limit) { - read_stream_start_pending_read(stream, suppress_advice); + read_stream_start_pending_read(stream, buffer_limit, + suppress_advice); + buffer_limit = read_stream_get_buffer_limit(stream); suppress_advice = false; continue; } @@ -406,11 +464,12 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) /* We have to start the pending read before we can build another. */ while (stream->pending_read_nblocks > 0) { - read_stream_start_pending_read(stream, suppress_advice); + read_stream_start_pending_read(stream, buffer_limit, suppress_advice); + buffer_limit = read_stream_get_buffer_limit(stream); suppress_advice = false; - if (stream->ios_in_progress == stream->max_ios) + if (stream->ios_in_progress == stream->max_ios || buffer_limit == 0) { - /* And we've hit the limit. Rewind, and stop here. */ + /* And we've hit a limit. Rewind, and stop here. */ read_stream_unget_block(stream, blocknum); return; } @@ -426,16 +485,17 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) * limit, preferring to give it another chance to grow to full * io_combine_limit size once more buffers have been consumed. However, * if we've already reached io_combine_limit, or we've reached the - * distance limit and there isn't anything pinned yet, or the callback has - * signaled end-of-stream, we start the read immediately. 
+ * distance limit or buffer limit and there isn't anything pinned yet, or + * the callback has signaled end-of-stream, we start the read immediately. */ if (stream->pending_read_nblocks > 0 && (stream->pending_read_nblocks == io_combine_limit || - (stream->pending_read_nblocks == stream->distance && + ((stream->pending_read_nblocks == stream->distance || + stream->pending_read_nblocks == buffer_limit) && stream->pinned_buffers == 0) || stream->distance == 0) && stream->ios_in_progress < stream->max_ios) - read_stream_start_pending_read(stream, suppress_advice); + read_stream_start_pending_read(stream, buffer_limit, suppress_advice); } /* @@ -464,6 +524,7 @@ read_stream_begin_impl(int flags, int max_ios; int strategy_pin_limit; uint32 max_pinned_buffers; + uint32 max_possible_buffer_limit; Oid tablespace_id; /* @@ -507,12 +568,23 @@ read_stream_begin_impl(int flags, strategy_pin_limit = GetAccessStrategyPinLimit(strategy); max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers); - /* Don't allow this backend to pin more than its share of buffers. */ + /* + * Also limit by the maximum possible number of pins we could be allowed + * to acquire according to bufmgr. We may not be able to use them all due + * to other pins held by this backend, but we'll enforce the dynamic limit + * later when starting I/O. + */ if (SmgrIsTemp(smgr)) - LimitAdditionalLocalPins(&max_pinned_buffers); + max_possible_buffer_limit = GetSoftLocalPinLimit(); else - LimitAdditionalPins(&max_pinned_buffers); - Assert(max_pinned_buffers > 0); + max_possible_buffer_limit = GetSoftPinLimit(); + max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit); + + /* + * The soft limit might be zero on a system configured with more + * connections than buffers. We need at least one. + */ + max_pinned_buffers = Max(1, max_pinned_buffers); /* * We need one extra entry for buffers and per-buffer data, because users @@ -572,6 +644,7 @@ read_stream_begin_impl(int flags, stream->callback = callback; stream->callback_private_data = callback_private_data; stream->buffered_blocknum = InvalidBlockNumber; + stream->temporary = SmgrIsTemp(smgr); /* * Skip the initial ramp-up phase if the caller says we're going to be @@ -700,6 +773,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) * arbitrary I/O entry (they're all free). We don't have to * adjust pinned_buffers because we're transferring one to caller * but pinning one more. + * + * In the fast path we don't need to check the pin limit. We're + * always allowed at least one pin so that progress can be made, + * and that's all we need here. Although two pins are momentarily + * held at the same time, the model used here is that the stream + * holds only one, and the other now belongs to the caller. */ if (likely(!StartReadBuffer(&stream->ios[0].op, &stream->buffers[oldest_buffer_index], -- 2.48.1
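For a sense of scale, a worked example with invented settings: 16MB of shared_buffers is 2048 buffers, and with MaxBackends + NUM_AUXILIARY_PROCS around 120 the per-backend share works out to roughly 2048 / 120 = 17 pins. A stream that would like to keep 32 blocks in flight therefore has to live with whatever GetAdditionalPinLimit() reports at the moment each I/O is started, since other streams or plain ReadBuffer() calls in the same backend may have consumed the headroom in the meantime; that is the check this patch moves from stream initialisation to I/O start.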
From 4e2250ad385572c560c89ed0cf3e345c47651b8c Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Thu, 30 Jan 2025 11:42:03 +1300 Subject: [PATCH v1 5/6] Support buffer forwarding in read_stream.c. In preparation for a following change to the buffer manager, teach read stream to keep track of buffers that were "forwarded" from one call to StartReadBuffers() to the next. Since StartReadBuffers() buffers argument will become an in/out argument, we need to initialize the buffer queue entries with InvalidBuffer. We don't want to do that up front, because we try to keep stream initialization cheap and code that uses the fast path stays in one single buffer queue element. Satisfy both goals by initializing the queue incrementally on the first cycle. --- src/backend/storage/aio/read_stream.c | 108 ++++++++++++++++++++++---- 1 file changed, 94 insertions(+), 14 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index dc5ae60a089..049fda98257 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -112,8 +112,10 @@ struct ReadStream int16 ios_in_progress; int16 queue_size; int16 max_pinned_buffers; + int16 forwarded_buffers; int16 pinned_buffers; int16 distance; + int16 initialized_buffers; bool advice_enabled; bool temporary; @@ -280,7 +282,9 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) { bool need_wait; + int requested_nblocks; int nblocks; + int forwarded; int flags; int16 io_index; int16 overflow; @@ -312,13 +316,34 @@ read_stream_start_pending_read(ReadStream *stream, flags = 0; /* - * We say how many blocks we want to read, but may be smaller on return. - * On memory-constrained systems we may be also have to ask for a smaller - * read ourselves. + * On buffer-constrained systems we may need to limit the I/O size by the + * available pin count. */ + requested_nblocks = Min(buffer_limit, stream->pending_read_nblocks); + nblocks = requested_nblocks; buffer_index = stream->next_buffer_index; io_index = stream->next_io_index; - nblocks = Min(buffer_limit, stream->pending_read_nblocks); + + /* + * The first time around the queue we initialize it as we go, including + * the overflow zone, because otherwise the entries would appear as + * forwarded buffers. This avoids initializing the whole queue up front + * in cases where it is large but we don't ever use it due to the + * all-cached fast path or small scans. + */ + while (stream->initialized_buffers < buffer_index + nblocks) + stream->buffers[stream->initialized_buffers++] = InvalidBuffer; + + /* + * Start the I/O. Any buffers that are not InvalidBuffer will be + * interpreted as already pinned, forwarded by an earlier call to + * StartReadBuffers(), and must map to the expected blocks. The nblocks + * value may be smaller on return indicating the size of the I/O that + * could be started. Buffers beyond the output nblocks number may also + * have been pinned without starting I/O due to various edge cases. In + * that case we'll just leave them in the queue ahead of us, "forwarded" + * to the next call, avoiding the need to unpin/repin. + */ need_wait = StartReadBuffers(&stream->ios[io_index].op, &stream->buffers[buffer_index], stream->pending_read_blocknum, @@ -347,16 +372,35 @@ read_stream_start_pending_read(ReadStream *stream, stream->seq_blocknum = stream->pending_read_blocknum + nblocks; } + /* + * How many pins were acquired but forwarded to the next call? 
These need + * to be passed to the next StartReadBuffers() call, or released if the + * stream ends early. We need the number for accounting purposes, since + * they are not counted in stream->pinned_buffers but we already hold + * them. + */ + forwarded = 0; + while (nblocks + forwarded < requested_nblocks && + stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer) + forwarded++; + stream->forwarded_buffers = forwarded; + /* * We gave a contiguous range of buffer space to StartReadBuffers(), but - * we want it to wrap around at queue_size. Slide overflowing buffers to - * the front of the array. + * we want it to wrap around at queue_size. Copy overflowing buffers to + * the front of the array where they'll be consumed, but also leave a copy + * in the overflow zone which the I/O operation has a pointer to (it needs + * a contiguous array). Both copies will be cleared when the buffers are + * handed to the consumer. */ - overflow = (buffer_index + nblocks) - stream->queue_size; + overflow = (buffer_index + nblocks + forwarded) - stream->queue_size; if (overflow > 0) - memmove(&stream->buffers[0], - &stream->buffers[stream->queue_size], - sizeof(stream->buffers[0]) * overflow); + { + Assert(overflow < stream->queue_size); /* can't overlap */ + memcpy(&stream->buffers[0], + &stream->buffers[stream->queue_size], + sizeof(stream->buffers[0]) * overflow); + } /* Move to the location of start of next read. */ read_stream_index_advance_n(stream, &buffer_index, nblocks); @@ -381,6 +425,15 @@ read_stream_get_buffer_limit(ReadStream *stream) else buffers = GetAdditionalPinLimit(); + /* + * If we already have some forwarded buffers, we can certainly use those. + * They are already pinned, and are mapped to the starting blocks of the + * pending read, they just don't have any I/O started yet and are not + * counted in stream->pinned_buffers. + */ + Assert(stream->forwarded_buffers <= stream->pending_read_nblocks); + buffers += stream->forwarded_buffers; + /* * Each stream is always allowed to try to acquire one pin if it doesn't * hold one already. This is needed to guarantee progress, and just like @@ -389,7 +442,7 @@ read_stream_get_buffer_limit(ReadStream *stream) * pin, ie the buffer pool is simply too small for the workload. */ if (buffers == 0 && stream->pinned_buffers == 0) - return 1; + buffers = 1; /* * Otherwise, see how many additional pins the backend can currently pin, @@ -751,10 +804,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) /* Fast path assumptions. */ Assert(stream->ios_in_progress == 0); + Assert(stream->forwarded_buffers == 0); Assert(stream->pinned_buffers == 1); Assert(stream->distance == 1); Assert(stream->pending_read_nblocks == 0); Assert(stream->per_buffer_data_size == 0); + Assert(stream->initialized_buffers > stream->oldest_buffer_index); /* We're going to return the buffer we pinned last time. */ oldest_buffer_index = stream->oldest_buffer_index; @@ -803,6 +858,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->distance = 0; stream->oldest_buffer_index = stream->next_buffer_index; stream->pinned_buffers = 0; + stream->buffers[oldest_buffer_index] = InvalidBuffer; } stream->fast_path = false; @@ -887,10 +943,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) } } -#ifdef CLOBBER_FREED_MEMORY - /* Clobber old buffer for debugging purposes. */ + /* + * We must zap this queue entry, or else it would appear as a forwarded + * buffer. 
If it's potentially in the overflow zone (ie it wrapped around + * the queue), also zap that copy. + */ stream->buffers[oldest_buffer_index] = InvalidBuffer; -#endif + if (oldest_buffer_index < io_combine_limit - 1) + stream->buffers[stream->queue_size + oldest_buffer_index] = + InvalidBuffer; #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND) @@ -933,6 +994,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) #ifndef READ_STREAM_DISABLE_FAST_PATH /* See if we can take the fast path for all-cached scans next time. */ if (stream->ios_in_progress == 0 && + stream->forwarded_buffers == 0 && stream->pinned_buffers == 1 && stream->distance == 1 && stream->pending_read_nblocks == 0 && @@ -968,6 +1030,7 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy) void read_stream_reset(ReadStream *stream) { + int16 index; Buffer buffer; /* Stop looking ahead. */ @@ -981,6 +1044,23 @@ read_stream_reset(ReadStream *stream) while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer) ReleaseBuffer(buffer); + /* Unpin any unused forwarded buffers. */ + index = stream->next_buffer_index; + while (index < stream->initialized_buffers && + (buffer = stream->buffers[index]) != InvalidBuffer) + { + Assert(stream->forwarded_buffers > 0); + stream->forwarded_buffers--; + ReleaseBuffer(buffer); + + stream->buffers[index] = InvalidBuffer; + if (index < io_combine_limit - 1) + stream->buffers[stream->queue_size + index] = InvalidBuffer; + + read_stream_index_advance(stream, &index); + } + + Assert(stream->forwarded_buffers == 0); Assert(stream->pinned_buffers == 0); Assert(stream->ios_in_progress == 0); -- 2.48.1
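As a hypothetical walk-through of the bookkeeping (same scenario as the sketch near the top of this mail): if the stream's pending read covers blocks 10..17 starting at queue position 3 and StartReadBuffers() splits at a hit on block 15, then positions 3..7 hold the five buffers covered by the I/O (counted in pinned_buffers), position 8 holds the forwarded pin on block 15 (counted in forwarded_buffers, not pinned_buffers), and next_buffer_index has advanced to position 8, so the next read_stream_start_pending_read() call hands the forwarded pin straight back to StartReadBuffers() without any copying.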
From 37316501bfbe9fc6fc86f72351e18d6f103b9077 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Mon, 10 Feb 2025 21:55:40 +1300 Subject: [PATCH v1 6/6] Support buffer forwarding in StartReadBuffers(). Sometimes we have to perform a short read because we hit a cached block that ends a contiguous run of blocks requiring I/O. We don't want StartReadBuffers() to have to start more than one I/O, so we stop there. We also don't want to have to unpin the cached block (and repin it later), so previously we'd silently pretend the hit was part of the I/O, and just leave it out of the read from disk. Now, we'll "forward" it to the next call. We still write it to the buffers[] array for the caller to pass back to us later, but it's not included in *nblocks. This policy means that we no longer mix hits and misses in a single operation's results, so we avoid the requirement to call WaitReadBuffers(), which might stall, before the caller can make use of the hits. The caller will get the hit in the next call instead, and know that it doesn't have to wait. That's important for later work on out-of-order read streams that minimize I/O stalls. This also makes life easier for proposed work on true AIO, which occasionally needs to split a large I/O after pinning all the buffers, while the current coding only ever forwards a single bookending hit. This API is natural for read_stream.c: it just leaves forwarded buffers where they are in its circular queue, where the next call will pick them up and continue, minimizing pin churn. If we ever think of a good reason to disable this feature, i.e. for other users of StartReadBuffers() that don't want to deal with forwarded buffers, then we could add a flag for that. For now read_stream.c is the only user. --- src/backend/storage/buffer/bufmgr.c | 128 ++++++++++++++++++++-------- src/include/storage/bufmgr.h | 1 - 2 files changed, 91 insertions(+), 38 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 2ca641204fb..6fe70328e38 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1257,10 +1257,10 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, Buffer *buffers, BlockNumber blockNum, int *nblocks, - int flags) + int flags, + bool allow_forwarding) { int actual_nblocks = *nblocks; - int io_buffers_len = 0; int maxcombine = 0; Assert(*nblocks > 0); @@ -1270,30 +1270,80 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, { bool found; - buffers[i] = PinBufferForBlock(operation->rel, - operation->smgr, - operation->persistence, - operation->forknum, - blockNum + i, - operation->strategy, - &found); + if (allow_forwarding && buffers[i] != InvalidBuffer) + { + BufferDesc *bufHdr; + + /* + * This is a buffer that was pinned by an earlier call to + * StartReadBuffers(), but couldn't be handled in one operation at + * that time. The operation was split, and the caller has passed + * an already pinned buffer back to us to handle the rest of the + * operation. It must continue at the expected block number. + */ + Assert(BufferGetBlockNumber(buffers[i]) == blockNum + i); + + /* + * It might be an already valid buffer (a hit) that followed the + * final contiguous block of an earlier I/O (a miss) marking the + * end of it, or a buffer that some other backend has since made + * valid by performing the I/O for us, in which case we can handle + * it as a hit now. 
It is safe to check for a BM_VALID flag with + * a relaxed load, because we got a fresh view of it while pinning + * it in the previous call. + * + * On the other hand if we don't see BM_VALID yet, it must be an + * I/O that was split by the previous call and we need to try to + * start a new I/O from this block. We're also racing against any + * other backend that might start the I/O or even manage to mark + * it BM_VALID after this check, but + * StartBufferIO() will handle those cases. + */ + if (BufferIsLocal(buffers[i])) + bufHdr = GetLocalBufferDescriptor(-buffers[i] - 1); + else + bufHdr = GetBufferDescriptor(buffers[i] - 1); + found = pg_atomic_read_u32(&bufHdr->state) & BM_VALID; + } + else + { + buffers[i] = PinBufferForBlock(operation->rel, + operation->smgr, + operation->persistence, + operation->forknum, + blockNum + i, + operation->strategy, + &found); + } if (found) { /* - * Terminate the read as soon as we get a hit. It could be a - * single buffer hit, or it could be a hit that follows a readable - * range. We don't want to create more than one readable range, - * so we stop here. + * We have a hit. If it's the first block in the requested range, + * we can return it immediately and report that WaitReadBuffers() + * does not need to be called. If the initial value of *nblocks + * was larger, the caller will have to call again for the rest. */ - actual_nblocks = i + 1; + if (i == 0) + { + *nblocks = 1; + return false; + } + + /* + * Otherwise we already have an I/O to perform, but this block + * can't be included as it is already valid. Split the I/O here. + * There may or may not be more blocks requiring I/O after this + * one, we haven't checked, but they can't be combined with this + * I/O because the hit is in the way. We'll leave this buffer pinned, forwarding it + * to the next call, avoiding the need to unpin it here and re-pin + * it in the next call. + */ + actual_nblocks = i; break; } else { - /* Extend the readable range to cover this block. */ - io_buffers_len++; - /* * Check how many blocks we can cover with the same IO. The smgr * implementation might e.g. be limited due to a segment boundary. @@ -1314,15 +1364,11 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, } *nblocks = actual_nblocks; - if (likely(io_buffers_len == 0)) - return false; - /* Populate information needed for I/O. */ operation->buffers = buffers; operation->blocknum = blockNum; operation->flags = flags; operation->nblocks = actual_nblocks; - operation->io_buffers_len = io_buffers_len; if (flags & READ_BUFFERS_ISSUE_ADVICE) { @@ -1337,7 +1383,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, smgrprefetch(operation->smgr, operation->forknum, blockNum, - operation->io_buffers_len); + actual_nblocks); } /* Indicate that WaitReadBuffers() should be called. */ @@ -1351,11 +1397,21 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, * actual number, which may be fewer than requested. Caller sets some of the * members of operation; see struct definition. * + * The initial contents of the elements of buffers up to *nblocks should + * either be InvalidBuffer or an already-pinned buffer that was left by a + * preceding call to StartReadBuffers() that had to be split. On return, some + * elements of buffers may hold pinned buffers beyond the number indicated by + * the updated value of *nblocks. 
Operations are split on boundaries known to + * smgr (eg md.c segment boundaries that require crossing into a different + * underlying file), or when already cached blocks are found in the buffer + * that prevent the formation of a contiguous read. + * * If false is returned, no I/O is necessary. If true is returned, one I/O * has been started, and WaitReadBuffers() must be called with the same * operation object before the buffers are accessed. Along with the operation * object, the caller-supplied array of buffers must remain valid until - * WaitReadBuffers() is called. + * WaitReadBuffers() is called, and any forwarded buffers must also be + * preserved for a future call unless explicitly released. * * Currently the I/O is only started with optional operating system advice if * requested by the caller with READ_BUFFERS_ISSUE_ADVICE, and the real I/O @@ -1369,13 +1425,18 @@ StartReadBuffers(ReadBuffersOperation *operation, int *nblocks, int flags) { - return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags); + return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags, + true /* expect forwarded buffers */ ); } /* * Single block version of the StartReadBuffers(). This might save a few * instructions when called from another translation unit, because it is * specialized for nblocks == 1. + * + * This version does not support "forwarded" buffers: they cannot be created + * by reading only one block, and the current contents of *buffer is ignored + * on entry. */ bool StartReadBuffer(ReadBuffersOperation *operation, @@ -1386,7 +1447,8 @@ StartReadBuffer(ReadBuffersOperation *operation, int nblocks = 1; bool result; - result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags); + result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags, + false /* single block, no forwarding */ ); Assert(nblocks == 1); /* single block can't be short */ return result; @@ -1416,24 +1478,16 @@ WaitReadBuffers(ReadBuffersOperation *operation) IOObject io_object; char persistence; - /* - * Currently operations are only allowed to include a read of some range, - * with an optional extra buffer that is already pinned at the end. So - * nblocks can be at most one more than io_buffers_len. - */ - Assert((operation->nblocks == operation->io_buffers_len) || - (operation->nblocks == operation->io_buffers_len + 1)); - /* Find the range of the physical read we need to perform. */ - nblocks = operation->io_buffers_len; - if (nblocks == 0) - return; /* nothing to do */ - + nblocks = operation->nblocks; buffers = &operation->buffers[0]; blocknum = operation->blocknum; forknum = operation->forknum; persistence = operation->persistence; + Assert(nblocks > 0); + Assert(nblocks <= MAX_IO_COMBINE_LIMIT); + if (persistence == RELPERSISTENCE_TEMP) { io_context = IOCONTEXT_NORMAL; diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 597ecb97897..4a035f59a7d 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -130,7 +130,6 @@ struct ReadBuffersOperation BlockNumber blocknum; int flags; int16 nblocks; - int16 io_buffers_len; }; typedef struct ReadBuffersOperation ReadBuffersOperation; -- 2.48.1