On Fri, Feb 28, 2025 at 11:58 AM Melanie Plageman <melanieplage...@gmail.com> wrote: > On Thu, Feb 27, 2025 at 1:08 PM Tom Lane <t...@sss.pgh.pa.us> wrote: > > I wonder if it'd be a good idea to add something like > > > > Assert(stream->distance == 1); > > Assert(stream->pending_read_nblocks == 0); > > Assert(stream->per_buffer_data_size == 0); > > + Assert(per_buffer_data == NULL); > > > > in read_stream_next_buffer. I doubt that this will shut Coverity > > up, but it would help to catch caller coding errors, i.e. passing > > a per_buffer_data pointer when there's no per-buffer data. > > I think this is a good stopgap. I was discussing adding this assert > off-list with Thomas and he wanted to detail his more ambitious plans > for type safety improvements in the read stream API. Less on the order > of a redesign and more like separate read_stream_next_buffer()s for > when there is per-buffer data and when there isn't. And a by-value and > by-reference version for the one where there is data.
Here's what I had in mind. Is it better?
From 68e9424b590051959142917459eb4ea074589b79 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Fri, 28 Feb 2025 10:48:29 +1300 Subject: [PATCH 1/2] Improve API for retrieving data from read streams. Dealing with the per_buffer_data argument to read_stream_next_buffer() has proven a bit clunky. Provide some new wrapper functions/macros: buffer = read_stream_get_buffer(rs); buffer = read_stream_get_buffer_and_value(rs, &my_int); buffer = read_stream_get_buffer_and_pointer(rs, &my_pointer_to_int); These improve readability and type safety via assertions. --- contrib/pg_prewarm/pg_prewarm.c | 4 +- contrib/pg_visibility/pg_visibility.c | 6 +-- src/backend/access/heap/heapam.c | 2 +- src/backend/access/heap/heapam_handler.c | 2 +- src/backend/access/heap/vacuumlazy.c | 6 +-- src/backend/storage/aio/read_stream.c | 12 +++++ src/backend/storage/buffer/bufmgr.c | 4 +- src/include/storage/read_stream.h | 64 ++++++++++++++++++++++++ 8 files changed, 87 insertions(+), 13 deletions(-) diff --git a/contrib/pg_prewarm/pg_prewarm.c b/contrib/pg_prewarm/pg_prewarm.c index a2f0ac4af0c..f6ae266d7b0 100644 --- a/contrib/pg_prewarm/pg_prewarm.c +++ b/contrib/pg_prewarm/pg_prewarm.c @@ -208,11 +208,11 @@ pg_prewarm(PG_FUNCTION_ARGS) Buffer buf; CHECK_FOR_INTERRUPTS(); - buf = read_stream_next_buffer(stream, NULL); + buf = read_stream_get_buffer(stream); ReleaseBuffer(buf); ++blocks_done; } - Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer); + Assert(read_stream_get_buffer(stream) == InvalidBuffer); read_stream_end(stream); } diff --git a/contrib/pg_visibility/pg_visibility.c b/contrib/pg_visibility/pg_visibility.c index 7f268a18a74..e7187a46c9d 100644 --- a/contrib/pg_visibility/pg_visibility.c +++ b/contrib/pg_visibility/pg_visibility.c @@ -556,7 +556,7 @@ collect_visibility_data(Oid relid, bool include_pd) Buffer buffer; Page page; - buffer = read_stream_next_buffer(stream, NULL); + buffer = read_stream_get_buffer(stream); LockBuffer(buffer, 
BUFFER_LOCK_SHARE); page = BufferGetPage(buffer); @@ -569,7 +569,7 @@ collect_visibility_data(Oid relid, bool include_pd) if (include_pd) { - Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer); + Assert(read_stream_get_buffer(stream) == InvalidBuffer); read_stream_end(stream); } @@ -752,7 +752,7 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen) 0); /* Loop over every block in the relation. */ - while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer) + while ((buffer = read_stream_get_buffer(stream)) != InvalidBuffer) { bool check_frozen = all_frozen; bool check_visible = all_visible; diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index fa7935a0ed3..86f280069e0 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -609,7 +609,7 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir) scan->rs_dir = dir; - scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL); + scan->rs_cbuf = read_stream_get_buffer(scan->rs_read_stream); if (BufferIsValid(scan->rs_cbuf)) scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf); } diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index e78682c3cef..7487896b06c 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -1010,7 +1010,7 @@ heapam_scan_analyze_next_block(TableScanDesc scan, ReadStream *stream) * re-acquire sharelock for each tuple, but since we aren't doing much * work per tuple, the extra lock traffic is probably better avoided. 
*/ - hscan->rs_cbuf = read_stream_next_buffer(stream, NULL); + hscan->rs_cbuf = read_stream_get_buffer(stream); if (!BufferIsValid(hscan->rs_cbuf)) return false; diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index 1af18a78a2b..ac7a4d8c21d 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -1230,7 +1230,6 @@ lazy_scan_heap(LVRelState *vacrel) Page page; uint8 blk_info = 0; bool has_lpdead_items; - void *per_buffer_data = NULL; bool vm_page_frozen = false; bool got_cleanup_lock = false; @@ -1287,13 +1286,12 @@ lazy_scan_heap(LVRelState *vacrel) PROGRESS_VACUUM_PHASE_SCAN_HEAP); } - buf = read_stream_next_buffer(stream, &per_buffer_data); + buf = read_stream_get_buffer_and_value(stream, &blk_info); /* The relation is exhausted. */ if (!BufferIsValid(buf)) break; - blk_info = *((uint8 *) per_buffer_data); CheckBufferIsPinnedOnce(buf); page = BufferGetPage(buf); blkno = BufferGetBlockNumber(buf); @@ -2740,7 +2738,7 @@ lazy_vacuum_heap_rel(LVRelState *vacrel) vacuum_delay_point(false); - buf = read_stream_next_buffer(stream, (void **) &iter_result); + buf = read_stream_get_buffer_and_pointer(stream, &iter_result); /* The relation is exhausted */ if (!BufferIsValid(buf)) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 04bdb5e6d4b..0f1332c46f6 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -615,6 +615,9 @@ read_stream_begin_smgr_relation(int flags, * valid until the next call to read_stream_next_buffer(). When the stream * runs out of data, InvalidBuffer is returned. The caller may decide to end * the stream early at any time by calling read_stream_end(). + * + * See read_stream.h for read_stream_get_buffer() and variants that provide + * some degree of type safety for the per_buffer_data argument. 
*/ Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) @@ -840,6 +843,15 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy) return read_stream_get_block(stream, NULL); } +/* + * Return the configured per-buffer data size, for use in assertions. + */ +size_t +read_stream_per_buffer_data_size(ReadStream *stream) +{ + return stream->per_buffer_data_size; +} + /* * Reset a read stream by releasing any queued up buffers, allowing the stream * to be used again for different blocks. This can be used to clear an diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 7915ed624c1..f4cedd15109 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -4690,7 +4690,7 @@ RelationCopyStorageUsingBuffer(RelFileLocator srclocator, CHECK_FOR_INTERRUPTS(); /* Read block from source relation. */ - srcBuf = read_stream_next_buffer(src_stream, NULL); + srcBuf = read_stream_get_buffer(src_stream); LockBuffer(srcBuf, BUFFER_LOCK_SHARE); srcPage = BufferGetPage(srcBuf); @@ -4715,7 +4715,7 @@ RelationCopyStorageUsingBuffer(RelFileLocator srclocator, UnlockReleaseBuffer(dstBuf); UnlockReleaseBuffer(srcBuf); } - Assert(read_stream_next_buffer(src_stream, NULL) == InvalidBuffer); + Assert(read_stream_get_buffer(src_stream) == InvalidBuffer); read_stream_end(src_stream); FreeAccessStrategy(bstrategy_src); diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h index c11d8ce3300..68c9340b0e3 100644 --- a/src/include/storage/read_stream.h +++ b/src/include/storage/read_stream.h @@ -70,6 +70,7 @@ extern ReadStream *read_stream_begin_relation(int flags, extern Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_data); extern BlockNumber read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy); +extern size_t read_stream_per_buffer_data_size(ReadStream *stream); extern ReadStream *read_stream_begin_smgr_relation(int 
flags, BufferAccessStrategy strategy, SMgrRelation smgr, @@ -81,4 +82,67 @@ extern ReadStream *read_stream_begin_smgr_relation(int flags, extern void read_stream_reset(ReadStream *stream); extern void read_stream_end(ReadStream *stream); +/* + * Get the next buffer from a stream that is not using per-buffer data. + */ +static inline Buffer +read_stream_get_buffer(ReadStream *stream) +{ + Assert(read_stream_per_buffer_data_size(stream) == 0); + return read_stream_next_buffer(stream, NULL); +} + +/* + * Inlinable helper for read_stream_get_buffer_and_value() macro. + */ +static inline Buffer +read_stream_get_buffer_and_value_with_size(ReadStream *stream, + void *output_data, + size_t output_data_size) +{ + Buffer buffer; + void *per_buffer_data; + + Assert(read_stream_per_buffer_data_size(stream) == output_data_size); + buffer = read_stream_next_buffer(stream, &per_buffer_data); + if (buffer != InvalidBuffer) + memcpy(output_data, per_buffer_data, output_data_size); + + return buffer; +} + +/* + * Get the next buffer and a copy of the associated per-buffer data. + * InvalidBuffer means end-of-stream, and in that case the per-buffer data is + * undefined. Example of use: + * + * int my_int; + * + * buf = read_stream_get_buffer_and_value(stream, &my_int); + */ +#define read_stream_get_buffer_and_value(stream, vp) \ + read_stream_get_buffer_and_value_with_size((stream), (vp), sizeof(*(vp))) + +/* + * Get the next buffer and a pointer to the associated per-buffer data. This + * avoids casts, while still checking that we have the expected level of + * indirection. InvalidBuffer means end-of-stream, and in that case the output + * pointer is undefined. Otherwise the output pointer should only be + * dereferenced up until the next call. 
For example: + * + * int *my_int_p; + * + * buf = read_stream_get_buffer_and_pointer(stream, &my_int_p); + */ +#if HAVE__BUILTIN_TYPES_COMPATIBLE_P +#define read_stream_get_buffer_and_pointer(stream, pointer) \ + (StaticAssertExpr(!__builtin_types_compatible_p(__typeof__(**(pointer)), \ + void), \ + "expected pointer to pointer to non-void"), \ + read_stream_next_buffer((stream), ((void **) (pointer)))) +#else +#define read_stream_get_buffer_and_pointer(stream, pointer) \ + read_stream_next_buffer((stream), ((void **) (pointer))) +#endif + #endif /* READ_STREAM_H */ -- 2.48.1
From 02bb13b80abc20cd8221287b4c908f9c7c0dde23 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Fri, 28 Feb 2025 12:53:23 +1300 Subject: [PATCH 2/2] Improve API for storing data in read streams. Read stream callbacks receive a void pointer into the per-buffer data queue so that they can store data there for later retrieval by the buffer consumer. We can improve readability and safety a bit by changing cast-and-assign or raw memcpy() to: read_stream_put_value(stream, per_buffer_data, my_int); This form infers the size and asserts that the storage space matches, generally mirroring the read_stream_get_buffer_and_value() call used for retrieving the streamed data later. --- src/backend/access/heap/vacuumlazy.c | 6 +++--- src/include/storage/read_stream.h | 9 +++++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index ac7a4d8c21d..9563906fb27 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -1612,7 +1612,7 @@ heap_vac_scan_next_block(ReadStream *stream, */ vacrel->current_block = next_block; blk_info |= VAC_BLK_ALL_VISIBLE_ACCORDING_TO_VM; - *((uint8 *) per_buffer_data) = blk_info; + read_stream_put_value(stream, per_buffer_data, blk_info); return vacrel->current_block; } else @@ -1628,7 +1628,7 @@ heap_vac_scan_next_block(ReadStream *stream, blk_info |= VAC_BLK_ALL_VISIBLE_ACCORDING_TO_VM; if (vacrel->next_unskippable_eager_scanned) blk_info |= VAC_BLK_WAS_EAGER_SCANNED; - *((uint8 *) per_buffer_data) = blk_info; + read_stream_put_value(stream, per_buffer_data, blk_info); return vacrel->current_block; } } @@ -2671,7 +2671,7 @@ vacuum_reap_lp_read_stream_next(ReadStream *stream, * Save the TidStoreIterResult for later, so we can extract the offsets. * It is safe to copy the result, according to TidStoreIterateNext(). 
*/ - memcpy(per_buffer_data, iter_result, sizeof(*iter_result)); + read_stream_put_value(stream, per_buffer_data, *iter_result); return iter_result->blkno; } diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h index 68c9340b0e3..673dd75ccaf 100644 --- a/src/include/storage/read_stream.h +++ b/src/include/storage/read_stream.h @@ -145,4 +145,13 @@ read_stream_get_buffer_and_value_with_size(ReadStream *stream, read_stream_next_buffer((stream), ((void **) (pointer))) #endif +/* + * Set the per-buffer data by value. This can be called from inside a + * callback that is returning block numbers. It asserts that the value's size + * matches the available space. + */ +#define read_stream_put_value(stream, per_buffer_data, value) \ + (AssertMacro(sizeof(value) == read_stream_per_buffer_data_size(stream)), \ + memcpy((per_buffer_data), &(value), sizeof(value))) + #endif /* READ_STREAM_H */ -- 2.48.1