On Fri, Feb 28, 2025 at 2:29 PM Thomas Munro <thomas.mu...@gmail.com> wrote: > On Fri, Feb 28, 2025 at 11:58 AM Melanie Plageman > <melanieplage...@gmail.com> wrote: > > On Thu, Feb 27, 2025 at 1:08 PM Tom Lane <t...@sss.pgh.pa.us> wrote: > > > I wonder if it'd be a good idea to add something like > > > > > > Assert(stream->distance == 1); > > > Assert(stream->pending_read_nblocks == 0); > > > Assert(stream->per_buffer_data_size == 0); > > > + Assert(per_buffer_data == NULL); > > > > > > in read_stream_next_buffer. I doubt that this will shut Coverity > > > up, but it would help to catch caller coding errors, i.e. passing > > > a per_buffer_data pointer when there's no per-buffer data. > > > > I think this is a good stopgap. I was discussing adding this assert > > off-list with Thomas and he wanted to detail his more ambitious plans > > for type safety improvements in the read stream API. Less on the order > > of a redesign and more like a separate read_stream_next_buffer()s for > > when there is per buffer data and when there isn't. And a by-value and > > by-reference version for the one where there is data. > > Here's what I had in mind. Is it better?
Here's a slightly better one. I think that when you use read_stream_get_buffer_and_value(stream, &value) or read_stream_put_value(stream, space, value), we should assert that sizeof(value) strictly matches the available space, as shown in the patch. But, new in v2, if you use read_stream_get_buffer_and_pointer(stream, &pointer), then sizeof(*pointer) should only have to be <= the storage space, not ==, because someone might plausibly want to make per_buffer_data_size variable at runtime (i.e., decide it when they construct the stream), and then be able to retrieve a pointer to the start of a struct with a flexible array member or something like that. In v1 I was just trying to assert that it was a pointer-to-a-pointer-to-something and no more (in a confusing compile-time assertion), but v2 is simpler: it is happy with a pointer to a pointer to something that doesn't exceed the space (a run-time assertion).
From b2dd9c90f970a889deea2c2e9e16097e4e06ece8 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.munro@gmail.com> Date: Fri, 28 Feb 2025 10:48:29 +1300 Subject: [PATCH v2 1/2] Improve API for retrieving data from read streams. Dealing with the per_buffer_data argument to read_stream_next_buffer() has proven a bit clunky. Provide some new wrapper functions/macros: buffer = read_stream_get_buffer(rs); buffer = read_stream_get_buffer_and_value(rs, &my_int); buffer = read_stream_get_buffer_and_pointer(rs, &my_pointer_to_int); These improve readability and type safety via assertions. --- contrib/pg_prewarm/pg_prewarm.c | 4 +- contrib/pg_visibility/pg_visibility.c | 6 +-- src/backend/access/heap/heapam.c | 2 +- src/backend/access/heap/heapam_handler.c | 2 +- src/backend/access/heap/vacuumlazy.c | 6 +-- src/backend/storage/aio/read_stream.c | 12 ++++++ src/backend/storage/buffer/bufmgr.c | 4 +- src/include/storage/read_stream.h | 55 ++++++++++++++++++++++++ 8 files changed, 78 insertions(+), 13 deletions(-) diff --git a/contrib/pg_prewarm/pg_prewarm.c b/contrib/pg_prewarm/pg_prewarm.c index a2f0ac4af0c..f6ae266d7b0 100644 --- a/contrib/pg_prewarm/pg_prewarm.c +++ b/contrib/pg_prewarm/pg_prewarm.c @@ -208,11 +208,11 @@ pg_prewarm(PG_FUNCTION_ARGS) Buffer buf; CHECK_FOR_INTERRUPTS(); - buf = read_stream_next_buffer(stream, NULL); + buf = read_stream_get_buffer(stream); ReleaseBuffer(buf); ++blocks_done; } - Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer); + Assert(read_stream_get_buffer(stream) == InvalidBuffer); read_stream_end(stream); } diff --git a/contrib/pg_visibility/pg_visibility.c b/contrib/pg_visibility/pg_visibility.c index 7f268a18a74..e7187a46c9d 100644 --- a/contrib/pg_visibility/pg_visibility.c +++ b/contrib/pg_visibility/pg_visibility.c @@ -556,7 +556,7 @@ collect_visibility_data(Oid relid, bool include_pd) Buffer buffer; Page page; - buffer = read_stream_next_buffer(stream, NULL); + buffer = read_stream_get_buffer(stream); LockBuffer(buffer, 
BUFFER_LOCK_SHARE); page = BufferGetPage(buffer); @@ -569,7 +569,7 @@ collect_visibility_data(Oid relid, bool include_pd) if (include_pd) { - Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer); + Assert(read_stream_get_buffer(stream) == InvalidBuffer); read_stream_end(stream); } @@ -752,7 +752,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen) 0); /* Loop over every block in the relation. */ - while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer) + while ((buffer = read_stream_get_buffer(stream)) != InvalidBuffer) { bool check_frozen = all_frozen; bool check_visible = all_visible; diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index fa7935a0ed3..86f280069e0 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -609,7 +609,7 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir) scan->rs_dir = dir; - scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL); + scan->rs_cbuf = read_stream_get_buffer(scan->rs_read_stream); if (BufferIsValid(scan->rs_cbuf)) scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf); } diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index e78682c3cef..7487896b06c 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -1010,7 +1010,7 @@ heapam_scan_analyze_next_block(TableScanDesc scan, ReadStream *stream) * re-acquire sharelock for each tuple, but since we aren't doing much * work per tuple, the extra lock traffic is probably better avoided. 
*/ - hscan->rs_cbuf = read_stream_next_buffer(stream, NULL); + hscan->rs_cbuf = read_stream_get_buffer(stream); if (!BufferIsValid(hscan->rs_cbuf)) return false; diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index 1af18a78a2b..ac7a4d8c21d 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -1230,7 +1230,6 @@ lazy_scan_heap(LVRelState *vacrel) Page page; uint8 blk_info = 0; bool has_lpdead_items; - void *per_buffer_data = NULL; bool vm_page_frozen = false; bool got_cleanup_lock = false; @@ -1287,13 +1286,12 @@ lazy_scan_heap(LVRelState *vacrel) PROGRESS_VACUUM_PHASE_SCAN_HEAP); } - buf = read_stream_next_buffer(stream, &per_buffer_data); + buf = read_stream_get_buffer_and_value(stream, &blk_info); /* The relation is exhausted. */ if (!BufferIsValid(buf)) break; - blk_info = *((uint8 *) per_buffer_data); CheckBufferIsPinnedOnce(buf); page = BufferGetPage(buf); blkno = BufferGetBlockNumber(buf); @@ -2740,7 +2738,7 @@ lazy_vacuum_heap_rel(LVRelState *vacrel) vacuum_delay_point(false); - buf = read_stream_next_buffer(stream, (void **) &iter_result); + buf = read_stream_get_buffer_and_pointer(stream, &iter_result); /* The relation is exhausted */ if (!BufferIsValid(buf)) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 04bdb5e6d4b..0f1332c46f6 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -615,6 +615,9 @@ read_stream_begin_smgr_relation(int flags, * valid until the next call to read_stream_next_buffer(). When the stream * runs out of data, InvalidBuffer is returned. The caller may decide to end * the stream early at any time by calling read_stream_end(). + * + * See read_stream.h for read_stream_get_buffer() and variants that provide + * some degree of type safety for the per_buffer_data argument. 
*/ Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) @@ -840,6 +843,15 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy) return read_stream_get_block(stream, NULL); } +/* + * Return the configured per-buffer data size, for use in assertions. + */ +size_t +read_stream_per_buffer_data_size(ReadStream *stream) +{ + return stream->per_buffer_data_size; +} + /* * Reset a read stream by releasing any queued up buffers, allowing the stream * to be used again for different blocks. This can be used to clear an diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 7915ed624c1..f4cedd15109 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -4690,7 +4690,7 @@ RelationCopyStorageUsingBuffer(RelFileLocator srclocator, CHECK_FOR_INTERRUPTS(); /* Read block from source relation. */ - srcBuf = read_stream_next_buffer(src_stream, NULL); + srcBuf = read_stream_get_buffer(src_stream); LockBuffer(srcBuf, BUFFER_LOCK_SHARE); srcPage = BufferGetPage(srcBuf); @@ -4715,7 +4715,7 @@ RelationCopyStorageUsingBuffer(RelFileLocator srclocator, UnlockReleaseBuffer(dstBuf); UnlockReleaseBuffer(srcBuf); } - Assert(read_stream_next_buffer(src_stream, NULL) == InvalidBuffer); + Assert(read_stream_get_buffer(src_stream) == InvalidBuffer); read_stream_end(src_stream); FreeAccessStrategy(bstrategy_src); diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h index c11d8ce3300..c6066c0f296 100644 --- a/src/include/storage/read_stream.h +++ b/src/include/storage/read_stream.h @@ -70,6 +70,7 @@ extern ReadStream *read_stream_begin_relation(int flags, extern Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_data); extern BlockNumber read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy); +extern size_t read_stream_per_buffer_data_size(ReadStream *stream); extern ReadStream *read_stream_begin_smgr_relation(int 
flags, BufferAccessStrategy strategy, SMgrRelation smgr, @@ -81,4 +82,58 @@ extern ReadStream *read_stream_begin_smgr_relation(int flags, extern void read_stream_reset(ReadStream *stream); extern void read_stream_end(ReadStream *stream); +/* + * Get the next buffer from a stream that is not using per-buffer data. + */ +static inline Buffer +read_stream_get_buffer(ReadStream *stream) +{ + Assert(read_stream_per_buffer_data_size(stream) == 0); + return read_stream_next_buffer(stream, NULL); +} + +/* + * Helper for read_stream_get_buffer_and_value(). + */ +static inline Buffer +read_stream_get_buffer_and_value_with_size(ReadStream *stream, + void *output_data, + size_t output_data_size) +{ + Buffer buffer; + void *per_buffer_data; + + Assert(read_stream_per_buffer_data_size(stream) == output_data_size); + buffer = read_stream_next_buffer(stream, &per_buffer_data); + if (buffer != InvalidBuffer) + memcpy(output_data, per_buffer_data, output_data_size); + + return buffer; +} + +/* + * Get the next buffer and a copy of the associated per-buffer data. + * InvalidBuffer means end-of-stream, and in that case the per-buffer data is + * undefined. Example of use: + * + * int my_int; + * + * buf = read_stream_get_buffer_and_value(stream, &my_int); + */ +#define read_stream_get_buffer_and_value(stream, vp) \ + read_stream_get_buffer_and_value_with_size((stream), (vp), sizeof(*(vp))) + +/* + * Get the next buffer and a pointer to the associated per-buffer data. This + * avoids casts in the calling code, and asserts that we received a pointer to + * a pointer to a type that doesn't exceed the storage size. For example: + * + * int *my_int_p; + * + * buf = read_stream_get_buffer_and_pointer(stream, &my_int_p); + */ +#define read_stream_get_buffer_and_pointer(stream, pointer) \ + (AssertMacro(sizeof(**(pointer)) <= read_stream_per_buffer_data_size(stream)), \ + read_stream_next_buffer((stream), ((void **) (pointer)))) + #endif /* READ_STREAM_H */ -- 2.48.1
From 2586d9c7321391168a40cb0f14e5a80182792b64 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.munro@gmail.com> Date: Fri, 28 Feb 2025 12:53:23 +1300 Subject: [PATCH v2 2/2] Improve API for storing data in read streams. Read stream callbacks receive a void pointer into the per-buffer data queue so that they can store data there for later retrieval by the buffer consumer. We can improve readability and safety a bit by changing cast-and-assign or raw memcpy() to: read_stream_put_value(stream, per_buffer_data, my_int); This form infers the size and asserts that the storage space matches, generally mirroring the read_stream_get_buffer_and_value() call used for retrieving the streamed data later. --- src/backend/access/heap/vacuumlazy.c | 6 +++--- src/include/storage/read_stream.h | 9 +++++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index ac7a4d8c21d..9563906fb27 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -1612,7 +1612,7 @@ heap_vac_scan_next_block(ReadStream *stream, */ vacrel->current_block = next_block; blk_info |= VAC_BLK_ALL_VISIBLE_ACCORDING_TO_VM; - *((uint8 *) per_buffer_data) = blk_info; + read_stream_put_value(stream, per_buffer_data, blk_info); return vacrel->current_block; } else @@ -1628,7 +1628,7 @@ heap_vac_scan_next_block(ReadStream *stream, blk_info |= VAC_BLK_ALL_VISIBLE_ACCORDING_TO_VM; if (vacrel->next_unskippable_eager_scanned) blk_info |= VAC_BLK_WAS_EAGER_SCANNED; - *((uint8 *) per_buffer_data) = blk_info; + read_stream_put_value(stream, per_buffer_data, blk_info); return vacrel->current_block; } } @@ -2671,7 +2671,7 @@ vacuum_reap_lp_read_stream_next(ReadStream *stream, * Save the TidStoreIterResult for later, so we can extract the offsets. * It is safe to copy the result, according to TidStoreIterateNext().
*/ - memcpy(per_buffer_data, iter_result, sizeof(*iter_result)); + read_stream_put_value(stream, per_buffer_data, *iter_result); return iter_result->blkno; } diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h index c6066c0f296..5af801969b4 100644 --- a/src/include/storage/read_stream.h +++ b/src/include/storage/read_stream.h @@ -136,4 +136,13 @@ read_stream_get_buffer_and_value_with_size(ReadStream *stream, (AssertMacro(sizeof(**(pointer)) <= read_stream_per_buffer_data_size(stream)), \ read_stream_next_buffer((stream), ((void **) (pointer)))) +/* + * Set the per-buffer data by value. This can be called from inside a + * callback that is returning block numbers. It asserts that the value's size + * matches the available space. + */ +#define read_stream_put_value(stream, per_buffer_data, value) \ + (AssertMacro(sizeof(value) == read_stream_per_buffer_data_size(stream)), \ + memcpy((per_buffer_data), &(value), sizeof(value))) + #endif /* READ_STREAM_H */ -- 2.48.1